summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJan-Erik Rediger <janerik@fnordig.de>2015-04-16 19:28:12 +0200
committerJan-Erik Rediger <janerik@fnordig.de>2015-04-16 21:00:30 +0200
commitd9e0b0f6abfbb8918f73607bdfcc707d0df3fd41 (patch)
tree18a642cfca4697e0349767238ddf6d6ff368ea38
parentb872919463fbc04c3a8fde113cb9ae89dfcb3859 (diff)
Implement a reconnect method for the client context
Originally implemented by @abedra as part of #306. In case a write or read times out, we force an error state, because we can't guarantuee that the next read will get the right data. Instead we need to reconnect to have a clean-state connection, which is now easily possible with this method.
-rw-r--r--hiredis.c40
-rw-r--r--hiredis.h20
-rw-r--r--net.c66
-rw-r--r--test.c49
4 files changed, 170 insertions, 5 deletions
diff --git a/hiredis.c b/hiredis.c
index d2b6103..6e8ad3b 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -598,6 +598,10 @@ static redisContext *redisContextInit(void) {
c->errstr[0] = '\0';
c->obuf = sdsempty();
c->reader = redisReaderCreate();
+ c->tcp.host = NULL;
+ c->tcp.source_addr = NULL;
+ c->unix.path = NULL;
+ c->timeout = NULL;
if (c->obuf == NULL || c->reader == NULL) {
redisFree(c);
@@ -616,6 +620,14 @@ void redisFree(redisContext *c) {
sdsfree(c->obuf);
if (c->reader != NULL)
redisReaderFree(c->reader);
+ if (c->tcp.host)
+ free(c->tcp.host);
+ if (c->tcp.source_addr)
+ free(c->tcp.source_addr);
+ if (c->unix.path)
+ free(c->unix.path);
+ if (c->timeout)
+ free(c->timeout);
free(c);
}
@@ -626,6 +638,34 @@ int redisFreeKeepFd(redisContext *c) {
return fd;
}
+int redisReconnect(redisContext *c) {
+ c->err = 0;
+ memset(c->errstr, '\0', strlen(c->errstr));
+
+ if (c->fd > 0) {
+ close(c->fd);
+ }
+
+ sdsfree(c->obuf);
+ redisReaderFree(c->reader);
+
+ c->obuf = sdsempty();
+ c->reader = redisReaderCreate();
+
+ if (c->connection_type == REDIS_CONN_TCP) {
+ return redisContextConnectBindTcp(c, c->tcp.host, c->tcp.port,
+ c->timeout, c->tcp.source_addr);
+ } else if (c->connection_type == REDIS_CONN_UNIX) {
+ return redisContextConnectUnix(c, c->unix.path, c->timeout);
+ } else {
+ /* Something bad happened here and shouldn't have. There isn't
+ enough information in the context to reconnect. */
+ __redisSetError(c,REDIS_ERR_OTHER,"Not enough information to reconnect");
+ }
+
+ return REDIS_ERR;
+}
+
/* Connect to a Redis instance. On error the field error in the returned
* context will be set to the return value of the error function.
* When no set of reply functions is given, the default set will be used. */
diff --git a/hiredis.h b/hiredis.h
index 93df4ef..dc4bf5d 100644
--- a/hiredis.h
+++ b/hiredis.h
@@ -128,6 +128,11 @@ int redisFormatSdsCommandArgv(sds *target, int argc, const char ** argv, const s
void redisFreeCommand(char *cmd);
void redisFreeSdsCommand(sds cmd);
+enum redisConnectionType {
+ REDIS_CONN_TCP,
+ REDIS_CONN_UNIX,
+};
+
/* Context for a connection to Redis */
typedef struct redisContext {
int err; /* Error flags, 0 when there is no error */
@@ -136,6 +141,20 @@ typedef struct redisContext {
int flags;
char *obuf; /* Write buffer */
redisReader *reader; /* Protocol reader */
+
+ enum redisConnectionType connection_type;
+ struct timeval *timeout;
+
+ struct {
+ char *host;
+ char *source_addr;
+ int port;
+ } tcp;
+
+ struct {
+ char *path;
+ } unix;
+
} redisContext;
redisContext *redisConnect(const char *ip, int port);
@@ -149,6 +168,7 @@ redisContext *redisConnectUnix(const char *path);
redisContext *redisConnectUnixWithTimeout(const char *path, const struct timeval tv);
redisContext *redisConnectUnixNonBlock(const char *path);
redisContext *redisConnectFd(int fd);
+int redisReconnect(redisContext *c);
int redisSetTimeout(redisContext *c, const struct timeval tv);
int redisEnableKeepAlive(redisContext *c);
void redisFree(redisContext *c);
diff --git a/net.c b/net.c
index be7a047..9718f76 100644
--- a/net.c
+++ b/net.c
@@ -47,6 +47,7 @@
#include <stdio.h>
#include <poll.h>
#include <limits.h>
+#include <stdlib.h>
#include "net.h"
#include "sds.h"
@@ -263,6 +264,44 @@ static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,
int reuseaddr = (c->flags & REDIS_REUSEADDR);
int reuses = 0;
+ c->connection_type = REDIS_CONN_TCP;
+ c->tcp.port = port;
+
+ /* We need to take possession of the passed parameters
+ * to make them reusable for a reconnect.
+ * We also carefully check we don't free data we already own,
+ * as in the case of the reconnect method.
+ *
+ * This is a bit ugly, but atleast it works and doesn't leak memory.
+ **/
+ if (c->tcp.host != addr) {
+ if (c->tcp.host)
+ free(c->tcp.host);
+
+ c->tcp.host = strdup(addr);
+ }
+
+ if (timeout) {
+ if (c->timeout != timeout) {
+ if (c->timeout == NULL)
+ c->timeout = malloc(sizeof(struct timeval));
+
+ memcpy(c->timeout, timeout, sizeof(struct timeval));
+ }
+ } else {
+ if (c->timeout)
+ free(c->timeout);
+ c->timeout = NULL;
+ }
+
+ if (source_addr == NULL) {
+ free(c->tcp.source_addr);
+ c->tcp.source_addr = NULL;
+ } else if (c->tcp.source_addr != source_addr) {
+ free(c->tcp.source_addr);
+ c->tcp.source_addr = strdup(source_addr);
+ }
+
snprintf(_port, 6, "%d", port);
memset(&hints,0,sizeof(hints));
hints.ai_family = AF_INET;
@@ -273,7 +312,7 @@ static int _redisContextConnectTcp(redisContext *c, const char *addr, int port,
* as this would add latency to every connect. Otherwise a more sensible
* route could be: Use IPv6 if both addresses are available and there is IPv6
* connectivity. */
- if ((rv = getaddrinfo(addr,_port,&hints,&servinfo)) != 0) {
+ if ((rv = getaddrinfo(c->tcp.host,_port,&hints,&servinfo)) != 0) {
hints.ai_family = AF_INET6;
if ((rv = getaddrinfo(addr,_port,&hints,&servinfo)) != 0) {
__redisSetError(c,REDIS_ERR_OTHER,gai_strerror(rv));
@@ -288,10 +327,10 @@ addrretry:
c->fd = s;
if (redisSetBlocking(c,0) != REDIS_OK)
goto error;
- if (source_addr) {
+ if (c->tcp.source_addr) {
int bound = 0;
/* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
- if ((rv = getaddrinfo(source_addr, NULL, &hints, &bservinfo)) != 0) {
+ if ((rv = getaddrinfo(c->tcp.source_addr, NULL, &hints, &bservinfo)) != 0) {
char buf[128];
snprintf(buf,sizeof(buf),"Can't get addr: %s",gai_strerror(rv));
__redisSetError(c,REDIS_ERR_OTHER,buf);
@@ -333,7 +372,7 @@ addrretry:
goto addrretry;
}
} else {
- if (redisContextWaitReady(c,timeout) != REDIS_OK)
+ if (redisContextWaitReady(c,c->timeout) != REDIS_OK)
goto error;
}
}
@@ -380,13 +419,30 @@ int redisContextConnectUnix(redisContext *c, const char *path, const struct time
if (redisSetBlocking(c,0) != REDIS_OK)
return REDIS_ERR;
+ c->connection_type = REDIS_CONN_UNIX;
+ if (c->unix.path != path)
+ c->unix.path = strdup(path);
+
+ if (timeout) {
+ if (c->timeout != timeout) {
+ if (c->timeout == NULL)
+ c->timeout = malloc(sizeof(struct timeval));
+
+ memcpy(c->timeout, timeout, sizeof(struct timeval));
+ }
+ } else {
+ if (c->timeout)
+ free(c->timeout);
+ c->timeout = NULL;
+ }
+
sa.sun_family = AF_LOCAL;
strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1);
if (connect(c->fd, (struct sockaddr*)&sa, sizeof(sa)) == -1) {
if (errno == EINPROGRESS && !blocking) {
/* This is ok. */
} else {
- if (redisContextWaitReady(c,timeout) != REDIS_OK)
+ if (redisContextWaitReady(c,c->timeout) != REDIS_OK)
return REDIS_ERR;
}
}
diff --git a/test.c b/test.c
index c1575b1..8fde554 100644
--- a/test.c
+++ b/test.c
@@ -11,6 +11,7 @@
#include <limits.h>
#include "hiredis.h"
+#include "net.h"
enum connection_type {
CONN_TCP,
@@ -443,6 +444,52 @@ static void test_blocking_connection(struct config config) {
disconnect(c, 0);
}
+static void test_blocking_connection_timeouts(struct config config) {
+ redisContext *c;
+ redisReply *reply;
+ ssize_t s;
+ const char *cmd = "DEBUG SLEEP 3\r\n";
+ struct timeval tv;
+
+ c = connect(config);
+ test("Successfully completes a command when the timeout is not exceeded: ");
+ reply = redisCommand(c,"SET foo fast");
+ freeReplyObject(reply);
+ tv.tv_sec = 0;
+ tv.tv_usec = 10000;
+ redisSetTimeout(c, tv);
+ reply = redisCommand(c, "GET foo");
+ test_cond(reply != NULL && reply->type == REDIS_REPLY_STRING && memcmp(reply->str, "fast", 4) == 0);
+ freeReplyObject(reply);
+ disconnect(c, 0);
+
+ c = connect(config);
+ test("Does not return a reply when the command times out: ");
+ s = write(c->fd, cmd, strlen(cmd));
+ tv.tv_sec = 0;
+ tv.tv_usec = 10000;
+ redisSetTimeout(c, tv);
+ reply = redisCommand(c, "GET foo");
+ test_cond(s > 0 && reply == NULL && c->err == REDIS_ERR_IO && strcmp(c->errstr, "Resource temporarily unavailable") == 0);
+ freeReplyObject(reply);
+
+ test("Reconnect properly reconnects after a timeout: ");
+ redisReconnect(c);
+ reply = redisCommand(c, "PING");
+ test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ freeReplyObject(reply);
+
+ test("Reconnect properly uses owned parameters: ");
+ config.tcp.host = "foo";
+ config.unix.path = "foo";
+ redisReconnect(c);
+ reply = redisCommand(c, "PING");
+ test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ freeReplyObject(reply);
+
+ disconnect(c, 0);
+}
+
static void test_blocking_io_errors(struct config config) {
redisContext *c;
redisReply *reply;
@@ -729,6 +776,7 @@ int main(int argc, char **argv) {
printf("\nTesting against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
cfg.type = CONN_TCP;
test_blocking_connection(cfg);
+ test_blocking_connection_timeouts(cfg);
test_blocking_io_errors(cfg);
test_invalid_timeout_errors(cfg);
test_append_formatted_commands(cfg);
@@ -737,6 +785,7 @@ int main(int argc, char **argv) {
printf("\nTesting against Unix socket connection (%s):\n", cfg.unix.path);
cfg.type = CONN_UNIX;
test_blocking_connection(cfg);
+ test_blocking_connection_timeouts(cfg);
test_blocking_io_errors(cfg);
if (throughput) test_throughput(cfg);