diff options
-rw-r--r-- | async.c | 83 | ||||
-rw-r--r-- | async.h | 2 | ||||
-rw-r--r-- | example-ae.c | 13 | ||||
-rw-r--r-- | example-libev.c | 12 | ||||
-rw-r--r-- | example-libevent.c | 12 | ||||
-rw-r--r-- | net.c | 37 | ||||
-rw-r--r-- | net.h | 1 |
7 files changed, 113 insertions, 47 deletions
@@ -34,10 +34,28 @@ #include <strings.h> #include <assert.h> #include <ctype.h> +#include <errno.h> #include "async.h" +#include "net.h" #include "dict.c" #include "sds.h" +#define _EL_ADD_READ(ctx) do { \ + if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \ + } while(0) +#define _EL_DEL_READ(ctx) do { \ + if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \ + } while(0) +#define _EL_ADD_WRITE(ctx) do { \ + if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \ + } while(0) +#define _EL_DEL_WRITE(ctx) do { \ + if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \ + } while(0) +#define _EL_CLEANUP(ctx) do { \ + if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \ + } while(0); + /* Forward declaration of function in hiredis.c */ void __redisAppendCommand(redisContext *c, char *cmd, size_t len); @@ -143,7 +161,7 @@ int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn /* The common way to detect an established connection is to wait for * the first write event to be fired. This assumes the related event * library functions are already set. */ - if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); + _EL_ADD_WRITE(ac); return REDIS_OK; } return REDIS_ERR; @@ -231,7 +249,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { dictRelease(ac->sub.patterns); /* Signal event lib to clean up */ - if (ac->ev.cleanup) ac->ev.cleanup(ac->ev.data); + _EL_CLEANUP(ac); /* Execute disconnect callback. When redisAsyncFree() initiated destroying * this context, the status will always be REDIS_OK. */ @@ -403,17 +421,48 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); } +/* Internal helper function to detect socket status the first time a read or + * write event fires. When connecting was not succesful, the connect callback + * is called with a REDIS_ERR status and the context is free'd. */ +static int __redisAsyncHandleConnect(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + + if (redisCheckSocketError(c,c->fd) == REDIS_ERR) { + /* Try again later when connect(2) is still in progress. */ + if (errno == EINPROGRESS) + return REDIS_OK; + + if (ac->onConnect) ac->onConnect(ac,REDIS_ERR); + __redisAsyncDisconnect(ac); + return REDIS_ERR; + } + + /* Mark context as connected. */ + c->flags |= REDIS_CONNECTED; + if (ac->onConnect) ac->onConnect(ac,REDIS_OK); + return REDIS_OK; +} + /* This function should be called when the socket is readable. * It processes all replies that can be read and executes their callbacks. */ void redisAsyncHandleRead(redisAsyncContext *ac) { redisContext *c = &(ac->c); + if (!(c->flags & REDIS_CONNECTED)) { + /* Abort connect was not successful. */ + if (__redisAsyncHandleConnect(ac) != REDIS_OK) + return; + /* Try again later when the context is still not connected. */ + if (!(c->flags & REDIS_CONNECTED)) + return; + } + if (redisBufferRead(c) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { /* Always re-schedule reads */ - if (ac->ev.addRead) ac->ev.addRead(ac->ev.data); + _EL_ADD_READ(ac); redisProcessCallbacks(ac); } } @@ -422,24 +471,26 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { redisContext *c = &(ac->c); int done = 0; + if (!(c->flags & REDIS_CONNECTED)) { + /* Abort connect was not successful. */ + if (__redisAsyncHandleConnect(ac) != REDIS_OK) + return; + /* Try again later when the context is still not connected. */ + if (!(c->flags & REDIS_CONNECTED)) + return; + } + if (redisBufferWrite(c,&done) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { /* Continue writing when not done, stop writing otherwise */ - if (!done) { - if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); - } else { - if (ac->ev.delWrite) ac->ev.delWrite(ac->ev.data); - } + if (!done) + _EL_ADD_WRITE(ac); + else + _EL_DEL_WRITE(ac); /* Always schedule reads after writes */ - if (ac->ev.addRead) ac->ev.addRead(ac->ev.data); - - /* Fire onConnect when this is the first write event. */ - if (!(c->flags & REDIS_CONNECTED)) { - c->flags |= REDIS_CONNECTED; - if (ac->onConnect) ac->onConnect(ac); - } + _EL_ADD_READ(ac); } } @@ -517,7 +568,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void __redisAppendCommand(c,cmd,len); /* Always schedule a write when the write buffer is non-empty */ - if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); + _EL_ADD_WRITE(ac); return REDIS_OK; } @@ -55,7 +55,7 @@ typedef struct redisCallbackList { /* Connection callback prototypes */ typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); -typedef void (redisConnectCallback)(const struct redisAsyncContext*); +typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status); /* Context for an async connection to Redis */ typedef struct redisAsyncContext { diff --git a/example-ae.c b/example-ae.c index 28c34dc..5ed34a3 100644 --- a/example-ae.c +++ b/example-ae.c @@ -18,17 +18,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } -void connectCallback(const redisAsyncContext *c) { - ((void)c); - printf("connected...\n"); +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); } void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); + return; } - printf("disconnected...\n"); - aeStop(loop); + printf("Disconnected...\n"); } int main (int argc, char **argv) { diff --git a/example-libev.c b/example-libev.c index 8efa1e3..7894f1f 100644 --- a/example-libev.c +++ b/example-libev.c @@ -15,16 +15,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } -void connectCallback(const redisAsyncContext *c) { - ((void)c); - printf("connected...\n"); +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); } void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); + return; } - printf("disconnected...\n"); + printf("Disconnected...\n"); } int main (int argc, char **argv) { diff --git a/example-libevent.c b/example-libevent.c index f6f8c83..9da8e02 100644 --- a/example-libevent.c +++ b/example-libevent.c @@ -15,16 +15,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } -void connectCallback(const redisAsyncContext *c) { - ((void)c); - printf("connected...\n"); +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); } void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); + return; } - printf("disconnected...\n"); + printf("Disconnected...\n"); } int main (int argc, char **argv) { @@ -117,8 +117,6 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval * struct timeval to; struct timeval *toptr = NULL; fd_set wfd; - int err; - socklen_t errlen; /* Only use timeout when not NULL. */ if (timeout != NULL) { @@ -143,21 +141,6 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval * return REDIS_ERR; } - err = 0; - errlen = sizeof(err); - if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) { - __redisSetErrorFromErrno(c,REDIS_ERR_IO,"getsockopt(SO_ERROR)"); - close(fd); - return REDIS_ERR; - } - - if (err) { - errno = err; - __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); - close(fd); - return REDIS_ERR; - } - return REDIS_OK; } @@ -166,6 +149,26 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval * return REDIS_ERR; } +int redisCheckSocketError(redisContext *c, int fd) { + int err = 0; + socklen_t errlen = sizeof(err); + + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) { + __redisSetErrorFromErrno(c,REDIS_ERR_IO,"getsockopt(SO_ERROR)"); + close(fd); + return REDIS_ERR; + } + + if (err) { + errno = err; + __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); + close(fd); + return REDIS_ERR; + } + + return REDIS_OK; +} + int redisContextSetTimeout(redisContext *c, struct timeval tv) { if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)) == -1) { __redisSetErrorFromErrno(c,REDIS_ERR_IO,"setsockopt(SO_RCVTIMEO)"); @@ -39,6 +39,7 @@ #define AF_LOCAL AF_UNIX #endif +int redisCheckSocketError(redisContext *c, int fd); int redisContextSetTimeout(redisContext *c, struct timeval tv); int redisContextConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout); int redisContextConnectUnix(redisContext *c, const char *path, struct timeval *timeout); |