summaryrefslogtreecommitdiff
path: root/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'async.c')
-rw-r--r--async.c516
1 files changed, 258 insertions, 258 deletions
diff --git a/async.c b/async.c
index 5595bca..2363092 100644
--- a/async.c
+++ b/async.c
@@ -34,9 +34,9 @@
#define assert(e) (void)(e)
#endif
-/* Forward declarations of hiredis.c functions */
-int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
-void __redisSetError(redisContext *c, int type, const char *str);
+/* Forward declarations of hiredict.c functions */
+int __redictAppendCommand(redictContext *c, const char *cmd, size_t len);
+void __redictSetError(redictContext *c, int type, const char *str);
/* Functions managing dictionary of callbacks for pub/sub. */
static unsigned int callbackHash(const void *key) {
@@ -46,7 +46,7 @@ static unsigned int callbackHash(const void *key) {
static void *callbackValDup(void *privdata, const void *src) {
((void) privdata);
- redisCallback *dup;
+ redictCallback *dup;
dup = hi_malloc(sizeof(*dup));
if (dup == NULL)
@@ -85,8 +85,8 @@ static dictType callbackDict = {
callbackValDestructor
};
-static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
- redisAsyncContext *ac;
+static redictAsyncContext *redictAsyncInitialize(redictContext *c) {
+ redictAsyncContext *ac;
dict *channels = NULL, *patterns = NULL;
channels = dictCreate(&callbackDict,NULL);
@@ -97,16 +97,16 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) {
if (patterns == NULL)
goto oom;
- ac = hi_realloc(c,sizeof(redisAsyncContext));
+ ac = hi_realloc(c,sizeof(redictAsyncContext));
if (ac == NULL)
goto oom;
c = &(ac->c);
- /* The regular connect functions will always set the flag REDIS_CONNECTED.
+ /* The regular connect functions will always set the flag REDICT_CONNECTED.
* For the async API, we want to wait until the first write event is
* received up before setting this flag, so reset it here. */
- c->flags &= ~REDIS_CONNECTED;
+ c->flags &= ~REDICT_CONNECTED;
ac->err = 0;
ac->errstr = NULL;
@@ -141,81 +141,81 @@ oom:
}
/* We want the error field to be accessible directly instead of requiring
- * an indirection to the redisContext struct. */
-static void __redisAsyncCopyError(redisAsyncContext *ac) {
+ * an indirection to the redictContext struct. */
+static void __redictAsyncCopyError(redictAsyncContext *ac) {
if (!ac)
return;
- redisContext *c = &(ac->c);
+ redictContext *c = &(ac->c);
ac->err = c->err;
ac->errstr = c->errstr;
}
-redisAsyncContext *redisAsyncConnectWithOptions(const redisOptions *options) {
- redisOptions myOptions = *options;
- redisContext *c;
- redisAsyncContext *ac;
+redictAsyncContext *redictAsyncConnectWithOptions(const redictOptions *options) {
+ redictOptions myOptions = *options;
+ redictContext *c;
+ redictAsyncContext *ac;
/* Clear any erroneously set sync callback and flag that we don't want to
* use freeReplyObject by default. */
myOptions.push_cb = NULL;
- myOptions.options |= REDIS_OPT_NO_PUSH_AUTOFREE;
+ myOptions.options |= REDICT_OPT_NO_PUSH_AUTOFREE;
- myOptions.options |= REDIS_OPT_NONBLOCK;
- c = redisConnectWithOptions(&myOptions);
+ myOptions.options |= REDICT_OPT_NONBLOCK;
+ c = redictConnectWithOptions(&myOptions);
if (c == NULL) {
return NULL;
}
- ac = redisAsyncInitialize(c);
+ ac = redictAsyncInitialize(c);
if (ac == NULL) {
- redisFree(c);
+ redictFree(c);
return NULL;
}
/* Set any configured async push handler */
- redisAsyncSetPushCallback(ac, myOptions.async_push_cb);
+ redictAsyncSetPushCallback(ac, myOptions.async_push_cb);
- __redisAsyncCopyError(ac);
+ __redictAsyncCopyError(ac);
return ac;
}
-redisAsyncContext *redisAsyncConnect(const char *ip, int port) {
- redisOptions options = {0};
- REDIS_OPTIONS_SET_TCP(&options, ip, port);
- return redisAsyncConnectWithOptions(&options);
+redictAsyncContext *redictAsyncConnect(const char *ip, int port) {
+ redictOptions options = {0};
+ REDICT_OPTIONS_SET_TCP(&options, ip, port);
+ return redictAsyncConnectWithOptions(&options);
}
-redisAsyncContext *redisAsyncConnectBind(const char *ip, int port,
+redictAsyncContext *redictAsyncConnectBind(const char *ip, int port,
const char *source_addr) {
- redisOptions options = {0};
- REDIS_OPTIONS_SET_TCP(&options, ip, port);
+ redictOptions options = {0};
+ REDICT_OPTIONS_SET_TCP(&options, ip, port);
options.endpoint.tcp.source_addr = source_addr;
- return redisAsyncConnectWithOptions(&options);
+ return redictAsyncConnectWithOptions(&options);
}
-redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port,
+redictAsyncContext *redictAsyncConnectBindWithReuse(const char *ip, int port,
const char *source_addr) {
- redisOptions options = {0};
- REDIS_OPTIONS_SET_TCP(&options, ip, port);
- options.options |= REDIS_OPT_REUSEADDR;
+ redictOptions options = {0};
+ REDICT_OPTIONS_SET_TCP(&options, ip, port);
+ options.options |= REDICT_OPT_REUSEADDR;
options.endpoint.tcp.source_addr = source_addr;
- return redisAsyncConnectWithOptions(&options);
+ return redictAsyncConnectWithOptions(&options);
}
-redisAsyncContext *redisAsyncConnectUnix(const char *path) {
- redisOptions options = {0};
- REDIS_OPTIONS_SET_UNIX(&options, path);
- return redisAsyncConnectWithOptions(&options);
+redictAsyncContext *redictAsyncConnectUnix(const char *path) {
+ redictOptions options = {0};
+ REDICT_OPTIONS_SET_UNIX(&options, path);
+ return redictAsyncConnectWithOptions(&options);
}
static int
-redisAsyncSetConnectCallbackImpl(redisAsyncContext *ac, redisConnectCallback *fn,
- redisConnectCallbackNC *fn_nc)
+redictAsyncSetConnectCallbackImpl(redictAsyncContext *ac, redictConnectCallback *fn,
+ redictConnectCallbackNC *fn_nc)
{
/* If either are already set, this is an error */
if (ac->onConnect || ac->onConnectNC)
- return REDIS_ERR;
+ return REDICT_ERR;
if (fn) {
ac->onConnect = fn;
@@ -228,33 +228,33 @@ redisAsyncSetConnectCallbackImpl(redisAsyncContext *ac, redisConnectCallback *fn
* library functions are already set. */
_EL_ADD_WRITE(ac);
- return REDIS_OK;
+ return REDICT_OK;
}
-int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) {
- return redisAsyncSetConnectCallbackImpl(ac, fn, NULL);
+int redictAsyncSetConnectCallback(redictAsyncContext *ac, redictConnectCallback *fn) {
+ return redictAsyncSetConnectCallbackImpl(ac, fn, NULL);
}
-int redisAsyncSetConnectCallbackNC(redisAsyncContext *ac, redisConnectCallbackNC *fn) {
- return redisAsyncSetConnectCallbackImpl(ac, NULL, fn);
+int redictAsyncSetConnectCallbackNC(redictAsyncContext *ac, redictConnectCallbackNC *fn) {
+ return redictAsyncSetConnectCallbackImpl(ac, NULL, fn);
}
-int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) {
+int redictAsyncSetDisconnectCallback(redictAsyncContext *ac, redictDisconnectCallback *fn) {
if (ac->onDisconnect == NULL) {
ac->onDisconnect = fn;
- return REDIS_OK;
+ return REDICT_OK;
}
- return REDIS_ERR;
+ return REDICT_ERR;
}
/* Helper functions to push/shift callbacks */
-static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
- redisCallback *cb;
+static int __redictPushCallback(redictCallbackList *list, redictCallback *source) {
+ redictCallback *cb;
/* Copy callback from stack to heap */
cb = hi_malloc(sizeof(*cb));
if (cb == NULL)
- return REDIS_ERR_OOM;
+ return REDICT_ERR_OOM;
if (source != NULL) {
memcpy(cb,source,sizeof(*cb));
@@ -267,11 +267,11 @@ static int __redisPushCallback(redisCallbackList *list, redisCallback *source) {
if (list->tail != NULL)
list->tail->next = cb;
list->tail = cb;
- return REDIS_OK;
+ return REDICT_OK;
}
-static int __redisShiftCallback(redisCallbackList *list, redisCallback *target) {
- redisCallback *cb = list->head;
+static int __redictShiftCallback(redictCallbackList *list, redictCallback *target) {
+ redictCallback *cb = list->head;
if (cb != NULL) {
list->head = cb->next;
if (cb == list->tail)
@@ -281,41 +281,41 @@ static int __redisShiftCallback(redisCallbackList *list, redisCallback *target)
if (target != NULL)
memcpy(target,cb,sizeof(*cb));
hi_free(cb);
- return REDIS_OK;
+ return REDICT_OK;
}
- return REDIS_ERR;
+ return REDICT_ERR;
}
-static void __redisRunCallback(redisAsyncContext *ac, redisCallback *cb, redisReply *reply) {
- redisContext *c = &(ac->c);
+static void __redictRunCallback(redictAsyncContext *ac, redictCallback *cb, redictReply *reply) {
+ redictContext *c = &(ac->c);
if (cb->fn != NULL) {
- c->flags |= REDIS_IN_CALLBACK;
+ c->flags |= REDICT_IN_CALLBACK;
cb->fn(ac,reply,cb->privdata);
- c->flags &= ~REDIS_IN_CALLBACK;
+ c->flags &= ~REDICT_IN_CALLBACK;
}
}
-static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) {
+static void __redictRunPushCallback(redictAsyncContext *ac, redictReply *reply) {
if (ac->push_cb != NULL) {
- ac->c.flags |= REDIS_IN_CALLBACK;
+ ac->c.flags |= REDICT_IN_CALLBACK;
ac->push_cb(ac, reply);
- ac->c.flags &= ~REDIS_IN_CALLBACK;
+ ac->c.flags &= ~REDICT_IN_CALLBACK;
}
}
-static void __redisRunConnectCallback(redisAsyncContext *ac, int status)
+static void __redictRunConnectCallback(redictAsyncContext *ac, int status)
{
if (ac->onConnect == NULL && ac->onConnectNC == NULL)
return;
- if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
- ac->c.flags |= REDIS_IN_CALLBACK;
+ if (!(ac->c.flags & REDICT_IN_CALLBACK)) {
+ ac->c.flags |= REDICT_IN_CALLBACK;
if (ac->onConnect) {
ac->onConnect(ac, status);
} else {
ac->onConnectNC(ac, status);
}
- ac->c.flags &= ~REDIS_IN_CALLBACK;
+ ac->c.flags &= ~REDICT_IN_CALLBACK;
} else {
/* already in callback */
if (ac->onConnect) {
@@ -326,13 +326,13 @@ static void __redisRunConnectCallback(redisAsyncContext *ac, int status)
}
}
-static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status)
+static void __redictRunDisconnectCallback(redictAsyncContext *ac, int status)
{
if (ac->onDisconnect) {
- if (!(ac->c.flags & REDIS_IN_CALLBACK)) {
- ac->c.flags |= REDIS_IN_CALLBACK;
+ if (!(ac->c.flags & REDICT_IN_CALLBACK)) {
+ ac->c.flags |= REDICT_IN_CALLBACK;
ac->onDisconnect(ac, status);
- ac->c.flags &= ~REDIS_IN_CALLBACK;
+ ac->c.flags &= ~REDICT_IN_CALLBACK;
} else {
/* already in callback */
ac->onDisconnect(ac, status);
@@ -341,23 +341,23 @@ static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status)
}
/* Helper function to free the context. */
-static void __redisAsyncFree(redisAsyncContext *ac) {
- redisContext *c = &(ac->c);
- redisCallback cb;
+static void __redictAsyncFree(redictAsyncContext *ac) {
+ redictContext *c = &(ac->c);
+ redictCallback cb;
dictIterator it;
dictEntry *de;
/* Execute pending callbacks with NULL reply. */
- while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK)
- __redisRunCallback(ac,&cb,NULL);
- while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK)
- __redisRunCallback(ac,&cb,NULL);
+ while (__redictShiftCallback(&ac->replies,&cb) == REDICT_OK)
+ __redictRunCallback(ac,&cb,NULL);
+ while (__redictShiftCallback(&ac->sub.replies,&cb) == REDICT_OK)
+ __redictRunCallback(ac,&cb,NULL);
/* Run subscription callbacks with NULL reply */
if (ac->sub.channels) {
dictInitIterator(&it,ac->sub.channels);
while ((de = dictNext(&it)) != NULL)
- __redisRunCallback(ac,dictGetEntryVal(de),NULL);
+ __redictRunCallback(ac,dictGetEntryVal(de),NULL);
dictRelease(ac->sub.channels);
}
@@ -365,7 +365,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
if (ac->sub.patterns) {
dictInitIterator(&it,ac->sub.patterns);
while ((de = dictNext(&it)) != NULL)
- __redisRunCallback(ac,dictGetEntryVal(de),NULL);
+ __redictRunCallback(ac,dictGetEntryVal(de),NULL);
dictRelease(ac->sub.patterns);
}
@@ -373,13 +373,13 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
/* Signal event lib to clean up */
_EL_CLEANUP(ac);
- /* Execute disconnect callback. When redisAsyncFree() initiated destroying
- * this context, the status will always be REDIS_OK. */
- if (c->flags & REDIS_CONNECTED) {
- int status = ac->err == 0 ? REDIS_OK : REDIS_ERR;
- if (c->flags & REDIS_FREEING)
- status = REDIS_OK;
- __redisRunDisconnectCallback(ac, status);
+ /* Execute disconnect callback. When redictAsyncFree() initiated destroying
+ * this context, the status will always be REDICT_OK. */
+ if (c->flags & REDICT_CONNECTED) {
+ int status = ac->err == 0 ? REDICT_OK : REDICT_ERR;
+ if (c->flags & REDICT_FREEING)
+ status = REDICT_OK;
+ __redictRunDisconnectCallback(ac, status);
}
if (ac->dataCleanup) {
@@ -387,72 +387,72 @@ static void __redisAsyncFree(redisAsyncContext *ac) {
}
/* Cleanup self */
- redisFree(c);
+ redictFree(c);
}
/* Free the async context. When this function is called from a callback,
- * control needs to be returned to redisProcessCallbacks() before actual
+ * control needs to be returned to redictProcessCallbacks() before actual
* free'ing. To do so, a flag is set on the context which is picked up by
- * redisProcessCallbacks(). Otherwise, the context is immediately free'd. */
-void redisAsyncFree(redisAsyncContext *ac) {
+ * redictProcessCallbacks(). Otherwise, the context is immediately free'd. */
+void redictAsyncFree(redictAsyncContext *ac) {
if (ac == NULL)
return;
- redisContext *c = &(ac->c);
+ redictContext *c = &(ac->c);
- c->flags |= REDIS_FREEING;
- if (!(c->flags & REDIS_IN_CALLBACK))
- __redisAsyncFree(ac);
+ c->flags |= REDICT_FREEING;
+ if (!(c->flags & REDICT_IN_CALLBACK))
+ __redictAsyncFree(ac);
}
/* Helper function to make the disconnect happen and clean up. */
-void __redisAsyncDisconnect(redisAsyncContext *ac) {
- redisContext *c = &(ac->c);
+void __redictAsyncDisconnect(redictAsyncContext *ac) {
+ redictContext *c = &(ac->c);
/* Make sure error is accessible if there is any */
- __redisAsyncCopyError(ac);
+ __redictAsyncCopyError(ac);
if (ac->err == 0) {
/* For clean disconnects, there should be no pending callbacks. */
- int ret = __redisShiftCallback(&ac->replies,NULL);
- assert(ret == REDIS_ERR);
+ int ret = __redictShiftCallback(&ac->replies,NULL);
+ assert(ret == REDICT_ERR);
} else {
/* Disconnection is caused by an error, make sure that pending
* callbacks cannot call new commands. */
- c->flags |= REDIS_DISCONNECTING;
+ c->flags |= REDICT_DISCONNECTING;
}
/* cleanup event library on disconnect.
* this is safe to call multiple times */
_EL_CLEANUP(ac);
- /* For non-clean disconnects, __redisAsyncFree() will execute pending
+ /* For non-clean disconnects, __redictAsyncFree() will execute pending
* callbacks with a NULL-reply. */
- if (!(c->flags & REDIS_NO_AUTO_FREE)) {
- __redisAsyncFree(ac);
+ if (!(c->flags & REDICT_NO_AUTO_FREE)) {
+ __redictAsyncFree(ac);
}
}
-/* Tries to do a clean disconnect from Redis, meaning it stops new commands
+/* Tries to do a clean disconnect from Redict, meaning it stops new commands
* from being issued, but tries to flush the output buffer and execute
* callbacks for all remaining replies. When this function is called from a
* callback, there might be more replies and we can safely defer disconnecting
- * to redisProcessCallbacks(). Otherwise, we can only disconnect immediately
+ * to redictProcessCallbacks(). Otherwise, we can only disconnect immediately
* when there are no pending callbacks. */
-void redisAsyncDisconnect(redisAsyncContext *ac) {
- redisContext *c = &(ac->c);
- c->flags |= REDIS_DISCONNECTING;
+void redictAsyncDisconnect(redictAsyncContext *ac) {
+ redictContext *c = &(ac->c);
+ c->flags |= REDICT_DISCONNECTING;
/** unset the auto-free flag here, because disconnect undoes this */
- c->flags &= ~REDIS_NO_AUTO_FREE;
- if (!(c->flags & REDIS_IN_CALLBACK) && ac->replies.head == NULL)
- __redisAsyncDisconnect(ac);
+ c->flags &= ~REDICT_NO_AUTO_FREE;
+ if (!(c->flags & REDICT_IN_CALLBACK) && ac->replies.head == NULL)
+ __redictAsyncDisconnect(ac);
}
-static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) {
- redisContext *c = &(ac->c);
+static int __redictGetSubscribeCallback(redictAsyncContext *ac, redictReply *reply, redictCallback *dstcb) {
+ redictContext *c = &(ac->c);
dict *callbacks;
- redisCallback *cb = NULL;
+ redictCallback *cb = NULL;
dictEntry *de;
int pvariant;
char *stype;
@@ -460,10 +460,10 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
/* Match reply with the expected format of a pushed message.
* The type and number of elements (3 to 4) are specified at:
- * https://redis.io/topics/pubsub#format-of-pushed-messages */
- if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) ||
- reply->type == REDIS_REPLY_PUSH) {
- assert(reply->element[0]->type == REDIS_REPLY_STRING);
+ * TODO: documentation for pubsub#format-of-pushed-messages */
+ if ((reply->type == REDICT_REPLY_ARRAY && !(c->flags & REDICT_SUPPORTS_PUSH) && reply->elements >= 3) ||
+ reply->type == REDICT_REPLY_PUSH) {
+ assert(reply->element[0]->type == REDICT_REPLY_STRING);
stype = reply->element[0]->str;
pvariant = (tolower(stype[0]) == 'p') ? 1 : 0;
@@ -473,7 +473,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
callbacks = ac->sub.channels;
/* Locate the right callback */
- if (reply->element[1]->type == REDIS_REPLY_STRING) {
+ if (reply->element[1]->type == REDICT_REPLY_STRING) {
sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len);
if (sname == NULL) goto oom;
@@ -496,7 +496,7 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
/* If this was the last unsubscribe message, revert to
* non-subscribe mode. */
- assert(reply->element[2]->type == REDIS_REPLY_INTEGER);
+ assert(reply->element[2]->type == REDICT_REPLY_INTEGER);
/* Unset subscribed flag only when no pipelined pending subscribe
* or pending unsubscribe replies. */
@@ -504,36 +504,36 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply,
&& dictSize(ac->sub.channels) == 0
&& dictSize(ac->sub.patterns) == 0
&& ac->sub.pending_unsubs == 0) {
- c->flags &= ~REDIS_SUBSCRIBED;
+ c->flags &= ~REDICT_SUBSCRIBED;
/* Move ongoing regular command callbacks. */
- redisCallback cb;
- while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) {
- __redisPushCallback(&ac->replies,&cb);
+ redictCallback cb;
+ while (__redictShiftCallback(&ac->sub.replies,&cb) == REDICT_OK) {
+ __redictPushCallback(&ac->replies,&cb);
}
}
}
sdsfree(sname);
} else {
/* Shift callback for pending command in subscribed context. */
- __redisShiftCallback(&ac->sub.replies,dstcb);
+ __redictShiftCallback(&ac->sub.replies,dstcb);
}
- return REDIS_OK;
+ return REDICT_OK;
oom:
- __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
- __redisAsyncCopyError(ac);
- return REDIS_ERR;
+ __redictSetError(&(ac->c), REDICT_ERR_OOM, "Out of memory");
+ __redictAsyncCopyError(ac);
+ return REDICT_ERR;
}
-#define redisIsSpontaneousPushReply(r) \
- (redisIsPushReply(r) && !redisIsSubscribeReply(r))
+#define redictIsSpontaneousPushReply(r) \
+ (redictIsPushReply(r) && !redictIsSubscribeReply(r))
-static int redisIsSubscribeReply(redisReply *reply) {
+static int redictIsSubscribeReply(redictReply *reply) {
char *str;
size_t len, off;
/* We will always have at least one string with the subscribe/message type */
- if (reply->elements < 1 || reply->element[0]->type != REDIS_REPLY_STRING ||
+ if (reply->elements < 1 || reply->element[0]->type != REDICT_REPLY_STRING ||
reply->element[0]->len < sizeof("message") - 1)
{
return 0;
@@ -549,18 +549,18 @@ static int redisIsSubscribeReply(redisReply *reply) {
!strncasecmp(str, "unsubscribe", len);
}
-void redisProcessCallbacks(redisAsyncContext *ac) {
- redisContext *c = &(ac->c);
+void redictProcessCallbacks(redictAsyncContext *ac) {
+ redictContext *c = &(ac->c);
void *reply = NULL;
int status;
- while((status = redisGetReply(c,&reply)) == REDIS_OK) {
+ while((status = redictGetReply(c,&reply)) == REDICT_OK) {
if (reply == NULL) {
/* When the connection is being disconnected and there are
* no more replies, this is the cue to really disconnect. */
- if (c->flags & REDIS_DISCONNECTING && sdslen(c->obuf) == 0
+ if (c->flags & REDICT_DISCONNECTING && sdslen(c->obuf) == 0
&& ac->replies.head == NULL) {
- __redisAsyncDisconnect(ac);
+ __redictAsyncDisconnect(ac);
return;
}
/* When the connection is not being disconnected, simply stop
@@ -569,22 +569,22 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
}
/* Keep track of push message support for subscribe handling */
- if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH;
+ if (redictIsPushReply(reply)) c->flags |= REDICT_SUPPORTS_PUSH;
/* Send any non-subscribe related PUSH messages to our PUSH handler
* while allowing subscribe related PUSH messages to pass through.
* This allows existing code to be backward compatible and work in
* either RESP2 or RESP3 mode. */
- if (redisIsSpontaneousPushReply(reply)) {
- __redisRunPushCallback(ac, reply);
+ if (redictIsSpontaneousPushReply(reply)) {
+ __redictRunPushCallback(ac, reply);
c->reader->fn->freeObject(reply);
continue;
}
/* Even if the context is subscribed, pending regular
* callbacks will get a reply before pub/sub messages arrive. */
- redisCallback cb = {NULL, NULL, 0, 0, NULL};
- if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
+ redictCallback cb = {NULL, NULL, 0, 0, NULL};
+ if (__redictShiftCallback(&ac->replies,&cb) != REDICT_OK) {
/*
* A spontaneous reply in a not-subscribed context can be the error
* reply that is sent when a new connection exceeds the maximum
@@ -600,28 +600,28 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
* In this case we also want to close the connection, and have the
* user wait until the server is ready to take our request.
*/
- if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
- c->err = REDIS_ERR_OTHER;
- snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
+ if (((redictReply*)reply)->type == REDICT_REPLY_ERROR) {
+ c->err = REDICT_ERR_OTHER;
+ snprintf(c->errstr,sizeof(c->errstr),"%s",((redictReply*)reply)->str);
c->reader->fn->freeObject(reply);
- __redisAsyncDisconnect(ac);
+ __redictAsyncDisconnect(ac);
return;
}
/* No more regular callbacks and no errors, the context *must* be subscribed. */
- assert(c->flags & REDIS_SUBSCRIBED);
- if (c->flags & REDIS_SUBSCRIBED)
- __redisGetSubscribeCallback(ac,reply,&cb);
+ assert(c->flags & REDICT_SUBSCRIBED);
+ if (c->flags & REDICT_SUBSCRIBED)
+ __redictGetSubscribeCallback(ac,reply,&cb);
}
if (cb.fn != NULL) {
- __redisRunCallback(ac,&cb,reply);
- if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){
+ __redictRunCallback(ac,&cb,reply);
+ if (!(c->flags & REDICT_NO_AUTO_FREE_REPLIES)){
c->reader->fn->freeObject(reply);
}
- /* Proceed with free'ing when redisAsyncFree() was called. */
- if (c->flags & REDIS_FREEING) {
- __redisAsyncFree(ac);
+ /* Proceed with free'ing when redictAsyncFree() was called. */
+ if (c->flags & REDICT_FREEING) {
+ __redictAsyncFree(ac);
return;
}
} else {
@@ -633,99 +633,99 @@ void redisProcessCallbacks(redisAsyncContext *ac) {
}
/* If in monitor mode, repush the callback */
- if (c->flags & REDIS_MONITORING) {
- __redisPushCallback(&ac->replies,&cb);
+ if (c->flags & REDICT_MONITORING) {
+ __redictPushCallback(&ac->replies,&cb);
}
}
/* Disconnect when there was an error reading the reply */
- if (status != REDIS_OK)
- __redisAsyncDisconnect(ac);
+ if (status != REDICT_OK)
+ __redictAsyncDisconnect(ac);
}
-static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) {
- __redisRunConnectCallback(ac, REDIS_ERR);
- __redisAsyncDisconnect(ac);
+static void __redictAsyncHandleConnectFailure(redictAsyncContext *ac) {
+ __redictRunConnectCallback(ac, REDICT_ERR);
+ __redictAsyncDisconnect(ac);
}
/* Internal helper function to detect socket status the first time a read or
* write event fires. When connecting was not successful, the connect callback
- * is called with a REDIS_ERR status and the context is free'd. */
-static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
+ * is called with a REDICT_ERR status and the context is free'd. */
+static int __redictAsyncHandleConnect(redictAsyncContext *ac) {
int completed = 0;
- redisContext *c = &(ac->c);
+ redictContext *c = &(ac->c);
- if (redisCheckConnectDone(c, &completed) == REDIS_ERR) {
+ if (redictCheckConnectDone(c, &completed) == REDICT_ERR) {
/* Error! */
- if (redisCheckSocketError(c) == REDIS_ERR)
- __redisAsyncCopyError(ac);
- __redisAsyncHandleConnectFailure(ac);
- return REDIS_ERR;
+ if (redictCheckSocketError(c) == REDICT_ERR)
+ __redictAsyncCopyError(ac);
+ __redictAsyncHandleConnectFailure(ac);
+ return REDICT_ERR;
} else if (completed == 1) {
/* connected! */
- if (c->connection_type == REDIS_CONN_TCP &&
- redisSetTcpNoDelay(c) == REDIS_ERR) {
- __redisAsyncHandleConnectFailure(ac);
- return REDIS_ERR;
+ if (c->connection_type == REDICT_CONN_TCP &&
+ redictSetTcpNoDelay(c) == REDICT_ERR) {
+ __redictAsyncHandleConnectFailure(ac);
+ return REDICT_ERR;
}
/* flag us as fully connect, but allow the callback
* to disconnect. For that reason, permit the function
* to delete the context here after callback return.
*/
- c->flags |= REDIS_CONNECTED;
- __redisRunConnectCallback(ac, REDIS_OK);
- if ((ac->c.flags & REDIS_DISCONNECTING)) {
- redisAsyncDisconnect(ac);
- return REDIS_ERR;
- } else if ((ac->c.flags & REDIS_FREEING)) {
- redisAsyncFree(ac);
- return REDIS_ERR;
+ c->flags |= REDICT_CONNECTED;
+ __redictRunConnectCallback(ac, REDICT_OK);
+ if ((ac->c.flags & REDICT_DISCONNECTING)) {
+ redictAsyncDisconnect(ac);
+ return REDICT_ERR;
+ } else if ((ac->c.flags & REDICT_FREEING)) {
+ redictAsyncFree(ac);
+ return REDICT_ERR;
}
- return REDIS_OK;
+ return REDICT_OK;
} else {
- return REDIS_OK;
+ return REDICT_OK;
}
}
-void redisAsyncRead(redisAsyncContext *ac) {
- redisContext *c = &(ac->c);
+void redictAsyncRead(redictAsyncContext *ac) {
+ redictContext *c = &(ac->c);
- if (redisBufferRead(c) == REDIS_ERR) {
- __redisAsyncDisconnect(ac);
+ if (redictBufferRead(c) == REDICT_ERR) {
+ __redictAsyncDisconnect(ac);
} else {
/* Always re-schedule reads */
_EL_ADD_READ(ac);
- redisProcessCallbacks(ac);
+ redictProcessCallbacks(ac);
}
}
/* This function should be called when the socket is readable.
* It processes all replies that can be read and executes their callbacks.
*/
-void redisAsyncHandleRead(redisAsyncContext *ac) {
- redisContext *c = &(ac->c);
+void redictAsyncHandleRead(redictAsyncContext *ac) {
+ redictContext *c = &(ac->c);
/* must not be called from a callback */
- assert(!(c->flags & REDIS_IN_CALLBACK));
+ assert(!(c->flags & REDICT_IN_CALLBACK));
- if (!(c->flags & REDIS_CONNECTED)) {
+ if (!(c->flags & REDICT_CONNECTED)) {
/* Abort connect was not successful. */
- if (__redisAsyncHandleConnect(ac) != REDIS_OK)
+ if (__redictAsyncHandleConnect(ac) != REDICT_OK)
return;
/* Try again later when the context is still not connected. */
- if (!(c->flags & REDIS_CONNECTED))
+ if (!(c->flags & REDICT_CONNECTED))
return;
}
c->funcs->async_read(ac);
}
-void redisAsyncWrite(redisAsyncContext *ac) {
- redisContext *c = &(ac->c);
+void redictAsyncWrite(redictAsyncContext *ac) {
+ redictContext *c = &(ac->c);
int done = 0;
- if (redisBufferWrite(c,&done) == REDIS_ERR) {
- __redisAsyncDisconnect(ac);
+ if (redictBufferWrite(c,&done) == REDICT_ERR) {
+ __redictAsyncDisconnect(ac);
} else {
/* Continue writing when not done, stop writing otherwise */
if (!done)
@@ -738,30 +738,30 @@ void redisAsyncWrite(redisAsyncContext *ac) {
}
}
-void redisAsyncHandleWrite(redisAsyncContext *ac) {
- redisContext *c = &(ac->c);
+void redictAsyncHandleWrite(redictAsyncContext *ac) {
+ redictContext *c = &(ac->c);
/* must not be called from a callback */
- assert(!(c->flags & REDIS_IN_CALLBACK));
+ assert(!(c->flags & REDICT_IN_CALLBACK));
- if (!(c->flags & REDIS_CONNECTED)) {
+ if (!(c->flags & REDICT_CONNECTED)) {
/* Abort connect was not successful. */
- if (__redisAsyncHandleConnect(ac) != REDIS_OK)
+ if (__redictAsyncHandleConnect(ac) != REDICT_OK)
return;
/* Try again later when the context is still not connected. */
- if (!(c->flags & REDIS_CONNECTED))
+ if (!(c->flags & REDICT_CONNECTED))
return;
}
c->funcs->async_write(ac);
}
-void redisAsyncHandleTimeout(redisAsyncContext *ac) {
- redisContext *c = &(ac->c);
- redisCallback cb;
+void redictAsyncHandleTimeout(redictAsyncContext *ac) {
+ redictContext *c = &(ac->c);
+ redictCallback cb;
/* must not be called from a callback */
- assert(!(c->flags & REDIS_IN_CALLBACK));
+ assert(!(c->flags & REDICT_IN_CALLBACK));
- if ((c->flags & REDIS_CONNECTED)) {
+ if ((c->flags & REDICT_CONNECTED)) {
if (ac->replies.head == NULL && ac->sub.replies.head == NULL) {
/* Nothing to do - just an idle timeout */
return;
@@ -775,23 +775,23 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) {
}
if (!c->err) {
- __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
- __redisAsyncCopyError(ac);
+ __redictSetError(c, REDICT_ERR_TIMEOUT, "Timeout");
+ __redictAsyncCopyError(ac);
}
- if (!(c->flags & REDIS_CONNECTED)) {
- __redisRunConnectCallback(ac, REDIS_ERR);
+ if (!(c->flags & REDICT_CONNECTED)) {
+ __redictRunConnectCallback(ac, REDICT_ERR);
}
- while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
- __redisRunCallback(ac, &cb, NULL);
+ while (__redictShiftCallback(&ac->replies, &cb) == REDICT_OK) {
+ __redictRunCallback(ac, &cb, NULL);
}
/**
* TODO: Don't automatically sever the connection,
* rather, allow to ignore <x> responses before the queue is clear
*/
- __redisAsyncDisconnect(ac);
+ __redictAsyncDisconnect(ac);
}
/* Sets a pointer to the first argument and its length starting at p. Returns
@@ -810,16 +810,16 @@ static const char *nextArgument(const char *start, const char **str, size_t *len
return p+2+(*len)+2;
}
-/* Helper function for the redisAsyncCommand* family of functions. Writes a
+/* Helper function for the redictAsyncCommand* family of functions. Writes a
* formatted command to the output buffer and registers the provided callback
* function with the context. */
-static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
- redisContext *c = &(ac->c);
- redisCallback cb;
+static int __redictAsyncCommand(redictAsyncContext *ac, redictCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
+ redictContext *c = &(ac->c);
+ redictCallback cb;
struct dict *cbdict;
dictIterator it;
dictEntry *de;
- redisCallback *existcb;
+ redictCallback *existcb;
int pvariant, hasnext;
const char *cstr, *astr;
size_t clen, alen;
@@ -828,7 +828,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
int ret;
/* Don't accept new commands when the connection is about to be closed. */
- if (c->flags & (REDIS_DISCONNECTING | REDIS_FREEING)) return REDIS_ERR;
+ if (c->flags & (REDICT_DISCONNECTING | REDICT_FREEING)) return REDICT_ERR;
/* Setup callback */
cb.fn = fn;
@@ -845,7 +845,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
clen -= pvariant;
if (hasnext && strncasecmp(cstr,"subscribe\r\n",11) == 0) {
- c->flags |= REDIS_SUBSCRIBED;
+ c->flags |= REDICT_SUBSCRIBED;
/* Add every channel/pattern to the list of subscription callbacks. */
while ((p = nextArgument(p,&astr,&alen)) != NULL) {
@@ -872,7 +872,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
} else if (strncasecmp(cstr,"unsubscribe\r\n",13) == 0) {
/* It is only useful to call (P)UNSUBSCRIBE when the context is
* subscribed to one or more channels or patterns. */
- if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR;
+ if (!(c->flags & REDICT_SUBSCRIBED)) return REDICT_ERR;
if (pvariant)
cbdict = ac->sub.patterns;
@@ -924,85 +924,85 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
* should not append a callback function for this command. */
} else if (strncasecmp(cstr,"monitor\r\n",9) == 0) {
/* Set monitor flag and push callback */
- c->flags |= REDIS_MONITORING;
- if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)
+ c->flags |= REDICT_MONITORING;
+ if (__redictPushCallback(&ac->replies,&cb) != REDICT_OK)
goto oom;
} else {
- if (c->flags & REDIS_SUBSCRIBED) {
- if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK)
+ if (c->flags & REDICT_SUBSCRIBED) {
+ if (__redictPushCallback(&ac->sub.replies,&cb) != REDICT_OK)
goto oom;
} else {
- if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK)
+ if (__redictPushCallback(&ac->replies,&cb) != REDICT_OK)
goto oom;
}
}
- __redisAppendCommand(c,cmd,len);
+ __redictAppendCommand(c,cmd,len);
/* Always schedule a write when the write buffer is non-empty */
_EL_ADD_WRITE(ac);
- return REDIS_OK;
+ return REDICT_OK;
oom:
- __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory");
- __redisAsyncCopyError(ac);
- return REDIS_ERR;
+ __redictSetError(&(ac->c), REDICT_ERR_OOM, "Out of memory");
+ __redictAsyncCopyError(ac);
+ return REDICT_ERR;
}
-int redisvAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, va_list ap) {
+int redictvAsyncCommand(redictAsyncContext *ac, redictCallbackFn *fn, void *privdata, const char *format, va_list ap) {
char *cmd;
int len;
int status;
- len = redisvFormatCommand(&cmd,format,ap);
+ len = redictvFormatCommand(&cmd,format,ap);
/* We don't want to pass -1 or -2 to future functions as a length. */
if (len < 0)
- return REDIS_ERR;
+ return REDICT_ERR;
- status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
+ status = __redictAsyncCommand(ac,fn,privdata,cmd,len);
hi_free(cmd);
return status;
}
-int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...) {
+int redictAsyncCommand(redictAsyncContext *ac, redictCallbackFn *fn, void *privdata, const char *format, ...) {
va_list ap;
int status;
va_start(ap,format);
- status = redisvAsyncCommand(ac,fn,privdata,format,ap);
+ status = redictvAsyncCommand(ac,fn,privdata,format,ap);
va_end(ap);
return status;
}
-int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
+int redictAsyncCommandArgv(redictAsyncContext *ac, redictCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) {
sds cmd;
long long len;
int status;
- len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
+ len = redictFormatSdsCommandArgv(&cmd,argc,argv,argvlen);
if (len < 0)
- return REDIS_ERR;
- status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
+ return REDICT_ERR;
+ status = __redictAsyncCommand(ac,fn,privdata,cmd,len);
sdsfree(cmd);
return status;
}
-int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
- int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
+int redictAsyncFormattedCommand(redictAsyncContext *ac, redictCallbackFn *fn, void *privdata, const char *cmd, size_t len) {
+ int status = __redictAsyncCommand(ac,fn,privdata,cmd,len);
return status;
}
-redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn) {
- redisAsyncPushFn *old = ac->push_cb;
+redictAsyncPushFn *redictAsyncSetPushCallback(redictAsyncContext *ac, redictAsyncPushFn *fn) {
+ redictAsyncPushFn *old = ac->push_cb;
ac->push_cb = fn;
return old;
}
-int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
+int redictAsyncSetTimeout(redictAsyncContext *ac, struct timeval tv) {
if (!ac->c.command_timeout) {
ac->c.command_timeout = hi_calloc(1, sizeof(tv));
if (ac->c.command_timeout == NULL) {
- __redisSetError(&ac->c, REDIS_ERR_OOM, "Out of memory");
- __redisAsyncCopyError(ac);
- return REDIS_ERR;
+ __redictSetError(&ac->c, REDICT_ERR_OOM, "Out of memory");
+ __redictAsyncCopyError(ac);
+ return REDICT_ERR;
}
}
@@ -1012,5 +1012,5 @@ int redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
*ac->c.command_timeout = tv;
}
- return REDIS_OK;
+ return REDICT_OK;
}