diff options
author | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-09-24 18:38:14 +0200 |
---|---|---|
committer | Pieter Noordhuis <pcnoordhuis@gmail.com> | 2010-09-24 18:42:22 +0200 |
commit | 817d26b81df7028ff8bcb8690dae0cec1319b63a (patch) | |
tree | df16a1c92f3dd4c9e68d18fda2b423477c2c5d2e /hiredis.c | |
parent | 81f6b8ffd42460038e3b9498e7a60347eef1fcd4 (diff) |
Use context variable to keep state for a connection
The context supports both blocking and non-blocking connections. All
read and write events are not handled by hiredis, but hiredis exposes an
API for buffered reading/writing to Redis.
Diffstat (limited to 'hiredis.c')
-rw-r--r-- | hiredis.c | 203 |
1 files changed, 148 insertions, 55 deletions
@@ -33,6 +33,7 @@ #include <unistd.h> #include <stdarg.h> #include <assert.h> +#include <errno.h> #include "hiredis.h" #include "anet.h" @@ -50,9 +51,8 @@ typedef struct redisReader { unsigned int rpos; /* list cursor */ } redisReader; -static redisReply *redisReadReply(int fd); static redisReply *createReplyObject(int type, sds reply); -static void *createErrorObject(redisReader *context, const char *fmt, ...); +static void *createErrorObject(const char *str, size_t len); static void *createStringObject(redisReadTask *task, char *str, size_t len); static void *createArrayObject(redisReadTask *task, int elements); static void *createIntegerObject(redisReadTask *task, long long value); @@ -61,6 +61,7 @@ static void redisSetReplyReaderError(redisReader *r, void *obj); /* Default set of functions to build the reply. */ static redisReplyFunctions defaultFunctions = { + createErrorObject, createStringObject, createArrayObject, createIntegerObject, @@ -74,20 +75,6 @@ static void redisOOM(void) { exit(1); } -/* Connect to a Redis instance. On success NULL is returned and *fd is set - * to the socket file descriptor. On error a redisReply object is returned - * with reply->type set to REDIS_REPLY_ERROR and reply->string containing - * the error message. This replyObject must be freed with redisFreeReply(). */ -redisReply *redisConnect(int *fd, const char *ip, int port) { - char err[ANET_ERR_LEN]; - - *fd = anetTcpConnect(err,ip,port); - if (*fd == ANET_ERR) - return (redisReply*)createErrorObject(NULL,err); - anetTcpNoDelay(NULL,*fd); - return NULL; -} - /* Create a reply object */ static redisReply *createReplyObject(int type, sds reply) { redisReply *r = calloc(sizeof(*r),1); @@ -119,28 +106,27 @@ void freeReplyObject(void *reply) { free(r); } -static void *createErrorObject(redisReader *context, const char *fmt, ...) { +/* Helper function that allows printf-like creation of error objects. */ +static void *formatError(redisReplyFunctions *fn, const char *fmt, ...) { va_list ap; sds err; void *obj; - redisReadTask t = { REDIS_PROTOCOL_ERROR, NULL, -1 }; va_start(ap,fmt); err = sdscatvprintf(sdsempty(),fmt,ap); va_end(ap); - - /* Use the context of the reader if it is provided. */ - if (context) - obj = context->fn->createString(&t,err,sdslen(err)); - else - obj = createStringObject(&t,err,sdslen(err)); + obj = fn->createError(err,sdslen(err)); sdsfree(err); return obj; } +static void *createErrorObject(const char *str, size_t len) { + redisReply *r = createReplyObject(REDIS_ERROR,sdsnewlen(str,len)); + return r; +} + static void *createStringObject(redisReadTask *task, char *str, size_t len) { redisReply *r = createReplyObject(task->type,sdsnewlen(str,len)); - assert(task->type == REDIS_PROTOCOL_ERROR || - task->type == REDIS_REPLY_ERROR || + assert(task->type == REDIS_REPLY_ERROR || task->type == REDIS_REPLY_STATUS || task->type == REDIS_REPLY_STRING); @@ -322,6 +308,7 @@ static int processItem(redisReader *r) { redisReadTask *cur = &(r->rlist[r->rpos]); char *p; sds byte; + void *err; /* check if we need to read type */ if (cur->type < 0) { @@ -344,8 +331,9 @@ static int processItem(redisReader *r) { break; default: byte = sdscatrepr(sdsempty(),p,1); - redisSetReplyReaderError(r,createErrorObject(r, - "protocol error, got %s as reply type byte", byte)); + err = formatError(r->fn, + "protocol error, got %s as reply type byte", byte); + redisSetReplyReaderError(r,err); sdsfree(byte); return -1; } @@ -366,33 +354,12 @@ static int processItem(redisReader *r) { case REDIS_REPLY_ARRAY: return processMultiBulkItem(r); default: - redisSetReplyReaderError(r,createErrorObject(r, - "unknown item type '%d'", cur->type)); + err = formatError(r->fn,"unknown item type '%d'", cur->type); + redisSetReplyReaderError(r,err); return -1; } } -#define READ_BUFFER_SIZE 2048 -static redisReply *redisReadReply(int fd) { - void *reader = redisReplyReaderCreate(&defaultFunctions); - redisReply *reply; - char buf[1024]; - int nread; - - do { - if ((nread = read(fd,buf,sizeof(buf))) <= 0) { - reply = createErrorObject(reader,"I/O error"); - break; - } else { - redisReplyReaderFeed(reader,buf,nread); - reply = redisReplyReaderGetReply(reader); - } - } while (reply == NULL); - - redisReplyReaderFree(reader); - return reply; -} - void *redisReplyReaderCreate(redisReplyFunctions *fn) { redisReader *r = calloc(sizeof(redisReader),1); r->fn = fn == NULL ? &defaultFunctions : fn; @@ -592,17 +559,143 @@ static sds redisFormatCommand(const char *format, va_list ap) { sdsfree(argv[j]); } free(argv); + return cmd; } -redisReply *redisCommand(int fd, const char *format, ...) { +static int redisContextConnect(redisContext *c, const char *ip, int port) { + char err[ANET_ERR_LEN]; + if (c->flags & HIREDIS_BLOCK) { + c->fd = anetTcpConnect(err,(char*)ip,port); + } else { + c->fd = anetTcpNonBlockConnect(err,(char*)ip,port); + } + + if (c->fd == ANET_ERR) { + c->error = c->fn->createError(err,strlen(err)); + return HIREDIS_ERR; + } + if (anetTcpNoDelay(err,c->fd) == ANET_ERR) { + c->error = c->fn->createError(err,strlen(err)); + return HIREDIS_ERR; + } + return HIREDIS_OK; +} + +static redisContext *redisContextInit(redisReplyFunctions *fn) { + redisContext *c = malloc(sizeof(*c)); + c->fn = fn == NULL ? &defaultFunctions : fn; + c->obuf = sdsempty(); + c->error = NULL; + c->reader = redisReplyReaderCreate(fn); + return c; +} + +/* Connect to a Redis instance. On error the field error in the returned + * context will be set to the return value of the error function. + * When no set of reply functions is given, the default set will be used. */ +redisContext *redisConnect(const char *ip, int port, redisReplyFunctions *fn) { + redisContext *c = redisContextInit(fn); + c->flags |= HIREDIS_BLOCK; + redisContextConnect(c,ip,port); + return c; +} + +redisContext *redisConnectNonBlock(const char *ip, int port, redisReplyFunctions *fn) { + redisContext *c = redisContextInit(fn); + c->flags &= ~HIREDIS_BLOCK; + redisContextConnect(c,ip,port); + return c; +} + +/* Use this function to handle a read event on the descriptor. It will try + * and read some bytes from the socket and feed them to the reply parser. + * + * After this function is called, you may use redisContextReadReply to + * see if there is a reply available. */ +int redisBufferRead(redisContext *c) { + char buf[2048]; + int nread = read(c->fd,buf,sizeof(buf)); + if (nread == -1) { + if (errno == EAGAIN) { + /* Try again later */ + } else { + /* Set error in context */ + c->error = formatError(c->fn, + "Error reading from socket: %s", strerror(errno)); + return HIREDIS_ERR; + } + } else if (nread == 0) { + c->error = formatError(c->fn, + "Server closed the connection"); + return HIREDIS_ERR; + } else { + redisReplyReaderFeed(c->reader,buf,nread); + } + return HIREDIS_OK; +} + +void *redisGetReply(redisContext *c) { + return redisReplyReaderGetReply(c->reader); +} + +/* Use this function to try and write the entire output buffer to the + * descriptor. Returns 1 when the entire buffer was written, 0 otherwise. */ +int redisBufferWrite(redisContext *c, int *done) { + int nwritten = write(c->fd,c->obuf,sdslen(c->obuf)); + if (nwritten == -1) { + if (errno == EAGAIN) { + /* Try again later */ + } else { + /* Set error in context */ + c->error = formatError(c->fn, + "Error writing to socket: %s", strerror(errno)); + return HIREDIS_ERR; + } + } else if (nwritten > 0) { + if (nwritten == (signed)sdslen(c->obuf)) { + sdsfree(c->obuf); + c->obuf = sdsempty(); + } else { + c->obuf = sdsrange(c->obuf,nwritten,-1); + } + } + if (done != NULL) *done = (sdslen(c->obuf) == 0); + return HIREDIS_OK; +} + +static void* redisCommandWrite(redisContext *c, char *str, size_t len) { + void *reply = NULL; + int wdone = 0; + c->obuf = sdscatlen(c->obuf,str,len); + + /* Only take action when this is a blocking context. */ + if (c->flags & HIREDIS_BLOCK) { + do { /* Write until done. */ + if (redisBufferWrite(c,&wdone) == HIREDIS_ERR) + return c->error; + } while (!wdone); + + do { /* Read until there is a reply. */ + if (redisBufferRead(c) == HIREDIS_ERR) + return c->error; + reply = redisGetReply(c); + } while (reply == NULL); + } + return reply; +} + +/* Write a formatted command to the output buffer, and, if the context is a + * non-blocking connection, read the reply and return it. When this function + * is called from a blocking context, it will always return NULL. */ +void *redisCommand(redisContext *c, const char *format, ...) { va_list ap; sds cmd; + void *reply; va_start(ap,format); cmd = redisFormatCommand(format,ap); va_end(ap); - /* Send the command via socket */ - anetWrite(fd,cmd,sdslen(cmd)); + reply = redisCommandWrite(c,cmd,sdslen(cmd)); sdsfree(cmd); - return redisReadReply(fd); + return reply; } |