From 2e7d7cbabd32912342218078282fce92f6cc0ab6 Mon Sep 17 00:00:00 2001 From: Michael Grunder Date: Sun, 19 Jul 2020 18:54:42 -0700 Subject: Resp3 oob push support (#841) Proper support for RESP3 PUSH messages. By default, PUSH messages are now intercepted and the reply memory freed. This means existing code should work unchanged when connecting to Redis >= 6.0.0 even if `CLIENT TRACKING` were then enabled. Additionally, we define two callbacks users can configure if they wish to handle these messages in a custom way: void redisPushFn(void *privdata, void *reply); void redisAsyncPushFn(redisAsyncContext *ac, void *reply); See #825 --- hiredis.c | 46 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) (limited to 'hiredis.c') diff --git a/hiredis.c b/hiredis.c index 7eaa991..e9761ec 100644 --- a/hiredis.c +++ b/hiredis.c @@ -184,7 +184,8 @@ static void *createArrayObject(const redisReadTask *task, size_t elements) { parent = task->parent->obj; assert(parent->type == REDIS_REPLY_ARRAY || parent->type == REDIS_REPLY_MAP || - parent->type == REDIS_REPLY_SET); + parent->type == REDIS_REPLY_SET || + parent->type == REDIS_REPLY_PUSH); parent->element[task->idx] = r; } return r; @@ -679,6 +680,11 @@ redisReader *redisReaderCreate(void) { return redisReaderCreateWithFunctions(&defaultFunctions); } +static void redisPushAutoFree(void *privdata, void *reply) { + (void)privdata; + freeReplyObject(reply); +} + static redisContext *redisContextInit(const redisOptions *options) { redisContext *c; @@ -687,6 +693,14 @@ static redisContext *redisContextInit(const redisOptions *options) { return NULL; c->funcs = &redisContextDefaultFuncs; + + /* Set any user supplied RESP3 PUSH handler or use freeReplyObject + * as a default unless specifically flagged that we don't want one. */ + if (options->push_cb != NULL) + redisSetPushCallback(c, options->push_cb); + else if (!(options->options & REDIS_OPT_NO_PUSH_AUTOFREE)) + redisSetPushCallback(c, redisPushAutoFree); + c->obuf = sdsempty(); c->reader = redisReaderCreate(); c->fd = REDIS_INVALID_FD; @@ -773,7 +787,7 @@ redisContext *redisConnectWithOptions(const redisOptions *options) { c->flags |= REDIS_REUSEADDR; } if (options->options & REDIS_OPT_NOAUTOFREE) { - c->flags |= REDIS_NO_AUTO_FREE; + c->flags |= REDIS_NO_AUTO_FREE; } if (options->type == REDIS_CONN_TCP) { @@ -876,6 +890,13 @@ int redisEnableKeepAlive(redisContext *c) { return REDIS_OK; } +/* Set a user provided RESP3 PUSH handler and return any old one set. */ +redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn) { + redisPushFn *old = c->push_cb; + c->push_cb = fn; + return old; +} + /* Use this function to handle a read event on the descriptor. It will try * and read some bytes from the socket and feed them to the reply parser. * @@ -947,9 +968,21 @@ int redisGetReplyFromReader(redisContext *c, void **reply) { __redisSetError(c,c->reader->err,c->reader->errstr); return REDIS_ERR; } + return REDIS_OK; } +/* Internal helper that returns 1 if the reply was a RESP3 PUSH + * message and we handled it with a user-provided callback. */ +static int redisHandledPushReply(redisContext *c, void *reply) { + if (reply && c->push_cb && redisIsPushReply(reply)) { + c->push_cb(c->privdata, reply); + return 1; + } + + return 0; +} + int redisGetReply(redisContext *c, void **reply) { int wdone = 0; void *aux = NULL; @@ -970,8 +1003,13 @@ int redisGetReply(redisContext *c, void **reply) { do { if (redisBufferRead(c) == REDIS_ERR) return REDIS_ERR; - if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) - return REDIS_ERR; + + /* We loop here in case the user has specified a RESP3 + * PUSH handler (e.g. for client tracking). */ + do { + if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) + return REDIS_ERR; + } while (redisHandledPushReply(c, aux)); } while (aux == NULL); } -- cgit v1.2.3