diff options
Diffstat (limited to 'async.c')
-rw-r--r-- | async.c | 279 |
1 files changed, 203 insertions, 76 deletions
@@ -140,14 +140,16 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->ev.scheduleTimer = NULL; ac->onConnect = NULL; + ac->onConnectNC = NULL; ac->onDisconnect = NULL; ac->replies.head = NULL; ac->replies.tail = NULL; - ac->sub.invalid.head = NULL; - ac->sub.invalid.tail = NULL; + ac->sub.replies.head = NULL; + ac->sub.replies.tail = NULL; ac->sub.channels = channels; ac->sub.patterns = patterns; + ac->sub.pending_unsubs = 0; return ac; oom: @@ -225,17 +227,34 @@ redisAsyncContext *redisAsyncConnectUnix(const char *path) { return redisAsyncConnectWithOptions(&options); } -int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { - if (ac->onConnect == NULL) { - ac->onConnect = fn; +static int +redisAsyncSetConnectCallbackImpl(redisAsyncContext *ac, redisConnectCallback *fn, + redisConnectCallbackNC *fn_nc) +{ + /* If either are already set, this is an error */ + if (ac->onConnect || ac->onConnectNC) + return REDIS_ERR; - /* The common way to detect an established connection is to wait for - * the first write event to be fired. This assumes the related event - * library functions are already set. */ - _EL_ADD_WRITE(ac); - return REDIS_OK; + if (fn) { + ac->onConnect = fn; + } else if (fn_nc) { + ac->onConnectNC = fn_nc; } - return REDIS_ERR; + + /* The common way to detect an established connection is to wait for + * the first write event to be fired. This assumes the related event + * library functions are already set. */ + _EL_ADD_WRITE(ac); + + return REDIS_OK; +} + +int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { + return redisAsyncSetConnectCallbackImpl(ac, fn, NULL); +} + +int redisAsyncSetConnectCallbackNC(redisAsyncContext *ac, redisConnectCallbackNC *fn) { + return redisAsyncSetConnectCallbackImpl(ac, NULL, fn); } int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) { @@ -302,6 +321,43 @@ static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) { } } +static void __redisRunConnectCallback(redisAsyncContext *ac, int status) +{ + if (ac->onConnect == NULL && ac->onConnectNC == NULL) + return; + + if (!(ac->c.flags & REDIS_IN_CALLBACK)) { + ac->c.flags |= REDIS_IN_CALLBACK; + if (ac->onConnect) { + ac->onConnect(ac, status); + } else { + ac->onConnectNC(ac, status); + } + ac->c.flags &= ~REDIS_IN_CALLBACK; + } else { + /* already in callback */ + if (ac->onConnect) { + ac->onConnect(ac, status); + } else { + ac->onConnectNC(ac, status); + } + } +} + +static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status) +{ + if (ac->onDisconnect) { + if (!(ac->c.flags & REDIS_IN_CALLBACK)) { + ac->c.flags |= REDIS_IN_CALLBACK; + ac->onDisconnect(ac, status); + ac->c.flags &= ~REDIS_IN_CALLBACK; + } else { + /* already in callback */ + ac->onDisconnect(ac, status); + } + } +} + /* Helper function to free the context. */ static void __redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); @@ -312,9 +368,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { /* Execute pending callbacks with NULL reply. */ while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); - - /* Execute callbacks for invalid commands */ - while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); /* Run subscription callbacks with NULL reply */ @@ -339,12 +393,11 @@ static void __redisAsyncFree(redisAsyncContext *ac) { /* Execute disconnect callback. When redisAsyncFree() initiated destroying * this context, the status will always be REDIS_OK. */ - if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) { - if (c->flags & REDIS_FREEING) { - ac->onDisconnect(ac,REDIS_OK); - } else { - ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR); - } + if (c->flags & REDIS_CONNECTED) { + int status = ac->err == 0 ? REDIS_OK : REDIS_ERR; + if (c->flags & REDIS_FREEING) + status = REDIS_OK; + __redisRunDisconnectCallback(ac, status); } if (ac->dataCleanup) { @@ -413,16 +466,17 @@ void redisAsyncDisconnect(redisAsyncContext *ac) { static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { redisContext *c = &(ac->c); dict *callbacks; - redisCallback *cb; + redisCallback *cb = NULL; dictEntry *de; int pvariant; char *stype; - sds sname; + sds sname = NULL; - /* Custom reply functions are not supported for pub/sub. This will fail - * very hard when they are used... */ - if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) { - assert(reply->elements >= 2); + /* Match reply with the expected format of a pushed message. + * The type and number of elements (3 to 4) are specified at: + * https://redis.io/topics/pubsub#format-of-pushed-messages */ + if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) || + reply->type == REDIS_REPLY_PUSH) { assert(reply->element[0]->type == REDIS_REPLY_STRING); stype = reply->element[0]->str; pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; @@ -433,46 +487,55 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, callbacks = ac->sub.channels; /* Locate the right callback */ - assert(reply->element[1]->type == REDIS_REPLY_STRING); - sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); - if (sname == NULL) - goto oom; - - de = dictFind(callbacks,sname); - if (de != NULL) { - cb = dictGetEntryVal(de); + if (reply->element[1]->type == REDIS_REPLY_STRING) { + sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); + if (sname == NULL) goto oom; - /* If this is an subscribe reply decrease pending counter. */ - if (strcasecmp(stype+pvariant,"subscribe") == 0) { - cb->pending_subs -= 1; + if ((de = dictFind(callbacks,sname)) != NULL) { + cb = dictGetEntryVal(de); + memcpy(dstcb,cb,sizeof(*dstcb)); } + } - memcpy(dstcb,cb,sizeof(*dstcb)); - - /* If this is an unsubscribe message, remove it. */ - if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { - if (cb->pending_subs == 0) - dictDelete(callbacks,sname); - - /* If this was the last unsubscribe message, revert to - * non-subscribe mode. */ - assert(reply->element[2]->type == REDIS_REPLY_INTEGER); - - /* Unset subscribed flag only when no pipelined pending subscribe. */ - if (reply->element[2]->integer == 0 - && dictSize(ac->sub.channels) == 0 - && dictSize(ac->sub.patterns) == 0) - c->flags &= ~REDIS_SUBSCRIBED; + /* If this is an subscribe reply decrease pending counter. */ + if (strcasecmp(stype+pvariant,"subscribe") == 0) { + assert(cb != NULL); + cb->pending_subs -= 1; + + } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { + if (cb == NULL) + ac->sub.pending_unsubs -= 1; + else if (cb->pending_subs == 0) + dictDelete(callbacks,sname); + + /* If this was the last unsubscribe message, revert to + * non-subscribe mode. */ + assert(reply->element[2]->type == REDIS_REPLY_INTEGER); + + /* Unset subscribed flag only when no pipelined pending subscribe + * or pending unsubscribe replies. */ + if (reply->element[2]->integer == 0 + && dictSize(ac->sub.channels) == 0 + && dictSize(ac->sub.patterns) == 0 + && ac->sub.pending_unsubs == 0) { + c->flags &= ~REDIS_SUBSCRIBED; + + /* Move ongoing regular command callbacks. */ + redisCallback cb; + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) { + __redisPushCallback(&ac->replies,&cb); + } } } sdsfree(sname); } else { - /* Shift callback for invalid commands. */ - __redisShiftCallback(&ac->sub.invalid,dstcb); + /* Shift callback for pending command in subscribed context. */ + __redisShiftCallback(&ac->sub.replies,dstcb); } return REDIS_OK; oom: __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); + __redisAsyncCopyError(ac); return REDIS_ERR; } @@ -496,13 +559,12 @@ static int redisIsSubscribeReply(redisReply *reply) { len = reply->element[0]->len - off; return !strncasecmp(str, "subscribe", len) || - !strncasecmp(str, "message", len); - + !strncasecmp(str, "message", len) || + !strncasecmp(str, "unsubscribe", len); } void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback cb = {NULL, NULL, 0, NULL}; void *reply = NULL; int status; @@ -515,17 +577,14 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } - - /* If monitor mode, repush callback */ - if(c->flags & REDIS_MONITORING) { - __redisPushCallback(&ac->replies,&cb); - } - /* When the connection is not being disconnected, simply stop * trying to get replies and wait for the next loop tick. */ break; } + /* Keep track of push message support for subscribe handling */ + if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH; + /* Send any non-subscribe related PUSH messages to our PUSH handler * while allowing subscribe related PUSH messages to pass through. * This allows existing code to be backward compatible and work in @@ -538,6 +597,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* Even if the context is subscribed, pending regular * callbacks will get a reply before pub/sub messages arrive. */ + redisCallback cb = {NULL, NULL, 0, 0, NULL}; if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { /* * A spontaneous reply in a not-subscribed context can be the error @@ -561,9 +621,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } - /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */ - assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); - if(c->flags & REDIS_SUBSCRIBED) + /* No more regular callbacks and no errors, the context *must* be subscribed. */ + assert(c->flags & REDIS_SUBSCRIBED); + if (c->flags & REDIS_SUBSCRIBED) __redisGetSubscribeCallback(ac,reply,&cb); } @@ -585,6 +645,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) { * doesn't know what the server will spit out over the wire. */ c->reader->fn->freeObject(reply); } + + /* If in monitor mode, repush the callback */ + if (c->flags & REDIS_MONITORING) { + __redisPushCallback(&ac->replies,&cb); + } } /* Disconnect when there was an error reading the reply */ @@ -593,7 +658,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { } static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) { - if (ac->onConnect) ac->onConnect(ac, REDIS_ERR); + __redisRunConnectCallback(ac, REDIS_ERR); __redisAsyncDisconnect(ac); } @@ -618,8 +683,19 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { return REDIS_ERR; } - if (ac->onConnect) ac->onConnect(ac, REDIS_OK); + /* flag us as fully connect, but allow the callback + * to disconnect. For that reason, permit the function + * to delete the context here after callback return. + */ c->flags |= REDIS_CONNECTED; + __redisRunConnectCallback(ac, REDIS_OK); + if ((ac->c.flags & REDIS_DISCONNECTING)) { + redisAsyncDisconnect(ac); + return REDIS_ERR; + } else if ((ac->c.flags & REDIS_FREEING)) { + redisAsyncFree(ac); + return REDIS_ERR; + } return REDIS_OK; } else { return REDIS_OK; @@ -643,6 +719,8 @@ void redisAsyncRead(redisAsyncContext *ac) { */ void redisAsyncHandleRead(redisAsyncContext *ac) { redisContext *c = &(ac->c); + /* must not be called from a callback */ + assert(!(c->flags & REDIS_IN_CALLBACK)); if (!(c->flags & REDIS_CONNECTED)) { /* Abort connect was not successful. */ @@ -676,6 +754,8 @@ void redisAsyncWrite(redisAsyncContext *ac) { void redisAsyncHandleWrite(redisAsyncContext *ac) { redisContext *c = &(ac->c); + /* must not be called from a callback */ + assert(!(c->flags & REDIS_IN_CALLBACK)); if (!(c->flags & REDIS_CONNECTED)) { /* Abort connect was not successful. */ @@ -692,9 +772,11 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { void redisAsyncHandleTimeout(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; + /* must not be called from a callback */ + assert(!(c->flags & REDIS_IN_CALLBACK)); if ((c->flags & REDIS_CONNECTED)) { - if ( ac->replies.head == NULL) { + if (ac->replies.head == NULL && ac->sub.replies.head == NULL) { /* Nothing to do - just an idle timeout */ return; } @@ -711,8 +793,8 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) { __redisAsyncCopyError(ac); } - if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) { - ac->onConnect(ac, REDIS_ERR); + if (!(c->flags & REDIS_CONNECTED)) { + __redisRunConnectCallback(ac, REDIS_ERR); } while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) { @@ -749,6 +831,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void redisContext *c = &(ac->c); redisCallback cb; struct dict *cbdict; + dictIterator it; dictEntry *de; redisCallback *existcb; int pvariant, hasnext; @@ -765,6 +848,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void cb.fn = fn; cb.privdata = privdata; cb.pending_subs = 1; + cb.unsubscribe_sent = 0; /* Find out which command will be appended. */ p = nextArgument(cmd,&cstr,&clen); @@ -804,6 +888,51 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void * subscribed to one or more channels or patterns. */ if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; + if (pvariant) + cbdict = ac->sub.patterns; + else + cbdict = ac->sub.channels; + + if (hasnext) { + /* Send an unsubscribe with specific channels/patterns. + * Bookkeeping the number of expected replies */ + while ((p = nextArgument(p,&astr,&alen)) != NULL) { + sname = sdsnewlen(astr,alen); + if (sname == NULL) + goto oom; + + de = dictFind(cbdict,sname); + if (de != NULL) { + existcb = dictGetEntryVal(de); + if (existcb->unsubscribe_sent == 0) + existcb->unsubscribe_sent = 1; + else + /* Already sent, reply to be ignored */ + ac->sub.pending_unsubs += 1; + } else { + /* Not subscribed to, reply to be ignored */ + ac->sub.pending_unsubs += 1; + } + sdsfree(sname); + } + } else { + /* Send an unsubscribe without specific channels/patterns. + * Bookkeeping the number of expected replies */ + int no_subs = 1; + dictInitIterator(&it,cbdict); + while ((de = dictNext(&it)) != NULL) { + existcb = dictGetEntryVal(de); + if (existcb->unsubscribe_sent == 0) { + existcb->unsubscribe_sent = 1; + no_subs = 0; + } + } + /* Unsubscribing to all channels/patterns, where none is + * subscribed to, results in a single reply to be ignored. */ + if (no_subs == 1) + ac->sub.pending_unsubs += 1; + } + /* (P)UNSUBSCRIBE does not have its own response: every channel or * pattern that is unsubscribed will receive a message. This means we * should not append a callback function for this command. */ @@ -814,9 +943,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void goto oom; } else { if (c->flags & REDIS_SUBSCRIBED) { - /* This will likely result in an error reply, but it needs to be - * received and passed to the callback. */ - if (__redisPushCallback(&ac->sub.invalid,&cb) != REDIS_OK) + if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK) goto oom; } else { if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) @@ -862,7 +989,7 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) { sds cmd; - int len; + long long len; int status; len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen); if (len < 0) |