Skip to content

[Messenger] multiple failure transports support #34979

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,10 @@ function ($a) {
->prototype('variable')
->end()
->end()
->scalarNode('failure_transport')
->defaultNull()
->info('Transport name to send failed messages to (after all retries have failed).')
->end()
->arrayNode('retry_strategy')
->addDefaultsIfNotSet()
->beforeNormalization()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1793,7 +1793,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
->replaceArgument(2, $config['serializer']['symfony_serializer']['context']);
$container->setAlias('messenger.default_serializer', $config['serializer']['default_serializer']);
}

$senderAliases = [];
$transportRetryReferences = [];
foreach ($config['transports'] as $name => $transport) {
Expand All @@ -1802,7 +1802,11 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$transportDefinition = (new Definition(TransportInterface::class))
->setFactory([new Reference('messenger.transport_factory'), 'createTransport'])
->setArguments([$transport['dsn'], $transport['options'] + ['transport_name' => $name], new Reference($serializerId)])
->addTag('messenger.receiver', ['alias' => $name])
->addTag('messenger.receiver', [
'alias' => $name,
'failure_transport' => $transport['failure_transport'] ?? null
]
)
;
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
$senderAliases[$name] = $transportId;
Expand Down Expand Up @@ -1863,24 +1867,59 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
$container->getDefinition('messenger.retry_strategy_locator')
->replaceArgument(0, $transportRetryReferences);

$failureTransports = [];
$failureTransportsByTransportName = [];

$failureTransportsServiceLocatorId = 'messenger.failure_transports.locator';
$failureTransportsByTransportNameServiceLocatorId = 'messenger.failure_transports_by_transport_name.locator';

if ($config['failure_transport']) {
if (!isset($senderReferences[$config['failure_transport']])) {
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $config['failure_transport']));
}

$failureTransports[$config['failure_transport']] = $senderReferences[$config['failure_transport']];
$container->setAlias('messenger.failure_transports.default_transport', $config['failure_transport']);
}

$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(0, $senderReferences[$config['failure_transport']]);
foreach ($config['transports'] as $name => $transport) {
if ($transport['failure_transport']) {
if (!isset($config['transports'][$transport['failure_transport']])) {
throw new LogicException(sprintf('Invalid Messenger configuration: the failure transport "%s" is not a valid transport or service id.', $transport['failure_transport']));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be checking for !isset($senderReferences[$transport['failure_transport']]) here instead? That would be consistent with the above code. It's more confusing, but there is the edge case that the failure_transport is not a registered transport, but just a "sender service id"


$failureTransportsByTransportName[$name] = $senderReferences[$transport['failure_transport']];
$failureTransports[$transport['failure_transport']] = $senderReferences[$transport['failure_transport']];
}
}

if (\count($failureTransports) > 0) {
$failureTransportsServiceLocator = ServiceLocatorTagPass::register($container, $failureTransports, $failureTransportsServiceLocatorId);
$container->getDefinition($failureTransportsServiceLocatorId)
->replaceArgument(0, $failureTransports);

$globalFailureReceiver = $config['failure_transport'] ?? null;
$container->getDefinition('console.command.messenger_failed_messages_retry')
->replaceArgument(0, $config['failure_transport']);
->replaceArgument(0, $globalFailureReceiver)
->replaceArgument(1, $senderReferences[$config['failure_transport']] ?? null)
->replaceArgument(5, $container->getDefinition($failureTransportsServiceLocator));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same problem here (ish) as below - we should pass a Reference (which $failureTransportsServiceLocator is), not a Definition.

$container->getDefinition('console.command.messenger_failed_messages_show')
->replaceArgument(0, $config['failure_transport']);
->replaceArgument(0, $globalFailureReceiver)
->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these should be references, right? We don't normally set a Definition directly on an argument. And should it use $failureTransportsServiceLocatorId or $failureTransportsServiceLocator - I'm getting lost (this stuff confuses me) in which is which and what should be used.

$container->getDefinition('console.command.messenger_failed_messages_remove')
->replaceArgument(0, $config['failure_transport']);
->replaceArgument(0, $globalFailureReceiver)
->replaceArgument(1, $container->getDefinition($failureTransportsServiceLocatorId));

$failureTransportsByTransportNameServiceLocator = ServiceLocatorTagPass::register($container, $failureTransportsByTransportName, $failureTransportsByTransportNameServiceLocatorId);
$container->getDefinition($failureTransportsByTransportNameServiceLocatorId)
->replaceArgument(0, $failureTransportsByTransportName);
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')
->replaceArgument(0, $failureTransportsByTransportNameServiceLocator);
} else {
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
$container->removeDefinition('console.command.messenger_failed_messages_retry');
$container->removeDefinition('console.command.messenger_failed_messages_show');
$container->removeDefinition('console.command.messenger_failed_messages_remove');
$container->removeDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we could revert this change to lessen the diff

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,20 +170,21 @@
service('messenger.routable_message_bus'),
service('event_dispatcher'),
service('logger'),
abstract_arg('Receivers'),
])
->tag('console.command', ['command' => 'messenger:failed:retry'])

