Skip to content

Commit d2b8014

Browse files
committed
feature #29010 [Messenger] make senders and handlers subscribing to parent interfaces receive *all* matching messages, wildcard included (nicolas-grekas)
This PR was merged into the 4.2-dev branch. Discussion ---------- [Messenger] make senders and handlers subscribing to parent interfaces receive *all* matching messages, wildcard included | Q | A | ------------- | --- | Branch? | 4.2 | Bug fix? | no | New feature? | yes | BC breaks? | yes | Deprecations? | no | Tests pass? | yes | Fixed tickets | - | License | MIT | Doc PR | - ~Embeds #29006 for now.~ From the CHANGELOG: * senders and handlers subscribing to parent interfaces now receive *all* matching messages, wildcard included * `HandlerLocatorInterface::resolve()` has been removed, use `HandlersLocator::getHandlers()` instead * `SenderLocatorInterface::getSenderForMessage()` has been removed, use `SendersLocator::getSenders()` instead * The `ChainHandler` and `ChainSender` classes have been removed * The `ContainerHandlerLocator`, `AbstractHandlerLocator`, `SenderLocator` and `AbstractSenderLocator` classes have been removed The new `HandlersLocatorInterface` and `SendersLocatorInterface` interfaces return **`iterable`** of corresponding handlers/senders. This allows simplifying a lot the DI configuration and standalone usage. Inheritance-based configuration is now stable: when a sender or a handler is bound to `SomeMessageInterface`, it will always get all messages of that kind. This fixes the unstable nature of the previous logic, where only the first matching type bound to a message would match, making routing/handling unpredictable (note that the new behavior is also how Laravel does it.) Some cleanups found meanwhile: * the `messenger.sender` tag was useless, it's removed * the reponsibility of the "send-and-handle" decision has been moved to the locator, where it belongs * thanks to type+argument autowiring aliases, we don't need to force defining a default bus - gaining nice errors when a named argument has a typo * some services have been renamed to make them more conventional As far as priorities are concerned, the priority number applies in the scope of the type bound to it: senders/handlers that are bound to more specific types are always called before less specific ones, no matter the defined priority. Commits ------- 1e7af4d [Messenger] make senders and handlers subscribing to parent interfaces receive *all* matching messages, wildcard included
2 parents 225746b + 1e7af4d commit d2b8014

38 files changed

+419
-977
lines changed

src/Symfony/Bundle/FrameworkBundle/Controller/ControllerTrait.php

