Skip to content

Commit 65c5dad

Browse files
committed
Messenger: add duration option to messenger:stop-workers command
1 parent cb08480 commit 65c5dad

File tree

4 files changed

+114
-23
lines changed

4 files changed

+114
-23
lines changed

src/Symfony/Component/Messenger/Command/StopWorkersCommand.php

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
use Psr\Cache\CacheItemPoolInterface;
1515
use Symfony\Component\Console\Attribute\AsCommand;
1616
use Symfony\Component\Console\Command\Command;
17+
use Symfony\Component\Console\Exception\InvalidOptionException;
1718
use Symfony\Component\Console\Input\InputInterface;
19+
use Symfony\Component\Console\Input\InputOption;
1820
use Symfony\Component\Console\Output\ConsoleOutputInterface;
1921
use Symfony\Component\Console\Output\OutputInterface;
2022
use Symfony\Component\Console\Style\SymfonyStyle;
@@ -35,7 +37,9 @@ public function __construct(
3537
protected function configure(): void
3638
{
3739
$this
38-
->setDefinition([])
40+
->setDefinition([
41+
new InputOption('duration', 'd', InputOption::VALUE_REQUIRED, 'Duration in seconds to keep the workers stopped'),
42+
])
3943
->setHelp(<<<'EOF'
4044
The <info>%command.name%</info> command sends a signal to stop any <info>messenger:consume</info> processes that are running.
4145
@@ -44,6 +48,11 @@ protected function configure(): void
4448
Each worker command will finish the message they are currently processing
4549
and then exit. Worker commands are *not* automatically restarted: that
4650
should be handled by a process control system.
51+
52+
Use the --duration option to keep the workers in a paused state (not processing messages) for the given duration (in seconds).
53+
During this time, no messages will be handled, and the workers will not resume until the pause period has passed:
54+
55+
<info>php %command.full_name% --duration=60</info>
4756
EOF
4857
)
4958
;
@@ -53,11 +62,20 @@ protected function execute(InputInterface $input, OutputInterface $output): int
5362
{
5463
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
5564

65+
if (null !== $duration = $input->getOption('duration')) {
66+
if (!is_numeric($duration) || 0 >= $duration) {
67+
throw new InvalidOptionException(\sprintf('Option "duration" must be a positive integer, "%s" passed.', $duration));
68+
}
69+
}
70+
5671
$cacheItem = $this->restartSignalCachePool->getItem(StopWorkerOnRestartSignalListener::RESTART_REQUESTED_TIMESTAMP_KEY);
57-
$cacheItem->set(microtime(true));
72+
$cacheItem->set(microtime(true) + $duration);
5873
$this->restartSignalCachePool->save($cacheItem);
5974

6075
$io->success('Signal successfully sent to stop any running workers.');
76+
if ($duration > 0) {
77+
$io->info(sprintf('Workers will be stopped for next %s seconds.', $duration));
78+
}
6179

6280
return 0;
6381
}

src/Symfony/Component/Messenger/EventListener/StopWorkerOnRestartSignalListener.php

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,15 @@ public function __construct(
3232
) {
3333
}
3434

35-
public function onWorkerStarted(): void
35+
public function onWorkerStarted(WorkerStartedEvent $event): void
3636
{
3737
$this->workerStartedAt = microtime(true);
38+
39+
if ($this->shouldRestart()) {
40+
$event->getWorker()->stop();
41+
$remainingStopSeconds = ceil($this->getEndOfStopTime()) - time();
42+
$this->logger?->info(sprintf('Worker is stopped and message processing is paused for the next %d seconds.', $remainingStopSeconds));
43+
}
3844
}
3945

4046
public function onWorkerRunning(WorkerRunningEvent $event): void
@@ -54,6 +60,11 @@ public static function getSubscribedEvents(): array
5460
}
5561

5662
private function shouldRestart(): bool
63+
{
64+
return $this->workerStartedAt < $this->getEndOfStopTime();
65+
}
66+
67+
private function getEndOfStopTime(): float
5768
{
5869
$cacheItem = $this->cachePool->getItem(self::RESTART_REQUESTED_TIMESTAMP_KEY);
5970

@@ -62,6 +73,6 @@ private function shouldRestart(): bool
6273
return false;
6374
}
6475

65-
return $this->workerStartedAt < $cacheItem->get();
76+
return (float) $cacheItem->get();
6677
}
6778
}

src/Symfony/Component/Messenger/Tests/Command/StopWorkersCommandTest.php

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public function testItSetsCacheItem()
2323
{
2424
$cachePool = $this->createMock(CacheItemPoolInterface::class);
2525
$cacheItem = $this->createMock(CacheItemInterface::class);
26-
$cacheItem->expects($this->once())->method('set');
26+
$cacheItem->expects($this->once())->method('set')->with();
2727
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
2828
$cachePool->expects($this->once())->method('save')->with($cacheItem);
2929

@@ -32,4 +32,21 @@ public function testItSetsCacheItem()
3232
$tester = new CommandTester($command);
3333
$tester->execute([]);
3434
}
35+
36+
/**
37+
* @group time-sensitive
38+
*/
39+
public function testItSetsCacheItemWithDurationAdded()
40+
{
41+
$cachePool = $this->createMock(CacheItemPoolInterface::class);
42+
$cacheItem = $this->createMock(CacheItemInterface::class);
43+
$cacheItem->expects($this->once())->method('set')->with($this->equalToWithDelta(microtime(true), 62)); // "extra" 2 seconds
44+
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
45+
$cachePool->expects($this->once())->method('save')->with($cacheItem);
46+
47+
$command = new StopWorkersCommand($cachePool);
48+
49+
$tester = new CommandTester($command);
50+
$tester->execute(['--duration' => 60]);
51+
}
3552
}

src/Symfony/Component/Messenger/Tests/EventListener/StopWorkerOnRestartSignalListenerTest.php

Lines changed: 63 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
use PHPUnit\Framework\TestCase;
1515
use Psr\Cache\CacheItemInterface;
1616
use Psr\Cache\CacheItemPoolInterface;
17+
use Symfony\Component\Cache\DependencyInjection\CachePoolPass;
1718
use Symfony\Component\Messenger\Event\WorkerRunningEvent;
19+
use Symfony\Component\Messenger\Event\WorkerStartedEvent;
1820
use Symfony\Component\Messenger\EventListener\StopWorkerOnRestartSignalListener;
1921
use Symfony\Component\Messenger\Worker;
2022

@@ -26,44 +28,87 @@ class StopWorkerOnRestartSignalListenerTest extends TestCase
2628
/**
2729
* @dataProvider restartTimeProvider
2830
*/
29-
public function testWorkerStopsWhenMemoryLimitExceeded(?int $lastRestartTimeOffset, bool $shouldStop)
31+
public function testWorkerStopsOnStartIfRestartInCache(?int $lastRestartTimeOffset, bool $shouldStop)
3032
{
31-
$cachePool = $this->createMock(CacheItemPoolInterface::class);
32-
$cacheItem = $this->createMock(CacheItemInterface::class);
33-
$cacheItem->expects($this->once())->method('isHit')->willReturn(true);
34-
$cacheItem->expects($this->once())->method('get')->willReturn(null === $lastRestartTimeOffset ? null : time() + $lastRestartTimeOffset);
35-
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
33+
$cachePool = $this->createRestartInCachePool($lastRestartTimeOffset);
3634

3735
$worker = $this->createMock(Worker::class);
3836
$worker->expects($shouldStop ? $this->once() : $this->never())->method('stop');
39-
$event = new WorkerRunningEvent($worker, false);
37+
$workerStartedEvent = new WorkerStartedEvent($worker);
4038

4139
$stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool);
42-
$stopOnSignalListener->onWorkerStarted();
43-
$stopOnSignalListener->onWorkerRunning($event);
40+
$stopOnSignalListener->onWorkerStarted($workerStartedEvent);
4441
}
4542

