diff options
Diffstat (limited to 'adapters')
| -rw-r--r-- | adapters/libhv.h | 78 | 
1 files changed, 66 insertions, 12 deletions
| diff --git a/adapters/libhv.h b/adapters/libhv.h index 368a379..e88e543 100644 --- a/adapters/libhv.h +++ b/adapters/libhv.h @@ -5,6 +5,11 @@  #include "../hiredis.h"  #include "../async.h" +typedef struct redisLibhvEvents { +    hio_t *io; +    htimer_t *timer; +} redisLibhvEvents; +  static void redisLibhvHandleEvents(hio_t* io) {      redisAsyncContext* context = (redisAsyncContext*)hevent_userdata(io);      int events = hio_events(io); @@ -18,51 +23,100 @@ static void redisLibhvHandleEvents(hio_t* io) {  }  static void redisLibhvAddRead(void *privdata) { -    hio_t* io = (hio_t*)privdata; -    hio_add(io, redisLibhvHandleEvents, HV_READ); +    redisLibhvEvents* events = (redisLibhvEvents*)privdata; +    hio_add(events->io, redisLibhvHandleEvents, HV_READ);  }  static void redisLibhvDelRead(void *privdata) { -    hio_t* io = (hio_t*)privdata; -    hio_del(io, HV_READ); +    redisLibhvEvents* events = (redisLibhvEvents*)privdata; +    hio_del(events->io, HV_READ);  }  static void redisLibhvAddWrite(void *privdata) { -    hio_t* io = (hio_t*)privdata; -    hio_add(io, redisLibhvHandleEvents, HV_WRITE); +    redisLibhvEvents* events = (redisLibhvEvents*)privdata; +    hio_add(events->io, redisLibhvHandleEvents, HV_WRITE);  }  static void redisLibhvDelWrite(void *privdata) { -    hio_t* io = (hio_t*)privdata; -    hio_del(io, HV_WRITE); +    redisLibhvEvents* events = (redisLibhvEvents*)privdata; +    hio_del(events->io, HV_WRITE);  }  static void redisLibhvCleanup(void *privdata) { -    hio_t* io = (hio_t*)privdata; -    hio_close(io); -    hevent_set_userdata(io, NULL); +    redisLibhvEvents* events = (redisLibhvEvents*)privdata; + +    if (events->timer) +        htimer_del(events->timer); + +    hio_close(events->io); +    hevent_set_userdata(events->io, NULL); + +    hi_free(events); +} + +static void redisLibhvTimeout(htimer_t* timer) { +    hio_t* io = (hio_t*)hevent_userdata(timer); +    redisAsyncHandleTimeout(hevent_userdata(io)); +} + +static void redisLibhvSetTimeout(void *privdata, struct timeval tv) { +    redisLibhvEvents* events; +    uint32_t millis; +    hloop_t* loop; + +    events = (redisLibhvEvents*)privdata; +    millis = tv.tv_sec * 1000 + tv.tv_usec / 1000; + +    if (millis == 0) { +        /* Libhv disallows zero'd timers so treat this as a delete or NO OP */ +        if (events->timer) { +            htimer_del(events->timer); +            events->timer = NULL; +        } +    } else if (events->timer == NULL) { +        /* Add new timer */ +        loop = hevent_loop(events->io); +        events->timer = htimer_add(loop, redisLibhvTimeout, millis, 1); +        hevent_set_userdata(events->timer, events->io); +    } else { +        /* Update existing timer */ +        htimer_reset(events->timer, millis); +    }  }  static int redisLibhvAttach(redisAsyncContext* ac, hloop_t* loop) {      redisContext *c = &(ac->c); +    redisLibhvEvents *events;      hio_t* io = NULL;      if (ac->ev.data != NULL) {          return REDIS_ERR;      } +    /* Create container struct to keep track of our io and any timer */ +    events = hi_malloc(sizeof(*events)); +    if (events == NULL) { +        return REDIS_ERR; +    } +      io = hio_get(loop, c->fd);      if (io == NULL) { +        hi_free(events);          return REDIS_ERR;      } +      hevent_set_userdata(io, ac); +    events->io = io; +    events->timer = NULL; +      ac->ev.addRead  = redisLibhvAddRead;      ac->ev.delRead  = redisLibhvDelRead;      ac->ev.addWrite = redisLibhvAddWrite;      ac->ev.delWrite = redisLibhvDelWrite;      ac->ev.cleanup  = redisLibhvCleanup; -    ac->ev.data     = io; +    ac->ev.scheduleTimer = redisLibhvSetTimeout; +    ac->ev.data = events;      return REDIS_OK;  } | 
