From 920128a260b0056f7b14b479232d96405d9a6e62 Mon Sep 17 00:00:00 2001 From: Bjorn Svensson Date: Mon, 25 Jan 2021 15:43:40 +0100 Subject: Stack allocate dict iterators Replacing the get & release functions with an initiation function. Simplifies the code and will make sure we run subscription callbacks in OOM scenarios. --- async.c | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) (limited to 'async.c') diff --git a/async.c b/async.c index 145d949..938838a 100644 --- a/async.c +++ b/async.c @@ -306,7 +306,7 @@ static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) { static void __redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; - dictIterator *it; + dictIterator it; dictEntry *de; /* Execute pending callbacks with NULL reply. */ @@ -319,23 +319,17 @@ static void __redisAsyncFree(redisAsyncContext *ac) { /* Run subscription callbacks with NULL reply */ if (ac->sub.channels) { - it = dictGetIterator(ac->sub.channels); - if (it != NULL) { - while ((de = dictNext(it)) != NULL) - __redisRunCallback(ac,dictGetEntryVal(de),NULL); - dictReleaseIterator(it); - } + dictInitIterator(&it,ac->sub.channels); + while ((de = dictNext(&it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); dictRelease(ac->sub.channels); } if (ac->sub.patterns) { - it = dictGetIterator(ac->sub.patterns); - if (it != NULL) { - while ((de = dictNext(it)) != NULL) - __redisRunCallback(ac,dictGetEntryVal(de),NULL); - dictReleaseIterator(it); - } + dictInitIterator(&it,ac->sub.patterns); + while ((de = dictNext(&it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); dictRelease(ac->sub.patterns); } -- cgit v1.2.3 From 4bba72103c93eaaa8a6e07176e60d46ab277cf8a Mon Sep 17 00:00:00 2001 From: Bjorn Svensson Date: Tue, 26 Jan 2021 09:10:14 +0100 Subject: Handle OOM during async command callback registration Unless the callback is pushed to the list it will trigger an assert in redisProcessCallbacks() when the response arrives. This change let the user get an early error instead, available in the async context directly. --- async.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) (limited to 'async.c') diff --git a/async.c b/async.c index 145d949..074b46e 100644 --- a/async.c +++ b/async.c @@ -802,17 +802,21 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void /* (P)UNSUBSCRIBE does not have its own response: every channel or * pattern that is unsubscribed will receive a message. This means we * should not append a callback function for this command. */ - } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) { - /* Set monitor flag and push callback */ - c->flags |= REDIS_MONITORING; - __redisPushCallback(&ac->replies,&cb); + } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) { + /* Set monitor flag and push callback */ + c->flags |= REDIS_MONITORING; + if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) + goto oom; } else { - if (c->flags & REDIS_SUBSCRIBED) + if (c->flags & REDIS_SUBSCRIBED) { /* This will likely result in an error reply, but it needs to be * received and passed to the callback. */ - __redisPushCallback(&ac->sub.invalid,&cb); - else - __redisPushCallback(&ac->replies,&cb); + if (__redisPushCallback(&ac->sub.invalid,&cb) != REDIS_OK) + goto oom; + } else { + if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) + goto oom; + } } __redisAppendCommand(c,cmd,len); @@ -823,6 +827,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void return REDIS_OK; oom: __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); + __redisAsyncCopyError(ac); return REDIS_ERR; } -- cgit v1.2.3 From e06ecf7e45c6a976a2089240fe0b1eae3098e18a Mon Sep 17 00:00:00 2001 From: Kristján Valur Jónsson Date: Thu, 8 Apr 2021 09:38:33 +0000 Subject: Ignore timeout callback from a successful connect --- async.c | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) (limited to 'async.c') diff --git a/async.c b/async.c index dd78dda..3f31286 100644 --- a/async.c +++ b/async.c @@ -690,9 +690,17 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; - if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) { - /* Nothing to do - just an idle timeout */ - return; + if ((c->flags & REDIS_CONNECTED)) { + if ( ac->replies.head == NULL) { + /* Nothing to do - just an idle timeout */ + return; + } + + if (!ac->c.command_timeout || + (!ac->c.command_timeout->tv_sec && !ac->c.command_timeout->tv_usec)) { + /* A belated connect timeout arriving, ignore */ + return; + } } if (!c->err) { -- cgit v1.2.3 From 5850a8ecd2fb4ab39d80773e3017f02aff097ec4 Mon Sep 17 00:00:00 2001 From: michael-grunder Date: Thu, 17 Jun 2021 13:01:15 -0700 Subject: Ensure we curry any connect error to an async context. --- async.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'async.c') diff --git a/async.c b/async.c index 3f31286..29f6924 100644 --- a/async.c +++ b/async.c @@ -604,7 +604,8 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { if (redisCheckConnectDone(c, &completed) == REDIS_ERR) { /* Error! */ - redisCheckSocketError(c); + if (redisCheckSocketError(c) == REDIS_ERR) + __redisAsyncCopyError(ac); __redisAsyncHandleConnectFailure(ac); return REDIS_ERR; } else if (completed == 1) { @@ -696,7 +697,7 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) { return; } - if (!ac->c.command_timeout || + if (!ac->c.command_timeout || (!ac->c.command_timeout->tv_sec && !ac->c.command_timeout->tv_usec)) { /* A belated connect timeout arriving, ignore */ return; -- cgit v1.2.3 From f5f31ff9b92b6bdf628716449d0d0782ceb7704a Mon Sep 17 00:00:00 2001 From: "Meir Shpilraien (Spielrein)" Date: Sun, 11 Jul 2021 21:26:20 +0300 Subject: Added REDIS_NO_AUTO_FREE_REPLIES flag (#962) When set hiredis will not automatically free replies in an async context, and the replies must be freed instead by the user. Co-authored-by: Michael Grunder --- async.c | 4 +++- hiredis.c | 3 +++ hiredis.h | 8 ++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) (limited to 'async.c') diff --git a/async.c b/async.c index 29f6924..e37afbd 100644 --- a/async.c +++ b/async.c @@ -569,7 +569,9 @@ void redisProcessCallbacks(redisAsyncContext *ac) { if (cb.fn != NULL) { __redisRunCallback(ac,&cb,reply); - c->reader->fn->freeObject(reply); + if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){ + c->reader->fn->freeObject(reply); + } /* Proceed with free'ing when redisAsyncFree() was called. */ if (c->flags & REDIS_FREEING) { diff --git a/hiredis.c b/hiredis.c index 5825174..9947b1e 100644 --- a/hiredis.c +++ b/hiredis.c @@ -804,6 +804,9 @@ redisContext *redisConnectWithOptions(const redisOptions *options) { if (options->options & REDIS_OPT_NOAUTOFREE) { c->flags |= REDIS_NO_AUTO_FREE; } + if (options->options & REDIS_OPT_NOAUTOFREEREPLIES) { + c->flags |= REDIS_NO_AUTO_FREE_REPLIES; + } /* Set any user supplied RESP3 PUSH handler or use freeReplyObject * as a default unless specifically flagged that we don't want one. */ diff --git a/hiredis.h b/hiredis.h index e77a88a..be8525f 100644 --- a/hiredis.h +++ b/hiredis.h @@ -86,6 +86,9 @@ typedef long long ssize_t; */ #define REDIS_NO_AUTO_FREE 0x200 +/* Flag that indicates the user does not want replies to be automatically freed */ +#define REDIS_NO_AUTO_FREE_REPLIES 0x400 + #define REDIS_KEEPALIVE_INTERVAL 15 /* seconds */ /* number of times we retry to connect in the case of EADDRNOTAVAIL and @@ -153,6 +156,11 @@ struct redisSsl; /* Don't automatically intercept and free RESP3 PUSH replies. */ #define REDIS_OPT_NO_PUSH_AUTOFREE 0x08 +/** + * Don't automatically free replies + */ +#define REDIS_OPT_NOAUTOFREEREPLIES 0x10 + /* In Unix systems a file descriptor is a regular signed int, with -1 * representing an invalid descriptor. In Windows it is a SOCKET * (32- or 64-bit unsigned integer depending on the architecture), where -- cgit v1.2.3 From da5a4ff3622e8744b772a76f6ce580dc9134fb38 Mon Sep 17 00:00:00 2001 From: Bjorn Svensson Date: Wed, 1 Dec 2021 20:43:23 +0100 Subject: Add asynchronous test for pubsub using RESP3 (#1012) * Include `unsubscribe` as a subscribe reply in RESP3 By providing the (p)unsubscribe message via the subscribe callback, instead of via the push callback, we get the same behavior in RESP3 as in RESP2. * Add asynchronous test for pubsub using RESP3 The testcase will subscribe to a channel, and via a second client a message is published to the channel. After receiving the message the testcase will unsubscribe and disconnect. This RESP3 testcase reuses the subscribe callback from the RESP2 testcase to make sure we have a common behavior. --- async.c | 4 ++-- test.c | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 56 insertions(+), 4 deletions(-) (limited to 'async.c') diff --git a/async.c b/async.c index e37afbd..4459c19 100644 --- a/async.c +++ b/async.c @@ -496,8 +496,8 @@ static int redisIsSubscribeReply(redisReply *reply) { len = reply->element[0]->len - off; return !strncasecmp(str, "subscribe", len) || - !strncasecmp(str, "message", len); - + !strncasecmp(str, "message", len) || + !strncasecmp(str, "unsubscribe", len); } void redisProcessCallbacks(redisAsyncContext *ac) { diff --git a/test.c b/test.c index 04d5197..7af9bee 100644 --- a/test.c +++ b/test.c @@ -1453,6 +1453,7 @@ struct event_base *base; typedef struct TestState { redisOptions *options; int checkpoint; + int resp3; } TestState; /* Testcase timeout, will trigger a failure */ @@ -1479,7 +1480,7 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) { disconnect(c, 0); } -/* Subscribe callback for test_pubsub_handling: +/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3: * - a published message triggers an unsubscribe * - an unsubscribe response triggers a disconnect */ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { @@ -1487,7 +1488,7 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { TestState *state = privdata; assert(reply != NULL && - reply->type == REDIS_REPLY_ARRAY && + reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) && reply->elements == 3); if (strcmp(reply->element[0]->str,"subscribe") == 0) { @@ -1546,6 +1547,49 @@ static void test_pubsub_handling(struct config config) { /* Verify test checkpoints */ assert(state.checkpoint == 1); } + +/* Unexpected push message, will trigger a failure */ +void unexpected_push_cb(redisAsyncContext *ac, void *r) { + (void) ac; (void) r; + printf("Unexpected call to the PUSH callback!\n"); + exit(1); +} + +static void test_pubsub_handling_resp3(struct config config) { + test("Subscribe, handle published message and unsubscribe using RESP3: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base, timeout_cb, NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout, &timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac, unexpected_push_cb); + + /* Switch protocol */ + redisAsyncCommand(ac,NULL,NULL,"HELLO 3"); + + /* Start subscribe */ + TestState state = {.options = &options, .resp3 = 1}; + redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + + /* Start event dispatching loop */ + test_cond(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + assert(state.checkpoint == 1); +} #endif int main(int argc, char **argv) { @@ -1668,7 +1712,15 @@ int main(int argc, char **argv) { #ifdef HIREDIS_TEST_ASYNC printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); + cfg.type = CONN_TCP; + + int major; + redisContext *c = do_connect(cfg); + get_redis_version(c, &major, NULL); + disconnect(c, 0); + test_pubsub_handling(cfg); + if (major >= 6) test_pubsub_handling_resp3(cfg); #endif if (test_inherit_fd) { -- cgit v1.2.3