summaryrefslogtreecommitdiff
path: root/test.c
diff options
context:
space:
mode:
Diffstat (limited to 'test.c')
-rw-r--r--test.c738
1 files changed, 738 insertions, 0 deletions
diff --git a/test.c b/test.c
index 78290a2..e7582d7 100644
--- a/test.c
+++ b/test.c
@@ -15,9 +15,14 @@
#include "hiredis.h"
#include "async.h"
+#include "adapters/poll.h"
#ifdef HIREDIS_TEST_SSL
#include "hiredis_ssl.h"
#endif
+#ifdef HIREDIS_TEST_ASYNC
+#include "adapters/libevent.h"
+#include <event2/event.h>
+#endif
#include "net.h"
#include "win32.h"
@@ -831,6 +836,7 @@ static void test_blocking_connection_errors(void) {
strcmp(c->errstr, "Can't resolve: " HIREDIS_BAD_DOMAIN) == 0 ||
strcmp(c->errstr, "Name does not resolve") == 0 ||
strcmp(c->errstr, "nodename nor servname provided, or not known") == 0 ||
+ strcmp(c->errstr, "node name or service name not known") == 0 ||
strcmp(c->errstr, "No address associated with hostname") == 0 ||
strcmp(c->errstr, "Temporary failure in name resolution") == 0 ||
strcmp(c->errstr, "hostname nor servname provided, or not known") == 0 ||
@@ -910,6 +916,11 @@ static void test_resp3_push_handler(redisContext *c) {
old = redisSetPushCallback(c, push_handler);
test("We can set a custom RESP3 PUSH handler: ");
reply = redisCommand(c, "SET key:0 val:0");
+ /* We need another command because depending on the version of Redis, the
+ * notification may be delivered after the command's reply. */
+ test_cond(reply != NULL);
+ freeReplyObject(reply);
+ reply = redisCommand(c, "PING");
test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && pc.str == 1);
freeReplyObject(reply);
@@ -925,6 +936,12 @@ static void test_resp3_push_handler(redisContext *c) {
assert((reply = redisCommand(c, "GET key:0")) != NULL);
freeReplyObject(reply);
assert((reply = redisCommand(c, "SET key:0 invalid")) != NULL);
+ /* Depending on Redis version, we may receive either push notification or
+ * status reply. Both cases are valid. */
+ if (reply->type == REDIS_REPLY_STATUS) {
+ freeReplyObject(reply);
+ reply = redisCommand(c, "PING");
+ }
test_cond(reply->type == REDIS_REPLY_PUSH);
freeReplyObject(reply);
@@ -1443,6 +1460,699 @@ static void test_throughput(struct config config) {
// redisFree(c);
// }
+#ifdef HIREDIS_TEST_ASYNC
+struct event_base *base;
+
+typedef struct TestState {
+ redisOptions *options;
+ int checkpoint;
+ int resp3;
+ int disconnect;
+} TestState;
+
+/* Helper to disconnect and stop event loop */
+void async_disconnect(redisAsyncContext *ac) {
+ redisAsyncDisconnect(ac);
+ event_base_loopbreak(base);
+}
+
+/* Testcase timeout, will trigger a failure */
+void timeout_cb(int fd, short event, void *arg) {
+ (void) fd; (void) event; (void) arg;
+ printf("Timeout in async testing!\n");
+ exit(1);
+}
+
+/* Unexpected call, will trigger a failure */
+void unexpected_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ (void) ac; (void) r;
+ printf("Unexpected call: %s\n",(char*)privdata);
+ exit(1);
+}
+
+/* Helper function to publish a message via own client. */
+void publish_msg(redisOptions *options, const char* channel, const char* msg) {
+ redisContext *c = redisConnectWithOptions(options);
+ assert(c != NULL);
+ redisReply *reply = redisCommand(c,"PUBLISH %s %s",channel,msg);
+ assert(reply->type == REDIS_REPLY_INTEGER && reply->integer == 1);
+ freeReplyObject(reply);
+ disconnect(c, 0);
+}
+
+/* Expect a reply of type INTEGER */
+void integer_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ redisReply *reply = r;
+ TestState *state = privdata;
+ assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER);
+ state->checkpoint++;
+ if (state->disconnect) async_disconnect(ac);
+}
+
+/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3:
+ * - a published message triggers an unsubscribe
+ * - a command is sent before the unsubscribe response is received. */
+void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ redisReply *reply = r;
+ TestState *state = privdata;
+
+ assert(reply != NULL &&
+ reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&
+ reply->elements == 3);
+
+ if (strcmp(reply->element[0]->str,"subscribe") == 0) {
+ assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
+ reply->element[2]->str == NULL);
+ publish_msg(state->options,"mychannel","Hello!");
+ } else if (strcmp(reply->element[0]->str,"message") == 0) {
+ assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
+ strcmp(reply->element[2]->str,"Hello!") == 0);
+ state->checkpoint++;
+
+ /* Unsubscribe after receiving the published message. Send unsubscribe
+ * which should call the callback registered during subscribe */
+ redisAsyncCommand(ac,unexpected_cb,
+ (void*)"unsubscribe should call subscribe_cb()",
+ "unsubscribe");
+ /* Send a regular command after unsubscribing, then disconnect */
+ state->disconnect = 1;
+ redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo");
+
+ } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
+ assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
+ reply->element[2]->str == NULL);
+ } else {
+ printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
+ exit(1);
+ }
+}
+
+/* Expect a reply of type ARRAY */
+void array_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ redisReply *reply = r;
+ TestState *state = privdata;
+ assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY);
+ state->checkpoint++;
+ if (state->disconnect) async_disconnect(ac);
+}
+
+/* Expect a NULL reply */
+void null_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ (void) ac;
+ assert(r == NULL);
+ TestState *state = privdata;
+ state->checkpoint++;
+}
+
+static void test_pubsub_handling(struct config config) {
+ test("Subscribe, handle published message and unsubscribe: ");
+ /* Setup event dispatcher with a testcase timeout */
+ base = event_base_new();
+ struct event *timeout = evtimer_new(base, timeout_cb, NULL);
+ assert(timeout != NULL);
+
+ evtimer_assign(timeout,base,timeout_cb,NULL);
+ struct timeval timeout_tv = {.tv_sec = 10};
+ evtimer_add(timeout, &timeout_tv);
+
+ /* Connect */
+ redisOptions options = get_redis_tcp_options(config);
+ redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
+ assert(ac != NULL && ac->err == 0);
+ redisLibeventAttach(ac,base);
+
+ /* Start subscribe */
+ TestState state = {.options = &options};
+ redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
+
+ /* Make sure non-subscribe commands are handled */
+ redisAsyncCommand(ac,array_cb,&state,"PING");
+
+ /* Start event dispatching loop */
+ test_cond(event_base_dispatch(base) == 0);
+ event_free(timeout);
+ event_base_free(base);
+
+ /* Verify test checkpoints */
+ assert(state.checkpoint == 3);
+}
+
+/* Unexpected push message, will trigger a failure */
+void unexpected_push_cb(redisAsyncContext *ac, void *r) {
+ (void) ac; (void) r;
+ printf("Unexpected call to the PUSH callback!\n");
+ exit(1);
+}
+
+static void test_pubsub_handling_resp3(struct config config) {
+ test("Subscribe, handle published message and unsubscribe using RESP3: ");
+ /* Setup event dispatcher with a testcase timeout */
+ base = event_base_new();
+ struct event *timeout = evtimer_new(base, timeout_cb, NULL);
+ assert(timeout != NULL);
+
+ evtimer_assign(timeout,base,timeout_cb,NULL);
+ struct timeval timeout_tv = {.tv_sec = 10};
+ evtimer_add(timeout, &timeout_tv);
+
+ /* Connect */
+ redisOptions options = get_redis_tcp_options(config);
+ redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
+ assert(ac != NULL && ac->err == 0);
+ redisLibeventAttach(ac,base);
+
+ /* Not expecting any push messages in this test */
+ redisAsyncSetPushCallback(ac, unexpected_push_cb);
+
+ /* Switch protocol */
+ redisAsyncCommand(ac,NULL,NULL,"HELLO 3");
+
+ /* Start subscribe */
+ TestState state = {.options = &options, .resp3 = 1};
+ redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel");
+
+ /* Make sure non-subscribe commands are handled in RESP3 */
+ redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
+ redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
+ redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo");
+ /* Handle an array with 3 elements as a non-subscribe command */
+ redisAsyncCommand(ac,array_cb,&state,"LRANGE mylist 0 2");
+
+ /* Start event dispatching loop */
+ test_cond(event_base_dispatch(base) == 0);
+ event_free(timeout);
+ event_base_free(base);
+
+ /* Verify test checkpoints */
+ assert(state.checkpoint == 6);
+}
+
+/* Subscribe callback for test_command_timeout_during_pubsub:
+ * - a subscribe response triggers a published message
+ * - the published message triggers a command that times out
+ * - the command timeout triggers a disconnect */
+void subscribe_with_timeout_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ redisReply *reply = r;
+ TestState *state = privdata;
+
+ /* The non-clean disconnect should trigger the
+ * subscription callback with a NULL reply. */
+ if (reply == NULL) {
+ state->checkpoint++;
+ event_base_loopbreak(base);
+ return;
+ }
+
+ assert(reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) &&
+ reply->elements == 3);
+
+ if (strcmp(reply->element[0]->str,"subscribe") == 0) {
+ assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
+ reply->element[2]->str == NULL);
+ publish_msg(state->options,"mychannel","Hello!");
+ state->checkpoint++;
+ } else if (strcmp(reply->element[0]->str,"message") == 0) {
+ assert(strcmp(reply->element[1]->str,"mychannel") == 0 &&
+ strcmp(reply->element[2]->str,"Hello!") == 0);
+ state->checkpoint++;
+
+ /* Send a command that will trigger a timeout */
+ redisAsyncCommand(ac,null_cb,state,"DEBUG SLEEP 3");
+ redisAsyncCommand(ac,null_cb,state,"LPUSH mylist foo");
+ } else {
+ printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
+ exit(1);
+ }
+}
+
+static void test_command_timeout_during_pubsub(struct config config) {
+ test("Command timeout during Pub/Sub: ");
+ /* Setup event dispatcher with a testcase timeout */
+ base = event_base_new();
+ struct event *timeout = evtimer_new(base,timeout_cb,NULL);
+ assert(timeout != NULL);
+
+ evtimer_assign(timeout,base,timeout_cb,NULL);
+ struct timeval timeout_tv = {.tv_sec = 10};
+ evtimer_add(timeout,&timeout_tv);
+
+ /* Connect */
+ redisOptions options = get_redis_tcp_options(config);
+ redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
+ assert(ac != NULL && ac->err == 0);
+ redisLibeventAttach(ac,base);
+
+ /* Configure a command timout */
+ struct timeval command_timeout = {.tv_sec = 2};
+ redisAsyncSetTimeout(ac,command_timeout);
+
+ /* Not expecting any push messages in this test */
+ redisAsyncSetPushCallback(ac,unexpected_push_cb);
+
+ /* Switch protocol */
+ redisAsyncCommand(ac,NULL,NULL,"HELLO 3");
+
+ /* Start subscribe */
+ TestState state = {.options = &options, .resp3 = 1};
+ redisAsyncCommand(ac,subscribe_with_timeout_cb,&state,"subscribe mychannel");
+
+ /* Start event dispatching loop */
+ assert(event_base_dispatch(base) == 0);
+ event_free(timeout);
+ event_base_free(base);
+
+ /* Verify test checkpoints */
+ test_cond(state.checkpoint == 5);
+}
+
+/* Subscribe callback for test_pubsub_multiple_channels */
+void subscribe_channel_a_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ redisReply *reply = r;
+ TestState *state = privdata;
+
+ assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY &&
+ reply->elements == 3);
+
+ if (strcmp(reply->element[0]->str,"subscribe") == 0) {
+ assert(strcmp(reply->element[1]->str,"A") == 0);
+ publish_msg(state->options,"A","Hello!");
+ state->checkpoint++;
+ } else if (strcmp(reply->element[0]->str,"message") == 0) {
+ assert(strcmp(reply->element[1]->str,"A") == 0 &&
+ strcmp(reply->element[2]->str,"Hello!") == 0);
+ state->checkpoint++;
+
+ /* Unsubscribe to channels, including channel X & Z which we don't subscribe to */
+ redisAsyncCommand(ac,unexpected_cb,
+ (void*)"unsubscribe should not call unexpected_cb()",
+ "unsubscribe B X A A Z");
+ /* Unsubscribe to patterns, none which we subscribe to */
+ redisAsyncCommand(ac,unexpected_cb,
+ (void*)"punsubscribe should not call unexpected_cb()",
+ "punsubscribe");
+ /* Send a regular command after unsubscribing, then disconnect */
+ state->disconnect = 1;
+ redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo");
+ } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
+ assert(strcmp(reply->element[1]->str,"A") == 0);
+ state->checkpoint++;
+ } else {
+ printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
+ exit(1);
+ }
+}
+
+/* Subscribe callback for test_pubsub_multiple_channels */
+void subscribe_channel_b_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ redisReply *reply = r;
+ TestState *state = privdata;
+
+ assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY &&
+ reply->elements == 3);
+
+ if (strcmp(reply->element[0]->str,"subscribe") == 0) {
+ assert(strcmp(reply->element[1]->str,"B") == 0);
+ state->checkpoint++;
+ } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) {
+ assert(strcmp(reply->element[1]->str,"B") == 0);
+ state->checkpoint++;
+ } else {
+ printf("Unexpected pubsub command: %s\n", reply->element[0]->str);
+ exit(1);
+ }
+}
+
+/* Test handling of multiple channels
+ * - subscribe to channel A and B
+ * - a published message on A triggers an unsubscribe of channel B, X, A and Z
+ * where channel X and Z are not subscribed to.
+ * - the published message also triggers an unsubscribe to patterns. Since no
+ * pattern is subscribed to the responded pattern element type is NIL.
+ * - a command sent after unsubscribe triggers a disconnect */
+static void test_pubsub_multiple_channels(struct config config) {
+ test("Subscribe to multiple channels: ");
+ /* Setup event dispatcher with a testcase timeout */
+ base = event_base_new();
+ struct event *timeout = evtimer_new(base,timeout_cb,NULL);
+ assert(timeout != NULL);
+
+ evtimer_assign(timeout,base,timeout_cb,NULL);
+ struct timeval timeout_tv = {.tv_sec = 10};
+ evtimer_add(timeout,&timeout_tv);
+
+ /* Connect */
+ redisOptions options = get_redis_tcp_options(config);
+ redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
+ assert(ac != NULL && ac->err == 0);
+ redisLibeventAttach(ac,base);
+
+ /* Not expecting any push messages in this test */
+ redisAsyncSetPushCallback(ac,unexpected_push_cb);
+
+ /* Start subscribing to two channels */
+ TestState state = {.options = &options};
+ redisAsyncCommand(ac,subscribe_channel_a_cb,&state,"subscribe A");
+ redisAsyncCommand(ac,subscribe_channel_b_cb,&state,"subscribe B");
+
+ /* Start event dispatching loop */
+ assert(event_base_dispatch(base) == 0);
+ event_free(timeout);
+ event_base_free(base);
+
+ /* Verify test checkpoints */
+ test_cond(state.checkpoint == 6);
+}
+
+/* Command callback for test_monitor() */
+void monitor_cb(redisAsyncContext *ac, void *r, void *privdata) {
+ redisReply *reply = r;
+ TestState *state = privdata;
+
+ /* NULL reply is received when BYE triggers a disconnect. */
+ if (reply == NULL) {
+ event_base_loopbreak(base);
+ return;
+ }
+
+ assert(reply != NULL && reply->type == REDIS_REPLY_STATUS);
+ state->checkpoint++;
+
+ if (state->checkpoint == 1) {
+ /* Response from MONITOR */
+ redisContext *c = redisConnectWithOptions(state->options);
+ assert(c != NULL);
+ redisReply *reply = redisCommand(c,"SET first 1");
+ assert(reply->type == REDIS_REPLY_STATUS);
+ freeReplyObject(reply);
+ redisFree(c);
+ } else if (state->checkpoint == 2) {
+ /* Response for monitored command 'SET first 1' */
+ assert(strstr(reply->str,"first") != NULL);
+ redisContext *c = redisConnectWithOptions(state->options);
+ assert(c != NULL);
+ redisReply *reply = redisCommand(c,"SET second 2");
+ assert(reply->type == REDIS_REPLY_STATUS);
+ freeReplyObject(reply);
+ redisFree(c);
+ } else if (state->checkpoint == 3) {
+ /* Response for monitored command 'SET second 2' */
+ assert(strstr(reply->str,"second") != NULL);
+ /* Send QUIT to disconnect */
+ redisAsyncCommand(ac,NULL,NULL,"QUIT");
+ }
+}
+
+/* Test handling of the monitor command
+ * - sends MONITOR to enable monitoring.
+ * - sends SET commands via separate clients to be monitored.
+ * - sends QUIT to stop monitoring and disconnect. */
+static void test_monitor(struct config config) {
+ test("Enable monitoring: ");
+ /* Setup event dispatcher with a testcase timeout */
+ base = event_base_new();
+ struct event *timeout = evtimer_new(base, timeout_cb, NULL);
+ assert(timeout != NULL);
+
+ evtimer_assign(timeout,base,timeout_cb,NULL);
+ struct timeval timeout_tv = {.tv_sec = 10};
+ evtimer_add(timeout, &timeout_tv);
+
+ /* Connect */
+ redisOptions options = get_redis_tcp_options(config);
+ redisAsyncContext *ac = redisAsyncConnectWithOptions(&options);
+ assert(ac != NULL && ac->err == 0);
+ redisLibeventAttach(ac,base);
+
+ /* Not expecting any push messages in this test */
+ redisAsyncSetPushCallback(ac,unexpected_push_cb);
+
+ /* Start monitor */
+ TestState state = {.options = &options};
+ redisAsyncCommand(ac,monitor_cb,&state,"monitor");
+
+ /* Start event dispatching loop */
+ test_cond(event_base_dispatch(base) == 0);
+ event_free(timeout);
+ event_base_free(base);
+
+ /* Verify test checkpoints */
+ assert(state.checkpoint == 3);
+}
+#endif /* HIREDIS_TEST_ASYNC */
+
+/* tests for async api using polling adapter, requires no extra libraries*/
+
+/* enum for the test cases, the callbacks have different logic based on them */
+typedef enum astest_no
+{
+ ASTEST_CONNECT=0,
+ ASTEST_CONN_TIMEOUT,
+ ASTEST_PINGPONG,
+ ASTEST_PINGPONG_TIMEOUT,
+ ASTEST_ISSUE_931,
+ ASTEST_ISSUE_931_PING
+}astest_no;
+
+/* a static context for the async tests */
+struct _astest {
+ redisAsyncContext *ac;
+ astest_no testno;
+ int counter;
+ int connects;
+ int connect_status;
+ int disconnects;
+ int pongs;
+ int disconnect_status;
+ int connected;
+ int err;
+ char errstr[256];
+};
+static struct _astest astest;
+
+static void asSleep(int ms)
+{
+#if _MSC_VER
+ Sleep(ms);
+#else
+ usleep(ms*1000);
+#endif
+}
+
+/* async callbacks */
+static void asCleanup(void* data)
+{
+ struct _astest *t = (struct _astest *)data;
+ t->ac = NULL;
+}
+
+static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata);
+
+static void connectCallback(redisAsyncContext *c, int status) {
+ struct _astest *t = (struct _astest *)c->data;
+ assert(t == &astest);
+ assert(t->connects == 0);
+ t->err = c->err;
+ strcpy(t->errstr, c->errstr);
+ t->connects++;
+ t->connect_status = status;
+ t->connected = status == REDIS_OK ? 1 : -1;
+
+ if (t->testno == ASTEST_ISSUE_931) {
+ /* disconnect again */
+ redisAsyncDisconnect(c);
+ }
+ else if (t->testno == ASTEST_ISSUE_931_PING)
+ {
+ status = redisAsyncCommand(c, commandCallback, NULL, "PING");
+ }
+}
+static void disconnectCallback(const redisAsyncContext *c, int status) {
+ assert(c->data == (void*)&astest);
+ assert(astest.disconnects == 0);
+ astest.err = c->err;
+ strcpy(astest.errstr, c->errstr);
+ astest.disconnects++;
+ astest.disconnect_status = status;
+ astest.connected = 0;
+}
+
+static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata)
+{
+ redisReply *reply = (redisReply*)_reply;
+ struct _astest *t = (struct _astest *)ac->data;
+ assert(t == &astest);
+ (void)_privdata;
+ t->err = ac->err;
+ strcpy(t->errstr, ac->errstr);
+ t->counter++;
+ if (t->testno == ASTEST_PINGPONG ||t->testno == ASTEST_ISSUE_931_PING)
+ {
+ assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ t->pongs++;
+ redisAsyncFree(ac);
+ }
+ if (t->testno == ASTEST_PINGPONG_TIMEOUT)
+ {
+ /* two ping pongs */
+ assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ t->pongs++;
+ if (t->counter == 1) {
+ int status = redisAsyncCommand(ac, commandCallback, NULL, "PING");
+ assert(status == REDIS_OK);
+ } else {
+ redisAsyncFree(ac);
+ }
+ }
+}
+
+static redisAsyncContext *do_aconnect(struct config config, astest_no testno)
+{
+ redisOptions options = {0};
+ memset(&astest, 0, sizeof(astest));
+
+ astest.testno = testno;
+ astest.connect_status = astest.disconnect_status = -2;
+
+ if (config.type == CONN_TCP) {
+ options.type = REDIS_CONN_TCP;
+ options.connect_timeout = &config.tcp.timeout;
+ REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port);
+ } else if (config.type == CONN_SSL) {
+ options.type = REDIS_CONN_TCP;
+ options.connect_timeout = &config.tcp.timeout;
+ REDIS_OPTIONS_SET_TCP(&options, config.ssl.host, config.ssl.port);
+ } else if (config.type == CONN_UNIX) {
+ options.type = REDIS_CONN_UNIX;
+ options.endpoint.unix_socket = config.unix_sock.path;
+ } else if (config.type == CONN_FD) {
+ options.type = REDIS_CONN_USERFD;
+ /* Create a dummy connection just to get an fd to inherit */
+ redisContext *dummy_ctx = redisConnectUnix(config.unix_sock.path);
+ if (dummy_ctx) {
+ redisFD fd = disconnect(dummy_ctx, 1);
+ printf("Connecting to inherited fd %d\n", (int)fd);
+ options.endpoint.fd = fd;
+ }
+ }
+ redisAsyncContext *c = redisAsyncConnectWithOptions(&options);
+ assert(c);
+ astest.ac = c;
+ c->data = &astest;
+ c->dataCleanup = asCleanup;
+ redisPollAttach(c);
+ redisAsyncSetConnectCallbackNC(c, connectCallback);
+ redisAsyncSetDisconnectCallback(c, disconnectCallback);
+ return c;
+}
+
+static void as_printerr(void) {
+ printf("Async err %d : %s\n", astest.err, astest.errstr);
+}
+
+#define ASASSERT(e) do { \
+ if (!(e)) \
+ as_printerr(); \
+ assert(e); \
+} while (0);
+
+static void test_async_polling(struct config config) {
+ int status;
+ redisAsyncContext *c;
+ struct config defaultconfig = config;
+
+ test("Async connect: ");
+ c = do_aconnect(config, ASTEST_CONNECT);
+ assert(c);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ assert(astest.connects == 1);
+ ASASSERT(astest.connect_status == REDIS_OK);
+ assert(astest.disconnects == 0);
+ test_cond(astest.connected == 1);
+
+ test("Async free after connect: ");
+ assert(astest.ac != NULL);
+ redisAsyncFree(c);
+ assert(astest.disconnects == 1);
+ assert(astest.ac == NULL);
+ test_cond(astest.disconnect_status == REDIS_OK);
+
+ if (config.type == CONN_TCP || config.type == CONN_SSL) {
+ /* timeout can only be simulated with network */
+ test("Async connect timeout: ");
+ config.tcp.host = "192.168.254.254"; /* blackhole ip */
+ config.tcp.timeout.tv_usec = 100000;
+ c = do_aconnect(config, ASTEST_CONN_TIMEOUT);
+ assert(c);
+ assert(c->err == 0);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ assert(astest.connected == -1);
+ /*
+ * freeing should not be done, clearing should have happened.
+ *redisAsyncFree(c);
+ */
+ assert(astest.ac == NULL);
+ test_cond(astest.connect_status == REDIS_ERR);
+ config = defaultconfig;
+ }
+
+ /* Test a ping/pong after connection */
+ test("Async PING/PONG: ");
+ c = do_aconnect(config, ASTEST_PINGPONG);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ status = redisAsyncCommand(c, commandCallback, NULL, "PING");
+ assert(status == REDIS_OK);
+ while(astest.ac)
+ redisPollTick(c, 0.1);
+ test_cond(astest.pongs == 1);
+
+ /* Test a ping/pong after connection that didn't time out.
+ * see https://github.com/redis/hiredis/issues/945
+ */
+ if (config.type == CONN_TCP || config.type == CONN_SSL) {
+ test("Async PING/PONG after connect timeout: ");
+ config.tcp.timeout.tv_usec = 10000; /* 10ms */
+ c = do_aconnect(config, ASTEST_PINGPONG_TIMEOUT);
+ while(astest.connected == 0)
+ redisPollTick(c, 0.1);
+ /* sleep 0.1 s, allowing old timeout to arrive */
+ asSleep(10);
+ status = redisAsyncCommand(c, commandCallback, NULL, "PING");
+ assert(status == REDIS_OK);
+ while(astest.ac)
+ redisPollTick(c, 0.1);
+ test_cond(astest.pongs == 2);
+ config = defaultconfig;
+ }
+
+ /* Test disconnect from an on_connect callback
+ * see https://github.com/redis/hiredis/issues/931
+ */
+ test("Disconnect from onConnected callback (Issue #931): ");
+ c = do_aconnect(config, ASTEST_ISSUE_931);
+ while(astest.disconnects == 0)
+ redisPollTick(c, 0.1);
+ assert(astest.connected == 0);
+ assert(astest.connects == 1);
+ test_cond(astest.disconnects == 1);
+
+ /* Test ping/pong from an on_connect callback
+ * see https://github.com/redis/hiredis/issues/931
+ */
+ test("Ping/Pong from onConnected callback (Issue #931): ");
+ c = do_aconnect(config, ASTEST_ISSUE_931_PING);
+ /* connect callback issues ping, reponse callback destroys context */
+ while(astest.ac)
+ redisPollTick(c, 0.1);
+ assert(astest.connected == 0);
+ assert(astest.connects == 1);
+ assert(astest.disconnects == 1);
+ test_cond(astest.pongs == 1);
+}
+/* End of Async polling_adapter driven tests */
+
int main(int argc, char **argv) {
struct config cfg = {
.tcp = {
@@ -1561,6 +2271,34 @@ int main(int argc, char **argv) {
}
#endif
+#ifdef HIREDIS_TEST_ASYNC
+ cfg.type = CONN_TCP;
+ printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
+ cfg.type = CONN_TCP;
+
+ int major;
+ redisContext *c = do_connect(cfg);
+ get_redis_version(c, &major, NULL);
+ disconnect(c, 0);
+
+ test_pubsub_handling(cfg);
+ test_pubsub_multiple_channels(cfg);
+ test_monitor(cfg);
+ if (major >= 6) {
+ test_pubsub_handling_resp3(cfg);
+ test_command_timeout_during_pubsub(cfg);
+ }
+#endif /* HIREDIS_TEST_ASYNC */
+
+ cfg.type = CONN_TCP;
+ printf("\nTesting asynchronous API using polling_adapter TCP (%s:%d):\n", cfg.tcp.host, cfg.tcp.port);
+ test_async_polling(cfg);
+ if (test_unix_socket) {
+ cfg.type = CONN_UNIX;
+ printf("\nTesting asynchronous API using polling_adapter UNIX (%s):\n", cfg.unix_sock.path);
+ test_async_polling(cfg);
+ }
+
if (test_inherit_fd) {
printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path);
if (test_unix_socket) {