Description
Symfony version(s) affected: 4.4.2 upwards and 5.0.*
Description
I noticed that despite having a retry mechanism in place for a failure transport, messages does not get retried if run by the FailureMessagesRetryCommand.
This is due to the fact that the provided maximum number of messages for the StopWorkerOnMessageLimitListener in the command is 1, which results in the listener stopping the worker after one attempted handling of the message, not taking the retry mechanism into account.
How to reproduce
This is the retry mechanism (making use of the doctrine transport):
failure:
dsn: 'doctrine://database'
options:
table_name: failed_messages
auto_setup: false
retry_strategy:
max_retries: 3
delay: 1000
multiplier: 10
max_delay: 0
When consuming a message in the respective handler, just throw an exception.
When running it the first time, it will get retried three times.
Then, run php bin/console messenger:failed:retry -vv
.
You will notice it just runs once before the worker is stopped.
Additional context
See the following code fragments:
Symfony\Component\Messenger\Command\FailedMessagesRetryCommand
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
...
}
Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener
public function onWorkerRunning(WorkerRunningEvent $event): void
{
if (!$event->isWorkerIdle() && ++$this->receivedMessages >= $this->maximumNumberOfMessages) {
$this->receivedMessages = 0;
$event->getWorker()->stop();
if (null !== $this->logger) {
$this->logger->info('Worker stopped due to maximum count of {count} messages processed', ['count' => $this->maximumNumberOfMessages]);
}
}
}
Symfony\Component\Messenger\Worker
public function run(array $options = []): void
{
...
while (false === $this->shouldStop) {
$envelopeHandled = false;
foreach ($this->receivers as $transportName => $receiver) {
$envelopes = $receiver->get();
foreach ($envelopes as $envelope) {
$envelopeHandled = true;
$this->handleMessage($envelope, $receiver, $transportName);
$this->dispatchEvent(new WorkerRunningEvent($this, false));
if ($this->shouldStop) { <--- This is where the worker stops despite the retry mechanism.
break 2;
}
}
// after handling a single receiver, quit and start the loop again
// this should prevent multiple lower priority receivers from
// blocking too long before the higher priority are checked
if ($envelopeHandled) {
break;
}
}
if (false === $envelopeHandled) {
$this->dispatchEvent(new WorkerRunningEvent($this, true));
usleep($options['sleep']);
}
}
$this->dispatchEvent(new WorkerStoppedEvent($this));
}