Skip to content

Commit f1967aa

Browse files
committed
minor #27002 [Messenger] implement several senders using a ChainSender (Tobion)
This PR was merged into the 4.1 branch. Discussion ---------- [Messenger] implement several senders using a ChainSender | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tests pass? | yes <!-- please add some, will be required by reviewers --> | Fixed tickets | | License | MIT | Doc PR | Commits ------- 198925e [Messenger] implement several senders using a ChainSender
2 parents 68eda49 + 198925e commit f1967aa

File tree

14 files changed

+204
-90
lines changed

14 files changed

+204
-90
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -986,14 +986,18 @@ private function addMessengerSection(ArrayNodeDefinition $rootNode)
986986
$newConfig = array();
987987
foreach ($config as $k => $v) {
988988
if (!\is_int($k)) {
989-
$newConfig[$k] = array('senders' => \is_array($v) ? array_values($v) : array($v));
989+
$newConfig[$k] = array(
990+
'senders' => $v['senders'] ?? (\is_array($v) ? array_values($v) : array($v)),
991+
'send_and_handle' => $v['send_and_handle'] ?? false,
992+
);
990993
} else {
991994
$newConfig[$v['message-class']]['senders'] = array_map(
992995
function ($a) {
993996
return \is_string($a) ? $a : $a['service'];
994997
},
995998
array_values($v['sender'])
996999
);
1000+
$newConfig[$v['message-class']]['send-and-handle'] = $v['send-and-handle'] ?? false;
9971001
}
9981002
}
9991003

@@ -1006,6 +1010,7 @@ function ($a) {
10061010
->requiresAtLeastOneElement()
10071011
->prototype('scalar')->end()
10081012
->end()
1013+
->booleanNode('send_and_handle')->defaultFalse()->end()
10091014
->end()
10101015
->end()
10111016
->end()

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
6464
use Symfony\Component\Messenger\MessageBus;
6565
use Symfony\Component\Messenger\MessageBusInterface;
66+
use Symfony\Component\Messenger\Transport\ChainSender;
6667
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
6768
use Symfony\Component\Messenger\Transport\TransportInterface;
6869
use Symfony\Component\PropertyAccess\PropertyAccessor;
@@ -1494,16 +1495,28 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
14941495
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
14951496
}
14961497

1497-
$messageToSenderIdsMapping = array();
1498+
$messageToSenderIdMapping = array();
1499+
$messageToSendAndHandleMapping = array();
14981500
foreach ($config['routing'] as $message => $messageConfiguration) {
14991501
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
15001502
throw new LogicException(sprintf('Messenger routing configuration contains a mistake: message "%s" does not exist. It needs to match an existing class or interface.', $message));
15011503
}
15021504

1503-
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
1505+
if (1 < \count($messageConfiguration['senders'])) {
1506+
$senders = array_map(function ($sender) { return new Reference($sender); }, $messageConfiguration['senders']);
1507+
$chainSenderDefinition = new Definition(ChainSender::class, array($senders));
1508+
$chainSenderId = '.messenger.chain_sender.'.$message;
1509+
$container->setDefinition($chainSenderId, $chainSenderDefinition);
1510+
$messageToSenderIdMapping[$message] = $chainSenderId;
1511+
} else {
1512+
$messageToSenderIdMapping[$message] = $messageConfiguration['senders'][0];
1513+
}
1514+
1515+
$messageToSendAndHandleMapping[$message] = $messageConfiguration['send_and_handle'];
15041516
}
15051517

1506-
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
1518+
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdMapping);
1519+
$container->getDefinition('messenger.middleware.route_messages')->replaceArgument(1, $messageToSendAndHandleMapping);
15071520

