summaryrefslogtreecommitdiff
path: root/adapters
diff options
context:
space:
mode:
Diffstat (limited to 'adapters')
-rw-r--r--adapters/libevent.h79
1 files changed, 57 insertions, 22 deletions
diff --git a/adapters/libevent.h b/adapters/libevent.h
index 7d2bef1..9843696 100644
--- a/adapters/libevent.h
+++ b/adapters/libevent.h
@@ -36,48 +36,81 @@
typedef struct redisLibeventEvents {
redisAsyncContext *context;
- struct event *rev, *wev;
+ struct event *ev, *tmr;
+ struct event_base *base;
+ struct timeval tv;
+ short flags;
} redisLibeventEvents;
-static void redisLibeventReadEvent(int fd, short event, void *arg) {
- ((void)fd); ((void)event);
+static void redisLibeventHandler(int fd, short event, void *arg) {
+ ((void)fd);
redisLibeventEvents *e = (redisLibeventEvents*)arg;
- redisAsyncHandleRead(e->context);
+ if (event & EV_TIMEOUT) {
+ redisAsyncHandleTimeout(e->context);
+ }
+ if (e->context && (event & EV_READ)) {
+ redisAsyncHandleRead(e->context);
+ }
+ if (e->context && (event & EV_WRITE)) {
+ redisAsyncHandleWrite(e->context);
+ }
}
-static void redisLibeventWriteEvent(int fd, short event, void *arg) {
- ((void)fd); ((void)event);
- redisLibeventEvents *e = (redisLibeventEvents*)arg;
- redisAsyncHandleWrite(e->context);
+static void redisLibeventUpdate(void *privdata, short flag, int isRemove) {
+ redisLibeventEvents *e = (redisLibeventEvents *)privdata;
+ const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL;
+
+ if (isRemove) {
+ if ((e->flags & flag) == 0) {
+ return;
+ } else {
+ e->flags &= ~flag;
+ }
+ } else {
+ if (e->flags & flag) {
+ return;
+ } else {
+ e->flags |= flag;
+ }
+ }
+
+ event_del(e->ev);
+ event_assign(e->ev, e->base, e->context->c.fd, e->flags,
+ redisLibeventHandler, privdata);
+ event_add(e->ev, tv);
}
static void redisLibeventAddRead(void *privdata) {
- redisLibeventEvents *e = (redisLibeventEvents*)privdata;
- event_add(e->rev,NULL);
+ redisLibeventUpdate(privdata, EV_READ, 0);
}
static void redisLibeventDelRead(void *privdata) {
- redisLibeventEvents *e = (redisLibeventEvents*)privdata;
- event_del(e->rev);
+ redisLibeventUpdate(privdata, EV_READ, 1);
}
static void redisLibeventAddWrite(void *privdata) {
- redisLibeventEvents *e = (redisLibeventEvents*)privdata;
- event_add(e->wev,NULL);
+ redisLibeventUpdate(privdata, EV_WRITE, 0);
}
static void redisLibeventDelWrite(void *privdata) {
- redisLibeventEvents *e = (redisLibeventEvents*)privdata;
- event_del(e->wev);
+ redisLibeventUpdate(privdata, EV_WRITE, 1);
}
static void redisLibeventCleanup(void *privdata) {
redisLibeventEvents *e = (redisLibeventEvents*)privdata;
- event_free(e->rev);
- event_free(e->wev);
+ event_free(e->ev);
free(e);
}
+static void redisLibeventSetTimeout(void *privdata, struct timeval tv) {
+ redisLibeventEvents *e = (redisLibeventEvents *)privdata;
+ short flags = e->flags;
+ e->flags = 0;
+ e->tv = tv;
+ event_del(e->ev);
+ redisLibeventUpdate(e, flags, 0);
+}
+
static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
redisContext *c = &(ac->c);
redisLibeventEvents *e;
@@ -96,13 +129,15 @@ static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
ac->ev.addWrite = redisLibeventAddWrite;
ac->ev.delWrite = redisLibeventDelWrite;
ac->ev.cleanup = redisLibeventCleanup;
+ ac->ev.scheduleTimer = redisLibeventSetTimeout;
ac->ev.data = e;
/* Initialize and install read/write events */
- e->rev = event_new(base, c->fd, EV_READ, redisLibeventReadEvent, e);
- e->wev = event_new(base, c->fd, EV_WRITE, redisLibeventWriteEvent, e);
- event_add(e->rev, NULL);
- event_add(e->wev, NULL);
+ e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e);
+ e->flags = 0;
+ e->base = base;
+ e->tv.tv_sec = 0;
+ e->tv.tv_usec = 0;
return REDIS_OK;
}
#endif