From 354ba2afc400ec2dfb8d81a4b24e2eaccc652b95 Mon Sep 17 00:00:00 2001 From: theravel Date: Thu, 18 Feb 2021 22:56:35 +0100 Subject: [PATCH 1/3] Removed `RejectRedeliveredMessageMiddleware` to avoid message to be lost if it cannot be processed by both handler and failed transport --- .../FrameworkExtension.php | 1 - .../Resources/config/messenger.php | 3 -- .../FrameworkExtensionTest.php | 6 +-- .../RejectRedeliveredMessageException.php | 19 ------- .../RejectRedeliveredMessageMiddleware.php | 50 ------------------- src/Symfony/Component/Messenger/Worker.php | 13 +---- 6 files changed, 3 insertions(+), 89 deletions(-) delete mode 100644 src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php delete mode 100644 src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index d41e0a97b9569..69463b22b6301 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1803,7 +1803,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $defaultMiddleware = [ 'before' => [ ['id' => 'add_bus_name_stamp_middleware'], - ['id' => 'reject_redelivered_message_middleware'], ['id' => 'dispatch_after_current_bus'], ['id' => 'failed_message_processing_middleware'], ], diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index a7d993d47e316..173575bee07b1 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -25,7 +25,6 @@ use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware; use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; -use Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware; use Symfony\Component\Messenger\Middleware\RouterContextMiddleware; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Middleware\TraceableMiddleware; @@ -91,8 +90,6 @@ service('validator'), ]) - ->set('messenger.middleware.reject_redelivered_message_middleware', RejectRedeliveredMessageMiddleware::class) - ->set('messenger.middleware.failed_message_processing_middleware', FailedMessageProcessingMiddleware::class) ->set('messenger.middleware.traceable', TraceableMiddleware::class) diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index d6a3bf5ae995a..ef92c00cbd955 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -578,7 +578,7 @@ public function testNullSessionHandler() $expected = ['session', 'initialized_session', 'logger', 'session_collector']; $this->assertEquals($expected, array_keys($container->getDefinition('session_listener')->getArgument(0)->getValues())); - $this->assertSame(false, $container->getDefinition('session.storage.factory.native')->getArgument(3)); + $this->assertFalse($container->getDefinition('session.storage.factory.native')->getArgument(3)); } /** @@ -597,7 +597,7 @@ public function testNullSessionHandlerLegacy() $expected = ['session', 'initialized_session', 'logger', 'session_collector']; $this->assertEquals($expected, array_keys($container->getDefinition('session_listener')->getArgument(0)->getValues())); - $this->assertSame(false, $container->getDefinition('session.storage.factory.native')->getArgument(3)); + $this->assertFalse($container->getDefinition('session.storage.factory.native')->getArgument(3)); } public function testRequest() @@ -801,7 +801,6 @@ public function testMessengerWithMultipleBuses() $this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0)); $this->assertEquals([ ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']], - ['id' => 'reject_redelivered_message_middleware'], ['id' => 'dispatch_after_current_bus'], ['id' => 'failed_message_processing_middleware'], ['id' => 'send_message'], @@ -811,7 +810,6 @@ public function testMessengerWithMultipleBuses() $this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0)); $this->assertEquals([ ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']], - ['id' => 'reject_redelivered_message_middleware'], ['id' => 'dispatch_after_current_bus'], ['id' => 'failed_message_processing_middleware'], ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]], diff --git a/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php b/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php deleted file mode 100644 index 0befccf4a1d1f..0000000000000 --- a/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php +++ /dev/null @@ -1,19 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Exception; - -/** - * @author Tobias Schultze - */ -class RejectRedeliveredMessageException extends RuntimeException -{ -} diff --git a/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php deleted file mode 100644 index 9e994ddd1e01d..0000000000000 --- a/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php +++ /dev/null @@ -1,50 +0,0 @@ - - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -namespace Symfony\Component\Messenger\Middleware; - -use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp; -use Symfony\Component\Messenger\Envelope; -use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; -use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp as LegacyAmqpReceivedStamp; - -/** - * Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP. - * - * The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly. - * The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy. - * - * AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out - * or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the - * redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent - * infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry - * limit and potential delay. - * - * @author Tobias Schultze - */ -class RejectRedeliveredMessageMiddleware implements MiddlewareInterface -{ - public function handle(Envelope $envelope, StackInterface $stack): Envelope - { - $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); - if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) { - throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.'); - } - - // Legacy code to support symfony/messenger < 5.1 - $amqpReceivedStamp = $envelope->last(LegacyAmqpReceivedStamp::class); - if ($amqpReceivedStamp instanceof LegacyAmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) { - throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.'); - } - - return $stack->next()->handle($envelope, $stack); - } -} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index f13edcc2f5a05..64734dbe0a551 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -21,7 +21,6 @@ use Symfony\Component\Messenger\Event\WorkerStartedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\Exception\HandlerFailedException; -use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\RuntimeException; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; @@ -130,13 +129,6 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, try { $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp())); } catch (\Throwable $throwable) { - $rejectFirst = $throwable instanceof RejectRedeliveredMessageException; - if ($rejectFirst) { - // redelivered messages are rejected first so that continuous failures in an event listener or while - // publishing for retry does not cause infinite redelivery loops - $receiver->reject($envelope); - } - if ($throwable instanceof HandlerFailedException) { $envelope = $throwable->getEnvelope(); } @@ -144,10 +136,7 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, $failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable); $this->dispatchEvent($failedEvent); $envelope = $failedEvent->getEnvelope(); - - if (!$rejectFirst) { - $receiver->reject($envelope); - } + $receiver->reject($envelope); return; } From 787973c9bf6e9c895614d472fad9dfdf2f5e2161 Mon Sep 17 00:00:00 2001 From: theravel Date: Thu, 25 Feb 2021 00:39:45 +0100 Subject: [PATCH 2/3] Revert "Removed `RejectRedeliveredMessageMiddleware` to avoid message to be lost if it cannot be processed by both handler and failed transport" This reverts commit 354ba2afc400ec2dfb8d81a4b24e2eaccc652b95. --- .../FrameworkExtension.php | 1 + .../Resources/config/messenger.php | 3 ++ .../FrameworkExtensionTest.php | 6 ++- .../RejectRedeliveredMessageException.php | 19 +++++++ .../RejectRedeliveredMessageMiddleware.php | 50 +++++++++++++++++++ src/Symfony/Component/Messenger/Worker.php | 13 ++++- 6 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php create mode 100644 src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 69463b22b6301..d41e0a97b9569 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1803,6 +1803,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $defaultMiddleware = [ 'before' => [ ['id' => 'add_bus_name_stamp_middleware'], + ['id' => 'reject_redelivered_message_middleware'], ['id' => 'dispatch_after_current_bus'], ['id' => 'failed_message_processing_middleware'], ], diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 173575bee07b1..a7d993d47e316 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -25,6 +25,7 @@ use Symfony\Component\Messenger\Middleware\DispatchAfterCurrentBusMiddleware; use Symfony\Component\Messenger\Middleware\FailedMessageProcessingMiddleware; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; +use Symfony\Component\Messenger\Middleware\RejectRedeliveredMessageMiddleware; use Symfony\Component\Messenger\Middleware\RouterContextMiddleware; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Middleware\TraceableMiddleware; @@ -90,6 +91,8 @@ service('validator'), ]) + ->set('messenger.middleware.reject_redelivered_message_middleware', RejectRedeliveredMessageMiddleware::class) + ->set('messenger.middleware.failed_message_processing_middleware', FailedMessageProcessingMiddleware::class) ->set('messenger.middleware.traceable', TraceableMiddleware::class) diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index ef92c00cbd955..d6a3bf5ae995a 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -578,7 +578,7 @@ public function testNullSessionHandler() $expected = ['session', 'initialized_session', 'logger', 'session_collector']; $this->assertEquals($expected, array_keys($container->getDefinition('session_listener')->getArgument(0)->getValues())); - $this->assertFalse($container->getDefinition('session.storage.factory.native')->getArgument(3)); + $this->assertSame(false, $container->getDefinition('session.storage.factory.native')->getArgument(3)); } /** @@ -597,7 +597,7 @@ public function testNullSessionHandlerLegacy() $expected = ['session', 'initialized_session', 'logger', 'session_collector']; $this->assertEquals($expected, array_keys($container->getDefinition('session_listener')->getArgument(0)->getValues())); - $this->assertFalse($container->getDefinition('session.storage.factory.native')->getArgument(3)); + $this->assertSame(false, $container->getDefinition('session.storage.factory.native')->getArgument(3)); } public function testRequest() @@ -801,6 +801,7 @@ public function testMessengerWithMultipleBuses() $this->assertSame([], $container->getDefinition('messenger.bus.commands')->getArgument(0)); $this->assertEquals([ ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.commands']], + ['id' => 'reject_redelivered_message_middleware'], ['id' => 'dispatch_after_current_bus'], ['id' => 'failed_message_processing_middleware'], ['id' => 'send_message'], @@ -810,6 +811,7 @@ public function testMessengerWithMultipleBuses() $this->assertSame([], $container->getDefinition('messenger.bus.events')->getArgument(0)); $this->assertEquals([ ['id' => 'add_bus_name_stamp_middleware', 'arguments' => ['messenger.bus.events']], + ['id' => 'reject_redelivered_message_middleware'], ['id' => 'dispatch_after_current_bus'], ['id' => 'failed_message_processing_middleware'], ['id' => 'with_factory', 'arguments' => ['foo', true, ['bar' => 'baz']]], diff --git a/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php b/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php new file mode 100644 index 0000000000000..0befccf4a1d1f --- /dev/null +++ b/src/Symfony/Component/Messenger/Exception/RejectRedeliveredMessageException.php @@ -0,0 +1,19 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Exception; + +/** + * @author Tobias Schultze + */ +class RejectRedeliveredMessageException extends RuntimeException +{ +} diff --git a/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php new file mode 100644 index 0000000000000..9e994ddd1e01d --- /dev/null +++ b/src/Symfony/Component/Messenger/Middleware/RejectRedeliveredMessageMiddleware.php @@ -0,0 +1,50 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Messenger\Middleware; + +use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpReceivedStamp; +use Symfony\Component\Messenger\Envelope; +use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; +use Symfony\Component\Messenger\Transport\AmqpExt\AmqpReceivedStamp as LegacyAmqpReceivedStamp; + +/** + * Middleware that throws a RejectRedeliveredMessageException when a message is detected that has been redelivered by AMQP. + * + * The middleware runs before the HandleMessageMiddleware and prevents redelivered messages from being handled directly. + * The thrown exception is caught by the worker and will trigger the retry logic according to the retry strategy. + * + * AMQP redelivers messages when they do not get acknowledged or rejected. This can happen when the connection times out + * or an exception is thrown before acknowledging or rejecting. When such errors happen again while handling the + * redelivered message, the message would get redelivered again and again. The purpose of this middleware is to prevent + * infinite redelivery loops and to unblock the queue by republishing the redelivered messages as retries with a retry + * limit and potential delay. + * + * @author Tobias Schultze + */ +class RejectRedeliveredMessageMiddleware implements MiddlewareInterface +{ + public function handle(Envelope $envelope, StackInterface $stack): Envelope + { + $amqpReceivedStamp = $envelope->last(AmqpReceivedStamp::class); + if ($amqpReceivedStamp instanceof AmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) { + throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.'); + } + + // Legacy code to support symfony/messenger < 5.1 + $amqpReceivedStamp = $envelope->last(LegacyAmqpReceivedStamp::class); + if ($amqpReceivedStamp instanceof LegacyAmqpReceivedStamp && $amqpReceivedStamp->getAmqpEnvelope()->isRedelivery()) { + throw new RejectRedeliveredMessageException('Redelivered message from AMQP detected that will be rejected and trigger the retry logic.'); + } + + return $stack->next()->handle($envelope, $stack); + } +} diff --git a/src/Symfony/Component/Messenger/Worker.php b/src/Symfony/Component/Messenger/Worker.php index 64734dbe0a551..f13edcc2f5a05 100644 --- a/src/Symfony/Component/Messenger/Worker.php +++ b/src/Symfony/Component/Messenger/Worker.php @@ -21,6 +21,7 @@ use Symfony\Component\Messenger\Event\WorkerStartedEvent; use Symfony\Component\Messenger\Event\WorkerStoppedEvent; use Symfony\Component\Messenger\Exception\HandlerFailedException; +use Symfony\Component\Messenger\Exception\RejectRedeliveredMessageException; use Symfony\Component\Messenger\Exception\RuntimeException; use Symfony\Component\Messenger\Stamp\ConsumedByWorkerStamp; use Symfony\Component\Messenger\Stamp\ReceivedStamp; @@ -129,6 +130,13 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, try { $envelope = $this->bus->dispatch($envelope->with(new ReceivedStamp($transportName), new ConsumedByWorkerStamp())); } catch (\Throwable $throwable) { + $rejectFirst = $throwable instanceof RejectRedeliveredMessageException; + if ($rejectFirst) { + // redelivered messages are rejected first so that continuous failures in an event listener or while + // publishing for retry does not cause infinite redelivery loops + $receiver->reject($envelope); + } + if ($throwable instanceof HandlerFailedException) { $envelope = $throwable->getEnvelope(); } @@ -136,7 +144,10 @@ private function handleMessage(Envelope $envelope, ReceiverInterface $receiver, $failedEvent = new WorkerMessageFailedEvent($envelope, $transportName, $throwable); $this->dispatchEvent($failedEvent); $envelope = $failedEvent->getEnvelope(); - $receiver->reject($envelope); + + if (!$rejectFirst) { + $receiver->reject($envelope); + } return; } From 50b5183fa995898b5e4322fc8c6b9d6339f797fa Mon Sep 17 00:00:00 2001 From: theravel Date: Thu, 25 Feb 2021 00:41:26 +0100 Subject: [PATCH 3/3] Use RejectRedeliveredMessageMiddleware only in case failed transport is not configured. Otherwise redelivery can happen only when failed transport is not available and message cannot be lost --- .../DependencyInjection/FrameworkExtension.php | 13 +++++++------ .../php/messenger_redelivery_failure_disabled.php | 10 ++++++++++ .../php/messenger_redelivery_failure_enabled.php | 12 ++++++++++++ .../xml/messenger_redelivery_failure_disabled.xml | 13 +++++++++++++ .../xml/messenger_redelivery_failure_enabled.xml | 14 ++++++++++++++ .../yml/messenger_redelivery_failure_disabled.yml | 4 ++++ .../yml/messenger_redelivery_failure_enabled.yml | 6 ++++++ .../DependencyInjection/FrameworkExtensionTest.php | 12 ++++++++++++ 8 files changed, 78 insertions(+), 6 deletions(-) create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_redelivery_failure_disabled.php create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_redelivery_failure_enabled.php create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_redelivery_failure_disabled.xml create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_redelivery_failure_enabled.xml create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_redelivery_failure_disabled.yml create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_redelivery_failure_enabled.yml diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index d41e0a97b9569..97236fdaf5d8c 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1801,17 +1801,18 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder } $defaultMiddleware = [ - 'before' => [ - ['id' => 'add_bus_name_stamp_middleware'], - ['id' => 'reject_redelivered_message_middleware'], - ['id' => 'dispatch_after_current_bus'], - ['id' => 'failed_message_processing_middleware'], - ], + 'before' => [], 'after' => [ ['id' => 'send_message'], ['id' => 'handle_message'], ], ]; + $defaultMiddleware['before'][] = ['id' => 'add_bus_name_stamp_middleware']; + if (!$config['failure_transport']) { + $defaultMiddleware['before'][] = ['id' => 'reject_redelivered_message_middleware']; + } + $defaultMiddleware['before'][] = ['id' => 'dispatch_after_current_bus']; + $defaultMiddleware['before'][] = ['id' => 'failed_message_processing_middleware']; foreach ($config['buses'] as $busId => $bus) { $middleware = $bus['middleware']; diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_redelivery_failure_disabled.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_redelivery_failure_disabled.php new file mode 100644 index 0000000000000..7b4562510efd2 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_redelivery_failure_disabled.php @@ -0,0 +1,10 @@ +loadFromExtension('framework', [ + 'serializer' => true, + 'messenger' => [ + 'transports' => [ + 'example' => 'redis://127.0.0.1:6379/messages', + ], + ], +]); diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_redelivery_failure_enabled.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_redelivery_failure_enabled.php new file mode 100644 index 0000000000000..130a428c7b316 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_redelivery_failure_enabled.php @@ -0,0 +1,12 @@ +loadFromExtension('framework', [ + 'serializer' => true, + 'messenger' => [ + 'failure_transport' => 'failed', + 'transports' => [ + 'example' => 'redis://127.0.0.1:6379/messages', + 'failed' => 'in-memory:///', + ], + ], +]); diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_redelivery_failure_disabled.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_redelivery_failure_disabled.xml new file mode 100644 index 0000000000000..4f12a0bce5cf2 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_redelivery_failure_disabled.xml @@ -0,0 +1,13 @@ + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_redelivery_failure_enabled.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_redelivery_failure_enabled.xml new file mode 100644 index 0000000000000..164e552529dd5 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_redelivery_failure_enabled.xml @@ -0,0 +1,14 @@ + + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_redelivery_failure_disabled.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_redelivery_failure_disabled.yml new file mode 100644 index 0000000000000..c4eb7f1c2f4f2 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_redelivery_failure_disabled.yml @@ -0,0 +1,4 @@ +framework: + messenger: + transports: + example: 'redis://127.0.0.1:6379/messages' diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_redelivery_failure_enabled.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_redelivery_failure_enabled.yml new file mode 100644 index 0000000000000..538a684beaa1e --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_redelivery_failure_enabled.yml @@ -0,0 +1,6 @@ +framework: + messenger: + transports: + example: 'redis://127.0.0.1:6379/messages' + failed: 'in-memory:///' + failure_transport: failed diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index d6a3bf5ae995a..6d66ede78e00d 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -760,6 +760,18 @@ public function testMessengerTransports() $this->assertEquals(new Reference('messenger.transport.failed'), $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0)); } + public function testMessengerRedeliveryFailureTransportDisabled() + { + $container = $this->createContainerFromFile('messenger_redelivery_failure_disabled'); + $this->assertContains(['id' => 'reject_redelivered_message_middleware'], $container->getParameter('messenger.bus.default.middleware')); + } + + public function testMessengerRedeliveryFailureTransportEnabled() + { + $container = $this->createContainerFromFile('messenger_redelivery_failure_enabled'); + $this->assertNotContains(['id' => 'reject_redelivered_message_middleware'], $container->getParameter('messenger.bus.default.middleware')); + } + public function testMessengerRouting() { $container = $this->createContainerFromFile('messenger_routing');