Skip to content

Commit 89bf0e5

Browse files
committed
[Messenger] Notify transports which messages are still being processed, using pcntl_alarm()
1 parent 1c70c98 commit 89bf0e5

15 files changed

+263
-8
lines changed

src/Symfony/Component/Messenger/Bridge/Beanstalkd/CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
7.2
5+
---
6+
7+
* Implement the `KeepaliveReceiverInterface` to enable asynchronously notifying Beanstalkd that the job is still being processed, in order to avoid timeouts
8+
49
5.2.0
510
-----
611

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdReceiverTest.php

+12
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1717
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceiver;
1818
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
19+
use Symfony\Component\Messenger\Envelope;
1920
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
2021
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2122
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
@@ -78,6 +79,17 @@ public function testItRejectTheMessageIfThereIsAMessageDecodingFailedException()
7879
$receiver->get();
7980
}
8081

82+
public function testKeepalive()
83+
{
84+
$serializer = $this->createSerializer();
85+
86+
$connection = $this->createMock(Connection::class);
87+
$connection->expects($this->once())->method('keepalive')->with(1);
88+
89+
$receiver = new BeanstalkdReceiver($connection, $serializer);
90+
$receiver->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
91+
}
92+
8193
private function createBeanstalkdEnvelope(): array
8294
{
8395
return [

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/BeanstalkdTransportTest.php

+13
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
use PHPUnit\Framework\TestCase;
1515
use Symfony\Component\Messenger\Bridge\Beanstalkd\Tests\Fixtures\DummyMessage;
16+
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdReceivedStamp;
1617
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransport;
1718
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\Connection;
1819
use Symfony\Component\Messenger\Envelope;
@@ -50,6 +51,18 @@ public function testReceivesMessages()
5051
$this->assertSame($decodedMessage, $envelopes[0]->getMessage());
5152
}
5253

54+
public function testKeepalive()
55+
{
56+
$transport = $this->getTransport(
57+
null,
58+
$connection = $this->createMock(Connection::class),
59+
);
60+
61+
$connection->expects($this->once())->method('keepalive')->with(1);
62+
63+
$transport->keepalive(new Envelope(new DummyMessage('foo'), [new BeanstalkdReceivedStamp(1, 'bar')]));
64+
}
65+
5366
private function getTransport(?SerializerInterface $serializer = null, ?Connection $connection = null): BeanstalkdTransport
5467
{
5568
$serializer ??= $this->createMock(SerializerInterface::class);

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Tests/Transport/ConnectionTest.php

+33
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,37 @@ public function testSendWhenABeanstalkdExceptionOccurs()
330330

331331
$connection->send($body, $headers, $delay);
332332
}
333+
334+
public function testKeepalive()
335+
{
336+
$id = 123456;
337+
338+
$tube = 'baz';
339+
340+
$client = $this->createMock(PheanstalkInterface::class);
341+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
342+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id));
343+
344+
$connection = new Connection(['tube_name' => $tube], $client);
345+
346+
$connection->keepalive((string) $id);
347+
}
348+
349+
public function testKeepaliveWhenABeanstalkdExceptionOccurs()
350+
{
351+
$id = 123456;
352+
353+
$tube = 'baz123';
354+
355+
$exception = new ServerException('baz error');
356+
357+
$client = $this->createMock(PheanstalkInterface::class);
358+
$client->expects($this->once())->method('useTube')->with($tube)->willReturn($client);
359+
$client->expects($this->once())->method('touch')->with($this->callback(fn (JobId $jobId): bool => $jobId->getId() === $id))->willThrowException($exception);
360+
361+
$connection = new Connection(['tube_name' => $tube], $client);
362+
363+
$this->expectExceptionObject(new TransportException($exception->getMessage(), 0, $exception));
364+
$connection->keepalive((string) $id);
365+
}
333366
}

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdReceiver.php

