From d9e0b0f6abfbb8918f73607bdfcc707d0df3fd41 Mon Sep 17 00:00:00 2001 From: Jan-Erik Rediger Date: Thu, 16 Apr 2015 19:28:12 +0200 Subject: Implement a reconnect method for the client context Originally implemented by @abedra as part of #306. In case a write or read times out, we force an error state, because we can't guarantuee that the next read will get the right data. Instead we need to reconnect to have a clean-state connection, which is now easily possible with this method. --- hiredis.c | 40 ++++++++++++++++++++++++++++++++++++++ hiredis.h | 20 +++++++++++++++++++ net.c | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- test.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 170 insertions(+), 5 deletions(-) diff --git a/hiredis.c b/hiredis.c index d2b6103..6e8ad3b 100644 --- a/hiredis.c +++ b/hiredis.c @@ -598,6 +598,10 @@ static redisContext *redisContextInit(void) { c->errstr[0] = '\0'; c->obuf = sdsempty(); c->reader = redisReaderCreate(); + c->tcp.host = NULL; + c->tcp.source_addr = NULL; + c->unix.path = NULL; + c->timeout = NULL; if (c->obuf == NULL || c->reader == NULL) { redisFree(c); @@ -616,6 +620,14 @@ void redisFree(redisContext *c) { sdsfree(c->obuf); if (c->reader != NULL) redisReaderFree(c->reader); + if (c->tcp.host) + free(c->tcp.host); + if (c->tcp.source_addr) + free(c->tcp.source_addr); + if (c->unix.path) + free(c->unix.path); + if (c->timeout) + free(c->timeout); free(c); } @@ -626,6 +638,34 @@ int redisFreeKeepFd(redisContext *c) { return fd; } +int redisReconnect(redisContext *c) { + c->err = 0; + memset(c->errstr, '\0', strlen(c->errstr)); + + if (c->fd > 0) { + close(c->fd); + } + + sdsfree(c->obuf); + redisReaderFree(c->reader); + + c->obuf = sdsempty(); + c->reader = redisReaderCreate(); + + if (c->connection_type == REDIS_CONN_TCP) { + return redisContextConnectBindTcp(c, c->tcp.host, c->tcp.port, + c->timeout, c->tcp.source_addr); + } else if (c->connection_type == REDIS_CONN_UNIX) { + return redisContextConnectUnix(c, c->unix.path, c->timeout); + } else { + /* Something bad happened here and shouldn't have. There isn't + enough information in the context to reconnect. */ + __redisSetError(c,REDIS_ERR_OTHER,"Not enough information to reconnect"); + } + + return REDIS_ERR; +} + /* Connect to a Redis instance. On error the field error in the returned * context will be set to the return value of the error function. * When no set of reply functions is given, the default set will be used. */ diff --git a/hiredis.h b/hiredis.h index 93df4ef..dc4bf5d 100644 --- a/hiredis.h +++ b/hiredis.h @@ -128,6 +128,11 @@ int redisFormatSdsCommandArgv(sds *target, int argc, const char ** argv, const s void redisFreeCommand(char *cmd); void redisFreeSdsCommand(sds cmd); +enum redisConnectionType { + REDIS_CONN_TCP, + REDIS_CONN_UNIX, +}; + /* Context for a connection to Redis */ typedef struct redisContext { int err; /* Error flags, 0 when there is no error */ @@ -136,6 +141,20 @@ typedef struct redisContext { int flags; char *obuf; /* Write buffer */ redisReader *reader; /* Protocol reader */ + + enum redisConnectionType connection_type; + struct timeval *timeout; + + struct { + char *host; + char *source_addr; + int port; + } tcp; + + struct { + char *path; + } unix; + } redisContext; redisContext *redisConnect(const char *ip, int port); @@ -149,6 +168,7 @@ redisContext *redisConnectUnix(const char *path); redisContext *redisConnectUnixWithTimeout(const char *path, const struct timeval tv); redisContext *redisConnectUnixNonBlock(const char *path); redisContext *redisConnectFd(int fd); +int redisReconnect(redisContext *c); int redisSetTimeout(redisContext *c, const struct timeval tv); int redisEnableKeepAlive(redisContext *c); void redisFree(redisContext *c); diff --git a/net.c b/net.c index be7a047..9718f76 100644 --- a/net.c +++ b/net.c @@ -47,6 +47,7 @@ #include #include #include +#include #include "net.h" #include "sds.h" @@ -263,6 +264,44 @@ static int _redisContextConnectTcp(redisContext *c, const char *addr, int port, int reuseaddr = (c->flags & REDIS_REUSEADDR); int reuses = 0; + c->connection_type = REDIS_CONN_TCP; + c->tcp.port = port; + + /* We need to take possession of the passed parameters + * to make them reusable for a reconnect. + * We also carefully check we don't free data we already own, + * as in the case of the reconnect method. + * + * This is a bit ugly, but atleast it works and doesn't leak memory. + **/ + if (c->tcp.host != addr) { + if (c->tcp.host) + free(c->tcp.host); + + c->tcp.host = strdup(addr); + } + + if (timeout) { + if (c->timeout != timeout) { + if (c->timeout == NULL) + c->timeout = malloc(sizeof(struct timeval)); + + memcpy(c->timeout, timeout, sizeof(struct timeval)); + } + } else { + if (c->timeout) + free(c->timeout); + c->timeout = NULL; + } + + if (source_addr == NULL) { + free(c->tcp.source_addr); + c->tcp.source_addr = NULL; + } else if (c->tcp.source_addr != source_addr) { + free(c->tcp.source_addr); + c->tcp.source_addr = strdup(source_addr); + } + snprintf(_port, 6, "%d", port); memset(&hints,0,sizeof(hints)); hints.ai_family = AF_INET; @@ -273,7 +312,7 @@ static int _redisContextConnectTcp(redisContext *c, const char *addr, int port, * as this would add latency to every connect. Otherwise a more sensible * route could be: Use IPv6 if both addresses are available and there is IPv6 * connectivity. */ - if ((rv = getaddrinfo(addr,_port,&hints,&servinfo)) != 0) { + if ((rv = getaddrinfo(c->tcp.host,_port,&hints,&servinfo)) != 0) { hints.ai_family = AF_INET6; if ((rv = getaddrinfo(addr,_port,&hints,&servinfo)) != 0) { __redisSetError(c,REDIS_ERR_OTHER,gai_strerror(rv)); @@ -288,10 +327,10 @@ addrretry: c->fd = s; if (redisSetBlocking(c,0) != REDIS_OK) goto error; - if (source_addr) { + if (c->tcp.source_addr) { int bound = 0; /* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */ - if ((rv = getaddrinfo(source_addr, NULL, &hints, &bservinfo)) != 0) { + if ((rv = getaddrinfo(c->tcp.source_addr, NULL, &hints, &bservinfo)) != 0) { char buf[128]; snprintf(buf,sizeof(buf),"Can't get addr: %s",gai_strerror(rv)); __redisSetError(c,REDIS_ERR_OTHER,buf); @@ -333,7 +372,7 @@ addrretry: goto addrretry; } } else { - if (redisContextWaitReady(c,timeout) != REDIS_OK) + if (redisContextWaitReady(c,c->timeout) != REDIS_OK) goto error; } } @@ -380,13 +419,30 @@ int redisContextConnectUnix(redisContext *c, const char *path, const struct time if (redisSetBlocking(c,0) != REDIS_OK) return REDIS_ERR; + c->connection_type = REDIS_CONN_UNIX; + if (c->unix.path != path) + c->unix.path = strdup(path); + + if (timeout) { + if (c->timeout != timeout) { + if (c->timeout == NULL) + c->timeout = malloc(sizeof(struct timeval)); + + memcpy(c->timeout, timeout, sizeof(struct timeval)); + } + } else { + if (c->timeout) + free(c->timeout); + c->timeout = NULL; + } + sa.sun_family = AF_LOCAL; strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1); if (connect(c->fd, (struct sockaddr*)&sa, sizeof(sa)) == -1) { if (errno == EINPROGRESS && !blocking) { /* This is ok. */ } else { - if (redisContextWaitReady(c,timeout) != REDIS_OK) + if (redisContextWaitReady(c,c->timeout) != REDIS_OK) return REDIS_ERR; } } diff --git a/test.c b/test.c index c1575b1..8fde554 100644 --- a/test.c +++ b/test.c @@ -11,6 +11,7 @@ #include #include "hiredis.h" +#include "net.h" enum connection_type { CONN_TCP, @@ -443,6 +444,52 @@ static void test_blocking_connection(struct config config) { disconnect(c, 0); } +static void test_blocking_connection_timeouts(struct config config) { + redisContext *c; + redisReply *reply; + ssize_t s; + const char *cmd = "DEBUG SLEEP 3\r\n"; + struct timeval tv; + + c = connect(config); + test("Successfully completes a command when the timeout is not exceeded: "); + reply = redisCommand(c,"SET foo fast"); + freeReplyObject(reply); + tv.tv_sec = 0; + tv.tv_usec = 10000; + redisSetTimeout(c, tv); + reply = redisCommand(c, "GET foo"); + test_cond(reply != NULL && reply->type == REDIS_REPLY_STRING && memcmp(reply->str, "fast", 4) == 0); + freeReplyObject(reply); + disconnect(c, 0); + + c = connect(config); + test("Does not return a reply when the command times out: "); + s = write(c->fd, cmd, strlen(cmd)); + tv.tv_sec = 0; + tv.tv_usec = 10000; + redisSetTimeout(c, tv); + reply = redisCommand(c, "GET foo"); + test_cond(s > 0 && reply == NULL && c->err == REDIS_ERR_IO && strcmp(c->errstr, "Resource temporarily unavailable") == 0); + freeReplyObject(reply); + + test("Reconnect properly reconnects after a timeout: "); + redisReconnect(c); + reply = redisCommand(c, "PING"); + test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0); + freeReplyObject(reply); + + test("Reconnect properly uses owned parameters: "); + config.tcp.host = "foo"; + config.unix.path = "foo"; + redisReconnect(c); + reply = redisCommand(c, "PING"); + test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0); + freeReplyObject(reply); + + disconnect(c, 0); +} + static void test_blocking_io_errors(struct config config) { redisContext *c; redisReply *reply; @@ -729,6 +776,7 @@ int main(int argc, char **argv) { printf("\nTesting against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); cfg.type = CONN_TCP; test_blocking_connection(cfg); + test_blocking_connection_timeouts(cfg); test_blocking_io_errors(cfg); test_invalid_timeout_errors(cfg); test_append_formatted_commands(cfg); @@ -737,6 +785,7 @@ int main(int argc, char **argv) { printf("\nTesting against Unix socket connection (%s):\n", cfg.unix.path); cfg.type = CONN_UNIX; test_blocking_connection(cfg); + test_blocking_connection_timeouts(cfg); test_blocking_io_errors(cfg); if (throughput) test_throughput(cfg); -- cgit v1.2.3