summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBjörn Svensson <bjorn.a.svensson@est.tech>2021-10-25 11:18:08 +0200
committerBjörn Svensson <bjorn.a.svensson@est.tech>2021-10-27 16:46:37 +0200
commit4021726a69abfa31005c8653e1856b59c21bb032 (patch)
tree3a97124ef13a128390379beab303101b21df94da
parent648763c36e9f6493b13a77da35eb33ef0652b4e2 (diff)
Add asynchronous test for pubsub using RESP2
The testcase will subscribe to a channel, and via a second client a message is published to the channel. After receiving the message the testcase will unsubscribe and disconnect.
-rw-r--r--test.c107
1 files changed, 107 insertions, 0 deletions
diff --git a/test.c b/test.c
index 78290a2..870c4c3 100644
--- a/test.c
+++ b/test.c
@@ -18,6 +18,10 @@
#ifdef HIREDIS_TEST_SSL
#include "hiredis_ssl.h"
#endif
+#ifdef HIREDIS_TEST_ASYNC
+#include "adapters/libevent.h"
+#include <event.h>
+#endif
#include "net.h"
#include "win32.h"
@@ -1443,6 +1447,104 @@ static void test_throughput(struct config config) {
// redisFree(c);
// }
+#ifdef HIREDIS_TEST_ASYNC
+struct event_base *base;
+
+typedef struct TestState {
+ redisOptions *options;
+ int checkpoint;
+} TestState;
+
+/* Testcase timeout, will trigger a failure */
+void timeout_cb(int fd, short event, void *arg) {
+ (void) fd; (void) event; (void) arg;
+ printf("Timeout in async testing!\n");
+ exit(1);
+}
+
+/* Unexpected call, will trigger a failure */
+void unexpected_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ (void) ac; (void) r;
+ printf("Unexpected call: %s\n",(char*)privdata);
+ exit(1);
+}
+
+/* Helper function to publish a message via own client. */
+void publish_msg(redisOptions *options, const char* channel, const char* msg) {
+ redisContext *c = redisConnectWithOptions(options);
+ assert(c != NULL);
+ redisReply *reply = redisCommand(c,"PUBLISH %s %s",channel,msg);
+ assert(reply->type == REDIS_REPLY_INTEGER && reply->integer == 1);
+ freeReplyObject(reply);
+ disconnect(c, 0);
+}
+
+/* Subscribe callback for test_pubsub_handling:
+ * - a published message triggers an unsubscribe
+ * - an unsubscribe response triggers a disconnect */
+void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ redisReply *reply = r;
+ TestState *state = privdata;
+
+ assert(reply != NULL &&
+ reply->type == REDIS_REPLY_ARRAY &&
+ reply->elements == 3);
+
+ if (strcmp(reply->element[0]->str,"subscribe") == 0) {
+ assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
+ reply->element[2]->str == NULL);
+ publish_msg(state->options,"mychannel","Hello!");
+ } else if (strcmp(reply->element[0]->str,"message") == 0) {
+ assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
+ strcmp(reply->element[2]->str,"Hello!") == 0);
+ state->checkpoint++;
+
+ /* Unsubscribe after receiving the published message. Send unsubscribe
+ * which should call the callback registered during subscribe */
+ redisAsyncCommand(ac,unexpected_cb,
+ (void*)"unsubscribe should call subscribe_cb()",
+ "unsubscribe");
+ } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
+ assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
+ reply->element[2]->str == NULL);
+
+ /* Disconnect after unsubscribe */
+ redisAsyncDisconnect(ac);
+ event_base_loopbreak(base);
+ } else {
+ printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
+ exit(1);
+ }
+}
+
+static void test_pubsub_handling(struct config config) {
+ test("Subscribe, handle published message and unsubscribe: ");
+ /* Setup event dispatcher with a testcase timeout */
+ base = event_base_new();
+ struct event timeout;
+ evtimer_assign(&timeout,base,timeout_cb,NULL);
+ struct timeval timeout_tv = {.tv_sec = 10};
+ evtimer_add(&timeout, &timeout_tv);
+
+ /* Connect */
+ redisOptions options = get_redis_tcp_options(config);
+ redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
+ assert(ac != NULL && ac->err == 0);
+ redisLibeventAttach(ac,base);
+
+ /* Start subscribe */
+ TestState state = {.options = &options};
+ redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
+
+ /* Start event dispatching loop */
+ test_cond(event_base_dispatch(base) == 0);
+ event_base_free(base);
+
+ /* Verify test checkpoints */
+ assert(state.checkpoint == 1);
+}
+#endif
+
int main(int argc, char **argv) {
struct config cfg = {
.tcp = {
@@ -1561,6 +1663,11 @@ int main(int argc, char **argv) {
}
#endif
+#ifdef HIREDIS_TEST_ASYNC
+ printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
+ test_pubsub_handling(cfg);
+#endif
+
if (test_inherit_fd) {
printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path);
if (test_unix_socket) {