summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBjorn Svensson <bjorn.a.svensson@est.tech>2021-12-01 20:43:23 +0100
committerGitHub <noreply@github.com>2021-12-01 11:43:23 -0800
commitda5a4ff3622e8744b772a76f6ce580dc9134fb38 (patch)
tree100af25c22aa05feafa810b123392e80efa4071c
parentb5716ee82926316f7764b834eec636f5652d5600 (diff)
Add asynchronous test for pubsub using RESP3 (#1012)
* Include `unsubscribe` as a subscribe reply in RESP3 By providing the (p)unsubscribe message via the subscribe callback, instead of via the push callback, we get the same behavior in RESP3 as in RESP2. * Add asynchronous test for pubsub using RESP3 The testcase will subscribe to a channel, and via a second client a message is published to the channel. After receiving the message the testcase will unsubscribe and disconnect. This RESP3 testcase reuses the subscribe callback from the RESP2 testcase to make sure we have a common behavior.
-rw-r--r--async.c4
-rw-r--r--test.c56
2 files changed, 56 insertions, 4 deletions
diff --git a/async.c b/async.c
index e37afbd..4459c19 100644
--- a/async.c
+++ b/async.c
@@ -496,8 +496,8 @@ static int redisIsSubscribeReply(redisReply *reply) {
len = reply->element[0]->len - off;
return !strncasecmp(str, "subscribe", len) ||
- !strncasecmp(str, "message", len);
-
+ !strncasecmp(str, "message", len) ||
+ !strncasecmp(str, "unsubscribe", len);
}
void redisProcessCallbacks(redisAsyncContext *ac) {
diff --git a/test.c b/test.c
index 04d5197..7af9bee 100644
--- a/test.c
+++ b/test.c
@@ -1453,6 +1453,7 @@ struct event_base *base;
typedef struct TestState {
redisOptions *options;
int checkpoint;
+ int resp3;
} TestState;
/* Testcase timeout, will trigger a failure */
@@ -1479,7 +1480,7 @@ void publish_msg(redisOptions *options, const char* channel, const char* msg) {
disconnect(c, 0);
}
-/* Subscribe callback for test_pubsub_handling:
+/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3:
* - a published message triggers an unsubscribe
* - an unsubscribe response triggers a disconnect */
void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
@@ -1487,7 +1488,7 @@ void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
TestState *state = privdata;
assert(reply != NULL &&
- reply->type == REDIS_REPLY_ARRAY &&
+ reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&
reply->elements == 3);
if (strcmp(reply->element[0]->str,"subscribe") == 0) {
@@ -1546,6 +1547,49 @@ static void test_pubsub_handling(struct config config) {
/* Verify test checkpoints */
assert(state.checkpoint == 1);
}
+
+/* Unexpected push message, will trigger a failure */
+void unexpected_push_cb(redisAsyncContext *ac, void *r) {
+ (void) ac; (void) r;
+ printf("Unexpected call to the PUSH callback!\n");
+ exit(1);
+}
+
+static void test_pubsub_handling_resp3(struct config config) {
+ test("Subscribe, handle published message and unsubscribe using RESP3: ");
+ /* Setup event dispatcher with a testcase timeout */
+ base = event_base_new();
+ struct event *timeout = evtimer_new(base, timeout_cb, NULL);
+ assert(timeout != NULL);
+
+ evtimer_assign(timeout,base,timeout_cb,NULL);
+ struct timeval timeout_tv = {.tv_sec = 10};
+ evtimer_add(timeout, &timeout_tv);
+
+ /* Connect */
+ redisOptions options = get_redis_tcp_options(config);
+ redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
+ assert(ac != NULL && ac->err == 0);
+ redisLibeventAttach(ac,base);
+
+ /* Not expecting any push messages in this test */
+ redisAsyncSetPushCallback(ac, unexpected_push_cb);
+
+ /* Switch protocol */
+ redisAsyncCommand(ac,NULL,NULL,"HELLO 3");
+
+ /* Start subscribe */
+ TestState state = {.options = &options, .resp3 = 1};
+ redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
+
+ /* Start event dispatching loop */
+ test_cond(event_base_dispatch(base) == 0);
+ event_free(timeout);
+ event_base_free(base);
+
+ /* Verify test checkpoints */
+ assert(state.checkpoint == 1);
+}
#endif
int main(int argc, char **argv) {
@@ -1668,7 +1712,15 @@ int main(int argc, char **argv) {
#ifdef HIREDIS_TEST_ASYNC
printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
+ cfg.type = CONN_TCP;
+
+ int major;
+ redisContext *c = do_connect(cfg);
+ get_redis_version(c, &major, NULL);
+ disconnect(c, 0);
+
test_pubsub_handling(cfg);
+ if (major >= 6) test_pubsub_handling_resp3(cfg);
#endif
if (test_inherit_fd) {