summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/build.yml7
-rw-r--r--CMakeLists.txt15
-rw-r--r--Makefile5
-rw-r--r--README.md99
-rw-r--r--adapters/poll.h197
-rw-r--r--async.c121
-rw-r--r--async.h2
-rw-r--r--examples/example-poll.c62
-rw-r--r--net.c20
-rw-r--r--read.c6
-rw-r--r--sds.c15
-rw-r--r--sockcompat.c19
-rw-r--r--test.c247
-rwxr-xr-xtest.sh9
14 files changed, 753 insertions, 71 deletions
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 362bc77..7859ff4 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -14,7 +14,8 @@ jobs:
- name: Install dependencies
run: |
- sudo add-apt-repository -y ppa:chris-lea/redis-server
+ curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
+ echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list
sudo apt-get update
sudo apt-get install -y redis-server valgrind libevent-dev
@@ -151,7 +152,8 @@ jobs:
- name: Install dependencies
run: |
- brew install openssl redis
+ brew install openssl redis@6.2
+ brew link redis@6.2 --force
- name: Build hiredis
run: USE_SSL=1 make
@@ -197,6 +199,7 @@ jobs:
HIREDIS_PATH: ${{ github.workspace }}
run: |
build_hiredis() {
+ git config --global --add safe.directory "$(cygpath -u $HIREDIS_PATH)"
cd $(cygpath -u $HIREDIS_PATH)
git clean -xfd
make
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 448d954..f77dfd5 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -50,13 +50,18 @@ ADD_LIBRARY(hiredis_static STATIC ${hiredis_sources})
ADD_LIBRARY(hiredis::hiredis ALIAS hiredis)
ADD_LIBRARY(hiredis::hiredis_static ALIAS hiredis_static)
+IF(NOT MSVC)
+ SET_TARGET_PROPERTIES(hiredis_static
+ PROPERTIES OUTPUT_NAME hiredis)
+ENDIF()
+
SET_TARGET_PROPERTIES(hiredis
PROPERTIES WINDOWS_EXPORT_ALL_SYMBOLS TRUE
VERSION "${HIREDIS_SONAME}")
+IF(WIN32)
SET_TARGET_PROPERTIES(hiredis_static
- PROPERTIES COMPILE_PDB_NAME hiredis_static)
-SET_TARGET_PROPERTIES(hiredis_static
- PROPERTIES COMPILE_PDB_NAME_DEBUG hiredis_static${CMAKE_DEBUG_POSTFIX})
+ PROPERTIES COMPILE_FLAGS /Z7)
+ENDIF()
IF(WIN32 OR MINGW)
TARGET_LINK_LIBRARIES(hiredis PUBLIC ws2_32 crypt32)
TARGET_LINK_LIBRARIES(hiredis_static PUBLIC ws2_32 crypt32)
@@ -160,6 +165,10 @@ IF(ENABLE_SSL)
${hiredis_ssl_sources})
ADD_LIBRARY(hiredis_ssl_static STATIC
${hiredis_ssl_sources})
+ IF(NOT MSVC)
+ SET_TARGET_PROPERTIES(hiredis_ssl_static
+ PROPERTIES OUTPUT_NAME hiredis_ssl)
+ ENDIF()
IF (APPLE)
SET_PROPERTY(TARGET hiredis_ssl PROPERTY LINK_FLAGS "-Wl,-undefined -Wl,dynamic_lookup")
diff --git a/Makefile b/Makefile
index a2ad84c..667a600 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@
# This file is released under the BSD license, see the COPYING file
OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o
-EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push
+EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push hiredis-example-poll
TESTS=hiredis-test
LIBNAME=libhiredis
PKGCONFNAME=hiredis.pc
@@ -192,6 +192,9 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME)
hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS)
+hiredis-example-poll: examples/example-poll.c adapters/poll.h $(STLIBNAME)
+ $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS)
+
ifndef AE_DIR
hiredis-example-ae:
@echo "Please specify AE_DIR (e.g. <redis repository>/src)"
diff --git a/README.md b/README.md
index ed66220..3907add 100644
--- a/README.md
+++ b/README.md
@@ -320,23 +320,48 @@ Redis. It returns a pointer to the newly created `redisAsyncContext` struct. The
should be checked after creation to see if there were errors creating the connection.
Because the connection that will be created is non-blocking, the kernel is not able to
instantly return if the specified host and port is able to accept a connection.
+In case of error, it is the caller's responsibility to free the context using `redisAsyncFree()`
*Note: A `redisAsyncContext` is not thread-safe.*
+An application function creating a connection might look like this:
+
```c
-redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
-if (c->err) {
- printf("Error: %s\n", c->errstr);
- // handle error
+void appConnect(myAppData *appData)
+{
+ redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
+ if (c->err) {
+ printf("Error: %s\n", c->errstr);
+ // handle error
+ redisAsyncFree(c);
+ c = NULL;
+ } else {
+ appData->context = c;
+ appData->connecting = 1;
+ c->data = appData; /* store application pointer for the callbacks */
+ redisAsyncSetConnectCallback(c, appOnConnect);
+ redisAsyncSetDisconnectCallback(c, appOnDisconnect);
+ }
}
+
```
-The asynchronous context can hold a disconnect callback function that is called when the
-connection is disconnected (either because of an error or per user request). This function should
+
+The asynchronous context _should_ hold a *connect* callback function that is called when the connection
+attempt completes, either successfully or with an error.
+It _can_ also hold a *disconnect* callback function that is called when the
+connection is disconnected (either because of an error or per user request). Both callbacks should
have the following prototype:
```c
void(const redisAsyncContext *c, int status);
```
+
+On a *connect*, the `status` argument is set to `REDIS_OK` if the connection attempt succeeded. In this
+case, the context is ready to accept commands. If it is called with `REDIS_ERR` then the
+connection attempt failed. The `err` field in the context can be accessed to find out the cause of the error.
+After a failed connection attempt, the context object is automatically freed by the libary after calling
+the connect callback. This may be a good point to create a new context and retry the connection.
+
On a disconnect, the `status` argument is set to `REDIS_OK` when disconnection was initiated by the
user, or `REDIS_ERR` when the disconnection was caused by an error. When it is `REDIS_ERR`, the `err`
field in the context can be accessed to find out the cause of the error.
@@ -344,12 +369,45 @@ field in the context can be accessed to find out the cause of the error.
The context object is always freed after the disconnect callback fired. When a reconnect is needed,
the disconnect callback is a good point to do so.
-Setting the disconnect callback can only be done once per context. For subsequent calls it will
-return `REDIS_ERR`. The function to set the disconnect callback has the following prototype:
+Setting the connect or disconnect callbacks can only be done once per context. For subsequent calls the
+api will return `REDIS_ERR`. The function to set the callbacks have the following prototype:
```c
+int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
```
-`ac->data` may be used to pass user data to this callback, the same can be done for redisConnectCallback.
+`ac->data` may be used to pass user data to both of thes callbacks. An typical implementation
+might look something like this:
+```c
+void appOnConnect(redisAsyncContext *c, int status)
+{
+ myAppData *appData = (myAppData*)c->data; /* get my application specific context*/
+ appData->connecting = 0;
+ if (status == REDIS_OK) {
+ appData->connected = 1;
+ } else {
+ appData->connected = 0;
+ appData->err = c->err;
+ appData->context = NULL; /* avoid stale pointer when callback returns */
+ }
+ appAttemptReconnect();
+}
+
+void appOnDisconnect(redisAsyncContext *c, int status)
+{
+ myAppData *appData = (myAppData*)c->data; /* get my application specific context*/
+ appData->connected = 0;
+ appData->err = c->err;
+ appData->context = NULL; /* avoid stale pointer when callback returns */
+ if (status == REDIS_OK) {
+ appNotifyDisconnectCompleted(mydata);
+ } else {
+ appNotifyUnexpectedDisconnect(mydata);
+ appAttemptReconnect();
+ }
+}
+```
+
+
### Sending commands and their callbacks
In an asynchronous context, commands are automatically pipelined due to the nature of an event loop.
@@ -382,6 +440,14 @@ valid for the duration of the callback.
All pending callbacks are called with a `NULL` reply when the context encountered an error.
+For every command issued, with the exception of **SUBSCRIBE** and **PSUBSCRIBE**, the callback is
+called exactly once. Even if the context object id disconnected or deleted, every pending callback
+will be called with a `NULL` reply.
+
+For **SUBSCRIBE** and **PSUBSCRIBE**, the callbacks may be called repeatedly until a `unsubscribe`
+message arrives. This will be the last invocation of the callback. In case of error, the callbacks
+may reive a final `NULL` reply instead.
+
### Disconnecting
An asynchronous connection can be terminated using:
@@ -394,6 +460,15 @@ have been written to the socket, their respective replies have been read and the
callbacks have been executed. After this, the disconnection callback is executed with the
`REDIS_OK` status and the context object is freed.
+The connection can be forcefully disconnected using
+```c
+void redisAsyncFree(redisAsyncContext *ac);
+```
+In this case, nothing more is written to the socket, all pending callbacks are called with a `NULL`
+reply and the disconnection callback is called with `REDIS_OK`, after which the context object
+is freed.
+
+
### Hooking it up to event library *X*
There are a few hooks that need to be set on the context object after it is created.
@@ -549,9 +624,9 @@ ssl_context = redisCreateSSLContext(
if(ssl_context == NULL || ssl_error != 0) {
/* Handle error and abort... */
- /* e.g.
- printf("SSL error: %s\n",
- (ssl_error != 0) ?
+ /* e.g.
+ printf("SSL error: %s\n",
+ (ssl_error != 0) ?
redisSSLContextGetError(ssl_error) : "Unknown error");
// Abort
*/
diff --git a/adapters/poll.h b/adapters/poll.h
new file mode 100644
index 0000000..f138650
--- /dev/null
+++ b/adapters/poll.h
@@ -0,0 +1,197 @@
+
+#ifndef HIREDIS_POLL_H
+#define HIREDIS_POLL_H
+
+#include "../async.h"
+#include "../sockcompat.h"
+#include <string.h> // for memset
+#include <errno.h>
+
+/* Values to return from redisPollTick */
+#define REDIS_POLL_HANDLED_READ 1
+#define REDIS_POLL_HANDLED_WRITE 2
+#define REDIS_POLL_HANDLED_TIMEOUT 4
+
+/* An adapter to allow manual polling of the async context by checking the state
+ * of the underlying file descriptor. Useful in cases where there is no formal
+ * IO event loop but regular ticking can be used, such as in game engines. */
+
+typedef struct redisPollEvents {
+ redisAsyncContext *context;
+ redisFD fd;
+ char reading, writing;
+ char in_tick;
+ char deleted;
+ double deadline;
+} redisPollEvents;
+
+static double redisPollTimevalToDouble(struct timeval *tv) {
+ if (tv == NULL)
+ return 0.0;
+ return tv->tv_sec + tv->tv_usec / 1000000.00;
+}
+
+static double redisPollGetNow(void) {
+#ifndef _MSC_VER
+ struct timeval tv;
+ gettimeofday(&tv,NULL);
+ return redisPollTimevalToDouble(&tv);
+#else
+ FILETIME ft;
+ ULARGE_INTEGER li;
+ GetSystemTimeAsFileTime(&ft);
+ li.HighPart = ft.dwHighDateTime;
+ li.LowPart = ft.dwLowDateTime;
+ return (double)li.QuadPart * 1e-7;
+#endif
+}
+
+/* Poll for io, handling any pending callbacks. The timeout argument can be
+ * positive to wait for a maximum given time for IO, zero to poll, or negative
+ * to wait forever */
+static int redisPollTick(redisAsyncContext *ac, double timeout) {
+ int reading, writing;
+ struct pollfd pfd;
+ int handled;
+ int ns;
+ int itimeout;
+
+ redisPollEvents *e = (redisPollEvents*)ac->ev.data;
+ if (!e)
+ return 0;
+
+ /* local flags, won't get changed during callbacks */
+ reading = e->reading;
+ writing = e->writing;
+ if (!reading && !writing)
+ return 0;
+
+ pfd.fd = e->fd;
+ pfd.events = 0;
+ if (reading)
+ pfd.events = POLLIN;
+ if (writing)
+ pfd.events |= POLLOUT;
+
+ if (timeout >= 0.0) {
+ itimeout = (int)(timeout * 1000.0);
+ } else {
+ itimeout = -1;
+ }
+
+ ns = poll(&pfd, 1, itimeout);
+ if (ns < 0) {
+ /* ignore the EINTR error */
+ if (errno != EINTR)
+ return ns;
+ ns = 0;
+ }
+
+ handled = 0;
+ e->in_tick = 1;
+ if (ns) {
+ if (reading && (pfd.revents & POLLIN)) {
+ redisAsyncHandleRead(ac);
+ handled |= REDIS_POLL_HANDLED_READ;
+ }
+ /* on Windows, connection failure is indicated with the Exception fdset.
+ * handle it the same as writable. */
+ if (writing && (pfd.revents & (POLLOUT | POLLERR))) {
+ /* context Read callback may have caused context to be deleted, e.g.
+ by doing an redisAsyncDisconnect() */
+ if (!e->deleted) {
+ redisAsyncHandleWrite(ac);
+ handled |= REDIS_POLL_HANDLED_WRITE;
+ }
+ }
+ }
+
+ /* perform timeouts */
+ if (!e->deleted && e->deadline != 0.0) {
+ double now = redisPollGetNow();
+ if (now >= e->deadline) {
+ /* deadline has passed. disable timeout and perform callback */
+ e->deadline = 0.0;
+ redisAsyncHandleTimeout(ac);
+ handled |= REDIS_POLL_HANDLED_TIMEOUT;
+ }
+ }
+
+ /* do a delayed cleanup if required */
+ if (e->deleted)
+ hi_free(e);
+ else
+ e->in_tick = 0;
+
+ return handled;
+}
+
+static void redisPollAddRead(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->reading = 1;
+}
+
+static void redisPollDelRead(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->reading = 0;
+}
+
+static void redisPollAddWrite(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->writing = 1;
+}
+
+static void redisPollDelWrite(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+ e->writing = 0;
+}
+
+static void redisPollCleanup(void *data) {
+ redisPollEvents *e = (redisPollEvents*)data;
+
+ /* if we are currently processing a tick, postpone deletion */
+ if (e->in_tick)
+ e->deleted = 1;
+ else
+ hi_free(e);
+}
+
+static void redisPollScheduleTimer(void *data, struct timeval tv)
+{
+ redisPollEvents *e = (redisPollEvents*)data;
+ double now = redisPollGetNow();
+ e->deadline = now + redisPollTimevalToDouble(&tv);
+}
+
+static int redisPollAttach(redisAsyncContext *ac) {
+ redisContext *c = &(ac->c);
+ redisPollEvents *e;
+
+ /* Nothing should be attached when something is already attached */
+ if (ac->ev.data != NULL)
+ return REDIS_ERR;
+
+ /* Create container for context and r/w events */
+ e = (redisPollEvents*)hi_malloc(sizeof(*e));
+ if (e == NULL)
+ return REDIS_ERR;
+ memset(e, 0, sizeof(*e));
+
+ e->context = ac;
+ e->fd = c->fd;
+ e->reading = e->writing = 0;
+ e->in_tick = e->deleted = 0;
+ e->deadline = 0.0;
+
+ /* Register functions to start/stop listening for events */
+ ac->ev.addRead = redisPollAddRead;
+ ac->ev.delRead = redisPollDelRead;
+ ac->ev.addWrite = redisPollAddWrite;
+ ac->ev.delWrite = redisPollDelWrite;
+ ac->ev.scheduleTimer = redisPollScheduleTimer;
+ ac->ev.cleanup = redisPollCleanup;
+ ac->ev.data = e;
+
+ return REDIS_OK;
+}
+#endif /* HIREDIS_POLL_H */
diff --git a/async.c b/async.c
index 6555114..3dad137 100644
--- a/async.c
+++ b/async.c
@@ -148,6 +148,7 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
ac->sub.replies.tail = NULL;
ac->sub.channels = channels;
ac->sub.patterns = patterns;
+ ac->sub.pending_unsubs = 0;
return ac;
oom:
@@ -411,11 +412,11 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
redisContext *c = &(ac->c);
dict *callbacks;
- redisCallback *cb;
+ redisCallback *cb = NULL;
dictEntry *de;
int pvariant;
char *stype;
- sds sname;
+ sds sname = NULL;
/* Match reply with the expected format of a pushed message.
* The type and number of elements (3 to 4) are specified at:
@@ -432,42 +433,43 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
callbacks = ac->sub.channels;
/* Locate the right callback */
- assert(reply->element[1]->type == REDIS_REPLY_STRING);
- sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
- if (sname == NULL)
- goto oom;
+ if (reply->element[1]->type == REDIS_REPLY_STRING) {
+ sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
+ if (sname == NULL) goto oom;
- de = dictFind(callbacks,sname);
- if (de != NULL) {
- cb = dictGetEntryVal(de);
-
- /* If this is an subscribe reply decrease pending counter. */
- if (strcasecmp(stype+pvariant,"subscribe") == 0) {
- cb->pending_subs -= 1;
+ if ((de = dictFind(callbacks,sname)) != NULL) {
+ cb = dictGetEntryVal(de);
+ memcpy(dstcb,cb,sizeof(*dstcb));
}
+ }
- memcpy(dstcb,cb,sizeof(*dstcb));
-
- /* If this is an unsubscribe message, remove it. */
- if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
- if (cb->pending_subs == 0)
- dictDelete(callbacks,sname);
-
- /* If this was the last unsubscribe message, revert to
- * non-subscribe mode. */
- assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
-
- /* Unset subscribed flag only when no pipelined pending subscribe. */
- if (reply->element[2]->integer == 0
- && dictSize(ac->sub.channels) == 0
- && dictSize(ac->sub.patterns) == 0) {
- c->flags &= ~REDIS_SUBSCRIBED;
-
- /* Move ongoing regular command callbacks. */
- redisCallback cb;
- while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
- __redisPushCallback(&ac->replies,&cb);
- }
+ /* If this is an subscribe reply decrease pending counter. */
+ if (strcasecmp(stype+pvariant,"subscribe") == 0) {
+ assert(cb != NULL);
+ cb->pending_subs -= 1;
+
+ } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
+ if (cb == NULL)
+ ac->sub.pending_unsubs -= 1;
+ else if (cb->pending_subs == 0)
+ dictDelete(callbacks,sname);
+
+ /* If this was the last unsubscribe message, revert to
+ * non-subscribe mode. */
+ assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
+
+ /* Unset subscribed flag only when no pipelined pending subscribe
+ * or pending unsubscribe replies. */
+ if (reply->element[2]->integer == 0
+ && dictSize(ac->sub.channels) == 0
+ && dictSize(ac->sub.patterns) == 0
+ && ac->sub.pending_unsubs == 0) {
+ c->flags &= ~REDIS_SUBSCRIBED;
+
+ /* Move ongoing regular command callbacks. */
+ redisCallback cb;
+ while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
+ __redisPushCallback(&ac->replies,&cb);
}
}
}
@@ -540,7 +542,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
/* Even if the context is subscribed, pending regular
* callbacks will get a reply before pub/sub messages arrive. */
- redisCallback cb = {NULL, NULL, 0, NULL};
+ redisCallback cb = {NULL, NULL, 0, 0, NULL};
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/*
* A spontaneous reply in a not-subscribed context can be the error
@@ -757,6 +759,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
redisContext *c = &(ac->c);
redisCallback cb;
struct dict *cbdict;
+ dictIterator it;
dictEntry *de;
redisCallback *existcb;
int pvariant, hasnext;
@@ -773,6 +776,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
cb.fn = fn;
cb.privdata = privdata;
cb.pending_subs = 1;
+ cb.unsubscribe_sent = 0;
/* Find out which command will be appended. */
p = nextArgument(cmd,&cstr,&clen);
@@ -812,6 +816,51 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
* subscribed to one or more channels or patterns. */
if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
+ if (pvariant)
+ cbdict = ac->sub.patterns;
+ else
+ cbdict = ac->sub.channels;
+
+ if (hasnext) {
+ /* Send an unsubscribe with specific channels/patterns.
+ * Bookkeeping the number of expected replies */
+ while ((p = nextArgument(p,&astr,&alen)) != NULL) {
+ sname = sdsnewlen(astr,alen);
+ if (sname == NULL)
+ goto oom;
+
+ de = dictFind(cbdict,sname);
+ if (de != NULL) {
+ existcb = dictGetEntryVal(de);
+ if (existcb->unsubscribe_sent == 0)
+ existcb->unsubscribe_sent = 1;
+ else
+ /* Already sent, reply to be ignored */
+ ac->sub.pending_unsubs += 1;
+ } else {
+ /* Not subscribed to, reply to be ignored */
+ ac->sub.pending_unsubs += 1;
+ }
+ sdsfree(sname);
+ }
+ } else {
+ /* Send an unsubscribe without specific channels/patterns.
+ * Bookkeeping the number of expected replies */
+ int no_subs = 1;
+ dictInitIterator(&it,cbdict);
+ while ((de = dictNext(&it)) != NULL) {
+ existcb = dictGetEntryVal(de);
+ if (existcb->unsubscribe_sent == 0) {
+ existcb->unsubscribe_sent = 1;
+ no_subs = 0;
+ }
+ }
+ /* Unsubscribing to all channels/patterns, where none is
+ * subscribed to, results in a single reply to be ignored. */
+ if (no_subs == 1)
+ ac->sub.pending_unsubs += 1;
+ }
+
/* (P)UNSUBSCRIBE does not have its own response: every channel or
* pattern that is unsubscribed will receive a message. This means we
* should not append a callback function for this command. */
diff --git a/async.h b/async.h
index 4c65203..41951d4 100644
--- a/async.h
+++ b/async.h
@@ -46,6 +46,7 @@ typedef struct redisCallback {
struct redisCallback *next; /* simple singly linked list */
redisCallbackFn *fn;
int pending_subs;
+ int unsubscribe_sent;
void *privdata;
} redisCallback;
@@ -105,6 +106,7 @@ typedef struct redisAsyncContext {
redisCallbackList replies;
struct dict *channels;
struct dict *patterns;
+ int pending_unsubs;
} sub;
/* Any configured RESP3 PUSH handler */
diff --git a/examples/example-poll.c b/examples/example-poll.c
new file mode 100644
index 0000000..954673d
--- /dev/null
+++ b/examples/example-poll.c
@@ -0,0 +1,62 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+
+#include <async.h>
+#include <adapters/poll.h>
+
+/* Put in the global scope, so that loop can be explicitly stopped */
+static int exit_loop = 0;
+
+void getCallback(redisAsyncContext *c, void *r, void *privdata) {
+ redisReply *reply = r;
+ if (reply == NULL) return;
+ printf("argv[%s]: %s\n", (char*)privdata, reply->str);
+
+ /* Disconnect after receiving the reply to GET */
+ redisAsyncDisconnect(c);
+}
+
+void connectCallback(const redisAsyncContext *c, int status) {
+ if (status != REDIS_OK) {
+ printf("Error: %s\n", c->errstr);
+ exit_loop = 1;
+ return;
+ }
+
+ printf("Connected...\n");
+}
+
+void disconnectCallback(const redisAsyncContext *c, int status) {
+ exit_loop = 1;
+ if (status != REDIS_OK) {
+ printf("Error: %s\n", c->errstr);
+ return;
+ }
+
+ printf("Disconnected...\n");
+}
+
+int main (int argc, char **argv) {
+ signal(SIGPIPE, SIG_IGN);
+
+ redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379);
+ if (c->err) {
+ /* Let *c leak for now... */
+ printf("Error: %s\n", c->errstr);
+ return 1;
+ }
+
+ redisPollAttach(c);
+ redisAsyncSetConnectCallback(c,connectCallback);
+ redisAsyncSetDisconnectCallback(c,disconnectCallback);
+ redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1]));
+ redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key");
+ while (!exit_loop)
+ {
+ redisPollTick(c, 0.1);
+ }
+ return 0;
+}
diff --git a/net.c b/net.c
index c6b0e5d..3ff89ed 100644
--- a/net.c
+++ b/net.c
@@ -277,12 +277,28 @@ int redisCheckConnectDone(redisContext *c, int *completed) {
*completed = 1;
return REDIS_OK;
}
- switch (errno) {
+ int error = errno;
+ if (error == EINPROGRESS) {
+ /* must check error to see if connect failed. Get the socket error */
+ int fail, so_error;
+ socklen_t optlen = sizeof(so_error);
+ fail = getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &so_error, &optlen);
+ if (fail == 0) {
+ if (so_error == 0) {
+ /* Socket is connected! */
+ *completed = 1;
+ return REDIS_OK;
+ }
+ /* connection error; */
+ errno = so_error;
+ error = so_error;
+ }
+ }
+ switch (error) {
case EISCONN:
*completed = 1;
return REDIS_OK;
case EALREADY:
- case EINPROGRESS:
case EWOULDBLOCK:
*completed = 0;
return REDIS_OK;
diff --git a/read.c b/read.c
index de62b9a..228d4d0 100644
--- a/read.c
+++ b/read.c
@@ -374,7 +374,7 @@ static int processLineItem(redisReader *r) {
if (r->fn && r->fn->createString)
obj = r->fn->createString(cur,p,len);
else
- obj = (void*)(size_t)(cur->type);
+ obj = (void*)(uintptr_t)(cur->type);
}
if (obj == NULL) {
@@ -439,7 +439,7 @@ static int processBulkItem(redisReader *r) {
if (r->fn && r->fn->createString)
obj = r->fn->createString(cur,s+2,len);
else
- obj = (void*)(long)cur->type;
+ obj = (void*)(uintptr_t)cur->type;
success = 1;
}
}
@@ -536,7 +536,7 @@ static int processAggregateItem(redisReader *r) {
if (r->fn && r->fn->createArray)
obj = r->fn->createArray(cur,elements);
else
- obj = (void*)(long)cur->type;
+ obj = (void*)(uintptr_t)cur->type;
if (obj == NULL) {
__redisReaderSetErrorOOM(r);
diff --git a/sds.c b/sds.c
index 35baa05..a20ba19 100644
--- a/sds.c
+++ b/sds.c
@@ -90,6 +90,7 @@ sds sdsnewlen(const void *init, size_t initlen) {
int hdrlen = sdsHdrSize(type);
unsigned char *fp; /* flags pointer. */
+ if (hdrlen+initlen+1 <= initlen) return NULL; /* Catch size_t overflow */
sh = s_malloc(hdrlen+initlen+1);
if (sh == NULL) return NULL;
if (!init)
@@ -174,7 +175,7 @@ void sdsfree(sds s) {
* the output will be "6" as the string was modified but the logical length
* remains 6 bytes. */
void sdsupdatelen(sds s) {
- int reallen = strlen(s);
+ size_t reallen = strlen(s);
sdssetlen(s, reallen);
}
@@ -196,7 +197,7 @@ void sdsclear(sds s) {
sds sdsMakeRoomFor(sds s, size_t addlen) {
void *sh, *newsh;
size_t avail = sdsavail(s);
- size_t len, newlen;
+ size_t len, newlen, reqlen;
char type, oldtype = s[-1] & SDS_TYPE_MASK;
int hdrlen;
@@ -205,7 +206,8 @@ sds sdsMakeRoomFor(sds s, size_t addlen) {
len = sdslen(s);
sh = (char*)s-sdsHdrSize(oldtype);
- newlen = (len+addlen);
+ reqlen = newlen = (len+addlen);
+ if (newlen <= len) return NULL; /* Catch size_t overflow */
if (newlen < SDS_MAX_PREALLOC)
newlen *= 2;
else
@@ -219,6 +221,7 @@ sds sdsMakeRoomFor(sds s, size_t addlen) {
if (type == SDS_TYPE_5) type = SDS_TYPE_8;
hdrlen = sdsHdrSize(type);
+ if (hdrlen+newlen+1 <= reqlen) return NULL; /* Catch size_t overflow */
if (oldtype==type) {
newsh = s_realloc(sh, hdrlen+newlen+1);
if (newsh == NULL) return NULL;
@@ -580,7 +583,7 @@ sds sdscatprintf(sds s, const char *fmt, ...) {
*/
sds sdscatfmt(sds s, char const *fmt, ...) {
const char *f = fmt;
- int i;
+ long i;
va_list ap;
va_start(ap,fmt);
@@ -755,14 +758,14 @@ int sdsrange(sds s, ssize_t start, ssize_t end) {
/* Apply tolower() to every character of the sds string 's'. */
void sdstolower(sds s) {
- int len = sdslen(s), j;
+ size_t len = sdslen(s), j;
for (j = 0; j < len; j++) s[j] = tolower(s[j]);
}
/* Apply toupper() to every character of the sds string 's'. */
void sdstoupper(sds s) {
- int len = sdslen(s), j;
+ size_t len = sdslen(s), j;
for (j = 0; j < len; j++) s[j] = toupper(s[j]);
}
diff --git a/sockcompat.c b/sockcompat.c
index f99d14b..31df325 100644
--- a/sockcompat.c
+++ b/sockcompat.c
@@ -180,10 +180,17 @@ int win32_connect(SOCKET sockfd, const struct sockaddr *addr, socklen_t addrlen)
/* For Winsock connect(), the WSAEWOULDBLOCK error means the same thing as
* EINPROGRESS for POSIX connect(), so we do that translation to keep POSIX
- * logic consistent. */
- if (errno == EWOULDBLOCK) {
+ * logic consistent.
+ * Additionally, WSAALREADY is can be reported as WSAEINVAL to and this is
+ * translated to EIO. Convert appropriately
+ */
+ int err = errno;
+ if (err == EWOULDBLOCK) {
errno = EINPROGRESS;
}
+ else if (err == EIO) {
+ errno = EALREADY;
+ }
return ret != SOCKET_ERROR ? ret : -1;
}
@@ -205,6 +212,14 @@ int win32_getsockopt(SOCKET sockfd, int level, int optname, void *optval, sockle
} else {
ret = getsockopt(sockfd, level, optname, (char*)optval, optlen);
}
+ if (ret != SOCKET_ERROR && level == SOL_SOCKET && optname == SO_ERROR) {
+ /* translate SO_ERROR codes, if non-zero */
+ int err = *(int*)optval;
+ if (err != 0) {
+ err = _wsaErrorToErrno(err);
+ *(int*)optval = err;
+ }
+ }
_updateErrno(ret != SOCKET_ERROR);
return ret != SOCKET_ERROR ? ret : -1;
}
diff --git a/test.c b/test.c
index f991ef1..e201cbc 100644
--- a/test.c
+++ b/test.c
@@ -15,6 +15,7 @@
#include "hiredis.h"
#include "async.h"
+#include "adapters/poll.h"
#ifdef HIREDIS_TEST_SSL
#include "hiredis_ssl.h"
#endif
@@ -914,6 +915,11 @@ static void test_resp3_push_handler(redisContext *c) {
old = redisSetPushCallback(c, push_handler);
test("We can set a custom RESP3 PUSH handler: ");
reply = redisCommand(c, "SET key:0 val:0");
+ /* We need another command because depending on the version of Redis, the
+ * notification may be delivered after the command's reply. */
+ test_cond(reply != NULL);
+ freeReplyObject(reply);
+ reply = redisCommand(c, "PING");
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && pc.str == 1);
freeReplyObject(reply);
@@ -929,6 +935,12 @@ static void test_resp3_push_handler(redisContext *c) {
assert((reply = redisCommand(c, "GET key:0")) != NULL);
freeReplyObject(reply);
assert((reply = redisCommand(c, "SET key:0 invalid")) != NULL);
+ /* Depending on Redis version, we may receive either push notification or
+ * status reply. Both cases are valid. */
+ if (reply->type == REDIS_REPLY_STATUS) {
+ freeReplyObject(reply);
+ reply = redisCommand(c, "PING");
+ }
test_cond(reply->type == REDIS_REPLY_PUSH);
freeReplyObject(reply);
@@ -1729,10 +1741,14 @@ void subscribe_channel_a_cb(redisAsyncContext *ac, void *r, void *privdata) {
strcmp(reply->element[2]->str,"Hello!") == 0);
state->checkpoint++;
- /* Unsubscribe to channels, including a channel X which we don't subscribe to */
+ /* Unsubscribe to channels, including channel X & Z which we don't subscribe to */
redisAsyncCommand(ac,unexpected_cb,
(void*)"unsubscribe should not call unexpected_cb()",
- "unsubscribe B X A");
+ "unsubscribe B X A A Z");
+ /* Unsubscribe to patterns, none which we subscribe to */
+ redisAsyncCommand(ac,unexpected_cb,
+ (void*)"punsubscribe should not call unexpected_cb()",
+ "punsubscribe");
/* Send a regular command after unsubscribing, then disconnect */
state->disconnect = 1;
redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo");
@@ -1767,8 +1783,10 @@ void subscribe_channel_b_cb(redisAsyncContext *ac, void *r, void *privdata) {
/* Test handling of multiple channels
* - subscribe to channel A and B
- * - a published message on A triggers an unsubscribe of channel B, X and A
- * where channel X is not subscribed to.
+ * - a published message on A triggers an unsubscribe of channel B, X, A and Z
+ * where channel X and Z are not subscribed to.
+ * - the published message also triggers an unsubscribe to patterns. Since no
+ * pattern is subscribed to the responded pattern element type is NIL.
* - a command sent after unsubscribe triggers a disconnect */
static void test_pubsub_multiple_channels(struct config config) {
test("Subscribe to multiple channels: ");
@@ -1881,6 +1899,217 @@ static void test_monitor(struct config config) {
}
#endif /* HIREDIS_TEST_ASYNC */
+/* tests for async api using polling adapter, requires no extra libraries*/
+
+/* enum for the test cases, the callbacks have different logic based on them */
+typedef enum astest_no
+{
+ ASTEST_CONNECT=0,
+ ASTEST_CONN_TIMEOUT,
+ ASTEST_PINGPONG,
+ ASTEST_PINGPONG_TIMEOUT
+}astest_no;
+
+/* a static context for the async tests */
+struct _astest {
+ redisAsyncContext *ac;
+ astest_no testno;
+ int counter;
+ int connects;
+ int connect_status;
+ int disconnects;
+ int disconnect_status;
+ int connected;
+ int err;
+ char errstr[256];
+};
+static struct _astest astest;
+
+static void asSleep(int ms)
+{
+#if _MSC_VER
+ Sleep(ms);
+#else
+ usleep(ms*1000);
+#endif
+}
+
+/* async callbacks */
+static void asCleanup(void* data)
+{
+ struct _astest *t = (struct _astest *)data;
+ t->ac = NULL;
+}
+
+static void connectCallback(const redisAsyncContext *c, int status) {
+ struct _astest *t = (struct _astest *)c->data;
+ assert(t == &astest);
+ assert(t->connects == 0);
+ t->err = c->err;
+ strcpy(t->errstr, c->errstr);
+ t->connects++;
+ t->connect_status = status;
+ t->connected = status == REDIS_OK ? 1 : -1;
+}
+static void disconnectCallback(const redisAsyncContext *c, int status) {
+ assert(c->data == (void*)&astest);
+ assert(astest.disconnects == 0);
+ astest.err = c->err;
+ strcpy(astest.errstr, c->errstr);
+ astest.disconnects++;
+ astest.disconnect_status = status;
+ astest.connected = 0;
+}
+
+static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata)
+{
+ redisReply *reply = (redisReply*)_reply;
+ struct _astest *t = (struct _astest *)ac->data;
+ assert(t == &astest);
+ (void)_privdata;
+ t->err = ac->err;
+ strcpy(t->errstr, ac->errstr);
+ if (t->testno == ASTEST_PINGPONG)
+ {
+ test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ redisAsyncFree(ac);
+ }
+ if (t->testno == ASTEST_PINGPONG_TIMEOUT)
+ {
+ /* two ping pongs */
+ assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ if (++t->counter == 1) {
+ int status = redisAsyncCommand(ac, commandCallback, NULL, "PING");
+ assert(status == REDIS_OK);
+ } else {
+ test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ redisAsyncFree(ac);
+ }
+ }
+}
+
+static redisAsyncContext *do_aconnect(struct config config, astest_no testno)
+{
+ redisOptions options = {0};
+ memset(&astest, 0, sizeof(astest));
+
+ astest.testno = testno;
+ astest.connect_status = astest.disconnect_status = -2;
+
+ if (config.type == CONN_TCP) {
+ options.type = REDIS_CONN_TCP;
+ options.connect_timeout = &config.tcp.timeout;
+ REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port);
+ } else if (config.type == CONN_SSL) {
+ options.type = REDIS_CONN_TCP;
+ options.connect_timeout = &config.tcp.timeout;
+ REDIS_OPTIONS_SET_TCP(&options, config.ssl.host, config.ssl.port);
+ } else if (config.type == CONN_UNIX) {
+ options.type = REDIS_CONN_UNIX;
+ options.endpoint.unix_socket = config.unix_sock.path;
+ } else if (config.type == CONN_FD) {
+ options.type = REDIS_CONN_USERFD;
+ /* Create a dummy connection just to get an fd to inherit */
+ redisContext *dummy_ctx = redisConnectUnix(config.unix_sock.path);
+ if (dummy_ctx) {
+ redisFD fd = disconnect(dummy_ctx, 1);
+ printf("Connecting to inherited fd %d\n", (int)fd);
+ options.endpoint.fd = fd;
+ }
+ }
+ redisAsyncContext *c = redisAsyncConnectWithOptions(&options);
+ assert(c);
+ astest.ac = c;
+ c->data = &astest;
+ c->dataCleanup = asCleanup;
+ redisPollAttach(c);
+ redisAsyncSetConnectCallback(c, connectCallback);
+ redisAsyncSetDisconnectCallback(c, disconnectCallback);
+ return c;
+}
+
+static void as_printerr(void) {
+ printf("Async err %d : %s\n", astest.err, astest.errstr);
+}
+
+#define ASASSERT(e) do { \
+ if (!(e)) \
+ as_printerr(); \
+ assert(e); \
+} while (0);
+
+static void test_async_polling(struct config config) {
+ int status;
+ redisAsyncContext *c;
+ struct config defaultconfig = config;
+
+ test("Async connect: ");
+ c = do_aconnect(config, ASTEST_CONNECT);
+ assert(c);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ assert(astest.connects == 1);
+ ASASSERT(astest.connect_status == REDIS_OK);
+ assert(astest.disconnects == 0);
+ test_cond(astest.connected == 1);
+
+ test("Async free after connect: ");
+ assert(astest.ac != NULL);
+ redisAsyncFree(c);
+ assert(astest.disconnects == 1);
+ assert(astest.ac == NULL);
+ test_cond(astest.disconnect_status == REDIS_OK);
+
+ if (config.type == CONN_TCP || config.type == CONN_SSL) {
+ /* timeout can only be simulated with network */
+ test("Async connect timeout: ");
+ config.tcp.host = "192.168.254.254"; /* blackhole ip */
+ config.tcp.timeout.tv_usec = 100000;
+ c = do_aconnect(config, ASTEST_CONN_TIMEOUT);
+ assert(c);
+ assert(c->err == 0);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ assert(astest.connected == -1);
+ /*
+ * freeing should not be done, clearing should have happened.
+ *redisAsyncFree(c);
+ */
+ assert(astest.ac == NULL);
+ test_cond(astest.connect_status == REDIS_ERR);
+ config = defaultconfig;
+ }
+
+ /* Test a ping/pong after connection */
+ test("Async PING/PONG: ");
+ c = do_aconnect(config, ASTEST_PINGPONG);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ status = redisAsyncCommand(c, commandCallback, NULL, "PING");
+ assert(status == REDIS_OK);
+ while(astest.ac)
+ redisPollTick(c, 0.1);
+
+ /* Test a ping/pong after connection that didn't time out.
+ * see https://github.com/redis/hiredis/issues/945
+ */
+ if (config.type == CONN_TCP || config.type == CONN_SSL) {
+ test("Async PING/PONG after connect timeout: ");
+ config.tcp.timeout.tv_usec = 10000; /* 10ms */
+ c = do_aconnect(config, ASTEST_PINGPONG_TIMEOUT);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ /* sleep 0.1 s, allowing old timeout to arrive */
+ asSleep(10);
+ status = redisAsyncCommand(c, commandCallback, NULL, "PING");
+ assert(status == REDIS_OK);
+ while(astest.ac)
+ redisPollTick(c, 0.1);
+ config = defaultconfig;
+ }
+}
+/* End of Async polling_adapter driven tests */
+
int main(int argc, char **argv) {
struct config cfg = {
.tcp = {
@@ -2000,6 +2229,7 @@ int main(int argc, char **argv) {
#endif
#ifdef HIREDIS_TEST_ASYNC
+ cfg.type = CONN_TCP;
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
cfg.type = CONN_TCP;
@@ -2017,6 +2247,15 @@ int main(int argc, char **argv) {
}
#endif /* HIREDIS_TEST_ASYNC */
+ cfg.type = CONN_TCP;
+ printf("\nTesting asynchronous API using polling_adapter TCP (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
+ test_async_polling(cfg);
+ if (test_unix_socket) {
+ cfg.type = CONN_UNIX;
+ printf("\nTesting asynchronous API using polling_adapter UNIX (%s):\n", cfg.unix_sock.path);
+ test_async_polling(cfg);
+ }
+
if (test_inherit_fd) {
printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path);
if (test_unix_socket) {
diff --git a/test.sh b/test.sh
index c72bcb0..a518db7 100755
--- a/test.sh
+++ b/test.sh
@@ -5,9 +5,16 @@ REDIS_PORT=${REDIS_PORT:-56379}
REDIS_SSL_PORT=${REDIS_SSL_PORT:-56443}
TEST_SSL=${TEST_SSL:-0}
SKIPS_AS_FAILS=${SKIPS_AS_FAILS-:0}
+ENABLE_DEBUG_CMD=
SSL_TEST_ARGS=
SKIPS_ARG=
+# We need to enable the DEBUG command for redis-server >= 7.0.0
+REDIS_MAJOR_VERSION="$(redis-server --version|awk -F'[^0-9]+' '{ print $2 }')"
+if [ "$REDIS_MAJOR_VERSION" -gt "6" ]; then
+ ENABLE_DEBUG_CMD="enable-debug-command local"
+fi
+
tmpdir=$(mktemp -d)
PID_FILE=${tmpdir}/hiredis-test-redis.pid
SOCK_FILE=${tmpdir}/hiredis-test-redis.sock
@@ -49,8 +56,10 @@ cleanup() {
}
trap cleanup INT TERM EXIT
+
cat > ${tmpdir}/redis.conf <<EOF
daemonize yes
+${ENABLE_DEBUG_CMD}
pidfile ${PID_FILE}
port ${REDIS_PORT}
bind 127.0.0.1