From 9513749c2bf6f374aca1717ead8937e796150eab Mon Sep 17 00:00:00 2001 From: Congyu <52687642+Congyuwang@users.noreply.github.com> Date: Wed, 1 Nov 2023 17:59:01 +0800 Subject: [PATCH] Unique sender (#51) * make sender unique pointer * bump version to v0.5.0 * just use latest nightly * remove PR build MACOS --- .github/workflows/PR.yml | 86 ------------------- CMakeLists.txt | 2 +- Cargo.toml | 2 +- examples/echo_server/CMakeLists.txt | 2 +- examples/echo_server/src/echo_server.cpp | 6 +- include/socket_manager/conn_callback.h | 2 +- socket_manager/socket_manager_c_api.cc | 2 +- tests/test_auto_flush.cpp | 13 +-- tests/test_callback_throw_error.cpp | 16 ++-- tests/test_find_package/CMakeLists.txt | 2 +- tests/test_find_package/helloworld_server.cpp | 17 ++-- tests/test_manual_flush.cpp | 15 ++-- tests/test_utils.h | 9 +- tests/transfer_common.h | 22 ++--- 14 files changed, 57 insertions(+), 139 deletions(-) delete mode 100644 .github/workflows/PR.yml diff --git a/.github/workflows/PR.yml b/.github/workflows/PR.yml deleted file mode 100644 index a974478..0000000 --- a/.github/workflows/PR.yml +++ /dev/null @@ -1,86 +0,0 @@ -name: PR Check - -on: - push: - branches: [ "main" ] - pull_request: - branches: [ "main" ] - -env: - # Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.) - BUILD_TYPE: Release - -jobs: - build-macos: - runs-on: macos-latest - - strategy: - matrix: - shared: ["ON", "OFF"] - - steps: - - uses: actions/checkout@v3 - with: - submodules: true - - - name: Install LLVM and Clang - run: | - brew update || true - brew install llvm@17 || true - - - name: Install Rust toolchain - uses: actions-rs/toolchain@v1 - with: - toolchain: nightly-2023-07-07 - profile: minimal - - - name: Configure CMake - # Configure CMake in a 'build' subdirectory. `CMAKE_BUILD_TYPE` is only required if you are using a single-configuration generator such as make. - # See https://cmake.org/cmake/help/latest/variable/CMAKE_BUILD_TYPE.html?highlight=cmake_build_type - run: | - eval "$(brew shellenv)" - cmake -B ${{github.workspace}}/build \ - -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}} \ - -DCMAKE_TOOLCHAIN_FILE=toolchain.cmake \ - -DCMAKE_INTERPROCEDURAL_OPTIMIZATION=ON \ - -DBUILD_SHARED_LIBS=${{ matrix.shared }} - - - name: Build - # Build your program with the given configuration - run: cmake --build ${{github.workspace}}/build --parallel 4 --config ${{env.BUILD_TYPE}} --verbose - - - name: Test - working-directory: ${{github.workspace}}/build - # Execute tests defined by the CMake configuration. - # See https://cmake.org/cmake/help/latest/manual/ctest.1.html for more detail - run: ctest -C ${{env.BUILD_TYPE}} --output-on-failure - - - name: Install - run: sudo cmake --install build --config Release - - - name: Test Linking - working-directory: ${{github.workspace}}/tests/test_find_package - run: | - eval "$(brew shellenv)" - cmake -B build -DCMAKE_BUILD_TYPE=Release - cmake --build build --config Release - ./build/helloworld_server & - SERVER_PID=$! - # Give the server time to start - sleep 1 - rsp=$(curl http://127.0.0.1:49999) - kill $SERVER_PID - if [[ "$rsp" == "Hello, world" ]]; then - echo "Test passed" - exit 0 - else - echo "Test failed" - exit 1 - fi - - - name: Test Build Examples - working-directory: ${{github.workspace}}/examples/echo_server - run: | - eval "$(brew shellenv)" - cmake -B build -DCMAKE_BUILD_TYPE=Release - cmake --build build --config Release diff --git a/CMakeLists.txt b/CMakeLists.txt index 0645309..a7dbf83 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_TOOLCHAIN_FILE ${CMAKE_SOURCE_DIR}/toolchain.cmake) # define project -project(socket_manager LANGUAGES C CXX VERSION 0.4.0) +project(socket_manager LANGUAGES C CXX VERSION 0.5.0) # set default build type as shared option(BUILD_SHARED_LIBS "Build using shared libraries" ON) diff --git a/Cargo.toml b/Cargo.toml index b31fb44..16f10da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tokio-socket-manager" -version = "0.4.0" +version = "0.5.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/examples/echo_server/CMakeLists.txt b/examples/echo_server/CMakeLists.txt index 59b769c..a7ed827 100644 --- a/examples/echo_server/CMakeLists.txt +++ b/examples/echo_server/CMakeLists.txt @@ -8,5 +8,5 @@ set(CMAKE_CXX_STANDARD 20) add_executable(echo_server src/echo_server.cpp) set_property(TARGET echo_server PROPERTY INTERPROCEDURAL_OPTIMIZATION TRUE) -find_package(socket_manager 0.4.0 REQUIRED) +find_package(socket_manager 0.5.0 REQUIRED) target_link_libraries(echo_server PUBLIC socket_manager) diff --git a/examples/echo_server/src/echo_server.cpp b/examples/echo_server/src/echo_server.cpp index 94fe6f1..196d930 100644 --- a/examples/echo_server/src/echo_server.cpp +++ b/examples/echo_server/src/echo_server.cpp @@ -29,7 +29,7 @@ class WrapWaker : public socket_manager::Notifier { */ class EchoReceiver : public socket_manager::MsgReceiverAsync { public: - explicit EchoReceiver(std::shared_ptr &&sender, + explicit EchoReceiver(std::unique_ptr &&sender, const std::shared_ptr &waker) : waker(waker), sender(std::move(sender)){}; @@ -48,7 +48,7 @@ class EchoReceiver : public socket_manager::MsgReceiverAsync { return sender->send_async(data); }; std::shared_ptr waker; - std::shared_ptr sender; + std::unique_ptr sender; }; /** @@ -57,7 +57,7 @@ class EchoReceiver : public socket_manager::MsgReceiverAsync { class EchoCallback : public socket_manager::ConnCallback { private: void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { auto waker = std::make_shared(socket_manager::Waker()); auto recv = std::make_shared(std::move(sender), waker); { diff --git a/include/socket_manager/conn_callback.h b/include/socket_manager/conn_callback.h index e273c80..d6e3e23 100644 --- a/include/socket_manager/conn_callback.h +++ b/include/socket_manager/conn_callback.h @@ -47,7 +47,7 @@ class ConnCallback { * @param sender a `Sender` object for sending data. */ virtual void on_connect(std::shared_ptr conn, - std::shared_ptr sender) = 0; + std::unique_ptr sender) = 0; /** * Called when a connection is closed. diff --git a/socket_manager/socket_manager_c_api.cc b/socket_manager/socket_manager_c_api.cc index 74a196f..a244355 100644 --- a/socket_manager/socket_manager_c_api.cc +++ b/socket_manager/socket_manager_c_api.cc @@ -56,7 +56,7 @@ socket_manager_extern_on_conn(SOCKET_MANAGER_C_API_OnConnObj this_, std::shared_ptr conn( new socket_manager::Connection(on_connect.Conn)); - std::shared_ptr sender( + std::unique_ptr sender( new socket_manager::MsgSender(on_connect.Send, conn)); // keep the connection alive diff --git a/tests/test_auto_flush.cpp b/tests/test_auto_flush.cpp index d0b8bbb..df90112 100644 --- a/tests/test_auto_flush.cpp +++ b/tests/test_auto_flush.cpp @@ -1,3 +1,4 @@ +#include #undef NDEBUG #include "test_utils.h" @@ -32,33 +33,33 @@ class HelloWorldManager : public DoNothingConnCallback { received(std::make_shared(false)) {} void on_connect(std::shared_ptr conn, - std::shared_ptr send) override { + std::unique_ptr send) override { auto do_nothing = std::make_unique(mutex, cond, received); conn->start(std::move(do_nothing)); - this->sender = send; + this->sender = std::move(send); sender->send_block("hello world"); } std::shared_ptr mutex; std::shared_ptr cond; std::shared_ptr received; - std::shared_ptr sender; + std::unique_ptr sender; }; class SendHelloWorldDoNotClose : public DoNothingConnCallback { void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { auto do_nothing = std::make_unique(); conn->start(std::move(do_nothing)); - this->sender = sender; + this->sender = std::move(sender); std::thread sender_t([this] { this->sender->send_block("hello world"); }); sender_t.detach(); } private: // store sender, do not close connection - std::shared_ptr sender; + std::unique_ptr sender; }; int test_auto_flush(int argc, char **argv) { diff --git a/tests/test_callback_throw_error.cpp b/tests/test_callback_throw_error.cpp index 07e97a0..eec83c0 100644 --- a/tests/test_callback_throw_error.cpp +++ b/tests/test_callback_throw_error.cpp @@ -1,3 +1,4 @@ +#include #undef NDEBUG #include "test_utils.h" @@ -9,14 +10,14 @@ using namespace socket_manager; class OnConnectErrorBeforeStartCallback : public DoNothingConnCallback { void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { throw std::runtime_error("throw some error before calling start"); } }; class OnConnectErrorAfterStartCallback : public DoNothingConnCallback { void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { conn->start(std::make_unique()); throw std::runtime_error("throw some error after calling start"); } @@ -30,10 +31,9 @@ class OnMsgErrorReceiver : public MsgReceiver { class OnMsgErrorCallback : public ConnCallback { void on_connect(std::shared_ptr conn, - std::shared_ptr send) override { + std::unique_ptr send) override { conn->start(std::make_unique()); - this->sender = send; - sender.use_count(); + this->sender = std::move(send); } void on_remote_close(const std::string &local_addr, @@ -48,19 +48,19 @@ class OnMsgErrorCallback : public ConnCallback { void on_connect_error(const std::string &addr, const std::string &err) override {} - std::shared_ptr sender; + std::unique_ptr sender; }; class StoreAllEventsConnHelloCallback : public StoreAllEventsConnCallback { void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { std::unique_lock lock(*mutex); auto conn_id = conn->local_address() + "->" + conn->peer_address(); events->emplace_back(CONNECTED, conn_id); auto msg_storer = std::make_unique(conn_id, mutex, cond, buffer); conn->start(std::move(msg_storer)); - std::thread sender_t([sender]() { + std::thread sender_t([sender = std::move(sender)]() { try { sender->send_block("hello"); } catch (std::runtime_error &e) { /* ignore */ diff --git a/tests/test_find_package/CMakeLists.txt b/tests/test_find_package/CMakeLists.txt index 5a8b09e..d550e9d 100644 --- a/tests/test_find_package/CMakeLists.txt +++ b/tests/test_find_package/CMakeLists.txt @@ -8,5 +8,5 @@ set(CMAKE_CXX_STANDARD 17) add_executable(helloworld_server helloworld_server.cpp) set_property(TARGET helloworld_server PROPERTY INTERPROCEDURAL_OPTIMIZATION TRUE) -find_package(socket_manager 0.4.0 REQUIRED) +find_package(socket_manager 0.5.0 REQUIRED) target_link_libraries(helloworld_server PUBLIC socket_manager) diff --git a/tests/test_find_package/helloworld_server.cpp b/tests/test_find_package/helloworld_server.cpp index 4c054d9..1894cea 100644 --- a/tests/test_find_package/helloworld_server.cpp +++ b/tests/test_find_package/helloworld_server.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -9,15 +10,15 @@ class HelloWorldReceiver : public socket_manager::MsgReceiver { HelloWorldReceiver( std::string conn_id, std::mutex &mutex, std::unordered_map> &senders) + std::unique_ptr> &senders) : conn_id(std::move(conn_id)), mutex(mutex), senders(senders) {} void on_message(std::string_view data) override { try { std::unique_lock my_lock(mutex); - auto sender = senders.at(conn_id); - sender->send_block("HTTP/1.1 200 OK\r\nContent-Length: 12\r\nConnection: " - "close\r\n\r\nHello, world"); + senders.at(conn_id)->send_block( + "HTTP/1.1 200 OK\r\nContent-Length: 12\r\nConnection: " + "close\r\n\r\nHello, world"); senders.erase(conn_id); } catch (const std::out_of_range &e) { std::cerr << "Exception at " << e.what() << std::endl; @@ -27,19 +28,19 @@ class HelloWorldReceiver : public socket_manager::MsgReceiver { private: std::string conn_id; std::mutex &mutex; - std::unordered_map> + std::unordered_map> &senders; }; class MyCallback : public socket_manager::ConnCallback { public: void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { auto id = conn->local_address() + "->" + conn->peer_address(); conn->start(std::make_unique(id, mutex, senders)); { std::unique_lock my_lock(mutex); - senders[id] = sender; + senders[id] = std::move(sender); } } @@ -67,7 +68,7 @@ class MyCallback : public socket_manager::ConnCallback { private: std::mutex mutex; - std::unordered_map> + std::unordered_map> senders; }; diff --git a/tests/test_manual_flush.cpp b/tests/test_manual_flush.cpp index a842755..772862e 100644 --- a/tests/test_manual_flush.cpp +++ b/tests/test_manual_flush.cpp @@ -52,17 +52,18 @@ class EchoReceiver : public MsgReceiver { class HelloCallback : public ConnCallback { void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { auto rcv = std::make_unique(has_received, mutex, cond); // disable write auto flush conn->start(std::move(rcv), nullptr, DEFAULT_MSG_BUF_SIZE, DEFAULT_READ_MSG_FLUSH_MILLI_SEC, 0); - std::thread sender_t([sender] { - sender->send_block("hello world"); - sender->flush(); + std::shared_ptr sender_shared = std::move(sender); + std::thread sender_t([sender_shared] { + sender_shared->send_block("hello world"); + sender_shared->flush(); }); sender_t.detach(); - _sender = sender; + _sender = sender_shared; std::cout << "hello world sent" << std::endl; } @@ -91,11 +92,11 @@ class HelloCallback : public ConnCallback { class EchoCallback : public ConnCallback { void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { auto rcv = std::make_unique(has_received, _data, mutex, cond); // disable write auto flush conn->start(std::move(rcv), nullptr, DEFAULT_MSG_BUF_SIZE, 1, 0); - std::thread sender_t([sender, this]() { + std::thread sender_t([sender = std::move(sender), this]() { std::unique_lock lock(*mutex); cond->wait(lock, [this]() { return has_received; }); sender->send_block(*_data); diff --git a/tests/test_utils.h b/tests/test_utils.h index e5c79f8..46aed0b 100644 --- a/tests/test_utils.h +++ b/tests/test_utils.h @@ -1,3 +1,4 @@ +#include #undef NDEBUG #ifndef SOCKET_MANAGER_TEST_UTILS_H #define SOCKET_MANAGER_TEST_UTILS_H @@ -90,7 +91,7 @@ class MsgStoreReceiver : public MsgReceiver { class DoNothingConnCallback : public ConnCallback { public: void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { conn->close(); } @@ -119,7 +120,7 @@ class BitFlagCallback : public ConnCallback { : mutex(mutex), cond(cond), sig(sig), buffer(buffer) {} void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { set_sig(CONNECTED); auto conn_id = conn->local_address() + "->" + conn->peer_address(); auto msg_storer = @@ -177,14 +178,14 @@ class StoreAllEventsConnCallback : public ConnCallback { clean_sender_on_close(clean_sender_on_close) {} void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { std::unique_lock lock(*mutex); auto conn_id = conn->local_address() + "->" + conn->peer_address(); events->emplace_back(CONNECTED, conn_id); auto msg_storer = std::make_unique(conn_id, mutex, cond, buffer); conn->start(std::move(msg_storer)); - senders.emplace(conn_id, sender); + senders.emplace(conn_id, std::shared_ptr(sender.release())); connected_count->fetch_add(1, std::memory_order_seq_cst); cond->notify_all(); } diff --git a/tests/transfer_common.h b/tests/transfer_common.h index d98e198..ac1493b 100644 --- a/tests/transfer_common.h +++ b/tests/transfer_common.h @@ -36,14 +36,14 @@ class SendBlockCB : public DoNothingConnCallback { : msg_size(msg_size), total_size(total_size) {} void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { auto rcv = std::make_unique(); std::string data = transfer_private::make_test_message(msg_size); size_t msg_count = total_size / msg_size; conn->start(std::move(rcv)); - std::thread([sender, data, conn, msg_count]() { + std::thread([sender = std::move(sender), data, conn, msg_count]() { for (int i = 0; i < msg_count; ++i) { sender->send_block(data); } @@ -73,7 +73,7 @@ class SendAsyncCB : public DoNothingConnCallback { : msg_size(msg_size), total_size(total_size) {} void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { auto rcv = std::make_unique(); auto sem = std::make_shared(); auto waker = std::make_shared(sem); @@ -82,7 +82,7 @@ class SendAsyncCB : public DoNothingConnCallback { size_t msg_count = total_size / msg_size; conn->start(std::move(rcv), waker); - std::thread([sender, conn, msg_count, data, sem]() { + std::thread([sender = std::move(sender), conn, msg_count, data, sem]() { int progress = 0; size_t offset = 0; std::string_view data_view(data); @@ -112,7 +112,7 @@ class SendNoFlushCB : public DoNothingConnCallback { : msg_size(msg_size), total_size(total_size) {} void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { auto rcv = std::make_unique(); std::string data = transfer_private::make_test_message(msg_size); @@ -120,7 +120,7 @@ class SendNoFlushCB : public DoNothingConnCallback { conn->start(std::move(rcv), nullptr, DEFAULT_MSG_BUF_SIZE, DEFAULT_READ_MSG_FLUSH_MILLI_SEC, 0); - std::thread([sender, data, msg_count, conn]() { + std::thread([sender = std::move(sender), data, msg_count, conn]() { for (int i = 0; i < msg_count; ++i) { sender->send_block(data); } @@ -138,14 +138,14 @@ class SendNonBlockCB : public DoNothingConnCallback { : msg_size(msg_size), total_size(total_size) {} void on_connect(std::shared_ptr conn, - std::shared_ptr sender) override { + std::unique_ptr sender) override { auto rcv = std::make_unique(); std::string data = transfer_private::make_test_message(msg_size); size_t msg_count = total_size / msg_size; conn->start(std::move(rcv)); - std::thread([sender, data, msg_count, conn]() { + std::thread([sender = std::move(sender), data, msg_count, conn]() { for (int i = 0; i < msg_count; ++i) { sender->send_nonblock(data); } @@ -183,10 +183,10 @@ class CountDataNotifyOnCloseCallback : public ConnCallback { count(std::make_shared(0)) {} void on_connect(std::shared_ptr conn, - std::shared_ptr send) override { + std::unique_ptr send) override { auto rcv = std::make_unique(add_data, count); conn->start(std::move(rcv)); - this->sender = send; + this->sender = std::move(send); } void on_connection_close(const std::string &local_addr, @@ -213,7 +213,7 @@ class CountDataNotifyOnCloseCallback : public ConnCallback { std::atomic_bool has_closed; std::shared_ptr add_data; std::shared_ptr count; - std::shared_ptr sender; + std::unique_ptr sender; }; #endif // SOCKET_MANAGER_TEST_TRANSFER_COMMON_H