From b6fb548fc65497ce933cfd15b6fd91c700d4b34e Mon Sep 17 00:00:00 2001 From: Björn Svensson Date: Wed, 2 Feb 2022 22:42:23 +0100 Subject: Ignore pubsub replies without a channel/pattern --- async.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'async.c') diff --git a/async.c b/async.c index 6555114..296d31b 100644 --- a/async.c +++ b/async.c @@ -431,8 +431,10 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, else callbacks = ac->sub.channels; + /* Ignore replies without a channel/pattern string */ + if (reply->element[1]->type != REDIS_REPLY_STRING) return REDIS_OK; + /* Locate the right callback */ - assert(reply->element[1]->type == REDIS_REPLY_STRING); sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); if (sname == NULL) goto oom; -- cgit v1.2.3 From 7123b87f6d237e0ddf75c9698b2961e2d102a69d Mon Sep 17 00:00:00 2001 From: Björn Svensson Date: Wed, 2 Feb 2022 23:29:59 +0100 Subject: Handle any pipelined unsubscribe in async Redis responds to an unsubscribe with one or many replies, depending on the current subscribe state. When channels/patterns names are provided in a command each given name will trigger a reply even if duplicated or not subscribed to. To know when we can return from the subscribed state we need to do bookkeeping on pending additional unsubscribe replies, and make sure we receive them all before switching state. --- async.c | 123 ++++++++++++++++++++++++++++++++++++++++++++-------------------- async.h | 2 ++ test.c | 8 ++--- 3 files changed, 91 insertions(+), 42 deletions(-) (limited to 'async.c') diff --git a/async.c b/async.c index 296d31b..3dad137 100644 --- a/async.c +++ b/async.c @@ -148,6 +148,7 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->sub.replies.tail = NULL; ac->sub.channels = channels; ac->sub.patterns = patterns; + ac->sub.pending_unsubs = 0; return ac; oom: @@ -411,11 +412,11 @@ void redisAsyncDisconnect(redisAsyncContext *ac) { static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { redisContext *c = &(ac->c); dict *callbacks; - redisCallback *cb; + redisCallback *cb = NULL; dictEntry *de; int pvariant; char *stype; - sds sname; + sds sname = NULL; /* Match reply with the expected format of a pushed message. * The type and number of elements (3 to 4) are specified at: @@ -431,45 +432,44 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, else callbacks = ac->sub.channels; - /* Ignore replies without a channel/pattern string */ - if (reply->element[1]->type != REDIS_REPLY_STRING) return REDIS_OK; - /* Locate the right callback */ - sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); - if (sname == NULL) - goto oom; - - de = dictFind(callbacks,sname); - if (de != NULL) { - cb = dictGetEntryVal(de); + if (reply->element[1]->type == REDIS_REPLY_STRING) { + sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); + if (sname == NULL) goto oom; - /* If this is an subscribe reply decrease pending counter. */ - if (strcasecmp(stype+pvariant,"subscribe") == 0) { - cb->pending_subs -= 1; + if ((de = dictFind(callbacks,sname)) != NULL) { + cb = dictGetEntryVal(de); + memcpy(dstcb,cb,sizeof(*dstcb)); } + } - memcpy(dstcb,cb,sizeof(*dstcb)); - - /* If this is an unsubscribe message, remove it. */ - if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { - if (cb->pending_subs == 0) - dictDelete(callbacks,sname); - - /* If this was the last unsubscribe message, revert to - * non-subscribe mode. */ - assert(reply->element[2]->type == REDIS_REPLY_INTEGER); - - /* Unset subscribed flag only when no pipelined pending subscribe. */ - if (reply->element[2]->integer == 0 - && dictSize(ac->sub.channels) == 0 - && dictSize(ac->sub.patterns) == 0) { - c->flags &= ~REDIS_SUBSCRIBED; - - /* Move ongoing regular command callbacks. */ - redisCallback cb; - while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) { - __redisPushCallback(&ac->replies,&cb); - } + /* If this is an subscribe reply decrease pending counter. */ + if (strcasecmp(stype+pvariant,"subscribe") == 0) { + assert(cb != NULL); + cb->pending_subs -= 1; + + } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { + if (cb == NULL) + ac->sub.pending_unsubs -= 1; + else if (cb->pending_subs == 0) + dictDelete(callbacks,sname); + + /* If this was the last unsubscribe message, revert to + * non-subscribe mode. */ + assert(reply->element[2]->type == REDIS_REPLY_INTEGER); + + /* Unset subscribed flag only when no pipelined pending subscribe + * or pending unsubscribe replies. */ + if (reply->element[2]->integer == 0 + && dictSize(ac->sub.channels) == 0 + && dictSize(ac->sub.patterns) == 0 + && ac->sub.pending_unsubs == 0) { + c->flags &= ~REDIS_SUBSCRIBED; + + /* Move ongoing regular command callbacks. */ + redisCallback cb; + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) { + __redisPushCallback(&ac->replies,&cb); } } } @@ -542,7 +542,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* Even if the context is subscribed, pending regular * callbacks will get a reply before pub/sub messages arrive. */ - redisCallback cb = {NULL, NULL, 0, NULL}; + redisCallback cb = {NULL, NULL, 0, 0, NULL}; if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { /* * A spontaneous reply in a not-subscribed context can be the error @@ -759,6 +759,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void redisContext *c = &(ac->c); redisCallback cb; struct dict *cbdict; + dictIterator it; dictEntry *de; redisCallback *existcb; int pvariant, hasnext; @@ -775,6 +776,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void cb.fn = fn; cb.privdata = privdata; cb.pending_subs = 1; + cb.unsubscribe_sent = 0; /* Find out which command will be appended. */ p = nextArgument(cmd,&cstr,&clen); @@ -814,6 +816,51 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void * subscribed to one or more channels or patterns. */ if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; + if (pvariant) + cbdict = ac->sub.patterns; + else + cbdict = ac->sub.channels; + + if (hasnext) { + /* Send an unsubscribe with specific channels/patterns. + * Bookkeeping the number of expected replies */ + while ((p = nextArgument(p,&astr,&alen)) != NULL) { + sname = sdsnewlen(astr,alen); + if (sname == NULL) + goto oom; + + de = dictFind(cbdict,sname); + if (de != NULL) { + existcb = dictGetEntryVal(de); + if (existcb->unsubscribe_sent == 0) + existcb->unsubscribe_sent = 1; + else + /* Already sent, reply to be ignored */ + ac->sub.pending_unsubs += 1; + } else { + /* Not subscribed to, reply to be ignored */ + ac->sub.pending_unsubs += 1; + } + sdsfree(sname); + } + } else { + /* Send an unsubscribe without specific channels/patterns. + * Bookkeeping the number of expected replies */ + int no_subs = 1; + dictInitIterator(&it,cbdict); + while ((de = dictNext(&it)) != NULL) { + existcb = dictGetEntryVal(de); + if (existcb->unsubscribe_sent == 0) { + existcb->unsubscribe_sent = 1; + no_subs = 0; + } + } + /* Unsubscribing to all channels/patterns, where none is + * subscribed to, results in a single reply to be ignored. */ + if (no_subs == 1) + ac->sub.pending_unsubs += 1; + } + /* (P)UNSUBSCRIBE does not have its own response: every channel or * pattern that is unsubscribed will receive a message. This means we * should not append a callback function for this command. */ diff --git a/async.h b/async.h index 4c65203..41951d4 100644 --- a/async.h +++ b/async.h @@ -46,6 +46,7 @@ typedef struct redisCallback { struct redisCallback *next; /* simple singly linked list */ redisCallbackFn *fn; int pending_subs; + int unsubscribe_sent; void *privdata; } redisCallback; @@ -105,6 +106,7 @@ typedef struct redisAsyncContext { redisCallbackList replies; struct dict *channels; struct dict *patterns; + int pending_unsubs; } sub; /* Any configured RESP3 PUSH handler */ diff --git a/test.c b/test.c index b76b49a..f43bc24 100644 --- a/test.c +++ b/test.c @@ -1729,10 +1729,10 @@ void subscribe_channel_a_cb(redisAsyncContext *ac, void *r, void *privdata) { strcmp(reply->element[2]->str,"Hello!") == 0); state->checkpoint++; - /* Unsubscribe to channels, including a channel X which we don't subscribe to */ + /* Unsubscribe to channels, including channel X & Z which we don't subscribe to */ redisAsyncCommand(ac,unexpected_cb, (void*)"unsubscribe should not call unexpected_cb()", - "unsubscribe B X A"); + "unsubscribe B X A A Z"); /* Unsubscribe to patterns, none which we subscribe to */ redisAsyncCommand(ac,unexpected_cb, (void*)"punsubscribe should not call unexpected_cb()", @@ -1771,8 +1771,8 @@ void subscribe_channel_b_cb(redisAsyncContext *ac, void *r, void *privdata) { /* Test handling of multiple channels * - subscribe to channel A and B - * - a published message on A triggers an unsubscribe of channel B, X and A - * where channel X is not subscribed to. + * - a published message on A triggers an unsubscribe of channel B, X, A and Z + * where channel X and Z are not subscribed to. * - the published message also triggers an unsubscribe to patterns. Since no * pattern is subscribed to the responded pattern element type is NIL. * - a command sent after unsubscribe triggers a disconnect */ -- cgit v1.2.3