-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Add a new Messenger component #24411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
9a45c8f
to
f342984
Compare
f342984
to
a67efc3
Compare
*/ | ||
public function handle($message) | ||
{ | ||
call_user_func($this->callableForNextMiddleware(0), $message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return missing
a67efc3
to
5fdb3e7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd work on naming a bit, Sam :)
Otherwise - 👍
*/ | ||
private $producerForMessageResolver; | ||
|
||
public function __construct(ProducerForMessageResolverInterface $producerForMessageResolver) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageSenderResolverInterface
/** | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
class SendMessageToProducersMiddleware implements MessageBusMiddlewareInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageSendingMiddleware
{ | ||
if ($message instanceof ConsumedMessage) { | ||
$message = $message->getMessage(); | ||
} elseif (!empty($producers = $this->producerForMessageResolver->getProducersForMessage($message))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getProducersForMessage()
=> getSender()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I guess we'll rename Producer
to Sender
I see where you are going. But you replace plural by singular here and that's an interesting place for me to explain why it's plural.
At the beginning it was returning ProducerInterface|null
and I had a collection of producer that was dispatching to a set of producers to handle this plural. But there's one use-case that didn't work:
When I dispatch a message "Foo" in the message bus
Then I want the message to be sent to the producer "FooProducer"
And I want the message to be handled by the handler
i.e. I want to send the message to something and I want to call the local handler. This can be useful for audit purposes, duplication, etc... To make this use case happening, you just need to have to use a null
handler. (See the note in `README.md)
Therefore, I'd say getProducersForMessage()
=> getSenders()
* | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
class CollectionOfMessageHandlers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MessageHandlers
or MessageHandlerCollection
for consistency with other names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's go for MessageHandlerCollection
.
* @author Samuel Roze <samuel.roze@gmail.com> | ||
* @author Matthias Noback <matthiasnoback@gmail.com> | ||
*/ | ||
class MessageBus implements MessageBusInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MiddleawareMessageBus
:D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we took that the interface is called MessageBusMiddlewareInterface
then it should be MessageBusMiddleware
which is more accurate
/** | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
interface MessageConsumerInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConsumerInterface
/** | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
class MessageHandlerResolver implements MessageHandlerResolverInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MappedHandlerResolver
/** | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
interface MessageHandlerResolverInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HandlerResolverInterface
/** | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
interface MessageProducerInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ProducerInterface
/** | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
class CallMessageHandlerMiddleware implements MessageBusMiddlewareInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HandlerMiddleware
protected function execute(InputInterface $input, OutputInterface $output) | ||
{ | ||
/** @var ContainerInterface $container */ | ||
$container = $this->getApplication()->getKernel()->getContainer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use service locators instead of container?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Koc the point here is that the consumer - i.e. the service name - can be given as an argument. Not sure how would the service locator plays with that (as it needs to know the dependencies when constructed AFAIK)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Require tags on consumers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, we can require the receivers (note the name change) to be tagged, so we can use service locators 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I don't see a real point of using service locators in here. It forces us to create a new compiler pass and have another set of tags... while just using the container (as in many other commands in the FrameworkBundle) is the simplest option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to promote the good practice of not injecting the container in our own code. A service locator + autoconfiguration looks better to me, especially because you have a ReceiverInterface
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Auto-configuration would be nice if we also define a generic folder (such as src/MessageReceiver
). Without, it feels a bit too much work for not much benefits, as I explained in the previous comment. But obviously, if the general feedback is that we need this, we can add it (even later hehe :))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fetching the receiver from the container has one big drawback: it must be public. I would say go for an autoconfigured tag + scoped receiver locator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough, updated to use service locators 👍
* | ||
* @param object $message | ||
* | ||
* @return mixed |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally a command bus should return void, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And that's the big point here: that's not "just" a command bus but it is a message bus. This can also be used for queries in a CQRS-type architecture.
Note that if you want to enforce such limitation for commands, you can very easily create your own instance of the MessageBus
and have a special "noop returns" (or "EnforceCommandsDoNotReturnAnything") middleware that would force return null
for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, thank for explanations.
*/ | ||
interface MessageConsumerInterface | ||
{ | ||
public function consume(): \Generator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand this method creates a collection of messages? So you don't consume messages but you create them, right? Perhaps this is not the good wording. Plus, it is strange to use a verb for a method which builds something it is better to use a word like messages()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm really not found of the messages()
suggestion but agree the naming can be better. What about FetcherInterface
that has a fetch(): \Generator
method? Because these things are fetching messages from various sources...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or FetchedMessage::messages()
:) but Fetcher::fetch()
is more clear than the previous proposal.
* | ||
* @param object $message | ||
*/ | ||
public function produce($message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
: void ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure we should enforce this for now and leave the door open for results. i.e. I know that some brokers like Google Pub/Sub can return things like the "message ID" that is quite useful for logging/debugging purposes sometimes - so I can expect to see people logging these.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or one can pass this directly to the e.g frontend, for it to check periodically if this is already processed. and you don't need to predefine the message id and store it to application db.
3. `CallMessageHandlerMiddleware` (call the registered handle) | ||
|
||
```php | ||
$result = $this->get('message_bus')->dispatch(new MyMessage(/* ... */)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the bus handles message: dispatch -> handle
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not against this change; waiting to see more opinions on this :)
return $this->messageToProducerMapping['*']; | ||
} | ||
|
||
return array(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[]
$handlerResolver->replaceArgument(0, $this->findHandlers($container)); | ||
} | ||
|
||
private function findHandlers(ContainerBuilder $container) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing return type hint
* | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
interface ExceptionInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any value to add an empty interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good question; but it's a constant in every Symfony Component so I didn't want to start questioning this one 😉
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using this interface, allows you to catch all the exceptions that come from this component.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok thanks!
Now that 4.0 is out the door, can we get some more input from the symfony core team. |
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function getProducersForMessage($message): array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could this method be refactored as follows?
return $this->messageToProducerMapping[get_class($message)] ?? $this->messageToProducerMapping['*'] ?? array();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point 👍
private $messageBusService; | ||
private $middlewareTag; | ||
|
||
public function __construct( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Symfony, the arguments are always put on the same line, no matter how long they are.
{ | ||
$handlersByMessage = array(); | ||
|
||
foreach ($container->findTaggedServiceIds($this->handlerTag, true) as $serviceId => $tags) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't we use a SplPriorityQueue
here instead of manually dealing with priorities? We use it here for example:
symfony/src/Symfony/Component/DependencyInjection/Compiler/DecoratorServicePass.php
Line 28 in bf4b09f
$definitions = new \SplPriorityQueue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll rename the following things in this pull-request, following a number of valid comments regarding the names.
- Producer -> Sender
- Consumer -> Receiver
protected function execute(InputInterface $input, OutputInterface $output) | ||
{ | ||
/** @var ContainerInterface $container */ | ||
$container = $this->getApplication()->getKernel()->getContainer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Koc the point here is that the consumer - i.e. the service name - can be given as an argument. Not sure how would the service locator plays with that (as it needs to know the dependencies when constructed AFAIK)
1202153
to
ba18c84
Compare
@sroze can we tag all consumers with tag |
|
It depends on accepted terminology. Maybe even without dot separatorm like already proposed in this PR |
@Koc do you mean the message handlers or message receivers? Following the reviews, I had to rename them. Check out the concepts documentation to clarify. |
I've converted a small project from SimpleBus to Messaging component.
I still cannot get rid of the SimpleBus as this is missing the EventRecorder and a EventBus. |
3bd29c4
to
ac80529
Compare
{ | ||
return array_map(function ($handler) use ($message) { | ||
return $handler($message); | ||
}, $this->handlers); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldn't an array_walk
be better ? Don't see why we would need to fetch the return values of each handlers..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed here I think we should have all we need to get the result of handlers, to support queries. Therefore, array_map
makes sense here :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And a 👍 from me :)
|
||
<!-- Bus --> | ||
<service id="message_bus" class="Symfony\Component\Message\MessageBus" public="true"> | ||
<argument type="collection" /> <!-- Middlewares --> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can it be type tagged
? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, I think so 👍
class SendMessageMiddleware implements MiddlewareInterface | ||
{ | ||
/** | ||
* @var SenderLocatorInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
obvious IMHO
*/ | ||
public function handle($message, callable $next) | ||
{ | ||
$this->logger->debug('Starting processing message', array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
... message "'.get_class($message).'"'
? (as for logs below).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should have that as part of the log message (as it's in the context) but happy to change if more people believe it should be the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would make identifying messages a bit easier (i.e. in the web profiler log panel, which would boldify the message class :))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only if the message is Starting processing {class} message
and we add class
as context I believe. Which... makes sense actually 👍
try { | ||
$result = $next($message); | ||
} catch (\Throwable $e) { | ||
$this->logger->warning('Something went wrong while processing message', array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An exception occured while ...
?
|
||
try { | ||
$method = $reflection->getMethod('__invoke'); | ||
} catch (\ReflectionException $e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just use hasMethod
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getMethod
has to be called nonetheless. As we need the access to the method parameters. Do we really need to call 2 methods instead of just catching the exception in those rare cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to call 2 methods instead of just catching the exception
For readability / less LoC, i'd say yes. No real issue i guess.. just looks like an unneeded micro-optim.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that we should also support having the handles
attribute on the tag. In case a handler is handling multiple commands/queries and/or the typehint is not present, we need to be able to specify the handled message.
*/ | ||
public function __construct(array $middlewares = array()) | ||
{ | ||
$this->middlewares = $middlewares; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to leverage tagged
(which implies iterable
type), what about;
$this->middlewares = is_array($middlewares) ? array_values($middlewares) : iterator_to_array($middlewares, false);
or find a lazy way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup, good point. Why would you array_values
if array?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to re-index middlewares, ensuring callableForNextMiddleware
implem always works. (needs an int $index
type btw :))
*/ | ||
private $serializer; | ||
|
||
public function __construct(SerializerInterface $serializer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
, $format = 'json
?
* | ||
* @param object $message | ||
* | ||
* @return array |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about Transport\EncodedMessage
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would be the value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type info :) i.e. not having to rely on such a comment;
* The most common keys of the encoded array are:
* - `body` (string) - the message body
* - `headers` (string<string>) - a key/value pair of headers
common means required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But much less flexibility for use-cases we don't know about yet. That's for the same reason that the serializer's context is an array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been back and forth on this.
Serializers are never used in Symfony (message component) code. They are only used by queues to transform a PHP object to array to json. Which means that we should not really care. =)
Except for the SymfonySerialization
that ships with the component, any implementation of EncoderInterface
and DecoderInterface
will only be used inside the concept of a "my-queue-bridge". Which leads to the question, do we need these interfaces?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Serializers are never used in Symfony (message component) code. They are only used by queues to transform a PHP object to array to json.
I've read it this way:
Encoders are never used in Symfony (message component) code. They are only used by adapters to transform a PHP object to a PHP array. Adapters will then transform the PHP array to their own transport layer (can be JSON, can be whatever...)
Which means that we should not really care. =)
We, as users, that's correct, we shouldn't care at all about these technical details when dispatching messages to the bus.
Any implementation of EncoderInterface and DecoderInterface will only be used inside the concept of a "my-queue-bridge".
I don't think so, that's why I've put these interfaces within the component. There are multiple use-cases that are not adapter-specific but might provider encoder/decoders:
- When dispatching messages, it's pretty useful to be able to trace them. To get things like OpenTracing or Zipkin to work well, all we need is "just" proper headers on the messages (via AMQP, HTTP, whatever). So a
symfony-message-zipkin
package could offer encoder/decoders decorators to populate and read these headers, regardless of the transport. - It might be useful for some users/companies to add their own routing specific keys (in body, header or anything else) within the message before it being sent to either of the senders (which could be different adapters). Such encoder/decoder would help them here.
Which leads to the question, do we need these interfaces?
The reasons given above expresses my answer which is: I believe we need them. Also, it reduces the duplication within the various adapters and reduces the overhead of creating one.
#### Same bus received and sender | ||
|
||
To allow us to receive and send messages on the same bus and prevent a loop, the message bus is equipped with the | ||
`WrappedIntoReceivedMessage` received. It will wraps the received messages into `ReceivedMessage` objects and the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WrapIntoReceivedMessage
+ It will wrap the..
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, good point 👍
*/ | ||
interface ReceiverInterface | ||
{ | ||
public function receive(): \Generator; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iterable
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been done then reverted to \Generator
for the reasons described in this comment.
/** | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
class HandlerLocator implements HandlerLocatorInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sroze: HandlerLocator should be able to lazy load the handlers. Now the question is, can I change this class to have ServiceLocator injected into the constructor, or should a new class be created and set as default in framework bundle.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for a PSR-11 implem. (IMHO fits core/component)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
{ | ||
$this | ||
->setDefinition(array( | ||
new InputArgument('consumer', InputArgument::REQUIRED, 'Name of the consumer'), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We renamed "consumer" to "receiver": this needs to reflect the change as well.
protected function execute(InputInterface $input, OutputInterface $output) | ||
{ | ||
/** @var ContainerInterface $container */ | ||
$container = $this->getApplication()->getKernel()->getContainer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, we can require the receivers (note the name change) to be tagged, so we can use service locators 👍
|
||
<!-- Bus --> | ||
<service id="message_bus" class="Symfony\Component\Message\MessageBus" public="true"> | ||
<argument type="collection" /> <!-- Middlewares --> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, I think so 👍
*/ | ||
public function handle($message, callable $next) | ||
{ | ||
$this->logger->debug('Starting processing message', array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only if the message is Starting processing {class} message
and we add class
as context I believe. Which... makes sense actually 👍
/** | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
class HandlerLocator implements HandlerLocatorInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
21ae05b
to
2824e9a
Compare
After maaany discussions, especially with @fabpot and @nicolas-grekas, the component is going to be called « Messenger ». |
def7896
to
1b4be4e
Compare
|
||
$handlersLocatorMapping = array(); | ||
foreach ($handlersByMessage as $message => $handler) { | ||
$handlersLocatorMapping['handles.'.$message] = $handler; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this one supposed to be handler
just like in ContainerHandlerLocator.php line 33?
I'm curious why the test didn't catch this
@sroze so now you have to rename your conference name at Symfony Live Paris 😄 |
protected function dispatchMessage($message) | ||
{ | ||
if (!$this->container->has('message_bus')) { | ||
throw new \LogicException('The message bus is not enabled in your application. Try running "composer require symfony/message".'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to be renamed as well.
@@ -118,6 +119,7 @@ public function build(ContainerBuilder $container) | |||
$container->addCompilerPass(new ResettableServicePass()); | |||
$container->addCompilerPass(new TestServiceContainerWeakRefPass(), PassConfig::TYPE_BEFORE_REMOVING, -32); | |||
$container->addCompilerPass(new TestServiceContainerRealRefPass(), PassConfig::TYPE_AFTER_REMOVING); | |||
$this->addCompilerPassIfExists($container, MessagePass::class); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be renamed to MessengerPass
|
||
<!-- Bus --> | ||
<service id="message_bus" class="Symfony\Component\Messenger\MessageBus" public="true"> | ||
<argument type="tagged" tag="message_middleware" /> <!-- Middlewares --> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To keep explicit, I'll therefore rename to message_bus_middleware
.
@@ -249,6 +250,10 @@ class_exists(SemaphoreStore::class) && SemaphoreStore::isSupported() ? 'semaphor | |||
), | |||
), | |||
), | |||
'message' => array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
messenger
interface SenderLocatorInterface | ||
{ | ||
/** | ||
* Gets the producer (if applicable) for the given message object. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/producer/sender
try { | ||
$result = $next($message); | ||
} catch (\Throwable $e) { | ||
$this->logger->warning('An exception occurred while processing message {class}', array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep the same language: s/processing/handling
*/ | ||
public function handle($message, callable $next) | ||
{ | ||
$this->logger->debug('Starting processing message {class}', array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep the same language: s/processing/handling
throw $e; | ||
} | ||
|
||
$this->logger->debug('Finished processing message {class}', array( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's keep the same language: s/processing/handling
/** | ||
* @author Samuel Roze <samuel.roze@gmail.com> | ||
*/ | ||
class MessagePass implements CompilerPassInterface |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be MessengerPass
|
||
$parameter = $parameters[0]; | ||
if (null === $parameter->getClass()) { | ||
throw new RuntimeException(sprintf('The parameter of `__invoke` function of service "%s" must type hint the Message class it handles.', $serviceId)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Message
=> message
@pborreli I know 😅 |
@@ -69,6 +69,13 @@ | |||
<tag name="console.command" command="debug:event-dispatcher" /> | |||
</service> | |||
|
|||
<service id="console.command.messenger_consume_message" class="Symfony\Bundle\FrameworkBundle\Command\MessengerConsumeMessagesCommand"> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s\messenger_consume_message\messenger_consume_messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So much awesome! Thank you @sroze 🍾
</service> | ||
|
||
<service id="data_collector.messenger" class="Symfony\Bundle\FrameworkBundle\DataCollector\MessengerDataCollector"> | ||
<tag name="data_collector" template="@WebProfiler/Collector/messages.html.twig" id="messenger" priority="100" /> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s\id="messenger"\id="messages"
this breaks the profiler :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed sir' 👍
86a91ec
to
c091d66
Compare
I have squashed all the commits and added tests for what I believe is really important for now. I'd argue the bits that aren't covered by tests will be covered as long as we keep the policy that every change in this experimental component has to be covered by tests. Therefore, I'd say it's ready to be merged :) |
@sroze Can you add a sentence about the fact that the new component is experimental in the README file? |
c63befd
to
117b922
Compare
@fabpot just added a message and a link to the documentation 👍 (kept two commits so it's super easy to revert the experimental one before 4.2) |
Thank you @sroze. |
This looks really interesting, I look forward to trying it out, I have been developing/using a library that solves a similar problem, but in a slightly different (probably less abstract/extensible) way, so you might interested to have a look, https://github.com/mcfedr/queue-manager-bundle, its been used in production on a number of sites that handle ~50 million messages/month - its less of a by-the-book solution, but thought might interesting for comparision |
* | ||
* @param object $message The message to dispatch | ||
* | ||
* @final |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an annotation @final
but method is not final itself
This PR was squashed before being merged into the 4.1-dev branch (closes #26632). Discussion ---------- [Messenger] Add AMQP adapter | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | ø | License | MIT - [x] Depends on the Messenger component #24411 - [x] Add tests once we are all happy about the structure --- In order to give a great DX for simple needs such as sending messages through an AMQP broker such as RabbitMq, we should ship an AMQP adapter for the Messenger component within Symfony Core. It should be as simple as this proposal. We don't need to handle more specific use-cases nor brokers as other adapters such as the [enqueue adapter](https://github.com/sroze/enqueue-bridge) can also be used. Configuring the adapter is as simple as the following configuration: ```yaml # config/packages/messenger_adapters.yaml framework: messenger: adapter: "%env(MESSENGER_DSN)%" ``` With the given `.env` for example: ``` MESSENGER_DSN=amqp://guest:guest@localhost:5672/%2f/messages ``` Keep in mind that after having configured the adapter, developers have to route their messages to the given adapter. ```yaml # config/packages/messenger_routes.yaml framework: messenger: routing: producer). 'App\Message\Command\CreateNumber': messenger.default_sender ``` --- Additionally, multiple adapters can be created and messages routed to these ones. ```yaml # config/packages/messenger_routes.yaml framework: messenger: adapters: commands: "amqp://guest:guest@localhost:5672/%2f/commands" maintenance: "amqp://guest:guest@localhost:5672/%2f/maintenance" routing: producer). 'App\Message\Command\CreateNumber': messenger.commands_sender 'App\Message\Command\MaintenanceSpecificCommand': messenger.maintenance_sender ``` Commits ------- 798c230 [Messenger] Add AMQP adapter
As discussed in #24326. This PR is to help going forward with the discussions of having a Message component.
Resources
sroze/symfony-demo:message-component-demo
sroze/enqueue-bridge
(to be moved assymfony/enqueue-bridge
I guess)2. Demo: In
sroze/symfony-demo:message-component-demo-with-enqueue
sroze/swarrot-bridge
(to be moved assymfony/swarrot-bridge
I guess)2. Demo: In
sroze/symfony-demo:message-component-demo-with-swarrot
sroze/message-http-adapter
2. Demo: In
sroze/symfony-demo:message-component-demo-with-http-adapter
Important points
I guess that this would replace #23842 & #23315.
Changes from the proposals
Based on the comments, a few changes have been made from the proposal.
MessageProducer
s have been renamed toMessageSender
sMessageConsumer
s have been renamed toMessageReceiver
s