+3-2
Original file line numberDiff line numberDiff line change
@@ -391,14 +391,15 @@ protected function isCsrfTokenValid(string $id, ?string $token): bool
391391
/**
392392
* Dispatches a message to the bus.
393393
*
394-
* @param object $message The message to dispatch
394+
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
395395
*
396396
* @final
397397
*/
398398
protected function dispatchMessage($message): Envelope
399399
{
400400
if (!$this->container->has('message_bus')) {
401-
throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/messenger".');
401+
$message = class_exists(Envelope::class) ? 'You need to define the "messenger.default_bus" configuration option.' : 'Try running "composer require symfony/messenger".';
402+
throw new \LogicException('The message bus is not enabled in your application. '.$message);
402403
}
403404

404405
return $this->container->get('message_bus')->dispatch($message);

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Compiler/UnusedTagsPass.php

-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ class UnusedTagsPass implements CompilerPassInterface
4040
'kernel.event_subscriber',
4141
'kernel.fragment_renderer',
4242
'messenger.bus',
43-
'messenger.sender',
4443
'messenger.receiver',
4544
'messenger.message_handler',
4645
'monolog.logger',

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -1077,7 +1077,7 @@ function ($a) {
10771077
->end()
10781078
->end()
10791079
->end()
1080-
->scalarNode('default_bus')->defaultValue(null)->end()
1080+
->scalarNode('default_bus')->defaultNull()->end()
10811081
->arrayNode('buses')
10821082
->defaultValue(array('messenger.bus.default' => array('default_middleware' => true, 'middleware' => array())))
10831083
->useAttributeAsKey('name')

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

+23-33
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
use Symfony\Component\Console\Application;
3737
use Symfony\Component\Console\Command\Command;
3838
use Symfony\Component\DependencyInjection\Alias;
39+
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
40+
use Symfony\Component\DependencyInjection\Argument\RewindableGenerator;
3941
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
4042
use Symfony\Component\DependencyInjection\ChildDefinition;
4143
use Symfony\Component\DependencyInjection\ContainerBuilder;
@@ -68,7 +70,6 @@
6870
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
6971
use Symfony\Component\Messenger\MessageBus;
7072
use Symfony\Component\Messenger\MessageBusInterface;
71-
use Symfony\Component\Messenger\Transport\Sender\ChainSender;
7273
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
7374
use Symfony\Component\Messenger\Transport\TransportInterface;
7475
use Symfony\Component\PropertyAccess\PropertyAccessor;
@@ -1491,7 +1492,7 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont
14911492
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig, array $validationConfig)
14921493
{
14931494
if (!interface_exists(MessageBusInterface::class)) {
1494-
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed.');
1495+
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed. Try running "composer require symfony/messenger".');
14951496
}
14961497

14971498
$loader->load('messenger.xml');
@@ -1502,7 +1503,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
15021503
} else {
15031504
if ('messenger.transport.symfony_serializer' === $config['serializer']['id']) {
15041505
if (!$this->isConfigEnabled($container, $serializerConfig)) {
1505-
throw new LogicException('The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enable it or install it by running "composer require symfony/serializer-pack".');
1506+
throw new LogicException('The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enabling it or running "composer require symfony/serializer-pack".');
15061507
}
15071508

15081509
$container->getDefinition('messenger.transport.symfony_serializer')
@@ -1517,17 +1518,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
15171518
}
15181519
}
15191520

1520-
if (null === $config['default_bus']) {
1521-
if (\count($config['buses']) > 1) {
1522-
throw new LogicException(sprintf('You need to define a default bus with the "default_bus" configuration. Possible values: %s', implode(', ', array_keys($config['buses']))));
1523-
}
1524-
1521+
if (null === $config['default_bus'] && 1 === \count($config['buses'])) {
15251522
$config['default_bus'] = key($config['buses']);
15261523
}
15271524

15281525
$defaultMiddleware = array(
15291526
'before' => array(array('id' => 'logging')),
1530-
'after' => array(array('id' => 'route_messages'), array('id' => 'call_message_handler')),
1527+
'after' => array(array('id' => 'send_message'), array('id' => 'handle_message')),
15311528
);
15321529
foreach ($config['buses'] as $busId => $bus) {
15331530
$middleware = $bus['middleware'];
@@ -1562,51 +1559,44 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
15621559
}
15631560
}
15641561

1565-
if (!$container->hasAlias('message_bus')) {
1566-
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
1567-
}
1568-
15691562
$senderAliases = array();
15701563
foreach ($config['transports'] as $name => $transport) {
15711564
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
1572-
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".');
1565+
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".');
15731566
}
15741567

15751568
$transportDefinition = (new Definition(TransportInterface::class))
15761569
->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
15771570
->setArguments(array($transport['dsn'], $transport['options']))
15781571
->addTag('messenger.receiver', array('alias' => $name))
1579-
->addTag('messenger.sender', array('alias' => $name))
15801572
;
15811573
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
15821574
$senderAliases[$name] = $transportId;
15831575
}
15841576

1585-
$messageToSenderIdMapping = array();
1586-
$messageToSendAndHandleMapping = array();
1577+
$messageToSendersMapping = array();
1578+
$messagesToSendAndHandle = array();
15871579
foreach ($config['routing'] as $message => $messageConfiguration) {
15881580
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
1589-
throw new LogicException(sprintf('Messenger routing configuration contains a mistake: message "%s" does not exist. It needs to match an existing class or interface.', $message));
1581+
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
15901582
}
1583+
$senders = array_map(function ($sender) use ($senderAliases) {
1584+
return new Reference($senderAliases[$sender] ?? $sender);
1585+
}, $messageConfiguration['senders']);
15911586

