-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] multiple failure transports support #34979
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9bf20ba
b859172
3650bb1
87b31d3
0cd81d0
129480b
1298546
0ec420b
7b4481a
5b662b5
3e0aeae
5b3eb0d
27eb470
077060a
353fc62
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1793,7 +1793,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder | |
->replaceArgument(2, $config['serializer']['symfony_serializer']['context']); | ||
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']); | ||
} | ||
|
||
$senderAliases = []; | ||
$transportRetryReferences = []; | ||
foreach ($config['transports'] as $name => $transport) { | ||
|
@@ -1802,7 +1802,11 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder | |
$transportDefinition = (new Definition(TransportInterface::class)) | ||
->setFactory([new Reference('messenger.transport_factory'), 'createTransport']) | ||
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)]) | ||
->addTag('messenger.receiver', ['alias' => $name]) | ||
->addTag('messenger.receiver', [ | ||
'alias' => $name, | ||
'failure_transport' => $transport['failure_transport'] ?? null | ||
] | ||
) | ||
; | ||
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition); | ||
$senderAliases[$name] = $transportId; | ||
|
@@ -1863,24 +1867,59 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder | |
$container->getDefinition('messenger.retry_strategy_locator') | ||
->replaceArgument(0, $transportRetryReferences); | ||
|
||
$failureTransports = []; | ||
$failureTransportsByTransportName = []; | ||
|
||
$failureTransportsServiceLocatorId = 'messenger.failure_transports.locator'; | ||
$failureTransportsByTransportNameServiceLocatorId = 'messenger.failure_transports_by_transport_name.locator'; | ||
|
||
if ($config['failure_transport']) { | ||
if (!isset($senderReferences[$config['failure_transport']])) { | ||
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport'])); | ||
} | ||
|
||
$failureTransports[$config['failure_transport']] = $senderReferences[$config['failure_transport']]; | ||
$container->setAlias('messenger.failure_transports.default_transport', $config['failure_transport']); | ||
} | ||
|
||
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') | ||
->replaceArgument(0, $senderReferences[$config['failure_transport']]); | ||
foreach ($config['transports'] as $name => $transport) { | ||
if ($transport['failure_transport']) { | ||
if (!isset($config['transports'][$transport['failure_transport']])) { | ||
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport'])); | ||
} | ||
|
||
$failureTransportsByTransportName[$name] = $senderReferences[$transport['failure_transport']]; | ||
$failureTransports[$transport['failure_transport']] = $senderReferences[$transport['failure_transport']]; | ||
} | ||
} | ||
|
||
if (\count($failureTransports) > 0) { | ||
$failureTransportsServiceLocator = ServiceLocatorTagPass::register($container, $failureTransports, $failureTransportsServiceLocatorId); | ||
$container->getDefinition($failureTransportsServiceLocatorId) | ||
->replaceArgument(0, $failureTransports); | ||
|
||
$globalFailureReceiver = $config['failure_transport'] ?? null; | ||
$container->getDefinition('console.command.messenger_failed_messages_retry') | ||
->replaceArgument(0, $config['failure_transport']); | ||
->replaceArgument(0, $globalFailureReceiver) | ||
->replaceArgument(1, $senderReferences[$config['failure_transport']] ?? null) | ||
->replaceArgument(5, $container->getDefinition($failureTransportsServiceLocator)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same problem here (ish) as below - we should pass a Reference (which |
||
$container->getDefinition('console.command.messenger_failed_messages_show') | ||
->replaceArgument(0, $config['failure_transport']); | ||
->replaceArgument(0, $globalFailureReceiver) | ||
->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these should be references, right? We don't normally set a Definition directly on an argument. And should it use |
||
$container->getDefinition('console.command.messenger_failed_messages_remove') | ||
->replaceArgument(0, $config['failure_transport']); | ||
->replaceArgument(0, $globalFailureReceiver) | ||
->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId)); | ||
|
||
$failureTransportsByTransportNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportsByTransportName, $failureTransportsByTransportNameServiceLocatorId); | ||
$container->getDefinition($failureTransportsByTransportNameServiceLocatorId) | ||
->replaceArgument(0, $failureTransportsByTransportName); | ||
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener') | ||
->replaceArgument(0, $failureTransportsByTransportNameServiceLocator); | ||
} else { | ||
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener'); | ||
$container->removeDefinition('console.command.messenger_failed_messages_retry'); | ||
$container->removeDefinition('console.command.messenger_failed_messages_show'); | ||
$container->removeDefinition('console.command.messenger_failed_messages_remove'); | ||
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we could revert this change to lessen the diff |
||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,7 @@ | |
|
||
namespace Symfony\Component\DependencyInjection\Loader\Configurator; | ||
|
||
use Symfony\Component\DependencyInjection\ServiceLocator; | ||
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory; | ||
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory; | ||
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory; | ||
|
@@ -130,6 +131,18 @@ | |
|
||
->set('messenger.transport.beanstalkd.factory', BeanstalkdTransportFactory::class) | ||
|
||
// failed transports | ||
->set('messenger.failure_transports.locator', ServiceLocator::class) | ||
->args([ | ||
abstract_arg('failed transports map by name'), | ||
]) | ||
->tag('container.service_locator') | ||
->set('messenger.failure_transports_by_transport_name.locator', ServiceLocator::class) | ||
->args([ | ||
abstract_arg('failed transports map by transport name'), | ||
]) | ||
->tag('container.service_locator') | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I'm correct in |
||
// retry | ||
->set('messenger.retry_strategy_locator') | ||
->args([ | ||
|
@@ -158,7 +171,7 @@ | |
|
||
->set('messenger.failure.send_failed_message_to_failure_transport_listener', SendFailedMessageToFailureTransportListener::class) | ||
->args([ | ||
abstract_arg('failure transport'), | ||
abstract_arg('failure transports'), | ||
service('logger')->ignoreOnInvalid(), | ||
]) | ||
->tag('kernel.event_subscriber') | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
<?php | ||
|
||
$container->loadFromExtension('framework', [ | ||
'messenger' => [ | ||
'transports' => [ | ||
'transport_1' => [ | ||
'dsn' => 'null://', | ||
'failure_transport' => 'failure_transport_1' | ||
], | ||
'transport_2' => 'null://', | ||
'transport_3' => [ | ||
'dsn' => 'null://', | ||
'failure_transport' => 'failure_transport_3' | ||
], | ||
'failure_transport_1' => 'null://', | ||
'failure_transport_3' => 'null://' | ||
], | ||
], | ||
]); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
<?php | ||
|
||
$container->loadFromExtension('framework', [ | ||
'messenger' => [ | ||
'failure_transport' => 'failure_transport_global', | ||
'transports' => [ | ||
'transport_1' => [ | ||
'dsn' => 'null://', | ||
'failure_transport' => 'failure_transport_1' | ||
], | ||
'transport_2' => 'null://', | ||
'transport_3' => [ | ||
'dsn' => 'null://', | ||
'failure_transport' => 'failure_transport_3' | ||
], | ||
'failure_transport_global' => 'null://', | ||
'failure_transport_1' => 'null://', | ||
'failure_transport_3' => 'null://', | ||
], | ||
], | ||
]); |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
<?xml version="1.0" encoding="utf-8" ?> | ||
<container xmlns="http://symfony.com/schema/dic/services" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xmlns:framework="http://symfony.com/schema/dic/symfony" | ||
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd | ||
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> | ||
|
||
<framework:config> | ||
<framework:messenger> | ||
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" /> | ||
<framework:transport name="transport_2" dsn="null://" /> | ||
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" /> | ||
<framework:transport name="failure_transport_1" dsn="null://" /> | ||
<framework:transport name="failure_transport_3" dsn="null://" /> | ||
</framework:messenger> | ||
</framework:config> | ||
</container> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
<?xml version="1.0" encoding="utf-8" ?> | ||
<container xmlns="http://symfony.com/schema/dic/services" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xmlns:framework="http://symfony.com/schema/dic/symfony" | ||
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd | ||
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd"> | ||
|
||
<framework:config> | ||
<framework:messenger failure-transport="failure_transport_global"> | ||
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" /> | ||
<framework:transport name="transport_2" dsn="null://" /> | ||
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" /> | ||
<framework:transport name="failure_transport_global" dsn="null://" /> | ||
<framework:transport name="failure_transport_1" dsn="null://" /> | ||
<framework:transport name="failure_transport_3" dsn="null://" /> | ||
</framework:messenger> | ||
</framework:config> | ||
</container> |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
framework: | ||
messenger: | ||
transports: | ||
transport_1: | ||
dsn: 'null://' | ||
failure_transport: failure_transport_1 | ||
transport_2: 'null://' | ||
transport_3: | ||
dsn: 'null://' | ||
failure_transport: failure_transport_3 | ||
failure_transport_1: 'null://' | ||
failure_transport_3: 'null://' |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
framework: | ||
messenger: | ||
failure_transport: failure_transport_global | ||
transports: | ||
transport_1: | ||
dsn: 'null://' | ||
failure_transport: failure_transport_1 | ||
transport_2: 'null://' | ||
transport_3: | ||
dsn: 'null://' | ||
failure_transport: failure_transport_3 | ||
failure_transport_global: 'null://' | ||
failure_transport_1: 'null://' | ||
failure_transport_3: 'null://' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we be checking for
!isset($senderReferences[$transport['failure_transport']])
here instead? That would be consistent with the above code. It's more confusing, but there is the edge case that the failure_transport is not a registered transport, but just a "sender service id"