diff options
| author | Michael Grunder <michael.grunder@gmail.com> | 2020-07-19 18:54:42 -0700 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-07-19 18:54:42 -0700 | 
| commit | 2e7d7cbabd32912342218078282fce92f6cc0ab6 (patch) | |
| tree | ba60c50c28b433aef6e128a67522085751acb6cb | |
| parent | 1864e76ea7323fd8789d9c8b5b3c8ca27d4840a6 (diff) | |
| download | hiredict-2e7d7cbabd32912342218078282fce92f6cc0ab6.tar.xz | |
Resp3 oob push support (#841)
Proper support for RESP3 PUSH messages.
By default, PUSH messages are now intercepted and the reply memory freed.  
This means existing code should work unchanged when connecting to Redis
>= 6.0.0 even if `CLIENT TRACKING` were then enabled.
Additionally, we define two callbacks users can configure if they wish to handle
these messages in a custom way:
void redisPushFn(void *privdata, void *reply);
void redisAsyncPushFn(redisAsyncContext *ac, void *reply);
See #825
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | .travis.yml | 3 | ||||
| -rw-r--r-- | Makefile | 6 | ||||
| -rw-r--r-- | README.md | 78 | ||||
| -rw-r--r-- | async.c | 64 | ||||
| -rw-r--r-- | async.h | 4 | ||||
| -rw-r--r-- | examples/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | examples/example-push.c | 118 | ||||
| -rw-r--r-- | hiredis.c | 46 | ||||
| -rw-r--r-- | hiredis.h | 23 | ||||
| -rw-r--r-- | test.c | 189 | 
11 files changed, 505 insertions, 30 deletions
| @@ -6,3 +6,4 @@  /*.a  /*.pc  *.dSYM +tags diff --git a/.travis.yml b/.travis.yml index 792d175..d0a551a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,6 +24,8 @@ before_script:  addons:    apt: +    sources: +    - sourceline: 'ppa:chris-lea/redis-server'      packages:      - libc6-dbg      - libc6-dev @@ -35,6 +37,7 @@ addons:      - libssl-dev      - libssl-dev:i386      - valgrind +    - redis  env:    - BITS="32" @@ -5,7 +5,7 @@  OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o  SSL_OBJ=ssl.o -EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib +EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push  ifeq ($(USE_SSL),1)  EXAMPLES+=hiredis-example-ssl hiredis-example-libevent-ssl  endif @@ -161,6 +161,7 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME)  hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME)  	$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS) +  ifndef AE_DIR  hiredis-example-ae:  	@echo "Please specify AE_DIR (e.g. <redis repository>/src)" @@ -195,6 +196,9 @@ endif  hiredis-example: examples/example.c $(STLIBNAME)  	$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS) +hiredis-example-push: examples/example-push.c $(STLIBNAME) +	$(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS) +  examples: $(EXAMPLES)  TEST_LIBS = $(STLIBNAME) @@ -1,6 +1,6 @@  [](https://travis-ci.org/redis/hiredis) -**This Readme reflects the latest changed in the master branch. See [v0.13.3](https://github.com/redis/hiredis/tree/v0.13.3) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).** +**This Readme reflects the latest changed in the master branch. See [v0.14.1](https://github.com/redis/hiredis/tree/v0.14.1) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).**  # HIREDIS @@ -500,12 +500,75 @@ if (redisInitiateSSLWithContext(c, ssl) != REDIS_OK) {  }  ``` +## RESP3 PUSH replies +Redis 6.0 introduced PUSH replies with the reply-type `>`.  These messages are generated spontaneously and can arrive at any time, so must be handled using callbacks. + +### Default behavior +Hiredis installs handlers on `redisContext` and `redisAsyncContext` by default, which will intercept and free any PUSH replies detected.  This means existing code will work as-is after upgrading to Redis 6 and switching to `RESP3`. + +### Custom PUSH handler prototypes +The callback prototypes differ between `redisContext` and `redisAsyncContext`. + +#### redisContext +```c +void my_push_handler(void *privdata, void *reply) { +    /* Handle the reply */ + +    /* Note: We need to free the reply in our custom handler for +             blocking contexts.  This lets us keep the reply if +             we want. */ +    freeReplyObject(reply); +} +``` + +#### redisAsyncContext +```c +void my_async_push_handler(redisAsyncContext *ac, void *reply) { +    /* Handle the reply */ + +	/* Note:  Because async hiredis always frees replies, you should +	          not call freeReplyObject in an async push callback. */ +} +``` + +### Installing a custom handler +There are two ways to set your own PUSH handlers. + +1. Set `push_cb` or `async_push_cb` in the `redisOptions` struct and connect with `redisConnectWithOptions` or `redisAsyncConnectWithOptions`. +    ```c +    redisOptions = {0}; +    REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379); +    options->push_cb = my_push_handler; +    redisContext *context = redisConnectWithOptions(&options); +    ``` +2.  Call `redisSetPushCallback` or `redisAsyncSetPushCallback` on a connected context. +    ```c +    redisContext *context = redisConnect("127.0.0.1", 6379); +    redisSetPushCallback(context, my_push_handler); +    ``` + +    _Note `redisSetPushCallback` and `redisAsyncSetPushCallback` both return any currently configured handler,  making it easy to override and then return to the old value._ + +### Specifying no handler +If you have a unique use-case where you don't want hiredis to automatically intercept and free PUSH replies, you will want to configure no handler at all.  This can be done in two ways. +1.  Set the `REDIS_OPT_NO_PUSH_AUTOFREE` flag in `redisOptions` and leave the callback function pointer `NULL`. +    ```c +    redisOptions = {0}; +    REDIS_OPTIONS_SET_TCP(&options, "127.0.0.1", 6379); +    options->options |= REDIS_OPT_NO_PUSH_AUTOFREE; +    redisContext *context = redisConnectWithOptions(&options); +    ``` +3.  Call `redisSetPushCallback` with `NULL` once connected. +    ```c +    redisContext *context = redisConnect("127.0.0.1", 6379); +    redisSetPushCallback(context, NULL); +    ``` + +    _Note:  With no handler configured, calls to `redisCommand` may generate more than one reply, so this strategy is only applicable when there's some kind of blocking`redisGetReply()` loop (e.g. `MONITOR` or `SUBSCRIBE` workloads)._ +  ## Allocator injection -Hiredis uses a pass-thru structure of function pointers defined in -[alloc.h](https://github.com/redis/hiredis/blob/f5d25850/alloc.h#L41) that conttain -the currently configured allocation and deallocation functions.  By default they -just point to libc (`malloc`, `calloc`, `realloc`, etc). +Hiredis uses a pass-thru structure of function pointers defined in [alloc.h](https://github.com/redis/hiredis/blob/f5d25850/alloc.h#L41) that contain the currently configured allocation and deallocation functions.  By default they just point to libc (`malloc`, `calloc`, `realloc`, etc).  ### Overriding @@ -532,5 +595,6 @@ hiredisResetAllocators();  ## AUTHORS -Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and -Pieter Noordhuis (pcnoordhuis at gmail) and is released under the BSD license. +Hiredis was written by Salvatore Sanfilippo (antirez at gmail), +Pieter Noordhuis (pcnoordhuis at gmail), and Michael Grunder +(michael dot grunder at gmail) and is released under the BSD license. @@ -167,16 +167,26 @@ redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {      redisContext *c;      redisAsyncContext *ac; +    /* Clear any erroneously set sync callback and flag that we don't want to +     * use freeReplyObject by default. */ +    myOptions.push_cb = NULL; +    myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE; +      myOptions.options |= REDIS_OPT_NONBLOCK;      c = redisConnectWithOptions(&myOptions);      if (c == NULL) {          return NULL;      } +      ac = redisAsyncInitialize(c);      if (ac == NULL) {          redisFree(c);          return NULL;      } + +    /* Set any configured async push handler */ +    redisAsyncSetPushCallback(ac, myOptions.async_push_cb); +      __redisAsyncCopyError(ac);      return ac;  } @@ -279,6 +289,14 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe      }  } +static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) { +    if (ac->push_cb != NULL) { +        ac->c.flags |= REDIS_IN_CALLBACK; +        ac->push_cb(ac, reply); +        ac->c.flags &= ~REDIS_IN_CALLBACK; +    } +} +  /* Helper function to free the context. */  static void __redisAsyncFree(redisAsyncContext *ac) {      redisContext *c = &(ac->c); @@ -294,7 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {      while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)          __redisRunCallback(ac,&cb,NULL); -    /* Run subscription callbacks callbacks with NULL reply */ +    /* Run subscription callbacks with NULL reply */      if (ac->sub.channels) {          it = dictGetIterator(ac->sub.channels);          if (it != NULL) { @@ -459,6 +477,30 @@ oom:      return REDIS_ERR;  } +#define redisIsSpontaneousPushReply(r) \ +    (redisIsPushReply(r) && !redisIsSubscribeReply(r)) + +static int redisIsSubscribeReply(redisReply *reply) { +    char *str; +    size_t len, off; + +    /* We will always have at least one string with the subscribe/message type */ +    if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING || +        reply->element[0]->len < sizeof("message") - 1) +    { +        return 0; +    } + +    /* Get the string/len moving past 'p' if needed */ +    off = tolower(reply->element[0]->str[0]) == 'p'; +    str = reply->element[0]->str + off; +    len = reply->element[0]->len - off; + +    return !strncasecmp(str, "subscribe", len) || +           !strncasecmp(str, "message", len); + +} +  void redisProcessCallbacks(redisAsyncContext *ac) {      redisContext *c = &(ac->c);      redisCallback cb = {NULL, NULL, 0, NULL}; @@ -485,8 +527,18 @@ void redisProcessCallbacks(redisAsyncContext *ac) {              break;          } -        /* Even if the context is subscribed, pending regular callbacks will -         * get a reply before pub/sub messages arrive. */ +        /* Send any non-subscribe related PUSH messages to our PUSH handler +         * while allowing subscribe related PUSH messages to pass through. +         * This allows existing code to be backward compatible and work in +         * either RESP2 or RESP3 mode. */ +        if (redisIsSpontaneousPushReply(reply)) { +            __redisRunPushCallback(ac, reply); +            c->reader->fn->freeObject(reply); +            continue; +        } + +        /* Even if the context is subscribed, pending regular +         * callbacks will get a reply before pub/sub messages arrive. */          if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {              /*               * A spontaneous reply in a not-subscribed context can be the error @@ -809,6 +861,12 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void      return status;  } +redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) { +    redisAsyncPushFn *old = ac->push_cb; +    ac->push_cb = fn; +    return old; +} +  int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {      if (!ac->c.timeout) {          ac->c.timeout = hi_calloc(1, sizeof(tv)); @@ -106,6 +106,9 @@ typedef struct redisAsyncContext {          struct dict *channels;          struct dict *patterns;      } sub; + +    /* Any configured RESP3 PUSH handler */ +    redisAsyncPushFn *push_cb;  } redisAsyncContext;  /* Functions that proxy to hiredis */ @@ -118,6 +121,7 @@ redisAsyncContext *redisAsyncConnectUnix(const char *path);  int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);  int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); +redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn);  int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv);  void redisAsyncDisconnect(redisAsyncContext *ac);  void redisAsyncFree(redisAsyncContext *ac); diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index dd3a313..1d5bc56 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -44,3 +44,6 @@ ENDIF()  ADD_EXECUTABLE(example example.c)  TARGET_LINK_LIBRARIES(example hiredis) + +ADD_EXECUTABLE(example-push example-push.c) +TARGET_LINK_LIBRARIES(example-push hiredis) diff --git a/examples/example-push.c b/examples/example-push.c new file mode 100644 index 0000000..d8b41f9 --- /dev/null +++ b/examples/example-push.c @@ -0,0 +1,118 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <hiredis.h> +#include <win32.h> + +#define KEY_COUNT 5 + +#define panicAbort(fmt, ...) \ +    do { \ +        fprintf(stderr, "%s:%d:%s(): " fmt, __FILE__, __LINE__, __func__, __VA_ARGS__); \ +        exit(-1); \ +    } while (0) + +static void assertReplyAndFree(redisContext *context, redisReply *reply, int type) { +    if (reply == NULL) +        panicAbort("NULL reply from server (error: %s)", context->errstr); + +    if (reply->type != type) { +        if (reply->type == REDIS_REPLY_ERROR) +            fprintf(stderr, "Redis Error: %s\n", reply->str); + +        panicAbort("Expected reply type %d but got type %d", type, reply->type); +    } + +    freeReplyObject(reply); +} + +/* Switch to the RESP3 protocol and enable client tracking */ +static void enableClientTracking(redisContext *c) { +    redisReply *reply = redisCommand(c, "HELLO 3"); +    if (reply == NULL || c->err) { +        panicAbort("NULL reply or server error (error: %s)", c->errstr); +    } + +    if (reply->type != REDIS_REPLY_MAP) { +        fprintf(stderr, "Error: Can't send HELLO 3 command.  Are you sure you're "); +        fprintf(stderr, "connected to redis-server >= 6.0.0?\nRedis error: %s\n", +                        reply->type == REDIS_REPLY_ERROR ? reply->str : "(unknown)"); +        exit(-1); +    } + +    freeReplyObject(reply); + +    /* Enable client tracking */ +    reply = redisCommand(c, "CLIENT TRACKING ON"); +    assertReplyAndFree(c, reply, REDIS_REPLY_STATUS); +} + +void pushReplyHandler(void *privdata, void *r) { +    redisReply *reply = r; +    int *invalidations = privdata; + +    /* Sanity check on the invalidation reply */ +    if (reply->type != REDIS_REPLY_PUSH || reply->elements != 2 || +        reply->element[1]->type != REDIS_REPLY_ARRAY || +        reply->element[1]->element[0]->type != REDIS_REPLY_STRING) +    { +        panicAbort("%s", "Can't parse PUSH message!"); +    } + +    /* Increment our invalidation count */ +    *invalidations += 1; + +    printf("pushReplyHandler(): INVALIDATE '%s' (invalidation count: %d)\n", +           reply->element[1]->element[0]->str, *invalidations); + +    freeReplyObject(reply); +} + +int main(int argc, char **argv) { +    unsigned int j, invalidations = 0; +    redisContext *c; +    redisReply *reply; + +    const char *hostname = (argc > 1) ? argv[1] : "127.0.0.1"; +    int port = (argc > 2) ? atoi(argv[2]) : 6379; + +    redisOptions o = {0}; +    REDIS_OPTIONS_SET_TCP(&o, hostname, port); + +    /* Set our custom PUSH message handler */ +    o.push_cb = pushReplyHandler; + +    c = redisConnectWithOptions(&o); +    if (c == NULL || c->err) +        panicAbort("Connection error:  %s", c ? c->errstr : "OOM"); + +    /* Enable RESP3 and turn on client tracking */ +    enableClientTracking(c); + +    /* Set our context privdata to the address of our invalidation counter.  Each +     * time our PUSH handler is called, hiredis will pass the privdata for context */ +    c->privdata = &invalidations; + +    /* Set some keys and then read them back.  Once we do that, Redis will deliver +     * invalidation push messages whenever the key is modified */ +    for (j = 0; j < KEY_COUNT; j++) { +        reply = redisCommand(c, "SET key:%d initial:%d", j, j); +        assertReplyAndFree(c, reply, REDIS_REPLY_STATUS); + +        reply = redisCommand(c, "GET key:%d", j); +        assertReplyAndFree(c, reply, REDIS_REPLY_STRING); +    } + +    /* Trigger invalidation messages by updating keys we just read */ +    for (j = 0; j < KEY_COUNT; j++) { +        printf("            main(): SET key:%d update:%d\n", j, j); +        reply = redisCommand(c, "SET key:%d update:%d", j, j); +        assertReplyAndFree(c, reply, REDIS_REPLY_STATUS); +        printf("            main(): SET REPLY OK\n"); +    } + +    printf("\nTotal detected invalidations: %d, expected: %d\n", invalidations, KEY_COUNT); + +    /* PING server */ +    redisFree(c); +} @@ -184,7 +184,8 @@ static void *createArrayObject(const redisReadTask *task, size_t elements) {          parent = task->parent->obj;          assert(parent->type == REDIS_REPLY_ARRAY ||                 parent->type == REDIS_REPLY_MAP || -               parent->type == REDIS_REPLY_SET); +               parent->type == REDIS_REPLY_SET || +               parent->type == REDIS_REPLY_PUSH);          parent->element[task->idx] = r;      }      return r; @@ -679,6 +680,11 @@ redisReader *redisReaderCreate(void) {      return redisReaderCreateWithFunctions(&defaultFunctions);  } +static void redisPushAutoFree(void *privdata, void *reply) { +    (void)privdata; +    freeReplyObject(reply); +} +  static redisContext *redisContextInit(const redisOptions *options) {      redisContext *c; @@ -687,6 +693,14 @@ static redisContext *redisContextInit(const redisOptions *options) {          return NULL;      c->funcs = &redisContextDefaultFuncs; + +    /* Set any user supplied RESP3 PUSH handler or use freeReplyObject +     * as a default unless specifically flagged that we don't want one. */ +    if (options->push_cb != NULL) +        redisSetPushCallback(c, options->push_cb); +    else if (!(options->options & REDIS_OPT_NO_PUSH_AUTOFREE)) +        redisSetPushCallback(c, redisPushAutoFree); +      c->obuf = sdsempty();      c->reader = redisReaderCreate();      c->fd = REDIS_INVALID_FD; @@ -773,7 +787,7 @@ redisContext *redisConnectWithOptions(const redisOptions *options) {          c->flags |= REDIS_REUSEADDR;      }      if (options->options & REDIS_OPT_NOAUTOFREE) { -      c->flags |= REDIS_NO_AUTO_FREE; +        c->flags |= REDIS_NO_AUTO_FREE;      }      if (options->type == REDIS_CONN_TCP) { @@ -876,6 +890,13 @@ int redisEnableKeepAlive(redisContext *c) {      return REDIS_OK;  } +/* Set a user provided RESP3 PUSH handler and return any old one set. */ +redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn) { +    redisPushFn *old = c->push_cb; +    c->push_cb = fn; +    return old; +} +  /* Use this function to handle a read event on the descriptor. It will try   * and read some bytes from the socket and feed them to the reply parser.   * @@ -947,9 +968,21 @@ int redisGetReplyFromReader(redisContext *c, void **reply) {          __redisSetError(c,c->reader->err,c->reader->errstr);          return REDIS_ERR;      } +      return REDIS_OK;  } +/* Internal helper that returns 1 if the reply was a RESP3 PUSH + * message and we handled it with a user-provided callback. */ +static int redisHandledPushReply(redisContext *c, void *reply) { +    if (reply && c->push_cb && redisIsPushReply(reply)) { +        c->push_cb(c->privdata, reply); +        return 1; +    } + +    return 0; +} +  int redisGetReply(redisContext *c, void **reply) {      int wdone = 0;      void *aux = NULL; @@ -970,8 +1003,13 @@ int redisGetReply(redisContext *c, void **reply) {          do {              if (redisBufferRead(c) == REDIS_ERR)                  return REDIS_ERR; -            if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) -                return REDIS_ERR; + +            /* We loop here in case the user has specified a RESP3 +             * PUSH handler (e.g. for client tracking). */ +            do { +                if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) +                    return REDIS_ERR; +            } while (redisHandledPushReply(c, aux));          } while (aux == NULL);      } @@ -92,6 +92,15 @@ typedef long long ssize_t;   * SO_REUSEADDR is being used. */  #define REDIS_CONNECT_RETRIES  10 +/* Forward declarations for structs defined elsewhere */ +struct redisAsyncContext; +struct redisContext; + +/* RESP3 push helpers and callback prototypes */ +#define redisIsPushReply(r) (((redisReply*)(r))->type == REDIS_REPLY_PUSH) +typedef void (redisPushFn)(void *, void *); +typedef void (redisAsyncPushFn)(struct redisAsyncContext *, void *); +  #ifdef __cplusplus  extern "C" {  #endif @@ -140,6 +149,9 @@ struct redisSsl;   */  #define REDIS_OPT_NOAUTOFREE 0x04 +/* Don't automatically intercept and free RESP3 PUSH replies. */ +#define REDIS_OPT_NO_PUSH_AUTOFREE 0x08 +  /* In Unix systems a file descriptor is a regular signed int, with -1   * representing an invalid descriptor. In Windows it is a SOCKET   * (32- or 64-bit unsigned integer depending on the architecture), where @@ -180,6 +192,10 @@ typedef struct {           * file descriptor */          redisFD fd;      } endpoint; + +    /* A user defined PUSH message callback */ +    redisPushFn *push_cb; +    redisAsyncPushFn *async_push_cb;  } redisOptions;  /** @@ -194,9 +210,6 @@ typedef struct {      (opts)->type = REDIS_CONN_UNIX;        \      (opts)->endpoint.unix_socket = path; -struct redisAsyncContext; -struct redisContext; -  typedef struct redisContextFuncs {      void (*free_privdata)(void *);      void (*async_read)(struct redisAsyncContext *); @@ -235,6 +248,9 @@ typedef struct redisContext {      /* Additional private data for hiredis addons such as SSL */      void *privdata; + +    /* An optional RESP3 PUSH handler */ +    redisPushFn *push_cb;  } redisContext;  redisContext *redisConnectWithOptions(const redisOptions *options); @@ -261,6 +277,7 @@ redisContext *redisConnectFd(redisFD fd);   */  int redisReconnect(redisContext *c); +redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn);  int redisSetTimeout(redisContext *c, const struct timeval tv);  int redisEnableKeepAlive(redisContext *c);  void redisFree(redisContext *c); @@ -13,6 +13,7 @@  #include <limits.h>  #include "hiredis.h" +#include "async.h"  #ifdef HIREDIS_TEST_SSL  #include "hiredis_ssl.h"  #endif @@ -77,6 +78,43 @@ static long long usec(void) {  #define assert(e) (void)(e)  #endif +/* Helper to extract Redis version information.  Aborts on any failure. */ +#define REDIS_VERSION_FIELD "redis_version:" +void get_redis_version(redisContext *c, int *majorptr, int *minorptr) { +    redisReply *reply; +    char *eptr, *s, *e; +    int major, minor; + +    reply = redisCommand(c, "INFO"); +    if (reply == NULL || c->err || reply->type != REDIS_REPLY_STRING) +        goto abort; +    if ((s = strstr(reply->str, REDIS_VERSION_FIELD)) == NULL) +        goto abort; + +    s += strlen(REDIS_VERSION_FIELD); + +    /* We need a field terminator and at least 'x.y.z' (5) bytes of data */ +    if ((e = strstr(s, "\r\n")) == NULL || (e - s) < 5) +        goto abort; + +    /* Extract version info */ +    major = strtol(s, &eptr, 10); +    if (*eptr != '.') goto abort; +    minor = strtol(eptr+1, NULL, 10); + +    /* Push info the caller wants */ +    if (majorptr) *majorptr = major; +    if (minorptr) *minorptr = minor; + +    freeReplyObject(reply); +    return; + +abort: +    freeReplyObject(reply); +    fprintf(stderr, "Error:  Cannot determine Redis version, aborting\n"); +    exit(1); +} +  static redisContext *select_database(redisContext *c) {      redisReply *reply; @@ -99,6 +137,26 @@ static redisContext *select_database(redisContext *c) {      return c;  } +/* Switch protocol */ +static void send_hello(redisContext *c, int version) { +    redisReply *reply; +    int expected; + +    reply = redisCommand(c, "HELLO %d", version); +    expected = version == 3 ? REDIS_REPLY_MAP : REDIS_REPLY_ARRAY; +    assert(reply != NULL && reply->type == expected); +    freeReplyObject(reply); +} + +/* Togggle client tracking */ +static void send_client_tracking(redisContext *c, const char *str) { +    redisReply *reply; + +    reply = redisCommand(c, "CLIENT TRACKING %s", str); +    assert(reply != NULL && reply->type == REDIS_REPLY_STATUS); +    freeReplyObject(reply); +} +  static int disconnect(redisContext *c, int keep_fd) {      redisReply *reply; @@ -615,9 +673,123 @@ static void test_blocking_connection_errors(void) {  #endif  } +/* Dummy push handler */ +void push_handler(void *privdata, void *reply) { +    int *counter = privdata; +    freeReplyObject(reply); +    *counter += 1; +} + +/* Dummy function just to test setting a callback with redisOptions */ +void push_handler_async(redisAsyncContext *ac, void *reply) { +    (void)ac; +    (void)reply; +} + +static void test_resp3_push_handler(redisContext *c) { +    redisPushFn *old = NULL; +    redisReply *reply; +    void *privdata; +    int n = 0; + +    /* Switch to RESP3 and turn on client tracking */ +    send_hello(c, 3); +    send_client_tracking(c, "ON"); +    privdata = c->privdata; +    c->privdata = &n; + +    reply = redisCommand(c, "GET key:0"); +    assert(reply != NULL); +    freeReplyObject(reply); + +    test("RESP3 PUSH messages are handled out of band by default: "); +    reply = redisCommand(c, "SET key:0 val:0"); +    test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS); +    freeReplyObject(reply); + +    assert((reply = redisCommand(c, "GET key:0")) != NULL); +    freeReplyObject(reply); + +    old = redisSetPushCallback(c, push_handler); +    test("We can set a custom RESP3 PUSH handler: "); +    reply = redisCommand(c, "SET key:0 val:0"); +    test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && n == 1); +    freeReplyObject(reply); + +    /* Unset the push callback and generate an invalidate message making +     * sure it is not handled out of band. */ +    test("With no handler, PUSH replies come in-band: "); +    redisSetPushCallback(c, NULL); +    assert((reply = redisCommand(c, "GET key:0")) != NULL); +    freeReplyObject(reply); +    assert((reply = redisCommand(c, "SET key:0 invalid")) != NULL); +    test_cond(reply->type == REDIS_REPLY_PUSH); +    freeReplyObject(reply); + +    test("With no PUSH handler, no replies are lost: "); +    assert(redisGetReply(c, (void**)&reply) == REDIS_OK); +    test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS); +    freeReplyObject(reply); + +    /* Return to the originally set PUSH handler */ +    assert(old != NULL); +    redisSetPushCallback(c, old); + +    /* Switch back to RESP2 and disable tracking */ +    c->privdata = privdata; +    send_client_tracking(c, "OFF"); +    send_hello(c, 2); +} + +redisOptions get_redis_tcp_options(struct config config) { +    redisOptions options = {0}; +    REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port); +    return options; +} + +static void test_resp3_push_options(struct config config) { +    redisAsyncContext *ac; +    redisContext *c; +    redisOptions options; + +    test("We set a default RESP3 handler for redisContext: "); +    options = get_redis_tcp_options(config); +    assert((c = redisConnectWithOptions(&options)) != NULL); +    test_cond(c->push_cb != NULL); +    redisFree(c); + +    test("We don't set a default RESP3 push handler for redisAsyncContext: "); +    options = get_redis_tcp_options(config); +    assert((ac = redisAsyncConnectWithOptions(&options)) != NULL); +    test_cond(ac->c.push_cb == NULL); +    redisAsyncFree(ac); + +    test("Our REDIS_OPT_NO_PUSH_AUTOFREE flag works: "); +    options = get_redis_tcp_options(config); +    options.options |= REDIS_OPT_NO_PUSH_AUTOFREE; +    assert((c = redisConnectWithOptions(&options)) != NULL); +    test_cond(c->push_cb == NULL); +    redisFree(c); + +    test("We can use redisOptions to set a custom PUSH handler for redisContext: "); +    options = get_redis_tcp_options(config); +    options.push_cb = push_handler; +    assert((c = redisConnectWithOptions(&options)) != NULL); +    test_cond(c->push_cb == push_handler); +    redisFree(c); + +    test("We can use redisOptions to set a custom PUSH handler for redisAsyncContext: "); +    options = get_redis_tcp_options(config); +    options.async_push_cb = push_handler_async; +    assert((ac = redisAsyncConnectWithOptions(&options)) != NULL); +    test_cond(ac->push_cb == push_handler_async); +    redisAsyncFree(ac); +} +  static void test_blocking_connection(struct config config) {      redisContext *c;      redisReply *reply; +    int major;      c = do_connect(config); @@ -695,6 +867,10 @@ static void test_blocking_connection(struct config config) {      assert(redisAppendCommand(c, "PING") == REDIS_OK);      test_cond(redisGetReply(c, NULL) == REDIS_OK); +    get_redis_version(c, &major, NULL); +    if (major >= 6) test_resp3_push_handler(c); +    test_resp3_push_options(config); +      disconnect(c, 0);  } @@ -780,18 +956,7 @@ static void test_blocking_io_errors(struct config config) {      /* Connect to target given by config. */      c = do_connect(config); -    { -        /* Find out Redis version to determine the path for the next test */ -        const char *field = "redis_version:"; -        char *p, *eptr; - -        reply = redisCommand(c,"INFO"); -        p = strstr(reply->str,field); -        major = strtol(p+strlen(field),&eptr,10); -        p = eptr+1; /* char next to the first "." */ -        minor = strtol(p,&eptr,10); -        freeReplyObject(reply); -    } +    get_redis_version(c, &major, &minor);      test("Returns I/O error when the connection is lost: ");      reply = redisCommand(c,"QUIT"); | 
