summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--adapters/libevent.h79
-rw-r--r--async.c62
-rw-r--r--async.h5
-rw-r--r--read.h1
4 files changed, 121 insertions, 26 deletions
diff --git a/adapters/libevent.h b/adapters/libevent.h
index 7d2bef1..9843696 100644
--- a/adapters/libevent.h
+++ b/adapters/libevent.h
@@ -36,48 +36,81 @@
typedef struct redisLibeventEvents {
redisAsyncContext *context;
- struct event *rev, *wev;
+ struct event *ev, *tmr;
+ struct event_base *base;
+ struct timeval tv;
+ short flags;
} redisLibeventEvents;
-static void redisLibeventReadEvent(int fd, short event, void *arg) {
- ((void)fd); ((void)event);
+static void redisLibeventHandler(int fd, short event, void *arg) {
+ ((void)fd);
redisLibeventEvents *e = (redisLibeventEvents*)arg;
- redisAsyncHandleRead(e->context);
+ if (event & EV_TIMEOUT) {
+ redisAsyncHandleTimeout(e->context);
+ }
+ if (e->context && (event & EV_READ)) {
+ redisAsyncHandleRead(e->context);
+ }
+ if (e->context && (event & EV_WRITE)) {
+ redisAsyncHandleWrite(e->context);
+ }
}
-static void redisLibeventWriteEvent(int fd, short event, void *arg) {
- ((void)fd); ((void)event);
- redisLibeventEvents *e = (redisLibeventEvents*)arg;
- redisAsyncHandleWrite(e->context);
+static void redisLibeventUpdate(void *privdata, short flag, int isRemove) {
+ redisLibeventEvents *e = (redisLibeventEvents *)privdata;
+ const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL;
+
+ if (isRemove) {
+ if ((e->flags & flag) == 0) {
+ return;
+ } else {
+ e->flags &= ~flag;
+ }
+ } else {
+ if (e->flags & flag) {
+ return;
+ } else {
+ e->flags |= flag;
+ }
+ }
+
+ event_del(e->ev);
+ event_assign(e->ev, e->base, e->context->c.fd, e->flags,
+ redisLibeventHandler, privdata);
+ event_add(e->ev, tv);
}
static void redisLibeventAddRead(void *privdata) {
- redisLibeventEvents *e = (redisLibeventEvents*)privdata;
- event_add(e->rev,NULL);
+ redisLibeventUpdate(privdata, EV_READ, 0);
}
static void redisLibeventDelRead(void *privdata) {
- redisLibeventEvents *e = (redisLibeventEvents*)privdata;
- event_del(e->rev);
+ redisLibeventUpdate(privdata, EV_READ, 1);
}
static void redisLibeventAddWrite(void *privdata) {
- redisLibeventEvents *e = (redisLibeventEvents*)privdata;
- event_add(e->wev,NULL);
+ redisLibeventUpdate(privdata, EV_WRITE, 0);
}
static void redisLibeventDelWrite(void *privdata) {
- redisLibeventEvents *e = (redisLibeventEvents*)privdata;
- event_del(e->wev);
+ redisLibeventUpdate(privdata, EV_WRITE, 1);
}
static void redisLibeventCleanup(void *privdata) {
redisLibeventEvents *e = (redisLibeventEvents*)privdata;
- event_free(e->rev);
- event_free(e->wev);
+ event_free(e->ev);
free(e);
}
+static void redisLibeventSetTimeout(void *privdata, struct timeval tv) {
+ redisLibeventEvents *e = (redisLibeventEvents *)privdata;
+ short flags = e->flags;
+ e->flags = 0;
+ e->tv = tv;
+ event_del(e->ev);
+ redisLibeventUpdate(e, flags, 0);
+}
+
static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
redisContext *c = &(ac->c);
redisLibeventEvents *e;
@@ -96,13 +129,15 @@ static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
ac->ev.addWrite = redisLibeventAddWrite;
ac->ev.delWrite = redisLibeventDelWrite;
ac->ev.cleanup = redisLibeventCleanup;
+ ac->ev.scheduleTimer = redisLibeventSetTimeout;
ac->ev.data = e;
/* Initialize and install read/write events */
- e->rev = event_new(base, c->fd, EV_READ, redisLibeventReadEvent, e);
- e->wev = event_new(base, c->fd, EV_WRITE, redisLibeventWriteEvent, e);
- event_add(e->rev, NULL);
- event_add(e->wev, NULL);
+ e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e);
+ e->flags = 0;
+ e->base = base;
+ e->tv.tv_sec = 0;
+ e->tv.tv_usec = 0;
return REDIS_OK;
}
#endif
diff --git a/async.c b/async.c
index db59036..b789f93 100644
--- a/async.c
+++ b/async.c
@@ -42,15 +42,19 @@
#include "sds.h"
#include "sslio.h"
-#define _EL_ADD_READ(ctx) do { \
+#define _EL_ADD_READ(ctx) \
+ do { \
+ refreshTimeout(ctx); \
if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
- } while(0)
+ } while (0)
#define _EL_DEL_READ(ctx) do { \
if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
} while(0)
-#define _EL_ADD_WRITE(ctx) do { \
+#define _EL_ADD_WRITE(ctx) \
+ do { \
+ refreshTimeout(ctx); \
if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
- } while(0)
+ } while (0)
#define _EL_DEL_WRITE(ctx) do { \
if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
} while(0)
@@ -58,6 +62,19 @@
if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
} while(0);
+static void refreshTimeout(redisAsyncContext *ctx) {
+ if (ctx->c.timeout && ctx->ev.scheduleTimer &&
+ (ctx->c.timeout->tv_sec || ctx->c.timeout->tv_usec)) {
+ ctx->ev.scheduleTimer(ctx->ev.data, *ctx->c.timeout);
+ // } else {
+ // printf("Not scheduling timer.. (tmo=%p)\n", ctx->c.timeout);
+ // if (ctx->c.timeout){
+ // printf("tv_sec: %u. tv_usec: %u\n", ctx->c.timeout->tv_sec,
+ // ctx->c.timeout->tv_usec);
+ // }
+ }
+}
+
/* Forward declaration of function in hiredis.c */
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
@@ -656,6 +673,30 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
}
}
+void __redisSetError(redisContext *c, int type, const char *str);
+
+void redisAsyncHandleTimeout(redisAsyncContext *ac) {
+ redisContext *c = &(ac->c);
+ redisCallback cb;
+
+ if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) {
+ /* Nothing to do - just an idle timeout */
+ return;
+ }
+
+ if (!c->err) {
+ __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
+ }
+
+ if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
+ ac->onConnect(ac, REDIS_ERR);
+ }
+
+ while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
+ __redisRunCallback(ac, &cb, NULL);
+ }
+}
+
/* Sets a pointer to the first argument and its length starting at p. Returns
* the number of bytes to skip to get to the following argument. */
static const char *nextArgument(const char *start, const char **str, size_t *len) {
@@ -795,3 +836,16 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
return status;
}
+
+void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
+ if (!ac->c.timeout) {
+ ac->c.timeout = calloc(1, sizeof(tv));
+ }
+
+ if (tv.tv_sec == ac->c.timeout->tv_sec &&
+ tv.tv_usec == ac->c.timeout->tv_usec) {
+ return;
+ }
+
+ *ac->c.timeout = tv;
+} \ No newline at end of file
diff --git a/async.h b/async.h
index 740555c..3d51a35 100644
--- a/async.h
+++ b/async.h
@@ -57,6 +57,7 @@ typedef struct redisCallbackList {
/* Connection callback prototypes */
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);
+typedef void(redisTimerCallback)(void *timer, void *privdata);
/* Context for an async connection to Redis */
typedef struct redisAsyncContext {
@@ -81,6 +82,7 @@ typedef struct redisAsyncContext {
void (*addWrite)(void *privdata);
void (*delWrite)(void *privdata);
void (*cleanup)(void *privdata);
+ void (*scheduleTimer)(void *privdata, struct timeval tv);
} ev;
/* Called when either the connection is terminated due to an error or per
@@ -113,12 +115,15 @@ redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
redisAsyncContext *redisAsyncConnectUnix(const char *path);
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
+
+void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv);
void redisAsyncDisconnect(redisAsyncContext *ac);
void redisAsyncFree(redisAsyncContext *ac);
/* Handle read/write events */
void redisAsyncHandleRead(redisAsyncContext *ac);
void redisAsyncHandleWrite(redisAsyncContext *ac);
+void redisAsyncHandleTimeout(redisAsyncContext *ac);
/* Command functions for an async context. Write the command to the
* output buffer and register the provided callback. */
diff --git a/read.h b/read.h
index 2988aa4..0894b78 100644
--- a/read.h
+++ b/read.h
@@ -45,6 +45,7 @@
#define REDIS_ERR_EOF 3 /* End of file */
#define REDIS_ERR_PROTOCOL 4 /* Protocol error */
#define REDIS_ERR_OOM 5 /* Out of memory */
+#define REDIS_ERR_TIMEOUT 6 /* Timed out */
#define REDIS_ERR_OTHER 2 /* Everything else... */
#define REDIS_REPLY_STRING 1