From 3ac8ef927d86df1d7ff1a6546fe590d9bcc0bbd3 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 31 Dec 2010 13:46:08 +0100 Subject: Add callbacks to channel/pattern dictionaries --- async.c | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 8 deletions(-) diff --git a/async.c b/async.c index dbb70ec..3a5fe2d 100644 --- a/async.c +++ b/async.c @@ -31,6 +31,7 @@ #include #include +#include #include "async.h" #include "sds.h" #include "util.h" @@ -366,23 +367,78 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { } } -/* Helper function for the redisAsyncCommand* family of functions. - * - * Write a formatted command to the output buffer and register the provided - * callback function with the context. - */ +/* Sets a pointer to the first argument and its length starting at p. Returns + * the number of bytes to skip to get to the following argument. */ +static char *nextArgument(char *start, char **str, size_t *len) { + char *p = start; + if (p[0] != '$') { + p = strchr(p,'$'); + if (p == NULL) return NULL; + } + + *len = (int)strtol(p+1,NULL,10); + p = strchr(p,'\r'); + assert(p); + *str = p+2; + return p+2+(*len)+2; +} + +/* Helper function for the redisAsyncCommand* family of functions. Writes a + * formatted command to the output buffer and registers the provided callback + * function with the context. */ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) { redisContext *c = &(ac->c); redisCallback cb; + int pvariant, hasnext; + char *cstr, *astr; + size_t clen, alen; + char *p; + sds sname; /* Don't accept new commands when the connection is about to be closed. */ if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR; - __redisAppendCommand(c,cmd,len); - /* Store callback */ + /* Setup callback */ cb.fn = fn; cb.privdata = privdata; - __redisPushCallback(&ac->replies,&cb); + + /* Find out which command will be appended. */ + p = nextArgument(cmd,&cstr,&clen); + assert(p != NULL); + hasnext = (p[0] == '$'); + pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0; + cstr += pvariant; + clen -= pvariant; + + if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) { + c->flags |= REDIS_SUBSCRIBED; + + /* Add every channel/pattern to the list of subscription callbacks. */ + while ((p = nextArgument(p,&astr,&alen)) != NULL) { + sname = sdsnewlen(astr,alen); + if (pvariant) + dictReplace(ac->sub.patterns,sname,&cb); + else + dictReplace(ac->sub.channels,sname,&cb); + } + } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) { + /* It is only useful to call (P)UNSUBSCRIBE when the context is + * subscribed to one or more channels or patterns. */ + if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; + + /* (P)UNSUBSCRIBE does not have its own response: every channel or + * pattern that is unsubscribed will receive a message. This means we + * should not append a callback function for this command. */ + } else { + if (c->flags & REDIS_SUBSCRIBED) + /* This will likely result in an error reply, but it needs to be + * received and passed to the callback. */ + __redisPushCallback(&ac->sub.invalid,&cb); + else + __redisPushCallback(&ac->replies,&cb); + } + + __redisAppendCommand(c,cmd,len); /* Always schedule a write when the write buffer is non-empty */ if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); -- cgit v1.2.3