Skip to content

Commit e157ded

Browse files
committed
feature #24411 [Messenger] Add a new Messenger component (sroze)
This PR was squashed before being merged into the 4.1-dev branch (closes #24411). Discussion ---------- [Messenger] Add a new Messenger component | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #24326 | License | MIT | Doc PR | symfony/symfony-docs#9437 As discussed in #24326. This PR is to help going forward with the discussions of having a Message component. # Resources | What | Where | --- | --- | Documentation | [In the PR](https://github.com/sroze/symfony/blob/add-message-component/src/Symfony/Component/Message/README.md) | Demo | [In `sroze/symfony-demo:message-component-demo`](https://github.com/sroze/symfony-demo/compare/message-component-demo) | [php-enqueue](https://github.com/php-enqueue/enqueue-dev) adapter | 1. Source: [In `sroze/enqueue-bridge`](https://github.com/sroze/enqueue-bridge) _(to be moved as `symfony/enqueue-bridge` I guess)_<br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-enqueue`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-enqueue) | [Swarrot](https://github.com/swarrot/swarrot) adapter | **Outdated adapter, waiting for stabilization** 1. Source: [In `sroze/swarrot-bridge`](https://github.com/sroze/swarrot-bridge) _(to be moved as `symfony/swarrot-bridge` I guess)_<br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-swarrot`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-swarrot) | [HTTP](https://github.com/sroze/message-http-adapter) adapter | **Outdated adapter, waiting for stabilization** 1. Source: [In `sroze/message-http-adapter`](https://github.com/sroze/message-http-adapter) <br/>2. Demo: [In `sroze/symfony-demo:message-component-demo-with-http-adapter`](sroze/symfony-demo@message-component-demo...sroze:message-component-demo-with-http-adapter) | Web profiler integration | _In the pull-request_ # Important points 1. **Tests are not in the PR as they were written in PhpSpec & Behat.** If we decide to go forward with this approach, I'll translate them to PHPUnit. 2. The aim is not to solve all the message/queuing problems but provide a good, simple and extensible message bus for developers. 3. The communication with the actual AMQP/API brokers is down to the adapters for now. Not sure if we need to ship some by default or not 🤔 I guess that this would replace #23842 & #23315. # Changes from the proposals Based on the comments, a few changes have been made from the proposal. 1. `MessageProducer`s have been renamed to `MessageSender`s 2. `MessageConsumer`s have been renamed to `MessageReceiver`s Commits ------- c9cfda9 [Messenger] Add a new Messenger component
2 parents acf49e9 + c9cfda9 commit e157ded

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1967
-0
lines changed

composer.json

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
"symfony/intl": "self.version",
5656
"symfony/ldap": "self.version",
5757
"symfony/lock": "self.version",
58+
"symfony/messenger": "self.version",
5859
"symfony/monolog-bridge": "self.version",
5960
"symfony/options-resolver": "self.version",
6061
"symfony/process": "self.version",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Bundle\FrameworkBundle\Command;
13+
14+
use Psr\Container\ContainerInterface;
15+
use Symfony\Component\Console\Command\Command;
16+
use Symfony\Component\Console\Input\InputArgument;
17+
use Symfony\Component\Console\Input\InputInterface;
18+
use Symfony\Component\Console\Input\InputOption;
19+
use Symfony\Component\Console\Output\OutputInterface;
20+
use Symfony\Component\Messenger\MessageBusInterface;
21+
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
22+
use Symfony\Component\Messenger\Transport\ReceiverInterface;
23+
use Symfony\Component\Messenger\Worker;
24+
25+
/**
26+
* @author Samuel Roze <samuel.roze@gmail.com>
27+
*/
28+
class MessengerConsumeMessagesCommand extends Command
29+
{
30+
protected static $defaultName = 'messenger:consume-messages';
31+
32+
private $bus;
33+
private $receiverLocator;
34+
35+
public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator)
36+
{
37+
parent::__construct();
38+
39+
$this->bus = $bus;
40+
$this->receiverLocator = $receiverLocator;
41+
}
42+
43+
/**
44+
* {@inheritdoc}
45+
*/
46+
protected function configure()
47+
{
48+
$this
49+
->setDefinition(array(
50+
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
51+
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
52+
))
53+
->setDescription('Consumes messages')
54+
->setHelp(<<<'EOF'
55+
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.
56+
57+
<info>php %command.full_name% <receiver-name></info>
58+
59+
Use the --limit option to limit the number of messages received:
60+
61+
<info>php %command.full_name% <receiver-name> --limit=10</info>
62+
EOF
63+
)
64+
;
65+
}
66+
67+
/**
68+
* {@inheritdoc}
69+
*/
70+
protected function execute(InputInterface $input, OutputInterface $output)
71+
{
72+
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
73+
throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
74+
}
75+
76+
if (!($receiver = $this->receiverLocator->get($receiverName)) instanceof ReceiverInterface) {
77+
throw new \RuntimeException(sprintf('Receiver "%s" is not a valid message consumer. It must implement the "%s" interface.', $receiverName, ReceiverInterface::class));
78+
}
79+
80+
if ($limit = $input->getOption('limit')) {
81+
$receiver = new MaximumCountReceiver($receiver, $limit);
82+
}
83+
84+
$worker = new Worker($receiver, $this->bus);
85+
$worker->run();
86+
}
87+
}

