Skip to content

Messages retried through FailedMessagesRetryCommand do not take retry mechanism into account #35449

Closed
@chq81

Description

@chq81

Symfony version(s) affected: 4.4.2 upwards and 5.0.*

Description
I noticed that despite having a retry mechanism in place for a failure transport, messages does not get retried if run by the FailureMessagesRetryCommand.
This is due to the fact that the provided maximum number of messages for the StopWorkerOnMessageLimitListener in the command is 1, which results in the listener stopping the worker after one attempted handling of the message, not taking the retry mechanism into account.

How to reproduce

This is the retry mechanism (making use of the doctrine transport):

failure:
    dsn: 'doctrine://database'
    options:
        table_name: failed_messages
        auto_setup: false
    retry_strategy:
        max_retries: 3
        delay: 1000
        multiplier: 10
        max_delay: 0

When consuming a message in the respective handler, just throw an exception.
When running it the first time, it will get retried three times.

Then, run php bin/console messenger:failed:retry -vv.
You will notice it just runs once before the worker is stopped.

Additional context

See the following code fragments:

Symfony\Component\Messenger\Command\FailedMessagesRetryCommand

protected function execute(InputInterface $input, OutputInterface $output)
{
    $this->eventDispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1));
    ...
}

Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener

public function onWorkerRunning(WorkerRunningEvent $event): void
{
    if (!$event->isWorkerIdle() && ++$this->receivedMessages >= $this->maximumNumberOfMessages) {
        $this->receivedMessages = 0;
        $event->getWorker()->stop();

        if (null !== $this->logger) {
            $this->logger->info('Worker stopped due to maximum count of {count} messages processed', ['count' => $this->maximumNumberOfMessages]);
        }
    }
}

Symfony\Component\Messenger\Worker

public function run(array $options = []): void
{
       ...

        while (false === $this->shouldStop) {
            $envelopeHandled = false;
            foreach ($this->receivers as $transportName => $receiver) {
                $envelopes = $receiver->get();

                foreach ($envelopes as $envelope) {
                    $envelopeHandled = true;

                    $this->handleMessage($envelope, $receiver, $transportName);
                    $this->dispatchEvent(new WorkerRunningEvent($this, false));

                    if ($this->shouldStop) {  <--- This is where the worker stops despite the retry mechanism.
                        break 2;
                    }
                }

                // after handling a single receiver, quit and start the loop again
                // this should prevent multiple lower priority receivers from
                // blocking too long before the higher priority are checked
                if ($envelopeHandled) {
                    break;
                }
            }

            if (false === $envelopeHandled) {
                $this->dispatchEvent(new WorkerRunningEvent($this, true));

                usleep($options['sleep']);
            }
        }

        $this->dispatchEvent(new WorkerStoppedEvent($this));
    }

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions