diff options
| author | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-09-20 14:05:23 +0200 | 
|---|---|---|
| committer | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-09-20 14:05:23 +0200 | 
| commit | e944ea366225d4a925cd3cd6ab927fa671ba6f1d (patch) | |
| tree | 1b93bcca264cbd4fa0f7fcac960ef0e2287909b3 | |
| parent | aec1fbd2ad9bb4505ae0ba915740802baee76ebc (diff) | |
| download | hiredict-e944ea366225d4a925cd3cd6ab927fa671ba6f1d.tar.xz | |
Add reply type for protocol errors, in order to never exit()
| -rw-r--r-- | hiredis.c | 45 | ||||
| -rw-r--r-- | hiredis.h | 1 | ||||
| -rw-r--r-- | test.c | 23 | 
3 files changed, 62 insertions, 7 deletions
| @@ -48,6 +48,8 @@ typedef struct redisReader {  static redisReply *redisReadReply(int fd);  static redisReply *createReplyObject(int type, sds reply); +static redisReply *createErrorObject(const char *fmt, ...); +static void redisSetReplyReaderError(redisReader *r, redisReply *error);  /* We simply abort on out of memory */  static void redisOOM(void) { @@ -64,7 +66,7 @@ redisReply *redisConnect(int *fd, const char *ip, int port) {      *fd = anetTcpConnect(err,ip,port);      if (*fd == ANET_ERR) -        return createReplyObject(REDIS_REPLY_ERROR,sdsnew(err)); +        return createErrorObject(err);      anetTcpNoDelay(NULL,*fd);      return NULL;  } @@ -99,8 +101,19 @@ void freeReplyObject(redisReply *r) {      free(r);  } +static redisReply *createErrorObject(const char *fmt, ...) { +    va_list ap; +    sds err; +    redisReply *r; +    va_start(ap,fmt); +    err = sdscatvprintf(sdsempty(),fmt,ap); +    va_end(ap); +    r = createReplyObject(REDIS_PROTOCOL_ERROR,err); +    return r; +} +  static redisReply *redisIOError(void) { -    return createReplyObject(REDIS_REPLY_ERROR,sdsnew("I/O error")); +    return createErrorObject("I/O error");  }  static char *readBytes(redisReader *r, unsigned int bytes) { @@ -224,6 +237,7 @@ static int processMultiBulkItem(redisReader *r) {  static int processItem(redisReader *r) {      redisReply *cur = r->rlist[r->rpos];      char *p; +    sds byte;      /* check if we need to read type */      if (cur->type < 0) { @@ -245,8 +259,11 @@ static int processItem(redisReader *r) {                  cur->type = REDIS_REPLY_ARRAY;                  break;              default: -                printf("protocol error, got '%c' as reply type byte\n", p[0]); -                exit(1); +                byte = sdscatrepr(sdsempty(),p,1); +                redisSetReplyReaderError(r,createErrorObject( +                    "protocol error, got %s as reply type byte", byte)); +                sdsfree(byte); +                return -1;              }          } else {              /* could not consume 1 byte */ @@ -265,8 +282,9 @@ static int processItem(redisReader *r) {      case REDIS_REPLY_ARRAY:          return processMultiBulkItem(r);      default: -        printf("unknown item type: %d\n", cur->type); -        exit(1); +        redisSetReplyReaderError(r,createErrorObject( +            "unknown item type '%d'", cur->type)); +        return -1;      }  } @@ -308,6 +326,20 @@ void redisFreeReplyReader(void *reader) {      free(r);  } +static void redisSetReplyReaderError(redisReader *r, redisReply *error) { +    /* Clear remaining buffer when we see a protocol error. */ +    if (r->buf != NULL) { +        sdsfree(r->buf); +        r->buf = sdsempty(); +        r->pos = 0; +    } +    /* Clear currently allocated objects. */ +    if (r->rlist[0] != NULL) +        freeReplyObject(r->rlist[0]); +    r->rlen = r->rpos = 1; +    r->rlist[0] = error; +} +  void *redisFeedReplyReader(void *reader, char *buf, int len) {      redisReader *r = reader; @@ -357,6 +389,7 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) {          /* Free list of items to process. */          free(r->rlist); +        r->rlist = NULL;          r->rlen = r->rpos = 0;          return reply;      } else { @@ -36,6 +36,7 @@  #define REDIS_REPLY_INTEGER 3  #define REDIS_REPLY_NIL 4  #define REDIS_REPLY_STATUS 5 +#define REDIS_PROTOCOL_ERROR 6  #include "sds.h" @@ -29,11 +29,12 @@ int main(void) {      int i, tests = 0, fails = 0;      long long t1, t2;      redisReply *reply; +    void *reader;      __connect(&fd);      test("Returns I/O error when the connection is lost: ");      reply = redisCommand(fd,"QUIT"); -    test_cond(reply->type == REDIS_REPLY_ERROR && +    test_cond(reply->type == REDIS_PROTOCOL_ERROR &&          strcasecmp(reply->reply,"i/o error") == 0);      freeReplyObject(reply);      __connect(&fd); /* reconnect */ @@ -122,6 +123,26 @@ int main(void) {                strcasecmp(reply->element[1]->reply,"pong") == 0);      freeReplyObject(reply); +    test("Error handling in reply parser: "); +    reader = redisCreateReplyReader(); +    reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6); +    test_cond(reply->type == REDIS_PROTOCOL_ERROR && +              strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0); +    freeReplyObject(reply); +    redisFreeReplyReader(reader); + +    /* when the reply already contains multiple items, they must be free'd +     * on an error. valgrind will bark when this doesn't happen. */ +    test("Memory cleanup in reply parser: "); +    reader = redisCreateReplyReader(); +    redisFeedReplyReader(reader,(char*)"*2\r\n",4); +    redisFeedReplyReader(reader,(char*)"$5\r\nhello\r\n",11); +    reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6); +    test_cond(reply->type == REDIS_PROTOCOL_ERROR && +              strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0); +    freeReplyObject(reply); +    redisFreeReplyReader(reader); +      test("Throughput:\n");      for (i = 0; i < 500; i++)          freeReplyObject(redisCommand(fd,"LPUSH mylist foo")); | 
