summaryrefslogtreecommitdiff
path: root/adapters
diff options
context:
space:
mode:
Diffstat (limited to 'adapters')
-rw-r--r--adapters/libev.h21
-rw-r--r--adapters/libevent.h2
-rw-r--r--adapters/libuv.h162
-rw-r--r--adapters/poll.h197
4 files changed, 317 insertions, 65 deletions
diff --git a/adapters/libev.h b/adapters/libev.h
index 6191543..c59d3da 100644
--- a/adapters/libev.h
+++ b/adapters/libev.h
@@ -66,8 +66,9 @@ static void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) {
static void redisLibevAddRead(void *privdata) {
redisLibevEvents *e = (redisLibevEvents*)privdata;
+#if EV_MULTIPLICITY
struct ev_loop *loop = e->loop;
- ((void)loop);
+#endif
if (!e->reading) {
e->reading = 1;
ev_io_start(EV_A_ &e->rev);
@@ -76,8 +77,9 @@ static void redisLibevAddRead(void *privdata) {
static void redisLibevDelRead(void *privdata) {
redisLibevEvents *e = (redisLibevEvents*)privdata;
+#if EV_MULTIPLICITY
struct ev_loop *loop = e->loop;
- ((void)loop);
+#endif
if (e->reading) {
e->reading = 0;
ev_io_stop(EV_A_ &e->rev);
@@ -86,8 +88,9 @@ static void redisLibevDelRead(void *privdata) {
static void redisLibevAddWrite(void *privdata) {
redisLibevEvents *e = (redisLibevEvents*)privdata;
+#if EV_MULTIPLICITY
struct ev_loop *loop = e->loop;
- ((void)loop);
+#endif
if (!e->writing) {
e->writing = 1;
ev_io_start(EV_A_ &e->wev);
@@ -96,8 +99,9 @@ static void redisLibevAddWrite(void *privdata) {
static void redisLibevDelWrite(void *privdata) {
redisLibevEvents *e = (redisLibevEvents*)privdata;
+#if EV_MULTIPLICITY
struct ev_loop *loop = e->loop;
- ((void)loop);
+#endif
if (e->writing) {
e->writing = 0;
ev_io_stop(EV_A_ &e->wev);
@@ -106,8 +110,9 @@ static void redisLibevDelWrite(void *privdata) {
static void redisLibevStopTimer(void *privdata) {
redisLibevEvents *e = (redisLibevEvents*)privdata;
+#if EV_MULTIPLICITY
struct ev_loop *loop = e->loop;
- ((void)loop);
+#endif
ev_timer_stop(EV_A_ &e->timer);
}
@@ -120,6 +125,9 @@ static void redisLibevCleanup(void *privdata) {
}
static void redisLibevTimeout(EV_P_ ev_timer *timer, int revents) {
+#if EV_MULTIPLICITY
+ ((void)EV_A);
+#endif
((void)revents);
redisLibevEvents *e = (redisLibevEvents*)timer->data;
redisAsyncHandleTimeout(e->context);
@@ -127,8 +135,9 @@ static void redisLibevTimeout(EV_P_ ev_timer *timer, int revents) {
static void redisLibevSetTimeout(void *privdata, struct timeval tv) {
redisLibevEvents *e = (redisLibevEvents*)privdata;
+#if EV_MULTIPLICITY
struct ev_loop *loop = e->loop;
- ((void)loop);
+#endif
if (!ev_is_active(&e->timer)) {
ev_init(&e->timer, redisLibevTimeout);
diff --git a/adapters/libevent.h b/adapters/libevent.h
index 9150979..73bb8ed 100644
--- a/adapters/libevent.h
+++ b/adapters/libevent.h
@@ -50,7 +50,7 @@ static void redisLibeventDestroy(redisLibeventEvents *e) {
hi_free(e);
}
-static void redisLibeventHandler(int fd, short event, void *arg) {
+static void redisLibeventHandler(evutil_socket_t fd, short event, void *arg) {
((void)fd);
redisLibeventEvents *e = (redisLibeventEvents*)arg;
e->state |= REDIS_LIBEVENT_ENTERED;
diff --git a/adapters/libuv.h b/adapters/libuv.h
index c120b1b..df0a845 100644
--- a/adapters/libuv.h
+++ b/adapters/libuv.h
@@ -7,111 +7,157 @@
#include <string.h>
typedef struct redisLibuvEvents {
- redisAsyncContext* context;
- uv_poll_t handle;
- int events;
+ redisAsyncContext* context;
+ uv_poll_t handle;
+ uv_timer_t timer;
+ int events;
} redisLibuvEvents;
static void redisLibuvPoll(uv_poll_t* handle, int status, int events) {
- redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
- int ev = (status ? p->events : events);
-
- if (p->context != NULL && (ev & UV_READABLE)) {
- redisAsyncHandleRead(p->context);
- }
- if (p->context != NULL && (ev & UV_WRITABLE)) {
- redisAsyncHandleWrite(p->context);
- }
+ redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
+ int ev = (status ? p->events : events);
+
+ if (p->context != NULL && (ev & UV_READABLE)) {
+ redisAsyncHandleRead(p->context);
+ }
+ if (p->context != NULL && (ev & UV_WRITABLE)) {
+ redisAsyncHandleWrite(p->context);
+ }
}
static void redisLibuvAddRead(void *privdata) {
- redisLibuvEvents* p = (redisLibuvEvents*)privdata;
+ redisLibuvEvents* p = (redisLibuvEvents*)privdata;
- p->events |= UV_READABLE;
+ p->events |= UV_READABLE;
- uv_poll_start(&p->handle, p->events, redisLibuvPoll);
+ uv_poll_start(&p->handle, p->events, redisLibuvPoll);
}
static void redisLibuvDelRead(void *privdata) {
- redisLibuvEvents* p = (redisLibuvEvents*)privdata;
+ redisLibuvEvents* p = (redisLibuvEvents*)privdata;
- p->events &= ~UV_READABLE;
+ p->events &= ~UV_READABLE;
- if (p->events) {
- uv_poll_start(&p->handle, p->events, redisLibuvPoll);
- } else {
- uv_poll_stop(&p->handle);
- }
+ if (p->events) {
+ uv_poll_start(&p->handle, p->events, redisLibuvPoll);
+ } else {
+ uv_poll_stop(&p->handle);
+ }
}
static void redisLibuvAddWrite(void *privdata) {
- redisLibuvEvents* p = (redisLibuvEvents*)privdata;
+ redisLibuvEvents* p = (redisLibuvEvents*)privdata;
- p->events |= UV_WRITABLE;
+ p->events |= UV_WRITABLE;
- uv_poll_start(&p->handle, p->events, redisLibuvPoll);
+ uv_poll_start(&p->handle, p->events, redisLibuvPoll);
}
static void redisLibuvDelWrite(void *privdata) {
- redisLibuvEvents* p = (redisLibuvEvents*)privdata;
+ redisLibuvEvents* p = (redisLibuvEvents*)privdata;
- p->events &= ~UV_WRITABLE;
+ p->events &= ~UV_WRITABLE;
- if (p->events) {
- uv_poll_start(&p->handle, p->events, redisLibuvPoll);
- } else {
- uv_poll_stop(&p->handle);
- }
+ if (p->events) {
+ uv_poll_start(&p->handle, p->events, redisLibuvPoll);
+ } else {
+ uv_poll_stop(&p->handle);
+ }
}
+static void on_timer_close(uv_handle_t *handle) {
+ redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
+ p->timer.data = NULL;
+ if (!p->handle.data) {
+ // both timer and handle are closed
+ hi_free(p);
+ }
+ // else, wait for `on_handle_close`
+}
-static void on_close(uv_handle_t* handle) {
- redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
+static void on_handle_close(uv_handle_t *handle) {
+ redisLibuvEvents* p = (redisLibuvEvents*)handle->data;
+ p->handle.data = NULL;
+ if (!p->timer.data) {
+ // timer never started, or timer already destroyed
+ hi_free(p);
+ }
+ // else, wait for `on_timer_close`
+}
- hi_free(p);
+// libuv removed `status` parameter since v0.11.23
+// see: https://github.com/libuv/libuv/blob/v0.11.23/include/uv.h
+#if (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR < 11) || \
+ (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR == 11 && UV_VERSION_PATCH < 23)
+static void redisLibuvTimeout(uv_timer_t *timer, int status) {
+ (void)status; // unused
+#else
+static void redisLibuvTimeout(uv_timer_t *timer) {
+#endif
+ redisLibuvEvents *e = (redisLibuvEvents*)timer->data;
+ redisAsyncHandleTimeout(e->context);
}
+static void redisLibuvSetTimeout(void *privdata, struct timeval tv) {
+ redisLibuvEvents* p = (redisLibuvEvents*)privdata;
+
+ uint64_t millsec = tv.tv_sec * 1000 + tv.tv_usec / 1000.0;
+ if (!p->timer.data) {
+ // timer is uninitialized
+ if (uv_timer_init(p->handle.loop, &p->timer) != 0) {
+ return;
+ }
+ p->timer.data = p;
+ }
+ // updates the timeout if the timer has already started
+ // or start the timer
+ uv_timer_start(&p->timer, redisLibuvTimeout, millsec, 0);
+}
static void redisLibuvCleanup(void *privdata) {
- redisLibuvEvents* p = (redisLibuvEvents*)privdata;
+ redisLibuvEvents* p = (redisLibuvEvents*)privdata;
- p->context = NULL; // indicate that context might no longer exist
- uv_close((uv_handle_t*)&p->handle, on_close);
+ p->context = NULL; // indicate that context might no longer exist
+ if (p->timer.data) {
+ uv_close((uv_handle_t*)&p->timer, on_timer_close);
+ }
+ uv_close((uv_handle_t*)&p->handle, on_handle_close);
}
static int redisLibuvAttach(redisAsyncContext* ac, uv_loop_t* loop) {
- redisContext *c = &(ac->c);
+ redisContext *c = &(ac->c);
- if (ac->ev.data != NULL) {
- return REDIS_ERR;
- }
+ if (ac->ev.data != NULL) {
+ return REDIS_ERR;
+ }
- ac->ev.addRead = redisLibuvAddRead;
- ac->ev.delRead = redisLibuvDelRead;
- ac->ev.addWrite = redisLibuvAddWrite;
- ac->ev.delWrite = redisLibuvDelWrite;
- ac->ev.cleanup = redisLibuvCleanup;
+ ac->ev.addRead = redisLibuvAddRead;
+ ac->ev.delRead = redisLibuvDelRead;
+ ac->ev.addWrite = redisLibuvAddWrite;
+ ac->ev.delWrite = redisLibuvDelWrite;
+ ac->ev.cleanup = redisLibuvCleanup;
+ ac->ev.scheduleTimer = redisLibuvSetTimeout;
- redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p));
- if (p == NULL)
- return REDIS_ERR;
+ redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p));
+ if (p == NULL)
+ return REDIS_ERR;
- memset(p, 0, sizeof(*p));
+ memset(p, 0, sizeof(*p));
- if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) {
- return REDIS_ERR;
- }
+ if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) {
+ return REDIS_ERR;
+ }
- ac->ev.data = p;
- p->handle.data = p;
- p->context = ac;
+ ac->ev.data = p;
+ p->handle.data = p;
+ p->context = ac;
- return REDIS_OK;
+ return REDIS_OK;
}
#endif
diff --git a/adapters/poll.h b/adapters/poll.h
new file mode 100644
index 0000000..f138650
--- /dev/null
+++ b/adapters/poll.h
@@ -0,0 +1,197 @@
+
+#ifndef HIREDIS_POLL_H
+#define HIREDIS_POLL_H
+
+#include "../async.h"
+#include "../sockcompat.h"
+#include <string.h> // for memset
+#include <errno.h>
+
+/* Values to return from redisPollTick */
+#define REDIS_POLL_HANDLED_READ 1
+#define REDIS_POLL_HANDLED_WRITE 2
+#define REDIS_POLL_HANDLED_TIMEOUT 4
+
+/* An adapter to allow manual polling of the async context by checking the state
+ * of the underlying file descriptor. Useful in cases where there is no formal
+ * IO event loop but regular ticking can be used, such as in game engines. */
+
+typedef struct redisPollEvents {
+ redisAsyncContext *context;
+ redisFD fd;
+ char reading, writing;
+ char in_tick;
+ char deleted;
+ double deadline;
+} redisPollEvents;
+
+static double redisPollTimevalToDouble(struct timeval *tv) {
+ if (tv == NULL)
+ return 0.0;
+ return tv->tv_sec + tv->tv_usec / 1000000.00;
+}
+
+static double redisPollGetNow(void) {
+#ifndef _MSC_VER
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ return redisPollTimevalToDouble(&tv);
+#else
+ FILETIME ft;
+ ULARGE_INTEGER li;
+ GetSystemTimeAsFileTime(&ft);
+ li.HighPart = ft.dwHighDateTime;
+ li.LowPart = ft.dwLowDateTime;
+ return (double)li.QuadPart * 1e-7;
+#endif
+}
+
+/* Poll for io, handling any pending callbacks. The timeout argument can be
+ * positive to wait for a maximum given time for IO, zero to poll, or negative
+ * to wait forever */
+static int redisPollTick(redisAsyncContext *ac, double timeout) {
+ int reading, writing;
+ struct pollfd pfd;
+ int handled;
+ int ns;
+ int itimeout;
+
+ redisPollEvents *e = (redisPollEvents*)ac->ev.data;
+ if (!e)
+ return 0;
+
+ /* local flags, won't get changed during callbacks */
+ reading = e->reading;
+ writing = e->writing;
+ if (!reading && !writing)
+ return 0;
+
+ pfd.fd = e->fd;
+ pfd.events = 0;
+ if (reading)
+ pfd.events = POLLIN;
+ if (writing)
+ pfd.events |= POLLOUT;
+
+ if (timeout >= 0.0) {
+ itimeout = (int)(timeout * 1000.0);
+ } else {
+ itimeout = -1;
+ }
+
+ ns = poll(&pfd, 1, itimeout);
+ if (ns < 0) {
+ /* ignore the EINTR error */
+ if (errno != EINTR)
+ return ns;
+ ns = 0;
+ }
+
+ handled = 0;
+ e->in_tick = 1;
+ if (ns) {
+ if (reading && (pfd.revents & POLLIN)) {
+ redisAsyncHandleRead(ac);
+ handled |= REDIS_POLL_HANDLED_READ;
+ }
+ /* on Windows, connection failure is indicated with the Exception fdset.
+ * handle it the same as writable. */
+ if (writing && (pfd.revents & (POLLOUT | POLLERR))) {
+ /* context Read callback may have caused context to be deleted, e.g.
+ by doing an redisAsyncDisconnect() */
+ if (!e->deleted) {
+ redisAsyncHandleWrite(ac);
+ handled |= REDIS_POLL_HANDLED_WRITE;
+ }
+ }
+ }
+
+ /* perform timeouts */
+ if (!e->deleted && e->deadline != 0.0) {
+ double now = redisPollGetNow();
+ if (now >= e->deadline) {
+ /* deadline has passed. disable timeout and perform callback */
+ e->deadline = 0.0;
+ redisAsyncHandleTimeout(ac);
+ handled |= REDIS_POLL_HANDLED_TIMEOUT;
+ }
+ }
+
+ /* do a delayed cleanup if required */
+ if (e->deleted)
+ hi_free(e);
+ else
+ e->in_tick = 0;
+
+ return handled;
+}
+
+static void redisPollAddRead(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->reading = 1;
+}
+
+static void redisPollDelRead(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->reading = 0;
+}
+
+static void redisPollAddWrite(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->writing = 1;
+}
+
+static void redisPollDelWrite(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->writing = 0;
+}
+
+static void redisPollCleanup(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+
+ /* if we are currently processing a tick, postpone deletion */
+ if (e->in_tick)
+ e->deleted = 1;
+ else
+ hi_free(e);
+}
+
+static void redisPollScheduleTimer(void *data, struct timeval tv)
+{
+ redisPollEvents *e = (redisPollEvents*)data;
+ double now = redisPollGetNow();
+ e->deadline = now + redisPollTimevalToDouble(&tv);
+}
+
+static int redisPollAttach(redisAsyncContext *ac) {
+ redisContext *c = &(ac->c);
+ redisPollEvents *e;
+
+ /* Nothing should be attached when something is already attached */
+ if (ac->ev.data != NULL)
+ return REDIS_ERR;
+
+ /* Create container for context and r/w events */
+ e = (redisPollEvents*)hi_malloc(sizeof(*e));
+ if (e == NULL)
+ return REDIS_ERR;
+ memset(e, 0, sizeof(*e));
+
+ e->context = ac;
+ e->fd = c->fd;
+ e->reading = e->writing = 0;
+ e->in_tick = e->deleted = 0;
+ e->deadline = 0.0;
+
+ /* Register functions to start/stop listening for events */
+ ac->ev.addRead = redisPollAddRead;
+ ac->ev.delRead = redisPollDelRead;
+ ac->ev.addWrite = redisPollAddWrite;
+ ac->ev.delWrite = redisPollDelWrite;
+ ac->ev.scheduleTimer = redisPollScheduleTimer;
+ ac->ev.cleanup = redisPollCleanup;
+ ac->ev.data = e;
+
+ return REDIS_OK;
+}
+#endif /* HIREDIS_POLL_H */