summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Grunder <michael.grunder@gmail.com>2022-07-07 11:43:19 -0700
committerGitHub <noreply@github.com>2022-07-07 11:43:19 -0700
commiteaa2a7ee77f4ce25e73a23e6030d4fa4d138cb11 (patch)
treedbe10904e10917976524cd1b06a182bd49890ffd
parent8a15f4d6578560f2a375c32fc567c4c88335c2a8 (diff)
parent2ccef30f3e00603c6e392502e1f910a95d6cb24a (diff)
Merge pull request #932 from kristjanvalur/pr3
Polling adapter and example
-rw-r--r--Makefile5
-rw-r--r--adapters/poll.h197
-rw-r--r--examples/example-poll.c62
-rw-r--r--test.c222
4 files changed, 485 insertions, 1 deletions
diff --git a/Makefile b/Makefile
index a2ad84c..667a600 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@
# This file is released under the BSD license, see the COPYING file
OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o
-EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push
+EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push hiredis-example-poll
TESTS=hiredis-test
LIBNAME=libhiredis
PKGCONFNAME=hiredis.pc
@@ -192,6 +192,9 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME)
hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS)
+hiredis-example-poll: examples/example-poll.c adapters/poll.h $(STLIBNAME)
+ $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS)
+
ifndef AE_DIR
hiredis-example-ae:
@echo "Please specify AE_DIR (e.g. <redis repository>/src)"
diff --git a/adapters/poll.h b/adapters/poll.h
new file mode 100644
index 0000000..f138650
--- /dev/null
+++ b/adapters/poll.h
@@ -0,0 +1,197 @@
+
+#ifndef HIREDIS_POLL_H
+#define HIREDIS_POLL_H
+
+#include "../async.h"
+#include "../sockcompat.h"
+#include <string.h> // for memset
+#include <errno.h>
+
+/* Values to return from redisPollTick */
+#define REDIS_POLL_HANDLED_READ 1
+#define REDIS_POLL_HANDLED_WRITE 2
+#define REDIS_POLL_HANDLED_TIMEOUT 4
+
+/* An adapter to allow manual polling of the async context by checking the state
+ * of the underlying file descriptor. Useful in cases where there is no formal
+ * IO event loop but regular ticking can be used, such as in game engines. */
+
+typedef struct redisPollEvents {
+ redisAsyncContext *context;
+ redisFD fd;
+ char reading, writing;
+ char in_tick;
+ char deleted;
+ double deadline;
+} redisPollEvents;
+
+static double redisPollTimevalToDouble(struct timeval *tv) {
+ if (tv == NULL)
+ return 0.0;
+ return tv->tv_sec + tv->tv_usec / 1000000.00;
+}
+
+static double redisPollGetNow(void) {
+#ifndef _MSC_VER
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ return redisPollTimevalToDouble(&tv);
+#else
+ FILETIME ft;
+ ULARGE_INTEGER li;
+ GetSystemTimeAsFileTime(&ft);
+ li.HighPart = ft.dwHighDateTime;
+ li.LowPart = ft.dwLowDateTime;
+ return (double)li.QuadPart * 1e-7;
+#endif
+}
+
+/* Poll for io, handling any pending callbacks. The timeout argument can be
+ * positive to wait for a maximum given time for IO, zero to poll, or negative
+ * to wait forever */
+static int redisPollTick(redisAsyncContext *ac, double timeout) {
+ int reading, writing;
+ struct pollfd pfd;
+ int handled;
+ int ns;
+ int itimeout;
+
+ redisPollEvents *e = (redisPollEvents*)ac->ev.data;
+ if (!e)
+ return 0;
+
+ /* local flags, won't get changed during callbacks */
+ reading = e->reading;
+ writing = e->writing;
+ if (!reading && !writing)
+ return 0;
+
+ pfd.fd = e->fd;
+ pfd.events = 0;
+ if (reading)
+ pfd.events = POLLIN;
+ if (writing)
+ pfd.events |= POLLOUT;
+
+ if (timeout >= 0.0) {
+ itimeout = (int)(timeout * 1000.0);
+ } else {
+ itimeout = -1;
+ }
+
+ ns = poll(&pfd, 1, itimeout);
+ if (ns < 0) {
+ /* ignore the EINTR error */
+ if (errno != EINTR)
+ return ns;
+ ns = 0;
+ }
+
+ handled = 0;
+ e->in_tick = 1;
+ if (ns) {
+ if (reading && (pfd.revents & POLLIN)) {
+ redisAsyncHandleRead(ac);
+ handled |= REDIS_POLL_HANDLED_READ;
+ }
+ /* on Windows, connection failure is indicated with the Exception fdset.
+ * handle it the same as writable. */
+ if (writing && (pfd.revents & (POLLOUT | POLLERR))) {
+ /* context Read callback may have caused context to be deleted, e.g.
+ by doing an redisAsyncDisconnect() */
+ if (!e->deleted) {
+ redisAsyncHandleWrite(ac);
+ handled |= REDIS_POLL_HANDLED_WRITE;
+ }
+ }
+ }
+
+ /* perform timeouts */
+ if (!e->deleted && e->deadline != 0.0) {
+ double now = redisPollGetNow();
+ if (now >= e->deadline) {
+ /* deadline has passed. disable timeout and perform callback */
+ e->deadline = 0.0;
+ redisAsyncHandleTimeout(ac);
+ handled |= REDIS_POLL_HANDLED_TIMEOUT;
+ }
+ }
+
+ /* do a delayed cleanup if required */
+ if (e->deleted)
+ hi_free(e);
+ else
+ e->in_tick = 0;
+
+ return handled;
+}
+
+static void redisPollAddRead(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->reading = 1;
+}
+
+static void redisPollDelRead(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->reading = 0;
+}
+
+static void redisPollAddWrite(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->writing = 1;
+}
+
+static void redisPollDelWrite(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->writing = 0;
+}
+
+static void redisPollCleanup(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+
+ /* if we are currently processing a tick, postpone deletion */
+ if (e->in_tick)
+ e->deleted = 1;
+ else
+ hi_free(e);
+}
+
+static void redisPollScheduleTimer(void *data, struct timeval tv)
+{
+ redisPollEvents *e = (redisPollEvents*)data;
+ double now = redisPollGetNow();
+ e->deadline = now + redisPollTimevalToDouble(&tv);
+}
+
+static int redisPollAttach(redisAsyncContext *ac) {
+ redisContext *c = &(ac->c);
+ redisPollEvents *e;
+
+ /* Nothing should be attached when something is already attached */
+ if (ac->ev.data != NULL)
+ return REDIS_ERR;
+
+ /* Create container for context and r/w events */
+ e = (redisPollEvents*)hi_malloc(sizeof(*e));
+ if (e == NULL)
+ return REDIS_ERR;
+ memset(e, 0, sizeof(*e));
+
+ e->context = ac;
+ e->fd = c->fd;
+ e->reading = e->writing = 0;
+ e->in_tick = e->deleted = 0;
+ e->deadline = 0.0;
+
+ /* Register functions to start/stop listening for events */
+ ac->ev.addRead = redisPollAddRead;
+ ac->ev.delRead = redisPollDelRead;
+ ac->ev.addWrite = redisPollAddWrite;
+ ac->ev.delWrite = redisPollDelWrite;
+ ac->ev.scheduleTimer = redisPollScheduleTimer;
+ ac->ev.cleanup = redisPollCleanup;
+ ac->ev.data = e;
+
+ return REDIS_OK;
+}
+#endif /* HIREDIS_POLL_H */
diff --git a/examples/example-poll.c b/examples/example-poll.c
new file mode 100644
index 0000000..954673d
--- /dev/null
+++ b/examples/example-poll.c
@@ -0,0 +1,62 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include <async.h>
+#include <adapters/poll.h>
+
+/* Put in the global scope, so that loop can be explicitly stopped */
+static int exit_loop = 0;
+
+void getCallback(redisAsyncContext *c, void *r, void *privdata) {
+ redisReply *reply = r;
+ if (reply == NULL) return;
+ printf("argv[%s]: %s\n", (char*)privdata, reply->str);
+
+ /* Disconnect after receiving the reply to GET */
+ redisAsyncDisconnect(c);
+}
+
+void connectCallback(const redisAsyncContext *c, int status) {
+ if (status != REDIS_OK) {
+ printf("Error: %s\n", c->errstr);
+ exit_loop = 1;
+ return;
+ }
+
+ printf("Connected...\n");
+}
+
+void disconnectCallback(const redisAsyncContext *c, int status) {
+ exit_loop = 1;
+ if (status != REDIS_OK) {
+ printf("Error: %s\n", c->errstr);
+ return;
+ }
+
+ printf("Disconnected...\n");
+}
+
+int main (int argc, char **argv) {
+ signal(SIGPIPE, SIG_IGN);
+
+ redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
+ if (c->err) {
+ /* Let *c leak for now... */
+ printf("Error: %s\n", c->errstr);
+ return 1;
+ }
+
+ redisPollAttach(c);
+ redisAsyncSetConnectCallback(c,connectCallback);
+ redisAsyncSetDisconnectCallback(c,disconnectCallback);
+ redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
+ redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
+ while (!exit_loop)
+ {
+ redisPollTick(c, 0.1);
+ }
+ return 0;
+}
diff --git a/test.c b/test.c
index b831680..e201cbc 100644
--- a/test.c
+++ b/test.c
@@ -15,6 +15,7 @@
#include "hiredis.h"
#include "async.h"
+#include "adapters/poll.h"
#ifdef HIREDIS_TEST_SSL
#include "hiredis_ssl.h"
#endif
@@ -1898,6 +1899,217 @@ static void test_monitor(struct config config) {
}
#endif /* HIREDIS_TEST_ASYNC */
+/* tests for async api using polling adapter, requires no extra libraries*/
+
+/* enum for the test cases, the callbacks have different logic based on them */
+typedef enum astest_no
+{
+ ASTEST_CONNECT=0,
+ ASTEST_CONN_TIMEOUT,
+ ASTEST_PINGPONG,
+ ASTEST_PINGPONG_TIMEOUT
+}astest_no;
+
+/* a static context for the async tests */
+struct _astest {
+ redisAsyncContext *ac;
+ astest_no testno;
+ int counter;
+ int connects;
+ int connect_status;
+ int disconnects;
+ int disconnect_status;
+ int connected;
+ int err;
+ char errstr[256];
+};
+static struct _astest astest;
+
+static void asSleep(int ms)
+{
+#if _MSC_VER
+ Sleep(ms);
+#else
+ usleep(ms*1000);
+#endif
+}
+
+/* async callbacks */
+static void asCleanup(void* data)
+{
+ struct _astest *t = (struct _astest *)data;
+ t->ac = NULL;
+}
+
+static void connectCallback(const redisAsyncContext *c, int status) {
+ struct _astest *t = (struct _astest *)c->data;
+ assert(t == &astest);
+ assert(t->connects == 0);
+ t->err = c->err;
+ strcpy(t->errstr, c->errstr);
+ t->connects++;
+ t->connect_status = status;
+ t->connected = status == REDIS_OK ? 1 : -1;
+}
+static void disconnectCallback(const redisAsyncContext *c, int status) {
+ assert(c->data == (void*)&astest);
+ assert(astest.disconnects == 0);
+ astest.err = c->err;
+ strcpy(astest.errstr, c->errstr);
+ astest.disconnects++;
+ astest.disconnect_status = status;
+ astest.connected = 0;
+}
+
+static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata)
+{
+ redisReply *reply = (redisReply*)_reply;
+ struct _astest *t = (struct _astest *)ac->data;
+ assert(t == &astest);
+ (void)_privdata;
+ t->err = ac->err;
+ strcpy(t->errstr, ac->errstr);
+ if (t->testno == ASTEST_PINGPONG)
+ {
+ test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ redisAsyncFree(ac);
+ }
+ if (t->testno == ASTEST_PINGPONG_TIMEOUT)
+ {
+ /* two ping pongs */
+ assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ if (++t->counter == 1) {
+ int status = redisAsyncCommand(ac, commandCallback, NULL, "PING");
+ assert(status == REDIS_OK);
+ } else {
+ test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ redisAsyncFree(ac);
+ }
+ }
+}
+
+static redisAsyncContext *do_aconnect(struct config config, astest_no testno)
+{
+ redisOptions options = {0};
+ memset(&astest, 0, sizeof(astest));
+
+ astest.testno = testno;
+ astest.connect_status = astest.disconnect_status = -2;
+
+ if (config.type == CONN_TCP) {
+ options.type = REDIS_CONN_TCP;
+ options.connect_timeout = &config.tcp.timeout;
+ REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port);
+ } else if (config.type == CONN_SSL) {
+ options.type = REDIS_CONN_TCP;
+ options.connect_timeout = &config.tcp.timeout;
+ REDIS_OPTIONS_SET_TCP(&options, config.ssl.host, config.ssl.port);
+ } else if (config.type == CONN_UNIX) {
+ options.type = REDIS_CONN_UNIX;
+ options.endpoint.unix_socket = config.unix_sock.path;
+ } else if (config.type == CONN_FD) {
+ options.type = REDIS_CONN_USERFD;
+ /* Create a dummy connection just to get an fd to inherit */
+ redisContext *dummy_ctx = redisConnectUnix(config.unix_sock.path);
+ if (dummy_ctx) {
+ redisFD fd = disconnect(dummy_ctx, 1);
+ printf("Connecting to inherited fd %d\n", (int)fd);
+ options.endpoint.fd = fd;
+ }
+ }
+ redisAsyncContext *c = redisAsyncConnectWithOptions(&options);
+ assert(c);
+ astest.ac = c;
+ c->data = &astest;
+ c->dataCleanup = asCleanup;
+ redisPollAttach(c);
+ redisAsyncSetConnectCallback(c, connectCallback);
+ redisAsyncSetDisconnectCallback(c, disconnectCallback);
+ return c;
+}
+
+static void as_printerr(void) {
+ printf("Async err %d : %s\n", astest.err, astest.errstr);
+}
+
+#define ASASSERT(e) do { \
+ if (!(e)) \
+ as_printerr(); \
+ assert(e); \
+} while (0);
+
+static void test_async_polling(struct config config) {
+ int status;
+ redisAsyncContext *c;
+ struct config defaultconfig = config;
+
+ test("Async connect: ");
+ c = do_aconnect(config, ASTEST_CONNECT);
+ assert(c);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ assert(astest.connects == 1);
+ ASASSERT(astest.connect_status == REDIS_OK);
+ assert(astest.disconnects == 0);
+ test_cond(astest.connected == 1);
+
+ test("Async free after connect: ");
+ assert(astest.ac != NULL);
+ redisAsyncFree(c);
+ assert(astest.disconnects == 1);
+ assert(astest.ac == NULL);
+ test_cond(astest.disconnect_status == REDIS_OK);
+
+ if (config.type == CONN_TCP || config.type == CONN_SSL) {
+ /* timeout can only be simulated with network */
+ test("Async connect timeout: ");
+ config.tcp.host = "192.168.254.254"; /* blackhole ip */
+ config.tcp.timeout.tv_usec = 100000;
+ c = do_aconnect(config, ASTEST_CONN_TIMEOUT);
+ assert(c);
+ assert(c->err == 0);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ assert(astest.connected == -1);
+ /*
+ * freeing should not be done, clearing should have happened.
+ *redisAsyncFree(c);
+ */
+ assert(astest.ac == NULL);
+ test_cond(astest.connect_status == REDIS_ERR);
+ config = defaultconfig;
+ }
+
+ /* Test a ping/pong after connection */
+ test("Async PING/PONG: ");
+ c = do_aconnect(config, ASTEST_PINGPONG);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ status = redisAsyncCommand(c, commandCallback, NULL, "PING");
+ assert(status == REDIS_OK);
+ while(astest.ac)
+ redisPollTick(c, 0.1);
+
+ /* Test a ping/pong after connection that didn't time out.
+ * see https://github.com/redis/hiredis/issues/945
+ */
+ if (config.type == CONN_TCP || config.type == CONN_SSL) {
+ test("Async PING/PONG after connect timeout: ");
+ config.tcp.timeout.tv_usec = 10000; /* 10ms */
+ c = do_aconnect(config, ASTEST_PINGPONG_TIMEOUT);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ /* sleep 0.1 s, allowing old timeout to arrive */
+ asSleep(10);
+ status = redisAsyncCommand(c, commandCallback, NULL, "PING");
+ assert(status == REDIS_OK);
+ while(astest.ac)
+ redisPollTick(c, 0.1);
+ config = defaultconfig;
+ }
+}
+/* End of Async polling_adapter driven tests */
+
int main(int argc, char **argv) {
struct config cfg = {
.tcp = {
@@ -2017,6 +2229,7 @@ int main(int argc, char **argv) {
#endif
#ifdef HIREDIS_TEST_ASYNC
+ cfg.type = CONN_TCP;
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
cfg.type = CONN_TCP;
@@ -2034,6 +2247,15 @@ int main(int argc, char **argv) {
}
#endif /* HIREDIS_TEST_ASYNC */
+ cfg.type = CONN_TCP;
+ printf("\nTesting asynchronous API using polling_adapter TCP (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
+ test_async_polling(cfg);
+ if (test_unix_socket) {
+ cfg.type = CONN_UNIX;
+ printf("\nTesting asynchronous API using polling_adapter UNIX (%s):\n", cfg.unix_sock.path);
+ test_async_polling(cfg);
+ }
+
if (test_inherit_fd) {
printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path);
if (test_unix_socket) {