->set('console.command.messenger_failed_messages_show', FailedMessagesShowCommand::class)
->args([
abstract_arg('Receiver name'),
abstract_arg('Receiver'),
abstract_arg('Receivers'),
])
->tag('console.command', ['command' => 'messenger:failed:show'])

->set('console.command.messenger_failed_messages_remove', FailedMessagesRemoveCommand::class)
->args([
abstract_arg('Receiver name'),
abstract_arg('Receiver'),
abstract_arg('Receivers'),
])
->tag('console.command', ['command' => 'messenger:failed:remove'])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

namespace Symfony\Component\DependencyInjection\Loader\Configurator;

use Symfony\Component\DependencyInjection\ServiceLocator;
use Symfony\Component\Messenger\Bridge\AmazonSqs\Transport\AmazonSqsTransportFactory;
use Symfony\Component\Messenger\Bridge\Amqp\Transport\AmqpTransportFactory;
use Symfony\Component\Messenger\Bridge\Beanstalkd\Transport\BeanstalkdTransportFactory;
Expand Down Expand Up @@ -130,6 +131,18 @@

->set('messenger.transport.beanstalkd.factory', BeanstalkdTransportFactory::class)

// failed transports
->set('messenger.failure_transports.locator', ServiceLocator::class)
->args([
abstract_arg('failed transports map by name'),
])
->tag('container.service_locator')
->set('messenger.failure_transports_by_transport_name.locator', ServiceLocator::class)
->args([
abstract_arg('failed transports map by transport name'),
])
->tag('container.service_locator')

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm correct in FrameworkExtension, these won't be needed.

// retry
->set('messenger.retry_strategy_locator')
->args([
Expand Down Expand Up @@ -158,7 +171,7 @@

->set('messenger.failure.send_failed_message_to_failure_transport_listener', SendFailedMessageToFailureTransportListener::class)
->args([
abstract_arg('failure transport'),
abstract_arg('failure transports'),
service('logger')->ignoreOnInvalid(),
])
->tag('kernel.event_subscriber')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@
<xsd:attribute name="name" type="xsd:string" />
<xsd:attribute name="serializer" type="xsd:string" />
<xsd:attribute name="dsn" type="xsd:string" />
<xsd:attribute name="failure-transport" type="xsd:string" />
</xsd:complexType>

<xsd:complexType name="messenger_retry_strategy">
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<?php

$container->loadFromExtension('framework', [
'messenger' => [
'transports' => [
'transport_1' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_1'
],
'transport_2' => 'null://',
'transport_3' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_3'
],
'failure_transport_1' => 'null://',
'failure_transport_3' => 'null://'
],
],
]);
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

$container->loadFromExtension('framework', [
'messenger' => [
'failure_transport' => 'failure_transport_global',
'transports' => [
'transport_1' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_1'
],
'transport_2' => 'null://',
'transport_3' => [
'dsn' => 'null://',
'failure_transport' => 'failure_transport_3'
],
'failure_transport_global' => 'null://',
'failure_transport_1' => 'null://',
'failure_transport_3' => 'null://',
],
],
]);
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger>
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
<framework:transport name="transport_2" dsn="null://" />
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
<framework:transport name="failure_transport_1" dsn="null://" />
<framework:transport name="failure_transport_3" dsn="null://" />
</framework:messenger>
</framework:config>
</container>
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="utf-8" ?>
<container xmlns="http://symfony.com/schema/dic/services"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:framework="http://symfony.com/schema/dic/symfony"
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">

