summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--async.c4
-rw-r--r--test.c56
2 files changed, 56 insertions, 4 deletions
diff --git a/async.c b/async.c
index e37afbd..4459c19 100644
--- a/async.c
+++ b/async.c
@@ -496,8 +496,8 @@ static int redisIsSubscribeReply(redisReply *reply) {
len = reply->element[0]->len - off;
return !strncasecmp(str, "subscribe", len) ||
- !strncasecmp(str, "message", len);
-
+ !strncasecmp(str, "message", len) ||
+ !strncasecmp(str, "unsubscribe", len);
}
void redisProcessCallbacks(redisAsyncContext *ac) {
diff --git a/test.c b/test.c
index 04d5197..7af9bee 100644
--- a/test.c
+++ b/test.c
@@ -1453,6 +1453,7 @@ struct event_base *base;
typedef struct TestState {
redisOptions *options;
int checkpoint;
+ int resp3;
} TestState;
/* Testcase timeout, will trigger a failure */
@@ -1479,7 +1480,7 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) {
disconnect(c, 0);
}
-/* Subscribe callback for test_pubsub_handling:
+/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3:
* - a published message triggers an unsubscribe
* - an unsubscribe response triggers a disconnect */
void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
@@ -1487,7 +1488,7 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
TestState *state = privdata;
assert(reply != NULL &&
- reply->type == REDIS_REPLY_ARRAY &&
+ reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&
reply->elements == 3);
if (strcmp(reply->element[0]->str,"subscribe") == 0) {
@@ -1546,6 +1547,49 @@ static void test_pubsub_handling(struct config config) {
/* Verify test checkpoints */
assert(state.checkpoint == 1);
}
+
+/* Unexpected push message, will trigger a failure */
+void unexpected_push_cb(redisAsyncContext *ac, void *r) {
+ (void) ac; (void) r;
+ printf("Unexpected call to the PUSH callback!\n");
+ exit(1);
+}
+
+static void test_pubsub_handling_resp3(struct config config) {
+ test("Subscribe, handle published message and unsubscribe using RESP3: ");
+ /* Setup event dispatcher with a testcase timeout */
+ base = event_base_new();
+ struct event *timeout = evtimer_new(base, timeout_cb, NULL);
+ assert(timeout != NULL);
+
+ evtimer_assign(timeout,base,timeout_cb,NULL);
+ struct timeval timeout_tv = {.tv_sec = 10};
+ evtimer_add(timeout, &timeout_tv);
+
+ /* Connect */
+ redisOptions options = get_redis_tcp_options(config);
+ redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
+ assert(ac != NULL && ac->err == 0);
+ redisLibeventAttach(ac,base);
+
+ /* Not expecting any push messages in this test */
+ redisAsyncSetPushCallback(ac, unexpected_push_cb);
+
+ /* Switch protocol */
+ redisAsyncCommand(ac,NULL,NULL,"HELLO 3");
+
+ /* Start subscribe */
+ TestState state = {.options = &options, .resp3 = 1};
+ redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
+
+ /* Start event dispatching loop */
+ test_cond(event_base_dispatch(base) == 0);
+ event_free(timeout);
+ event_base_free(base);
+
+ /* Verify test checkpoints */
+ assert(state.checkpoint == 1);
+}
#endif
int main(int argc, char **argv) {
@@ -1668,7 +1712,15 @@ int main(int argc, char **argv) {
#ifdef HIREDIS_TEST_ASYNC
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
+ cfg.type = CONN_TCP;
+
+ int major;
+ redisContext *c = do_connect(cfg);
+ get_redis_version(c, &major, NULL);
+ disconnect(c, 0);
+
test_pubsub_handling(cfg);
+ if (major >= 6) test_pubsub_handling_resp3(cfg);
#endif
if (test_inherit_fd) {