diff options
-rw-r--r-- | CHANGELOG.md | 16 | ||||
-rw-r--r-- | Makefile | 115 | ||||
-rw-r--r-- | adapters/ae.h | 20 | ||||
-rw-r--r-- | adapters/libev.h | 20 | ||||
-rw-r--r-- | adapters/libevent.h | 19 | ||||
-rw-r--r-- | async.c | 85 | ||||
-rw-r--r-- | async.h | 2 | ||||
-rw-r--r-- | example-ae.c | 13 | ||||
-rw-r--r-- | example-libev.c | 12 | ||||
-rw-r--r-- | example-libevent.c | 12 | ||||
-rw-r--r-- | fmacros.h | 8 | ||||
-rw-r--r-- | hiredis.c | 138 | ||||
-rw-r--r-- | hiredis.h | 9 | ||||
-rw-r--r-- | net.c | 36 | ||||
-rw-r--r-- | net.h | 1 | ||||
-rw-r--r-- | test.c | 161 |
16 files changed, 348 insertions, 319 deletions
diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..d41db8a --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,16 @@ +### 0.10.1 + +* Makefile overhaul. Important to check out if you override one or more + variables using environment variables or via arguments to the "make" tool. + +* Issue #45: Fix potential memory leak for a multi bulk reply with 0 elements + being created by the default reply object functions. + +* Issue #43: Don't crash in an asynchronous context when Redis returns an error + reply after the connection has been made (this happens when the maximum + number of connections is reached). + +### 0.10.0 + +* See commit log. + @@ -1,5 +1,6 @@ # Hiredis Makefile -# Copyright (C) 2010 Salvatore Sanfilippo <antirez at gmail dot com> +# Copyright (C) 2010-2011 Salvatore Sanfilippo <antirez at gmail dot com> +# Copyright (C) 2010-2011 Pieter Noordhuis <pcnoordhuis at gmail dot com> # This file is released under the BSD license, see the COPYING file OBJ=net.o hiredis.o sds.o async.o @@ -9,61 +10,37 @@ LIBNAME=libhiredis HIREDIS_MAJOR=0 HIREDIS_MINOR=10 -uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') +# Fallback to gcc when $CC is not in $PATH. +CC:=$(shell sh -c 'type $(CC) >/dev/null 2>/dev/null && echo $(CC) || echo gcc') OPTIMIZATION?=-O3 +WARNINGS=-Wall -W -Wstrict-prototypes -Wwrite-strings +DEBUG?= -g -ggdb +REAL_CFLAGS=$(OPTIMIZATION) -fPIC $(CFLAGS) $(WARNINGS) $(DEBUG) +REAL_LDFLAGS=$(LDFLAGS) + +DYLIBSUFFIX=so +STLIBSUFFIX=a +DYLIB_MINOR_NAME=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR).$(HIREDIS_MINOR) +DYLIB_MAJOR_NAME=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR) +DYLIBNAME=$(LIBNAME).$(DYLIBSUFFIX) +DYLIB_MAKE_CMD=$(CC) -shared -Wl,-soname,$(DYLIB_MINOR_NAME) -o $(DYLIBNAME) $(LDFLAGS) +STLIBNAME=$(LIBNAME).$(STLIBSUFFIX) +STLIB_MAKE_CMD=ar rcs $(STLIBNAME) + +# Platform-specific overrides +uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') ifeq ($(uname_S),SunOS) - CFLAGS?=$(OPTIMIZATION) -fPIC -Wall -W -D__EXTENSIONS__ -D_XPG6 $(ARCH) $(PROF) - CCLINK?=-ldl -lnsl -lsocket -lm -lpthread - LDFLAGS?=-L. - DYLIBSUFFIX=so - STLIBSUFFIX=a - DYLIB_MINOR_NAME?=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR).$(HIREDIS_MINOR) - DYLIB_MAJOR_NAME?=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR) - DYLIBNAME?=$(LIBNAME).$(DYLIBSUFFIX) - DYLIB_MAKE_CMD?=$(CC) -G -o $(DYLIBNAME) -h $(DYLIB_MINOR_NAME) - STLIBNAME?=$(LIBNAME).$(STLIBSUFFIX) - STLIB_MAKE_CMD?=ar rcs $(STLIBNAME) + REAL_LDFLAGS+= -ldl -lnsl -lsocket + DYLIB_MAKE_CMD=$(CC) -G -o $(DYLIBNAME) -h $(DYLIB_MINOR_NAME) $(LDFLAGS) INSTALL= cp -r -else +endif ifeq ($(uname_S),Darwin) - CFLAGS?=$(OPTIMIZATION) -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings $(ARCH) $(PROF) - CCLINK?=-lm -pthread - LDFLAGS?=-L. - OBJARCH?=-arch i386 -arch x86_64 DYLIBSUFFIX=dylib - STLIBSUFFIX=a - DYLIB_MINOR_NAME?=$(LIBNAME).$(HIREDIS_MAJOR).$(HIREDIS_MINOR).$(DYLIBSUFFIX) - DYLIB_MAJOR_NAME?=$(LIBNAME).$(HIREDIS_MAJOR).$(DYLIBSUFFIX) - DYLIBNAME?=$(LIBNAME).$(DYLIBSUFFIX) - DYLIB_MAKE_CMD?=libtool -dynamic -o $(DYLIBNAME) -install_name $(DYLIB_MINOR_NAME) -lm $(DEBUG) - - STLIBNAME?=$(LIBNAME).$(STLIBSUFFIX) - STLIB_MAKE_CMD?=libtool -static -o $(STLIBNAME) - - INSTALL= cp -a -else - CFLAGS?=$(OPTIMIZATION) -fPIC -Wall -W -Wstrict-prototypes -Wwrite-strings $(ARCH) $(PROF) - CCLINK?=-lm -pthread - LDFLAGS?=-L. - DYLIBSUFFIX=so - STLIBSUFFIX=a - DYLIB_MINOR_NAME?=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR).$(HIREDIS_MINOR) - DYLIB_MAJOR_NAME?=$(LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR) - DYLIBNAME?=$(LIBNAME).$(DYLIBSUFFIX) - DYLIB_MAKE_CMD?=gcc -shared -Wl,-soname,$(DYLIB_MINOR_NAME) -o $(DYLIBNAME) - STLIBNAME?=$(LIBNAME).$(STLIBSUFFIX) - STLIB_MAKE_CMD?=ar rcs $(STLIBNAME) - INSTALL= cp -a -endif + DYLIB_MINOR_NAME=$(LIBNAME).$(HIREDIS_MAJOR).$(HIREDIS_MINOR).$(DYLIBSUFFIX) + DYLIB_MAJOR_NAME=$(LIBNAME).$(HIREDIS_MAJOR).$(DYLIBSUFFIX) + DYLIB_MAKE_CMD=$(CC) -shared -Wl,-install_name,$(DYLIB_MINOR_NAME) -o $(DYLIBNAME) $(LDFLAGS) endif -CCOPT= $(CFLAGS) $(CCLINK) -DEBUG?= -g -ggdb - -PREFIX?=/usr/local -INCLUDE_PATH?=include/hiredis -LIBRARY_PATH?=lib -INSTALL_INCLUDE_PATH= $(PREFIX)/$(INCLUDE_PATH) -INSTALL_LIBRARY_PATH= $(PREFIX)/$(LIBRARY_PATH) - all: $(DYLIBNAME) $(BINS) # Deps (use make dep to generate this) @@ -85,10 +62,10 @@ static: $(STLIBNAME) # Binaries: hiredis-example-libevent: example-libevent.c adapters/libevent.h $(STLIBNAME) - $(CC) -o $@ $(CCOPT) $(DEBUG) $(LDFLAGS) -levent example-libevent.c $(STLIBNAME) + $(CC) -o $@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -levent example-libevent.c $(STLIBNAME) hiredis-example-libev: example-libev.c adapters/libev.h $(STLIBNAME) - $(CC) -o $@ $(CCOPT) $(DEBUG) $(LDFLAGS) -lev example-libev.c $(STLIBNAME) + $(CC) -o $@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -lev example-libev.c $(STLIBNAME) ifndef AE_DIR hiredis-example-ae: @@ -96,11 +73,11 @@ hiredis-example-ae: @false else hiredis-example-ae: example-ae.c adapters/ae.h $(STLIBNAME) - $(CC) -o $@ $(CCOPT) $(DEBUG) -I$(AE_DIR) $(LDFLAGS) $(AE_DIR)/ae.o $(AE_DIR)/zmalloc.o example-ae.c $(STLIBNAME) + $(CC) -o $@ $(REAL_CFLAGS) $(REAL_LDFLAGS) -I$(AE_DIR) $(AE_DIR)/ae.o $(AE_DIR)/zmalloc.o example-ae.c $(STLIBNAME) endif hiredis-%: %.o $(STLIBNAME) - $(CC) -o $@ $(CCOPT) $(DEBUG) $(LDFLAGS) $< $(STLIBNAME) + $(CC) -o $@ $(REAL_LDFLAGS) $< $(STLIBNAME) test: hiredis-test ./hiredis-test @@ -118,7 +95,7 @@ check: hiredis-test kill `cat /tmp/hiredis-test-redis.pid` .c.o: - $(CC) -std=c99 -pedantic -c $(CFLAGS) $(OBJARCH) $(DEBUG) $(COMPILE_TIME) $< + $(CC) -std=c99 -pedantic -c $(REAL_CFLAGS) $< clean: rm -rf $(DYLIBNAME) $(STLIBNAME) $(BINS) hiredis-example* *.o *.gcda *.gcno *.gcov @@ -126,6 +103,19 @@ clean: dep: $(CC) -MM *.c +# Installation related variables and target +PREFIX?=/usr/local +INCLUDE_PATH?=include/hiredis +LIBRARY_PATH?=lib +INSTALL_INCLUDE_PATH= $(PREFIX)/$(INCLUDE_PATH) +INSTALL_LIBRARY_PATH= $(PREFIX)/$(LIBRARY_PATH) + +ifeq ($(uname_S),SunOS) + INSTALL?= cp -r +endif + +INSTALL?= cp -a + install: $(DYLIBNAME) $(STLIBNAME) mkdir -p $(INSTALL_INCLUDE_PATH) $(INSTALL_LIBRARY_PATH) $(INSTALL) hiredis.h async.h adapters $(INSTALL_INCLUDE_PATH) @@ -134,18 +124,25 @@ install: $(DYLIBNAME) $(STLIBNAME) cd $(INSTALL_LIBRARY_PATH) && ln -sf $(DYLIB_MAJOR_NAME) $(DYLIBNAME) $(INSTALL) $(STLIBNAME) $(INSTALL_LIBRARY_PATH) - 32bit: @echo "" - @echo "WARNING: if it fails under Linux you probably need to install libc6-dev-i386" + @echo "WARNING: if this fails under Linux you probably need to install libc6-dev-i386" @echo "" - $(MAKE) ARCH="-m32" + $(MAKE) CFLAGS="-m32" LDFLAGS="-m32" gprof: - $(MAKE) PROF="-pg" + $(MAKE) CFLAGS="-pg" LDFLAGS="-pg" gcov: - $(MAKE) PROF="-fprofile-arcs -ftest-coverage" + $(MAKE) CFLAGS="-fprofile-arcs -ftest-coverage" LDFLAGS="-fprofile-arcs" + +coverage: gcov + make check + mkdir -p tmp/lcov + lcov -d . -c -o tmp/lcov/hiredis.info + genhtml --legend -o tmp/lcov/report tmp/lcov/hiredis.info noopt: $(MAKE) OPTIMIZATION="" + +.PHONY: all test check clean dep install 32bit gprof gcov noopt diff --git a/adapters/ae.h b/adapters/ae.h index 85260a7..65235f8 100644 --- a/adapters/ae.h +++ b/adapters/ae.h @@ -1,3 +1,5 @@ +#ifndef __HIREDIS_AE_H__ +#define __HIREDIS_AE_H__ #include <sys/types.h> #include <ae.h> #include "../hiredis.h" @@ -10,21 +12,21 @@ typedef struct redisAeEvents { int reading, writing; } redisAeEvents; -void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) { +static void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) { ((void)el); ((void)fd); ((void)mask); redisAeEvents *e = (redisAeEvents*)privdata; redisAsyncHandleRead(e->context); } -void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) { +static void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) { ((void)el); ((void)fd); ((void)mask); redisAeEvents *e = (redisAeEvents*)privdata; redisAsyncHandleWrite(e->context); } -void redisAeAddRead(void *privdata) { +static void redisAeAddRead(void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (!e->reading) { @@ -33,7 +35,7 @@ void redisAeAddRead(void *privdata) { } } -void redisAeDelRead(void *privdata) { +static void redisAeDelRead(void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (e->reading) { @@ -42,7 +44,7 @@ void redisAeDelRead(void *privdata) { } } -void redisAeAddWrite(void *privdata) { +static void redisAeAddWrite(void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (!e->writing) { @@ -51,7 +53,7 @@ void redisAeAddWrite(void *privdata) { } } -void redisAeDelWrite(void *privdata) { +static void redisAeDelWrite(void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (e->writing) { @@ -60,14 +62,14 @@ void redisAeDelWrite(void *privdata) { } } -void redisAeCleanup(void *privdata) { +static void redisAeCleanup(void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; redisAeDelRead(privdata); redisAeDelWrite(privdata); free(e); } -int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { +static int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { redisContext *c = &(ac->c); redisAeEvents *e; @@ -92,4 +94,4 @@ int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { return REDIS_OK; } - +#endif diff --git a/adapters/libev.h b/adapters/libev.h index 7b2b6af..534d743 100644 --- a/adapters/libev.h +++ b/adapters/libev.h @@ -1,3 +1,6 @@ +#ifndef __HIREDIS_LIBEV_H__ +#define __HIREDIS_LIBEV_H__ +#include <stdlib.h> #include <sys/types.h> #include <ev.h> #include "../hiredis.h" @@ -10,7 +13,7 @@ typedef struct redisLibevEvents { ev_io rev, wev; } redisLibevEvents; -void redisLibevReadEvent(EV_P_ ev_io *watcher, int revents) { +static void redisLibevReadEvent(EV_P_ ev_io *watcher, int revents) { #if EV_MULTIPLICITY ((void)loop); #endif @@ -20,7 +23,7 @@ void redisLibevReadEvent(EV_P_ ev_io *watcher, int revents) { redisAsyncHandleRead(e->context); } -void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) { +static void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) { #if EV_MULTIPLICITY ((void)loop); #endif @@ -30,7 +33,7 @@ void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) { redisAsyncHandleWrite(e->context); } -void redisLibevAddRead(void *privdata) { +static void redisLibevAddRead(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; struct ev_loop *loop = e->loop; ((void)loop); @@ -40,7 +43,7 @@ void redisLibevAddRead(void *privdata) { } } -void redisLibevDelRead(void *privdata) { +static void redisLibevDelRead(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; struct ev_loop *loop = e->loop; ((void)loop); @@ -50,7 +53,7 @@ void redisLibevDelRead(void *privdata) { } } -void redisLibevAddWrite(void *privdata) { +static void redisLibevAddWrite(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; struct ev_loop *loop = e->loop; ((void)loop); @@ -60,7 +63,7 @@ void redisLibevAddWrite(void *privdata) { } } -void redisLibevDelWrite(void *privdata) { +static void redisLibevDelWrite(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; struct ev_loop *loop = e->loop; ((void)loop); @@ -70,14 +73,14 @@ void redisLibevDelWrite(void *privdata) { } } -void redisLibevCleanup(void *privdata) { +static void redisLibevCleanup(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; redisLibevDelRead(privdata); redisLibevDelWrite(privdata); free(e); } -int redisLibevAttach(EV_P_ redisAsyncContext *ac) { +static int redisLibevAttach(EV_P_ redisAsyncContext *ac) { redisContext *c = &(ac->c); redisLibevEvents *e; @@ -111,3 +114,4 @@ int redisLibevAttach(EV_P_ redisAsyncContext *ac) { return REDIS_OK; } +#endif diff --git a/adapters/libevent.h b/adapters/libevent.h index 2c18480..4055ec0 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -1,3 +1,5 @@ +#ifndef __HIREDIS_LIBEVENT_H__ +#define __HIREDIS_LIBEVENT_H__ #include <event.h> #include "../hiredis.h" #include "../async.h" @@ -7,46 +9,46 @@ typedef struct redisLibeventEvents { struct event rev, wev; } redisLibeventEvents; -void redisLibeventReadEvent(int fd, short event, void *arg) { +static void redisLibeventReadEvent(int fd, short event, void *arg) { ((void)fd); ((void)event); redisLibeventEvents *e = (redisLibeventEvents*)arg; redisAsyncHandleRead(e->context); } -void redisLibeventWriteEvent(int fd, short event, void *arg) { +static void redisLibeventWriteEvent(int fd, short event, void *arg) { ((void)fd); ((void)event); redisLibeventEvents *e = (redisLibeventEvents*)arg; redisAsyncHandleWrite(e->context); } -void redisLibeventAddRead(void *privdata) { +static void redisLibeventAddRead(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; event_add(&e->rev,NULL); } -void redisLibeventDelRead(void *privdata) { +static void redisLibeventDelRead(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; event_del(&e->rev); } -void redisLibeventAddWrite(void *privdata) { +static void redisLibeventAddWrite(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; event_add(&e->wev,NULL); } -void redisLibeventDelWrite(void *privdata) { +static void redisLibeventDelWrite(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; event_del(&e->wev); } -void redisLibeventCleanup(void *privdata) { +static void redisLibeventCleanup(void *privdata) { redisLibeventEvents *e = (redisLibeventEvents*)privdata; event_del(&e->rev); event_del(&e->wev); free(e); } -int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { +static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { redisContext *c = &(ac->c); redisLibeventEvents *e; @@ -73,3 +75,4 @@ int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) { event_base_set(base,&e->wev); return REDIS_OK; } +#endif @@ -29,14 +29,34 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include "fmacros.h" +#include <stdlib.h> #include <string.h> #include <strings.h> #include <assert.h> #include <ctype.h> +#include <errno.h> #include "async.h" +#include "net.h" #include "dict.c" #include "sds.h" +#define _EL_ADD_READ(ctx) do { \ + if ((ctx)->ev.addRead) (ctx)->ev.addRead((ctx)->ev.data); \ + } while(0) +#define _EL_DEL_READ(ctx) do { \ + if ((ctx)->ev.delRead) (ctx)->ev.delRead((ctx)->ev.data); \ + } while(0) +#define _EL_ADD_WRITE(ctx) do { \ + if ((ctx)->ev.addWrite) (ctx)->ev.addWrite((ctx)->ev.data); \ + } while(0) +#define _EL_DEL_WRITE(ctx) do { \ + if ((ctx)->ev.delWrite) (ctx)->ev.delWrite((ctx)->ev.data); \ + } while(0) +#define _EL_CLEANUP(ctx) do { \ + if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \ + } while(0); + /* Forward declaration of function in hiredis.c */ void __redisAppendCommand(redisContext *c, char *cmd, size_t len); @@ -142,7 +162,7 @@ int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn /* The common way to detect an established connection is to wait for * the first write event to be fired. This assumes the related event * library functions are already set. */ - if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); + _EL_ADD_WRITE(ac); return REDIS_OK; } return REDIS_ERR; @@ -230,7 +250,7 @@ static void __redisAsyncFree(redisAsyncContext *ac) { dictRelease(ac->sub.patterns); /* Signal event lib to clean up */ - if (ac->ev.cleanup) ac->ev.cleanup(ac->ev.data); + _EL_CLEANUP(ac); /* Execute disconnect callback. When redisAsyncFree() initiated destroying * this context, the status will always be REDIS_OK. */ @@ -402,17 +422,48 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); } +/* Internal helper function to detect socket status the first time a read or + * write event fires. When connecting was not succesful, the connect callback + * is called with a REDIS_ERR status and the context is free'd. */ +static int __redisAsyncHandleConnect(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + + if (redisCheckSocketError(c,c->fd) == REDIS_ERR) { + /* Try again later when connect(2) is still in progress. */ + if (errno == EINPROGRESS) + return REDIS_OK; + + if (ac->onConnect) ac->onConnect(ac,REDIS_ERR); + __redisAsyncDisconnect(ac); + return REDIS_ERR; + } + + /* Mark context as connected. */ + c->flags |= REDIS_CONNECTED; + if (ac->onConnect) ac->onConnect(ac,REDIS_OK); + return REDIS_OK; +} + /* This function should be called when the socket is readable. * It processes all replies that can be read and executes their callbacks. */ void redisAsyncHandleRead(redisAsyncContext *ac) { redisContext *c = &(ac->c); + if (!(c->flags & REDIS_CONNECTED)) { + /* Abort connect was not successful. */ + if (__redisAsyncHandleConnect(ac) != REDIS_OK) + return; + /* Try again later when the context is still not connected. */ + if (!(c->flags & REDIS_CONNECTED)) + return; + } + if (redisBufferRead(c) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { /* Always re-schedule reads */ - if (ac->ev.addRead) ac->ev.addRead(ac->ev.data); + _EL_ADD_READ(ac); redisProcessCallbacks(ac); } } @@ -421,24 +472,26 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { redisContext *c = &(ac->c); int done = 0; + if (!(c->flags & REDIS_CONNECTED)) { + /* Abort connect was not successful. */ + if (__redisAsyncHandleConnect(ac) != REDIS_OK) + return; + /* Try again later when the context is still not connected. */ + if (!(c->flags & REDIS_CONNECTED)) + return; + } + if (redisBufferWrite(c,&done) == REDIS_ERR) { __redisAsyncDisconnect(ac); } else { /* Continue writing when not done, stop writing otherwise */ - if (!done) { - if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); - } else { - if (ac->ev.delWrite) ac->ev.delWrite(ac->ev.data); - } + if (!done) + _EL_ADD_WRITE(ac); + else + _EL_DEL_WRITE(ac); /* Always schedule reads after writes */ - if (ac->ev.addRead) ac->ev.addRead(ac->ev.data); - - /* Fire onConnect when this is the first write event. */ - if (!(c->flags & REDIS_CONNECTED)) { - c->flags |= REDIS_CONNECTED; - if (ac->onConnect) ac->onConnect(ac); - } + _EL_ADD_READ(ac); } } @@ -516,7 +569,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void __redisAppendCommand(c,cmd,len); /* Always schedule a write when the write buffer is non-empty */ - if (ac->ev.addWrite) ac->ev.addWrite(ac->ev.data); + _EL_ADD_WRITE(ac); return REDIS_OK; } @@ -55,7 +55,7 @@ typedef struct redisCallbackList { /* Connection callback prototypes */ typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); -typedef void (redisConnectCallback)(const struct redisAsyncContext*); +typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status); /* Context for an async connection to Redis */ typedef struct redisAsyncContext { diff --git a/example-ae.c b/example-ae.c index 28c34dc..5ed34a3 100644 --- a/example-ae.c +++ b/example-ae.c @@ -18,17 +18,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } -void connectCallback(const redisAsyncContext *c) { - ((void)c); - printf("connected...\n"); +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); } void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); + return; } - printf("disconnected...\n"); - aeStop(loop); + printf("Disconnected...\n"); } int main (int argc, char **argv) { diff --git a/example-libev.c b/example-libev.c index 8efa1e3..7894f1f 100644 --- a/example-libev.c +++ b/example-libev.c @@ -15,16 +15,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } -void connectCallback(const redisAsyncContext *c) { - ((void)c); - printf("connected...\n"); +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); } void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); + return; } - printf("disconnected...\n"); + printf("Disconnected...\n"); } int main (int argc, char **argv) { diff --git a/example-libevent.c b/example-libevent.c index f6f8c83..9da8e02 100644 --- a/example-libevent.c +++ b/example-libevent.c @@ -15,16 +15,20 @@ void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisAsyncDisconnect(c); } -void connectCallback(const redisAsyncContext *c) { - ((void)c); - printf("connected...\n"); +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + printf("Connected...\n"); } void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { printf("Error: %s\n", c->errstr); + return; } - printf("disconnected...\n"); + printf("Disconnected...\n"); } int main (int argc, char **argv) { @@ -1,12 +1,14 @@ #ifndef __HIREDIS_FMACRO_H #define __HIREDIS_FMACRO_H -#ifndef _BSD_SOURCE +#if !defined(_BSD_SOURCE) #define _BSD_SOURCE #endif -#ifdef __linux__ -#define _XOPEN_SOURCE 700 +#if defined(__sun__) +#define _POSIX_C_SOURCE 200112L +#elif defined(__linux__) +#define _XOPEN_SOURCE 600 #else #define _XOPEN_SOURCE #endif @@ -358,12 +358,7 @@ static int processLineItem(redisReader *r) { char *p; int len; - cur->poff = (r->pos-r->roff)-1; - cur->coff = cur->poff+1; if ((p = readLine(r,&len)) != NULL) { - cur->plen = 1+len+2; /* include \r\n */ - cur->clen = len; - if (cur->type == REDIS_REPLY_INTEGER) { if (r->fn && r->fn->createInteger) obj = r->fn->createInteger(cur,readLongLong(p)); @@ -402,13 +397,10 @@ static int processBulkItem(redisReader *r) { p = r->buf+r->pos; s = seekNewline(p,r->len-r->pos); if (s != NULL) { + p = r->buf+r->pos; bytelen = s-(r->buf+r->pos)+2; /* include \r\n */ - cur->poff = (r->pos-r->roff)-1; - cur->plen = bytelen+1; - cur->coff = cur->poff+1+bytelen; - cur->clen = 0; - len = readLongLong(p); + if (len < 0) { /* The nil object can always be created. */ if (r->fn && r->fn->createNil) @@ -420,8 +412,6 @@ static int processBulkItem(redisReader *r) { /* Only continue when the buffer contains the entire bulk item. */ bytelen += len+2; /* include \r\n */ if (r->pos+bytelen <= r->len) { - cur->plen += len+2; - cur->clen = len; if (r->fn && r->fn->createString) obj = r->fn->createString(cur,s+2,len); else @@ -463,12 +453,7 @@ static int processMultiBulkItem(redisReader *r) { return REDIS_ERR; } - cur->poff = (r->pos-r->roff)-1; - cur->coff = 0; if ((p = readLine(r,NULL)) != NULL) { - cur->plen = (r->pos-r->roff)-cur->poff; /* includes \r\n */ - cur->clen = 0; - elements = readLongLong(p); root = (r->ridx == 0); @@ -605,7 +590,7 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) { /* Copy the provided buffer. */ if (buf != NULL && len >= 1) { - /* Destroy buffer when it is empty and is quite large. */ + /* Destroy internal buffer when it is empty and is quite large. */ if (r->len == 0 && sdsavail(r->buf) > 16*1024) { sdsfree(r->buf); r->buf = sdsempty(); @@ -615,15 +600,6 @@ int redisReaderFeed(redisReader *r, const char *buf, size_t len) { assert(r->buf != NULL); } - /* Discard consumed part of the buffer when the offset for the reply - * that is currently being read is high enough. */ - if (r->roff >= 1024) { - r->buf = sdsrange(r->buf,r->roff,-1); - r->pos -= r->roff; - r->roff = 0; - r->len = sdslen(r->buf); - } - newbuf = sdscatlen(r->buf,buf,len); if (newbuf == NULL) { __redisReaderSetErrorOOM(r); @@ -659,7 +635,6 @@ int redisReaderGetReply(redisReader *r, void **reply) { r->rstack[0].parent = NULL; r->rstack[0].privdata = r->privdata; r->ridx = 0; - r->roff = r->pos; /* Start offset in buffer. */ } /* Process items in reply. */ @@ -671,6 +646,14 @@ int redisReaderGetReply(redisReader *r, void **reply) { if (r->err) return REDIS_ERR; + /* Discard part of the buffer when we've consumed at least 1k, to avoid + * doing unnecessary calls to memmove() in sds.c. */ + if (r->pos >= 1024) { + r->buf = sdsrange(r->buf,r->pos,-1); + r->pos = 0; + r->len = sdslen(r->buf); + } + /* Emit a reply when there is one. */ if (r->ridx == -1) { if (reply != NULL) @@ -680,17 +663,6 @@ int redisReaderGetReply(redisReader *r, void **reply) { return REDIS_OK; } -const char *redisReaderGetRaw(redisReader *r, size_t *len) { - /* ridx == -1: No or a full reply has been read. */ - /* pos > roff: Buffer position is larger than start offset, meaning - * the buffer has not yet been truncated. */ - if (r->ridx == -1 && r->pos > r->roff) { - if (len) *len = (r->pos-r->roff); - return r->buf+r->roff; - } - return NULL; -} - /* Calculate the number of bytes needed to represent an integer as string. */ static int intlen(int i) { int len = 0; @@ -798,33 +770,79 @@ int redisvFormatCommand(char **target, const char *format, va_list ap) { while (*_p != '\0' && isdigit(*_p)) _p++; } - /* Modifiers */ - if (*_p != '\0') { - if (*_p == 'h' || *_p == 'l') { - /* Allow a single repetition for these modifiers */ - if (_p[0] == _p[1]) _p++; - _p++; + /* Copy va_list before consuming with va_arg */ + va_copy(_cpy,ap); + + /* Integer conversion (without modifiers) */ + if (strchr("diouxX",*_p) != NULL) { + va_arg(ap,int); + goto fmt_valid; + } + + /* Double conversion (without modifiers) */ + if (strchr("eEfFgGaA",*_p) != NULL) { + va_arg(ap,double); + goto fmt_valid; + } + + /* Size: char */ + if (_p[0] == 'h' && _p[1] == 'h') { + _p += 2; + if (*_p != '\0' && strchr("diouxX",*_p) != NULL) { + va_arg(ap,int); /* char gets promoted to int */ + goto fmt_valid; + } + goto fmt_invalid; + } + + /* Size: short */ + if (_p[0] == 'h') { + _p += 1; + if (*_p != '\0' && strchr("diouxX",*_p) != NULL) { + va_arg(ap,int); /* short gets promoted to int */ + goto fmt_valid; } + goto fmt_invalid; } - /* Conversion specifier */ - if (*_p != '\0' && strchr("diouxXeEfFgGaA",*_p) != NULL) { - _l = (_p+1)-c; - if (_l < sizeof(_format)-2) { - memcpy(_format,c,_l); - _format[_l] = '\0'; - va_copy(_cpy,ap); - newarg = sdscatvprintf(curarg,_format,_cpy); - va_end(_cpy); - - /* Update current position (note: outer blocks - * increment c twice so compensate here) */ - c = _p-1; + /* Size: long long */ + if (_p[0] == 'l' && _p[1] == 'l') { + _p += 2; + if (*_p != '\0' && strchr("diouxX",*_p) != NULL) { + va_arg(ap,long long); + goto fmt_valid; } + goto fmt_invalid; + } + + /* Size: long */ + if (_p[0] == 'l') { + _p += 1; + if (*_p != '\0' && strchr("diouxX",*_p) != NULL) { + va_arg(ap,long); + goto fmt_valid; + } + goto fmt_invalid; + } + + fmt_invalid: + va_end(_cpy); + goto err; + + fmt_valid: + _l = (_p+1)-c; + if (_l < sizeof(_format)-2) { + memcpy(_format,c,_l); + _format[_l] = '\0'; + newarg = sdscatvprintf(curarg,_format,_cpy); + + /* Update current position (note: outer blocks + * increment c twice so compensate here) */ + c = _p-1; } - /* Consume and discard vararg */ - va_arg(ap,void); + va_end(_cpy); + break; } } @@ -37,7 +37,7 @@ #define HIREDIS_MAJOR 0 #define HIREDIS_MINOR 10 -#define HIREDIS_PATCH 0 +#define HIREDIS_PATCH 1 #define REDIS_ERR -1 #define REDIS_OK 0 @@ -98,11 +98,6 @@ typedef struct redisReply { } redisReply; typedef struct redisReadTask { - size_t poff; /* Protocol offset */ - size_t plen; /* Protocol length */ - size_t coff; /* Content offset */ - size_t clen; /* Content length */ - int type; int elements; /* number of elements in multibulk container */ int idx; /* index in parent (array) object */ @@ -127,7 +122,6 @@ typedef struct redisReader { char *buf; /* Read buffer */ size_t pos; /* Buffer cursor */ size_t len; /* Buffer length */ - size_t roff; /* Reply offset */ redisReadTask rstack[3]; int ridx; /* Index of current read task */ @@ -142,7 +136,6 @@ redisReader *redisReaderCreate(void); void redisReaderFree(redisReader *r); int redisReaderFeed(redisReader *r, const char *buf, size_t len); int redisReaderGetReply(redisReader *r, void **reply); -const char *redisReaderGetRaw(redisReader *r, size_t *len); /* Backwards compatibility, can be removed on big version bump. */ #define redisReplyReaderCreate redisReaderCreate @@ -125,8 +125,6 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval * struct timeval to; struct timeval *toptr = NULL; fd_set wfd; - int err; - socklen_t errlen; /* Only use timeout when not NULL. */ if (timeout != NULL) { @@ -151,20 +149,8 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval * return REDIS_ERR; } - err = 0; - errlen = sizeof(err); - if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) { - __redisSetErrorFromErrno(c,REDIS_ERR_IO,"getsockopt(SO_ERROR)"); - close(fd); - return REDIS_ERR; - } - - if (err) { - errno = err; - __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); - close(fd); + if (redisCheckSocketError(c, fd) != REDIS_OK) return REDIS_ERR; - } return REDIS_OK; } @@ -174,6 +160,26 @@ static int redisContextWaitReady(redisContext *c, int fd, const struct timeval * return REDIS_ERR; } +int redisCheckSocketError(redisContext *c, int fd) { + int err = 0; + socklen_t errlen = sizeof(err); + + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen) == -1) { + __redisSetErrorFromErrno(c,REDIS_ERR_IO,"getsockopt(SO_ERROR)"); + close(fd); + return REDIS_ERR; + } + + if (err) { + errno = err; + __redisSetErrorFromErrno(c,REDIS_ERR_IO,NULL); + close(fd); + return REDIS_ERR; + } + + return REDIS_OK; +} + int redisContextSetTimeout(redisContext *c, struct timeval tv) { if (setsockopt(c->fd,SOL_SOCKET,SO_RCVTIMEO,&tv,sizeof(tv)) == -1) { __redisSetErrorFromErrno(c,REDIS_ERR_IO,"setsockopt(SO_RCVTIMEO)"); @@ -39,6 +39,7 @@ #define AF_LOCAL AF_UNIX #endif +int redisCheckSocketError(redisContext *c, int fd); int redisContextSetTimeout(redisContext *c, struct timeval tv); int redisContextConnectTcp(redisContext *c, const char *addr, int port, struct timeval *timeout); int redisContextConnectUnix(redisContext *c, const char *path, struct timeval *timeout); @@ -1,3 +1,4 @@ +#include "fmacros.h" #include <stdio.h> #include <stdlib.h> #include <string.h> @@ -31,7 +32,7 @@ struct config { /* The following lines make up our testing "framework" :) */ static int tests = 0, fails = 0; #define test(_s) { printf("#%02d ", ++tests); printf(_s); } -#define test_cond(_c) if(_c) printf("PASSED\n"); else {printf("FAILED\n"); fails++;} +#define test_cond(_c) if(_c) printf("\033[0;32mPASSED\033[0;0m\n"); else {printf("\033[0;31mFAILED\033[0;0m\n"); fails++;} static long long usec(void) { struct timeval tv; @@ -77,7 +78,7 @@ static void disconnect(redisContext *c) { } static redisContext *connect(struct config config) { - redisContext *c; + redisContext *c = NULL; if (config.type == CONN_TCP) { c = redisConnect(config.tcp.host, config.tcp.port); @@ -141,29 +142,43 @@ static void test_format_commands(void) { len == 4+4+(3+2)+4+(1+2)+4+(1+2)); free(cmd); - test("Format command with printf-delegation (long long): "); - len = redisFormatCommand(&cmd,"key:%08lld",1234ll); - test_cond(strncmp(cmd,"*1\r\n$12\r\nkey:00001234\r\n",len) == 0 && - len == 4+5+(12+2)); - free(cmd); - - test("Format command with printf-delegation (float): "); - len = redisFormatCommand(&cmd,"v:%06.1f",12.34f); - test_cond(strncmp(cmd,"*1\r\n$8\r\nv:0012.3\r\n",len) == 0 && - len == 4+4+(8+2)); - free(cmd); - - test("Format command with printf-delegation and extra interpolation: "); - len = redisFormatCommand(&cmd,"key:%d %b",1234,"foo",3); - test_cond(strncmp(cmd,"*2\r\n$8\r\nkey:1234\r\n$3\r\nfoo\r\n",len) == 0 && - len == 4+4+(8+2)+4+(3+2)); - free(cmd); - - test("Format command with wrong printf format and extra interpolation: "); + /* Vararg width depends on the type. These tests make sure that the + * width is correctly determined using the format and subsequent varargs + * can correctly be interpolated. */ +#define INTEGER_WIDTH_TEST(fmt, type) do { \ + type value = 123; \ + test("Format command with printf-delegation (" #type "): "); \ + len = redisFormatCommand(&cmd,"key:%08" fmt " str:%s", value, "hello"); \ + test_cond(strncmp(cmd,"*2\r\n$12\r\nkey:00000123\r\n$9\r\nstr:hello\r\n",len) == 0 && \ + len == 4+5+(12+2)+4+(9+2)); \ + free(cmd); \ +} while(0) + +#define FLOAT_WIDTH_TEST(type) do { \ + type value = 123.0; \ + test("Format command with printf-delegation (" #type "): "); \ + len = redisFormatCommand(&cmd,"key:%08.3f str:%s", value, "hello"); \ + test_cond(strncmp(cmd,"*2\r\n$12\r\nkey:0123.000\r\n$9\r\nstr:hello\r\n",len) == 0 && \ + len == 4+5+(12+2)+4+(9+2)); \ + free(cmd); \ +} while(0) + + INTEGER_WIDTH_TEST("d", int); + INTEGER_WIDTH_TEST("hhd", char); + INTEGER_WIDTH_TEST("hd", short); + INTEGER_WIDTH_TEST("ld", long); + INTEGER_WIDTH_TEST("lld", long long); + INTEGER_WIDTH_TEST("u", unsigned int); + INTEGER_WIDTH_TEST("hhu", unsigned char); + INTEGER_WIDTH_TEST("hu", unsigned short); + INTEGER_WIDTH_TEST("lu", unsigned long); + INTEGER_WIDTH_TEST("llu", unsigned long long); + FLOAT_WIDTH_TEST(float); + FLOAT_WIDTH_TEST(double); + + test("Format command with invalid printf format: "); len = redisFormatCommand(&cmd,"key:%08p %b",1234,"foo",3); - test_cond(strncmp(cmd,"*2\r\n$6\r\nkey:8p\r\n$3\r\nfoo\r\n",len) == 0 && - len == 4+4+(6+2)+4+(3+2)); - free(cmd); + test_cond(len == -1); const char *argv[3]; argv[0] = "SET"; @@ -261,98 +276,6 @@ static void test_reply_reader(void) { redisReaderFree(reader); } -static void *test_create_string(const redisReadTask *task, char *str, size_t len) { - redisReader *r = (redisReader*)task->privdata; - const char *roff = r->buf+r->roff; - ((void)str); ((void)len); - - assert(task->plen > 0); - assert(task->clen > 0); - switch(task->type) { - case REDIS_REPLY_STATUS: - assert(strncmp("+status\r\n", roff+task->poff, task->plen) == 0); - assert(strncmp("status", roff+task->coff, task->clen) == 0); - break; - case REDIS_REPLY_ERROR: - assert(strncmp("-error\r\n", roff+task->poff, task->plen) == 0); - assert(strncmp("error", roff+task->coff, task->clen) == 0); - break; - case REDIS_REPLY_STRING: /* bulk */ - assert(strncmp("$4\r\nbulk\r\n", roff+task->poff, task->plen) == 0); - assert(strncmp("bulk", roff+task->coff, task->clen) == 0); - break; - default: - assert(NULL); - } - return (void*)1; -} - -static void *test_create_array(const redisReadTask *task, int len) { - redisReader *r = (redisReader*)task->privdata; - const char *roff = r->buf+r->roff; - ((void)len); - - assert(task->plen > 0); - assert(task->clen == 0); - assert(strncmp("*5\r\n", roff+task->poff, task->plen) == 0); - return (void*)1; -} - -static void *test_create_integer(const redisReadTask *task, long long value) { - redisReader *r = (redisReader*)task->privdata; - const char *roff = r->buf+r->roff; - ((void)value); - - assert(task->plen > 0); - assert(task->clen > 0); - assert(strncmp(":1234\r\n", roff+task->poff, task->plen) == 0); - assert(strncmp("1234", roff+task->coff, task->clen) == 0); - return (void*)1; -} - -static void *test_create_nil(const redisReadTask *task) { - redisReader *r = (redisReader*)task->privdata; - const char *roff = r->buf+r->roff; - - assert(task->plen > 0); - assert(task->clen == 0); - assert(strncmp("$-1\r\n", roff+task->poff, task->plen) == 0); - return (void*)1; -} - -static redisReplyObjectFunctions test_reader_fn = { - test_create_string, - test_create_array, - test_create_integer, - test_create_nil, - NULL -}; - -static void test_reader_functions(void) { - redisReader *reader; - const char *input; - int ret; - void *obj; - - input = - "*5\r\n" - "$-1\r\n" - ":1234\r\n" - "+status\r\n" - "-error\r\n" - "$4\r\nbulk\r\n"; - - test("Custom object functions in reply reader: "); - reader = redisReaderCreate(); - reader->fn = &test_reader_fn; - reader->privdata = reader; - - redisReaderFeed(reader,input,strlen(input)); - ret = redisReaderGetReply(reader,&obj); - test_cond(ret == REDIS_OK && obj == (void*)1); - redisReaderFree(reader); -} - static void test_blocking_connection_errors(void) { redisContext *c; @@ -455,6 +378,7 @@ static void test_blocking_connection(struct config config) { static void test_blocking_io_errors(struct config config) { redisContext *c; redisReply *reply; + void *_reply; int major, minor; /* Connect to target given by config. */ @@ -478,7 +402,7 @@ static void test_blocking_io_errors(struct config config) { /* > 2.0 returns OK on QUIT and read() should be issued once more * to know the descriptor is at EOF. */ test_cond(strcasecmp(reply->str,"OK") == 0 && - redisGetReply(c,(void**)&reply) == REDIS_ERR); + redisGetReply(c,&_reply) == REDIS_ERR); freeReplyObject(reply); } else { test_cond(reply == NULL); @@ -497,7 +421,7 @@ static void test_blocking_io_errors(struct config config) { test("Returns I/O error on socket timeout: "); struct timeval tv = { 0, 1000 }; assert(redisSetTimeout(c,tv) == REDIS_OK); - test_cond(redisGetReply(c,(void**)&reply) == REDIS_ERR && + test_cond(redisGetReply(c,&_reply) == REDIS_ERR && c->err == REDIS_ERR_IO && errno == EAGAIN); redisFree(c); } @@ -704,7 +628,6 @@ int main(int argc, char **argv) { test_format_commands(); test_reply_reader(); - test_reader_functions(); test_blocking_connection_errors(); printf("\nTesting against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); |