diff options
-rw-r--r-- | async.c | 4 | ||||
-rw-r--r-- | test.c | 56 |
2 files changed, 56 insertions, 4 deletions
@@ -496,8 +496,8 @@ static int redisIsSubscribeReply(redisReply *reply) { len = reply->element[0]->len - off; return !strncasecmp(str, "subscribe", len) || - !strncasecmp(str, "message", len); - + !strncasecmp(str, "message", len) || + !strncasecmp(str, "unsubscribe", len); } void redisProcessCallbacks(redisAsyncContext *ac) { @@ -1453,6 +1453,7 @@ struct event_base *base; typedef struct TestState { redisOptions *options; int checkpoint; + int resp3; } TestState; /* Testcase timeout, will trigger a failure */ @@ -1479,7 +1480,7 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) { disconnect(c, 0); } -/* Subscribe callback for test_pubsub_handling: +/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3: * - a published message triggers an unsubscribe * - an unsubscribe response triggers a disconnect */ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { @@ -1487,7 +1488,7 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { TestState *state = privdata; assert(reply != NULL && - reply->type == REDIS_REPLY_ARRAY && + reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) && reply->elements == 3); if (strcmp(reply->element[0]->str,"subscribe") == 0) { @@ -1546,6 +1547,49 @@ static void test_pubsub_handling(struct config config) { /* Verify test checkpoints */ assert(state.checkpoint == 1); } + +/* Unexpected push message, will trigger a failure */ +void unexpected_push_cb(redisAsyncContext *ac, void *r) { + (void) ac; (void) r; + printf("Unexpected call to the PUSH callback!\n"); + exit(1); +} + +static void test_pubsub_handling_resp3(struct config config) { + test("Subscribe, handle published message and unsubscribe using RESP3: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base, timeout_cb, NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout, &timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac, unexpected_push_cb); + + /* Switch protocol */ + redisAsyncCommand(ac,NULL,NULL,"HELLO 3"); + + /* Start subscribe */ + TestState state = {.options = &options, .resp3 = 1}; + redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + + /* Start event dispatching loop */ + test_cond(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + assert(state.checkpoint == 1); +} #endif int main(int argc, char **argv) { @@ -1668,7 +1712,15 @@ int main(int argc, char **argv) { #ifdef HIREDIS_TEST_ASYNC printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); + cfg.type = CONN_TCP; + + int major; + redisContext *c = do_connect(cfg); + get_redis_version(c, &major, NULL); + disconnect(c, 0); + test_pubsub_handling(cfg); + if (major >= 6) test_pubsub_handling_resp3(cfg); #endif if (test_inherit_fd) { |