Skip to content

[Messenger] Add a new Messenger component #24411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"symfony/intl": "self.version",
"symfony/ldap": "self.version",
"symfony/lock": "self.version",
"symfony/messenger": "self.version",
"symfony/monolog-bridge": "self.version",
"symfony/options-resolver": "self.version",
"symfony/process": "self.version",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Bundle\FrameworkBundle\Command;

use Psr\Container\ContainerInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Transport\Enhancers\MaximumCountReceiver;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Worker;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class MessengerConsumeMessagesCommand extends Command
{
protected static $defaultName = 'messenger:consume-messages';

private $bus;
private $receiverLocator;

public function __construct(MessageBusInterface $bus, ContainerInterface $receiverLocator)
{
parent::__construct();

$this->bus = $bus;
$this->receiverLocator = $receiverLocator;
}

/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->setDefinition(array(
new InputArgument('receiver', InputArgument::REQUIRED, 'Name of the receiver'),
new InputOption('limit', 'l', InputOption::VALUE_REQUIRED, 'Limit the number of received messages'),
))
->setDescription('Consumes messages')
->setHelp(<<<'EOF'
The <info>%command.name%</info> command consumes messages and dispatches them to the message bus.

<info>php %command.full_name% <receiver-name></info>

Use the --limit option to limit the number of messages received:

<info>php %command.full_name% <receiver-name> --limit=10</info>
EOF
)
;
}

/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
if (!$this->receiverLocator->has($receiverName = $input->getArgument('receiver'))) {
throw new \RuntimeException(sprintf('Receiver "%s" does not exist.', $receiverName));
}

if (!($receiver = $this->receiverLocator->get($receiverName)) instanceof ReceiverInterface) {
throw new \RuntimeException(sprintf('Receiver "%s" is not a valid message consumer. It must implement the "%s" interface.', $receiverName, ReceiverInterface::class));
}

if ($limit = $input->getOption('limit')) {
$receiver = new MaximumCountReceiver($receiver, $limit);
}

$worker = new Worker($receiver, $this->bus);
$worker->run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Symfony\Component\HttpFoundation\RequestStack;
use Symfony\Component\HttpFoundation\Session\SessionInterface;
use Symfony\Component\HttpKernel\HttpKernelInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Routing\RouterInterface;
use Symfony\Component\Security\Core\Authentication\Token\Storage\TokenStorageInterface;
use Symfony\Component\Security\Core\Authorization\AuthorizationCheckerInterface;
Expand Down Expand Up @@ -84,6 +85,7 @@ public static function getSubscribedServices()
'security.token_storage' => '?'.TokenStorageInterface::class,
'security.csrf.token_manager' => '?'.CsrfTokenManagerInterface::class,
'parameter_bag' => '?'.ContainerInterface::class,
'message_bus' => '?'.MessageBusInterface::class,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? It looks like this service is used nowhere in the trait.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you can do a $this->get('message_bus') within your controllers.

Copy link
Member

@dunglas dunglas Mar 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that we do that for any other service, nor that we want to promote that (we prefer to promote regular injection since 4.0). Other services are injected here mostly for backward compatibility (when using the proxy methods).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, maybe can you add a proxy method in the trait, for convenience and discoverability by newcomers.

Copy link
Contributor Author

@sroze sroze Mar 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the dispatchMessage method on the ControllerTrait in 0a31079.

);
}
}
16 changes: 16 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/Controller/ControllerTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -382,4 +382,20 @@ protected function isCsrfTokenValid(string $id, ?string $token): bool

return $this->container->get('security.csrf.token_manager')->isTokenValid(new CsrfToken($id, $token));
}

