diff options
-rw-r--r-- | .github/release-drafter-config.yml | 49 | ||||
-rw-r--r-- | .github/workflows/build.yml | 208 | ||||
-rw-r--r-- | .github/workflows/release-drafter.yml | 19 | ||||
-rw-r--r-- | CHANGELOG.md | 19 | ||||
-rw-r--r-- | CMakeLists.txt | 42 | ||||
-rw-r--r-- | Makefile | 113 | ||||
-rw-r--r-- | README.md | 128 | ||||
-rw-r--r-- | adapters/libev.h | 21 | ||||
-rw-r--r-- | adapters/libevent.h | 2 | ||||
-rw-r--r-- | adapters/libuv.h | 162 | ||||
-rw-r--r-- | adapters/poll.h | 197 | ||||
-rw-r--r-- | alloc.c | 4 | ||||
-rw-r--r-- | alloc.h | 5 | ||||
-rw-r--r-- | async.c | 337 | ||||
-rw-r--r-- | async.h | 7 | ||||
-rw-r--r-- | async_private.h | 2 | ||||
-rw-r--r-- | dict.c | 11 | ||||
-rw-r--r-- | dict.h | 3 | ||||
-rw-r--r-- | examples/CMakeLists.txt | 2 | ||||
-rw-r--r-- | examples/example-libevent-ssl.c | 2 | ||||
-rw-r--r-- | examples/example-libuv.c | 37 | ||||
-rw-r--r-- | examples/example-poll.c | 62 | ||||
-rw-r--r-- | examples/example-ssl.c | 7 | ||||
-rw-r--r-- | fmacros.h | 2 | ||||
-rw-r--r-- | fuzzing/format_command_fuzzer.c | 57 | ||||
-rw-r--r-- | hiredis.c | 80 | ||||
-rw-r--r-- | hiredis.h | 22 | ||||
-rw-r--r-- | hiredis_ssl.h | 33 | ||||
-rw-r--r-- | net.c | 20 | ||||
-rw-r--r-- | read.c | 150 | ||||
-rw-r--r-- | sds.c | 15 | ||||
-rw-r--r-- | sockcompat.c | 19 | ||||
-rw-r--r-- | ssl.c | 28 | ||||
-rw-r--r-- | test.c | 911 | ||||
-rwxr-xr-x | test.sh | 9 |
35 files changed, 2419 insertions, 366 deletions
diff --git a/.github/release-drafter-config.yml b/.github/release-drafter-config.yml new file mode 100644 index 0000000..de4fea7 --- /dev/null +++ b/.github/release-drafter-config.yml @@ -0,0 +1,49 @@ +name-template: '$NEXT_MAJOR_VERSION' +tag-template: 'v$NEXT_MAJOR_VERSION' +autolabeler: + - label: 'maintenance' + files: + - '*.md' + - '.github/*' + - label: 'bug' + branch: + - '/bug-.+' + - label: 'maintenance' + branch: + - '/maintenance-.+' + - label: 'feature' + branch: + - '/feature-.+' +categories: + - title: 'Breaking Changes' + labels: + - 'breakingchange' + + - title: '๐งช Experimental Features' + labels: + - 'experimental' + - title: '๐ New Features' + labels: + - 'feature' + - 'enhancement' + - title: '๐ Bug Fixes' + labels: + - 'fix' + - 'bugfix' + - 'bug' + - 'BUG' + - title: '๐งฐ Maintenance' + label: 'maintenance' +change-template: '- $TITLE (#$NUMBER)' +exclude-labels: + - 'skip-changelog' +template: | + ## Changes + + $CHANGES + + ## Contributors + We'd like to thank all the contributors who worked on this release! + + $CONTRIBUTORS + diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..c1e4f55 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,208 @@ +name: Build and test +on: [push, pull_request] + +jobs: + ubuntu: + name: Ubuntu + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + repository: ${{ env.GITHUB_REPOSITORY }} + ref: ${{ env.GITHUB_HEAD_REF }} + + - name: Install dependencies + run: | + curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg + echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list + sudo apt-get update + sudo apt-get install -y redis-server valgrind libevent-dev + + - name: Build using cmake + env: + EXTRA_CMAKE_OPTS: -DENABLE_EXAMPLES:BOOL=ON -DENABLE_SSL:BOOL=ON -DENABLE_SSL_TESTS:BOOL=ON -DENABLE_ASYNC_TESTS:BOOL=ON + CFLAGS: -Werror + CXXFLAGS: -Werror + run: mkdir build-ubuntu && cd build-ubuntu && cmake .. + + - name: Build using makefile + run: USE_SSL=1 TEST_ASYNC=1 make + + - name: Run tests + env: + SKIPS_AS_FAILS: 1 + TEST_SSL: 1 + run: $GITHUB_WORKSPACE/test.sh + + # - name: Run tests under valgrind + # env: + # SKIPS_AS_FAILS: 1 + # TEST_PREFIX: valgrind --error-exitcode=99 --track-origins=yes --leak-check=full + # run: $GITHUB_WORKSPACE/test.sh + + centos7: + name: CentOS 7 + runs-on: ubuntu-latest + container: centos:7 + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + repository: ${{ env.GITHUB_REPOSITORY }} + ref: ${{ env.GITHUB_HEAD_REF }} + + - name: Install dependencies + run: | + yum -y install http://rpms.remirepo.net/enterprise/remi-release-7.rpm + yum -y --enablerepo=remi install redis + yum -y install gcc gcc-c++ make openssl openssl-devel cmake3 valgrind libevent-devel + + - name: Build using cmake + env: + EXTRA_CMAKE_OPTS: -DENABLE_EXAMPLES:BOOL=ON -DENABLE_SSL:BOOL=ON -DENABLE_SSL_TESTS:BOOL=ON -DENABLE_ASYNC_TESTS:BOOL=ON + CFLAGS: -Werror + CXXFLAGS: -Werror + run: mkdir build-centos7 && cd build-centos7 && cmake3 .. + + - name: Build using Makefile + run: USE_SSL=1 TEST_ASYNC=1 make + + - name: Run tests + env: + SKIPS_AS_FAILS: 1 + TEST_SSL: 1 + run: $GITHUB_WORKSPACE/test.sh + + - name: Run tests under valgrind + env: + SKIPS_AS_FAILS: 1 + TEST_SSL: 1 + TEST_PREFIX: valgrind --error-exitcode=99 --track-origins=yes --leak-check=full + run: $GITHUB_WORKSPACE/test.sh + + centos8: + name: RockyLinux 8 + runs-on: ubuntu-latest + container: rockylinux:8 + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + repository: ${{ env.GITHUB_REPOSITORY }} + ref: ${{ env.GITHUB_HEAD_REF }} + + - name: Install dependencies + run: | + dnf -y install https://rpms.remirepo.net/enterprise/remi-release-8.rpm + dnf -y module install redis:remi-6.0 + dnf -y group install "Development Tools" + dnf -y install openssl-devel cmake valgrind libevent-devel + + - name: Build using cmake + env: + EXTRA_CMAKE_OPTS: -DENABLE_EXAMPLES:BOOL=ON -DENABLE_SSL:BOOL=ON -DENABLE_SSL_TESTS:BOOL=ON -DENABLE_ASYNC_TESTS:BOOL=ON + CFLAGS: -Werror + CXXFLAGS: -Werror + run: mkdir build-centos8 && cd build-centos8 && cmake .. + + - name: Build using Makefile + run: USE_SSL=1 TEST_ASYNC=1 make + + - name: Run tests + env: + SKIPS_AS_FAILS: 1 + TEST_SSL: 1 + run: $GITHUB_WORKSPACE/test.sh + + - name: Run tests under valgrind + env: + SKIPS_AS_FAILS: 1 + TEST_SSL: 1 + TEST_PREFIX: valgrind --error-exitcode=99 --track-origins=yes --leak-check=full + run: $GITHUB_WORKSPACE/test.sh + + freebsd: + runs-on: macos-12 + name: FreeBSD + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + repository: ${{ env.GITHUB_REPOSITORY }} + ref: ${{ env.GITHUB_HEAD_REF }} + + - name: Build in FreeBSD + uses: vmactions/freebsd-vm@v0 + with: + prepare: pkg install -y gmake cmake + run: | + mkdir build && cd build && cmake .. && make && cd .. + gmake + + macos: + name: macOS + runs-on: macos-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + repository: ${{ env.GITHUB_REPOSITORY }} + ref: ${{ env.GITHUB_HEAD_REF }} + + - name: Install dependencies + run: | + brew install openssl redis@6.2 + brew link redis@6.2 --force + + - name: Build hiredis + run: USE_SSL=1 make + + - name: Run tests + env: + TEST_SSL: 1 + run: $GITHUB_WORKSPACE/test.sh + + windows: + name: Windows + runs-on: windows-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + with: + repository: ${{ env.GITHUB_REPOSITORY }} + ref: ${{ env.GITHUB_HEAD_REF }} + + - name: Install dependencies + run: | + choco install -y ninja memurai-developer + + - uses: ilammy/msvc-dev-cmd@v1 + - name: Build hiredis + run: | + mkdir build && cd build + cmake .. -G Ninja -DCMAKE_BUILD_TYPE=Release -DENABLE_EXAMPLES=ON + ninja -v + + - name: Run tests + run: | + ./build/hiredis-test.exe + + - name: Setup cygwin + uses: egor-tensin/setup-cygwin@v3 + with: + platform: x64 + packages: make git gcc-core + + - name: Build in cygwin + env: + HIREDIS_PATH: ${{ github.workspace }} + run: | + build_hiredis() { + git config --global --add safe.directory "$(cygpath -u $HIREDIS_PATH)" + cd $(cygpath -u $HIREDIS_PATH) + git clean -xfd + make + } + build_hiredis + shell: C:\tools\cygwin\bin\bash.exe --login --norc -eo pipefail -o igncr '{0}' diff --git a/.github/workflows/release-drafter.yml b/.github/workflows/release-drafter.yml new file mode 100644 index 0000000..ec2d88b --- /dev/null +++ b/.github/workflows/release-drafter.yml @@ -0,0 +1,19 @@ +name: Release Drafter + +on: + push: + # branches to consider in the event; optional, defaults to all + branches: + - master + +jobs: + update_release_draft: + runs-on: ubuntu-latest + steps: + # Drafts your next Release notes as Pull Requests are merged into "master" + - uses: release-drafter/release-drafter@v5 + with: + # (Optional) specify config name to use, relative to .github/. Default: release-drafter.yml + config-name: release-drafter-config.yml + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/CHANGELOG.md b/CHANGELOG.md index 271f1fc..2a2bc31 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,22 @@ +## [1.0.2](https://github.com/redis/hiredis/tree/v1.0.2) - (2021-10-07) + +Announcing Hiredis v1.0.2, which fixes CVE-2021-32765 but returns the SONAME to the correct value of `1.0.0`. + +- [Revert SONAME bump](https://github.com/redis/hiredis/commit/d4e6f109a064690cde64765c654e679fea1d3548) + ([Michael Grunder](https://github.com/michael-grunder)) + +## [1.0.1](https://github.com/redis/hiredis/tree/v1.0.1) - (2021-10-04) + +<span style="color:red">This release erroneously bumped the SONAME, please use [1.0.2](https://github.com/redis/hiredis/tree/v1.0.2)</span> + +Announcing Hiredis v1.0.1, a security release fixing CVE-2021-32765 + +- Fix for [CVE-2021-32765](https://github.com/redis/hiredis/security/advisories/GHSA-hfm9-39pp-55p2) + [commit](https://github.com/redis/hiredis/commit/76a7b10005c70babee357a7d0f2becf28ec7ed1e) + ([Yossi Gottlieb](https://github.com/yossigo)) + +_Thanks to [Yossi Gottlieb](https://github.com/yossigo) for the security fix and to [Microsoft Security Vulnerability Research](https://www.microsoft.com/en-us/msrc/msvr) for finding the bug._ :sparkling_heart: + ## [1.0.0](https://github.com/redis/hiredis/tree/v1.0.0) - (2020-08-03) Announcing Hiredis v1.0.0, which adds support for RESP3, SSL connections, allocator injection, and better Windows support! :tada: diff --git a/CMakeLists.txt b/CMakeLists.txt index a8dfaa9..f77dfd5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,10 +1,9 @@ -CMAKE_MINIMUM_REQUIRED(VERSION 3.4.0) -INCLUDE(GNUInstallDirs) -PROJECT(hiredis) +CMAKE_MINIMUM_REQUIRED(VERSION 3.0.0) OPTION(ENABLE_SSL "Build hiredis_ssl for SSL support" OFF) OPTION(DISABLE_TESTS "If tests should be compiled or not" OFF) -OPTION(ENABLE_SSL_TESTS, "Should we test SSL connections" OFF) +OPTION(ENABLE_SSL_TESTS "Should we test SSL connections" OFF) +OPTION(ENABLE_ASYNC_TESTS "Should we run all asynchronous API tests" OFF) MACRO(getVersionBit name) SET(VERSION_REGEX "^#define ${name} (.+)$") @@ -20,7 +19,8 @@ getVersionBit(HIREDIS_SONAME) SET(VERSION "${HIREDIS_MAJOR}.${HIREDIS_MINOR}.${HIREDIS_PATCH}") MESSAGE("Detected version: ${VERSION}") -PROJECT(hiredis VERSION "${VERSION}") +PROJECT(hiredis LANGUAGES "C" VERSION "${VERSION}") +INCLUDE(GNUInstallDirs) # Hiredis requires C99 SET(CMAKE_C_STANDARD 99) @@ -47,17 +47,30 @@ ENDIF() ADD_LIBRARY(hiredis SHARED ${hiredis_sources}) ADD_LIBRARY(hiredis_static STATIC ${hiredis_sources}) +ADD_LIBRARY(hiredis::hiredis ALIAS hiredis) +ADD_LIBRARY(hiredis::hiredis_static ALIAS hiredis_static) + +IF(NOT MSVC) + SET_TARGET_PROPERTIES(hiredis_static + PROPERTIES OUTPUT_NAME hiredis) +ENDIF() SET_TARGET_PROPERTIES(hiredis PROPERTIES WINDOWS_EXPORT_ALL_SYMBOLS TRUE VERSION "${HIREDIS_SONAME}") +IF(WIN32) SET_TARGET_PROPERTIES(hiredis_static - PROPERTIES COMPILE_PDB_NAME hiredis_static) -SET_TARGET_PROPERTIES(hiredis_static - PROPERTIES COMPILE_PDB_NAME_DEBUG hiredis_static${CMAKE_DEBUG_POSTFIX}) + PROPERTIES COMPILE_FLAGS /Z7) +ENDIF() IF(WIN32 OR MINGW) TARGET_LINK_LIBRARIES(hiredis PUBLIC ws2_32 crypt32) TARGET_LINK_LIBRARIES(hiredis_static PUBLIC ws2_32 crypt32) +ELSEIF(CMAKE_SYSTEM_NAME MATCHES "FreeBSD") + TARGET_LINK_LIBRARIES(hiredis PUBLIC m) + TARGET_LINK_LIBRARIES(hiredis_static PUBLIC m) +ELSEIF(CMAKE_SYSTEM_NAME MATCHES "SunOS") + TARGET_LINK_LIBRARIES(hiredis PUBLIC socket) + TARGET_LINK_LIBRARIES(hiredis_static PUBLIC socket) ENDIF() TARGET_INCLUDE_DIRECTORIES(hiredis PUBLIC $<INSTALL_INTERFACE:include> $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>) @@ -152,6 +165,10 @@ IF(ENABLE_SSL) ${hiredis_ssl_sources}) ADD_LIBRARY(hiredis_ssl_static STATIC ${hiredis_ssl_sources}) + IF(NOT MSVC) + SET_TARGET_PROPERTIES(hiredis_ssl_static + PROPERTIES OUTPUT_NAME hiredis_ssl) + ENDIF() IF (APPLE) SET_PROPERTY(TARGET hiredis_ssl PROPERTY LINK_FLAGS "-Wl,-undefined -Wl,dynamic_lookup") @@ -218,11 +235,14 @@ ENDIF() IF(NOT DISABLE_TESTS) ENABLE_TESTING() ADD_EXECUTABLE(hiredis-test test.c) + TARGET_LINK_LIBRARIES(hiredis-test hiredis) IF(ENABLE_SSL_TESTS) ADD_DEFINITIONS(-DHIREDIS_TEST_SSL=1) - TARGET_LINK_LIBRARIES(hiredis-test hiredis hiredis_ssl) - ELSE() - TARGET_LINK_LIBRARIES(hiredis-test hiredis) + TARGET_LINK_LIBRARIES(hiredis-test hiredis_ssl) + ENDIF() + IF(ENABLE_ASYNC_TESTS) + ADD_DEFINITIONS(-DHIREDIS_TEST_ASYNC=1) + TARGET_LINK_LIBRARIES(hiredis-test event) ENDIF() ADD_TEST(NAME hiredis-test COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/test.sh) @@ -4,16 +4,10 @@ # This file is released under the BSD license, see the COPYING file OBJ=alloc.o net.o hiredis.o sds.o async.o read.o sockcompat.o -SSL_OBJ=ssl.o -EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push -ifeq ($(USE_SSL),1) -EXAMPLES+=hiredis-example-ssl hiredis-example-libevent-ssl -endif +EXAMPLES=hiredis-example hiredis-example-libevent hiredis-example-libev hiredis-example-glib hiredis-example-push hiredis-example-poll TESTS=hiredis-test LIBNAME=libhiredis PKGCONFNAME=hiredis.pc -SSL_LIBNAME=libhiredis_ssl -SSL_PKGCONFNAME=hiredis_ssl.pc HIREDIS_MAJOR=$(shell grep HIREDIS_MAJOR hiredis.h | awk '{print $$3}') HIREDIS_MINOR=$(shell grep HIREDIS_MINOR hiredis.h | awk '{print $$3}') @@ -60,28 +54,66 @@ DYLIB_MAKE_CMD=$(CC) -shared -Wl,-soname,$(DYLIB_MINOR_NAME) STLIBNAME=$(LIBNAME).$(STLIBSUFFIX) STLIB_MAKE_CMD=$(AR) rcs +#################### SSL variables start #################### +SSL_OBJ=ssl.o +SSL_LIBNAME=libhiredis_ssl +SSL_PKGCONFNAME=hiredis_ssl.pc +SSL_INSTALLNAME=install-ssl SSL_DYLIB_MINOR_NAME=$(SSL_LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_SONAME) SSL_DYLIB_MAJOR_NAME=$(SSL_LIBNAME).$(DYLIBSUFFIX).$(HIREDIS_MAJOR) SSL_DYLIBNAME=$(SSL_LIBNAME).$(DYLIBSUFFIX) SSL_STLIBNAME=$(SSL_LIBNAME).$(STLIBSUFFIX) SSL_DYLIB_MAKE_CMD=$(CC) -shared -Wl,-soname,$(SSL_DYLIB_MINOR_NAME) +USE_SSL?=0 +ifeq ($(USE_SSL),1) + # This is required for test.c only + CFLAGS+=-DHIREDIS_TEST_SSL + EXAMPLES+=hiredis-example-ssl hiredis-example-libevent-ssl + SSL_STLIB=$(SSL_STLIBNAME) + SSL_DYLIB=$(SSL_DYLIBNAME) + SSL_PKGCONF=$(SSL_PKGCONFNAME) + SSL_INSTALL=$(SSL_INSTALLNAME) +else + SSL_STLIB= + SSL_DYLIB= + SSL_PKGCONF= + SSL_INSTALL= +endif +##################### SSL variables end ##################### + + # Platform-specific overrides uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') -USE_SSL?=0 - # This is required for test.c only +ifeq ($(TEST_ASYNC),1) + export CFLAGS+=-DHIREDIS_TEST_ASYNC +endif + ifeq ($(USE_SSL),1) - CFLAGS+=-DHIREDIS_TEST_SSL + ifeq ($(uname_S),Linux) + ifdef OPENSSL_PREFIX + CFLAGS+=-I$(OPENSSL_PREFIX)/include + SSL_LDFLAGS+=-L$(OPENSSL_PREFIX)/lib -lssl -lcrypto + else + SSL_LDFLAGS=-lssl -lcrypto + endif + else + OPENSSL_PREFIX?=/usr/local/opt/openssl + CFLAGS+=-I$(OPENSSL_PREFIX)/include + SSL_LDFLAGS+=-L$(OPENSSL_PREFIX)/lib -lssl -lcrypto + endif endif -ifeq ($(uname_S),Linux) - SSL_LDFLAGS=-lssl -lcrypto +ifeq ($(uname_S),FreeBSD) + LDFLAGS+=-lm + IS_GCC=$(shell sh -c '$(CC) --version 2>/dev/null |egrep -i -c "gcc"') + ifeq ($(IS_GCC),1) + REAL_CFLAGS+=-pedantic + endif else - OPENSSL_PREFIX?=/usr/local/opt/openssl - CFLAGS+=-I$(OPENSSL_PREFIX)/include - SSL_LDFLAGS+=-L$(OPENSSL_PREFIX)/lib -lssl -lcrypto + REAL_CFLAGS+=-pedantic endif ifeq ($(uname_S),SunOS) @@ -103,10 +135,13 @@ ifeq ($(uname_S),Darwin) DYLIB_PLUGIN=-Wl,-undefined -Wl,dynamic_lookup endif -all: $(DYLIBNAME) $(STLIBNAME) hiredis-test $(PKGCONFNAME) -ifeq ($(USE_SSL),1) -all: $(SSL_DYLIBNAME) $(SSL_STLIBNAME) $(SSL_PKGCONFNAME) -endif +all: dynamic static hiredis-test pkgconfig + +dynamic: $(DYLIBNAME) $(SSL_DYLIB) + +static: $(STLIBNAME) $(SSL_STLIB) + +pkgconfig: $(PKGCONFNAME) $(SSL_PKGCONF) # Deps (use make dep to generate this) alloc.o: alloc.c fmacros.h alloc.h @@ -117,7 +152,6 @@ net.o: net.c fmacros.h net.h hiredis.h read.h sds.h alloc.h sockcompat.h win32.h read.o: read.c fmacros.h alloc.h read.h sds.h win32.h sds.o: sds.c sds.h sdsalloc.h alloc.h sockcompat.o: sockcompat.c sockcompat.h -ssl.o: ssl.c hiredis.h read.h sds.h alloc.h async.h win32.h async_private.h test.o: test.c fmacros.h hiredis.h read.h sds.h alloc.h net.h sockcompat.h win32.h $(DYLIBNAME): $(OBJ) @@ -126,18 +160,15 @@ $(DYLIBNAME): $(OBJ) $(STLIBNAME): $(OBJ) $(STLIB_MAKE_CMD) $(STLIBNAME) $(OBJ) +#################### SSL building rules start #################### $(SSL_DYLIBNAME): $(SSL_OBJ) $(SSL_DYLIB_MAKE_CMD) $(DYLIB_PLUGIN) -o $(SSL_DYLIBNAME) $(SSL_OBJ) $(REAL_LDFLAGS) $(LDFLAGS) $(SSL_LDFLAGS) $(SSL_STLIBNAME): $(SSL_OBJ) $(STLIB_MAKE_CMD) $(SSL_STLIBNAME) $(SSL_OBJ) -dynamic: $(DYLIBNAME) -static: $(STLIBNAME) -ifeq ($(USE_SSL),1) -dynamic: $(SSL_DYLIBNAME) -static: $(SSL_STLIBNAME) -endif +$(SSL_OBJ): ssl.c hiredis.h read.h sds.h alloc.h async.h win32.h async_private.h +#################### SSL building rules end #################### # Binaries: hiredis-example-libevent: examples/example-libevent.c adapters/libevent.h $(STLIBNAME) @@ -161,6 +192,8 @@ hiredis-example-macosx: examples/example-macosx.c adapters/macosx.h $(STLIBNAME) hiredis-example-ssl: examples/example-ssl.c $(STLIBNAME) $(SSL_STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(SSL_STLIBNAME) $(REAL_LDFLAGS) $(SSL_LDFLAGS) +hiredis-example-poll: examples/example-poll.c adapters/poll.h $(STLIBNAME) + $(CC) -o examples/$@ $(REAL_CFLAGS) -I. $< $(STLIBNAME) $(REAL_LDFLAGS) ifndef AE_DIR hiredis-example-ae: @@ -172,10 +205,11 @@ hiredis-example-ae: examples/example-ae.c adapters/ae.h $(STLIBNAME) endif ifndef LIBUV_DIR -hiredis-example-libuv: - @echo "Please specify LIBUV_DIR (e.g. ../libuv/)" - @false +# dynamic link libuv.so +hiredis-example-libuv: examples/example-libuv.c adapters/libuv.h $(STLIBNAME) + $(CC) -o examples/$@ $(REAL_CFLAGS) -I. -I$(LIBUV_DIR)/include $< -luv -lpthread -lrt $(STLIBNAME) $(REAL_LDFLAGS) else +# use user provided static lib hiredis-example-libuv: examples/example-libuv.c adapters/libuv.h $(STLIBNAME) $(CC) -o examples/$@ $(REAL_CFLAGS) -I. -I$(LIBUV_DIR)/include $< $(LIBUV_DIR)/.libs/libuv.a -lpthread -lrt $(STLIBNAME) $(REAL_LDFLAGS) endif @@ -201,10 +235,13 @@ hiredis-example-push: examples/example-push.c $(STLIBNAME) examples: $(EXAMPLES) -TEST_LIBS = $(STLIBNAME) +TEST_LIBS = $(STLIBNAME) $(SSL_STLIB) +TEST_LDFLAGS = $(SSL_LDFLAGS) ifeq ($(USE_SSL),1) - TEST_LIBS += $(SSL_STLIBNAME) - TEST_LDFLAGS = $(SSL_LDFLAGS) -lssl -lcrypto -lpthread + TEST_LDFLAGS += -pthread +endif +ifeq ($(TEST_ASYNC),1) + TEST_LDFLAGS += -levent endif hiredis-test: test.o $(TEST_LIBS) @@ -220,7 +257,7 @@ check: hiredis-test TEST_SSL=$(USE_SSL) ./test.sh .c.o: - $(CC) -std=c99 -pedantic -c $(REAL_CFLAGS) $< + $(CC) -std=c99 -c $(REAL_CFLAGS) $< clean: rm -rf $(DYLIBNAME) $(STLIBNAME) $(SSL_DYLIBNAME) $(SSL_STLIBNAME) $(TESTS) $(PKGCONFNAME) examples/hiredis-example* *.o *.gcda *.gcno *.gcov @@ -257,7 +294,7 @@ $(SSL_PKGCONFNAME): hiredis_ssl.h @echo Libs: -L\$${libdir} -lhiredis_ssl >> $@ @echo Libs.private: -lssl -lcrypto >> $@ -install: $(DYLIBNAME) $(STLIBNAME) $(PKGCONFNAME) +install: $(DYLIBNAME) $(STLIBNAME) $(PKGCONFNAME) $(SSL_INSTALL) mkdir -p $(INSTALL_INCLUDE_PATH) $(INSTALL_INCLUDE_PATH)/adapters $(INSTALL_LIBRARY_PATH) $(INSTALL) hiredis.h async.h read.h sds.h alloc.h $(INSTALL_INCLUDE_PATH) $(INSTALL) adapters/*.h $(INSTALL_INCLUDE_PATH)/adapters @@ -267,9 +304,6 @@ install: $(DYLIBNAME) $(STLIBNAME) $(PKGCONFNAME) mkdir -p $(INSTALL_PKGCONF_PATH) $(INSTALL) $(PKGCONFNAME) $(INSTALL_PKGCONF_PATH) -ifeq ($(USE_SSL),1) -install: install-ssl - install-ssl: $(SSL_DYLIBNAME) $(SSL_STLIBNAME) $(SSL_PKGCONFNAME) mkdir -p $(INSTALL_INCLUDE_PATH) $(INSTALL_LIBRARY_PATH) $(INSTALL) hiredis_ssl.h $(INSTALL_INCLUDE_PATH) @@ -278,7 +312,6 @@ install-ssl: $(SSL_DYLIBNAME) $(SSL_STLIBNAME) $(SSL_PKGCONFNAME) $(INSTALL) $(SSL_STLIBNAME) $(INSTALL_LIBRARY_PATH) mkdir -p $(INSTALL_PKGCONF_PATH) $(INSTALL) $(SSL_PKGCONFNAME) $(INSTALL_PKGCONF_PATH) -endif 32bit: @echo "" @@ -294,12 +327,12 @@ gprof: $(MAKE) CFLAGS="-pg" LDFLAGS="-pg" gcov: - $(MAKE) CFLAGS="-fprofile-arcs -ftest-coverage" LDFLAGS="-fprofile-arcs" + $(MAKE) CFLAGS+="-fprofile-arcs -ftest-coverage" LDFLAGS="-fprofile-arcs" coverage: gcov make check mkdir -p tmp/lcov - lcov -d . -c -o tmp/lcov/hiredis.info + lcov -d . -c --exclude '/usr*' -o tmp/lcov/hiredis.info genhtml --legend -o tmp/lcov/report tmp/lcov/hiredis.info noopt: @@ -1,10 +1,11 @@ -[![Build Status](https://travis-ci.org/redis/hiredis.png)](https://travis-ci.org/redis/hiredis) + +[![Build Status](https://github.com/redis/hiredis/actions/workflows/build.yml/badge.svg)](https://github.com/redis/hiredis/actions/workflows/build.yml) **This Readme reflects the latest changed in the master branch. See [v1.0.0](https://github.com/redis/hiredis/tree/v1.0.0) for the Readme and documentation for the latest release ([API/ABI history](https://abi-laboratory.pro/?view=timeline&l=hiredis)).** # HIREDIS -Hiredis is a minimalistic C client library for the [Redis](http://redis.io/) database. +Hiredis is a minimalistic C client library for the [Redis](https://redis.io/) database. It is minimalistic because it just adds minimal support for the protocol, but at the same time it uses a high level printf-alike API in order to make it @@ -22,6 +23,12 @@ Redis version >= 1.2.0. The library comes with multiple APIs. There is the *synchronous API*, the *asynchronous API* and the *reply parsing API*. +## Upgrading to `1.0.2` + +<span style="color:red">NOTE: v1.0.1 erroneously bumped SONAME, which is why it is skipped here.</span> + +Version 1.0.2 is simply 1.0.0 with a fix for [CVE-2021-32765](https://github.com/redis/hiredis/security/advisories/GHSA-hfm9-39pp-55p2). They are otherwise identical. + ## Upgrading to `1.0.0` Version 1.0.0 marks the first stable release of Hiredis. @@ -169,7 +176,7 @@ Hiredis also supports every new `RESP3` data type which are as follows. For mor * **`REDIS_REPLY_MAP`**: * An array with the added invariant that there will always be an even number of elements. - The MAP is functionally equivelant to `REDIS_REPLY_ARRAY` except for the previously mentioned invariant. + The MAP is functionally equivalent to `REDIS_REPLY_ARRAY` except for the previously mentioned invariant. * **`REDIS_REPLY_SET`**: * An array response where each entry is unique. @@ -189,7 +196,7 @@ Hiredis also supports every new `RESP3` data type which are as follows. For mor * **`REDIS_REPLY_VERB`**: * A verbatim string, intended to be presented to the user without modification. - The string payload is stored in the `str` memeber, and type data is stored in the `vtype` member (e.g. `txt` for raw text or `md` for markdown). + The string payload is stored in the `str` member, and type data is stored in the `vtype` member (e.g. `txt` for raw text or `md` for markdown). Replies should be freed using the `freeReplyObject()` function. Note that this function will take care of freeing sub-reply objects @@ -243,7 +250,7 @@ following two execution paths: * Read from the socket until a single reply could be parsed The function `redisGetReply` is exported as part of the Hiredis API and can be used when a reply -is expected on the socket. To pipeline commands, the only things that needs to be done is +is expected on the socket. To pipeline commands, the only thing that needs to be done is filling up the output buffer. For this cause, two commands can be used that are identical to the `redisCommand` family, apart from not returning a reply: ```c @@ -261,9 +268,9 @@ a single call to `read(2)`): redisReply *reply; redisAppendCommand(context,"SET foo bar"); redisAppendCommand(context,"GET foo"); -redisGetReply(context,(void *)&reply); // reply for SET +redisGetReply(context,(void**)&reply); // reply for SET freeReplyObject(reply); -redisGetReply(context,(void *)&reply); // reply for GET +redisGetReply(context,(void**)&reply); // reply for GET freeReplyObject(reply); ``` This API can also be used to implement a blocking subscriber: @@ -313,23 +320,48 @@ Redis. It returns a pointer to the newly created `redisAsyncContext` struct. The should be checked after creation to see if there were errors creating the connection. Because the connection that will be created is non-blocking, the kernel is not able to instantly return if the specified host and port is able to accept a connection. +In case of error, it is the caller's responsibility to free the context using `redisAsyncFree()` *Note: A `redisAsyncContext` is not thread-safe.* +An application function creating a connection might look like this: + ```c -redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); -if (c->err) { - printf("Error: %s\n", c->errstr); - // handle error +void appConnect(myAppData *appData) +{ + redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); + if (c->err) { + printf("Error: %s\n", c->errstr); + // handle error + redisAsyncFree(c); + c = NULL; + } else { + appData->context = c; + appData->connecting = 1; + c->data = appData; /* store application pointer for the callbacks */ + redisAsyncSetConnectCallback(c, appOnConnect); + redisAsyncSetDisconnectCallback(c, appOnDisconnect); + } } + ``` -The asynchronous context can hold a disconnect callback function that is called when the -connection is disconnected (either because of an error or per user request). This function should + +The asynchronous context _should_ hold a *connect* callback function that is called when the connection +attempt completes, either successfully or with an error. +It _can_ also hold a *disconnect* callback function that is called when the +connection is disconnected (either because of an error or per user request). Both callbacks should have the following prototype: ```c void(const redisAsyncContext *c, int status); ``` + +On a *connect*, the `status` argument is set to `REDIS_OK` if the connection attempt succeeded. In this +case, the context is ready to accept commands. If it is called with `REDIS_ERR` then the +connection attempt failed. The `err` field in the context can be accessed to find out the cause of the error. +After a failed connection attempt, the context object is automatically freed by the library after calling +the connect callback. This may be a good point to create a new context and retry the connection. + On a disconnect, the `status` argument is set to `REDIS_OK` when disconnection was initiated by the user, or `REDIS_ERR` when the disconnection was caused by an error. When it is `REDIS_ERR`, the `err` field in the context can be accessed to find out the cause of the error. @@ -337,12 +369,47 @@ field in the context can be accessed to find out the cause of the error. The context object is always freed after the disconnect callback fired. When a reconnect is needed, the disconnect callback is a good point to do so. -Setting the disconnect callback can only be done once per context. For subsequent calls it will -return `REDIS_ERR`. The function to set the disconnect callback has the following prototype: +Setting the connect or disconnect callbacks can only be done once per context. For subsequent calls the +api will return `REDIS_ERR`. The function to set the callbacks have the following prototype: ```c +/* Alternatively you can use redisAsyncSetConnectCallbackNC which will be passed a non-const + redisAsyncContext* on invocation (e.g. allowing writes to the privdata member). */ +int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); ``` -`ac->data` may be used to pass user data to this callback, the same can be done for redisConnectCallback. +`ac->data` may be used to pass user data to both callbacks. A typical implementation +might look something like this: +```c +void appOnConnect(redisAsyncContext *c, int status) +{ + myAppData *appData = (myAppData*)c->data; /* get my application specific context*/ + appData->connecting = 0; + if (status == REDIS_OK) { + appData->connected = 1; + } else { + appData->connected = 0; + appData->err = c->err; + appData->context = NULL; /* avoid stale pointer when callback returns */ + } + appAttemptReconnect(); +} + +void appOnDisconnect(redisAsyncContext *c, int status) +{ + myAppData *appData = (myAppData*)c->data; /* get my application specific context*/ + appData->connected = 0; + appData->err = c->err; + appData->context = NULL; /* avoid stale pointer when callback returns */ + if (status == REDIS_OK) { + appNotifyDisconnectCompleted(mydata); + } else { + appNotifyUnexpectedDisconnect(mydata); + appAttemptReconnect(); + } +} +``` + + ### Sending commands and their callbacks In an asynchronous context, commands are automatically pipelined due to the nature of an event loop. @@ -375,6 +442,14 @@ valid for the duration of the callback. All pending callbacks are called with a `NULL` reply when the context encountered an error. +For every command issued, with the exception of **SUBSCRIBE** and **PSUBSCRIBE**, the callback is +called exactly once. Even if the context object id disconnected or deleted, every pending callback +will be called with a `NULL` reply. + +For **SUBSCRIBE** and **PSUBSCRIBE**, the callbacks may be called repeatedly until an `unsubscribe` +message arrives. This will be the last invocation of the callback. In case of error, the callbacks +may receive a final `NULL` reply instead. + ### Disconnecting An asynchronous connection can be terminated using: @@ -387,6 +462,15 @@ have been written to the socket, their respective replies have been read and the callbacks have been executed. After this, the disconnection callback is executed with the `REDIS_OK` status and the context object is freed. +The connection can be forcefully disconnected using +```c +void redisAsyncFree(redisAsyncContext *ac); +``` +In this case, nothing more is written to the socket, all pending callbacks are called with a `NULL` +reply and the disconnection callback is called with `REDIS_OK`, after which the context object +is freed. + + ### Hooking it up to event library *X* There are a few hooks that need to be set on the context object after it is created. @@ -522,7 +606,7 @@ redisSSLContext *ssl_context; /* An error variable to indicate what went wrong, if the context fails to * initialize. */ -redisSSLContextError ssl_error; +redisSSLContextError ssl_error = REDIS_SSL_CTX_NONE; /* Initialize global OpenSSL state. * @@ -540,11 +624,11 @@ ssl_context = redisCreateSSLContext( "redis.mydomain.com", /* Server name to request (SNI), optional */ &ssl_error); -if(ssl_context == NULL || ssl_error != 0) { +if(ssl_context == NULL || ssl_error != REDIS_SSL_CTX_NONE) { /* Handle error and abort... */ - /* e.g. - printf("SSL error: %s\n", - (ssl_error != 0) ? + /* e.g. + printf("SSL error: %s\n", + (ssl_error != REDIS_SSL_CTX_NONE) ? redisSSLContextGetError(ssl_error) : "Unknown error"); // Abort */ @@ -626,7 +710,7 @@ If you have a unique use-case where you don't want hiredis to automatically inte redisSetPushCallback(context, NULL); ``` - _Note: With no handler configured, calls to `redisCommand` may generate more than one reply, so this strategy is only applicable when there's some kind of blocking`redisGetReply()` loop (e.g. `MONITOR` or `SUBSCRIBE` workloads)._ + _Note: With no handler configured, calls to `redisCommand` may generate more than one reply, so this strategy is only applicable when there's some kind of blocking `redisGetReply()` loop (e.g. `MONITOR` or `SUBSCRIBE` workloads)._ ## Allocator injection diff --git a/adapters/libev.h b/adapters/libev.h index 6191543..c59d3da 100644 --- a/adapters/libev.h +++ b/adapters/libev.h @@ -66,8 +66,9 @@ static void redisLibevWriteEvent(EV_P_ ev_io *watcher, int revents) { static void redisLibevAddRead(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif if (!e->reading) { e->reading = 1; ev_io_start(EV_A_ &e->rev); @@ -76,8 +77,9 @@ static void redisLibevAddRead(void *privdata) { static void redisLibevDelRead(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif if (e->reading) { e->reading = 0; ev_io_stop(EV_A_ &e->rev); @@ -86,8 +88,9 @@ static void redisLibevDelRead(void *privdata) { static void redisLibevAddWrite(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif if (!e->writing) { e->writing = 1; ev_io_start(EV_A_ &e->wev); @@ -96,8 +99,9 @@ static void redisLibevAddWrite(void *privdata) { static void redisLibevDelWrite(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif if (e->writing) { e->writing = 0; ev_io_stop(EV_A_ &e->wev); @@ -106,8 +110,9 @@ static void redisLibevDelWrite(void *privdata) { static void redisLibevStopTimer(void *privdata) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif ev_timer_stop(EV_A_ &e->timer); } @@ -120,6 +125,9 @@ static void redisLibevCleanup(void *privdata) { } static void redisLibevTimeout(EV_P_ ev_timer *timer, int revents) { +#if EV_MULTIPLICITY + ((void)EV_A); +#endif ((void)revents); redisLibevEvents *e = (redisLibevEvents*)timer->data; redisAsyncHandleTimeout(e->context); @@ -127,8 +135,9 @@ static void redisLibevTimeout(EV_P_ ev_timer *timer, int revents) { static void redisLibevSetTimeout(void *privdata, struct timeval tv) { redisLibevEvents *e = (redisLibevEvents*)privdata; +#if EV_MULTIPLICITY struct ev_loop *loop = e->loop; - ((void)loop); +#endif if (!ev_is_active(&e->timer)) { ev_init(&e->timer, redisLibevTimeout); diff --git a/adapters/libevent.h b/adapters/libevent.h index 9150979..73bb8ed 100644 --- a/adapters/libevent.h +++ b/adapters/libevent.h @@ -50,7 +50,7 @@ static void redisLibeventDestroy(redisLibeventEvents *e) { hi_free(e); } -static void redisLibeventHandler(int fd, short event, void *arg) { +static void redisLibeventHandler(evutil_socket_t fd, short event, void *arg) { ((void)fd); redisLibeventEvents *e = (redisLibeventEvents*)arg; e->state |= REDIS_LIBEVENT_ENTERED; diff --git a/adapters/libuv.h b/adapters/libuv.h index c120b1b..df0a845 100644 --- a/adapters/libuv.h +++ b/adapters/libuv.h @@ -7,111 +7,157 @@ #include <string.h> typedef struct redisLibuvEvents { - redisAsyncContext* context; - uv_poll_t handle; - int events; + redisAsyncContext* context; + uv_poll_t handle; + uv_timer_t timer; + int events; } redisLibuvEvents; static void redisLibuvPoll(uv_poll_t* handle, int status, int events) { - redisLibuvEvents* p = (redisLibuvEvents*)handle->data; - int ev = (status ? p->events : events); - - if (p->context != NULL && (ev & UV_READABLE)) { - redisAsyncHandleRead(p->context); - } - if (p->context != NULL && (ev & UV_WRITABLE)) { - redisAsyncHandleWrite(p->context); - } + redisLibuvEvents* p = (redisLibuvEvents*)handle->data; + int ev = (status ? p->events : events); + + if (p->context != NULL && (ev & UV_READABLE)) { + redisAsyncHandleRead(p->context); + } + if (p->context != NULL && (ev & UV_WRITABLE)) { + redisAsyncHandleWrite(p->context); + } } static void redisLibuvAddRead(void *privdata) { - redisLibuvEvents* p = (redisLibuvEvents*)privdata; + redisLibuvEvents* p = (redisLibuvEvents*)privdata; - p->events |= UV_READABLE; + p->events |= UV_READABLE; - uv_poll_start(&p->handle, p->events, redisLibuvPoll); + uv_poll_start(&p->handle, p->events, redisLibuvPoll); } static void redisLibuvDelRead(void *privdata) { - redisLibuvEvents* p = (redisLibuvEvents*)privdata; + redisLibuvEvents* p = (redisLibuvEvents*)privdata; - p->events &= ~UV_READABLE; + p->events &= ~UV_READABLE; - if (p->events) { - uv_poll_start(&p->handle, p->events, redisLibuvPoll); - } else { - uv_poll_stop(&p->handle); - } + if (p->events) { + uv_poll_start(&p->handle, p->events, redisLibuvPoll); + } else { + uv_poll_stop(&p->handle); + } } static void redisLibuvAddWrite(void *privdata) { - redisLibuvEvents* p = (redisLibuvEvents*)privdata; + redisLibuvEvents* p = (redisLibuvEvents*)privdata; - p->events |= UV_WRITABLE; + p->events |= UV_WRITABLE; - uv_poll_start(&p->handle, p->events, redisLibuvPoll); + uv_poll_start(&p->handle, p->events, redisLibuvPoll); } static void redisLibuvDelWrite(void *privdata) { - redisLibuvEvents* p = (redisLibuvEvents*)privdata; + redisLibuvEvents* p = (redisLibuvEvents*)privdata; - p->events &= ~UV_WRITABLE; + p->events &= ~UV_WRITABLE; - if (p->events) { - uv_poll_start(&p->handle, p->events, redisLibuvPoll); - } else { - uv_poll_stop(&p->handle); - } + if (p->events) { + uv_poll_start(&p->handle, p->events, redisLibuvPoll); + } else { + uv_poll_stop(&p->handle); + } } +static void on_timer_close(uv_handle_t *handle) { + redisLibuvEvents* p = (redisLibuvEvents*)handle->data; + p->timer.data = NULL; + if (!p->handle.data) { + // both timer and handle are closed + hi_free(p); + } + // else, wait for `on_handle_close` +} -static void on_close(uv_handle_t* handle) { - redisLibuvEvents* p = (redisLibuvEvents*)handle->data; +static void on_handle_close(uv_handle_t *handle) { + redisLibuvEvents* p = (redisLibuvEvents*)handle->data; + p->handle.data = NULL; + if (!p->timer.data) { + // timer never started, or timer already destroyed + hi_free(p); + } + // else, wait for `on_timer_close` +} - hi_free(p); +// libuv removed `status` parameter since v0.11.23 +// see: https://github.com/libuv/libuv/blob/v0.11.23/include/uv.h +#if (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR < 11) || \ + (UV_VERSION_MAJOR == 0 && UV_VERSION_MINOR == 11 && UV_VERSION_PATCH < 23) +static void redisLibuvTimeout(uv_timer_t *timer, int status) { + (void)status; // unused +#else +static void redisLibuvTimeout(uv_timer_t *timer) { +#endif + redisLibuvEvents *e = (redisLibuvEvents*)timer->data; + redisAsyncHandleTimeout(e->context); } +static void redisLibuvSetTimeout(void *privdata, struct timeval tv) { + redisLibuvEvents* p = (redisLibuvEvents*)privdata; + + uint64_t millsec = tv.tv_sec * 1000 + tv.tv_usec / 1000.0; + if (!p->timer.data) { + // timer is uninitialized + if (uv_timer_init(p->handle.loop, &p->timer) != 0) { + return; + } + p->timer.data = p; + } + // updates the timeout if the timer has already started + // or start the timer + uv_timer_start(&p->timer, redisLibuvTimeout, millsec, 0); +} static void redisLibuvCleanup(void *privdata) { - redisLibuvEvents* p = (redisLibuvEvents*)privdata; + redisLibuvEvents* p = (redisLibuvEvents*)privdata; - p->context = NULL; // indicate that context might no longer exist - uv_close((uv_handle_t*)&p->handle, on_close); + p->context = NULL; // indicate that context might no longer exist + if (p->timer.data) { + uv_close((uv_handle_t*)&p->timer, on_timer_close); + } + uv_close((uv_handle_t*)&p->handle, on_handle_close); } static int redisLibuvAttach(redisAsyncContext* ac, uv_loop_t* loop) { - redisContext *c = &(ac->c); + redisContext *c = &(ac->c); - if (ac->ev.data != NULL) { - return REDIS_ERR; - } + if (ac->ev.data != NULL) { + return REDIS_ERR; + } - ac->ev.addRead = redisLibuvAddRead; - ac->ev.delRead = redisLibuvDelRead; - ac->ev.addWrite = redisLibuvAddWrite; - ac->ev.delWrite = redisLibuvDelWrite; - ac->ev.cleanup = redisLibuvCleanup; + ac->ev.addRead = redisLibuvAddRead; + ac->ev.delRead = redisLibuvDelRead; + ac->ev.addWrite = redisLibuvAddWrite; + ac->ev.delWrite = redisLibuvDelWrite; + ac->ev.cleanup = redisLibuvCleanup; + ac->ev.scheduleTimer = redisLibuvSetTimeout; - redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p)); - if (p == NULL) - return REDIS_ERR; + redisLibuvEvents* p = (redisLibuvEvents*)hi_malloc(sizeof(*p)); + if (p == NULL) + return REDIS_ERR; - memset(p, 0, sizeof(*p)); + memset(p, 0, sizeof(*p)); - if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) { - return REDIS_ERR; - } + if (uv_poll_init_socket(loop, &p->handle, c->fd) != 0) { + return REDIS_ERR; + } - ac->ev.data = p; - p->handle.data = p; - p->context = ac; + ac->ev.data = p; + p->handle.data = p; + p->context = ac; - return REDIS_OK; + return REDIS_OK; } #endif diff --git a/adapters/poll.h b/adapters/poll.h new file mode 100644 index 0000000..f138650 --- /dev/null +++ b/adapters/poll.h @@ -0,0 +1,197 @@ + +#ifndef HIREDIS_POLL_H +#define HIREDIS_POLL_H + +#include "../async.h" +#include "../sockcompat.h" +#include <string.h> // for memset +#include <errno.h> + +/* Values to return from redisPollTick */ +#define REDIS_POLL_HANDLED_READ 1 +#define REDIS_POLL_HANDLED_WRITE 2 +#define REDIS_POLL_HANDLED_TIMEOUT 4 + +/* An adapter to allow manual polling of the async context by checking the state + * of the underlying file descriptor. Useful in cases where there is no formal + * IO event loop but regular ticking can be used, such as in game engines. */ + +typedef struct redisPollEvents { + redisAsyncContext *context; + redisFD fd; + char reading, writing; + char in_tick; + char deleted; + double deadline; +} redisPollEvents; + +static double redisPollTimevalToDouble(struct timeval *tv) { + if (tv == NULL) + return 0.0; + return tv->tv_sec + tv->tv_usec / 1000000.00; +} + +static double redisPollGetNow(void) { +#ifndef _MSC_VER + struct timeval tv; + gettimeofday(&tv,NULL); + return redisPollTimevalToDouble(&tv); +#else + FILETIME ft; + ULARGE_INTEGER li; + GetSystemTimeAsFileTime(&ft); + li.HighPart = ft.dwHighDateTime; + li.LowPart = ft.dwLowDateTime; + return (double)li.QuadPart * 1e-7; +#endif +} + +/* Poll for io, handling any pending callbacks. The timeout argument can be + * positive to wait for a maximum given time for IO, zero to poll, or negative + * to wait forever */ +static int redisPollTick(redisAsyncContext *ac, double timeout) { + int reading, writing; + struct pollfd pfd; + int handled; + int ns; + int itimeout; + + redisPollEvents *e = (redisPollEvents*)ac->ev.data; + if (!e) + return 0; + + /* local flags, won't get changed during callbacks */ + reading = e->reading; + writing = e->writing; + if (!reading && !writing) + return 0; + + pfd.fd = e->fd; + pfd.events = 0; + if (reading) + pfd.events = POLLIN; + if (writing) + pfd.events |= POLLOUT; + + if (timeout >= 0.0) { + itimeout = (int)(timeout * 1000.0); + } else { + itimeout = -1; + } + + ns = poll(&pfd, 1, itimeout); + if (ns < 0) { + /* ignore the EINTR error */ + if (errno != EINTR) + return ns; + ns = 0; + } + + handled = 0; + e->in_tick = 1; + if (ns) { + if (reading && (pfd.revents & POLLIN)) { + redisAsyncHandleRead(ac); + handled |= REDIS_POLL_HANDLED_READ; + } + /* on Windows, connection failure is indicated with the Exception fdset. + * handle it the same as writable. */ + if (writing && (pfd.revents & (POLLOUT | POLLERR))) { + /* context Read callback may have caused context to be deleted, e.g. + by doing an redisAsyncDisconnect() */ + if (!e->deleted) { + redisAsyncHandleWrite(ac); + handled |= REDIS_POLL_HANDLED_WRITE; + } + } + } + + /* perform timeouts */ + if (!e->deleted && e->deadline != 0.0) { + double now = redisPollGetNow(); + if (now >= e->deadline) { + /* deadline has passed. disable timeout and perform callback */ + e->deadline = 0.0; + redisAsyncHandleTimeout(ac); + handled |= REDIS_POLL_HANDLED_TIMEOUT; + } + } + + /* do a delayed cleanup if required */ + if (e->deleted) + hi_free(e); + else + e->in_tick = 0; + + return handled; +} + +static void redisPollAddRead(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->reading = 1; +} + +static void redisPollDelRead(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->reading = 0; +} + +static void redisPollAddWrite(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->writing = 1; +} + +static void redisPollDelWrite(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + e->writing = 0; +} + +static void redisPollCleanup(void *data) { + redisPollEvents *e = (redisPollEvents*)data; + + /* if we are currently processing a tick, postpone deletion */ + if (e->in_tick) + e->deleted = 1; + else + hi_free(e); +} + +static void redisPollScheduleTimer(void *data, struct timeval tv) +{ + redisPollEvents *e = (redisPollEvents*)data; + double now = redisPollGetNow(); + e->deadline = now + redisPollTimevalToDouble(&tv); +} + +static int redisPollAttach(redisAsyncContext *ac) { + redisContext *c = &(ac->c); + redisPollEvents *e; + + /* Nothing should be attached when something is already attached */ + if (ac->ev.data != NULL) + return REDIS_ERR; + + /* Create container for context and r/w events */ + e = (redisPollEvents*)hi_malloc(sizeof(*e)); + if (e == NULL) + return REDIS_ERR; + memset(e, 0, sizeof(*e)); + + e->context = ac; + e->fd = c->fd; + e->reading = e->writing = 0; + e->in_tick = e->deleted = 0; + e->deadline = 0.0; + + /* Register functions to start/stop listening for events */ + ac->ev.addRead = redisPollAddRead; + ac->ev.delRead = redisPollDelRead; + ac->ev.addWrite = redisPollAddWrite; + ac->ev.delWrite = redisPollDelWrite; + ac->ev.scheduleTimer = redisPollScheduleTimer; + ac->ev.cleanup = redisPollCleanup; + ac->ev.data = e; + + return REDIS_OK; +} +#endif /* HIREDIS_POLL_H */ @@ -68,6 +68,10 @@ void *hi_malloc(size_t size) { } void *hi_calloc(size_t nmemb, size_t size) { + /* Overflow check as the user can specify any arbitrary allocator */ + if (SIZE_MAX / size < nmemb) + return NULL; + return hiredisAllocFns.callocFn(nmemb, size); } @@ -32,6 +32,7 @@ #define HIREDIS_ALLOC_H #include <stddef.h> /* for size_t */ +#include <stdint.h> #ifdef __cplusplus extern "C" { @@ -59,6 +60,10 @@ static inline void *hi_malloc(size_t size) { } static inline void *hi_calloc(size_t nmemb, size_t size) { + /* Overflow check as the user can specify any arbitrary allocator */ + if (SIZE_MAX / size < nmemb) + return NULL; + return hiredisAllocFns.callocFn(nmemb, size); } @@ -140,14 +140,16 @@ static redisAsyncContext *redisAsyncInitialize(redisContext *c) { ac->ev.scheduleTimer = NULL; ac->onConnect = NULL; + ac->onConnectNC = NULL; ac->onDisconnect = NULL; ac->replies.head = NULL; ac->replies.tail = NULL; - ac->sub.invalid.head = NULL; - ac->sub.invalid.tail = NULL; + ac->sub.replies.head = NULL; + ac->sub.replies.tail = NULL; ac->sub.channels = channels; ac->sub.patterns = patterns; + ac->sub.pending_unsubs = 0; return ac; oom: @@ -225,17 +227,34 @@ redisAsyncContext *redisAsyncConnectUnix(const char *path) { return redisAsyncConnectWithOptions(&options); } -int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { - if (ac->onConnect == NULL) { - ac->onConnect = fn; +static int +redisAsyncSetConnectCallbackImpl(redisAsyncContext *ac, redisConnectCallback *fn, + redisConnectCallbackNC *fn_nc) +{ + /* If either are already set, this is an error */ + if (ac->onConnect || ac->onConnectNC) + return REDIS_ERR; - /* The common way to detect an established connection is to wait for - * the first write event to be fired. This assumes the related event - * library functions are already set. */ - _EL_ADD_WRITE(ac); - return REDIS_OK; + if (fn) { + ac->onConnect = fn; + } else if (fn_nc) { + ac->onConnectNC = fn_nc; } - return REDIS_ERR; + + /* The common way to detect an established connection is to wait for + * the first write event to be fired. This assumes the related event + * library functions are already set. */ + _EL_ADD_WRITE(ac); + + return REDIS_OK; +} + +int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn) { + return redisAsyncSetConnectCallbackImpl(ac, fn, NULL); +} + +int redisAsyncSetConnectCallbackNC(redisAsyncContext *ac, redisConnectCallbackNC *fn) { + return redisAsyncSetConnectCallbackImpl(ac, NULL, fn); } int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn) { @@ -302,40 +321,69 @@ static void __redisRunPushCallback(redisAsyncContext *ac, redisReply *reply) { } } +static void __redisRunConnectCallback(redisAsyncContext *ac, int status) +{ + if (ac->onConnect == NULL && ac->onConnectNC == NULL) + return; + + if (!(ac->c.flags & REDIS_IN_CALLBACK)) { + ac->c.flags |= REDIS_IN_CALLBACK; + if (ac->onConnect) { + ac->onConnect(ac, status); + } else { + ac->onConnectNC(ac, status); + } + ac->c.flags &= ~REDIS_IN_CALLBACK; + } else { + /* already in callback */ + if (ac->onConnect) { + ac->onConnect(ac, status); + } else { + ac->onConnectNC(ac, status); + } + } +} + +static void __redisRunDisconnectCallback(redisAsyncContext *ac, int status) +{ + if (ac->onDisconnect) { + if (!(ac->c.flags & REDIS_IN_CALLBACK)) { + ac->c.flags |= REDIS_IN_CALLBACK; + ac->onDisconnect(ac, status); + ac->c.flags &= ~REDIS_IN_CALLBACK; + } else { + /* already in callback */ + ac->onDisconnect(ac, status); + } + } +} + /* Helper function to free the context. */ static void __redisAsyncFree(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; - dictIterator *it; + dictIterator it; dictEntry *de; /* Execute pending callbacks with NULL reply. */ while (__redisShiftCallback(&ac->replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); - - /* Execute callbacks for invalid commands */ - while (__redisShiftCallback(&ac->sub.invalid,&cb) == REDIS_OK) + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) __redisRunCallback(ac,&cb,NULL); /* Run subscription callbacks with NULL reply */ if (ac->sub.channels) { - it = dictGetIterator(ac->sub.channels); - if (it != NULL) { - while ((de = dictNext(it)) != NULL) - __redisRunCallback(ac,dictGetEntryVal(de),NULL); - dictReleaseIterator(it); - } + dictInitIterator(&it,ac->sub.channels); + while ((de = dictNext(&it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); dictRelease(ac->sub.channels); } if (ac->sub.patterns) { - it = dictGetIterator(ac->sub.patterns); - if (it != NULL) { - while ((de = dictNext(it)) != NULL) - __redisRunCallback(ac,dictGetEntryVal(de),NULL); - dictReleaseIterator(it); - } + dictInitIterator(&it,ac->sub.patterns); + while ((de = dictNext(&it)) != NULL) + __redisRunCallback(ac,dictGetEntryVal(de),NULL); dictRelease(ac->sub.patterns); } @@ -345,12 +393,11 @@ static void __redisAsyncFree(redisAsyncContext *ac) { /* Execute disconnect callback. When redisAsyncFree() initiated destroying * this context, the status will always be REDIS_OK. */ - if (ac->onDisconnect && (c->flags & REDIS_CONNECTED)) { - if (c->flags & REDIS_FREEING) { - ac->onDisconnect(ac,REDIS_OK); - } else { - ac->onDisconnect(ac,(ac->err == 0) ? REDIS_OK : REDIS_ERR); - } + if (c->flags & REDIS_CONNECTED) { + int status = ac->err == 0 ? REDIS_OK : REDIS_ERR; + if (c->flags & REDIS_FREEING) + status = REDIS_OK; + __redisRunDisconnectCallback(ac, status); } if (ac->dataCleanup) { @@ -419,16 +466,17 @@ void redisAsyncDisconnect(redisAsyncContext *ac) { static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, redisCallback *dstcb) { redisContext *c = &(ac->c); dict *callbacks; - redisCallback *cb; + redisCallback *cb = NULL; dictEntry *de; int pvariant; char *stype; - sds sname; + sds sname = NULL; - /* Custom reply functions are not supported for pub/sub. This will fail - * very hard when they are used... */ - if (reply->type == REDIS_REPLY_ARRAY || reply->type == REDIS_REPLY_PUSH) { - assert(reply->elements >= 2); + /* Match reply with the expected format of a pushed message. + * The type and number of elements (3 to 4) are specified at: + * https://redis.io/topics/pubsub#format-of-pushed-messages */ + if ((reply->type == REDIS_REPLY_ARRAY && !(c->flags & REDIS_SUPPORTS_PUSH) && reply->elements >= 3) || + reply->type == REDIS_REPLY_PUSH) { assert(reply->element[0]->type == REDIS_REPLY_STRING); stype = reply->element[0]->str; pvariant = (tolower(stype[0]) == 'p') ? 1 : 0; @@ -439,46 +487,55 @@ static int __redisGetSubscribeCallback(redisAsyncContext *ac, redisReply *reply, callbacks = ac->sub.channels; /* Locate the right callback */ - assert(reply->element[1]->type == REDIS_REPLY_STRING); - sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); - if (sname == NULL) - goto oom; - - de = dictFind(callbacks,sname); - if (de != NULL) { - cb = dictGetEntryVal(de); + if (reply->element[1]->type == REDIS_REPLY_STRING) { + sname = sdsnewlen(reply->element[1]->str,reply->element[1]->len); + if (sname == NULL) goto oom; - /* If this is an subscribe reply decrease pending counter. */ - if (strcasecmp(stype+pvariant,"subscribe") == 0) { - cb->pending_subs -= 1; + if ((de = dictFind(callbacks,sname)) != NULL) { + cb = dictGetEntryVal(de); + memcpy(dstcb,cb,sizeof(*dstcb)); } + } - memcpy(dstcb,cb,sizeof(*dstcb)); - - /* If this is an unsubscribe message, remove it. */ - if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { - if (cb->pending_subs == 0) - dictDelete(callbacks,sname); - - /* If this was the last unsubscribe message, revert to - * non-subscribe mode. */ - assert(reply->element[2]->type == REDIS_REPLY_INTEGER); - - /* Unset subscribed flag only when no pipelined pending subscribe. */ - if (reply->element[2]->integer == 0 - && dictSize(ac->sub.channels) == 0 - && dictSize(ac->sub.patterns) == 0) - c->flags &= ~REDIS_SUBSCRIBED; + /* If this is an subscribe reply decrease pending counter. */ + if (strcasecmp(stype+pvariant,"subscribe") == 0) { + assert(cb != NULL); + cb->pending_subs -= 1; + + } else if (strcasecmp(stype+pvariant,"unsubscribe") == 0) { + if (cb == NULL) + ac->sub.pending_unsubs -= 1; + else if (cb->pending_subs == 0) + dictDelete(callbacks,sname); + + /* If this was the last unsubscribe message, revert to + * non-subscribe mode. */ + assert(reply->element[2]->type == REDIS_REPLY_INTEGER); + + /* Unset subscribed flag only when no pipelined pending subscribe + * or pending unsubscribe replies. */ + if (reply->element[2]->integer == 0 + && dictSize(ac->sub.channels) == 0 + && dictSize(ac->sub.patterns) == 0 + && ac->sub.pending_unsubs == 0) { + c->flags &= ~REDIS_SUBSCRIBED; + + /* Move ongoing regular command callbacks. */ + redisCallback cb; + while (__redisShiftCallback(&ac->sub.replies,&cb) == REDIS_OK) { + __redisPushCallback(&ac->replies,&cb); + } } } sdsfree(sname); } else { - /* Shift callback for invalid commands. */ - __redisShiftCallback(&ac->sub.invalid,dstcb); + /* Shift callback for pending command in subscribed context. */ + __redisShiftCallback(&ac->sub.replies,dstcb); } return REDIS_OK; oom: __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); + __redisAsyncCopyError(ac); return REDIS_ERR; } @@ -502,13 +559,12 @@ static int redisIsSubscribeReply(redisReply *reply) { len = reply->element[0]->len - off; return !strncasecmp(str, "subscribe", len) || - !strncasecmp(str, "message", len); - + !strncasecmp(str, "message", len) || + !strncasecmp(str, "unsubscribe", len); } void redisProcessCallbacks(redisAsyncContext *ac) { redisContext *c = &(ac->c); - redisCallback cb = {NULL, NULL, 0, NULL}; void *reply = NULL; int status; @@ -521,17 +577,14 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } - - /* If monitor mode, repush callback */ - if(c->flags & REDIS_MONITORING) { - __redisPushCallback(&ac->replies,&cb); - } - /* When the connection is not being disconnected, simply stop * trying to get replies and wait for the next loop tick. */ break; } + /* Keep track of push message support for subscribe handling */ + if (redisIsPushReply(reply)) c->flags |= REDIS_SUPPORTS_PUSH; + /* Send any non-subscribe related PUSH messages to our PUSH handler * while allowing subscribe related PUSH messages to pass through. * This allows existing code to be backward compatible and work in @@ -544,6 +597,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { /* Even if the context is subscribed, pending regular * callbacks will get a reply before pub/sub messages arrive. */ + redisCallback cb = {NULL, NULL, 0, 0, NULL}; if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) { /* * A spontaneous reply in a not-subscribed context can be the error @@ -567,15 +621,17 @@ void redisProcessCallbacks(redisAsyncContext *ac) { __redisAsyncDisconnect(ac); return; } - /* No more regular callbacks and no errors, the context *must* be subscribed or monitoring. */ - assert((c->flags & REDIS_SUBSCRIBED || c->flags & REDIS_MONITORING)); - if(c->flags & REDIS_SUBSCRIBED) + /* No more regular callbacks and no errors, the context *must* be subscribed. */ + assert(c->flags & REDIS_SUBSCRIBED); + if (c->flags & REDIS_SUBSCRIBED) __redisGetSubscribeCallback(ac,reply,&cb); } if (cb.fn != NULL) { __redisRunCallback(ac,&cb,reply); - c->reader->fn->freeObject(reply); + if (!(c->flags & REDIS_NO_AUTO_FREE_REPLIES)){ + c->reader->fn->freeObject(reply); + } /* Proceed with free'ing when redisAsyncFree() was called. */ if (c->flags & REDIS_FREEING) { @@ -589,6 +645,11 @@ void redisProcessCallbacks(redisAsyncContext *ac) { * doesn't know what the server will spit out over the wire. */ c->reader->fn->freeObject(reply); } + + /* If in monitor mode, repush the callback */ + if (c->flags & REDIS_MONITORING) { + __redisPushCallback(&ac->replies,&cb); + } } /* Disconnect when there was an error reading the reply */ @@ -597,7 +658,7 @@ void redisProcessCallbacks(redisAsyncContext *ac) { } static void __redisAsyncHandleConnectFailure(redisAsyncContext *ac) { - if (ac->onConnect) ac->onConnect(ac, REDIS_ERR); + __redisRunConnectCallback(ac, REDIS_ERR); __redisAsyncDisconnect(ac); } @@ -610,7 +671,8 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { if (redisCheckConnectDone(c, &completed) == REDIS_ERR) { /* Error! */ - redisCheckSocketError(c); + if (redisCheckSocketError(c) == REDIS_ERR) + __redisAsyncCopyError(ac); __redisAsyncHandleConnectFailure(ac); return REDIS_ERR; } else if (completed == 1) { @@ -621,8 +683,19 @@ static int __redisAsyncHandleConnect(redisAsyncContext *ac) { return REDIS_ERR; } - if (ac->onConnect) ac->onConnect(ac, REDIS_OK); + /* flag us as fully connect, but allow the callback + * to disconnect. For that reason, permit the function + * to delete the context here after callback return. + */ c->flags |= REDIS_CONNECTED; + __redisRunConnectCallback(ac, REDIS_OK); + if ((ac->c.flags & REDIS_DISCONNECTING)) { + redisAsyncDisconnect(ac); + return REDIS_ERR; + } else if ((ac->c.flags & REDIS_FREEING)) { + redisAsyncFree(ac); + return REDIS_ERR; + } return REDIS_OK; } else { return REDIS_OK; @@ -646,6 +719,8 @@ void redisAsyncRead(redisAsyncContext *ac) { */ void redisAsyncHandleRead(redisAsyncContext *ac) { redisContext *c = &(ac->c); + /* must not be called from a callback */ + assert(!(c->flags & REDIS_IN_CALLBACK)); if (!(c->flags & REDIS_CONNECTED)) { /* Abort connect was not successful. */ @@ -679,6 +754,8 @@ void redisAsyncWrite(redisAsyncContext *ac) { void redisAsyncHandleWrite(redisAsyncContext *ac) { redisContext *c = &(ac->c); + /* must not be called from a callback */ + assert(!(c->flags & REDIS_IN_CALLBACK)); if (!(c->flags & REDIS_CONNECTED)) { /* Abort connect was not successful. */ @@ -695,10 +772,20 @@ void redisAsyncHandleWrite(redisAsyncContext *ac) { void redisAsyncHandleTimeout(redisAsyncContext *ac) { redisContext *c = &(ac->c); redisCallback cb; + /* must not be called from a callback */ + assert(!(c->flags & REDIS_IN_CALLBACK)); - if ((c->flags & REDIS_CONNECTED) && ac->replies.head == NULL) { - /* Nothing to do - just an idle timeout */ - return; + if ((c->flags & REDIS_CONNECTED)) { + if (ac->replies.head == NULL && ac->sub.replies.head == NULL) { + /* Nothing to do - just an idle timeout */ + return; + } + + if (!ac->c.command_timeout || + (!ac->c.command_timeout->tv_sec && !ac->c.command_timeout->tv_usec)) { + /* A belated connect timeout arriving, ignore */ + return; + } } if (!c->err) { @@ -706,8 +793,8 @@ void redisAsyncHandleTimeout(redisAsyncContext *ac) { __redisAsyncCopyError(ac); } - if (!(c->flags & REDIS_CONNECTED) && ac->onConnect) { - ac->onConnect(ac, REDIS_ERR); + if (!(c->flags & REDIS_CONNECTED)) { + __redisRunConnectCallback(ac, REDIS_ERR); } while (__redisShiftCallback(&ac->replies, &cb) == REDIS_OK) { @@ -744,6 +831,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void redisContext *c = &(ac->c); redisCallback cb; struct dict *cbdict; + dictIterator it; dictEntry *de; redisCallback *existcb; int pvariant, hasnext; @@ -760,6 +848,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void cb.fn = fn; cb.privdata = privdata; cb.pending_subs = 1; + cb.unsubscribe_sent = 0; /* Find out which command will be appended. */ p = nextArgument(cmd,&cstr,&clen); @@ -799,20 +888,67 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void * subscribed to one or more channels or patterns. */ if (!(c->flags & REDIS_SUBSCRIBED)) return REDIS_ERR; + if (pvariant) + cbdict = ac->sub.patterns; + else + cbdict = ac->sub.channels; + + if (hasnext) { + /* Send an unsubscribe with specific channels/patterns. + * Bookkeeping the number of expected replies */ + while ((p = nextArgument(p,&astr,&alen)) != NULL) { + sname = sdsnewlen(astr,alen); + if (sname == NULL) + goto oom; + + de = dictFind(cbdict,sname); + if (de != NULL) { + existcb = dictGetEntryVal(de); + if (existcb->unsubscribe_sent == 0) + existcb->unsubscribe_sent = 1; + else + /* Already sent, reply to be ignored */ + ac->sub.pending_unsubs += 1; + } else { + /* Not subscribed to, reply to be ignored */ + ac->sub.pending_unsubs += 1; + } + sdsfree(sname); + } + } else { + /* Send an unsubscribe without specific channels/patterns. + * Bookkeeping the number of expected replies */ + int no_subs = 1; + dictInitIterator(&it,cbdict); + while ((de = dictNext(&it)) != NULL) { + existcb = dictGetEntryVal(de); + if (existcb->unsubscribe_sent == 0) { + existcb->unsubscribe_sent = 1; + no_subs = 0; + } + } + /* Unsubscribing to all channels/patterns, where none is + * subscribed to, results in a single reply to be ignored. */ + if (no_subs == 1) + ac->sub.pending_unsubs += 1; + } + /* (P)UNSUBSCRIBE does not have its own response: every channel or * pattern that is unsubscribed will receive a message. This means we * should not append a callback function for this command. */ - } else if(strncasecmp(cstr,"monitor\r\n",9) == 0) { - /* Set monitor flag and push callback */ - c->flags |= REDIS_MONITORING; - __redisPushCallback(&ac->replies,&cb); + } else if (strncasecmp(cstr,"monitor\r\n",9) == 0) { + /* Set monitor flag and push callback */ + c->flags |= REDIS_MONITORING; + if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) + goto oom; } else { - if (c->flags & REDIS_SUBSCRIBED) - /* This will likely result in an error reply, but it needs to be - * received and passed to the callback. */ - __redisPushCallback(&ac->sub.invalid,&cb); - else - __redisPushCallback(&ac->replies,&cb); + if (c->flags & REDIS_SUBSCRIBED) { + if (__redisPushCallback(&ac->sub.replies,&cb) != REDIS_OK) + goto oom; + } else { + if (__redisPushCallback(&ac->replies,&cb) != REDIS_OK) + goto oom; + } } __redisAppendCommand(c,cmd,len); @@ -823,6 +959,7 @@ static int __redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void return REDIS_OK; oom: __redisSetError(&(ac->c), REDIS_ERR_OOM, "Out of memory"); + __redisAsyncCopyError(ac); return REDIS_ERR; } @@ -852,7 +989,7 @@ int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata int redisAsyncCommandArgv(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen) { sds cmd; - int len; + long long len; int status; len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen); if (len < 0) @@ -46,6 +46,7 @@ typedef struct redisCallback { struct redisCallback *next; /* simple singly linked list */ redisCallbackFn *fn; int pending_subs; + int unsubscribe_sent; void *privdata; } redisCallback; @@ -57,6 +58,7 @@ typedef struct redisCallbackList { /* Connection callback prototypes */ typedef void (redisDisconnectCallback)(const struct redisAsyncContext*, int status); typedef void (redisConnectCallback)(const struct redisAsyncContext*, int status); +typedef void (redisConnectCallbackNC)(struct redisAsyncContext *, int status); typedef void(redisTimerCallback)(void *timer, void *privdata); /* Context for an async connection to Redis */ @@ -92,6 +94,7 @@ typedef struct redisAsyncContext { /* Called when the first write event was received. */ redisConnectCallback *onConnect; + redisConnectCallbackNC *onConnectNC; /* Regular command callbacks */ redisCallbackList replies; @@ -102,9 +105,10 @@ typedef struct redisAsyncContext { /* Subscription callbacks */ struct { - redisCallbackList invalid; + redisCallbackList replies; struct dict *channels; struct dict *patterns; + int pending_unsubs; } sub; /* Any configured RESP3 PUSH handler */ @@ -119,6 +123,7 @@ redisAsyncContext *redisAsyncConnectBindWithReuse(const char *ip, int port, const char *source_addr); redisAsyncContext *redisAsyncConnectUnix(const char *path); int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn); +int redisAsyncSetConnectCallbackNC(redisAsyncContext *ac, redisConnectCallbackNC *fn); int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn); redisAsyncPushFn *redisAsyncSetPushCallback(redisAsyncContext *ac, redisAsyncPushFn *fn); diff --git a/async_private.h b/async_private.h index b9d23ff..ea0558d 100644 --- a/async_private.h +++ b/async_private.h @@ -51,7 +51,7 @@ #define _EL_CLEANUP(ctx) do { \ if ((ctx)->ev.cleanup) (ctx)->ev.cleanup((ctx)->ev.data); \ ctx->ev.cleanup = NULL; \ - } while(0); + } while(0) static inline void refreshTimeout(redisAsyncContext *ctx) { #define REDIS_TIMER_ISSET(tvp) \ @@ -267,16 +267,11 @@ static dictEntry *dictFind(dict *ht, const void *key) { return NULL; } -static dictIterator *dictGetIterator(dict *ht) { - dictIterator *iter = hi_malloc(sizeof(*iter)); - if (iter == NULL) - return NULL; - +static void dictInitIterator(dictIterator *iter, dict *ht) { iter->ht = ht; iter->index = -1; iter->entry = NULL; iter->nextEntry = NULL; - return iter; } static dictEntry *dictNext(dictIterator *iter) { @@ -299,10 +294,6 @@ static dictEntry *dictNext(dictIterator *iter) { return NULL; } -static void dictReleaseIterator(dictIterator *iter) { - hi_free(iter); -} - /* ------------------------- private functions ------------------------------ */ /* Expand the hash table if needed */ @@ -119,8 +119,7 @@ static int dictReplace(dict *ht, void *key, void *val); static int dictDelete(dict *ht, const void *key); static void dictRelease(dict *ht); static dictEntry * dictFind(dict *ht, const void *key); -static dictIterator *dictGetIterator(dict *ht); +static void dictInitIterator(dictIterator *iter, dict *ht); static dictEntry *dictNext(dictIterator *iter); -static void dictReleaseIterator(dictIterator *iter); #endif /* __DICT_H */ diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt index 1d5bc56..49cd8d4 100644 --- a/examples/CMakeLists.txt +++ b/examples/CMakeLists.txt @@ -21,7 +21,7 @@ ENDIF() FIND_PATH(LIBEVENT event.h) if (LIBEVENT) - ADD_EXECUTABLE(example-libevent example-libevent) + ADD_EXECUTABLE(example-libevent example-libevent.c) TARGET_LINK_LIBRARIES(example-libevent hiredis event) ENDIF() diff --git a/examples/example-libevent-ssl.c b/examples/example-libevent-ssl.c index 7d99af1..d0998ba 100644 --- a/examples/example-libevent-ssl.c +++ b/examples/example-libevent-ssl.c @@ -56,7 +56,7 @@ int main (int argc, char **argv) { const char *caCert = argc > 5 ? argv[6] : NULL; redisSSLContext *ssl; - redisSSLContextError ssl_error; + redisSSLContextError ssl_error = REDIS_SSL_CTX_NONE; redisInitOpenSSL(); diff --git a/examples/example-libuv.c b/examples/example-libuv.c index cbde452..53fd04a 100644 --- a/examples/example-libuv.c +++ b/examples/example-libuv.c @@ -7,18 +7,33 @@ #include <async.h> #include <adapters/libuv.h> +void debugCallback(redisAsyncContext *c, void *r, void *privdata) { + (void)privdata; //unused + redisReply *reply = r; + if (reply == NULL) { + /* The DEBUG SLEEP command will almost always fail, because we have set a 1 second timeout */ + printf("`DEBUG SLEEP` error: %s\n", c->errstr ? c->errstr : "unknown error"); + return; + } + /* Disconnect after receiving the reply of DEBUG SLEEP (which will not)*/ + redisAsyncDisconnect(c); +} + void getCallback(redisAsyncContext *c, void *r, void *privdata) { redisReply *reply = r; - if (reply == NULL) return; - printf("argv[%s]: %s\n", (char*)privdata, reply->str); + if (reply == NULL) { + printf("`GET key` error: %s\n", c->errstr ? c->errstr : "unknown error"); + return; + } + printf("`GET key` result: argv[%s]: %s\n", (char*)privdata, reply->str); - /* Disconnect after receiving the reply to GET */ - redisAsyncDisconnect(c); + /* start another request that demonstrate timeout */ + redisAsyncCommand(c, debugCallback, NULL, "DEBUG SLEEP %f", 1.5); } void connectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { - printf("Error: %s\n", c->errstr); + printf("connect error: %s\n", c->errstr); return; } printf("Connected...\n"); @@ -26,7 +41,7 @@ void connectCallback(const redisAsyncContext *c, int status) { void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { - printf("Error: %s\n", c->errstr); + printf("disconnect because of error: %s\n", c->errstr); return; } printf("Disconnected...\n"); @@ -49,8 +64,18 @@ int main (int argc, char **argv) { redisLibuvAttach(c,loop); redisAsyncSetConnectCallback(c,connectCallback); redisAsyncSetDisconnectCallback(c,disconnectCallback); + redisAsyncSetTimeout(c, (struct timeval){ .tv_sec = 1, .tv_usec = 0}); + + /* + In this demo, we first `set key`, then `get key` to demonstrate the basic usage of libuv adapter. + Then in `getCallback`, we start a `debug sleep` command to create 1.5 second long request. + Because we have set a 1 second timeout to the connection, the command will always fail with a + timeout error, which is shown in the `debugCallback`. + */ + redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); + uv_run(loop, UV_RUN_DEFAULT); return 0; } diff --git a/examples/example-poll.c b/examples/example-poll.c new file mode 100644 index 0000000..954673d --- /dev/null +++ b/examples/example-poll.c @@ -0,0 +1,62 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <signal.h> +#include <unistd.h> + +#include <async.h> +#include <adapters/poll.h> + +/* Put in the global scope, so that loop can be explicitly stopped */ +static int exit_loop = 0; + +void getCallback(redisAsyncContext *c, void *r, void *privdata) { + redisReply *reply = r; + if (reply == NULL) return; + printf("argv[%s]: %s\n", (char*)privdata, reply->str); + + /* Disconnect after receiving the reply to GET */ + redisAsyncDisconnect(c); +} + +void connectCallback(const redisAsyncContext *c, int status) { + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + exit_loop = 1; + return; + } + + printf("Connected...\n"); +} + +void disconnectCallback(const redisAsyncContext *c, int status) { + exit_loop = 1; + if (status != REDIS_OK) { + printf("Error: %s\n", c->errstr); + return; + } + + printf("Disconnected...\n"); +} + +int main (int argc, char **argv) { + signal(SIGPIPE, SIG_IGN); + + redisAsyncContext *c = redisAsyncConnect("127.0.0.1", 6379); + if (c->err) { + /* Let *c leak for now... */ + printf("Error: %s\n", c->errstr); + return 1; + } + + redisPollAttach(c); + redisAsyncSetConnectCallback(c,connectCallback); + redisAsyncSetDisconnectCallback(c,disconnectCallback); + redisAsyncCommand(c, NULL, NULL, "SET key %b", argv[argc-1], strlen(argv[argc-1])); + redisAsyncCommand(c, getCallback, (char*)"end-1", "GET key"); + while (!exit_loop) + { + redisPollTick(c, 0.1); + } + return 0; +} diff --git a/examples/example-ssl.c b/examples/example-ssl.c index b8ca442..6d1e32e 100644 --- a/examples/example-ssl.c +++ b/examples/example-ssl.c @@ -12,7 +12,7 @@ int main(int argc, char **argv) { unsigned int j; redisSSLContext *ssl; - redisSSLContextError ssl_error; + redisSSLContextError ssl_error = REDIS_SSL_CTX_NONE; redisContext *c; redisReply *reply; if (argc < 4) { @@ -27,9 +27,8 @@ int main(int argc, char **argv) { redisInitOpenSSL(); ssl = redisCreateSSLContext(ca, NULL, cert, key, NULL, &ssl_error); - if (!ssl) { - printf("SSL Context error: %s\n", - redisSSLContextGetError(ssl_error)); + if (!ssl || ssl_error != REDIS_SSL_CTX_NONE) { + printf("SSL Context error: %s\n", redisSSLContextGetError(ssl_error)); exit(1); } @@ -1,8 +1,10 @@ #ifndef __HIREDIS_FMACRO_H #define __HIREDIS_FMACRO_H +#ifndef _AIX #define _XOPEN_SOURCE 600 #define _POSIX_C_SOURCE 200112L +#endif #if defined(__APPLE__) && defined(__MACH__) /* Enable TCP_KEEPALIVE */ diff --git a/fuzzing/format_command_fuzzer.c b/fuzzing/format_command_fuzzer.c new file mode 100644 index 0000000..91adeac --- /dev/null +++ b/fuzzing/format_command_fuzzer.c @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2020, Salvatore Sanfilippo <antirez at gmail dot com> + * Copyright (c) 2020, Pieter Noordhuis <pcnoordhuis at gmail dot com> + * Copyright (c) 2020, Matt Stancliff <matt at genges dot com>, + * Jan-Erik Rediger <janerik at fnordig dot com> + * + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of Redis nor the names of its contributors may be used + * to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include <stdlib.h> +#include <string.h> +#include "hiredis.h" + +int LLVMFuzzerTestOneInput(const uint8_t *data, size_t size) { + char *new_str, *cmd; + + if (size < 3) + return 0; + + new_str = malloc(size+1); + if (new_str == NULL) + return 0; + + memcpy(new_str, data, size); + new_str[size] = '\0'; + + redisFormatCommand(&cmd, new_str); + + if (cmd != NULL) + hi_free(cmd); + free(new_str); + return 0; +} @@ -96,6 +96,8 @@ void freeReplyObject(void *reply) { switch(r->type) { case REDIS_REPLY_INTEGER: + case REDIS_REPLY_NIL: + case REDIS_REPLY_BOOL: break; /* Nothing to free */ case REDIS_REPLY_ARRAY: case REDIS_REPLY_MAP: @@ -112,6 +114,7 @@ void freeReplyObject(void *reply) { case REDIS_REPLY_STRING: case REDIS_REPLY_DOUBLE: case REDIS_REPLY_VERB: + case REDIS_REPLY_BIGNUM: hi_free(r->str); break; } @@ -129,7 +132,8 @@ static void *createStringObject(const redisReadTask *task, char *str, size_t len assert(task->type == REDIS_REPLY_ERROR || task->type == REDIS_REPLY_STATUS || task->type == REDIS_REPLY_STRING || - task->type == REDIS_REPLY_VERB); + task->type == REDIS_REPLY_VERB || + task->type == REDIS_REPLY_BIGNUM); /* Copy string value */ if (task->type == REDIS_REPLY_VERB) { @@ -235,12 +239,14 @@ static void *createDoubleObject(const redisReadTask *task, double value, char *s * decimal string conversion artifacts. */ memcpy(r->str, str, len); r->str[len] = '\0'; + r->len = len; if (task->parent) { parent = task->parent->obj; assert(parent->type == REDIS_REPLY_ARRAY || parent->type == REDIS_REPLY_MAP || - parent->type == REDIS_REPLY_SET); + parent->type == REDIS_REPLY_SET || + parent->type == REDIS_REPLY_PUSH); parent->element[task->idx] = r; } return r; @@ -277,7 +283,8 @@ static void *createBoolObject(const redisReadTask *task, int bval) { parent = task->parent->obj; assert(parent->type == REDIS_REPLY_ARRAY || parent->type == REDIS_REPLY_MAP || - parent->type == REDIS_REPLY_SET); + parent->type == REDIS_REPLY_SET || + parent->type == REDIS_REPLY_PUSH); parent->element[task->idx] = r; } return r; @@ -565,13 +572,12 @@ int redisFormatCommand(char **target, const char *format, ...) { * lengths. If the latter is set to NULL, strlen will be used to compute the * argument lengths. */ -int redisFormatSdsCommandArgv(sds *target, int argc, const char **argv, - const size_t *argvlen) +long long redisFormatSdsCommandArgv(sds *target, int argc, const char **argv, + const size_t *argvlen) { sds cmd, aux; - unsigned long long totlen; + unsigned long long totlen, len; int j; - size_t len; /* Abort on a NULL target */ if (target == NULL) @@ -602,7 +608,7 @@ int redisFormatSdsCommandArgv(sds *target, int argc, const char **argv, cmd = sdscatfmt(cmd, "*%i\r\n", argc); for (j=0; j < argc; j++) { len = argvlen ? argvlen[j] : strlen(argv[j]); - cmd = sdscatfmt(cmd, "$%u\r\n", len); + cmd = sdscatfmt(cmd, "$%U\r\n", len); cmd = sdscatlen(cmd, argv[j], len); cmd = sdscatlen(cmd, "\r\n", sizeof("\r\n")-1); } @@ -622,11 +628,11 @@ void redisFreeSdsCommand(sds cmd) { * lengths. If the latter is set to NULL, strlen will be used to compute the * argument lengths. */ -int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) { +long long redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) { char *cmd = NULL; /* final command */ - int pos; /* position in final command */ - size_t len; - int totlen, j; + size_t pos; /* position in final command */ + size_t len, totlen; + int j; /* Abort on a NULL target */ if (target == NULL) @@ -797,6 +803,9 @@ redisContext *redisConnectWithOptions(const redisOptions *options) { if (options->options & REDIS_OPT_NOAUTOFREE) { c->flags |= REDIS_NO_AUTO_FREE; } + if (options->options & REDIS_OPT_NOAUTOFREEREPLIES) { + c->flags |= REDIS_NO_AUTO_FREE_REPLIES; + } /* Set any user supplied RESP3 PUSH handler or use freeReplyObject * as a default unless specifically flagged that we don't want one. */ @@ -825,7 +834,7 @@ redisContext *redisConnectWithOptions(const redisOptions *options) { c->fd = options->endpoint.fd; c->flags |= REDIS_CONNECTED; } else { - // Unknown type - FIXME - FREE + redisFree(c); return NULL; } @@ -987,17 +996,6 @@ oom: return REDIS_ERR; } -/* Internal helper function to try and get a reply from the reader, - * or set an error in the context otherwise. */ -int redisGetReplyFromReader(redisContext *c, void **reply) { - if (redisReaderGetReply(c->reader,reply) == REDIS_ERR) { - __redisSetError(c,c->reader->err,c->reader->errstr); - return REDIS_ERR; - } - - return REDIS_OK; -} - /* Internal helper that returns 1 if the reply was a RESP3 PUSH * message and we handled it with a user-provided callback. */ static int redisHandledPushReply(redisContext *c, void *reply) { @@ -1009,12 +1007,34 @@ static int redisHandledPushReply(redisContext *c, void *reply) { return 0; } +/* Get a reply from our reader or set an error in the context. */ +int redisGetReplyFromReader(redisContext *c, void **reply) { + if (redisReaderGetReply(c->reader, reply) == REDIS_ERR) { + __redisSetError(c,c->reader->err,c->reader->errstr); + return REDIS_ERR; + } + + return REDIS_OK; +} + +/* Internal helper to get the next reply from our reader while handling + * any PUSH messages we encounter along the way. This is separate from + * redisGetReplyFromReader so as to not change its behavior. */ +static int redisNextInBandReplyFromReader(redisContext *c, void **reply) { + do { + if (redisGetReplyFromReader(c, reply) == REDIS_ERR) + return REDIS_ERR; + } while (redisHandledPushReply(c, *reply)); + + return REDIS_OK; +} + int redisGetReply(redisContext *c, void **reply) { int wdone = 0; void *aux = NULL; /* Try to read pending replies */ - if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) + if (redisNextInBandReplyFromReader(c,&aux) == REDIS_ERR) return REDIS_ERR; /* For the blocking context, flush output buffer and read reply */ @@ -1030,12 +1050,8 @@ int redisGetReply(redisContext *c, void **reply) { if (redisBufferRead(c) == REDIS_ERR) return REDIS_ERR; - /* We loop here in case the user has specified a RESP3 - * PUSH handler (e.g. for client tracking). */ - do { - if (redisGetReplyFromReader(c,&aux) == REDIS_ERR) - return REDIS_ERR; - } while (redisHandledPushReply(c, aux)); + if (redisNextInBandReplyFromReader(c,&aux) == REDIS_ERR) + return REDIS_ERR; } while (aux == NULL); } @@ -1112,7 +1128,7 @@ int redisAppendCommand(redisContext *c, const char *format, ...) { int redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) { sds cmd; - int len; + long long len; len = redisFormatSdsCommandArgv(&cmd,argc,argv,argvlen); if (len == -1) { @@ -47,8 +47,8 @@ typedef long long ssize_t; #define HIREDIS_MAJOR 1 #define HIREDIS_MINOR 0 -#define HIREDIS_PATCH 1 -#define HIREDIS_SONAME 1.0.1-dev +#define HIREDIS_PATCH 3 +#define HIREDIS_SONAME 1.0.3-dev /* Connection type can be blocking or non-blocking and is set in the * least significant bit of the flags field in redisContext. */ @@ -80,12 +80,18 @@ typedef long long ssize_t; /* Flag that is set when we should set SO_REUSEADDR before calling bind() */ #define REDIS_REUSEADDR 0x80 +/* Flag that is set when the async connection supports push replies. */ +#define REDIS_SUPPORTS_PUSH 0x100 + /** * Flag that indicates the user does not want the context to * be automatically freed upon error */ #define REDIS_NO_AUTO_FREE 0x200 +/* Flag that indicates the user does not want replies to be automatically freed */ +#define REDIS_NO_AUTO_FREE_REPLIES 0x400 + #define REDIS_KEEPALIVE_INTERVAL 15 /* seconds */ /* number of times we retry to connect in the case of EADDRNOTAVAIL and @@ -112,7 +118,8 @@ typedef struct redisReply { double dval; /* The double when type is REDIS_REPLY_DOUBLE */ size_t len; /* Length of string */ char *str; /* Used for REDIS_REPLY_ERROR, REDIS_REPLY_STRING - REDIS_REPLY_VERB, and REDIS_REPLY_DOUBLE (in additional to dval). */ + REDIS_REPLY_VERB, REDIS_REPLY_DOUBLE (in additional to dval), + and REDIS_REPLY_BIGNUM. */ char vtype[4]; /* Used for REDIS_REPLY_VERB, contains the null terminated 3 character content type, such as "txt". */ size_t elements; /* number of elements, for REDIS_REPLY_ARRAY */ @@ -127,8 +134,8 @@ void freeReplyObject(void *reply); /* Functions to format a command according to the protocol. */ int redisvFormatCommand(char **target, const char *format, va_list ap); int redisFormatCommand(char **target, const char *format, ...); -int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen); -int redisFormatSdsCommandArgv(sds *target, int argc, const char ** argv, const size_t *argvlen); +long long redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen); +long long redisFormatSdsCommandArgv(sds *target, int argc, const char ** argv, const size_t *argvlen); void redisFreeCommand(char *cmd); void redisFreeSdsCommand(sds cmd); @@ -152,6 +159,11 @@ struct redisSsl; /* Don't automatically intercept and free RESP3 PUSH replies. */ #define REDIS_OPT_NO_PUSH_AUTOFREE 0x08 +/** + * Don't automatically free replies + */ +#define REDIS_OPT_NOAUTOFREEREPLIES 0x10 + /* In Unix systems a file descriptor is a regular signed int, with -1 * representing an invalid descriptor. In Windows it is a SOCKET * (32- or 64-bit unsigned integer depending on the architecture), where diff --git a/hiredis_ssl.h b/hiredis_ssl.h index e3d3e1c..26bc9e9 100644 --- a/hiredis_ssl.h +++ b/hiredis_ssl.h @@ -61,6 +61,27 @@ typedef enum { REDIS_SSL_CTX_OS_CERT_ADD_FAILED /* Failed to add CA certificates obtained from system to the SSL context */ } redisSSLContextError; +/* Constants that mirror OpenSSL's verify modes. By default, + * REDIS_SSL_VERIFY_PEER is used with redisCreateSSLContext(). + * Some Redis clients disable peer verification if there are no + * certificates specified. + */ +#define REDIS_SSL_VERIFY_NONE 0x00 +#define REDIS_SSL_VERIFY_PEER 0x01 +#define REDIS_SSL_VERIFY_FAIL_IF_NO_PEER_CERT 0x02 +#define REDIS_SSL_VERIFY_CLIENT_ONCE 0x04 +#define REDIS_SSL_VERIFY_POST_HANDSHAKE 0x08 + +/* Options to create an OpenSSL context. */ +typedef struct { + const char *cacert_filename; + const char *capath; + const char *cert_filename; + const char *private_key_filename; + const char *server_name; + int verify_mode; +} redisSSLOptions; + /** * Return the error message corresponding with the specified error code. */ @@ -102,6 +123,18 @@ redisSSLContext *redisCreateSSLContext(const char *cacert_filename, const char * const char *server_name, redisSSLContextError *error); /** + * Helper function to initialize an OpenSSL context that can be used + * to initiate SSL connections. This is a more extensible version of redisCreateSSLContext(). + * + * options contains a structure of SSL options to use. + * + * If error is non-null, it will be populated in case the context creation fails + * (returning a NULL). +*/ +redisSSLContext *redisCreateSSLContextWithOptions(redisSSLOptions *options, + redisSSLContextError *error); + +/** * Free a previously created OpenSSL context. */ void redisFreeSSLContext(redisSSLContext *redis_ssl_ctx); @@ -277,12 +277,28 @@ int redisCheckConnectDone(redisContext *c, int *completed) { *completed = 1; return REDIS_OK; } - switch (errno) { + int error = errno; + if (error == EINPROGRESS) { + /* must check error to see if connect failed. Get the socket error */ + int fail, so_error; + socklen_t optlen = sizeof(so_error); + fail = getsockopt(c->fd, SOL_SOCKET, SO_ERROR, &so_error, &optlen); + if (fail == 0) { + if (so_error == 0) { + /* Socket is connected! */ + *completed = 1; + return REDIS_OK; + } + /* connection error; */ + errno = so_error; + error = so_error; + } + } + switch (error) { case EISCONN: *completed = 1; return REDIS_OK; case EALREADY: - case EINPROGRESS: case EWOULDBLOCK: *completed = 0; return REDIS_OK; @@ -123,29 +123,28 @@ static char *readBytes(redisReader *r, unsigned int bytes) { /* Find pointer to \r\n. */ static char *seekNewline(char *s, size_t len) { - int pos = 0; - int _len = len-1; - - /* Position should be < len-1 because the character at "pos" should be - * followed by a \n. Note that strchr cannot be used because it doesn't - * allow to search a limited length and the buffer that is being searched - * might not have a trailing NULL character. */ - while (pos < _len) { - while(pos < _len && s[pos] != '\r') pos++; - if (pos==_len) { - /* Not found. */ - return NULL; - } else { - if (s[pos+1] == '\n') { - /* Found. */ - return s+pos; - } else { - /* Continue searching. */ - pos++; - } + char *ret; + + /* We cannot match with fewer than 2 bytes */ + if (len < 2) + return NULL; + + /* Search up to len - 1 characters */ + len--; + + /* Look for the \r */ + while ((ret = memchr(s, '\r', len)) != NULL) { + if (ret[1] == '\n') { + /* Found. */ + break; } + /* Continue searching. */ + ret++; + len -= ret - s; + s = ret; } - return NULL; + + return ret; } /* Convert a string into a long long. Returns REDIS_OK if the string could be @@ -274,64 +273,108 @@ static int processLineItem(redisReader *r) { if ((p = readLine(r,&len)) != NULL) { if (cur->type == REDIS_REPLY_INTEGER) { + long long v; + + if (string2ll(p, len, &v) == REDIS_ERR) { + __redisReaderSetError(r,REDIS_ERR_PROTOCOL, + "Bad integer value"); + return REDIS_ERR; + } + if (r->fn && r->fn->createInteger) { - long long v; - if (string2ll(p, len, &v) == REDIS_ERR) { - __redisReaderSetError(r,REDIS_ERR_PROTOCOL, - "Bad integer value"); - return REDIS_ERR; - } obj = r->fn->createInteger(cur,v); } else { obj = (void*)REDIS_REPLY_INTEGER; } } else if (cur->type == REDIS_REPLY_DOUBLE) { - if (r->fn && r->fn->createDouble) { - char buf[326], *eptr; - double d; + char buf[326], *eptr; + double d; - if ((size_t)len >= sizeof(buf)) { + if ((size_t)len >= sizeof(buf)) { + __redisReaderSetError(r,REDIS_ERR_PROTOCOL, + "Double value is too large"); + return REDIS_ERR; + } + + memcpy(buf,p,len); + buf[len] = '\0'; + + if (len == 3 && strcasecmp(buf,"inf") == 0) { + d = INFINITY; /* Positive infinite. */ + } else if (len == 4 && strcasecmp(buf,"-inf") == 0) { + d = -INFINITY; /* Negative infinite. */ + } else { + d = strtod((char*)buf,&eptr); + /* RESP3 only allows "inf", "-inf", and finite values, while + * strtod() allows other variations on infinity, NaN, + * etc. We explicity handle our two allowed infinite cases + * above, so strtod() should only result in finite values. */ + if (buf[0] == '\0' || eptr != &buf[len] || !isfinite(d)) { __redisReaderSetError(r,REDIS_ERR_PROTOCOL, - "Double value is too large"); + "Bad double value"); return REDIS_ERR; } + } - memcpy(buf,p,len); - buf[len] = '\0'; - - if (strcasecmp(buf,",inf") == 0) { - d = INFINITY; /* Positive infinite. */ - } else if (strcasecmp(buf,",-inf") == 0) { - d = -INFINITY; /* Negative infinite. */ - } else { - d = strtod((char*)buf,&eptr); - if (buf[0] == '\0' || eptr[0] != '\0' || isnan(d)) { - __redisReaderSetError(r,REDIS_ERR_PROTOCOL, - "Bad double value"); - return REDIS_ERR; - } - } + if (r->fn && r->fn->createDouble) { obj = r->fn->createDouble(cur,d,buf,len); } else { obj = (void*)REDIS_REPLY_DOUBLE; } } else if (cur->type == REDIS_REPLY_NIL) { + if (len != 0) { + __redisReaderSetError(r,REDIS_ERR_PROTOCOL, + "Bad nil value"); + return REDIS_ERR; + } + if (r->fn && r->fn->createNil) obj = r->fn->createNil(cur); else obj = (void*)REDIS_REPLY_NIL; } else if (cur->type == REDIS_REPLY_BOOL) { - int bval = p[0] == 't' || p[0] == 'T'; + int bval; + + if (len != 1 || !strchr("tTfF", p[0])) { + __redisReaderSetError(r,REDIS_ERR_PROTOCOL, + "Bad bool value"); + return REDIS_ERR; + } + + bval = p[0] == 't' || p[0] == 'T'; if (r->fn && r->fn->createBool) obj = r->fn->createBool(cur,bval); else obj = (void*)REDIS_REPLY_BOOL; + } else if (cur->type == REDIS_REPLY_BIGNUM) { + /* Ensure all characters are decimal digits (with possible leading + * minus sign). */ + for (int i = 0; i < len; i++) { + /* XXX Consider: Allow leading '+'? Error on leading '0's? */ + if (i == 0 && p[0] == '-') continue; + if (p[i] < '0' || p[i] > '9') { + __redisReaderSetError(r,REDIS_ERR_PROTOCOL, + "Bad bignum value"); + return REDIS_ERR; + } + } + if (r->fn && r->fn->createString) + obj = r->fn->createString(cur,p,len); + else + obj = (void*)REDIS_REPLY_BIGNUM; } else { /* Type will be error or status. */ + for (int i = 0; i < len; i++) { + if (p[i] == '\r' || p[i] == '\n') { + __redisReaderSetError(r,REDIS_ERR_PROTOCOL, + "Bad simple string value"); + return REDIS_ERR; + } + } if (r->fn && r->fn->createString) obj = r->fn->createString(cur,p,len); else - obj = (void*)(size_t)(cur->type); + obj = (void*)(uintptr_t)(cur->type); } if (obj == NULL) { @@ -396,7 +439,7 @@ static int processBulkItem(redisReader *r) { if (r->fn && r->fn->createString) obj = r->fn->createString(cur,s+2,len); else - obj = (void*)(long)cur->type; + obj = (void*)(uintptr_t)cur->type; success = 1; } } @@ -453,7 +496,6 @@ static int processAggregateItem(redisReader *r) { long long elements; int root = 0, len; - /* Set error for nested multi bulks with depth > 7 */ if (r->ridx == r->tasks - 1) { if (redisReaderGrow(r) == REDIS_ERR) return REDIS_ERR; @@ -494,7 +536,7 @@ static int processAggregateItem(redisReader *r) { if (r->fn && r->fn->createArray) obj = r->fn->createArray(cur,elements); else - obj = (void*)(long)cur->type; + obj = (void*)(uintptr_t)cur->type; if (obj == NULL) { __redisReaderSetErrorOOM(r); @@ -569,6 +611,9 @@ static int processItem(redisReader *r) { case '>': cur->type = REDIS_REPLY_PUSH; break; + case '(': + cur->type = REDIS_REPLY_BIGNUM; + break; default: __redisReaderSetErrorProtocolByte(r,*p); return REDIS_ERR; @@ -587,6 +632,7 @@ static int processItem(redisReader *r) { case REDIS_REPLY_DOUBLE: case REDIS_REPLY_NIL: case REDIS_REPLY_BOOL: + case REDIS_REPLY_BIGNUM: return processLineItem(r); case REDIS_REPLY_STRING: case REDIS_REPLY_VERB: @@ -90,6 +90,7 @@ sds sdsnewlen(const void *init, size_t initlen) { int hdrlen = sdsHdrSize(type); unsigned char *fp; /* flags pointer. */ + if (hdrlen+initlen+1 <= initlen) return NULL; /* Catch size_t overflow */ sh = s_malloc(hdrlen+initlen+1); if (sh == NULL) return NULL; if (!init) @@ -174,7 +175,7 @@ void sdsfree(sds s) { * the output will be "6" as the string was modified but the logical length * remains 6 bytes. */ void sdsupdatelen(sds s) { - int reallen = strlen(s); + size_t reallen = strlen(s); sdssetlen(s, reallen); } @@ -196,7 +197,7 @@ void sdsclear(sds s) { sds sdsMakeRoomFor(sds s, size_t addlen) { void *sh, *newsh; size_t avail = sdsavail(s); - size_t len, newlen; + size_t len, newlen, reqlen; char type, oldtype = s[-1] & SDS_TYPE_MASK; int hdrlen; @@ -205,7 +206,8 @@ sds sdsMakeRoomFor(sds s, size_t addlen) { len = sdslen(s); sh = (char*)s-sdsHdrSize(oldtype); - newlen = (len+addlen); + reqlen = newlen = (len+addlen); + if (newlen <= len) return NULL; /* Catch size_t overflow */ if (newlen < SDS_MAX_PREALLOC) newlen *= 2; else @@ -219,6 +221,7 @@ sds sdsMakeRoomFor(sds s, size_t addlen) { if (type == SDS_TYPE_5) type = SDS_TYPE_8; hdrlen = sdsHdrSize(type); + if (hdrlen+newlen+1 <= reqlen) return NULL; /* Catch size_t overflow */ if (oldtype==type) { newsh = s_realloc(sh, hdrlen+newlen+1); if (newsh == NULL) return NULL; @@ -580,7 +583,7 @@ sds sdscatprintf(sds s, const char *fmt, ...) { */ sds sdscatfmt(sds s, char const *fmt, ...) { const char *f = fmt; - int i; + long i; va_list ap; va_start(ap,fmt); @@ -755,14 +758,14 @@ int sdsrange(sds s, ssize_t start, ssize_t end) { /* Apply tolower() to every character of the sds string 's'. */ void sdstolower(sds s) { - int len = sdslen(s), j; + size_t len = sdslen(s), j; for (j = 0; j < len; j++) s[j] = tolower(s[j]); } /* Apply toupper() to every character of the sds string 's'. */ void sdstoupper(sds s) { - int len = sdslen(s), j; + size_t len = sdslen(s), j; for (j = 0; j < len; j++) s[j] = toupper(s[j]); } diff --git a/sockcompat.c b/sockcompat.c index f99d14b..31df325 100644 --- a/sockcompat.c +++ b/sockcompat.c @@ -180,10 +180,17 @@ int win32_connect(SOCKET sockfd, const struct sockaddr *addr, socklen_t addrlen) /* For Winsock connect(), the WSAEWOULDBLOCK error means the same thing as * EINPROGRESS for POSIX connect(), so we do that translation to keep POSIX - * logic consistent. */ - if (errno == EWOULDBLOCK) { + * logic consistent. + * Additionally, WSAALREADY is can be reported as WSAEINVAL to and this is + * translated to EIO. Convert appropriately + */ + int err = errno; + if (err == EWOULDBLOCK) { errno = EINPROGRESS; } + else if (err == EIO) { + errno = EALREADY; + } return ret != SOCKET_ERROR ? ret : -1; } @@ -205,6 +212,14 @@ int win32_getsockopt(SOCKET sockfd, int level, int optname, void *optval, sockle } else { ret = getsockopt(sockfd, level, optname, (char*)optval, optlen); } + if (ret != SOCKET_ERROR && level == SOL_SOCKET && optname == SO_ERROR) { + /* translate SO_ERROR codes, if non-zero */ + int err = *(int*)optval; + if (err != 0) { + err = _wsaErrorToErrno(err); + *(int*)optval = err; + } + } _updateErrno(ret != SOCKET_ERROR); return ret != SOCKET_ERROR ? ret : -1; } @@ -219,6 +219,25 @@ redisSSLContext *redisCreateSSLContext(const char *cacert_filename, const char * const char *cert_filename, const char *private_key_filename, const char *server_name, redisSSLContextError *error) { + redisSSLOptions options = { + .cacert_filename = cacert_filename, + .capath = capath, + .cert_filename = cert_filename, + .private_key_filename = private_key_filename, + .server_name = server_name, + .verify_mode = REDIS_SSL_VERIFY_PEER, + }; + + return redisCreateSSLContextWithOptions(&options, error); +} + +redisSSLContext *redisCreateSSLContextWithOptions(redisSSLOptions *options, redisSSLContextError *error) { + const char *cacert_filename = options->cacert_filename; + const char *capath = options->capath; + const char *cert_filename = options->cert_filename; + const char *private_key_filename = options->private_key_filename; + const char *server_name = options->server_name; + #ifdef _WIN32 HCERTSTORE win_store = NULL; PCCERT_CONTEXT win_ctx = NULL; @@ -235,7 +254,7 @@ redisSSLContext *redisCreateSSLContext(const char *cacert_filename, const char * } SSL_CTX_set_options(ctx->ssl_ctx, SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3); - SSL_CTX_set_verify(ctx->ssl_ctx, SSL_VERIFY_PEER, NULL); + SSL_CTX_set_verify(ctx->ssl_ctx, options->verify_mode, NULL); if ((cert_filename != NULL && private_key_filename == NULL) || (private_key_filename != NULL && cert_filename == NULL)) { @@ -351,7 +370,6 @@ static int redisSSLConnect(redisContext *c, SSL *ssl) { } hi_free(rssl); - SSL_free(ssl); return REDIS_ERR; } @@ -393,7 +411,11 @@ int redisInitiateSSLWithContext(redisContext *c, redisSSLContext *redis_ssl_ctx) } } - return redisSSLConnect(c, ssl); + if (redisSSLConnect(c, ssl) != REDIS_OK) { + goto error; + } + + return REDIS_OK; error: if (ssl) @@ -11,12 +11,18 @@ #include <signal.h> #include <errno.h> #include <limits.h> +#include <math.h> #include "hiredis.h" #include "async.h" +#include "adapters/poll.h" #ifdef HIREDIS_TEST_SSL #include "hiredis_ssl.h" #endif +#ifdef HIREDIS_TEST_ASYNC +#include "adapters/libevent.h" +#include <event2/event.h> +#endif #include "net.h" #include "win32.h" @@ -58,6 +64,8 @@ struct pushCounters { int str; }; +static int insecure_calloc_calls; + #ifdef HIREDIS_TEST_SSL redisSSLContext *_ssl_ctx = NULL; #endif @@ -498,6 +506,20 @@ static void test_reply_reader(void) { freeReplyObject(reply); redisReaderFree(reader); + test("Multi-bulk never overflows regardless of maxelements: "); + size_t bad_mbulk_len = (SIZE_MAX / sizeof(void *)) + 3; + char bad_mbulk_reply[100]; + snprintf(bad_mbulk_reply, sizeof(bad_mbulk_reply), "*%llu\r\n+asdf\r\n", + (unsigned long long) bad_mbulk_len); + + reader = redisReaderCreate(); + reader->maxelements = 0; /* Don't rely on default limit */ + redisReaderFeed(reader, bad_mbulk_reply, strlen(bad_mbulk_reply)); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_ERR && strcasecmp(reader->errstr, "Out of memory") == 0); + freeReplyObject(reply); + redisReaderFree(reader); + #if LLONG_MAX > SIZE_MAX test("Set error when array > SIZE_MAX: "); reader = redisReaderCreate(); @@ -583,6 +605,147 @@ static void test_reply_reader(void) { ((redisReply*)reply)->element[1]->integer == 42); freeReplyObject(reply); redisReaderFree(reader); + + test("Can parse RESP3 doubles: "); + reader = redisReaderCreate(); + redisReaderFeed(reader, ",3.14159265358979323846\r\n",25); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_OK && + ((redisReply*)reply)->type == REDIS_REPLY_DOUBLE && + fabs(((redisReply*)reply)->dval - 3.14159265358979323846) < 0.00000001 && + ((redisReply*)reply)->len == 22 && + strcmp(((redisReply*)reply)->str, "3.14159265358979323846") == 0); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Set error on invalid RESP3 double: "); + reader = redisReaderCreate(); + redisReaderFeed(reader, ",3.14159\000265358979323846\r\n",26); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_ERR && + strcasecmp(reader->errstr,"Bad double value") == 0); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Correctly parses RESP3 double INFINITY: "); + reader = redisReaderCreate(); + redisReaderFeed(reader, ",inf\r\n",6); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_OK && + ((redisReply*)reply)->type == REDIS_REPLY_DOUBLE && + isinf(((redisReply*)reply)->dval) && + ((redisReply*)reply)->dval > 0); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Set error when RESP3 double is NaN: "); + reader = redisReaderCreate(); + redisReaderFeed(reader, ",nan\r\n",6); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_ERR && + strcasecmp(reader->errstr,"Bad double value") == 0); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Can parse RESP3 nil: "); + reader = redisReaderCreate(); + redisReaderFeed(reader, "_\r\n",3); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_OK && + ((redisReply*)reply)->type == REDIS_REPLY_NIL); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Set error on invalid RESP3 nil: "); + reader = redisReaderCreate(); + redisReaderFeed(reader, "_nil\r\n",6); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_ERR && + strcasecmp(reader->errstr,"Bad nil value") == 0); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Can parse RESP3 bool (true): "); + reader = redisReaderCreate(); + redisReaderFeed(reader, "#t\r\n",4); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_OK && + ((redisReply*)reply)->type == REDIS_REPLY_BOOL && + ((redisReply*)reply)->integer); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Can parse RESP3 bool (false): "); + reader = redisReaderCreate(); + redisReaderFeed(reader, "#f\r\n",4); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_OK && + ((redisReply*)reply)->type == REDIS_REPLY_BOOL && + !((redisReply*)reply)->integer); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Set error on invalid RESP3 bool: "); + reader = redisReaderCreate(); + redisReaderFeed(reader, "#foobar\r\n",9); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_ERR && + strcasecmp(reader->errstr,"Bad bool value") == 0); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Can parse RESP3 map: "); + reader = redisReaderCreate(); + redisReaderFeed(reader, "%2\r\n+first\r\n:123\r\n$6\r\nsecond\r\n#t\r\n",34); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_OK && + ((redisReply*)reply)->type == REDIS_REPLY_MAP && + ((redisReply*)reply)->elements == 4 && + ((redisReply*)reply)->element[0]->type == REDIS_REPLY_STATUS && + ((redisReply*)reply)->element[0]->len == 5 && + !strcmp(((redisReply*)reply)->element[0]->str,"first") && + ((redisReply*)reply)->element[1]->type == REDIS_REPLY_INTEGER && + ((redisReply*)reply)->element[1]->integer == 123 && + ((redisReply*)reply)->element[2]->type == REDIS_REPLY_STRING && + ((redisReply*)reply)->element[2]->len == 6 && + !strcmp(((redisReply*)reply)->element[2]->str,"second") && + ((redisReply*)reply)->element[3]->type == REDIS_REPLY_BOOL && + ((redisReply*)reply)->element[3]->integer); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Can parse RESP3 set: "); + reader = redisReaderCreate(); + redisReaderFeed(reader, "~5\r\n+orange\r\n$5\r\napple\r\n#f\r\n:100\r\n:999\r\n",40); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_OK && + ((redisReply*)reply)->type == REDIS_REPLY_SET && + ((redisReply*)reply)->elements == 5 && + ((redisReply*)reply)->element[0]->type == REDIS_REPLY_STATUS && + ((redisReply*)reply)->element[0]->len == 6 && + !strcmp(((redisReply*)reply)->element[0]->str,"orange") && + ((redisReply*)reply)->element[1]->type == REDIS_REPLY_STRING && + ((redisReply*)reply)->element[1]->len == 5 && + !strcmp(((redisReply*)reply)->element[1]->str,"apple") && + ((redisReply*)reply)->element[2]->type == REDIS_REPLY_BOOL && + !((redisReply*)reply)->element[2]->integer && + ((redisReply*)reply)->element[3]->type == REDIS_REPLY_INTEGER && + ((redisReply*)reply)->element[3]->integer == 100 && + ((redisReply*)reply)->element[4]->type == REDIS_REPLY_INTEGER && + ((redisReply*)reply)->element[4]->integer == 999); + freeReplyObject(reply); + redisReaderFree(reader); + + test("Can parse RESP3 bignum: "); + reader = redisReaderCreate(); + redisReaderFeed(reader,"(3492890328409238509324850943850943825024385\r\n",46); + ret = redisReaderGetReply(reader,&reply); + test_cond(ret == REDIS_OK && + ((redisReply*)reply)->type == REDIS_REPLY_BIGNUM && + ((redisReply*)reply)->len == 43 && + !strcmp(((redisReply*)reply)->str,"3492890328409238509324850943850943825024385")); + freeReplyObject(reply); + redisReaderFree(reader); } static void test_free_null(void) { @@ -609,6 +772,13 @@ static void *hi_calloc_fail(size_t nmemb, size_t size) { return NULL; } +static void *hi_calloc_insecure(size_t nmemb, size_t size) { + (void)nmemb; + (void)size; + insecure_calloc_calls++; + return (void*)0xdeadc0de; +} + static void *hi_realloc_fail(void *ptr, size_t size) { (void)ptr; (void)size; @@ -616,6 +786,8 @@ static void *hi_realloc_fail(void *ptr, size_t size) { } static void test_allocator_injection(void) { + void *ptr; + hiredisAllocFuncs ha = { .mallocFn = hi_malloc_fail, .callocFn = hi_calloc_fail, @@ -635,6 +807,13 @@ static void test_allocator_injection(void) { redisReader *reader = redisReaderCreate(); test_cond(reader == NULL); + /* Make sure hiredis itself protects against a non-overflow checking calloc */ + test("hiredis calloc wrapper protects against overflow: "); + ha.callocFn = hi_calloc_insecure; + hiredisSetAllocators(&ha); + ptr = hi_calloc((SIZE_MAX / sizeof(void*)) + 3, sizeof(void*)); + test_cond(ptr == NULL && insecure_calloc_calls == 0); + // Return allocators to default hiredisResetAllocators(); } @@ -737,6 +916,11 @@ static void test_resp3_push_handler(redisContext *c) { old = redisSetPushCallback(c, push_handler); test("We can set a custom RESP3 PUSH handler: "); reply = redisCommand(c, "SET key:0 val:0"); + /* We need another command because depending on the version of Redis, the + * notification may be delivered after the command's reply. */ + test_cond(reply != NULL); + freeReplyObject(reply); + reply = redisCommand(c, "PING"); test_cond(reply != NULL && reply->type == REDIS_REPLY_STATUS && pc.str == 1); freeReplyObject(reply); @@ -752,6 +936,12 @@ static void test_resp3_push_handler(redisContext *c) { assert((reply = redisCommand(c, "GET key:0")) != NULL); freeReplyObject(reply); assert((reply = redisCommand(c, "SET key:0 invalid")) != NULL); + /* Depending on Redis version, we may receive either push notification or + * status reply. Both cases are valid. */ + if (reply->type == REDIS_REPLY_STATUS) { + freeReplyObject(reply); + reply = redisCommand(c, "PING"); + } test_cond(reply->type == REDIS_REPLY_PUSH); freeReplyObject(reply); @@ -1270,6 +1460,699 @@ static void test_throughput(struct config config) { // redisFree(c); // } +#ifdef HIREDIS_TEST_ASYNC +struct event_base *base; + +typedef struct TestState { + redisOptions *options; + int checkpoint; + int resp3; + int disconnect; +} TestState; + +/* Helper to disconnect and stop event loop */ +void async_disconnect(redisAsyncContext *ac) { + redisAsyncDisconnect(ac); + event_base_loopbreak(base); +} + +/* Testcase timeout, will trigger a failure */ +void timeout_cb(int fd, short event, void *arg) { + (void) fd; (void) event; (void) arg; + printf("Timeout in async testing!\n"); + exit(1); +} + +/* Unexpected call, will trigger a failure */ +void unexpected_cb(redisAsyncContext *ac, void *r, void *privdata) { + (void) ac; (void) r; + printf("Unexpected call: %s\n",(char*)privdata); + exit(1); +} + +/* Helper function to publish a message via own client. */ +void publish_msg(redisOptions *options, const char* channel, const char* msg) { + redisContext *c = redisConnectWithOptions(options); + assert(c != NULL); + redisReply *reply = redisCommand(c,"PUBLISH %s %s",channel,msg); + assert(reply->type == REDIS_REPLY_INTEGER && reply->integer == 1); + freeReplyObject(reply); + disconnect(c, 0); +} + +/* Expect a reply of type INTEGER */ +void integer_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->type == REDIS_REPLY_INTEGER); + state->checkpoint++; + if (state->disconnect) async_disconnect(ac); +} + +/* Subscribe callback for test_pubsub_handling and test_pubsub_handling_resp3: + * - a published message triggers an unsubscribe + * - a command is sent before the unsubscribe response is received. */ +void subscribe_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + assert(reply != NULL && + reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) && + reply->elements == 3); + + if (strcmp(reply->element[0]->str,"subscribe") == 0) { + assert(strcmp(reply->element[1]->str,"mychannel") == 0 && + reply->element[2]->str == NULL); + publish_msg(state->options,"mychannel","Hello!"); + } else if (strcmp(reply->element[0]->str,"message") == 0) { + assert(strcmp(reply->element[1]->str,"mychannel") == 0 && + strcmp(reply->element[2]->str,"Hello!") == 0); + state->checkpoint++; + + /* Unsubscribe after receiving the published message. Send unsubscribe + * which should call the callback registered during subscribe */ + redisAsyncCommand(ac,unexpected_cb, + (void*)"unsubscribe should call subscribe_cb()", + "unsubscribe"); + /* Send a regular command after unsubscribing, then disconnect */ + state->disconnect = 1; + redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo"); + + } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) { + assert(strcmp(reply->element[1]->str,"mychannel") == 0 && + reply->element[2]->str == NULL); + } else { + printf("Unexpected pubsub command: %s\n", reply->element[0]->str); + exit(1); + } +} + +/* Expect a reply of type ARRAY */ +void array_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY); + state->checkpoint++; + if (state->disconnect) async_disconnect(ac); +} + +/* Expect a NULL reply */ +void null_cb(redisAsyncContext *ac, void *r, void *privdata) { + (void) ac; + assert(r == NULL); + TestState *state = privdata; + state->checkpoint++; +} + +static void test_pubsub_handling(struct config config) { + test("Subscribe, handle published message and unsubscribe: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base, timeout_cb, NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout, &timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Start subscribe */ + TestState state = {.options = &options}; + redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + + /* Make sure non-subscribe commands are handled */ + redisAsyncCommand(ac,array_cb,&state,"PING"); + + /* Start event dispatching loop */ + test_cond(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + assert(state.checkpoint == 3); +} + +/* Unexpected push message, will trigger a failure */ +void unexpected_push_cb(redisAsyncContext *ac, void *r) { + (void) ac; (void) r; + printf("Unexpected call to the PUSH callback!\n"); + exit(1); +} + +static void test_pubsub_handling_resp3(struct config config) { + test("Subscribe, handle published message and unsubscribe using RESP3: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base, timeout_cb, NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout, &timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac, unexpected_push_cb); + + /* Switch protocol */ + redisAsyncCommand(ac,NULL,NULL,"HELLO 3"); + + /* Start subscribe */ + TestState state = {.options = &options, .resp3 = 1}; + redisAsyncCommand(ac,subscribe_cb,&state,"subscribe mychannel"); + + /* Make sure non-subscribe commands are handled in RESP3 */ + redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo"); + redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo"); + redisAsyncCommand(ac,integer_cb,&state,"LPUSH mylist foo"); + /* Handle an array with 3 elements as a non-subscribe command */ + redisAsyncCommand(ac,array_cb,&state,"LRANGE mylist 0 2"); + + /* Start event dispatching loop */ + test_cond(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + assert(state.checkpoint == 6); +} + +/* Subscribe callback for test_command_timeout_during_pubsub: + * - a subscribe response triggers a published message + * - the published message triggers a command that times out + * - the command timeout triggers a disconnect */ +void subscribe_with_timeout_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + /* The non-clean disconnect should trigger the + * subscription callback with a NULL reply. */ + if (reply == NULL) { + state->checkpoint++; + event_base_loopbreak(base); + return; + } + + assert(reply->type == (state->resp3 ? REDIS_REPLY_PUSH : REDIS_REPLY_ARRAY) && + reply->elements == 3); + + if (strcmp(reply->element[0]->str,"subscribe") == 0) { + assert(strcmp(reply->element[1]->str,"mychannel") == 0 && + reply->element[2]->str == NULL); + publish_msg(state->options,"mychannel","Hello!"); + state->checkpoint++; + } else if (strcmp(reply->element[0]->str,"message") == 0) { + assert(strcmp(reply->element[1]->str,"mychannel") == 0 && + strcmp(reply->element[2]->str,"Hello!") == 0); + state->checkpoint++; + + /* Send a command that will trigger a timeout */ + redisAsyncCommand(ac,null_cb,state,"DEBUG SLEEP 3"); + redisAsyncCommand(ac,null_cb,state,"LPUSH mylist foo"); + } else { + printf("Unexpected pubsub command: %s\n", reply->element[0]->str); + exit(1); + } +} + +static void test_command_timeout_during_pubsub(struct config config) { + test("Command timeout during Pub/Sub: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base,timeout_cb,NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout,&timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Configure a command timout */ + struct timeval command_timeout = {.tv_sec = 2}; + redisAsyncSetTimeout(ac,command_timeout); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac,unexpected_push_cb); + + /* Switch protocol */ + redisAsyncCommand(ac,NULL,NULL,"HELLO 3"); + + /* Start subscribe */ + TestState state = {.options = &options, .resp3 = 1}; + redisAsyncCommand(ac,subscribe_with_timeout_cb,&state,"subscribe mychannel"); + + /* Start event dispatching loop */ + assert(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + test_cond(state.checkpoint == 5); +} + +/* Subscribe callback for test_pubsub_multiple_channels */ +void subscribe_channel_a_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY && + reply->elements == 3); + + if (strcmp(reply->element[0]->str,"subscribe") == 0) { + assert(strcmp(reply->element[1]->str,"A") == 0); + publish_msg(state->options,"A","Hello!"); + state->checkpoint++; + } else if (strcmp(reply->element[0]->str,"message") == 0) { + assert(strcmp(reply->element[1]->str,"A") == 0 && + strcmp(reply->element[2]->str,"Hello!") == 0); + state->checkpoint++; + + /* Unsubscribe to channels, including channel X & Z which we don't subscribe to */ + redisAsyncCommand(ac,unexpected_cb, + (void*)"unsubscribe should not call unexpected_cb()", + "unsubscribe B X A A Z"); + /* Unsubscribe to patterns, none which we subscribe to */ + redisAsyncCommand(ac,unexpected_cb, + (void*)"punsubscribe should not call unexpected_cb()", + "punsubscribe"); + /* Send a regular command after unsubscribing, then disconnect */ + state->disconnect = 1; + redisAsyncCommand(ac,integer_cb,state,"LPUSH mylist foo"); + } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) { + assert(strcmp(reply->element[1]->str,"A") == 0); + state->checkpoint++; + } else { + printf("Unexpected pubsub command: %s\n", reply->element[0]->str); + exit(1); + } +} + +/* Subscribe callback for test_pubsub_multiple_channels */ +void subscribe_channel_b_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + assert(reply != NULL && reply->type == REDIS_REPLY_ARRAY && + reply->elements == 3); + + if (strcmp(reply->element[0]->str,"subscribe") == 0) { + assert(strcmp(reply->element[1]->str,"B") == 0); + state->checkpoint++; + } else if (strcmp(reply->element[0]->str,"unsubscribe") == 0) { + assert(strcmp(reply->element[1]->str,"B") == 0); + state->checkpoint++; + } else { + printf("Unexpected pubsub command: %s\n", reply->element[0]->str); + exit(1); + } +} + +/* Test handling of multiple channels + * - subscribe to channel A and B + * - a published message on A triggers an unsubscribe of channel B, X, A and Z + * where channel X and Z are not subscribed to. + * - the published message also triggers an unsubscribe to patterns. Since no + * pattern is subscribed to the responded pattern element type is NIL. + * - a command sent after unsubscribe triggers a disconnect */ +static void test_pubsub_multiple_channels(struct config config) { + test("Subscribe to multiple channels: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base,timeout_cb,NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout,&timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac,unexpected_push_cb); + + /* Start subscribing to two channels */ + TestState state = {.options = &options}; + redisAsyncCommand(ac,subscribe_channel_a_cb,&state,"subscribe A"); + redisAsyncCommand(ac,subscribe_channel_b_cb,&state,"subscribe B"); + + /* Start event dispatching loop */ + assert(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + test_cond(state.checkpoint == 6); +} + +/* Command callback for test_monitor() */ +void monitor_cb(redisAsyncContext *ac, void *r, void *privdata) { + redisReply *reply = r; + TestState *state = privdata; + + /* NULL reply is received when BYE triggers a disconnect. */ + if (reply == NULL) { + event_base_loopbreak(base); + return; + } + + assert(reply != NULL && reply->type == REDIS_REPLY_STATUS); + state->checkpoint++; + + if (state->checkpoint == 1) { + /* Response from MONITOR */ + redisContext *c = redisConnectWithOptions(state->options); + assert(c != NULL); + redisReply *reply = redisCommand(c,"SET first 1"); + assert(reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); + redisFree(c); + } else if (state->checkpoint == 2) { + /* Response for monitored command 'SET first 1' */ + assert(strstr(reply->str,"first") != NULL); + redisContext *c = redisConnectWithOptions(state->options); + assert(c != NULL); + redisReply *reply = redisCommand(c,"SET second 2"); + assert(reply->type == REDIS_REPLY_STATUS); + freeReplyObject(reply); + redisFree(c); + } else if (state->checkpoint == 3) { + /* Response for monitored command 'SET second 2' */ + assert(strstr(reply->str,"second") != NULL); + /* Send QUIT to disconnect */ + redisAsyncCommand(ac,NULL,NULL,"QUIT"); + } +} + +/* Test handling of the monitor command + * - sends MONITOR to enable monitoring. + * - sends SET commands via separate clients to be monitored. + * - sends QUIT to stop monitoring and disconnect. */ +static void test_monitor(struct config config) { + test("Enable monitoring: "); + /* Setup event dispatcher with a testcase timeout */ + base = event_base_new(); + struct event *timeout = evtimer_new(base, timeout_cb, NULL); + assert(timeout != NULL); + + evtimer_assign(timeout,base,timeout_cb,NULL); + struct timeval timeout_tv = {.tv_sec = 10}; + evtimer_add(timeout, &timeout_tv); + + /* Connect */ + redisOptions options = get_redis_tcp_options(config); + redisAsyncContext *ac = redisAsyncConnectWithOptions(&options); + assert(ac != NULL && ac->err == 0); + redisLibeventAttach(ac,base); + + /* Not expecting any push messages in this test */ + redisAsyncSetPushCallback(ac,unexpected_push_cb); + + /* Start monitor */ + TestState state = {.options = &options}; + redisAsyncCommand(ac,monitor_cb,&state,"monitor"); + + /* Start event dispatching loop */ + test_cond(event_base_dispatch(base) == 0); + event_free(timeout); + event_base_free(base); + + /* Verify test checkpoints */ + assert(state.checkpoint == 3); +} +#endif /* HIREDIS_TEST_ASYNC */ + +/* tests for async api using polling adapter, requires no extra libraries*/ + +/* enum for the test cases, the callbacks have different logic based on them */ +typedef enum astest_no +{ + ASTEST_CONNECT=0, + ASTEST_CONN_TIMEOUT, + ASTEST_PINGPONG, + ASTEST_PINGPONG_TIMEOUT, + ASTEST_ISSUE_931, + ASTEST_ISSUE_931_PING +}astest_no; + +/* a static context for the async tests */ +struct _astest { + redisAsyncContext *ac; + astest_no testno; + int counter; + int connects; + int connect_status; + int disconnects; + int pongs; + int disconnect_status; + int connected; + int err; + char errstr[256]; +}; +static struct _astest astest; + +static void asSleep(int ms) +{ +#if _MSC_VER + Sleep(ms); +#else + usleep(ms*1000); +#endif +} + +/* async callbacks */ +static void asCleanup(void* data) +{ + struct _astest *t = (struct _astest *)data; + t->ac = NULL; +} + +static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata); + +static void connectCallback(redisAsyncContext *c, int status) { + struct _astest *t = (struct _astest *)c->data; + assert(t == &astest); + assert(t->connects == 0); + t->err = c->err; + strcpy(t->errstr, c->errstr); + t->connects++; + t->connect_status = status; + t->connected = status == REDIS_OK ? 1 : -1; + + if (t->testno == ASTEST_ISSUE_931) { + /* disconnect again */ + redisAsyncDisconnect(c); + } + else if (t->testno == ASTEST_ISSUE_931_PING) + { + status = redisAsyncCommand(c, commandCallback, NULL, "PING"); + } +} +static void disconnectCallback(const redisAsyncContext *c, int status) { + assert(c->data == (void*)&astest); + assert(astest.disconnects == 0); + astest.err = c->err; + strcpy(astest.errstr, c->errstr); + astest.disconnects++; + astest.disconnect_status = status; + astest.connected = 0; +} + +static void commandCallback(struct redisAsyncContext *ac, void* _reply, void* _privdata) +{ + redisReply *reply = (redisReply*)_reply; + struct _astest *t = (struct _astest *)ac->data; + assert(t == &astest); + (void)_privdata; + t->err = ac->err; + strcpy(t->errstr, ac->errstr); + t->counter++; + if (t->testno == ASTEST_PINGPONG ||t->testno == ASTEST_ISSUE_931_PING) + { + assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0); + t->pongs++; + redisAsyncFree(ac); + } + if (t->testno == ASTEST_PINGPONG_TIMEOUT) + { + /* two ping pongs */ + assert(reply != NULL && reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "PONG") == 0); + t->pongs++; + if (t->counter == 1) { + int status = redisAsyncCommand(ac, commandCallback, NULL, "PING"); + assert(status == REDIS_OK); + } else { + redisAsyncFree(ac); + } + } +} + +static redisAsyncContext *do_aconnect(struct config config, astest_no testno) +{ + redisOptions options = {0}; + memset(&astest, 0, sizeof(astest)); + + astest.testno = testno; + astest.connect_status = astest.disconnect_status = -2; + + if (config.type == CONN_TCP) { + options.type = REDIS_CONN_TCP; + options.connect_timeout = &config.tcp.timeout; + REDIS_OPTIONS_SET_TCP(&options, config.tcp.host, config.tcp.port); + } else if (config.type == CONN_SSL) { + options.type = REDIS_CONN_TCP; + options.connect_timeout = &config.tcp.timeout; + REDIS_OPTIONS_SET_TCP(&options, config.ssl.host, config.ssl.port); + } else if (config.type == CONN_UNIX) { + options.type = REDIS_CONN_UNIX; + options.endpoint.unix_socket = config.unix_sock.path; + } else if (config.type == CONN_FD) { + options.type = REDIS_CONN_USERFD; + /* Create a dummy connection just to get an fd to inherit */ + redisContext *dummy_ctx = redisConnectUnix(config.unix_sock.path); + if (dummy_ctx) { + redisFD fd = disconnect(dummy_ctx, 1); + printf("Connecting to inherited fd %d\n", (int)fd); + options.endpoint.fd = fd; + } + } + redisAsyncContext *c = redisAsyncConnectWithOptions(&options); + assert(c); + astest.ac = c; + c->data = &astest; + c->dataCleanup = asCleanup; + redisPollAttach(c); + redisAsyncSetConnectCallbackNC(c, connectCallback); + redisAsyncSetDisconnectCallback(c, disconnectCallback); + return c; +} + +static void as_printerr(void) { + printf("Async err %d : %s\n", astest.err, astest.errstr); +} + +#define ASASSERT(e) do { \ + if (!(e)) \ + as_printerr(); \ + assert(e); \ +} while (0); + +static void test_async_polling(struct config config) { + int status; + redisAsyncContext *c; + struct config defaultconfig = config; + + test("Async connect: "); + c = do_aconnect(config, ASTEST_CONNECT); + assert(c); + while(astest.connected == 0) + redisPollTick(c, 0.1); + assert(astest.connects == 1); + ASASSERT(astest.connect_status == REDIS_OK); + assert(astest.disconnects == 0); + test_cond(astest.connected == 1); + + test("Async free after connect: "); + assert(astest.ac != NULL); + redisAsyncFree(c); + assert(astest.disconnects == 1); + assert(astest.ac == NULL); + test_cond(astest.disconnect_status == REDIS_OK); + + if (config.type == CONN_TCP || config.type == CONN_SSL) { + /* timeout can only be simulated with network */ + test("Async connect timeout: "); + config.tcp.host = "192.168.254.254"; /* blackhole ip */ + config.tcp.timeout.tv_usec = 100000; + c = do_aconnect(config, ASTEST_CONN_TIMEOUT); + assert(c); + assert(c->err == 0); + while(astest.connected == 0) + redisPollTick(c, 0.1); + assert(astest.connected == -1); + /* + * freeing should not be done, clearing should have happened. + *redisAsyncFree(c); + */ + assert(astest.ac == NULL); + test_cond(astest.connect_status == REDIS_ERR); + config = defaultconfig; + } + + /* Test a ping/pong after connection */ + test("Async PING/PONG: "); + c = do_aconnect(config, ASTEST_PINGPONG); + while(astest.connected == 0) + redisPollTick(c, 0.1); + status = redisAsyncCommand(c, commandCallback, NULL, "PING"); + assert(status == REDIS_OK); + while(astest.ac) + redisPollTick(c, 0.1); + test_cond(astest.pongs == 1); + + /* Test a ping/pong after connection that didn't time out. + * see https://github.com/redis/hiredis/issues/945 + */ + if (config.type == CONN_TCP || config.type == CONN_SSL) { + test("Async PING/PONG after connect timeout: "); + config.tcp.timeout.tv_usec = 10000; /* 10ms */ + c = do_aconnect(config, ASTEST_PINGPONG_TIMEOUT); + while(astest.connected == 0) + redisPollTick(c, 0.1); + /* sleep 0.1 s, allowing old timeout to arrive */ + asSleep(10); + status = redisAsyncCommand(c, commandCallback, NULL, "PING"); + assert(status == REDIS_OK); + while(astest.ac) + redisPollTick(c, 0.1); + test_cond(astest.pongs == 2); + config = defaultconfig; + } + + /* Test disconnect from an on_connect callback + * see https://github.com/redis/hiredis/issues/931 + */ + test("Disconnect from onConnected callback (Issue #931): "); + c = do_aconnect(config, ASTEST_ISSUE_931); + while(astest.disconnects == 0) + redisPollTick(c, 0.1); + assert(astest.connected == 0); + assert(astest.connects == 1); + test_cond(astest.disconnects == 1); + + /* Test ping/pong from an on_connect callback + * see https://github.com/redis/hiredis/issues/931 + */ + test("Ping/Pong from onConnected callback (Issue #931): "); + c = do_aconnect(config, ASTEST_ISSUE_931_PING); + /* connect callback issues ping, reponse callback destroys context */ + while(astest.ac) + redisPollTick(c, 0.1); + assert(astest.connected == 0); + assert(astest.connects == 1); + assert(astest.disconnects == 1); + test_cond(astest.pongs == 1); +} +/* End of Async polling_adapter driven tests */ + int main(int argc, char **argv) { struct config cfg = { .tcp = { @@ -1388,6 +2271,34 @@ int main(int argc, char **argv) { } #endif +#ifdef HIREDIS_TEST_ASYNC + cfg.type = CONN_TCP; + printf("\nTesting asynchronous API against TCP connection (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); + cfg.type = CONN_TCP; + + int major; + redisContext *c = do_connect(cfg); + get_redis_version(c, &major, NULL); + disconnect(c, 0); + + test_pubsub_handling(cfg); + test_pubsub_multiple_channels(cfg); + test_monitor(cfg); + if (major >= 6) { + test_pubsub_handling_resp3(cfg); + test_command_timeout_during_pubsub(cfg); + } +#endif /* HIREDIS_TEST_ASYNC */ + + cfg.type = CONN_TCP; + printf("\nTesting asynchronous API using polling_adapter TCP (%s:%d):\n", cfg.tcp.host, cfg.tcp.port); + test_async_polling(cfg); + if (test_unix_socket) { + cfg.type = CONN_UNIX; + printf("\nTesting asynchronous API using polling_adapter UNIX (%s):\n", cfg.unix_sock.path); + test_async_polling(cfg); + } + if (test_inherit_fd) { printf("\nTesting against inherited fd (%s): ", cfg.unix_sock.path); if (test_unix_socket) { @@ -5,9 +5,16 @@ REDIS_PORT=${REDIS_PORT:-56379} REDIS_SSL_PORT=${REDIS_SSL_PORT:-56443} TEST_SSL=${TEST_SSL:-0} SKIPS_AS_FAILS=${SKIPS_AS_FAILS-:0} +ENABLE_DEBUG_CMD= SSL_TEST_ARGS= SKIPS_ARG= +# We need to enable the DEBUG command for redis-server >= 7.0.0 +REDIS_MAJOR_VERSION="$(redis-server --version|awk -F'[^0-9]+' '{ print $2 }')" +if [ "$REDIS_MAJOR_VERSION" -gt "6" ]; then + ENABLE_DEBUG_CMD="enable-debug-command local" +fi + tmpdir=$(mktemp -d) PID_FILE=${tmpdir}/hiredis-test-redis.pid SOCK_FILE=${tmpdir}/hiredis-test-redis.sock @@ -49,8 +56,10 @@ cleanup() { } trap cleanup INT TERM EXIT + cat > ${tmpdir}/redis.conf <<EOF daemonize yes +${ENABLE_DEBUG_CMD} pidfile ${PID_FILE} port ${REDIS_PORT} bind 127.0.0.1 |