Skip to content

[Messenger] Add a new Messenger component #24411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from

Conversation

sroze
Copy link
Contributor

@sroze sroze commented Oct 3, 2017

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? yes
Fixed tickets #24326
License MIT
Doc PR symfony/symfony-docs#9437

As discussed in #24326. This PR is to help going forward with the discussions of having a Message component.

Resources

What Where
Documentation In the PR
Demo In sroze/symfony-demo:message-component-demo
php-enqueue adapter 1. Source: In sroze/enqueue-bridge (to be moved as symfony/enqueue-bridge I guess)
2. Demo: In sroze/symfony-demo:message-component-demo-with-enqueue
Swarrot adapter Outdated adapter, waiting for stabilization 1. Source: In sroze/swarrot-bridge (to be moved as symfony/swarrot-bridge I guess)
2. Demo: In sroze/symfony-demo:message-component-demo-with-swarrot
HTTP adapter Outdated adapter, waiting for stabilization 1. Source: In sroze/message-http-adapter
2. Demo: In sroze/symfony-demo:message-component-demo-with-http-adapter
Web profiler integration In the pull-request

Important points

  1. Tests are not in the PR as they were written in PhpSpec & Behat. If we decide to go forward with this approach, I'll translate them to PHPUnit.
  2. The aim is not to solve all the message/queuing problems but provide a good, simple and extensible message bus for developers.
  3. The communication with the actual AMQP/API brokers is down to the adapters for now. Not sure if we need to ship some by default or not 🤔

I guess that this would replace #23842 & #23315.

Changes from the proposals

Based on the comments, a few changes have been made from the proposal.

  1. MessageProducers have been renamed to MessageSenders
  2. MessageConsumers have been renamed to MessageReceivers

@sroze sroze force-pushed the add-message-component branch from 9a45c8f to f342984 Compare October 3, 2017 16:45
@nicolas-grekas nicolas-grekas added this to the 4.1 milestone Oct 3, 2017
@sroze sroze force-pushed the add-message-component branch from f342984 to a67efc3 Compare October 4, 2017 07:30
*/
public function handle($message)
{
call_user_func($this->callableForNextMiddleware(0), $message);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return missing

@sroze sroze force-pushed the add-message-component branch from a67efc3 to 5fdb3e7 Compare October 6, 2017 13:35
Copy link
Contributor

@everzet everzet left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd work on naming a bit, Sam :)

Otherwise - 👍

*/
private $producerForMessageResolver;

public function __construct(ProducerForMessageResolverInterface $producerForMessageResolver)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageSenderResolverInterface

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class SendMessageToProducersMiddleware implements MessageBusMiddlewareInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageSendingMiddleware

{
if ($message instanceof ConsumedMessage) {
$message = $message->getMessage();
} elseif (!empty($producers = $this->producerForMessageResolver->getProducersForMessage($message))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getProducersForMessage() => getSender()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I guess we'll rename Producer to Sender I see where you are going. But you replace plural by singular here and that's an interesting place for me to explain why it's plural.

At the beginning it was returning ProducerInterface|null and I had a collection of producer that was dispatching to a set of producers to handle this plural. But there's one use-case that didn't work:

When I dispatch a message "Foo" in the message bus
Then I want the message to be sent to the producer "FooProducer"
And I want the message to be handled by the handler

i.e. I want to send the message to something and I want to call the local handler. This can be useful for audit purposes, duplication, etc... To make this use case happening, you just need to have to use a null handler. (See the note in `README.md)

Therefore, I'd say getProducersForMessage() => getSenders()

*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class CollectionOfMessageHandlers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageHandlers or MessageHandlerCollection for consistency with other names

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's go for MessageHandlerCollection.

* @author Samuel Roze <samuel.roze@gmail.com>
* @author Matthias Noback <matthiasnoback@gmail.com>
*/
class MessageBus implements MessageBusInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MiddleawareMessageBus :D

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we took that the interface is called MessageBusMiddlewareInterface then it should be MessageBusMiddleware which is more accurate

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface MessageConsumerInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConsumerInterface

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class MessageHandlerResolver implements MessageHandlerResolverInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MappedHandlerResolver

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface MessageHandlerResolverInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HandlerResolverInterface

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface MessageProducerInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProducerInterface

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class CallMessageHandlerMiddleware implements MessageBusMiddlewareInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HandlerMiddleware

protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var ContainerInterface $container */
$container = $this->getApplication()->getKernel()->getContainer();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use service locators instead of container?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Koc the point here is that the consumer - i.e. the service name - can be given as an argument. Not sure how would the service locator plays with that (as it needs to know the dependencies when constructed AFAIK)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Require tags on consumers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, we can require the receivers (note the name change) to be tagged, so we can use service locators 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I don't see a real point of using service locators in here. It forces us to create a new compiler pass and have another set of tags... while just using the container (as in many other commands in the FrameworkBundle) is the simplest option.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to promote the good practice of not injecting the container in our own code. A service locator + autoconfiguration looks better to me, especially because you have a ReceiverInterface.

Copy link
Contributor Author

@sroze sroze Mar 13, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Auto-configuration would be nice if we also define a generic folder (such as src/MessageReceiver). Without, it feels a bit too much work for not much benefits, as I explained in the previous comment. But obviously, if the general feedback is that we need this, we can add it (even later hehe :))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fetching the receiver from the container has one big drawback: it must be public. I would say go for an autoconfigured tag + scoped receiver locator

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, updated to use service locators 👍

*
* @param object $message
*
* @return mixed

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally a command bus should return void, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And that's the big point here: that's not "just" a command bus but it is a message bus. This can also be used for queries in a CQRS-type architecture.

Note that if you want to enforce such limitation for commands, you can very easily create your own instance of the MessageBus and have a special "noop returns" (or "EnforceCommandsDoNotReturnAnything") middleware that would force return null for example.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, thank for explanations.

*/
interface MessageConsumerInterface
{
public function consume(): \Generator;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand this method creates a collection of messages? So you don't consume messages but you create them, right? Perhaps this is not the good wording. Plus, it is strange to use a verb for a method which builds something it is better to use a word like messages()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm really not found of the messages() suggestion but agree the naming can be better. What about FetcherInterface that has a fetch(): \Generator method? Because these things are fetching messages from various sources...

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or FetchedMessage::messages() :) but Fetcher::fetch() is more clear than the previous proposal.

*
* @param object $message
*/
public function produce($message);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

: void ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should enforce this for now and leave the door open for results. i.e. I know that some brokers like Google Pub/Sub can return things like the "message ID" that is quite useful for logging/debugging purposes sometimes - so I can expect to see people logging these.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or one can pass this directly to the e.g frontend, for it to check periodically if this is already processed. and you don't need to predefine the message id and store it to application db.

3. `CallMessageHandlerMiddleware` (call the registered handle)

```php
$result = $this->get('message_bus')->dispatch(new MyMessage(/* ... */));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the bus handles message: dispatch -> handle

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not against this change; waiting to see more opinions on this :)

return $this->messageToProducerMapping['*'];
}

return array();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[]

$handlerResolver->replaceArgument(0, $this->findHandlers($container));
}

private function findHandlers(ContainerBuilder $container)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

missing return type hint

*
* @author Samuel Roze <samuel.roze@gmail.com>
*/
interface ExceptionInterface

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any value to add an empty interface?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question; but it's a constant in every Symfony Component so I didn't want to start questioning this one 😉

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using this interface, allows you to catch all the exceptions that come from this component.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok thanks!

@mvrhov
Copy link

mvrhov commented Dec 1, 2017

Now that 4.0 is out the door, can we get some more input from the symfony core team.
Ping @symfony/deciders

/**
* {@inheritdoc}
*/
public function getProducersForMessage($message): array
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could this method be refactored as follows?

return $this->messageToProducerMapping[get_class($message)] ?? $this->messageToProducerMapping['*'] ?? array();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point 👍

private $messageBusService;
private $middlewareTag;

