From 9ce15c4b39253792899cbbe338cee430811d0eb2 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 4 Nov 2018 09:52:02 +0200 Subject: Fix errors not propagating properly with libuv.h. --- adapters/libuv.h | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'adapters') diff --git a/adapters/libuv.h b/adapters/libuv.h index ff08c25..39ef7cf 100644 --- a/adapters/libuv.h +++ b/adapters/libuv.h @@ -15,15 +15,12 @@ typedef struct redisLibuvEvents { static void redisLibuvPoll(uv_poll_t* handle, int status, int events) { redisLibuvEvents* p = (redisLibuvEvents*)handle->data; + int ev = (status ? p->events : events); - if (status != 0) { - return; - } - - if (p->context != NULL && (events & UV_READABLE)) { + if (p->context != NULL && (ev & UV_READABLE)) { redisAsyncHandleRead(p->context); } - if (p->context != NULL && (events & UV_WRITABLE)) { + if (p->context != NULL && (ev & UV_WRITABLE)) { redisAsyncHandleWrite(p->context); } } -- cgit v1.2.3 From 35a0a1f369cdeb8d7d93fda8278099d7aa60aee6 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Thu, 8 Mar 2018 11:21:07 +0200 Subject: read/write timeouts --- adapters/libevent.h | 79 ++++++++++++++++++++++++++++++++++++++--------------- async.c | 62 ++++++++++++++++++++++++++++++++++++++--- async.h | 5 ++++ read.h | 1 + 4 files changed, 121 insertions(+), 26 deletions(-) (limited to 'adapters') diff --git a/adapters/libevent.h b/adapters/libevent.h index 7d2bef1..9843696 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -36,48 +36,81 @@ typedef struct redisLibeventEvents { redisAsyncContext *context; - struct event *rev, *wev; + struct event *ev, *tmr; + struct event_base *base; + struct timeval tv; + short flags; } redisLibeventEvents; -static void redisLibeventReadEvent(int fd, short event, void *arg) { - ((void)fd); ((void)event); +static void redisLibeventHandler(int fd, short event, void *arg) { + ((void)fd); redisLibeventEvents *e = (redisLibeventEvents*)arg; - redisAsyncHandleRead(e->context); + if (event & EV_TIMEOUT) { + redisAsyncHandleTimeout(e->context); + } + if (e->context && (event & EV_READ)) { + redisAsyncHandleRead(e->context); + } + if (e->context && (event & EV_WRITE)) { + redisAsyncHandleWrite(e->context); + } } -static void redisLibeventWriteEvent(int fd, short event, void *arg) { - ((void)fd); ((void)event); - redisLibeventEvents *e = (redisLibeventEvents*)arg; - redisAsyncHandleWrite(e->context); +static void redisLibeventUpdate(void *privdata, short flag, int isRemove) { + redisLibeventEvents *e = (redisLibeventEvents *)privdata; + const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL; + + if (isRemove) { + if ((e->flags & flag) == 0) { + return; + } else { + e->flags &= ~flag; + } + } else { + if (e->flags & flag) { + return; + } else { + e->flags |= flag; + } + } + + event_del(e->ev); + event_assign(e->ev, e->base, e->context->c.fd, e->flags, + redisLibeventHandler, privdata); + event_add(e->ev, tv); } static void redisLibeventAddRead(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_add(e->rev,NULL); + redisLibeventUpdate(privdata, EV_READ, 0); } static void redisLibeventDelRead(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_del(e->rev); + redisLibeventUpdate(privdata, EV_READ, 1); } static void redisLibeventAddWrite(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_add(e->wev,NULL); + redisLibeventUpdate(privdata, EV_WRITE, 0); } static void redisLibeventDelWrite(void *privdata) { - redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_del(e->wev); + redisLibeventUpdate(privdata, EV_WRITE, 1); } static void redisLibeventCleanup(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; - event_free(e->rev); - event_free(e->wev); + event_free(e->ev); free(e); } +static void redisLibeventSetTimeout(void *privdata, struct timeval tv) { + redisLibeventEvents *e = (redisLibeventEvents *)privdata; + short flags = e->flags; + e->flags = 0; + e->tv = tv; + event_del(e->ev); + redisLibeventUpdate(e, flags, 0); +} + static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { redisContext *c = &(ac->c); redisLibeventEvents *e; @@ -96,13 +129,15 @@ static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { ac->ev.addWrite = redisLibeventAddWrite; ac->ev.delWrite = redisLibeventDelWrite; ac->ev.cleanup = redisLibeventCleanup; + ac->ev.scheduleTimer = redisLibeventSetTimeout; ac->ev.data = e; /* Initialize and install read/write events */ - e->rev = event_new(base, c->fd, EV_READ, redisLibeventReadEvent, e); - e->wev = event_new(base, c->fd, EV_WRITE, redisLibeventWriteEvent, e); - event_add(e->rev, NULL); - event_add(e->wev, NULL); + e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e); + e->flags = 0; + e->base = base; + e->tv.tv_sec = 0; + e->tv.tv_usec = 0; return REDIS_OK; } #endif diff --git a/async.c b/async.c index db59036..b789f93 100644 --- a/async.c +++ b/async.c @@ -42,15 +42,19 @@ #include "sds.h" #include "sslio.h" -#define _EL_ADD_READ(ctx) do { \ +#define _EL_ADD_READ(ctx) \ + do { \ + refreshTimeout(ctx); \ if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \ - } while(0) + } while (0) #define _EL_DEL_READ(ctx) do { \ if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \ } while(0) -#define _EL_ADD_WRITE(ctx) do { \ +#define _EL_ADD_WRITE(ctx) \ + do { \ + refreshTimeout(ctx); \ if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \ - } while(0) + } while (0) #define _EL_DEL_WRITE(ctx) do { \ if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \ } while(0) @@ -58,6 +62,19 @@ if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \ } while(0); +static void refreshTimeout(redisAsyncContext *ctx) { + if (ctx->c.timeout && ctx->ev.scheduleTimer && + (ctx->c.timeout->tv_sec || ctx->c.timeout->tv_usec)) { + ctx->ev.scheduleTimer(ctx->ev.data, *ctx->c.timeout); + // } else { + // printf("Not scheduling timer.. (tmo=%p)\n", ctx->c.timeout); + // if (ctx->c.timeout){ + // printf("tv_sec: %u. tv_usec: %u\n", ctx->c.timeout->tv_sec, + // ctx->c.timeout->tv_usec); + // } + } +} + /* Forward declaration of function in hiredis.c */ int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); @@ -656,6 +673,30 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { } } +void __redisSetError(redisContext *c, int type, const char *str); + +void redisAsyncHandleTimeout(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisCallback cb; + + if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) { + /* Nothing to do - just an idle timeout */ + return; + } + + if (!c->err) { + __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout"); + } + + if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) { + ac->onConnect(ac, REDIS_ERR); + } + + while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) { + __redisRunCallback(ac, &cb, NULL); + } +} + /* Sets a pointer to the first argument and its length starting at p. Returns * the number of bytes to skip to get to the following argument. */ static const char *nextArgument(const char *start, const char **str, size_t *len) { @@ -795,3 +836,16 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void int status = __redisAsyncCommand(ac,fn,privdata,cmd,len); return status; } + +void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) { + if (!ac->c.timeout) { + ac->c.timeout = calloc(1, sizeof(tv)); + } + + if (tv.tv_sec == ac->c.timeout->tv_sec && + tv.tv_usec == ac->c.timeout->tv_usec) { + return; + } + + *ac->c.timeout = tv; +} \ No newline at end of file diff --git a/async.h b/async.h index 740555c..3d51a35 100644 --- a/async.h +++ b/async.h @@ -57,6 +57,7 @@ typedef struct redisCallbackList { /* Connection callback prototypes */ typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status); +typedef void(redisTimerCallback)(void *timer, void *privdata); /* Context for an async connection to Redis */ typedef struct redisAsyncContext { @@ -81,6 +82,7 @@ typedef struct redisAsyncContext { void (*addWrite)(void *privdata); void (*delWrite)(void *privdata); void (*cleanup)(void *privdata); + void (*scheduleTimer)(void *privdata, struct timeval tv); } ev; /* Called when either the connection is terminated due to an error or per @@ -113,12 +115,15 @@ redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, redisAsyncContext *redisAsyncConnectUnix(const char *path); int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); + +void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv); void redisAsyncDisconnect(redisAsyncContext *ac); void redisAsyncFree(redisAsyncContext *ac); /* Handle read/write events */ void redisAsyncHandleRead(redisAsyncContext *ac); void redisAsyncHandleWrite(redisAsyncContext *ac); +void redisAsyncHandleTimeout(redisAsyncContext *ac); /* Command functions for an async context. Write the command to the * output buffer and register the provided callback. */ diff --git a/read.h b/read.h index 2988aa4..0894b78 100644 --- a/read.h +++ b/read.h @@ -45,6 +45,7 @@ #define REDIS_ERR_EOF 3 /* End of file */ #define REDIS_ERR_PROTOCOL 4 /* Protocol error */ #define REDIS_ERR_OOM 5 /* Out of memory */ +#define REDIS_ERR_TIMEOUT 6 /* Timed out */ #define REDIS_ERR_OTHER 2 /* Everything else... */ #define REDIS_REPLY_STRING 1 -- cgit v1.2.3 From 847a20122f3e3c6e69179943f2d397da6712de80 Mon Sep 17 00:00:00 2001 From: valentino Date: Sun, 20 Jan 2019 16:07:55 +0200 Subject: Fix memory leaks --- adapters/libevent.h | 5 ++--- hiredis.c | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'adapters') diff --git a/adapters/libevent.h b/adapters/libevent.h index 9843696..5959e89 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -48,10 +48,10 @@ static void redisLibeventHandler(int fd, short event, void *arg) { if (event & EV_TIMEOUT) { redisAsyncHandleTimeout(e->context); } - if (e->context && (event & EV_READ)) { + if ((event & EV_READ) && e->context) { redisAsyncHandleRead(e->context); } - if (e->context && (event & EV_WRITE)) { + if ((event & EV_WRITE) && e->context) { redisAsyncHandleWrite(e->context); } } @@ -107,7 +107,6 @@ static void redisLibeventSetTimeout(void *privdata, struct timeval tv) { short flags = e->flags; e->flags = 0; e->tv = tv; - event_del(e->ev); redisLibeventUpdate(e, flags, 0); } diff --git a/hiredis.c b/hiredis.c index dd499b8..9c2788a 100644 --- a/hiredis.c +++ b/hiredis.c @@ -618,6 +618,7 @@ void redisFree(redisContext *c) { if (c->ssl) { redisFreeSsl(c->ssl); } + free(c); } int redisFreeKeepFd(redisContext *c) { -- cgit v1.2.3 From 24e6166fedbd76a47db2ca0b4f7ab52edc0044e8 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Mon, 11 Feb 2019 14:52:37 -0500 Subject: libevent: fix invalid mem access on delete within callback enter --- adapters/libevent.h | 41 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 4 deletions(-) (limited to 'adapters') diff --git a/adapters/libevent.h b/adapters/libevent.h index 5959e89..58bab4b 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -34,26 +34,49 @@ #include "../hiredis.h" #include "../async.h" +#define REDIS_LIBEVENT_DELETED 0x01 +#define REDIS_LIBEVENT_ENTERED 0x02 + typedef struct redisLibeventEvents { redisAsyncContext *context; struct event *ev, *tmr; struct event_base *base; struct timeval tv; short flags; + short state; } redisLibeventEvents; +static void redisLibeventDestroy(redisLibeventEvents *e) { + free(e); +} + static void redisLibeventHandler(int fd, short event, void *arg) { ((void)fd); redisLibeventEvents *e = (redisLibeventEvents*)arg; - if (event & EV_TIMEOUT) { + e->state |= REDIS_LIBEVENT_ENTERED; + + #define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {\ + redisLibeventDestroy(e);\ + return; \ + } + + if ((event & EV_TIMEOUT) && (e->state & REDIS_LIBEVENT_DELETED) == 0) { redisAsyncHandleTimeout(e->context); + CHECK_DELETED(); } - if ((event & EV_READ) && e->context) { + + if ((event & EV_READ) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) { redisAsyncHandleRead(e->context); + CHECK_DELETED(); } - if ((event & EV_WRITE) && e->context) { + + if ((event & EV_WRITE) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) { redisAsyncHandleWrite(e->context); + CHECK_DELETED(); } + + e->state &= ~REDIS_LIBEVENT_ENTERED; + #undef CHECK_DELETED } static void redisLibeventUpdate(void *privdata, short flag, int isRemove) { @@ -98,8 +121,18 @@ static void redisLibeventDelWrite(void *privdata) { static void redisLibeventCleanup(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; + if (!e) { + return; + } + event_del(e->ev); event_free(e->ev); - free(e); + e->ev = NULL; + + if (e->state & REDIS_LIBEVENT_ENTERED) { + e->state |= REDIS_LIBEVENT_DELETED; + } else { + free(e); + } } static void redisLibeventSetTimeout(void *privdata, struct timeval tv) { -- cgit v1.2.3 From f60c5506feab92f2b6185661cce16a303d28a6a3 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Mon, 11 Feb 2019 14:53:42 -0500 Subject: Add EV_PERSIST flag to read events This will avoid the need to constantly reschedule the event --- adapters/libevent.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'adapters') diff --git a/adapters/libevent.h b/adapters/libevent.h index 58bab4b..fb5558e 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -98,7 +98,7 @@ static void redisLibeventUpdate(void *privdata, short flag, int isRemove) { } event_del(e->ev); - event_assign(e->ev, e->base, e->context->c.fd, e->flags, + event_assign(e->ev, e->base, e->context->c.fd, e->flags | EV_PERSIST, redisLibeventHandler, privdata); event_add(e->ev, tv); } -- cgit v1.2.3 From f0a7595056d00383701fcb2a6247ea1ec91c2ae9 Mon Sep 17 00:00:00 2001 From: Mark Nunberg Date: Mon, 11 Feb 2019 14:56:56 -0500 Subject: libevent: call destroy from cleanup also, indentation fix --- adapters/libevent.h | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'adapters') diff --git a/adapters/libevent.h b/adapters/libevent.h index fb5558e..1c86c15 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -47,7 +47,7 @@ typedef struct redisLibeventEvents { } redisLibeventEvents; static void redisLibeventDestroy(redisLibeventEvents *e) { - free(e); + free(e); } static void redisLibeventHandler(int fd, short event, void *arg) { @@ -56,8 +56,8 @@ static void redisLibeventHandler(int fd, short event, void *arg) { e->state |= REDIS_LIBEVENT_ENTERED; #define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {\ - redisLibeventDestroy(e);\ - return; \ + redisLibeventDestroy(e);\ + return; \ } if ((event & EV_TIMEOUT) && (e->state & REDIS_LIBEVENT_DELETED) == 0) { @@ -122,16 +122,16 @@ static void redisLibeventDelWrite(void *privdata) { static void redisLibeventCleanup(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; if (!e) { - return; + return; } event_del(e->ev); event_free(e->ev); e->ev = NULL; if (e->state & REDIS_LIBEVENT_ENTERED) { - e->state |= REDIS_LIBEVENT_DELETED; + e->state |= REDIS_LIBEVENT_DELETED; } else { - free(e); + redisLibeventDestroy(e); } } -- cgit v1.2.3 From d9e0299f1cf923ebba0e2027a2044954c5a22a38 Mon Sep 17 00:00:00 2001 From: valentino Date: Tue, 19 Feb 2019 19:17:41 +0200 Subject: fix redisLibeventEvents init --- adapters/libevent.h | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'adapters') diff --git a/adapters/libevent.h b/adapters/libevent.h index 1c86c15..a495277 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -39,7 +39,7 @@ typedef struct redisLibeventEvents { redisAsyncContext *context; - struct event *ev, *tmr; + struct event *ev; struct event_base *base; struct timeval tv; short flags; @@ -152,7 +152,7 @@ static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { return REDIS_ERR; /* Create container for context and r/w events */ - e = (redisLibeventEvents*)malloc(sizeof(*e)); + e = (redisLibeventEvents*)calloc(1, sizeof(*e)); e->context = ac; /* Register functions to start/stop listening for events */ @@ -166,10 +166,7 @@ static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { /* Initialize and install read/write events */ e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e); - e->flags = 0; e->base = base; - e->tv.tv_sec = 0; - e->tv.tv_usec = 0; return REDIS_OK; } #endif -- cgit v1.2.3