summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--async.c50
1 files changed, 48 insertions, 2 deletions
diff --git a/async.c b/async.c
index 3a5fe2d..781b56f 100644
--- a/async.c
+++ b/async.c
@@ -273,6 +273,53 @@ void redisAsyncDisconnect(redisAsyncContext *ac) {
__redisAsyncDisconnect(ac);
}
+static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
+ redisContext *c = &(ac->c);
+ dict *callbacks;
+ dictEntry *de;
+ int pvariant;
+ char *stype;
+ sds sname;
+
+ /* Custom reply functions are not supported for pub/sub. This will fail
+ * very hard when they are used... */
+ if (reply->type == REDIS_REPLY_ARRAY) {
+ assert(reply->elements >= 2);
+ assert(reply->element[0]->type == REDIS_REPLY_STRING);
+ stype = reply->element[0]->str;
+ pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
+
+ if (pvariant)
+ callbacks = ac->sub.patterns;
+ else
+ callbacks = ac->sub.channels;
+
+ /* Locate the right callback */
+ assert(reply->element[1]->type == REDIS_REPLY_STRING);
+ sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
+ de = dictFind(callbacks,sname);
+ if (de != NULL) {
+ memcpy(dstcb,dictGetEntryVal(de),sizeof(*dstcb));
+
+ /* If this is an unsubscribe message, remove it. */
+ if (strcasecmp(stype+pvariant,"unsubscribe") == 0) {
+ dictDelete(callbacks,sname);
+
+ /* If this was the last unsubscribe message, revert to
+ * non-subscribe mode. */
+ assert(reply->element[2+pvariant]->type == REDIS_REPLY_INTEGER);
+ if (reply->element[2+pvariant]->integer == 0)
+ c->flags &= ~REDIS_SUBSCRIBED;
+ }
+ }
+ sdsfree(sname);
+ } else {
+ /* Shift callback for invalid commands. */
+ __redisShiftCallback(&ac->sub.invalid,dstcb);
+ }
+ return REDIS_OK;
+}
+
void redisProcessCallbacks(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb;
@@ -298,8 +345,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
/* No more regular callbacks, the context *must* be subscribed. */
assert(c->flags & REDIS_SUBSCRIBED);
-
- /* TODO: find the right callback for pub/sub message. */
+ __redisGetSubscribeCallback(ac,reply,&cb);
}
if (cb.fn != NULL) {