summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPieter Noordhuis <pcnoordhuis@gmail.com>2011-05-02 14:21:48 +0200
committerPieter Noordhuis <pcnoordhuis@gmail.com>2011-05-05 17:01:34 +0200
commit77540aa31694aa1e14d41f60a0452e49a2fed86a (patch)
treee0a49f7c6a73a17cfb10fccae8d848685b4f9ac2
parentdf203bc328fb4ff9d19c591b8ec31f62d5ef9dae (diff)
Add function to retrieve formatted reply
This is done by only truncating the read buffer once a full reply has been read. The buffer is no longer truncated halfway through reading a reply. In addition: pass offset/length of protocol and content via the read tasks.
-rw-r--r--hiredis.c50
-rw-r--r--hiredis.h7
-rw-r--r--test.c95
3 files changed, 140 insertions, 12 deletions
diff --git a/hiredis.c b/hiredis.c
index ef9e189..0b0db52 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -356,7 +356,12 @@ static int processLineItem(redisReader *r) {
char *p;
int len;
+ cur->poff = (r->pos-r->roff)-1;
+ cur->coff = cur->poff+1;
if ((p = readLine(r,&len)) != NULL) {
+ cur->plen = 1+len+2; /* include \r\n */
+ cur->clen = len;
+
if (cur->type == REDIS_REPLY_INTEGER) {
if (r->fn && r->fn->createInteger)
obj = r->fn->createInteger(cur,readLongLong(p));
@@ -395,10 +400,13 @@ static int processBulkItem(redisReader *r) {
p = r->buf+r->pos;
s = seekNewline(p,r->len-r->pos);
if (s != NULL) {
- p = r->buf+r->pos;
bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
- len = readLongLong(p);
+ cur->poff = (r->pos-r->roff)-1;
+ cur->plen = bytelen+1;
+ cur->coff = cur->poff+1+bytelen;
+ cur->clen = 0;
+ len = readLongLong(p);
if (len < 0) {
/* The nil object can always be created. */
if (r->fn && r->fn->createNil)
@@ -410,6 +418,8 @@ static int processBulkItem(redisReader *r) {
/* Only continue when the buffer contains the entire bulk item. */
bytelen += len+2; /* include \r\n */
if (r->pos+bytelen <= r->len) {
+ cur->plen += len+2;
+ cur->clen = len;
if (r->fn && r->fn->createString)
obj = r->fn->createString(cur,s+2,len);
else
@@ -451,7 +461,12 @@ static int processMultiBulkItem(redisReader *r) {
return REDIS_ERR;
}
+ cur->poff = (r->pos-r->roff)-1;
+ cur->coff = 0;
if ((p = readLine(r,NULL)) != NULL) {
+ cur->plen = (r->pos-r->roff)-cur->poff; /* includes \r\n */
+ cur->clen = 0;
+
elements = readLongLong(p);
root = (r->ridx == 0);
@@ -588,7 +603,7 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
/* Copy the provided buffer. */
if (buf != NULL && len >= 1) {
- /* Destroy internal buffer when it is empty and is quite large. */
+ /* Destroy buffer when it is empty and is quite large. */
if (r->len == 0 && sdsavail(r->buf) > 16*1024) {
sdsfree(r->buf);
r->buf = sdsempty();
@@ -598,6 +613,15 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) {
assert(r->buf != NULL);
}
+ /* Discard consumed part of the buffer when the offset for the reply
+ * that is currently being read is high enough. */
+ if (r->roff >= 1024) {
+ r->buf = sdsrange(r->buf,r->roff,-1);
+ r->pos -= r->roff;
+ r->roff = 0;
+ r->len = sdslen(r->buf);
+ }
+
newbuf = sdscatlen(r->buf,buf,len);
if (newbuf == NULL) {
__redisReaderSetErrorOOM(r);
@@ -633,6 +657,7 @@ int redisReaderGetReply(redisReader *r, void **reply) {
r->rstack[0].parent = NULL;
r->rstack[0].privdata = r->privdata;
r->ridx = 0;
+ r->roff = r->pos; /* Start offset in buffer. */
}
/* Process items in reply. */
@@ -644,14 +669,6 @@ int redisReaderGetReply(redisReader *r, void **reply) {
if (r->err)
return REDIS_ERR;
- /* Discard part of the buffer when we've consumed at least 1k, to avoid
- * doing unnecessary calls to memmove() in sds.c. */
- if (r->pos >= 1024) {
- r->buf = sdsrange(r->buf,r->pos,-1);
- r->pos = 0;
- r->len = sdslen(r->buf);
- }
-
/* Emit a reply when there is one. */
if (r->ridx == -1) {
if (reply != NULL)
@@ -661,6 +678,17 @@ int redisReaderGetReply(redisReader *r, void **reply) {
return REDIS_OK;
}
+const char *redisReaderGetRaw(redisReader *r, size_t *len) {
+ /* ridx == -1: No or a full reply has been read. */
+ /* pos > roff: Buffer position is larger than start offset, meaning
+ * the buffer has not yet been truncated. */
+ if (r->ridx == -1 && r->pos > r->roff) {
+ if (len) *len = (r->pos-r->roff);
+ return r->buf+r->roff;
+ }
+ return NULL;
+}
+
/* Calculate the number of bytes needed to represent an integer as string. */
static int intlen(int i) {
int len = 0;
diff --git a/hiredis.h b/hiredis.h
index 6c51245..26e68cb 100644
--- a/hiredis.h
+++ b/hiredis.h
@@ -98,6 +98,11 @@ typedef struct redisReply {
} redisReply;
typedef struct redisReadTask {
+ size_t poff; /* Protocol offset */
+ size_t plen; /* Protocol length */
+ size_t coff; /* Content offset */
+ size_t clen; /* Content length */
+
int type;
int elements; /* number of elements in multibulk container */
int idx; /* index in parent (array) object */
@@ -122,6 +127,7 @@ typedef struct redisReader {
char *buf; /* Read buffer */
size_t pos; /* Buffer cursor */
size_t len; /* Buffer length */
+ size_t roff; /* Reply offset */
redisReadTask rstack[3];
int ridx; /* Index of current read task */
@@ -136,6 +142,7 @@ redisReader *redisReaderCreate(void);
void redisReaderFree(redisReader *r);
int redisReaderFeed(redisReader *r, const char *buf, size_t len);
int redisReaderGetReply(redisReader *r, void **reply);
+const char *redisReaderGetRaw(redisReader *r, size_t *len);
/* Backwards compatibility, can be removed on big version bump. */
#define redisReplyReaderCreate redisReaderCreate
diff --git a/test.c b/test.c
index c00a473..d166669 100644
--- a/test.c
+++ b/test.c
@@ -331,6 +331,98 @@ static void test_reply_reader(void) {
test_cond(ret == REDIS_ERR && reply == NULL);
}
+static void *test_create_string(const redisReadTask *task, char *str, size_t len) {
+ redisReader *r = (redisReader*)task->privdata;
+ const char *roff = r->buf+r->roff;
+ ((void)str); ((void)len);
+
+ assert(task->plen > 0);
+ assert(task->clen > 0);
+ switch(task->type) {
+ case REDIS_REPLY_STATUS:
+ assert(strncmp("+status\r\n", roff+task->poff, task->plen) == 0);
+ assert(strncmp("status", roff+task->coff, task->clen) == 0);
+ break;
+ case REDIS_REPLY_ERROR:
+ assert(strncmp("-error\r\n", roff+task->poff, task->plen) == 0);
+ assert(strncmp("error", roff+task->coff, task->clen) == 0);
+ break;
+ case REDIS_REPLY_STRING: /* bulk */
+ assert(strncmp("$4\r\nbulk\r\n", roff+task->poff, task->plen) == 0);
+ assert(strncmp("bulk", roff+task->coff, task->clen) == 0);
+ break;
+ default:
+ assert(NULL);
+ }
+ return (void*)1;
+}
+
+static void *test_create_array(const redisReadTask *task, int len) {
+ redisReader *r = (redisReader*)task->privdata;
+ const char *roff = r->buf+r->roff;
+ ((void)len);
+
+ assert(task->plen > 0);
+ assert(task->clen == 0);
+ assert(strncmp("*5\r\n", roff+task->poff, task->plen) == 0);
+ return (void*)1;
+}
+
+static void *test_create_integer(const redisReadTask *task, long long value) {
+ redisReader *r = (redisReader*)task->privdata;
+ const char *roff = r->buf+r->roff;
+ ((void)value);
+
+ assert(task->plen > 0);
+ assert(task->clen > 0);
+ assert(strncmp(":1234\r\n", roff+task->poff, task->plen) == 0);
+ assert(strncmp("1234", roff+task->coff, task->clen) == 0);
+ return (void*)1;
+}
+
+static void *test_create_nil(const redisReadTask *task) {
+ redisReader *r = (redisReader*)task->privdata;
+ const char *roff = r->buf+r->roff;
+
+ assert(task->plen > 0);
+ assert(task->clen == 0);
+ assert(strncmp("$-1\r\n", roff+task->poff, task->plen) == 0);
+ return (void*)1;
+}
+
+static redisReplyObjectFunctions test_reader_fn = {
+ test_create_string,
+ test_create_array,
+ test_create_integer,
+ test_create_nil,
+ NULL
+};
+
+static void test_reader_functions(void) {
+ redisReader *reader;
+ const char *input;
+ int ret;
+ void *obj;
+
+ input =
+ "*5\r\n"
+ "$-1\r\n"
+ ":1234\r\n"
+ "+status\r\n"
+ "-error\r\n"
+ "$4\r\nbulk\r\n";
+
+ test("Custom object functions in reply reader: ");
+ reader = redisReaderCreate();
+ reader->fn = &test_reader_fn;
+ reader->privdata = reader;
+
+ redisReaderFeed(reader,input,strlen(input));
+ ret = redisReaderGetReply(reader,&obj);
+ test_cond(ret == REDIS_OK && obj == (void*)1);
+ redisReaderFree(reader);
+}
+
static void test_throughput(void) {
int i, num;
long long t1, t2;
@@ -513,8 +605,9 @@ int main(int argc, char **argv) {
signal(SIGPIPE, SIG_IGN);
test_format_commands();
- test_blocking_connection();
test_reply_reader();
+ test_reader_functions();
+ test_blocking_connection();
// test_nonblocking_connection();
test_throughput();
cleanup();