diff options
Diffstat (limited to 'hiredis.c')
-rw-r--r-- | hiredis.c | 70 |
1 files changed, 64 insertions, 6 deletions
@@ -583,10 +583,13 @@ static int redisContextConnect(redisContext *c, const char *ip, int port) { static redisContext *redisContextInit(redisReplyFunctions *fn) { redisContext *c = malloc(sizeof(*c)); - c->fn = fn == NULL ? &defaultFunctions : fn; - c->obuf = sdsempty(); c->error = NULL; - c->reader = redisReplyReaderCreate(fn); + c->obuf = sdsempty(); + c->fn = fn == NULL ? &defaultFunctions : fn; + c->reader = redisReplyReaderCreate(c->fn); + c->callbacks = calloc(sizeof(redisCallback),1); + c->clen = 1; + c->cpos = 0; return c; } @@ -634,10 +637,31 @@ int redisBufferRead(redisContext *c) { return HIREDIS_OK; } +static void redisPopCallback(redisContext *c) { + if (c->cpos > 1) { + memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback)); + } + c->cpos--; +} + void *redisGetReply(redisContext *c) { + redisPopCallback(c); return redisReplyReaderGetReply(c->reader); } +int redisProcessCallbacks(redisContext *c) { + void *reply; + redisCallback cb; + + while ((reply = redisReplyReaderGetReply(c->reader)) != NULL) { + cb = c->callbacks[0]; + if (cb.fn != NULL) + cb.fn(c,reply,cb.privdata); + redisPopCallback(c); + } + return HIREDIS_OK; +} + /* Use this function to try and write the entire output buffer to the * descriptor. Returns 1 when the entire buffer was written, 0 otherwise. */ int redisBufferWrite(redisContext *c, int *done) { @@ -663,12 +687,12 @@ int redisBufferWrite(redisContext *c, int *done) { return HIREDIS_OK; } -static void* redisCommandWrite(redisContext *c, char *str, size_t len) { +static void* redisCommandWrite(redisContext *c, redisCallback *cb, char *str, size_t len) { void *reply = NULL; int wdone = 0; c->obuf = sdscatlen(c->obuf,str,len); - /* Only take action when this is a blocking context. */ + /* Read reply immediately when the context is blocking. */ if (c->flags & HIREDIS_BLOCK) { do { /* Write until done. */ if (redisBufferWrite(c,&wdone) == HIREDIS_ERR) @@ -680,6 +704,20 @@ static void* redisCommandWrite(redisContext *c, char *str, size_t len) { return c->error; reply = redisGetReply(c); } while (reply == NULL); + } else { + /* Make room for the callback. */ + assert(c->cpos <= c->clen); + if (c->cpos == c->clen) { + c->clen++; + c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback)); + } + + if (cb != NULL) { + c->callbacks[c->cpos] = *cb; + } else { + memset(&c->callbacks[c->cpos],0,sizeof(redisCallback)); + } + c->cpos++; } return reply; } @@ -695,7 +733,27 @@ void *redisCommand(redisContext *c, const char *format, ...) { cmd = redisFormatCommand(format,ap); va_end(ap); - reply = redisCommandWrite(c,cmd,sdslen(cmd)); + reply = redisCommandWrite(c,NULL,cmd,sdslen(cmd)); sdsfree(cmd); return reply; } + +/* Write a formatted command to the output buffer and register the provided + * callback function and argument. When this function is called in a + * non-blocking context, it is a no-op. Always returns NULL. */ +void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...) { + va_list ap; + redisCallback cb = { fn, privdata }; + sds cmd; + + /* Skip if the context is blocking. */ + if (c->flags & HIREDIS_BLOCK) return NULL; + + va_start(ap,format); + cmd = redisFormatCommand(format,ap); + va_end(ap); + + redisCommandWrite(c,&cb,cmd,sdslen(cmd)); + sdsfree(cmd); + return NULL; +} |