diff options
Diffstat (limited to 'async.c')
-rw-r--r-- | async.c | 68 |
1 files changed, 42 insertions, 26 deletions
@@ -47,6 +47,11 @@ #include "async_private.h" +#ifdef NDEBUG +#undef assert +#define assert(e) (void)(e) +#endif + /* Forward declarations of hiredis.c functions */ int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); void __redisSetError(redisContext *c, int type, const char *str); @@ -301,7 +306,7 @@ static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) { static void __redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; - dictIterator *it; + dictIterator it; dictEntry *de; /* Execute pending callbacks with NULL reply. */ @@ -314,23 +319,17 @@ static void __redisAsyncFree(redisAsyncContext *ac) { /* Run subscription callbacks with NULL reply */ if (ac->sub.channels) { - it = dictGetIterator(ac->sub.channels); - if (it != NULL) { - while ((de = dictNext(it)) != NULL) - __redisRunCallback(ac,dictGetEntryVal(de),NULL); - dictReleaseIterator(it); - } + dictInitIterator(&it,ac->sub.channels); + while ((de = dictNext(&it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); dictRelease(ac->sub.channels); } if (ac->sub.patterns) { - it = dictGetIterator(ac->sub.patterns); - if (it != NULL) { - while ((de = dictNext(it)) != NULL) - __redisRunCallback(ac,dictGetEntryVal(de),NULL); - dictReleaseIterator(it); - } + dictInitIterator(&it,ac->sub.patterns); + while ((de = dictNext(&it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); dictRelease(ac->sub.patterns); } @@ -570,7 +569,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) { if (cb.fn != NULL) { __redisRunCallback(ac,&cb,reply); - c->reader->fn->freeObject(reply); + if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){ + c->reader->fn->freeObject(reply); + } /* Proceed with free'ing when redisAsyncFree() was called. */ if (c->flags & REDIS_FREEING) { @@ -605,7 +606,8 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { if (redisCheckConnectDone(c, &completed) == REDIS_ERR) { /* Error! */ - redisCheckSocketError(c); + if (redisCheckSocketError(c) == REDIS_ERR) + __redisAsyncCopyError(ac); __redisAsyncHandleConnectFailure(ac); return REDIS_ERR; } else if (completed == 1) { @@ -691,13 +693,22 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; - if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) { - /* Nothing to do - just an idle timeout */ - return; + if ((c->flags & REDIS_CONNECTED)) { + if ( ac->replies.head == NULL) { + /* Nothing to do - just an idle timeout */ + return; + } + + if (!ac->c.command_timeout || + (!ac->c.command_timeout->tv_sec && !ac->c.command_timeout->tv_usec)) { + /* A belated connect timeout arriving, ignore */ + return; + } } if (!c->err) { __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout"); + __redisAsyncCopyError(ac); } if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) { @@ -796,17 +807,21 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void /* (P)UNSUBSCRIBE does not have its own response: every channel or * pattern that is unsubscribed will receive a message. This means we * should not append a callback function for this command. */ - } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) { - /* Set monitor flag and push callback */ - c->flags |= REDIS_MONITORING; - __redisPushCallback(&ac->replies,&cb); + } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) { + /* Set monitor flag and push callback */ + c->flags |= REDIS_MONITORING; + if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) + goto oom; } else { - if (c->flags & REDIS_SUBSCRIBED) + if (c->flags & REDIS_SUBSCRIBED) { /* This will likely result in an error reply, but it needs to be * received and passed to the callback. */ - __redisPushCallback(&ac->sub.invalid,&cb); - else - __redisPushCallback(&ac->replies,&cb); + if (__redisPushCallback(&ac->sub.invalid,&cb) != REDIS_OK) + goto oom; + } else { + if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) + goto oom; + } } __redisAppendCommand(c,cmd,len); @@ -817,6 +832,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void return REDIS_OK; oom: __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); + __redisAsyncCopyError(ac); return REDIS_ERR; } |