summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--hiredis.c211
-rw-r--r--hiredis.h8
-rw-r--r--test.c25
3 files changed, 139 insertions, 105 deletions
diff --git a/hiredis.c b/hiredis.c
index b914d21..5cfd815 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -41,6 +41,7 @@
typedef struct redisReader {
struct redisReplyObjectFunctions *fn;
+ sds error; /* holds optional error */
void *reply; /* holds temporary reply */
sds buf; /* read buffer */
@@ -52,16 +53,14 @@ typedef struct redisReader {
} redisReader;
static redisReply *createReplyObject(int type, sds reply);
-static void *createErrorObject(const char *str, size_t len);
static void *createStringObject(redisReadTask *task, char *str, size_t len);
static void *createArrayObject(redisReadTask *task, int elements);
static void *createIntegerObject(redisReadTask *task, long long value);
static void *createNilObject(redisReadTask *task);
-static void redisSetReplyReaderError(redisReader *r, void *obj);
+static void redisSetReplyReaderError(redisReader *r, sds err);
/* Default set of functions to build the reply. */
static redisReplyFunctions defaultFunctions = {
- createErrorObject,
createStringObject,
createArrayObject,
createIntegerObject,
@@ -106,24 +105,6 @@ void freeReplyObject(void *reply) {
free(r);
}
-/* Helper function that allows printf-like creation of error objects. */
-static void *formatError(redisReplyFunctions *fn, const char *fmt, ...) {
- va_list ap;
- sds err;
- void *obj;
- va_start(ap,fmt);
- err = sdscatvprintf(sdsempty(),fmt,ap);
- va_end(ap);
- obj = fn->createError(err,sdslen(err));
- sdsfree(err);
- return obj;
-}
-
-static void *createErrorObject(const char *str, size_t len) {
- redisReply *r = createReplyObject(REDIS_ERROR,sdsnewlen(str,len));
- return r;
-}
-
static void *createStringObject(redisReadTask *task, char *str, size_t len) {
redisReply *r = createReplyObject(task->type,sdsnewlen(str,len));
assert(task->type == REDIS_REPLY_ERROR ||
@@ -308,7 +289,6 @@ static int processItem(redisReader *r) {
redisReadTask *cur = &(r->rlist[r->rpos]);
char *p;
sds byte;
- void *err;
/* check if we need to read type */
if (cur->type < 0) {
@@ -331,9 +311,8 @@ static int processItem(redisReader *r) {
break;
default:
byte = sdscatrepr(sdsempty(),p,1);
- err = formatError(r->fn,
- "protocol error, got %s as reply type byte", byte);
- redisSetReplyReaderError(r,err);
+ redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
+ "protocol error, got %s as reply type byte", byte));
sdsfree(byte);
return -1;
}
@@ -354,14 +333,15 @@ static int processItem(redisReader *r) {
case REDIS_REPLY_ARRAY:
return processMultiBulkItem(r);
default:
- err = formatError(r->fn,"unknown item type '%d'", cur->type);
- redisSetReplyReaderError(r,err);
+ redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
+ "unknown item type '%d'", cur->type));
return -1;
}
}
void *redisReplyReaderCreate(redisReplyFunctions *fn) {
redisReader *r = calloc(sizeof(redisReader),1);
+ r->error = NULL;
r->fn = fn == NULL ? &defaultFunctions : fn;
r->buf = sdsempty();
r->rlist = malloc(sizeof(redisReadTask)*1);
@@ -379,6 +359,8 @@ void *redisReplyReaderGetObject(void *reader) {
void redisReplyReaderFree(void *reader) {
redisReader *r = reader;
+ if (r->error != NULL)
+ sdsfree(r->error);
if (r->reply != NULL)
r->fn->freeObject(r->reply);
if (r->buf != NULL)
@@ -388,7 +370,7 @@ void redisReplyReaderFree(void *reader) {
free(r);
}
-static void redisSetReplyReaderError(redisReader *r, void *obj) {
+static void redisSetReplyReaderError(redisReader *r, sds err) {
if (r->reply != NULL)
r->fn->freeObject(r->reply);
@@ -399,7 +381,12 @@ static void redisSetReplyReaderError(redisReader *r, void *obj) {
r->pos = 0;
}
r->rlen = r->rpos = 0;
- r->reply = obj;
+ r->error = err;
+}
+
+char *redisReplyReaderGetError(void *reader) {
+ redisReader *r = reader;
+ return r->error;
}
void redisReplyReaderFeed(void *reader, char *buf, int len) {
@@ -410,12 +397,13 @@ void redisReplyReaderFeed(void *reader, char *buf, int len) {
r->buf = sdscatlen(r->buf,buf,len);
}
-void *redisReplyReaderGetReply(void *reader) {
+int redisReplyReaderGetReply(void *reader, void **reply) {
redisReader *r = reader;
+ if (reply != NULL) *reply = NULL;
/* When the buffer is empty, there will never be a reply. */
if (sdslen(r->buf) == 0)
- return NULL;
+ return REDIS_OK;
/* Create first item to process when the item list is empty. */
if (r->rlen == 0) {
@@ -446,7 +434,7 @@ void *redisReplyReaderGetReply(void *reader) {
/* Emit a reply when there is one. */
if (r->rpos == r->rlen) {
- void *reply = r->reply;
+ void *aux = r->reply;
r->reply = NULL;
/* Destroy the buffer when it is empty and is quite large. */
@@ -458,10 +446,15 @@ void *redisReplyReaderGetReply(void *reader) {
/* Set list of items to read to be empty. */
r->rlen = r->rpos = 0;
- return reply;
- } else {
- return NULL;
+
+ /* Check if there actually *is* a reply. */
+ if (r->error != NULL) {
+ return REDIS_ERR;
+ } else {
+ if (reply != NULL) *reply = aux;
+ }
}
+ return REDIS_OK;
}
/* Helper function for redisCommand(). It's used to append the next argument
@@ -571,11 +564,11 @@ static int redisContextConnect(redisContext *c, const char *ip, int port) {
}
if (c->fd == ANET_ERR) {
- c->error = c->fn->createError(err,strlen(err));
+ c->error = sdsnew(err);
return REDIS_ERR;
}
if (anetTcpNoDelay(err,c->fd) == ANET_ERR) {
- c->error = c->fn->createError(err,strlen(err));
+ c->error = sdsnew(err);
return REDIS_ERR;
}
return REDIS_OK;
@@ -623,12 +616,12 @@ int redisBufferRead(redisContext *c) {
/* Try again later */
} else {
/* Set error in context */
- c->error = formatError(c->fn,
+ c->error = sdscatprintf(sdsempty(),
"Error reading from socket: %s", strerror(errno));
return REDIS_ERR;
}
} else if (nread == 0) {
- c->error = formatError(c->fn,
+ c->error = sdscatprintf(sdsempty(),
"Server closed the connection");
return REDIS_ERR;
} else {
@@ -644,21 +637,34 @@ static void redisPopCallback(redisContext *c) {
c->cpos--;
}
-void *redisGetReply(redisContext *c) {
+int redisGetReply(redisContext *c, void **reply) {
redisPopCallback(c);
- return redisReplyReaderGetReply(c->reader);
+ if (redisReplyReaderGetReply(c->reader,reply) == REDIS_ERR) {
+ /* Copy the (protocol) error from the reader to the context. */
+ c->error = sdsnew(((redisReader*)c->reader)->error);
+ return REDIS_ERR;
+ }
+ return REDIS_OK;
}
int redisProcessCallbacks(redisContext *c) {
- void *reply;
+ void *reply = NULL;
redisCallback cb;
- while ((reply = redisReplyReaderGetReply(c->reader)) != NULL) {
+ do {
cb = c->callbacks[0];
- if (cb.fn != NULL)
- cb.fn(c,reply,cb.privdata);
- redisPopCallback(c);
- }
+ if (redisGetReply(c,&reply) == REDIS_ERR)
+ return REDIS_ERR;
+
+ /* Fire callback when there is a reply. */
+ if (reply != NULL) {
+ if (cb.fn != NULL) {
+ cb.fn(c,reply,cb.privdata);
+ } else {
+ c->fn->freeObject(reply);
+ }
+ }
+ } while (reply != NULL);
return REDIS_OK;
}
@@ -671,7 +677,7 @@ int redisBufferWrite(redisContext *c, int *done) {
/* Try again later */
} else {
/* Set error in context */
- c->error = formatError(c->fn,
+ c->error = sdscatprintf(sdsempty(),
"Error writing to socket: %s", strerror(errno));
return REDIS_ERR;
}
@@ -687,73 +693,102 @@ int redisBufferWrite(redisContext *c, int *done) {
return REDIS_OK;
}
-static void* redisCommandWrite(redisContext *c, redisCallback *cb, char *str, size_t len) {
- void *reply = NULL;
+static int redisCommandWriteBlock(redisContext *c, void **reply, char *str, size_t len) {
int wdone = 0;
+ void *aux = NULL;
+ assert(c->flags & HIREDIS_BLOCK);
c->obuf = sdscatlen(c->obuf,str,len);
- /* Read reply immediately when the context is blocking. */
- if (c->flags & HIREDIS_BLOCK) {
- do { /* Write until done. */
- if (redisBufferWrite(c,&wdone) == REDIS_ERR)
- return c->error;
- } while (!wdone);
-
- do { /* Read until there is a reply. */
- if (redisBufferRead(c) == REDIS_ERR)
- return c->error;
- reply = redisGetReply(c);
- } while (reply == NULL);
- } else {
- /* Make room for the callback. */
- assert(c->cpos <= c->clen);
- if (c->cpos == c->clen) {
- c->clen++;
- c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback));
- }
+ /* Write until done. */
+ do {
+ if (redisBufferWrite(c,&wdone) == REDIS_ERR)
+ return REDIS_ERR;
+ } while (!wdone);
- if (cb != NULL) {
- c->callbacks[c->cpos] = *cb;
- } else {
- memset(&c->callbacks[c->cpos],0,sizeof(redisCallback));
- }
- c->cpos++;
+ /* Read until there is a reply. */
+ do {
+ if (redisBufferRead(c) == REDIS_ERR)
+ return REDIS_ERR;
+ if (redisGetReply(c,&aux) == REDIS_ERR)
+ return REDIS_ERR;
+ } while (aux == NULL);
+
+ /* Set reply object. */
+ if (reply != NULL)
+ *reply = aux;
+
+ return REDIS_OK;
+}
+
+static int redisCommandWriteNonBlock(redisContext *c, redisCallback *cb, char *str, size_t len) {
+ assert(!(c->flags & HIREDIS_BLOCK));
+ c->obuf = sdscatlen(c->obuf,str,len);
+
+ /* Make sure there is space for the callback. */
+ assert(c->cpos <= c->clen);
+ if (c->cpos == c->clen) {
+ c->clen++;
+ c->callbacks = realloc(c->callbacks,c->clen*sizeof(redisCallback));
}
- return reply;
+
+ if (cb != NULL) {
+ c->callbacks[c->cpos] = *cb;
+ } else {
+ memset(&c->callbacks[c->cpos],0,sizeof(redisCallback));
+ }
+ c->cpos++;
+
+ return REDIS_OK;
}
-/* Write a formatted command to the output buffer, and, if the context is a
- * blocking connection, read the reply and return it. When this function
- * is called from a non-blocking context, it will always return NULL. */
+/* Write a formatted command to the output buffer. If the given context is
+ * blocking, immediately read the reply into the "reply" pointer. When the
+ * context is non-blocking, the "reply" pointer will not be used and a
+ * NULL callback will be appended to the list of callbacks.
+ *
+ * Returns the reply when a reply was succesfully retrieved. Returns NULL
+ * otherwise. When NULL is returned in a blocking context, provided that
+ * the reply build functions did not return NULL when building the reply,
+ * the error field in the context will be set. */
void *redisCommand(redisContext *c, const char *format, ...) {
va_list ap;
sds cmd;
- void *reply;
+ void *reply = NULL;
va_start(ap,format);
cmd = redisFormatCommand(format,ap);
va_end(ap);
- reply = redisCommandWrite(c,NULL,cmd,sdslen(cmd));
+ if (c->flags & HIREDIS_BLOCK) {
+ if (redisCommandWriteBlock(c,&reply,cmd,sdslen(cmd)) == REDIS_OK) {
+ return reply;
+ }
+ } else {
+ redisCommandWriteNonBlock(c,NULL,cmd,sdslen(cmd));
+ }
sdsfree(cmd);
- return reply;
+ return NULL;
}
-/* Write a formatted command to the output buffer and register the provided
- * callback function and argument. When this function is called in a
- * non-blocking context, it is a no-op. Always returns NULL. */
+/* Write a formatted command to the output buffer. Registers the provided
+ * callback function and argument in the callback list.
+ *
+ * Always returns NULL. In a non-blocking context this will never fail because
+ * this function does not do any I/O. In a blocking context this function will
+ * have no effect (a callback in a blocking context makes no sense). */
void *redisCommandWithCallback(redisContext *c, redisCallbackFn *fn, void *privdata, const char *format, ...) {
va_list ap;
- redisCallback cb = { fn, privdata };
sds cmd;
+ int status;
+ redisCallback cb = { fn, privdata };
- /* Skip if the context is blocking. */
+ /* This function may only be used in a non-blocking context. */
if (c->flags & HIREDIS_BLOCK) return NULL;
va_start(ap,format);
cmd = redisFormatCommand(format,ap);
va_end(ap);
- redisCommandWrite(c,&cb,cmd,sdslen(cmd));
+ status = redisCommandWriteNonBlock(c,&cb,cmd,sdslen(cmd));
sdsfree(cmd);
return NULL;
}
diff --git a/hiredis.h b/hiredis.h
index e57c3ab..e5be725 100644
--- a/hiredis.h
+++ b/hiredis.h
@@ -63,7 +63,6 @@ typedef struct redisReadTask {
} redisReadTask;
typedef struct redisReplyObjectFunctions {
- void *(*createError)(const char*, size_t);
void *(*createString)(redisReadTask*, char*, size_t);
void *(*createArray)(redisReadTask*, int);
void *(*createInteger)(redisReadTask*, long long);
@@ -85,7 +84,7 @@ typedef struct redisCallback {
typedef struct redisContext {
int fd;
int flags;
- void *error; /* Error object is set when in erronous state */
+ sds error; /* Error object is set when in erronous state */
sds obuf; /* Write buffer */
redisReplyFunctions *fn;
void *reader;
@@ -97,15 +96,16 @@ typedef struct redisContext {
void freeReplyObject(void *reply);
void *redisReplyReaderCreate(redisReplyFunctions *fn);
void *redisReplyReaderGetObject(void *reader);
+char *redisReplyReaderGetError(void *reader);
void redisReplyReaderFree(void *ptr);
void redisReplyReaderFeed(void *reader, char *buf, int len);
-void *redisReplyReaderGetReply(void *reader);
+int redisReplyReaderGetReply(void *reader, void **reply);
redisContext *redisConnect(const char *ip, int port, redisReplyFunctions *fn);
redisContext *redisConnectNonBlock(const char *ip, int port, redisReplyFunctions *fn);
int redisBufferRead(redisContext *c);
int redisBufferWrite(redisContext *c, int *done);
-void *redisGetReply(redisContext *c);
+int redisGetReply(redisContext *c, void **reply);
int redisProcessCallbacks(redisContext *c);
void *redisCommand(redisContext *c, const char *format, ...);
diff --git a/test.c b/test.c
index 13de115..fed149e 100644
--- a/test.c
+++ b/test.c
@@ -25,18 +25,17 @@ static void __connect(redisContext **c) {
}
int main(void) {
- int i, tests = 0, fails = 0;
+ int i, ret, tests = 0, fails = 0;
long long t1, t2;
redisContext *c;
redisReply *reply;
void *reader;
+ char *err;
__connect(&c);
test("Returns I/O error when the connection is lost: ");
- reply = redisCommand(c,"QUIT");
- test_cond(reply->type == REDIS_ERROR &&
- strcmp(reply->reply,"Server closed the connection") == 0);
- freeReplyObject(reply);
+ test_cond(redisCommand(c,"QUIT") == NULL &&
+ strcmp(c->error,"Server closed the connection") == 0);
__connect(&c); /* reconnect */
test("Is able to deliver commands: ");
@@ -126,10 +125,10 @@ int main(void) {
test("Error handling in reply parser: ");
reader = redisReplyReaderCreate(NULL);
redisReplyReaderFeed(reader,(char*)"@foo\r\n",6);
- reply = redisReplyReaderGetReply(reader);
- test_cond(reply->type == REDIS_ERROR &&
- strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0);
- freeReplyObject(reply);
+ ret = redisReplyReaderGetReply(reader,(void*)&reply);
+ err = redisReplyReaderGetError(reader);
+ test_cond(ret == REDIS_ERR &&
+ strcasecmp(err,"protocol error, got \"@\" as reply type byte") == 0);
redisReplyReaderFree(reader);
/* when the reply already contains multiple items, they must be free'd
@@ -139,10 +138,10 @@ int main(void) {
redisReplyReaderFeed(reader,(char*)"*2\r\n",4);
redisReplyReaderFeed(reader,(char*)"$5\r\nhello\r\n",11);
redisReplyReaderFeed(reader,(char*)"@foo\r\n",6);
- reply = redisReplyReaderGetReply(reader);
- test_cond(reply->type == REDIS_ERROR &&
- strcasecmp(reply->reply,"protocol error, got \"@\" as reply type byte") == 0);
- freeReplyObject(reply);
+ ret = redisReplyReaderGetReply(reader,(void*)&reply);
+ err = redisReplyReaderGetError(reader);
+ test_cond(ret == REDIS_ERR &&
+ strcasecmp(err,"protocol error, got \"@\" as reply type byte") == 0);
redisReplyReaderFree(reader);
test("Throughput:\n");