Skip to content

[Messenger] make senders and handlers subscribing to parent interfaces receive *all* matching messages, wildcard included #29010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -391,14 +391,15 @@ protected function isCsrfTokenValid(string $id, ?string $token): bool
/**
* Dispatches a message to the bus.
*
* @param object $message The message to dispatch
* @param object|Envelope $message The message or the message pre-wrapped in an envelope
*
* @final
*/
protected function dispatchMessage($message): Envelope
{
if (!$this->container->has('message_bus')) {
throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/messenger".');
$message = class_exists(Envelope::class) ? 'You need to define the "messenger.default_bus" configuration option.' : 'Try running "composer require symfony/messenger".';
throw new \LogicException('The message bus is not enabled in your application. '.$message);
}

return $this->container->get('message_bus')->dispatch($message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ class UnusedTagsPass implements CompilerPassInterface
'kernel.event_subscriber',
'kernel.fragment_renderer',
'messenger.bus',
'messenger.sender',
'messenger.receiver',
'messenger.message_handler',
'monolog.logger',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1077,7 +1077,7 @@ function ($a) {
->end()
->end()
->end()
->scalarNode('default_bus')->defaultValue(null)->end()
->scalarNode('default_bus')->defaultNull()->end()
->arrayNode('buses')
->defaultValue(array('messenger.bus.default' => array('default_middleware' => true, 'middleware' => array())))
->useAttributeAsKey('name')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
use Symfony\Component\Console\Application;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\DependencyInjection\Alias;
use Symfony\Component\DependencyInjection\Argument\IteratorArgument;
use Symfony\Component\DependencyInjection\Argument\RewindableGenerator;
use Symfony\Component\DependencyInjection\Argument\ServiceClosureArgument;
use Symfony\Component\DependencyInjection\ChildDefinition;
use Symfony\Component\DependencyInjection\ContainerBuilder;
Expand Down Expand Up @@ -68,7 +70,6 @@
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Sender\ChainSender;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
use Symfony\Component\PropertyAccess\PropertyAccessor;
Expand Down Expand Up @@ -1491,7 +1492,7 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader, array $serializerConfig, array $validationConfig)
{
if (!interface_exists(MessageBusInterface::class)) {
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed.');
throw new LogicException('Messenger support cannot be enabled as the Messenger component is not installed. Try running "composer require symfony/messenger".');
}

$loader->load('messenger.xml');
Expand All @@ -1502,7 +1503,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
} else {
if ('messenger.transport.symfony_serializer' === $config['serializer']['id']) {
if (!$this->isConfigEnabled($container, $serializerConfig)) {
throw new LogicException('The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enable it or install it by running "composer require symfony/serializer-pack".');
throw new LogicException('The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enabling it or running "composer require symfony/serializer-pack".');
}

$container->getDefinition('messenger.transport.symfony_serializer')
Expand All @@ -1517,17 +1518,13 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
}
}

if (null === $config['default_bus']) {
if (\count($config['buses']) > 1) {
throw new LogicException(sprintf('You need to define a default bus with the "default_bus" configuration. Possible values: %s', implode(', ', array_keys($config['buses']))));
}

if (null === $config['default_bus'] && 1 === \count($config['buses'])) {
$config['default_bus'] = key($config['buses']);
}

$defaultMiddleware = array(
'before' => array(array('id' => 'logging')),
'after' => array(array('id' => 'route_messages'), array('id' => 'call_message_handler')),
'after' => array(array('id' => 'send_message'), array('id' => 'handle_message')),
);
foreach ($config['buses'] as $busId => $bus) {
$middleware = $bus['middleware'];
Expand Down Expand Up @@ -1562,51 +1559,44 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
}
}

if (!$container->hasAlias('message_bus')) {
throw new LogicException(sprintf('The default bus named "%s" is not defined. Define it or change the default bus name.', $config['default_bus']));
}

$senderAliases = array();
foreach ($config['transports'] as $name => $transport) {
if (0 === strpos($transport['dsn'], 'amqp://') && !$container->hasDefinition('messenger.transport.amqp.factory')) {
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".');
throw new LogicException('The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".');
}

$transportDefinition = (new Definition(TransportInterface::class))
->setFactory(array(new Reference('messenger.transport_factory'), 'createTransport'))
->setArguments(array($transport['dsn'], $transport['options']))
->addTag('messenger.receiver', array('alias' => $name))
->addTag('messenger.sender', array('alias' => $name))
;
$container->setDefinition($transportId = 'messenger.transport.'.$name, $transportDefinition);
$senderAliases[$name] = $transportId;
}

$messageToSenderIdMapping = array();
$messageToSendAndHandleMapping = array();
$messageToSendersMapping = array();
$messagesToSendAndHandle = array();
foreach ($config['routing'] as $message => $messageConfiguration) {
if ('*' !== $message && !class_exists($message) && !interface_exists($message, false)) {
throw new LogicException(sprintf('Messenger routing configuration contains a mistake: message "%s" does not exist. It needs to match an existing class or interface.', $message));
throw new LogicException(sprintf('Invalid Messenger routing configuration: class or interface "%s" not found.', $message));
}
$senders = array_map(function ($sender) use ($senderAliases) {
return new Reference($senderAliases[$sender] ?? $sender);
}, $messageConfiguration['senders']);

if (1 < \count($messageConfiguration['senders'])) {
$senders = array_map(function ($sender) use ($senderAliases) {
return new Reference($senderAliases[$sender] ?? $sender);
}, $messageConfiguration['senders']);
$chainSenderDefinition = new Definition(ChainSender::class, array($senders));
$chainSenderDefinition->addTag('messenger.sender');
$chainSenderId = '.messenger.chain_sender.'.$message;
$container->setDefinition($chainSenderId, $chainSenderDefinition);
$messageToSenderIdMapping[$message] = $chainSenderId;
} else {
$messageToSenderIdMapping[$message] = $messageConfiguration['senders'][0];
}
$sendersId = 'messenger.senders.'.$message;
$sendersDefinition = $container->register($sendersId, RewindableGenerator::class)
->setFactory('current')
->addArgument(array(new IteratorArgument($senders)));
$messageToSendersMapping[$message] = new Reference($sendersId);

$messageToSendAndHandleMapping[$message] = $messageConfiguration['send_and_handle'];
$messagesToSendAndHandle[$message] = $messageConfiguration['send_and_handle'];
}

$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdMapping);
$container->getDefinition('messenger.middleware.route_messages')->replaceArgument(1, $messageToSendAndHandleMapping);
$container->getDefinition('messenger.senders_locator')
->replaceArgument(0, $messageToSendersMapping)
->replaceArgument(1, $messagesToSendAndHandle)
;
}

