diff options
author | Michael Grunder <michael.grunder@gmail.com> | 2020-07-19 18:54:42 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-19 18:54:42 -0700 |
commit | 2e7d7cbabd32912342218078282fce92f6cc0ab6 (patch) | |
tree | ba60c50c28b433aef6e128a67522085751acb6cb /async.c | |
parent | 1864e76ea7323fd8789d9c8b5b3c8ca27d4840a6 (diff) |
Resp3 oob push support (#841)
Proper support for RESP3 PUSH messages.
By default, PUSH messages are now intercepted and the reply memory freed.
This means existing code should work unchanged when connecting to Redis
>= 6.0.0 even if `CLIENT TRACKING` were then enabled.
Additionally, we define two callbacks users can configure if they wish to handle
these messages in a custom way:
void redisPushFn(void *privdata, void *reply);
void redisAsyncPushFn(redisAsyncContext *ac, void *reply);
See #825
Diffstat (limited to 'async.c')
-rw-r--r-- | async.c | 64 |
1 files changed, 61 insertions, 3 deletions
@@ -167,16 +167,26 @@ redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) { redisContext *c; redisAsyncContext *ac; + /* Clear any erroneously set sync callback and flag that we don't want to + * use freeReplyObject by default. */ + myOptions.push_cb = NULL; + myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE; + myOptions.options |= REDIS_OPT_NONBLOCK; c = redisConnectWithOptions(&myOptions); if (c == NULL) { return NULL; } + ac = redisAsyncInitialize(c); if (ac == NULL) { redisFree(c); return NULL; } + + /* Set any configured async push handler */ + redisAsyncSetPushCallback(ac, myOptions.async_push_cb); + __redisAsyncCopyError(ac); return ac; } @@ -279,6 +289,14 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe } } +static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) { + if (ac->push_cb != NULL) { + ac->c.flags |= REDIS_IN_CALLBACK; + ac->push_cb(ac, reply); + ac->c.flags &= ~REDIS_IN_CALLBACK; + } +} + /* Helper function to free the context. */ static void __redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); @@ -294,7 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); - /* Run subscription callbacks callbacks with NULL reply */ + /* Run subscription callbacks with NULL reply */ if (ac->sub.channels) { it = dictGetIterator(ac->sub.channels); if (it != NULL) { @@ -459,6 +477,30 @@ oom: return REDIS_ERR; } +#define redisIsSpontaneousPushReply(r) \ + (redisIsPushReply(r) && !redisIsSubscribeReply(r)) + +static int redisIsSubscribeReply(redisReply *reply) { + char *str; + size_t len, off; + + /* We will always have at least one string with the subscribe/message type */ + if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING || + reply->element[0]->len < sizeof("message") - 1) + { + return 0; + } + + /* Get the string/len moving past 'p' if needed */ + off = tolower(reply->element[0]->str[0]) == 'p'; + str = reply->element[0]->str + off; + len = reply->element[0]->len - off; + + return !strncasecmp(str, "subscribe", len) || + !strncasecmp(str, "message", len); + +} + void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb = {NULL, NULL, 0, NULL}; @@ -485,8 +527,18 @@ void redisProcessCallbacks(redisAsyncContext *ac) { break; } - /* Even if the context is subscribed, pending regular callbacks will - * get a reply before pub/sub messages arrive. */ + /* Send any non-subscribe related PUSH messages to our PUSH handler + * while allowing subscribe related PUSH messages to pass through. + * This allows existing code to be backward compatible and work in + * either RESP2 or RESP3 mode. */ + if (redisIsSpontaneousPushReply(reply)) { + __redisRunPushCallback(ac, reply); + c->reader->fn->freeObject(reply); + continue; + } + + /* Even if the context is subscribed, pending regular + * callbacks will get a reply before pub/sub messages arrive. */ if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { /* * A spontaneous reply in a not-subscribed context can be the error @@ -809,6 +861,12 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void return status; } +redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) { + redisAsyncPushFn *old = ac->push_cb; + ac->push_cb = fn; + return old; +} + int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) { if (!ac->c.timeout) { ac->c.timeout = hi_calloc(1, sizeof(tv)); |