Skip to content

Commit 9619412

Browse files
committed
add function "wait_message" to blocking websocket client incl. example
1 parent e06a8b0 commit 9619412

File tree

7 files changed

+593
-232
lines changed

7 files changed

+593
-232
lines changed

examples/builtin/CMakeLists.txt

+17
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,20 @@ target_compile_definitions(ex_reconnect_builtin PRIVATE
112112
WS_CLIENT_LOG_MSG_SIZES=0
113113
WS_CLIENT_LOG_FRAMES=0
114114
WS_CLIENT_LOG_COMPRESSION=0)
115+
116+
# --------------------------------------
117+
add_executable(ex_wait_message_wss_builtin ex_wait_message_wss_builtin.cpp)
118+
target_link_libraries(ex_wait_message_wss_builtin PRIVATE
119+
OpenSSL::SSL
120+
OpenSSL::Crypto
121+
PkgConfig::ZLIBNG
122+
websocketclient)
123+
target_compile_definitions(ex_wait_message_wss_builtin PRIVATE
124+
WS_CLIENT_USE_ZLIB_NG=1 # Use zlib-ng instead of zlib
125+
WS_CLIENT_USE_SIMD_UTF8=0 # Use simdutf for utf-8 validation
126+
WS_CLIENT_VALIDATE_UTF8=0 # Disable utf-8 validation
127+
WS_CLIENT_LOG_HANDSHAKE=1
128+
WS_CLIENT_LOG_MSG_PAYLOADS=0
129+
WS_CLIENT_LOG_MSG_SIZES=0
130+
WS_CLIENT_LOG_FRAMES=0
131+
WS_CLIENT_LOG_COMPRESSION=0)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
#include <iostream>
2+
#include <string>
3+
#include <variant>
4+
#include <expected>
5+
#include <chrono>
6+
#include <algorithm>
7+
#include <iomanip>
8+
9+
#include "ws_client/ws_client.hpp"
10+
#include "ws_client/transport/builtin/TcpSocket.hpp"
11+
#include "ws_client/transport/builtin/OpenSslSocket.hpp"
12+
#include "ws_client/PermessageDeflate.hpp"
13+
14+
using namespace ws_client;
15+
using namespace std::chrono;
16+
17+
expected<void, WSError> run()
18+
{
19+
// parse URL
20+
WS_TRY(url, URL::parse("wss://fstream.binance.com/ws"));
21+
22+
// websocketclient logger
23+
ConsoleLogger<LogLevel::D> logger;
24+
25+
// resolve hostname
26+
DnsResolver dns(&logger);
27+
WS_TRY(dns_res, dns.resolve(url->host(), url->port_str()));
28+
AddressInfo& addr = (*dns_res)[0];
29+
30+
// create TCP socket
31+
auto tcp = TcpSocket(&logger, std::move(addr));
32+
WS_TRYV(tcp.init());
33+
WS_TRYV(tcp.set_SO_RCVBUF(1 * 1024 * 1024)); // 1 MB
34+
35+
// SSL socket wrapper
36+
OpenSslContext ctx(&logger);
37+
WS_TRYV(ctx.init());
38+
WS_TRYV(ctx.set_default_verify_paths());
39+
auto ssl = OpenSslSocket(&logger, std::move(tcp), &ctx, url->host(), true);
40+
WS_TRYV(ssl.init());
41+
WS_TRYV(ssl.connect(2s)); // 2 sec connect timeout
42+
43+
// websocket client
44+
auto client = WebSocketClient(&logger, std::move(ssl));
45+
46+
// handshake handler
47+
auto handshake = Handshake(&logger, *url);
48+
49+
// enable compression (permessage-deflate extension)
50+
handshake.set_permessage_deflate({
51+
.logger = &logger,
52+
.server_max_window_bits = 15,
53+
.client_max_window_bits = 15,
54+
.server_no_context_takeover = true,
55+
.client_no_context_takeover = true,
56+
.decompress_buffer_size = 2 * 1024 * 1024, // 2 MB
57+
.compress_buffer_size = 2 * 1024 * 1024 // 2 MB
58+
});
59+
60+
// perform handshake
61+
WS_TRYV(client.handshake(handshake, 5s)); // 5 sec timeout
62+
63+
// subscribe
64+
std::string sub_msg = R"({
65+
"method": "SUBSCRIBE",
66+
"params": ["aptusdt@aggTrade"],
67+
"id": 1
68+
})";
69+
Message msg(MessageType::text, sub_msg);
70+
WS_TRYV(client.send_message(msg, {.compress = false}));
71+
72+
// allocate message buffer with 4 KiB initial size and 1 MiB max size
73+
WS_TRY(buffer, Buffer::create(4096, 1 * 1024 * 1024));
74+
75+
while (true)
76+
{
77+
// wait for message for 3 sec
78+
bool readable = false;
79+
do
80+
{
81+
WS_TRY(read_res, client.wait_message(3s));
82+
if (!(readable = read_res.value()))
83+
logger.log<LogLevel::W>("No message received within 3 sec, continue waiting...");
84+
} while (!readable);
85+
86+
// read message (only 1 sec timeout since we know socket is readable)
87+
variant<Message, PingFrame, PongFrame, CloseFrame, WSError> var = //
88+
client.read_message(*buffer, 1s);
89+
90+
if (auto msg = std::get_if<Message>(&var))
91+
{
92+
std::cout << "Message received: " << msg->to_string_view() << std::endl;
93+
}
94+
else if (auto ping_frame = std::get_if<PingFrame>(&var))
95+
{
96+
logger.log<LogLevel::D>("Ping frame received");
97+
WS_TRYV(client.send_pong_frame(ping_frame->payload_bytes()));
98+
}
99+
else if (std::get_if<PongFrame>(&var))
100+
{
101+
logger.log<LogLevel::D>("Pong frame received");
102+
}
103+
else if (auto close_frame = std::get_if<CloseFrame>(&var))
104+
{
105+
// server initiated close
106+
if (close_frame->has_reason())
107+
{
108+
logger.log<LogLevel::I>(
109+
"Close frame received: " + string(close_frame->get_reason())
110+
);
111+
}
112+
else
113+
logger.log<LogLevel::I>("Close frame received");
114+
break;
115+
}
116+
else if (auto err = std::get_if<WSError>(&var))
117+
{
118+
// error occurred - must close connection
119+
logger.log<LogLevel::E>("Error: " + err->message);
120+
WS_TRYV(client.close(err->close_with_code));
121+
return {};
122+
}
123+
}
124+
125+
return {};
126+
};
127+
128+
129+
int main()
130+
{
131+
auto res = run();
132+
if (!res.has_value())
133+
{
134+
std::cerr << "Error: " << res.error().message << std::endl;
135+
return 2;
136+
}
137+
return 0;
138+
};

