summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBjorn Svensson <bjorn.a.svensson@est.tech>2022-01-18 19:09:38 +0100
committerGitHub <noreply@github.com>2022-01-18 10:09:38 -0800
commitf2ce5980e6bc9739b85a5ac882980956c80907ba (patch)
tree588820b9a0c288d9021358f0938c11668e0453fa
parentff860e55dbe49bbd3dc9a89d055047d0731ecc9f (diff)
Allow sending commands after sending an unsubscribe (#1036)
* Add test of async commands after unsubscribe Verify that commands are handled after unsubscribing from a channel. A command is sent before the `unsubscribe` response is received, which currently triggers an assert in async.c:567: `redisProcessCallbacks: Assertion `(c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)' failed.` * Handle async commands after an unsubscribe When unsubscribing from the last channel we move from the `subscribe` state to a normal state. These states uses different holders for the command callback information. By moving the callback info during the state change the callback order can be maintained.
-rw-r--r--async.c9
-rw-r--r--test.c41
2 files changed, 32 insertions, 18 deletions
diff --git a/async.c b/async.c
index 3b5ec27..8614b20 100644
--- a/async.c
+++ b/async.c
@@ -460,8 +460,15 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
/* Unset subscribed flag only when no pipelined pending subscribe. */
if (reply->element[2]->integer == 0
&& dictSize(ac->sub.channels) == 0
- && dictSize(ac->sub.patterns) == 0)
+ && dictSize(ac->sub.patterns) == 0) {
c->flags &= ~REDIS_SUBSCRIBED;
+
+ /* Move ongoing regular command callbacks. */
+ redisCallback cb;
+ while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
+ __redisPushCallback(&ac->replies,&cb);
+ }
+ }
}
}
sdsfree(sname);
diff --git a/test.c b/test.c
index e747439..4ef47f6 100644
--- a/test.c
+++ b/test.c
@@ -1454,8 +1454,15 @@ typedef struct TestState {
redisOptions *options;
int checkpoint;
int resp3;
+ int disconnect;
} TestState;
+/* Helper to disconnect and stop event loop */
+void async_disconnect(redisAsyncContext *ac) {
+ redisAsyncDisconnect(ac);
+ event_base_loopbreak(base);
+}
+
/* Testcase timeout, will trigger a failure */
void timeout_cb(int fd, short event, void *arg) {
(void) fd; (void) event; (void) arg;
@@ -1480,9 +1487,18 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) {
disconnect(c, 0);
}
+/* Expect a reply of type INTEGER */
+void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ redisReply *reply = r;
+ TestState *state = privdata;
+ assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
+ state->checkpoint++;
+ if (state->disconnect) async_disconnect(ac);
+}
+
/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3:
* - a published message triggers an unsubscribe
- * - an unsubscribe response triggers a disconnect */
+ * - a command is sent before the unsubscribe response is received. */
void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisReply *reply = r;
TestState *state = privdata;
@@ -1505,13 +1521,13 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
redisAsyncCommand(ac,unexpected_cb,
(void*)"unsubscribe should call subscribe_cb()",
"unsubscribe");
+ /* Send a regular command after unsubscribing, then disconnect */
+ state->disconnect = 1;
+ redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo");
+
} else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
reply->element[2]->str == NULL);
-
- /* Disconnect after unsubscribe */
- redisAsyncDisconnect(ac);
- event_base_loopbreak(base);
} else {
printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
exit(1);
@@ -1520,11 +1536,11 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
/* Expect a reply of type ARRAY */
void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
- (void) ac;
redisReply *reply = r;
TestState *state = privdata;
assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY);
state->checkpoint++;
+ if (state->disconnect) async_disconnect(ac);
}
/* Expect a NULL reply */
@@ -1565,7 +1581,7 @@ static void test_pubsub_handling(struct config config) {
event_base_free(base);
/* Verify test checkpoints */
- assert(state.checkpoint == 2);
+ assert(state.checkpoint == 3);
}
/* Unexpected push message, will trigger a failure */
@@ -1575,15 +1591,6 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) {
exit(1);
}
-/* Expect a reply of type INTEGER */
-void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
- (void) ac;
- redisReply *reply = r;
- TestState *state = privdata;
- assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
- state->checkpoint++;
-}
-
static void test_pubsub_handling_resp3(struct config config) {
test("Subscribe, handle published message and unsubscribe using RESP3: ");
/* Setup event dispatcher with a testcase timeout */
@@ -1624,7 +1631,7 @@ static void test_pubsub_handling_resp3(struct config config) {
event_base_free(base);
/* Verify test checkpoints */
- assert(state.checkpoint == 5);
+ assert(state.checkpoint == 6);
}
/* Subscribe callback for test_command_timeout_during_pubsub: