From 7ec6914750667cb9b6b029067876c544a8ccd4ca Mon Sep 17 00:00:00 2001 From: Thibaut Chieux Date: Thu, 30 May 2024 15:29:25 +0200 Subject: [PATCH] [Messenger] Allow to skip message in FailedMessagesRetryCommand Update CHANGELOG message and help message --- src/Symfony/Component/Messenger/CHANGELOG.md | 1 + .../Command/FailedMessagesRetryCommand.php | 13 +++++-- .../Event/WorkerMessageSkipEvent.php | 21 +++++++++++ ...ailedMessageToFailureTransportListener.php | 17 +++++++++ .../FailedMessagesRetryCommandTest.php | 37 +++++++++++++++++++ 5 files changed, 86 insertions(+), 3 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Event/WorkerMessageSkipEvent.php diff --git a/src/Symfony/Component/Messenger/CHANGELOG.md b/src/Symfony/Component/Messenger/CHANGELOG.md index eb35afe06c0c9..3a1944824cc73 100644 --- a/src/Symfony/Component/Messenger/CHANGELOG.md +++ b/src/Symfony/Component/Messenger/CHANGELOG.md @@ -9,6 +9,7 @@ CHANGELOG * Add `#[AsMessage]` attribute with `$transport` parameter for message routing * Add `--format` option to the `messenger:stats` command * Add `getRetryDelay()` method to `RecoverableExceptionInterface` + * Add `skip` option to `messenger:failed:retry` command when run interactively to skip message and requeue it 7.1 --- diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index 0622b08a0c2ed..aac0ec991059d 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -23,9 +23,11 @@ use Symfony\Component\Console\Style\SymfonyStyle; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; +use Symfony\Component\Messenger\Event\WorkerMessageSkipEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; use Symfony\Component\Messenger\MessageBusInterface; use Symfony\Component\Messenger\Stamp\MessageDecodingFailedStamp; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface; use Symfony\Component\Messenger\Transport\Receiver\SingleMessageReceiver; @@ -68,8 +70,8 @@ protected function configure(): void php %command.full_name% -The command will interactively ask if each message should be retried -or discarded. +The command will interactively ask if each message should be retried, +discarded or skipped. Some transports support retrying a specific message id, which comes from the messenger:failed:show command. @@ -204,7 +206,8 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece $this->forceExit = true; try { - $shouldHandle = $shouldForce || 'retry' === $io->choice('Please select an action', ['retry', 'delete'], 'retry'); + $choice = $io->choice('Please select an action', ['retry', 'delete', 'skip'], 'retry'); + $shouldHandle = $shouldForce || 'retry' === $choice; } finally { $this->forceExit = false; } @@ -213,6 +216,10 @@ private function runWorker(string $failureTransportName, ReceiverInterface $rece return; } + if ('skip' === $choice) { + $this->eventDispatcher->dispatch(new WorkerMessageSkipEvent($envelope, $envelope->last(SentToFailureTransportStamp::class)->getOriginalReceiverName())); + } + $messageReceivedEvent->shouldHandle(false); $receiver->reject($envelope); }; diff --git a/src/Symfony/Component/Messenger/Event/WorkerMessageSkipEvent.php b/src/Symfony/Component/Messenger/Event/WorkerMessageSkipEvent.php new file mode 100644 index 0000000000000..41aa9c48788b0 --- /dev/null +++ b/src/Symfony/Component/Messenger/Event/WorkerMessageSkipEvent.php @@ -0,0 +1,21 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Event; + +/** + * Dispatched when a message was skip. + * + * The event name is the class name. + */ +final class WorkerMessageSkipEvent extends AbstractWorkerMessageEvent +{ +} diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php index dd29ce1afd1bd..363403f627437 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -15,6 +15,7 @@ use Psr\Log\LoggerInterface; use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; +use Symfony\Component\Messenger\Event\WorkerMessageSkipEvent; use Symfony\Component\Messenger\Stamp\DelayStamp; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; @@ -65,10 +66,26 @@ public function onMessageFailed(WorkerMessageFailedEvent $event): void $failureSender->send($envelope); } + public function onMessageSkip(WorkerMessageSkipEvent $event): void + { + if (!$this->failureSenders->has($event->getReceiverName())) { + return; + } + + $failureSender = $this->failureSenders->get($event->getReceiverName()); + $envelope = $event->getEnvelope()->with( + new SentToFailureTransportStamp($event->getReceiverName()), + new DelayStamp(0), + ); + + $failureSender->send($envelope); + } + public static function getSubscribedEvents(): array { return [ WorkerMessageFailedEvent::class => ['onMessageFailed', -100], + WorkerMessageSkipEvent::class => ['onMessageSkip', -100], ]; } } diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php index 4b72ff464965f..ec93a9684a1cf 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php @@ -19,6 +19,7 @@ use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\MessageBusInterface; +use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Stamp\TransportMessageIdStamp; use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; @@ -223,4 +224,40 @@ public function testCompleteIdWithSpecifiedTransport() $this->assertSame(['2ab50dfa1fbf', '78c2da843723'], $suggestions); } + + public function testSkipRunWithServiceLocator() + { + $failureTransportName = 'failure_receiver'; + $originalTransportName = 'original_receiver'; + + $serviceLocator = $this->createMock(ServiceLocator::class); + $receiver = $this->createMock(ListableReceiverInterface::class); + + $dispatcher = new EventDispatcher(); + $bus = $this->createMock(MessageBusInterface::class); + + $serviceLocator->method('has')->willReturn(true); + $serviceLocator->method('get')->with($failureTransportName)->willReturn($receiver); + + $receiver->expects($this->once())->method('find') + ->willReturn(Envelope::wrap(new \stdClass(), [ + new SentToFailureTransportStamp($originalTransportName) + ])); + + $receiver->expects($this->never())->method('ack'); + $receiver->expects($this->once())->method('reject'); + + $command = new FailedMessagesRetryCommand( + $failureTransportName, + $serviceLocator, + $bus, + $dispatcher + ); + + $tester = new CommandTester($command); + $tester->setInputs(['skip']); + + $tester->execute(['id' => ['10']]); + $this->assertStringContainsString('[OK]', $tester->getDisplay()); + } }