From 49974c9359ad6b58cea15106cf6511bdb31d31a9 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Sun, 4 Mar 2018 18:17:16 +0200 Subject: Call connect(2) again for non-blocking connect This retrieves the actual error which occurred, as getsockopt is not always reliable in this regard. --- async.c | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) (limited to 'async.c') diff --git a/async.c b/async.c index cb5b841..7309344 100644 --- a/async.c +++ b/async.c @@ -506,22 +506,22 @@ void redisProcessCallbacks(redisAsyncContext *ac) { * write event fires. When connecting was not successful, the connect callback * is called with a REDIS_ERR status and the context is free'd. */ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { + int completed = 0; redisContext *c = &(ac->c); - - if (redisCheckSocketError(c) == REDIS_ERR) { - /* Try again later when connect(2) is still in progress. */ - if (errno == EINPROGRESS) - return REDIS_OK; - - if (ac->onConnect) ac->onConnect(ac,REDIS_ERR); + if (redisFinishAsyncConnect(c, &completed) == REDIS_ERR) { + /* Error! */ + redisCheckSocketError(c); + if (ac->onConnect) ac->onConnect(ac, REDIS_ERR); __redisAsyncDisconnect(ac); return REDIS_ERR; + } else if (completed == 1) { + /* connected! */ + if (ac->onConnect) ac->onConnect(ac, REDIS_OK); + c->flags |= REDIS_CONNECTED; + return REDIS_OK; + } else { + return REDIS_OK; } - - /* Mark context as connected. */ - c->flags |= REDIS_CONNECTED; - if (ac->onConnect) ac->onConnect(ac,REDIS_OK); - return REDIS_OK; } /* This function should be called when the socket is readable. -- cgit v1.2.3 From cbe4ae63aecee40674509a2bf23bc5db067e7f3d Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Mon, 5 Mar 2018 12:17:49 +0200 Subject: Handle connection errors better in blocking mode as well --- async.c | 2 +- net.c | 17 +++++++++++++---- net.h | 2 +- 3 files changed, 15 insertions(+), 6 deletions(-) (limited to 'async.c') diff --git a/async.c b/async.c index 7309344..0cecd30 100644 --- a/async.c +++ b/async.c @@ -508,7 +508,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { static int __redisAsyncHandleConnect(redisAsyncContext *ac) { int completed = 0; redisContext *c = &(ac->c); - if (redisFinishAsyncConnect(c, &completed) == REDIS_ERR) { + if (redisCheckConnectDone(c, &completed) == REDIS_ERR) { /* Error! */ redisCheckSocketError(c); if (ac->onConnect) ac->onConnect(ac, REDIS_ERR); diff --git a/net.c b/net.c index 7d5588e..a4b3abc 100644 --- a/net.c +++ b/net.c @@ -221,8 +221,10 @@ static int redisContextWaitReady(redisContext *c, long msec) { return REDIS_ERR; } - if (redisCheckSocketError(c) != REDIS_OK) + if (redisCheckConnectDone(c, &res) != REDIS_OK || res == 0) { + redisCheckSocketError(c); return REDIS_ERR; + } return REDIS_OK; } @@ -232,7 +234,7 @@ static int redisContextWaitReady(redisContext *c, long msec) { return REDIS_ERR; } -int redisFinishAsyncConnect(redisContext *c, int *completed) { +int redisCheckConnectDone(redisContext *c, int *completed) { int rc = connect(c->fd, (const struct sockaddr *)c->saddr, c->addrlen); if (rc == 0) { *completed = 1; @@ -410,8 +412,14 @@ addrretry: if (errno == EHOSTUNREACH) { redisContextCloseFd(c); continue; - } else if (errno == EINPROGRESS && !blocking) { - /* This is ok. */ + } else if (errno == EINPROGRESS) { + if (blocking) { + goto wait_for_ready; + } + /* This is ok. + * Note that even when it's in blocking mode, we unset blocking + * for `connect()` + */ } else if (errno == EADDRNOTAVAIL && reuseaddr) { if (++reuses >= REDIS_CONNECT_RETRIES) { goto error; @@ -420,6 +428,7 @@ addrretry: goto addrretry; } } else { + wait_for_ready: if (redisContextWaitReady(c,timeout_msec) != REDIS_OK) goto error; } diff --git a/net.h b/net.h index c35facc..a11594e 100644 --- a/net.h +++ b/net.h @@ -45,6 +45,6 @@ int redisContextConnectBindTcp(redisContext *c, const char *addr, int port, const char *source_addr); int redisContextConnectUnix(redisContext *c, const char *path, const struct timeval *timeout); int redisKeepAlive(redisContext *c, int interval); -int redisFinishAsyncConnect(redisContext *c, int *completed); +int redisCheckConnectDone(redisContext *c, int *completed); #endif -- cgit v1.2.3 From 08efa46599410e6b56ab19ed1c9a72c67476db9c Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Mon, 27 Nov 2017 15:49:28 -0500 Subject: SSL for async I/O --- Makefile | 6 ++- async.c | 92 +++++++++++++++++++++++++++++++++++++++++ examples/example-libevent-ssl.c | 72 ++++++++++++++++++++++++++++++++ sslio.c | 22 ++++++++-- sslio.h | 9 ++++ 5 files changed, 197 insertions(+), 4 deletions(-) create mode 100644 examples/example-libevent-ssl.c (limited to 'async.c') diff --git a/Makefile b/Makefile index ea96419..b723245 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,8 @@ # This file is released under the BSD license, see the COPYING file OBJ=net.o hiredis.o sds.o async.o read.o sslio.o -EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-ssl +EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib \ + hiredis-example-ssl hiredis-example-libevent-ssl TESTS=hiredis-test LIBNAME=libhiredis PKGCONFNAME=hiredis.pc @@ -94,6 +95,9 @@ static: $(STLIBNAME) hiredis-example-libevent: examples/example-libevent.c adapters/libevent.h $(STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -I. $< -levent $(STLIBNAME) +hiredis-example-libevent-ssl: examples/example-libevent-ssl.c adapters/libevent.h $(STLIBNAME) + $(CC) -o examples/$@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -I. $< -levent $(STLIBNAME) + hiredis-example-libev: examples/example-libev.c adapters/libev.h $(STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -I. $< -lev $(STLIBNAME) diff --git a/async.c b/async.c index 0cecd30..5a14d45 100644 --- a/async.c +++ b/async.c @@ -40,6 +40,7 @@ #include "net.h" #include "dict.c" #include "sds.h" +#include "sslio.h" #define _EL_ADD_READ(ctx) do { \ if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \ @@ -524,6 +525,87 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { } } +#ifndef HIREDIS_NOSSL +/** + * Handle SSL when socket becomes available for reading. This also handles + * read-while-write and write-while-read + */ +static void asyncSslRead(redisAsyncContext *ac) { + int rv; + redisSsl *ssl = ac->c.ssl; + redisContext *c = &ac->c; + + ssl->wantRead = 0; + + if (ssl->pendingWrite) { + int done; + + /* This is probably just a write event */ + ssl->pendingWrite = 0; + rv = redisBufferWrite(c, &done); + if (rv == REDIS_ERR) { + __redisAsyncDisconnect(ac); + return; + } else if (!done) { + _EL_ADD_WRITE(ac); + } + } + + rv = redisBufferRead(c); + if (rv == REDIS_ERR) { + __redisAsyncDisconnect(ac); + } else { + _EL_ADD_READ(ac); + redisProcessCallbacks(ac); + } +} + +/** + * Handle SSL when socket becomes available for writing + */ +static void asyncSslWrite(redisAsyncContext *ac) { + int rv, done = 0; + redisSsl *ssl = ac->c.ssl; + redisContext *c = &ac->c; + + ssl->pendingWrite = 0; + rv = redisBufferWrite(c, &done); + if (rv == REDIS_ERR) { + __redisAsyncDisconnect(ac); + return; + } + + if (!done) { + if (ssl->wantRead) { + /* Need to read-before-write */ + ssl->pendingWrite = 1; + _EL_DEL_WRITE(ac); + } else { + /* No extra reads needed, just need to write more */ + _EL_ADD_WRITE(ac); + } + } else { + /* Already done! */ + _EL_DEL_WRITE(ac); + } + + /* Always reschedule a read */ + _EL_ADD_READ(ac); +} +#else + +/* Just so we're able to compile */ +static void asyncSslRead(redisAsyncContext *ac) { + abort(); + (void)ac; +} +static void asyncSslWrite(redisAsyncContext *ac) { + abort(); + (void)ac; +} + +#endif + /* This function should be called when the socket is readable. * It processes all replies that can be read and executes their callbacks. */ @@ -539,6 +621,11 @@ void redisAsyncHandleRead(redisAsyncContext *ac) { return; } + if (c->flags & REDIS_SSL) { + asyncSslRead(ac); + return; + } + if (redisBufferRead(c) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { @@ -561,6 +648,11 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { return; } + if (c->flags & REDIS_SSL) { + asyncSslWrite(ac); + return; + } + if (redisBufferWrite(c,&done) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { diff --git a/examples/example-libevent-ssl.c b/examples/example-libevent-ssl.c new file mode 100644 index 0000000..f780e3e --- /dev/null +++ b/examples/example-libevent-ssl.c @@ -0,0 +1,72 @@ +#include +#include +#include +#include + +#include +#include +#include + +void getCallback(redisAsyncContext *c, void *r, void *privdata) { + redisReply *reply = r; + if (reply == NULL) return; + printf("argv[%s]: %s\n", (char*)privdata, reply->str); + + /* Disconnect after receiving the reply to GET */ + redisAsyncDisconnect(c); +} + +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); +} + +void disconnectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Disconnected...\n"); +} + +int main (int argc, char **argv) { + signal(SIGPIPE, SIG_IGN); + struct event_base *base = event_base_new(); + if (argc < 5) { + fprintf(stderr, + "Usage: %s [ca]\n", argv[0]); + exit(1); + } + + const char *value = argv[1]; + size_t nvalue = strlen(value); + + const char *hostname = argv[2]; + int port = atoi(argv[3]); + + const char *cert = argv[4]; + const char *certKey = argv[5]; + const char *caCert = argc > 5 ? argv[6] : NULL; + + redisAsyncContext *c = redisAsyncConnect(hostname, port); + if (c->err) { + /* Let *c leak for now... */ + printf("Error: %s\n", c->errstr); + return 1; + } + if (redisSecureConnection(&c->c, caCert, cert, certKey) != REDIS_OK) { + printf("SSL Error!\n"); + exit(1); + } + + redisLibeventAttach(c,base); + redisAsyncSetConnectCallback(c,connectCallback); + redisAsyncSetDisconnectCallback(c,disconnectCallback); + redisAsyncCommand(c, NULL, NULL, "SET key %b", value, nvalue); + redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); + event_base_dispatch(base); + return 0; +} diff --git a/sslio.c b/sslio.c index 6958d37..3b08140 100644 --- a/sslio.c +++ b/sslio.c @@ -99,7 +99,7 @@ int redisSslCreate(redisContext *c, const char *capath, const char *certpath, redisSsl *s = c->ssl; s->ctx = SSL_CTX_new(SSLv23_client_method()); - /* SSL_CTX_set_info_callback(s->ctx, sslLogCallback); */ + SSL_CTX_set_info_callback(s->ctx, sslLogCallback); SSL_CTX_set_mode(s->ctx, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); SSL_CTX_set_options(s->ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3); SSL_CTX_set_verify(s->ctx, SSL_VERIFY_PEER, NULL); @@ -153,6 +153,22 @@ int redisSslCreate(redisContext *c, const char *capath, const char *certpath, return REDIS_ERR; } +static int maybeCheckWant(redisSsl *rssl, int rv) { + /** + * If the error is WANT_READ or WANT_WRITE, the appropriate flags are set + * and true is returned. False is returned otherwise + */ + if (rv == SSL_ERROR_WANT_READ) { + rssl->wantRead = 1; + return 1; + } else if (rv == SSL_ERROR_WANT_WRITE) { + rssl->pendingWrite = 1; + return 1; + } else { + return 0; + } +} + int redisSslRead(redisContext *c, char *buf, size_t bufcap) { int nread = SSL_read(c->ssl->ssl, buf, bufcap); if (nread > 0) { @@ -162,7 +178,7 @@ int redisSslRead(redisContext *c, char *buf, size_t bufcap) { return -1; } else { int err = SSL_get_error(c->ssl->ssl, nread); - if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { + if (maybeCheckWant(c->ssl, err)) { return 0; } else { __redisSetError(c, REDIS_ERR_IO, NULL); @@ -181,7 +197,7 @@ int redisSslWrite(redisContext *c) { c->ssl->lastLen = len; int err = SSL_get_error(c->ssl->ssl, rv); - if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { + if (maybeCheckWant(c->ssl, err)) { return 0; } else { __redisSetError(c, REDIS_ERR_IO, NULL); diff --git a/sslio.h b/sslio.h index a410cb3..1f46b03 100644 --- a/sslio.h +++ b/sslio.h @@ -33,6 +33,15 @@ typedef struct redisSsl { * previously called with in the event of an SSL_read/SSL_write situation */ size_t lastLen; + + /** Whether the SSL layer requires read (possibly before a write) */ + int wantRead; + + /** + * Whether a write was requested prior to a read. If set, the write() + * should resume whenever a read takes place, if possible + */ + int pendingWrite; } redisSsl; struct redisContext; -- cgit v1.2.3 From 4127e4488bcebceac59aefe6013e3f854b192394 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Tue, 9 Jan 2018 06:57:17 -0500 Subject: Don't add dead code for HIREDIS_NOSSL We changed this to `HIREDIS_SSL` --- async.c | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) (limited to 'async.c') diff --git a/async.c b/async.c index 5a14d45..db59036 100644 --- a/async.c +++ b/async.c @@ -525,10 +525,12 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { } } -#ifndef HIREDIS_NOSSL /** * Handle SSL when socket becomes available for reading. This also handles - * read-while-write and write-while-read + * read-while-write and write-while-read. + * + * These functions will not work properly unless `HIREDIS_SSL` is defined + * (however, they will compile) */ static void asyncSslRead(redisAsyncContext *ac) { int rv; @@ -592,19 +594,6 @@ static void asyncSslWrite(redisAsyncContext *ac) { /* Always reschedule a read */ _EL_ADD_READ(ac); } -#else - -/* Just so we're able to compile */ -static void asyncSslRead(redisAsyncContext *ac) { - abort(); - (void)ac; -} -static void asyncSslWrite(redisAsyncContext *ac) { - abort(); - (void)ac; -} - -#endif /* This function should be called when the socket is readable. * It processes all replies that can be read and executes their callbacks. -- cgit v1.2.3 From 35a0a1f369cdeb8d7d93fda8278099d7aa60aee6 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Thu, 8 Mar 2018 11:21:07 +0200 Subject: read/write timeouts --- adapters/libevent.h | 79 ++++++++++++++++++++++++++++++++++++++--------------- async.c | 62 ++++++++++++++++++++++++++++++++++++++--- async.h | 5 ++++ read.h | 1 + 4 files changed, 121 insertions(+), 26 deletions(-) (limited to 'async.c') diff --git a/adapters/libevent.h b/adapters/libevent.h index 7d2bef1..9843696 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -36,48 +36,81 @@ typedef struct redisLibeventEvents { redisAsyncContext *context; - struct event *rev, *wev; + struct event *ev, *tmr; + struct event_base *base; + struct timeval tv; + short flags; } redisLibeventEvents; -static void redisLibeventReadEvent(int fd, short event, void *arg) { - ((void)fd); ((void)event); +static void redisLibeventHandler(int fd, short event, void *arg) { + ((void)fd); redisLibeventEvents *e = (redisLibeventEvents*)arg; - redisAsyncHandleRead(e->context); + if (event & EV_TIMEOUT) { + redisAsyncHandleTimeout(e->context); + } + if (e->context && (event & EV_READ)) { + redisAsyncHandleRead(e->context); + } + if (e->context && (event & EV_WRITE)) { + redisAsyncHandleWrite(e->context); + } } -static void redisLibeventWriteEvent(int fd, short event, void *arg) { - ((void)fd); ((void)event); - redisLibeventEvents *e = (redisLibeventEvents*)arg; - redisAsyncHandleWrite(e->context); +static void redisLibeventUpdate(void *privdata, short flag, int isRemove) { + redisLibeventEvents *e = (redisLibeventEvents *)privdata; + const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL; + + if (isRemove) { + if ((e->flags & flag) == 0) { + return; + } else { + e->flags &= ~flag; + } + } else { + if (e->flags & flag) { + return; + } else { + e->flags |= flag; + } + } + + event_del(e->ev); + event_assign(e->ev, e->base, e->context->c.fd, e->flags, + redisLibeventHandler, privdata); + event_add(e->ev, tv); } static void redisLibeventAddRead(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_add(e->rev,NULL); + redisLibeventUpdate(privdata, EV_READ, 0); } static void redisLibeventDelRead(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_del(e->rev); + redisLibeventUpdate(privdata, EV_READ, 1); } static void redisLibeventAddWrite(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_add(e->wev,NULL); + redisLibeventUpdate(privdata, EV_WRITE, 0); } static void redisLibeventDelWrite(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_del(e->wev); + redisLibeventUpdate(privdata, EV_WRITE, 1); } static void redisLibeventCleanup(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_free(e->rev); - event_free(e->wev); + event_free(e->ev); free(e); } +static void redisLibeventSetTimeout(void *privdata, struct timeval tv) { + redisLibeventEvents *e = (redisLibeventEvents *)privdata; + short flags = e->flags; + e->flags = 0; + e->tv = tv; + event_del(e->ev); + redisLibeventUpdate(e, flags, 0); +} + static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { redisContext *c = &(ac->c); redisLibeventEvents *e; @@ -96,13 +129,15 @@ static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { ac->ev.addWrite = redisLibeventAddWrite; ac->ev.delWrite = redisLibeventDelWrite; ac->ev.cleanup = redisLibeventCleanup; + ac->ev.scheduleTimer = redisLibeventSetTimeout; ac->ev.data = e; /* Initialize and install read/write events */ - e->rev = event_new(base, c->fd, EV_READ, redisLibeventReadEvent, e); - e->wev = event_new(base, c->fd, EV_WRITE, redisLibeventWriteEvent, e); - event_add(e->rev, NULL); - event_add(e->wev, NULL); + e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e); + e->flags = 0; + e->base = base; + e->tv.tv_sec = 0; + e->tv.tv_usec = 0; return REDIS_OK; } #endif diff --git a/async.c b/async.c index db59036..b789f93 100644 --- a/async.c +++ b/async.c @@ -42,15 +42,19 @@ #include "sds.h" #include "sslio.h" -#define _EL_ADD_READ(ctx) do { \ +#define _EL_ADD_READ(ctx) \ + do { \ + refreshTimeout(ctx); \ if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \ - } while(0) + } while (0) #define _EL_DEL_READ(ctx) do { \ if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \ } while(0) -#define _EL_ADD_WRITE(ctx) do { \ +#define _EL_ADD_WRITE(ctx) \ + do { \ + refreshTimeout(ctx); \ if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \ - } while(0) + } while (0) #define _EL_DEL_WRITE(ctx) do { \ if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \ } while(0) @@ -58,6 +62,19 @@ if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \ } while(0); +static void refreshTimeout(redisAsyncContext *ctx) { + if (ctx->c.timeout && ctx->ev.scheduleTimer && + (ctx->c.timeout->tv_sec || ctx->c.timeout->tv_usec)) { + ctx->ev.scheduleTimer(ctx->ev.data, *ctx->c.timeout); + // } else { + // printf("Not scheduling timer.. (tmo=%p)\n", ctx->c.timeout); + // if (ctx->c.timeout){ + // printf("tv_sec: %u. tv_usec: %u\n", ctx->c.timeout->tv_sec, + // ctx->c.timeout->tv_usec); + // } + } +} + /* Forward declaration of function in hiredis.c */ int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); @@ -656,6 +673,30 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { } } +void __redisSetError(redisContext *c, int type, const char *str); + +void redisAsyncHandleTimeout(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisCallback cb; + + if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) { + /* Nothing to do - just an idle timeout */ + return; + } + + if (!c->err) { + __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout"); + } + + if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) { + ac->onConnect(ac, REDIS_ERR); + } + + while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) { + __redisRunCallback(ac, &cb, NULL); + } +} + /* Sets a pointer to the first argument and its length starting at p. Returns * the number of bytes to skip to get to the following argument. */ static const char *nextArgument(const char *start, const char **str, size_t *len) { @@ -795,3 +836,16 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void int status = __redisAsyncCommand(ac,fn,privdata,cmd,len); return status; } + +void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) { + if (!ac->c.timeout) { + ac->c.timeout = calloc(1, sizeof(tv)); + } + + if (tv.tv_sec == ac->c.timeout->tv_sec && + tv.tv_usec == ac->c.timeout->tv_usec) { + return; + } + + *ac->c.timeout = tv; +} \ No newline at end of file diff --git a/async.h b/async.h index 740555c..3d51a35 100644 --- a/async.h +++ b/async.h @@ -57,6 +57,7 @@ typedef struct redisCallbackList { /* Connection callback prototypes */ typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status); +typedef void(redisTimerCallback)(void *timer, void *privdata); /* Context for an async connection to Redis */ typedef struct redisAsyncContext { @@ -81,6 +82,7 @@ typedef struct redisAsyncContext { void (*addWrite)(void *privdata); void (*delWrite)(void *privdata); void (*cleanup)(void *privdata); + void (*scheduleTimer)(void *privdata, struct timeval tv); } ev; /* Called when either the connection is terminated due to an error or per @@ -113,12 +115,15 @@ redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, redisAsyncContext *redisAsyncConnectUnix(const char *path); int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); + +void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv); void redisAsyncDisconnect(redisAsyncContext *ac); void redisAsyncFree(redisAsyncContext *ac); /* Handle read/write events */ void redisAsyncHandleRead(redisAsyncContext *ac); void redisAsyncHandleWrite(redisAsyncContext *ac); +void redisAsyncHandleTimeout(redisAsyncContext *ac); /* Command functions for an async context. Write the command to the * output buffer and register the provided callback. */ diff --git a/read.h b/read.h index 2988aa4..0894b78 100644 --- a/read.h +++ b/read.h @@ -45,6 +45,7 @@ #define REDIS_ERR_EOF 3 /* End of file */ #define REDIS_ERR_PROTOCOL 4 /* Protocol error */ #define REDIS_ERR_OOM 5 /* Out of memory */ +#define REDIS_ERR_TIMEOUT 6 /* Timed out */ #define REDIS_ERR_OTHER 2 /* Everything else... */ #define REDIS_REPLY_STRING 1 -- cgit v1.2.3 From deba8d956d578547319919adedda4714f779dbb9 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Thu, 6 Dec 2018 04:54:05 -0500 Subject: Allow connectWithOptions for async as well --- async.c | 52 ++++++++++++++++++++++++---------------------------- async.h | 1 + 2 files changed, 25 insertions(+), 28 deletions(-) (limited to 'async.c') diff --git a/async.c b/async.c index b789f93..a32d528 100644 --- a/async.c +++ b/async.c @@ -168,56 +168,52 @@ static void __redisAsyncCopyError(redisAsyncContext *ac) { ac->errstr = c->errstr; } -redisAsyncContext *redisAsyncConnect(const char *ip, int port) { +redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) { + redisOptions myOptions = *options; redisContext *c; redisAsyncContext *ac; - c = redisConnectNonBlock(ip,port); - if (c == NULL) + myOptions.options |= REDIS_OPT_NONBLOCK; + c = redisConnectWithOptions(&myOptions); + if (c == NULL) { return NULL; - + } ac = redisAsyncInitialize(c); if (ac == NULL) { redisFree(c); return NULL; } - __redisAsyncCopyError(ac); return ac; } +redisAsyncContext *redisAsyncConnect(const char *ip, int port) { + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, ip, port); + return redisAsyncConnectWithOptions(&options); +} + redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr) { - redisContext *c = redisConnectBindNonBlock(ip,port,source_addr); - redisAsyncContext *ac = redisAsyncInitialize(c); - __redisAsyncCopyError(ac); - return ac; + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, ip, port); + options.endpoint.tcp.source_addr = source_addr; + return redisAsyncConnectWithOptions(&options); } redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, const char *source_addr) { - redisContext *c = redisConnectBindNonBlockWithReuse(ip,port,source_addr); - redisAsyncContext *ac = redisAsyncInitialize(c); - __redisAsyncCopyError(ac); - return ac; + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, ip, port); + options.options |= REDIS_OPT_REUSEADDR; + options.endpoint.tcp.source_addr = source_addr; + return redisAsyncConnectWithOptions(&options); } redisAsyncContext *redisAsyncConnectUnix(const char *path) { - redisContext *c; - redisAsyncContext *ac; - - c = redisConnectUnixNonBlock(path); - if (c == NULL) - return NULL; - - ac = redisAsyncInitialize(c); - if (ac == NULL) { - redisFree(c); - return NULL; - } - - __redisAsyncCopyError(ac); - return ac; + redisOptions options = {0}; + REDIS_OPTIONS_SET_UNIX(&options, path); + return redisAsyncConnectWithOptions(&options); } int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { diff --git a/async.h b/async.h index 3d51a35..40a1819 100644 --- a/async.h +++ b/async.h @@ -108,6 +108,7 @@ typedef struct redisAsyncContext { } redisAsyncContext; /* Functions that proxy to hiredis */ +redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options); redisAsyncContext *redisAsyncConnect(const char *ip, int port); redisAsyncContext *redisAsyncConnectBind(const char *ip, int port, const char *source_addr); redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, -- cgit v1.2.3 From 5f633ac4ec0ee818fb9785e751fbbdfab48f9542 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Mon, 11 Feb 2019 11:26:15 -0500 Subject: fix potential uninitialized read If callback was set before scheduleTimer was set (i..e before one of the attach()) calls. --- async.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'async.c') diff --git a/async.c b/async.c index a32d528..0ed973e 100644 --- a/async.c +++ b/async.c @@ -144,6 +144,7 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->ev.addWrite = NULL; ac->ev.delWrite = NULL; ac->ev.cleanup = NULL; + ac->ev.scheduleTimer = NULL; ac->onConnect = NULL; ac->onDisconnect = NULL; @@ -844,4 +845,4 @@ void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) { } *ac->c.timeout = tv; -} \ No newline at end of file +} -- cgit v1.2.3 From 5eb6958870b71d30e3ed63e65e9f1d546dc419ec Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Thu, 21 Feb 2019 11:49:25 -0500 Subject: Allow option for async connections to not automatically free --- async.c | 7 ++++++- hiredis.c | 3 +++ hiredis.h | 12 ++++++++++++ 3 files changed, 21 insertions(+), 1 deletion(-) (limited to 'async.c') diff --git a/async.c b/async.c index 0ed973e..0a2659c 100644 --- a/async.c +++ b/async.c @@ -361,7 +361,9 @@ static void __redisAsyncDisconnect(redisAsyncContext *ac) { /* For non-clean disconnects, __redisAsyncFree() will execute pending * callbacks with a NULL-reply. */ - __redisAsyncFree(ac); + if (!(c->flags & REDIS_NO_AUTO_FREE)) { + __redisAsyncFree(ac); + } } /* Tries to do a clean disconnect from Redis, meaning it stops new commands @@ -373,6 +375,9 @@ static void __redisAsyncDisconnect(redisAsyncContext *ac) { void redisAsyncDisconnect(redisAsyncContext *ac) { redisContext *c = &(ac->c); c->flags |= REDIS_DISCONNECTING; + + /** unset the auto-free flag here, because disconnect undoes this */ + c->flags &= ~REDIS_NO_AUTO_FREE; if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL) __redisAsyncDisconnect(ac); } diff --git a/hiredis.c b/hiredis.c index ba8d316..2506d51 100644 --- a/hiredis.c +++ b/hiredis.c @@ -668,6 +668,9 @@ redisContext *redisConnectWithOptions(const redisOptions *options) { if (options->options & REDIS_OPT_REUSEADDR) { c->flags |= REDIS_REUSEADDR; } + if (options->options & REDIS_OPT_NOAUTOFREE) { + c->flags |= REDIS_NO_AUTO_FREE; + } if (options->type == REDIS_CONN_TCP) { redisContextConnectBindTcp(c, options->endpoint.tcp.ip, diff --git a/hiredis.h b/hiredis.h index 0d646f8..4025869 100644 --- a/hiredis.h +++ b/hiredis.h @@ -77,6 +77,12 @@ /* Flag that is set when this connection is done through SSL */ #define REDIS_SSL 0x100 +/** + * Flag that indicates the user does not want the context to + * be automatically freed upon error + */ +#define REDIS_NO_AUTO_FREE 0x200 + #define REDIS_KEEPALIVE_INTERVAL 15 /* seconds */ /* number of times we retry to connect in the case of EADDRNOTAVAIL and @@ -121,6 +127,12 @@ struct redisSsl; #define REDIS_OPT_NONBLOCK 0x01 #define REDIS_OPT_REUSEADDR 0x02 +/** + * Don't automatically free the async object on a connection failure, + * or other implicit conditions. Only free on an explicit call to disconnect() or free() + */ +#define REDIS_OPT_NOAUTOFREE 0x04 + typedef struct { /* * the type of connection to use. This also indicates which -- cgit v1.2.3 From 17b41740899ffcf22c150b3ab2d981938bb1a3e2 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Mon, 11 Mar 2019 09:01:32 -0400 Subject: Retain the same semantics for connection error on connection timeout This ensures that a disconnect occurs. This commit also ensures that disconnects will clean the socket even if the user is in no-auto-free mode --- async.c | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'async.c') diff --git a/async.c b/async.c index 0a2659c..b6163f2 100644 --- a/async.c +++ b/async.c @@ -60,6 +60,7 @@ } while(0) #define _EL_CLEANUP(ctx) do { \ if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \ + ctx->ev.cleanup = NULL; \ } while(0); static void refreshTimeout(redisAsyncContext *ctx) { @@ -359,6 +360,10 @@ static void __redisAsyncDisconnect(redisAsyncContext *ac) { c->flags |= REDIS_DISCONNECTING; } + /* cleanup event library on disconnect. + * this is safe to call multiple times */ + _EL_CLEANUP(ac); + /* For non-clean disconnects, __redisAsyncFree() will execute pending * callbacks with a NULL-reply. */ if (!(c->flags & REDIS_NO_AUTO_FREE)) { @@ -697,6 +702,12 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) { while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) { __redisRunCallback(ac, &cb, NULL); } + + /** + * TODO: Don't automatically sever the connection, + * rather, allow to ignore responses before the queue is clear + */ + __redisAsyncDisconnect(ac); } /* Sets a pointer to the first argument and its length starting at p. Returns -- cgit v1.2.3 From cdb836d5f845d2bf0fbe58671d073869f41a9936 Mon Sep 17 00:00:00 2001 From: jinjiazhang Date: Sat, 13 Apr 2019 10:38:34 +0800 Subject: Fix Compile Error On Windows (Visual Studio) --- async.c | 5 +++++ hiredis.h | 4 ++++ sds.h | 3 +++ sockcompat.h | 2 ++ 4 files changed, 14 insertions(+) (limited to 'async.c') diff --git a/async.c b/async.c index b6163f2..50eb2d9 100644 --- a/async.c +++ b/async.c @@ -32,7 +32,12 @@ #include "fmacros.h" #include #include +#ifndef _WIN32 #include +#else +#define strcasecmp stricmp +#define strncasecmp strnicmp +#endif #include #include #include diff --git a/hiredis.h b/hiredis.h index 6e56c5e..1ae1c0a 100644 --- a/hiredis.h +++ b/hiredis.h @@ -35,7 +35,11 @@ #define __HIREDIS_H #include "read.h" #include /* for va_list */ +#ifndef _WIN32 #include /* for struct timeval */ +#else +#include +#endif #include /* uintXX_t, etc */ #include "sds.h" /* for sds */ diff --git a/sds.h b/sds.h index 13be75a..eafe2cf 100644 --- a/sds.h +++ b/sds.h @@ -34,6 +34,9 @@ #define __SDS_H #define SDS_MAX_PREALLOC (1024*1024) +#ifdef _WIN32 +#define __attribute__(x) +#endif #include #include diff --git a/sockcompat.h b/sockcompat.h index 4965b7b..e0b2e5e 100644 --- a/sockcompat.h +++ b/sockcompat.h @@ -50,6 +50,8 @@ #include #include +typedef signed long ssize_t; + /* Emulate the parts of the BSD socket API that we need (override the winsock signatures). */ int win32_getaddrinfo(const char *node, const char *service, const struct addrinfo *hints, struct addrinfo **res); const char *win32_gai_strerror(int errcode); -- cgit v1.2.3 From a1d4da63b87be5b99c96879105e47e8f3b83bd06 Mon Sep 17 00:00:00 2001 From: Odin Hultgren Van Der Horst Date: Mon, 22 Jul 2019 11:06:10 +0200 Subject: Removed whitespace before newline - Removed whitespace before newline - Removed win style newline --- README.md | 2 +- async.c | 8 ++++---- sds.c | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) (limited to 'async.c') diff --git a/README.md b/README.md index 01223ea..e757d07 100644 --- a/README.md +++ b/README.md @@ -406,6 +406,6 @@ as soon as possible in order to prevent allocation of useless memory. ## AUTHORS Hiredis was written by Salvatore Sanfilippo (antirez at gmail) and -Pieter Noordhuis (pcnoordhuis at gmail) and is released under the BSD license. +Pieter Noordhuis (pcnoordhuis at gmail) and is released under the BSD license. Hiredis is currently maintained by Matt Stancliff (matt at genges dot com) and Jan-Erik Rediger (janerik at fnordig dot com) diff --git a/async.c b/async.c index 50eb2d9..171fabd 100644 --- a/async.c +++ b/async.c @@ -34,8 +34,8 @@ #include #ifndef _WIN32 #include -#else -#define strcasecmp stricmp +#else +#define strcasecmp stricmp #define strncasecmp strnicmp #endif #include @@ -438,7 +438,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, assert(reply->element[2]->type == REDIS_REPLY_INTEGER); /* Unset subscribed flag only when no pipelined pending subscribe. */ - if (reply->element[2]->integer == 0 + if (reply->element[2]->integer == 0 && dictSize(ac->sub.channels) == 0 && dictSize(ac->sub.patterns) == 0) c->flags &= ~REDIS_SUBSCRIBED; @@ -557,7 +557,7 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { /** * Handle SSL when socket becomes available for reading. This also handles * read-while-write and write-while-read. - * + * * These functions will not work properly unless `HIREDIS_SSL` is defined * (however, they will compile) */ diff --git a/sds.c b/sds.c index 44777b1..6cf7584 100644 --- a/sds.c +++ b/sds.c @@ -1035,7 +1035,7 @@ sds *sdssplitargs(const char *line, int *argc) { s_free(vector); return NULL; } - + vector = new_vector; vector[*argc] = current; (*argc)++; -- cgit v1.2.3