Skip to content

Commit 76708f6

Browse files
committed
[Messenger] implement several senders using a ChainSender
1 parent 7a4abf1 commit 76708f6

File tree

14 files changed

+153
-73
lines changed

14 files changed

+153
-73
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -986,14 +986,18 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
986986
$newConfig = array();
987987
foreach ($config as $k => $v) {
988988
if (!\is_int($k)) {
989-
$newConfig[$k] = array('senders' => \is_array($v) ? array_values($v) : array($v));
989+
$newConfig[$k] = array(
990+
'senders' => $v['senders'] ?? (\is_array($v) ? array_values($v) : array($v)),
991+
'send_and_handle' => $v['send_and_handle'] ?? false,
992+
);
990993
} else {
991994
$newConfig[$v['message-class']]['senders'] = array_map(
992995
function ($a) {
993996
return \is_string($a) ? $a : $a['service'];
994997
},
995998
array_values($v['sender'])
996999
);
1000+
$newConfig[$v['message-class']]['send-and-handle'] = $v['send-and-handle'] ?? false;
9971001
}
9981002
}
9991003

@@ -1006,6 +1010,7 @@ function ($a) {
10061010
->requiresAtLeastOneElement()
10071011
->prototype('scalar')->end()
10081012
->end()
1013+
->booleanNode('send_and_handle')->defaultFalse()->end()
10091014
->end()
10101015
->end()
10111016
->end()

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
6464
use Symfony\Component\Messenger\MessageBus;
6565
use Symfony\Component\Messenger\MessageBusInterface;
66+
use Symfony\Component\Messenger\Transport\ChainSender;
6667
use Symfony\Component\Messenger\Transport\ReceiverInterface;
6768
use Symfony\Component\Messenger\Transport\SenderInterface;
6869
use Symfony\Component\Messenger\Transport\TransportInterface;
@@ -1491,16 +1492,30 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
14911492
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
14921493
}
14931494

1494-
$messageToSenderIdsMapping = array();
1495+
$messageToSenderIdMapping = array();
1496+
$messageToSendAndHandleMapping = array();
14951497
foreach ($config['routing'] as $message => $messageConfiguration) {
14961498
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
14971499
throw new LogicException(sprintf('Messenger routing configuration contains a mistake: message "%s" does not exist. It needs to match an existing class or interface.', $message));
14981500
}
14991501

1500-
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
1502+
if (1 < count($messageConfiguration['senders'])) {
1503+
$senders = array_map(function ($sender) { return new Reference($sender); }, $messageConfiguration['senders']);
1504+
$chainSenderDefinition = new Definition(ChainSender::class, array($senders));
1505+
$chainSenderId = 'messenger.chain_sender.'.$message;
1506+
$container->setDefinition($chainSenderId, $chainSenderDefinition);
1507+
$messageToSenderIdMapping[$message] = $chainSenderId;
1508+
} else {
1509+
$messageToSenderIdMapping[$message] = $messageConfiguration['senders'][0];
1510+
}
1511+
1512+
$messageToSendAndHandleMapping[$message] = $messageConfiguration['send_and_handle'];
15011513
}
15021514

1503-
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
1515+
$container->getDefinition('messenger.asynchronous.routing.sender_locator')
1516+
->replaceArgument(1, $messageToSenderIdMapping)
1517+
->replaceArgument(2, $messageToSendAndHandleMapping)
1518+
;
15041519

