summaryrefslogtreecommitdiff
path: root/hiredis.c
diff options
context:
space:
mode:
Diffstat (limited to 'hiredis.c')
-rw-r--r--hiredis.c70
1 files changed, 64 insertions, 6 deletions
diff --git a/hiredis.c b/hiredis.c
index e08c596..9d5f107 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -583,10 +583,13 @@ static int redisContextConnect(redisContext *c, const char *ip, int port) {
static redisContext *redisContextInit(redisReplyFunctions *fn) {
redisContext *c = malloc(sizeof(*c));
- c->fn = fn == NULL ? &defaultFunctions : fn;
- c->obuf = sdsempty();
c->error = NULL;
- c->reader = redisReplyReaderCreate(fn);
+ c->obuf = sdsempty();
+ c->fn = fn == NULL ? &defaultFunctions : fn;
+ c->reader = redisReplyReaderCreate(c->fn);
+ c->callbacks = calloc(sizeof(redisCallback),1);
+ c->clen = 1;
+ c->cpos = 0;
return c;
}
@@ -634,10 +637,31 @@ int redisBufferRead(redisContext *c) {
return HIREDIS_OK;
}
+static void redisPopCallback(redisContext *c) {
+ if (c->cpos > 1) {
+ memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback));
+ }
+ c->cpos--;
+}
+
void *redisGetReply(redisContext *c) {
+ redisPopCallback(c);
return redisReplyReaderGetReply(c->reader);
}
+int redisProcessCallbacks(redisContext *c) {
+ void *reply;
+ redisCallback cb;
+
+ while ((reply = redisReplyReaderGetReply(c->reader)) != NULL) {
+ cb = c->callbacks[0];
+ if (cb.fn != NULL)
+ cb.fn(c,reply,cb.privdata);
+ redisPopCallback(c);
+ }
+ return HIREDIS_OK;
+}
+
/* Use this function to try and write the entire output buffer to the
* descriptor. Returns 1 when the entire buffer was written, 0 otherwise. */
int redisBufferWrite(redisContext *c, int *done) {
@@ -663,12 +687,12 @@ int redisBufferWrite(redisContext *c, int *done) {
return HIREDIS_OK;
}
-static void* redisCommandWrite(redisContext *c, char *str, size_t len) {
+static void* redisCommandWrite(redisContext *c, redisCallback *cb, char *str, size_t len) {
void *reply = NULL;
int wdone = 0;
c->obuf = sdscatlen(c->obuf,str,len);
- /* Only take action when this is a blocking context. */
+ /* Read reply immediately when the context is blocking. */
if (c->flags & HIREDIS_BLOCK) {
do { /* Write until done. */
if (redisBufferWrite(c,&wdone) == HIREDIS_ERR)
@@ -680,6 +704,20 @@ static void* redisCommandWrite(redisContext *c, char *str, size_t len) {
return c->error;
reply = redisGetReply(c);
} while (reply == NULL);
+ } else {
+ /* Make room for the callback. */
+ assert(c->cpos <= c->clen);
+ if (c->cpos == c->clen) {
+ c->clen++;
+ c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback));
+ }
+
+ if (cb != NULL) {
+ c->callbacks[c->cpos] = *cb;
+ } else {
+ memset(&c->callbacks[c->cpos],0,sizeof(redisCallback));
+ }
+ c->cpos++;
}
return reply;
}
@@ -695,7 +733,27 @@ void *redisCommand(redisContext *c, const char *format, ...) {
cmd = redisFormatCommand(format,ap);
va_end(ap);
- reply = redisCommandWrite(c,cmd,sdslen(cmd));
+ reply = redisCommandWrite(c,NULL,cmd,sdslen(cmd));
sdsfree(cmd);
return reply;
}
+
+/* Write a formatted command to the output buffer and register the provided
+ * callback function and argument. When this function is called in a
+ * non-blocking context, it is a no-op. Always returns NULL. */
+void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...) {
+ va_list ap;
+ redisCallback cb = { fn, privdata };
+ sds cmd;
+
+ /* Skip if the context is blocking. */
+ if (c->flags & HIREDIS_BLOCK) return NULL;
+
+ va_start(ap,format);
+ cmd = redisFormatCommand(format,ap);
+ va_end(ap);
+
+ redisCommandWrite(c,&cb,cmd,sdslen(cmd));
+ sdsfree(cmd);
+ return NULL;
+}