<framework:config>
<framework:messenger failure-transport="failure_transport_global">
<framework:transport name="transport_1" dsn="null://" failure-transport="failure_transport_1" />
<framework:transport name="transport_2" dsn="null://" />
<framework:transport name="transport_3" dsn="null://" failure-transport="failure_transport_3" />
<framework:transport name="failure_transport_global" dsn="null://" />
<framework:transport name="failure_transport_1" dsn="null://" />
<framework:transport name="failure_transport_3" dsn="null://" />
</framework:messenger>
</framework:config>
</container>
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
framework:
messenger:
transports:
transport_1:
dsn: 'null://'
failure_transport: failure_transport_1
transport_2: 'null://'
transport_3:
dsn: 'null://'
failure_transport: failure_transport_3
failure_transport_1: 'null://'
failure_transport_3: 'null://'
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
framework:
messenger:
failure_transport: failure_transport_global
transports:
transport_1:
dsn: 'null://'
failure_transport: failure_transport_1
transport_2: 'null://'
transport_3:
dsn: 'null://'
failure_transport: failure_transport_3
failure_transport_global: 'null://'
failure_transport_1: 'null://'
failure_transport_3: 'null://'
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,10 @@
use Symfony\Component\Validator\DependencyInjection\AddConstraintValidatorsPass;
use Symfony\Component\Validator\Mapping\Loader\PropertyInfoLoader;
use Symfony\Component\Workflow;
use Symfony\Component\Workflow\Metadata\InMemoryMetadataStore;
use Symfony\Component\Workflow\WorkflowEvents;
use Symfony\Contracts\Cache\CacheInterface;
use Symfony\Contracts\Cache\TagAwareCacheInterface;
use Symfony\Component\Workflow\Metadata\InMemoryMetadataStore;

abstract class FrameworkExtensionTest extends TestCase
{
Expand Down Expand Up @@ -648,6 +648,48 @@ public function testMessenger()
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
}

public function testMessengerMultipleFailureTransports()
{
$container = $this->createContainerFromFile('messenger_multiple_failure_transports');

// transport 2 exists but does not appear in the mapping
$expectedFailureTransports = [
'failure_transport_1' => new Reference('messenger.transport.failure_transport_1'),
'failure_transport_3' => new Reference('messenger.transport.failure_transport_3'),
];
$failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator');
$this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0));

$expectedFailureTransportsByTransportName = [
'transport_1' => new Reference('messenger.transport.failure_transport_1'),
'transport_3' => new Reference('messenger.transport.failure_transport_3'),
];

$failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator');
$this->assertEquals($expectedFailureTransportsByTransportName, $failureTransportsByTransportNameLocator->getArgument(0));
}

public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport()
{
$container = $this->createContainerFromFile('messenger_multiple_failure_transports_global');

$expectedFailureTransports = [
'failure_transport_global' => new Reference('messenger.transport.failure_transport_global'),
'failure_transport_1' => new Reference('messenger.transport.failure_transport_1'),
'failure_transport_3' => new Reference('messenger.transport.failure_transport_3'),
];
$failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator');
$this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0));

$expectedFailureTransportsByTransportName = [
'transport_1' => new Reference('messenger.transport.failure_transport_1'),
'transport_3' => new Reference('messenger.transport.failure_transport_3'),
];

$failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator');
$this->assertEquals($expectedFailureTransportsByTransportName, $failureTransportsByTransportNameLocator->getArgument(0));
}

public function testMessengerTransports()
{
$container = $this->createContainerFromFile('messenger_transports');
Expand Down Expand Up @@ -694,7 +736,13 @@ public function testMessengerTransports()
$this->assertSame(3, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(2));
$this->assertSame(100, $container->getDefinition('messenger.retry.multiplier_retry_strategy.customised')->getArgument(3));

$this->assertEquals(new Reference('messenger.transport.failed'), $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')->getArgument(0));
$failureTransportsLocator = $container->getDefinition('messenger.failure_transports.locator');
$expectedFailureTransports = [
'failed' => new Reference('messenger.transport.failed'),
];
$this->assertEquals($expectedFailureTransports, $failureTransportsLocator->getArgument(0));
$failureTransportsByTransportNameLocator = $container->getDefinition('messenger.failure_transports_by_transport_name.locator');
$this->assertEquals([], $failureTransportsByTransportNameLocator->getArgument(0));
}

public function testMessengerRouting()
Expand Down
Loading