diff options
Diffstat (limited to 'hiredis.c')
-rw-r--r-- | hiredis.c | 286 |
1 files changed, 204 insertions, 82 deletions
@@ -37,6 +37,17 @@ #include "anet.h" #include "sds.h" +typedef struct redisReader { + char *buf; /* read buffer */ + int len; /* buffer length */ + int avail; /* available bytes for consumption */ + int pos; /* buffer cursor */ + + redisReply **rlist; /* list of items to process */ + int rlen; /* list length */ + int rpos; /* list cursor */ +} redisReader; + static redisReply *redisReadReply(int fd); static redisReply *createReplyObject(int type, sds reply); @@ -93,107 +104,218 @@ static redisReply *redisIOError(void) { return createReplyObject(REDIS_REPLY_ERROR,sdsnew("I/O error")); } -/* In a real high performance C client this should be bufferized */ -static sds redisReadLine(int fd) { - sds line = sdsempty(); +static char *readBytes(redisReader *r, int bytes) { + char *p; + if (r->len-r->pos >= bytes) { + p = r->buf+r->pos; + r->pos += bytes; + return p; + } + return NULL; +} + +static char *readLine(redisReader *r, int *_len) { + char *p, *s = strstr(r->buf+r->pos,"\r\n"); + int len; + if (s != NULL) { + p = r->buf+r->pos; + len = s-(r->buf+r->pos); + r->pos += len+2; /* skip \r\n */ + if (_len) *_len = len; + return p; + } + return NULL; +} - while(1) { - char c; - ssize_t ret; +static int processLineItem(redisReader *r) { + redisReply *cur = r->rlist[r->rpos]; + char *p; + int len; - ret = read(fd,&c,1); - if (ret == -1) { - sdsfree(line); - return NULL; - } else if ((ret == 0) || (c == '\n')) { - break; + if ((p = readLine(r,&len)) != NULL) { + if (cur->type == REDIS_REPLY_INTEGER) { + cur->integer = strtoll(p,NULL,10); } else { - line = sdscatlen(line,&c,1); + cur->reply = sdsnewlen(p,len); } - } - return sdstrim(line,"\r\n"); -} -static redisReply *redisReadSingleLineReply(int fd, int type) { - sds buf = redisReadLine(fd); - - if (buf == NULL) return redisIOError(); - return createReplyObject(type,buf); + /* for API compat, set STATUS to STRING */ + if (cur->type == REDIS_REPLY_STATUS) + cur->type = REDIS_REPLY_STRING; + + r->rpos++; + return 0; + } + return -1; } -static redisReply *redisReadIntegerReply(int fd) { - sds buf = redisReadLine(fd); - redisReply *r = malloc(sizeof(*r)); +static int processBulkItem(redisReader *r) { + redisReply *cur = r->rlist[r->rpos]; + char *p; + int len; + + if (cur->reply == NULL) { + if ((p = readLine(r,NULL)) != NULL) { + len = atoi(p); + if (len == -1) { + /* nil means this item is done */ + cur->type = REDIS_REPLY_NIL; + cur->reply = sdsempty(); + r->rpos++; + return 0; + } else { + cur->reply = sdsnewlen(NULL,len); + } + } else { + return -1; + } + } - if (r == NULL) redisOOM(); - if (buf == NULL) { - free(r); - return redisIOError(); + len = sdslen(cur->reply); + /* add two bytes for crlf */ + if ((p = readBytes(r,len+2)) != NULL) { + memcpy(cur->reply,p,len); + r->rpos++; + return 0; } - r->type = REDIS_REPLY_INTEGER; - r->integer = strtoll(buf,NULL,10); - sdsfree(buf); - return r; + return -1; } -static redisReply *redisReadBulkReply(int fd) { - sds replylen = redisReadLine(fd); - sds buf; - char crlf[2]; - int bulklen; - - if (replylen == NULL) return redisIOError(); - bulklen = atoi(replylen); - sdsfree(replylen); - if (bulklen == -1) - return createReplyObject(REDIS_REPLY_NIL,sdsempty()); - - buf = sdsnewlen(NULL,bulklen); - anetRead(fd,buf,bulklen); - anetRead(fd,crlf,2); - return createReplyObject(REDIS_REPLY_STRING,buf); -} +static int processMultiBulkItem(redisReader *r) { + redisReply *cur = r->rlist[r->rpos]; + char *p; + int elements, j; + + if ((p = readLine(r,NULL)) != NULL) { + elements = atoi(p); + if (elements == -1) { + /* empty */ + cur->type = REDIS_REPLY_NIL; + cur->reply = sdsempty(); + r->rpos++; + return 0; + } + } else { + return -1; + } -static redisReply *redisReadMultiBulkReply(int fd) { - sds replylen = redisReadLine(fd); - long elements, j; - redisReply *r; + cur->elements = elements; + r->rlen += elements; + r->rpos++; + + /* create placeholder items */ + if ((cur->element = malloc(sizeof(redisReply*)*elements)) == NULL) + redisOOM(); + if ((r->rlist = realloc(r->rlist,sizeof(redisReply*)*r->rlen)) == NULL) + redisOOM(); + + /* move existing items backwards */ + memmove(&(r->rlist[r->rpos+elements]), + &(r->rlist[r->rpos]), + (r->rlen-(r->rpos+elements))*sizeof(redisReply*)); + + /* populate item list */ + for (j = 0; j < elements; j++) { + cur->element[j] = createReplyObject(-1,NULL); + r->rlist[r->rpos+j] = cur->element[j]; + } + return 0; +} - if (replylen == NULL) return redisIOError(); - elements = strtol(replylen,NULL,10); - sdsfree(replylen); +static int processItem(redisReader *r) { + redisReply *cur = r->rlist[r->rpos]; + char *p; - if (elements == -1) - return createReplyObject(REDIS_REPLY_NIL,sdsempty()); + /* check if we need to read type */ + if (cur->type < 0) { + if ((p = readBytes(r,1)) != NULL) { + switch (p[0]) { + case '-': + cur->type = REDIS_REPLY_ERROR; + break; + case '+': + cur->type = REDIS_REPLY_STATUS; + break; + case ':': + cur->type = REDIS_REPLY_INTEGER; + break; + case '$': + cur->type = REDIS_REPLY_STRING; + break; + case '*': + cur->type = REDIS_REPLY_ARRAY; + break; + default: + printf("protocol error, got '%c' as reply type byte\n", p[0]); + exit(1); + } + } else { + /* could not consume 1 byte */ + return -1; + } + } - if ((r = malloc(sizeof(*r))) == NULL) redisOOM(); - r->type = REDIS_REPLY_ARRAY; - r->elements = elements; - if ((r->element = malloc(sizeof(*r)*elements)) == NULL) redisOOM(); - for (j = 0; j < elements; j++) - r->element[j] = redisReadReply(fd); - return r; + /* process typed item */ + switch(cur->type) { + case REDIS_REPLY_ERROR: + case REDIS_REPLY_STATUS: + case REDIS_REPLY_INTEGER: + return processLineItem(r); + case REDIS_REPLY_STRING: + return processBulkItem(r); + case REDIS_REPLY_ARRAY: + return processMultiBulkItem(r); + default: + printf("unknown item type: %d\n", cur->type); + exit(1); + } } +#define READ_BUFFER_SIZE 2048 static redisReply *redisReadReply(int fd) { - char type; - - if (anetRead(fd,&type,1) <= 0) return redisIOError(); - switch(type) { - case '-': - return redisReadSingleLineReply(fd,REDIS_REPLY_ERROR); - case '+': - return redisReadSingleLineReply(fd,REDIS_REPLY_STRING); - case ':': - return redisReadIntegerReply(fd); - case '$': - return redisReadBulkReply(fd); - case '*': - return redisReadMultiBulkReply(fd); - default: - printf("protocol error, got '%c' as reply type byte\n", type); - exit(1); + redisReader r; + int bytes; + + /* setup read buffer */ + r.buf = malloc(READ_BUFFER_SIZE+1); + r.len = READ_BUFFER_SIZE; + r.avail = 0; + r.pos = 0; + + /* setup list of items to process */ + r.rlist = malloc(sizeof(redisReply*)); + r.rlist[0] = createReplyObject(-1,NULL); + r.rlen = 1; + r.rpos = 0; + + while (r.rpos < r.rlen) { + /* discard the buffer upto pos */ + if (r.pos > 0) { + memmove(r.buf,r.buf+r.pos,r.len-r.pos); + r.avail -= r.pos; + r.pos = 0; + } + + /* make sure there is room for at least BUFFER_SIZE */ + if (r.len-r.avail < READ_BUFFER_SIZE) { + r.buf = realloc(r.buf,r.avail+READ_BUFFER_SIZE+1); + r.len = r.avail+READ_BUFFER_SIZE; + } + + /* read from socket into buffer */ + if ((bytes = read(fd,r.buf+r.avail,READ_BUFFER_SIZE)) <= 0) + return redisIOError(); + r.avail += bytes; + r.buf[r.avail] = '\0'; + + /* process items in reply */ + while (r.rpos < r.rlen) + if (processItem(&r) < 0) + break; } + free(r.buf); + free(r.rlist); + return r.rlist[0]; } /* Helper function for redisCommand(). It's used to append the next argument |