From 4021726a69abfa31005c8653e1856b59c21bb032 Mon Sep 17 00:00:00 2001 From: Björn Svensson Date: Mon, 25 Oct 2021 11:18:08 +0200 Subject: Add asynchronous test for pubsub using RESP2 The testcase will subscribe to a channel, and via a second client a message is published to the channel. After receiving the message the testcase will unsubscribe and disconnect. --- test.c | 107 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/test.c b/test.c index 78290a2..870c4c3 100644 --- a/test.c +++ b/test.c @@ -18,6 +18,10 @@ #ifdef HIREDIS_TEST_SSL #include "hiredis_ssl.h" #endif +#ifdef HIREDIS_TEST_ASYNC +#include "adapters/libevent.h" +#include +#endif #include "net.h" #include "win32.h" @@ -1443,6 +1447,104 @@ static void test_throughput(struct config config) { // redisFree(c); // } +#ifdef HIREDIS_TEST_ASYNC +struct event_base *base; + +typedef struct TestState { + redisOptions *options; + int checkpoint; +} TestState; + +/* Testcase timeout, will trigger a failure */ +void timeout_cb(int fd, short event, void *arg) { + (void) fd; (void) event; (void) arg; + printf("Timeout in async testing!\n"); + exit(1); +} + +/* Unexpected call, will trigger a failure */ +void unexpected_cb(redisAsyncContext *ac, void *r, void *privdata) { + (void) ac; (void) r; + printf("Unexpected call: %s\n",(char*)privdata); + exit(1); +} + +/* Helper function to publish a message via own client. */ +void publish_msg(redisOptions *options, const char* channel, const char* msg) { + redisContext *c = redisConnectWithOptions(options); + assert(c != NULL); + redisReply *reply = redisCommand(c,"PUBLISH %s %s",channel,msg); + assert(reply->type == REDIS_REPLY_INTEGER && reply->integer == 1); + freeReplyObject(reply); + disconnect(c, 0); +} + +/* Subscribe callback for test_pubsub_handling: + * - a published message triggers an unsubscribe + * - an unsubscribe response triggers a disconnect */ +void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + assert(reply != NULL && + reply->type == REDIS_REPLY_ARRAY && + reply->elements == 3); + + if (strcmp(reply->element[0]->str,"subscribe") == 0) { + assert(strcmp(reply->element[1]->str,"mychannel") == 0 && + reply->element[2]->str == NULL); + publish_msg(state->options,"mychannel","Hello!"); + } else if (strcmp(reply->element[0]->str,"message") == 0) { + assert(strcmp(reply->element[1]->str,"mychannel") == 0 && + strcmp(reply->element[2]->str,"Hello!") == 0); + state->checkpoint++; + + /* Unsubscribe after receiving the published message. Send unsubscribe + * which should call the callback registered during subscribe */ + redisAsyncCommand(ac,unexpected_cb, + (void*)"unsubscribe should call subscribe_cb()", + "unsubscribe"); + } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) { + assert(strcmp(reply->element[1]->str,"mychannel") == 0 && + reply->element[2]->str == NULL); + + /* Disconnect after unsubscribe */ + redisAsyncDisconnect(ac); + event_base_loopbreak(base); + } else { + printf("Unexpected pubsub command: %s\n", reply->element[0]->str); + exit(1); + } +} + +static void test_pubsub_handling(struct config config) { + test("Subscribe, handle published message and unsubscribe: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event timeout; + evtimer_assign(&timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(&timeout, &timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Start subscribe */ + TestState state = {.options = &options}; + redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + + /* Start event dispatching loop */ + test_cond(event_base_dispatch(base) == 0); + event_base_free(base); + + /* Verify test checkpoints */ + assert(state.checkpoint == 1); +} +#endif + int main(int argc, char **argv) { struct config cfg = { .tcp = { @@ -1561,6 +1663,11 @@ int main(int argc, char **argv) { } #endif +#ifdef HIREDIS_TEST_ASYNC + printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); + test_pubsub_handling(cfg); +#endif + if (test_inherit_fd) { printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path); if (test_unix_socket) { -- cgit v1.2.3