Skip to content

Commit db0cab1

Browse files
committed
Consume daemon
Allows easily to spawn consume workers. Reload them by USR1 signal and stop corretnly when the daemon is stopped.
1 parent a633b0a commit db0cab1

File tree

1 file changed

+118
-0
lines changed

1 file changed

+118
-0
lines changed

pkg/enqueue/Symfony/ConsumeDaemon.php

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
<?php
2+
namespace Enqueue\Symfony;
3+
4+
use Symfony\Component\Process\Process;
5+
use Symfony\Component\Process\ProcessBuilder;
6+
7+
class ConsumeDaemon
8+
{
9+
/**
10+
* @var ProcessBuilder
11+
*/
12+
private $workerBuilder;
13+
14+
/**
15+
* @var Process[]
16+
*/
17+
private $workers;
18+
19+
/**
20+
* @param ProcessBuilder $workerBuilder
21+
*/
22+
public function __construct(ProcessBuilder $workerBuilder)
23+
{
24+
$this->workerBuilder = $workerBuilder;
25+
$this->workers = [];
26+
$this->interrupt = false;
27+
}
28+
29+
public function start($workersNumber)
30+
{
31+
/** @var Process[] $workers */
32+
$workers = [];
33+
34+
$handleSignal = function($signal) use (&$workers) {
35+
switch ($signal) {
36+
case SIGUSR1:
37+
echo 'Daemon is reloading now.'.PHP_EOL;
38+
39+
foreach ($workers as $id => $worker) {
40+
$worker->signal(SIGQUIT);
41+
}
42+
43+
break;
44+
case SIGTERM: // 15 : supervisor default stop
45+
case SIGQUIT: // 3 : kill -s QUIT
46+
case SIGINT: // 2 : ctrl+c
47+
foreach ($workers as $id => $worker) {
48+
$worker->stop(3, SIGQUIT);
49+
}
50+
51+
while ($workers) {
52+
foreach ($workers as $id => $worker) {
53+
if (false == $worker->isRunning()) {
54+
unset($workers[$id]);
55+
}
56+
}
57+
}
58+
59+
exit;
60+
61+
break;
62+
default:
63+
break;
64+
}
65+
};
66+
67+
pcntl_signal(SIGTERM, $handleSignal);
68+
pcntl_signal(SIGQUIT, $handleSignal);
69+
pcntl_signal(SIGINT, $handleSignal);
70+
pcntl_signal(SIGUSR1, $handleSignal);
71+
72+
foreach (range(1, $workersNumber) as $id) {
73+
$workers[] = $this->startWorker($id);
74+
}
75+
76+
while (true) {
77+
pcntl_signal_dispatch();
78+
79+
foreach ($workers as $id => $worker) {
80+
if (false == $worker->isRunning()) {
81+
echo sprintf('Worker %s exited with status %d', $id, $worker->getExitCode()).PHP_EOL;
82+
83+
unset($this->workers[$id]);
84+
$workers[$id] = $this->startWorker($id);
85+
}
86+
}
87+
88+
pcntl_signal_dispatch();
89+
90+
sleep(1);
91+
}
92+
}
93+
94+
/**
95+
* @param int $workerId
96+
*
97+
* @return Process
98+
*/
99+
public function startWorker($workerId)
100+
{
101+
if (array_key_exists($workerId, $this->workers)) {
102+
throw new \LogicException(sprintf('Such worker %s is already in pool.', $workerId));
103+
}
104+
105+
echo sprintf('Start worker %s', $workerId).PHP_EOL;
106+
107+
$process = $this->workerBuilder->getProcess();
108+
$process->start(function($type, $buffer) use ($workerId) {
109+
echo $workerId.' | '.$buffer;
110+
});
111+
112+
if (false == $process->isStarted()) {
113+
throw new \LogicException('Cannot start a worker process.');
114+
}
115+
116+
return $process;
117+
}
118+
}

0 commit comments

Comments
 (0)