+7-2
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
use Symfony\Component\Messenger\Envelope;
1515
use Symfony\Component\Messenger\Exception\LogicException;
1616
use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
17+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1718
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
18-
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
1919
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
2020
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2121

2222
/**
2323
* @author Antonio Pauletich <antonio.pauletich95@gmail.com>
2424
*/
25-
class BeanstalkdReceiver implements ReceiverInterface, MessageCountAwareInterface
25+
class BeanstalkdReceiver implements KeepaliveReceiverInterface, MessageCountAwareInterface
2626
{
2727
private SerializerInterface $serializer;
2828

@@ -65,6 +65,11 @@ public function reject(Envelope $envelope): void
6565
$this->connection->reject($this->findBeanstalkdReceivedStamp($envelope)->getId());
6666
}
6767

68+
public function keepalive(Envelope $envelope): void
69+
{
70+
$this->connection->keepalive($this->findBeanstalkdReceivedStamp($envelope)->getId());
71+
}
72+
6873
public function getMessageCount(): int
6974
{
7075
return $this->connection->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/BeanstalkdTransport.php

+7-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
namespace Symfony\Component\Messenger\Bridge\Beanstalkd\Transport;
1313

1414
use Symfony\Component\Messenger\Envelope;
15+
use Symfony\Component\Messenger\Transport\Receiver\KeepaliveReceiverInterface;
1516
use Symfony\Component\Messenger\Transport\Receiver\MessageCountAwareInterface;
1617
use Symfony\Component\Messenger\Transport\Serialization\PhpSerializer;
1718
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
@@ -20,7 +21,7 @@
2021
/**
2122
* @author Antonio Pauletich <antonio.pauletich95@gmail.com>
2223
*/
23-
class BeanstalkdTransport implements TransportInterface, MessageCountAwareInterface
24+
class BeanstalkdTransport implements TransportInterface, KeepaliveReceiverInterface, MessageCountAwareInterface
2425
{
2526
private SerializerInterface $serializer;
2627
private BeanstalkdReceiver $receiver;
@@ -48,6 +49,11 @@ public function reject(Envelope $envelope): void
4849
$this->getReceiver()->reject($envelope);
4950
}
5051

52+
public function keepalive(Envelope $envelope): void
53+
{
54+
$this->getReceiver()->keepalive($envelope);
55+
}
56+
5157
public function getMessageCount(): int
5258
{
5359
return $this->getReceiver()->getMessageCount();

src/Symfony/Component/Messenger/Bridge/Beanstalkd/Transport/Connection.php

+9
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ public function reject(string $id): void
180180
}
181181
}
182182

183+
public function keepalive(string $id): void
184+
{
185+
try {
186+
$this->client->useTube($this->tube)->touch(new JobId((int) $id));
187+
} catch (Exception $exception) {
188+
throw new TransportException($exception->getMessage(), 0, $exception);
189+
}
190+
}
191+
183192
public function getMessageCount(): int
184193
{
185194
try {

src/Symfony/Component/Messenger/Bridge/Beanstalkd/composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"require": {
1515
"php": ">=8.2",
1616
"pda/pheanstalk": "^4.0",
17-
"symfony/messenger": "^6.4|^7.0"
17+
"symfony/messenger": "^7.2"
1818
},
1919
"require-dev": {
2020
"symfony/property-access": "^6.4|^7.0",

src/Symfony/Component/Messenger/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ CHANGELOG
99
* Add `#[AsMessage]` attribute with `$transport` parameter for message routing
1010
* Add `--format` option to the `messenger:stats` command
1111
* Add `getRetryDelay()` method to `RecoverableExceptionInterface`
12+
* Add the ability to asynchronously notify transports about which messages are still being processed by the handler, using `pcntl_alarm()`
1213

1314
7.1
1415
---

src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php

+19-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
#[AsCommand(name: 'messenger:consume', description: 'Consume messages')]
4444
class ConsumeMessagesCommand extends Command implements SignalableCommandInterface
4545
{
46+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
47+
4648
private ?Worker $worker = null;
4749

4850
public function __construct(
@@ -75,6 +77,7 @@ protected function configure(): void
7577
new InputOption('queues', null, InputOption::VALUE_REQUIRED | InputOption::VALUE_IS_ARRAY, 'Limit receivers to only consume from the specified queues'),
7678
new InputOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset container services after each message'),
7779
new InputOption('all', null, InputOption::VALUE_NONE, 'Consume messages from all receivers'),
80+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
7881
])
7982
->setHelp(<<<'EOF'
8083
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
@@ -124,6 +127,13 @@ protected function configure(): void
124127
;
125128
}
126129

130+
protected function initialize(InputInterface $input, OutputInterface $output): void
131+
{
132+
if ($input->hasParameterOption('--keepalive')) {
133+
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
134+
}
135+
}
136+
127137
protected function interact(InputInterface $input, OutputInterface $output): void
128138
{
129139
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);
@@ -264,7 +274,7 @@ public function complete(CompletionInput $input, CompletionSuggestions $suggesti
264274

265275
public function getSubscribedSignals(): array
266276
{
267-
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT] : []);
277+
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT, \SIGALRM] : []);
268278
}
269279

