From b1bedf5c6df6290249850f73912d9fc566782b64 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 20 Sep 2010 22:04:35 +0200 Subject: Use a set of function pointers for building the reply Allows libraries to wrap the reply parsing code and build the reply in a streaming fashion. Reply objects can immediately be initialized to the required type without having to convert an intermediary format. --- hiredis.c | 51 ++++++++++++++++++++++++++++++++++++++------------- hiredis.h | 14 +++++++++++--- test.c | 4 ++-- 3 files changed, 51 insertions(+), 18 deletions(-) diff --git a/hiredis.c b/hiredis.c index fec27ec..30e5838 100644 --- a/hiredis.c +++ b/hiredis.c @@ -39,6 +39,7 @@ #include "sds.h" typedef struct redisReader { + struct redisReplyObjectFunctions *fn; void *reply; /* holds temporary reply */ sds buf; /* read buffer */ @@ -58,6 +59,15 @@ static void *createIntegerObject(redisReadTask *task, long long value); static void *createNilObject(redisReadTask *task); static void redisSetReplyReaderError(redisReader *r, void *obj); +/* Default set of functions to build the reply. */ +static redisReplyFunctions defaultFunctions = { + createStringObject, + createArrayObject, + createIntegerObject, + createNilObject, + freeReplyObject +}; + /* We simply abort on out of memory */ static void redisOOM(void) { fprintf(stderr,"Out of memory in hiredis.c"); @@ -89,7 +99,8 @@ static redisReply *createReplyObject(int type, sds reply) { } /* Free a reply object */ -void freeReplyObject(redisReply *r) { +void freeReplyObject(void *reply) { + redisReply *r = reply; size_t j; switch(r->type) { @@ -116,7 +127,12 @@ static void *createErrorObject(redisReader *context, const char *fmt, ...) { va_start(ap,fmt); err = sdscatvprintf(sdsempty(),fmt,ap); va_end(ap); - obj = createStringObject(&t,err,sdslen(err)); + + /* Use the context of the reader if it is provided. */ + if (context) + obj = context->fn->createString(&t,err,sdslen(err)); + else + obj = createStringObject(&t,err,sdslen(err)); sdsfree(err); return obj; } @@ -205,9 +221,9 @@ static int processLineItem(redisReader *r) { if ((p = readLine(r,&len)) != NULL) { if (cur->type == REDIS_REPLY_INTEGER) { - obj = createIntegerObject(cur,strtoll(p,NULL,10)); + obj = r->fn->createInteger(cur,strtoll(p,NULL,10)); } else { - obj = createStringObject(cur,p,len); + obj = r->fn->createString(cur,p,len); } /* If there is no root yet, register this object as root. */ @@ -235,12 +251,12 @@ static int processBulkItem(redisReader *r) { if (len < 0) { /* The nil object can always be created. */ - obj = createNilObject(cur); + obj = r->fn->createNil(cur); } else { /* Only continue when the buffer contains the entire bulk item. */ bytelen += len+2; /* include \r\n */ if (r->pos+bytelen <= sdslen(r->buf)) { - obj = createStringObject(cur,s+2,len); + obj = r->fn->createString(cur,s+2,len); } } @@ -265,9 +281,9 @@ static int processMultiBulkItem(redisReader *r) { if ((p = readLine(r,NULL)) != NULL) { elements = strtol(p,NULL,10); if (elements == -1) { - obj = createNilObject(cur); + obj = r->fn->createNil(cur); } else { - obj = createArrayObject(cur,elements); + obj = r->fn->createArray(cur,elements); /* Modify read list when there are more than 0 elements. */ if (elements > 0) { @@ -358,7 +374,7 @@ static int processItem(redisReader *r) { #define READ_BUFFER_SIZE 2048 static redisReply *redisReadReply(int fd) { - void *reader = redisCreateReplyReader(); + void *reader = redisCreateReplyReader(&defaultFunctions); redisReply *reply; char buf[1024]; int nread; @@ -376,17 +392,27 @@ static redisReply *redisReadReply(int fd) { return reply; } -void *redisCreateReplyReader() { +void *redisCreateReplyReader(redisReplyFunctions *fn) { redisReader *r = calloc(sizeof(redisReader),1); + r->fn = fn == NULL ? &defaultFunctions : fn; r->buf = sdsempty(); r->rlist = malloc(sizeof(redisReadTask)*1); return r; } +/* External libraries wrapping hiredis might need access to the temporary + * variable while the reply is built up. When the reader contains an + * object in between receiving some bytes to parse, this object might + * otherwise be free'd by garbage collection. */ +void *redisGetReplyObjectFromReplyReader(void *reader) { + redisReader *r = reader; + return r->reply; +} + void redisFreeReplyReader(void *reader) { redisReader *r = reader; if (r->reply != NULL) - freeReplyObject(r->reply); + r->fn->freeObject(r->reply); if (r->buf != NULL) sdsfree(r->buf); if (r->rlist != NULL) @@ -403,7 +429,7 @@ int redisIsReplyReaderEmpty(void *reader) { static void redisSetReplyReaderError(redisReader *r, void *obj) { if (r->reply != NULL) - freeReplyObject(r->reply); + r->fn->freeObject(r->reply); /* Clear remaining buffer when we see a protocol error. */ if (r->buf != NULL) { @@ -456,7 +482,6 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) { /* Emit a reply when there is one. */ if (r->rpos == r->rlen) { void *reply = r->reply; - assert(reply != NULL); r->reply = NULL; /* Destroy the buffer when it is empty and is quite large. */ diff --git a/hiredis.h b/hiredis.h index 1f30b79..c354935 100644 --- a/hiredis.h +++ b/hiredis.h @@ -49,17 +49,25 @@ typedef struct redisReply { struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */ } redisReply; - typedef struct redisReadTask { int type; void *parent; /* optional pointer to parent object */ int idx; /* index in parent (array) object */ } redisReadTask; +typedef struct redisReplyObjectFunctions { + void *(*createString)(redisReadTask*, char*, size_t); + void *(*createArray)(redisReadTask*, int); + void *(*createInteger)(redisReadTask*, long long); + void *(*createNil)(redisReadTask*); + void (*freeObject)(void*); +} redisReplyFunctions; + redisReply *redisConnect(int *fd, const char *ip, int port); -void freeReplyObject(redisReply *r); +void freeReplyObject(void *reply); redisReply *redisCommand(int fd, const char *format, ...); -void *redisCreateReplyReader(); +void *redisCreateReplyReader(redisReplyFunctions *fn); +void *redisGetReplyObjectFromReplyReader(void *reader); void redisFreeReplyReader(void *ptr); int redisIsReplyReaderEmpty(void *ptr); void *redisFeedReplyReader(void *reader, char *buf, int len); diff --git a/test.c b/test.c index d563a28..020d125 100644 --- a/test.c +++ b/test.c @@ -124,7 +124,7 @@ int main(void) { freeReplyObject(reply); test("Error handling in reply parser: "); - reader = redisCreateReplyReader(); + reader = redisCreateReplyReader(NULL); reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6); test_cond(reply->type == REDIS_PROTOCOL_ERROR && strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0); @@ -134,7 +134,7 @@ int main(void) { /* when the reply already contains multiple items, they must be free'd * on an error. valgrind will bark when this doesn't happen. */ test("Memory cleanup in reply parser: "); - reader = redisCreateReplyReader(); + reader = redisCreateReplyReader(NULL); redisFeedReplyReader(reader,(char*)"*2\r\n",4); redisFeedReplyReader(reader,(char*)"$5\r\nhello\r\n",11); reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6); -- cgit v1.2.3