diff options
-rw-r--r-- | async.c | 9 | ||||
-rw-r--r-- | test.c | 41 |
2 files changed, 32 insertions, 18 deletions
@@ -460,8 +460,15 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, /* Unset subscribed flag only when no pipelined pending subscribe. */ if (reply->element[2]->integer == 0 && dictSize(ac->sub.channels) == 0 - && dictSize(ac->sub.patterns) == 0) + && dictSize(ac->sub.patterns) == 0) { c->flags &= ~REDIS_SUBSCRIBED; + + /* Move ongoing regular command callbacks. */ + redisCallback cb; + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) { + __redisPushCallback(&ac->replies,&cb); + } + } } } sdsfree(sname); @@ -1454,8 +1454,15 @@ typedef struct TestState { redisOptions *options; int checkpoint; int resp3; + int disconnect; } TestState; +/* Helper to disconnect and stop event loop */ +void async_disconnect(redisAsyncContext *ac) { + redisAsyncDisconnect(ac); + event_base_loopbreak(base); +} + /* Testcase timeout, will trigger a failure */ void timeout_cb(int fd, short event, void *arg) { (void) fd; (void) event; (void) arg; @@ -1480,9 +1487,18 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) { disconnect(c, 0); } +/* Expect a reply of type INTEGER */ +void integer_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER); + state->checkpoint++; + if (state->disconnect) async_disconnect(ac); +} + /* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3: * - a published message triggers an unsubscribe - * - an unsubscribe response triggers a disconnect */ + * - a command is sent before the unsubscribe response is received. */ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { redisReply *reply = r; TestState *state = privdata; @@ -1505,13 +1521,13 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { redisAsyncCommand(ac,unexpected_cb, (void*)"unsubscribe should call subscribe_cb()", "unsubscribe"); + /* Send a regular command after unsubscribing, then disconnect */ + state->disconnect = 1; + redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo"); + } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) { assert(strcmp(reply->element[1]->str,"mychannel") == 0 && reply->element[2]->str == NULL); - - /* Disconnect after unsubscribe */ - redisAsyncDisconnect(ac); - event_base_loopbreak(base); } else { printf("Unexpected pubsub command: %s\n", reply->element[0]->str); exit(1); @@ -1520,11 +1536,11 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { /* Expect a reply of type ARRAY */ void array_cb(redisAsyncContext *ac, void *r, void *privdata) { - (void) ac; redisReply *reply = r; TestState *state = privdata; assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY); state->checkpoint++; + if (state->disconnect) async_disconnect(ac); } /* Expect a NULL reply */ @@ -1565,7 +1581,7 @@ static void test_pubsub_handling(struct config config) { event_base_free(base); /* Verify test checkpoints */ - assert(state.checkpoint == 2); + assert(state.checkpoint == 3); } /* Unexpected push message, will trigger a failure */ @@ -1575,15 +1591,6 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) { exit(1); } -/* Expect a reply of type INTEGER */ -void integer_cb(redisAsyncContext *ac, void *r, void *privdata) { - (void) ac; - redisReply *reply = r; - TestState *state = privdata; - assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER); - state->checkpoint++; -} - static void test_pubsub_handling_resp3(struct config config) { test("Subscribe, handle published message and unsubscribe using RESP3: "); /* Setup event dispatcher with a testcase timeout */ @@ -1624,7 +1631,7 @@ static void test_pubsub_handling_resp3(struct config config) { event_base_free(base); /* Verify test checkpoints */ - assert(state.checkpoint == 5); + assert(state.checkpoint == 6); } /* Subscribe callback for test_command_timeout_during_pubsub: |