diff options
| author | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-09-19 18:47:05 +0200 | 
|---|---|---|
| committer | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-09-19 18:47:05 +0200 | 
| commit | 457cdbf7c5f98c7ac5b4d727a1bad0c041f7cc66 (patch) | |
| tree | 4bf2ef4d1e1a83142ff49a8956a3076dcea67a17 | |
| parent | 66036d113eb5ea45e61ad4e672377996c081330c (diff) | |
| download | hiredict-457cdbf7c5f98c7ac5b4d727a1bad0c041f7cc66.tar.xz | |
Rewrite reply parsing to use a read buffer
| -rw-r--r-- | hiredis.c | 286 | ||||
| -rw-r--r-- | hiredis.h | 1 | ||||
| -rw-r--r-- | test.c | 17 | 
3 files changed, 222 insertions, 82 deletions
| @@ -37,6 +37,17 @@  #include "anet.h"  #include "sds.h" +typedef struct redisReader { +    char *buf; /* read buffer */ +    int len; /* buffer length */ +    int avail; /* available bytes for consumption */ +    int pos; /* buffer cursor */ + +    redisReply **rlist; /* list of items to process */ +    int rlen; /* list length */ +    int rpos; /* list cursor */ +} redisReader; +  static redisReply *redisReadReply(int fd);  static redisReply *createReplyObject(int type, sds reply); @@ -93,107 +104,218 @@ static redisReply *redisIOError(void) {      return createReplyObject(REDIS_REPLY_ERROR,sdsnew("I/O error"));  } -/* In a real high performance C client this should be bufferized */ -static sds redisReadLine(int fd) { -    sds line = sdsempty(); +static char *readBytes(redisReader *r, int bytes) { +    char *p; +    if (r->len-r->pos >= bytes) { +        p = r->buf+r->pos; +        r->pos += bytes; +        return p; +    } +    return NULL; +} + +static char *readLine(redisReader *r, int *_len) { +    char *p, *s = strstr(r->buf+r->pos,"\r\n"); +    int len; +    if (s != NULL) { +        p = r->buf+r->pos; +        len = s-(r->buf+r->pos); +        r->pos += len+2; /* skip \r\n */ +        if (_len) *_len = len; +        return p; +    } +    return NULL; +} -    while(1) { -        char c; -        ssize_t ret; +static int processLineItem(redisReader *r) { +    redisReply *cur = r->rlist[r->rpos]; +    char *p; +    int len; -        ret = read(fd,&c,1); -        if (ret == -1) { -            sdsfree(line); -            return NULL; -        } else if ((ret == 0) || (c == '\n')) { -            break; +    if ((p = readLine(r,&len)) != NULL) { +        if (cur->type == REDIS_REPLY_INTEGER) { +            cur->integer = strtoll(p,NULL,10);          } else { -            line = sdscatlen(line,&c,1); +            cur->reply = sdsnewlen(p,len);          } -    } -    return sdstrim(line,"\r\n"); -} -static redisReply *redisReadSingleLineReply(int fd, int type) { -    sds buf = redisReadLine(fd); -     -    if (buf == NULL) return redisIOError(); -    return createReplyObject(type,buf); +        /* for API compat, set STATUS to STRING */ +        if (cur->type == REDIS_REPLY_STATUS) +            cur->type = REDIS_REPLY_STRING; + +        r->rpos++; +        return 0; +    } +    return -1;  } -static redisReply *redisReadIntegerReply(int fd) { -    sds buf = redisReadLine(fd); -    redisReply *r = malloc(sizeof(*r)); +static int processBulkItem(redisReader *r) { +    redisReply *cur = r->rlist[r->rpos]; +    char *p; +    int len; + +    if (cur->reply == NULL) { +        if ((p = readLine(r,NULL)) != NULL) { +            len = atoi(p); +            if (len == -1) { +                /* nil means this item is done */ +                cur->type = REDIS_REPLY_NIL; +                cur->reply = sdsempty(); +                r->rpos++; +                return 0; +            } else { +                cur->reply = sdsnewlen(NULL,len); +            } +        } else { +            return -1; +        } +    } -    if (r == NULL) redisOOM(); -    if (buf == NULL) { -        free(r); -        return redisIOError(); +    len = sdslen(cur->reply); +    /* add two bytes for crlf */ +    if ((p = readBytes(r,len+2)) != NULL) { +        memcpy(cur->reply,p,len); +        r->rpos++; +        return 0;      } -    r->type = REDIS_REPLY_INTEGER; -    r->integer = strtoll(buf,NULL,10); -    sdsfree(buf); -    return r; +    return -1;  } -static redisReply *redisReadBulkReply(int fd) { -    sds replylen = redisReadLine(fd); -    sds buf; -    char crlf[2]; -    int bulklen; - -    if (replylen == NULL) return redisIOError(); -    bulklen = atoi(replylen); -    sdsfree(replylen); -    if (bulklen == -1) -        return createReplyObject(REDIS_REPLY_NIL,sdsempty()); - -    buf = sdsnewlen(NULL,bulklen); -    anetRead(fd,buf,bulklen); -    anetRead(fd,crlf,2); -    return createReplyObject(REDIS_REPLY_STRING,buf); -} +static int processMultiBulkItem(redisReader *r) { +    redisReply *cur = r->rlist[r->rpos]; +    char *p; +    int elements, j; + +    if ((p = readLine(r,NULL)) != NULL) { +        elements = atoi(p); +        if (elements == -1) { +            /* empty */ +            cur->type = REDIS_REPLY_NIL; +            cur->reply = sdsempty(); +            r->rpos++; +            return 0; +        } +    } else { +        return -1; +    } -static redisReply *redisReadMultiBulkReply(int fd) { -    sds replylen = redisReadLine(fd); -    long elements, j; -    redisReply *r; +    cur->elements = elements; +    r->rlen += elements; +    r->rpos++; + +    /* create placeholder items */ +    if ((cur->element = malloc(sizeof(redisReply*)*elements)) == NULL) +        redisOOM(); +    if ((r->rlist = realloc(r->rlist,sizeof(redisReply*)*r->rlen)) == NULL) +        redisOOM(); + +    /* move existing items backwards */ +    memmove(&(r->rlist[r->rpos+elements]), +            &(r->rlist[r->rpos]), +            (r->rlen-(r->rpos+elements))*sizeof(redisReply*)); + +    /* populate item list */ +    for (j = 0; j < elements; j++) { +        cur->element[j] = createReplyObject(-1,NULL); +        r->rlist[r->rpos+j] = cur->element[j]; +    } +    return 0; +} -    if (replylen == NULL) return redisIOError(); -    elements = strtol(replylen,NULL,10); -    sdsfree(replylen); +static int processItem(redisReader *r) { +    redisReply *cur = r->rlist[r->rpos]; +    char *p; -    if (elements == -1) -        return createReplyObject(REDIS_REPLY_NIL,sdsempty()); +    /* check if we need to read type */ +    if (cur->type < 0) { +        if ((p = readBytes(r,1)) != NULL) { +            switch (p[0]) { +            case '-': +                cur->type = REDIS_REPLY_ERROR; +                break; +            case '+': +                cur->type = REDIS_REPLY_STATUS; +                break; +            case ':': +                cur->type = REDIS_REPLY_INTEGER; +                break; +            case '$': +                cur->type = REDIS_REPLY_STRING; +                break; +            case '*': +                cur->type = REDIS_REPLY_ARRAY; +                break; +            default: +                printf("protocol error, got '%c' as reply type byte\n", p[0]); +                exit(1); +            } +        } else { +            /* could not consume 1 byte */ +            return -1; +        } +    } -    if ((r = malloc(sizeof(*r))) == NULL) redisOOM(); -    r->type = REDIS_REPLY_ARRAY; -    r->elements = elements; -    if ((r->element = malloc(sizeof(*r)*elements)) == NULL) redisOOM(); -    for (j = 0; j < elements; j++) -        r->element[j] = redisReadReply(fd); -    return r; +    /* process typed item */ +    switch(cur->type) { +    case REDIS_REPLY_ERROR: +    case REDIS_REPLY_STATUS: +    case REDIS_REPLY_INTEGER: +        return processLineItem(r); +    case REDIS_REPLY_STRING: +        return processBulkItem(r); +    case REDIS_REPLY_ARRAY: +        return processMultiBulkItem(r); +    default: +        printf("unknown item type: %d\n", cur->type); +        exit(1); +    }  } +#define READ_BUFFER_SIZE 2048  static redisReply *redisReadReply(int fd) { -    char type; - -    if (anetRead(fd,&type,1) <= 0) return redisIOError(); -    switch(type) { -    case '-': -        return redisReadSingleLineReply(fd,REDIS_REPLY_ERROR); -    case '+': -        return redisReadSingleLineReply(fd,REDIS_REPLY_STRING); -    case ':': -        return redisReadIntegerReply(fd); -    case '$': -        return redisReadBulkReply(fd); -    case '*': -        return redisReadMultiBulkReply(fd); -    default: -        printf("protocol error, got '%c' as reply type byte\n", type); -        exit(1); +    redisReader r; +    int bytes; + +    /* setup read buffer */ +    r.buf = malloc(READ_BUFFER_SIZE+1); +    r.len = READ_BUFFER_SIZE; +    r.avail = 0; +    r.pos = 0; + +    /* setup list of items to process */ +    r.rlist = malloc(sizeof(redisReply*)); +    r.rlist[0] = createReplyObject(-1,NULL); +    r.rlen = 1; +    r.rpos = 0; + +    while (r.rpos < r.rlen) { +        /* discard the buffer upto pos */ +        if (r.pos > 0) { +            memmove(r.buf,r.buf+r.pos,r.len-r.pos); +            r.avail -= r.pos; +            r.pos = 0; +        } + +        /* make sure there is room for at least BUFFER_SIZE */ +        if (r.len-r.avail < READ_BUFFER_SIZE) { +            r.buf = realloc(r.buf,r.avail+READ_BUFFER_SIZE+1); +            r.len = r.avail+READ_BUFFER_SIZE; +        } + +        /* read from socket into buffer */ +        if ((bytes = read(fd,r.buf+r.avail,READ_BUFFER_SIZE)) <= 0) +            return redisIOError(); +        r.avail += bytes; +        r.buf[r.avail] = '\0'; + +        /* process items in reply */ +        while (r.rpos < r.rlen) +            if (processItem(&r) < 0) +                break;      } +    free(r.buf); +    free(r.rlist); +    return r.rlist[0];  }  /* Helper function for redisCommand(). It's used to append the next argument @@ -35,6 +35,7 @@  #define REDIS_REPLY_ARRAY 2  #define REDIS_REPLY_INTEGER 3  #define REDIS_REPLY_NIL 4 +#define REDIS_REPLY_STATUS 5  #include "sds.h" @@ -99,6 +99,23 @@ int main(void) {                !memcmp(reply->element[1]->reply,"foo",3))      freeReplyObject(reply); +    /* test 9 (m/e with multi bulk reply *before* other reply). +     * specifically test ordering of reply items to parse. */ +    printf("#10 can handle nested multi bulk replies: "); +    freeReplyObject(redisCommand(fd,"MULTI")); +    freeReplyObject(redisCommand(fd,"LRANGE mylist 0 -1")); +    freeReplyObject(redisCommand(fd,"PING")); +    reply = (redisCommand(fd,"EXEC")); +    test_cond(reply->type == REDIS_REPLY_ARRAY && +              reply->elements == 2 && +              reply->element[0]->type == REDIS_REPLY_ARRAY && +              reply->element[0]->elements == 2 && +              !memcmp(reply->element[0]->element[0]->reply,"bar",3) && +              !memcmp(reply->element[0]->element[1]->reply,"foo",3) && +              reply->element[1]->type == REDIS_REPLY_STRING && +              strcasecmp(reply->element[1]->reply,"pong") == 0); +    freeReplyObject(reply); +      if (fails == 0) {          printf("ALL TESTS PASSED\n");      } else { | 