public function __construct(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Symfony, the arguments are always put on the same line, no matter how long they are.

{
$handlersByMessage = array();

foreach ($container->findTaggedServiceIds($this->handlerTag, true) as $serviceId => $tags) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use a SplPriorityQueue here instead of manually dealing with priorities? We use it here for example:

Copy link
Contributor Author

@sroze sroze left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll rename the following things in this pull-request, following a number of valid comments regarding the names.

  1. Producer -> Sender
  2. Consumer -> Receiver

protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var ContainerInterface $container */
$container = $this->getApplication()->getKernel()->getContainer();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Koc the point here is that the consumer - i.e. the service name - can be given as an argument. Not sure how would the service locator plays with that (as it needs to know the dependencies when constructed AFAIK)

@sroze sroze force-pushed the add-message-component branch from 1202153 to ba18c84 Compare December 1, 2017 16:46
@Koc
Copy link
Contributor

Koc commented Dec 2, 2017

@sroze can we tag all consumers with tag queue.consumer? Even reference tagged services

@mvrhov
Copy link

mvrhov commented Dec 2, 2017

message.consumer or message.handler? would be better.

@Koc
Copy link
Contributor

Koc commented Dec 2, 2017

It depends on accepted terminology. Maybe even without dot separatorm like already proposed in this PR message_middleware tag.

@sroze
Copy link
Contributor Author

sroze commented Dec 3, 2017

@Koc do you mean the message handlers or message receivers? Following the reviews, I had to rename them. Check out the concepts documentation to clarify.

@mvrhov
Copy link

mvrhov commented Dec 23, 2017

I've converted a small project from SimpleBus to Messaging component.
I had to change the following:

  • namespace from SimpleBus to Symfony
  • rename the method from handle to dispatch I'm still not fond of having dispatch instead of handle, but I understand the reasoning. However this will have to be explained to people.
  • change tag from command_handler to message_handler
  • change namespace from *\Command\* to *\Message\*
  • inject message_bus instead of command_bus
  • Use MessageBus instead of CommandBus

I still cannot get rid of the SimpleBus as this is missing the EventRecorder and a EventBus.
More to follow

@sroze sroze force-pushed the add-message-component branch 3 times, most recently from 3bd29c4 to ac80529 Compare December 24, 2017 10:48
{
return array_map(function ($handler) use ($message) {
return $handler($message);
}, $this->handlers);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wouldn't an array_walk be better ? Don't see why we would need to fetch the return values of each handlers..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed here I think we should have all we need to get the result of handlers, to support queries. Therefore, array_map makes sense here :)

Copy link
Contributor

@ro0NL ro0NL left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And a 👍 from me :)


<!-- Bus -->
<service id="message_bus" class="Symfony\Component\Message\MessageBus" public="true">
<argument type="collection" /> <!-- Middlewares -->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can it be type tagged? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think so 👍

class SendMessageMiddleware implements MiddlewareInterface
{
/**
* @var SenderLocatorInterface
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

obvious IMHO

*/
public function handle($message, callable $next)
{
$this->logger->debug('Starting processing message', array(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... message "'.get_class($message).'"'? (as for logs below).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should have that as part of the log message (as it's in the context) but happy to change if more people believe it should be the case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would make identifying messages a bit easier (i.e. in the web profiler log panel, which would boldify the message class :))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only if the message is Starting processing {class} message and we add class as context I believe. Which... makes sense actually 👍

try {
$result = $next($message);
} catch (\Throwable $e) {
$this->logger->warning('Something went wrong while processing message', array(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An exception occured while ...?


try {
$method = $reflection->getMethod('__invoke');
} catch (\ReflectionException $e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just use hasMethod?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getMethod has to be called nonetheless. As we need the access to the method parameters. Do we really need to call 2 methods instead of just catching the exception in those rare cases.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to call 2 methods instead of just catching the exception

For readability / less LoC, i'd say yes. No real issue i guess.. just looks like an unneeded micro-optim.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we should also support having the handles attribute on the tag. In case a handler is handling multiple commands/queries and/or the typehint is not present, we need to be able to specify the handled message.

*/
public function __construct(array $middlewares = array())
{
$this->middlewares = $middlewares;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to leverage tagged (which implies iterable type), what about;

$this->middlewares  = is_array($middlewares) ? array_values($middlewares) : iterator_to_array($middlewares, false);

or find a lazy way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, good point. Why would you array_values if array?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to re-index middlewares, ensuring callableForNextMiddleware implem always works. (needs an int $index type btw :))

*/
private $serializer;

public function __construct(SerializerInterface $serializer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

, $format = 'json?

*
* @param object $message
*
* @return array
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about Transport\EncodedMessage?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the value?

Copy link
Contributor

@ro0NL ro0NL Dec 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type info :) i.e. not having to rely on such a comment;

* The most common keys of the encoded array are:
* - `body` (string) - the message body
* - `headers` (string<string>) - a key/value pair of headers

common means required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But much less flexibility for use-cases we don't know about yet. That's for the same reason that the serializer's context is an array.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been back and forth on this.
Serializers are never used in Symfony (message component) code. They are only used by queues to transform a PHP object to array to json. Which means that we should not really care. =)

Except for the SymfonySerialization that ships with the component, any implementation of EncoderInterface and DecoderInterface will only be used inside the concept of a "my-queue-bridge". Which leads to the question, do we need these interfaces?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Serializers are never used in Symfony (message component) code. They are only used by queues to transform a PHP object to array to json.

I've read it this way:

Encoders are never used in Symfony (message component) code. They are only used by adapters to transform a PHP object to a PHP array. Adapters will then transform the PHP array to their own transport layer (can be JSON, can be whatever...)

Which means that we should not really care. =)

We, as users, that's correct, we shouldn't care at all about these technical details when dispatching messages to the bus.

Any implementation of EncoderInterface and DecoderInterface will only be used inside the concept of a "my-queue-bridge".

I don't think so, that's why I've put these interfaces within the component. There are multiple use-cases that are not adapter-specific but might provider encoder/decoders:

  • When dispatching messages, it's pretty useful to be able to trace them. To get things like OpenTracing or Zipkin to work well, all we need is "just" proper headers on the messages (via AMQP, HTTP, whatever). So a symfony-message-zipkin package could offer encoder/decoders decorators to populate and read these headers, regardless of the transport.
  • It might be useful for some users/companies to add their own routing specific keys (in body, header or anything else) within the message before it being sent to either of the senders (which could be different adapters). Such encoder/decoder would help them here.

Which leads to the question, do we need these interfaces?

The reasons given above expresses my answer which is: I believe we need them. Also, it reduces the duplication within the various adapters and reduces the overhead of creating one.

#### Same bus received and sender

To allow us to receive and send messages on the same bus and prevent a loop, the message bus is equipped with the
`WrappedIntoReceivedMessage` received. It will wraps the received messages into `ReceivedMessage` objects and the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WrapIntoReceivedMessage + It will wrap the..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, good point 👍

*/
interface ReceiverInterface
{
public function receive(): \Generator;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iterable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been done then reverted to \Generator for the reasons described in this comment.

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class HandlerLocator implements HandlerLocatorInterface
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sroze: HandlerLocator should be able to lazy load the handlers. Now the question is, can I change this class to have ServiceLocator injected into the constructor, or should a new class be created and set as default in framework bundle.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for a PSR-11 implem. (IMHO fits core/component)

Copy link
Contributor Author

@sroze sroze Dec 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mvrhov did the work in sroze#3 and it has been merged within this PR 👏

{
$this
->setDefinition(array(
new InputArgument('consumer', InputArgument::REQUIRED, 'Name of the consumer'),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We renamed "consumer" to "receiver": this needs to reflect the change as well.

protected function execute(InputInterface $input, OutputInterface $output)
{
/** @var ContainerInterface $container */
$container = $this->getApplication()->getKernel()->getContainer();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, we can require the receivers (note the name change) to be tagged, so we can use service locators 👍


<!-- Bus -->
<service id="message_bus" class="Symfony\Component\Message\MessageBus" public="true">
<argument type="collection" /> <!-- Middlewares -->
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I think so 👍

*/
public function handle($message, callable $next)
{
$this->logger->debug('Starting processing message', array(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only if the message is Starting processing {class} message and we add class as context I believe. Which... makes sense actually 👍

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class HandlerLocator implements HandlerLocatorInterface
Copy link
Contributor Author

@sroze sroze Dec 25, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mvrhov did the work in sroze#3 and it has been merged within this PR 👏

@sroze sroze force-pushed the add-message-component branch 4 times, most recently from 21ae05b to 2824e9a Compare December 25, 2017 12:11
@ogizanagi
Copy link
Contributor

ogizanagi commented Mar 20, 2018

I'd also vote for Message for the reasons given by @sroze & @dunglas.

@sroze
Copy link
Contributor Author

sroze commented Mar 20, 2018

After maaany discussions, especially with @fabpot and @nicolas-grekas, the component is going to be called « Messenger ».

@sroze sroze force-pushed the add-message-component branch from def7896 to 1b4be4e Compare March 21, 2018 07:43

$handlersLocatorMapping = array();
foreach ($handlersByMessage as $message => $handler) {
$handlersLocatorMapping['handles.'.$message] = $handler;
Copy link

@mvrhov mvrhov Mar 21, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this one supposed to be handler just like in ContainerHandlerLocator.php line 33?
I'm curious why the test didn't catch this

@sroze sroze changed the title [Message] Add a new Message component [Messenger] Add a new Messenger component Mar 21, 2018
@pborreli
Copy link
Contributor

@sroze so now you have to rename your conference name at Symfony Live Paris 😄

protected function dispatchMessage($message)
{
if (!$this->container->has('message_bus')) {
throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/message".');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to be renamed as well.

@@ -118,6 +119,7 @@ public function build(ContainerBuilder $container)
$container->addCompilerPass(new ResettableServicePass());
$container->addCompilerPass(new TestServiceContainerWeakRefPass(), PassConfig::TYPE_BEFORE_REMOVING, -32);
$container->addCompilerPass(new TestServiceContainerRealRefPass(), PassConfig::TYPE_AFTER_REMOVING);
$this->addCompilerPassIfExists($container, MessagePass::class);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be renamed to MessengerPass


<!-- Bus -->
<service id="message_bus" class="Symfony\Component\Messenger\MessageBus" public="true">
<argument type="tagged" tag="message_middleware" /> <!-- Middlewares -->
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep explicit, I'll therefore rename to message_bus_middleware.

@@ -249,6 +250,10 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor
),
),
),
'message' => array(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

messenger

interface SenderLocatorInterface
{
/**
* Gets the producer (if applicable) for the given message object.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/producer/sender

try {
$result = $next($message);
} catch (\Throwable $e) {
$this->logger->warning('An exception occurred while processing message {class}', array(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the same language: s/processing/handling

*/
public function handle($message, callable $next)
{
$this->logger->debug('Starting processing message {class}', array(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the same language: s/processing/handling

throw $e;
}

$this->logger->debug('Finished processing message {class}', array(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep the same language: s/processing/handling

/**
* @author Samuel Roze <samuel.roze@gmail.com>
*/
class MessagePass implements CompilerPassInterface
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be MessengerPass


$parameter = $parameters[0];
if (null === $parameter->getClass()) {
throw new RuntimeException(sprintf('The parameter of `__invoke` function of service "%s" must type hint the Message class it handles.', $serviceId));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Message => message

@sroze
Copy link
Contributor Author

sroze commented Mar 21, 2018

@pborreli I know 😅

@@ -69,6 +69,13 @@
<tag name="console.command" command="debug:event-dispatcher" />
</service>

<service id="console.command.messenger_consume_message" class="Symfony\Bundle\FrameworkBundle\Command\MessengerConsumeMessagesCommand">
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s\messenger_consume_message\messenger_consume_messages

Copy link
Contributor

@GwendolenLynch GwendolenLynch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So much awesome! Thank you @sroze 🍾

</service>

<service id="data_collector.messenger" class="Symfony\Bundle\FrameworkBundle\DataCollector\MessengerDataCollector">
<tag name="data_collector" template="@WebProfiler/Collector/messages.html.twig" id="messenger" priority="100" />
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s\id="messenger"\id="messages"

this breaks the profiler :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed sir' 👍

@sroze sroze force-pushed the add-message-component branch from 86a91ec to c091d66 Compare March 21, 2018 23:26
@sroze
Copy link
Contributor Author

sroze commented Mar 21, 2018

I have squashed all the commits and added tests for what I believe is really important for now. I'd argue the bits that aren't covered by tests will be covered as long as we keep the policy that every change in this experimental component has to be covered by tests.

Therefore, I'd say it's ready to be merged :)

@fabpot
Copy link
Member

fabpot commented Mar 22, 2018

@sroze Can you add a sentence about the fact that the new component is experimental in the README file?

@sroze sroze force-pushed the add-message-component branch from c63befd to 117b922 Compare March 22, 2018 08:45
@sroze
Copy link
Contributor Author

sroze commented Mar 22, 2018

@fabpot just added a message and a link to the documentation 👍 (kept two commits so it's super easy to revert the experimental one before 4.2)

@sroze sroze mentioned this pull request Mar 22, 2018
2 tasks
@fabpot
Copy link
Member

fabpot commented Mar 23, 2018

Thank you @sroze.

@mcfedr
Copy link
Contributor

mcfedr commented Mar 26, 2018

This looks really interesting, I look forward to trying it out, I have been developing/using a library that solves a similar problem, but in a slightly different (probably less abstract/extensible) way, so you might interested to have a look, https://github.com/mcfedr/queue-manager-bundle, its been used in production on a number of sites that handle ~50 million messages/month - its less of a by-the-book solution, but thought might interesting for comparision

*
* @param object $message The message to dispatch
*
* @final
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an annotation @final but method is not final itself

sroze added a commit that referenced this pull request Apr 12, 2018
This PR was squashed before being merged into the 4.1-dev branch (closes #26632).

Discussion
----------

[Messenger] Add AMQP adapter

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | ø
| License       | MIT

- [x] Depends on the Messenger component #24411
- [x] Add tests once we are all happy about the structure

---

In order to give a great DX for simple needs such as sending messages through an AMQP broker such as RabbitMq, we should ship an AMQP adapter for the Messenger component within Symfony Core. It should be as simple as this proposal. We don't need to handle more specific use-cases nor brokers as other adapters such as the [enqueue adapter](https://github.com/sroze/enqueue-bridge) can also be used.

Configuring the adapter is as simple as the following configuration:
```yaml
# config/packages/messenger_adapters.yaml
framework:
    messenger:
        adapter: "%env(MESSENGER_DSN)%"
```

With the given `.env` for example:
```
MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages
```

Keep in mind that after having configured the adapter, developers have to route their messages to the given adapter.

```yaml
# config/packages/messenger_routes.yaml
framework:
    messenger:
        routing:
producer).
            'App\Message\Command\CreateNumber': messenger.default_sender
```

---

Additionally, multiple adapters can be created and messages routed to these ones.

```yaml
# config/packages/messenger_routes.yaml
framework:
    messenger:
        adapters:
            commands: "amqp://guest:guest@localhost:5672/%2f/commands"
            maintenance: "amqp://guest:guest@localhost:5672/%2f/maintenance"
        routing:
producer).
            'App\Message\Command\CreateNumber': messenger.commands_sender
            'App\Message\Command\MaintenanceSpecificCommand': messenger.maintenance_sender
```

Commits
-------

798c230 [Messenger] Add AMQP adapter
@fabpot fabpot mentioned this pull request May 7, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Feature ❄️ Feature Freeze Important Pull Requests to finish before the next Symfony "feature freeze" Status: Reviewed
Projects
None yet
Development

Successfully merging this pull request may close these issues.