diff options
-rw-r--r-- | adapters/libsdevent.h | 177 | ||||
-rw-r--r-- | examples/CMakeLists.txt | 6 | ||||
-rw-r--r-- | examples/example-libsdevent.c | 86 |
3 files changed, 269 insertions, 0 deletions
diff --git a/adapters/libsdevent.h b/adapters/libsdevent.h new file mode 100644 index 0000000..1268ed9 --- /dev/null +++ b/adapters/libsdevent.h @@ -0,0 +1,177 @@ +#ifndef HIREDIS_LIBSDEVENT_H +#define HIREDIS_LIBSDEVENT_H +#include <systemd/sd-event.h> +#include "../hiredis.h" +#include "../async.h" + +#define REDIS_LIBSDEVENT_DELETED 0x01 +#define REDIS_LIBSDEVENT_ENTERED 0x02 + +typedef struct redisLibsdeventEvents { + redisAsyncContext *context; + struct sd_event *event; + struct sd_event_source *fdSource; + struct sd_event_source *timerSource; + int fd; + short flags; + short state; +} redisLibsdeventEvents; + +static void redisLibsdeventDestroy(redisLibsdeventEvents *e) { + if (e->fdSource) { + e->fdSource = sd_event_source_disable_unref(e->fdSource); + } + if (e->timerSource) { + e->timerSource = sd_event_source_disable_unref(e->timerSource); + } + sd_event_unref(e->event); + hi_free(e); +} + +static int redisLibsdeventTimeoutHandler(sd_event_source *s, uint64_t usec, void *userdata) { + ((void)s); + ((void)usec); + redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; + redisAsyncHandleTimeout(e->context); + return 0; +} + +static int redisLibsdeventHandler(sd_event_source *s, int fd, uint32_t event, void *userdata) { + ((void)s); + ((void)fd); + redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; + e->state |= REDIS_LIBSDEVENT_ENTERED; + +#define CHECK_DELETED() if (e->state & REDIS_LIBSDEVENT_DELETED) {\ + redisLibsdeventDestroy(e);\ + return 0; \ + } + + if ((event & EPOLLIN) && e->context && (e->state & REDIS_LIBSDEVENT_DELETED) == 0) { + redisAsyncHandleRead(e->context); + CHECK_DELETED(); + } + + if ((event & EPOLLOUT) && e->context && (e->state & REDIS_LIBSDEVENT_DELETED) == 0) { + redisAsyncHandleWrite(e->context); + CHECK_DELETED(); + } + + e->state &= ~REDIS_LIBSDEVENT_ENTERED; +#undef CHECK_DELETED + + return 0; +} + +static void redisLibsdeventAddRead(void *userdata) { + redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; + + if (e->flags & EPOLLIN) { + return; + } + + e->flags |= EPOLLIN; + + if (e->flags & EPOLLOUT) { + sd_event_source_set_io_events(e->fdSource, e->flags); + } else { + sd_event_add_io(e->event, &e->fdSource, e->fd, e->flags, redisLibsdeventHandler, e); + } +} + +static void redisLibsdeventDelRead(void *userdata) { + redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; + + e->flags &= ~EPOLLIN; + + if (e->flags) { + sd_event_source_set_io_events(e->fdSource, e->flags); + } else { + e->fdSource = sd_event_source_disable_unref(e->fdSource); + } +} + +static void redisLibsdeventAddWrite(void *userdata) { + redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; + + if (e->flags & EPOLLOUT) { + return; + } + + e->flags |= EPOLLOUT; + + if (e->flags & EPOLLIN) { + sd_event_source_set_io_events(e->fdSource, e->flags); + } else { + sd_event_add_io(e->event, &e->fdSource, e->fd, e->flags, redisLibsdeventHandler, e); + } +} + +static void redisLibsdeventDelWrite(void *userdata) { + redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; + + e->flags &= ~EPOLLOUT; + + if (e->flags) { + sd_event_source_set_io_events(e->fdSource, e->flags); + } else { + e->fdSource = sd_event_source_disable_unref(e->fdSource); + } +} + +static void redisLibsdeventCleanup(void *userdata) { + redisLibsdeventEvents *e = (redisLibsdeventEvents*)userdata; + + if (!e) { + return; + } + + if (e->state & REDIS_LIBSDEVENT_ENTERED) { + e->state |= REDIS_LIBSDEVENT_DELETED; + } else { + redisLibsdeventDestroy(e); + } +} + +static void redisLibsdeventSetTimeout(void *userdata, struct timeval tv) { + redisLibsdeventEvents *e = (redisLibsdeventEvents *)userdata; + + uint64_t usec = tv.tv_sec * 1000000 + tv.tv_usec; + if (!e->timerSource) { + sd_event_add_time_relative(e->event, &e->timerSource, CLOCK_MONOTONIC, usec, 1, redisLibsdeventTimeoutHandler, e); + } else { + sd_event_source_set_time_relative(e->timerSource, usec); + } +} + +static int redisLibsdeventAttach(redisAsyncContext *ac, struct sd_event *event) { + redisContext *c = &(ac->c); + redisLibsdeventEvents *e; + + /* Nothing should be attached when something is already attached */ + if (ac->ev.data != NULL) + return REDIS_ERR; + + /* Create container for context and r/w events */ + e = (redisLibsdeventEvents*)hi_calloc(1, sizeof(*e)); + if (e == NULL) + return REDIS_ERR; + + /* Initialize and increase event refcount */ + e->context = ac; + e->event = event; + e->fd = c->fd; + sd_event_ref(event); + + /* Register functions to start/stop listening for events */ + ac->ev.addRead = redisLibsdeventAddRead; + ac->ev.delRead = redisLibsdeventDelRead; + ac->ev.addWrite = redisLibsdeventAddWrite; + ac->ev.delWrite = redisLibsdeventDelWrite; + ac->ev.cleanup = redisLibsdeventCleanup; + ac->ev.scheduleTimer = redisLibsdeventSetTimeout; + ac->ev.data = e; + + return REDIS_OK; +} +#endif diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 23b6a92..214898b 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -37,6 +37,12 @@ IF (LIBUV) TARGET_LINK_LIBRARIES(example-libuv hiredis uv) ENDIF() +FIND_PATH(LIBSDEVENT systemd/sd-event.h) +IF (LIBSDEVENT) + ADD_EXECUTABLE(example-libsdevent example-libsdevent.c) + TARGET_LINK_LIBRARIES(example-libsdevent hiredis systemd) +ENDIF() + IF (APPLE) FIND_LIBRARY(CF CoreFoundation) ADD_EXECUTABLE(example-macosx example-macosx.c) diff --git a/examples/example-libsdevent.c b/examples/example-libsdevent.c new file mode 100644 index 0000000..c3b902b --- /dev/null +++ b/examples/example-libsdevent.c @@ -0,0 +1,86 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <signal.h> + +#include <hiredis.h> +#include <async.h> +#include <adapters/libsdevent.h> + +void debugCallback(redisAsyncContext *c, void *r, void *privdata) { + (void)privdata; + redisReply *reply = r; + if (reply == NULL) { + /* The DEBUG SLEEP command will almost always fail, because we have set a 1 second timeout */ + printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error"); + return; + } + /* Disconnect after receiving the reply of DEBUG SLEEP (which will not)*/ + redisAsyncDisconnect(c); +} + +void getCallback(redisAsyncContext *c, void *r, void *privdata) { + redisReply *reply = r; + if (reply == NULL) { + printf("`GET key` error: %s\n", c->errstr ? c->errstr : "unknown error"); + return; + } + printf("`GET key` result: argv[%s]: %s\n", (char*)privdata, reply->str); + + /* start another request that demonstrate timeout */ + redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %f", 1.5); +} + +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("connect error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); +} + +void disconnectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("disconnect because of error: %s\n", c->errstr); + return; + } + printf("Disconnected...\n"); +} + +int main (int argc, char **argv) { + signal(SIGPIPE, SIG_IGN); + + struct sd_event *event; + sd_event_default(&event); + + redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); + if (c->err) { + printf("Error: %s\n", c->errstr); + redisAsyncFree(c); + return 1; + } + + redisLibsdeventAttach(c,event); + redisAsyncSetConnectCallback(c,connectCallback); + redisAsyncSetDisconnectCallback(c,disconnectCallback); + redisAsyncSetTimeout(c, (struct timeval){ .tv_sec = 1, .tv_usec = 0}); + + /* + In this demo, we first `set key`, then `get key` to demonstrate the basic usage of libsdevent adapter. + Then in `getCallback`, we start a `debug sleep` command to create 1.5 second long request. + Because we have set a 1 second timeout to the connection, the command will always fail with a + timeout error, which is shown in the `debugCallback`. + */ + + redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); + redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); + + /* sd-event does not quit when there are no handlers registered. Manually exit after 1.5 seconds */ + sd_event_source *s; + sd_event_add_time_relative(event, &s, CLOCK_MONOTONIC, 1500000, 1, NULL, NULL); + + sd_event_loop(event); + sd_event_source_disable_unref(s); + sd_event_unref(event); + return 0; +} |