1592-
if (1 < \count($messageConfiguration['senders'])) {
1593-
$senders = array_map(function ($sender) use ($senderAliases) {
1594-
return new Reference($senderAliases[$sender] ?? $sender);
1595-
}, $messageConfiguration['senders']);
1596-
$chainSenderDefinition = new Definition(ChainSender::class, array($senders));
1597-
$chainSenderDefinition->addTag('messenger.sender');
1598-
$chainSenderId = '.messenger.chain_sender.'.$message;
1599-
$container->setDefinition($chainSenderId, $chainSenderDefinition);
1600-
$messageToSenderIdMapping[$message] = $chainSenderId;
1601-
} else {
1602-
$messageToSenderIdMapping[$message] = $messageConfiguration['senders'][0];
1603-
}
1587+
$sendersId = 'messenger.senders.'.$message;
1588+
$sendersDefinition = $container->register($sendersId, RewindableGenerator::class)
1589+
->setFactory('current')
1590+
->addArgument(array(new IteratorArgument($senders)));
1591+
$messageToSendersMapping[$message] = new Reference($sendersId);
16041592

1605-
$messageToSendAndHandleMapping[$message] = $messageConfiguration['send_and_handle'];
1593+
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
16061594
}
16071595

1608-
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdMapping);
1609-
$container->getDefinition('messenger.middleware.route_messages')->replaceArgument(1, $messageToSendAndHandleMapping);
1596+
$container->getDefinition('messenger.senders_locator')
1597+
->replaceArgument(0, $messageToSendersMapping)
1598+
->replaceArgument(1, $messagesToSendAndHandle)
1599+
;
16101600
}
16111601

16121602
private function registerCacheConfiguration(array $config, ContainerBuilder $container)

src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml

+6-11
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,12 @@
88
<defaults public="false" />
99

1010
<!-- Asynchronous -->
11-
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Transport\Sender\Locator\ContainerSenderLocator">
12-
<argument type="service" id="messenger.sender_locator" />
13-
<argument type="collection" /> <!-- Message to sender ID mapping -->
11+
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
12+
<argument type="collection" /> <!-- Per message sender iterators -->
13+
<argument type="collection" /> <!-- Messages to send and handle -->
1414
</service>
15-
<service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
16-
<argument type="service" id="messenger.asynchronous.routing.sender_locator" />
17-
<argument type="collection" /> <!-- Message to send and handle mapping -->
15+
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
16+
<argument type="service" id="messenger.senders_locator" />
1817
</service>
1918

2019
<!-- Message encoding/decoding -->
@@ -25,7 +24,7 @@
2524
</service>
2625

2726
<!-- Middleware -->
28-
<service id="messenger.middleware.call_message_handler" class="Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" abstract="true">
27+
<service id="messenger.middleware.handle_message" class="Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" abstract="true">
2928
<argument /> <!-- Bus handler resolver -->
3029
</service>
3130

@@ -48,10 +47,6 @@
4847
<tag name="container.service_locator" />
4948
<argument type="collection" />
5049
</service>
51-
<service id="messenger.sender_locator">
52-
<tag name="container.service_locator" />
53-
<argument type="collection" />
54-
</service>
5550

5651
<!-- transports -->
5752
<service id="messenger.transport_factory" class="Symfony\Component\Messenger\Transport\TransportFactory">

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/php/messenger_multiple_buses.php

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
'messenger.bus.queries' => array(
1414
'default_middleware' => false,
1515
'middleware' => array(
16-
'route_messages',
17-
'call_message_handler',
16+
'send_message',
17+
'handle_message',
1818
),
1919
),
2020
),

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/xml/messenger_multiple_buses.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
</framework:middleware>
1919
</framework:bus>
2020
<framework:bus name="messenger.bus.queries" default-middleware="false">
21-
<framework:middleware id="route_messages" />
22-
<framework:middleware id="call_message_handler" />
21+
<framework:middleware id="send_message" />
22+
<framework:middleware id="handle_message" />
2323
</framework:bus>
2424
</framework:messenger>
2525
</framework:config>

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/Fixtures/yml/messenger_multiple_buses.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,5 +9,5 @@ framework:
99
messenger.bus.queries:
1010
default_middleware: false
1111
middleware:
12-
- "route_messages"
13-
- "call_message_handler"
12+
- "send_message"
13+
- "handle_message"

src/Symfony/Bundle/FrameworkBundle/Tests/DependencyInjection/FrameworkExtensionTest.php

