diff options
| author | Kristján Valur Jónsson <sweskman@gmail.com> | 2021-03-27 14:19:07 +0000 | 
|---|---|---|
| committer | Kristján Valur Jónsson <sweskman@gmail.com> | 2022-07-05 11:14:03 +0000 | 
| commit | 31c91408efe53623e2e523fa9e5aed7a5b91af21 (patch) | |
| tree | ebe497ed1440a7441c1860bd7a5005ae343097e8 | |
| parent | 8a15f4d6578560f2a375c32fc567c4c88335c2a8 (diff) | |
| download | hiredict-31c91408efe53623e2e523fa9e5aed7a5b91af21.tar.xz | |
Polling adapter and example
| -rw-r--r-- | Makefile | 5 | ||||
| -rw-r--r-- | adapters/poll.h | 197 | ||||
| -rw-r--r-- | examples/example-poll.c | 62 | 
3 files changed, 263 insertions, 1 deletions
| @@ -4,7 +4,7 @@  # This file is released under the BSD license, see the COPYING file  OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o -EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push +EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push hiredis-example-poll  TESTS=hiredis-test  LIBNAME=libhiredis  PKGCONFNAME=hiredis.pc @@ -192,6 +192,9 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME)  hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME)  	$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS) +hiredis-example-poll: examples/example-poll.c adapters/poll.h $(STLIBNAME) +	$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS) +  ifndef AE_DIR  hiredis-example-ae:  	@echo "Please specify AE_DIR (e.g. <redis repository>/src)" diff --git a/adapters/poll.h b/adapters/poll.h new file mode 100644 index 0000000..f138650 --- /dev/null +++ b/adapters/poll.h @@ -0,0 +1,197 @@ + +#ifndef HIREDIS_POLL_H +#define HIREDIS_POLL_H + +#include "../async.h" +#include "../sockcompat.h" +#include <string.h> // for memset +#include <errno.h> + +/* Values to return from redisPollTick */ +#define REDIS_POLL_HANDLED_READ    1 +#define REDIS_POLL_HANDLED_WRITE   2 +#define REDIS_POLL_HANDLED_TIMEOUT 4 + +/* An adapter to allow manual polling of the async context by checking the state + * of the underlying file descriptor.  Useful in cases where there is no formal + * IO event loop but regular ticking can be used, such as in game engines. */ + +typedef struct redisPollEvents { +    redisAsyncContext *context; +    redisFD fd; +    char reading, writing; +    char in_tick; +    char deleted; +    double deadline; +} redisPollEvents; + +static double redisPollTimevalToDouble(struct timeval *tv) { +    if (tv == NULL) +        return 0.0; +    return tv->tv_sec + tv->tv_usec / 1000000.00; +} + +static double redisPollGetNow(void) { +#ifndef _MSC_VER +    struct timeval tv; +    gettimeofday(&tv,NULL); +    return redisPollTimevalToDouble(&tv); +#else +    FILETIME ft; +    ULARGE_INTEGER li; +    GetSystemTimeAsFileTime(&ft); +    li.HighPart = ft.dwHighDateTime; +    li.LowPart = ft.dwLowDateTime; +    return (double)li.QuadPart * 1e-7; +#endif +} + +/* Poll for io, handling any pending callbacks.  The timeout argument can be + * positive to wait for a maximum given time for IO, zero to poll, or negative + * to wait forever */ +static int redisPollTick(redisAsyncContext *ac, double timeout) { +    int reading, writing; +    struct pollfd pfd; +    int handled; +    int ns; +    int itimeout; + +    redisPollEvents *e = (redisPollEvents*)ac->ev.data; +    if (!e) +        return 0; + +    /* local flags, won't get changed during callbacks */ +    reading = e->reading; +    writing = e->writing; +    if (!reading && !writing) +        return 0; + +    pfd.fd = e->fd; +    pfd.events = 0; +    if (reading) +        pfd.events = POLLIN;    +    if (writing) +        pfd.events |= POLLOUT; + +    if (timeout >= 0.0) { +        itimeout = (int)(timeout * 1000.0); +    } else { +        itimeout = -1; +    } + +    ns = poll(&pfd, 1, itimeout); +    if (ns < 0) { +        /* ignore the EINTR error */ +        if (errno != EINTR) +            return ns; +        ns = 0; +    } +     +    handled = 0; +    e->in_tick = 1; +    if (ns) { +        if (reading && (pfd.revents & POLLIN)) { +            redisAsyncHandleRead(ac); +            handled |= REDIS_POLL_HANDLED_READ; +        } +        /* on Windows, connection failure is indicated with the Exception fdset. +         * handle it the same as writable. */ +        if (writing && (pfd.revents & (POLLOUT | POLLERR))) { +            /* context Read callback may have caused context to be deleted, e.g. +               by doing an redisAsyncDisconnect() */ +            if (!e->deleted) { +                redisAsyncHandleWrite(ac); +                handled |= REDIS_POLL_HANDLED_WRITE; +            } +        } +    } + +    /* perform timeouts */ +    if (!e->deleted && e->deadline != 0.0) { +        double now = redisPollGetNow(); +        if (now >= e->deadline) { +            /* deadline has passed.  disable timeout and perform callback */ +            e->deadline = 0.0; +            redisAsyncHandleTimeout(ac); +            handled |= REDIS_POLL_HANDLED_TIMEOUT; +        } +    } + +    /* do a delayed cleanup if required */ +    if (e->deleted) +        hi_free(e); +    else +        e->in_tick = 0; + +    return handled; +} + +static void redisPollAddRead(void *data) { +    redisPollEvents *e = (redisPollEvents*)data; +    e->reading = 1; +} + +static void redisPollDelRead(void *data) { +    redisPollEvents *e = (redisPollEvents*)data; +    e->reading = 0; +} + +static void redisPollAddWrite(void *data) { +    redisPollEvents *e = (redisPollEvents*)data; +    e->writing = 1; +} + +static void redisPollDelWrite(void *data) { +    redisPollEvents *e = (redisPollEvents*)data; +    e->writing = 0; +} + +static void redisPollCleanup(void *data) { +    redisPollEvents *e = (redisPollEvents*)data; + +    /* if we are currently processing a tick, postpone deletion */ +    if (e->in_tick) +        e->deleted = 1; +    else +        hi_free(e); +} + +static void redisPollScheduleTimer(void *data, struct timeval tv) +{ +    redisPollEvents *e = (redisPollEvents*)data; +    double now = redisPollGetNow(); +    e->deadline = now + redisPollTimevalToDouble(&tv); +} + +static int redisPollAttach(redisAsyncContext *ac) { +    redisContext *c = &(ac->c); +    redisPollEvents *e; + +    /* Nothing should be attached when something is already attached */ +    if (ac->ev.data != NULL) +        return REDIS_ERR; + +    /* Create container for context and r/w events */ +    e = (redisPollEvents*)hi_malloc(sizeof(*e)); +    if (e == NULL) +        return REDIS_ERR; +    memset(e, 0, sizeof(*e)); + +    e->context = ac; +    e->fd = c->fd; +    e->reading = e->writing = 0; +    e->in_tick = e->deleted = 0; +    e->deadline = 0.0; + +    /* Register functions to start/stop listening for events */ +    ac->ev.addRead = redisPollAddRead; +    ac->ev.delRead = redisPollDelRead; +    ac->ev.addWrite = redisPollAddWrite; +    ac->ev.delWrite = redisPollDelWrite; +    ac->ev.scheduleTimer = redisPollScheduleTimer; +    ac->ev.cleanup = redisPollCleanup; +    ac->ev.data = e; + +    return REDIS_OK; +} +#endif /* HIREDIS_POLL_H */ diff --git a/examples/example-poll.c b/examples/example-poll.c new file mode 100644 index 0000000..954673d --- /dev/null +++ b/examples/example-poll.c @@ -0,0 +1,62 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <signal.h> +#include <unistd.h> + +#include <async.h> +#include <adapters/poll.h> + +/* Put in the global scope, so that loop can be explicitly stopped */ +static int exit_loop = 0; + +void getCallback(redisAsyncContext *c, void *r, void *privdata) { +    redisReply *reply = r; +    if (reply == NULL) return; +    printf("argv[%s]: %s\n", (char*)privdata, reply->str); + +    /* Disconnect after receiving the reply to GET */ +    redisAsyncDisconnect(c); +} + +void connectCallback(const redisAsyncContext *c, int status) { +    if (status != REDIS_OK) { +        printf("Error: %s\n", c->errstr); +        exit_loop = 1; +        return; +    } + +    printf("Connected...\n"); +} + +void disconnectCallback(const redisAsyncContext *c, int status) { +    exit_loop = 1; +    if (status != REDIS_OK) { +        printf("Error: %s\n", c->errstr); +        return; +    } + +    printf("Disconnected...\n"); +} + +int main (int argc, char **argv) { +    signal(SIGPIPE, SIG_IGN); + +    redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); +    if (c->err) { +        /* Let *c leak for now... */ +        printf("Error: %s\n", c->errstr); +        return 1; +    } + +    redisPollAttach(c); +    redisAsyncSetConnectCallback(c,connectCallback); +    redisAsyncSetDisconnectCallback(c,disconnectCallback); +    redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); +    redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); +    while (!exit_loop) +    { +        redisPollTick(c, 0.1); +    } +    return 0; +} | 
