diff options
author | Bjorn Svensson <bjorn.a.svensson@est.tech> | 2021-12-16 21:38:15 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-16 12:38:15 -0800 |
commit | d3384260e7e1726747bd78474e4b0874a4b0a236 (patch) | |
tree | ac413c6734abad22e6244e9d91477e22e2275bf4 | |
parent | e3a479e4098dddbf0b075bbe81b0baca5fe0e3df (diff) |
Support PING while subscribing (RESP2) (#1027)
* Handle PING during pubsub in RESP2
* Rename invalid callback list
Some commands are valid to send during a subscribe in RESP2, and
most in RESP3. Renaming the callback list from `invalid` to `replies`
to detail this fact.
* Fix review comment
-rw-r--r-- | async.c | 25 | ||||
-rw-r--r-- | async.h | 2 | ||||
-rw-r--r-- | test.c | 14 |
3 files changed, 25 insertions, 16 deletions
@@ -144,8 +144,8 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->replies.head = NULL; ac->replies.tail = NULL; - ac->sub.invalid.head = NULL; - ac->sub.invalid.tail = NULL; + ac->sub.replies.head = NULL; + ac->sub.replies.tail = NULL; ac->sub.channels = channels; ac->sub.patterns = patterns; @@ -312,9 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { /* Execute pending callbacks with NULL reply. */ while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); - - /* Execute callbacks for invalid commands */ - while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); /* Run subscription callbacks with NULL reply */ @@ -419,10 +417,11 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, char *stype; sds sname; - /* Custom reply functions are not supported for pub/sub. This will fail - * very hard when they are used... */ - if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) { - assert(reply->elements >= 2); + /* Match reply with the expected format of a pushed message. + * The type and number of elements (3 to 4) are specified at: + * https://redis.io/topics/pubsub#format-of-pushed-messages */ + if ((reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) || + reply->type == REDIS_REPLY_PUSH) { assert(reply->element[0]->type == REDIS_REPLY_STRING); stype = reply->element[0]->str; pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; @@ -467,8 +466,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, } sdsfree(sname); } else { - /* Shift callback for invalid commands. */ - __redisShiftCallback(&ac->sub.invalid,dstcb); + /* Shift callback for pending command in subscribed context. */ + __redisShiftCallback(&ac->sub.replies,dstcb); } return REDIS_OK; oom: @@ -814,9 +813,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void goto oom; } else { if (c->flags & REDIS_SUBSCRIBED) { - /* This will likely result in an error reply, but it needs to be - * received and passed to the callback. */ - if (__redisPushCallback(&ac->sub.invalid,&cb) != REDIS_OK) + if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK) goto oom; } else { if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) @@ -102,7 +102,7 @@ typedef struct redisAsyncContext { /* Subscription callbacks */ struct { - redisCallbackList invalid; + redisCallbackList replies; struct dict *channels; struct dict *patterns; } sub; @@ -1518,6 +1518,15 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { } } +/* Expect a reply of type ARRAY */ +void array_cb(redisAsyncContext *ac, void *r, void *privdata) { + (void) ac; + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY); + state->checkpoint++; +} + static void test_pubsub_handling(struct config config) { test("Subscribe, handle published message and unsubscribe: "); /* Setup event dispatcher with a testcase timeout */ @@ -1539,13 +1548,16 @@ static void test_pubsub_handling(struct config config) { TestState state = {.options = &options}; redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + /* Make sure non-subscribe commands are handled */ + redisAsyncCommand(ac,array_cb,&state,"PING"); + /* Start event dispatching loop */ test_cond(event_base_dispatch(base) == 0); event_free(timeout); event_base_free(base); /* Verify test checkpoints */ - assert(state.checkpoint == 1); + assert(state.checkpoint == 2); } /* Unexpected push message, will trigger a failure */ |