From e621f31306d0fd7592a98dad44f3d6f0eaeb84db Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 20 Sep 2010 09:50:19 +0200 Subject: Expose API for streaming bytes to a reply --- hiredis.c | 138 +++++++++++++++++++++++++++++++++++++++----------------------- hiredis.h | 3 ++ 2 files changed, 91 insertions(+), 50 deletions(-) diff --git a/hiredis.c b/hiredis.c index 611a122..560657f 100644 --- a/hiredis.c +++ b/hiredis.c @@ -38,14 +38,12 @@ #include "sds.h" typedef struct redisReader { - char *buf; /* read buffer */ - int len; /* buffer length */ - int avail; /* available bytes for consumption */ - int pos; /* buffer cursor */ + sds buf; /* read buffer */ + unsigned int pos; /* buffer cursor */ redisReply **rlist; /* list of items to process */ - int rlen; /* list length */ - int rpos; /* list cursor */ + unsigned int rlen; /* list length */ + unsigned int rpos; /* list cursor */ } redisReader; static redisReply *redisReadReply(int fd); @@ -105,9 +103,9 @@ static redisReply *redisIOError(void) { return createReplyObject(REDIS_REPLY_ERROR,sdsnew("I/O error")); } -static char *readBytes(redisReader *r, int bytes) { +static char *readBytes(redisReader *r, unsigned int bytes) { char *p; - if (r->len-r->pos >= bytes) { + if (sdslen(r->buf)-r->pos >= bytes) { p = r->buf+r->pos; r->pos += bytes; return p; @@ -274,56 +272,96 @@ static int processItem(redisReader *r) { #define READ_BUFFER_SIZE 2048 static redisReply *redisReadReply(int fd) { + void *reader = redisCreateReplyReader(); redisReply *reply; - redisReader r; - int bytes; - - /* setup read buffer */ - r.buf = malloc(READ_BUFFER_SIZE+1); - r.len = READ_BUFFER_SIZE; - r.avail = 0; - r.pos = 0; - - /* setup list of items to process */ - r.rlist = malloc(sizeof(redisReply*)); - r.rlist[0] = createReplyObject(-1,NULL); - r.rlen = 1; - r.rpos = 0; - - while (r.rpos < r.rlen) { - /* discard the buffer upto pos */ - if (r.pos > 0) { - memmove(r.buf,r.buf+r.pos,r.len-r.pos); - r.avail -= r.pos; - r.pos = 0; + char buf[1024]; + int nread; + + do { + if ((nread = read(fd,buf,sizeof(buf))) <= 0) { + reply = redisIOError(); + break; + } else { + reply = redisFeedReplyReader(reader,buf,nread); } + } while (reply == NULL); + + redisFreeReplyReader(reader); + return reply; +} + +void *redisCreateReplyReader() { + redisReader *r = calloc(sizeof(redisReader),1); + r->buf = sdsempty(); + return r; +} + +void redisFreeReplyReader(void *reader) { + redisReader *r = reader; + if (r->buf != NULL) { + sdsfree(r->buf); + } + if (r->rlen > 0) { + freeReplyObject(r->rlist[0]); + free(r->rlist); + } + free(r); +} + +void *redisFeedReplyReader(void *reader, char *buf, int len) { + redisReader *r = reader; + + /* Check if we are able to do *something*. */ + if (sdslen(r->buf) == 0 && (buf == NULL || len <= 0)) + return NULL; + + /* Copy the provided buffer. */ + if (buf != NULL && len >= 1) + r->buf = sdscatlen(r->buf,buf,len); + + /* Create first item to process when the item list is empty. */ + if (r->rlen == 0) { + r->rlist = malloc(sizeof(redisReply*)); + r->rlist[0] = createReplyObject(-1,NULL); + r->rlen = 1; + r->rpos = 0; + } - /* make sure there is room for at least BUFFER_SIZE */ - if (r.len-r.avail < READ_BUFFER_SIZE) { - r.buf = realloc(r.buf,r.avail+READ_BUFFER_SIZE+1); - r.len = r.avail+READ_BUFFER_SIZE; + /* Process items in reply. */ + while (r->rpos < r->rlen) + if (processItem(r) < 0) + break; + + /* Discard the consumed part of the buffer. */ + if (r->pos > 0) { + if (r->pos == sdslen(r->buf)) { + /* sdsrange has a quirck on this edge case. */ + sdsfree(r->buf); + r->buf = sdsempty(); + } else { + r->buf = sdsrange(r->buf,r->pos,sdslen(r->buf)); } + r->pos = 0; + } + + /* Emit a reply when there is one. */ + if (r->rpos == r->rlen) { + redisReply *reply = r->rlist[0]; - /* read from socket into buffer */ - if ((bytes = read(fd,r.buf+r.avail,READ_BUFFER_SIZE)) <= 0) { - /* rlist[0] is the "root" reply object */ - freeReplyObject(r.rlist[0]); - free(r.buf); - free(r.rlist); - return redisIOError(); + /* Destroy the buffer when it is empty and is quite large. */ + if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) { + sdsfree(r->buf); + r->buf = sdsempty(); + r->pos = 0; } - r.avail += bytes; - r.buf[r.avail] = '\0'; - /* process items in reply */ - while (r.rpos < r.rlen) - if (processItem(&r) < 0) - break; + /* Free list of items to process. */ + free(r->rlist); + r->rlen = r->rpos = 0; + return reply; + } else { + return NULL; } - reply = r.rlist[0]; - free(r.buf); - free(r.rlist); - return reply; } /* Helper function for redisCommand(). It's used to append the next argument diff --git a/hiredis.h b/hiredis.h index 3c76407..6169b7d 100644 --- a/hiredis.h +++ b/hiredis.h @@ -51,5 +51,8 @@ typedef struct redisReply { redisReply *redisConnect(int *fd, const char *ip, int port); void freeReplyObject(redisReply *r); redisReply *redisCommand(int fd, const char *format, ...); +void *redisCreateReplyReader(); +void redisFreeReplyReader(void *ptr); +void *redisFeedReplyReader(void *reader, char *buf, int len); #endif -- cgit v1.2.3