From ff860e55dbe49bbd3dc9a89d055047d0731ecc9f Mon Sep 17 00:00:00 2001 From: Bjorn Svensson Date: Tue, 11 Jan 2022 19:19:43 +0100 Subject: Correction for command timeout during pubsub (#1038) * Add test of command timeout during pubsub A timeout of a non-subscribe command will be ignored during pubsub. It will be handled as an idle timeout and a response is awaited for. * Correction for command timeout during pubsub Disconnect when a sent non-subscribe command triggers a timeout. --- async.c | 2 +- test.c | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 93 insertions(+), 4 deletions(-) diff --git a/async.c b/async.c index e4a2309..3b5ec27 100644 --- a/async.c +++ b/async.c @@ -696,7 +696,7 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) { redisCallback cb; if ((c->flags & REDIS_CONNECTED)) { - if ( ac->replies.head == NULL) { + if (ac->replies.head == NULL && ac->sub.replies.head == NULL) { /* Nothing to do - just an idle timeout */ return; } diff --git a/test.c b/test.c index 6915ba2..e747439 100644 --- a/test.c +++ b/test.c @@ -1527,6 +1527,14 @@ void array_cb(redisAsyncContext *ac, void *r, void *privdata) { state->checkpoint++; } +/* Expect a NULL reply */ +void null_cb(redisAsyncContext *ac, void *r, void *privdata) { + (void) ac; + assert(r == NULL); + TestState *state = privdata; + state->checkpoint++; +} + static void test_pubsub_handling(struct config config) { test("Subscribe, handle published message and unsubscribe: "); /* Setup event dispatcher with a testcase timeout */ @@ -1618,7 +1626,85 @@ static void test_pubsub_handling_resp3(struct config config) { /* Verify test checkpoints */ assert(state.checkpoint == 5); } -#endif + +/* Subscribe callback for test_command_timeout_during_pubsub: + * - a subscribe response triggers a published message + * - the published message triggers a command that times out + * - the command timeout triggers a disconnect */ +void subscribe_with_timeout_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + /* The non-clean disconnect should trigger the + * subscription callback with a NULL reply. */ + if (reply == NULL) { + state->checkpoint++; + event_base_loopbreak(base); + return; + } + + assert(reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) && + reply->elements == 3); + + if (strcmp(reply->element[0]->str,"subscribe") == 0) { + assert(strcmp(reply->element[1]->str,"mychannel") == 0 && + reply->element[2]->str == NULL); + publish_msg(state->options,"mychannel","Hello!"); + state->checkpoint++; + } else if (strcmp(reply->element[0]->str,"message") == 0) { + assert(strcmp(reply->element[1]->str,"mychannel") == 0 && + strcmp(reply->element[2]->str,"Hello!") == 0); + state->checkpoint++; + + /* Send a command that will trigger a timeout */ + redisAsyncCommand(ac,null_cb,state,"DEBUG SLEEP 3"); + redisAsyncCommand(ac,null_cb,state,"LPUSH mylist foo"); + } else { + printf("Unexpected pubsub command: %s\n", reply->element[0]->str); + exit(1); + } +} + +static void test_command_timeout_during_pubsub(struct config config) { + test("Command timeout during Pub/Sub: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base,timeout_cb,NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout,&timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Configure a command timout */ + struct timeval command_timeout = {.tv_sec = 2}; + redisAsyncSetTimeout(ac,command_timeout); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac,unexpected_push_cb); + + /* Switch protocol */ + redisAsyncCommand(ac,NULL,NULL,"HELLO 3"); + + /* Start subscribe */ + TestState state = {.options = &options, .resp3 = 1}; + redisAsyncCommand(ac,subscribe_with_timeout_cb,&state,"subscribe mychannel"); + + /* Start event dispatching loop */ + assert(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + test_cond(state.checkpoint == 5); +} +#endif /* HIREDIS_TEST_ASYNC */ int main(int argc, char **argv) { struct config cfg = { @@ -1748,8 +1834,11 @@ int main(int argc, char **argv) { disconnect(c, 0); test_pubsub_handling(cfg); - if (major >= 6) test_pubsub_handling_resp3(cfg); -#endif + if (major >= 6) { + test_pubsub_handling_resp3(cfg); + test_command_timeout_during_pubsub(cfg); + } +#endif /* HIREDIS_TEST_ASYNC */ if (test_inherit_fd) { printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path); -- cgit v1.2.3