Skip to content

Commit f995ffa

Browse files
committed
[Messenger] Multiple failure transports support
1 parent 099a646 commit f995ffa

12 files changed

+313
-14
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

+1-3
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,6 @@
8181
use Symfony\Component\Mailer\Bridge\Postmark\Transport\PostmarkTransportFactory;
8282
use Symfony\Component\Mailer\Bridge\Sendgrid\Transport\SendgridTransportFactory;
8383
use Symfony\Component\Mailer\Mailer;
84-
use Symfony\Component\Messenger\Command\FailedMessagesRetryCommand;
85-
use Symfony\Component\Messenger\EventListener\SendFailedMessageToFailureTransportListener;
8684
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
8785
use Symfony\Component\Messenger\MessageBus;
8886
use Symfony\Component\Messenger\MessageBusInterface;
@@ -1710,7 +1708,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
17101708
}
17111709

17121710
$transports = array_keys($config['transports']);
1713-
$numberOfTransports = count($transports);
1711+
$numberOfTransports = \count($transports);
17141712
$failureTransports = array_combine($transports, array_fill(0, $numberOfTransports, $senderReferences[$config['failure_transport']]));
17151713

17161714
$container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener')

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

+2
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,7 @@
412412
<xsd:element name="bus" type="messenger_bus" minOccurs="0" maxOccurs="unbounded" />
413413
</xsd:sequence>
414414
<xsd:attribute name="default-bus" type="xsd:string" />
415+
<xsd:attribute name="failure_transport" type="xsd:string" />
415416
</xsd:complexType>
416417

417418
<xsd:complexType name="messenger_serializer">
@@ -444,6 +445,7 @@
444445
<xsd:element name="options" type="metadata" minOccurs="0" maxOccurs="unbounded" />
445446
</xsd:sequence>
446447
<xsd:attribute name="name" type="xsd:string" />
448+
<xsd:attribute name="failure_transport" type="xsd:string" />
447449
<xsd:attribute name="serializer" type="xsd:string" />
448450
<xsd:attribute name="dsn" type="xsd:string" />
449451
</xsd:complexType>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', [
4+
'messenger' => [
5+
'transports' => [
6+
'transport_1' => [
7+
'dsn' => 'null://',
8+
'failure_transport' => 'failure_transport_1'
9+
],
10+
'transport_2' => 'null://',
11+
'transport_3' => [
12+
'dsn' => 'null://',
13+
'failure_transport' => 'failure_transport_3'
14+
],
15+
'failure_transport_1' => 'null://',
16+
'failure_transport_3' => 'null://'
17+
],
18+
],
19+
]);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
$container->loadFromExtension('framework', [
4+
'messenger' => [
5+
'failure_transport' => 'failure_transport_global',
6+
'transports' => [
7+
'transport_1' => [
8+
'dsn' => 'null://',
9+
'failure_transport' => 'failure_transport_1'
10+
],
11+
'transport_2' => 'null://',
12+
'transport_3' => [
13+
'dsn' => 'null://',
14+
'failure_transport' => 'failure_transport_3'
15+
],
16+
'failure_transport_global' => 'null://',
17+
'failure_transport_1' => 'null://',
18+
'failure_transport_3' => 'null://',
19+
],
20+
],
21+
]);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger>
10+
<framework:transport name="transport_1" dsn="null://" failure_transport="failure_transport_1" />
11+
<framework:transport name="transport_2" dsn="null://" />
12+
<framework:transport name="transport_3" dsn="null://" failure_transport="failure_transport_3" />
13+
<framework:transport name="failure_transport_1" dsn="null://" />
14+
<framework:transport name="failure_transport_3" dsn="null://" />
15+
</framework:messenger>
16+
</framework:config>
17+
</container>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<?xml version="1.0" encoding="utf-8" ?>
2+
<container xmlns="http://symfony.com/schema/dic/services"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xmlns:framework="http://symfony.com/schema/dic/symfony"
5+
xsi:schemaLocation="http://symfony.com/schema/dic/services https://symfony.com/schema/dic/services/services-1.0.xsd
6+
http://symfony.com/schema/dic/symfony https://symfony.com/schema/dic/symfony/symfony-1.0.xsd">
7+
8+
<framework:config>
9+
<framework:messenger failure_transport="failure_transport_global">
10+
<framework:transport name="transport_1" dsn="null://" failure_transport="failure_transport_1" />
11+
<framework:transport name="transport_2" dsn="null://" />
12+
<framework:transport name="transport_3" dsn="null://" failure_transport="failure_transport_3" />
13+
<framework:transport name="failure_transport_global" dsn="null://" />
14+
<framework:transport name="failure_transport_1" dsn="null://" />
15+
<framework:transport name="failure_transport_3" dsn="null://" />
16+
</framework:messenger>
17+
</framework:config>
18+
</container>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
framework:
2+
messenger:
3+
transports:
4+
transport_1:
5+
dsn: 'null://'
6+
failure_transport: failure_transport_1
7+
transport_2: 'null://'
8+
transport_3:
9+
dsn: 'null://'
10+
failure_transport: failure_transport_3
11+
failure_transport_1: 'null://'
12+
failure_transport_3: 'null://'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
framework:
2+
messenger:
3+
failure_transport: failure_transport_global
4+
transports:
5+
transport_1:
6+
dsn: 'null://'
7+
failure_transport: failure_transport_1
8+
transport_2: 'null://'
9+
transport_3:
10+
dsn: 'null://'
11+
failure_transport: failure_transport_3
12+
failure_transport_global: 'null://'
13+
failure_transport_1: 'null://'
14+
failure_transport_3: 'null://'

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

