diff options
Diffstat (limited to 'async.c')
-rw-r--r-- | async.c | 143 |
1 files changed, 21 insertions, 122 deletions
@@ -42,42 +42,9 @@ #include "net.h" #include "dict.c" #include "sds.h" -#include "sslio.h" #include "win32.h" -#define _EL_ADD_READ(ctx) \ - do { \ - refreshTimeout(ctx); \ - if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \ - } while (0) -#define _EL_DEL_READ(ctx) do { \ - if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \ - } while(0) -#define _EL_ADD_WRITE(ctx) \ - do { \ - refreshTimeout(ctx); \ - if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \ - } while (0) -#define _EL_DEL_WRITE(ctx) do { \ - if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \ - } while(0) -#define _EL_CLEANUP(ctx) do { \ - if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \ - ctx->ev.cleanup = NULL; \ - } while(0); - -static void refreshTimeout(redisAsyncContext *ctx) { - if (ctx->c.timeout && ctx->ev.scheduleTimer && - (ctx->c.timeout->tv_sec || ctx->c.timeout->tv_usec)) { - ctx->ev.scheduleTimer(ctx->ev.data, *ctx->c.timeout); - // } else { - // printf("Not scheduling timer.. (tmo=%p)\n", ctx->c.timeout); - // if (ctx->c.timeout){ - // printf("tv_sec: %u. tv_usec: %u\n", ctx->c.timeout->tv_sec, - // ctx->c.timeout->tv_usec); - // } - } -} +#include "async_private.h" /* Forward declaration of function in hiredis.c */ int __redisAppendCommand(redisContext *c, const char *cmd, size_t len); @@ -347,7 +314,7 @@ void redisAsyncFree(redisAsyncContext *ac) { } /* Helper function to make the disconnect happen and clean up. */ -static void __redisAsyncDisconnect(redisAsyncContext *ac) { +void __redisAsyncDisconnect(redisAsyncContext *ac) { redisContext *c = &(ac->c); /* Make sure error is accessible if there is any */ @@ -552,76 +519,18 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { } } -/** - * Handle SSL when socket becomes available for reading. This also handles - * read-while-write and write-while-read. - * - * These functions will not work properly unless `HIREDIS_SSL` is defined - * (however, they will compile) - */ -static void asyncSslRead(redisAsyncContext *ac) { - int rv; - redisSsl *ssl = ac->c.ssl; - redisContext *c = &ac->c; - - ssl->wantRead = 0; - - if (ssl->pendingWrite) { - int done; - - /* This is probably just a write event */ - ssl->pendingWrite = 0; - rv = redisBufferWrite(c, &done); - if (rv == REDIS_ERR) { - __redisAsyncDisconnect(ac); - return; - } else if (!done) { - _EL_ADD_WRITE(ac); - } - } +void redisAsyncRead(redisAsyncContext *ac) { + redisContext *c = &(ac->c); - rv = redisBufferRead(c); - if (rv == REDIS_ERR) { + if (redisBufferRead(c) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { + /* Always re-schedule reads */ _EL_ADD_READ(ac); redisProcessCallbacks(ac); } } -/** - * Handle SSL when socket becomes available for writing - */ -static void asyncSslWrite(redisAsyncContext *ac) { - int rv, done = 0; - redisSsl *ssl = ac->c.ssl; - redisContext *c = &ac->c; - - ssl->pendingWrite = 0; - rv = redisBufferWrite(c, &done); - if (rv == REDIS_ERR) { - __redisAsyncDisconnect(ac); - return; - } - - if (!done) { - if (ssl->wantRead) { - /* Need to read-before-write */ - ssl->pendingWrite = 1; - _EL_DEL_WRITE(ac); - } else { - /* No extra reads needed, just need to write more */ - _EL_ADD_WRITE(ac); - } - } else { - /* Already done! */ - _EL_DEL_WRITE(ac); - } - - /* Always reschedule a read */ - _EL_ADD_READ(ac); -} - /* This function should be called when the socket is readable. * It processes all replies that can be read and executes their callbacks. */ @@ -637,23 +546,29 @@ void redisAsyncHandleRead(redisAsyncContext *ac) { return; } - if (c->flags & REDIS_SSL) { - asyncSslRead(ac); - return; - } + c->funcs->async_read(ac); +} - if (redisBufferRead(c) == REDIS_ERR) { +void redisAsyncWrite(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + int done = 0; + + if (redisBufferWrite(c,&done) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { - /* Always re-schedule reads */ + /* Continue writing when not done, stop writing otherwise */ + if (!done) + _EL_ADD_WRITE(ac); + else + _EL_DEL_WRITE(ac); + + /* Always schedule reads after writes */ _EL_ADD_READ(ac); - redisProcessCallbacks(ac); } } void redisAsyncHandleWrite(redisAsyncContext *ac) { redisContext *c = &(ac->c); - int done = 0; if (!(c->flags & REDIS_CONNECTED)) { /* Abort connect was not successful. */ @@ -664,23 +579,7 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { return; } - if (c->flags & REDIS_SSL) { - asyncSslWrite(ac); - return; - } - - if (redisBufferWrite(c,&done) == REDIS_ERR) { - __redisAsyncDisconnect(ac); - } else { - /* Continue writing when not done, stop writing otherwise */ - if (!done) - _EL_ADD_WRITE(ac); - else - _EL_DEL_WRITE(ac); - - /* Always schedule reads after writes */ - _EL_ADD_READ(ac); - } + c->funcs->async_write(ac); } void __redisSetError(redisContext *c, int type, const char *str); |