From 77540aa31694aa1e14d41f60a0452e49a2fed86a Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 2 May 2011 14:21:48 +0200 Subject: Add function to retrieve formatted reply This is done by only truncating the read buffer once a full reply has been read. The buffer is no longer truncated halfway through reading a reply. In addition: pass offset/length of protocol and content via the read tasks. --- hiredis.c | 50 +++++++++++++++++++++++++-------- hiredis.h | 7 +++++ test.c | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 140 insertions(+), 12 deletions(-) diff --git a/hiredis.c b/hiredis.c index ef9e189..0b0db52 100644 --- a/hiredis.c +++ b/hiredis.c @@ -356,7 +356,12 @@ static int processLineItem(redisReader *r) { char *p; int len; + cur->poff = (r->pos-r->roff)-1; + cur->coff = cur->poff+1; if ((p = readLine(r,&len)) != NULL) { + cur->plen = 1+len+2; /* include \r\n */ + cur->clen = len; + if (cur->type == REDIS_REPLY_INTEGER) { if (r->fn && r->fn->createInteger) obj = r->fn->createInteger(cur,readLongLong(p)); @@ -395,10 +400,13 @@ static int processBulkItem(redisReader *r) { p = r->buf+r->pos; s = seekNewline(p,r->len-r->pos); if (s != NULL) { - p = r->buf+r->pos; bytelen = s-(r->buf+r->pos)+2; /* include \r\n */ - len = readLongLong(p); + cur->poff = (r->pos-r->roff)-1; + cur->plen = bytelen+1; + cur->coff = cur->poff+1+bytelen; + cur->clen = 0; + len = readLongLong(p); if (len < 0) { /* The nil object can always be created. */ if (r->fn && r->fn->createNil) @@ -410,6 +418,8 @@ static int processBulkItem(redisReader *r) { /* Only continue when the buffer contains the entire bulk item. */ bytelen += len+2; /* include \r\n */ if (r->pos+bytelen <= r->len) { + cur->plen += len+2; + cur->clen = len; if (r->fn && r->fn->createString) obj = r->fn->createString(cur,s+2,len); else @@ -451,7 +461,12 @@ static int processMultiBulkItem(redisReader *r) { return REDIS_ERR; } + cur->poff = (r->pos-r->roff)-1; + cur->coff = 0; if ((p = readLine(r,NULL)) != NULL) { + cur->plen = (r->pos-r->roff)-cur->poff; /* includes \r\n */ + cur->clen = 0; + elements = readLongLong(p); root = (r->ridx == 0); @@ -588,7 +603,7 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) { /* Copy the provided buffer. */ if (buf != NULL && len >= 1) { - /* Destroy internal buffer when it is empty and is quite large. */ + /* Destroy buffer when it is empty and is quite large. */ if (r->len == 0 && sdsavail(r->buf) > 16*1024) { sdsfree(r->buf); r->buf = sdsempty(); @@ -598,6 +613,15 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) { assert(r->buf != NULL); } + /* Discard consumed part of the buffer when the offset for the reply + * that is currently being read is high enough. */ + if (r->roff >= 1024) { + r->buf = sdsrange(r->buf,r->roff,-1); + r->pos -= r->roff; + r->roff = 0; + r->len = sdslen(r->buf); + } + newbuf = sdscatlen(r->buf,buf,len); if (newbuf == NULL) { __redisReaderSetErrorOOM(r); @@ -633,6 +657,7 @@ int redisReaderGetReply(redisReader *r, void **reply) { r->rstack[0].parent = NULL; r->rstack[0].privdata = r->privdata; r->ridx = 0; + r->roff = r->pos; /* Start offset in buffer. */ } /* Process items in reply. */ @@ -644,14 +669,6 @@ int redisReaderGetReply(redisReader *r, void **reply) { if (r->err) return REDIS_ERR; - /* Discard part of the buffer when we've consumed at least 1k, to avoid - * doing unnecessary calls to memmove() in sds.c. */ - if (r->pos >= 1024) { - r->buf = sdsrange(r->buf,r->pos,-1); - r->pos = 0; - r->len = sdslen(r->buf); - } - /* Emit a reply when there is one. */ if (r->ridx == -1) { if (reply != NULL) @@ -661,6 +678,17 @@ int redisReaderGetReply(redisReader *r, void **reply) { return REDIS_OK; } +const char *redisReaderGetRaw(redisReader *r, size_t *len) { + /* ridx == -1: No or a full reply has been read. */ + /* pos > roff: Buffer position is larger than start offset, meaning + * the buffer has not yet been truncated. */ + if (r->ridx == -1 && r->pos > r->roff) { + if (len) *len = (r->pos-r->roff); + return r->buf+r->roff; + } + return NULL; +} + /* Calculate the number of bytes needed to represent an integer as string. */ static int intlen(int i) { int len = 0; diff --git a/hiredis.h b/hiredis.h index 6c51245..26e68cb 100644 --- a/hiredis.h +++ b/hiredis.h @@ -98,6 +98,11 @@ typedef struct redisReply { } redisReply; typedef struct redisReadTask { + size_t poff; /* Protocol offset */ + size_t plen; /* Protocol length */ + size_t coff; /* Content offset */ + size_t clen; /* Content length */ + int type; int elements; /* number of elements in multibulk container */ int idx; /* index in parent (array) object */ @@ -122,6 +127,7 @@ typedef struct redisReader { char *buf; /* Read buffer */ size_t pos; /* Buffer cursor */ size_t len; /* Buffer length */ + size_t roff; /* Reply offset */ redisReadTask rstack[3]; int ridx; /* Index of current read task */ @@ -136,6 +142,7 @@ redisReader *redisReaderCreate(void); void redisReaderFree(redisReader *r); int redisReaderFeed(redisReader *r, const char *buf, size_t len); int redisReaderGetReply(redisReader *r, void **reply); +const char *redisReaderGetRaw(redisReader *r, size_t *len); /* Backwards compatibility, can be removed on big version bump. */ #define redisReplyReaderCreate redisReaderCreate diff --git a/test.c b/test.c index c00a473..d166669 100644 --- a/test.c +++ b/test.c @@ -331,6 +331,98 @@ static void test_reply_reader(void) { test_cond(ret == REDIS_ERR && reply == NULL); } +static void *test_create_string(const redisReadTask *task, char *str, size_t len) { + redisReader *r = (redisReader*)task->privdata; + const char *roff = r->buf+r->roff; + ((void)str); ((void)len); + + assert(task->plen > 0); + assert(task->clen > 0); + switch(task->type) { + case REDIS_REPLY_STATUS: + assert(strncmp("+status\r\n", roff+task->poff, task->plen) == 0); + assert(strncmp("status", roff+task->coff, task->clen) == 0); + break; + case REDIS_REPLY_ERROR: + assert(strncmp("-error\r\n", roff+task->poff, task->plen) == 0); + assert(strncmp("error", roff+task->coff, task->clen) == 0); + break; + case REDIS_REPLY_STRING: /* bulk */ + assert(strncmp("$4\r\nbulk\r\n", roff+task->poff, task->plen) == 0); + assert(strncmp("bulk", roff+task->coff, task->clen) == 0); + break; + default: + assert(NULL); + } + return (void*)1; +} + +static void *test_create_array(const redisReadTask *task, int len) { + redisReader *r = (redisReader*)task->privdata; + const char *roff = r->buf+r->roff; + ((void)len); + + assert(task->plen > 0); + assert(task->clen == 0); + assert(strncmp("*5\r\n", roff+task->poff, task->plen) == 0); + return (void*)1; +} + +static void *test_create_integer(const redisReadTask *task, long long value) { + redisReader *r = (redisReader*)task->privdata; + const char *roff = r->buf+r->roff; + ((void)value); + + assert(task->plen > 0); + assert(task->clen > 0); + assert(strncmp(":1234\r\n", roff+task->poff, task->plen) == 0); + assert(strncmp("1234", roff+task->coff, task->clen) == 0); + return (void*)1; +} + +static void *test_create_nil(const redisReadTask *task) { + redisReader *r = (redisReader*)task->privdata; + const char *roff = r->buf+r->roff; + + assert(task->plen > 0); + assert(task->clen == 0); + assert(strncmp("$-1\r\n", roff+task->poff, task->plen) == 0); + return (void*)1; +} + +static redisReplyObjectFunctions test_reader_fn = { + test_create_string, + test_create_array, + test_create_integer, + test_create_nil, + NULL +}; + +static void test_reader_functions(void) { + redisReader *reader; + const char *input; + int ret; + void *obj; + + input = + "*5\r\n" + "$-1\r\n" + ":1234\r\n" + "+status\r\n" + "-error\r\n" + "$4\r\nbulk\r\n"; + + test("Custom object functions in reply reader: "); + reader = redisReaderCreate(); + reader->fn = &test_reader_fn; + reader->privdata = reader; + + redisReaderFeed(reader,input,strlen(input)); + ret = redisReaderGetReply(reader,&obj); + test_cond(ret == REDIS_OK && obj == (void*)1); + redisReaderFree(reader); +} + static void test_throughput(void) { int i, num; long long t1, t2; @@ -513,8 +605,9 @@ int main(int argc, char **argv) { signal(SIGPIPE, SIG_IGN); test_format_commands(); - test_blocking_connection(); test_reply_reader(); + test_reader_functions(); + test_blocking_connection(); // test_nonblocking_connection(); test_throughput(); cleanup(); -- cgit v1.2.3