diff options
Diffstat (limited to 'adapters')
| -rw-r--r-- | adapters/libev.h | 21 | ||||
| -rw-r--r-- | adapters/libevent.h | 2 | ||||
| -rw-r--r-- | adapters/libuv.h | 162 | ||||
| -rw-r--r-- | adapters/poll.h | 197 | 
4 files changed, 317 insertions, 65 deletions
| diff --git a/adapters/libev.h b/adapters/libev.h index 6191543..c59d3da 100644 --- a/adapters/libev.h +++ b/adapters/libev.h @@ -66,8 +66,9 @@ static void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) {  static void redisLibevAddRead(void *privdata) {      redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY      struct ev_loop *loop = e->loop; -    ((void)loop); +#endif      if (!e->reading) {          e->reading = 1;          ev_io_start(EV_A_ &e->rev); @@ -76,8 +77,9 @@ static void redisLibevAddRead(void *privdata) {  static void redisLibevDelRead(void *privdata) {      redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY      struct ev_loop *loop = e->loop; -    ((void)loop); +#endif      if (e->reading) {          e->reading = 0;          ev_io_stop(EV_A_ &e->rev); @@ -86,8 +88,9 @@ static void redisLibevDelRead(void *privdata) {  static void redisLibevAddWrite(void *privdata) {      redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY      struct ev_loop *loop = e->loop; -    ((void)loop); +#endif      if (!e->writing) {          e->writing = 1;          ev_io_start(EV_A_ &e->wev); @@ -96,8 +99,9 @@ static void redisLibevAddWrite(void *privdata) {  static void redisLibevDelWrite(void *privdata) {      redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY      struct ev_loop *loop = e->loop; -    ((void)loop); +#endif      if (e->writing) {          e->writing = 0;          ev_io_stop(EV_A_ &e->wev); @@ -106,8 +110,9 @@ static void redisLibevDelWrite(void *privdata) {  static void redisLibevStopTimer(void *privdata) {      redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY      struct ev_loop *loop = e->loop; -    ((void)loop); +#endif      ev_timer_stop(EV_A_ &e->timer);  } @@ -120,6 +125,9 @@ static void redisLibevCleanup(void *privdata) {  }  static void redisLibevTimeout(EV_P_ ev_timer *timer, int revents) { +#if EV_MULTIPLICITY +    ((void)EV_A); +#endif      ((void)revents);      redisLibevEvents *e = (redisLibevEvents*)timer->data;      redisAsyncHandleTimeout(e->context); @@ -127,8 +135,9 @@ static void redisLibevTimeout(EV_P_ ev_timer *timer, int revents) {  static void redisLibevSetTimeout(void *privdata, struct timeval tv) {      redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY      struct ev_loop *loop = e->loop; -    ((void)loop); +#endif      if (!ev_is_active(&e->timer)) {          ev_init(&e->timer, redisLibevTimeout); diff --git a/adapters/libevent.h b/adapters/libevent.h index 9150979..73bb8ed 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -50,7 +50,7 @@ static void redisLibeventDestroy(redisLibeventEvents *e) {      hi_free(e);  } -static void redisLibeventHandler(int fd, short event, void *arg) { +static void redisLibeventHandler(evutil_socket_t fd, short event, void *arg) {      ((void)fd);      redisLibeventEvents *e = (redisLibeventEvents*)arg;      e->state |= REDIS_LIBEVENT_ENTERED; diff --git a/adapters/libuv.h b/adapters/libuv.h index c120b1b..df0a845 100644 --- a/adapters/libuv.h +++ b/adapters/libuv.h @@ -7,111 +7,157 @@  #include <string.h>  typedef struct redisLibuvEvents { -  redisAsyncContext* context; -  uv_poll_t          handle; -  int                events; +    redisAsyncContext* context; +    uv_poll_t          handle; +    uv_timer_t         timer; +    int                events;  } redisLibuvEvents;  static void redisLibuvPoll(uv_poll_t* handle, int status, int events) { -  redisLibuvEvents* p = (redisLibuvEvents*)handle->data; -  int ev = (status ? p->events : events); - -  if (p->context != NULL && (ev & UV_READABLE)) { -    redisAsyncHandleRead(p->context); -  } -  if (p->context != NULL && (ev & UV_WRITABLE)) { -    redisAsyncHandleWrite(p->context); -  } +    redisLibuvEvents* p = (redisLibuvEvents*)handle->data; +    int ev = (status ? p->events : events); + +    if (p->context != NULL && (ev & UV_READABLE)) { +        redisAsyncHandleRead(p->context); +    } +    if (p->context != NULL && (ev & UV_WRITABLE)) { +        redisAsyncHandleWrite(p->context); +    }  }  static void redisLibuvAddRead(void *privdata) { -  redisLibuvEvents* p = (redisLibuvEvents*)privdata; +    redisLibuvEvents* p = (redisLibuvEvents*)privdata; -  p->events |= UV_READABLE; +    p->events |= UV_READABLE; -  uv_poll_start(&p->handle, p->events, redisLibuvPoll); +    uv_poll_start(&p->handle, p->events, redisLibuvPoll);  }  static void redisLibuvDelRead(void *privdata) { -  redisLibuvEvents* p = (redisLibuvEvents*)privdata; +    redisLibuvEvents* p = (redisLibuvEvents*)privdata; -  p->events &= ~UV_READABLE; +    p->events &= ~UV_READABLE; -  if (p->events) { -    uv_poll_start(&p->handle, p->events, redisLibuvPoll); -  } else { -    uv_poll_stop(&p->handle); -  } +    if (p->events) { +        uv_poll_start(&p->handle, p->events, redisLibuvPoll); +    } else { +        uv_poll_stop(&p->handle); +    }  }  static void redisLibuvAddWrite(void *privdata) { -  redisLibuvEvents* p = (redisLibuvEvents*)privdata; +    redisLibuvEvents* p = (redisLibuvEvents*)privdata; -  p->events |= UV_WRITABLE; +    p->events |= UV_WRITABLE; -  uv_poll_start(&p->handle, p->events, redisLibuvPoll); +    uv_poll_start(&p->handle, p->events, redisLibuvPoll);  }  static void redisLibuvDelWrite(void *privdata) { -  redisLibuvEvents* p = (redisLibuvEvents*)privdata; +    redisLibuvEvents* p = (redisLibuvEvents*)privdata; -  p->events &= ~UV_WRITABLE; +    p->events &= ~UV_WRITABLE; -  if (p->events) { -    uv_poll_start(&p->handle, p->events, redisLibuvPoll); -  } else { -    uv_poll_stop(&p->handle); -  } +    if (p->events) { +        uv_poll_start(&p->handle, p->events, redisLibuvPoll); +    } else { +        uv_poll_stop(&p->handle); +    }  } +static void on_timer_close(uv_handle_t *handle) { +    redisLibuvEvents* p = (redisLibuvEvents*)handle->data; +    p->timer.data = NULL; +    if (!p->handle.data) { +        // both timer and handle are closed +        hi_free(p); +    } +    // else, wait for `on_handle_close` +} -static void on_close(uv_handle_t* handle) { -  redisLibuvEvents* p = (redisLibuvEvents*)handle->data; +static void on_handle_close(uv_handle_t *handle) { +    redisLibuvEvents* p = (redisLibuvEvents*)handle->data; +    p->handle.data = NULL; +    if (!p->timer.data) { +        // timer never started, or timer already destroyed +        hi_free(p); +    } +    // else, wait for `on_timer_close` +} -  hi_free(p); +// libuv removed `status` parameter since v0.11.23 +// see: https://github.com/libuv/libuv/blob/v0.11.23/include/uv.h +#if (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR < 11) || \ +    (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR == 11 && UV_VERSION_PATCH < 23) +static void redisLibuvTimeout(uv_timer_t *timer, int status) { +    (void)status; // unused +#else +static void redisLibuvTimeout(uv_timer_t *timer) { +#endif +    redisLibuvEvents *e = (redisLibuvEvents*)timer->data; +    redisAsyncHandleTimeout(e->context);  } +static void redisLibuvSetTimeout(void *privdata, struct timeval tv) { +    redisLibuvEvents* p = (redisLibuvEvents*)privdata; + +    uint64_t millsec = tv.tv_sec * 1000 + tv.tv_usec / 1000.0; +    if (!p->timer.data) { +        // timer is uninitialized +        if (uv_timer_init(p->handle.loop, &p->timer) != 0) { +            return; +        } +        p->timer.data = p; +    } +    // updates the timeout if the timer has already started +    // or start the timer +    uv_timer_start(&p->timer, redisLibuvTimeout, millsec, 0); +}  static void redisLibuvCleanup(void *privdata) { -  redisLibuvEvents* p = (redisLibuvEvents*)privdata; +    redisLibuvEvents* p = (redisLibuvEvents*)privdata; -  p->context = NULL; // indicate that context might no longer exist -  uv_close((uv_handle_t*)&p->handle, on_close); +    p->context = NULL; // indicate that context might no longer exist +    if (p->timer.data) { +        uv_close((uv_handle_t*)&p->timer, on_timer_close); +    } +    uv_close((uv_handle_t*)&p->handle, on_handle_close);  }  static int redisLibuvAttach(redisAsyncContext* ac, uv_loop_t* loop) { -  redisContext *c = &(ac->c); +    redisContext *c = &(ac->c); -  if (ac->ev.data != NULL) { -    return REDIS_ERR; -  } +    if (ac->ev.data != NULL) { +        return REDIS_ERR; +    } -  ac->ev.addRead  = redisLibuvAddRead; -  ac->ev.delRead  = redisLibuvDelRead; -  ac->ev.addWrite = redisLibuvAddWrite; -  ac->ev.delWrite = redisLibuvDelWrite; -  ac->ev.cleanup  = redisLibuvCleanup; +    ac->ev.addRead        = redisLibuvAddRead; +    ac->ev.delRead        = redisLibuvDelRead; +    ac->ev.addWrite       = redisLibuvAddWrite; +    ac->ev.delWrite       = redisLibuvDelWrite; +    ac->ev.cleanup        = redisLibuvCleanup; +    ac->ev.scheduleTimer  = redisLibuvSetTimeout; -  redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p)); -  if (p == NULL) -      return REDIS_ERR; +    redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p)); +    if (p == NULL) +        return REDIS_ERR; -  memset(p, 0, sizeof(*p)); +    memset(p, 0, sizeof(*p)); -  if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) { -    return REDIS_ERR; -  } +    if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) { +        return REDIS_ERR; +    } -  ac->ev.data    = p; -  p->handle.data = p; -  p->context     = ac; +    ac->ev.data    = p; +    p->handle.data = p; +    p->context     = ac; -  return REDIS_OK; +    return REDIS_OK;  }  #endif diff --git a/adapters/poll.h b/adapters/poll.h new file mode 100644 index 0000000..f138650 --- /dev/null +++ b/adapters/poll.h @@ -0,0 +1,197 @@ + +#ifndef HIREDIS_POLL_H +#define HIREDIS_POLL_H + +#include "../async.h" +#include "../sockcompat.h" +#include <string.h> // for memset +#include <errno.h> + +/* Values to return from redisPollTick */ +#define REDIS_POLL_HANDLED_READ    1 +#define REDIS_POLL_HANDLED_WRITE   2 +#define REDIS_POLL_HANDLED_TIMEOUT 4 + +/* An adapter to allow manual polling of the async context by checking the state + * of the underlying file descriptor.  Useful in cases where there is no formal + * IO event loop but regular ticking can be used, such as in game engines. */ + +typedef struct redisPollEvents { +    redisAsyncContext *context; +    redisFD fd; +    char reading, writing; +    char in_tick; +    char deleted; +    double deadline; +} redisPollEvents; + +static double redisPollTimevalToDouble(struct timeval *tv) { +    if (tv == NULL) +        return 0.0; +    return tv->tv_sec + tv->tv_usec / 1000000.00; +} + +static double redisPollGetNow(void) { +#ifndef _MSC_VER +    struct timeval tv; +    gettimeofday(&tv,NULL); +    return redisPollTimevalToDouble(&tv); +#else +    FILETIME ft; +    ULARGE_INTEGER li; +    GetSystemTimeAsFileTime(&ft); +    li.HighPart = ft.dwHighDateTime; +    li.LowPart = ft.dwLowDateTime; +    return (double)li.QuadPart * 1e-7; +#endif +} + +/* Poll for io, handling any pending callbacks.  The timeout argument can be + * positive to wait for a maximum given time for IO, zero to poll, or negative + * to wait forever */ +static int redisPollTick(redisAsyncContext *ac, double timeout) { +    int reading, writing; +    struct pollfd pfd; +    int handled; +    int ns; +    int itimeout; + +    redisPollEvents *e = (redisPollEvents*)ac->ev.data; +    if (!e) +        return 0; + +    /* local flags, won't get changed during callbacks */ +    reading = e->reading; +    writing = e->writing; +    if (!reading && !writing) +        return 0; + +    pfd.fd = e->fd; +    pfd.events = 0; +    if (reading) +        pfd.events = POLLIN;    +    if (writing) +        pfd.events |= POLLOUT; + +    if (timeout >= 0.0) { +        itimeout = (int)(timeout * 1000.0); +    } else { +        itimeout = -1; +    } + +    ns = poll(&pfd, 1, itimeout); +    if (ns < 0) { +        /* ignore the EINTR error */ +        if (errno != EINTR) +            return ns; +        ns = 0; +    } +     +    handled = 0; +    e->in_tick = 1; +    if (ns) { +        if (reading && (pfd.revents & POLLIN)) { +            redisAsyncHandleRead(ac); +            handled |= REDIS_POLL_HANDLED_READ; +        } +        /* on Windows, connection failure is indicated with the Exception fdset. +         * handle it the same as writable. */ +        if (writing && (pfd.revents & (POLLOUT | POLLERR))) { +            /* context Read callback may have caused context to be deleted, e.g. +               by doing an redisAsyncDisconnect() */ +            if (!e->deleted) { +                redisAsyncHandleWrite(ac); +                handled |= REDIS_POLL_HANDLED_WRITE; +            } +        } +    } + +    /* perform timeouts */ +    if (!e->deleted && e->deadline != 0.0) { +        double now = redisPollGetNow(); +        if (now >= e->deadline) { +            /* deadline has passed.  disable timeout and perform callback */ +            e->deadline = 0.0; +            redisAsyncHandleTimeout(ac); +            handled |= REDIS_POLL_HANDLED_TIMEOUT; +        } +    } + +    /* do a delayed cleanup if required */ +    if (e->deleted) +        hi_free(e); +    else +        e->in_tick = 0; + +    return handled; +} + +static void redisPollAddRead(void *data) { +    redisPollEvents *e = (redisPollEvents*)data; +    e->reading = 1; +} + +static void redisPollDelRead(void *data) { +    redisPollEvents *e = (redisPollEvents*)data; +    e->reading = 0; +} + +static void redisPollAddWrite(void *data) { +    redisPollEvents *e = (redisPollEvents*)data; +    e->writing = 1; +} + +static void redisPollDelWrite(void *data) { +    redisPollEvents *e = (redisPollEvents*)data; +    e->writing = 0; +} + +static void redisPollCleanup(void *data) { +    redisPollEvents *e = (redisPollEvents*)data; + +    /* if we are currently processing a tick, postpone deletion */ +    if (e->in_tick) +        e->deleted = 1; +    else +        hi_free(e); +} + +static void redisPollScheduleTimer(void *data, struct timeval tv) +{ +    redisPollEvents *e = (redisPollEvents*)data; +    double now = redisPollGetNow(); +    e->deadline = now + redisPollTimevalToDouble(&tv); +} + +static int redisPollAttach(redisAsyncContext *ac) { +    redisContext *c = &(ac->c); +    redisPollEvents *e; + +    /* Nothing should be attached when something is already attached */ +    if (ac->ev.data != NULL) +        return REDIS_ERR; + +    /* Create container for context and r/w events */ +    e = (redisPollEvents*)hi_malloc(sizeof(*e)); +    if (e == NULL) +        return REDIS_ERR; +    memset(e, 0, sizeof(*e)); + +    e->context = ac; +    e->fd = c->fd; +    e->reading = e->writing = 0; +    e->in_tick = e->deleted = 0; +    e->deadline = 0.0; + +    /* Register functions to start/stop listening for events */ +    ac->ev.addRead = redisPollAddRead; +    ac->ev.delRead = redisPollDelRead; +    ac->ev.addWrite = redisPollAddWrite; +    ac->ev.delWrite = redisPollDelWrite; +    ac->ev.scheduleTimer = redisPollScheduleTimer; +    ac->ev.cleanup = redisPollCleanup; +    ac->ev.data = e; + +    return REDIS_OK; +} +#endif /* HIREDIS_POLL_H */ | 
