diff options
| author | Bjorn Svensson <bjorn.a.svensson@est.tech> | 2022-01-11 19:19:43 +0100 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2022-01-11 10:19:43 -0800 | 
| commit | ff860e55dbe49bbd3dc9a89d055047d0731ecc9f (patch) | |
| tree | b957661023eee22b6f12e975bb0158a0f7303fd4 | |
| parent | 24d5344934b866590646591ef0d696cb705a055a (diff) | |
| download | hiredict-ff860e55dbe49bbd3dc9a89d055047d0731ecc9f.tar.xz | |
Correction for command timeout during pubsub (#1038)
* Add test of command timeout during pubsub
A timeout of a non-subscribe command will be ignored during pubsub.
It will be handled as an idle timeout and a response is awaited for.
* Correction for command timeout during pubsub
Disconnect when a sent non-subscribe command triggers a timeout.
| -rw-r--r-- | async.c | 2 | ||||
| -rw-r--r-- | test.c | 95 | 
2 files changed, 93 insertions, 4 deletions
| @@ -696,7 +696,7 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) {      redisCallback cb;      if ((c->flags & REDIS_CONNECTED)) { -        if ( ac->replies.head == NULL) { +        if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {              /* Nothing to do - just an idle timeout */              return;          } @@ -1527,6 +1527,14 @@ void array_cb(redisAsyncContext *ac, void *r, void *privdata) {      state->checkpoint++;  } +/* Expect a NULL reply */ +void null_cb(redisAsyncContext *ac, void *r, void *privdata) { +    (void) ac; +    assert(r == NULL); +    TestState *state = privdata; +    state->checkpoint++; +} +  static void test_pubsub_handling(struct config config) {      test("Subscribe, handle published message and unsubscribe: ");      /* Setup event dispatcher with a testcase timeout */ @@ -1618,7 +1626,85 @@ static void test_pubsub_handling_resp3(struct config config) {      /* Verify test checkpoints */      assert(state.checkpoint == 5);  } -#endif + +/* Subscribe callback for test_command_timeout_during_pubsub: + * - a subscribe response triggers a published message + * - the published message triggers a command that times out + * - the command timeout triggers a disconnect */ +void subscribe_with_timeout_cb(redisAsyncContext *ac, void *r, void *privdata) { +    redisReply *reply = r; +    TestState *state = privdata; + +    /* The non-clean disconnect should trigger the +     * subscription callback with a NULL reply. */ +    if (reply == NULL) { +        state->checkpoint++; +        event_base_loopbreak(base); +        return; +    } + +    assert(reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) && +           reply->elements == 3); + +    if (strcmp(reply->element[0]->str,"subscribe") == 0) { +        assert(strcmp(reply->element[1]->str,"mychannel") == 0 && +               reply->element[2]->str == NULL); +        publish_msg(state->options,"mychannel","Hello!"); +        state->checkpoint++; +    } else if (strcmp(reply->element[0]->str,"message") == 0) { +        assert(strcmp(reply->element[1]->str,"mychannel") == 0 && +               strcmp(reply->element[2]->str,"Hello!") == 0); +        state->checkpoint++; + +        /* Send a command that will trigger a timeout */ +        redisAsyncCommand(ac,null_cb,state,"DEBUG SLEEP 3"); +        redisAsyncCommand(ac,null_cb,state,"LPUSH mylist foo"); +    } else { +        printf("Unexpected pubsub command: %s\n", reply->element[0]->str); +        exit(1); +    } +} + +static void test_command_timeout_during_pubsub(struct config config) { +    test("Command timeout during Pub/Sub: "); +    /* Setup event dispatcher with a testcase timeout */ +    base = event_base_new(); +    struct event *timeout = evtimer_new(base,timeout_cb,NULL); +    assert(timeout != NULL); + +    evtimer_assign(timeout,base,timeout_cb,NULL); +    struct timeval timeout_tv = {.tv_sec = 10}; +    evtimer_add(timeout,&timeout_tv); + +    /* Connect */ +    redisOptions options = get_redis_tcp_options(config); +    redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); +    assert(ac != NULL && ac->err == 0); +    redisLibeventAttach(ac,base); + +    /* Configure a command timout */ +    struct timeval command_timeout = {.tv_sec = 2}; +    redisAsyncSetTimeout(ac,command_timeout); + +    /* Not expecting any push messages in this test */ +    redisAsyncSetPushCallback(ac,unexpected_push_cb); + +    /* Switch protocol */ +    redisAsyncCommand(ac,NULL,NULL,"HELLO 3"); + +    /* Start subscribe */ +    TestState state = {.options = &options, .resp3 = 1}; +    redisAsyncCommand(ac,subscribe_with_timeout_cb,&state,"subscribe mychannel"); + +    /* Start event dispatching loop */ +    assert(event_base_dispatch(base) == 0); +    event_free(timeout); +    event_base_free(base); + +    /* Verify test checkpoints */ +    test_cond(state.checkpoint == 5); +} +#endif /* HIREDIS_TEST_ASYNC */  int main(int argc, char **argv) {      struct config cfg = { @@ -1748,8 +1834,11 @@ int main(int argc, char **argv) {      disconnect(c, 0);      test_pubsub_handling(cfg); -    if (major >= 6) test_pubsub_handling_resp3(cfg); -#endif +    if (major >= 6) { +        test_pubsub_handling_resp3(cfg); +        test_command_timeout_during_pubsub(cfg); +    } +#endif /* HIREDIS_TEST_ASYNC */      if (test_inherit_fd) {          printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path); | 