270280
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
@@ -273,6 +283,14 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
273283
return false;
274284
}
275285

286+
if (\SIGALRM === $signal) {
287+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
288+
289+
$this->worker->keepalive();
290+
291+
return false;
292+
}
293+
276294
$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);
277295

278296
$this->worker->stop();

src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php

+19-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@
3939
#[AsCommand(name: 'messenger:failed:retry', description: 'Retry one or more messages from the failure transport')]
4040
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand implements SignalableCommandInterface
4141
{
42+
private const DEFAULT_KEEPALIVE_INTERVAL = 5;
43+
4244
private bool $shouldStop = false;
4345
private bool $forceExit = false;
4446
private ?Worker $worker = null;
@@ -62,6 +64,7 @@ protected function configure(): void
6264
new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'),
6365
new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'),
6466
new InputOption('transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport', self::DEFAULT_TRANSPORT_OPTION),
67+
new InputOption('keepalive', null, InputOption::VALUE_OPTIONAL, 'Whether to use the transport\'s keepalive mechanism if implemented', self::DEFAULT_KEEPALIVE_INTERVAL),
6568
])
6669
->setHelp(<<<'EOF'
6770
The <info>%command.name%</info> retries message in the failure transport.
@@ -85,6 +88,13 @@ protected function configure(): void
8588
;
8689
}
8790

91+
protected function initialize(InputInterface $input, OutputInterface $output): void
92+
{
93+
if ($input->hasParameterOption('--keepalive')) {
94+
$this->getApplication()->setAlarmInterval((int) ($input->getOption('keepalive') ?? self::DEFAULT_KEEPALIVE_INTERVAL));
95+
}
96+
}
97+
8898
protected function execute(InputInterface $input, OutputInterface $output): int
8999
{
90100
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
@@ -132,7 +142,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
132142

133143
public function getSubscribedSignals(): array
134144
{
135-
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT] : []);
145+
return $this->signals ?? (\extension_loaded('pcntl') ? [\SIGTERM, \SIGINT, \SIGQUIT, \SIGALRM] : []);
136146
}
137147

138148
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
@@ -141,6 +151,14 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
141151
return false;
142152
}
143153

154+
if (\SIGALRM === $signal) {
155+
$this->logger?->info('Sending keepalive request.', ['transport_names' => $this->worker->getMetadata()->getTransportNames()]);
156+
157+
$this->worker->keepalive();
158+
159+
return false;
160+
}
161+
144162
$this->logger?->info('Received signal {signal}.', ['signal' => $signal, 'transport_names' => $this->worker->getMetadata()->getTransportNames()]);
145163

146164
$this->worker->stop();

0 commit comments

Comments
 (0)