15081521
foreach ($config['transports'] as $name => $transport) {
15091522
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
<!-- Asynchronous -->
1616
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator">
1717
<argument type="service" id="messenger.sender_locator" />
18-
<argument /> <!-- Message to sender ID mapping -->
18+
<argument type="collection" /> <!-- Message to sender ID mapping -->
1919
</service>
2020
<service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Asynchronous\Middleware\SendMessageMiddleware">
2121
<argument type="service" id="messenger.asynchronous.routing.sender_locator" />
22+
<argument type="collection" /> <!-- Message to send and handle mapping -->
2223
</service>
2324

2425
<!-- Message encoding/decoding -->

src/Symfony/Bundle/FrameworkBundle/Resources/config/schema/symfony-1.0.xsd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,7 @@
375375
<xsd:element name="sender" type="messenger_routing_sender" />
376376
</xsd:choice>
377377
<xsd:attribute name="message-class" type="xsd:string" use="required"/>
378+
<xsd:attribute name="send-and-handle" type="xsd:boolean" default="false"/>
378379
</xsd:complexType>
379380

380381
<xsd:complexType name="messenger_routing_sender">

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_routing.php

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
$container->loadFromExtension('framework', array(
44
'messenger' => array(
55
'routing' => array(
6-
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp'),
7-
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array('amqp', 'audit', null),
6+
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage' => array('amqp', 'audit'),
7+
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage' => array(
8+
'senders' => array('amqp', 'audit'),
9+
'send_and_handle' => true,
10+
),
811
'*' => 'amqp',
912
),
1013
),

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_routing.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@
99
<framework:messenger>
1010
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\DummyMessage">
1111
<framework:sender service="amqp" />
12+
<framework:sender service="audit" />
1213
</framework:routing>
13-
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\SecondMessage">
14+
<framework:routing message-class="Symfony\Component\Messenger\Tests\Fixtures\SecondMessage" send-and-handle="true">
1415
<framework:sender service="amqp" />
1516
<framework:sender service="audit" />
16-
<framework:sender service="null" />
1717
</framework:routing>
1818
<framework:routing message-class="*">
1919
<framework:sender service="amqp" />
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
framework:
22
messenger:
33
routing:
4-
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': amqp
5-
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage': [amqp, audit, ~]
4+
'Symfony\Component\Messenger\Tests\Fixtures\DummyMessage': [amqp, audit]
5+
'Symfony\Component\Messenger\Tests\Fixtures\SecondMessage':
6+
senders: [amqp, audit]
7+
send_and_handle: true
68
'*': amqp

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -558,14 +558,21 @@ public function testMessengerRouting()
558558
{
559559
$container = $this->createContainerFromFile('messenger_routing');
560560
$senderLocatorDefinition = $container->getDefinition('messenger.asynchronous.routing.sender_locator');
561+
$sendMessageMiddlewareDefinition = $container->getDefinition('messenger.middleware.route_messages');
561562

562563
$messageToSenderIdsMapping = array(
563-
DummyMessage::class => array('amqp'),
564-
SecondMessage::class => array('amqp', 'audit', null),
565-
'*' => array('amqp'),
564+
DummyMessage::class => '.messenger.chain_sender.'.DummyMessage::class,
565+
SecondMessage::class => '.messenger.chain_sender.'.SecondMessage::class,
566+
'*' => 'amqp',
567+
);
568+
$messageToSendAndHandleMapping = array(
569+
DummyMessage::class => false,
570+
SecondMessage::class => true,
571+
'*' => false,
566572
);
567573

568574
$this->assertSame($messageToSenderIdsMapping, $senderLocatorDefinition->getArgument(1));
575+
$this->assertSame($messageToSendAndHandleMapping, $sendMessageMiddlewareDefinition->getArgument(1));
569576
}
570577

571578
/**

src/Symfony/Component/Messenger/Asynchronous/Middleware/SendMessageMiddleware.php

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
namespace Symfony\Component\Messenger\Asynchronous\Middleware;
1313

14+
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocator;
1415
use Symfony\Component\Messenger\Asynchronous\Routing\SenderLocatorInterface;
1516
use Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage;
1617
use Symfony\Component\Messenger\Envelope;
@@ -19,14 +20,17 @@
1920

2021
/**
2122
* @author Samuel Roze <samuel.roze@gmail.com>
23+
* @author Tobias Schultze <http://tobion.de>
2224
*/
2325
class SendMessageMiddleware implements MiddlewareInterface, EnvelopeAwareInterface
2426
{
2527
private $senderLocator;
28+
private $messagesToSendAndHandleMapping;
2629

27-
public function __construct(SenderLocatorInterface $senderLocator)
30+
public function __construct(SenderLocatorInterface $senderLocator, array $messagesToSendAndHandleMapping = array())
2831
{
2932
$this->senderLocator = $senderLocator;
33+
$this->messagesToSendAndHandleMapping = $messagesToSendAndHandleMapping;
3034
}
3135

3236
/**
@@ -40,20 +44,21 @@ public function handle($message, callable $next)
4044
return $next($message);
4145
}
4246

43-
if (!empty($senders = $this->senderLocator->getSendersForMessage($envelope->getMessage()))) {
44-
foreach ($senders as $sender) {
45-
if (null === $sender) {
46-
continue;
47-
}
47+
$sender = $this->senderLocator->getSenderForMessage($envelope->getMessage());
4848

49-
$sender->send($envelope);
50-
}
49+
if ($sender) {
50+
$sender->send($envelope);
5151

52-
if (!\in_array(null, $senders, true)) {
52+
if (!$this->mustSendAndHandle($envelope->getMessage())) {
5353
return;
5454
}
5555
}
5656

5757
return $next($message);
5858
}
59+
60+
private function mustSendAndHandle($message): bool
61+
{
62+
return (bool) SenderLocator::getValueFromMessageRouting($this->messagesToSendAndHandleMapping, $message);
63+
}
5964
}

src/Symfony/Component/Messenger/Asynchronous/Routing/SenderLocator.php

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,49 +12,55 @@
1212
namespace Symfony\Component\Messenger\Asynchronous\Routing;
1313

1414
use Psr\Container\ContainerInterface;
15+
use Symfony\Component\Messenger\Transport\SenderInterface;
1516

1617
/**
1718
* @author Samuel Roze <samuel.roze@gmail.com>
1819
*/
1920
class SenderLocator implements SenderLocatorInterface
2021
{
2122
private $senderServiceLocator;
22-
private $messageToSenderIdsMapping;
23+
private $messageToSenderIdMapping;
2324

24-
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdsMapping)
25+
public function __construct(ContainerInterface $senderServiceLocator, array $messageToSenderIdMapping)
2526
{
2627
$this->senderServiceLocator = $senderServiceLocator;
27-
$this->messageToSenderIdsMapping = $messageToSenderIdsMapping;
28+
$this->messageToSenderIdMapping = $messageToSenderIdMapping;
2829
}
2930

3031
/**
3132
* {@inheritdoc}
3233
*/
33-
public function getSendersForMessage($message): array
34+
public function getSenderForMessage($message): ?SenderInterface
3435
{
35-
$senders = array();
36-
foreach ($this->getSenderIds($message) as $senderId) {
37-
$senders[] = $this->senderServiceLocator->get($senderId);
38-
}
36+
$senderId = $this->getSenderId($message);
37+
38+
return $senderId ? $this->senderServiceLocator->get($senderId) : null;
39+
}
3940

40-
return $senders;
41+
private function getSenderId($message): ?string
42+
{
43+
return self::getValueFromMessageRouting($this->messageToSenderIdMapping, $message);
4144
}
4245

43-
private function getSenderIds($message): array
46+
/**
47+
* @internal
48+
*/
49+
public static function getValueFromMessageRouting(array $mapping, $message)
4450
{
45-
if (isset($this->messageToSenderIdsMapping[\get_class($message)])) {
46-
return $this->messageToSenderIdsMapping[\get_class($message)];
51+
if (isset($mapping[\get_class($message)])) {
52+
return $mapping[\get_class($message)];
4753
}
48-
if ($messageToSenderIdsMapping = array_intersect_key($this->messageToSenderIdsMapping, class_parents($message))) {
49-
return current($messageToSenderIdsMapping);
54+
if ($parentsMapping = array_intersect_key($mapping, class_parents($message))) {
55+
return current($parentsMapping);
5056
}
51-
if ($messageToSenderIdsMapping = array_intersect_key($this->messageToSenderIdsMapping, class_implements($message))) {
52-
return current($messageToSenderIdsMapping);
57+
if ($interfaceMapping = array_intersect_key($mapping, class_implements($message))) {
58+
return current($interfaceMapping);
5359
}
54-
if (isset($this->messageToSenderIdsMapping['*'])) {
55-
return $this->messageToSenderIdsMapping['*'];
60+
if (isset($mapping['*'])) {
61+
return $mapping['*'];
5662
}
5763

58-
return array();
64+
return null;
5965
}
6066
}

0 commit comments

Comments
 (0)