diff options
-rw-r--r-- | adapters/libevent.h | 79 | ||||
-rw-r--r-- | async.c | 62 | ||||
-rw-r--r-- | async.h | 5 | ||||
-rw-r--r-- | read.h | 1 |
4 files changed, 121 insertions, 26 deletions
diff --git a/adapters/libevent.h b/adapters/libevent.h index 7d2bef1..9843696 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -36,48 +36,81 @@ typedef struct redisLibeventEvents { redisAsyncContext *context; - struct event *rev, *wev; + struct event *ev, *tmr; + struct event_base *base; + struct timeval tv; + short flags; } redisLibeventEvents; -static void redisLibeventReadEvent(int fd, short event, void *arg) { - ((void)fd); ((void)event); +static void redisLibeventHandler(int fd, short event, void *arg) { + ((void)fd); redisLibeventEvents *e = (redisLibeventEvents*)arg; - redisAsyncHandleRead(e->context); + if (event & EV_TIMEOUT) { + redisAsyncHandleTimeout(e->context); + } + if (e->context && (event & EV_READ)) { + redisAsyncHandleRead(e->context); + } + if (e->context && (event & EV_WRITE)) { + redisAsyncHandleWrite(e->context); + } } -static void redisLibeventWriteEvent(int fd, short event, void *arg) { - ((void)fd); ((void)event); - redisLibeventEvents *e = (redisLibeventEvents*)arg; - redisAsyncHandleWrite(e->context); +static void redisLibeventUpdate(void *privdata, short flag, int isRemove) { + redisLibeventEvents *e = (redisLibeventEvents *)privdata; + const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL; + + if (isRemove) { + if ((e->flags & flag) == 0) { + return; + } else { + e->flags &= ~flag; + } + } else { + if (e->flags & flag) { + return; + } else { + e->flags |= flag; + } + } + + event_del(e->ev); + event_assign(e->ev, e->base, e->context->c.fd, e->flags, + redisLibeventHandler, privdata); + event_add(e->ev, tv); } static void redisLibeventAddRead(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_add(e->rev,NULL); + redisLibeventUpdate(privdata, EV_READ, 0); } static void redisLibeventDelRead(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_del(e->rev); + redisLibeventUpdate(privdata, EV_READ, 1); } static void redisLibeventAddWrite(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_add(e->wev,NULL); + redisLibeventUpdate(privdata, EV_WRITE, 0); } static void redisLibeventDelWrite(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_del(e->wev); + redisLibeventUpdate(privdata, EV_WRITE, 1); } static void redisLibeventCleanup(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_free(e->rev); - event_free(e->wev); + event_free(e->ev); free(e); } +static void redisLibeventSetTimeout(void *privdata, struct timeval tv) { + redisLibeventEvents *e = (redisLibeventEvents *)privdata; + short flags = e->flags; + e->flags = 0; + e->tv = tv; + event_del(e->ev); + redisLibeventUpdate(e, flags, 0); +} + static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { redisContext *c = &(ac->c); redisLibeventEvents *e; @@ -96,13 +129,15 @@ static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { ac->ev.addWrite = redisLibeventAddWrite; ac->ev.delWrite = redisLibeventDelWrite; ac->ev.cleanup = redisLibeventCleanup; + ac->ev.scheduleTimer = redisLibeventSetTimeout; ac->ev.data = e; /* Initialize and install read/write events */ - e->rev = event_new(base, c->fd, EV_READ, redisLibeventReadEvent, e); - e->wev = event_new(base, c->fd, EV_WRITE, redisLibeventWriteEvent, e); - event_add(e->rev, NULL); - event_add(e->wev, NULL); + e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e); + e->flags = 0; + e->base = base; + e->tv.tv_sec = 0; + e->tv.tv_usec = 0; return REDIS_OK; } #endif @@ -42,15 +42,19 @@ #include "sds.h" #include "sslio.h" -#define _EL_ADD_READ(ctx) do { \ +#define _EL_ADD_READ(ctx) \ + do { \ + refreshTimeout(ctx); \ if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \ - } while(0) + } while (0) #define _EL_DEL_READ(ctx) do { \ if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \ } while(0) -#define _EL_ADD_WRITE(ctx) do { \ +#define _EL_ADD_WRITE(ctx) \ + do { \ + refreshTimeout(ctx); \ if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \ - } while(0) + } while (0) #define _EL_DEL_WRITE(ctx) do { \ if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \ } while(0) @@ -58,6 +62,19 @@ if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \ } while(0); +static void refreshTimeout(redisAsyncContext *ctx) { + if (ctx->c.timeout && ctx->ev.scheduleTimer && + (ctx->c.timeout->tv_sec || ctx->c.timeout->tv_usec)) { + ctx->ev.scheduleTimer(ctx->ev.data, *ctx->c.timeout); + // } else { + // printf("Not scheduling timer.. (tmo=%p)\n", ctx->c.timeout); + // if (ctx->c.timeout){ + // printf("tv_sec: %u. tv_usec: %u\n", ctx->c.timeout->tv_sec, + // ctx->c.timeout->tv_usec); + // } + } +} + /* Forward declaration of function in hiredis.c */ int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); @@ -656,6 +673,30 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { } } +void __redisSetError(redisContext *c, int type, const char *str); + +void redisAsyncHandleTimeout(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisCallback cb; + + if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) { + /* Nothing to do - just an idle timeout */ + return; + } + + if (!c->err) { + __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout"); + } + + if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) { + ac->onConnect(ac, REDIS_ERR); + } + + while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) { + __redisRunCallback(ac, &cb, NULL); + } +} + /* Sets a pointer to the first argument and its length starting at p. Returns * the number of bytes to skip to get to the following argument. */ static const char *nextArgument(const char *start, const char **str, size_t *len) { @@ -795,3 +836,16 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void int status = __redisAsyncCommand(ac,fn,privdata,cmd,len); return status; } + +void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) { + if (!ac->c.timeout) { + ac->c.timeout = calloc(1, sizeof(tv)); + } + + if (tv.tv_sec == ac->c.timeout->tv_sec && + tv.tv_usec == ac->c.timeout->tv_usec) { + return; + } + + *ac->c.timeout = tv; +}
\ No newline at end of file @@ -57,6 +57,7 @@ typedef struct redisCallbackList { /* Connection callback prototypes */ typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status); +typedef void(redisTimerCallback)(void *timer, void *privdata); /* Context for an async connection to Redis */ typedef struct redisAsyncContext { @@ -81,6 +82,7 @@ typedef struct redisAsyncContext { void (*addWrite)(void *privdata); void (*delWrite)(void *privdata); void (*cleanup)(void *privdata); + void (*scheduleTimer)(void *privdata, struct timeval tv); } ev; /* Called when either the connection is terminated due to an error or per @@ -113,12 +115,15 @@ redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, redisAsyncContext *redisAsyncConnectUnix(const char *path); int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); + +void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv); void redisAsyncDisconnect(redisAsyncContext *ac); void redisAsyncFree(redisAsyncContext *ac); /* Handle read/write events */ void redisAsyncHandleRead(redisAsyncContext *ac); void redisAsyncHandleWrite(redisAsyncContext *ac); +void redisAsyncHandleTimeout(redisAsyncContext *ac); /* Command functions for an async context. Write the command to the * output buffer and register the provided callback. */ @@ -45,6 +45,7 @@ #define REDIS_ERR_EOF 3 /* End of file */ #define REDIS_ERR_PROTOCOL 4 /* Protocol error */ #define REDIS_ERR_OOM 5 /* Out of memory */ +#define REDIS_ERR_TIMEOUT 6 /* Timed out */ #define REDIS_ERR_OTHER 2 /* Everything else... */ #define REDIS_REPLY_STRING 1 |