summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--async.c41
1 files changed, 31 insertions, 10 deletions
diff --git a/async.c b/async.c
index 781b56f..d89a2a0 100644
--- a/async.c
+++ b/async.c
@@ -197,19 +197,42 @@ static int __redisShiftCallback(redisCallbackList *list, redisCallback *target)
return REDIS_ERR;
}
+static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
+ redisContext *c = &(ac->c);
+ if (cb->fn != NULL) {
+ c->flags |= REDIS_IN_CALLBACK;
+ cb->fn(ac,reply,cb->privdata);
+ c->flags &= ~REDIS_IN_CALLBACK;
+ }
+}
+
/* Helper function to free the context. */
static void __redisAsyncFree(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
redisCallback cb;
+ dictIterator *it;
+ dictEntry *de;
/* Execute pending callbacks with NULL reply. */
- while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) {
- if (cb.fn != NULL) {
- c->flags |= REDIS_IN_CALLBACK;
- cb.fn(ac,NULL,cb.privdata);
- c->flags &= ~REDIS_IN_CALLBACK;
- }
- }
+ while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
+ __redisRunCallback(ac,&cb,NULL);
+
+ /* Execute callbacks for invalid commands */
+ while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK)
+ __redisRunCallback(ac,&cb,NULL);
+
+ /* Run subscription callbacks callbacks with NULL reply */
+ it = dictGetIterator(ac->sub.channels);
+ while ((de = dictNext(it)) != NULL)
+ __redisRunCallback(ac,dictGetEntryVal(de),NULL);
+ dictReleaseIterator(it);
+ dictRelease(ac->sub.channels);
+
+ it = dictGetIterator(ac->sub.patterns);
+ while ((de = dictNext(it)) != NULL)
+ __redisRunCallback(ac,dictGetEntryVal(de),NULL);
+ dictReleaseIterator(it);
+ dictRelease(ac->sub.patterns);
/* Signal event lib to clean up */
if (ac->ev.cleanup) ac->ev.cleanup(ac->ev.data);
@@ -349,9 +372,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
}
if (cb.fn != NULL) {
- c->flags |= REDIS_IN_CALLBACK;
- cb.fn(ac,reply,cb.privdata);
- c->flags &= ~REDIS_IN_CALLBACK;
+ __redisRunCallback(ac,&cb,reply);
c->fn->freeObject(reply);
/* Proceed with free'ing when redisAsyncFree() was called. */