diff options
Diffstat (limited to 'adapters')
-rw-r--r-- | adapters/libev.h | 21 | ||||
-rw-r--r-- | adapters/libevent.h | 2 | ||||
-rw-r--r-- | adapters/libuv.h | 162 | ||||
-rw-r--r-- | adapters/poll.h | 197 |
4 files changed, 317 insertions, 65 deletions
diff --git a/adapters/libev.h b/adapters/libev.h index 6191543..c59d3da 100644 --- a/adapters/libev.h +++ b/adapters/libev.h @@ -66,8 +66,9 @@ static void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) { static void redisLibevAddRead(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif if (!e->reading) { e->reading = 1; ev_io_start(EV_A_ &e->rev); @@ -76,8 +77,9 @@ static void redisLibevAddRead(void *privdata) { static void redisLibevDelRead(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif if (e->reading) { e->reading = 0; ev_io_stop(EV_A_ &e->rev); @@ -86,8 +88,9 @@ static void redisLibevDelRead(void *privdata) { static void redisLibevAddWrite(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif if (!e->writing) { e->writing = 1; ev_io_start(EV_A_ &e->wev); @@ -96,8 +99,9 @@ static void redisLibevAddWrite(void *privdata) { static void redisLibevDelWrite(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif if (e->writing) { e->writing = 0; ev_io_stop(EV_A_ &e->wev); @@ -106,8 +110,9 @@ static void redisLibevDelWrite(void *privdata) { static void redisLibevStopTimer(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif ev_timer_stop(EV_A_ &e->timer); } @@ -120,6 +125,9 @@ static void redisLibevCleanup(void *privdata) { } static void redisLibevTimeout(EV_P_ ev_timer *timer, int revents) { +#if EV_MULTIPLICITY + ((void)EV_A); +#endif ((void)revents); redisLibevEvents *e = (redisLibevEvents*)timer->data; redisAsyncHandleTimeout(e->context); @@ -127,8 +135,9 @@ static void redisLibevTimeout(EV_P_ ev_timer *timer, int revents) { static void redisLibevSetTimeout(void *privdata, struct timeval tv) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif if (!ev_is_active(&e->timer)) { ev_init(&e->timer, redisLibevTimeout); diff --git a/adapters/libevent.h b/adapters/libevent.h index 9150979..73bb8ed 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -50,7 +50,7 @@ static void redisLibeventDestroy(redisLibeventEvents *e) { hi_free(e); } -static void redisLibeventHandler(int fd, short event, void *arg) { +static void redisLibeventHandler(evutil_socket_t fd, short event, void *arg) { ((void)fd); redisLibeventEvents *e = (redisLibeventEvents*)arg; e->state |= REDIS_LIBEVENT_ENTERED; diff --git a/adapters/libuv.h b/adapters/libuv.h index c120b1b..df0a845 100644 --- a/adapters/libuv.h +++ b/adapters/libuv.h @@ -7,111 +7,157 @@ #include <string.h> typedef struct redisLibuvEvents { - redisAsyncContext* context; - uv_poll_t handle; - int events; + redisAsyncContext* context; + uv_poll_t handle; + uv_timer_t timer; + int events; } redisLibuvEvents; static void redisLibuvPoll(uv_poll_t* handle, int status, int events) { - redisLibuvEvents* p = (redisLibuvEvents*)handle->data; - int ev = (status ? p->events : events); - - if (p->context != NULL && (ev & UV_READABLE)) { - redisAsyncHandleRead(p->context); - } - if (p->context != NULL && (ev & UV_WRITABLE)) { - redisAsyncHandleWrite(p->context); - } + redisLibuvEvents* p = (redisLibuvEvents*)handle->data; + int ev = (status ? p->events : events); + + if (p->context != NULL && (ev & UV_READABLE)) { + redisAsyncHandleRead(p->context); + } + if (p->context != NULL && (ev & UV_WRITABLE)) { + redisAsyncHandleWrite(p->context); + } } static void redisLibuvAddRead(void *privdata) { - redisLibuvEvents* p = (redisLibuvEvents*)privdata; + redisLibuvEvents* p = (redisLibuvEvents*)privdata; - p->events |= UV_READABLE; + p->events |= UV_READABLE; - uv_poll_start(&p->handle, p->events, redisLibuvPoll); + uv_poll_start(&p->handle, p->events, redisLibuvPoll); } static void redisLibuvDelRead(void *privdata) { - redisLibuvEvents* p = (redisLibuvEvents*)privdata; + redisLibuvEvents* p = (redisLibuvEvents*)privdata; - p->events &= ~UV_READABLE; + p->events &= ~UV_READABLE; - if (p->events) { - uv_poll_start(&p->handle, p->events, redisLibuvPoll); - } else { - uv_poll_stop(&p->handle); - } + if (p->events) { + uv_poll_start(&p->handle, p->events, redisLibuvPoll); + } else { + uv_poll_stop(&p->handle); + } } static void redisLibuvAddWrite(void *privdata) { - redisLibuvEvents* p = (redisLibuvEvents*)privdata; + redisLibuvEvents* p = (redisLibuvEvents*)privdata; - p->events |= UV_WRITABLE; + p->events |= UV_WRITABLE; - uv_poll_start(&p->handle, p->events, redisLibuvPoll); + uv_poll_start(&p->handle, p->events, redisLibuvPoll); } static void redisLibuvDelWrite(void *privdata) { - redisLibuvEvents* p = (redisLibuvEvents*)privdata; + redisLibuvEvents* p = (redisLibuvEvents*)privdata; - p->events &= ~UV_WRITABLE; + p->events &= ~UV_WRITABLE; - if (p->events) { - uv_poll_start(&p->handle, p->events, redisLibuvPoll); - } else { - uv_poll_stop(&p->handle); - } + if (p->events) { + uv_poll_start(&p->handle, p->events, redisLibuvPoll); + } else { + uv_poll_stop(&p->handle); + } } +static void on_timer_close(uv_handle_t *handle) { + redisLibuvEvents* p = (redisLibuvEvents*)handle->data; + p->timer.data = NULL; + if (!p->handle.data) { + // both timer and handle are closed + hi_free(p); + } + // else, wait for `on_handle_close` +} -static void on_close(uv_handle_t* handle) { - redisLibuvEvents* p = (redisLibuvEvents*)handle->data; +static void on_handle_close(uv_handle_t *handle) { + redisLibuvEvents* p = (redisLibuvEvents*)handle->data; + p->handle.data = NULL; + if (!p->timer.data) { + // timer never started, or timer already destroyed + hi_free(p); + } + // else, wait for `on_timer_close` +} - hi_free(p); +// libuv removed `status` parameter since v0.11.23 +// see: https://github.com/libuv/libuv/blob/v0.11.23/include/uv.h +#if (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR < 11) || \ + (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR == 11 && UV_VERSION_PATCH < 23) +static void redisLibuvTimeout(uv_timer_t *timer, int status) { + (void)status; // unused +#else +static void redisLibuvTimeout(uv_timer_t *timer) { +#endif + redisLibuvEvents *e = (redisLibuvEvents*)timer->data; + redisAsyncHandleTimeout(e->context); } +static void redisLibuvSetTimeout(void *privdata, struct timeval tv) { + redisLibuvEvents* p = (redisLibuvEvents*)privdata; + + uint64_t millsec = tv.tv_sec * 1000 + tv.tv_usec / 1000.0; + if (!p->timer.data) { + // timer is uninitialized + if (uv_timer_init(p->handle.loop, &p->timer) != 0) { + return; + } + p->timer.data = p; + } + // updates the timeout if the timer has already started + // or start the timer + uv_timer_start(&p->timer, redisLibuvTimeout, millsec, 0); +} static void redisLibuvCleanup(void *privdata) { - redisLibuvEvents* p = (redisLibuvEvents*)privdata; + redisLibuvEvents* p = (redisLibuvEvents*)privdata; - p->context = NULL; // indicate that context might no longer exist - uv_close((uv_handle_t*)&p->handle, on_close); + p->context = NULL; // indicate that context might no longer exist + if (p->timer.data) { + uv_close((uv_handle_t*)&p->timer, on_timer_close); + } + uv_close((uv_handle_t*)&p->handle, on_handle_close); } static int redisLibuvAttach(redisAsyncContext* ac, uv_loop_t* loop) { - redisContext *c = &(ac->c); + redisContext *c = &(ac->c); - if (ac->ev.data != NULL) { - return REDIS_ERR; - } + if (ac->ev.data != NULL) { + return REDIS_ERR; + } - ac->ev.addRead = redisLibuvAddRead; - ac->ev.delRead = redisLibuvDelRead; - ac->ev.addWrite = redisLibuvAddWrite; - ac->ev.delWrite = redisLibuvDelWrite; - ac->ev.cleanup = redisLibuvCleanup; + ac->ev.addRead = redisLibuvAddRead; + ac->ev.delRead = redisLibuvDelRead; + ac->ev.addWrite = redisLibuvAddWrite; + ac->ev.delWrite = redisLibuvDelWrite; + ac->ev.cleanup = redisLibuvCleanup; + ac->ev.scheduleTimer = redisLibuvSetTimeout; - redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p)); - if (p == NULL) - return REDIS_ERR; + redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p)); + if (p == NULL) + return REDIS_ERR; - memset(p, 0, sizeof(*p)); + memset(p, 0, sizeof(*p)); - if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) { - return REDIS_ERR; - } + if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) { + return REDIS_ERR; + } - ac->ev.data = p; - p->handle.data = p; - p->context = ac; + ac->ev.data = p; + p->handle.data = p; + p->context = ac; - return REDIS_OK; + return REDIS_OK; } #endif diff --git a/adapters/poll.h b/adapters/poll.h new file mode 100644 index 0000000..f138650 --- /dev/null +++ b/adapters/poll.h @@ -0,0 +1,197 @@ + +#ifndef HIREDIS_POLL_H +#define HIREDIS_POLL_H + +#include "../async.h" +#include "../sockcompat.h" +#include <string.h> // for memset +#include <errno.h> + +/* Values to return from redisPollTick */ +#define REDIS_POLL_HANDLED_READ 1 +#define REDIS_POLL_HANDLED_WRITE 2 +#define REDIS_POLL_HANDLED_TIMEOUT 4 + +/* An adapter to allow manual polling of the async context by checking the state + * of the underlying file descriptor. Useful in cases where there is no formal + * IO event loop but regular ticking can be used, such as in game engines. */ + +typedef struct redisPollEvents { + redisAsyncContext *context; + redisFD fd; + char reading, writing; + char in_tick; + char deleted; + double deadline; +} redisPollEvents; + +static double redisPollTimevalToDouble(struct timeval *tv) { + if (tv == NULL) + return 0.0; + return tv->tv_sec + tv->tv_usec / 1000000.00; +} + +static double redisPollGetNow(void) { +#ifndef _MSC_VER + struct timeval tv; + gettimeofday(&tv,NULL); + return redisPollTimevalToDouble(&tv); +#else + FILETIME ft; + ULARGE_INTEGER li; + GetSystemTimeAsFileTime(&ft); + li.HighPart = ft.dwHighDateTime; + li.LowPart = ft.dwLowDateTime; + return (double)li.QuadPart * 1e-7; +#endif +} + +/* Poll for io, handling any pending callbacks. The timeout argument can be + * positive to wait for a maximum given time for IO, zero to poll, or negative + * to wait forever */ +static int redisPollTick(redisAsyncContext *ac, double timeout) { + int reading, writing; + struct pollfd pfd; + int handled; + int ns; + int itimeout; + + redisPollEvents *e = (redisPollEvents*)ac->ev.data; + if (!e) + return 0; + + /* local flags, won't get changed during callbacks */ + reading = e->reading; + writing = e->writing; + if (!reading && !writing) + return 0; + + pfd.fd = e->fd; + pfd.events = 0; + if (reading) + pfd.events = POLLIN; + if (writing) + pfd.events |= POLLOUT; + + if (timeout >= 0.0) { + itimeout = (int)(timeout * 1000.0); + } else { + itimeout = -1; + } + + ns = poll(&pfd, 1, itimeout); + if (ns < 0) { + /* ignore the EINTR error */ + if (errno != EINTR) + return ns; + ns = 0; + } + + handled = 0; + e->in_tick = 1; + if (ns) { + if (reading && (pfd.revents & POLLIN)) { + redisAsyncHandleRead(ac); + handled |= REDIS_POLL_HANDLED_READ; + } + /* on Windows, connection failure is indicated with the Exception fdset. + * handle it the same as writable. */ + if (writing && (pfd.revents & (POLLOUT | POLLERR))) { + /* context Read callback may have caused context to be deleted, e.g. + by doing an redisAsyncDisconnect() */ + if (!e->deleted) { + redisAsyncHandleWrite(ac); + handled |= REDIS_POLL_HANDLED_WRITE; + } + } + } + + /* perform timeouts */ + if (!e->deleted && e->deadline != 0.0) { + double now = redisPollGetNow(); + if (now >= e->deadline) { + /* deadline has passed. disable timeout and perform callback */ + e->deadline = 0.0; + redisAsyncHandleTimeout(ac); + handled |= REDIS_POLL_HANDLED_TIMEOUT; + } + } + + /* do a delayed cleanup if required */ + if (e->deleted) + hi_free(e); + else + e->in_tick = 0; + + return handled; +} + +static void redisPollAddRead(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->reading = 1; +} + +static void redisPollDelRead(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->reading = 0; +} + +static void redisPollAddWrite(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->writing = 1; +} + +static void redisPollDelWrite(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->writing = 0; +} + +static void redisPollCleanup(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + + /* if we are currently processing a tick, postpone deletion */ + if (e->in_tick) + e->deleted = 1; + else + hi_free(e); +} + +static void redisPollScheduleTimer(void *data, struct timeval tv) +{ + redisPollEvents *e = (redisPollEvents*)data; + double now = redisPollGetNow(); + e->deadline = now + redisPollTimevalToDouble(&tv); +} + +static int redisPollAttach(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisPollEvents *e; + + /* Nothing should be attached when something is already attached */ + if (ac->ev.data != NULL) + return REDIS_ERR; + + /* Create container for context and r/w events */ + e = (redisPollEvents*)hi_malloc(sizeof(*e)); + if (e == NULL) + return REDIS_ERR; + memset(e, 0, sizeof(*e)); + + e->context = ac; + e->fd = c->fd; + e->reading = e->writing = 0; + e->in_tick = e->deleted = 0; + e->deadline = 0.0; + + /* Register functions to start/stop listening for events */ + ac->ev.addRead = redisPollAddRead; + ac->ev.delRead = redisPollDelRead; + ac->ev.addWrite = redisPollAddWrite; + ac->ev.delWrite = redisPollDelWrite; + ac->ev.scheduleTimer = redisPollScheduleTimer; + ac->ev.cleanup = redisPollCleanup; + ac->ev.data = e; + + return REDIS_OK; +} +#endif /* HIREDIS_POLL_H */ |