summaryrefslogtreecommitdiff
path: root/hiredis.c
diff options
context:
space:
mode:
authorPieter Noordhuis <pcnoordhuis@gmail.com>2010-09-24 18:38:14 +0200
committerPieter Noordhuis <pcnoordhuis@gmail.com>2010-09-24 18:42:22 +0200
commit817d26b81df7028ff8bcb8690dae0cec1319b63a (patch)
treedf16a1c92f3dd4c9e68d18fda2b423477c2c5d2e /hiredis.c
parent81f6b8ffd42460038e3b9498e7a60347eef1fcd4 (diff)
Use context variable to keep state for a connection
The context supports both blocking and non-blocking connections. All read and write events are not handled by hiredis, but hiredis exposes an API for buffered reading/writing to Redis.
Diffstat (limited to 'hiredis.c')
-rw-r--r--hiredis.c203
1 files changed, 148 insertions, 55 deletions
diff --git a/hiredis.c b/hiredis.c
index 6ab8063..6dc3e1d 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -33,6 +33,7 @@
#include <unistd.h>
#include <stdarg.h>
#include <assert.h>
+#include <errno.h>
#include "hiredis.h"
#include "anet.h"
@@ -50,9 +51,8 @@ typedef struct redisReader {
unsigned int rpos; /* list cursor */
} redisReader;
-static redisReply *redisReadReply(int fd);
static redisReply *createReplyObject(int type, sds reply);
-static void *createErrorObject(redisReader *context, const char *fmt, ...);
+static void *createErrorObject(const char *str, size_t len);
static void *createStringObject(redisReadTask *task, char *str, size_t len);
static void *createArrayObject(redisReadTask *task, int elements);
static void *createIntegerObject(redisReadTask *task, long long value);
@@ -61,6 +61,7 @@ static void redisSetReplyReaderError(redisReader *r, void *obj);
/* Default set of functions to build the reply. */
static redisReplyFunctions defaultFunctions = {
+ createErrorObject,
createStringObject,
createArrayObject,
createIntegerObject,
@@ -74,20 +75,6 @@ static void redisOOM(void) {
exit(1);
}
-/* Connect to a Redis instance. On success NULL is returned and *fd is set
- * to the socket file descriptor. On error a redisReply object is returned
- * with reply->type set to REDIS_REPLY_ERROR and reply->string containing
- * the error message. This replyObject must be freed with redisFreeReply(). */
-redisReply *redisConnect(int *fd, const char *ip, int port) {
- char err[ANET_ERR_LEN];
-
- *fd = anetTcpConnect(err,ip,port);
- if (*fd == ANET_ERR)
- return (redisReply*)createErrorObject(NULL,err);
- anetTcpNoDelay(NULL,*fd);
- return NULL;
-}
-
/* Create a reply object */
static redisReply *createReplyObject(int type, sds reply) {
redisReply *r = calloc(sizeof(*r),1);
@@ -119,28 +106,27 @@ void freeReplyObject(void *reply) {
free(r);
}
-static void *createErrorObject(redisReader *context, const char *fmt, ...) {
+/* Helper function that allows printf-like creation of error objects. */
+static void *formatError(redisReplyFunctions *fn, const char *fmt, ...) {
va_list ap;
sds err;
void *obj;
- redisReadTask t = { REDIS_PROTOCOL_ERROR, NULL, -1 };
va_start(ap,fmt);
err = sdscatvprintf(sdsempty(),fmt,ap);
va_end(ap);
-
- /* Use the context of the reader if it is provided. */
- if (context)
- obj = context->fn->createString(&t,err,sdslen(err));
- else
- obj = createStringObject(&t,err,sdslen(err));
+ obj = fn->createError(err,sdslen(err));
sdsfree(err);
return obj;
}
+static void *createErrorObject(const char *str, size_t len) {
+ redisReply *r = createReplyObject(REDIS_ERROR,sdsnewlen(str,len));
+ return r;
+}
+
static void *createStringObject(redisReadTask *task, char *str, size_t len) {
redisReply *r = createReplyObject(task->type,sdsnewlen(str,len));
- assert(task->type == REDIS_PROTOCOL_ERROR ||
- task->type == REDIS_REPLY_ERROR ||
+ assert(task->type == REDIS_REPLY_ERROR ||
task->type == REDIS_REPLY_STATUS ||
task->type == REDIS_REPLY_STRING);
@@ -322,6 +308,7 @@ static int processItem(redisReader *r) {
redisReadTask *cur = &(r->rlist[r->rpos]);
char *p;
sds byte;
+ void *err;
/* check if we need to read type */
if (cur->type < 0) {
@@ -344,8 +331,9 @@ static int processItem(redisReader *r) {
break;
default:
byte = sdscatrepr(sdsempty(),p,1);
- redisSetReplyReaderError(r,createErrorObject(r,
- "protocol error, got %s as reply type byte", byte));
+ err = formatError(r->fn,
+ "protocol error, got %s as reply type byte", byte);
+ redisSetReplyReaderError(r,err);
sdsfree(byte);
return -1;
}
@@ -366,33 +354,12 @@ static int processItem(redisReader *r) {
case REDIS_REPLY_ARRAY:
return processMultiBulkItem(r);
default:
- redisSetReplyReaderError(r,createErrorObject(r,
- "unknown item type '%d'", cur->type));
+ err = formatError(r->fn,"unknown item type '%d'", cur->type);
+ redisSetReplyReaderError(r,err);
return -1;
}
}
-#define READ_BUFFER_SIZE 2048
-static redisReply *redisReadReply(int fd) {
- void *reader = redisReplyReaderCreate(&defaultFunctions);
- redisReply *reply;
- char buf[1024];
- int nread;
-
- do {
- if ((nread = read(fd,buf,sizeof(buf))) <= 0) {
- reply = createErrorObject(reader,"I/O error");
- break;
- } else {
- redisReplyReaderFeed(reader,buf,nread);
- reply = redisReplyReaderGetReply(reader);
- }
- } while (reply == NULL);
-
- redisReplyReaderFree(reader);
- return reply;
-}
-
void *redisReplyReaderCreate(redisReplyFunctions *fn) {
redisReader *r = calloc(sizeof(redisReader),1);
r->fn = fn == NULL ? &defaultFunctions : fn;
@@ -592,17 +559,143 @@ static sds redisFormatCommand(const char *format, va_list ap) {
sdsfree(argv[j]);
}
free(argv);
+ return cmd;
}
-redisReply *redisCommand(int fd, const char *format, ...) {
+static int redisContextConnect(redisContext *c, const char *ip, int port) {
+ char err[ANET_ERR_LEN];
+ if (c->flags & HIREDIS_BLOCK) {
+ c->fd = anetTcpConnect(err,(char*)ip,port);
+ } else {
+ c->fd = anetTcpNonBlockConnect(err,(char*)ip,port);
+ }
+
+ if (c->fd == ANET_ERR) {
+ c->error = c->fn->createError(err,strlen(err));
+ return HIREDIS_ERR;
+ }
+ if (anetTcpNoDelay(err,c->fd) == ANET_ERR) {
+ c->error = c->fn->createError(err,strlen(err));
+ return HIREDIS_ERR;
+ }
+ return HIREDIS_OK;
+}
+
+static redisContext *redisContextInit(redisReplyFunctions *fn) {
+ redisContext *c = malloc(sizeof(*c));
+ c->fn = fn == NULL ? &defaultFunctions : fn;
+ c->obuf = sdsempty();
+ c->error = NULL;
+ c->reader = redisReplyReaderCreate(fn);
+ return c;
+}
+
+/* Connect to a Redis instance. On error the field error in the returned
+ * context will be set to the return value of the error function.
+ * When no set of reply functions is given, the default set will be used. */
+redisContext *redisConnect(const char *ip, int port, redisReplyFunctions *fn) {
+ redisContext *c = redisContextInit(fn);
+ c->flags |= HIREDIS_BLOCK;
+ redisContextConnect(c,ip,port);
+ return c;
+}
+
+redisContext *redisConnectNonBlock(const char *ip, int port, redisReplyFunctions *fn) {
+ redisContext *c = redisContextInit(fn);
+ c->flags &= ~HIREDIS_BLOCK;
+ redisContextConnect(c,ip,port);
+ return c;
+}
+
+/* Use this function to handle a read event on the descriptor. It will try
+ * and read some bytes from the socket and feed them to the reply parser.
+ *
+ * After this function is called, you may use redisContextReadReply to
+ * see if there is a reply available. */
+int redisBufferRead(redisContext *c) {
+ char buf[2048];
+ int nread = read(c->fd,buf,sizeof(buf));
+ if (nread == -1) {
+ if (errno == EAGAIN) {
+ /* Try again later */
+ } else {
+ /* Set error in context */
+ c->error = formatError(c->fn,
+ "Error reading from socket: %s", strerror(errno));
+ return HIREDIS_ERR;
+ }
+ } else if (nread == 0) {
+ c->error = formatError(c->fn,
+ "Server closed the connection");
+ return HIREDIS_ERR;
+ } else {
+ redisReplyReaderFeed(c->reader,buf,nread);
+ }
+ return HIREDIS_OK;
+}
+
+void *redisGetReply(redisContext *c) {
+ return redisReplyReaderGetReply(c->reader);
+}
+
+/* Use this function to try and write the entire output buffer to the
+ * descriptor. Returns 1 when the entire buffer was written, 0 otherwise. */
+int redisBufferWrite(redisContext *c, int *done) {
+ int nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
+ if (nwritten == -1) {
+ if (errno == EAGAIN) {
+ /* Try again later */
+ } else {
+ /* Set error in context */
+ c->error = formatError(c->fn,
+ "Error writing to socket: %s", strerror(errno));
+ return HIREDIS_ERR;
+ }
+ } else if (nwritten > 0) {
+ if (nwritten == (signed)sdslen(c->obuf)) {
+ sdsfree(c->obuf);
+ c->obuf = sdsempty();
+ } else {
+ c->obuf = sdsrange(c->obuf,nwritten,-1);
+ }
+ }
+ if (done != NULL) *done = (sdslen(c->obuf) == 0);
+ return HIREDIS_OK;
+}
+
+static void* redisCommandWrite(redisContext *c, char *str, size_t len) {
+ void *reply = NULL;
+ int wdone = 0;
+ c->obuf = sdscatlen(c->obuf,str,len);
+
+ /* Only take action when this is a blocking context. */
+ if (c->flags & HIREDIS_BLOCK) {
+ do { /* Write until done. */
+ if (redisBufferWrite(c,&wdone) == HIREDIS_ERR)
+ return c->error;
+ } while (!wdone);
+
+ do { /* Read until there is a reply. */
+ if (redisBufferRead(c) == HIREDIS_ERR)
+ return c->error;
+ reply = redisGetReply(c);
+ } while (reply == NULL);
+ }
+ return reply;
+}
+
+/* Write a formatted command to the output buffer, and, if the context is a
+ * non-blocking connection, read the reply and return it. When this function
+ * is called from a blocking context, it will always return NULL. */
+void *redisCommand(redisContext *c, const char *format, ...) {
va_list ap;
sds cmd;
+ void *reply;
va_start(ap,format);
cmd = redisFormatCommand(format,ap);
va_end(ap);
- /* Send the command via socket */
- anetWrite(fd,cmd,sdslen(cmd));
+ reply = redisCommandWrite(c,cmd,sdslen(cmd));
sdsfree(cmd);
- return redisReadReply(fd);
+ return reply;
}