diff options
author | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-09-20 18:02:28 +0200 |
---|---|---|
committer | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-09-20 18:02:28 +0200 |
commit | db5244045cb4c5600ba36f678ada2a9f0c4e9469 (patch) | |
tree | f4d68a79546795b0b6ed6049ddc075b424645046 | |
parent | b1fa529cf973247ba79f23b1b5c9249582fc1bfe (diff) |
Introduce read tasks to allow hooking other code into reply parsing
-rw-r--r-- | hiredis.c | 231 | ||||
-rw-r--r-- | hiredis.h | 7 |
2 files changed, 155 insertions, 83 deletions
@@ -32,16 +32,19 @@ #include <stdlib.h> #include <unistd.h> #include <stdarg.h> +#include <assert.h> #include "hiredis.h" #include "anet.h" #include "sds.h" typedef struct redisReader { + void *reply; /* holds temporary reply */ + sds buf; /* read buffer */ unsigned int pos; /* buffer cursor */ - redisReply **rlist; /* list of items to process */ + redisReadTask *rlist; /* list of items to process */ unsigned int rlen; /* list length */ unsigned int rpos; /* list cursor */ } redisReader; @@ -90,7 +93,7 @@ void freeReplyObject(redisReply *r) { break; /* Nothing to free */ case REDIS_REPLY_ARRAY: for (j = 0; j < r->elements; j++) - freeReplyObject(r->element[j]); + if (r->element[j]) freeReplyObject(r->element[j]); free(r->element); break; default: @@ -116,6 +119,58 @@ static redisReply *redisIOError(void) { return createErrorObject("I/O error"); } +static void *createStringObject(redisReadTask *task, char *str, size_t len) { + redisReply *r = createReplyObject(task->type,sdsnewlen(str,len)); + assert(task->type == REDIS_REPLY_ERROR || + task->type == REDIS_REPLY_STATUS || + task->type == REDIS_REPLY_STRING); + + /* for API compat, set STATUS to STRING */ + if (task->type == REDIS_REPLY_STATUS) + r->type = REDIS_REPLY_STRING; + + if (task->parent) { + redisReply *parent = task->parent; + assert(parent->type == REDIS_REPLY_ARRAY); + parent->element[task->idx] = r; + } + return r; +} + +static void *createArrayObject(redisReadTask *task, int elements) { + redisReply *r = createReplyObject(REDIS_REPLY_ARRAY,NULL); + r->elements = elements; + if ((r->element = calloc(sizeof(redisReply*),elements)) == NULL) + redisOOM(); + if (task->parent) { + redisReply *parent = task->parent; + assert(parent->type == REDIS_REPLY_ARRAY); + parent->element[task->idx] = r; + } + return r; +} + +static void *createIntegerObject(redisReadTask *task, long long value) { + redisReply *r = createReplyObject(REDIS_REPLY_INTEGER,NULL); + r->integer = value; + if (task->parent) { + redisReply *parent = task->parent; + assert(parent->type == REDIS_REPLY_ARRAY); + parent->element[task->idx] = r; + } + return r; +} + +static void *createNilObject(redisReadTask *task) { + redisReply *r = createReplyObject(REDIS_REPLY_NIL,NULL); + if (task->parent) { + redisReply *parent = task->parent; + assert(parent->type == REDIS_REPLY_ARRAY); + parent->element[task->idx] = r; + } + return r; +} + static char *readBytes(redisReader *r, unsigned int bytes) { char *p; if (sdslen(r->buf)-r->pos >= bytes) { @@ -140,21 +195,21 @@ static char *readLine(redisReader *r, int *_len) { } static int processLineItem(redisReader *r) { - redisReply *cur = r->rlist[r->rpos]; + redisReadTask *cur = &(r->rlist[r->rpos]); + void *obj; char *p; int len; if ((p = readLine(r,&len)) != NULL) { if (cur->type == REDIS_REPLY_INTEGER) { - cur->integer = strtoll(p,NULL,10); + obj = createIntegerObject(cur,strtoll(p,NULL,10)); } else { - cur->reply = sdsnewlen(p,len); + obj = createStringObject(cur,p,len); } - /* for API compat, set STATUS to STRING */ - if (cur->type == REDIS_REPLY_STATUS) - cur->type = REDIS_REPLY_STRING; - + /* If there is no root yet, register this object as root. */ + if (r->reply == NULL) + r->reply = obj; r->rpos++; return 0; } @@ -162,80 +217,90 @@ static int processLineItem(redisReader *r) { } static int processBulkItem(redisReader *r) { - redisReply *cur = r->rlist[r->rpos]; - char *p; - int len; + redisReadTask *cur = &(r->rlist[r->rpos]); + void *obj = NULL; + char *p, *s; + long len; + unsigned long bytelen; + + p = r->buf+r->pos; + s = strstr(p,"\r\n"); + if (s != NULL) { + p = r->buf+r->pos; + bytelen = s-(r->buf+r->pos)+2; /* include \r\n */ + len = strtol(p,NULL,10); - if (cur->reply == NULL) { - if ((p = readLine(r,NULL)) != NULL) { - len = atoi(p); - if (len == -1) { - /* nil means this item is done */ - cur->type = REDIS_REPLY_NIL; - cur->reply = sdsempty(); - r->rpos++; - return 0; - } else { - cur->reply = sdsnewlen(NULL,len); - } + if (len < 0) { + /* The nil object can always be created. */ + obj = createNilObject(cur); } else { - return -1; + /* Only continue when the buffer contains the entire bulk item. */ + bytelen += len+2; /* include \r\n */ + if (r->pos+bytelen <= sdslen(r->buf)) { + obj = createStringObject(cur,s+2,len); + } } - } - len = sdslen(cur->reply); - /* add two bytes for crlf */ - if ((p = readBytes(r,len+2)) != NULL) { - memcpy(cur->reply,p,len); - r->rpos++; - return 0; + /* Proceed when obj was created. */ + if (obj != NULL) { + r->pos += bytelen; + if (r->reply == NULL) + r->reply = obj; + r->rpos++; + return 0; + } } return -1; } static int processMultiBulkItem(redisReader *r) { - redisReply *cur = r->rlist[r->rpos]; + redisReadTask *cur = &(r->rlist[r->rpos]); + void *obj; char *p; - int elements, j; + long elements, j; if ((p = readLine(r,NULL)) != NULL) { - elements = atoi(p); + elements = strtol(p,NULL,10); if (elements == -1) { - /* empty */ - cur->type = REDIS_REPLY_NIL; - cur->reply = sdsempty(); + obj = createNilObject(cur); + } else { + obj = createArrayObject(cur,elements); + + /* Modify read list when there are more than 0 elements. */ + if (elements > 0) { + /* Append elements to the read list. */ + r->rlen += elements; + if ((r->rlist = realloc(r->rlist,sizeof(redisReadTask)*r->rlen)) == NULL) + redisOOM(); + + /* Move existing items backwards. */ + memmove(&(r->rlist[r->rpos+1+elements]), + &(r->rlist[r->rpos+1]), + (r->rlen-(r->rpos+1+elements))*sizeof(redisReadTask)); + + /* Populate new read items. */ + redisReadTask *t; + for (j = 0; j < elements; j++) { + t = &(r->rlist[r->rpos+1+j]); + t->type = -1; + t->parent = obj; + t->idx = j; + } + } + } + + if (obj != NULL) { + if (r->reply == NULL) + r->reply = obj; r->rpos++; return 0; } - } else { - return -1; - } - - cur->elements = elements; - r->rlen += elements; - r->rpos++; - - /* create placeholder items */ - if ((cur->element = malloc(sizeof(redisReply*)*elements)) == NULL) - redisOOM(); - if ((r->rlist = realloc(r->rlist,sizeof(redisReply*)*r->rlen)) == NULL) - redisOOM(); - - /* move existing items backwards */ - memmove(&(r->rlist[r->rpos+elements]), - &(r->rlist[r->rpos]), - (r->rlen-(r->rpos+elements))*sizeof(redisReply*)); - - /* populate item list */ - for (j = 0; j < elements; j++) { - cur->element[j] = createReplyObject(-1,NULL); - r->rlist[r->rpos+j] = cur->element[j]; } - return 0; + return -1; } static int processItem(redisReader *r) { - redisReply *cur = r->rlist[r->rpos]; + redisReadTask *cur = &(r->rlist[r->rpos]); char *p; sds byte; @@ -311,42 +376,40 @@ static redisReply *redisReadReply(int fd) { void *redisCreateReplyReader() { redisReader *r = calloc(sizeof(redisReader),1); r->buf = sdsempty(); + r->rlist = malloc(sizeof(redisReadTask)*1); return r; } void redisFreeReplyReader(void *reader) { redisReader *r = reader; - if (r->buf != NULL) { + if (r->reply != NULL) + freeReplyObject(r->reply); + if (r->buf != NULL) sdsfree(r->buf); - } - if (r->rlen > 0) { - freeReplyObject(r->rlist[0]); + if (r->rlist != NULL) free(r->rlist); - } free(r); } int redisIsReplyReaderEmpty(void *reader) { redisReader *r = reader; - if (r->buf != NULL && sdslen(r->buf) > 0) - return 0; - if (r->rlist != NULL && r->rpos < r->rlen) - return 0; + if ((r->buf != NULL && sdslen(r->buf) > 0) || + (r->rpos < r->rlen)) return 0; return 1; } static void redisSetReplyReaderError(redisReader *r, redisReply *error) { + if (r->reply != NULL) + freeReplyObject(r->reply); + /* Clear remaining buffer when we see a protocol error. */ if (r->buf != NULL) { sdsfree(r->buf); r->buf = sdsempty(); r->pos = 0; } - /* Clear currently allocated objects. */ - if (r->rlist[0] != NULL) - freeReplyObject(r->rlist[0]); - r->rlen = r->rpos = 1; - r->rlist[0] = error; + r->rlen = r->rpos = 0; + r->reply = error; } void *redisFeedReplyReader(void *reader, char *buf, int len) { @@ -362,8 +425,10 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) { /* Create first item to process when the item list is empty. */ if (r->rlen == 0) { - r->rlist = malloc(sizeof(redisReply*)); - r->rlist[0] = createReplyObject(-1,NULL); + r->rlist = realloc(r->rlist,sizeof(redisReadTask)*1); + r->rlist[0].type = -1; + r->rlist[0].parent = NULL; + r->rlist[0].idx = -1; r->rlen = 1; r->rpos = 0; } @@ -387,7 +452,9 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) { /* Emit a reply when there is one. */ if (r->rpos == r->rlen) { - redisReply *reply = r->rlist[0]; + void *reply = r->reply; + assert(reply != NULL); + r->reply = NULL; /* Destroy the buffer when it is empty and is quite large. */ if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) { @@ -396,9 +463,7 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) { r->pos = 0; } - /* Free list of items to process. */ - free(r->rlist); - r->rlist = NULL; + /* Set list of items to read to be empty. */ r->rlen = r->rpos = 0; return reply; } else { @@ -49,6 +49,13 @@ typedef struct redisReply { struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */ } redisReply; + +typedef struct redisReadTask { + int type; + void *parent; /* optional pointer to parent object */ + int idx; /* index in parent (array) object */ +} redisReadTask; + redisReply *redisConnect(int *fd, const char *ip, int port); void freeReplyObject(redisReply *r); redisReply *redisCommand(int fd, const char *format, ...); |