summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPieter Noordhuis <pcnoordhuis@gmail.com>2010-09-20 18:02:28 +0200
committerPieter Noordhuis <pcnoordhuis@gmail.com>2010-09-20 18:02:28 +0200
commitdb5244045cb4c5600ba36f678ada2a9f0c4e9469 (patch)
treef4d68a79546795b0b6ed6049ddc075b424645046
parentb1fa529cf973247ba79f23b1b5c9249582fc1bfe (diff)
Introduce read tasks to allow hooking other code into reply parsing
-rw-r--r--hiredis.c231
-rw-r--r--hiredis.h7
2 files changed, 155 insertions, 83 deletions
diff --git a/hiredis.c b/hiredis.c
index 0b7a17e..454c01e 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -32,16 +32,19 @@
#include <stdlib.h>
#include <unistd.h>
#include <stdarg.h>
+#include <assert.h>
#include "hiredis.h"
#include "anet.h"
#include "sds.h"
typedef struct redisReader {
+ void *reply; /* holds temporary reply */
+
sds buf; /* read buffer */
unsigned int pos; /* buffer cursor */
- redisReply **rlist; /* list of items to process */
+ redisReadTask *rlist; /* list of items to process */
unsigned int rlen; /* list length */
unsigned int rpos; /* list cursor */
} redisReader;
@@ -90,7 +93,7 @@ void freeReplyObject(redisReply *r) {
break; /* Nothing to free */
case REDIS_REPLY_ARRAY:
for (j = 0; j < r->elements; j++)
- freeReplyObject(r->element[j]);
+ if (r->element[j]) freeReplyObject(r->element[j]);
free(r->element);
break;
default:
@@ -116,6 +119,58 @@ static redisReply *redisIOError(void) {
return createErrorObject("I/O error");
}
+static void *createStringObject(redisReadTask *task, char *str, size_t len) {
+ redisReply *r = createReplyObject(task->type,sdsnewlen(str,len));
+ assert(task->type == REDIS_REPLY_ERROR ||
+ task->type == REDIS_REPLY_STATUS ||
+ task->type == REDIS_REPLY_STRING);
+
+ /* for API compat, set STATUS to STRING */
+ if (task->type == REDIS_REPLY_STATUS)
+ r->type = REDIS_REPLY_STRING;
+
+ if (task->parent) {
+ redisReply *parent = task->parent;
+ assert(parent->type == REDIS_REPLY_ARRAY);
+ parent->element[task->idx] = r;
+ }
+ return r;
+}
+
+static void *createArrayObject(redisReadTask *task, int elements) {
+ redisReply *r = createReplyObject(REDIS_REPLY_ARRAY,NULL);
+ r->elements = elements;
+ if ((r->element = calloc(sizeof(redisReply*),elements)) == NULL)
+ redisOOM();
+ if (task->parent) {
+ redisReply *parent = task->parent;
+ assert(parent->type == REDIS_REPLY_ARRAY);
+ parent->element[task->idx] = r;
+ }
+ return r;
+}
+
+static void *createIntegerObject(redisReadTask *task, long long value) {
+ redisReply *r = createReplyObject(REDIS_REPLY_INTEGER,NULL);
+ r->integer = value;
+ if (task->parent) {
+ redisReply *parent = task->parent;
+ assert(parent->type == REDIS_REPLY_ARRAY);
+ parent->element[task->idx] = r;
+ }
+ return r;
+}
+
+static void *createNilObject(redisReadTask *task) {
+ redisReply *r = createReplyObject(REDIS_REPLY_NIL,NULL);
+ if (task->parent) {
+ redisReply *parent = task->parent;
+ assert(parent->type == REDIS_REPLY_ARRAY);
+ parent->element[task->idx] = r;
+ }
+ return r;
+}
+
static char *readBytes(redisReader *r, unsigned int bytes) {
char *p;
if (sdslen(r->buf)-r->pos >= bytes) {
@@ -140,21 +195,21 @@ static char *readLine(redisReader *r, int *_len) {
}
static int processLineItem(redisReader *r) {
- redisReply *cur = r->rlist[r->rpos];
+ redisReadTask *cur = &(r->rlist[r->rpos]);
+ void *obj;
char *p;
int len;
if ((p = readLine(r,&len)) != NULL) {
if (cur->type == REDIS_REPLY_INTEGER) {
- cur->integer = strtoll(p,NULL,10);
+ obj = createIntegerObject(cur,strtoll(p,NULL,10));
} else {
- cur->reply = sdsnewlen(p,len);
+ obj = createStringObject(cur,p,len);
}
- /* for API compat, set STATUS to STRING */
- if (cur->type == REDIS_REPLY_STATUS)
- cur->type = REDIS_REPLY_STRING;
-
+ /* If there is no root yet, register this object as root. */
+ if (r->reply == NULL)
+ r->reply = obj;
r->rpos++;
return 0;
}
@@ -162,80 +217,90 @@ static int processLineItem(redisReader *r) {
}
static int processBulkItem(redisReader *r) {
- redisReply *cur = r->rlist[r->rpos];
- char *p;
- int len;
+ redisReadTask *cur = &(r->rlist[r->rpos]);
+ void *obj = NULL;
+ char *p, *s;
+ long len;
+ unsigned long bytelen;
+
+ p = r->buf+r->pos;
+ s = strstr(p,"\r\n");
+ if (s != NULL) {
+ p = r->buf+r->pos;
+ bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
+ len = strtol(p,NULL,10);
- if (cur->reply == NULL) {
- if ((p = readLine(r,NULL)) != NULL) {
- len = atoi(p);
- if (len == -1) {
- /* nil means this item is done */
- cur->type = REDIS_REPLY_NIL;
- cur->reply = sdsempty();
- r->rpos++;
- return 0;
- } else {
- cur->reply = sdsnewlen(NULL,len);
- }
+ if (len < 0) {
+ /* The nil object can always be created. */
+ obj = createNilObject(cur);
} else {
- return -1;
+ /* Only continue when the buffer contains the entire bulk item. */
+ bytelen += len+2; /* include \r\n */
+ if (r->pos+bytelen <= sdslen(r->buf)) {
+ obj = createStringObject(cur,s+2,len);
+ }
}
- }
- len = sdslen(cur->reply);
- /* add two bytes for crlf */
- if ((p = readBytes(r,len+2)) != NULL) {
- memcpy(cur->reply,p,len);
- r->rpos++;
- return 0;
+ /* Proceed when obj was created. */
+ if (obj != NULL) {
+ r->pos += bytelen;
+ if (r->reply == NULL)
+ r->reply = obj;
+ r->rpos++;
+ return 0;
+ }
}
return -1;
}
static int processMultiBulkItem(redisReader *r) {
- redisReply *cur = r->rlist[r->rpos];
+ redisReadTask *cur = &(r->rlist[r->rpos]);
+ void *obj;
char *p;
- int elements, j;
+ long elements, j;
if ((p = readLine(r,NULL)) != NULL) {
- elements = atoi(p);
+ elements = strtol(p,NULL,10);
if (elements == -1) {
- /* empty */
- cur->type = REDIS_REPLY_NIL;
- cur->reply = sdsempty();
+ obj = createNilObject(cur);
+ } else {
+ obj = createArrayObject(cur,elements);
+
+ /* Modify read list when there are more than 0 elements. */
+ if (elements > 0) {
+ /* Append elements to the read list. */
+ r->rlen += elements;
+ if ((r->rlist = realloc(r->rlist,sizeof(redisReadTask)*r->rlen)) == NULL)
+ redisOOM();
+
+ /* Move existing items backwards. */
+ memmove(&(r->rlist[r->rpos+1+elements]),
+ &(r->rlist[r->rpos+1]),
+ (r->rlen-(r->rpos+1+elements))*sizeof(redisReadTask));
+
+ /* Populate new read items. */
+ redisReadTask *t;
+ for (j = 0; j < elements; j++) {
+ t = &(r->rlist[r->rpos+1+j]);
+ t->type = -1;
+ t->parent = obj;
+ t->idx = j;
+ }
+ }
+ }
+
+ if (obj != NULL) {
+ if (r->reply == NULL)
+ r->reply = obj;
r->rpos++;
return 0;
}
- } else {
- return -1;
- }
-
- cur->elements = elements;
- r->rlen += elements;
- r->rpos++;
-
- /* create placeholder items */
- if ((cur->element = malloc(sizeof(redisReply*)*elements)) == NULL)
- redisOOM();
- if ((r->rlist = realloc(r->rlist,sizeof(redisReply*)*r->rlen)) == NULL)
- redisOOM();
-
- /* move existing items backwards */
- memmove(&(r->rlist[r->rpos+elements]),
- &(r->rlist[r->rpos]),
- (r->rlen-(r->rpos+elements))*sizeof(redisReply*));
-
- /* populate item list */
- for (j = 0; j < elements; j++) {
- cur->element[j] = createReplyObject(-1,NULL);
- r->rlist[r->rpos+j] = cur->element[j];
}
- return 0;
+ return -1;
}
static int processItem(redisReader *r) {
- redisReply *cur = r->rlist[r->rpos];
+ redisReadTask *cur = &(r->rlist[r->rpos]);
char *p;
sds byte;
@@ -311,42 +376,40 @@ static redisReply *redisReadReply(int fd) {
void *redisCreateReplyReader() {
redisReader *r = calloc(sizeof(redisReader),1);
r->buf = sdsempty();
+ r->rlist = malloc(sizeof(redisReadTask)*1);
return r;
}
void redisFreeReplyReader(void *reader) {
redisReader *r = reader;
- if (r->buf != NULL) {
+ if (r->reply != NULL)
+ freeReplyObject(r->reply);
+ if (r->buf != NULL)
sdsfree(r->buf);
- }
- if (r->rlen > 0) {
- freeReplyObject(r->rlist[0]);
+ if (r->rlist != NULL)
free(r->rlist);
- }
free(r);
}
int redisIsReplyReaderEmpty(void *reader) {
redisReader *r = reader;
- if (r->buf != NULL && sdslen(r->buf) > 0)
- return 0;
- if (r->rlist != NULL && r->rpos < r->rlen)
- return 0;
+ if ((r->buf != NULL && sdslen(r->buf) > 0) ||
+ (r->rpos < r->rlen)) return 0;
return 1;
}
static void redisSetReplyReaderError(redisReader *r, redisReply *error) {
+ if (r->reply != NULL)
+ freeReplyObject(r->reply);
+
/* Clear remaining buffer when we see a protocol error. */
if (r->buf != NULL) {
sdsfree(r->buf);
r->buf = sdsempty();
r->pos = 0;
}
- /* Clear currently allocated objects. */
- if (r->rlist[0] != NULL)
- freeReplyObject(r->rlist[0]);
- r->rlen = r->rpos = 1;
- r->rlist[0] = error;
+ r->rlen = r->rpos = 0;
+ r->reply = error;
}
void *redisFeedReplyReader(void *reader, char *buf, int len) {
@@ -362,8 +425,10 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) {
/* Create first item to process when the item list is empty. */
if (r->rlen == 0) {
- r->rlist = malloc(sizeof(redisReply*));
- r->rlist[0] = createReplyObject(-1,NULL);
+ r->rlist = realloc(r->rlist,sizeof(redisReadTask)*1);
+ r->rlist[0].type = -1;
+ r->rlist[0].parent = NULL;
+ r->rlist[0].idx = -1;
r->rlen = 1;
r->rpos = 0;
}
@@ -387,7 +452,9 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) {
/* Emit a reply when there is one. */
if (r->rpos == r->rlen) {
- redisReply *reply = r->rlist[0];
+ void *reply = r->reply;
+ assert(reply != NULL);
+ r->reply = NULL;
/* Destroy the buffer when it is empty and is quite large. */
if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) {
@@ -396,9 +463,7 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) {
r->pos = 0;
}
- /* Free list of items to process. */
- free(r->rlist);
- r->rlist = NULL;
+ /* Set list of items to read to be empty. */
r->rlen = r->rpos = 0;
return reply;
} else {
diff --git a/hiredis.h b/hiredis.h
index 482072e..1f30b79 100644
--- a/hiredis.h
+++ b/hiredis.h
@@ -49,6 +49,13 @@ typedef struct redisReply {
struct redisReply **element; /* elements vector for REDIS_REPLY_ARRAY */
} redisReply;
+
+typedef struct redisReadTask {
+ int type;
+ void *parent; /* optional pointer to parent object */
+ int idx; /* index in parent (array) object */
+} redisReadTask;
+
redisReply *redisConnect(int *fd, const char *ip, int port);
void freeReplyObject(redisReply *r);
redisReply *redisCommand(int fd, const char *format, ...);