diff options
-rw-r--r-- | async.c | 16 | ||||
-rw-r--r-- | hiredis.h | 3 | ||||
-rw-r--r-- | net.c | 36 |
3 files changed, 40 insertions, 15 deletions
@@ -372,6 +372,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } + + /* If monitor mode, repush callback */ + if(c->flags & REDIS_MONITORING) { + __redisPushCallback(&ac->replies,&cb); + } /* When the connection is not being disconnected, simply stop * trying to get replies and wait for the next loop tick. */ @@ -394,9 +399,10 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } - /* No more regular callbacks and no errors, the context *must* be subscribed. */ - assert(c->flags & REDIS_SUBSCRIBED); - __redisGetSubscribeCallback(ac,reply,&cb); + /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */ + assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); + if(c->flags & REDIS_SUBSCRIBED) + __redisGetSubscribeCallback(ac,reply,&cb); } if (cb.fn != NULL) { @@ -557,6 +563,10 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void /* (P)UNSUBSCRIBE does not have its own response: every channel or * pattern that is unsubscribed will receive a message. This means we * should not append a callback function for this command. */ + } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) { + /* Set monitor flag and push callback */ + c->flags |= REDIS_MONITORING; + __redisPushCallback(&ac->replies,&cb); } else { if (c->flags & REDIS_SUBSCRIBED) /* This will likely result in an error reply, but it needs to be @@ -76,6 +76,9 @@ /* Flag that is set when the async context has one or more subscriptions. */ #define REDIS_SUBSCRIBED 0x20 +/* Flag that is set when monitor mode is active */ +#define REDIS_MONITORING 0x40 + #define REDIS_REPLY_STRING 1 #define REDIS_REPLY_ARRAY 2 #define REDIS_REPLY_INTEGER 3 @@ -45,6 +45,8 @@ #include <errno.h> #include <stdarg.h> #include <stdio.h> +#include <poll.h> +#include <limits.h> #include "net.h" #include "sds.h" @@ -121,28 +123,38 @@ static int redisSetTcpNoDelay(redisContext *c, int fd) { return REDIS_OK; } +#define __MAX_MSEC (((LONG_MAX) - 999) / 1000) + static int redisContextWaitReady(redisContext *c, int fd, const struct timeval *timeout) { - struct timeval to; - struct timeval *toptr = NULL; - fd_set wfd; + struct pollfd wfd[1]; + long msec; + + msec = -1; + wfd[0].fd = fd; + wfd[0].events = POLLOUT; /* Only use timeout when not NULL. */ if (timeout != NULL) { - to = *timeout; - toptr = &to; + if (timeout->tv_usec > 1000000 || timeout->tv_sec > __MAX_MSEC) { + close(fd); + return REDIS_ERR; + } + + msec = (timeout->tv_sec * 1000) + ((timeout->tv_usec + 999) / 1000); + + if (msec < 0 || msec > INT_MAX) { + msec = INT_MAX; + } } if (errno == EINPROGRESS) { - FD_ZERO(&wfd); - FD_SET(fd, &wfd); + int res; - if (select(FD_SETSIZE, NULL, &wfd, NULL, toptr) == -1) { - __redisSetErrorFromErrno(c,REDIS_ERR_IO,"select(2)"); + if ((res = poll(wfd, 1, msec)) == -1) { + __redisSetErrorFromErrno(c, REDIS_ERR_IO, "poll(2)"); close(fd); return REDIS_ERR; - } - - if (!FD_ISSET(fd, &wfd)) { + } else if (res == 0) { errno = ETIMEDOUT; __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); close(fd); |