Skip to content

[Messenger] multiple failure transports support #34979

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Prev Previous commit
Next Next commit
cs fix
  • Loading branch information
monteiro committed Sep 20, 2020
commit 0cd81d0b6d86f18af058a6b9f4279aeecffe40fc
Original file line number Diff line number Diff line change
Expand Up @@ -1862,14 +1862,14 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder

$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);

$failureTransports = [];

if ($config['failure_transport']) {
if (!isset($senderReferences[$config['failure_transport']])) {
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
}

$failureTransports[$config['failure_transport']] = $senderReferences[$config['failure_transport']];
}

Expand All @@ -1886,13 +1886,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$failureTransportsServiceLocatorId = 'messenger.failure_transports.locator';
if (\count($failureTransports) > 0) {
$failureTransportsServiceLocator = ServiceLocatorTagPass::register($container, $failureTransports, $failureTransportsServiceLocatorId);

$container->getDefinition($failureTransportsServiceLocatorId)
->replaceArgument(0, $failureTransports);

$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(0, $failureTransportsServiceLocator);

$globalFailureReceiver = $config['failure_transport'] ?? null;
$container->getDefinition('console.command.messenger_failed_messages_retry')
->replaceArgument(0, $globalFailureReceiver)
Expand All @@ -1904,7 +1904,6 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$container->getDefinition('console.command.messenger_failed_messages_remove')
->replaceArgument(0, $globalFailureReceiver)
->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId));

} else {
$container->removeDefinition('console.command.messenger_failed_messages_retry');
$container->removeDefinition('console.command.messenger_failed_messages_show');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@
service('messenger.routable_message_bus'),
service('event_dispatcher'),
service('logger'),
abstract_arg('Receivers')
abstract_arg('Receivers'),
])
->tag('console.command', ['command' => 'messenger:failed:retry'])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
abstract_arg('failed transports map by name'),
])
->tag('container.service_locator')

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm correct in FrameworkExtension, these won't be needed.

// retry
->set('messenger.retry_strategy_locator')
->args([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
use Symfony\Component\DependencyInjection\Loader\ClosureLoader;
use Symfony\Component\DependencyInjection\ParameterBag\EnvPlaceholderParameterBag;
use Symfony\Component\DependencyInjection\Reference;
use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\HttpClient\MockHttpClient;
use Symfony\Component\HttpClient\RetryableHttpClient;
Expand Down Expand Up @@ -672,19 +671,18 @@ public function testMessengerMultipleFailureTransports()
if (\in_array($transportName, $failedTransports)) {
continue;
}

$this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName));

}

$this->assertTrue(true);
}

public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport()
{
$container = $this->createContainerFromFile('messenger_multiple_failure_transports_global');
$failureTransportsLocatorDefinition = $container->getDefinition('messenger.failure_transports.locator');

/** @var Reference $failureTransportsMapping */
$failureTransportsMapping = $failureTransportsLocatorDefinition->getArgument(0);

Expand All @@ -707,7 +705,7 @@ public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport

$this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName));
}

$this->assertTrue(true);
}

Expand Down Expand Up @@ -756,10 +754,10 @@ public function testMessengerTransports()
$this->assertSame(7, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(1));
$this->assertSame(3, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(2));
$this->assertSame(100, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(3));

$failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator');
$expectedFailureTransports = [
'failed' => new Reference('messenger.transport.failed')
'failed' => new Reference('messenger.transport.failed'),
];
$this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ public function __construct(?string $globalFailureReceiverName, $failureTranspor
if (!$failureTransports instanceof ServiceLocator) {
trigger_deprecation('symfony/messenger', '5.2', 'Passing failureTransports should now pass a ServiceLocator', __METHOD__);

$this->failureTransports = new ServiceLocator([$this->globalFailureReceiverName => function() use ($failureTransports) { return $failureTransports; }]);
}
$this->failureTransports = new ServiceLocator([$this->globalFailureReceiverName => function () use ($failureTransports) { return $failureTransports; }]);
}

