diff options
author | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-12-31 13:46:08 +0100 |
---|---|---|
committer | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-12-31 15:49:26 +0100 |
commit | 3ac8ef927d86df1d7ff1a6546fe590d9bcc0bbd3 (patch) | |
tree | cfeaa9fd72e2a7641135f4d531c6f10ea90a360e | |
parent | e6621d05b4b7a2bcc866757255297012b749dbac (diff) |
Add callbacks to channel/pattern dictionaries
-rw-r--r-- | async.c | 72 |
1 files changed, 64 insertions, 8 deletions
@@ -31,6 +31,7 @@ #include <string.h> #include <assert.h> +#include <ctype.h> #include "async.h" #include "sds.h" #include "util.h" @@ -366,23 +367,78 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { } } -/* Helper function for the redisAsyncCommand* family of functions. - * - * Write a formatted command to the output buffer and register the provided - * callback function with the context. - */ +/* Sets a pointer to the first argument and its length starting at p. Returns + * the number of bytes to skip to get to the following argument. */ +static char *nextArgument(char *start, char **str, size_t *len) { + char *p = start; + if (p[0] != '$') { + p = strchr(p,'$'); + if (p == NULL) return NULL; + } + + *len = (int)strtol(p+1,NULL,10); + p = strchr(p,'\r'); + assert(p); + *str = p+2; + return p+2+(*len)+2; +} + +/* Helper function for the redisAsyncCommand* family of functions. Writes a + * formatted command to the output buffer and registers the provided callback + * function with the context. */ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) { redisContext *c = &(ac->c); redisCallback cb; + int pvariant, hasnext; + char *cstr, *astr; + size_t clen, alen; + char *p; + sds sname; /* Don't accept new commands when the connection is about to be closed. */ if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR; - __redisAppendCommand(c,cmd,len); - /* Store callback */ + /* Setup callback */ cb.fn = fn; cb.privdata = privdata; - __redisPushCallback(&ac->replies,&cb); + + /* Find out which command will be appended. */ + p = nextArgument(cmd,&cstr,&clen); + assert(p != NULL); + hasnext = (p[0] == '$'); + pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0; + cstr += pvariant; + clen -= pvariant; + + if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) { + c->flags |= REDIS_SUBSCRIBED; + + /* Add every channel/pattern to the list of subscription callbacks. */ + while ((p = nextArgument(p,&astr,&alen)) != NULL) { + sname = sdsnewlen(astr,alen); + if (pvariant) + dictReplace(ac->sub.patterns,sname,&cb); + else + dictReplace(ac->sub.channels,sname,&cb); + } + } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) { + /* It is only useful to call (P)UNSUBSCRIBE when the context is + * subscribed to one or more channels or patterns. */ + if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; + + /* (P)UNSUBSCRIBE does not have its own response: every channel or + * pattern that is unsubscribed will receive a message. This means we + * should not append a callback function for this command. */ + } else { + if (c->flags & REDIS_SUBSCRIBED) + /* This will likely result in an error reply, but it needs to be + * received and passed to the callback. */ + __redisPushCallback(&ac->sub.invalid,&cb); + else + __redisPushCallback(&ac->replies,&cb); + } + + __redisAppendCommand(c,cmd,len); /* Always schedule a write when the write buffer is non-empty */ if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); |