summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHyungjin Kim <kim@hyjin.com>2016-09-30 00:11:27 +0900
committerHyungjin Kim <kim@hyjin.com>2016-09-30 00:26:32 +0900
commit073dc84399d29a2648aa20ad687524795d13609e (patch)
tree57140347926b6995a8a7a23f3931e7c9095e9d84
parent5f98e1d35dcf00a026793ada2662f6e1ba77eb17 (diff)
Counting pending subscribe. Fix #396
-rw-r--r--async.c34
-rw-r--r--async.h1
2 files changed, 30 insertions, 5 deletions
diff --git a/async.c b/async.c
index d955203..3654580 100644
--- a/async.c
+++ b/async.c
@@ -364,6 +364,7 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
redisContext *c = &(ac->c);
dict *callbacks;
+ redisCallback *cb;
dictEntry *de;
int pvariant;
char *stype;
@@ -387,16 +388,26 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
de = dictFind(callbacks,sname);
if (de != NULL) {
+ cb = dictGetEntryVal(de);
+
+ /* If this is an subscribe reply decrease pending counter. */
+ if (strcasecmp(stype+pvariant,"subscribe") == 0) {
+ cb->pending_subs -= 1;
+ }
+
memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
/* If this is an unsubscribe message, remove it. */
if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
- dictDelete(callbacks,sname);
+ if (cb->pending_subs == 0)
+ dictDelete(callbacks,sname);
/* If this was the last unsubscribe message, revert to
* non-subscribe mode. */
assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
- if (reply->element[2]->integer == 0)
+
+ /* Unset subscribed flag only when no pipelined pending subscribe. */
+ if (reply->element[2]->integer == 0 && dictSize(callbacks) == 0)
c->flags &= ~REDIS_SUBSCRIBED;
}
}
@@ -410,7 +421,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
- redisCallback cb = {NULL, NULL, NULL};
+ redisCallback cb = {NULL, NULL, 0, NULL};
void *reply = NULL;
int status;
@@ -583,6 +594,9 @@ static const char *nextArgument(const char *start, const char **str, size_t *len
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
redisContext *c = &(ac->c);
redisCallback cb;
+ struct dict *cbdict;
+ dictEntry *de;
+ redisCallback *existcb;
int pvariant, hasnext;
const char *cstr, *astr;
size_t clen, alen;
@@ -596,6 +610,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
/* Setup callback */
cb.fn = fn;
cb.privdata = privdata;
+ cb.pending_subs = 1;
/* Find out which command will be appended. */
p = nextArgument(cmd,&cstr,&clen);
@@ -612,9 +627,18 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
sname = sdsnewlen(astr,alen);
if (pvariant)
- ret = dictReplace(ac->sub.patterns,sname,&cb);
+ cbdict = ac->sub.patterns;
else
- ret = dictReplace(ac->sub.channels,sname,&cb);
+ cbdict = ac->sub.channels;
+
+ de = dictFind(cbdict,sname);
+
+ if (de != NULL) {
+ existcb = dictGetEntryVal(de);
+ cb.pending_subs = existcb->pending_subs + 1;
+ }
+
+ ret = dictReplace(cbdict,sname,&cb);
if (ret == 0) sdsfree(sname);
}
diff --git a/async.h b/async.h
index 59cbf46..e69d840 100644
--- a/async.h
+++ b/async.h
@@ -45,6 +45,7 @@ typedef void (redisCallbackFn)(struct redisAsyncContext*, void*, void*);
typedef struct redisCallback {
struct redisCallback *next; /* simple singly linked list */
redisCallbackFn *fn;
+ int pending_subs;
void *privdata;
} redisCallback;