parent::__construct();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ protected function execute(InputInterface $input, OutputInterface $output)
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

$failureTransportName = $input->getOption('failure-transport');
if ($failureTransportName === null) {
if (null === $failureTransportName) {
$failureTransportName = $this->getGlobalFailureReceiverName();
}

$receiver = $this->getReceiver($failureTransportName);

$shouldForce = $input->getOption('force');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
class FailedMessagesRetryCommand extends AbstractFailedMessagesCommand
{
protected static $defaultName = 'messenger:failed:retry';

private $eventDispatcher;
private $messageBus;
private $logger;
Expand All @@ -48,8 +48,8 @@ public function __construct(?string $globalReceiverName, ?ReceiverInterface $glo
$this->messageBus = $messageBus;
$this->logger = $logger;
$this->globalReceiverName = $globalReceiverName;
parent::__construct($globalReceiverName, $failureTransports === null ? $globalReceiver : $failureTransports);

parent::__construct($globalReceiverName, null === $failureTransports ? $globalReceiver : $failureTransports);
}

/**
Expand Down Expand Up @@ -100,10 +100,10 @@ protected function execute(InputInterface $input, OutputInterface $output)
}

$failureTransportName = $input->getOption('failure-transport');
if ($failureTransportName === null) {
if (null === $failureTransportName) {
$failureTransportName = $this->getGlobalFailureReceiverName();
}

$receiver = $this->getReceiver($failureTransportName);
$this->printPendingMessagesMessage($receiver, $io);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
$io = new SymfonyStyle($input, $output instanceof ConsoleOutputInterface ? $output->getErrorOutput() : $output);

$failureTransportName = $input->getOption('failure-transport');
if ($failureTransportName === null) {
if (null === $failureTransportName) {
$failureTransportName = $this->getGlobalFailureReceiverName();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ public function __construct($failureSender, LoggerInterface $logger = null)
{
if (!$failureSender instanceof ServiceLocator) {
trigger_deprecation('symfony/messenger', '5.2', 'Passing failureTransports should now pass a ServiceLocator', __METHOD__);
}
}

$this->failureSender = $failureSender;
$this->logger = $logger;
}

public function onMessageFailed(WorkerMessageFailedEvent $event)
{
if ($event->willRetry()) {
Expand Down Expand Up @@ -75,10 +75,10 @@ public function onMessageFailed(WorkerMessageFailedEvent $event)
);

$failureSender = $this->getFailureSender($event->getReceiverName());
if ($failureSender === null) {
if (null === $failureSender) {
return;
}

if (null !== $this->logger) {
$this->logger->info('Rejected message {class} will be sent to the failure transport {transport}.', [
'class' => \get_class($envelope->getMessage()),
Expand All @@ -101,7 +101,7 @@ private function getFailureSender(string $receiverName): ?SenderInterface
if ($this->failureSender instanceof ServiceLocator && $this->failureSender->has($receiverName)) {
return $this->failureSender->get($receiverName);
}

return $this->failureSender;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public function testRemoveMultipleMessages()
);
$serviceLocator = $this->createMock(ServiceLocator::class);
$serviceLocator->expects($this->any())->method('get')->with($globalFailureReceiverName)->willReturn($receiver);

$command = new FailedMessagesRemoveCommand(
$globalFailureReceiverName,
$receiver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public function testBasicRun()
$bus = $this->createMock(MessageBusInterface::class);
// the bus should be called in the worker
$bus->expects($this->exactly(2))->method('dispatch')->willReturn(new Envelope(new \stdClass()));

$command = new FailedMessagesRetryCommand(
'failure_receiver',
$receiver,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public function testMultipleRedeliveryFails()
$redeliveryStamp1,
$redeliveryStamp2,
]);

$globalFailureReceiverName = 'failure_receiver';
$receiver = $this->createMock(ListableReceiverInterface::class);
$receiver->expects($this->once())->method('find')->with(15)->willReturn($envelope);
Expand Down