+11-20
Original file line numberDiff line numberDiff line change
@@ -544,9 +544,7 @@ public function testMessengerTransports()
544544
$container = $this->createContainerFromFile('messenger_transports');
545545
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
546546
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
547-
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender'));
548547
$this->assertEquals(array(array('alias' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
549-
$this->assertEquals(array(array('alias' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender'));
550548

551549
$this->assertTrue($container->hasDefinition('messenger.transport.customised'));
552550
$transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
@@ -563,28 +561,21 @@ public function testMessengerTransports()
563561
public function testMessengerRouting()
564562
{
565563
$container = $this->createContainerFromFile('messenger_routing');
566-
$senderLocatorDefinition = $container->getDefinition('messenger.asynchronous.routing.sender_locator');
567-
$sendMessageMiddlewareDefinition = $container->getDefinition('messenger.middleware.route_messages');
564+
$senderLocatorDefinition = $container->getDefinition('messenger.senders_locator');
568565

569-
$messageToSenderIdsMapping = array(
570-
DummyMessage::class => '.messenger.chain_sender.'.DummyMessage::class,
571-
SecondMessage::class => '.messenger.chain_sender.'.SecondMessage::class,
572-
'*' => 'amqp',
573-
);
574566
$messageToSendAndHandleMapping = array(
575567
DummyMessage::class => false,
576568
SecondMessage::class => true,
577569
'*' => false,
578570
);
579571

580-
$this->assertSame($messageToSenderIdsMapping, $senderLocatorDefinition->getArgument(1));
581-
$this->assertSame($messageToSendAndHandleMapping, $sendMessageMiddlewareDefinition->getArgument(1));
582-
$this->assertEquals(array(new Reference('messenger.transport.amqp'), new Reference('audit')), $container->getDefinition('.messenger.chain_sender.'.DummyMessage::class)->getArgument(0));
572+
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1));
573+
$this->assertEquals(array(new Reference('messenger.transport.amqp'), new Reference('audit')), $container->getDefinition('messenger.senders.'.DummyMessage::class)->getArgument(0)[0]->getValues());
583574
}
584575

585576
/**
586577
* @expectedException \Symfony\Component\DependencyInjection\Exception\LogicException
587-
* @expectedExceptionMessage The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enable it or install it by running "composer require symfony/serializer-pack".
578+
* @expectedExceptionMessage The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enabling it or running "composer require symfony/serializer-pack".
588579
*/
589580
public function testMessengerTransportConfigurationWithoutSerializer()
590581
{
@@ -593,7 +584,7 @@ public function testMessengerTransportConfigurationWithoutSerializer()
593584

594585
/**
595586
* @expectedException \Symfony\Component\DependencyInjection\Exception\LogicException
596-
* @expectedExceptionMessage The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".
587+
* @expectedExceptionMessage The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".
597588
*/
598589
public function testMessengerAMQPTransportConfigurationWithoutSerializer()
599590
{
@@ -619,22 +610,22 @@ public function testMessengerWithMultipleBuses()
619610
$this->assertSame(array(), $container->getDefinition('messenger.bus.commands')->getArgument(0));
620611
$this->assertEquals(array(
621612
array('id' => 'logging'),
622-
array('id' => 'route_messages'),
623-
array('id' => 'call_message_handler'),
613+
array('id' => 'send_message'),
614+
array('id' => 'handle_message'),
624615
), $container->getParameter('messenger.bus.commands.middleware'));
625616
$this->assertTrue($container->has('messenger.bus.events'));
626617
$this->assertSame(array(), $container->getDefinition('messenger.bus.events')->getArgument(0));
627618
$this->assertEquals(array(
628619
array('id' => 'logging'),
629620
array('id' => 'with_factory', 'arguments' => array('foo', true, array('bar' => 'baz'))),
630-
array('id' => 'route_messages'),
631-
array('id' => 'call_message_handler'),
621+
array('id' => 'send_message'),
622+
array('id' => 'handle_message'),
632623
), $container->getParameter('messenger.bus.events.middleware'));
633624
$this->assertTrue($container->has('messenger.bus.queries'));
634625
$this->assertSame(array(), $container->getDefinition('messenger.bus.queries')->getArgument(0));
635626
$this->assertEquals(array(
636-
array('id' => 'route_messages', 'arguments' => array()),
637-
array('id' => 'call_message_handler', 'arguments' => array()),
627+
array('id' => 'send_message', 'arguments' => array()),
628+
array('id' => 'handle_message', 'arguments' => array()),
638629
), $container->getParameter('messenger.bus.queries.middleware'));
639630

640631
$this->assertTrue($container->hasAlias('message_bus'));

0 commit comments

Comments
 (0)