summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPieter Noordhuis <pcnoordhuis@gmail.com>2010-09-25 12:06:00 +0200
committerPieter Noordhuis <pcnoordhuis@gmail.com>2010-09-25 12:06:00 +0200
commitbb668e1b9450bb86da4ea2cff8ef3a0ea146f0e7 (patch)
tree3617c043a5a2977220cee6dab7c279890705d87c
parentde9c172b50e8c28055dc5241585a0bcb690582b9 (diff)
Support callback functions in a non-blocking context
-rw-r--r--hiredis.c70
-rw-r--r--hiredis.h23
2 files changed, 83 insertions, 10 deletions
diff --git a/hiredis.c b/hiredis.c
index e08c596..9d5f107 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -583,10 +583,13 @@ static int redisContextConnect(redisContext *c, const char *ip, int port) {
static redisContext *redisContextInit(redisReplyFunctions *fn) {
redisContext *c = malloc(sizeof(*c));
- c->fn = fn == NULL ? &defaultFunctions : fn;
- c->obuf = sdsempty();
c->error = NULL;
- c->reader = redisReplyReaderCreate(fn);
+ c->obuf = sdsempty();
+ c->fn = fn == NULL ? &defaultFunctions : fn;
+ c->reader = redisReplyReaderCreate(c->fn);
+ c->callbacks = calloc(sizeof(redisCallback),1);
+ c->clen = 1;
+ c->cpos = 0;
return c;
}
@@ -634,10 +637,31 @@ int redisBufferRead(redisContext *c) {
return HIREDIS_OK;
}
+static void redisPopCallback(redisContext *c) {
+ if (c->cpos > 1) {
+ memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback));
+ }
+ c->cpos--;
+}
+
void *redisGetReply(redisContext *c) {
+ redisPopCallback(c);
return redisReplyReaderGetReply(c->reader);
}
+int redisProcessCallbacks(redisContext *c) {
+ void *reply;
+ redisCallback cb;
+
+ while ((reply = redisReplyReaderGetReply(c->reader)) != NULL) {
+ cb = c->callbacks[0];
+ if (cb.fn != NULL)
+ cb.fn(c,reply,cb.privdata);
+ redisPopCallback(c);
+ }
+ return HIREDIS_OK;
+}
+
/* Use this function to try and write the entire output buffer to the
* descriptor. Returns 1 when the entire buffer was written, 0 otherwise. */
int redisBufferWrite(redisContext *c, int *done) {
@@ -663,12 +687,12 @@ int redisBufferWrite(redisContext *c, int *done) {
return HIREDIS_OK;
}
-static void* redisCommandWrite(redisContext *c, char *str, size_t len) {
+static void* redisCommandWrite(redisContext *c, redisCallback *cb, char *str, size_t len) {
void *reply = NULL;
int wdone = 0;
c->obuf = sdscatlen(c->obuf,str,len);
- /* Only take action when this is a blocking context. */
+ /* Read reply immediately when the context is blocking. */
if (c->flags & HIREDIS_BLOCK) {
do { /* Write until done. */
if (redisBufferWrite(c,&wdone) == HIREDIS_ERR)
@@ -680,6 +704,20 @@ static void* redisCommandWrite(redisContext *c, char *str, size_t len) {
return c->error;
reply = redisGetReply(c);
} while (reply == NULL);
+ } else {
+ /* Make room for the callback. */
+ assert(c->cpos <= c->clen);
+ if (c->cpos == c->clen) {
+ c->clen++;
+ c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback));
+ }
+
+ if (cb != NULL) {
+ c->callbacks[c->cpos] = *cb;
+ } else {
+ memset(&c->callbacks[c->cpos],0,sizeof(redisCallback));
+ }
+ c->cpos++;
}
return reply;
}
@@ -695,7 +733,27 @@ void *redisCommand(redisContext *c, const char *format, ...) {
cmd = redisFormatCommand(format,ap);
va_end(ap);
- reply = redisCommandWrite(c,cmd,sdslen(cmd));
+ reply = redisCommandWrite(c,NULL,cmd,sdslen(cmd));
sdsfree(cmd);
return reply;
}
+
+/* Write a formatted command to the output buffer and register the provided
+ * callback function and argument. When this function is called in a
+ * non-blocking context, it is a no-op. Always returns NULL. */
+void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...) {
+ va_list ap;
+ redisCallback cb = { fn, privdata };
+ sds cmd;
+
+ /* Skip if the context is blocking. */
+ if (c->flags & HIREDIS_BLOCK) return NULL;
+
+ va_start(ap,format);
+ cmd = redisFormatCommand(format,ap);
+ va_end(ap);
+
+ redisCommandWrite(c,&cb,cmd,sdslen(cmd));
+ sdsfree(cmd);
+ return NULL;
+}
diff --git a/hiredis.h b/hiredis.h
index 60615b1..85ef87e 100644
--- a/hiredis.h
+++ b/hiredis.h
@@ -71,14 +71,27 @@ typedef struct redisReplyObjectFunctions {
void (*freeObject)(void*);
} redisReplyFunctions;
+/* Callback prototype */
+struct redisContext; /* needs forward declaration of redisContext */
+typedef void redisCallbackFn(struct redisContext*, redisReply*, void*);
+
+/* Callback container */
+typedef struct redisCallback {
+ redisCallbackFn *fn;
+ void *privdata;
+} redisCallback;
+
/* Context for a connection to Redis */
typedef struct redisContext {
int fd;
int flags;
- char *error; /* error object is set when in erronous state */
- void *reader; /* reply reader */
- sds obuf; /* output buffer */
- redisReplyFunctions *fn; /* functions for reply buildup */
+ void *error; /* Error object is set when in erronous state */
+ sds obuf; /* Write buffer */
+ redisReplyFunctions *fn;
+ void *reader;
+ redisCallback *callbacks;
+ int cpos;
+ int clen;
} redisContext;
void freeReplyObject(void *reply);
@@ -93,7 +106,9 @@ redisContext *redisConnectNonBlock(const char *ip, int port, redisReplyFunctions
int redisBufferRead(redisContext *c);
int redisBufferWrite(redisContext *c, int *done);
void *redisGetReply(redisContext *c);
+int redisProcessCallbacks(redisContext *c);
void *redisCommand(redisContext *c, const char *format, ...);
+void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...);
#endif