From d3384260e7e1726747bd78474e4b0874a4b0a236 Mon Sep 17 00:00:00 2001 From: Bjorn Svensson Date: Thu, 16 Dec 2021 21:38:15 +0100 Subject: Support PING while subscribing (RESP2) (#1027) * Handle PING during pubsub in RESP2 * Rename invalid callback list Some commands are valid to send during a subscribe in RESP2, and most in RESP3. Renaming the callback list from `invalid` to `replies` to detail this fact. * Fix review comment --- async.c | 25 +++++++++++-------------- async.h | 2 +- test.c | 14 +++++++++++++- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/async.c b/async.c index 4459c19..389ba2b 100644 --- a/async.c +++ b/async.c @@ -144,8 +144,8 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->replies.head = NULL; ac->replies.tail = NULL; - ac->sub.invalid.head = NULL; - ac->sub.invalid.tail = NULL; + ac->sub.replies.head = NULL; + ac->sub.replies.tail = NULL; ac->sub.channels = channels; ac->sub.patterns = patterns; @@ -312,9 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { /* Execute pending callbacks with NULL reply. */ while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); - - /* Execute callbacks for invalid commands */ - while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); /* Run subscription callbacks with NULL reply */ @@ -419,10 +417,11 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, char *stype; sds sname; - /* Custom reply functions are not supported for pub/sub. This will fail - * very hard when they are used... */ - if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) { - assert(reply->elements >= 2); + /* Match reply with the expected format of a pushed message. + * The type and number of elements (3 to 4) are specified at: + * https://redis.io/topics/pubsub#format-of-pushed-messages */ + if ((reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) || + reply->type == REDIS_REPLY_PUSH) { assert(reply->element[0]->type == REDIS_REPLY_STRING); stype = reply->element[0]->str; pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; @@ -467,8 +466,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, } sdsfree(sname); } else { - /* Shift callback for invalid commands. */ - __redisShiftCallback(&ac->sub.invalid,dstcb); + /* Shift callback for pending command in subscribed context. */ + __redisShiftCallback(&ac->sub.replies,dstcb); } return REDIS_OK; oom: @@ -814,9 +813,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void goto oom; } else { if (c->flags & REDIS_SUBSCRIBED) { - /* This will likely result in an error reply, but it needs to be - * received and passed to the callback. */ - if (__redisPushCallback(&ac->sub.invalid,&cb) != REDIS_OK) + if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK) goto oom; } else { if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) diff --git a/async.h b/async.h index b1d2cb2..4c65203 100644 --- a/async.h +++ b/async.h @@ -102,7 +102,7 @@ typedef struct redisAsyncContext { /* Subscription callbacks */ struct { - redisCallbackList invalid; + redisCallbackList replies; struct dict *channels; struct dict *patterns; } sub; diff --git a/test.c b/test.c index 7af9bee..ec1c419 100644 --- a/test.c +++ b/test.c @@ -1518,6 +1518,15 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { } } +/* Expect a reply of type ARRAY */ +void array_cb(redisAsyncContext *ac, void *r, void *privdata) { + (void) ac; + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY); + state->checkpoint++; +} + static void test_pubsub_handling(struct config config) { test("Subscribe, handle published message and unsubscribe: "); /* Setup event dispatcher with a testcase timeout */ @@ -1539,13 +1548,16 @@ static void test_pubsub_handling(struct config config) { TestState state = {.options = &options}; redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + /* Make sure non-subscribe commands are handled */ + redisAsyncCommand(ac,array_cb,&state,"PING"); + /* Start event dispatching loop */ test_cond(event_base_dispatch(base) == 0); event_free(timeout); event_base_free(base); /* Verify test checkpoints */ - assert(state.checkpoint == 1); + assert(state.checkpoint == 2); } /* Unexpected push message, will trigger a failure */ -- cgit v1.2.3