summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--async.c5
-rw-r--r--hiredis.h3
-rw-r--r--test.c18
3 files changed, 24 insertions, 2 deletions
diff --git a/async.c b/async.c
index 389ba2b..e855bf7 100644
--- a/async.c
+++ b/async.c
@@ -420,7 +420,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
/* Match reply with the expected format of a pushed message.
* The type and number of elements (3 to 4) are specified at:
* https://redis.io/topics/pubsub#format-of-pushed-messages */
- if ((reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) ||
+ if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) ||
reply->type == REDIS_REPLY_PUSH) {
assert(reply->element[0]->type == REDIS_REPLY_STRING);
stype = reply->element[0]->str;
@@ -525,6 +525,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
break;
}
+ /* Keep track of push message support for subscribe handling */
+ if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH;
+
/* Send any non-subscribe related PUSH messages to our PUSH handler
* while allowing subscribe related PUSH messages to pass through.
* This allows existing code to be backward compatible and work in
diff --git a/hiredis.h b/hiredis.h
index a093abe..7dd44ee 100644
--- a/hiredis.h
+++ b/hiredis.h
@@ -80,6 +80,9 @@ typedef long long ssize_t;
/* Flag that is set when we should set SO_REUSEADDR before calling bind() */
#define REDIS_REUSEADDR 0x80
+/* Flag that is set when the async connection supports push replies. */
+#define REDIS_SUPPORTS_PUSH 0x100
+
/**
* Flag that indicates the user does not want the context to
* be automatically freed upon error
diff --git a/test.c b/test.c
index ec1c419..6915ba2 100644
--- a/test.c
+++ b/test.c
@@ -1567,6 +1567,15 @@ void unexpected_push_cb(redisAsyncContext *ac, void *r) {
exit(1);
}
+/* Expect a reply of type INTEGER */
+void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ (void) ac;
+ redisReply *reply = r;
+ TestState *state = privdata;
+ assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
+ state->checkpoint++;
+}
+
static void test_pubsub_handling_resp3(struct config config) {
test("Subscribe, handle published message and unsubscribe using RESP3: ");
/* Setup event dispatcher with a testcase timeout */
@@ -1594,13 +1603,20 @@ static void test_pubsub_handling_resp3(struct config config) {
TestState state = {.options = &options, .resp3 = 1};
redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
+ /* Make sure non-subscribe commands are handled in RESP3 */
+ redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
+ redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
+ redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
+ /* Handle an array with 3 elements as a non-subscribe command */
+ redisAsyncCommand(ac,array_cb,&state,"LRANGE mylist 0 2");
+
/* Start event dispatching loop */
test_cond(event_base_dispatch(base) == 0);
event_free(timeout);
event_base_free(base);
/* Verify test checkpoints */
- assert(state.checkpoint == 1);
+ assert(state.checkpoint == 5);
}
#endif