src/Symfony/Bundle/FrameworkBundle/Controller/AbstractController.php

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use Symfony\Component\HttpFoundation\RequestStack;
1919
use Symfony\Component\HttpFoundation\Session\SessionInterface;
2020
use Symfony\Component\HttpKernel\HttpKernelInterface;
21+
use Symfony\Component\Messenger\MessageBusInterface;
2122
use Symfony\Component\Routing\RouterInterface;
2223
use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface;
2324
use Symfony\Component\Security\Core\Authorization\AuthorizationCheckerInterface;
@@ -84,6 +85,7 @@ public static function getSubscribedServices()
8485
'security.token_storage' => '?'.TokenStorageInterface::class,
8586
'security.csrf.token_manager' => '?'.CsrfTokenManagerInterface::class,
8687
'parameter_bag' => '?'.ContainerInterface::class,
88+
'message_bus' => '?'.MessageBusInterface::class,
8789
);
8890
}
8991
}

src/Symfony/Bundle/FrameworkBundle/Controller/ControllerTrait.php

+16
Original file line numberDiff line numberDiff line change
@@ -382,4 +382,20 @@ protected function isCsrfTokenValid(string $id, ?string $token): bool
382382

383383
return $this->container->get('security.csrf.token_manager')->isTokenValid(new CsrfToken($id, $token));
384384
}
385+
386+
/**
387+
* Dispatches a message to the bus.
388+
*
389+
* @param object $message The message to dispatch
390+
*
391+
* @final
392+
*/
393+
protected function dispatchMessage($message)
394+
{
395+
if (!$this->container->has('message_bus')) {
396+
throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/messenger".');
397+
}
398+
399+
return $this->container->get('message_bus')->dispatch($message);
400+
}
385401
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Bundle\FrameworkBundle\DataCollector;
13+
14+
use Symfony\Component\HttpFoundation\Request;
15+
use Symfony\Component\HttpFoundation\Response;
16+
use Symfony\Component\HttpKernel\DataCollector\DataCollector;
17+
use Symfony\Component\Messenger\MiddlewareInterface;
18+
19+
/**
20+
* @author Samuel Roze <samuel.roze@gmail.com>
21+
*/
22+
class MessengerDataCollector extends DataCollector implements MiddlewareInterface
23+
{
24+
/**
25+
* {@inheritdoc}
26+
*/
27+
public function collect(Request $request, Response $response, \Exception $exception = null)
28+
{
29+
return $this->data;
30+
}
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
public function getName()
36+
{
37+
return 'messages';
38+
}
39+
40+
/**
41+
* {@inheritdoc}
42+
*/
43+
public function reset()
44+
{
45+
$this->data = array();
46+
}
47+
48+
/**
49+
* {@inheritdoc}
50+
*/
51+
public function handle($message, callable $next)
52+
{
53+
$debugRepresentation = array(
54+
'message' => array(
55+
'type' => get_class($message),
56+
),
57+
);
58+
59+
$exception = null;
60+
try {
61+
$result = $next($message);
62+
63+
if (is_object($result)) {
64+
$debugRepresentation['result'] = array(
65+
'type' => get_class($result),
66+
);
67+
} else {
68+
$debugRepresentation['result'] = array(
69+
'type' => gettype($result),
70+
'value' => $result,
71+
);
72+
}
73+
} catch (\Throwable $exception) {
74+
$debugRepresentation['exception'] = array(
75+
'type' => get_class($exception),
76+
'message' => $exception->getMessage(),
77+
);
78+
}
79+
80+
$this->data[] = $debugRepresentation;
81+
82+
if (null !== $exception) {
83+
throw $exception;
84+
}
85+
86+
return $result;
87+
}
88+
89+
public function getMessages(): array
90+
{
91+
return $this->data;
92+
}
93+
}

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php

