summaryrefslogtreecommitdiff
path: root/hiredis.c
diff options
context:
space:
mode:
authorMichael Grunder <michael.grunder@gmail.com>2020-07-19 18:54:42 -0700
committerGitHub <noreply@github.com>2020-07-19 18:54:42 -0700
commit2e7d7cbabd32912342218078282fce92f6cc0ab6 (patch)
treeba60c50c28b433aef6e128a67522085751acb6cb /hiredis.c
parent1864e76ea7323fd8789d9c8b5b3c8ca27d4840a6 (diff)
Resp3 oob push support (#841)
Proper support for RESP3 PUSH messages. By default, PUSH messages are now intercepted and the reply memory freed. This means existing code should work unchanged when connecting to Redis >= 6.0.0 even if `CLIENT TRACKING` were then enabled. Additionally, we define two callbacks users can configure if they wish to handle these messages in a custom way: void redisPushFn(void *privdata, void *reply); void redisAsyncPushFn(redisAsyncContext *ac, void *reply); See #825
Diffstat (limited to 'hiredis.c')
-rw-r--r--hiredis.c46
1 files changed, 42 insertions, 4 deletions
diff --git a/hiredis.c b/hiredis.c
index 7eaa991..e9761ec 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -184,7 +184,8 @@ static void *createArrayObject(const redisReadTask *task, size_t elements) {
parent = task->parent->obj;
assert(parent->type == REDIS_REPLY_ARRAY ||
parent->type == REDIS_REPLY_MAP ||
- parent->type == REDIS_REPLY_SET);
+ parent->type == REDIS_REPLY_SET ||
+ parent->type == REDIS_REPLY_PUSH);
parent->element[task->idx] = r;
}
return r;
@@ -679,6 +680,11 @@ redisReader *redisReaderCreate(void) {
return redisReaderCreateWithFunctions(&defaultFunctions);
}
+static void redisPushAutoFree(void *privdata, void *reply) {
+ (void)privdata;
+ freeReplyObject(reply);
+}
+
static redisContext *redisContextInit(const redisOptions *options) {
redisContext *c;
@@ -687,6 +693,14 @@ static redisContext *redisContextInit(const redisOptions *options) {
return NULL;
c->funcs = &redisContextDefaultFuncs;
+
+ /* Set any user supplied RESP3 PUSH handler or use freeReplyObject
+ * as a default unless specifically flagged that we don't want one. */
+ if (options->push_cb != NULL)
+ redisSetPushCallback(c, options->push_cb);
+ else if (!(options->options & REDIS_OPT_NO_PUSH_AUTOFREE))
+ redisSetPushCallback(c, redisPushAutoFree);
+
c->obuf = sdsempty();
c->reader = redisReaderCreate();
c->fd = REDIS_INVALID_FD;
@@ -773,7 +787,7 @@ redisContext *redisConnectWithOptions(const redisOptions *options) {
c->flags |= REDIS_REUSEADDR;
}
if (options->options & REDIS_OPT_NOAUTOFREE) {
- c->flags |= REDIS_NO_AUTO_FREE;
+ c->flags |= REDIS_NO_AUTO_FREE;
}
if (options->type == REDIS_CONN_TCP) {
@@ -876,6 +890,13 @@ int redisEnableKeepAlive(redisContext *c) {
return REDIS_OK;
}
+/* Set a user provided RESP3 PUSH handler and return any old one set. */
+redisPushFn *redisSetPushCallback(redisContext *c, redisPushFn *fn) {
+ redisPushFn *old = c->push_cb;
+ c->push_cb = fn;
+ return old;
+}
+
/* Use this function to handle a read event on the descriptor. It will try
* and read some bytes from the socket and feed them to the reply parser.
*
@@ -947,9 +968,21 @@ int redisGetReplyFromReader(redisContext *c, void **reply) {
__redisSetError(c,c->reader->err,c->reader->errstr);
return REDIS_ERR;
}
+
return REDIS_OK;
}
+/* Internal helper that returns 1 if the reply was a RESP3 PUSH
+ * message and we handled it with a user-provided callback. */
+static int redisHandledPushReply(redisContext *c, void *reply) {
+ if (reply && c->push_cb && redisIsPushReply(reply)) {
+ c->push_cb(c->privdata, reply);
+ return 1;
+ }
+
+ return 0;
+}
+
int redisGetReply(redisContext *c, void **reply) {
int wdone = 0;
void *aux = NULL;
@@ -970,8 +1003,13 @@ int redisGetReply(redisContext *c, void **reply) {
do {
if (redisBufferRead(c) == REDIS_ERR)
return REDIS_ERR;
- if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
- return REDIS_ERR;
+
+ /* We loop here in case the user has specified a RESP3
+ * PUSH handler (e.g. for client tracking). */
+ do {
+ if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
+ return REDIS_ERR;
+ } while (redisHandledPushReply(c, aux));
} while (aux == NULL);
}