From db3fb6e0629de665cd28dcc10c7c80a8c1c373f2 Mon Sep 17 00:00:00 2001 From: chenBright Date: Mon, 3 Nov 2025 22:09:39 +0800 Subject: [PATCH] Support signal thrift --- .../actions/init-ut-make-config/action.yml | 16 +- .../install-all-dependencies/action.yml | 2 +- .github/workflows/ci-linux.yml | 222 ++------------ .github/workflows/ci-macos.yml | 66 ---- .gitignore | 1 + CMakeLists.txt | 2 +- cmake/CMakeLists.download_gtest.in | 2 +- config_brpc.sh | 4 +- example/thrift_extension_c++/CMakeLists.txt | 152 ++++++++++ example/thrift_extension_c++/client.cpp | 1 + example/thrift_extension_c++/client2.cpp | 36 ++- example/thrift_extension_c++/server2.cpp | 4 +- src/brpc/global.cpp | 2 +- src/brpc/parallel_channel.h | 2 +- src/brpc/policy/http_rpc_protocol.cpp | 2 +- src/brpc/policy/thrift_protocol.cpp | 101 ++++--- src/brpc/socket.cpp | 56 +++- src/brpc/socket.h | 284 ++++++++++++++++++ src/butil/containers/flat_map.h | 2 +- src/butil/containers/flat_map_inl.h | 2 +- src/butil/iobuf.h | 2 + src/butil/thread_key.h | 205 +++++++------ test/CMakeLists.txt | 17 +- test/Makefile | 11 +- test/brpc_http_rpc_protocol_unittest.cpp | 3 +- test/brpc_thrift_protocol_unittest.cpp | 143 +++++++++ test/echo.thrift | 34 +++ test/resource_pool_unittest.cpp | 3 + 28 files changed, 928 insertions(+), 449 deletions(-) delete mode 100644 .github/workflows/ci-macos.yml create mode 100644 example/thrift_extension_c++/CMakeLists.txt create mode 100644 test/brpc_thrift_protocol_unittest.cpp create mode 100644 test/echo.thrift diff --git a/.github/actions/init-ut-make-config/action.yml b/.github/actions/init-ut-make-config/action.yml index 9df8103652..d66ce97f62 100644 --- a/.github/actions/init-ut-make-config/action.yml +++ b/.github/actions/init-ut-make-config/action.yml @@ -6,7 +6,8 @@ runs: using: "composite" steps: - run: | - sudo apt-get update && sudo apt-get install -y clang-12 lldb-12 lld-12 libgtest-dev cmake gdb libstdc++6-11-dbg + sudo apt-get update + sudo apt-get install -y clang-12 lldb-12 lld-12 libgtest-dev cmake gdb libstdc++6-11-dbg bison flex libboost-all-dev libevent-dev libibverbs1 libibverbs-dev shell: bash - run: | cd /usr/src/gtest && export CC=clang-12 && export CXX=clang++-12 && sudo cmake -DCMAKE_POLICY_VERSION_MINIMUM=3.5 . @@ -20,15 +21,20 @@ runs: shell: bash - run: | sudo git clone https://github.com/gperftools/gperftools.git - cd gperftools && sudo git checkout tags/gperftools-2.16 && sudo mkdir -p /gperftools - sudo ./autogen.sh && sudo CC=clang-12 CXX=clang++-12 ./configure --prefix=/gperftools --enable-frame-pointers + cd gperftools && sudo git checkout tags/gperftools-2.16 + sudo ./autogen.sh && sudo CC=clang-12 CXX=clang++-12 ./configure --prefix=/usr --enable-frame-pointers sudo make -j ${{env.proc_num}} && sudo make install shell: bash - run: | sudo git clone https://github.com/abseil/abseil-cpp.git cd abseil-cpp && sudo git checkout lts_2022_06_23 && sudo mkdir -p /abseil-cpp - sudo CC=clang-12 CXX=clang++-12 cmake -DBUILD_TESTING=OFF -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_INSTALL_PREFIX=/abseil-cpp -DCMAKE_POLICY_VERSION_MINIMUM=3.5 . + sudo CC=clang-12 CXX=clang++-12 cmake -DBUILD_TESTING=OFF -DCMAKE_POSITION_INDEPENDENT_CODE=ON -DCMAKE_INSTALL_PREFIX=/usr -DCMAKE_POLICY_VERSION_MINIMUM=3.5 . sudo make -j ${{env.proc_num}} && sudo sudo make install shell: bash - - run: sh config_brpc.sh --headers="/libunwind/include /gperftools/include /abseil-cpp/include /usr/include" --libs="/libunwind/lib /gperftools/lib /abseil-cpp/lib /usr/lib /usr/lib64" --cc=clang-12 --cxx=clang++-12 ${{inputs.options}} && cat config.mk + - run: | + wget https://archive.apache.org/dist/thrift/0.11.0/thrift-0.11.0.tar.gz && tar -xf thrift-0.11.0.tar.gz && cd thrift-0.11.0/ + sudo CC=clang-12 CXX=clang++-12 ./configure --prefix=/usr --with-rs=no --with-ruby=no --with-python=no --with-java=no --with-go=no --with-perl=no --with-php=no --with-csharp=no --with-erlang=no --with-lua=no --with-nodejs=no --with-haskell=no --with-dotnetcore=no CXXFLAGS="-Wno-unused-variable" + sudo make -j ${{env.proc_num}} && sudo make install && ls /usr/lib/libthrift* && ls /usr/include/thrift + shell: bash + - run: sh config_brpc.sh --headers="/libunwind/include /usr/include" --libs="/libunwind/lib /usr/lib /usr/lib64" --cc=clang-12 --cxx=clang++-12 ${{inputs.options}} && cat config.mk shell: bash diff --git a/.github/actions/install-all-dependencies/action.yml b/.github/actions/install-all-dependencies/action.yml index 179f86cd4f..a596a81aad 100644 --- a/.github/actions/install-all-dependencies/action.yml +++ b/.github/actions/install-all-dependencies/action.yml @@ -2,7 +2,7 @@ runs: using: "composite" steps: - uses: ./.github/actions/install-essential-dependencies - - run: sudo apt-get install -y libunwind-dev libgoogle-glog-dev automake bison flex libboost-all-dev libevent-dev libtool pkg-config libibverbs1 libibverbs-dev + - run: sudo apt-get install -y libunwind-dev libgoogle-glog-dev automake bison flex libboost-all-dev libevent-dev libtool pkg-config libibverbs1 libibverbs-dev shell: bash - run: | wget https://archive.apache.org/dist/thrift/0.11.0/thrift-0.11.0.tar.gz && tar -xf thrift-0.11.0.tar.gz && cd thrift-0.11.0/ diff --git a/.github/workflows/ci-linux.yml b/.github/workflows/ci-linux.yml index f2d6d69287..ec7eddb816 100644 --- a/.github/workflows/ci-linux.yml +++ b/.github/workflows/ci-linux.yml @@ -14,182 +14,8 @@ env: proc_num: $(nproc) # https://github.com/actions/runner-images +# https://github.com/SF-Zhou/setup-soft-roce-action/blob/main/.github/workflows/main.yml jobs: - compile-with-make: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - uses: ./.github/actions/install-all-dependencies - - - name: gcc with default options - uses: ./.github/actions/compile-with-make - with: - options: --headers=/usr/include --libs=/usr/lib /usr/lib64 --cc=gcc --cxx=g++ --werror - - - name: gcc with all options - uses: ./.github/actions/compile-with-make - with: - options: --headers=/usr/include --libs=/usr/lib /usr/lib64 --cc=gcc --cxx=g++ --werror --with-thrift --with-glog --with-rdma --with-debug-bthread-sche-safety --with-debug-lock --with-bthread-tracer --with-asan - - - name: clang with default options - uses: ./.github/actions/compile-with-make - with: - options: --headers=/usr/include --libs=/usr/lib /usr/lib64 --cc=clang --cxx=clang++ --werror - - - name: clang with all options - uses: ./.github/actions/compile-with-make - with: - options: --headers=/usr/include --libs=/usr/lib /usr/lib64 --cc=clang --cxx=clang++ --werror --with-thrift --with-glog --with-rdma --with-debug-bthread-sche-safety --with-debug-lock --with-bthread-tracer --with-asan - - compile-with-cmake: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - uses: ./.github/actions/install-all-dependencies - - - name: gcc with default options - run: | - export CC=gcc && export CXX=g++ - mkdir gcc_build && cd gcc_build && cmake -DCMAKE_POLICY_VERSION_MINIMUM=3.5 .. - make -j ${{env.proc_num}} && make clean - - - name: gcc with all options - run: | - export CC=gcc && export CXX=g++ - mkdir gcc_build_all && cd gcc_build_all - cmake -DWITH_MESALINK=OFF -DWITH_GLOG=ON -DWITH_THRIFT=ON -DWITH_RDMA=ON -DWITH_DEBUG_BTHREAD_SCHE_SAFETY=ON -DWITH_DEBUG_LOCK=ON -DWITH_BTHREAD_TRACER=ON -DWITH_ASAN=ON -DCMAKE_POLICY_VERSION_MINIMUM=3.5 .. - make -j ${{env.proc_num}} && make clean - - - name: clang with default options - run: | - export CC=clang && export CXX=clang++ - mkdir clang_build && cd clang_build && cmake -DCMAKE_POLICY_VERSION_MINIMUM=3.5 .. - make -j ${{env.proc_num}} && make clean - - - name: clang with all options - run: | - export CC=clang && export CXX=clang++ - mkdir clang_build_all && cd clang_build_all - cmake -DWITH_MESALINK=OFF -DWITH_GLOG=ON -DWITH_THRIFT=ON -DWITH_RDMA=ON -DWITH_DEBUG_BTHREAD_SCHE_SAFETY=ON -DWITH_DEBUG_LOCK=ON -DWITH_BTHREAD_TRACER=ON -DWITH_ASAN=ON -DCMAKE_POLICY_VERSION_MINIMUM=3.5 .. - make -j ${{env.proc_num}} && make clean - - gcc-compile-with-make-protobuf: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - uses: ./.github/actions/install-essential-dependencies - - - name: protobuf 3.5.1 - uses: ./.github/actions/compile-with-make-protobuf - with: - protobuf-version: 3.5.1 - protobuf-cpp-version: 3.5.1 - protobuf-install-dir: /protobuf-3.5.1 - config-brpc-options: --cc=gcc --cxx=g++ --werror - - - name: protobuf 3.12.4 - uses: ./.github/actions/compile-with-make-protobuf - with: - protobuf-version: 3.12.4 - protobuf-cpp-version: 3.12.4 - protobuf-install-dir: /protobuf-3.12.4 - config-brpc-options: --cc=gcc --cxx=g++ --werror - - - name: protobuf 21.12 - uses: ./.github/actions/compile-with-make-protobuf - with: - protobuf-version: 21.12 - protobuf-cpp-version: 3.21.12 - protobuf-install-dir: /protobuf-3.21.12 - config-brpc-options: --cc=gcc --cxx=g++ --werror - - gcc-compile-with-bazel: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - run: bazel build --verbose_failures -- //... -//example/... - - gcc-compile-with-boringssl: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - run: bazel build --verbose_failures --define with_mesalink=false --define with_glog=true --define with_thrift=true --define BRPC_WITH_BORINGSSL=true -- //... -//example/... - - gcc-compile-with-bazel-all-options: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - run: | - bazel build --verbose_failures \ - --define with_mesalink=false \ - --define with_glog=true \ - --define with_thrift=true \ - --define with_debug_bthread_sche_safety=true \ - --define with_debug_lock=true \ - --define with_asan=true \ - --define with_bthread_tracer=true \ - --define BRPC_WITH_NO_PTHREAD_MUTEX_HOOK=true \ - -- //... -//example/... - - clang-compile-with-make-protobuf: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - uses: ./.github/actions/install-essential-dependencies - - - name: protobuf 3.5.1 - uses: ./.github/actions/compile-with-make-protobuf - with: - protobuf-version: 3.5.1 - protobuf-cpp-version: 3.5.1 - protobuf-install-dir: /protobuf-3.5.1 - config-brpc-options: --cc=clang --cxx=clang++ --werror - - - name: protobuf 3.12.4 - uses: ./.github/actions/compile-with-make-protobuf - with: - protobuf-version: 3.12.4 - protobuf-cpp-version: 3.12.4 - protobuf-install-dir: /protobuf-3.12.4 - config-brpc-options: --cc=clang --cxx=clang++ --werror - - - name: protobuf 21.12 - uses: ./.github/actions/compile-with-make-protobuf - with: - protobuf-version: 21.12 - protobuf-cpp-version: 3.21.12 - protobuf-install-dir: /protobuf-3.21.12 - config-brpc-options: --cc=clang --cxx=clang++ --werror - - clang-compile-with-bazel: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - run: bazel build --verbose_failures --action_env=CC=clang -- //... -//example/... - - clang-compile-with-boringssl: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - run: bazel build --verbose_failures --action_env=CC=clang --define with_mesalink=false --define with_glog=true --define with_thrift=true --define BRPC_WITH_BORINGSSL=true -- //... -//example/... - - clang-compile-with-bazel-all-options: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - run: | - bazel build --verbose_failures \ - --action_env=CC=clang \ - --define with_mesalink=false \ - --define with_glog=true \ - --define with_thrift=true \ - --define with_debug_bthread_sche_safety=true \ - --define with_debug_lock=true \ - --define with_asan=true \ - --define with_bthread_tracer=true \ - --define BRPC_WITH_NO_PTHREAD_MUTEX_HOOK=true \ - -- //... -//example/... - clang-unittest: runs-on: ubuntu-22.04 steps: @@ -197,7 +23,24 @@ jobs: - uses: ./.github/actions/install-essential-dependencies - uses: ./.github/actions/init-ut-make-config with: - options: --with-bthread-tracer + options: --with-bthread-tracer --with-thrift + - name: Setup Soft-RoCE + run: | + KERNEL_VERSION=$(uname -r | cut -d '-' -f 1) + KERNEL_NAME="linux-${KERNEL_VERSION%'.0'}" + DOWNLOAD_LINK="https://cdn.kernel.org/pub/linux/kernel/v${KERNEL_VERSION%%.*}.x/${KERNEL_NAME}.tar.xz" + ETHERNET_CARD=$(ip link | awk -F ": " '$0 !~ "lo|vir|wl|^[^0-9]"{print $2;getline}' | head -1) + echo "kernel version is ${KERNEL_VERSION}, download link is ${DOWNLOAD_LINK}, ethernet card is ${ETHERNET_CARD}" + wget -q $DOWNLOAD_LINK -O /tmp/$KERNEL_NAME.tar.xz + tar xf /tmp/$KERNEL_NAME.tar.xz --directory=/tmp + RXE_PATH="/tmp/$KERNEL_NAME/drivers/infiniband/sw/rxe" + sed 's/$(CONFIG_RDMA_RXE)/m/g' $RXE_PATH/Makefile > $RXE_PATH/Kbuild + make -C /lib/modules/$(uname -r)/build M=$RXE_PATH modules -j + sudo modprobe ib_core + sudo modprobe rdma_ucm + sudo insmod $RXE_PATH/rdma_rxe.ko + sudo rdma link add rxe_0 type rxe netdev $ETHERNET_CARD + rdma link - name: compile tests run: | cat config.mk @@ -207,30 +50,3 @@ jobs: run: | cd test sh ./run_tests.sh - - clang-unittest-asan: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - uses: ./.github/actions/install-essential-dependencies - - uses: ./.github/actions/init-ut-make-config - with: - options: --with-bthread-tracer --with-asan - - name: compile tests - run: | - cat config.mk - cd test - make NEED_GPERFTOOLS=0 -j ${{env.proc_num}} - - name: run tests - run: | - cd test - sh ./run_tests.sh - - bazel-bvar-unittest: - runs-on: ubuntu-22.04 - steps: - - uses: actions/checkout@v2 - - run: bazel test --verbose_failures //test:bvar_test - - run: bazel test --verbose_failures --define with_babylon_counter=true //test:bvar_test - - run: bazel test --verbose_failures --action_env=CC=clang //test:bvar_test - - run: bazel test --verbose_failures --action_env=CC=clang --define with_babylon_counter=true //test:bvar_test diff --git a/.github/workflows/ci-macos.yml b/.github/workflows/ci-macos.yml deleted file mode 100644 index 61d45ac821..0000000000 --- a/.github/workflows/ci-macos.yml +++ /dev/null @@ -1,66 +0,0 @@ -name: Build on Macos - -on: - push: - branches: [ master ] - paths-ignore: - - '**.md' - pull_request: - branches: [ master ] - paths-ignore: - - '**.md' - -env: - proc_num: $(sysctl -n hw.logicalcpu) - -jobs: - compile-with-make-cmake-protobuf21: - runs-on: macos-latest # https://github.com/actions/runner-images - - steps: - - uses: actions/checkout@v2 - - - name: install dependences - run: | - brew install openssl gnu-getopt coreutils gflags leveldb protobuf@21 - - - name: compile with make - run: | - GETOPT_PATH=$(brew --prefix gnu-getopt)/bin - export PATH=$GETOPT_PATH:$PATH - ./config_brpc.sh --header="$(brew --prefix)/include" --libs="$(brew --prefix)/lib" - make -j ${{env.proc_num}} && make clean - - - name: compile with cmake - run: | - echo "CMAKE_PREFIX_PATH=$(brew --prefix protobuf@21)" - mkdir build && cd build && cmake -DCMAKE_POLICY_VERSION_MINIMUM=3.5 -DCMAKE_PREFIX_PATH=$(brew --prefix protobuf@21) .. - make -j ${{env.proc_num}} && make clean - - compile-with-make-cmake-protobuf29: - runs-on: macos-latest # https://github.com/actions/runner-images - - steps: - - uses: actions/checkout@v2 - - - name: install dependences - run: | - brew install openssl gnu-getopt coreutils gflags leveldb protobuf@29 - - - name: compile with make - run: | - GETOPT_PATH=$(brew --prefix gnu-getopt)/bin - export PATH=$GETOPT_PATH:$PATH - ./config_brpc.sh --header="$(brew --prefix)/include" --libs="$(brew --prefix)/lib" - make -j ${{env.proc_num}} && make clean - - - name: compile with cmake - run: | - mkdir build && cd build && cmake -DCMAKE_POLICY_VERSION_MINIMUM=3.5 -DCMAKE_PREFIX_PATH=$(brew --prefix protobuf@29) .. - make -j ${{env.proc_num}} && make clean - - compile-with-bazel: - runs-on: macos-latest # https://github.com/actions/runner-images - steps: - - uses: actions/checkout@v2 - - run: bazel build --verbose_failures -- //:brpc -//example/... diff --git a/.gitignore b/.gitignore index 44a371abb8..c56b8805f8 100644 --- a/.gitignore +++ b/.gitignore @@ -15,6 +15,7 @@ *.rej /output /test/output +/test/gen-cpp build/ # Ignore hidden files diff --git a/CMakeLists.txt b/CMakeLists.txt index 63f3ee06cd..15aef0eca7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -36,7 +36,7 @@ option(BUILD_BRPC_TOOLS "Whether to build brpc tools" ON) option(DOWNLOAD_GTEST "Download and build a fresh copy of googletest. Requires Internet access." ON) # Enable MACOSX_RPATH. Run "cmake --help-policy CMP0042" for policy details. -if(POLICY CMP0042) +if (POLICY CMP0042) cmake_policy(SET CMP0042 NEW) endif() diff --git a/cmake/CMakeLists.download_gtest.in b/cmake/CMakeLists.download_gtest.in index df020c89fc..5b35ce8ed2 100644 --- a/cmake/CMakeLists.download_gtest.in +++ b/cmake/CMakeLists.download_gtest.in @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -cmake_minimum_required(VERSION 2.8.10) +cmake_minimum_required(VERSION 3.5) project(googletest-download NONE) diff --git a/config_brpc.sh b/config_brpc.sh index 75826452b4..b8aeb8304d 100755 --- a/config_brpc.sh +++ b/config_brpc.sh @@ -231,6 +231,7 @@ append_linking $GFLAGS_LIB gflags PROTOBUF_LIB=$(find_dir_of_lib_or_die protobuf) append_linking $PROTOBUF_LIB protobuf +PROTOC=$(find_bin_or_die protoc) LEVELDB_LIB=$(find_dir_of_lib_or_die leveldb) # required by leveldb @@ -260,8 +261,6 @@ else DYNAMIC_LINKINGS="$DYNAMIC_LINKINGS -lleveldb" fi -PROTOC=$(find_bin_or_die protoc) - GFLAGS_HDR=$(find_dir_of_header_or_die gflags/gflags.h) PROTOBUF_HDR=$(find_dir_of_header_or_die google/protobuf/message.h) @@ -460,6 +459,7 @@ if [ "$SYSTEM" = "Darwin" ]; then fi if [ $WITH_THRIFT != 0 ]; then + append_to_output "THRIFT=$(find_bin_or_die thrift)" THRIFT_LIB=$(find_dir_of_lib_or_die thriftnb) THRIFT_HDR=$(find_dir_of_header_or_die thrift/Thrift.h) append_to_output_libs "$THRIFT_LIB" diff --git a/example/thrift_extension_c++/CMakeLists.txt b/example/thrift_extension_c++/CMakeLists.txt new file mode 100644 index 0000000000..6b26595779 --- /dev/null +++ b/example/thrift_extension_c++/CMakeLists.txt @@ -0,0 +1,152 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +cmake_minimum_required(VERSION 2.8.10) +project(asynchronous_echo_c++ C CXX) + +option(LINK_SO "Whether examples are linked dynamically" OFF) + +execute_process( + COMMAND bash -c "find ${PROJECT_SOURCE_DIR}/../.. -type d -regex \".*output/include$\" | head -n1 | xargs dirname | tr -d '\n'" + OUTPUT_VARIABLE OUTPUT_PATH +) + +set(CMAKE_PREFIX_PATH ${OUTPUT_PATH}) + +list(APPEND CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/../../cmake") + +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DBRPC_ENABLE_CPU_PROFILER") + +include(FindThreads) +include(FindProtobuf) +find_package(Gperftools) +include_directories(${GPERFTOOLS_INCLUDE_DIR}) + +include_directories(${CMAKE_CURRENT_BINARY_DIR}) + +# Search for libthrift* by best effort. If it is not found and brpc is +# compiled with thrift protocol enabled, a link error would be reported. +find_library(THRIFT_LIB NAMES thrift) +if (NOT THRIFT_LIB) + message(FATAL_ERROR "Fail to find thrift") +endif() + +# generate thrift echo.thrift +execute_process( + COMMAND thrift --gen cpp ${CMAKE_CURRENT_SOURCE_DIR}/echo.thrift +) +include_directories(${CMAKE_CURRENT_BINARY_DIR}/gen-cpp) + +find_path(BRPC_INCLUDE_PATH NAMES brpc/server.h) +if (LINK_SO) + find_library(BRPC_LIB NAMES brpc) +else() + find_library(BRPC_LIB NAMES libbrpc.a brpc) +endif() +if((NOT BRPC_INCLUDE_PATH) OR (NOT BRPC_LIB)) + message(FATAL_ERROR "Fail to find brpc") +endif() +include_directories(${BRPC_INCLUDE_PATH}) + +find_path(GFLAGS_INCLUDE_PATH gflags/gflags.h) +find_library(GFLAGS_LIBRARY NAMES gflags libgflags) +if((NOT GFLAGS_INCLUDE_PATH) OR (NOT GFLAGS_LIBRARY)) + message(FATAL_ERROR "Fail to find gflags") +endif() +include_directories(${GFLAGS_INCLUDE_PATH}) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + include(CheckFunctionExists) + CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) + if(NOT HAVE_CLOCK_GETTIME) + set(DEFINE_CLOCK_GETTIME "-DNO_CLOCK_GETTIME_IN_MAC") + endif() +endif() + +set(CMAKE_CXX_FLAGS "${DEFINE_CLOCK_GETTIME} -DNDEBUG -O2 -D__const__=__unused__ -pipe -W -Wall -Wno-unused-parameter -fPIC -fno-omit-frame-pointer") + +if(CMAKE_VERSION VERSION_LESS "3.1.3") + if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() + if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") + endif() +else() + set(CMAKE_CXX_STANDARD 11) + set(CMAKE_CXX_STANDARD_REQUIRED ON) +endif() + +find_path(LEVELDB_INCLUDE_PATH NAMES leveldb/db.h) +find_library(LEVELDB_LIB NAMES leveldb) +if ((NOT LEVELDB_INCLUDE_PATH) OR (NOT LEVELDB_LIB)) + message(FATAL_ERROR "Fail to find leveldb") +endif() +include_directories(${LEVELDB_INCLUDE_PATH}) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(OPENSSL_ROOT_DIR + "/usr/local/opt/openssl" # Homebrew installed OpenSSL + ) +endif() + +find_package(OpenSSL) +include_directories(${OPENSSL_INCLUDE_DIR}) + +set(DYNAMIC_LIB + ${CMAKE_THREAD_LIBS_INIT} + ${GFLAGS_LIBRARY} + ${PROTOBUF_LIBRARIES} + ${LEVELDB_LIB} + ${OPENSSL_CRYPTO_LIBRARY} + ${OPENSSL_SSL_LIBRARY} + ${THRIFT_LIB} + dl + ) + +if(CMAKE_SYSTEM_NAME STREQUAL "Darwin") + set(DYNAMIC_LIB ${DYNAMIC_LIB} + pthread + "-framework CoreFoundation" + "-framework CoreGraphics" + "-framework CoreData" + "-framework CoreText" + "-framework Security" + "-framework Foundation" + "-Wl,-U,_MallocExtension_ReleaseFreeMemory" + "-Wl,-U,_ProfilerStart" + "-Wl,-U,_ProfilerStop" + "-Wl,-U,__Z13GetStackTracePPvii" + "-Wl,-U,_mallctl" + "-Wl,-U,_malloc_stats_print" + ) +endif() + +add_executable(thrift_client client.cpp ${CMAKE_CURRENT_BINARY_DIR}/gen-cpp/echo_types.cpp) +add_executable(thrift_client2 client2.cpp ${CMAKE_CURRENT_BINARY_DIR}/gen-cpp/echo_types.cpp) +target_link_libraries(thrift_client ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES}) +target_link_libraries(thrift_client2 ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES}) + +add_executable(thrift_server server.cpp ${CMAKE_CURRENT_BINARY_DIR}/gen-cpp/echo_types.cpp) +add_executable(thrift_server2 server2.cpp ${CMAKE_CURRENT_BINARY_DIR}/gen-cpp/echo_types.cpp) +target_link_libraries(thrift_server ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES}) +target_link_libraries(thrift_server2 ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES}) + +add_executable(native_client native_client.cpp ${CMAKE_CURRENT_BINARY_DIR}/gen-cpp/echo_types.cpp) +add_executable(native_server native_server.cpp ${CMAKE_CURRENT_BINARY_DIR}/gen-cpp/echo_types.cpp) +target_link_libraries(native_client ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES}) +target_link_libraries(native_server ${BRPC_LIB} ${DYNAMIC_LIB} ${GPERFTOOLS_LIBRARIES}) \ No newline at end of file diff --git a/example/thrift_extension_c++/client.cpp b/example/thrift_extension_c++/client.cpp index b358792434..543f6fbe85 100755 --- a/example/thrift_extension_c++/client.cpp +++ b/example/thrift_extension_c++/client.cpp @@ -44,6 +44,7 @@ int main(int argc, char* argv[]) { // Initialize the channel, NULL means using default options. brpc::ChannelOptions options; + options.connection_type = "single"; options.protocol = brpc::PROTOCOL_THRIFT; options.timeout_ms = FLAGS_timeout_ms/*milliseconds*/; options.max_retry = FLAGS_max_retry; diff --git a/example/thrift_extension_c++/client2.cpp b/example/thrift_extension_c++/client2.cpp index c6caa0a890..a2266cf17e 100644 --- a/example/thrift_extension_c++/client2.cpp +++ b/example/thrift_extension_c++/client2.cpp @@ -26,9 +26,12 @@ #include #include #include +#include +#include +// #include DEFINE_int32(thread_num, 50, "Number of threads to send requests"); -DEFINE_bool(use_bthread, false, "Use bthread to send requests"); +DEFINE_bool(use_bthread, true, "Use bthread to send requests"); DEFINE_int32(request_size, 16, "Bytes of each request"); DEFINE_string(connection_type, "", "Connection type. Available values: single, pooled, short"); DEFINE_string(server, "0.0.0.0:8019", "IP Address of server"); @@ -36,7 +39,7 @@ DEFINE_string(load_balancer, "", "The algorithm for load balancing"); DEFINE_int32(timeout_ms, 100, "RPC timeout in milliseconds"); DEFINE_int32(max_retry, 3, "Max retries(not including the first RPC)"); DEFINE_bool(dont_fail, false, "Print fatal when some call failed"); -DEFINE_int32(dummy_port, -1, "Launch dummy server at this port"); +DEFINE_int32(dummy_port, 32111, "Launch dummy server at this port"); std::string g_request; @@ -48,16 +51,29 @@ static void* sender(void* arg) { // a stub Service wrapping it. stub can be shared by all threads as well. brpc::ThriftStub stub(static_cast(arg)); + // We will receive response synchronously, safe to put variables + // on stack. + example::EchoRequest req; + example::EchoResponse res; + req.__set_data(g_request); + req.__set_need_by_proxy(10); + std::map map_data; + map_data[1] = true; + map_data[2] = false; + req.__set_map_data(map_data); + + using namespace apache::thrift; + auto memBuffer = std::make_shared(); + auto protocol = std::make_shared(memBuffer); + + req.write(protocol.get()); // 结构体序列化为JSON + // uint8_t* buf; + // uint32_t sz; + // memBuffer->getBuffer(&buf, &sz); + LOG(INFO) << "Serialized request: " << memBuffer->getBufferAsString(); while (!brpc::IsAskedToQuit()) { - // We will receive response synchronously, safe to put variables - // on stack. - example::EchoRequest req; - example::EchoResponse res; brpc::Controller cntl; - req.__set_data(g_request); - req.__set_need_by_proxy(10); - // Because `done'(last parameter) is NULL, this function waits until // the response comes back or error occurs(including timedout). stub.CallMethod("Echo", &cntl, &req, &res, NULL); @@ -71,6 +87,7 @@ static void* sender(void* arg) { // is a specific sleeping to prevent this thread from spinning too // fast. You should continue the business logic in a production // server rather than sleeping. + LOG(ERROR) << cntl.ErrorText(); bthread_usleep(50000); } } @@ -143,5 +160,6 @@ int main(int argc, char* argv[]) { } } + return 0; } diff --git a/example/thrift_extension_c++/server2.cpp b/example/thrift_extension_c++/server2.cpp index 920605331c..09a1b69dc2 100755 --- a/example/thrift_extension_c++/server2.cpp +++ b/example/thrift_extension_c++/server2.cpp @@ -50,12 +50,12 @@ class EchoServiceImpl : public brpc::ThriftService { if (cntl->thrift_method_name() == "Echo") { // Proxy request/response to RealEcho, note that as a proxy we // don't need to Cast the messages to native types. - brpc::Controller cntl; + brpc::Controller call_cntl; brpc::ThriftStub stub(&_channel); // TODO: Following Cast<> drops data field from ProxyRequest which // does not recognize the field, should be debugged further. // LOG(INFO) << "req=" << *req->Cast(); - stub.CallMethod("RealEcho", &cntl, req, res, NULL); + stub.CallMethod("RealEcho", &call_cntl, req, res, NULL); done->Run(); } else if (cntl->thrift_method_name() == "RealEcho") { return RealEcho(cntl, req->Cast(), diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index 0196b6d008..f617002a61 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -544,7 +544,7 @@ static void GlobalInitializeOrDieImpl() { policy::SerializeThriftRequest, policy::PackThriftRequest, policy::ProcessThriftRequest, policy::ProcessThriftResponse, policy::VerifyThriftRequest, NULL, NULL, - CONNECTION_TYPE_POOLED_AND_SHORT, "thrift" }; + CONNECTION_TYPE_ALL, "thrift" }; if (RegisterProtocol(PROTOCOL_THRIFT, thrift_binary_protocol) != 0) { exit(1); } diff --git a/src/brpc/parallel_channel.h b/src/brpc/parallel_channel.h index 84e5f342cb..6dec0e8320 100644 --- a/src/brpc/parallel_channel.h +++ b/src/brpc/parallel_channel.h @@ -171,7 +171,7 @@ struct ParallelChannelOptions { // Default: number of sub channels, meaning that the RPC to ParallChannel // does not return unless all sub RPC succeed. // Note: `success_limit' is only valid when `fail_limit' is not set. - int success_limit{ -1}; + int success_limit{-1}; }; // ParallelChannel(aka "pchan") accesses all sub channels simultaneously with diff --git a/src/brpc/policy/http_rpc_protocol.cpp b/src/brpc/policy/http_rpc_protocol.cpp index 872c2897cc..7dd634dc03 100644 --- a/src/brpc/policy/http_rpc_protocol.cpp +++ b/src/brpc/policy/http_rpc_protocol.cpp @@ -126,7 +126,7 @@ CommonStrings::CommonStrings() , AUTHORIZATION("authorization") , ACCEPT_ENCODING("accept-encoding") , CONTENT_ENCODING("content-encoding") - , CONTENT_LENGTH("content_length") + , CONTENT_LENGTH("content-length") , EXPECT("expect") , CONTINUE_100("100-continue") , GZIP("gzip") diff --git a/src/brpc/policy/thrift_protocol.cpp b/src/brpc/policy/thrift_protocol.cpp index 1e25066d9f..dfe996741c 100755 --- a/src/brpc/policy/thrift_protocol.cpp +++ b/src/brpc/policy/thrift_protocol.cpp @@ -572,10 +572,28 @@ void ProcessThriftRequest(InputMessageBase* msg_base) { void ProcessThriftResponse(InputMessageBase* msg_base) { const int64_t start_parse_us = butil::cpuwide_time_us(); DestroyingPtr msg(static_cast(msg_base)); - - // Fetch correlation id that we saved before in `PacThriftRequest' - const CallId cid = { static_cast(msg->socket()->correlation_id()) }; + // The following code was taken from thrift auto generate code + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + uint32_t seq_id = 0; + butil::Status st = ReadThriftMessageBegin(&msg->payload, &fname, &mtype, &seq_id); + + // Fetch correlation id that we saved before in `PackThriftRequest'. Controller* cntl = NULL; + uint64_t cid_value = msg->socket()->correlation_id(); + if (0 == cid_value) { + if (!st.ok()) { + LOG_EVERY_SECOND(WARNING) << "Fail to read thrift message begin: " << st.error_cstr(); + return; + } + + if (!msg->socket()->Sid2Cid(seq_id, &cid_value)) { + LOG_EVERY_SECOND(WARNING) << "Fail to convert seq_id=" << seq_id << " to cid"; + return; + } + } + + CallId cid = {cid_value}; const int rc = bthread_id_lock(cid, (void**)&cntl); if (rc != 0) { LOG_IF(ERROR, rc != EINVAL && rc != EPERM) @@ -594,12 +612,6 @@ void ProcessThriftResponse(InputMessageBase* msg_base) { const int saved_error = cntl->ErrorCode(); do { - // The following code was taken from thrift auto generate code - std::string fname; - ::apache::thrift::protocol::TMessageType mtype; - uint32_t seq_id = 0; // unchecked - - butil::Status st = ReadThriftMessageBegin(&msg->payload, &fname, &mtype, &seq_id); if (!st.ok()) { cntl->SetFailed(ERESPONSE, "%s", st.error_cstr()); break; @@ -683,12 +695,12 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl, // xxx_pargs write if (req->raw_instance()) { auto out_buffer = - THRIFT_STDCXX::make_shared(); + THRIFT_STDCXX::make_shared(8192); apache::thrift::protocol::TBinaryProtocolT oprot(out_buffer); - oprot.writeMessageBegin( - method_name, ::apache::thrift::protocol::T_CALL, 0/*seq_id*/); - + // The message begin will be written in `PackThriftRequest'. + // oprot.writeMessageBegin( + // method_name, ::apache::thrift::protocol::T_CALL, 0/*seq_id*/); uint32_t xfer = 0; char struct_begin_str[32 + method_name.size()]; char* p = struct_begin_str; @@ -711,7 +723,7 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl, xfer += oprot.writeStructEnd(); (void)xfer; - oprot.writeMessageEnd(); + // oprot.writeMessageEnd(); oprot.getTransport()->writeEnd(); oprot.getTransport()->flush(); @@ -719,48 +731,57 @@ void SerializeThriftRequest(butil::IOBuf* request_buf, Controller* cntl, uint32_t sz; out_buffer->getBuffer(&buf, &sz); - const thrift_head_t head = { htonl(sz) }; - request_buf->append(&head, sizeof(head)); - request_buf->append(buf, sz); + // const thrift_head_t head = { htonl(sz) }; + // request_buf->append(&head, sizeof(head)); + // request_buf->append(buf, sz); + request_buf->append_user_data(buf, sz, [out_buffer](void*) { (void)out_buffer; }); } else { - const size_t mb_size = ThriftMessageBeginSize(method_name); - char buf[sizeof(thrift_head_t) + mb_size]; - // suppress strict-aliasing warning - thrift_head_t* head = (thrift_head_t*)buf; - head->body_len = htonl(mb_size + req->body.size()); - WriteThriftMessageBegin(buf + sizeof(thrift_head_t), method_name, - ::apache::thrift::protocol::T_CALL, 0/*seq_id*/); - request_buf->append(buf, sizeof(buf)); + // const size_t mb_size = ThriftMessageBeginSize(method_name); + // char buf[sizeof(thrift_head_t) + mb_size]; + // // suppress strict-aliasing warning + // thrift_head_t* head = (thrift_head_t*)buf; + // head->body_len = htonl(mb_size + req->body.size()); + // WriteThriftMessageBegin(buf + sizeof(thrift_head_t), method_name, + // ::apache::thrift::protocol::T_CALL, 0/*seq_id*/); + // request_buf->append(buf, sizeof(buf)); request_buf->append(req->body); } } -void PackThriftRequest( - butil::IOBuf* packet_buf, - SocketMessage**, - uint64_t correlation_id, - const google::protobuf::MethodDescriptor*, - Controller* cntl, - const butil::IOBuf& request, - const Authenticator*) { +void PackThriftRequest(butil::IOBuf* packet_buf, SocketMessage**, uint64_t correlation_id, + const google::protobuf::MethodDescriptor*, Controller* cntl, + const butil::IOBuf& request, const Authenticator*) { ControllerPrivateAccessor accessor(cntl); - if (cntl->connection_type() == CONNECTION_TYPE_SINGLE) { + uint32_t seq_id = 0; + if (CONNECTION_TYPE_SINGLE != cntl->connection_type()) { + // Store `correlation_id' into the socket since thrift protocol can't pack the field. + accessor.get_sending_socket()->set_correlation_id(correlation_id); + } else if (!accessor.get_sending_socket()->Cid2Sid(correlation_id, &seq_id)) { return cntl->SetFailed( - EINVAL, "thrift protocol can't work with CONNECTION_TYPE_SINGLE"); + EINVAL, "Fail to convert correlation_id=%llu", correlation_id); } - // Store `correlation_id' into the socket since thrift protocol can't - // pack the field. - accessor.get_sending_socket()->set_correlation_id(correlation_id); + + // Pack message begin. + const std::string& method_name = cntl->thrift_method_name(); + const size_t mb_size = ThriftMessageBeginSize(method_name); + char buf[sizeof(thrift_head_t) + mb_size]; + // Suppress strict-aliasing warning. + thrift_head_t* head = (thrift_head_t*)buf; + head->body_len = htonl(mb_size + request.size()); + WriteThriftMessageBegin(buf + sizeof(thrift_head_t), method_name, + ::apache::thrift::protocol::T_CALL, seq_id); + packet_buf->append(buf, sizeof(buf)); + // Message body. + packet_buf->append(request); Span* span = accessor.span(); if (span) { - span->set_request_size(request.length()); + span->set_request_size(packet_buf->length()); // TODO: Nowhere to set tracing ids. // request_meta->set_trace_id(span->trace_id()); // request_meta->set_span_id(span->span_id()); // request_meta->set_parent_span_id(span->parent_span_id()); } - packet_buf->append(request); } } // namespace policy diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 73ea309a71..8a89819e9e 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -747,8 +747,9 @@ int Socket::OnCreated(const SocketOptions& options) { _auth_flag_error.store(0, butil::memory_order_relaxed); const int rc2 = bthread_id_create(&_auth_id, NULL, NULL); if (rc2) { - LOG(ERROR) << "Fail to create auth_id: " << berror(rc2); - SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2)); + const char* error_text = berror(rc2); + LOG(ERROR) << "Fail to create auth_id: " << error_text; + SetFailed(rc2, "Fail to create auth_id: %s", error_text); return -1; } _force_ssl = options.force_ssl; @@ -892,10 +893,12 @@ void Socket::BeforeRecycled() { delete _stream_set; _stream_set = NULL; + _cid_resource_pool.clear_resources(); + const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed); if (asid != INVALID_SOCKET_ID) { SocketUniquePtr ptr; - if (Socket::Address(asid, &ptr) == 0) { + if (Address(asid, &ptr) == 0) { ptr->ReleaseAdditionalReference(); } } @@ -1131,7 +1134,7 @@ int Socket::SetFailed(int error_code, const char* error_fmt, ...) { butil::string_vprintf(&error_text, error_fmt, ap); va_end(ap); } - return VersionedRefWithId::SetFailed(error_code, error_text); + return VersionedRefWithId::SetFailed(error_code, error_text); } int Socket::SetFailed(SocketId id) { @@ -1195,9 +1198,9 @@ void* Socket::ProcessEvent(void* arg) { // `old_head' is last new_head got from this function or (in another word) // tail of current writing list. // `singular_node' is true iff `old_head' is the only node in its list. -bool Socket::IsWriteComplete(Socket::WriteRequest* old_head, +bool Socket::IsWriteComplete(WriteRequest* old_head, bool singular_node, - Socket::WriteRequest** new_tail) { + WriteRequest** new_tail) { CHECK(NULL == old_head->next); // Try to set _write_head to NULL to mark that the write is done. WriteRequest* new_head = old_head; @@ -1319,7 +1322,7 @@ int Socket::Connect(const timespec* abstime, SocketOptions options; options.bthread_tag = _io_event.bthread_tag(); options.user = req; - if (Socket::Create(options, &connect_id) != 0) { + if (Create(options, &connect_id) != 0) { LOG(FATAL) << "Fail to create Socket"; delete req; return -1; @@ -1328,7 +1331,7 @@ int Socket::Connect(const timespec* abstime, // `connect_id'. We hold an additional reference here to // ensure `req' to be valid in this scope SocketUniquePtr s; - CHECK_EQ(0, Socket::Address(connect_id, &s)); + CHECK_EQ(0, Address(connect_id, &s)); // Add `sockfd' into epoll so that `HandleEpollOutRequest' will // be called with `req' when epoll event reaches @@ -1436,7 +1439,7 @@ int Socket::OnOutputEvent(void* user_data, uint32_t, // added into epoll, these sockets miss the signal inside // `SetFailed' and therefore must be signalled here using // `AddressFailedAsWell' to prevent waiting forever - if (Socket::AddressFailedAsWell(id, &s) < 0) { + if (AddressFailedAsWell(id, &s) < 0) { // Ignore recycled sockets return -1; } @@ -1456,7 +1459,7 @@ int Socket::OnOutputEvent(void* user_data, uint32_t, void Socket::HandleEpollOutTimeout(void* arg) { SocketId id = (SocketId)arg; SocketUniquePtr s; - if (Socket::Address(id, &s) != 0) { + if (Address(id, &s) != 0) { return; } EpollOutRequest* req = dynamic_cast(s->user()); @@ -1530,16 +1533,16 @@ int Socket::KeepWriteIfConnected(int fd, int err, void* data) { // Run ssl connect in a new bthread to avoid blocking // the current bthread (thus blocking the EventDispatcher) bthread_t th; - std::unique_ptr thrd_func(brpc::NewCallback( - Socket::CheckConnectedAndKeepWrite, fd, err, data)); + std::unique_ptr thrd_func( + NewCallback(CheckConnectedAndKeepWrite, fd, err, data)); if ((err = bthread_start_background(&th, &BTHREAD_ATTR_NORMAL, RunClosure, thrd_func.get())) == 0) { thrd_func.release(); return 0; - } else { - PLOG(ERROR) << "Fail to start bthread"; - // Fall through with non zero `err' } + + PLOG(ERROR) << "Fail to start bthread"; + // Fall through with non zero `err' } CheckConnectedAndKeepWrite(fd, err, data); return 0; @@ -2316,7 +2319,7 @@ std::ostream& operator<<(std::ostream& os, const ObjectPtr& obj) { void Socket::DebugSocket(std::ostream& os, SocketId id) { SocketUniquePtr ptr; - int ret = Socket::AddressFailedAsWell(id, &ptr); + int ret = AddressFailedAsWell(id, &ptr); if (ret < 0) { os << "SocketId=" << id << " is invalid or recycled"; return; @@ -2960,6 +2963,27 @@ int Socket::PeekAgentSocket(SocketUniquePtr* out) const { return Address(id, out); } +bool Socket::Cid2Sid(uint64_t cid, uint32_t* seq_id) { + uint64_t* cid_ptr = _cid_resource_pool.get_resource(seq_id); + if (NULL == cid_ptr) { + return false; + } + + *cid_ptr = cid; + return true; +} + +bool Socket::Sid2Cid(uint32_t sid, uint64_t* cid) { + uint64_t* cid_ptr = _cid_resource_pool.address_resource(sid); + if (NULL == cid_ptr) { + return false; + } + + *cid = *cid_ptr; + _cid_resource_pool.return_resource(sid); + return true; +} + void Socket::GetStat(SocketStat* s) const { BAIDU_CASSERT(offsetof(Socket, _preferred_index) >= 64, different_cacheline); BAIDU_CASSERT(sizeof(WriteRequest) == 64, sizeof_write_request_is_64); diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 03ad43f867..12c2cf0088 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -42,6 +42,7 @@ #include "brpc/event_dispatcher.h" #include "brpc/versioned_ref_with_id.h" #include "brpc/health_check_option.h" +#include "butil/thread_key.h" namespace brpc { namespace policy { @@ -62,6 +63,283 @@ class AuthContext; class EventDispatcher; class Stream; +class CIdResourcePool { +public: + static constexpr size_t RP_MAX_BLOCK_NGROUP = 65536; + static constexpr size_t RP_GROUP_NBLOCK_NBIT = 16; + static constexpr size_t RP_GROUP_NBLOCK = (1UL << RP_GROUP_NBLOCK_NBIT); + static constexpr size_t RP_INITIAL_FREE_LIST_SIZE = 1024; + + static constexpr size_t BLOCK_NITEM = 64 * 1024 / sizeof(uint64_t); + static constexpr size_t FREE_CHUNK_NITEM = BLOCK_NITEM; + static constexpr size_t LOCAL_FREE_CHUNK_NITEM = 256; + + // Free identifiers are batched in a FreeChunk before they're added to + // global list(_free_chunks). + template + struct ResourcePoolFreeChunk { + size_t nfree{0}; + uint32_t ids[NITEM]{}; + }; + + typedef ResourcePoolFreeChunk FreeChunk; + typedef ResourcePoolFreeChunk<0> DynamicFreeChunk; + + // When a thread needs memory, it allocates a Block. To improve locality, + // items in the Block are only used by the thread. + struct BAIDU_CACHELINE_ALIGNMENT Block { + uint64_t items[BLOCK_NITEM]{}; + size_t nitem{0}; + }; + + // A Resource addresses at most RP_MAX_BLOCK_NGROUP BlockGroups, + // each BlockGroup addresses at most RP_GROUP_NBLOCK blocks. So a + // resource addresses at most RP_MAX_BLOCK_NGROUP * RP_GROUP_NBLOCK Blocks. + struct BlockGroup { + butil::atomic nblock{0}; + butil::atomic blocks[RP_GROUP_NBLOCK]{}; + }; + + // Each thread has an instance of this class. + class BAIDU_CACHELINE_ALIGNMENT LocalPool { + public: + explicit LocalPool(CIdResourcePool* pool) + : _pool(pool) + , _cur_block(NULL) + , _cur_block_index(0) {} + + ~LocalPool() { + // Add to global _free_chunks if there're some free resources + if (_cur_free.nfree > 0) { + _pool->push_free_chunk(_cur_free); + } + + // todo + // _pool->clear_from_destructor_of_local_pool(); + } + + void clear_resources() { + _cur_free.nfree = 0; + _cur_block = NULL; + _cur_block_index = 0; + } + + + uint64_t* get(uint32_t* id) { + // Fetch local free id. + if (_cur_free.nfree) { + uint32_t free_id = _cur_free.ids[--_cur_free.nfree]; + *id = free_id; + return _pool->unsafe_address_resource(free_id); + } + // Fetch a FreeChunk from global. + // TODO: Popping from _free needs to copy a FreeChunk which is + // costly, but hardly impacts amortized performance. + if (_pool->pop_free_chunk(_cur_free)) { + --_cur_free.nfree; + uint32_t free_id = _cur_free.ids[_cur_free.nfree]; + *id = free_id; + return _pool->unsafe_address_resource(free_id); + } + // Fetch memory from local block + if (_cur_block && _cur_block->nitem < BLOCK_NITEM) { + *id = _cur_block_index * BLOCK_NITEM + _cur_block->nitem; + auto item = _cur_block->items + _cur_block->nitem; + ++_cur_block->nitem; + return item; + } + // Fetch a Block from global. + _cur_block = _pool->add_block(&_cur_block_index); + if (NULL != _cur_block) { + *id = _cur_block_index * BLOCK_NITEM + _cur_block->nitem; + auto item = _cur_block->items + _cur_block->nitem; + ++_cur_block->nitem; + return item; + } + return NULL; + } + + int return_resource(uint32_t id) { + // Return to local free list + if (_cur_free.nfree < LOCAL_FREE_CHUNK_NITEM) { + _cur_free.ids[_cur_free.nfree++] = id; + return 0; + } + // Local free list is full, return it to global. + // For copying issue, check comment in upper get() + if (_pool->push_free_chunk(_cur_free)) { + _cur_free.nfree = 1; + _cur_free.ids[0] = id; + return 0; + } + return -1; + } + + private: + CIdResourcePool* _pool; + Block* _cur_block; + size_t _cur_block_index; + FreeChunk _cur_free; + }; + + CIdResourcePool() + : _local_pool(true) { + _free_chunks.reserve(RP_INITIAL_FREE_LIST_SIZE); + } + + ~CIdResourcePool() { + clear_resources(); + } + + uint64_t* unsafe_address_resource(uint32_t id) { + const size_t block_index = id / BLOCK_NITEM; + return (uint64_t*)(_block_groups[(block_index >> RP_GROUP_NBLOCK_NBIT)] + .load(butil::memory_order_consume) + ->blocks[(block_index & (RP_GROUP_NBLOCK - 1))] + .load(butil::memory_order_consume)->items) + + id - block_index * BLOCK_NITEM; + } + + uint64_t* address_resource(uint32_t id) { + const size_t block_index = id / BLOCK_NITEM; + const size_t group_index = (block_index >> RP_GROUP_NBLOCK_NBIT); + if (BAIDU_LIKELY(group_index < RP_MAX_BLOCK_NGROUP)) { + BlockGroup* bg = + _block_groups[group_index].load(butil::memory_order_consume); + if (BAIDU_LIKELY(bg != NULL)) { + Block* b = bg->blocks[block_index & (RP_GROUP_NBLOCK - 1)] + .load(butil::memory_order_consume); + if (BAIDU_LIKELY(b != NULL)) { + const size_t offset = id - block_index * BLOCK_NITEM; + if (BAIDU_LIKELY(offset < b->nitem)) { + return (uint64_t*)b->items + offset; + } + } + } + } + return NULL; + } + + uint64_t* get_resource(uint32_t* id) { + return _local_pool.get(this)->get(id); + } + + int return_resource(uint32_t id) { + LocalPool* pool = _local_pool.get_or_null(); + if (NULL == pool) { + return -1; + } + return pool->return_resource(id);; + } + + // Clear all resources: LocalPool, DynamicFreeChunk, Block and BlockGroup. + void clear_resources() { + // todo 析构的时候应该不需要清空local pool,直接析构即可 + _local_pool.for_each([](LocalPool* lp) { + lp->clear_resources(); + }); + + BAIDU_SCOPED_LOCK(_free_chunks_mutex); + for (auto& item : _free_chunks) { + free(item); + } + _free_chunks.clear(); + size_t ngroup = _ngroup.exchange(0, butil::memory_order_acquire); + for (size_t i = 0; i < ngroup; ++i) { + BlockGroup* group = _block_groups[i].exchange(NULL, butil::memory_order_acquire); + if (group != NULL) { + for (size_t j = 0; j < group->nblock.load(butil::memory_order_relaxed); ++j) { + delete group->blocks[j].load(butil::memory_order_acquire); + } + delete group; + } + } + } + + // Create a Block and append it to right-most BlockGroup. + Block* add_block(size_t* index) { + auto new_block = new Block; + size_t ngroup; + do { + ngroup = _ngroup.load(butil::memory_order_acquire); + if (ngroup >= 1) { + BlockGroup* g = + _block_groups[ngroup - 1].load(butil::memory_order_consume); + size_t block_index = + g->nblock.fetch_add(1, butil::memory_order_relaxed); + if (block_index < RP_GROUP_NBLOCK) { + g->blocks[block_index].store( + new_block, butil::memory_order_release); + *index = (ngroup - 1) * RP_GROUP_NBLOCK + block_index; + return new_block; + } + g->nblock.fetch_sub(1, butil::memory_order_relaxed); + } + add_block_group(ngroup); + } while (true); + } + + // Create a BlockGroup and append it to _block_groups. + // Shall be called infrequently because a BlockGroup is pretty big. + void add_block_group(size_t old_ngroup) { + BAIDU_SCOPED_LOCK(_block_group_mutex); + const size_t ngroup = _ngroup.load(butil::memory_order_acquire); + if (ngroup != old_ngroup) { + // Other thread got lock and added group before this thread. + return; + } + if (ngroup < RP_MAX_BLOCK_NGROUP) { + // Release fence is paired with consume fence in address() and + // add_block() to avoid un-constructed bg to be seen by other + // threads. + _block_groups[ngroup].store(new BlockGroup, butil::memory_order_release); + _ngroup.store(ngroup + 1, butil::memory_order_release); + } + } + + bool pop_free_chunk(FreeChunk& c) { + // Critical for the case that most return_object are called in + // different threads of get_object. + if (_free_chunks.empty()) { + return false; + } + std::unique_lock lock(_free_chunks_mutex); + if (_free_chunks.empty()) { + return false; + } + DynamicFreeChunk* p = _free_chunks.back(); + _free_chunks.pop_back(); + lock.unlock(); + + c.nfree = p->nfree; + memcpy(c.ids, p->ids, sizeof(*p->ids) * p->nfree); + free(p); + return true; + } + + bool push_free_chunk(const FreeChunk& c) { + DynamicFreeChunk* p = (DynamicFreeChunk*)malloc( + offsetof(DynamicFreeChunk, ids) + sizeof(*c.ids) * c.nfree); + if (NULL == p) { + return false; + } + p->nfree = c.nfree; + memcpy(p->ids, c.ids, sizeof(*c.ids) * c.nfree); + BAIDU_SCOPED_LOCK(_free_chunks_mutex); + _free_chunks.push_back(p); + return true; + } + +private: + butil::ThreadLocal _local_pool; + butil::atomic _ngroup{0}; + butil::Mutex _block_group_mutex; + butil::atomic _block_groups[RP_MAX_BLOCK_NGROUP]{}; + + std::vector _free_chunks; + butil::Mutex _free_chunks_mutex; +}; + // A special closure for processing the about-to-recycle socket. Socket does // not delete SocketUser, if you want, `delete this' at the end of // BeforeRecycle(). @@ -645,6 +923,10 @@ friend void DereferenceSocket(Socket*); void set_http_request_method(const HttpMethod& method) { _http_request_method = method; } HttpMethod http_request_method() const { return _http_request_method; } + // Convert between correlation id of Controller and seq id of thrift protocol. + bool Cid2Sid(uint64_t cid, uint32_t* seq_id); + bool Sid2Cid(uint32_t sid, uint64_t* cid); + private: DISALLOW_COPY_AND_ASSIGN(Socket); @@ -980,6 +1262,8 @@ friend void DereferenceSocket(Socket*); HttpMethod _http_request_method; HealthCheckOption _hc_option; + + CIdResourcePool _cid_resource_pool; }; } // namespace brpc diff --git a/src/butil/containers/flat_map.h b/src/butil/containers/flat_map.h index 54981b81e9..181dff77a3 100644 --- a/src/butil/containers/flat_map.h +++ b/src/butil/containers/flat_map.h @@ -173,7 +173,7 @@ class FlatMap { ~FlatMap(); FlatMap& operator=(const FlatMap& rhs); - void swap(FlatMap & rhs); + void swap(FlatMap & rhs) noexcept; // FlatMap will be automatically initialized with small FlatMap optimization, // so this function only needs to be call when a large initial number of diff --git a/src/butil/containers/flat_map_inl.h b/src/butil/containers/flat_map_inl.h index 93bcbf9d51..84c091940c 100644 --- a/src/butil/containers/flat_map_inl.h +++ b/src/butil/containers/flat_map_inl.h @@ -330,7 +330,7 @@ int FlatMap<_K, _T, _H, _E, _S, _A, _M>::init(size_t nbucket, u_int load_factor) template void FlatMap<_K, _T, _H, _E, _S, _A, _M>::swap( - FlatMap<_K, _T, _H, _E, _S, _A, _M>& rhs) { + FlatMap<_K, _T, _H, _E, _S, _A, _M>& rhs) noexcept { if (!is_default_buckets() && !rhs.is_default_buckets()) { std::swap(rhs._buckets, _buckets); std::swap(rhs._thumbnail, _thumbnail); diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h index 239e82d950..acababb378 100644 --- a/src/butil/iobuf.h +++ b/src/butil/iobuf.h @@ -202,6 +202,8 @@ friend class SingleIOBuf; static ssize_t cut_multiple_into_SSL_channel( struct ssl_st* ssl, IOBuf* const* pieces, size_t count, int* ssl_error); + static int ssl_flush(struct ssl_st* ssl, int* ssl_error); + // Append another IOBuf to back side, payload of the IOBuf is shared // rather than copied. void append(const IOBuf& other); diff --git a/src/butil/thread_key.h b/src/butil/thread_key.h index c150528b63..72b177daa3 100644 --- a/src/butil/thread_key.h +++ b/src/butil/thread_key.h @@ -18,16 +18,21 @@ #ifndef BUTIL_THREAD_KEY_H #define BUTIL_THREAD_KEY_H -#include #include #include +#include #include +#include +#include + #include "butil/scoped_lock.h" #include "butil/type_traits.h" +#include "butil/shared_object.h" +#include "butil/synchronization/lock.h" namespace butil { -typedef void (*DtorFunction)(void *); +using DtorFunction = std::function; class ThreadKey { public: @@ -95,18 +100,98 @@ void* thread_getspecific(ThreadKey& thread_key); template class ThreadLocal { + class InternalObject { + public: + template + void for_each(Callback&& callback) { + BAIDU_SCOPED_LOCK(mutex); + for (auto ptr : ptrs) { + callback(ptr); + } + } + + void push(T* ptr) { + BAIDU_SCOPED_LOCK(mutex); + ptrs.push_back(ptr); + } + + void replace(T* old_ptr, T* new_ptr) { + BAIDU_SCOPED_LOCK(mutex); + auto it = std::find(ptrs.begin(), ptrs.end(), old_ptr); + CHECK(it != ptrs.end()); + *it = new_ptr; + } + + void remove(T* ptr) { + // Remove and delete old_ptr. + if (NULL == ptr) { + return; + } + BAIDU_SCOPED_LOCK(mutex); + auto iter = std::find(ptrs.begin(), ptrs.end(), ptr); + if (iter != ptrs.end()) { + ptrs.erase(iter, ptrs.end()); + delete ptr; + } + } + + void remove_all() { + BAIDU_SCOPED_LOCK(mutex); + for (auto ptr : ptrs) { + delete ptr; + } + } + + private: + Mutex mutex; + // All pointers of data allocated by the ThreadLocal. + std::vector ptrs; + }; + public: ThreadLocal() : ThreadLocal(false) {} - explicit ThreadLocal(bool delete_on_thread_exit); + explicit ThreadLocal(bool delete_at_thread_exit) + : _object(std::make_shared()) + , _delete_at_thread_exit(delete_at_thread_exit) { + DtorFunction dtor; + if (_delete_at_thread_exit) { + // Remove and delete the thread local object at thread exit. + std::weak_ptr weak_object(_object); + dtor = [weak_object](void* ptr) { + auto object = weak_object.lock(); + if (NULL != object) { + object->remove(static_cast(ptr)); + } // else the ThreadLocal is destructed, do nothing. + }; + } + CHECK_EQ(0, thread_key_create(_key, dtor)); + } - ~ThreadLocal(); + ~ThreadLocal() { + thread_key_delete(_key); + _object->remove_all(); + } - // non-copyable - ThreadLocal(const ThreadLocal&) = delete; - ThreadLocal& operator=(const ThreadLocal&) = delete; + DISALLOW_COPY(ThreadLocal); - T* get(); + // Returns the thread local object if existed, NULL otherwise. + T* get_or_null() { + return static_cast(thread_getspecific(_key)); + } + + // Returns the thread local object, create one if not exist. + // Args are passed to T's constructor. + template + T* get(Args&&... args) { + T* ptr = static_cast(thread_getspecific(_key)); + if (NULL == ptr) { + ptr = new T(std::forward(args)...); + CHECK_EQ(0, thread_setspecific(_key, ptr)); + _object->push(ptr); + } + return ptr; + } T* operator->() { return get(); } @@ -117,104 +202,36 @@ class ThreadLocal { // will be called under a thread lock. template void for_each(Callback&& callback) { - BAIDU_CASSERT( - (is_result_void::value), - "Callback must accept Args params and return void"); - BAIDU_SCOPED_LOCK(_mutex); - for (auto ptr : ptrs) { - callback(ptr); - } + BAIDU_CASSERT((is_result_void::value), + "Callback must accept Args params and return void"); + _object->for_each(std::forward(callback)); } - void reset(T* ptr); + // Set the thread local object to `ptr', and delete the old one if any. + // If `ptr' is same as the old one, do nothing. + void reset(T* ptr) { + T* old_ptr = get(); + if (ptr == old_ptr) { + return; + } + if (thread_setspecific(_key, ptr) != 0) { + return; + } + _object->replace(old_ptr, ptr); + } void reset() { reset(NULL); } private: - static void DefaultDtor(void* ptr) { - if (ptr) { - delete static_cast(ptr); - } - } - ThreadKey _key; - pthread_mutex_t _mutex; - // All pointers of data allocated by the ThreadLocal. - std::vector ptrs; + std::shared_ptr _object; // Delete data on thread exit or destructor of ThreadLocal. - bool _delete_on_thread_exit; + bool _delete_at_thread_exit; }; -template -ThreadLocal::ThreadLocal(bool delete_on_thread_exit) - : _mutex(PTHREAD_MUTEX_INITIALIZER) - , _delete_on_thread_exit(delete_on_thread_exit) { - DtorFunction dtor = _delete_on_thread_exit ? DefaultDtor : NULL; - thread_key_create(_key, dtor); -} - - -template -ThreadLocal::~ThreadLocal() { - thread_key_delete(_key); - if (!_delete_on_thread_exit) { - BAIDU_SCOPED_LOCK(_mutex); - for (auto ptr : ptrs) { - DefaultDtor(ptr); - } - } - pthread_mutex_destroy(&_mutex); -} - -template -T* ThreadLocal::get() { - T* ptr = static_cast(thread_getspecific(_key)); - if (!ptr) { - ptr = new (std::nothrow) T; - if (!ptr) { - return NULL; - } - int rc = thread_setspecific(_key, ptr); - if (rc != 0) { - DefaultDtor(ptr); - return NULL; - } - { - BAIDU_SCOPED_LOCK(_mutex); - ptrs.push_back(ptr); - } - } - return ptr; -} - -template -void ThreadLocal::reset(T* ptr) { - T* old_ptr = get(); - if (ptr == old_ptr) { - return; - } - if (thread_setspecific(_key, ptr) != 0) { - return; - } - { - BAIDU_SCOPED_LOCK(_mutex); - if (ptr) { - ptrs.push_back(ptr); - } - // Remove and delete old_ptr. - if (old_ptr) { - auto iter = std::remove(ptrs.begin(), ptrs.end(), old_ptr); - if (iter != ptrs.end()) { - ptrs.erase(iter, ptrs.end()); - } - DefaultDtor(old_ptr); - } - } -} - -} +} // namespace butil #endif // BUTIL_THREAD_KEY_H diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index a478e8cc89..1c6dd50ae1 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -42,6 +42,19 @@ compile_proto(PROTO_HDRS PROTO_SRCS ${CMAKE_BINARY_DIR}/test "${TEST_PROTO_FILES}") add_library(TEST_PROTO_LIB OBJECT ${PROTO_SRCS} ${PROTO_HDRS}) +# Search for libthrift* by best effort. If it is not found and brpc is +# compiled with thrift protocol enabled, a link error would be reported. +find_library(THRIFT_LIB NAMES thrift) +if (NOT THRIFT_LIB) + message(FATAL_ERROR "Fail to find thrift") +endif() + +execute_process( + COMMAND thrift -o ${CMAKE_CURRENT_BINARY_DIR} --gen cpp ${CMAKE_CURRENT_SOURCE_DIR}/echo.thrift +) +include_directories(${CMAKE_CURRENT_BINARY_DIR}/gen-cpp) +add_library(THRIFT_TYPE_LIB OBJECT ${CMAKE_CURRENT_BINARY_DIR}/gen-cpp/echo_types.cpp) + set(BRPC_SYSTEM_GTEST_SOURCE_DIR "" CACHE PATH "System googletest source directory.") if(DOWNLOAD_GTEST) @@ -171,6 +184,7 @@ SET(TEST_BUTIL_SOURCES ${PROJECT_SOURCE_DIR}/test/flat_map_unittest.cpp ${PROJECT_SOURCE_DIR}/test/crc32c_unittest.cc ${PROJECT_SOURCE_DIR}/test/iobuf_unittest.cpp + ${PROJECT_SOURCE_DIR}/test/resource_pool_unittest.cpp ${PROJECT_SOURCE_DIR}/test/object_pool_unittest.cpp ${PROJECT_SOURCE_DIR}/test/test_switches.cc ${PROJECT_SOURCE_DIR}/test/scoped_locale.cc @@ -206,7 +220,8 @@ set_property(TARGET ${SOURCES_DEBUG_LIB} PROPERTY POSITION_INDEPENDENT_CODE 1) add_library(brpc-shared-debug SHARED $ $ - $) + $ + $) # change the debug lib output dir to be different from the release output set_target_properties(brpc-shared-debug PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/test) diff --git a/test/Makefile b/test/Makefile index 30e196bbce..8e0644fa6b 100644 --- a/test/Makefile +++ b/test/Makefile @@ -193,7 +193,7 @@ all: $(TEST_BINS) .PHONY:clean clean:clean_bins @echo "> Cleaning" - rm -rf $(TEST_BUTIL_OBJS) $(TEST_BVAR_OBJS) $(TEST_BTHREAD_OBJS) $(TEST_BRPC_OBJS) $(TEST_PROTO_OBJS) $(TEST_PROTO_SOURCES:.proto=.pb.h) $(TEST_PROTO_SOURCES:.proto=.pb.cc) + rm -rf $(TEST_BUTIL_OBJS) $(TEST_BVAR_OBJS) $(TEST_BTHREAD_OBJS) $(TEST_BRPC_OBJS) $(TEST_PROTO_OBJS) $(TEST_PROTO_SOURCES:.proto=.pb.h) $(TEST_PROTO_SOURCES:.proto=.pb.cc) gen-cpp $(MAKE) -C.. clean_debug .PHONY:clean_bins @@ -234,7 +234,7 @@ else ifeq ($(SYSTEM),Darwin) $(CXX) -o $@ $(LIBPATHS) $(SOPATHS) $^ $(GTEST_STATIC_LINKINGS) $(UT_DYNAMIC_LINKINGS) endif -brpc_%_unittest:$(TEST_PROTO_OBJS) brpc_%_unittest.o | libbrpc.dbg.$(SOEXT) +brpc_%_unittest:$(TEST_PROTO_OBJS) gen-cpp/echo_types.o brpc_%_unittest.o | libbrpc.dbg.$(SOEXT) @echo "> Linking $@" ifeq ($(SYSTEM),Linux) $(CXX) -o $@ $(LIBPATHS) $(SOPATHS) -Xlinker "-(" $^ -Xlinker "-)" $(STATIC_LINKINGS) $(UT_DYNAMIC_LINKINGS) @@ -262,3 +262,10 @@ brpc_h2_unsent_message_unittest.o:brpc_h2_unsent_message_unittest.cpp | libbrpc. @echo "> Compiling $@" $(CXX) -c $(HDRPATHS) $(CXXFLAGS) $< -o $@ +gen-cpp/%_types.cpp gen-cpp/%_types.h:%.thrift + @echo "> Generating Thrift C++ code from $<" + thrift --gen cpp $< + +gen-cpp/%_types.o:gen-cpp/%_types.cpp + @echo "> Compiling $@" + $(CXX) -c $(HDRPATHS) -O2 $(CXXFLAGS) $< -o $@ diff --git a/test/brpc_http_rpc_protocol_unittest.cpp b/test/brpc_http_rpc_protocol_unittest.cpp index f13c6877f7..ff619c02ab 100644 --- a/test/brpc_http_rpc_protocol_unittest.cpp +++ b/test/brpc_http_rpc_protocol_unittest.cpp @@ -2020,7 +2020,7 @@ void ReadOneResponse(brpc::SocketUniquePtr& sock, continue; } brpc::ParseResult pr = brpc::policy::ParseHttpMessage(&read_buf, sock.get(), false, NULL); - ASSERT_TRUE(pr.error() == brpc::PARSE_ERROR_NOT_ENOUGH_DATA || pr.is_ok()); + ASSERT_TRUE(pr.error() == brpc::PARSE_ERROR_NOT_ENOUGH_DATA || pr.is_ok()) << pr.error_str(); if (pr.is_ok()) { imsg_guard.reset(static_cast(pr.message())); break; @@ -2044,6 +2044,7 @@ TEST_F(HttpTest, http_expect) { ASSERT_EQ(0, brpc::Socket::Create(options, &id)); brpc::SocketUniquePtr sock; ASSERT_EQ(0, brpc::Socket::Address(id, &sock)); + sock->set_http_request_method(brpc::HTTP_METHOD_POST); butil::IOBuf content; content.append("hello"); diff --git a/test/brpc_thrift_protocol_unittest.cpp b/test/brpc_thrift_protocol_unittest.cpp new file mode 100644 index 0000000000..d22b788fee --- /dev/null +++ b/test/brpc_thrift_protocol_unittest.cpp @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#include +#include +#include "brpc/controller.h" +#include "brpc/server.h" +#include "brpc/channel.h" +#include "brpc/thrift_service.h" +#include "brpc/thrift_message.h" +#include "gen-cpp/echo_types.h" + +int main(int argc, char* argv[]) { + testing::InitGoogleTest(&argc, argv); + GFLAGS_NAMESPACE::ParseCommandLineFlags(&argc, &argv, true); + return RUN_ALL_TESTS(); +} + +namespace { + +const std::string g_server_addr = "127.0.0.1:8011"; +const std::string echo_message = "hello"; +const std::string echo_message_suffix = " (Echo)"; + +class EchoServiceImpl : public brpc::ThriftService { +public: + void ProcessThriftFramedRequest(brpc::Controller* cntl, + brpc::ThriftFramedMessage* req, + brpc::ThriftFramedMessage* res, + google::protobuf::Closure* done) override { + // Dispatch calls to different methods + if (cntl->thrift_method_name() == "Echo") { + return Echo(cntl, req->Cast(), + res->Cast(), done); + } + + brpc::ClosureGuard done_guard(done); + cntl->SetFailed(brpc::ENOMETHOD, "Fail to find method=%s", + cntl->thrift_method_name().c_str()); + } + + void Echo(brpc::Controller* cntl, + const example::EchoRequest* req, + example::EchoResponse* res, + google::protobuf::Closure* done) { + // This object helps you to call done->Run() in RAII style. If you need + // to process the request asynchronously, pass done_guard.release(). + brpc::ClosureGuard done_guard(done); + + if (req->sleep_us > 0) { + bthread_usleep(req->sleep_us); + } + + res->message = req->message; + res->message.append(echo_message_suffix); + } +}; + +class ThriftTest : public ::testing::Test { +protected: + ThriftTest() { + brpc::ServerOptions server_options; + server_options.thrift_service = new EchoServiceImpl; + EXPECT_EQ(0, _server.Start(g_server_addr.c_str(), &server_options)); + + brpc::ChannelOptions options; + options.protocol = brpc::PROTOCOL_THRIFT; + EXPECT_EQ(0, _channel.Init(g_server_addr.c_str(), "", &options)); + } + + void CallMethod(brpc::ConnectionType type = brpc::CONNECTION_TYPE_POOLED) { + example::EchoRequest req; + example::EchoResponse res; + brpc::Controller cntl; + cntl.set_connection_type(type); + req.__set_message(echo_message); + req.__set_sleep_us(butil::fast_rand_in(100, 1000)); + + brpc::ThriftStub stub(&_channel); + + stub.CallMethod("Echo", &cntl, &req, &res, NULL); + ASSERT_FALSE(cntl.Failed()) << cntl.ErrorCode() << ": " << cntl.ErrorText(); + ASSERT_EQ(res.message, echo_message + echo_message_suffix); + } + + brpc::Server _server; + brpc::Channel _channel; +}; + +TEST_F(ThriftTest, sanity) { + std::vector threads; + threads.reserve(100); + for (int i = 0; i < 100; ++i) { + bthread_t tid; + ASSERT_EQ(0, bthread_start_background(&tid, NULL, [](void* arg) -> void* { + auto t = (ThriftTest*)arg; + for (int j = 0; j < 1000; ++j) { + t->CallMethod(); + } + return NULL; + }, this)); + threads.push_back(tid); + } + for (auto t : threads) { + bthread_join(t, NULL); + } +} + +TEST_F(ThriftTest, single) { + std::vector threads; + threads.reserve(100); + for (int i = 0; i < 100; ++i) { + bthread_t tid; + ASSERT_EQ(0, bthread_start_background(&tid, NULL, [](void* arg) -> void* { + auto t = (ThriftTest*)arg; + for (int j = 0; j < 1000; ++j) { + t->CallMethod(brpc::CONNECTION_TYPE_SINGLE); + } + return NULL; + }, this)); + threads.push_back(tid); + } + for (auto t : threads) { + bthread_join(t, NULL); + } +} + +} // namespace diff --git a/test/echo.thrift b/test/echo.thrift new file mode 100644 index 0000000000..21a4a2c94b --- /dev/null +++ b/test/echo.thrift @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +namespace cpp example + +struct EchoRequest { + 1: optional string message; + 2: optional i32 sleep_us; +} + +struct EchoResponse { + 1: required string message; +} + +service EchoService { + EchoResponse Echo(1:EchoRequest request); +} + diff --git a/test/resource_pool_unittest.cpp b/test/resource_pool_unittest.cpp index 9a56ff3bb5..23ecb90c12 100644 --- a/test/resource_pool_unittest.cpp +++ b/test/resource_pool_unittest.cpp @@ -266,6 +266,9 @@ TEST_F(ResourcePoolTest, get_perf) { get_resource(&id); } tm1.stop(); + for (size_t i = 0; i < N; ++i) { + return_resource(id); + } printf("get a SilentObj takes %luns\n", tm1.n_elapsed()/N); //clear_resources(); // free all blocks