diff options
author | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-09-20 14:05:23 +0200 |
---|---|---|
committer | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-09-20 14:05:23 +0200 |
commit | e944ea366225d4a925cd3cd6ab927fa671ba6f1d (patch) | |
tree | 1b93bcca264cbd4fa0f7fcac960ef0e2287909b3 | |
parent | aec1fbd2ad9bb4505ae0ba915740802baee76ebc (diff) |
Add reply type for protocol errors, in order to never exit()
-rw-r--r-- | hiredis.c | 45 | ||||
-rw-r--r-- | hiredis.h | 1 | ||||
-rw-r--r-- | test.c | 23 |
3 files changed, 62 insertions, 7 deletions
@@ -48,6 +48,8 @@ typedef struct redisReader { static redisReply *redisReadReply(int fd); static redisReply *createReplyObject(int type, sds reply); +static redisReply *createErrorObject(const char *fmt, ...); +static void redisSetReplyReaderError(redisReader *r, redisReply *error); /* We simply abort on out of memory */ static void redisOOM(void) { @@ -64,7 +66,7 @@ redisReply *redisConnect(int *fd, const char *ip, int port) { *fd = anetTcpConnect(err,ip,port); if (*fd == ANET_ERR) - return createReplyObject(REDIS_REPLY_ERROR,sdsnew(err)); + return createErrorObject(err); anetTcpNoDelay(NULL,*fd); return NULL; } @@ -99,8 +101,19 @@ void freeReplyObject(redisReply *r) { free(r); } +static redisReply *createErrorObject(const char *fmt, ...) { + va_list ap; + sds err; + redisReply *r; + va_start(ap,fmt); + err = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + r = createReplyObject(REDIS_PROTOCOL_ERROR,err); + return r; +} + static redisReply *redisIOError(void) { - return createReplyObject(REDIS_REPLY_ERROR,sdsnew("I/O error")); + return createErrorObject("I/O error"); } static char *readBytes(redisReader *r, unsigned int bytes) { @@ -224,6 +237,7 @@ static int processMultiBulkItem(redisReader *r) { static int processItem(redisReader *r) { redisReply *cur = r->rlist[r->rpos]; char *p; + sds byte; /* check if we need to read type */ if (cur->type < 0) { @@ -245,8 +259,11 @@ static int processItem(redisReader *r) { cur->type = REDIS_REPLY_ARRAY; break; default: - printf("protocol error, got '%c' as reply type byte\n", p[0]); - exit(1); + byte = sdscatrepr(sdsempty(),p,1); + redisSetReplyReaderError(r,createErrorObject( + "protocol error, got %s as reply type byte", byte)); + sdsfree(byte); + return -1; } } else { /* could not consume 1 byte */ @@ -265,8 +282,9 @@ static int processItem(redisReader *r) { case REDIS_REPLY_ARRAY: return processMultiBulkItem(r); default: - printf("unknown item type: %d\n", cur->type); - exit(1); + redisSetReplyReaderError(r,createErrorObject( + "unknown item type '%d'", cur->type)); + return -1; } } @@ -308,6 +326,20 @@ void redisFreeReplyReader(void *reader) { free(r); } +static void redisSetReplyReaderError(redisReader *r, redisReply *error) { + /* Clear remaining buffer when we see a protocol error. */ + if (r->buf != NULL) { + sdsfree(r->buf); + r->buf = sdsempty(); + r->pos = 0; + } + /* Clear currently allocated objects. */ + if (r->rlist[0] != NULL) + freeReplyObject(r->rlist[0]); + r->rlen = r->rpos = 1; + r->rlist[0] = error; +} + void *redisFeedReplyReader(void *reader, char *buf, int len) { redisReader *r = reader; @@ -357,6 +389,7 @@ void *redisFeedReplyReader(void *reader, char *buf, int len) { /* Free list of items to process. */ free(r->rlist); + r->rlist = NULL; r->rlen = r->rpos = 0; return reply; } else { @@ -36,6 +36,7 @@ #define REDIS_REPLY_INTEGER 3 #define REDIS_REPLY_NIL 4 #define REDIS_REPLY_STATUS 5 +#define REDIS_PROTOCOL_ERROR 6 #include "sds.h" @@ -29,11 +29,12 @@ int main(void) { int i, tests = 0, fails = 0; long long t1, t2; redisReply *reply; + void *reader; __connect(&fd); test("Returns I/O error when the connection is lost: "); reply = redisCommand(fd,"QUIT"); - test_cond(reply->type == REDIS_REPLY_ERROR && + test_cond(reply->type == REDIS_PROTOCOL_ERROR && strcasecmp(reply->reply,"i/o error") == 0); freeReplyObject(reply); __connect(&fd); /* reconnect */ @@ -122,6 +123,26 @@ int main(void) { strcasecmp(reply->element[1]->reply,"pong") == 0); freeReplyObject(reply); + test("Error handling in reply parser: "); + reader = redisCreateReplyReader(); + reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6); + test_cond(reply->type == REDIS_PROTOCOL_ERROR && + strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0); + freeReplyObject(reply); + redisFreeReplyReader(reader); + + /* when the reply already contains multiple items, they must be free'd + * on an error. valgrind will bark when this doesn't happen. */ + test("Memory cleanup in reply parser: "); + reader = redisCreateReplyReader(); + redisFeedReplyReader(reader,(char*)"*2\r\n",4); + redisFeedReplyReader(reader,(char*)"$5\r\nhello\r\n",11); + reply = redisFeedReplyReader(reader,(char*)"@foo\r\n",6); + test_cond(reply->type == REDIS_PROTOCOL_ERROR && + strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0); + freeReplyObject(reply); + redisFreeReplyReader(reader); + test("Throughput:\n"); for (i = 0; i < 500; i++) freeReplyObject(redisCommand(fd,"LPUSH mylist foo")); |