summaryrefslogtreecommitdiff
path: root/async.c
diff options
context:
space:
mode:
Diffstat (limited to 'async.c')
-rw-r--r--async.c143
1 files changed, 21 insertions, 122 deletions
diff --git a/async.c b/async.c
index e46573f..4f422d5 100644
--- a/async.c
+++ b/async.c
@@ -42,42 +42,9 @@
#include "net.h"
#include "dict.c"
#include "sds.h"
-#include "sslio.h"
#include "win32.h"
-#define _EL_ADD_READ(ctx) \
- do { \
- refreshTimeout(ctx); \
- if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \
- } while (0)
-#define _EL_DEL_READ(ctx) do { \
- if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \
- } while(0)
-#define _EL_ADD_WRITE(ctx) \
- do { \
- refreshTimeout(ctx); \
- if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \
- } while (0)
-#define _EL_DEL_WRITE(ctx) do { \
- if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \
- } while(0)
-#define _EL_CLEANUP(ctx) do { \
- if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \
- ctx->ev.cleanup = NULL; \
- } while(0);
-
-static void refreshTimeout(redisAsyncContext *ctx) {
- if (ctx->c.timeout && ctx->ev.scheduleTimer &&
- (ctx->c.timeout->tv_sec || ctx->c.timeout->tv_usec)) {
- ctx->ev.scheduleTimer(ctx->ev.data, *ctx->c.timeout);
- // } else {
- // printf("Not scheduling timer.. (tmo=%p)\n", ctx->c.timeout);
- // if (ctx->c.timeout){
- // printf("tv_sec: %u. tv_usec: %u\n", ctx->c.timeout->tv_sec,
- // ctx->c.timeout->tv_usec);
- // }
- }
-}
+#include "async_private.h"
/* Forward declaration of function in hiredis.c */
int __redisAppendCommand(redisContext *c, const char *cmd, size_t len);
@@ -347,7 +314,7 @@ void redisAsyncFree(redisAsyncContext *ac) {
}
/* Helper function to make the disconnect happen and clean up. */
-static void __redisAsyncDisconnect(redisAsyncContext *ac) {
+void __redisAsyncDisconnect(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
/* Make sure error is accessible if there is any */
@@ -552,76 +519,18 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) {
}
}
-/**
- * Handle SSL when socket becomes available for reading. This also handles
- * read-while-write and write-while-read.
- *
- * These functions will not work properly unless `HIREDIS_SSL` is defined
- * (however, they will compile)
- */
-static void asyncSslRead(redisAsyncContext *ac) {
- int rv;
- redisSsl *ssl = ac->c.ssl;
- redisContext *c = &ac->c;
-
- ssl->wantRead = 0;
-
- if (ssl->pendingWrite) {
- int done;
-
- /* This is probably just a write event */
- ssl->pendingWrite = 0;
- rv = redisBufferWrite(c, &done);
- if (rv == REDIS_ERR) {
- __redisAsyncDisconnect(ac);
- return;
- } else if (!done) {
- _EL_ADD_WRITE(ac);
- }
- }
+void redisAsyncRead(redisAsyncContext *ac) {
+ redisContext *c = &(ac->c);
- rv = redisBufferRead(c);
- if (rv == REDIS_ERR) {
+ if (redisBufferRead(c) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
+ /* Always re-schedule reads */
_EL_ADD_READ(ac);
redisProcessCallbacks(ac);
}
}
-/**
- * Handle SSL when socket becomes available for writing
- */
-static void asyncSslWrite(redisAsyncContext *ac) {
- int rv, done = 0;
- redisSsl *ssl = ac->c.ssl;
- redisContext *c = &ac->c;
-
- ssl->pendingWrite = 0;
- rv = redisBufferWrite(c, &done);
- if (rv == REDIS_ERR) {
- __redisAsyncDisconnect(ac);
- return;
- }
-
- if (!done) {
- if (ssl->wantRead) {
- /* Need to read-before-write */
- ssl->pendingWrite = 1;
- _EL_DEL_WRITE(ac);
- } else {
- /* No extra reads needed, just need to write more */
- _EL_ADD_WRITE(ac);
- }
- } else {
- /* Already done! */
- _EL_DEL_WRITE(ac);
- }
-
- /* Always reschedule a read */
- _EL_ADD_READ(ac);
-}
-
/* This function should be called when the socket is readable.
* It processes all replies that can be read and executes their callbacks.
*/
@@ -637,23 +546,29 @@ void redisAsyncHandleRead(redisAsyncContext *ac) {
return;
}
- if (c->flags & REDIS_SSL) {
- asyncSslRead(ac);
- return;
- }
+ c->funcs->async_read(ac);
+}
- if (redisBufferRead(c) == REDIS_ERR) {
+void redisAsyncWrite(redisAsyncContext *ac) {
+ redisContext *c = &(ac->c);
+ int done = 0;
+
+ if (redisBufferWrite(c,&done) == REDIS_ERR) {
__redisAsyncDisconnect(ac);
} else {
- /* Always re-schedule reads */
+ /* Continue writing when not done, stop writing otherwise */
+ if (!done)
+ _EL_ADD_WRITE(ac);
+ else
+ _EL_DEL_WRITE(ac);
+
+ /* Always schedule reads after writes */
_EL_ADD_READ(ac);
- redisProcessCallbacks(ac);
}
}
void redisAsyncHandleWrite(redisAsyncContext *ac) {
redisContext *c = &(ac->c);
- int done = 0;
if (!(c->flags & REDIS_CONNECTED)) {
/* Abort connect was not successful. */
@@ -664,23 +579,7 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) {
return;
}
- if (c->flags & REDIS_SSL) {
- asyncSslWrite(ac);
- return;
- }
-
- if (redisBufferWrite(c,&done) == REDIS_ERR) {
- __redisAsyncDisconnect(ac);
- } else {
- /* Continue writing when not done, stop writing otherwise */
- if (!done)
- _EL_ADD_WRITE(ac);
- else
- _EL_DEL_WRITE(ac);
-
- /* Always schedule reads after writes */
- _EL_ADD_READ(ac);
- }
+ c->funcs->async_write(ac);
}
void __redisSetError(redisContext *c, int type, const char *str);