summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBjorn Svensson <bjorn.a.svensson@est.tech>2021-12-16 21:38:15 +0100
committerGitHub <noreply@github.com>2021-12-16 12:38:15 -0800
commitd3384260e7e1726747bd78474e4b0874a4b0a236 (patch)
treeac413c6734abad22e6244e9d91477e22e2275bf4
parente3a479e4098dddbf0b075bbe81b0baca5fe0e3df (diff)
Support PING while subscribing (RESP2) (#1027)
* Handle PING during pubsub in RESP2 * Rename invalid callback list Some commands are valid to send during a subscribe in RESP2, and most in RESP3. Renaming the callback list from `invalid` to `replies` to detail this fact. * Fix review comment
-rw-r--r--async.c25
-rw-r--r--async.h2
-rw-r--r--test.c14
3 files changed, 25 insertions, 16 deletions
diff --git a/async.c b/async.c
index 4459c19..389ba2b 100644
--- a/async.c
+++ b/async.c
@@ -144,8 +144,8 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
ac->replies.head = NULL;
ac->replies.tail = NULL;
- ac->sub.invalid.head = NULL;
- ac->sub.invalid.tail = NULL;
+ ac->sub.replies.head = NULL;
+ ac->sub.replies.tail = NULL;
ac->sub.channels = channels;
ac->sub.patterns = patterns;
@@ -312,9 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
/* Execute pending callbacks with NULL reply. */
while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);
-
- /* Execute callbacks for invalid commands */
- while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
+ while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);
/* Run subscription callbacks with NULL reply */
@@ -419,10 +417,11 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
char *stype;
sds sname;
- /* Custom reply functions are not supported for pub/sub. This will fail
- * very hard when they are used... */
- if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) {
- assert(reply->elements >= 2);
+ /* Match reply with the expected format of a pushed message.
+ * The type and number of elements (3 to 4) are specified at:
+ * https://redis.io/topics/pubsub#format-of-pushed-messages */
+ if ((reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) ||
+ reply->type == REDIS_REPLY_PUSH) {
assert(reply->element[0]->type == REDIS_REPLY_STRING);
stype = reply->element[0]->str;
pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
@@ -467,8 +466,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
}
sdsfree(sname);
} else {
- /* Shift callback for invalid commands. */
- __redisShiftCallback(&ac->sub.invalid,dstcb);
+ /* Shift callback for pending command in subscribed context. */
+ __redisShiftCallback(&ac->sub.replies,dstcb);
}
return REDIS_OK;
oom:
@@ -814,9 +813,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
goto oom;
} else {
if (c->flags & REDIS_SUBSCRIBED) {
- /* This will likely result in an error reply, but it needs to be
- * received and passed to the callback. */
- if (__redisPushCallback(&ac->sub.invalid,&cb) != REDIS_OK)
+ if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK)
goto oom;
} else {
if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)
diff --git a/async.h b/async.h
index b1d2cb2..4c65203 100644
--- a/async.h
+++ b/async.h
@@ -102,7 +102,7 @@ typedef struct redisAsyncContext {
/* Subscription callbacks */
struct {
- redisCallbackList invalid;
+ redisCallbackList replies;
struct dict *channels;
struct dict *patterns;
} sub;
diff --git a/test.c b/test.c
index 7af9bee..ec1c419 100644
--- a/test.c
+++ b/test.c
@@ -1518,6 +1518,15 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
}
}
+/* Expect a reply of type ARRAY */
+void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ (void) ac;
+ redisReply *reply = r;
+ TestState *state = privdata;
+ assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY);
+ state->checkpoint++;
+}
+
static void test_pubsub_handling(struct config config) {
test("Subscribe, handle published message and unsubscribe: ");
/* Setup event dispatcher with a testcase timeout */
@@ -1539,13 +1548,16 @@ static void test_pubsub_handling(struct config config) {
TestState state = {.options = &options};
redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
+ /* Make sure non-subscribe commands are handled */
+ redisAsyncCommand(ac,array_cb,&state,"PING");
+
/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);
/* Verify test checkpoints */
- assert(state.checkpoint == 1);
+ assert(state.checkpoint == 2);
}
/* Unexpected push message, will trigger a failure */