summaryrefslogtreecommitdiff
path: root/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'async.c')
-rw-r--r--async.c62
1 files changed, 58 insertions, 4 deletions
diff --git a/async.c b/async.c
index db59036..b789f93 100644
--- a/async.c
+++ b/async.c
@@ -42,15 +42,19 @@
#include "sds.h"
#include "sslio.h"
-#define _EL_ADD_READ(ctx) do { \
+#define _EL_ADD_READ(ctx) \
+ do { \
+ refreshTimeout(ctx); \
if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
- } while(0)
+ } while (0)
#define _EL_DEL_READ(ctx) do { \
if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
} while(0)
-#define _EL_ADD_WRITE(ctx) do { \
+#define _EL_ADD_WRITE(ctx) \
+ do { \
+ refreshTimeout(ctx); \
if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
- } while(0)
+ } while (0)
#define _EL_DEL_WRITE(ctx) do { \
if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
} while(0)
@@ -58,6 +62,19 @@
if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
} while(0);
+static void refreshTimeout(redisAsyncContext *ctx) {
+ if (ctx->c.timeout && ctx->ev.scheduleTimer &&
+ (ctx->c.timeout->tv_sec || ctx->c.timeout->tv_usec)) {
+ ctx->ev.scheduleTimer(ctx->ev.data, *ctx->c.timeout);
+ // } else {
+ // printf("Not scheduling timer.. (tmo=%p)\n", ctx->c.timeout);
+ // if (ctx->c.timeout){
+ // printf("tv_sec: %u. tv_usec: %u\n", ctx->c.timeout->tv_sec,
+ // ctx->c.timeout->tv_usec);
+ // }
+ }
+}
+
/* Forward declaration of function in hiredis.c */
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
@@ -656,6 +673,30 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
}
}
+void __redisSetError(redisContext *c, int type, const char *str);
+
+void redisAsyncHandleTimeout(redisAsyncContext *ac) {
+ redisContext *c = &(ac->c);
+ redisCallback cb;
+
+ if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) {
+ /* Nothing to do - just an idle timeout */
+ return;
+ }
+
+ if (!c->err) {
+ __redisSetError(c, REDIS_ERR_TIMEOUT, "Timeout");
+ }
+
+ if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) {
+ ac->onConnect(ac, REDIS_ERR);
+ }
+
+ while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) {
+ __redisRunCallback(ac, &cb, NULL);
+ }
+}
+
/* Sets a pointer to the first argument and its length starting at p. Returns
* the number of bytes to skip to get to the following argument. */
static const char *nextArgument(const char *start, const char **str, size_t *len) {
@@ -795,3 +836,16 @@ int redisAsyncFormattedCommand(redisAsyncContext *ac, redisCallbackFn *fn, void
int status = __redisAsyncCommand(ac,fn,privdata,cmd,len);
return status;
}
+
+void redisAsyncSetTimeout(redisAsyncContext *ac, struct timeval tv) {
+ if (!ac->c.timeout) {
+ ac->c.timeout = calloc(1, sizeof(tv));
+ }
+
+ if (tv.tv_sec == ac->c.timeout->tv_sec &&
+ tv.tv_usec == ac->c.timeout->tv_usec) {
+ return;
+ }
+
+ *ac->c.timeout = tv;
+} \ No newline at end of file