diff options
| -rw-r--r-- | async.c | 25 | ||||
| -rw-r--r-- | async.h | 2 | ||||
| -rw-r--r-- | test.c | 14 | 
3 files changed, 25 insertions, 16 deletions
| @@ -144,8 +144,8 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {      ac->replies.head = NULL;      ac->replies.tail = NULL; -    ac->sub.invalid.head = NULL; -    ac->sub.invalid.tail = NULL; +    ac->sub.replies.head = NULL; +    ac->sub.replies.tail = NULL;      ac->sub.channels = channels;      ac->sub.patterns = patterns; @@ -312,9 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {      /* Execute pending callbacks with NULL reply. */      while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)          __redisRunCallback(ac,&cb,NULL); - -    /* Execute callbacks for invalid commands */ -    while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) +    while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK)          __redisRunCallback(ac,&cb,NULL);      /* Run subscription callbacks with NULL reply */ @@ -419,10 +417,11 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,      char *stype;      sds sname; -    /* Custom reply functions are not supported for pub/sub. This will fail -     * very hard when they are used... */ -    if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) { -        assert(reply->elements >= 2); +    /* Match reply with the expected format of a pushed message. +     * The type and number of elements (3 to 4) are specified at: +     * https://redis.io/topics/pubsub#format-of-pushed-messages */ +    if ((reply->type == REDIS_REPLY_ARRAY && reply->elements >= 3) || +        reply->type == REDIS_REPLY_PUSH) {          assert(reply->element[0]->type == REDIS_REPLY_STRING);          stype = reply->element[0]->str;          pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; @@ -467,8 +466,8 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,          }          sdsfree(sname);      } else { -        /* Shift callback for invalid commands. */ -        __redisShiftCallback(&ac->sub.invalid,dstcb); +        /* Shift callback for pending command in subscribed context. */ +        __redisShiftCallback(&ac->sub.replies,dstcb);      }      return REDIS_OK;  oom: @@ -814,9 +813,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void              goto oom;      } else {          if (c->flags & REDIS_SUBSCRIBED) { -            /* This will likely result in an error reply, but it needs to be -             * received and passed to the callback. */ -            if (__redisPushCallback(&ac->sub.invalid,&cb) != REDIS_OK) +            if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK)                  goto oom;          } else {              if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) @@ -102,7 +102,7 @@ typedef struct redisAsyncContext {      /* Subscription callbacks */      struct { -        redisCallbackList invalid; +        redisCallbackList replies;          struct dict *channels;          struct dict *patterns;      } sub; @@ -1518,6 +1518,15 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {      }  } +/* Expect a reply of type ARRAY */ +void array_cb(redisAsyncContext *ac, void *r, void *privdata) { +    (void) ac; +    redisReply *reply = r; +    TestState *state = privdata; +    assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY); +    state->checkpoint++; +} +  static void test_pubsub_handling(struct config config) {      test("Subscribe, handle published message and unsubscribe: ");      /* Setup event dispatcher with a testcase timeout */ @@ -1539,13 +1548,16 @@ static void test_pubsub_handling(struct config config) {      TestState state = {.options = &options};      redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); +    /* Make sure non-subscribe commands are handled */ +    redisAsyncCommand(ac,array_cb,&state,"PING"); +      /* Start event dispatching loop */      test_cond(event_base_dispatch(base) == 0);      event_free(timeout);      event_base_free(base);      /* Verify test checkpoints */ -    assert(state.checkpoint == 1); +    assert(state.checkpoint == 2);  }  /* Unexpected push message, will trigger a failure */ | 
