From f2ce5980e6bc9739b85a5ac882980956c80907ba Mon Sep 17 00:00:00 2001 From: Bjorn Svensson Date: Tue, 18 Jan 2022 19:09:38 +0100 Subject: Allow sending commands after sending an unsubscribe (#1036) * Add test of async commands after unsubscribe Verify that commands are handled after unsubscribing from a channel. A command is sent before the `unsubscribe` response is received, which currently triggers an assert in async.c:567: `redisProcessCallbacks: Assertion `(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed.` * Handle async commands after an unsubscribe When unsubscribing from the last channel we move from the `subscribe` state to a normal state. These states uses different holders for the command callback information. By moving the callback info during the state change the callback order can be maintained. --- async.c | 9 ++++++++- test.c | 41 ++++++++++++++++++++++++----------------- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/async.c b/async.c index 3b5ec27..8614b20 100644 --- a/async.c +++ b/async.c @@ -460,8 +460,15 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, /* Unset subscribed flag only when no pipelined pending subscribe. */ if (reply->element[2]->integer == 0 && dictSize(ac->sub.channels) == 0 - && dictSize(ac->sub.patterns) == 0) + && dictSize(ac->sub.patterns) == 0) { c->flags &= ~REDIS_SUBSCRIBED; + + /* Move ongoing regular command callbacks. */ + redisCallback cb; + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) { + __redisPushCallback(&ac->replies,&cb); + } + } } } sdsfree(sname); diff --git a/test.c b/test.c index e747439..4ef47f6 100644 --- a/test.c +++ b/test.c @@ -1454,8 +1454,15 @@ typedef struct TestState { redisOptions *options; int checkpoint; int resp3; + int disconnect; } TestState; +/* Helper to disconnect and stop event loop */ +void async_disconnect(redisAsyncContext *ac) { + redisAsyncDisconnect(ac); + event_base_loopbreak(base); +} + /* Testcase timeout, will trigger a failure */ void timeout_cb(int fd, short event, void *arg) { (void) fd; (void) event; (void) arg; @@ -1480,9 +1487,18 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) { disconnect(c, 0); } +/* Expect a reply of type INTEGER */ +void integer_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER); + state->checkpoint++; + if (state->disconnect) async_disconnect(ac); +} + /* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3: * - a published message triggers an unsubscribe - * - an unsubscribe response triggers a disconnect */ + * - a command is sent before the unsubscribe response is received. */ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { redisReply *reply = r; TestState *state = privdata; @@ -1505,13 +1521,13 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { redisAsyncCommand(ac,unexpected_cb, (void*)"unsubscribe should call subscribe_cb()", "unsubscribe"); + /* Send a regular command after unsubscribing, then disconnect */ + state->disconnect = 1; + redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo"); + } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) { assert(strcmp(reply->element[1]->str,"mychannel") == 0 && reply->element[2]->str == NULL); - - /* Disconnect after unsubscribe */ - redisAsyncDisconnect(ac); - event_base_loopbreak(base); } else { printf("Unexpected pubsub command: %s\n", reply->element[0]->str); exit(1); @@ -1520,11 +1536,11 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { /* Expect a reply of type ARRAY */ void array_cb(redisAsyncContext *ac, void *r, void *privdata) { - (void) ac; redisReply *reply = r; TestState *state = privdata; assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY); state->checkpoint++; + if (state->disconnect) async_disconnect(ac); } /* Expect a NULL reply */ @@ -1565,7 +1581,7 @@ static void test_pubsub_handling(struct config config) { event_base_free(base); /* Verify test checkpoints */ - assert(state.checkpoint == 2); + assert(state.checkpoint == 3); } /* Unexpected push message, will trigger a failure */ @@ -1575,15 +1591,6 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) { exit(1); } -/* Expect a reply of type INTEGER */ -void integer_cb(redisAsyncContext *ac, void *r, void *privdata) { - (void) ac; - redisReply *reply = r; - TestState *state = privdata; - assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER); - state->checkpoint++; -} - static void test_pubsub_handling_resp3(struct config config) { test("Subscribe, handle published message and unsubscribe using RESP3: "); /* Setup event dispatcher with a testcase timeout */ @@ -1624,7 +1631,7 @@ static void test_pubsub_handling_resp3(struct config config) { event_base_free(base); /* Verify test checkpoints */ - assert(state.checkpoint == 5); + assert(state.checkpoint == 6); } /* Subscribe callback for test_command_timeout_during_pubsub: -- cgit v1.2.3