diff options
author | Bjorn Svensson <bjorn.a.svensson@est.tech> | 2021-12-16 21:38:15 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-12-16 12:38:15 -0800 |
commit | d3384260e7e1726747bd78474e4b0874a4b0a236 (patch) | |
tree | ac413c6734abad22e6244e9d91477e22e2275bf4 /async.c | |
parent | e3a479e4098dddbf0b075bbe81b0baca5fe0e3df (diff) |
Support PING while subscribing (RESP2) (#1027)
* Handle PING during pubsub in RESP2
* Rename invalid callback list
Some commands are valid to send during a subscribe in RESP2, and
most in RESP3. Renaming the callback list from `invalid` to `replies`
to detail this fact.
* Fix review comment
Diffstat (limited to 'async.c')
-rw-r--r-- | async.c | 25 |
1 files changed, 11 insertions, 14 deletions
@@ -144,8 +144,8 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->replies.head = NULL; ac->replies.tail = NULL; - ac->sub.invalid.head = NULL; - ac->sub.invalid.tail = NULL; + ac->sub.replies.head = NULL; + ac->sub.replies.tail = NULL; ac->sub.channels = channels; ac->sub.patterns = patterns; @@ -312,9 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { /* Execute pending callbacks with NULL reply. */ while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); - - /* Execute callbacks for invalid commands */ - while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); /* Run subscription callbacks with NULL reply */ @@ -419,10 +417,11 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, char *stype; sds sname; - /* Custom reply functions are not supported for pub/sub. This will fail - * very hard when they are used... */ - if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) { - assert(reply->elements >= 2); + /* Match reply with the expected format of a pushed message. + * The type and number of elements (3 to 4) are specified at: + * https://redis.io/topics/pubsub#format-of-pushed-messages */ + if ((reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) || + reply->type == REDIS_REPLY_PUSH) { assert(reply->element[0]->type == REDIS_REPLY_STRING); stype = reply->element[0]->str; pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; @@ -467,8 +466,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, } sdsfree(sname); } else { - /* Shift callback for invalid commands. */ - __redisShiftCallback(&ac->sub.invalid,dstcb); + /* Shift callback for pending command in subscribed context. */ + __redisShiftCallback(&ac->sub.replies,dstcb); } return REDIS_OK; oom: @@ -814,9 +813,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void goto oom; } else { if (c->flags & REDIS_SUBSCRIBED) { - /* This will likely result in an error reply, but it needs to be - * received and passed to the callback. */ - if (__redisPushCallback(&ac->sub.invalid,&cb) != REDIS_OK) + if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK) goto oom; } else { if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) |