From 9bf20ba8160094be3d5f804908bd8f535b2fbbfd Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sat, 14 Dec 2019 08:49:29 +0000 Subject: [PATCH 01/15] =?UTF-8?q?[Messenger]=C2=A0Multiple=20failure=20tra?= =?UTF-8?q?nsports=20support?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../FrameworkExtension.php | 61 +++++++- .../Resources/config/schema/symfony-1.0.xsd | 1 + .../messenger_multiple_failure_transports.php | 19 +++ ...ger_multiple_failure_transports_global.php | 21 +++ .../messenger_multiple_failure_transports.xml | 17 +++ ...ger_multiple_failure_transports_global.xml | 18 +++ .../messenger_multiple_failure_transports.yml | 12 ++ ...ger_multiple_failure_transports_global.yml | 14 ++ .../FrameworkExtensionTest.php | 67 +++++++++ .../Command/AbstractFailedMessagesCommand.php | 24 ++- .../Command/FailedMessagesRemoveCommand.php | 4 +- .../Command/FailedMessagesRetryCommand.php | 38 ++--- .../Command/FailedMessagesShowCommand.php | 19 ++- ...ailedMessageToFailureTransportListener.php | 31 +++- .../FailedMessagesRemoveCommandTest.php | 29 +++- .../FailedMessagesRetryCommandTest.php | 37 ++++- .../Command/FailedMessagesShowCommandTest.php | 69 ++++++++- ...dMessageToFailureTransportListenerTest.php | 14 ++ .../Tests/FailureIntegrationTest.php | 142 ++++++++++++++++++ 19 files changed, 579 insertions(+), 58 deletions(-) create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_multiple_failure_transports.php create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_multiple_failure_transports_global.php create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_multiple_failure_transports.xml create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_multiple_failure_transports_global.xml create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_multiple_failure_transports.yml create mode 100644 src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_multiple_failure_transports_global.yml diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index e0c3dd1b9f204..acc19ec1103e2 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1850,7 +1850,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder } $sendersServiceLocator = ServiceLocatorTagPass::register($container, $senderReferences); - $container->getDefinition('messenger.senders_locator') ->replaceArgument(0, $messageToSendersMapping) ->replaceArgument(1, $sendersServiceLocator) @@ -1863,24 +1862,74 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $container->getDefinition('messenger.retry_strategy_locator') ->replaceArgument(0, $transportRetryReferences); + $hasAnyFailureTransport = false; + + $failureTransports = []; + $failureTransportsServiceLocatorId = 'messenger.failure_transports.locator'; + $failureTransportsByName = []; + $failureTransportsByNameServiceLocatorId = 'messenger.failure_transports_by_name.locator'; + if ($config['failure_transport']) { if (!isset($senderReferences[$config['failure_transport']])) { throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport'])); } + $hasAnyFailureTransport = true; + $failureTransportRef = $senderReferences[$config['failure_transport']]; + $failureTransportsByName[$config['failure_transport']] = $failureTransportRef; + + $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') + ->replaceArgument(0, $failureTransportRef); $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') - ->replaceArgument(0, $senderReferences[$config['failure_transport']]); + ->replaceArgument(2, null); + } + + foreach ($config['transports'] as $name => $transport) { + if ($transport['failure_transport']) { + if (!isset($config['transports'][$transport['failure_transport']])) { + throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport'])); + } + + $hasAnyFailureTransport = true; + $failureTransports[$name] = $senderReferences[$transport['failure_transport']]; + $failureTransportsByName[$transport['failure_transport']] = $senderReferences[$transport['failure_transport']]; + } + } + + if ($hasAnyFailureTransport) { + $failureTransportsServiceLocator = ServiceLocatorTagPass::register($container, $failureTransports, $failureTransportsServiceLocatorId); + $container->getDefinition($failureTransportsServiceLocatorId) + ->replaceArgument(0, $failureTransports) + ->replaceArgument(1, $failureTransportsServiceLocator); + $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') + ->replaceArgument(0, $senderReferences[$config['failure_transport']] ?? null); + $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') + ->replaceArgument(2, \count($failureTransports) > 0 ? $failureTransportsServiceLocator : null); + + $failureTransportsByNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportsByName, $failureTransportsByNameServiceLocatorId); + $container->getDefinition($failureTransportsByNameServiceLocatorId) + ->replaceArgument(0, $failureTransportsByName) + ->replaceArgument(1, $failureTransportsByNameServiceLocator); + $container->getDefinition('console.command.messenger_failed_messages_retry') - ->replaceArgument(0, $config['failure_transport']); + ->replaceArgument(0, $config['failure_transport'] ?? null) + ->replaceArgument(1, $failureTransportsByName[$config['failure_transport']] ?? null) + ->replaceArgument(5, $container->getDefinition($failureTransportsByNameServiceLocatorId)); + $container->getDefinition('console.command.messenger_failed_messages_show') - ->replaceArgument(0, $config['failure_transport']); + ->replaceArgument(0, $config['failure_transport'] ?? null) + ->replaceArgument(1, $failureTransportsByName[$config['failure_transport']] ?? null) + ->replaceArgument(2, $container->getDefinition($failureTransportsByNameServiceLocatorId)); + $container->getDefinition('console.command.messenger_failed_messages_remove') - ->replaceArgument(0, $config['failure_transport']); + ->replaceArgument(0, $config['failure_transport'] ?? null) + ->replaceArgument(1, $failureTransportsByName[$config['failure_transport']] ?? null) + ->replaceArgument(2, $container->getDefinition($failureTransportsByNameServiceLocatorId)); } else { - $container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener'); $container->removeDefinition('console.command.messenger_failed_messages_retry'); $container->removeDefinition('console.command.messenger_failed_messages_show'); $container->removeDefinition('console.command.messenger_failed_messages_remove'); + $container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener'); } } diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd b/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd index 49c1eb8b57dba..c9f6226348482 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd @@ -480,6 +480,7 @@ + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_multiple_failure_transports.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_multiple_failure_transports.php new file mode 100644 index 0000000000000..8f85259aa6908 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_multiple_failure_transports.php @@ -0,0 +1,19 @@ +loadFromExtension('framework', [ + 'messenger' => [ + 'transports' => [ + 'transport_1' => [ + 'dsn' => 'null://', + 'failure_transport' => 'failure_transport_1' + ], + 'transport_2' => 'null://', + 'transport_3' => [ + 'dsn' => 'null://', + 'failure_transport' => 'failure_transport_3' + ], + 'failure_transport_1' => 'null://', + 'failure_transport_3' => 'null://' + ], + ], +]); diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_multiple_failure_transports_global.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_multiple_failure_transports_global.php new file mode 100644 index 0000000000000..0cff76887b152 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_multiple_failure_transports_global.php @@ -0,0 +1,21 @@ +loadFromExtension('framework', [ + 'messenger' => [ + 'failure_transport' => 'failure_transport_global', + 'transports' => [ + 'transport_1' => [ + 'dsn' => 'null://', + 'failure_transport' => 'failure_transport_1' + ], + 'transport_2' => 'null://', + 'transport_3' => [ + 'dsn' => 'null://', + 'failure_transport' => 'failure_transport_3' + ], + 'failure_transport_global' => 'null://', + 'failure_transport_1' => 'null://', + 'failure_transport_3' => 'null://', + ], + ], +]); diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_multiple_failure_transports.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_multiple_failure_transports.xml new file mode 100644 index 0000000000000..b8e9f19759429 --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_multiple_failure_transports.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_multiple_failure_transports_global.xml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_multiple_failure_transports_global.xml new file mode 100644 index 0000000000000..c6e5c530fda1b --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_multiple_failure_transports_global.xml @@ -0,0 +1,18 @@ + + + + + + + + + + + + + + diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_multiple_failure_transports.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_multiple_failure_transports.yml new file mode 100644 index 0000000000000..863f18a7d1a1f --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_multiple_failure_transports.yml @@ -0,0 +1,12 @@ +framework: + messenger: + transports: + transport_1: + dsn: 'null://' + failure_transport: failure_transport_1 + transport_2: 'null://' + transport_3: + dsn: 'null://' + failure_transport: failure_transport_3 + failure_transport_1: 'null://' + failure_transport_3: 'null://' diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_multiple_failure_transports_global.yml b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_multiple_failure_transports_global.yml new file mode 100644 index 0000000000000..10023edb0b9fd --- /dev/null +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_multiple_failure_transports_global.yml @@ -0,0 +1,14 @@ +framework: + messenger: + failure_transport: failure_transport_global + transports: + transport_1: + dsn: 'null://' + failure_transport: failure_transport_1 + transport_2: 'null://' + transport_3: + dsn: 'null://' + failure_transport: failure_transport_3 + failure_transport_global: 'null://' + failure_transport_1: 'null://' + failure_transport_3: 'null://' diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index 146cf8ed4fe58..be07a0e3d1946 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -648,6 +648,73 @@ public function testMessenger() $this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass()); } + public function testMessengerMultipleFailureTransports() + { + $container = $this->createContainerFromFile('messenger_multiple_failure_transports'); + $failureTransportsLocatorDefinition = $container->getDefinition('messenger.failure_transports.locator'); + + /** @var Reference $failureTransportsMapping */ + $failureTransportsMapping = $failureTransportsLocatorDefinition->getArgument(0); + + // transport 2 exists but does not appear in the mapping + $expectedFailureTransportsMapping = [ + 'transport_1' => 'failure_transport_1', + 'transport_3' => 'failure_transport_3', + ]; + + $failedTransports = [ + 'failure_transport_1', + 'failure_transport_3', + ]; + + foreach ($failureTransportsMapping as $transportName => $ref) { + if (\in_array($transportName, $failedTransports)) { + continue; + } + + $this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName)); + } + + $failedMessageTransportListenerReference = + $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener'); + $this->assertNull($failedMessageTransportListenerReference->getArgument(0)); + $this->assertSame($failureTransportsLocatorDefinition->getArgument(1), $failedMessageTransportListenerReference->getArgument(2)); + } + + public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport() + { + $container = $this->createContainerFromFile('messenger_multiple_failure_transports_global'); + $failureTransportsLocatorDefinition = $container->getDefinition('messenger.failure_transports.locator'); + + /** @var Reference $failureTransportsMapping */ + $failureTransportsMapping = $failureTransportsLocatorDefinition->getArgument(0); + + $expectedFailureTransportsMapping = [ + 'transport_1' => 'failure_transport_1', + 'transport_2' => 'failure_transport_global', + 'transport_3' => 'failure_transport_3', + ]; + + $failed_transports = [ + 'failure_transport_global', + 'failure_transport_1', + 'failure_transport_3', + ]; + + foreach ($failureTransportsMapping as $transportName => $ref) { + if (\in_array($transportName, $failed_transports)) { + continue; + } + + $this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName)); + } + + $failedMessageTransportListenerReference = + $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener'); + $this->assertSame((string) (new Reference('messenger.transport.failure_transport_global')), (string) $failedMessageTransportListenerReference->getArgument(0)); + $this->assertSame($failureTransportsLocatorDefinition->getArgument(1), $failedMessageTransportListenerReference->getArgument(2)); + } + public function testMessengerTransports() { $container = $this->createContainerFromFile('messenger_transports'); diff --git a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php index a95eb1fd41763..c4a3cf6387b76 100644 --- a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php @@ -14,6 +14,7 @@ use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Helper\Dumper; use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; @@ -30,18 +31,27 @@ abstract class AbstractFailedMessagesCommand extends Command { private $receiverName; private $receiver; + /** + * @var ServiceLocator + */ + private $failureTransports; - public function __construct(string $receiverName, ReceiverInterface $receiver) + public function __construct(?string $receiverName, ?ReceiverInterface $receiver, ServiceLocator $failureTransports) { $this->receiverName = $receiverName; $this->receiver = $receiver; + $this->failureTransports = $failureTransports; parent::__construct(); } - protected function getReceiverName(): string + protected function getReceiverName(?string $name): string { - return $this->receiverName; + if (null === $name) { + return $this->receiverName; + } + + return $name; } /** @@ -115,9 +125,13 @@ protected function printPendingMessagesMessage(ReceiverInterface $receiver, Symf } } - protected function getReceiver(): ReceiverInterface + protected function getReceiver(?string $name): ReceiverInterface { - return $this->receiver; + if (null === $name) { + return $this->receiver; + } + + return $this->failureTransports->get($name); } protected function getLastRedeliveryStampWithException(Envelope $envelope): ?RedeliveryStamp diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php index 951b7d499ed1b..5d2fc06ec5560 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php @@ -37,6 +37,7 @@ protected function configure(): void ->setDefinition([ new InputArgument('id', InputArgument::REQUIRED | InputArgument::IS_ARRAY, 'Specific message id(s) to remove'), new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'), + new InputOption('failed-transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failed transport'), new InputOption('show-messages', null, InputOption::VALUE_NONE, 'Display messages before removing it (if multiple ids are given)'), ]) ->setDescription('Remove given messages from the failure transport') @@ -58,7 +59,8 @@ protected function execute(InputInterface $input, OutputInterface $output) { $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); - $receiver = $this->getReceiver(); + $failedTransport = $input->getOption('failed-transport'); + $receiver = $this->getReceiver($failedTransport); $shouldForce = $input->getOption('force'); $ids = (array) $input->getArgument('id'); diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index 87426edd9dbaa..3987cb202b1a4 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -19,6 +19,7 @@ use Symfony\Component\Console\Output\ConsoleOutputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; @@ -39,14 +40,15 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand private $eventDispatcher; private $messageBus; private $logger; + private $failedTransportsByName; - public function __construct(string $receiverName, ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null) + public function __construct(?string $receiverName, ?ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, ServiceLocator $failedTransportsByName) { $this->eventDispatcher = $eventDispatcher; $this->messageBus = $messageBus; $this->logger = $logger; - parent::__construct($receiverName, $receiver); + parent::__construct($receiverName, $receiver, $failedTransportsByName); } /** @@ -58,6 +60,7 @@ protected function configure(): void ->setDefinition([ new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'), new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'), + new InputOption('failed-transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failed transport'), ]) ->setDescription('Retries one or more messages from the failure transport') ->setHelp(<<<'EOF' @@ -95,10 +98,11 @@ protected function execute(InputInterface $input, OutputInterface $output) $io->comment('Re-run the command with a -vv option to see logs about consumed messages.'); } - $receiver = $this->getReceiver(); + $failedTransportName = $input->getOption('failed-transport'); + $receiver = $this->getReceiver($failedTransportName); $this->printPendingMessagesMessage($receiver, $io); - $io->writeln(sprintf('To retry all the messages, run messenger:consume %s', $this->getReceiverName())); + $io->writeln(sprintf('To retry all the messages, run messenger:consume %s', $this->getReceiverName($failedTransportName))); $shouldForce = $input->getOption('force'); $ids = $input->getArgument('id'); @@ -107,20 +111,20 @@ protected function execute(InputInterface $input, OutputInterface $output) throw new RuntimeException('Message id must be passed when in non-interactive mode.'); } - $this->runInteractive($io, $shouldForce); + $this->runInteractive($receiver, $io, $shouldForce); return 0; } - $this->retrySpecificIds($ids, $io, $shouldForce); + $this->retrySpecificIds($failedTransportName, $ids, $io, $shouldForce); $io->success('All done!'); return 0; } - private function runInteractive(SymfonyStyle $io, bool $shouldForce) + private function runInteractive(ReceiverInterface $failedTransport, SymfonyStyle $io, bool $shouldForce) { - $receiver = $this->getReceiver(); + $receiver = $this->getReceiver($failedTransport); $count = 0; if ($receiver instanceof ListableReceiverInterface) { // for listable receivers, find the messages one-by-one @@ -135,7 +139,7 @@ private function runInteractive(SymfonyStyle $io, bool $shouldForce) $id = $this->getMessageId($envelope); if (null === $id) { - throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $this->getReceiverName())); + throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $this->getReceiverName($failedTransport))); } $ids[] = $id; } @@ -145,11 +149,11 @@ private function runInteractive(SymfonyStyle $io, bool $shouldForce) break; } - $this->retrySpecificIds($ids, $io, $shouldForce); + $this->retrySpecificIds($failedTransport, $ids, $io, $shouldForce); } } else { // get() and ask messages one-by-one - $count = $this->runWorker($this->getReceiver(), $io, $shouldForce); + $count = $this->runWorker($failedTransport, $this->getReceiver($failedTransport), $io, $shouldForce); } // avoid success message if nothing was processed @@ -158,7 +162,7 @@ private function runInteractive(SymfonyStyle $io, bool $shouldForce) } } - private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int + private function runWorker(?string $failedTransport, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int { $count = 0; $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) { @@ -179,7 +183,7 @@ private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $ $this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener); $worker = new Worker( - [$this->getReceiverName() => $receiver], + [$this->getReceiverName($failedTransport) => $receiver], $this->messageBus, $this->eventDispatcher, $this->logger @@ -194,12 +198,12 @@ private function runWorker(ReceiverInterface $receiver, SymfonyStyle $io, bool $ return $count; } - private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForce) + private function retrySpecificIds(?string $failedTransport, array $ids, SymfonyStyle $io, bool $shouldForce) { - $receiver = $this->getReceiver(); + $receiver = $this->getReceiver($failedTransport); if (!$receiver instanceof ListableReceiverInterface) { - throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $this->getReceiverName())); + throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $this->getReceiverName($failedTransport))); } foreach ($ids as $id) { @@ -209,7 +213,7 @@ private function retrySpecificIds(array $ids, SymfonyStyle $io, bool $shouldForc } $singleReceiver = new SingleMessageReceiver($receiver, $envelope); - $this->runWorker($singleReceiver, $io, $shouldForce); + $this->runWorker($failedTransport, $singleReceiver, $io, $shouldForce); } } } diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php index 0baf7a419f190..c694ccc5180bc 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php @@ -36,6 +36,7 @@ protected function configure(): void ->setDefinition([ new InputArgument('id', InputArgument::OPTIONAL, 'Specific message id to show'), new InputOption('max', null, InputOption::VALUE_REQUIRED, 'Maximum number of messages to list', 50), + new InputOption('failed-transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failed transport'), ]) ->setDescription('Shows one or more messages from the failure transport') ->setHelp(<<<'EOF' @@ -58,26 +59,28 @@ protected function execute(InputInterface $input, OutputInterface $output) { $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); - $receiver = $this->getReceiver(); + $failedTransport = $input->getOption('failed-transport'); + $receiver = $this->getReceiver($failedTransport); + $this->printPendingMessagesMessage($receiver, $io); if (!$receiver instanceof ListableReceiverInterface) { - throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $this->getReceiverName())); + throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $this->getReceiverName($failedTransport))); } if (null === $id = $input->getArgument('id')) { - $this->listMessages($io, $input->getOption('max')); + $this->listMessages($failedTransport, $io, $input->getOption('max')); } else { - $this->showMessage($id, $io); + $this->showMessage($failedTransport, $id, $io); } return 0; } - private function listMessages(SymfonyStyle $io, int $max) + private function listMessages(?string $failedTransport, SymfonyStyle $io, int $max) { /** @var ListableReceiverInterface $receiver */ - $receiver = $this->getReceiver(); + $receiver = $this->getReceiver($failedTransport); $envelopes = $receiver->all($max); $rows = []; @@ -107,10 +110,10 @@ private function listMessages(SymfonyStyle $io, int $max) $io->comment('Run messenger:failed:show {id} -vv to see message details.'); } - private function showMessage(string $id, SymfonyStyle $io) + private function showMessage(?string $failedTransport, string $id, SymfonyStyle $io) { /** @var ListableReceiverInterface $receiver */ - $receiver = $this->getReceiver(); + $receiver = $this->getReceiver($failedTransport); $envelope = $receiver->find($id); if (null === $envelope) { throw new RuntimeException(sprintf('The message "%s" was not found.', $id)); diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php index 8c84cf7992786..718ecfdfd1094 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -11,6 +11,7 @@ namespace Symfony\Component\Messenger\EventListener; use Psr\Log\LoggerInterface; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\ErrorHandler\Exception\FlattenException; use Symfony\Component\EventDispatcher\EventSubscriberInterface; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; @@ -27,12 +28,17 @@ */ class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface { - private $failureSender; + private $failureSenders; private $logger; + /** + * @var SenderInterface|null global failure sender will be null if it is not defined + */ + private $globalfailureSender; - public function __construct(SenderInterface $failureSender, LoggerInterface $logger = null) + public function __construct(SenderInterface $globalfailureSender = null, LoggerInterface $logger = null, ServiceLocator $failureSenders = null) { - $this->failureSender = $failureSender; + $this->globalfailureSender = $globalfailureSender; + $this->failureSenders = $failureSenders; $this->logger = $logger; } @@ -42,6 +48,11 @@ public function onMessageFailed(WorkerMessageFailedEvent $event) return; } + $hasFailureTransports = $this->failureSenders instanceof ServiceLocator && $this->failureSenders->has($event->getReceiverName()); + if (null === $this->globalfailureSender && !$hasFailureTransports) { + return; + } + $envelope = $event->getEnvelope(); // avoid re-sending to the failed sender @@ -61,14 +72,15 @@ public function onMessageFailed(WorkerMessageFailedEvent $event) new RedeliveryStamp(0, $throwable->getMessage(), $flattenedException) ); + $failureSender = $this->getFailureSender($event->getReceiverName()); if (null !== $this->logger) { $this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [ 'class' => \get_class($envelope->getMessage()), - 'transport' => \get_class($this->failureSender), + 'transport' => \get_class($failureSender), ]); } - $this->failureSender->send($envelope); + $failureSender->send($envelope); } public static function getSubscribedEvents() @@ -77,4 +89,13 @@ public static function getSubscribedEvents() WorkerMessageFailedEvent::class => ['onMessageFailed', -100], ]; } + + private function getFailureSender(string $receiverName): SenderInterface + { + if ($this->failureSenders instanceof ServiceLocator && $this->failureSenders->has($receiverName)) { + return $this->failureSenders->get($receiverName); + } + + return $this->globalfailureSender; + } } diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php index ba5e9cff00879..651b5eb4a2e9d 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php @@ -13,24 +13,31 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Console\Tester\CommandTester; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\Messenger\Command\FailedMessagesRemoveCommand; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Transport\Receiver\ListableReceiverInterface; class FailedMessagesRemoveCommandTest extends TestCase { - public function testRemoveSingleMessage() + public function testRemoveUniqueMessageSpecificFailedTransport() { + $messageId = 20; + $anotherFailedTransport = 'another_failure_receiver'; + $receiver = $this->createMock(ListableReceiverInterface::class); - $receiver->expects($this->once())->method('find')->with(20)->willReturn(new Envelope(new \stdClass())); + $receiver->expects($this->once())->method('find')->with($messageId)->willReturn(new Envelope(new \stdClass())); + $serviceLocator = $this->createMock(ServiceLocator::class); + $serviceLocator->expects($this->once())->method('get')->with($anotherFailedTransport)->willReturn($receiver); $command = new FailedMessagesRemoveCommand( - 'failure_receiver', - $receiver + null, + null, + $serviceLocator ); $tester = new CommandTester($command); - $tester->execute(['id' => 20, '--force' => true]); + $tester->execute(['id' => [$messageId], '--failed-transport' => $anotherFailedTransport, '--force' => true]); $this->assertStringContainsString('Failed Message Details', $tester->getDisplay()); $this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay()); @@ -40,10 +47,12 @@ public function testRemoveUniqueMessage() { $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('find')->with(20)->willReturn(new Envelope(new \stdClass())); + $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesRemoveCommand( 'failure_receiver', - $receiver + $receiver, + $serviceLocator ); $tester = new CommandTester($command); @@ -61,10 +70,12 @@ public function testRemoveMultipleMessages() null, new Envelope(new \stdClass()) ); + $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesRemoveCommand( 'failure_receiver', - $receiver + $receiver, + $serviceLocator ); $tester = new CommandTester($command); @@ -83,10 +94,12 @@ public function testRemoveMultipleMessagesAndDisplayMessages() new Envelope(new \stdClass()), new Envelope(new \stdClass()) ); + $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesRemoveCommand( 'failure_receiver', - $receiver + $receiver, + $serviceLocator ); $tester = new CommandTester($command); diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php index 901f70e1e1d69..557609913b3ba 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php @@ -12,7 +12,9 @@ namespace Symfony\Component\Messenger\Tests\Command; use PHPUnit\Framework\TestCase; +use Psr\Log\NullLogger; use Symfony\Component\Console\Tester\CommandTester; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand; use Symfony\Component\Messenger\Envelope; @@ -32,12 +34,15 @@ public function testBasicRun() $bus = $this->createMock(MessageBusInterface::class); // the bus should be called in the worker $bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass())); + $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesRetryCommand( 'failure_receiver', $receiver, $bus, - $dispatcher + $dispatcher, + new NullLogger(), + $serviceLocator ); $tester = new CommandTester($command); @@ -45,4 +50,34 @@ public function testBasicRun() $this->assertStringContainsString('[OK]', $tester->getDisplay()); } + + public function testBasicRunWithSpecificFailedTransport() + { + $receiver = $this->createMock(ListableReceiverInterface::class); + $receiver->expects($this->exactly(2))->method('find')->withConsecutive([10], [12])->willReturn(new Envelope(new \stdClass())); + // message will eventually be ack'ed in Worker + $receiver->expects($this->exactly(2))->method('ack'); + + $dispatcher = new EventDispatcher(); + $bus = $this->createMock(MessageBusInterface::class); + // the bus should be called in the worker + $bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass())); + $failedTransportName = 'failure_receiver'; + $serviceLocator = $this->createMock(ServiceLocator::class); + $serviceLocator->method('get')->with($failedTransportName)->willReturn($receiver); + + $command = new FailedMessagesRetryCommand( + null, + null, + $bus, + $dispatcher, + new NullLogger(), + $serviceLocator + ); + + $tester = new CommandTester($command); + $tester->execute(['id' => [10, 12], '--failed-transport' => $failedTransportName, '--force' => true]); + + $this->assertStringContainsString('[OK]', $tester->getDisplay()); + } } diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php index e6d107eeaf1d1..f6baba85137fd 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php @@ -13,6 +13,7 @@ use PHPUnit\Framework\TestCase; use Symfony\Component\Console\Tester\CommandTester; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\Messenger\Command\FailedMessagesShowCommand; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Stamp\RedeliveryStamp; @@ -26,6 +27,45 @@ */ class FailedMessagesShowCommandTest extends TestCase { + public function testFailedServiceLocator() + { + $sentToFailureStamp = new SentToFailureTransportStamp('async'); + $redeliveryStamp = new RedeliveryStamp(0, 'Things are bad!'); + $envelope = new Envelope(new \stdClass(), [ + new TransportMessageIdStamp(15), + $sentToFailureStamp, + $redeliveryStamp, + ]); + + $receiverName = 'failure_receiver'; + $receiver = $this->createMock(ListableReceiverInterface::class); + $receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope); + $serviceLocator = $this->createMock(ServiceLocator::class); + $serviceLocator->method('get')->with($receiverName)->willReturn($receiver); + + $command = new FailedMessagesShowCommand( + null, + null, + $serviceLocator + ); + + $tester = new CommandTester($command); + $tester->execute(['id' => 15, '--failed-transport' => $receiverName]); + + $this->assertStringContainsString(sprintf(<<getRedeliveredAt()->format('Y-m-d H:i:s')), + $tester->getDisplay(true)); + } + public function testBasicRun() { $sentToFailureStamp = new SentToFailureTransportStamp('async'); @@ -37,10 +77,12 @@ public function testBasicRun() ]); $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope); + $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesShowCommand( 'failure_receiver', - $receiver + $receiver, + $serviceLocator ); $tester = new CommandTester($command); @@ -73,10 +115,12 @@ public function testMultipleRedeliveryFails() ]); $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope); + $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesShowCommand( 'failure_receiver', - $receiver + $receiver, + $serviceLocator ); $tester = new CommandTester($command); @@ -99,9 +143,12 @@ public function testMultipleRedeliveryFails() public function testReceiverShouldBeListable() { $receiver = $this->createMock(ReceiverInterface::class); + $serviceLocator = $this->createMock(ServiceLocator::class); + $command = new FailedMessagesShowCommand( 'failure_receiver', - $receiver + $receiver, + $serviceLocator ); $this->expectExceptionMessage('The "failure_receiver" receiver does not support listing or showing specific messages.'); @@ -121,10 +168,12 @@ public function testListMessages() ]); $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('all')->with()->willReturn([$envelope]); + $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesShowCommand( 'failure_receiver', - $receiver + $receiver, + $serviceLocator ); $tester = new CommandTester($command); @@ -141,10 +190,12 @@ public function testListMessagesReturnsNoMessagesFound() { $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('all')->with()->willReturn([]); + $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesShowCommand( 'failure_receiver', - $receiver + $receiver, + $serviceLocator ); $tester = new CommandTester($command); @@ -162,10 +213,12 @@ public function testListMessagesReturnsPaginatedMessages() ]); $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('all')->with()->willReturn([$envelope]); + $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesShowCommand( 'failure_receiver', - $receiver + $receiver, + $serviceLocator ); $tester = new CommandTester($command); @@ -181,10 +234,12 @@ public function testInvalidMessagesThrowsException() $sentToFailureStamp, ]); $receiver = $this->createMock(ListableReceiverInterface::class); + $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesShowCommand( 'failure_receiver', - $receiver + $receiver, + $serviceLocator ); $this->expectExceptionMessage('The message "15" was not found.'); diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php index 1f648b83e1e35..4ac662cbbb082 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php @@ -22,6 +22,20 @@ class SendFailedMessageToFailureTransportListenerTest extends TestCase { + public function testDoNothingIfFailureTransportIsNotDefined() + { + $sender = $this->createMock(SenderInterface::class); + $sender->expects($this->never())->method('send'); + + $listener = new SendFailedMessageToFailureTransportListener(null); + + $exception = new \Exception('no!'); + $envelope = new Envelope(new \stdClass()); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); + + $listener->onMessageFailed($event); + } + public function testItSendsToTheFailureTransport() { $sender = $this->createMock(SenderInterface::class); diff --git a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php index 3ea7602d5237d..fb0c349d4d189 100644 --- a/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php +++ b/src/Symfony/Component/Messenger/Tests/FailureIntegrationTest.php @@ -13,6 +13,7 @@ use PHPUnit\Framework\TestCase; use Psr\Container\ContainerInterface; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\EventDispatcher\EventDispatcher; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; @@ -214,6 +215,147 @@ public function testRequeMechanism() // the failure transport is empty because it worked $this->assertEmpty($failureTransport->getMessagesWaitingToBeReceived()); } + + public function testMultipleFailedTransportsWithoutGlobalFailureTransport() + { + $transport1 = new DummyFailureTestSenderAndReceiver(); + $transport2 = new DummyFailureTestSenderAndReceiver(); + $failureTransport1 = new DummyFailureTestSenderAndReceiver(); + $failureTransport2 = new DummyFailureTestSenderAndReceiver(); + + $transports = [ + 'transport1' => $transport1, + 'transport2' => $transport2, + 'the_failure_transport1' => $failureTransport1, + 'the_failure_transport2' => $failureTransport2, + ]; + + $locator = $this->createMock(ContainerInterface::class); + $locator->expects($this->any()) + ->method('has') + ->willReturn(true); + $locator->expects($this->any()) + ->method('get') + ->willReturnCallback(function ($transportName) use ($transports) { + return $transports[$transportName]; + }); + $senderLocator = new SendersLocator( + [DummyMessage::class => ['transport1', 'transport2']], + $locator + ); + + // retry strategy with zero retries so it goes to the failed transport after failure + $retryStrategyLocator = $this->createMock(ContainerInterface::class); + $retryStrategyLocator->expects($this->any()) + ->method('has') + ->willReturn(true); + $retryStrategyLocator->expects($this->any()) + ->method('get') + ->willReturn(new MultiplierRetryStrategy(0)); + + // using to so we can lazily get the bus later and avoid circular problem + $transport1HandlerThatFails = new DummyTestHandler(true); + $transport2HandlerThatFails = new DummyTestHandler(true); + $handlerLocator = new HandlersLocator([ + DummyMessage::class => [ + new HandlerDescriptor($transport1HandlerThatFails, [ + 'from_transport' => 'transport1', + ]), + new HandlerDescriptor($transport2HandlerThatFails, [ + 'from_transport' => 'transport2', + ]), + ], + ]); + + $dispatcher = new EventDispatcher(); + $bus = new MessageBus([ + new FailedMessageProcessingMiddleware(), + new SendMessageMiddleware($senderLocator), + new HandleMessageMiddleware($handlerLocator), + ]); + + $failureSendersLocator = $this->createMock(ServiceLocator::class); + $failureSendersLocator->expects($this->exactly(2)) + ->method('get') + ->will($this->onConsecutiveCalls( + $failureTransport1, + $failureTransport2 + )); + $failureSendersLocator + ->method('has') + ->willReturn(true); + + $dispatcher->addSubscriber(new SendFailedMessageForRetryListener($locator, $retryStrategyLocator)); + $dispatcher->addSubscriber(new SendFailedMessageToFailureTransportListener( + null, + null, + $failureSendersLocator + )); + $dispatcher->addSubscriber(new StopWorkerOnMessageLimitListener(1)); + + $runWorker = function (string $transportName) use ($transports, $bus, $dispatcher): ?\Throwable { + $throwable = null; + $failedListener = function (WorkerMessageFailedEvent $event) use (&$throwable) { + $throwable = $event->getThrowable(); + }; + $dispatcher->addListener(WorkerMessageFailedEvent::class, $failedListener); + + $worker = new Worker([$transportName => $transports[$transportName]], $bus, $dispatcher); + + $worker->run(); + + $dispatcher->removeListener(WorkerMessageFailedEvent::class, $failedListener); + + return $throwable; + }; + + // send the message + $envelope = new Envelope(new DummyMessage('API')); + $bus->dispatch($envelope); + + // message has been sent + $this->assertCount(1, $transport1->getMessagesWaitingToBeReceived()); + $this->assertCount(1, $transport2->getMessagesWaitingToBeReceived()); + $this->assertCount(0, $failureTransport1->getMessagesWaitingToBeReceived()); + $this->assertCount(0, $failureTransport2->getMessagesWaitingToBeReceived()); + + // Receive the message from "transport1" + $throwable = $runWorker('transport1'); + $this->assertInstanceOf(HandlerFailedException::class, $throwable); + // handler for transport1 is called + $this->assertSame(1, $transport1HandlerThatFails->getTimesCalled()); + $this->assertSame(0, $transport2HandlerThatFails->getTimesCalled()); + // one handler failed and the message is sent to the failed transport of transport1 + $this->assertCount(1, $failureTransport1->getMessagesWaitingToBeReceived()); + $this->assertCount(0, $failureTransport2->getMessagesWaitingToBeReceived()); + + // consume the failure message failed on "transport1" + $runWorker('the_failure_transport1'); + // "transport1" handler is called again from the "the_failed_transport1" and it fails + $this->assertSame(2, $transport1HandlerThatFails->getTimesCalled()); + $this->assertSame(0, $transport2HandlerThatFails->getTimesCalled()); + $this->assertCount(0, $failureTransport1->getMessagesWaitingToBeReceived()); + $this->assertCount(0, $failureTransport2->getMessagesWaitingToBeReceived()); + + // Receive the message from "transport2" + $throwable = $runWorker('transport2'); + $this->assertInstanceOf(HandlerFailedException::class, $throwable); + $this->assertSame(2, $transport1HandlerThatFails->getTimesCalled()); + // handler for "transport2" is called + $this->assertSame(1, $transport2HandlerThatFails->getTimesCalled()); + $this->assertCount(0, $failureTransport1->getMessagesWaitingToBeReceived()); + // the failure transport "the_failure_transport2" has 1 new message failed from "transport2" + $this->assertCount(1, $failureTransport2->getMessagesWaitingToBeReceived()); + + // Consume the failure message failed on "transport2" + $runWorker('the_failure_transport2'); + $this->assertSame(2, $transport1HandlerThatFails->getTimesCalled()); + // "transport2" handler is called again from the "the_failed_transport2" and it fails + $this->assertSame(2, $transport2HandlerThatFails->getTimesCalled()); + $this->assertCount(0, $failureTransport1->getMessagesWaitingToBeReceived()); + // After the message fails again, the message is discarded from the "the_failure_transport2" + $this->assertCount(0, $failureTransport2->getMessagesWaitingToBeReceived()); + } } class DummyFailureTestSenderAndReceiver implements ReceiverInterface, SenderInterface From b8591723c28e6c8d45015d3bf6886ed39f4c78f0 Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sat, 19 Sep 2020 15:41:31 +0100 Subject: [PATCH 02/15] improved solution using one argument with 2 types in listener --- .../FrameworkExtension.php | 60 ++++++------------- .../Resources/config/messenger.php | 8 +++ .../FrameworkExtensionTest.php | 28 ++++----- .../Command/AbstractFailedMessagesCommand.php | 30 ++++------ .../Command/FailedMessagesRemoveCommand.php | 16 +++-- .../Command/FailedMessagesRetryCommand.php | 45 +++++++------- .../Command/FailedMessagesShowCommand.php | 24 ++++---- ...ailedMessageToFailureTransportListener.php | 36 ++++++----- .../FailedMessagesRemoveCommandTest.php | 22 +++---- .../FailedMessagesRetryCommandTest.php | 7 +-- .../Command/FailedMessagesShowCommandTest.php | 20 +++---- 11 files changed, 147 insertions(+), 149 deletions(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index acc19ec1103e2..05987f53580c8 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1861,27 +1861,15 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $container->getDefinition('messenger.retry_strategy_locator') ->replaceArgument(0, $transportRetryReferences); - - $hasAnyFailureTransport = false; - + $failureTransports = []; - $failureTransportsServiceLocatorId = 'messenger.failure_transports.locator'; - $failureTransportsByName = []; - $failureTransportsByNameServiceLocatorId = 'messenger.failure_transports_by_name.locator'; if ($config['failure_transport']) { if (!isset($senderReferences[$config['failure_transport']])) { throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport'])); } - - $hasAnyFailureTransport = true; - $failureTransportRef = $senderReferences[$config['failure_transport']]; - $failureTransportsByName[$config['failure_transport']] = $failureTransportRef; - - $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') - ->replaceArgument(0, $failureTransportRef); - $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') - ->replaceArgument(2, null); + + $failureTransports[$config['failure_transport']] = $senderReferences[$config['failure_transport']]; } foreach ($config['transports'] as $name => $transport) { @@ -1890,41 +1878,31 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport'])); } - $hasAnyFailureTransport = true; - $failureTransports[$name] = $senderReferences[$transport['failure_transport']]; - $failureTransportsByName[$transport['failure_transport']] = $senderReferences[$transport['failure_transport']]; + $failureTransports[$transport['failure_transport']] = $senderReferences[$transport['failure_transport']]; } } - if ($hasAnyFailureTransport) { + $failureTransportsServiceLocatorId = 'messenger.failure_transports.locator'; + if (\count($failureTransports) > 0) { $failureTransportsServiceLocator = ServiceLocatorTagPass::register($container, $failureTransports, $failureTransportsServiceLocatorId); + $container->getDefinition($failureTransportsServiceLocatorId) - ->replaceArgument(0, $failureTransports) - ->replaceArgument(1, $failureTransportsServiceLocator); - $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') - ->replaceArgument(0, $senderReferences[$config['failure_transport']] ?? null); + ->replaceArgument(0, $failureTransports); + $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') - ->replaceArgument(2, \count($failureTransports) > 0 ? $failureTransportsServiceLocator : null); - - $failureTransportsByNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportsByName, $failureTransportsByNameServiceLocatorId); - $container->getDefinition($failureTransportsByNameServiceLocatorId) - ->replaceArgument(0, $failureTransportsByName) - ->replaceArgument(1, $failureTransportsByNameServiceLocator); - + ->replaceArgument(0, $failureTransportsServiceLocator); + + $globalFailureReceiver = $config['failure_transport'] ?? null; $container->getDefinition('console.command.messenger_failed_messages_retry') - ->replaceArgument(0, $config['failure_transport'] ?? null) - ->replaceArgument(1, $failureTransportsByName[$config['failure_transport']] ?? null) - ->replaceArgument(5, $container->getDefinition($failureTransportsByNameServiceLocatorId)); - + ->replaceArgument(0, $globalFailureReceiver) + ->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocator)); $container->getDefinition('console.command.messenger_failed_messages_show') - ->replaceArgument(0, $config['failure_transport'] ?? null) - ->replaceArgument(1, $failureTransportsByName[$config['failure_transport']] ?? null) - ->replaceArgument(2, $container->getDefinition($failureTransportsByNameServiceLocatorId)); - + ->replaceArgument(0, $globalFailureReceiver) + ->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId)); $container->getDefinition('console.command.messenger_failed_messages_remove') - ->replaceArgument(0, $config['failure_transport'] ?? null) - ->replaceArgument(1, $failureTransportsByName[$config['failure_transport']] ?? null) - ->replaceArgument(2, $container->getDefinition($failureTransportsByNameServiceLocatorId)); + ->replaceArgument(0, $globalFailureReceiver) + ->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId)); + } else { $container->removeDefinition('console.command.messenger_failed_messages_retry'); $container->removeDefinition('console.command.messenger_failed_messages_show'); diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 3d99912972271..94927900204b1 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -11,6 +11,7 @@ namespace Symfony\Component\DependencyInjection\Loader\Configurator; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory; use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory; use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory; @@ -130,6 +131,13 @@ ->set('messenger.transport.beanstalkd.factory', BeanstalkdTransportFactory::class) + // failed transports + ->set('messenger.failure_transports.locator', ServiceLocator::class) + ->args([ + abstract_arg('failed transports map by name'), + ]) + ->tag('container.service_locator') + // retry ->set('messenger.retry_strategy_locator') ->args([ diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index be07a0e3d1946..702da37ceb9bc 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -40,6 +40,7 @@ use Symfony\Component\DependencyInjection\Loader\ClosureLoader; use Symfony\Component\DependencyInjection\ParameterBag\EnvPlaceholderParameterBag; use Symfony\Component\DependencyInjection\Reference; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\HttpClient\MockHttpClient; use Symfony\Component\HttpClient\RetryableHttpClient; @@ -671,21 +672,19 @@ public function testMessengerMultipleFailureTransports() if (\in_array($transportName, $failedTransports)) { continue; } - + $this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName)); + } - - $failedMessageTransportListenerReference = - $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener'); - $this->assertNull($failedMessageTransportListenerReference->getArgument(0)); - $this->assertSame($failureTransportsLocatorDefinition->getArgument(1), $failedMessageTransportListenerReference->getArgument(2)); + + $this->assertTrue(true); } public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport() { $container = $this->createContainerFromFile('messenger_multiple_failure_transports_global'); $failureTransportsLocatorDefinition = $container->getDefinition('messenger.failure_transports.locator'); - + /** @var Reference $failureTransportsMapping */ $failureTransportsMapping = $failureTransportsLocatorDefinition->getArgument(0); @@ -708,11 +707,8 @@ public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport $this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName)); } - - $failedMessageTransportListenerReference = - $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener'); - $this->assertSame((string) (new Reference('messenger.transport.failure_transport_global')), (string) $failedMessageTransportListenerReference->getArgument(0)); - $this->assertSame($failureTransportsLocatorDefinition->getArgument(1), $failedMessageTransportListenerReference->getArgument(2)); + + $this->assertTrue(true); } public function testMessengerTransports() @@ -760,8 +756,12 @@ public function testMessengerTransports() $this->assertSame(7, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(1)); $this->assertSame(3, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(2)); $this->assertSame(100, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(3)); - - $this->assertEquals(new Reference('messenger.transport.failed'), $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0)); + + $failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator'); + $expectedFailureTransports = [ + 'failed' => new Reference('messenger.transport.failed') + ]; + $this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0)); } public function testMessengerRouting() diff --git a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php index c4a3cf6387b76..051ae6ffcf8dd 100644 --- a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php @@ -29,29 +29,25 @@ */ abstract class AbstractFailedMessagesCommand extends Command { - private $receiverName; - private $receiver; - /** - * @var ServiceLocator - */ + private $globalFailureReceiverName; private $failureTransports; - public function __construct(?string $receiverName, ?ReceiverInterface $receiver, ServiceLocator $failureTransports) + public function __construct(?string $globalFailureReceiverName, $failureTransports) { - $this->receiverName = $receiverName; - $this->receiver = $receiver; $this->failureTransports = $failureTransports; + $this->globalFailureReceiverName = $globalFailureReceiverName; + if (!$failureTransports instanceof ServiceLocator) { + trigger_deprecation('symfony/messenger', '5.2', 'Passing failureTransports should now pass a ServiceLocator', __METHOD__); + $this->failureTransports = new ServiceLocator([$this->globalFailureReceiverName => function() use ($failureTransports) { return $failureTransports; }]); + } + parent::__construct(); } - protected function getReceiverName(?string $name): string + protected function getGlobalFailureReceiverName(): ?string { - if (null === $name) { - return $this->receiverName; - } - - return $name; + return $this->globalFailureReceiverName; } /** @@ -125,12 +121,8 @@ protected function printPendingMessagesMessage(ReceiverInterface $receiver, Symf } } - protected function getReceiver(?string $name): ReceiverInterface + protected function getReceiver(string $name): ReceiverInterface { - if (null === $name) { - return $this->receiver; - } - return $this->failureTransports->get($name); } diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php index 5d2fc06ec5560..8952624aed833 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php @@ -37,7 +37,7 @@ protected function configure(): void ->setDefinition([ new InputArgument('id', InputArgument::REQUIRED | InputArgument::IS_ARRAY, 'Specific message id(s) to remove'), new InputOption('force', null, InputOption::VALUE_NONE, 'Force the operation without confirmation'), - new InputOption('failed-transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failed transport'), + new InputOption('failure-transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport'), new InputOption('show-messages', null, InputOption::VALUE_NONE, 'Display messages before removing it (if multiple ids are given)'), ]) ->setDescription('Remove given messages from the failure transport') @@ -59,21 +59,25 @@ protected function execute(InputInterface $input, OutputInterface $output) { $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); - $failedTransport = $input->getOption('failed-transport'); - $receiver = $this->getReceiver($failedTransport); + $failureTransportName = $input->getOption('failure-transport'); + if ($failureTransportName === null) { + $failureTransportName = $this->getGlobalFailureReceiverName(); + } + + $receiver = $this->getReceiver($failureTransportName); $shouldForce = $input->getOption('force'); $ids = (array) $input->getArgument('id'); $shouldDisplayMessages = $input->getOption('show-messages') || 1 === \count($ids); - $this->removeMessages($ids, $receiver, $io, $shouldForce, $shouldDisplayMessages); + $this->removeMessages($failureTransportName, $ids, $receiver, $io, $shouldForce, $shouldDisplayMessages); return 0; } - private function removeMessages(array $ids, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce, bool $shouldDisplayMessages): void + private function removeMessages(string $failureTransportName, array $ids, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce, bool $shouldDisplayMessages): void { if (!$receiver instanceof ListableReceiverInterface) { - throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $this->getReceiverName())); + throw new RuntimeException(sprintf('The "%s" receiver does not support removing specific messages.', $failureTransportName)); } foreach ($ids as $id) { diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index 3987cb202b1a4..cfa4b51a20a18 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -36,19 +36,20 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand { protected static $defaultName = 'messenger:failed:retry'; - + private $eventDispatcher; private $messageBus; private $logger; - private $failedTransportsByName; + private $globalReceiverName; - public function __construct(?string $receiverName, ?ReceiverInterface $receiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, ServiceLocator $failedTransportsByName) + public function __construct(?string $globalReceiverName, ?ReceiverInterface $globalReceiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, ?ServiceLocator $failureTransports = null) { $this->eventDispatcher = $eventDispatcher; $this->messageBus = $messageBus; $this->logger = $logger; - - parent::__construct($receiverName, $receiver, $failedTransportsByName); + $this->globalReceiverName = $globalReceiverName; + + parent::__construct($globalReceiverName, $failureTransports === null ? $globalReceiver : $failureTransports); } /** @@ -60,7 +61,7 @@ protected function configure(): void ->setDefinition([ new InputArgument('id', InputArgument::IS_ARRAY, 'Specific message id(s) to retry'), new InputOption('force', null, InputOption::VALUE_NONE, 'Force action without confirmation'), - new InputOption('failed-transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failed transport'), + new InputOption('failure-transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport'), ]) ->setDescription('Retries one or more messages from the failure transport') ->setHelp(<<<'EOF' @@ -98,11 +99,15 @@ protected function execute(InputInterface $input, OutputInterface $output) $io->comment('Re-run the command with a -vv option to see logs about consumed messages.'); } - $failedTransportName = $input->getOption('failed-transport'); - $receiver = $this->getReceiver($failedTransportName); + $failureTransportName = $input->getOption('failure-transport'); + if ($failureTransportName === null) { + $failureTransportName = $this->getGlobalFailureReceiverName(); + } + + $receiver = $this->getReceiver($failureTransportName); $this->printPendingMessagesMessage($receiver, $io); - $io->writeln(sprintf('To retry all the messages, run messenger:consume %s', $this->getReceiverName($failedTransportName))); + $io->writeln(sprintf('To retry all the messages, run messenger:consume %s', $failureTransportName)); $shouldForce = $input->getOption('force'); $ids = $input->getArgument('id'); @@ -111,18 +116,18 @@ protected function execute(InputInterface $input, OutputInterface $output) throw new RuntimeException('Message id must be passed when in non-interactive mode.'); } - $this->runInteractive($receiver, $io, $shouldForce); + $this->runInteractive($failureTransportName, $receiver, $io, $shouldForce); return 0; } - $this->retrySpecificIds($failedTransportName, $ids, $io, $shouldForce); + $this->retrySpecificIds($failureTransportName, $ids, $io, $shouldForce); $io->success('All done!'); return 0; } - private function runInteractive(ReceiverInterface $failedTransport, SymfonyStyle $io, bool $shouldForce) + private function runInteractive(string $failureTransportName, ReceiverInterface $failedTransport, SymfonyStyle $io, bool $shouldForce) { $receiver = $this->getReceiver($failedTransport); $count = 0; @@ -139,7 +144,7 @@ private function runInteractive(ReceiverInterface $failedTransport, SymfonyStyle $id = $this->getMessageId($envelope); if (null === $id) { - throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $this->getReceiverName($failedTransport))); + throw new LogicException(sprintf('The "%s" receiver is able to list messages by id but the envelope is missing the TransportMessageIdStamp stamp.', $failureTransportName)); } $ids[] = $id; } @@ -153,7 +158,7 @@ private function runInteractive(ReceiverInterface $failedTransport, SymfonyStyle } } else { // get() and ask messages one-by-one - $count = $this->runWorker($failedTransport, $this->getReceiver($failedTransport), $io, $shouldForce); + $count = $this->runWorker($failureTransportName, $this->getReceiver($failedTransport), $io, $shouldForce); } // avoid success message if nothing was processed @@ -162,7 +167,7 @@ private function runInteractive(ReceiverInterface $failedTransport, SymfonyStyle } } - private function runWorker(?string $failedTransport, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int + private function runWorker(string $failureTransportName, ReceiverInterface $receiver, SymfonyStyle $io, bool $shouldForce): int { $count = 0; $listener = function (WorkerMessageReceivedEvent $messageReceivedEvent) use ($io, $receiver, $shouldForce, &$count) { @@ -183,7 +188,7 @@ private function runWorker(?string $failedTransport, ReceiverInterface $receiver $this->eventDispatcher->addListener(WorkerMessageReceivedEvent::class, $listener); $worker = new Worker( - [$this->getReceiverName($failedTransport) => $receiver], + [$failureTransportName => $receiver], $this->messageBus, $this->eventDispatcher, $this->logger @@ -198,12 +203,12 @@ private function runWorker(?string $failedTransport, ReceiverInterface $receiver return $count; } - private function retrySpecificIds(?string $failedTransport, array $ids, SymfonyStyle $io, bool $shouldForce) + private function retrySpecificIds(string $failureTransportName, array $ids, SymfonyStyle $io, bool $shouldForce) { - $receiver = $this->getReceiver($failedTransport); + $receiver = $this->getReceiver($failureTransportName); if (!$receiver instanceof ListableReceiverInterface) { - throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $this->getReceiverName($failedTransport))); + throw new RuntimeException(sprintf('The "%s" receiver does not support retrying messages by id.', $failureTransportName)); } foreach ($ids as $id) { @@ -213,7 +218,7 @@ private function retrySpecificIds(?string $failedTransport, array $ids, SymfonyS } $singleReceiver = new SingleMessageReceiver($receiver, $envelope); - $this->runWorker($failedTransport, $singleReceiver, $io, $shouldForce); + $this->runWorker($failureTransportName, $singleReceiver, $io, $shouldForce); } } } diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php index c694ccc5180bc..3b77ed0c78545 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php @@ -36,7 +36,7 @@ protected function configure(): void ->setDefinition([ new InputArgument('id', InputArgument::OPTIONAL, 'Specific message id to show'), new InputOption('max', null, InputOption::VALUE_REQUIRED, 'Maximum number of messages to list', 50), - new InputOption('failed-transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failed transport'), + new InputOption('failure-transport', null, InputOption::VALUE_OPTIONAL, 'Use a specific failure transport'), ]) ->setDescription('Shows one or more messages from the failure transport') ->setHelp(<<<'EOF' @@ -59,28 +59,32 @@ protected function execute(InputInterface $input, OutputInterface $output) { $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); - $failedTransport = $input->getOption('failed-transport'); - $receiver = $this->getReceiver($failedTransport); + $failureTransportName = $input->getOption('failure-transport'); + if ($failureTransportName === null) { + $failureTransportName = $this->getGlobalFailureReceiverName(); + } + + $receiver = $this->getReceiver($failureTransportName); $this->printPendingMessagesMessage($receiver, $io); if (!$receiver instanceof ListableReceiverInterface) { - throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $this->getReceiverName($failedTransport))); + throw new RuntimeException(sprintf('The "%s" receiver does not support listing or showing specific messages.', $failureTransportName)); } if (null === $id = $input->getArgument('id')) { - $this->listMessages($failedTransport, $io, $input->getOption('max')); + $this->listMessages($failureTransportName, $io, $input->getOption('max')); } else { - $this->showMessage($failedTransport, $id, $io); + $this->showMessage($failureTransportName, $id, $io); } return 0; } - private function listMessages(?string $failedTransport, SymfonyStyle $io, int $max) + private function listMessages(?string $failedTransportName, SymfonyStyle $io, int $max) { /** @var ListableReceiverInterface $receiver */ - $receiver = $this->getReceiver($failedTransport); + $receiver = $this->getReceiver($failedTransportName); $envelopes = $receiver->all($max); $rows = []; @@ -110,10 +114,10 @@ private function listMessages(?string $failedTransport, SymfonyStyle $io, int $m $io->comment('Run messenger:failed:show {id} -vv to see message details.'); } - private function showMessage(?string $failedTransport, string $id, SymfonyStyle $io) + private function showMessage(?string $failedTransportName, string $id, SymfonyStyle $io) { /** @var ListableReceiverInterface $receiver */ - $receiver = $this->getReceiver($failedTransport); + $receiver = $this->getReceiver($failedTransportName); $envelope = $receiver->find($id); if (null === $envelope) { throw new RuntimeException(sprintf('The message "%s" was not found.', $id)); diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php index 718ecfdfd1094..20fbbaa7d0005 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -28,28 +28,30 @@ */ class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface { - private $failureSenders; - private $logger; /** - * @var SenderInterface|null global failure sender will be null if it is not defined + * @var ServiceLocator|SenderInterface */ - private $globalfailureSender; + private $failureSender; + private $logger; - public function __construct(SenderInterface $globalfailureSender = null, LoggerInterface $logger = null, ServiceLocator $failureSenders = null) + public function __construct($failureSender, LoggerInterface $logger = null) { - $this->globalfailureSender = $globalfailureSender; - $this->failureSenders = $failureSenders; + if (!$failureSender instanceof ServiceLocator) { + trigger_deprecation('symfony/messenger', '5.2', 'Passing failureTransports should now pass a ServiceLocator', __METHOD__); + } + + $this->failureSender = $failureSender; $this->logger = $logger; } - + public function onMessageFailed(WorkerMessageFailedEvent $event) { if ($event->willRetry()) { return; } - $hasFailureTransports = $this->failureSenders instanceof ServiceLocator && $this->failureSenders->has($event->getReceiverName()); - if (null === $this->globalfailureSender && !$hasFailureTransports) { + $hasFailureTransports = $this->failureSender instanceof ServiceLocator && $this->failureSender->has($event->getReceiverName()); + if (null === !$hasFailureTransports) { return; } @@ -73,6 +75,10 @@ public function onMessageFailed(WorkerMessageFailedEvent $event) ); $failureSender = $this->getFailureSender($event->getReceiverName()); + if ($failureSender === null) { + return; + } + if (null !== $this->logger) { $this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [ 'class' => \get_class($envelope->getMessage()), @@ -90,12 +96,12 @@ public static function getSubscribedEvents() ]; } - private function getFailureSender(string $receiverName): SenderInterface + private function getFailureSender(string $receiverName): ?SenderInterface { - if ($this->failureSenders instanceof ServiceLocator && $this->failureSenders->has($receiverName)) { - return $this->failureSenders->get($receiverName); + if ($this->failureSender instanceof ServiceLocator && $this->failureSender->has($receiverName)) { + return $this->failureSender->get($receiverName); } - - return $this->globalfailureSender; + + return $this->failureSender; } } diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php index 651b5eb4a2e9d..e2104afe2b200 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php @@ -31,13 +31,12 @@ public function testRemoveUniqueMessageSpecificFailedTransport() $serviceLocator->expects($this->once())->method('get')->with($anotherFailedTransport)->willReturn($receiver); $command = new FailedMessagesRemoveCommand( - null, null, $serviceLocator ); $tester = new CommandTester($command); - $tester->execute(['id' => [$messageId], '--failed-transport' => $anotherFailedTransport, '--force' => true]); + $tester->execute(['id' => [$messageId], '--failure-transport' => $anotherFailedTransport, '--force' => true]); $this->assertStringContainsString('Failed Message Details', $tester->getDisplay()); $this->assertStringContainsString('Message with id 20 removed.', $tester->getDisplay()); @@ -45,13 +44,14 @@ public function testRemoveUniqueMessageSpecificFailedTransport() public function testRemoveUniqueMessage() { + $globalFailureReceiverName = 'failure_receiver'; $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('find')->with(20)->willReturn(new Envelope(new \stdClass())); $serviceLocator = $this->createMock(ServiceLocator::class); + $serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver); $command = new FailedMessagesRemoveCommand( - 'failure_receiver', - $receiver, + $globalFailureReceiverName, $serviceLocator ); @@ -64,6 +64,7 @@ public function testRemoveUniqueMessage() public function testRemoveMultipleMessages() { + $globalFailureReceiverName = 'failure_receiver'; $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->exactly(3))->method('find')->withConsecutive([20], [30], [40])->willReturnOnConsecutiveCalls( new Envelope(new \stdClass()), @@ -71,11 +72,11 @@ public function testRemoveMultipleMessages() new Envelope(new \stdClass()) ); $serviceLocator = $this->createMock(ServiceLocator::class); - + $serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver); + $command = new FailedMessagesRemoveCommand( - 'failure_receiver', - $receiver, - $serviceLocator + $globalFailureReceiverName, + $receiver ); $tester = new CommandTester($command); @@ -89,16 +90,17 @@ public function testRemoveMultipleMessages() public function testRemoveMultipleMessagesAndDisplayMessages() { + $globalFailureReceiverName = 'failure_receiver'; $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->exactly(2))->method('find')->withConsecutive([20], [30])->willReturnOnConsecutiveCalls( new Envelope(new \stdClass()), new Envelope(new \stdClass()) ); $serviceLocator = $this->createMock(ServiceLocator::class); + $serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver); $command = new FailedMessagesRemoveCommand( - 'failure_receiver', - $receiver, + $globalFailureReceiverName, $serviceLocator ); diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php index 557609913b3ba..ea93dc2f13a26 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php @@ -34,15 +34,14 @@ public function testBasicRun() $bus = $this->createMock(MessageBusInterface::class); // the bus should be called in the worker $bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass())); - $serviceLocator = $this->createMock(ServiceLocator::class); - + $command = new FailedMessagesRetryCommand( 'failure_receiver', $receiver, $bus, $dispatcher, new NullLogger(), - $serviceLocator + null ); $tester = new CommandTester($command); @@ -76,7 +75,7 @@ public function testBasicRunWithSpecificFailedTransport() ); $tester = new CommandTester($command); - $tester->execute(['id' => [10, 12], '--failed-transport' => $failedTransportName, '--force' => true]); + $tester->execute(['id' => [10, 12], '--failure-transport' => $failedTransportName, '--force' => true]); $this->assertStringContainsString('[OK]', $tester->getDisplay()); } diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php index f6baba85137fd..01720a2b8299a 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php @@ -44,13 +44,12 @@ public function testFailedServiceLocator() $serviceLocator->method('get')->with($receiverName)->willReturn($receiver); $command = new FailedMessagesShowCommand( - null, null, $serviceLocator ); $tester = new CommandTester($command); - $tester->execute(['id' => 15, '--failed-transport' => $receiverName]); + $tester->execute(['id' => 15, '--failure-transport' => $receiverName]); $this->assertStringContainsString(sprintf(<<createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope); - $serviceLocator = $this->createMock(ServiceLocator::class); $command = new FailedMessagesShowCommand( 'failure_receiver', - $receiver, - $serviceLocator + $receiver ); $tester = new CommandTester($command); @@ -113,13 +110,15 @@ public function testMultipleRedeliveryFails() $redeliveryStamp1, $redeliveryStamp2, ]); + + $globalFailureReceiverName = 'failure_receiver'; $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope); $serviceLocator = $this->createMock(ServiceLocator::class); + $serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver); $command = new FailedMessagesShowCommand( - 'failure_receiver', - $receiver, + $globalFailureReceiverName, $serviceLocator ); @@ -142,13 +141,14 @@ public function testMultipleRedeliveryFails() public function testReceiverShouldBeListable() { + $globalFailureReceiverName = 'failure_receiver'; $receiver = $this->createMock(ReceiverInterface::class); $serviceLocator = $this->createMock(ServiceLocator::class); + $serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver); $command = new FailedMessagesShowCommand( - 'failure_receiver', - $receiver, - $serviceLocator + $globalFailureReceiverName, + $receiver ); $this->expectExceptionMessage('The "failure_receiver" receiver does not support listing or showing specific messages.'); From 3650bb1ce514fb6c1c1da7e17925372473ed41f1 Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sat, 19 Sep 2020 15:49:40 +0100 Subject: [PATCH 03/15] fix --- .../DependencyInjection/FrameworkExtension.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 05987f53580c8..6c973e0622be2 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1850,6 +1850,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder } $sendersServiceLocator = ServiceLocatorTagPass::register($container, $senderReferences); + $container->getDefinition('messenger.senders_locator') ->replaceArgument(0, $messageToSendersMapping) ->replaceArgument(1, $sendersServiceLocator) @@ -1895,7 +1896,8 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $globalFailureReceiver = $config['failure_transport'] ?? null; $container->getDefinition('console.command.messenger_failed_messages_retry') ->replaceArgument(0, $globalFailureReceiver) - ->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocator)); + ->replaceArgument(1, $senderReferences[$config['failure_transport']]) + ->replaceArgument(5, $container->getDefinition($failureTransportsServiceLocator)); $container->getDefinition('console.command.messenger_failed_messages_show') ->replaceArgument(0, $globalFailureReceiver) ->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId)); From 87b31d39cd00527e79c3d61c03fcfca69a75567c Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sat, 19 Sep 2020 15:52:05 +0100 Subject: [PATCH 04/15] fix --- .../Bundle/FrameworkBundle/Resources/config/console.php | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php index e9b3d2e36a855..a988e1f67e3d2 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php @@ -170,20 +170,21 @@ service('messenger.routable_message_bus'), service('event_dispatcher'), service('logger'), + abstract_arg('Receivers') ]) ->tag('console.command', ['command' => 'messenger:failed:retry']) ->set('console.command.messenger_failed_messages_show', FailedMessagesShowCommand::class) ->args([ abstract_arg('Receiver name'), - abstract_arg('Receiver'), + abstract_arg('Receivers'), ]) ->tag('console.command', ['command' => 'messenger:failed:show']) ->set('console.command.messenger_failed_messages_remove', FailedMessagesRemoveCommand::class) ->args([ abstract_arg('Receiver name'), - abstract_arg('Receiver'), + abstract_arg('Receivers'), ]) ->tag('console.command', ['command' => 'messenger:failed:remove']) From 0cd81d0b6d86f18af058a6b9f4279aeecffe40fc Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sat, 19 Sep 2020 15:54:24 +0100 Subject: [PATCH 05/15] cs fix --- .../DependencyInjection/FrameworkExtension.php | 11 +++++------ .../FrameworkBundle/Resources/config/console.php | 2 +- .../FrameworkBundle/Resources/config/messenger.php | 2 +- .../DependencyInjection/FrameworkExtensionTest.php | 14 ++++++-------- .../Command/AbstractFailedMessagesCommand.php | 6 +++--- .../Command/FailedMessagesRemoveCommand.php | 4 ++-- .../Command/FailedMessagesRetryCommand.php | 10 +++++----- .../Command/FailedMessagesShowCommand.php | 2 +- ...SendFailedMessageToFailureTransportListener.php | 12 ++++++------ .../Command/FailedMessagesRemoveCommandTest.php | 2 +- .../Command/FailedMessagesRetryCommandTest.php | 2 +- .../Command/FailedMessagesShowCommandTest.php | 2 +- 12 files changed, 33 insertions(+), 36 deletions(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 6c973e0622be2..c703b08b019c5 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1862,14 +1862,14 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $container->getDefinition('messenger.retry_strategy_locator') ->replaceArgument(0, $transportRetryReferences); - + $failureTransports = []; if ($config['failure_transport']) { if (!isset($senderReferences[$config['failure_transport']])) { throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport'])); } - + $failureTransports[$config['failure_transport']] = $senderReferences[$config['failure_transport']]; } @@ -1886,13 +1886,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $failureTransportsServiceLocatorId = 'messenger.failure_transports.locator'; if (\count($failureTransports) > 0) { $failureTransportsServiceLocator = ServiceLocatorTagPass::register($container, $failureTransports, $failureTransportsServiceLocatorId); - + $container->getDefinition($failureTransportsServiceLocatorId) ->replaceArgument(0, $failureTransports); - + $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') ->replaceArgument(0, $failureTransportsServiceLocator); - + $globalFailureReceiver = $config['failure_transport'] ?? null; $container->getDefinition('console.command.messenger_failed_messages_retry') ->replaceArgument(0, $globalFailureReceiver) @@ -1904,7 +1904,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $container->getDefinition('console.command.messenger_failed_messages_remove') ->replaceArgument(0, $globalFailureReceiver) ->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId)); - } else { $container->removeDefinition('console.command.messenger_failed_messages_retry'); $container->removeDefinition('console.command.messenger_failed_messages_show'); diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php index a988e1f67e3d2..221492aca53d3 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/console.php @@ -170,7 +170,7 @@ service('messenger.routable_message_bus'), service('event_dispatcher'), service('logger'), - abstract_arg('Receivers') + abstract_arg('Receivers'), ]) ->tag('console.command', ['command' => 'messenger:failed:retry']) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 94927900204b1..f6a61fa58384f 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -137,7 +137,7 @@ abstract_arg('failed transports map by name'), ]) ->tag('container.service_locator') - + // retry ->set('messenger.retry_strategy_locator') ->args([ diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index 702da37ceb9bc..56c52b28d4da4 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -40,7 +40,6 @@ use Symfony\Component\DependencyInjection\Loader\ClosureLoader; use Symfony\Component\DependencyInjection\ParameterBag\EnvPlaceholderParameterBag; use Symfony\Component\DependencyInjection\Reference; -use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\HttpClient\MockHttpClient; use Symfony\Component\HttpClient\RetryableHttpClient; @@ -672,11 +671,10 @@ public function testMessengerMultipleFailureTransports() if (\in_array($transportName, $failedTransports)) { continue; } - + $this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName)); - } - + $this->assertTrue(true); } @@ -684,7 +682,7 @@ public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport { $container = $this->createContainerFromFile('messenger_multiple_failure_transports_global'); $failureTransportsLocatorDefinition = $container->getDefinition('messenger.failure_transports.locator'); - + /** @var Reference $failureTransportsMapping */ $failureTransportsMapping = $failureTransportsLocatorDefinition->getArgument(0); @@ -707,7 +705,7 @@ public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport $this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName)); } - + $this->assertTrue(true); } @@ -756,10 +754,10 @@ public function testMessengerTransports() $this->assertSame(7, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(1)); $this->assertSame(3, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(2)); $this->assertSame(100, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(3)); - + $failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator'); $expectedFailureTransports = [ - 'failed' => new Reference('messenger.transport.failed') + 'failed' => new Reference('messenger.transport.failed'), ]; $this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0)); } diff --git a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php index 051ae6ffcf8dd..d5357a2572d1d 100644 --- a/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php +++ b/src/Symfony/Component/Messenger/Command/AbstractFailedMessagesCommand.php @@ -39,9 +39,9 @@ public function __construct(?string $globalFailureReceiverName, $failureTranspor if (!$failureTransports instanceof ServiceLocator) { trigger_deprecation('symfony/messenger', '5.2', 'Passing failureTransports should now pass a ServiceLocator', __METHOD__); - $this->failureTransports = new ServiceLocator([$this->globalFailureReceiverName => function() use ($failureTransports) { return $failureTransports; }]); - } - + $this->failureTransports = new ServiceLocator([$this->globalFailureReceiverName => function () use ($failureTransports) { return $failureTransports; }]); + } + parent::__construct(); } diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php index 8952624aed833..ca6d772495602 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRemoveCommand.php @@ -60,10 +60,10 @@ protected function execute(InputInterface $input, OutputInterface $output) $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); $failureTransportName = $input->getOption('failure-transport'); - if ($failureTransportName === null) { + if (null === $failureTransportName) { $failureTransportName = $this->getGlobalFailureReceiverName(); } - + $receiver = $this->getReceiver($failureTransportName); $shouldForce = $input->getOption('force'); diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index cfa4b51a20a18..8ef0bdaa26924 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -36,7 +36,7 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand { protected static $defaultName = 'messenger:failed:retry'; - + private $eventDispatcher; private $messageBus; private $logger; @@ -48,8 +48,8 @@ public function __construct(?string $globalReceiverName, ?ReceiverInterface $glo $this->messageBus = $messageBus; $this->logger = $logger; $this->globalReceiverName = $globalReceiverName; - - parent::__construct($globalReceiverName, $failureTransports === null ? $globalReceiver : $failureTransports); + + parent::__construct($globalReceiverName, null === $failureTransports ? $globalReceiver : $failureTransports); } /** @@ -100,10 +100,10 @@ protected function execute(InputInterface $input, OutputInterface $output) } $failureTransportName = $input->getOption('failure-transport'); - if ($failureTransportName === null) { + if (null === $failureTransportName) { $failureTransportName = $this->getGlobalFailureReceiverName(); } - + $receiver = $this->getReceiver($failureTransportName); $this->printPendingMessagesMessage($receiver, $io); diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php index 3b77ed0c78545..8e8e21b9335b6 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesShowCommand.php @@ -60,7 +60,7 @@ protected function execute(InputInterface $input, OutputInterface $output) $io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output); $failureTransportName = $input->getOption('failure-transport'); - if ($failureTransportName === null) { + if (null === $failureTransportName) { $failureTransportName = $this->getGlobalFailureReceiverName(); } diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php index 20fbbaa7d0005..f50973787cc2d 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -38,12 +38,12 @@ public function __construct($failureSender, LoggerInterface $logger = null) { if (!$failureSender instanceof ServiceLocator) { trigger_deprecation('symfony/messenger', '5.2', 'Passing failureTransports should now pass a ServiceLocator', __METHOD__); - } - + } + $this->failureSender = $failureSender; $this->logger = $logger; } - + public function onMessageFailed(WorkerMessageFailedEvent $event) { if ($event->willRetry()) { @@ -75,10 +75,10 @@ public function onMessageFailed(WorkerMessageFailedEvent $event) ); $failureSender = $this->getFailureSender($event->getReceiverName()); - if ($failureSender === null) { + if (null === $failureSender) { return; } - + if (null !== $this->logger) { $this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [ 'class' => \get_class($envelope->getMessage()), @@ -101,7 +101,7 @@ private function getFailureSender(string $receiverName): ?SenderInterface if ($this->failureSender instanceof ServiceLocator && $this->failureSender->has($receiverName)) { return $this->failureSender->get($receiverName); } - + return $this->failureSender; } } diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php index e2104afe2b200..876ae391f4619 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRemoveCommandTest.php @@ -73,7 +73,7 @@ public function testRemoveMultipleMessages() ); $serviceLocator = $this->createMock(ServiceLocator::class); $serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver); - + $command = new FailedMessagesRemoveCommand( $globalFailureReceiverName, $receiver diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php index ea93dc2f13a26..7ecc74fdb7d1e 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php @@ -34,7 +34,7 @@ public function testBasicRun() $bus = $this->createMock(MessageBusInterface::class); // the bus should be called in the worker $bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass())); - + $command = new FailedMessagesRetryCommand( 'failure_receiver', $receiver, diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php index 01720a2b8299a..d7f0a362461e9 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesShowCommandTest.php @@ -110,7 +110,7 @@ public function testMultipleRedeliveryFails() $redeliveryStamp1, $redeliveryStamp2, ]); - + $globalFailureReceiverName = 'failure_receiver'; $receiver = $this->createMock(ListableReceiverInterface::class); $receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope); From 129480be8e66b72bf2236526c6ce7b601f3b2030 Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sun, 20 Sep 2020 15:53:10 +0100 Subject: [PATCH 06/15] add another service locator for the listener --- .../FrameworkExtension.php | 23 ++++--- .../Resources/config/messenger.php | 8 ++- .../FrameworkExtensionTest.php | 67 +++++++------------ .../DependencyInjection/MessengerPass.php | 12 ---- ...ailedMessageToFailureTransportListener.php | 30 ++++----- ...dMessageToFailureTransportListenerTest.php | 7 +- 6 files changed, 66 insertions(+), 81 deletions(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index c703b08b019c5..c7b343f7c9fd6 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1864,6 +1864,10 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder ->replaceArgument(0, $transportRetryReferences); $failureTransports = []; + $failureTransportsByTransportName = []; + + $failureTransportsServiceLocatorId = 'messenger.failure_transports.locator'; + $failureTransportsByTransportNameServiceLocatorId = 'messenger.failure_transports_by_transport_name.locator'; if ($config['failure_transport']) { if (!isset($senderReferences[$config['failure_transport']])) { @@ -1879,24 +1883,20 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport'])); } + $failureTransportsByTransportName[$name] = $senderReferences[$transport['failure_transport']]; $failureTransports[$transport['failure_transport']] = $senderReferences[$transport['failure_transport']]; } } - - $failureTransportsServiceLocatorId = 'messenger.failure_transports.locator'; + if (\count($failureTransports) > 0) { $failureTransportsServiceLocator = ServiceLocatorTagPass::register($container, $failureTransports, $failureTransportsServiceLocatorId); - $container->getDefinition($failureTransportsServiceLocatorId) ->replaceArgument(0, $failureTransports); - - $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') - ->replaceArgument(0, $failureTransportsServiceLocator); - + $globalFailureReceiver = $config['failure_transport'] ?? null; $container->getDefinition('console.command.messenger_failed_messages_retry') ->replaceArgument(0, $globalFailureReceiver) - ->replaceArgument(1, $senderReferences[$config['failure_transport']]) + ->replaceArgument(1, $senderReferences[$config['failure_transport']] ?? null) ->replaceArgument(5, $container->getDefinition($failureTransportsServiceLocator)); $container->getDefinition('console.command.messenger_failed_messages_show') ->replaceArgument(0, $globalFailureReceiver) @@ -1904,6 +1904,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $container->getDefinition('console.command.messenger_failed_messages_remove') ->replaceArgument(0, $globalFailureReceiver) ->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId)); + + $failureTransportsByTransportNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportsByTransportName, $failureTransportsByTransportNameServiceLocatorId); + $container->getDefinition($failureTransportsByTransportNameServiceLocatorId) + ->replaceArgument(0, $failureTransportsByTransportName); + $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') + ->replaceArgument(0, $senderReferences[$config['failure_transport']] ?? null) + ->replaceArgument(2, $failureTransportsByTransportNameServiceLocator); } else { $container->removeDefinition('console.command.messenger_failed_messages_retry'); $container->removeDefinition('console.command.messenger_failed_messages_show'); diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index f6a61fa58384f..4506899c1be8b 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -137,6 +137,11 @@ abstract_arg('failed transports map by name'), ]) ->tag('container.service_locator') + ->set('messenger.failure_transports_by_transport_name.locator', ServiceLocator::class) + ->args([ + abstract_arg('failed transports map by transport name'), + ]) + ->tag('container.service_locator') // retry ->set('messenger.retry_strategy_locator') @@ -166,8 +171,9 @@ ->set('messenger.failure.send_failed_message_to_failure_transport_listener', SendFailedMessageToFailureTransportListener::class) ->args([ - abstract_arg('failure transport'), + abstract_arg('global failure transport'), service('logger')->ignoreOnInvalid(), + abstract_arg('failure transports by transport name locator'), ]) ->tag('kernel.event_subscriber') ->tag('monolog.logger', ['channel' => 'messenger']) diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index 56c52b28d4da4..34337c4caa520 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -651,62 +651,43 @@ public function testMessenger() public function testMessengerMultipleFailureTransports() { $container = $this->createContainerFromFile('messenger_multiple_failure_transports'); - $failureTransportsLocatorDefinition = $container->getDefinition('messenger.failure_transports.locator'); - - /** @var Reference $failureTransportsMapping */ - $failureTransportsMapping = $failureTransportsLocatorDefinition->getArgument(0); - + // transport 2 exists but does not appear in the mapping - $expectedFailureTransportsMapping = [ - 'transport_1' => 'failure_transport_1', - 'transport_3' => 'failure_transport_3', + $expectedFailureTransports = [ + 'failure_transport_1' => new Reference('messenger.transport.failure_transport_1'), + 'failure_transport_3' => new Reference('messenger.transport.failure_transport_3'), ]; - - $failedTransports = [ - 'failure_transport_1', - 'failure_transport_3', + $failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator'); + $this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0)); + + $expectedFailureTransportsByTransportName = [ + 'transport_1' => new Reference('messenger.transport.failure_transport_1'), + 'transport_3' => new Reference('messenger.transport.failure_transport_3'), ]; - foreach ($failureTransportsMapping as $transportName => $ref) { - if (\in_array($transportName, $failedTransports)) { - continue; - } - - $this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName)); - } - - $this->assertTrue(true); + $failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator'); + $this->assertEquals($expectedFailureTransportsByTransportName, $failureTransportsByTransportNameLocator->getArgument(0)); } public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport() { $container = $this->createContainerFromFile('messenger_multiple_failure_transports_global'); - $failureTransportsLocatorDefinition = $container->getDefinition('messenger.failure_transports.locator'); - - /** @var Reference $failureTransportsMapping */ - $failureTransportsMapping = $failureTransportsLocatorDefinition->getArgument(0); - $expectedFailureTransportsMapping = [ - 'transport_1' => 'failure_transport_1', - 'transport_2' => 'failure_transport_global', - 'transport_3' => 'failure_transport_3', + $expectedFailureTransports = [ + 'failure_transport_global' => new Reference('messenger.transport.failure_transport_global'), + 'failure_transport_1' => new Reference('messenger.transport.failure_transport_1'), + 'failure_transport_3' => new Reference('messenger.transport.failure_transport_3'), ]; + $failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator'); + $this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0)); - $failed_transports = [ - 'failure_transport_global', - 'failure_transport_1', - 'failure_transport_3', + $expectedFailureTransportsByTransportName = [ + 'transport_1' => new Reference('messenger.transport.failure_transport_1'), + 'transport_3' => new Reference('messenger.transport.failure_transport_3'), ]; - foreach ($failureTransportsMapping as $transportName => $ref) { - if (\in_array($transportName, $failed_transports)) { - continue; - } - - $this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName)); - } - - $this->assertTrue(true); + $failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator'); + $this->assertEquals($expectedFailureTransportsByTransportName, $failureTransportsByTransportNameLocator->getArgument(0)); } public function testMessengerTransports() @@ -760,6 +741,8 @@ public function testMessengerTransports() 'failed' => new Reference('messenger.transport.failed'), ]; $this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0)); + $failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator'); + $this->assertEquals([], $failureTransportsByTransportNameLocator->getArgument(0)); } public function testMessengerRouting() diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index 25ffe4b869b9c..8240ad1f1c385 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -298,18 +298,6 @@ private function registerReceivers(ContainerBuilder $container, array $busIds) } $container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping); - - $failedCommandIds = [ - 'console.command.messenger_failed_messages_retry', - 'console.command.messenger_failed_messages_show', - 'console.command.messenger_failed_messages_remove', - ]; - foreach ($failedCommandIds as $failedCommandId) { - if ($container->hasDefinition($failedCommandId)) { - $definition = $container->getDefinition($failedCommandId); - $definition->replaceArgument(1, $receiverMapping[$definition->getArgument(0)]); - } - } } private function registerBusToCollector(ContainerBuilder $container, string $busId) diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php index f50973787cc2d..ee62c8b6eb3a1 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -28,19 +28,14 @@ */ class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface { - /** - * @var ServiceLocator|SenderInterface - */ - private $failureSender; + private $failureSenders; + private $globalFailureSender; private $logger; - public function __construct($failureSender, LoggerInterface $logger = null) + public function __construct(?SenderInterface $globalFailureSender, LoggerInterface $logger = null, ?ServiceLocator $failureSenders = null) { - if (!$failureSender instanceof ServiceLocator) { - trigger_deprecation('symfony/messenger', '5.2', 'Passing failureTransports should now pass a ServiceLocator', __METHOD__); - } - - $this->failureSender = $failureSender; + $this->globalFailureSender = $globalFailureSender; + $this->failureSenders = $failureSenders; $this->logger = $logger; } @@ -50,8 +45,8 @@ public function onMessageFailed(WorkerMessageFailedEvent $event) return; } - $hasFailureTransports = $this->failureSender instanceof ServiceLocator && $this->failureSender->has($event->getReceiverName()); - if (null === !$hasFailureTransports) { + $hasFailureTransports = $this->hasFailureTransports($event); + if (!$hasFailureTransports) { return; } @@ -98,10 +93,15 @@ public static function getSubscribedEvents() private function getFailureSender(string $receiverName): ?SenderInterface { - if ($this->failureSender instanceof ServiceLocator && $this->failureSender->has($receiverName)) { - return $this->failureSender->get($receiverName); + if ($this->failureSenders instanceof ServiceLocator && $this->failureSenders->has($receiverName)) { + return $this->failureSenders->get($receiverName); } - return $this->failureSender; + return $this->globalFailureSender; + } + + private function hasFailureTransports(WorkerMessageFailedEvent $event): bool + { + return ($this->failureSenders instanceof ServiceLocator && $this->failureSenders->has($event->getReceiverName())) || $this->globalFailureSender !== null; } } diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php index 4ac662cbbb082..2992b4529e355 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php @@ -19,6 +19,7 @@ use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; +use Symfony\Component\Messenger\Transport\Sender\SendersLocator; class SendFailedMessageToFailureTransportListenerTest extends TestCase { @@ -26,9 +27,9 @@ public function testDoNothingIfFailureTransportIsNotDefined() { $sender = $this->createMock(SenderInterface::class); $sender->expects($this->never())->method('send'); - - $listener = new SendFailedMessageToFailureTransportListener(null); - + + $listener = new SendFailedMessageToFailureTransportListener(null, null, null); + $exception = new \Exception('no!'); $envelope = new Envelope(new \stdClass()); $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); From 1298546327d6b6b7b105f9d807719320a48c83de Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sun, 20 Sep 2020 15:54:42 +0100 Subject: [PATCH 07/15] cs fix --- .../DependencyInjection/FrameworkExtension.php | 4 ++-- .../Tests/DependencyInjection/FrameworkExtensionTest.php | 8 ++++---- .../SendFailedMessageToFailureTransportListener.php | 4 ++-- .../SendFailedMessageToFailureTransportListenerTest.php | 5 ++--- 4 files changed, 10 insertions(+), 11 deletions(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index c7b343f7c9fd6..6a0b0028b572b 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1887,12 +1887,12 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $failureTransports[$transport['failure_transport']] = $senderReferences[$transport['failure_transport']]; } } - + if (\count($failureTransports) > 0) { $failureTransportsServiceLocator = ServiceLocatorTagPass::register($container, $failureTransports, $failureTransportsServiceLocatorId); $container->getDefinition($failureTransportsServiceLocatorId) ->replaceArgument(0, $failureTransports); - + $globalFailureReceiver = $config['failure_transport'] ?? null; $container->getDefinition('console.command.messenger_failed_messages_retry') ->replaceArgument(0, $globalFailureReceiver) diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index 34337c4caa520..862eed9ed6c1e 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -651,7 +651,7 @@ public function testMessenger() public function testMessengerMultipleFailureTransports() { $container = $this->createContainerFromFile('messenger_multiple_failure_transports'); - + // transport 2 exists but does not appear in the mapping $expectedFailureTransports = [ 'failure_transport_1' => new Reference('messenger.transport.failure_transport_1'), @@ -659,10 +659,10 @@ public function testMessengerMultipleFailureTransports() ]; $failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator'); $this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0)); - + $expectedFailureTransportsByTransportName = [ 'transport_1' => new Reference('messenger.transport.failure_transport_1'), - 'transport_3' => new Reference('messenger.transport.failure_transport_3'), + 'transport_3' => new Reference('messenger.transport.failure_transport_3'), ]; $failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator'); @@ -683,7 +683,7 @@ public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport $expectedFailureTransportsByTransportName = [ 'transport_1' => new Reference('messenger.transport.failure_transport_1'), - 'transport_3' => new Reference('messenger.transport.failure_transport_3'), + 'transport_3' => new Reference('messenger.transport.failure_transport_3'), ]; $failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator'); diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php index ee62c8b6eb3a1..50a18c9be6ac2 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -99,9 +99,9 @@ private function getFailureSender(string $receiverName): ?SenderInterface return $this->globalFailureSender; } - + private function hasFailureTransports(WorkerMessageFailedEvent $event): bool { - return ($this->failureSenders instanceof ServiceLocator && $this->failureSenders->has($event->getReceiverName())) || $this->globalFailureSender !== null; + return ($this->failureSenders instanceof ServiceLocator && $this->failureSenders->has($event->getReceiverName())) || null !== $this->globalFailureSender; } } diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php index 2992b4529e355..c0db9fb2ce375 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php @@ -19,7 +19,6 @@ use Symfony\Component\Messenger\Stamp\RedeliveryStamp; use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp; use Symfony\Component\Messenger\Transport\Sender\SenderInterface; -use Symfony\Component\Messenger\Transport\Sender\SendersLocator; class SendFailedMessageToFailureTransportListenerTest extends TestCase { @@ -27,9 +26,9 @@ public function testDoNothingIfFailureTransportIsNotDefined() { $sender = $this->createMock(SenderInterface::class); $sender->expects($this->never())->method('send'); - + $listener = new SendFailedMessageToFailureTransportListener(null, null, null); - + $exception = new \Exception('no!'); $envelope = new Envelope(new \stdClass()); $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); From 0ec420bce3d0b5cdd2193dea0350afe2b10bef32 Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sun, 20 Sep 2020 16:23:18 +0100 Subject: [PATCH 08/15] cs fix --- .../Component/Messenger/DependencyInjection/MessengerPass.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index 8240ad1f1c385..d6d9546bad8f1 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -121,7 +121,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds) if (!\in_array($options['bus'], $busIds)) { $messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method)); - throw new RuntimeException(sprintf('Invalid configuration %s for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus'])); + throw new RuntimeException(sprintf('Invalid configuration "%s" for message "%s": bus "%s" does not exist.', $messageLocation, $message, $options['bus'])); } $buses = [$options['bus']]; @@ -130,7 +130,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds) if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) { $messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method)); - throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" %s not found.', $serviceId, $message, $messageLocation)); + throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" "%s" not found.', $serviceId, $message, $messageLocation)); } if (!$r->hasMethod($method)) { From 7b4481aef8fbfca6d7677ddd002efc6b09241f7f Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sun, 20 Sep 2020 16:30:27 +0100 Subject: [PATCH 09/15] fix --- .../FrameworkBundle/DependencyInjection/Configuration.php | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php index ee5818e0bbdbe..536d80fb4ac26 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php @@ -1264,6 +1264,10 @@ function ($a) { ->prototype('variable') ->end() ->end() + ->scalarNode('failure_transport') + ->defaultNull() + ->info('Transport name to send failed messages to (after all retries have failed).') + ->end() ->arrayNode('retry_strategy') ->addDefaultsIfNotSet() ->beforeNormalization() From 5b662b5b4d9be90faa7f7ddf198e59247446686d Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sun, 20 Sep 2020 16:46:48 +0100 Subject: [PATCH 10/15] fix retry command --- .../Messenger/Command/FailedMessagesRetryCommand.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index 8ef0bdaa26924..075383f876b7f 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -129,7 +129,7 @@ protected function execute(InputInterface $input, OutputInterface $output) private function runInteractive(string $failureTransportName, ReceiverInterface $failedTransport, SymfonyStyle $io, bool $shouldForce) { - $receiver = $this->getReceiver($failedTransport); + $receiver = $this->getReceiver($failureTransportName); $count = 0; if ($receiver instanceof ListableReceiverInterface) { // for listable receivers, find the messages one-by-one @@ -154,7 +154,7 @@ private function runInteractive(string $failureTransportName, ReceiverInterface break; } - $this->retrySpecificIds($failedTransport, $ids, $io, $shouldForce); + $this->retrySpecificIds($failureTransportName, $ids, $io, $shouldForce); } } else { // get() and ask messages one-by-one From 3e0aeae8a9d11b5ab9aba8698677c9a87e5e0f31 Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sun, 20 Sep 2020 16:52:07 +0100 Subject: [PATCH 11/15] fix --- .../FrameworkBundle/DependencyInjection/Configuration.php | 2 +- .../Tests/DependencyInjection/FrameworkExtensionTest.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php index 536d80fb4ac26..3d796975db77d 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php @@ -1267,7 +1267,7 @@ function ($a) { ->scalarNode('failure_transport') ->defaultNull() ->info('Transport name to send failed messages to (after all retries have failed).') - ->end() + ->end() ->arrayNode('retry_strategy') ->addDefaultsIfNotSet() ->beforeNormalization() diff --git a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php index 862eed9ed6c1e..764ba74ddae94 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php +++ b/src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php @@ -61,10 +61,10 @@ use Symfony\Component\Validator\DependencyInjection\AddConstraintValidatorsPass; use Symfony\Component\Validator\Mapping\Loader\PropertyInfoLoader; use Symfony\Component\Workflow; +use Symfony\Component\Workflow\Metadata\InMemoryMetadataStore; use Symfony\Component\Workflow\WorkflowEvents; use Symfony\Contracts\Cache\CacheInterface; use Symfony\Contracts\Cache\TagAwareCacheInterface; -use Symfony\Component\Workflow\Metadata\InMemoryMetadataStore; abstract class FrameworkExtensionTest extends TestCase { From 5b3eb0d4ff214161cc83365c8e61380eb605e078 Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Thu, 1 Oct 2020 17:32:48 +0100 Subject: [PATCH 12/15] fix SendFailedMessageToFailureTransportListenerTest --- ...ailedMessageToFailureTransportListener.php | 25 ++++++++------ ...dMessageToFailureTransportListenerTest.php | 34 ++++++++++++++++++- 2 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php index 50a18c9be6ac2..01d630fd277cd 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -28,13 +28,18 @@ */ class SendFailedMessageToFailureTransportListener implements EventSubscriberInterface { + /** + * @var ServiceLocator|SenderInterface + */ private $failureSenders; - private $globalFailureSender; private $logger; - - public function __construct(?SenderInterface $globalFailureSender, LoggerInterface $logger = null, ?ServiceLocator $failureSenders = null) + + public function __construct($failureSenders, LoggerInterface $logger = null) { - $this->globalFailureSender = $globalFailureSender; + if (!$failureSenders instanceof ServiceLocator) { + trigger_deprecation('symfony/messenger', '5.2', 'Passing failureSenders should now pass a ServiceLocator with all the failure transports', __METHOD__); + } + $this->failureSenders = $failureSenders; $this->logger = $logger; } @@ -91,17 +96,17 @@ public static function getSubscribedEvents() ]; } - private function getFailureSender(string $receiverName): ?SenderInterface + private function getFailureSender(string $receiverName): SenderInterface { - if ($this->failureSenders instanceof ServiceLocator && $this->failureSenders->has($receiverName)) { - return $this->failureSenders->get($receiverName); + if ($this->failureSenders instanceof SenderInterface) { + return $this->failureSenders; } - - return $this->globalFailureSender; + + return $this->failureSenders->get($receiverName); } private function hasFailureTransports(WorkerMessageFailedEvent $event): bool { - return ($this->failureSenders instanceof ServiceLocator && $this->failureSenders->has($event->getReceiverName())) || null !== $this->globalFailureSender; + return $this->failureSenders instanceof ServiceLocator || $this->failureSenders instanceof SenderInterface; } } diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php index c0db9fb2ce375..32a70cda7f9ad 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php @@ -12,6 +12,7 @@ namespace Symfony\Component\Messenger\Tests\EventListener; use PHPUnit\Framework\TestCase; +use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\Messenger\Envelope; use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent; use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener; @@ -27,7 +28,7 @@ public function testDoNothingIfFailureTransportIsNotDefined() $sender = $this->createMock(SenderInterface::class); $sender->expects($this->never())->method('send'); - $listener = new SendFailedMessageToFailureTransportListener(null, null, null); + $listener = new SendFailedMessageToFailureTransportListener(null, null); $exception = new \Exception('no!'); $envelope = new Envelope(new \stdClass()); @@ -63,6 +64,37 @@ public function testItSendsToTheFailureTransport() $listener->onMessageFailed($event); } + + public function testItSendsToTheFailureTransportWithMultipleFailedTransports() { + $sender = $this->createMock(SenderInterface::class); + $sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) { + /* @var Envelope $envelope */ + $this->assertInstanceOf(Envelope::class, $envelope); + + /** @var SentToFailureTransportStamp $sentToFailureTransportStamp */ + $sentToFailureTransportStamp = $envelope->last(SentToFailureTransportStamp::class); + $this->assertNotNull($sentToFailureTransportStamp); + $this->assertSame('my_receiver', $sentToFailureTransportStamp->getOriginalReceiverName()); + + /** @var RedeliveryStamp $redeliveryStamp */ + $redeliveryStamp = $envelope->last(RedeliveryStamp::class); + $this->assertSame('no!', $redeliveryStamp->getExceptionMessage()); + $this->assertSame('no!', $redeliveryStamp->getFlattenException()->getMessage()); + + return true; + }))->willReturnArgument(0); + + $serviceLocator = $this->createMock(ServiceLocator::class); + $serviceLocator->method('get')->with('my_receiver')->willReturn($sender); + + $listener = new SendFailedMessageToFailureTransportListener($serviceLocator); + + $exception = new \Exception('no!'); + $envelope = new Envelope(new \stdClass()); + $event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception); + + $listener->onMessageFailed($event); + } public function testItGetsNestedHandlerFailedException() { From 27eb47036fe5db0406c7c0ec59674dda91050a5d Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Thu, 1 Oct 2020 17:38:11 +0100 Subject: [PATCH 13/15] remove failureTransports last argument of FailedMessagesRetryCommand --- .../Messenger/Command/FailedMessagesRetryCommand.php | 4 ++-- .../Tests/Command/FailedMessagesRetryCommandTest.php | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index 075383f876b7f..254864f85f845 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -42,14 +42,14 @@ class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand private $logger; private $globalReceiverName; - public function __construct(?string $globalReceiverName, ?ReceiverInterface $globalReceiver, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null, ?ServiceLocator $failureTransports = null) + public function __construct(?string $globalReceiverName, $failureTransports, MessageBusInterface $messageBus, EventDispatcherInterface $eventDispatcher, LoggerInterface $logger = null) { $this->eventDispatcher = $eventDispatcher; $this->messageBus = $messageBus; $this->logger = $logger; $this->globalReceiverName = $globalReceiverName; - parent::__construct($globalReceiverName, null === $failureTransports ? $globalReceiver : $failureTransports); + parent::__construct($globalReceiverName, $failureTransports); } /** diff --git a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php index 7ecc74fdb7d1e..06ebc0f79389d 100644 --- a/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php +++ b/src/Symfony/Component/Messenger/Tests/Command/FailedMessagesRetryCommandTest.php @@ -40,8 +40,7 @@ public function testBasicRun() $receiver, $bus, $dispatcher, - new NullLogger(), - null + new NullLogger() ); $tester = new CommandTester($command); @@ -67,11 +66,10 @@ public function testBasicRunWithSpecificFailedTransport() $command = new FailedMessagesRetryCommand( null, - null, + $serviceLocator, $bus, $dispatcher, - new NullLogger(), - $serviceLocator + new NullLogger() ); $tester = new CommandTester($command); From 077060a0136e5e91be628f8ccbca8e7cf81de41f Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Thu, 1 Oct 2020 17:39:10 +0100 Subject: [PATCH 14/15] php cs fix --- .../Bundle/FrameworkBundle/Resources/config/messenger.php | 3 +-- .../Messenger/Command/FailedMessagesRetryCommand.php | 1 - .../SendFailedMessageToFailureTransportListener.php | 6 +++--- .../SendFailedMessageToFailureTransportListenerTest.php | 7 ++++--- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php index 4506899c1be8b..68a1486514b6b 100644 --- a/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php +++ b/src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.php @@ -171,9 +171,8 @@ ->set('messenger.failure.send_failed_message_to_failure_transport_listener', SendFailedMessageToFailureTransportListener::class) ->args([ - abstract_arg('global failure transport'), + abstract_arg('failure transports'), service('logger')->ignoreOnInvalid(), - abstract_arg('failure transports by transport name locator'), ]) ->tag('kernel.event_subscriber') ->tag('monolog.logger', ['channel' => 'messenger']) diff --git a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php index 254864f85f845..750d59680de24 100644 --- a/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php +++ b/src/Symfony/Component/Messenger/Command/FailedMessagesRetryCommand.php @@ -19,7 +19,6 @@ use Symfony\Component\Console\Output\ConsoleOutputInterface; use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Style\SymfonyStyle; -use Symfony\Component\DependencyInjection\ServiceLocator; use Symfony\Component\EventDispatcher\EventDispatcherInterface; use Symfony\Component\Messenger\Event\WorkerMessageReceivedEvent; use Symfony\Component\Messenger\EventListener\StopWorkerOnMessageLimitListener; diff --git a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php index 01d630fd277cd..a628a43ce0d96 100644 --- a/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php +++ b/src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php @@ -33,13 +33,13 @@ class SendFailedMessageToFailureTransportListener implements EventSubscriberInte */ private $failureSenders; private $logger; - + public function __construct($failureSenders, LoggerInterface $logger = null) { if (!$failureSenders instanceof ServiceLocator) { trigger_deprecation('symfony/messenger', '5.2', 'Passing failureSenders should now pass a ServiceLocator with all the failure transports', __METHOD__); } - + $this->failureSenders = $failureSenders; $this->logger = $logger; } @@ -101,7 +101,7 @@ private function getFailureSender(string $receiverName): SenderInterface if ($this->failureSenders instanceof SenderInterface) { return $this->failureSenders; } - + return $this->failureSenders->get($receiverName); } diff --git a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php index 32a70cda7f9ad..e038ce0357b6b 100644 --- a/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php +++ b/src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php @@ -64,8 +64,9 @@ public function testItSendsToTheFailureTransport() $listener->onMessageFailed($event); } - - public function testItSendsToTheFailureTransportWithMultipleFailedTransports() { + + public function testItSendsToTheFailureTransportWithMultipleFailedTransports() + { $sender = $this->createMock(SenderInterface::class); $sender->expects($this->once())->method('send')->with($this->callback(function ($envelope) { /* @var Envelope $envelope */ @@ -86,7 +87,7 @@ public function testItSendsToTheFailureTransportWithMultipleFailedTransports() { $serviceLocator = $this->createMock(ServiceLocator::class); $serviceLocator->method('get')->with('my_receiver')->willReturn($sender); - + $listener = new SendFailedMessageToFailureTransportListener($serviceLocator); $exception = new \Exception('no!'); From 353fc62807b1c3ab3c0e7423a3708a88f252873e Mon Sep 17 00:00:00 2001 From: Hugo Monteiro Date: Sat, 3 Oct 2020 17:23:28 +0100 Subject: [PATCH 15/15] wip --- .../DependencyInjection/FrameworkExtension.php | 14 +++++++++----- .../DependencyInjection/MessengerPass.php | 14 +++++++++++++- 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php index 6a0b0028b572b..91f5259f9b669 100644 --- a/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php +++ b/src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php @@ -1793,7 +1793,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder ->replaceArgument(2, $config['serializer']['symfony_serializer']['context']); $container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']); } - + $senderAliases = []; $transportRetryReferences = []; foreach ($config['transports'] as $name => $transport) { @@ -1802,7 +1802,11 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $transportDefinition = (new Definition(TransportInterface::class)) ->setFactory([new Reference('messenger.transport_factory'), 'createTransport']) ->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)]) - ->addTag('messenger.receiver', ['alias' => $name]) + ->addTag('messenger.receiver', [ + 'alias' => $name, + 'failure_transport' => $transport['failure_transport'] ?? null + ] + ) ; $container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition); $senderAliases[$name] = $transportId; @@ -1873,8 +1877,9 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder if (!isset($senderReferences[$config['failure_transport']])) { throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport'])); } - + $failureTransports[$config['failure_transport']] = $senderReferences[$config['failure_transport']]; + $container->setAlias('messenger.failure_transports.default_transport', $config['failure_transport']); } foreach ($config['transports'] as $name => $transport) { @@ -1909,8 +1914,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder $container->getDefinition($failureTransportsByTransportNameServiceLocatorId) ->replaceArgument(0, $failureTransportsByTransportName); $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') - ->replaceArgument(0, $senderReferences[$config['failure_transport']] ?? null) - ->replaceArgument(2, $failureTransportsByTransportNameServiceLocator); + ->replaceArgument(0, $failureTransportsByTransportNameServiceLocator); } else { $container->removeDefinition('console.command.messenger_failed_messages_retry'); $container->removeDefinition('console.command.messenger_failed_messages_show'); diff --git a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php index d6d9546bad8f1..edc3398d10b97 100644 --- a/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php +++ b/src/Symfony/Component/Messenger/DependencyInjection/MessengerPass.php @@ -130,7 +130,7 @@ private function registerHandlers(ContainerBuilder $container, array $busIds) if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) { $messageLocation = isset($tag['handles']) ? 'declared in your tag attribute "handles"' : ($r->implementsInterface(MessageSubscriberInterface::class) ? sprintf('returned by method "%s::getHandledMessages()"', $r->getName()) : sprintf('used as argument type in method "%s::%s()"', $r->getName(), $method)); - throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" "%s" not found.', $serviceId, $message, $messageLocation)); + throw new RuntimeException(sprintf('Invalid handler service "%s": class or interface "%s" %s not found.', $serviceId, $message, $messageLocation)); } if (!$r->hasMethod($method)) { @@ -298,6 +298,18 @@ private function registerReceivers(ContainerBuilder $container, array $busIds) } $container->getDefinition('messenger.receiver_locator')->replaceArgument(0, $receiverMapping); + + $failedCommandIds = [ + 'console.command.messenger_failed_messages_retry', + 'console.command.messenger_failed_messages_show', + 'console.command.messenger_failed_messages_remove', + ]; + foreach ($failedCommandIds as $failedCommandId) { + if ($container->hasDefinition($failedCommandId)) { + $definition = $container->getDefinition($failedCommandId); + $definition->replaceArgument(1, $receiverMapping[$definition->getArgument(0)]); + } + } } private function registerBusToCollector(ContainerBuilder $container, string $busId)