private function registerCacheConfiguration(array $config, ContainerBuilder $container)
Expand Down
17 changes: 6 additions & 11 deletions src/Symfony/Bundle/FrameworkBundle/Resources/config/messenger.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@
<defaults public="false" />

<!-- Asynchronous -->
<service id="messenger.asynchronous.routing.sender_locator" class="Symfony\Component\Messenger\Transport\Sender\Locator\ContainerSenderLocator">
<argument type="service" id="messenger.sender_locator" />
<argument type="collection" /> <!-- Message to sender ID mapping -->
<service id="messenger.senders_locator" class="Symfony\Component\Messenger\Transport\Sender\SendersLocator">
<argument type="collection" /> <!-- Per message sender iterators -->
<argument type="collection" /> <!-- Messages to send and handle -->
</service>
<service id="messenger.middleware.route_messages" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
<argument type="service" id="messenger.asynchronous.routing.sender_locator" />
<argument type="collection" /> <!-- Message to send and handle mapping -->
<service id="messenger.middleware.send_message" class="Symfony\Component\Messenger\Middleware\SendMessageMiddleware">
<argument type="service" id="messenger.senders_locator" />
</service>

<!-- Message encoding/decoding -->
Expand All @@ -25,7 +24,7 @@
</service>

<!-- Middleware -->
<service id="messenger.middleware.call_message_handler" class="Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" abstract="true">
<service id="messenger.middleware.handle_message" class="Symfony\Component\Messenger\Middleware\HandleMessageMiddleware" abstract="true">
<argument /> <!-- Bus handler resolver -->
</service>

Expand All @@ -48,10 +47,6 @@
<tag name="container.service_locator" />
<argument type="collection" />
</service>
<service id="messenger.sender_locator">
<tag name="container.service_locator" />
<argument type="collection" />
</service>