+33
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
use Symfony\Component\Lock\Lock;
2323
use Symfony\Component\Lock\Store\SemaphoreStore;
2424
use Symfony\Component\Security\Csrf\CsrfTokenManagerInterface;
25+
use Symfony\Component\Messenger\MessageBusInterface;
2526
use Symfony\Component\Serializer\Serializer;
2627
use Symfony\Component\Translation\Translator;
2728
use Symfony\Component\Validator\Validation;
@@ -102,6 +103,7 @@ public function getConfigTreeBuilder()
102103
$this->addPhpErrorsSection($rootNode);
103104
$this->addWebLinkSection($rootNode);
104105
$this->addLockSection($rootNode);
106+
$this->addMessengerSection($rootNode);
105107

106108
return $treeBuilder;
107109
}
@@ -956,4 +958,35 @@ private function addWebLinkSection(ArrayNodeDefinition $rootNode)
956958
->end()
957959
;
958960
}
961+
962+
private function addMessengerSection(ArrayNodeDefinition $rootNode)
963+
{
964+
$rootNode
965+
->children()
966+
->arrayNode('messenger')
967+
->info('Messenger configuration')
968+
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
969+
->children()
970+
->arrayNode('routing')
971+
->useAttributeAsKey('message_class')
972+
->prototype('array')
973+
->beforeNormalization()
974+
->ifString()
975+
->then(function ($v) {
976+
return array('senders' => array($v));
977+
})
978+
->end()
979+
->children()
980+
->arrayNode('senders')
981+
->requiresAtLeastOneElement()
982+
->prototype('scalar')->end()
983+
->end()
984+
->end()
985+
->end()
986+
->end()
987+
->end()
988+
->end()
989+
->end()
990+
;
991+
}
959992
}

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

+32
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
use Symfony\Component\Lock\LockInterface;
6060
use Symfony\Component\Lock\Store\StoreFactory;
6161
use Symfony\Component\Lock\StoreInterface;
62+
use Symfony\Component\Messenger\Transport\ReceiverInterface;
63+
use Symfony\Component\Messenger\Transport\SenderInterface;
6264
use Symfony\Component\PropertyAccess\PropertyAccessor;
6365
use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface;
6466
use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface;
@@ -267,6 +269,12 @@ public function load(array $configs, ContainerBuilder $container)
267269
$this->registerLockConfiguration($config['lock'], $container, $loader);
268270
}
269271

