summaryrefslogtreecommitdiff
path: root/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'async.c')
-rw-r--r--async.c64
1 files changed, 61 insertions, 3 deletions
diff --git a/async.c b/async.c
index 2cff0b1..1073d8d 100644
--- a/async.c
+++ b/async.c
@@ -167,16 +167,26 @@ redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
redisContext *c;
redisAsyncContext *ac;
+ /* Clear any erroneously set sync callback and flag that we don't want to
+ * use freeReplyObject by default. */
+ myOptions.push_cb = NULL;
+ myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
+
myOptions.options |= REDIS_OPT_NONBLOCK;
c = redisConnectWithOptions(&myOptions);
if (c == NULL) {
return NULL;
}
+
ac = redisAsyncInitialize(c);
if (ac == NULL) {
redisFree(c);
return NULL;
}
+
+ /* Set any configured async push handler */
+ redisAsyncSetPushCallback(ac, myOptions.async_push_cb);
+
__redisAsyncCopyError(ac);
return ac;
}
@@ -279,6 +289,14 @@ static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisRe
}
}
+static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
+ if (ac->push_cb != NULL) {
+ ac->c.flags |= REDIS_IN_CALLBACK;
+ ac->push_cb(ac, reply);
+ ac->c.flags &= ~REDIS_IN_CALLBACK;
+ }
+}
+
/* Helper function to free the context. */
static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
@@ -294,7 +312,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
__redisRunCallback(ac,&cb,NULL);
- /* Run subscription callbacks callbacks with NULL reply */
+ /* Run subscription callbacks with NULL reply */
if (ac->sub.channels) {
it = dictGetIterator(ac->sub.channels);
if (it != NULL) {
@@ -459,6 +477,30 @@ oom:
return REDIS_ERR;
}
+#define redisIsSpontaneousPushReply(r) \
+ (redisIsPushReply(r) && !redisIsSubscribeReply(r))
+
+static int redisIsSubscribeReply(redisReply *reply) {
+ char *str;
+ size_t len, off;
+
+ /* We will always have at least one string with the subscribe/message type */
+ if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING ||
+ reply->element[0]->len < sizeof("message") - 1)
+ {
+ return 0;
+ }
+
+ /* Get the string/len moving past 'p' if needed */
+ off = tolower(reply->element[0]->str[0]) == 'p';
+ str = reply->element[0]->str + off;
+ len = reply->element[0]->len - off;
+
+ return !strncasecmp(str, "subscribe", len) ||
+ !strncasecmp(str, "message", len);
+
+}
+
void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb = {NULL, NULL, 0, NULL};
@@ -485,8 +527,18 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
break;
}
- /* Even if the context is subscribed, pending regular callbacks will
- * get a reply before pub/sub messages arrive. */
+ /* Send any non-subscribe related PUSH messages to our PUSH handler
+ * while allowing subscribe related PUSH messages to pass through.
+ * This allows existing code to be backward compatible and work in
+ * either RESP2 or RESP3 mode. */
+ if (redisIsSpontaneousPushReply(reply)) {
+ __redisRunPushCallback(ac, reply);
+ c->reader->fn->freeObject(reply);
+ continue;
+ }
+
+ /* Even if the context is subscribed, pending regular
+ * callbacks will get a reply before pub/sub messages arrive. */
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/*
* A spontaneous reply in a not-subscribed context can be the error
@@ -809,6 +861,12 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
return status;
}
+redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
+ redisAsyncPushFn *old = ac->push_cb;
+ ac->push_cb = fn;
+ return old;
+}
+
int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
if (!ac->c.timeout) {
ac->c.timeout = hi_calloc(1, sizeof(tv));