<!-- transports -->
<service id="messenger.transport_factory" class="Symfony\Component\Messenger\Transport\TransportFactory">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
'messenger.bus.queries' => array(
'default_middleware' => false,
'middleware' => array(
'route_messages',
'call_message_handler',
'send_message',
'handle_message',
),
),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
</framework:middleware>
</framework:bus>
<framework:bus name="messenger.bus.queries" default-middleware="false">
<framework:middleware id="route_messages" />
<framework:middleware id="call_message_handler" />
<framework:middleware id="send_message" />
<framework:middleware id="handle_message" />
</framework:bus>
</framework:messenger>
</framework:config>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ framework:
messenger.bus.queries:
default_middleware: false
middleware:
- "route_messages"
- "call_message_handler"
- "send_message"
- "handle_message"
Original file line number Diff line number Diff line change
Expand Up @@ -544,9 +544,7 @@ public function testMessengerTransports()
$container = $this->createContainerFromFile('messenger_transports');
$this->assertTrue($container->hasDefinition('messenger.transport.default'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.receiver'));
$this->assertTrue($container->getDefinition('messenger.transport.default')->hasTag('messenger.sender'));
$this->assertEquals(array(array('alias' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.receiver'));
$this->assertEquals(array(array('alias' => 'default')), $container->getDefinition('messenger.transport.default')->getTag('messenger.sender'));

$this->assertTrue($container->hasDefinition('messenger.transport.customised'));
$transportFactory = $container->getDefinition('messenger.transport.customised')->getFactory();
Expand All @@ -563,28 +561,21 @@ public function testMessengerTransports()
public function testMessengerRouting()
{
$container = $this->createContainerFromFile('messenger_routing');
$senderLocatorDefinition = $container->getDefinition('messenger.asynchronous.routing.sender_locator');
$sendMessageMiddlewareDefinition = $container->getDefinition('messenger.middleware.route_messages');
$senderLocatorDefinition = $container->getDefinition('messenger.senders_locator');

$messageToSenderIdsMapping = array(
DummyMessage::class => '.messenger.chain_sender.'.DummyMessage::class,
SecondMessage::class => '.messenger.chain_sender.'.SecondMessage::class,
'*' => 'amqp',
);
$messageToSendAndHandleMapping = array(
DummyMessage::class => false,
SecondMessage::class => true,
'*' => false,
);

$this->assertSame($messageToSenderIdsMapping, $senderLocatorDefinition->getArgument(1));
$this->assertSame($messageToSendAndHandleMapping, $sendMessageMiddlewareDefinition->getArgument(1));
$this->assertEquals(array(new Reference('messenger.transport.amqp'), new Reference('audit')), $container->getDefinition('.messenger.chain_sender.'.DummyMessage::class)->getArgument(0));
$this->assertSame($messageToSendAndHandleMapping, $senderLocatorDefinition->getArgument(1));
$this->assertEquals(array(new Reference('messenger.transport.amqp'), new Reference('audit')), $container->getDefinition('messenger.senders.'.DummyMessage::class)->getArgument(0)[0]->getValues());
}

/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\LogicException
* @expectedExceptionMessage The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enable it or install it by running "composer require symfony/serializer-pack".
* @expectedExceptionMessage The default Messenger serializer cannot be enabled as the Serializer support is not available. Try enabling it or running "composer require symfony/serializer-pack".
*/
public function testMessengerTransportConfigurationWithoutSerializer()
{
Expand All @@ -593,7 +584,7 @@ public function testMessengerTransportConfigurationWithoutSerializer()

/**
* @expectedException \Symfony\Component\DependencyInjection\Exception\LogicException
* @expectedExceptionMessage The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enable it or install it by running "composer require symfony/serializer-pack".
* @expectedExceptionMessage The default AMQP transport is not available. Make sure you have installed and enabled the Serializer component. Try enabling it or running "composer require symfony/serializer-pack".
*/
public function testMessengerAMQPTransportConfigurationWithoutSerializer()
{
Expand All @@ -619,22 +610,22 @@ public function testMessengerWithMultipleBuses()
$this->assertSame(array(), $container->getDefinition('messenger.bus.commands')->getArgument(0));
$this->assertEquals(array(
array('id' => 'logging'),
array('id' => 'route_messages'),
array('id' => 'call_message_handler'),
array('id' => 'send_message'),
array('id' => 'handle_message'),
), $container->getParameter('messenger.bus.commands.middleware'));
$this->assertTrue($container->has('messenger.bus.events'));
$this->assertSame(array(), $container->getDefinition('messenger.bus.events')->getArgument(0));
$this->assertEquals(array(
array('id' => 'logging'),
array('id' => 'with_factory', 'arguments' => array('foo', true, array('bar' => 'baz'))),
array('id' => 'route_messages'),
array('id' => 'call_message_handler'),
array('id' => 'send_message'),
array('id' => 'handle_message'),
), $container->getParameter('messenger.bus.events.middleware'));
$this->assertTrue($container->has('messenger.bus.queries'));
$this->assertSame(array(), $container->getDefinition('messenger.bus.queries')->getArgument(0));
$this->assertEquals(array(
array('id' => 'route_messages', 'arguments' => array()),
array('id' => 'call_message_handler', 'arguments' => array()),
array('id' => 'send_message', 'arguments' => array()),
array('id' => 'handle_message', 'arguments' => array()),
), $container->getParameter('messenger.bus.queries.middleware'));

$this->assertTrue($container->hasAlias('message_bus'));
Expand Down
Loading