diff options
-rw-r--r-- | async.c | 48 | ||||
-rw-r--r-- | async.h | 2 |
2 files changed, 43 insertions, 7 deletions
@@ -34,7 +34,9 @@ #include <strings.h> #include <assert.h> #include <ctype.h> +#include <errno.h> #include "async.h" +#include "net.h" #include "dict.c" #include "sds.h" @@ -419,12 +421,43 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); } +/* Internal helper function to detect socket status the first time a read or + * write event fires. When connecting was not succesful, the connect callback + * is called with a REDIS_ERR status and the context is free'd. */ +static int __redisAsyncHandleConnect(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + + if (redisCheckSocketError(c,c->fd) == REDIS_ERR) { + /* Try again later when connect(2) is still in progress. */ + if (errno == EINPROGRESS) + return REDIS_OK; + + if (ac->onConnect) ac->onConnect(ac,REDIS_ERR); + __redisAsyncDisconnect(ac); + return REDIS_ERR; + } + + /* Mark context as connected. */ + c->flags |= REDIS_CONNECTED; + if (ac->onConnect) ac->onConnect(ac,REDIS_OK); + return REDIS_OK; +} + /* This function should be called when the socket is readable. * It processes all replies that can be read and executes their callbacks. */ void redisAsyncHandleRead(redisAsyncContext *ac) { redisContext *c = &(ac->c); + if (!(c->flags & REDIS_CONNECTED)) { + /* Abort connect was not successful. */ + if (__redisAsyncHandleConnect(ac) != REDIS_OK) + return; + /* Try again later when the context is still not connected. */ + if (!(c->flags & REDIS_CONNECTED)) + return; + } + if (redisBufferRead(c) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { @@ -438,6 +471,15 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { redisContext *c = &(ac->c); int done = 0; + if (!(c->flags & REDIS_CONNECTED)) { + /* Abort connect was not successful. */ + if (__redisAsyncHandleConnect(ac) != REDIS_OK) + return; + /* Try again later when the context is still not connected. */ + if (!(c->flags & REDIS_CONNECTED)) + return; + } + if (redisBufferWrite(c,&done) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { @@ -449,12 +491,6 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { /* Always schedule reads after writes */ _EL_ADD_READ(ac); - - /* Fire onConnect when this is the first write event. */ - if (!(c->flags & REDIS_CONNECTED)) { - c->flags |= REDIS_CONNECTED; - if (ac->onConnect) ac->onConnect(ac); - } } } @@ -55,7 +55,7 @@ typedef struct redisCallbackList { /* Connection callback prototypes */ typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); -typedef void (redisConnectCallback)(const struct redisAsyncContext*); +typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status); /* Context for an async connection to Redis */ typedef struct redisAsyncContext { |