Skip to content

Commit 0610387

Browse files
authored
Updated version that uses signals better. especially SIGCHILD
1 parent db0cab1 commit 0610387

File tree

2 files changed

+193
-118
lines changed

2 files changed

+193
-118
lines changed

pkg/enqueue/Symfony/ConsumeDaemon.php

Lines changed: 0 additions & 118 deletions
This file was deleted.

pkg/enqueue/Symfony/Daemon.php

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
<?php
2+
namespace Enqueue\Symfony;
3+
4+
use Symfony\Component\Process\Process;
5+
use Symfony\Component\Process\ProcessBuilder;
6+
7+
class Daemon
8+
{
9+
/**
10+
* @var ProcessBuilder[]
11+
*/
12+
private $builders;
13+
14+
/**
15+
* @var int[]
16+
*/
17+
private $quantities;
18+
19+
/**
20+
* @var Process[]
21+
*/
22+
private $processes;
23+
24+
/**
25+
* @var bool
26+
*/
27+
private $interrupt;
28+
29+
/**
30+
* @var bool
31+
*/
32+
private $quit;
33+
34+
private $asyncSignals;
35+
36+
public function __construct()
37+
{
38+
$this->builders = [];
39+
$this->quantities = [];
40+
$this->processes = [];
41+
$this->interrupt = false;
42+
$this->quit = false;
43+
$this->asyncSignals = false;
44+
}
45+
46+
public function addWorker($name, $quantity, ProcessBuilder $builder)
47+
{
48+
$this->quantities[$name] = $quantity;
49+
$this->builders[$name] = $builder;
50+
}
51+
52+
public function start()
53+
{
54+
if (function_exists('pcntl_async_signals')) {
55+
pcntl_async_signals(true);
56+
$this->asyncSignals = true;
57+
}
58+
59+
pcntl_signal(SIGCHLD, [$this, 'handleSignal']);
60+
pcntl_signal(SIGTERM, [$this, 'handleSignal']);
61+
pcntl_signal(SIGQUIT, [$this, 'handleSignal']);
62+
pcntl_signal(SIGINT, [$this, 'handleSignal']);
63+
pcntl_signal(SIGUSR1, [$this, 'handleSignal']);
64+
65+
foreach (array_keys($this->builders) as $name) {
66+
foreach (range(1, $this->quantities[$name]) as $index) {
67+
$workerId = $name.$index;
68+
69+
$this->processes[$workerId] = $this->startWorker($name, $workerId);
70+
}
71+
}
72+
73+
while (true) {
74+
if ($this->quit) {
75+
return;
76+
}
77+
78+
if (false == $this->asyncSignals) {
79+
pcntl_signal_dispatch();
80+
}
81+
82+
foreach($this->processes as $process) {
83+
// reads pipes internally.
84+
$process->getStatus();
85+
}
86+
87+
usleep(2000000); // 100ms
88+
}
89+
}
90+
91+
/**
92+
* @param string $name
93+
* @param string $workerId
94+
*
95+
* @return Process
96+
*/
97+
private function startWorker($name, $workerId)
98+
{
99+
echo sprintf('Starting worker %s', $workerId).PHP_EOL;
100+
101+
$process = $this->builders[$name]->getProcess();
102+
$process->start(function($type, $buffer) use ($workerId) {
103+
echo str_replace(PHP_EOL, PHP_EOL . $workerId . " | ", $buffer);
104+
});
105+
106+
if (false == $process->isStarted()) {
107+
throw new \LogicException(sprintf('Cannot start a worker process %s', $workerId));
108+
}
109+
110+
$process->name = $name;
111+
$process->workerId = $workerId;
112+
113+
return $process;
114+
}
115+
116+
/**
117+
* @param int $signal
118+
*/
119+
public function handleSignal($signal)
120+
{
121+
switch ($signal) {
122+
case SIGCHLD:
123+
if ($this->interrupt || $this->quit) {
124+
break;
125+
}
126+
127+
foreach ($this->processes as $workerId => $process) {
128+
if ($process->isRunning()) {
129+
continue;
130+
}
131+
132+
echo sprintf('Restarting stopped child process %s', $workerId).PHP_EOL;
133+
134+
$this->processes[$workerId] = $this->startWorker($process->name, $workerId);
135+
}
136+
137+
break;
138+
case SIGUSR1:
139+
if ($this->interrupt || $this->quit) {
140+
break;
141+
}
142+
143+
echo 'Reloading child processes.'.PHP_EOL;
144+
145+
foreach ($this->processes as $workerId => $process) {
146+
if (false == $process->isRunning()) {
147+
continue;
148+
}
149+
150+
$process->stop(5, SIGTERM);
151+
}
152+
153+
break;
154+
case SIGTERM: // 15 : supervisor default stop
155+
case SIGQUIT: // 3 : kill -s QUIT
156+
case SIGINT: // 2 : ctrl+c
157+
if ($this->interrupt || $this->quit) {
158+
break;
159+
}
160+
161+
echo 'Stopping child processes.'.PHP_EOL;
162+
163+
$this->interrupt = true;
164+
165+
foreach ($this->processes as $workerId => $process) {
166+
$process->signal(SIGTERM);
167+
}
168+
169+
$limit = microtime(true) + 3;
170+
while ($this->processes || microtime(true) < $limit) {
171+
foreach ($this->processes as $workerId => $process) {
172+
if (false == $process->isRunning()) {
173+
unset($this->processes[$workerId]);
174+
}
175+
}
176+
}
177+
178+
if ($this->processes) {
179+
foreach ($this->processes as $workerId => $process) {
180+
echo sprintf('Killing child process %s', $workerId).PHP_EOL;
181+
$process->stop(1, SIGKILL);
182+
}
183+
}
184+
185+
$this->quit = true;
186+
187+
break;
188+
default:
189+
echo sprintf('Caught signal %d is not handled.', $signal).PHP_EOL;
190+
break;
191+
}
192+
}
193+
}

0 commit comments

Comments
 (0)