summaryrefslogtreecommitdiff
path: root/async.c
diff options
context:
space:
mode:
authorBjorn Svensson <bjorn.a.svensson@est.tech>2021-12-16 21:38:15 +0100
committerGitHub <noreply@github.com>2021-12-16 12:38:15 -0800
commitd3384260e7e1726747bd78474e4b0874a4b0a236 (patch)
treeac413c6734abad22e6244e9d91477e22e2275bf4 /async.c
parente3a479e4098dddbf0b075bbe81b0baca5fe0e3df (diff)
Support PING while subscribing (RESP2) (#1027)
* Handle PING during pubsub in RESP2 * Rename invalid callback list Some commands are valid to send during a subscribe in RESP2, and most in RESP3. Renaming the callback list from `invalid` to `replies` to detail this fact. * Fix review comment
Diffstat (limited to 'async.c')
-rw-r--r--async.c25
1 files changed, 11 insertions, 14 deletions
diff --git a/async.c b/async.c
index 4459c19..389ba2b 100644
--- a/async.c
+++ b/async.c
@@ -144,8 +144,8 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
ac->replies.head = NULL;
ac->replies.tail = NULL;
- ac->sub.invalid.head = NULL;
- ac->sub.invalid.tail = NULL;
+ ac->sub.replies.head = NULL;
+ ac->sub.replies.tail = NULL;
ac->sub.channels = channels;
ac->sub.patterns = patterns;
@@ -312,9 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
/* Execute pending callbacks with NULL reply. */
while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);
-
- /* Execute callbacks for invalid commands */
- while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
+ while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);
/* Run subscription callbacks with NULL reply */
@@ -419,10 +417,11 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
char *stype;
sds sname;
- /* Custom reply functions are not supported for pub/sub. This will fail
- * very hard when they are used... */
- if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
- assert(reply->elements >= 2);
+ /* Match reply with the expected format of a pushed message.
+ * The type and number of elements (3 to 4) are specified at:
+ * https://redis.io/topics/pubsub#format-of-pushed-messages */
+ if ((reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) ||
+ reply->type == REDIS_REPLY_PUSH) {
assert(reply->element[0]->type == REDIS_REPLY_STRING);
stype = reply->element[0]->str;
pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
@@ -467,8 +466,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
}
sdsfree(sname);
} else {
- /* Shift callback for invalid commands. */
- __redisShiftCallback(&ac->sub.invalid,dstcb);
+ /* Shift callback for pending command in subscribed context. */
+ __redisShiftCallback(&ac->sub.replies,dstcb);
}
return REDIS_OK;
oom:
@@ -814,9 +813,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
goto oom;
} else {
if (c->flags & REDIS_SUBSCRIBED) {
- /* This will likely result in an error reply, but it needs to be
- * received and passed to the callback. */
- if (__redisPushCallback(&ac->sub.invalid,&cb) != REDIS_OK)
+ if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK)
goto oom;
} else {
if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)