15051520
foreach ($config['transports'] as $name => $transport) {
15061521
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
<!-- Asynchronous -->
1616
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator">
1717
<argument type="service" id="messenger.sender_locator" />
18-
<argument /> <!-- Message to sender ID mapping -->
18+
<argument type="collection" /> <!-- Message to sender ID mapping -->
19+
<argument type="collection" /> <!-- Message to send and handle mapping -->
1920
</service>
2021
<service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware">
2122
<argument type="service" id="messenger.asynchronous.routing.sender_locator" />

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@
375375
<xsd:element name="sender" type="messenger_routing_sender" />
376376
</xsd:choice>
377377
<xsd:attribute name="message-class" type="xsd:string" use="required"/>
378+
<xsd:attribute name="send-and-handle" type="xsd:boolean" default="false"/>
378379
</xsd:complexType>
379380

380381
<xsd:complexType name="messenger_routing_sender">

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_routing.php

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
'messenger' => array(
55
'routing' => array(
66
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp'),
7-
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array('amqp', 'audit', null),
7+
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array(
8+
'senders' => array('amqp', 'audit'),
9+
'send_and_handle' => true,
10+
),
811
'*' => 'amqp',
912
),
1013
),

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_routing.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,9 @@
1010
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\DummyMessage">
1111
<framework:sender service="amqp" />
1212
</framework:routing>
13-
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\SecondMessage">
13+
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\SecondMessage" send-and-handle="true">
1414
<framework:sender service="amqp" />
1515
<framework:sender service="audit" />
16-
<framework:sender service="null" />
1716
</framework:routing>
1817
<framework:routing message-class="*">
1918
<framework:sender service="amqp" />

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_routing.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,7 @@ framework:
22
messenger:
33
routing:
44
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': amqp
5-
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage': [amqp, audit, ~]
5+
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage':
6+
senders: [amqp, audit]
7+
send_and_handle: true
68
'*': amqp

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -557,12 +557,18 @@ public function testMessengerRouting()
557557
$senderLocatorDefinition = $container->getDefinition('messenger.asynchronous.routing.sender_locator');
558558

559559
$messageToSenderIdsMapping = array(
560-
DummyMessage::class => array('amqp'),
561-
SecondMessage::class => array('amqp', 'audit', null),
562-
'*' => array('amqp'),
560+
DummyMessage::class => 'amqp',
561+
SecondMessage::class => 'messenger.chain_sender.'.SecondMessage::class,
562+
'*' => 'amqp',
563+
);
564+
$messageToSendAndHandleMapping = array(
565+
DummyMessage::class => false,
566+
SecondMessage::class => true,
567+
'*' => false,
563568
);
564569

565570
$this->assertSame($messageToSenderIdsMapping, $senderLocatorDefinition->getArgument(1));
571+
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(2));
566572
}
567573

568574
/**

src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,16 @@ public function handle($message, callable $next)
3636
return $next($message->getMessage());
3737
}
3838

39-
$hasSender = false;
40-
$forwardToSenderAndHandler = false;
41-
foreach ($this->senderLocator->getSendersForMessage($message) as $sender) {
42-
$hasSender = true;
43-
44-
if (null === $sender) {
45-
$forwardToSenderAndHandler = true;
46-
continue;
47-
}
39+
$sender = $this->senderLocator->getSenderForMessage($message);
4840

41+
if ($sender) {
4942
$sender->send($message);
50-
}
5143

52-
if (!$hasSender || $forwardToSenderAndHandler) {
53-
return $next($message);
44+
if (!$this->senderLocator->forwardToSenderAndHandler($message)) {
45+
return;
46+
}
5447
}
48+
49+
return $next($message);
5550
}
5651
}

src/Symfony/Component/Messenger/Asynchronous/Routing/SenderLocator.php

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,30 +12,39 @@
1212
namespace Symfony\Component\Messenger\Asynchronous\Routing;
1313

1414
use Psr\Container\ContainerInterface;
15+
use Symfony\Component\Messenger\Transport\SenderInterface;
1516

1617
/**
1718
* @author Samuel Roze <samuel.roze@gmail.com>
1819
*/
1920
class SenderLocator implements SenderLocatorInterface
2021
{
2122
private $senderServiceLocator;
22-
private $messageToSenderIdsMapping;
23+
private $messageToSenderIdMapping;
24+
private $messagesToSendAndHandleMapping;
2325

24-
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdsMapping)
26+
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdMapping, array $messagesToSendAndHandleMapping = array())
2527
{
2628
$this->senderServiceLocator = $senderServiceLocator;
27-
$this->messageToSenderIdsMapping = $messageToSenderIdsMapping;
29+
$this->messageToSenderIdMapping = $messageToSenderIdMapping;
30+
$this->messagesToSendAndHandleMapping = $messagesToSendAndHandleMapping;
2831
}
2932

3033
/**
3134
* {@inheritdoc}
3235
*/
33-
public function getSendersForMessage($message): iterable
36+
public function getSenderForMessage($message): ?SenderInterface
3437
{
35-
$senderIds = $this->messageToSenderIdsMapping[\get_class($message)] ?? $this->messageToSenderIdsMapping['*'] ?? array();
38+
$senderId = $this->messageToSenderIdMapping[\get_class($message)] ?? $this->messageToSenderIdMapping['*'] ?? null;
3639

37-
foreach ($senderIds as $senderId) {
38-
yield $this->senderServiceLocator->get($senderId);
39-
}
40+
return $senderId ? $this->senderServiceLocator->get($senderId) : null;
41+
}
42+
43+
/**
44+
* {@inheritdoc}
45+
*/
46+
public function forwardToSenderAndHandler($message): bool
47+
{
48+
return $this->messagesToSendAndHandleMapping[\get_class($message)] ?? $this->messagesToSendAndHandleMapping['*'] ?? false;
4049
}
4150
}

0 commit comments

Comments
 (0)