summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPieter Noordhuis <pcnoordhuis@gmail.com>2010-09-20 09:50:19 +0200
committerPieter Noordhuis <pcnoordhuis@gmail.com>2010-09-20 09:50:19 +0200
commite621f31306d0fd7592a98dad44f3d6f0eaeb84db (patch)
tree526a5c8660a41185938c035b57360dcab4f4de48
parent4ec97f5907e997e64071a52b6e5db2371ad84ab4 (diff)
Expose API for streaming bytes to a reply
-rw-r--r--hiredis.c138
-rw-r--r--hiredis.h3
2 files changed, 91 insertions, 50 deletions
diff --git a/hiredis.c b/hiredis.c
index 611a122..560657f 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -38,14 +38,12 @@
#include "sds.h"
typedef struct redisReader {
- char *buf; /* read buffer */
- int len; /* buffer length */
- int avail; /* available bytes for consumption */
- int pos; /* buffer cursor */
+ sds buf; /* read buffer */
+ unsigned int pos; /* buffer cursor */
redisReply **rlist; /* list of items to process */
- int rlen; /* list length */
- int rpos; /* list cursor */
+ unsigned int rlen; /* list length */
+ unsigned int rpos; /* list cursor */
} redisReader;
static redisReply *redisReadReply(int fd);
@@ -105,9 +103,9 @@ static redisReply *redisIOError(void) {
return createReplyObject(REDIS_REPLY_ERROR,sdsnew("I/O error"));
}
-static char *readBytes(redisReader *r, int bytes) {
+static char *readBytes(redisReader *r, unsigned int bytes) {
char *p;
- if (r->len-r->pos >= bytes) {
+ if (sdslen(r->buf)-r->pos >= bytes) {
p = r->buf+r->pos;
r->pos += bytes;
return p;
@@ -274,56 +272,96 @@ static int processItem(redisReader *r) {
#define READ_BUFFER_SIZE 2048
static redisReply *redisReadReply(int fd) {
+ void *reader = redisCreateReplyReader();
redisReply *reply;
- redisReader r;
- int bytes;
-
- /* setup read buffer */
- r.buf = malloc(READ_BUFFER_SIZE+1);
- r.len = READ_BUFFER_SIZE;
- r.avail = 0;
- r.pos = 0;
-
- /* setup list of items to process */
- r.rlist = malloc(sizeof(redisReply*));
- r.rlist[0] = createReplyObject(-1,NULL);
- r.rlen = 1;
- r.rpos = 0;
-
- while (r.rpos < r.rlen) {
- /* discard the buffer upto pos */
- if (r.pos > 0) {
- memmove(r.buf,r.buf+r.pos,r.len-r.pos);
- r.avail -= r.pos;
- r.pos = 0;
+ char buf[1024];
+ int nread;
+
+ do {
+ if ((nread = read(fd,buf,sizeof(buf))) <= 0) {
+ reply = redisIOError();
+ break;
+ } else {
+ reply = redisFeedReplyReader(reader,buf,nread);
}
+ } while (reply == NULL);
+
+ redisFreeReplyReader(reader);
+ return reply;
+}
+
+void *redisCreateReplyReader() {
+ redisReader *r = calloc(sizeof(redisReader),1);
+ r->buf = sdsempty();
+ return r;
+}
+
+void redisFreeReplyReader(void *reader) {
+ redisReader *r = reader;
+ if (r->buf != NULL) {
+ sdsfree(r->buf);
+ }
+ if (r->rlen > 0) {
+ freeReplyObject(r->rlist[0]);
+ free(r->rlist);
+ }
+ free(r);
+}
+
+void *redisFeedReplyReader(void *reader, char *buf, int len) {
+ redisReader *r = reader;
+
+ /* Check if we are able to do *something*. */
+ if (sdslen(r->buf) == 0 && (buf == NULL || len <= 0))
+ return NULL;
+
+ /* Copy the provided buffer. */
+ if (buf != NULL && len >= 1)
+ r->buf = sdscatlen(r->buf,buf,len);
+
+ /* Create first item to process when the item list is empty. */
+ if (r->rlen == 0) {
+ r->rlist = malloc(sizeof(redisReply*));
+ r->rlist[0] = createReplyObject(-1,NULL);
+ r->rlen = 1;
+ r->rpos = 0;
+ }
- /* make sure there is room for at least BUFFER_SIZE */
- if (r.len-r.avail < READ_BUFFER_SIZE) {
- r.buf = realloc(r.buf,r.avail+READ_BUFFER_SIZE+1);
- r.len = r.avail+READ_BUFFER_SIZE;
+ /* Process items in reply. */
+ while (r->rpos < r->rlen)
+ if (processItem(r) < 0)
+ break;
+
+ /* Discard the consumed part of the buffer. */
+ if (r->pos > 0) {
+ if (r->pos == sdslen(r->buf)) {
+ /* sdsrange has a quirck on this edge case. */
+ sdsfree(r->buf);
+ r->buf = sdsempty();
+ } else {
+ r->buf = sdsrange(r->buf,r->pos,sdslen(r->buf));
}
+ r->pos = 0;
+ }
+
+ /* Emit a reply when there is one. */
+ if (r->rpos == r->rlen) {
+ redisReply *reply = r->rlist[0];
- /* read from socket into buffer */
- if ((bytes = read(fd,r.buf+r.avail,READ_BUFFER_SIZE)) <= 0) {
- /* rlist[0] is the "root" reply object */
- freeReplyObject(r.rlist[0]);
- free(r.buf);
- free(r.rlist);
- return redisIOError();
+ /* Destroy the buffer when it is empty and is quite large. */
+ if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) {
+ sdsfree(r->buf);
+ r->buf = sdsempty();
+ r->pos = 0;
}
- r.avail += bytes;
- r.buf[r.avail] = '\0';
- /* process items in reply */
- while (r.rpos < r.rlen)
- if (processItem(&r) < 0)
- break;
+ /* Free list of items to process. */
+ free(r->rlist);
+ r->rlen = r->rpos = 0;
+ return reply;
+ } else {
+ return NULL;
}
- reply = r.rlist[0];
- free(r.buf);
- free(r.rlist);
- return reply;
}
/* Helper function for redisCommand(). It's used to append the next argument
diff --git a/hiredis.h b/hiredis.h
index 3c76407..6169b7d 100644
--- a/hiredis.h
+++ b/hiredis.h
@@ -51,5 +51,8 @@ typedef struct redisReply {
redisReply *redisConnect(int *fd, const char *ip, int port);
void freeReplyObject(redisReply *r);
redisReply *redisCommand(int fd, const char *format, ...);
+void *redisCreateReplyReader();
+void redisFreeReplyReader(void *ptr);
+void *redisFeedReplyReader(void *reader, char *buf, int len);
#endif