From 35a0a1f369cdeb8d7d93fda8278099d7aa60aee6 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Thu, 8 Mar 2018 11:21:07 +0200 Subject: read/write timeouts --- adapters/libevent.h | 79 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 57 insertions(+), 22 deletions(-) (limited to 'adapters/libevent.h') diff --git a/adapters/libevent.h b/adapters/libevent.h index 7d2bef1..9843696 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -36,48 +36,81 @@ typedef struct redisLibeventEvents { redisAsyncContext *context; - struct event *rev, *wev; + struct event *ev, *tmr; + struct event_base *base; + struct timeval tv; + short flags; } redisLibeventEvents; -static void redisLibeventReadEvent(int fd, short event, void *arg) { - ((void)fd); ((void)event); +static void redisLibeventHandler(int fd, short event, void *arg) { + ((void)fd); redisLibeventEvents *e = (redisLibeventEvents*)arg; - redisAsyncHandleRead(e->context); + if (event & EV_TIMEOUT) { + redisAsyncHandleTimeout(e->context); + } + if (e->context && (event & EV_READ)) { + redisAsyncHandleRead(e->context); + } + if (e->context && (event & EV_WRITE)) { + redisAsyncHandleWrite(e->context); + } } -static void redisLibeventWriteEvent(int fd, short event, void *arg) { - ((void)fd); ((void)event); - redisLibeventEvents *e = (redisLibeventEvents*)arg; - redisAsyncHandleWrite(e->context); +static void redisLibeventUpdate(void *privdata, short flag, int isRemove) { + redisLibeventEvents *e = (redisLibeventEvents *)privdata; + const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL; + + if (isRemove) { + if ((e->flags & flag) == 0) { + return; + } else { + e->flags &= ~flag; + } + } else { + if (e->flags & flag) { + return; + } else { + e->flags |= flag; + } + } + + event_del(e->ev); + event_assign(e->ev, e->base, e->context->c.fd, e->flags, + redisLibeventHandler, privdata); + event_add(e->ev, tv); } static void redisLibeventAddRead(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_add(e->rev,NULL); + redisLibeventUpdate(privdata, EV_READ, 0); } static void redisLibeventDelRead(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_del(e->rev); + redisLibeventUpdate(privdata, EV_READ, 1); } static void redisLibeventAddWrite(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_add(e->wev,NULL); + redisLibeventUpdate(privdata, EV_WRITE, 0); } static void redisLibeventDelWrite(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_del(e->wev); + redisLibeventUpdate(privdata, EV_WRITE, 1); } static void redisLibeventCleanup(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_free(e->rev); - event_free(e->wev); + event_free(e->ev); free(e); } +static void redisLibeventSetTimeout(void *privdata, struct timeval tv) { + redisLibeventEvents *e = (redisLibeventEvents *)privdata; + short flags = e->flags; + e->flags = 0; + e->tv = tv; + event_del(e->ev); + redisLibeventUpdate(e, flags, 0); +} + static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { redisContext *c = &(ac->c); redisLibeventEvents *e; @@ -96,13 +129,15 @@ static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { ac->ev.addWrite = redisLibeventAddWrite; ac->ev.delWrite = redisLibeventDelWrite; ac->ev.cleanup = redisLibeventCleanup; + ac->ev.scheduleTimer = redisLibeventSetTimeout; ac->ev.data = e; /* Initialize and install read/write events */ - e->rev = event_new(base, c->fd, EV_READ, redisLibeventReadEvent, e); - e->wev = event_new(base, c->fd, EV_WRITE, redisLibeventWriteEvent, e); - event_add(e->rev, NULL); - event_add(e->wev, NULL); + e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e); + e->flags = 0; + e->base = base; + e->tv.tv_sec = 0; + e->tv.tv_usec = 0; return REDIS_OK; } #endif -- cgit v1.2.3