include/ws_client/BufferedSocket.hpp

+9
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,15 @@ class BufferedSocket final
5050
return socket_;
5151
}
5252

53+
/**
54+
* Waits for the socket to become readable, without consuming any data.
55+
* Readable is defined as having data application available to read.
56+
*/
57+
[[nodiscard]] expected<bool, WSError> wait_readable(Timeout<>& timeout) noexcept
58+
{
59+
return socket_.wait_readable(timeout);
60+
}
61+
5362
/**
5463
* Reads data from socket into `buffer`.
5564
* Does not guarantee to fill buffer completely, partial reads are possible.

include/ws_client/WebSocketClient.hpp

+21-1
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,25 @@ class WebSocketClient
229229
return expected<void, WSError>{};
230230
}
231231

232+
/**
233+
* Waits until a message is available to read from the WebSocket connection.
234+
* Note that this includes control frames like PING, PONG, and CLOSE frames.
235+
*
236+
* Note that the timeout does not cause an error, but returns `false` if it expires.
237+
*
238+
* @returns `true` if a message is available to read, `false` if timeout expires.
239+
*/
240+
[[nodiscard]] inline expected<bool, WSError> wait_message(std::chrono::milliseconds timeout_ms
241+
) noexcept
242+
{
243+
if (this->closed_)
244+
return WS_ERROR(connection_closed, "Connection in closed state.", close_code::not_set);
245+
246+
Timeout timeout(timeout_ms);
247+
248+
return this->socket_.wait_readable(timeout);
249+
}
250+
232251
/**
233252
* Reads a message from the WebSocket connection.
234253
* The message is read into the provided buffer, which must have enough space to hold the message.
@@ -368,7 +387,8 @@ class WebSocketClient
368387
// read payload directly into decompression buffer
369388
WS_TRY_RAW(
370389
frame_data_compressed_res,
371-
this->permessage_deflate_ctx_->decompress_buffer().append(frame.payload_size)
390+
this->permessage_deflate_ctx_->decompress_buffer().append(frame.payload_size
391+
)
372392
);
373393
WS_TRYV_RAW(this->socket_.read_exact(*frame_data_compressed_res, timeout));
374394
}

include/ws_client/transport/HasSocketOperations.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,12 @@ concept HasSocketOperations = requires(T t, span<byte> buffer, Timeout<>& timeou
3939
*/
4040
{ t.write_some(buffer, timeout) } -> std::same_as<expected<size_t, WSError>>;
4141

42+
/**
43+
* Waits for the socket to become readable, without consuming any data.
44+
* Readable is defined as having data application available to read.
45+
*/
46+
{ t.wait_readable(timeout) } -> std::same_as<expected<bool, WSError>>;
47+
4248
/**
4349
* Shuts down socket communication.
4450
* This function should be called before closing the socket for a clean shutdown.

0 commit comments

Comments
 (0)