diff options
-rw-r--r-- | adapters/libhv.h | 78 | ||||
-rw-r--r-- | examples/example-libhv.c | 15 |
2 files changed, 81 insertions, 12 deletions
diff --git a/adapters/libhv.h b/adapters/libhv.h index 368a379..e88e543 100644 --- a/adapters/libhv.h +++ b/adapters/libhv.h @@ -5,6 +5,11 @@ #include "../hiredis.h" #include "../async.h" +typedef struct redisLibhvEvents { + hio_t *io; + htimer_t *timer; +} redisLibhvEvents; + static void redisLibhvHandleEvents(hio_t* io) { redisAsyncContext* context = (redisAsyncContext*)hevent_userdata(io); int events = hio_events(io); @@ -18,51 +23,100 @@ static void redisLibhvHandleEvents(hio_t* io) { } static void redisLibhvAddRead(void *privdata) { - hio_t* io = (hio_t*)privdata; - hio_add(io, redisLibhvHandleEvents, HV_READ); + redisLibhvEvents* events = (redisLibhvEvents*)privdata; + hio_add(events->io, redisLibhvHandleEvents, HV_READ); } static void redisLibhvDelRead(void *privdata) { - hio_t* io = (hio_t*)privdata; - hio_del(io, HV_READ); + redisLibhvEvents* events = (redisLibhvEvents*)privdata; + hio_del(events->io, HV_READ); } static void redisLibhvAddWrite(void *privdata) { - hio_t* io = (hio_t*)privdata; - hio_add(io, redisLibhvHandleEvents, HV_WRITE); + redisLibhvEvents* events = (redisLibhvEvents*)privdata; + hio_add(events->io, redisLibhvHandleEvents, HV_WRITE); } static void redisLibhvDelWrite(void *privdata) { - hio_t* io = (hio_t*)privdata; - hio_del(io, HV_WRITE); + redisLibhvEvents* events = (redisLibhvEvents*)privdata; + hio_del(events->io, HV_WRITE); } static void redisLibhvCleanup(void *privdata) { - hio_t* io = (hio_t*)privdata; - hio_close(io); - hevent_set_userdata(io, NULL); + redisLibhvEvents* events = (redisLibhvEvents*)privdata; + + if (events->timer) + htimer_del(events->timer); + + hio_close(events->io); + hevent_set_userdata(events->io, NULL); + + hi_free(events); +} + +static void redisLibhvTimeout(htimer_t* timer) { + hio_t* io = (hio_t*)hevent_userdata(timer); + redisAsyncHandleTimeout(hevent_userdata(io)); +} + +static void redisLibhvSetTimeout(void *privdata, struct timeval tv) { + redisLibhvEvents* events; + uint32_t millis; + hloop_t* loop; + + events = (redisLibhvEvents*)privdata; + millis = tv.tv_sec * 1000 + tv.tv_usec / 1000; + + if (millis == 0) { + /* Libhv disallows zero'd timers so treat this as a delete or NO OP */ + if (events->timer) { + htimer_del(events->timer); + events->timer = NULL; + } + } else if (events->timer == NULL) { + /* Add new timer */ + loop = hevent_loop(events->io); + events->timer = htimer_add(loop, redisLibhvTimeout, millis, 1); + hevent_set_userdata(events->timer, events->io); + } else { + /* Update existing timer */ + htimer_reset(events->timer, millis); + } } static int redisLibhvAttach(redisAsyncContext* ac, hloop_t* loop) { redisContext *c = &(ac->c); + redisLibhvEvents *events; hio_t* io = NULL; if (ac->ev.data != NULL) { return REDIS_ERR; } + /* Create container struct to keep track of our io and any timer */ + events = hi_malloc(sizeof(*events)); + if (events == NULL) { + return REDIS_ERR; + } + io = hio_get(loop, c->fd); if (io == NULL) { + hi_free(events); return REDIS_ERR; } + hevent_set_userdata(io, ac); + events->io = io; + events->timer = NULL; + ac->ev.addRead = redisLibhvAddRead; ac->ev.delRead = redisLibhvDelRead; ac->ev.addWrite = redisLibhvAddWrite; ac->ev.delWrite = redisLibhvDelWrite; ac->ev.cleanup = redisLibhvCleanup; - ac->ev.data = io; + ac->ev.scheduleTimer = redisLibhvSetTimeout; + ac->ev.data = events; return REDIS_OK; } diff --git a/examples/example-libhv.c b/examples/example-libhv.c index 23c8bc1..ac68b00 100644 --- a/examples/example-libhv.c +++ b/examples/example-libhv.c @@ -16,6 +16,18 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } +void debugCallback(redisAsyncContext *c, void *r, void *privdata) { + (void)privdata; + redisReply *reply = r; + + if (reply == NULL) { + printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error"); + return; + } + + redisAsyncDisconnect(c); +} + void connectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); @@ -46,10 +58,13 @@ int main (int argc, char **argv) { hloop_t* loop = hloop_new(HLOOP_FLAG_QUIT_WHEN_NO_ACTIVE_EVENTS); redisLibhvAttach(c, loop); + redisAsyncSetTimeout(c, (struct timeval){.tv_sec = 0, .tv_usec = 500000}); redisAsyncSetConnectCallback(c,connectCallback); redisAsyncSetDisconnectCallback(c,disconnectCallback); redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); + redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %d", 1); hloop_run(loop); + hloop_free(&loop); return 0; } |