diff options
author | Michael Grunder <michael.grunder@gmail.com> | 2020-07-19 18:54:42 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-07-19 18:54:42 -0700 |
commit | 2e7d7cbabd32912342218078282fce92f6cc0ab6 (patch) | |
tree | ba60c50c28b433aef6e128a67522085751acb6cb /test.c | |
parent | 1864e76ea7323fd8789d9c8b5b3c8ca27d4840a6 (diff) |
Resp3 oob push support (#841)
Proper support for RESP3 PUSH messages.
By default, PUSH messages are now intercepted and the reply memory freed.
This means existing code should work unchanged when connecting to Redis
>= 6.0.0 even if `CLIENT TRACKING` were then enabled.
Additionally, we define two callbacks users can configure if they wish to handle
these messages in a custom way:
void redisPushFn(void *privdata, void *reply);
void redisAsyncPushFn(redisAsyncContext *ac, void *reply);
See #825
Diffstat (limited to 'test.c')
-rw-r--r-- | test.c | 189 |
1 files changed, 177 insertions, 12 deletions
@@ -13,6 +13,7 @@ #include <limits.h> #include "hiredis.h" +#include "async.h" #ifdef HIREDIS_TEST_SSL #include "hiredis_ssl.h" #endif @@ -77,6 +78,43 @@ static long long usec(void) { #define assert(e) (void)(e) #endif +/* Helper to extract Redis version information. Aborts on any failure. */ +#define REDIS_VERSION_FIELD "redis_version:" +void get_redis_version(redisContext *c, int *majorptr, int *minorptr) { + redisReply *reply; + char *eptr, *s, *e; + int major, minor; + + reply = redisCommand(c, "INFO"); + if (reply == NULL || c->err || reply->type != REDIS_REPLY_STRING) + goto abort; + if ((s = strstr(reply->str, REDIS_VERSION_FIELD)) == NULL) + goto abort; + + s += strlen(REDIS_VERSION_FIELD); + + /* We need a field terminator and at least 'x.y.z' (5) bytes of data */ + if ((e = strstr(s, "\r\n")) == NULL || (e - s) < 5) + goto abort; + + /* Extract version info */ + major = strtol(s, &eptr, 10); + if (*eptr != '.') goto abort; + minor = strtol(eptr+1, NULL, 10); + + /* Push info the caller wants */ + if (majorptr) *majorptr = major; + if (minorptr) *minorptr = minor; + + freeReplyObject(reply); + return; + +abort: + freeReplyObject(reply); + fprintf(stderr, "Error: Cannot determine Redis version, aborting\n"); + exit(1); +} + static redisContext *select_database(redisContext *c) { redisReply *reply; @@ -99,6 +137,26 @@ static redisContext *select_database(redisContext *c) { return c; } +/* Switch protocol */ +static void send_hello(redisContext *c, int version) { + redisReply *reply; + int expected; + + reply = redisCommand(c, "HELLO %d", version); + expected = version == 3 ? REDIS_REPLY_MAP : REDIS_REPLY_ARRAY; + assert(reply != NULL && reply->type == expected); + freeReplyObject(reply); +} + +/* Togggle client tracking */ +static void send_client_tracking(redisContext *c, const char *str) { + redisReply *reply; + + reply = redisCommand(c, "CLIENT TRACKING %s", str); + assert(reply != NULL && reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); +} + static int disconnect(redisContext *c, int keep_fd) { redisReply *reply; @@ -615,9 +673,123 @@ static void test_blocking_connection_errors(void) { #endif } +/* Dummy push handler */ +void push_handler(void *privdata, void *reply) { + int *counter = privdata; + freeReplyObject(reply); + *counter += 1; +} + +/* Dummy function just to test setting a callback with redisOptions */ +void push_handler_async(redisAsyncContext *ac, void *reply) { + (void)ac; + (void)reply; +} + +static void test_resp3_push_handler(redisContext *c) { + redisPushFn *old = NULL; + redisReply *reply; + void *privdata; + int n = 0; + + /* Switch to RESP3 and turn on client tracking */ + send_hello(c, 3); + send_client_tracking(c, "ON"); + privdata = c->privdata; + c->privdata = &n; + + reply = redisCommand(c, "GET key:0"); + assert(reply != NULL); + freeReplyObject(reply); + + test("RESP3 PUSH messages are handled out of band by default: "); + reply = redisCommand(c, "SET key:0 val:0"); + test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); + + assert((reply = redisCommand(c, "GET key:0")) != NULL); + freeReplyObject(reply); + + old = redisSetPushCallback(c, push_handler); + test("We can set a custom RESP3 PUSH handler: "); + reply = redisCommand(c, "SET key:0 val:0"); + test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && n == 1); + freeReplyObject(reply); + + /* Unset the push callback and generate an invalidate message making + * sure it is not handled out of band. */ + test("With no handler, PUSH replies come in-band: "); + redisSetPushCallback(c, NULL); + assert((reply = redisCommand(c, "GET key:0")) != NULL); + freeReplyObject(reply); + assert((reply = redisCommand(c, "SET key:0 invalid")) != NULL); + test_cond(reply->type == REDIS_REPLY_PUSH); + freeReplyObject(reply); + + test("With no PUSH handler, no replies are lost: "); + assert(redisGetReply(c, (void**)&reply) == REDIS_OK); + test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); + + /* Return to the originally set PUSH handler */ + assert(old != NULL); + redisSetPushCallback(c, old); + + /* Switch back to RESP2 and disable tracking */ + c->privdata = privdata; + send_client_tracking(c, "OFF"); + send_hello(c, 2); +} + +redisOptions get_redis_tcp_options(struct config config) { + redisOptions options = {0}; + REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port); + return options; +} + +static void test_resp3_push_options(struct config config) { + redisAsyncContext *ac; + redisContext *c; + redisOptions options; + + test("We set a default RESP3 handler for redisContext: "); + options = get_redis_tcp_options(config); + assert((c = redisConnectWithOptions(&options)) != NULL); + test_cond(c->push_cb != NULL); + redisFree(c); + + test("We don't set a default RESP3 push handler for redisAsyncContext: "); + options = get_redis_tcp_options(config); + assert((ac = redisAsyncConnectWithOptions(&options)) != NULL); + test_cond(ac->c.push_cb == NULL); + redisAsyncFree(ac); + + test("Our REDIS_OPT_NO_PUSH_AUTOFREE flag works: "); + options = get_redis_tcp_options(config); + options.options |= REDIS_OPT_NO_PUSH_AUTOFREE; + assert((c = redisConnectWithOptions(&options)) != NULL); + test_cond(c->push_cb == NULL); + redisFree(c); + + test("We can use redisOptions to set a custom PUSH handler for redisContext: "); + options = get_redis_tcp_options(config); + options.push_cb = push_handler; + assert((c = redisConnectWithOptions(&options)) != NULL); + test_cond(c->push_cb == push_handler); + redisFree(c); + + test("We can use redisOptions to set a custom PUSH handler for redisAsyncContext: "); + options = get_redis_tcp_options(config); + options.async_push_cb = push_handler_async; + assert((ac = redisAsyncConnectWithOptions(&options)) != NULL); + test_cond(ac->push_cb == push_handler_async); + redisAsyncFree(ac); +} + static void test_blocking_connection(struct config config) { redisContext *c; redisReply *reply; + int major; c = do_connect(config); @@ -695,6 +867,10 @@ static void test_blocking_connection(struct config config) { assert(redisAppendCommand(c, "PING") == REDIS_OK); test_cond(redisGetReply(c, NULL) == REDIS_OK); + get_redis_version(c, &major, NULL); + if (major >= 6) test_resp3_push_handler(c); + test_resp3_push_options(config); + disconnect(c, 0); } @@ -780,18 +956,7 @@ static void test_blocking_io_errors(struct config config) { /* Connect to target given by config. */ c = do_connect(config); - { - /* Find out Redis version to determine the path for the next test */ - const char *field = "redis_version:"; - char *p, *eptr; - - reply = redisCommand(c,"INFO"); - p = strstr(reply->str,field); - major = strtol(p+strlen(field),&eptr,10); - p = eptr+1; /* char next to the first "." */ - minor = strtol(p,&eptr,10); - freeReplyObject(reply); - } + get_redis_version(c, &major, &minor); test("Returns I/O error when the connection is lost: "); reply = redisCommand(c,"QUIT"); |