summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--async.c72
1 files changed, 64 insertions, 8 deletions
diff --git a/async.c b/async.c
index dbb70ec..3a5fe2d 100644
--- a/async.c
+++ b/async.c
@@ -31,6 +31,7 @@
#include <string.h>
#include <assert.h>
+#include <ctype.h>
#include "async.h"
#include "sds.h"
#include "util.h"
@@ -366,23 +367,78 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
}
}
-/* Helper function for the redisAsyncCommand* family of functions.
- *
- * Write a formatted command to the output buffer and register the provided
- * callback function with the context.
- */
+/* Sets a pointer to the first argument and its length starting at p. Returns
+ * the number of bytes to skip to get to the following argument. */
+static char *nextArgument(char *start, char **str, size_t *len) {
+ char *p = start;
+ if (p[0] != '$') {
+ p = strchr(p,'$');
+ if (p == NULL) return NULL;
+ }
+
+ *len = (int)strtol(p+1,NULL,10);
+ p = strchr(p,'\r');
+ assert(p);
+ *str = p+2;
+ return p+2+(*len)+2;
+}
+
+/* Helper function for the redisAsyncCommand* family of functions. Writes a
+ * formatted command to the output buffer and registers the provided callback
+ * function with the context. */
static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) {
redisContext *c = &(ac->c);
redisCallback cb;
+ int pvariant, hasnext;
+ char *cstr, *astr;
+ size_t clen, alen;
+ char *p;
+ sds sname;
/* Don't accept new commands when the connection is about to be closed. */
if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
- __redisAppendCommand(c,cmd,len);
- /* Store callback */
+ /* Setup callback */
cb.fn = fn;
cb.privdata = privdata;
- __redisPushCallback(&ac->replies,&cb);
+
+ /* Find out which command will be appended. */
+ p = nextArgument(cmd,&cstr,&clen);
+ assert(p != NULL);
+ hasnext = (p[0] == '$');
+ pvariant = (tolower(cstr[0]) == 'p') ? 1 : 0;
+ cstr += pvariant;
+ clen -= pvariant;
+
+ if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
+ c->flags |= REDIS_SUBSCRIBED;
+
+ /* Add every channel/pattern to the list of subscription callbacks. */
+ while ((p = nextArgument(p,&astr,&alen)) != NULL) {
+ sname = sdsnewlen(astr,alen);
+ if (pvariant)
+ dictReplace(ac->sub.patterns,sname,&cb);
+ else
+ dictReplace(ac->sub.channels,sname,&cb);
+ }
+ } else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
+ /* It is only useful to call (P)UNSUBSCRIBE when the context is
+ * subscribed to one or more channels or patterns. */
+ if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
+
+ /* (P)UNSUBSCRIBE does not have its own response: every channel or
+ * pattern that is unsubscribed will receive a message. This means we
+ * should not append a callback function for this command. */
+ } else {
+ if (c->flags & REDIS_SUBSCRIBED)
+ /* This will likely result in an error reply, but it needs to be
+ * received and passed to the callback. */
+ __redisPushCallback(&ac->sub.invalid,&cb);
+ else
+ __redisPushCallback(&ac->replies,&cb);
+ }
+
+ __redisAppendCommand(c,cmd,len);
/* Always schedule a write when the write buffer is non-empty */
if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data);