From 0152f128072c0fe6b9c64cc50758192d2940f7cd Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Sun, 31 Oct 2010 21:20:47 +0100 Subject: Extracted code to support callbacks in an asynchronous context --- Makefile | 3 +- async.c | 177 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ async.h | 80 +++++++++++++++++++++++++++++ 3 files changed, 259 insertions(+), 1 deletion(-) create mode 100644 async.c create mode 100644 async.h diff --git a/Makefile b/Makefile index 3c1b651..ad8acaa 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ # Copyright (C) 2010 Salvatore Sanfilippo # This file is released under the BSD license, see the COPYING file -OBJ = anet.o hiredis.o sds.o +OBJ = anet.o hiredis.o sds.o async.o BINS = hiredis-example hiredis-test uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') @@ -41,6 +41,7 @@ test.o: test.c hiredis.h sds.h hiredis.o: hiredis.c hiredis.h sds.h anet.h sds.o: sds.c sds.h hiredis.o: hiredis.c hiredis.h sds.h anet.h +async.o: async.c async.h hiredis.h sds.h util.h ${DYLIBNAME}: ${OBJ} ${DYLIB_MAKE_CMD} diff --git a/async.c b/async.c new file mode 100644 index 0000000..9bf74c8 --- /dev/null +++ b/async.c @@ -0,0 +1,177 @@ +/* + * Copyright (c) 2009-2010, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include "async.h" +#include "sds.h" +#include "util.h" + +static redisAsyncContext *redisAsyncInitialize(redisContext *c) { + redisAsyncContext *ac = realloc(c,sizeof(redisAsyncContext)); + /* Set all bytes in the async part of the context to 0 */ + memset(ac+sizeof(redisContext),0,sizeof(redisAsyncContext)-sizeof(redisContext)); + return ac; +} + +redisAsyncContext *redisAsyncConnect(const char *ip, int port) { + redisContext *_c = redisConnectNonBlock(ip,port); + return redisAsyncInitialize(_c); +} + +int redisAsyncSetReplyObjectFunctions(redisAsyncContext *ac, redisReplyObjectFunctions *fn) { + redisContext *c = &(ac->c); + return redisSetReplyObjectFunctions(c,fn); +} + +/* Helper functions to push/shift callbacks */ +static void __redisPushCallback(redisCallbackList *list, redisCallback *cb) { + if (list->head == NULL) + list->head = cb; + if (list->tail != NULL) + list->tail->next = cb; + list->tail = cb; +} + +static redisCallback *__redisShiftCallback(redisCallbackList *list) { + redisCallback *cb = list->head; + if (cb != NULL) { + list->head = cb->next; + if (cb == list->tail) + list->tail = NULL; + } + return cb; +} + +/* This function should be called when the socket is readable. + * It processes all replies that can be read and executes their callbacks. + */ +void redisAsyncHandleRead(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisCallback *cb; + void *reply = NULL; + int status; + + if (redisBufferRead(c) == REDIS_ERR) { + // needs error handling + assert(NULL); + } else { + /* Always re-schedule reads */ + if (ac->evAddRead) ac->evAddRead(ac->data); + + while((status = redisGetReply(c,&reply)) == REDIS_OK) { + /* Abort when there are no more replies */ + if (reply == NULL) break; + + /* Shift callback and execute it */ + cb = __redisShiftCallback(&ac->replies); + assert(cb != NULL); + if (cb->fn != NULL) { + cb->fn(ac,reply,cb->privdata); + } else { + c->fn->freeObject(reply); + } + } + + // needs error handling + assert(status == REDIS_OK); + } +} + +void redisAsyncHandleWrite(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + int done = 0; + + if (redisBufferWrite(c,&done) == REDIS_ERR) { + // needs error handling + assert(NULL); + } else { + /* Continue writing when not done, stop writing otherwise */ + if (!done) { + if (ac->evAddWrite) ac->evAddWrite(ac->data); + } else { + if (ac->evDelWrite) ac->evDelWrite(ac->data); + } + + /* Always schedule reads when something was written */ + if (ac->evAddRead) ac->evAddRead(ac->data); + } +} + +/* Helper function for the redisAsyncCommand* family of functions. + * + * Write a formatted command to the output buffer and register the provided + * callback function with the context. + */ +static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, char *cmd, size_t len) { + redisContext *c = &(ac->c); + redisCallback *cb; + c->obuf = sdscatlen(c->obuf,cmd,len); + + /* Store callback */ + cb = calloc(1,sizeof(redisCallback)); + if (!cb) redisOOM(); + cb->fn = fn; + cb->privdata = privdata; + __redisPushCallback(&(ac->replies),cb); + + /* Always schedule a write when the write buffer is non-empty */ + if (ac->evAddWrite) ac->evAddWrite(ac->data); + + return REDIS_OK; +} + +int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) { + char *cmd; + int len; + int status; + len = redisvFormatCommand(&cmd,format,ap); + status = __redisAsyncCommand(ac,fn,privdata,cmd,len); + free(cmd); + return status; +} + +int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) { + va_list ap; + int status; + va_start(ap,format); + status = redisvAsyncCommand(ac,fn,privdata,format,ap); + va_end(ap); + return status; +} + +int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) { + char *cmd; + int len; + int status; + len = redisFormatCommandArgv(&cmd,argc,argv,argvlen); + status = __redisAsyncCommand(ac,fn,privdata,cmd,len); + free(cmd); + return status; +} diff --git a/async.h b/async.h new file mode 100644 index 0000000..e0ac8bf --- /dev/null +++ b/async.h @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2009-2010, Salvatore Sanfilippo + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __HIREDIS_ASYNC_H +#define __HIREDIS_ASYNC_H +#include "hiredis.h" + +struct redisAsyncContext; /* need forward declaration of redisAsyncContext */ + +/* Reply callback prototype and container */ +typedef void (redisCallbackFn)(struct redisAsyncContext*, redisReply*, void*); +typedef struct redisCallback { + struct redisCallback *next; /* simple singly linked list */ + redisCallbackFn *fn; + void *privdata; +} redisCallback; + +/* List of callbacks for either regular replies or pub/sub */ +typedef struct redisCallbackList { + redisCallback *head, *tail; +} redisCallbackList; + +/* Context for an async connection to Redis */ +typedef struct redisAsyncContext { + /* Hold the regular context, so it can be realloc'ed. */ + redisContext c; + + /* Called when the library expects to start reading/writing. + * The supplied functions should be idempotent. */ + void (*evAddRead)(void *privdata); + void (*evDelRead)(void *privdata); + void (*evAddWrite)(void *privdata); + void (*evDelWrite)(void *privdata); + void *data; + + /* Reply callbacks */ + redisCallbackList replies; +} redisAsyncContext; + +/* Functions that proxy to hiredis */ +redisAsyncContext *redisAsyncConnect(const char *ip, int port); +int redisAsyncSetReplyObjectFunctions(redisAsyncContext *c, redisReplyObjectFunctions *fn); + +/* Handle read/write events */ +void redisAsyncHandleRead(redisAsyncContext *ac); +void redisAsyncHandleWrite(redisAsyncContext *ac); + +/* Command functions for an async context. Write the command to the + * output buffer and register the provided callback. */ +int redisvAsyncCommand(redisAsyncContext *c, redisCallbackFn *fn, void *privdata, const char *format, va_list ap); +int redisAsyncCommand(redisAsyncContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...); +int redisAsyncCommandArgv(redisAsyncContext *c, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen); + +#endif -- cgit v1.2.3