/**
* Dispatches a message to the bus.
*
* @param object $message The message to dispatch
*
* @final
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an annotation @final but method is not final itself

*/
protected function dispatchMessage($message)
{
if (!$this->container->has('message_bus')) {
throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/messenger".');
}

return $this->container->get('message_bus')->dispatch($message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Bundle\FrameworkBundle\DataCollector;

use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpKernel\DataCollector\DataCollector;
use Symfony\Component\Messenger\MiddlewareInterface;

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class MessengerDataCollector extends DataCollector implements MiddlewareInterface
{
/**
* {@inheritdoc}
*/
public function collect(Request $request, Response $response, \Exception $exception = null)
{
return $this->data;
}

/**
* {@inheritdoc}
*/
public function getName()
{
return 'messages';
}

/**
* {@inheritdoc}
*/
public function reset()
{
$this->data = array();
}

/**
* {@inheritdoc}
*/
public function handle($message, callable $next)
{
$debugRepresentation = array(
'message' => array(
'type' => get_class($message),
),
);

$exception = null;
try {
$result = $next($message);

if (is_object($result)) {
$debugRepresentation['result'] = array(
'type' => get_class($result),
);
} else {
$debugRepresentation['result'] = array(
'type' => gettype($result),
'value' => $result,
);
}
} catch (\Throwable $exception) {
$debugRepresentation['exception'] = array(
'type' => get_class($exception),
'message' => $exception->getMessage(),
);
}

$this->data[] = $debugRepresentation;

if (null !== $exception) {
throw $exception;
}

return $result;
}

public function getMessages(): array
{
return $this->data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use Symfony\Component\Lock\Lock;
use Symfony\Component\Lock\Store\SemaphoreStore;
use Symfony\Component\Security\Csrf\CsrfTokenManagerInterface;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Serializer\Serializer;
use Symfony\Component\Translation\Translator;
use Symfony\Component\Validator\Validation;
Expand Down Expand Up @@ -102,6 +103,7 @@ public function getConfigTreeBuilder()
$this->addPhpErrorsSection($rootNode);
$this->addWebLinkSection($rootNode);
$this->addLockSection($rootNode);
$this->addMessengerSection($rootNode);

return $treeBuilder;
}
Expand Down Expand Up @@ -956,4 +958,35 @@ private function addWebLinkSection(ArrayNodeDefinition $rootNode)
->end()
;
}

private function addMessengerSection(ArrayNodeDefinition $rootNode)
{
$rootNode
->children()
->arrayNode('messenger')
->info('Messenger configuration')
->{!class_exists(FullStack::class) && class_exists(MessageBusInterface::class) ? 'canBeDisabled' : 'canBeEnabled'}()
->children()
->arrayNode('routing')
->useAttributeAsKey('message_class')
->prototype('array')
->beforeNormalization()
->ifString()
->then(function ($v) {
return array('senders' => array($v));
})
->end()
->children()
->arrayNode('senders')
->requiresAtLeastOneElement()
->prototype('scalar')->end()
->end()
->end()
->end()
->end()
->end()
->end()
->end()
;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
use Symfony\Component\Lock\LockInterface;
use Symfony\Component\Lock\Store\StoreFactory;
use Symfony\Component\Lock\StoreInterface;
use Symfony\Component\Messenger\Transport\ReceiverInterface;
use Symfony\Component\Messenger\Transport\SenderInterface;
use Symfony\Component\PropertyAccess\PropertyAccessor;
use Symfony\Component\PropertyInfo\PropertyAccessExtractorInterface;
use Symfony\Component\PropertyInfo\PropertyDescriptionExtractorInterface;
Expand Down Expand Up @@ -266,6 +268,12 @@ public function load(array $configs, ContainerBuilder $container)
$this->registerLockConfiguration($config['lock'], $container, $loader);
}

if ($this->isConfigEnabled($container, $config['messenger'])) {
$this->registerMessengerConfiguration($config['messenger'], $container, $loader);
} else {
$container->removeDefinition('console.command.messenger_consume_messages');
}

if ($this->isConfigEnabled($container, $config['web_link'])) {
if (!class_exists(HttpHeaderSerializer::class)) {
throw new LogicException('WebLink support cannot be enabled as the WebLink component is not installed.');
Expand Down Expand Up @@ -333,6 +341,10 @@ public function load(array $configs, ContainerBuilder $container)
->addTag('validator.constraint_validator');
$container->registerForAutoconfiguration(ObjectInitializerInterface::class)
->addTag('validator.initializer');
$container->registerForAutoconfiguration(ReceiverInterface::class)
->addTag('messenger.receiver');
$container->registerForAutoconfiguration(SenderInterface::class)
->addTag('messenger.sender');

if (!$container->getParameter('kernel.debug')) {
// remove tagged iterator argument for resource checkers
Expand Down Expand Up @@ -1410,6 +1422,26 @@ private function registerLockConfiguration(array $config, ContainerBuilder $cont
}
}

private function registerMessengerConfiguration(array $config, ContainerBuilder $container, XmlFileLoader $loader)
{
$loader->load('messenger.xml');

$senderLocatorMapping = array();
$messageToSenderIdsMapping = array();
foreach ($config['routing'] as $message => $messageConfiguration) {
foreach ($messageConfiguration['senders'] as $sender) {
if (null !== $sender) {
$senderLocatorMapping[$sender] = new Reference($sender);
}
}

$messageToSenderIdsMapping[$message] = $messageConfiguration['senders'];
}

$container->getDefinition('messenger.sender_locator')->replaceArgument(0, $senderLocatorMapping);
$container->getDefinition('messenger.asynchronous.routing.sender_locator')->replaceArgument(1, $messageToSenderIdsMapping);
}

private function registerCacheConfiguration(array $config, ContainerBuilder $container)
{
$version = new Parameter('container.build_id');
Expand Down
2 changes: 2 additions & 0 deletions src/Symfony/Bundle/FrameworkBundle/FrameworkBundle.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
use Symfony\Component\HttpKernel\DependencyInjection\RegisterControllerArgumentLocatorsPass;
use Symfony\Component\HttpKernel\DependencyInjection\RemoveEmptyControllerArgumentLocatorsPass;
use Symfony\Component\HttpKernel\DependencyInjection\ResettableServicePass;
use Symfony\Component\Messenger\DependencyInjection\MessengerPass;
use Symfony\Component\PropertyInfo\DependencyInjection\PropertyInfoPass;
use Symfony\Component\Routing\DependencyInjection\RoutingResolverPass;
use Symfony\Component\Serializer\DependencyInjection\SerializerPass;
Expand Down Expand Up @@ -118,6 +119,7 @@ public function build(ContainerBuilder $container)
$container->addCompilerPass(new ResettableServicePass());
$container->addCompilerPass(new TestServiceContainerWeakRefPass(), PassConfig::TYPE_BEFORE_REMOVING, -32);
$container->addCompilerPass(new TestServiceContainerRealRefPass(), PassConfig::TYPE_AFTER_REMOVING);
$this->addCompilerPassIfExists($container, MessengerPass::class);

if ($container->getParameter('kernel.debug')) {
$container->addCompilerPass(new AddDebugLogProcessorPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, -32);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@
<tag name="console.command" command="debug:event-dispatcher" />
</service>

<service id="console.command.messenger_consume_messages" class="Symfony\Bundle\FrameworkBundle\Command\MessengerConsumeMessagesCommand">
<argument type="service" id="message_bus" />
<argument type="service" id="messenger.receiver_locator" />

<tag name="console.command" command="messenger:consume-messages" />
</service>

<service id="console.command.router_debug" class="Symfony\Bundle\FrameworkBundle\Command\RouterDebugCommand">
<argument type="service" id="router" />
<tag name="console.command" command="debug:router" />
Expand Down
Loading