+56
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,62 @@ public function testMessenger()
576576
$this->assertSame(TransportFactory::class, $container->getDefinition('messenger.transport_factory')->getClass());
577577
}
578578

579+
public function testMessengerMultipleFailureTransports()
580+
{
581+
$container = $this->createContainerFromFile('messenger_multiple_failure_transports');
582+
$failedMessageTransportListenerReference = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
583+
584+
// transport 2 exists but does not appear in the mapping
585+
$expectedFailureTransportsMapping = [
586+
'transport_1' => 'failure_transport_1',
587+
'transport_3' => 'failure_transport_3',
588+
];
589+
590+
$failed_transports = [
591+
'failure_transport_1',
592+
'failure_transport_3',
593+
];
594+
595+
/** @var Reference[] $failureTransportsMapping */
596+
$failureTransportsMapping = $failedMessageTransportListenerReference->getArgument(0);
597+
foreach ($failureTransportsMapping as $transportName => $ref) {
598+
if (\in_array($transportName, $failed_transports)) {
599+
continue;
600+
}
601+
602+
$this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName));
603+
}
604+
}
605+
606+
public function testMessengerMultipleFailureTransportsWithGlobalFailureTransport()
607+
{
608+
$container = $this->createContainerFromFile('messenger_multiple_failure_transports_global');
609+
$failedMessageTransportListenerReference = $container->getDefinition('messenger.failure.send_failed_message_to_failure_transport_listener');
610+
611+
$expectedFailureTransportsMapping = [
612+
'transport_1' => 'failure_transport_1',
613+
'transport_2' => 'failure_transport_global',
614+
'transport_3' => 'failure_transport_3',
615+
];
616+
617+
$failed_transports = [
618+
'failure_transport_global',
619+
'failure_transport_1',
620+
'failure_transport_3',
621+
];
622+
623+
/** @var Reference[] $failureTransportsMapping */
624+
$failureTransportsMapping = $failedMessageTransportListenerReference->getArgument(0);
625+
626+
foreach ($failureTransportsMapping as $transportName => $ref) {
627+
if (\in_array($transportName, $failed_transports)) {
628+
continue;
629+
}
630+
631+
$this->assertSame('messenger.transport.'.$expectedFailureTransportsMapping[$transportName], (string) $ref, sprintf('The transport "%s" does not have the expected failed transport reference', $transportName));
632+
}
633+
}
634+
579635
public function testMessengerTransports()
580636
{
581637
$container = $this->createContainerFromFile('messenger_transports');

src/Symfony/Component/Messenger/EventListener/SendFailedMessageToFailureTransportListener.php

-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use Symfony\Component\Messenger\Stamp\DelayStamp;
1919
use Symfony\Component\Messenger\Stamp\RedeliveryStamp;
2020
use Symfony\Component\Messenger\Stamp\SentToFailureTransportStamp;
21-
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
2221

2322
/**
2423
* Sends a rejected message to a "failure transport".

src/Symfony/Component/Messenger/Tests/EventListener/SendFailedMessageToFailureTransportListenerTest.php

+22-9
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,14 @@ public function testItSendsToTheFailureTransport()
4141

4242
return true;
4343
}))->willReturnArgument(0);
44-
$listener = new SendFailedMessageToFailureTransportListener($sender);
44+
$receiverName = 'my_receiver';
45+
$listener = new SendFailedMessageToFailureTransportListener([
46+
$receiverName => $sender,
47+
]);
4548

4649
$exception = new \Exception('no!');
4750
$envelope = new Envelope(new \stdClass());
48-
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
51+
$event = new WorkerMessageFailedEvent($envelope, $receiverName, $exception);
4952

5053
$listener->onMessageFailed($event);
5154
}
@@ -64,12 +67,15 @@ public function testItGetsNestedHandlerFailedException()
6467
return true;
6568
}))->willReturnArgument(0);
6669

67-
$listener = new SendFailedMessageToFailureTransportListener($sender);
70+
$receiverName = 'my_receiver';
71+
$listener = new SendFailedMessageToFailureTransportListener([
72+
$receiverName => $sender,
73+
]);
6874

6975
$envelope = new Envelope(new \stdClass());
7076
$exception = new \Exception('I am inside!');
7177
$exception = new HandlerFailedException($envelope, [$exception]);
72-
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', $exception);
78+
$event = new WorkerMessageFailedEvent($envelope, $receiverName, $exception);
7379

7480
$listener->onMessageFailed($event);
7581
}
@@ -78,10 +84,13 @@ public function testDoNothingOnRetry()
7884
{
7985
$sender = $this->createMock(SenderInterface::class);
8086
$sender->expects($this->never())->method('send');
81-
$listener = new SendFailedMessageToFailureTransportListener($sender);
87+
$receiverName = 'my_receiver';
88+
$listener = new SendFailedMessageToFailureTransportListener([
89+
$receiverName => $sender,
90+
]);
8291

8392
$envelope = new Envelope(new \stdClass());
84-
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception());
93+
$event = new WorkerMessageFailedEvent($envelope, $receiverName, new \Exception());
8594
$event->setForRetry();
8695

8796
$listener->onMessageFailed($event);
@@ -91,12 +100,16 @@ public function testDoNotRedeliverToFailed()
91100
{
92101
$sender = $this->createMock(SenderInterface::class);
93102
$sender->expects($this->never())->method('send');
94-
$listener = new SendFailedMessageToFailureTransportListener($sender);
103+
$receiverName = 'my_receiver';
104+
$listener = new SendFailedMessageToFailureTransportListener([
105+
$receiverName => $sender,
106+
]);
95107

96108
$envelope = new Envelope(new \stdClass(), [
97-
new SentToFailureTransportStamp('my_receiver'),
109+
new SentToFailureTransportStamp($receiverName),
98110
]);
99-
$event = new WorkerMessageFailedEvent($envelope, 'my_receiver', new \Exception());
111+
112+
$event = new WorkerMessageFailedEvent($envelope, $receiverName, new \Exception());
100113

101114
$listener->onMessageFailed($event);
102115
}

0 commit comments

Comments
 (0)