From 2f04a649a87b32a5b74bc9b60702d220c8d62162 Mon Sep 17 00:00:00 2001 From: valtzu Date: Wed, 23 Aug 2023 22:48:50 +0300 Subject: [PATCH] Fix missing stamps in delayed message handling --- .../DelayedMessageHandlingException.php | 12 ++- .../DispatchAfterCurrentBusMiddleware.php | 2 +- .../Tests/FailureIntegrationTest.php | 86 +++++++++++++++++++ src/Symfony/Component/Messenger/Worker.php | 3 +- 4 files changed, 100 insertions(+), 3 deletions(-) diff --git a/src/Symfony/Component/Messenger/Exception/DelayedMessageHandlingException.php b/src/Symfony/Component/Messenger/Exception/DelayedMessageHandlingException.php index 6934562bda05f..3baafda76e3b9 100644 --- a/src/Symfony/Component/Messenger/Exception/DelayedMessageHandlingException.php +++ b/src/Symfony/Component/Messenger/Exception/DelayedMessageHandlingException.php @@ -11,6 +11,8 @@ namespace Symfony\Component\Messenger\Exception; +use Symfony\Component\Messenger\Envelope; + /** * When handling queued messages from {@link DispatchAfterCurrentBusMiddleware}, * some handlers caused an exception. This exception contains all those handler exceptions. @@ -20,9 +22,12 @@ class DelayedMessageHandlingException extends RuntimeException { private array $exceptions; + private Envelope $envelope; - public function __construct(array $exceptions) + public function __construct(array $exceptions, Envelope $envelope) { + $this->envelope = $envelope; + $exceptionMessages = implode(", \n", array_map( fn (\Throwable $e) => $e::class.': '.$e->getMessage(), $exceptions @@ -43,4 +48,9 @@ public function getExceptions(): array { return $this->exceptions; } + + public function getEnvelope(): Envelope + { + return $this->envelope; + } } diff --git a/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php b/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php index bc5618af8d2a4..1842b35b6a10e 100644 --- a/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php +++ b/src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php @@ -97,7 +97,7 @@ public function handle(Envelope $envelope, StackInterface $stack): Envelope $this->isRootDispatchCallRunning = false; if (\count($exceptions) > 0) { - throw new DelayedMessageHandlingException($exceptions); + throw new DelayedMessageHandlingException($exceptions, $returnedEnvelope); } return $returnedEnvelope; diff --git a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php index a41ee33ac72af..473916aa0106b 100644 --- a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php @@ -22,14 +22,19 @@ use Symfony\Component\Messenger\EventListener\SendFailedMessageForRetryListener; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; +use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException; use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlersLocator; use Symfony\Component\Messenger\MessageBus; +use Symfony\Component\Messenger\Middleware\AddBusNameStampMiddleware; +use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware; use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Retry\MultiplierRetryStrategy; +use Symfony\Component\Messenger\Stamp\BusNameStamp; +use Symfony\Component\Messenger\Stamp\DispatchAfterCurrentBusStamp; use Symfony\Component\Messenger\Stamp\ErrorDetailsStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage; @@ -354,6 +359,87 @@ public function testMultipleFailedTransportsWithoutGlobalFailureTransport() // After the message fails again, the message is discarded from the "the_failure_transport2" $this->assertCount(0, $failureTransport2->getMessagesWaitingToBeReceived()); } + + public function testStampsAddedByMiddlewaresDontDisappearWhenDelayedMessageFails() + { + $transport1 = new DummyFailureTestSenderAndReceiver(); + + $transports = [ + 'transport1' => $transport1, + ]; + + $locator = $this->createMock(ContainerInterface::class); + $locator->expects($this->any()) + ->method('has') + ->willReturn(true); + $locator->expects($this->any()) + ->method('get') + ->willReturnCallback(fn ($transportName) => $transports[$transportName]); + $senderLocator = new SendersLocator([], $locator); + + $retryStrategyLocator = $this->createMock(ContainerInterface::class); + $retryStrategyLocator->expects($this->any()) + ->method('has') + ->willReturn(true); + $retryStrategyLocator->expects($this->any()) + ->method('get') + ->willReturn(new MultiplierRetryStrategy(1)); + + $syncHandlerThatFails = new DummyTestHandler(true); + + $middlewareStack = new \ArrayIterator([ + new AddBusNameStampMiddleware('some.bus'), + new DispatchAfterCurrentBusMiddleware(), + new SendMessageMiddleware($senderLocator), + ]); + + $bus = new MessageBus($middlewareStack); + + $transport1Handler = fn () => $bus->dispatch(new \stdClass(), [new DispatchAfterCurrentBusStamp()]); + + $handlerLocator = new HandlersLocator([ + DummyMessage::class => [new HandlerDescriptor($transport1Handler)], + \stdClass::class => [new HandlerDescriptor($syncHandlerThatFails)], + ]); + + $middlewareStack->append(new HandleMessageMiddleware($handlerLocator)); + + $dispatcher = new EventDispatcher(); + + $dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator)); + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + + $runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable { + $throwable = null; + $failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) { + $throwable = $event->getThrowable(); + }; + $dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener); + + $worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher); + + $worker->run(); + + $dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener); + + return $throwable; + }; + + // Simulate receive from external source + $transport1->send(new Envelope(new DummyMessage('API'))); + + // Receive the message from "transport1" + $throwable = $runWorker('transport1'); + + $this->assertInstanceOf(DelayedMessageHandlingException::class, $throwable, $throwable->getMessage()); + $this->assertSame(1, $syncHandlerThatFails->getTimesCalled()); + + $messagesWaiting = $transport1->getMessagesWaitingToBeReceived(); + + // Stamps should not be dropped on message that's queued for retry + $this->assertCount(1, $messagesWaiting); + $this->assertSame('some.bus', $messagesWaiting[0]->last(BusNameStamp::class)?->getBusName()); + } } class DummyFailureTestSenderAndReceiver implements ReceiverInterface, SenderInterface diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 606d4bbd387a2..103dbf5d93e78 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -22,6 +22,7 @@ use Symfony\Component\Messenger\Event\WorkerRunningEvent; use Symfony\Component\Messenger\Event\WorkerStartedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; +use Symfony\Component\Messenger\Exception\DelayedMessageHandlingException; use Symfony\Component\Messenger\Exception\HandlerFailedException; use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\RuntimeException; @@ -186,7 +187,7 @@ private function ack(): bool $receiver->reject($envelope); } - if ($e instanceof HandlerFailedException) { + if ($e instanceof HandlerFailedException || $e instanceof DelayedMessageHandlingException) { $envelope = $e->getEnvelope(); }