summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPieter Noordhuis <pcnoordhuis@gmail.com>2010-10-18 16:27:12 +0200
committerPieter Noordhuis <pcnoordhuis@gmail.com>2010-10-18 16:27:52 +0200
commitf9596db90bf030dfbd008151521d1c72e1e497e4 (patch)
treeff41c5b676e323d48e49c4d040bfbf2db3fd73e3
parente332a32b3514d8a5603610e9e9febd5f6ddaae3c (diff)
Test callback sequence in non-blocking context
-rw-r--r--hiredis.c25
-rw-r--r--test.c32
2 files changed, 46 insertions, 11 deletions
diff --git a/hiredis.c b/hiredis.c
index 585dfe8..052285b 100644
--- a/hiredis.c
+++ b/hiredis.c
@@ -676,15 +676,7 @@ int redisBufferRead(redisContext *c) {
return REDIS_OK;
}
-static void redisPopCallback(redisContext *c) {
- if (c->cpos > 1) {
- memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback));
- }
- c->cpos--;
-}
-
int redisGetReply(redisContext *c, void **reply) {
- redisPopCallback(c);
if (redisReplyReaderGetReply(c->reader,reply) == REDIS_ERR) {
/* Copy the (protocol) error from the reader to the context. */
c->error = sdsnew(((redisReader*)c->reader)->error);
@@ -693,24 +685,35 @@ int redisGetReply(redisContext *c, void **reply) {
return REDIS_OK;
}
+static void redisPopCallback(redisContext *c) {
+ assert(c->cpos > 0);
+ if (c->cpos > 1)
+ memmove(&c->callbacks[0],&c->callbacks[1],(c->cpos-1)*sizeof(redisCallback));
+ c->cpos--;
+}
+
int redisProcessCallbacks(redisContext *c) {
void *reply = NULL;
redisCallback cb;
- do {
+ /* Continue while there are callbacks */
+ while(c->cpos > 0) {
cb = c->callbacks[0];
if (redisGetReply(c,&reply) == REDIS_ERR)
return REDIS_ERR;
- /* Fire callback when there is a reply. */
if (reply != NULL) {
+ redisPopCallback(c);
if (cb.fn != NULL) {
cb.fn(c,reply,cb.privdata);
} else {
c->fn->freeObject(reply);
}
+ } else {
+ /* Stop trying */
+ break;
}
- } while (reply != NULL);
+ }
return REDIS_OK;
}
diff --git a/test.c b/test.c
index 2800616..b1a6ed6 100644
--- a/test.c
+++ b/test.c
@@ -202,6 +202,15 @@ static void __test_callback(redisContext *c, const void *privdata) {
__test_callback_flags |= (long)privdata;
}
+static long __test_reply_callback_flags = 0;
+static void __test_reply_callback(redisContext *c, redisReply *reply, const void *privdata) {
+ ((void)c);
+ /* Shift to detect execution order */
+ __test_reply_callback_flags <<= 8;
+ __test_reply_callback_flags |= (long)privdata;
+ freeReplyObject(reply);
+}
+
static void test_nonblocking_connection() {
redisContext *c;
int wdone = 0;
@@ -249,6 +258,29 @@ static void test_nonblocking_connection() {
test_cond(redisBufferWrite(c,NULL) == REDIS_ERR &&
strncmp(c->error,"write:",6) == 0);
redisFree(c);
+
+ wdone = __test_reply_callback_flags = 0;
+ test("Process callbacks in the right sequence: ");
+ c = redisConnectNonBlock("127.0.0.1", 6379, NULL);
+ redisCommandWithCallback(c,__test_reply_callback,(const void*)1,"PING");
+ redisCommandWithCallback(c,__test_reply_callback,(const void*)2,"PING");
+ redisCommandWithCallback(c,__test_reply_callback,(const void*)3,"PING");
+
+ /* Write output buffer */
+ while(!wdone) {
+ usleep(500);
+ redisBufferWrite(c,&wdone);
+ }
+
+ /* Read until at least one callback is executed (the 3 replies will
+ * arrive in a single packet, causing all callbacks to be executed in
+ * a single pass). */
+ while(__test_reply_callback_flags == 0) {
+ assert(redisBufferRead(c) == REDIS_OK);
+ redisProcessCallbacks(c);
+ }
+ test_cond(__test_reply_callback_flags == 0x010203);
+ redisFree(c);
}
int main(void) {