From 31c91408efe53623e2e523fa9e5aed7a5b91af21 Mon Sep 17 00:00:00 2001 From: Kristján Valur Jónsson Date: Sat, 27 Mar 2021 14:19:07 +0000 Subject: Polling adapter and example --- Makefile | 5 +- adapters/poll.h | 197 ++++++++++++++++++++++++++++++++++++++++++++++++ examples/example-poll.c | 62 +++++++++++++++ 3 files changed, 263 insertions(+), 1 deletion(-) create mode 100644 adapters/poll.h create mode 100644 examples/example-poll.c diff --git a/Makefile b/Makefile index a2ad84c..667a600 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ # This file is released under the BSD license, see the COPYING file OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o -EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push +EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push hiredis-example-poll TESTS=hiredis-test LIBNAME=libhiredis PKGCONFNAME=hiredis.pc @@ -192,6 +192,9 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME) hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS) +hiredis-example-poll: examples/example-poll.c adapters/poll.h $(STLIBNAME) + $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS) + ifndef AE_DIR hiredis-example-ae: @echo "Please specify AE_DIR (e.g. /src)" diff --git a/adapters/poll.h b/adapters/poll.h new file mode 100644 index 0000000..f138650 --- /dev/null +++ b/adapters/poll.h @@ -0,0 +1,197 @@ + +#ifndef HIREDIS_POLL_H +#define HIREDIS_POLL_H + +#include "../async.h" +#include "../sockcompat.h" +#include // for memset +#include + +/* Values to return from redisPollTick */ +#define REDIS_POLL_HANDLED_READ 1 +#define REDIS_POLL_HANDLED_WRITE 2 +#define REDIS_POLL_HANDLED_TIMEOUT 4 + +/* An adapter to allow manual polling of the async context by checking the state + * of the underlying file descriptor. Useful in cases where there is no formal + * IO event loop but regular ticking can be used, such as in game engines. */ + +typedef struct redisPollEvents { + redisAsyncContext *context; + redisFD fd; + char reading, writing; + char in_tick; + char deleted; + double deadline; +} redisPollEvents; + +static double redisPollTimevalToDouble(struct timeval *tv) { + if (tv == NULL) + return 0.0; + return tv->tv_sec + tv->tv_usec / 1000000.00; +} + +static double redisPollGetNow(void) { +#ifndef _MSC_VER + struct timeval tv; + gettimeofday(&tv,NULL); + return redisPollTimevalToDouble(&tv); +#else + FILETIME ft; + ULARGE_INTEGER li; + GetSystemTimeAsFileTime(&ft); + li.HighPart = ft.dwHighDateTime; + li.LowPart = ft.dwLowDateTime; + return (double)li.QuadPart * 1e-7; +#endif +} + +/* Poll for io, handling any pending callbacks. The timeout argument can be + * positive to wait for a maximum given time for IO, zero to poll, or negative + * to wait forever */ +static int redisPollTick(redisAsyncContext *ac, double timeout) { + int reading, writing; + struct pollfd pfd; + int handled; + int ns; + int itimeout; + + redisPollEvents *e = (redisPollEvents*)ac->ev.data; + if (!e) + return 0; + + /* local flags, won't get changed during callbacks */ + reading = e->reading; + writing = e->writing; + if (!reading && !writing) + return 0; + + pfd.fd = e->fd; + pfd.events = 0; + if (reading) + pfd.events = POLLIN; + if (writing) + pfd.events |= POLLOUT; + + if (timeout >= 0.0) { + itimeout = (int)(timeout * 1000.0); + } else { + itimeout = -1; + } + + ns = poll(&pfd, 1, itimeout); + if (ns < 0) { + /* ignore the EINTR error */ + if (errno != EINTR) + return ns; + ns = 0; + } + + handled = 0; + e->in_tick = 1; + if (ns) { + if (reading && (pfd.revents & POLLIN)) { + redisAsyncHandleRead(ac); + handled |= REDIS_POLL_HANDLED_READ; + } + /* on Windows, connection failure is indicated with the Exception fdset. + * handle it the same as writable. */ + if (writing && (pfd.revents & (POLLOUT | POLLERR))) { + /* context Read callback may have caused context to be deleted, e.g. + by doing an redisAsyncDisconnect() */ + if (!e->deleted) { + redisAsyncHandleWrite(ac); + handled |= REDIS_POLL_HANDLED_WRITE; + } + } + } + + /* perform timeouts */ + if (!e->deleted && e->deadline != 0.0) { + double now = redisPollGetNow(); + if (now >= e->deadline) { + /* deadline has passed. disable timeout and perform callback */ + e->deadline = 0.0; + redisAsyncHandleTimeout(ac); + handled |= REDIS_POLL_HANDLED_TIMEOUT; + } + } + + /* do a delayed cleanup if required */ + if (e->deleted) + hi_free(e); + else + e->in_tick = 0; + + return handled; +} + +static void redisPollAddRead(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->reading = 1; +} + +static void redisPollDelRead(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->reading = 0; +} + +static void redisPollAddWrite(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->writing = 1; +} + +static void redisPollDelWrite(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->writing = 0; +} + +static void redisPollCleanup(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + + /* if we are currently processing a tick, postpone deletion */ + if (e->in_tick) + e->deleted = 1; + else + hi_free(e); +} + +static void redisPollScheduleTimer(void *data, struct timeval tv) +{ + redisPollEvents *e = (redisPollEvents*)data; + double now = redisPollGetNow(); + e->deadline = now + redisPollTimevalToDouble(&tv); +} + +static int redisPollAttach(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisPollEvents *e; + + /* Nothing should be attached when something is already attached */ + if (ac->ev.data != NULL) + return REDIS_ERR; + + /* Create container for context and r/w events */ + e = (redisPollEvents*)hi_malloc(sizeof(*e)); + if (e == NULL) + return REDIS_ERR; + memset(e, 0, sizeof(*e)); + + e->context = ac; + e->fd = c->fd; + e->reading = e->writing = 0; + e->in_tick = e->deleted = 0; + e->deadline = 0.0; + + /* Register functions to start/stop listening for events */ + ac->ev.addRead = redisPollAddRead; + ac->ev.delRead = redisPollDelRead; + ac->ev.addWrite = redisPollAddWrite; + ac->ev.delWrite = redisPollDelWrite; + ac->ev.scheduleTimer = redisPollScheduleTimer; + ac->ev.cleanup = redisPollCleanup; + ac->ev.data = e; + + return REDIS_OK; +} +#endif /* HIREDIS_POLL_H */ diff --git a/examples/example-poll.c b/examples/example-poll.c new file mode 100644 index 0000000..954673d --- /dev/null +++ b/examples/example-poll.c @@ -0,0 +1,62 @@ +#include +#include +#include +#include +#include + +#include +#include + +/* Put in the global scope, so that loop can be explicitly stopped */ +static int exit_loop = 0; + +void getCallback(redisAsyncContext *c, void *r, void *privdata) { + redisReply *reply = r; + if (reply == NULL) return; + printf("argv[%s]: %s\n", (char*)privdata, reply->str); + + /* Disconnect after receiving the reply to GET */ + redisAsyncDisconnect(c); +} + +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + exit_loop = 1; + return; + } + + printf("Connected...\n"); +} + +void disconnectCallback(const redisAsyncContext *c, int status) { + exit_loop = 1; + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + + printf("Disconnected...\n"); +} + +int main (int argc, char **argv) { + signal(SIGPIPE, SIG_IGN); + + redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); + if (c->err) { + /* Let *c leak for now... */ + printf("Error: %s\n", c->errstr); + return 1; + } + + redisPollAttach(c); + redisAsyncSetConnectCallback(c,connectCallback); + redisAsyncSetDisconnectCallback(c,disconnectCallback); + redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); + redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); + while (!exit_loop) + { + redisPollTick(c, 0.1); + } + return 0; +} -- cgit v1.2.3 From 4b901d44ad38670f223238edec93dcf3e1c57ac2 Mon Sep 17 00:00:00 2001 From: Kristján Valur Jónsson Date: Tue, 20 Apr 2021 11:35:20 +0000 Subject: Initial async tests --- test.c | 179 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 179 insertions(+) diff --git a/test.c b/test.c index b831680..8941c4f 100644 --- a/test.c +++ b/test.c @@ -15,6 +15,7 @@ #include "hiredis.h" #include "async.h" +#include "adapters/poll.h" #ifdef HIREDIS_TEST_SSL #include "hiredis_ssl.h" #endif @@ -1898,6 +1899,174 @@ static void test_monitor(struct config config) { } #endif /* HIREDIS_TEST_ASYNC */ +/* tests for async api using polling adapter, requires no extra libraries*/ + +/* a static context for the async tests */ +typedef enum astest_no +{ + ASTEST_CONNECT=0, + ASTEST_CONN_TIMEOUT, + ASTEST_PINGPONG +}astest_no; + +struct _astest { + redisAsyncContext *ac; + astest_no testno; + int counter; + int connects; + int connect_status; + int disconnects; + int disconnect_status; + int connected; + int err; + char errstr[256]; +}; +static struct _astest astest; +static void asCleanup(void* data) +{ + struct _astest *t = (struct _astest *)data; + t->ac = NULL; +} + +static void connectCallback(const redisAsyncContext *c, int status) { + struct _astest *t = (struct _astest *)c->data; + assert(t == &astest); + assert(t->connects == 0); + t->err = c->err; + strcpy(t->errstr, c->errstr); + t->connects++; + t->connect_status = status; + t->connected = status == REDIS_OK ? 1 : -1; +} +static void disconnectCallback(const redisAsyncContext *c, int status) { + assert(c->data == (void*)&astest); + assert(astest.disconnects == 0); + astest.err = c->err; + strcpy(astest.errstr, c->errstr); + astest.disconnects++; + astest.disconnect_status = status; + astest.connected = 0; +} + +static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata) +{ + redisReply *reply = (redisReply*)_reply; + struct _astest *t = (struct _astest *)ac->data; + assert(t == &astest); + (void)_privdata; + t->err = ac->err; + strcpy(t->errstr, ac->errstr); + if (t->testno == ASTEST_PINGPONG) + { + test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0); + redisAsyncFree(ac); + } +} + +static redisAsyncContext *do_aconnect(struct config config, astest_no testno) +{ + redisOptions options = {0}; + memset(&astest, 0, sizeof(astest)); + + astest.testno = testno; + astest.connect_status = astest.disconnect_status = -2; + + if (config.type == CONN_TCP) { + options.type = REDIS_CONN_TCP; + options.connect_timeout = &config.tcp.timeout; + REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port); + } else if (config.type == CONN_SSL) { + options.type = REDIS_CONN_TCP; + options.connect_timeout = &config.tcp.timeout; + REDIS_OPTIONS_SET_TCP(&options, config.ssl.host, config.ssl.port); + } else if (config.type == CONN_UNIX) { + options.type = REDIS_CONN_UNIX; + options.endpoint.unix_socket = config.unix_sock.path; + } else if (config.type == CONN_FD) { + options.type = REDIS_CONN_USERFD; + /* Create a dummy connection just to get an fd to inherit */ + redisContext *dummy_ctx = redisConnectUnix(config.unix_sock.path); + if (dummy_ctx) { + redisFD fd = disconnect(dummy_ctx, 1); + printf("Connecting to inherited fd %d\n", (int)fd); + options.endpoint.fd = fd; + } + } + redisAsyncContext *c = redisAsyncConnectWithOptions(&options); + assert(c); + astest.ac = c; + c->data = &astest; + c->dataCleanup = asCleanup; + redisPollAttach(c); + redisAsyncSetConnectCallback(c, connectCallback); + redisAsyncSetDisconnectCallback(c, disconnectCallback); + return c; +} + +static void as_printerr(void) { + printf("Async err %d : %s\n", astest.err, astest.errstr); +} + +#define ASASSERT(e) do { \ + if (!(e)) \ + as_printerr(); \ + assert(e); \ +} while (0); + +static void test_async_polling(struct config config) { + int status; + redisAsyncContext *c; + struct config defaultconfig = config; + + test("Async connect: "); + c = do_aconnect(config, ASTEST_CONNECT); + assert(c); + while(astest.connected == 0) + redisPollTick(c, 0.1); + assert(astest.connects == 1); + ASASSERT(astest.connect_status == REDIS_OK); + assert(astest.disconnects == 0); + test_cond(astest.connected == 1); + + test("Async free after connect: "); + assert(astest.ac != NULL); + redisAsyncFree(c); + assert(astest.disconnects == 1); + assert(astest.ac == NULL); + test_cond(astest.disconnect_status == REDIS_OK); + + if (config.type == CONN_TCP || config.type == CONN_SSL) { + /* timeout can only be simulated with network */ + test("Async connect timeout: "); + config.tcp.host = "192.168.254.254"; /* blackhole ip */ + config.tcp.timeout.tv_usec = 100000; + c = do_aconnect(config, ASTEST_CONN_TIMEOUT); + assert(c); + assert(c->err == 0); + while(astest.connected == 0) + redisPollTick(c, 0.1); + assert(astest.connected == -1); + /* + * freeing should not be done, clearing should have happened. + *redisAsyncFree(c); + */ + assert(astest.ac == NULL); + test_cond(astest.connect_status == REDIS_ERR); + config = defaultconfig; + } + + /* Test a ping/pong after connection */ + test("Async PING/PONG: "); + c = do_aconnect(config, ASTEST_PINGPONG); + while(astest.connected == 0) + redisPollTick(c, 0.1); + status = redisAsyncCommand(c, commandCallback, NULL, "PING"); + assert(status == REDIS_OK); + while(astest.ac) + redisPollTick(c, 0.1); + config = defaultconfig; +} + int main(int argc, char **argv) { struct config cfg = { .tcp = { @@ -2017,6 +2186,7 @@ int main(int argc, char **argv) { #endif #ifdef HIREDIS_TEST_ASYNC + cfg.type = CONN_TCP; printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); cfg.type = CONN_TCP; @@ -2034,6 +2204,15 @@ int main(int argc, char **argv) { } #endif /* HIREDIS_TEST_ASYNC */ + cfg.type = CONN_TCP; + printf("\nTesting asynchronous API using polling_adapter TCP (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); + test_async_polling(cfg); + if (test_unix_socket) { + cfg.type = CONN_UNIX; + printf("\nTesting asynchronous API using polling_adapter UNIX (%s):\n", cfg.unix_sock.path); + test_async_polling(cfg); + } + if (test_inherit_fd) { printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path); if (test_unix_socket) { -- cgit v1.2.3 From 2ccef30f3e00603c6e392502e1f910a95d6cb24a Mon Sep 17 00:00:00 2001 From: Kristján Valur Jónsson Date: Wed, 1 Dec 2021 10:14:30 +0000 Subject: Add regression test for issue #945 --- test.c | 49 ++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 3 deletions(-) diff --git a/test.c b/test.c index 8941c4f..e201cbc 100644 --- a/test.c +++ b/test.c @@ -1901,14 +1901,16 @@ static void test_monitor(struct config config) { /* tests for async api using polling adapter, requires no extra libraries*/ -/* a static context for the async tests */ +/* enum for the test cases, the callbacks have different logic based on them */ typedef enum astest_no { ASTEST_CONNECT=0, ASTEST_CONN_TIMEOUT, - ASTEST_PINGPONG + ASTEST_PINGPONG, + ASTEST_PINGPONG_TIMEOUT }astest_no; +/* a static context for the async tests */ struct _astest { redisAsyncContext *ac; astest_no testno; @@ -1922,6 +1924,17 @@ struct _astest { char errstr[256]; }; static struct _astest astest; + +static void asSleep(int ms) +{ +#if _MSC_VER + Sleep(ms); +#else + usleep(ms*1000); +#endif +} + +/* async callbacks */ static void asCleanup(void* data) { struct _astest *t = (struct _astest *)data; @@ -1961,6 +1974,18 @@ static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _p test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0); redisAsyncFree(ac); } + if (t->testno == ASTEST_PINGPONG_TIMEOUT) + { + /* two ping pongs */ + assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0); + if (++t->counter == 1) { + int status = redisAsyncCommand(ac, commandCallback, NULL, "PING"); + assert(status == REDIS_OK); + } else { + test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0); + redisAsyncFree(ac); + } + } } static redisAsyncContext *do_aconnect(struct config config, astest_no testno) @@ -2064,8 +2089,26 @@ static void test_async_polling(struct config config) { assert(status == REDIS_OK); while(astest.ac) redisPollTick(c, 0.1); - config = defaultconfig; + + /* Test a ping/pong after connection that didn't time out. + * see https://github.com/redis/hiredis/issues/945 + */ + if (config.type == CONN_TCP || config.type == CONN_SSL) { + test("Async PING/PONG after connect timeout: "); + config.tcp.timeout.tv_usec = 10000; /* 10ms */ + c = do_aconnect(config, ASTEST_PINGPONG_TIMEOUT); + while(astest.connected == 0) + redisPollTick(c, 0.1); + /* sleep 0.1 s, allowing old timeout to arrive */ + asSleep(10); + status = redisAsyncCommand(c, commandCallback, NULL, "PING"); + assert(status == REDIS_OK); + while(astest.ac) + redisPollTick(c, 0.1); + config = defaultconfig; + } } +/* End of Async polling_adapter driven tests */ int main(int argc, char **argv) { struct config cfg = { -- cgit v1.2.3