summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--hiredis.c286
-rw-r--r--hiredis.h1
-rw-r--r--test.c17
3 files changed, 222 insertions, 82 deletions
diff --git a/hiredis.c b/hiredis.c
index c774c69..96a35f3 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -37,6 +37,17 @@
#include "anet.h"
#include "sds.h"
+typedef struct redisReader {
+ char *buf; /* read buffer */
+ int len; /* buffer length */
+ int avail; /* available bytes for consumption */
+ int pos; /* buffer cursor */
+
+ redisReply **rlist; /* list of items to process */
+ int rlen; /* list length */
+ int rpos; /* list cursor */
+} redisReader;
+
static redisReply *redisReadReply(int fd);
static redisReply *createReplyObject(int type, sds reply);
@@ -93,107 +104,218 @@ static redisReply *redisIOError(void) {
return createReplyObject(REDIS_REPLY_ERROR,sdsnew("I/O error"));
}
-/* In a real high performance C client this should be bufferized */
-static sds redisReadLine(int fd) {
- sds line = sdsempty();
+static char *readBytes(redisReader *r, int bytes) {
+ char *p;
+ if (r->len-r->pos >= bytes) {
+ p = r->buf+r->pos;
+ r->pos += bytes;
+ return p;
+ }
+ return NULL;
+}
+
+static char *readLine(redisReader *r, int *_len) {
+ char *p, *s = strstr(r->buf+r->pos,"\r\n");
+ int len;
+ if (s != NULL) {
+ p = r->buf+r->pos;
+ len = s-(r->buf+r->pos);
+ r->pos += len+2; /* skip \r\n */
+ if (_len) *_len = len;
+ return p;
+ }
+ return NULL;
+}
- while(1) {
- char c;
- ssize_t ret;
+static int processLineItem(redisReader *r) {
+ redisReply *cur = r->rlist[r->rpos];
+ char *p;
+ int len;
- ret = read(fd,&c,1);
- if (ret == -1) {
- sdsfree(line);
- return NULL;
- } else if ((ret == 0) || (c == '\n')) {
- break;
+ if ((p = readLine(r,&len)) != NULL) {
+ if (cur->type == REDIS_REPLY_INTEGER) {
+ cur->integer = strtoll(p,NULL,10);
} else {
- line = sdscatlen(line,&c,1);
+ cur->reply = sdsnewlen(p,len);
}
- }
- return sdstrim(line,"\r\n");
-}
-static redisReply *redisReadSingleLineReply(int fd, int type) {
- sds buf = redisReadLine(fd);
-
- if (buf == NULL) return redisIOError();
- return createReplyObject(type,buf);
+ /* for API compat, set STATUS to STRING */
+ if (cur->type == REDIS_REPLY_STATUS)
+ cur->type = REDIS_REPLY_STRING;
+
+ r->rpos++;
+ return 0;
+ }
+ return -1;
}
-static redisReply *redisReadIntegerReply(int fd) {
- sds buf = redisReadLine(fd);
- redisReply *r = malloc(sizeof(*r));
+static int processBulkItem(redisReader *r) {
+ redisReply *cur = r->rlist[r->rpos];
+ char *p;
+ int len;
+
+ if (cur->reply == NULL) {
+ if ((p = readLine(r,NULL)) != NULL) {
+ len = atoi(p);
+ if (len == -1) {
+ /* nil means this item is done */
+ cur->type = REDIS_REPLY_NIL;
+ cur->reply = sdsempty();
+ r->rpos++;
+ return 0;
+ } else {
+ cur->reply = sdsnewlen(NULL,len);
+ }
+ } else {
+ return -1;
+ }
+ }
- if (r == NULL) redisOOM();
- if (buf == NULL) {
- free(r);
- return redisIOError();
+ len = sdslen(cur->reply);
+ /* add two bytes for crlf */
+ if ((p = readBytes(r,len+2)) != NULL) {
+ memcpy(cur->reply,p,len);
+ r->rpos++;
+ return 0;
}
- r->type = REDIS_REPLY_INTEGER;
- r->integer = strtoll(buf,NULL,10);
- sdsfree(buf);
- return r;
+ return -1;
}
-static redisReply *redisReadBulkReply(int fd) {
- sds replylen = redisReadLine(fd);
- sds buf;
- char crlf[2];
- int bulklen;
-
- if (replylen == NULL) return redisIOError();
- bulklen = atoi(replylen);
- sdsfree(replylen);
- if (bulklen == -1)
- return createReplyObject(REDIS_REPLY_NIL,sdsempty());
-
- buf = sdsnewlen(NULL,bulklen);
- anetRead(fd,buf,bulklen);
- anetRead(fd,crlf,2);
- return createReplyObject(REDIS_REPLY_STRING,buf);
-}
+static int processMultiBulkItem(redisReader *r) {
+ redisReply *cur = r->rlist[r->rpos];
+ char *p;
+ int elements, j;
+
+ if ((p = readLine(r,NULL)) != NULL) {
+ elements = atoi(p);
+ if (elements == -1) {
+ /* empty */
+ cur->type = REDIS_REPLY_NIL;
+ cur->reply = sdsempty();
+ r->rpos++;
+ return 0;
+ }
+ } else {
+ return -1;
+ }
-static redisReply *redisReadMultiBulkReply(int fd) {
- sds replylen = redisReadLine(fd);
- long elements, j;
- redisReply *r;
+ cur->elements = elements;
+ r->rlen += elements;
+ r->rpos++;
+
+ /* create placeholder items */
+ if ((cur->element = malloc(sizeof(redisReply*)*elements)) == NULL)
+ redisOOM();
+ if ((r->rlist = realloc(r->rlist,sizeof(redisReply*)*r->rlen)) == NULL)
+ redisOOM();
+
+ /* move existing items backwards */
+ memmove(&(r->rlist[r->rpos+elements]),
+ &(r->rlist[r->rpos]),
+ (r->rlen-(r->rpos+elements))*sizeof(redisReply*));
+
+ /* populate item list */
+ for (j = 0; j < elements; j++) {
+ cur->element[j] = createReplyObject(-1,NULL);
+ r->rlist[r->rpos+j] = cur->element[j];
+ }
+ return 0;
+}
- if (replylen == NULL) return redisIOError();
- elements = strtol(replylen,NULL,10);
- sdsfree(replylen);
+static int processItem(redisReader *r) {
+ redisReply *cur = r->rlist[r->rpos];
+ char *p;
- if (elements == -1)
- return createReplyObject(REDIS_REPLY_NIL,sdsempty());
+ /* check if we need to read type */
+ if (cur->type < 0) {
+ if ((p = readBytes(r,1)) != NULL) {
+ switch (p[0]) {
+ case '-':
+ cur->type = REDIS_REPLY_ERROR;
+ break;
+ case '+':
+ cur->type = REDIS_REPLY_STATUS;
+ break;
+ case ':':
+ cur->type = REDIS_REPLY_INTEGER;
+ break;
+ case '$':
+ cur->type = REDIS_REPLY_STRING;
+ break;
+ case '*':
+ cur->type = REDIS_REPLY_ARRAY;
+ break;
+ default:
+ printf("protocol error, got '%c' as reply type byte\n", p[0]);
+ exit(1);
+ }
+ } else {
+ /* could not consume 1 byte */
+ return -1;
+ }
+ }
- if ((r = malloc(sizeof(*r))) == NULL) redisOOM();
- r->type = REDIS_REPLY_ARRAY;
- r->elements = elements;
- if ((r->element = malloc(sizeof(*r)*elements)) == NULL) redisOOM();
- for (j = 0; j < elements; j++)
- r->element[j] = redisReadReply(fd);
- return r;
+ /* process typed item */
+ switch(cur->type) {
+ case REDIS_REPLY_ERROR:
+ case REDIS_REPLY_STATUS:
+ case REDIS_REPLY_INTEGER:
+ return processLineItem(r);
+ case REDIS_REPLY_STRING:
+ return processBulkItem(r);
+ case REDIS_REPLY_ARRAY:
+ return processMultiBulkItem(r);
+ default:
+ printf("unknown item type: %d\n", cur->type);
+ exit(1);
+ }
}
+#define READ_BUFFER_SIZE 2048
static redisReply *redisReadReply(int fd) {
- char type;
-
- if (anetRead(fd,&type,1) <= 0) return redisIOError();
- switch(type) {
- case '-':
- return redisReadSingleLineReply(fd,REDIS_REPLY_ERROR);
- case '+':
- return redisReadSingleLineReply(fd,REDIS_REPLY_STRING);
- case ':':
- return redisReadIntegerReply(fd);
- case '$':
- return redisReadBulkReply(fd);
- case '*':
- return redisReadMultiBulkReply(fd);
- default:
- printf("protocol error, got '%c' as reply type byte\n", type);
- exit(1);
+ redisReader r;
+ int bytes;
+
+ /* setup read buffer */
+ r.buf = malloc(READ_BUFFER_SIZE+1);
+ r.len = READ_BUFFER_SIZE;
+ r.avail = 0;
+ r.pos = 0;
+
+ /* setup list of items to process */
+ r.rlist = malloc(sizeof(redisReply*));
+ r.rlist[0] = createReplyObject(-1,NULL);
+ r.rlen = 1;
+ r.rpos = 0;
+
+ while (r.rpos < r.rlen) {
+ /* discard the buffer upto pos */
+ if (r.pos > 0) {
+ memmove(r.buf,r.buf+r.pos,r.len-r.pos);
+ r.avail -= r.pos;
+ r.pos = 0;
+ }
+
+ /* make sure there is room for at least BUFFER_SIZE */
+ if (r.len-r.avail < READ_BUFFER_SIZE) {
+ r.buf = realloc(r.buf,r.avail+READ_BUFFER_SIZE+1);
+ r.len = r.avail+READ_BUFFER_SIZE;
+ }
+
+ /* read from socket into buffer */
+ if ((bytes = read(fd,r.buf+r.avail,READ_BUFFER_SIZE)) <= 0)
+ return redisIOError();
+ r.avail += bytes;
+ r.buf[r.avail] = '\0';
+
+ /* process items in reply */
+ while (r.rpos < r.rlen)
+ if (processItem(&r) < 0)
+ break;
}
+ free(r.buf);
+ free(r.rlist);
+ return r.rlist[0];
}
/* Helper function for redisCommand(). It's used to append the next argument
diff --git a/hiredis.h b/hiredis.h
index f28dcdb..3c76407 100644
--- a/hiredis.h
+++ b/hiredis.h
@@ -35,6 +35,7 @@
#define REDIS_REPLY_ARRAY 2
#define REDIS_REPLY_INTEGER 3
#define REDIS_REPLY_NIL 4
+#define REDIS_REPLY_STATUS 5
#include "sds.h"
diff --git a/test.c b/test.c
index 526af33..cec8ada 100644
--- a/test.c
+++ b/test.c
@@ -99,6 +99,23 @@ int main(void) {
!memcmp(reply->element[1]->reply,"foo",3))
freeReplyObject(reply);
+ /* test 9 (m/e with multi bulk reply *before* other reply).
+ * specifically test ordering of reply items to parse. */
+ printf("#10 can handle nested multi bulk replies: ");
+ freeReplyObject(redisCommand(fd,"MULTI"));
+ freeReplyObject(redisCommand(fd,"LRANGE mylist 0 -1"));
+ freeReplyObject(redisCommand(fd,"PING"));
+ reply = (redisCommand(fd,"EXEC"));
+ test_cond(reply->type == REDIS_REPLY_ARRAY &&
+ reply->elements == 2 &&
+ reply->element[0]->type == REDIS_REPLY_ARRAY &&
+ reply->element[0]->elements == 2 &&
+ !memcmp(reply->element[0]->element[0]->reply,"bar",3) &&
+ !memcmp(reply->element[0]->element[1]->reply,"foo",3) &&
+ reply->element[1]->type == REDIS_REPLY_STRING &&
+ strcasecmp(reply->element[1]->reply,"pong") == 0);
+ freeReplyObject(reply);
+
if (fails == 0) {
printf("ALL TESTS PASSED\n");
} else {