272+
if ($this->isConfigEnabled($container, $config['messenger'])) {
273+
$this->registerMessengerConfiguration($config['messenger'], $container, $loader);
274+
} else {
275+
$container->removeDefinition('console.command.messenger_consume_messages');
276+
}
277+
270278
if ($this->isConfigEnabled($container, $config['web_link'])) {
271279
if (!class_exists(HttpHeaderSerializer::class)) {
272280
throw new LogicException('WebLink support cannot be enabled as the WebLink component is not installed.');
@@ -334,6 +342,10 @@ public function load(array $configs, ContainerBuilder $container)
334342
->addTag('validator.constraint_validator');
335343
$container->registerForAutoconfiguration(ObjectInitializerInterface::class)
336344
->addTag('validator.initializer');
345+
$container->registerForAutoconfiguration(ReceiverInterface::class)
346+
->addTag('messenger.receiver');
347+
$container->registerForAutoconfiguration(SenderInterface::class)
348+
->addTag('messenger.sender');
337349

338350
if (!$container->getParameter('kernel.debug')) {
339351
// remove tagged iterator argument for resource checkers
@@ -1415,6 +1427,26 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont
14151427
}
14161428
}
14171429

1430+
private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
1431+
{
1432+
$loader->load('messenger.xml');
1433+
1434+
$senderLocatorMapping = array();
1435+
$messageToSenderIdsMapping = array();
1436+
foreach ($config['routing'] as $message => $messageConfiguration) {
1437+
foreach ($messageConfiguration['senders'] as $sender) {
1438+
if (null !== $sender) {
1439+
$senderLocatorMapping[$sender] = new Reference($sender);
1440+
}
1441+
}
1442+
1443+
$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
1444+
}
1445+
1446+
$container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping);
1447+
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
1448+
}
1449+
14181450
private function registerCacheConfiguration(array $config, ContainerBuilder $container)
14191451
{
14201452
$version = new Parameter('container.build_id');

src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php

+2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
use Symfony\Component\HttpKernel\DependencyInjection\RegisterControllerArgumentLocatorsPass;
3535
use Symfony\Component\HttpKernel\DependencyInjection\RemoveEmptyControllerArgumentLocatorsPass;
3636
use Symfony\Component\HttpKernel\DependencyInjection\ResettableServicePass;
37+
use Symfony\Component\Messenger\DependencyInjection\MessengerPass;
3738
use Symfony\Component\PropertyInfo\DependencyInjection\PropertyInfoPass;
3839
use Symfony\Component\Routing\DependencyInjection\RoutingResolverPass;
3940
use Symfony\Component\Serializer\DependencyInjection\SerializerPass;
@@ -118,6 +119,7 @@ public function build(ContainerBuilder $container)
118119
$container->addCompilerPass(new ResettableServicePass());
119120
$container->addCompilerPass(new TestServiceContainerWeakRefPass(), PassConfig::TYPE_BEFORE_REMOVING, -32);
120121
$container->addCompilerPass(new TestServiceContainerRealRefPass(), PassConfig::TYPE_AFTER_REMOVING);
122+
$this->addCompilerPassIfExists($container, MessengerPass::class);
121123

122124
if ($container->getParameter('kernel.debug')) {
123125
$container->addCompilerPass(new AddDebugLogProcessorPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, -32);

src/Symfony/Bundle/FrameworkBundle/Resources/config/console.xml

+7
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,13 @@
6969
<tag name="console.command" command="debug:event-dispatcher" />
7070
</service>
7171

72+
<service id="console.command.messenger_consume_messages" class="Symfony\Bundle\FrameworkBundle\Command\MessengerConsumeMessagesCommand">
73+
<argument type="service" id="message_bus" />
74+
<argument type="service" id="messenger.receiver_locator" />
75+
76+
<tag name="console.command" command="messenger:consume-messages" />
77+
</service>
78+
7279
<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
7380
<argument type="service" id="router" />
7481
<tag name="console.command" command="debug:router" />

0 commit comments

Comments
 (0)