46-
public static function restartTimeProvider()
43+
/**
44+
* @dataProvider restartTimeProvider
45+
*/
46+
public function testWorkerStopsIfRestartInCache(?int $lastRestartTimeOffset, bool $shouldStop)
47+
{
48+
$cachePool = $this->createRestartInCachePool($lastRestartTimeOffset);
49+
50+
$worker = $this->createMock(Worker::class);
51+
$worker->expects($shouldStop ? $this->atLeast(1) : $this->never())->method('stop');
52+
$workerStartedEvent = new WorkerStartedEvent($worker);
53+
$workerRunningEvent = new WorkerRunningEvent($worker, false);
54+
55+
$stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool);
56+
$stopOnSignalListener->onWorkerStarted($workerStartedEvent);
57+
$stopOnSignalListener->onWorkerRunning($workerRunningEvent);
58+
}
59+
60+
public static function restartTimeProvider(): iterable
4761
{
4862
yield [null, false]; // no cached restart time, do not restart
4963
yield [+10, true]; // 10 seconds after starting, a restart was requested
5064
yield [-10, false]; // a restart was requested, but 10 seconds before we started
5165
}
5266

67+
public function testWorkerDoesNotStopOnStartIfRestartNotInCache()
68+
{
69+
$cachePool = $this->createRestartNotInCachePool();
70+
71+
$worker = $this->createMock(Worker::class);
72+
$worker->expects($this->never())->method('stop');
73+
$workerStartedEvent = new WorkerStartedEvent($worker);
74+
75+
$stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool);
76+
$stopOnSignalListener->onWorkerStarted($workerStartedEvent);
77+
}
78+
5379
public function testWorkerDoesNotStopIfRestartNotInCache()
5480
{
55-
$cachePool = $this->createMock(CacheItemPoolInterface::class);
56-
$cacheItem = $this->createMock(CacheItemInterface::class);
57-
$cacheItem->expects($this->once())->method('isHit')->willReturn(false);
58-
$cacheItem->expects($this->never())->method('get');
59-
$cachePool->expects($this->once())->method('getItem')->willReturn($cacheItem);
81+
$cachePool = $this->createRestartNotInCachePool();
6082

6183
$worker = $this->createMock(Worker::class);
6284
$worker->expects($this->never())->method('stop');
63-
$event = new WorkerRunningEvent($worker, false);
85+
$workerStartedEvent = new WorkerStartedEvent($worker);
86+
$workerRunningEvent = new WorkerRunningEvent($worker, false);
6487

6588
$stopOnSignalListener = new StopWorkerOnRestartSignalListener($cachePool);
66-
$stopOnSignalListener->onWorkerStarted();
67-
$stopOnSignalListener->onWorkerRunning($event);
89+
$stopOnSignalListener->onWorkerStarted($workerStartedEvent);
90+
$stopOnSignalListener->onWorkerRunning($workerRunningEvent);
91+
}
92+
93+
private function createRestartInCachePool(?int $value): CacheItemPoolInterface
94+
{
95+
$cachePool = $this->createMock(CacheItemPoolInterface::class);
96+
$cacheItem = $this->createMock(CacheItemInterface::class);
97+
$cacheItem->method('isHit')->willReturn(true);
98+
$cacheItem->method('get')->willReturn(null === $value ? null : time() + $value);
99+
$cachePool->method('getItem')->willReturn($cacheItem);
100+
101+
return $cachePool;
102+
}
103+
104+
private function createRestartNotInCachePool(): CacheItemPoolInterface
105+
{
106+
$cachePool = $this->createMock(CacheItemPoolInterface::class);
107+
$cacheItem = $this->createMock(CacheItemInterface::class);
108+
$cacheItem->method('isHit')->willReturn(false);
109+
$cacheItem->method('get');
110+
$cachePool->method('getItem')->willReturn($cacheItem);
111+
112+
return $cachePool;
68113
}
69114
}

0 commit comments

Comments
 (0)