Skip to content

Commit e628afc

Browse files
authored
add Process\Pool::detach() (swoole#4221)
* add Process\Pool::detach() * add tests
1 parent 234b2f3 commit e628afc

File tree

7 files changed

+175
-5
lines changed

7 files changed

+175
-5
lines changed

examples/process_pool/detach.php

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
<?php
2+
use Swoole\Process;
3+
use Swoole\Atomic;
4+
5+
$pool = new Process\Pool(2, SWOOLE_IPC_SOCKET);
6+
7+
$pool->on('WorkerStart', function (Process\Pool $pool, $workerId) {
8+
echo("[Worker #{$workerId}] WorkerStart\n");
9+
if ($workerId == 1) {
10+
11+
}
12+
});
13+
14+
$pool->on('WorkerStop', function (\Swoole\Process\Pool $pool, $workerId) {
15+
echo("[Worker #{$workerId}] WorkerStop\n");
16+
});
17+
18+
$pool->on('Message', function ($pool, $msg) {
19+
var_dump($msg);
20+
$pool->detach();
21+
22+
while(1) {
23+
sleep(1);
24+
echo "pid=".posix_getpid()."\n";
25+
};
26+
});
27+
28+
$pool->listen('127.0.0.1', 8089);
29+
30+
$pool->start();

examples/process_pool/send.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
<?php
2+
$fp = stream_socket_client("tcp://127.0.0.1:8089", $errno, $errstr) or die("error: $errstr\n");
3+
$msg = json_encode(['data' => 'hello', 'uid' => 1991]);
4+
fwrite($fp, pack('N', strlen($msg)) . $msg);
5+
sleep(1);

ext-src/swoole_process_pool.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ static PHP_METHOD(swoole_process_pool, set);
125125
static PHP_METHOD(swoole_process_pool, on);
126126
static PHP_METHOD(swoole_process_pool, listen);
127127
static PHP_METHOD(swoole_process_pool, write);
128+
static PHP_METHOD(swoole_process_pool, detach);
128129
static PHP_METHOD(swoole_process_pool, getProcess);
129130
static PHP_METHOD(swoole_process_pool, start);
130131
static PHP_METHOD(swoole_process_pool, shutdown);
@@ -174,6 +175,7 @@ static const zend_function_entry swoole_process_pool_methods[] =
174175
PHP_ME(swoole_process_pool, getProcess, arginfo_swoole_process_pool_getProcess, ZEND_ACC_PUBLIC)
175176
PHP_ME(swoole_process_pool, listen, arginfo_swoole_process_pool_listen, ZEND_ACC_PUBLIC)
176177
PHP_ME(swoole_process_pool, write, arginfo_swoole_process_pool_write, ZEND_ACC_PUBLIC)
178+
PHP_ME(swoole_process_pool, detach, arginfo_swoole_process_pool_void, ZEND_ACC_PUBLIC)
177179
PHP_ME(swoole_process_pool, start, arginfo_swoole_process_pool_void, ZEND_ACC_PUBLIC)
178180
PHP_ME(swoole_process_pool, shutdown, arginfo_swoole_process_pool_void, ZEND_ACC_PUBLIC)
179181
PHP_FE_END
@@ -266,6 +268,9 @@ static void pool_signal_handler(int sig) {
266268
current_pool->reloading = true;
267269
current_pool->reload_init = false;
268270
break;
271+
case SIGIO:
272+
current_pool->read_message = true;
273+
break;
269274
default:
270275
break;
271276
}
@@ -497,6 +502,7 @@ static PHP_METHOD(swoole_process_pool, start) {
497502
ori_handlers[SIGTERM] = swSignal_set(SIGTERM, pool_signal_handler);
498503
ori_handlers[SIGUSR1] = swSignal_set(SIGUSR1, pool_signal_handler);
499504
ori_handlers[SIGUSR2] = swSignal_set(SIGUSR2, pool_signal_handler);
505+
ori_handlers[SIGIO] = swSignal_set(SIGIO, pool_signal_handler);
500506

501507
if (pool->ipc_mode == SW_IPC_NONE || pp->enable_coroutine) {
502508
if (pp->onWorkerStart == nullptr) {
@@ -542,6 +548,13 @@ static PHP_METHOD(swoole_process_pool, start) {
542548

543549
extern void php_swoole_process_set_worker(zval *zobject, Worker *worker);
544550

551+
static PHP_METHOD(swoole_process_pool, detach) {
552+
if (current_pool == nullptr) {
553+
RETURN_FALSE;
554+
}
555+
RETURN_BOOL(current_pool->detach());
556+
}
557+
545558
static PHP_METHOD(swoole_process_pool, getProcess) {
546559
long worker_id = -1;
547560

include/swoole_process_pool.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include "swoole_lock.h"
2626
#include "swoole_pipe.h"
27+
#include "swoole_channel.h"
2728
#include "swoole_msg_queue.h"
2829

2930
enum swWorker_status {
@@ -173,6 +174,7 @@ struct ProcessPool {
173174
bool reloading;
174175
bool running;
175176
bool reload_init;
177+
bool read_message;
176178
bool started;
177179
uint8_t dispatch_mode;
178180
uint8_t ipc_mode;
@@ -234,6 +236,7 @@ struct ProcessPool {
234236
Reactor *reactor;
235237
MsgQueue *queue;
236238
StreamInfo *stream_info_;
239+
Channel *message_box = nullptr;
237240

238241
void *ptr;
239242

@@ -257,9 +260,18 @@ struct ProcessPool {
257260
return &(workers[worker_id - start_id]);
258261
}
259262

263+
Worker *get_worker_by_pid(pid_t pid) {
264+
auto iter = map_->find(pid);
265+
if (iter == map_->end()) {
266+
return nullptr;
267+
}
268+
return iter->second;
269+
}
270+
260271
void set_max_request(uint32_t _max_request, uint32_t _max_request_grace);
261272
int get_max_request();
262273
int set_protocol(int task_protocol, uint32_t max_packet_size);
274+
bool detach();
263275
int wait();
264276
int start();
265277
void shutdown();

src/os/process_pool.cc

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,11 @@ int ProcessPool::create(uint32_t _worker_num, key_t _msgqueue_key, swIPC_type _i
7676
return SW_ERR;
7777
}
7878

79+
message_box = Channel::make(65536, sizeof(WorkerStopMessage), SW_CHAN_LOCK | SW_CHAN_SHM);
80+
if (message_box == nullptr) {
81+
return SW_ERR;
82+
}
83+
7984
if (_ipc_mode == SW_IPC_MSGQUEUE) {
8085
use_msgqueue = 1;
8186
msgqueue_key = _msgqueue_key;
@@ -605,6 +610,21 @@ int ProcessPool_add_worker(ProcessPool *pool, Worker *worker) {
605610
return SW_OK;
606611
}
607612

613+
bool ProcessPool::detach() {
614+
WorkerStopMessage msg;
615+
msg.pid = getpid();
616+
msg.worker_id = SwooleG.process_id;
617+
618+
if (message_box && message_box->push(&msg, sizeof(msg)) < 0) {
619+
return false;
620+
}
621+
if (swoole_kill(master_pid, SIGIO) < 0) {
622+
return false;
623+
}
624+
running = false;
625+
return true;
626+
}
627+
608628
int ProcessPool::wait() {
609629
pid_t new_pid, reload_worker_pid = 0;
610630
int ret;
@@ -622,6 +642,25 @@ int ProcessPool::wait() {
622642
SwooleG.signal_alarm = false;
623643
SwooleTG.timer->select();
624644
}
645+
if (read_message) {
646+
WorkerStopMessage msg;
647+
while (message_box->pop(&msg, sizeof(msg)) > 0) {
648+
if (!running) {
649+
continue;
650+
}
651+
Worker *exit_worker = get_worker_by_pid(msg.pid);
652+
if (exit_worker == nullptr) {
653+
continue;
654+
}
655+
pid_t new_pid = spawn(exit_worker);
656+
if (new_pid < 0) {
657+
swSysWarn("Fork worker process failed");
658+
return SW_ERR;
659+
}
660+
map_->erase(msg.pid);
661+
}
662+
read_message = false;
663+
}
625664
if (exit_status.get_pid() < 0) {
626665
if (!running) {
627666
break;
@@ -645,8 +684,8 @@ int ProcessPool::wait() {
645684
}
646685

647686
if (running) {
648-
auto iter = map_->find(exit_status.get_pid());
649-
if (iter == map_->end()) {
687+
Worker *exit_worker = get_worker_by_pid(exit_status.get_pid());
688+
if (exit_worker == nullptr) {
650689
if (onWorkerNotFound) {
651690
onWorkerNotFound(this, exit_status);
652691
} else {
@@ -655,7 +694,6 @@ int ProcessPool::wait() {
655694
continue;
656695
}
657696

658-
Worker *exit_worker = iter->second;
659697
if (!exit_status.is_normal_exit()) {
660698
swWarn("worker#%d abnormal exit, status=%d, signal=%d"
661699
"%s",
@@ -732,6 +770,10 @@ void ProcessPool::destroy() {
732770
delete map_;
733771
}
734772

773+
if (message_box) {
774+
message_box->destroy();
775+
}
776+
735777
sw_mem_pool()->free(workers);
736778
}
737779

src/reactor/base.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ int Reactor::_write(Reactor *reactor, Socket *socket, const void *buf, size_t n)
301301
send_bytes = socket->send(buf, n, 0);
302302
return send_bytes;
303303
};
304-
auto append_fn = [&send_bytes, socket, buf, n](Buffer *buffer) {
304+
auto append_fn = [&send_bytes, buf, n](Buffer *buffer) {
305305
ssize_t offset = send_bytes > 0 ? send_bytes : 0;
306306
buffer->append((const char *) buf + offset, n - offset);
307307
};
@@ -325,7 +325,7 @@ int Reactor::_writev(Reactor *reactor, network::Socket *socket, const iovec *iov
325325
send_bytes = socket->writev(iov, iovcnt);
326326
return send_bytes;
327327
};
328-
auto append_fn = [&send_bytes, socket, iov, iovcnt](Buffer *buffer) {
328+
auto append_fn = [&send_bytes, iov, iovcnt](Buffer *buffer) {
329329
ssize_t offset = send_bytes > 0 ? send_bytes : 0;
330330
buffer->append(iov, iovcnt, offset);
331331
};

tests/swoole_process_pool/detach.phpt

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
--TEST--
2+
swoole_process_pool: detach
3+
--SKIPIF--
4+
<?php require __DIR__ . '/../include/skipif.inc';
5+
?>
6+
--FILE--
7+
<?php
8+
require __DIR__ . '/../include/bootstrap.php';
9+
10+
use Swoole\Process\Pool;
11+
use Swoole\Atomic;
12+
13+
const N = 100;
14+
15+
$atomic = new Atomic();
16+
17+
$pm = new ProcessManager;
18+
$pm->initFreePorts();
19+
20+
$pm->parentFunc = function ($pid) use ($pm, $atomic) {
21+
foreach (range(1, 2) as $i) {
22+
$fp = stream_socket_client("tcp://127.0.0.1:".$pm->getFreePort(), $errno, $errstr) or die("error: $errstr\n");
23+
$msg = "HELLO-{$i}";
24+
fwrite($fp, pack('N', strlen($msg)) . $msg);
25+
}
26+
$pm->wait();
27+
Assert::eq($atomic->get(), N + 1);
28+
echo "DONE\n";
29+
$pm->kill();
30+
};
31+
32+
$pm->childFunc = function () use ($pm, $atomic) {
33+
$pool = new Pool(1, SWOOLE_IPC_SOCKET);
34+
35+
$pool->on('WorkerStart', function (Pool $pool, $workerId) use($pm, $atomic) {
36+
echo("[Worker #{$workerId}] WorkerStart\n");
37+
if ($atomic->get() == 0) {
38+
$pm->wakeup();
39+
}
40+
});
41+
42+
$pool->on('Message', function (Pool $pool, $msg) use($pm, $atomic) {
43+
if ($atomic->get() == 0) {
44+
$atomic->add();
45+
$pool->detach();
46+
$n = N;
47+
while($n--) {
48+
usleep(1000);
49+
$atomic->add();
50+
}
51+
$pm->wakeup();
52+
} else {
53+
echo $msg.PHP_EOL;
54+
}
55+
});
56+
57+
$pool->listen('127.0.0.1', $pm->getFreePort());
58+
$pool->start();
59+
};
60+
61+
$pm->childFirst();
62+
$pm->run();
63+
?>
64+
--EXPECT--
65+
[Worker #0] WorkerStart
66+
[Worker #0] WorkerStart
67+
HELLO-2
68+
DONE

0 commit comments

Comments
 (0)