Skip to content

Commit 0a3be1a

Browse files
authored
add core tests (swoole#4179)
* add core tests * fix * add tests * optimize tests * rename, optimize * revert
1 parent 7fe0940 commit 0a3be1a

File tree

12 files changed

+250
-101
lines changed

12 files changed

+250
-101
lines changed

core-tests/include/test_core.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#define TEST_HTTP_PROXY_HOST "127.0.0.1"
2626

2727
#define ASSERT_MEMEQ(x,y,n) ASSERT_EQ(memcmp((x), (y), n), 0)
28+
#define EXPECT_MEMEQ(x,y,n) EXPECT_EQ(memcmp((x), (y), n), 0)
2829

2930
namespace swoole { namespace test {
3031
const std::string &get_root_path();

core-tests/src/os/process_pool.cpp

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#include "test_core.h"
2+
#include "swoole_process_pool.h"
3+
4+
using namespace swoole;
5+
6+
static void test_func(ProcessPool &pool) {
7+
EventData data{};
8+
data.info.len = strlen(TEST_JPG_MD5SUM);
9+
strcpy(data.data, TEST_JPG_MD5SUM);
10+
11+
int worker_id = -1;
12+
ASSERT_EQ(pool.dispatch_blocking(&data, &worker_id), SW_OK);
13+
14+
pool.running = true;
15+
pool.onTask = [](ProcessPool *pool, EventData *task) -> int {
16+
pool->running = false;
17+
EXPECT_MEMEQ(task->data, TEST_JPG_MD5SUM, task->info.len);
18+
return 0;
19+
};
20+
pool.main_loop(&pool, pool.get_worker(0));
21+
pool.destroy();
22+
}
23+
24+
TEST(process_pool, tcp) {
25+
ProcessPool pool{};
26+
ASSERT_EQ(pool.create(1, 0, SW_IPC_SOCKET), SW_OK);
27+
ASSERT_EQ(pool.listen(TEST_HOST, TEST_PORT, 128), SW_OK);
28+
29+
test_func(pool);
30+
}
31+
32+
TEST(process_pool, unix_sock) {
33+
ProcessPool pool{};
34+
signal(SIGPIPE, SIG_IGN);
35+
ASSERT_EQ(pool.create(1, 0, SW_IPC_SOCKET), SW_OK);
36+
ASSERT_EQ(pool.listen(TEST_TMP_FILE, 128), SW_OK);
37+
38+
test_func(pool);
39+
}
40+
41+
TEST(process_pool, tcp_raw) {
42+
ProcessPool pool{};
43+
constexpr int size = 2*1024*1024;
44+
ASSERT_EQ(pool.create(1, 0, SW_IPC_SOCKET), SW_OK);
45+
ASSERT_EQ(pool.listen(TEST_HOST, TEST_PORT, 128), SW_OK);
46+
pool.set_protocol(0, size);
47+
48+
String data(size);
49+
data.append_random_bytes(size-1);
50+
data.append("\0");
51+
52+
ASSERT_EQ(pool.dispatch_blocking(data.str, data.length), SW_OK);
53+
54+
pool.running = true;
55+
pool.ptr = &data;
56+
pool.onMessage = [](ProcessPool *pool, const char *recv_data, uint32_t len) -> void {
57+
pool->running = false;
58+
String *_data = (String *) pool->ptr;
59+
EXPECT_MEMEQ(_data->str, recv_data, len);
60+
};
61+
pool.main_loop(&pool, pool.get_worker(0));
62+
pool.destroy();
63+
}
64+
65+
TEST(process_pool, msgqueue) {
66+
ProcessPool pool{};
67+
ASSERT_EQ(pool.create(1, 0x9501, SW_IPC_MSGQUEUE), SW_OK);
68+
69+
test_func(pool);
70+
}
71+
72+
constexpr int magic_number = 99900011;
73+
static ProcessPool *current_pool = nullptr;
74+
75+
TEST(process_pool, shutdown) {
76+
ProcessPool pool{};
77+
int *shm_value = (int *) sw_mem_pool()->alloc(sizeof(int));
78+
ASSERT_EQ(pool.create(1, 0x9501, SW_IPC_MSGQUEUE), SW_OK);
79+
80+
// init
81+
pool.set_protocol(1, 8192);
82+
pool.ptr = shm_value;
83+
pool.onWorkerStart = [](ProcessPool *pool, int worker_id) {
84+
int *shm_value = (int *) pool->ptr;
85+
*shm_value = magic_number;
86+
usleep(1);
87+
};
88+
89+
pool.onTask = [](ProcessPool *pool, EventData *task) -> int {
90+
kill(pool->master_pid, SIGTERM);
91+
92+
return 0;
93+
};
94+
95+
current_pool = &pool;
96+
sysv_signal(SIGTERM, [](int sig) {
97+
current_pool->running = false;
98+
});
99+
100+
// start
101+
ASSERT_EQ(pool.start(), SW_OK);
102+
103+
EventData msg{};
104+
msg.info.len = 128;
105+
swoole_random_string(msg.data, msg.info.len);
106+
int worker_id = -1;
107+
pool.dispatch_blocking(&msg, &worker_id);
108+
109+
// wait
110+
ASSERT_EQ(pool.wait(), SW_OK);
111+
112+
// shutdown
113+
pool.shutdown();
114+
pool.destroy();
115+
116+
ASSERT_EQ(*shm_value, magic_number);
117+
}

core-tests/src/os/timer.cpp

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,24 @@
2020
#include "test_core.h"
2121
#include "swoole_util.h"
2222

23-
static int timer1_count = 0;
24-
static int timer2_count = 0;
25-
static int timer_running = false;
26-
2723
using swoole::Timer;
2824
using swoole::TimerNode;
2925

3026
TEST(timer, sys) {
3127
SwooleG.use_signalfd = 0;
32-
timer_running = true;
33-
timer1_count = timer2_count = 0;
28+
int timer1_count = 0;
29+
int timer2_count = 0;
30+
int timer_running = true;
3431

3532
uint64_t ms1 = swoole::time<std::chrono::milliseconds>();
3633

3734
swoole_timer_add(
38-
20, false, [](Timer *, TimerNode *) { timer1_count++; }, nullptr);
35+
20, false, [&](Timer *, TimerNode *) { timer1_count++; }, nullptr);
3936

4037
swoole_timer_add(
4138
100,
4239
true,
43-
[](Timer *, TimerNode *tnode) {
40+
[&](Timer *, TimerNode *tnode) {
4441
timer2_count++;
4542
if (timer2_count == 5) {
4643
swoole_timer_del(tnode);
@@ -63,5 +60,34 @@ TEST(timer, sys) {
6360

6461
swoole_timer_free();
6562

66-
ASSERT_LE(ms2 - ms1, 505);
63+
ASSERT_LE(ms2 - ms1, 510);
64+
ASSERT_EQ(timer1_count, 1);
65+
ASSERT_EQ(timer2_count, 5);
66+
}
67+
68+
TEST(timer, async) {
69+
int timer1_count = 0;
70+
int timer2_count = 0;
71+
72+
swoole_event_init(SW_EVENTLOOP_WAIT_EXIT);
73+
74+
uint64_t ms1 = swoole::time<std::chrono::milliseconds>();
75+
swoole_timer_after(
76+
20, [&](Timer *, TimerNode *) { timer1_count++; }, nullptr);
77+
78+
swoole_timer_tick(
79+
100,
80+
[&](Timer *, TimerNode *tnode) {
81+
timer2_count++;
82+
if (timer2_count == 5) {
83+
swoole_timer_del(tnode);
84+
}
85+
},
86+
nullptr);
87+
88+
swoole_event_wait();
89+
uint64_t ms2 = swoole::time<std::chrono::milliseconds>();
90+
ASSERT_LE(ms2 - ms1, 510);
91+
ASSERT_EQ(timer1_count, 1);
92+
ASSERT_EQ(timer2_count, 5);
6793
}

ext-src/swoole_process_pool.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,8 @@ static PHP_METHOD(swoole_process_pool, __construct) {
307307
}
308308

309309
ProcessPool *pool = (ProcessPool *) emalloc(sizeof(*pool));
310-
if (ProcessPool::create(pool, worker_num, (key_t) msgq_key, ipc_type) < 0) {
310+
*pool = {};
311+
if (pool->create(worker_num, (key_t) msgq_key, (swIPC_type) ipc_type) < 0) {
311312
zend_throw_exception_ex(swoole_exception_ce, errno, "failed to create process pool");
312313
efree(pool);
313314
RETURN_FALSE;
@@ -450,9 +451,9 @@ static PHP_METHOD(swoole_process_pool, listen) {
450451
int ret;
451452
// unix socket
452453
if (SW_STRCASECT(host, l_host, "unix:/")) {
453-
ret = pool->create_unix_socket(host + 5, backlog);
454+
ret = pool->listen(host + 5, backlog);
454455
} else {
455-
ret = pool->create_tcp_socket(host, port, backlog);
456+
ret = pool->listen(host, port, backlog);
456457
}
457458
pool->stream_info_->socket->set_fd_option(0, 1);
458459

ext-src/swoole_server.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3120,7 +3120,7 @@ static PHP_METHOD(swoole_server, taskwait) {
31203120

31213121
sw_atomic_fetch_add(&serv->gs->tasking_num, 1);
31223122

3123-
if (serv->gs->task_workers.dispatch_blocking(&buf, &_dst_worker_id) >= 0) {
3123+
if (serv->gs->task_workers.dispatch_blocking(&buf, &_dst_worker_id) == SW_OK) {
31243124
while (1) {
31253125
if (task_notify_socket->wait_event((int) (timeout * 1000), SW_EVENT_READ) != SW_OK) {
31263126
break;

include/swoole_client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ class Stream {
174174
}
175175
}
176176
~Stream();
177-
static int recv_blocking(Socket *sock, void *__buf, size_t __len);
177+
static ssize_t recv_blocking(Socket *sock, void *__buf, size_t __len);
178178
static void set_protocol(Protocol *protocol);
179179

180180
private:

include/swoole_process_pool.h

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,9 @@ class ExitStatus {
4545
private:
4646
pid_t pid_;
4747
int status_;
48-
public:
49-
ExitStatus(pid_t _pid, int _status) : pid_(_pid), status_(_status) {
5048

51-
}
49+
public:
50+
ExitStatus(pid_t _pid, int _status) : pid_(_pid), status_(_status) {}
5251

5352
pid_t get_pid() const {
5453
return pid_;
@@ -163,6 +162,7 @@ struct StreamInfo {
163162
network::Socket *socket;
164163
network::Socket *last_connection;
165164
char *socket_file;
165+
int socket_port;
166166
String *response_buffer;
167167
};
168168

@@ -176,6 +176,7 @@ struct ProcessPool {
176176
bool started;
177177
uint8_t dispatch_mode;
178178
uint8_t ipc_mode;
179+
pid_t master_pid;
179180
uint32_t reload_worker_i;
180181
uint32_t max_wait_time;
181182
Worker *reload_workers;
@@ -218,13 +219,12 @@ struct ProcessPool {
218219
uint8_t scheduler_warning;
219220
time_t warning_time;
220221

221-
int (*onTask)(ProcessPool *pool, swEventData *task);
222+
int (*onTask)(ProcessPool *pool, EventData *task);
222223
void (*onWorkerStart)(ProcessPool *pool, int worker_id);
223224
void (*onMessage)(ProcessPool *pool, const char *data, uint32_t length);
224225
void (*onWorkerStop)(ProcessPool *pool, int worker_id);
225-
226-
int (*main_loop)(ProcessPool *pool, Worker *worker);
227226
int (*onWorkerNotFound)(ProcessPool *pool, const ExitStatus &exit_status);
227+
int (*main_loop)(ProcessPool *pool, Worker *worker);
228228

229229
sw_atomic_t round_id;
230230

@@ -266,15 +266,15 @@ struct ProcessPool {
266266
pid_t spawn(Worker *worker);
267267
int dispatch(EventData *data, int *worker_id);
268268
int response(const char *data, int length);
269-
int dispatch_blocking(swEventData *data, int *dst_worker_id);
269+
int dispatch_blocking(EventData *data, int *dst_worker_id);
270+
int dispatch_blocking(const char *data, uint32_t len);
270271
int add_worker(Worker *worker);
271272
int del_worker(Worker *worker);
272273
void destroy();
273-
int create_unix_socket(const char *socket_file, int blacklog);
274-
int create_tcp_socket(const char *host, int port, int blacklog);
274+
int create(uint32_t worker_num, key_t msgqueue_key = 0, swIPC_type ipc_mode = SW_IPC_NONE);
275+
int listen(const char *socket_file, int blacklog);
276+
int listen(const char *host, int port, int blacklog);
275277
int schedule();
276-
277-
static int create(ProcessPool *pool, uint32_t worker_num, key_t msgqueue_key, int ipc_mode);
278278
};
279279
}; // namespace swoole
280280

src/network/stream.cc

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -128,26 +128,17 @@ int Stream::send(const char *data, size_t length) {
128128
return SW_OK;
129129
}
130130

131-
int Stream::recv_blocking(Socket *sock, void *__buf, size_t __len) {
131+
ssize_t Stream::recv_blocking(Socket *sock, void *__buf, size_t __len) {
132132
int tmp = 0;
133133
ssize_t ret = sock->recv_blocking(&tmp, sizeof(tmp), MSG_WAITALL);
134-
135134
if (ret <= 0) {
136-
return SW_CLOSE;
135+
return SW_ERR;
137136
}
138137
int length = (int) ntohl(tmp);
139-
if (length <= 0) {
140-
return SW_CLOSE;
141-
} else if (length > (int) __len) {
142-
return SW_CLOSE;
143-
}
144-
145-
ret = sock->recv_blocking(__buf, length, MSG_WAITALL);
146-
if (ret <= 0) {
147-
return SW_CLOSE;
148-
} else {
149-
return SW_READY;
138+
if (length <= 0 || length > (int) __len) {
139+
return SW_ERR;
150140
}
141+
return sock->recv_blocking(__buf, length, MSG_WAITALL);
151142
}
152143

153144
} // namespace network

0 commit comments

Comments
 (0)