summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--async.c64
-rw-r--r--async.h2
-rw-r--r--test.c54
3 files changed, 103 insertions, 17 deletions
diff --git a/async.c b/async.c
index 3dad137..73b7980 100644
--- a/async.c
+++ b/async.c
@@ -303,6 +303,34 @@ static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
}
}
+static void __redisRunConnectCallback(redisAsyncContext *ac, int status)
+{
+ if (ac->onConnect) {
+ if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
+ ac->c.flags |= REDIS_IN_CALLBACK;
+ ac->onConnect(ac, status);
+ ac->c.flags &= ~REDIS_IN_CALLBACK;
+ } else {
+ /* already in callback */
+ ac->onConnect(ac, status);
+ }
+ }
+}
+
+static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status)
+{
+ if (ac->onDisconnect) {
+ if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
+ ac->c.flags |= REDIS_IN_CALLBACK;
+ ac->onDisconnect(ac, status);
+ ac->c.flags &= ~REDIS_IN_CALLBACK;
+ } else {
+ /* already in callback */
+ ac->onDisconnect(ac, status);
+ }
+ }
+}
+
/* Helper function to free the context. */
static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
@@ -338,12 +366,11 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
/* Execute disconnect callback. When redisAsyncFree() initiated destroying
* this context, the status will always be REDIS_OK. */
- if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) {
- if (c->flags & REDIS_FREEING) {
- ac->onDisconnect(ac,REDIS_OK);
- } else {
- ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR);
- }
+ if (c->flags & REDIS_CONNECTED) {
+ int status = ac->err == 0 ? REDIS_OK : REDIS_ERR;
+ if (c->flags & REDIS_FREEING)
+ status = REDIS_OK;
+ __redisRunDisconnectCallback(ac, status);
}
if (ac->dataCleanup) {
@@ -603,7 +630,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
}
static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
- if (ac->onConnect) ac->onConnect(ac, REDIS_ERR);
+ __redisRunConnectCallback(ac, REDIS_ERR);
__redisAsyncDisconnect(ac);
}
@@ -628,8 +655,19 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
return REDIS_ERR;
}
- if (ac->onConnect) ac->onConnect(ac, REDIS_OK);
+ /* flag us as fully connect, but allow the callback
+ * to disconnect. For that reason, permit the function
+ * to delete the context here after callback return.
+ */
c->flags |= REDIS_CONNECTED;
+ __redisRunConnectCallback(ac, REDIS_OK);
+ if ((ac->c.flags & REDIS_DISCONNECTING)) {
+ redisAsyncDisconnect(ac);
+ return REDIS_ERR;
+ } else if ((ac->c.flags & REDIS_FREEING)) {
+ redisAsyncFree(ac);
+ return REDIS_ERR;
+ }
return REDIS_OK;
} else {
return REDIS_OK;
@@ -653,6 +691,8 @@ void redisAsyncRead(redisAsyncContext *ac) {
*/
void redisAsyncHandleRead(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
+ /* must not be called from a callback */
+ assert(!(c->flags & REDIS_IN_CALLBACK));
if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */
@@ -686,6 +726,8 @@ void redisAsyncWrite(redisAsyncContext *ac) {
void redisAsyncHandleWrite(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
+ /* must not be called from a callback */
+ assert(!(c->flags & REDIS_IN_CALLBACK));
if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */
@@ -702,6 +744,8 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
void redisAsyncHandleTimeout(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb;
+ /* must not be called from a callback */
+ assert(!(c->flags & REDIS_IN_CALLBACK));
if ((c->flags & REDIS_CONNECTED)) {
if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
@@ -721,8 +765,8 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) {
__redisAsyncCopyError(ac);
}
- if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
- ac->onConnect(ac, REDIS_ERR);
+ if (!(c->flags & REDIS_CONNECTED)) {
+ __redisRunConnectCallback(ac, REDIS_ERR);
}
while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
diff --git a/async.h b/async.h
index 41951d4..690b31f 100644
--- a/async.h
+++ b/async.h
@@ -57,7 +57,7 @@ typedef struct redisCallbackList {
/* Connection callback prototypes */
typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status);
-typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status);
+typedef void (redisConnectCallback)(struct redisAsyncContext*, int status);
typedef void(redisTimerCallback)(void *timer, void *privdata);
/* Context for an async connection to Redis */
diff --git a/test.c b/test.c
index e201cbc..fccaca4 100644
--- a/test.c
+++ b/test.c
@@ -1907,7 +1907,9 @@ typedef enum astest_no
ASTEST_CONNECT=0,
ASTEST_CONN_TIMEOUT,
ASTEST_PINGPONG,
- ASTEST_PINGPONG_TIMEOUT
+ ASTEST_PINGPONG_TIMEOUT,
+ ASTEST_ISSUE_931,
+ ASTEST_ISSUE_931_PING
}astest_no;
/* a static context for the async tests */
@@ -1918,6 +1920,7 @@ struct _astest {
int connects;
int connect_status;
int disconnects;
+ int pongs;
int disconnect_status;
int connected;
int err;
@@ -1941,7 +1944,9 @@ static void asCleanup(void* data)
t->ac = NULL;
}
-static void connectCallback(const redisAsyncContext *c, int status) {
+static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata);
+
+static void connectCallback(redisAsyncContext *c, int status) {
struct _astest *t = (struct _astest *)c->data;
assert(t == &astest);
assert(t->connects == 0);
@@ -1950,6 +1955,15 @@ static void connectCallback(const redisAsyncContext *c, int status) {
t->connects++;
t->connect_status = status;
t->connected = status == REDIS_OK ? 1 : -1;
+
+ if (t->testno == ASTEST_ISSUE_931) {
+ /* disconnect again */
+ redisAsyncDisconnect(c);
+ }
+ else if (t->testno == ASTEST_ISSUE_931_PING)
+ {
+ status = redisAsyncCommand(c, commandCallback, NULL, "PING");
+ }
}
static void disconnectCallback(const redisAsyncContext *c, int status) {
assert(c->data == (void*)&astest);
@@ -1969,20 +1983,22 @@ static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _p
(void)_privdata;
t->err = ac->err;
strcpy(t->errstr, ac->errstr);
- if (t->testno == ASTEST_PINGPONG)
+ t->counter++;
+ if (t->testno == ASTEST_PINGPONG ||t->testno == ASTEST_ISSUE_931_PING)
{
- test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
+ t->pongs++;
redisAsyncFree(ac);
}
if (t->testno == ASTEST_PINGPONG_TIMEOUT)
{
/* two ping pongs */
assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
- if (++t->counter == 1) {
+ t->pongs++;
+ if (t->counter == 1) {
int status = redisAsyncCommand(ac, commandCallback, NULL, "PING");
assert(status == REDIS_OK);
} else {
- test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0);
redisAsyncFree(ac);
}
}
@@ -2089,6 +2105,7 @@ static void test_async_polling(struct config config) {
assert(status == REDIS_OK);
while(astest.ac)
redisPollTick(c, 0.1);
+ test_cond(astest.pongs == 1);
/* Test a ping/pong after connection that didn't time out.
* see https://github.com/redis/hiredis/issues/945
@@ -2105,8 +2122,33 @@ static void test_async_polling(struct config config) {
assert(status == REDIS_OK);
while(astest.ac)
redisPollTick(c, 0.1);
+ test_cond(astest.pongs == 2);
config = defaultconfig;
}
+
+ /* Test disconnect from an on_connect callback
+ * see https://github.com/redis/hiredis/issues/931
+ */
+ test("Disconnect from onConnected callback (Issue #931): ");
+ c = do_aconnect(config, ASTEST_ISSUE_931);
+ while(astest.disconnects == 0)
+ redisPollTick(c, 0.1);
+ assert(astest.connected == 0);
+ assert(astest.connects == 1);
+ test_cond(astest.disconnects == 1);
+
+ /* Test ping/pong from an on_connect callback
+ * see https://github.com/redis/hiredis/issues/931
+ */
+ test("Ping/Pong from onConnected callback (Issue #931): ");
+ c = do_aconnect(config, ASTEST_ISSUE_931_PING);
+ /* connect callback issues ping, reponse callback destroys context */
+ while(astest.ac)
+ redisPollTick(c, 0.1);
+ assert(astest.connected == 0);
+ assert(astest.connects == 1);
+ assert(astest.disconnects == 1);
+ test_cond(astest.pongs == 1);
}
/* End of Async polling_adapter driven tests */