summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore1
-rw-r--r--.travis.yml3
-rw-r--r--Makefile6
-rw-r--r--README.md78
-rw-r--r--async.c64
-rw-r--r--async.h4
-rw-r--r--examples/CMakeLists.txt3
-rw-r--r--examples/example-push.c118
-rw-r--r--hiredis.c46
-rw-r--r--hiredis.h23
-rw-r--r--test.c189
11 files changed, 505 insertions, 30 deletions
diff --git a/.gitignore b/.gitignore
index 8e50b54..056959f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,4 @@
/*.a
/*.pc
*.dSYM
+tags
diff --git a/.travis.yml b/.travis.yml
index 792d175..d0a551a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -24,6 +24,8 @@ before_script:
addons:
apt:
+ sources:
+ - sourceline: 'ppa:chris-lea/redis-server'
packages:
- libc6-dbg
- libc6-dev
@@ -35,6 +37,7 @@ addons:
- libssl-dev
- libssl-dev:i386
- valgrind
+ - redis
env:
- BITS="32"
diff --git a/Makefile b/Makefile
index f19e760..a8d37a2 100644
--- a/Makefile
+++ b/Makefile
@@ -5,7 +5,7 @@
OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o
SSL_OBJ=ssl.o
-EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib
+EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push
ifeq ($(USE_SSL),1)
EXAMPLES+=hiredis-example-ssl hiredis-example-libevent-ssl
endif
@@ -161,6 +161,7 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME)
hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS)
+
ifndef AE_DIR
hiredis-example-ae:
@echo "Please specify AE_DIR (e.g. <redis repository>/src)"
@@ -195,6 +196,9 @@ endif
hiredis-example: examples/example.c $(STLIBNAME)
$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS)
+hiredis-example-push: examples/example-push.c $(STLIBNAME)
+ $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS)
+
examples: $(EXAMPLES)
TEST_LIBS = $(STLIBNAME)
diff --git a/README.md b/README.md
index 3fc3418..c012f1a 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
[![Build Status](https://travis-ci.org/redis/hiredis.png)](https://travis-ci.org/redis/hiredis)
-**This Readme reflects the latest changed in the master branch. See [v0.13.3](https://github.com/redis/hiredis/tree/v0.13.3) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).**
+**This Readme reflects the latest changed in the master branch. See [v0.14.1](https://github.com/redis/hiredis/tree/v0.14.1) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).**
# HIREDIS
@@ -500,12 +500,75 @@ if (redisInitiateSSLWithContext(c, ssl) != REDIS_OK) {
}
```
+## RESP3 PUSH replies
+Redis 6.0 introduced PUSH replies with the reply-type `>`. These messages are generated spontaneously and can arrive at any time, so must be handled using callbacks.
+
+### Default behavior
+Hiredis installs handlers on `redisContext` and `redisAsyncContext` by default, which will intercept and free any PUSH replies detected. This means existing code will work as-is after upgrading to Redis 6 and switching to `RESP3`.
+
+### Custom PUSH handler prototypes
+The callback prototypes differ between `redisContext` and `redisAsyncContext`.
+
+#### redisContext
+```c
+void my_push_handler(void *privdata, void *reply) {
+ /* Handle the reply */
+
+ /* Note: We need to free the reply in our custom handler for
+ blocking contexts. This lets us keep the reply if
+ we want. */
+ freeReplyObject(reply);
+}
+```
+
+#### redisAsyncContext
+```c
+void my_async_push_handler(redisAsyncContext *ac, void *reply) {
+ /* Handle the reply */
+
+ /* Note: Because async hiredis always frees replies, you should
+ not call freeReplyObject in an async push callback. */
+}
+```
+
+### Installing a custom handler
+There are two ways to set your own PUSH handlers.
+
+1. Set `push_cb` or `async_push_cb` in the `redisOptions` struct and connect with `redisConnectWithOptions` or `redisAsyncConnectWithOptions`.
+ ```c
+ redisOptions = {0};
+ REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379);
+ options->push_cb = my_push_handler;
+ redisContext *context = redisConnectWithOptions(&options);
+ ```
+2. Call `redisSetPushCallback` or `redisAsyncSetPushCallback` on a connected context.
+ ```c
+ redisContext *context = redisConnect("127.0.0.1", 6379);
+ redisSetPushCallback(context, my_push_handler);
+ ```
+
+ _Note `redisSetPushCallback` and `redisAsyncSetPushCallback` both return any currently configured handler, making it easy to override and then return to the old value._
+
+### Specifying no handler
+If you have a unique use-case where you don't want hiredis to automatically intercept and free PUSH replies, you will want to configure no handler at all. This can be done in two ways.
+1. Set the `REDIS_OPT_NO_PUSH_AUTOFREE` flag in `redisOptions` and leave the callback function pointer `NULL`.
+ ```c
+ redisOptions = {0};
+ REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379);
+ options->options |= REDIS_OPT_NO_PUSH_AUTOFREE;
+ redisContext *context = redisConnectWithOptions(&options);
+ ```
+3. Call `redisSetPushCallback` with `NULL` once connected.
+ ```c
+ redisContext *context = redisConnect("127.0.0.1", 6379);
+ redisSetPushCallback(context, NULL);
+ ```
+
+ _Note: With no handler configured, calls to `redisCommand` may generate more than one reply, so this strategy is only applicable when there's some kind of blocking`redisGetReply()` loop (e.g. `MONITOR` or `SUBSCRIBE` workloads)._
+
## Allocator injection
-Hiredis uses a pass-thru structure of function pointers defined in
-[alloc.h](https://github.com/redis/hiredis/blob/f5d25850/alloc.h#L41) that conttain
-the currently configured allocation and deallocation functions. By default they
-just point to libc (`malloc`, `calloc`, `realloc`, etc).
+Hiredis uses a pass-thru structure of function pointers defined in [alloc.h](https://github.com/redis/hiredis/blob/f5d25850/alloc.h#L41) that contain the currently configured allocation and deallocation functions. By default they just point to libc (`malloc`, `calloc`, `realloc`, etc).
### Overriding
@@ -532,5 +595,6 @@ hiredisResetAllocators();
## AUTHORS
-Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and
-Pieter Noordhuis (pcnoordhuis at gmail) and is released under the BSD license.
+Hiredis was written by Salvatore Sanfilippo (antirez at gmail),
+Pieter Noordhuis (pcnoordhuis at gmail), and Michael Grunder
+(michael dot grunder at gmail) and is released under the BSD license.
diff --git a/async.c b/async.c
index 2cff0b1..1073d8d 100644
--- a/async.c
+++ b/async.c
@@ -167,16 +167,26 @@ redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
redisContext *c;
redisAsyncContext *ac;
+ /* Clear any erroneously set sync callback and flag that we don't want to
+ * use freeReplyObject by default. */
+ myOptions.push_cb = NULL;
+ myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
+
myOptions.options |= REDIS_OPT_NONBLOCK;
c = redisConnectWithOptions(&myOptions);
if (c == NULL) {
return NULL;
}
+
ac = redisAsyncInitialize(c);
if (ac == NULL) {
redisFree(c);
return NULL;
}
+
+ /* Set any configured async push handler */
+ redisAsyncSetPushCallback(ac, myOptions.async_push_cb);
+
__redisAsyncCopyError(ac);
return ac;
}
@@ -279,6 +289,14 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe
}
}
+static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
+ if (ac->push_cb != NULL) {
+ ac->c.flags |= REDIS_IN_CALLBACK;
+ ac->push_cb(ac, reply);
+ ac->c.flags &= ~REDIS_IN_CALLBACK;
+ }
+}
+
/* Helper function to free the context. */
static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
@@ -294,7 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);
- /* Run subscription callbacks callbacks with NULL reply */
+ /* Run subscription callbacks with NULL reply */
if (ac->sub.channels) {
it = dictGetIterator(ac->sub.channels);
if (it != NULL) {
@@ -459,6 +477,30 @@ oom:
return REDIS_ERR;
}
+#define redisIsSpontaneousPushReply(r) \
+ (redisIsPushReply(r) && !redisIsSubscribeReply(r))
+
+static int redisIsSubscribeReply(redisReply *reply) {
+ char *str;
+ size_t len, off;
+
+ /* We will always have at least one string with the subscribe/message type */
+ if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING ||
+ reply->element[0]->len < sizeof("message") - 1)
+ {
+ return 0;
+ }
+
+ /* Get the string/len moving past 'p' if needed */
+ off = tolower(reply->element[0]->str[0]) == 'p';
+ str = reply->element[0]->str + off;
+ len = reply->element[0]->len - off;
+
+ return !strncasecmp(str, "subscribe", len) ||
+ !strncasecmp(str, "message", len);
+
+}
+
void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, 0, NULL};
@@ -485,8 +527,18 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
break;
}
- /* Even if the context is subscribed, pending regular callbacks will
- * get a reply before pub/sub messages arrive. */
+ /* Send any non-subscribe related PUSH messages to our PUSH handler
+ * while allowing subscribe related PUSH messages to pass through.
+ * This allows existing code to be backward compatible and work in
+ * either RESP2 or RESP3 mode. */
+ if (redisIsSpontaneousPushReply(reply)) {
+ __redisRunPushCallback(ac, reply);
+ c->reader->fn->freeObject(reply);
+ continue;
+ }
+
+ /* Even if the context is subscribed, pending regular
+ * callbacks will get a reply before pub/sub messages arrive. */
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/*
* A spontaneous reply in a not-subscribed context can be the error
@@ -809,6 +861,12 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
return status;
}
+redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
+ redisAsyncPushFn *old = ac->push_cb;
+ ac->push_cb = fn;
+ return old;
+}
+
int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
if (!ac->c.timeout) {
ac->c.timeout = hi_calloc(1, sizeof(tv));
diff --git a/async.h b/async.h
index ba301da..b1d2cb2 100644
--- a/async.h
+++ b/async.h
@@ -106,6 +106,9 @@ typedef struct redisAsyncContext {
struct dict *channels;
struct dict *patterns;
} sub;
+
+ /* Any configured RESP3 PUSH handler */
+ redisAsyncPushFn *push_cb;
} redisAsyncContext;
/* Functions that proxy to hiredis */
@@ -118,6 +121,7 @@ redisAsyncContext *redisAsyncConnectUnix(const char *path);
int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);
int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);
+redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn);
int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv);
void redisAsyncDisconnect(redisAsyncContext *ac);
void redisAsyncFree(redisAsyncContext *ac);
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index dd3a313..1d5bc56 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -44,3 +44,6 @@ ENDIF()
ADD_EXECUTABLE(example example.c)
TARGET_LINK_LIBRARIES(example hiredis)
+
+ADD_EXECUTABLE(example-push example-push.c)
+TARGET_LINK_LIBRARIES(example-push hiredis)
diff --git a/examples/example-push.c b/examples/example-push.c
new file mode 100644
index 0000000..d8b41f9
--- /dev/null
+++ b/examples/example-push.c
@@ -0,0 +1,118 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <hiredis.h>
+#include <win32.h>
+
+#define KEY_COUNT 5
+
+#define panicAbort(fmt, ...) \
+ do { \
+ fprintf(stderr, "%s:%d:%s(): " fmt, __FILE__, __LINE__, __func__, __VA_ARGS__); \
+ exit(-1); \
+ } while (0)
+
+static void assertReplyAndFree(redisContext *context, redisReply *reply, int type) {
+ if (reply == NULL)
+ panicAbort("NULL reply from server (error: %s)", context->errstr);
+
+ if (reply->type != type) {
+ if (reply->type == REDIS_REPLY_ERROR)
+ fprintf(stderr, "Redis Error: %s\n", reply->str);
+
+ panicAbort("Expected reply type %d but got type %d", type, reply->type);
+ }
+
+ freeReplyObject(reply);
+}
+
+/* Switch to the RESP3 protocol and enable client tracking */
+static void enableClientTracking(redisContext *c) {
+ redisReply *reply = redisCommand(c, "HELLO 3");
+ if (reply == NULL || c->err) {
+ panicAbort("NULL reply or server error (error: %s)", c->errstr);
+ }
+
+ if (reply->type != REDIS_REPLY_MAP) {
+ fprintf(stderr, "Error: Can't send HELLO 3 command. Are you sure you're ");
+ fprintf(stderr, "connected to redis-server >= 6.0.0?\nRedis error: %s\n",
+ reply->type == REDIS_REPLY_ERROR ? reply->str : "(unknown)");
+ exit(-1);
+ }
+
+ freeReplyObject(reply);
+
+ /* Enable client tracking */
+ reply = redisCommand(c, "CLIENT TRACKING ON");
+ assertReplyAndFree(c, reply, REDIS_REPLY_STATUS);
+}
+
+void pushReplyHandler(void *privdata, void *r) {
+ redisReply *reply = r;
+ int *invalidations = privdata;
+
+ /* Sanity check on the invalidation reply */
+ if (reply->type != REDIS_REPLY_PUSH || reply->elements != 2 ||
+ reply->element[1]->type != REDIS_REPLY_ARRAY ||
+ reply->element[1]->element[0]->type != REDIS_REPLY_STRING)
+ {
+ panicAbort("%s", "Can't parse PUSH message!");
+ }
+
+ /* Increment our invalidation count */
+ *invalidations += 1;
+
+ printf("pushReplyHandler(): INVALIDATE '%s' (invalidation count: %d)\n",
+ reply->element[1]->element[0]->str, *invalidations);
+
+ freeReplyObject(reply);
+}
+
+int main(int argc, char **argv) {
+ unsigned int j, invalidations = 0;
+ redisContext *c;
+ redisReply *reply;
+
+ const char *hostname = (argc > 1) ? argv[1] : "127.0.0.1";
+ int port = (argc > 2) ? atoi(argv[2]) : 6379;
+
+ redisOptions o = {0};
+ REDIS_OPTIONS_SET_TCP(&o, hostname, port);
+
+ /* Set our custom PUSH message handler */
+ o.push_cb = pushReplyHandler;
+
+ c = redisConnectWithOptions(&o);
+ if (c == NULL || c->err)
+ panicAbort("Connection error: %s", c ? c->errstr : "OOM");
+
+ /* Enable RESP3 and turn on client tracking */
+ enableClientTracking(c);
+
+ /* Set our context privdata to the address of our invalidation counter. Each
+ * time our PUSH handler is called, hiredis will pass the privdata for context */
+ c->privdata = &invalidations;
+
+ /* Set some keys and then read them back. Once we do that, Redis will deliver
+ * invalidation push messages whenever the key is modified */
+ for (j = 0; j < KEY_COUNT; j++) {
+ reply = redisCommand(c, "SET key:%d initial:%d", j, j);
+ assertReplyAndFree(c, reply, REDIS_REPLY_STATUS);
+
+ reply = redisCommand(c, "GET key:%d", j);
+ assertReplyAndFree(c, reply, REDIS_REPLY_STRING);
+ }
+
+ /* Trigger invalidation messages by updating keys we just read */
+ for (j = 0; j < KEY_COUNT; j++) {
+ printf(" main(): SET key:%d update:%d\n", j, j);
+ reply = redisCommand(c, "SET key:%d update:%d", j, j);
+ assertReplyAndFree(c, reply, REDIS_REPLY_STATUS);
+ printf(" main(): SET REPLY OK\n");
+ }
+
+ printf("\nTotal detected invalidations: %d, expected: %d\n", invalidations, KEY_COUNT);
+
+ /* PING server */
+ redisFree(c);
+}
diff --git a/hiredis.c b/hiredis.c
index 7eaa991..e9761ec 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -184,7 +184,8 @@ static void *createArrayObject(const redisReadTask *task, size_t elements) {
parent = task->parent->obj;
assert(parent->type == REDIS_REPLY_ARRAY ||
parent->type == REDIS_REPLY_MAP ||
- parent->type == REDIS_REPLY_SET);
+ parent->type == REDIS_REPLY_SET ||
+ parent->type == REDIS_REPLY_PUSH);
parent->element[task->idx] = r;
}
return r;
@@ -679,6 +680,11 @@ redisReader *redisReaderCreate(void) {
return redisReaderCreateWithFunctions(&defaultFunctions);
}
+static void redisPushAutoFree(void *privdata, void *reply) {
+ (void)privdata;
+ freeReplyObject(reply);
+}
+
static redisContext *redisContextInit(const redisOptions *options) {
redisContext *c;
@@ -687,6 +693,14 @@ static redisContext *redisContextInit(const redisOptions *options) {
return NULL;
c->funcs = &redisContextDefaultFuncs;
+
+ /* Set any user supplied RESP3 PUSH handler or use freeReplyObject
+ * as a default unless specifically flagged that we don't want one. */
+ if (options->push_cb != NULL)
+ redisSetPushCallback(c, options->push_cb);
+ else if (!(options->options & REDIS_OPT_NO_PUSH_AUTOFREE))
+ redisSetPushCallback(c, redisPushAutoFree);
+
c->obuf = sdsempty();
c->reader = redisReaderCreate();
c->fd = REDIS_INVALID_FD;
@@ -773,7 +787,7 @@ redisContext *redisConnectWithOptions(const redisOptions *options) {
c->flags |= REDIS_REUSEADDR;
}
if (options->options & REDIS_OPT_NOAUTOFREE) {
- c->flags |= REDIS_NO_AUTO_FREE;
+ c->flags |= REDIS_NO_AUTO_FREE;
}
if (options->type == REDIS_CONN_TCP) {
@@ -876,6 +890,13 @@ int redisEnableKeepAlive(redisContext *c) {
return REDIS_OK;
}
+/* Set a user provided RESP3 PUSH handler and return any old one set. */
+redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn) {
+ redisPushFn *old = c->push_cb;
+ c->push_cb = fn;
+ return old;
+}
+
/* Use this function to handle a read event on the descriptor. It will try
* and read some bytes from the socket and feed them to the reply parser.
*
@@ -947,9 +968,21 @@ int redisGetReplyFromReader(redisContext *c, void **reply) {
__redisSetError(c,c->reader->err,c->reader->errstr);
return REDIS_ERR;
}
+
return REDIS_OK;
}
+/* Internal helper that returns 1 if the reply was a RESP3 PUSH
+ * message and we handled it with a user-provided callback. */
+static int redisHandledPushReply(redisContext *c, void *reply) {
+ if (reply && c->push_cb && redisIsPushReply(reply)) {
+ c->push_cb(c->privdata, reply);
+ return 1;
+ }
+
+ return 0;
+}
+
int redisGetReply(redisContext *c, void **reply) {
int wdone = 0;
void *aux = NULL;
@@ -970,8 +1003,13 @@ int redisGetReply(redisContext *c, void **reply) {
do {
if (redisBufferRead(c) == REDIS_ERR)
return REDIS_ERR;
- if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
- return REDIS_ERR;
+
+ /* We loop here in case the user has specified a RESP3
+ * PUSH handler (e.g. for client tracking). */
+ do {
+ if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
+ return REDIS_ERR;
+ } while (redisHandledPushReply(c, aux));
} while (aux == NULL);
}
diff --git a/hiredis.h b/hiredis.h
index bf10509..20e9bfa 100644
--- a/hiredis.h
+++ b/hiredis.h
@@ -92,6 +92,15 @@ typedef long long ssize_t;
* SO_REUSEADDR is being used. */
#define REDIS_CONNECT_RETRIES 10
+/* Forward declarations for structs defined elsewhere */
+struct redisAsyncContext;
+struct redisContext;
+
+/* RESP3 push helpers and callback prototypes */
+#define redisIsPushReply(r) (((redisReply*)(r))->type == REDIS_REPLY_PUSH)
+typedef void (redisPushFn)(void *, void *);
+typedef void (redisAsyncPushFn)(struct redisAsyncContext *, void *);
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -140,6 +149,9 @@ struct redisSsl;
*/
#define REDIS_OPT_NOAUTOFREE 0x04
+/* Don't automatically intercept and free RESP3 PUSH replies. */
+#define REDIS_OPT_NO_PUSH_AUTOFREE 0x08
+
/* In Unix systems a file descriptor is a regular signed int, with -1
* representing an invalid descriptor. In Windows it is a SOCKET
* (32- or 64-bit unsigned integer depending on the architecture), where
@@ -180,6 +192,10 @@ typedef struct {
* file descriptor */
redisFD fd;
} endpoint;
+
+ /* A user defined PUSH message callback */
+ redisPushFn *push_cb;
+ redisAsyncPushFn *async_push_cb;
} redisOptions;
/**
@@ -194,9 +210,6 @@ typedef struct {
(opts)->type = REDIS_CONN_UNIX; \
(opts)->endpoint.unix_socket = path;
-struct redisAsyncContext;
-struct redisContext;
-
typedef struct redisContextFuncs {
void (*free_privdata)(void *);
void (*async_read)(struct redisAsyncContext *);
@@ -235,6 +248,9 @@ typedef struct redisContext {
/* Additional private data for hiredis addons such as SSL */
void *privdata;
+
+ /* An optional RESP3 PUSH handler */
+ redisPushFn *push_cb;
} redisContext;
redisContext *redisConnectWithOptions(const redisOptions *options);
@@ -261,6 +277,7 @@ redisContext *redisConnectFd(redisFD fd);
*/
int redisReconnect(redisContext *c);
+redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn);
int redisSetTimeout(redisContext *c, const struct timeval tv);
int redisEnableKeepAlive(redisContext *c);
void redisFree(redisContext *c);
diff --git a/test.c b/test.c
index 2aef0d5..2216ea3 100644
--- a/test.c
+++ b/test.c
@@ -13,6 +13,7 @@
#include <limits.h>
#include "hiredis.h"
+#include "async.h"
#ifdef HIREDIS_TEST_SSL
#include "hiredis_ssl.h"
#endif
@@ -77,6 +78,43 @@ static long long usec(void) {
#define assert(e) (void)(e)
#endif
+/* Helper to extract Redis version information. Aborts on any failure. */
+#define REDIS_VERSION_FIELD "redis_version:"
+void get_redis_version(redisContext *c, int *majorptr, int *minorptr) {
+ redisReply *reply;
+ char *eptr, *s, *e;
+ int major, minor;
+
+ reply = redisCommand(c, "INFO");
+ if (reply == NULL || c->err || reply->type != REDIS_REPLY_STRING)
+ goto abort;
+ if ((s = strstr(reply->str, REDIS_VERSION_FIELD)) == NULL)
+ goto abort;
+
+ s += strlen(REDIS_VERSION_FIELD);
+
+ /* We need a field terminator and at least 'x.y.z' (5) bytes of data */
+ if ((e = strstr(s, "\r\n")) == NULL || (e - s) < 5)
+ goto abort;
+
+ /* Extract version info */
+ major = strtol(s, &eptr, 10);
+ if (*eptr != '.') goto abort;
+ minor = strtol(eptr+1, NULL, 10);
+
+ /* Push info the caller wants */
+ if (majorptr) *majorptr = major;
+ if (minorptr) *minorptr = minor;
+
+ freeReplyObject(reply);
+ return;
+
+abort:
+ freeReplyObject(reply);
+ fprintf(stderr, "Error: Cannot determine Redis version, aborting\n");
+ exit(1);
+}
+
static redisContext *select_database(redisContext *c) {
redisReply *reply;
@@ -99,6 +137,26 @@ static redisContext *select_database(redisContext *c) {
return c;
}
+/* Switch protocol */
+static void send_hello(redisContext *c, int version) {
+ redisReply *reply;
+ int expected;
+
+ reply = redisCommand(c, "HELLO %d", version);
+ expected = version == 3 ? REDIS_REPLY_MAP : REDIS_REPLY_ARRAY;
+ assert(reply != NULL && reply->type == expected);
+ freeReplyObject(reply);
+}
+
+/* Togggle client tracking */
+static void send_client_tracking(redisContext *c, const char *str) {
+ redisReply *reply;
+
+ reply = redisCommand(c, "CLIENT TRACKING %s", str);
+ assert(reply != NULL && reply->type == REDIS_REPLY_STATUS);
+ freeReplyObject(reply);
+}
+
static int disconnect(redisContext *c, int keep_fd) {
redisReply *reply;
@@ -615,9 +673,123 @@ static void test_blocking_connection_errors(void) {
#endif
}
+/* Dummy push handler */
+void push_handler(void *privdata, void *reply) {
+ int *counter = privdata;
+ freeReplyObject(reply);
+ *counter += 1;
+}
+
+/* Dummy function just to test setting a callback with redisOptions */
+void push_handler_async(redisAsyncContext *ac, void *reply) {
+ (void)ac;
+ (void)reply;
+}
+
+static void test_resp3_push_handler(redisContext *c) {
+ redisPushFn *old = NULL;
+ redisReply *reply;
+ void *privdata;
+ int n = 0;
+
+ /* Switch to RESP3 and turn on client tracking */
+ send_hello(c, 3);
+ send_client_tracking(c, "ON");
+ privdata = c->privdata;
+ c->privdata = &n;
+
+ reply = redisCommand(c, "GET key:0");
+ assert(reply != NULL);
+ freeReplyObject(reply);
+
+ test("RESP3 PUSH messages are handled out of band by default: ");
+ reply = redisCommand(c, "SET key:0 val:0");
+ test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS);
+ freeReplyObject(reply);
+
+ assert((reply = redisCommand(c, "GET key:0")) != NULL);
+ freeReplyObject(reply);
+
+ old = redisSetPushCallback(c, push_handler);
+ test("We can set a custom RESP3 PUSH handler: ");
+ reply = redisCommand(c, "SET key:0 val:0");
+ test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && n == 1);
+ freeReplyObject(reply);
+
+ /* Unset the push callback and generate an invalidate message making
+ * sure it is not handled out of band. */
+ test("With no handler, PUSH replies come in-band: ");
+ redisSetPushCallback(c, NULL);
+ assert((reply = redisCommand(c, "GET key:0")) != NULL);
+ freeReplyObject(reply);
+ assert((reply = redisCommand(c, "SET key:0 invalid")) != NULL);
+ test_cond(reply->type == REDIS_REPLY_PUSH);
+ freeReplyObject(reply);
+
+ test("With no PUSH handler, no replies are lost: ");
+ assert(redisGetReply(c, (void**)&reply) == REDIS_OK);
+ test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS);
+ freeReplyObject(reply);
+
+ /* Return to the originally set PUSH handler */
+ assert(old != NULL);
+ redisSetPushCallback(c, old);
+
+ /* Switch back to RESP2 and disable tracking */
+ c->privdata = privdata;
+ send_client_tracking(c, "OFF");
+ send_hello(c, 2);
+}
+
+redisOptions get_redis_tcp_options(struct config config) {
+ redisOptions options = {0};
+ REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port);
+ return options;
+}
+
+static void test_resp3_push_options(struct config config) {
+ redisAsyncContext *ac;
+ redisContext *c;
+ redisOptions options;
+
+ test("We set a default RESP3 handler for redisContext: ");
+ options = get_redis_tcp_options(config);
+ assert((c = redisConnectWithOptions(&options)) != NULL);
+ test_cond(c->push_cb != NULL);
+ redisFree(c);
+
+ test("We don't set a default RESP3 push handler for redisAsyncContext: ");
+ options = get_redis_tcp_options(config);
+ assert((ac = redisAsyncConnectWithOptions(&options)) != NULL);
+ test_cond(ac->c.push_cb == NULL);
+ redisAsyncFree(ac);
+
+ test("Our REDIS_OPT_NO_PUSH_AUTOFREE flag works: ");
+ options = get_redis_tcp_options(config);
+ options.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
+ assert((c = redisConnectWithOptions(&options)) != NULL);
+ test_cond(c->push_cb == NULL);
+ redisFree(c);
+
+ test("We can use redisOptions to set a custom PUSH handler for redisContext: ");
+ options = get_redis_tcp_options(config);
+ options.push_cb = push_handler;
+ assert((c = redisConnectWithOptions(&options)) != NULL);
+ test_cond(c->push_cb == push_handler);
+ redisFree(c);
+
+ test("We can use redisOptions to set a custom PUSH handler for redisAsyncContext: ");
+ options = get_redis_tcp_options(config);
+ options.async_push_cb = push_handler_async;
+ assert((ac = redisAsyncConnectWithOptions(&options)) != NULL);
+ test_cond(ac->push_cb == push_handler_async);
+ redisAsyncFree(ac);
+}
+
static void test_blocking_connection(struct config config) {
redisContext *c;
redisReply *reply;
+ int major;
c = do_connect(config);
@@ -695,6 +867,10 @@ static void test_blocking_connection(struct config config) {
assert(redisAppendCommand(c, "PING") == REDIS_OK);
test_cond(redisGetReply(c, NULL) == REDIS_OK);
+ get_redis_version(c, &major, NULL);
+ if (major >= 6) test_resp3_push_handler(c);
+ test_resp3_push_options(config);
+
disconnect(c, 0);
}
@@ -780,18 +956,7 @@ static void test_blocking_io_errors(struct config config) {
/* Connect to target given by config. */
c = do_connect(config);
- {
- /* Find out Redis version to determine the path for the next test */
- const char *field = "redis_version:";
- char *p, *eptr;
-
- reply = redisCommand(c,"INFO");
- p = strstr(reply->str,field);
- major = strtol(p+strlen(field),&eptr,10);
- p = eptr+1; /* char next to the first "." */
- minor = strtol(p,&eptr,10);
- freeReplyObject(reply);
- }
+ get_redis_version(c, &major, &minor);
test("Returns I/O error when the connection is lost: ");
reply = redisCommand(c,"QUIT");