diff options
author | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-11-01 09:27:43 +0100 |
---|---|---|
committer | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-11-01 09:27:43 +0100 |
commit | 4e3bd7893d4e59ca23b629cc9084e9c61d5edf25 (patch) | |
tree | 8c864ce20d51c99cf46c04cdc8db5e024d338475 /async.c | |
parent | ae5a13f55753dc5dddc7d7a9a723c1559054359d (diff) |
Add support to lazily disconnect an asynchronous connection
Diffstat (limited to 'async.c')
-rw-r--r-- | async.c | 103 |
1 files changed, 77 insertions, 26 deletions
@@ -41,8 +41,9 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { } redisAsyncContext *redisAsyncConnect(const char *ip, int port) { - redisContext *_c = redisConnectNonBlock(ip,port); - return redisAsyncInitialize(_c); + redisContext *c = redisConnectNonBlock(ip,port); + redisAsyncContext *ac = redisAsyncInitialize(c); + return ac; } int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn) { @@ -50,6 +51,40 @@ int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFun return redisSetReplyObjectFunctions(c,fn); } +int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) { + if (ac->onDisconnect == NULL) { + ac->onDisconnect = fn; + return REDIS_OK; + } + return REDIS_ERR; +} + +/* Tries to do a clean disconnect from Redis, meaning it stops new commands + * from being issued, but tries to flush the output buffer and execute + * callbacks for all remaining replies. */ +void redisAsyncDisconnect(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + c->flags |= REDIS_DISCONNECTING; +} + +/* Helper function to make the disconnect happen and clean up. */ +static void __redisAsyncDisconnect(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + int status; + + /* Signal event lib to stop reading/writing */ + if (ac->evDelRead) ac->evDelRead(ac->data); + if (ac->evDelWrite) ac->evDelWrite(ac->data); + if (ac->evCleanup) ac->evCleanup(ac->data); + + /* Execute callback with proper status */ + status = (c->error == NULL) ? REDIS_OK : REDIS_ERR; + if (ac->onDisconnect) ac->onDisconnect(ac,status); + + /* Cleanup self */ + redisFree(c); +} + /* Helper functions to push/shift callbacks */ static void __redisPushCallback(redisCallbackList *list, redisCallback *cb) { if (list->head == NULL) @@ -69,38 +104,52 @@ static redisCallback *__redisShiftCallback(redisCallbackList *list) { return cb; } +void redisProcessCallbacks(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisCallback *cb; + void *reply = NULL; + int status; + + while((status = redisGetReply(c,&reply)) == REDIS_OK) { + if (reply == NULL) { + /* When the connection is being disconnected and there are + * no more replies, this is the cue to really disconnect. */ + if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0) { + __redisAsyncDisconnect(ac); + return; + } + + /* When the connection is not being disconnected, simply stop + * trying to get replies and wait for the next loop tick. */ + break; + } + + /* Shift callback and execute it */ + cb = __redisShiftCallback(&ac->replies); + assert(cb != NULL); + if (cb->fn != NULL) { + cb->fn(ac,reply,cb->privdata); + } else { + c->fn->freeObject(reply); + } + } + + /* Disconnect when there was an error reading the reply */ + if (status != REDIS_OK) + __redisAsyncDisconnect(ac); +} + /* This function should be called when the socket is readable. * It processes all replies that can be read and executes their callbacks. */ void redisAsyncHandleRead(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback *cb; - void *reply = NULL; - int status; if (redisBufferRead(c) == REDIS_ERR) { - // needs error handling - assert(NULL); + __redisAsyncDisconnect(ac); } else { /* Always re-schedule reads */ if (ac->evAddRead) ac->evAddRead(ac->data); - - while((status = redisGetReply(c,&reply)) == REDIS_OK) { - /* Abort when there are no more replies */ - if (reply == NULL) break; - - /* Shift callback and execute it */ - cb = __redisShiftCallback(&ac->replies); - assert(cb != NULL); - if (cb->fn != NULL) { - cb->fn(ac,reply,cb->privdata); - } else { - c->fn->freeObject(reply); - } - } - - // needs error handling - assert(status == REDIS_OK); } } @@ -109,8 +158,7 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { int done = 0; if (redisBufferWrite(c,&done) == REDIS_ERR) { - // needs error handling - assert(NULL); + __redisAsyncDisconnect(ac); } else { /* Continue writing when not done, stop writing otherwise */ if (!done) { @@ -132,6 +180,9 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) { redisContext *c = &(ac->c); redisCallback *cb; + + /* Don't accept new commands when the connection is lazily closed. */ + if (c->flags & REDIS_DISCONNECTING) return REDIS_ERR; c->obuf = sdscatlen(c->obuf,cmd,len); /* Store callback */ |