summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBjorn Svensson <bjorn.a.svensson@est.tech>2022-01-11 19:19:43 +0100
committerGitHub <noreply@github.com>2022-01-11 10:19:43 -0800
commitff860e55dbe49bbd3dc9a89d055047d0731ecc9f (patch)
treeb957661023eee22b6f12e975bb0158a0f7303fd4
parent24d5344934b866590646591ef0d696cb705a055a (diff)
Correction for command timeout during pubsub (#1038)
* Add test of command timeout during pubsub A timeout of a non-subscribe command will be ignored during pubsub. It will be handled as an idle timeout and a response is awaited for. * Correction for command timeout during pubsub Disconnect when a sent non-subscribe command triggers a timeout.
-rw-r--r--async.c2
-rw-r--r--test.c95
2 files changed, 93 insertions, 4 deletions
diff --git a/async.c b/async.c
index e4a2309..3b5ec27 100644
--- a/async.c
+++ b/async.c
@@ -696,7 +696,7 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) {
redisCallback cb;
if ((c->flags & REDIS_CONNECTED)) {
- if ( ac->replies.head == NULL) {
+ if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
/* Nothing to do - just an idle timeout */
return;
}
diff --git a/test.c b/test.c
index 6915ba2..e747439 100644
--- a/test.c
+++ b/test.c
@@ -1527,6 +1527,14 @@ void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
state->checkpoint++;
}
+/* Expect a NULL reply */
+void null_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ (void) ac;
+ assert(r == NULL);
+ TestState *state = privdata;
+ state->checkpoint++;
+}
+
static void test_pubsub_handling(struct config config) {
test("Subscribe, handle published message and unsubscribe: ");
/* Setup event dispatcher with a testcase timeout */
@@ -1618,7 +1626,85 @@ static void test_pubsub_handling_resp3(struct config config) {
/* Verify test checkpoints */
assert(state.checkpoint == 5);
}
-#endif
+
+/* Subscribe callback for test_command_timeout_during_pubsub:
+ * - a subscribe response triggers a published message
+ * - the published message triggers a command that times out
+ * - the command timeout triggers a disconnect */
+void subscribe_with_timeout_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ redisReply *reply = r;
+ TestState *state = privdata;
+
+ /* The non-clean disconnect should trigger the
+ * subscription callback with a NULL reply. */
+ if (reply == NULL) {
+ state->checkpoint++;
+ event_base_loopbreak(base);
+ return;
+ }
+
+ assert(reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&
+ reply->elements == 3);
+
+ if (strcmp(reply->element[0]->str,"subscribe") == 0) {
+ assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
+ reply->element[2]->str == NULL);
+ publish_msg(state->options,"mychannel","Hello!");
+ state->checkpoint++;
+ } else if (strcmp(reply->element[0]->str,"message") == 0) {
+ assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
+ strcmp(reply->element[2]->str,"Hello!") == 0);
+ state->checkpoint++;
+
+ /* Send a command that will trigger a timeout */
+ redisAsyncCommand(ac,null_cb,state,"DEBUG SLEEP 3");
+ redisAsyncCommand(ac,null_cb,state,"LPUSH mylist foo");
+ } else {
+ printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
+ exit(1);
+ }
+}
+
+static void test_command_timeout_during_pubsub(struct config config) {
+ test("Command timeout during Pub/Sub: ");
+ /* Setup event dispatcher with a testcase timeout */
+ base = event_base_new();
+ struct event *timeout = evtimer_new(base,timeout_cb,NULL);
+ assert(timeout != NULL);
+
+ evtimer_assign(timeout,base,timeout_cb,NULL);
+ struct timeval timeout_tv = {.tv_sec = 10};
+ evtimer_add(timeout,&timeout_tv);
+
+ /* Connect */
+ redisOptions options = get_redis_tcp_options(config);
+ redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
+ assert(ac != NULL && ac->err == 0);
+ redisLibeventAttach(ac,base);
+
+ /* Configure a command timout */
+ struct timeval command_timeout = {.tv_sec = 2};
+ redisAsyncSetTimeout(ac,command_timeout);
+
+ /* Not expecting any push messages in this test */
+ redisAsyncSetPushCallback(ac,unexpected_push_cb);
+
+ /* Switch protocol */
+ redisAsyncCommand(ac,NULL,NULL,"HELLO 3");
+
+ /* Start subscribe */
+ TestState state = {.options = &options, .resp3 = 1};
+ redisAsyncCommand(ac,subscribe_with_timeout_cb,&state,"subscribe mychannel");
+
+ /* Start event dispatching loop */
+ assert(event_base_dispatch(base) == 0);
+ event_free(timeout);
+ event_base_free(base);
+
+ /* Verify test checkpoints */
+ test_cond(state.checkpoint == 5);
+}
+#endif /* HIREDIS_TEST_ASYNC */
int main(int argc, char **argv) {
struct config cfg = {
@@ -1748,8 +1834,11 @@ int main(int argc, char **argv) {
disconnect(c, 0);
test_pubsub_handling(cfg);
- if (major >= 6) test_pubsub_handling_resp3(cfg);
-#endif
+ if (major >= 6) {
+ test_pubsub_handling_resp3(cfg);
+ test_command_timeout_during_pubsub(cfg);
+ }
+#endif /* HIREDIS_TEST_ASYNC */
if (test_inherit_fd) {
printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path);