diff options
| -rw-r--r-- | async.c | 4 | ||||
| -rw-r--r-- | test.c | 56 | 
2 files changed, 56 insertions, 4 deletions
| @@ -496,8 +496,8 @@ static int redisIsSubscribeReply(redisReply *reply) {      len = reply->element[0]->len - off;      return !strncasecmp(str, "subscribe", len) || -           !strncasecmp(str, "message", len); - +           !strncasecmp(str, "message", len) || +           !strncasecmp(str, "unsubscribe", len);  }  void redisProcessCallbacks(redisAsyncContext *ac) { @@ -1453,6 +1453,7 @@ struct event_base *base;  typedef struct TestState {      redisOptions *options;      int           checkpoint; +    int           resp3;  } TestState;  /* Testcase timeout, will trigger a failure */ @@ -1479,7 +1480,7 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) {      disconnect(c, 0);  } -/* Subscribe callback for test_pubsub_handling: +/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3:   * - a published message triggers an unsubscribe   * - an unsubscribe response triggers a disconnect */  void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { @@ -1487,7 +1488,7 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {      TestState *state = privdata;      assert(reply != NULL && -           reply->type == REDIS_REPLY_ARRAY && +           reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&             reply->elements == 3);      if (strcmp(reply->element[0]->str,"subscribe") == 0) { @@ -1546,6 +1547,49 @@ static void test_pubsub_handling(struct config config) {      /* Verify test checkpoints */      assert(state.checkpoint == 1);  } + +/* Unexpected push message, will trigger a failure */ +void unexpected_push_cb(redisAsyncContext *ac, void *r) { +    (void) ac; (void) r; +    printf("Unexpected call to the PUSH callback!\n"); +    exit(1); +} + +static void test_pubsub_handling_resp3(struct config config) { +    test("Subscribe, handle published message and unsubscribe using RESP3: "); +    /* Setup event dispatcher with a testcase timeout */ +    base = event_base_new(); +    struct event *timeout = evtimer_new(base, timeout_cb, NULL); +    assert(timeout != NULL); + +    evtimer_assign(timeout,base,timeout_cb,NULL); +    struct timeval timeout_tv = {.tv_sec = 10}; +    evtimer_add(timeout, &timeout_tv); + +    /* Connect */ +    redisOptions options = get_redis_tcp_options(config); +    redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); +    assert(ac != NULL && ac->err == 0); +    redisLibeventAttach(ac,base); + +    /* Not expecting any push messages in this test */ +    redisAsyncSetPushCallback(ac, unexpected_push_cb); + +    /* Switch protocol */ +    redisAsyncCommand(ac,NULL,NULL,"HELLO 3"); + +    /* Start subscribe */ +    TestState state = {.options = &options, .resp3 = 1}; +    redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + +    /* Start event dispatching loop */ +    test_cond(event_base_dispatch(base) == 0); +    event_free(timeout); +    event_base_free(base); + +    /* Verify test checkpoints */ +    assert(state.checkpoint == 1); +}  #endif  int main(int argc, char **argv) { @@ -1668,7 +1712,15 @@ int main(int argc, char **argv) {  #ifdef HIREDIS_TEST_ASYNC      printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); +    cfg.type = CONN_TCP; + +    int major; +    redisContext *c = do_connect(cfg); +    get_redis_version(c, &major, NULL); +    disconnect(c, 0); +      test_pubsub_handling(cfg); +    if (major >= 6) test_pubsub_handling_resp3(cfg);  #endif      if (test_inherit_fd) { | 
