summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--async.c83
-rw-r--r--async.h2
-rw-r--r--example-ae.c13
-rw-r--r--example-libev.c12
-rw-r--r--example-libevent.c12
-rw-r--r--net.c37
-rw-r--r--net.h1
7 files changed, 113 insertions, 47 deletions
diff --git a/async.c b/async.c
index cbbfb6d..b293ecd 100644
--- a/async.c
+++ b/async.c
@@ -34,10 +34,28 @@
#include <strings.h>
#include <assert.h>
#include <ctype.h>
+#include <errno.h>
#include "async.h"
+#include "net.h"
#include "dict.c"
#include "sds.h"
+#define _EL_ADD_READ(ctx) do { \
+ if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
+ } while(0)
+#define _EL_DEL_READ(ctx) do { \
+ if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
+ } while(0)
+#define _EL_ADD_WRITE(ctx) do { \
+ if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
+ } while(0)
+#define _EL_DEL_WRITE(ctx) do { \
+ if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
+ } while(0)
+#define _EL_CLEANUP(ctx) do { \
+ if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
+ } while(0);
+
/* Forward declaration of function in hiredis.c */
void __redisAppendCommand(redisContext *c, char *cmd, size_t len);
@@ -143,7 +161,7 @@ int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn
/* The common way to detect an established connection is to wait for
* the first write event to be fired. This assumes the related event
* library functions are already set. */
- if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data);
+ _EL_ADD_WRITE(ac);
return REDIS_OK;
}
return REDIS_ERR;
@@ -231,7 +249,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
dictRelease(ac->sub.patterns);
/* Signal event lib to clean up */
- if (ac->ev.cleanup) ac->ev.cleanup(ac->ev.data);
+ _EL_CLEANUP(ac);
/* Execute disconnect callback. When redisAsyncFree() initiated destroying
* this context, the status will always be REDIS_OK. */
@@ -403,17 +421,48 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
__redisAsyncDisconnect(ac);
}
+/* Internal helper function to detect socket status the first time a read or
+ * write event fires. When connecting was not succesful, the connect callback
+ * is called with a REDIS_ERR status and the context is free'd. */
+static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
+ redisContext *c = &(ac->c);
+
+ if (redisCheckSocketError(c,c->fd) == REDIS_ERR) {
+ /* Try again later when connect(2) is still in progress. */
+ if (errno == EINPROGRESS)
+ return REDIS_OK;
+
+ if (ac->onConnect) ac->onConnect(ac,REDIS_ERR);
+ __redisAsyncDisconnect(ac);
+ return REDIS_ERR;
+ }
+
+ /* Mark context as connected. */
+ c->flags |= REDIS_CONNECTED;
+ if (ac->onConnect) ac->onConnect(ac,REDIS_OK);
+ return REDIS_OK;
+}
+
/* This function should be called when the socket is readable.
* It processes all replies that can be read and executes their callbacks.
*/
void redisAsyncHandleRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
+ if (!(c->flags & REDIS_CONNECTED)) {
+ /* Abort connect was not successful. */
+ if (__redisAsyncHandleConnect(ac) != REDIS_OK)
+ return;
+ /* Try again later when the context is still not connected. */
+ if (!(c->flags & REDIS_CONNECTED))
+ return;
+ }
+
if (redisBufferRead(c) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
/* Always re-schedule reads */
- if (ac->ev.addRead) ac->ev.addRead(ac->ev.data);
+ _EL_ADD_READ(ac);
redisProcessCallbacks(ac);
}
}
@@ -422,24 +471,26 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
int done = 0;
+ if (!(c->flags & REDIS_CONNECTED)) {
+ /* Abort connect was not successful. */
+ if (__redisAsyncHandleConnect(ac) != REDIS_OK)
+ return;
+ /* Try again later when the context is still not connected. */
+ if (!(c->flags & REDIS_CONNECTED))
+ return;
+ }
+
if (redisBufferWrite(c,&done) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
/* Continue writing when not done, stop writing otherwise */
- if (!done) {
- if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data);
- } else {
- if (ac->ev.delWrite) ac->ev.delWrite(ac->ev.data);
- }
+ if (!done)
+ _EL_ADD_WRITE(ac);
+ else
+ _EL_DEL_WRITE(ac);
/* Always schedule reads after writes */
- if (ac->ev.addRead) ac->ev.addRead(ac->ev.data);
-
- /* Fire onConnect when this is the first write event. */
- if (!(c->flags & REDIS_CONNECTED)) {
- c->flags |= REDIS_CONNECTED;
- if (ac->onConnect) ac->onConnect(ac);
- }
+ _EL_ADD_READ(ac);
}
}
@@ -517,7 +568,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
__redisAppendCommand(c,cmd,len);
/* Always schedule a write when the write buffer is non-empty */
- if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data);
+ _EL_ADD_WRITE(ac);
return REDIS_OK;
}
diff --git a/async.h b/async.h
index bb5c87d..268274e 100644
--- a/async.h
+++ b/async.h
@@ -55,7 +55,7 @@ typedef struct redisCallbackList {
/* Connection callback prototypes */
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
-typedef void (redisConnectCallback)(const struct redisAsyncContext*);
+typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);
/* Context for an async connection to Redis */
typedef struct redisAsyncContext {
diff --git a/example-ae.c b/example-ae.c
index 28c34dc..5ed34a3 100644
--- a/example-ae.c
+++ b/example-ae.c
@@ -18,17 +18,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisAsyncDisconnect(c);
}
-void connectCallback(const redisAsyncContext *c) {
- ((void)c);
- printf("connected...\n");
+void connectCallback(const redisAsyncContext *c, int status) {
+ if (status != REDIS_OK) {
+ printf("Error: %s\n", c->errstr);
+ return;
+ }
+ printf("Connected...\n");
}
void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
+ return;
}
- printf("disconnected...\n");
- aeStop(loop);
+ printf("Disconnected...\n");
}
int main (int argc, char **argv) {
diff --git a/example-libev.c b/example-libev.c
index 8efa1e3..7894f1f 100644
--- a/example-libev.c
+++ b/example-libev.c
@@ -15,16 +15,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisAsyncDisconnect(c);
}
-void connectCallback(const redisAsyncContext *c) {
- ((void)c);
- printf("connected...\n");
+void connectCallback(const redisAsyncContext *c, int status) {
+ if (status != REDIS_OK) {
+ printf("Error: %s\n", c->errstr);
+ return;
+ }
+ printf("Connected...\n");
}
void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
+ return;
}
- printf("disconnected...\n");
+ printf("Disconnected...\n");
}
int main (int argc, char **argv) {
diff --git a/example-libevent.c b/example-libevent.c
index f6f8c83..9da8e02 100644
--- a/example-libevent.c
+++ b/example-libevent.c
@@ -15,16 +15,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) {
redisAsyncDisconnect(c);
}
-void connectCallback(const redisAsyncContext *c) {
- ((void)c);
- printf("connected...\n");
+void connectCallback(const redisAsyncContext *c, int status) {
+ if (status != REDIS_OK) {
+ printf("Error: %s\n", c->errstr);
+ return;
+ }
+ printf("Connected...\n");
}
void disconnectCallback(const redisAsyncContext *c, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", c->errstr);
+ return;
}
- printf("disconnected...\n");
+ printf("Disconnected...\n");
}
int main (int argc, char **argv) {
diff --git a/net.c b/net.c
index 98eee5d..35b6ad2 100644
--- a/net.c
+++ b/net.c
@@ -117,8 +117,6 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *
struct timeval to;
struct timeval *toptr = NULL;
fd_set wfd;
- int err;
- socklen_t errlen;
/* Only use timeout when not NULL. */
if (timeout != NULL) {
@@ -143,21 +141,6 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *
return REDIS_ERR;
}
- err = 0;
- errlen = sizeof(err);
- if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) {
- __redisSetErrorFromErrno(c,REDIS_ERR_IO,"getsockopt(SO_ERROR)");
- close(fd);
- return REDIS_ERR;
- }
-
- if (err) {
- errno = err;
- __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
- close(fd);
- return REDIS_ERR;
- }
-
return REDIS_OK;
}
@@ -166,6 +149,26 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *
return REDIS_ERR;
}
+int redisCheckSocketError(redisContext *c, int fd) {
+ int err = 0;
+ socklen_t errlen = sizeof(err);
+
+ if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) {
+ __redisSetErrorFromErrno(c,REDIS_ERR_IO,"getsockopt(SO_ERROR)");
+ close(fd);
+ return REDIS_ERR;
+ }
+
+ if (err) {
+ errno = err;
+ __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL);
+ close(fd);
+ return REDIS_ERR;
+ }
+
+ return REDIS_OK;
+}
+
int redisContextSetTimeout(redisContext *c, struct timeval tv) {
if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)) == -1) {
__redisSetErrorFromErrno(c,REDIS_ERR_IO,"setsockopt(SO_RCVTIMEO)");
diff --git a/net.h b/net.h
index f9d3755..eb8a0a1 100644
--- a/net.h
+++ b/net.h
@@ -39,6 +39,7 @@
#define AF_LOCAL AF_UNIX
#endif
+int redisCheckSocketError(redisContext *c, int fd);
int redisContextSetTimeout(redisContext *c, struct timeval tv);
int redisContextConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout);
int redisContextConnectUnix(redisContext *c, const char *path, struct timeval *timeout);