From 9a404bf50387d49e492e0616a38d735e0effa537 Mon Sep 17 00:00:00 2001 From: Congyu <52687642+Congyuwang@users.noreply.github.com> Date: Thu, 25 Jan 2024 14:45:12 +0800 Subject: [PATCH 1/6] minor updates, remove transitive includes (#52) * minor updates, remove transitive includes * add memory header in c_api.cc * add missing include for example * add stdexcept in connection.cc --- .clangd | 3 +- CMakeLists.txt | 2 +- Cargo.toml | 2 +- corrosion | 2 +- examples/echo_server/src/echo_server.cpp | 4 +- include/socket_manager/conn_callback.h | 3 - include/socket_manager/connection.h | 2 - include/socket_manager/msg_receiver.h | 4 - include/socket_manager/msg_sender.h | 2 - include/socket_manager/socket_manager.h | 1 - include/socket_manager_c_api.h | 123 +++++++++--------- socket_manager/connection.cc | 3 +- socket_manager/socket_manager.cc | 1 + socket_manager/socket_manager_c_api.cc | 2 + src/buf_read_write.rs | 2 +- src/lib.rs | 1 - tests/test_find_package/helloworld_server.cpp | 1 + tests/test_utils.h | 2 +- tests/transfer_common.h | 3 +- 19 files changed, 80 insertions(+), 83 deletions(-) diff --git a/.clangd b/.clangd index c8f81f7..0b8b86c 100644 --- a/.clangd +++ b/.clangd @@ -15,5 +15,6 @@ Diagnostics: 'cppcoreguidelines-pro-type-reinterpret-cast', 'cppcoreguidelines-pro-type-union-access', 'cppcoreguidelines-special-member-functions', - 'bugprone-easily-swappable-parameters' + 'bugprone-easily-swappable-parameters', + 'performance-avoid-endl' ] diff --git a/CMakeLists.txt b/CMakeLists.txt index a7dbf83..fa31e46 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_TOOLCHAIN_FILE ${CMAKE_SOURCE_DIR}/toolchain.cmake) # define project -project(socket_manager LANGUAGES C CXX VERSION 0.5.0) +project(socket_manager LANGUAGES C CXX VERSION 0.5.1) # set default build type as shared option(BUILD_SHARED_LIBS "Build using shared libraries" ON) diff --git a/Cargo.toml b/Cargo.toml index 16f10da..d340c5a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tokio-socket-manager" -version = "0.5.0" +version = "0.5.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/corrosion b/corrosion index 6ae04cf..8ddd6d5 160000 --- a/corrosion +++ b/corrosion @@ -1 +1 @@ -Subproject commit 6ae04cf691fa721945428b2f96b0818085135890 +Subproject commit 8ddd6d56ca597cb855f532e9ba4c7bc1cbe0803b diff --git a/examples/echo_server/src/echo_server.cpp b/examples/echo_server/src/echo_server.cpp index 196d930..c2c2617 100644 --- a/examples/echo_server/src/echo_server.cpp +++ b/examples/echo_server/src/echo_server.cpp @@ -1,9 +1,9 @@ #include #include +#include +#include #include -#include #include -#include /** * UniqueWaker is a wrapper of `socket_manager::Waker` diff --git a/include/socket_manager/conn_callback.h b/include/socket_manager/conn_callback.h index d6e3e23..7a15a0d 100644 --- a/include/socket_manager/conn_callback.h +++ b/include/socket_manager/conn_callback.h @@ -3,11 +3,8 @@ #include "connection.h" #include "socket_manager_c_api.h" -#include -#include #include #include -#include #include #include diff --git a/include/socket_manager/connection.h b/include/socket_manager/connection.h index 6faad3c..2e94f94 100644 --- a/include/socket_manager/connection.h +++ b/include/socket_manager/connection.h @@ -3,10 +3,8 @@ #include "msg_receiver.h" #include "socket_manager_c_api.h" -#include #include #include -#include #include namespace socket_manager { diff --git a/include/socket_manager/msg_receiver.h b/include/socket_manager/msg_receiver.h index b23cc0f..fbb1fa8 100644 --- a/include/socket_manager/msg_receiver.h +++ b/include/socket_manager/msg_receiver.h @@ -3,10 +3,6 @@ #include "socket_manager_c_api.h" #include "waker.h" -#include -#include -#include -#include #include namespace socket_manager { diff --git a/include/socket_manager/msg_sender.h b/include/socket_manager/msg_sender.h index a9aba71..b33dc21 100644 --- a/include/socket_manager/msg_sender.h +++ b/include/socket_manager/msg_sender.h @@ -2,11 +2,9 @@ #define SOCKET_MANAGER_MSG_SENDER_H #include "connection.h" -#include "notifier.h" #include "socket_manager_c_api.h" #include #include -#include namespace socket_manager { diff --git a/include/socket_manager/socket_manager.h b/include/socket_manager/socket_manager.h index 4ec7c11..38b4eef 100644 --- a/include/socket_manager/socket_manager.h +++ b/include/socket_manager/socket_manager.h @@ -2,7 +2,6 @@ #define SOCKET_MANAGER_H #include "conn_callback.h" -#include "msg_sender.h" #include "socket_manager_c_api.h" #include #include diff --git a/include/socket_manager_c_api.h b/include/socket_manager_c_api.h index e3df771..5a96012 100644 --- a/include/socket_manager_c_api.h +++ b/include/socket_manager_c_api.h @@ -1,12 +1,9 @@ #ifndef SOCKET_MANAGER_C_API_H #define SOCKET_MANAGER_C_API_H -#include #include #include #include -#include -#include enum class SOCKET_MANAGER_C_API_ConnStateCode { Connect = 0, @@ -230,7 +227,8 @@ extern "C" { /** * Waker for the try_send method. */ -extern void socket_manager_extern_notifier_wake(SOCKET_MANAGER_C_API_Notifier this_); +extern void +socket_manager_extern_notifier_wake(SOCKET_MANAGER_C_API_Notifier this_); /** * Call the waker to wake the relevant task of context. @@ -243,7 +241,8 @@ void socket_manager_waker_wake(const SOCKET_MANAGER_C_API_CWaker *waker); void socket_manager_waker_free(SOCKET_MANAGER_C_API_CWaker waker); /** - * Start a connection with the given `OnMsgCallback`, and return a pointer to a `MsgSender`. + * Start a connection with the given `OnMsgCallback`, and return a pointer to a + * `MsgSender`. * * Only one of `connection_start` or `connection_close` should be called, * or it will result in runtime error. @@ -256,17 +255,19 @@ void socket_manager_waker_free(SOCKET_MANAGER_C_API_CWaker waker); * * # Arguments * * `conn` - A pointer to a `CConnection`. - * * `on_msg` - A callback function that will be called when a message is received. + * * `on_msg` - A callback function that will be called when a message is + * received. * * `msg_buffer_size` - The size of the message buffer in bytes. * Set to 0 to use no buffer (i.e., call `on_msg` immediately on receiving * any data). The minimum is 8KB, and the maximum is 8MB. - * * `read_msg_flush_interval` - The interval in `milliseconds` of read message buffer - * auto flushing. The value is ignored when `msg_buffer_size` is 0. - * Set to 0 to disable auto flush (which is not recommended since there is no - * manual flush, and small messages might get stuck in buffer). - * * `write_flush_interval` - The interval in `milliseconds` of write buffer auto flushing. - * Set to 0 to disable auto flush. - * * `err` - A pointer to a pointer to a C string allocated by `malloc` on error. + * * `read_msg_flush_interval` - The interval in `milliseconds` of read message + * buffer auto flushing. The value is ignored when `msg_buffer_size` is 0. Set + * to 0 to disable auto flush (which is not recommended since there is no manual + * flush, and small messages might get stuck in buffer). + * * `write_flush_interval` - The interval in `milliseconds` of write buffer + * auto flushing. Set to 0 to disable auto flush. + * * `err` - A pointer to a pointer to a C string allocated by `malloc` on + * error. * * # Errors * Returns 1 on error, 0 on success. @@ -284,14 +285,16 @@ int socket_manager_connection_start(SOCKET_MANAGER_C_API_Connection *conn, * * The returned string is malloced and should be freed by the caller. */ -char *socket_manager_connection_local_addr(SOCKET_MANAGER_C_API_Connection *conn); +char * +socket_manager_connection_local_addr(SOCKET_MANAGER_C_API_Connection *conn); /** * Peer address of the connection. * * The returned string is malloced and should be freed by the caller. */ -char *socket_manager_connection_peer_addr(SOCKET_MANAGER_C_API_Connection *conn); +char * +socket_manager_connection_peer_addr(SOCKET_MANAGER_C_API_Connection *conn); /** * The close API works either before `start_connection` is called @@ -305,7 +308,8 @@ char *socket_manager_connection_peer_addr(SOCKET_MANAGER_C_API_Connection *conn) * Returns 1 on error, 0 on success. * On Error, `err` will be set to a pointer to a C string allocated by `malloc`. */ -int socket_manager_connection_close(SOCKET_MANAGER_C_API_Connection *conn, char **err); +int socket_manager_connection_close(SOCKET_MANAGER_C_API_Connection *conn, + char **err); /** * Destructor of `Connection`. @@ -319,19 +323,18 @@ void socket_manager_connection_free(SOCKET_MANAGER_C_API_Connection *conn); * # Thread Safety * Thread safe. * - * This function should never be called within the context of the async callbacks - * since it might block. + * This function should never be called within the context of the async + * callbacks since it might block. * * # Errors - * If the connection is closed, the function will return 1 and set `err` to a pointer - * with WriteZero error. + * If the connection is closed, the function will return 1 and set `err` to a + * pointer with WriteZero error. * * Returns 1 on error, 0 on success. * On Error, `err` will be set to a pointer to a C string allocated by `malloc`. */ int socket_manager_msg_sender_send_block(SOCKET_MANAGER_C_API_MsgSender *sender, - const char *msg, - size_t len, + const char *msg, size_t len, char **err); /** @@ -348,16 +351,15 @@ int socket_manager_msg_sender_send_block(SOCKET_MANAGER_C_API_MsgSender *sender, * This function can be called within the context of the async callbacks. * * # Errors - * If the connection is closed, the function will return 1 and set `err` to a pointer - * with WriteZero error. + * If the connection is closed, the function will return 1 and set `err` to a + * pointer with WriteZero error. * * Returns 1 on error, 0 on success. * On Error, `err` will be set to a pointer to a C string allocated by `malloc`. */ -int socket_manager_msg_sender_send_nonblock(SOCKET_MANAGER_C_API_MsgSender *sender, - const char *msg, - size_t len, - char **err); +int socket_manager_msg_sender_send_nonblock( + SOCKET_MANAGER_C_API_MsgSender *sender, const char *msg, size_t len, + char **err); /** * Try to send a message via the given `MsgSender` asynchronously. @@ -378,11 +380,9 @@ int socket_manager_msg_sender_send_nonblock(SOCKET_MANAGER_C_API_MsgSender *send * Use `err` pointer to check for error. * On Error, `err` will be set to a pointer to a C string allocated by `malloc`. */ -long socket_manager_msg_sender_send_async(SOCKET_MANAGER_C_API_MsgSender *sender, - const char *msg, - size_t len, - SOCKET_MANAGER_C_API_Notifier notifier, - char **err); +long socket_manager_msg_sender_send_async( + SOCKET_MANAGER_C_API_MsgSender *sender, const char *msg, size_t len, + SOCKET_MANAGER_C_API_Notifier notifier, char **err); /** * Manually flush the message sender. @@ -394,7 +394,8 @@ long socket_manager_msg_sender_send_async(SOCKET_MANAGER_C_API_MsgSender *sender * Returns 1 on error, 0 on success. * On Error, `err` will be set to a pointer to a C string allocated by `malloc`. */ -int socket_manager_msg_sender_flush(SOCKET_MANAGER_C_API_MsgSender *sender, char **err); +int socket_manager_msg_sender_flush(SOCKET_MANAGER_C_API_MsgSender *sender, + char **err); /** * Destructor of `MsgSender`. @@ -408,9 +409,10 @@ void socket_manager_msg_sender_free(SOCKET_MANAGER_C_API_MsgSender *sender); * pass error to `err` pointer. * Set `err` to null_ptr if there is no error. */ -extern void socket_manager_extern_on_conn(SOCKET_MANAGER_C_API_OnConnObj this_, - SOCKET_MANAGER_C_API_ConnStates states, - char **err); +extern void +socket_manager_extern_on_conn(SOCKET_MANAGER_C_API_OnConnObj this_, + SOCKET_MANAGER_C_API_ConnStates states, + char **err); /** * Rust calls this function to send `msg: ConnMsg` @@ -441,15 +443,17 @@ extern long socket_manager_extern_on_msg(SOCKET_MANAGER_C_API_OnMsgObj this_, * Initialize a new `SocketManager` and return a pointer to it. * * # Number of workers - * If `n_threads` is 0, the number of workers will be set to the number of logical cores. - * If `n_threads` is 1, uses single-threaded runtime. - * `n_threads` is capped at 256. + * If `n_threads` is 0, the number of workers will be set to the number of + * logical cores. If `n_threads` is 1, uses single-threaded runtime. `n_threads` + * is capped at 256. * * # connection callback - * `on_conn_self` is passed to the callback function `on_conn` as the first argument. + * `on_conn_self` is passed to the callback function `on_conn` as the first + * argument. * * # Safety - * The passed in callback pointers must live as long as the `SocketManager` does. + * The passed in callback pointers must live as long as the `SocketManager` + * does. * * # Non-blocking * Must ensure that the callback functions of `callback_obj` are non-blocking. @@ -458,9 +462,9 @@ extern long socket_manager_extern_on_msg(SOCKET_MANAGER_C_API_OnMsgObj this_, * On Error, `err` will be set to a pointer to a C string allocated by `malloc`, * and the returned pointer will be null. */ -SOCKET_MANAGER_C_API_SocketManager *socket_manager_init(SOCKET_MANAGER_C_API_OnConnObj on_conn, - size_t n_threads, - char **err); +SOCKET_MANAGER_C_API_SocketManager * +socket_manager_init(SOCKET_MANAGER_C_API_OnConnObj on_conn, size_t n_threads, + char **err); /** * Listen on the given address. @@ -473,8 +477,7 @@ SOCKET_MANAGER_C_API_SocketManager *socket_manager_init(SOCKET_MANAGER_C_API_OnC * On Error, `err` will be set to a pointer to a C string allocated by `malloc`. */ int socket_manager_listen_on_addr(SOCKET_MANAGER_C_API_SocketManager *manager, - const char *addr, - char **err); + const char *addr, char **err); /** * Connect to the given address. @@ -490,8 +493,7 @@ int socket_manager_listen_on_addr(SOCKET_MANAGER_C_API_SocketManager *manager, * On Error, `err` will be set to a pointer to a C string allocated by `malloc`. */ int socket_manager_connect_to_addr(SOCKET_MANAGER_C_API_SocketManager *manager, - const char *addr, - uint64_t delay, + const char *addr, uint64_t delay, char **err); /** @@ -504,9 +506,8 @@ int socket_manager_connect_to_addr(SOCKET_MANAGER_C_API_SocketManager *manager, * Returns 1 on error, 0 on success. * On Error, `err` will be set to a pointer to a C string allocated by `malloc`. */ -int socket_manager_cancel_listen_on_addr(SOCKET_MANAGER_C_API_SocketManager *manager, - const char *addr, - char **err); +int socket_manager_cancel_listen_on_addr( + SOCKET_MANAGER_C_API_SocketManager *manager, const char *addr, char **err); /** * Abort the `SocketManager`'s background runtime. @@ -521,7 +522,8 @@ int socket_manager_cancel_listen_on_addr(SOCKET_MANAGER_C_API_SocketManager *man * Returns 1 on error, 0 on success. * On Error, `err` will be set to a pointer to a C string allocated by `malloc`. */ -int socket_manager_abort(SOCKET_MANAGER_C_API_SocketManager *manager, bool wait, char **err); +int socket_manager_abort(SOCKET_MANAGER_C_API_SocketManager *manager, bool wait, + char **err); /** * Join and wait on the `SocketManager`. @@ -529,13 +531,14 @@ int socket_manager_abort(SOCKET_MANAGER_C_API_SocketManager *manager, bool wait, * # Thread Safety * Thread safe. Calling a second time will return immediately. * - * This function will block until the `SocketManager`'s background runtime finishes, - * (i.e., `abort` is called from another thread). + * This function will block until the `SocketManager`'s background runtime + * finishes, (i.e., `abort` is called from another thread). * * # Errors * Join returns error if the runtime panicked. */ -int socket_manager_join(SOCKET_MANAGER_C_API_SocketManager *manager, char **err); +int socket_manager_join(SOCKET_MANAGER_C_API_SocketManager *manager, + char **err); /** * Calling this function will abort all background runtime and join on them, @@ -551,10 +554,10 @@ void socket_manager_free(SOCKET_MANAGER_C_API_SocketManager *manager); * - `tracer_max_level`: The max level of the tracer. * - `log_print_level`: The level of the log to print. */ -void socket_manager_logger_init(void (*tracer)(SOCKET_MANAGER_C_API_LogData), - SOCKET_MANAGER_C_API_TraceLevel tracer_max_level, - SOCKET_MANAGER_C_API_TraceLevel log_print_level, - char **err); +void socket_manager_logger_init( + void (*tracer)(SOCKET_MANAGER_C_API_LogData), + SOCKET_MANAGER_C_API_TraceLevel tracer_max_level, + SOCKET_MANAGER_C_API_TraceLevel log_print_level, char **err); } // extern "C" diff --git a/socket_manager/connection.cc b/socket_manager/connection.cc index a931d50..165e589 100644 --- a/socket_manager/connection.cc +++ b/socket_manager/connection.cc @@ -1,6 +1,7 @@ #include "socket_manager/connection.h" -#include "socket_manager/msg_sender.h" +#include "socket_manager/common/notifier.h" #include "socket_manager_c_api.h" +#include namespace socket_manager { diff --git a/socket_manager/socket_manager.cc b/socket_manager/socket_manager.cc index 16db18f..c032ea6 100644 --- a/socket_manager/socket_manager.cc +++ b/socket_manager/socket_manager.cc @@ -1,5 +1,6 @@ #include "socket_manager/socket_manager.h" #include "socket_manager_c_api.h" +#include #include namespace socket_manager { diff --git a/socket_manager/socket_manager_c_api.cc b/socket_manager/socket_manager_c_api.cc index a244355..9f28e49 100644 --- a/socket_manager/socket_manager_c_api.cc +++ b/socket_manager/socket_manager_c_api.cc @@ -1,8 +1,10 @@ #include "socket_manager_c_api.h" +#include "socket_manager/common/notifier.h" #include "socket_manager/common/waker.h" #include "socket_manager/conn_callback.h" #include "socket_manager/msg_receiver.h" #include "socket_manager/msg_sender.h" +#include inline char *string_dup(const std::string &str) { auto size = str.size(); diff --git a/src/buf_read_write.rs b/src/buf_read_write.rs index 3ddb249..bd12ed4 100644 --- a/src/buf_read_write.rs +++ b/src/buf_read_write.rs @@ -94,7 +94,7 @@ impl Future for WriteAll<'_, W> { Ready(Ok(n)) => { // update local read progress pos += n; - if std::intrinsics::unlikely(n == 0) { + if n == 0 { // update read progress *this.r_offset = pos; return Ready(Err(std::io::ErrorKind::WriteZero.into())); diff --git a/src/lib.rs b/src/lib.rs index e77ef77..14874e0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,6 @@ #![feature(fn_traits)] #![feature(waker_getters)] #![allow(improper_ctypes)] -#![feature(core_intrinsics)] mod buf_read_write; mod c_api; diff --git a/tests/test_find_package/helloworld_server.cpp b/tests/test_find_package/helloworld_server.cpp index 1894cea..40ce64b 100644 --- a/tests/test_find_package/helloworld_server.cpp +++ b/tests/test_find_package/helloworld_server.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include diff --git a/tests/test_utils.h b/tests/test_utils.h index 46aed0b..ac85e18 100644 --- a/tests/test_utils.h +++ b/tests/test_utils.h @@ -5,10 +5,10 @@ #include "spdlog/spdlog.h" #include -#include #include #include #include +#include #include #include #include diff --git a/tests/transfer_common.h b/tests/transfer_common.h index ac1493b..ca5496c 100644 --- a/tests/transfer_common.h +++ b/tests/transfer_common.h @@ -8,7 +8,8 @@ #include "concurrentqueue/concurrentqueue.h" #include "concurrentqueue/lightweightsemaphore.h" #include "test_utils.h" -#include +#include +#include #include const int PRINT_INTERVAL = 100; From f144de696953e204300a84f16a1ec8aef9c7d851 Mon Sep 17 00:00:00 2001 From: Congyu Date: Tue, 20 Feb 2024 11:23:11 +0800 Subject: [PATCH 2/6] deps update --- Cargo.toml | 18 +++++++++++------ rust-toolchain.toml | 2 +- src/c_api/connection.rs | 5 +---- src/conn.rs | 3 +-- src/read.rs | 45 +++++++++-------------------------------- 5 files changed, 24 insertions(+), 49 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d340c5a..8d60ba1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,15 +10,21 @@ crate-type = ["staticlib"] [dependencies] dashmap = { version = "5.4.0", features = ["inline"] } libc = "0.2.146" -socket2 = "0.5.3" - -[dependencies.async-ringbuf] -git = "https://github.com/Congyuwang/ringbuf.git" +async-ringbuf = "0.2.0-rc.5" [dependencies.tokio] -version = "1.29.1" +version = "1.36.0" default-features = false -features = ["rt", "rt-multi-thread", "net", "time", "sync", "io-util", "macros", "parking_lot"] +features = [ + "rt", + "rt-multi-thread", + "net", + "time", + "sync", + "io-util", + "macros", + "parking_lot", +] [dependencies.futures] version = "0.3.28" diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 67d9a53..7435db0 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "nightly" +channel = "nightly-2024-02-04" profile = "minimal" diff --git a/src/c_api/connection.rs b/src/c_api/connection.rs index b6c0e37..3e94078 100644 --- a/src/c_api/connection.rs +++ b/src/c_api/connection.rs @@ -4,7 +4,6 @@ use crate::c_api::utils::write_display_c_str; use crate::conn::ConnConfig; use libc::size_t; use std::ffi::{c_char, c_int}; -use std::num::NonZeroUsize; use std::os::raw::c_ulonglong; use std::ptr::null_mut; use std::time::Duration; @@ -24,8 +23,7 @@ use std::time::Duration; /// * `conn` - A pointer to a `CConnection`. /// * `on_msg` - A callback function that will be called when a message is received. /// * `msg_buffer_size` - The size of the message buffer in bytes. -/// Set to 0 to use no buffer (i.e., call `on_msg` immediately on receiving -/// any data). The minimum is 8KB, and the maximum is 8MB. +/// The minimum is 8KB, and the maximum is 8MB. /// * `read_msg_flush_interval` - The interval in `milliseconds` of read message buffer /// auto flushing. The value is ignored when `msg_buffer_size` is 0. /// Set to 0 to disable auto flush (which is not recommended since there is no @@ -49,7 +47,6 @@ pub unsafe extern "C" fn socket_manager_connection_start( let conn = &mut (*conn).conn; let write_flush_interval = Duration::from_millis(write_flush_interval); let read_msg_flush_interval = Duration::from_millis(read_msg_flush_interval); - let msg_buffer_size = NonZeroUsize::new(msg_buffer_size); match conn.start_connection( on_msg, ConnConfig { diff --git a/src/conn.rs b/src/conn.rs index b5d7709..40f5d7e 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -1,6 +1,5 @@ use crate::{ConnectionState, Msg}; use std::net::SocketAddr; -use std::num::NonZeroUsize; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::{Poll, Waker}; @@ -44,7 +43,7 @@ pub struct ConnConfig { pub write_flush_interval: Duration, /// zero represent no auto flush pub read_msg_flush_interval: Duration, - pub msg_buffer_size: Option, + pub msg_buffer_size: usize, } impl, Waker) -> Poll> + Send + 'static> Conn { diff --git a/src/read.rs b/src/read.rs index 2e87a9f..185587d 100644 --- a/src/read.rs +++ b/src/read.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use std::task::Poll::Ready; use std::task::{ready, Context, Poll, Waker}; use std::time::Duration; -use tokio::io::{AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; +use tokio::io::AsyncWrite; use tokio::net::tcp::OwnedReadHalf; use tokio::time::MissedTickBehavior; @@ -20,19 +20,15 @@ pub(crate) async fn handle_reader< on_msg: OnMsg, config: ConnConfig, ) -> std::io::Result<()> { - if let Some(msg_buf_size) = config.msg_buffer_size { - let msg_buf_size = msg_buf_size - .get() - .min(MAX_MSG_BUFFER_SIZE) - .max(MIN_MSG_BUFFER_SIZE); - let duration = config.read_msg_flush_interval; - if duration.is_zero() { - handle_reader_no_auto_flush(read, on_msg, msg_buf_size).await - } else { - handle_reader_auto_flush(read, on_msg, duration, msg_buf_size).await - } + let msg_buf_size = config + .msg_buffer_size + .min(MAX_MSG_BUFFER_SIZE) + .max(MIN_MSG_BUFFER_SIZE); + let duration = config.read_msg_flush_interval; + if duration.is_zero() { + handle_reader_no_auto_flush(read, on_msg, msg_buf_size).await } else { - handle_reader_no_buf(read, on_msg).await + handle_reader_auto_flush(read, on_msg, duration, msg_buf_size).await } } @@ -89,29 +85,6 @@ async fn handle_reader_no_auto_flush< Ok(()) } -/// Has no write buffer, received is sent immediately. -async fn handle_reader_no_buf< - OnMsg: Fn(Msg<'_>, Waker) -> Poll> + Send + Unpin + 'static, ->( - read: OwnedReadHalf, - on_msg: OnMsg, -) -> std::io::Result<()> { - let recv_buffer_size = socket2::SockRef::from(read.as_ref()).recv_buffer_size()?; - tracing::trace!("recv buffer size: {}", recv_buffer_size); - let mut on_msg = OnMsgWrite { on_msg }; - let mut buf_reader = BufReader::with_capacity(recv_buffer_size, read); - loop { - let bytes = buf_reader.fill_buf().await?; - let n = bytes.len(); - if n == 0 { - break; - } - on_msg.write_all(bytes).await?; - buf_reader.consume(n); - } - Ok(()) -} - /// async write wrapper for on_msg struct OnMsgWrite { on_msg: OnMsg, From aa065baf82d91837e2492b28e5b98dbe13ffa8b5 Mon Sep 17 00:00:00 2001 From: Congyu Date: Tue, 20 Feb 2024 11:24:32 +0800 Subject: [PATCH 3/6] update cbind --- include/socket_manager_c_api.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/include/socket_manager_c_api.h b/include/socket_manager_c_api.h index 5a96012..4c4ad61 100644 --- a/include/socket_manager_c_api.h +++ b/include/socket_manager_c_api.h @@ -258,8 +258,7 @@ void socket_manager_waker_free(SOCKET_MANAGER_C_API_CWaker waker); * * `on_msg` - A callback function that will be called when a message is * received. * * `msg_buffer_size` - The size of the message buffer in bytes. - * Set to 0 to use no buffer (i.e., call `on_msg` immediately on receiving - * any data). The minimum is 8KB, and the maximum is 8MB. + * The minimum is 8KB, and the maximum is 8MB. * * `read_msg_flush_interval` - The interval in `milliseconds` of read message * buffer auto flushing. The value is ignored when `msg_buffer_size` is 0. Set * to 0 to disable auto flush (which is not recommended since there is no manual From ef8dce2a4e882b1a725b2bb7221f400bc93bbbed Mon Sep 17 00:00:00 2001 From: Congyu Date: Tue, 20 Feb 2024 11:25:14 +0800 Subject: [PATCH 4/6] update msg_buffer_size settings --- include/socket_manager/connection.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/include/socket_manager/connection.h b/include/socket_manager/connection.h index 2e94f94..3ce0279 100644 --- a/include/socket_manager/connection.h +++ b/include/socket_manager/connection.h @@ -45,8 +45,6 @@ class Connection { * send buffer is ready. Pass nullptr to use a noop notifier. * This parameter is needed only for async sending. * @param msg_buffer_size The size of the message buffer in bytes. - * Set to 0 to use no buffer (i.e., call `on_msg` immediately on receiving - * any data, expecting the user to implement buffer if needed). * The minimum is 8KB, and the maximum is 8MB. Default to 64KB. * @param write_flush_interval The interval in `milliseconds` * of write buffer auto flushing. Set to 0 to disable auto flush. From 1731da4fc007f02b0979524cfe31e1fda9688331 Mon Sep 17 00:00:00 2001 From: Congyu Date: Tue, 20 Feb 2024 12:36:31 +0800 Subject: [PATCH 5/6] remove futures, libc from deps --- Cargo.toml | 6 ------ rust-toolchain.toml | 2 +- src/c_api/conn_events.rs | 3 +-- src/c_api/connection.rs | 3 +-- src/c_api/msg_sender.rs | 7 +++---- src/c_api/socket_manager.rs | 3 +-- src/c_api/tracer.rs | 7 +++---- src/c_api/utils.rs | 10 ++++++++++ src/msg_sender.rs | 16 +++++++++++++--- 9 files changed, 33 insertions(+), 24 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8d60ba1..28e055a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ crate-type = ["staticlib"] [dependencies] dashmap = { version = "5.4.0", features = ["inline"] } -libc = "0.2.146" async-ringbuf = "0.2.0-rc.5" [dependencies.tokio] @@ -26,11 +25,6 @@ features = [ "parking_lot", ] -[dependencies.futures] -version = "0.3.28" -default-features = false -features = ["async-await"] - [dependencies.tracing] version = "0.1.37" default-features = false diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 7435db0..0c7fa94 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] channel = "nightly-2024-02-04" -profile = "minimal" +profile = "default" diff --git a/src/c_api/conn_events.rs b/src/c_api/conn_events.rs index 43fd453..ddd1cea 100644 --- a/src/c_api/conn_events.rs +++ b/src/c_api/conn_events.rs @@ -1,13 +1,12 @@ use crate::c_api::on_msg::OnMsgObj; use crate::MsgSender; -use libc::size_t; use std::ffi::c_char; /// The data pointer is only valid for the duration of the callback. #[repr(C)] pub struct ConnMsg { pub(crate) bytes: *const c_char, - pub(crate) len: size_t, + pub(crate) len: usize, } /// All data is only valid for the duration of the callback diff --git a/src/c_api/connection.rs b/src/c_api/connection.rs index 3e94078..854098c 100644 --- a/src/c_api/connection.rs +++ b/src/c_api/connection.rs @@ -2,7 +2,6 @@ use crate::c_api::conn_events::Connection; use crate::c_api::on_msg::OnMsgObj; use crate::c_api::utils::write_display_c_str; use crate::conn::ConnConfig; -use libc::size_t; use std::ffi::{c_char, c_int}; use std::os::raw::c_ulonglong; use std::ptr::null_mut; @@ -39,7 +38,7 @@ use std::time::Duration; pub unsafe extern "C" fn socket_manager_connection_start( conn: *mut Connection, on_msg: OnMsgObj, - msg_buffer_size: size_t, + msg_buffer_size: usize, read_msg_flush_interval: c_ulonglong, write_flush_interval: c_ulonglong, err: *mut *mut c_char, diff --git a/src/c_api/msg_sender.rs b/src/c_api/msg_sender.rs index a62c68b..fa061d1 100644 --- a/src/c_api/msg_sender.rs +++ b/src/c_api/msg_sender.rs @@ -1,7 +1,6 @@ use crate::c_api::async_ffi::notifier::Notifier; use crate::c_api::utils::write_display_c_str; use crate::msg_sender::MsgSender; -use libc::size_t; use std::ffi::{c_char, c_int, c_long}; use std::ptr::null_mut; use std::task::Poll; @@ -27,7 +26,7 @@ pub const PENDING: c_long = -1; pub unsafe extern "C" fn socket_manager_msg_sender_send_block( sender: *mut MsgSender, msg: *const c_char, - len: size_t, + len: usize, err: *mut *mut c_char, ) -> c_int { let sender = &mut (*sender); @@ -66,7 +65,7 @@ pub unsafe extern "C" fn socket_manager_msg_sender_send_block( pub unsafe extern "C" fn socket_manager_msg_sender_send_nonblock( sender: *mut MsgSender, msg: *const c_char, - len: size_t, + len: usize, err: *mut *mut c_char, ) -> c_int { let sender = &mut (*sender); @@ -104,7 +103,7 @@ pub unsafe extern "C" fn socket_manager_msg_sender_send_nonblock( pub unsafe extern "C" fn socket_manager_msg_sender_send_async( sender: *mut MsgSender, msg: *const c_char, - len: size_t, + len: usize, notifier: Notifier, err: *mut *mut c_char, ) -> c_long { diff --git a/src/c_api/socket_manager.rs b/src/c_api/socket_manager.rs index 79433bb..1f34e99 100644 --- a/src/c_api/socket_manager.rs +++ b/src/c_api/socket_manager.rs @@ -1,7 +1,6 @@ use crate::c_api::on_conn::OnConnObj; use crate::c_api::utils::{socket_addr, write_display_c_str}; use crate::SocketManager; -use libc::size_t; use std::ffi::{c_char, c_int}; use std::ptr::null_mut; use std::time::Duration; @@ -28,7 +27,7 @@ use std::time::Duration; #[no_mangle] pub unsafe extern "C" fn socket_manager_init( on_conn: OnConnObj, - n_threads: size_t, + n_threads: usize, err: *mut *mut c_char, ) -> *mut SocketManager { match SocketManager::init(on_conn, n_threads) { diff --git a/src/c_api/tracer.rs b/src/c_api/tracer.rs index e00322e..41b59d0 100644 --- a/src/c_api/tracer.rs +++ b/src/c_api/tracer.rs @@ -1,7 +1,6 @@ //! Define a layer to pass log message to foreign interface. use super::utils::write_display_c_str; use crate::init_logger; -use libc::size_t; use std::{ ffi::{c_char, c_int}, fmt, @@ -47,14 +46,14 @@ pub enum TraceLevel { pub struct LogData { pub level: TraceLevel, pub target: *const c_char, - pub target_n: size_t, + pub target_n: usize, pub file: *const c_char, - pub file_n: size_t, + pub file_n: usize, /// -1 if not available pub line: c_int, /// The `message` pointer is only valid for the duration of the callback. pub message: *const c_char, - pub message_n: size_t, + pub message_n: usize, } /// Init logger. diff --git a/src/c_api/utils.rs b/src/c_api/utils.rs index 8a6cf98..89d8583 100644 --- a/src/c_api/utils.rs +++ b/src/c_api/utils.rs @@ -2,6 +2,16 @@ use std::ffi::{c_char, c_void, CStr, CString}; use std::fmt::Display; use std::net::SocketAddr; +/// cbindgen:ignore +mod libc { + use super::*; + extern "C" { + pub fn free(p: *mut c_void); + pub fn malloc(size: usize) -> *mut c_void; + pub fn strcpy(dst: *mut c_char, src: *const c_char) -> *mut c_char; + } +} + /// Parse a C string into a `SocketAddr`. pub(crate) unsafe fn socket_addr(addr: *const c_char) -> std::io::Result { if addr.is_null() { diff --git a/src/msg_sender.rs b/src/msg_sender.rs index 919b08f..794485a 100644 --- a/src/msg_sender.rs +++ b/src/msg_sender.rs @@ -1,7 +1,8 @@ use async_ringbuf::traits::{AsyncProducer, Producer, Split}; use async_ringbuf::wrap::{AsyncCons, AsyncProd}; use async_ringbuf::AsyncHeapRb; -use futures::AsyncWriteExt; +use std::future::poll_fn; +use std::pin::Pin; use std::sync::Arc; use std::task::Poll::{Pending, Ready}; use std::task::{Poll, Waker}; @@ -78,8 +79,17 @@ impl MsgSender { return Ok(()); } // unfinished, enter into future - self.handle - .block_on(self.ring_buf.write_all(&bytes[offset..])) + let write_all = async { + while offset < bytes.len() { + offset += poll_fn(|cx| { + let ring_buf = Pin::new(&mut self.ring_buf); + ring_buf.poll_write(cx, &bytes[offset..]) + }) + .await?; + } + Ok(()) + }; + self.handle.block_on(write_all) } /// The non-blocking API for sending bytes. From a000a6dd23b5d34ddbc5d2ce55bff7ad7b7bb0b2 Mon Sep 17 00:00:00 2001 From: Congyu Date: Tue, 20 Feb 2024 12:37:22 +0800 Subject: [PATCH 6/6] bump version --- CMakeLists.txt | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index fa31e46..83a8f35 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,7 +5,7 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_TOOLCHAIN_FILE ${CMAKE_SOURCE_DIR}/toolchain.cmake) # define project -project(socket_manager LANGUAGES C CXX VERSION 0.5.1) +project(socket_manager LANGUAGES C CXX VERSION 0.5.2) # set default build type as shared option(BUILD_SHARED_LIBS "Build using shared libraries" ON) diff --git a/Cargo.toml b/Cargo.toml index 28e055a..6b9d061 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tokio-socket-manager" -version = "0.5.1" +version = "0.5.2" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html