-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Support configuring messages when dispatching #26945
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
{ | ||
/** | ||
* @param string $wrapperFqcn The wrapper FQCN you're looking for | ||
* @param object|null The original message without wrappers, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be @return
instead 😅
return $this; | ||
} | ||
|
||
$message = $this->message; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this variable isn't useful
if ($message instanceof WrappedMessage) { | ||
/** @var SerializerContextAwareMessage|null $contextAwareMessage */ | ||
if ($contextAwareMessage = $message->unwrap(SerializerContextAwareMessage::class)) { | ||
$context = $contextAwareMessage->getContext(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess after https://github.com/symfony/symfony/pull/26941/files#diff-5e7347edce37c5886ec67b7ba02f3a9cR1019 both context entries can be merged? having priority the specific one over the general one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would just replace actually. But that's debatable.
That's a very interesting idea. So if I understand it correctly, it would allow users to configure things when they dispatch the messages to the bus. For example, in order to configure the serialization groups used when serializing the message or enabling the validation, this would look like this: $bus->dispatch(new SerializerContextAwareMessage(
new MessageToValidate(
new MyMessage(/* ... */)
),
['groups' => [...]]
));
|
I'd expect some opinions first before deciding if we want to support it or even if it's is a concept only acting before sending the message :) But we could find a solution for sure.
That's the point to me indeed. Don't mix infrastructural requirements with my messages. This is also the most flexible approach as implementing such interfaces would be cumbersome, inheriting multiple base classes won't be possible appart from using traits, and I want to keep my messages simple wrappers around the data they exist for, no more. Composing through these wrappers solves this and is only the responsibility of the dispatcher (I don't mind the infrastructural stuff knowing about my messages, so "coupling" in this way is not an issue). As you mentioned it, this also allows having "different rules" for the same message, according to the place it was dispatched. Can still be relevant for the metrics middleware example aforementioned for instance. |
Alright, I definitely like the idea. The problem that we are trying to solve here will happen both ways (before/after sending/receiving the message), so we need this "configuration" to survive encoding/decoding or otherwise, it won't be a valid solution for lots of use cases. Do you have an idea to tackle this serialization problem? Nesting the message within multiple wrappers sound like a source of troubles when doing the de-serialization. It also poses the problem of communication with 3rd party systems. Let's get the example of an application communicating with another one via RabbitMq. If they want to configure the retry mechanism (an example that would use such configuration) then the serialized message within the queue would be too different (the message is now within another one) for the one before and... it wouldn't work anymore. |
An alternative to your idea is a more flat option. $message = new ConfiguredMessage($message);
$message = $message->with(new RetryStrategyConfiguration(/* ... */));
$message = $message->with(new SerializationConfiguration(/* ... */));
$bus->dispatch($message);
WDYT? 🤔 |
That looks nice to me, I should give it a try :) |
|
I see some red flags when you have classes with feature interfaces (like our messages) and you try to wrap (decorate) them in other classes. We had the similar situation with Im not sure I understand why this is needed. Or I mean, why we need this abstraction to solve the problems mentioned. We mention validation groups, but from a CQRS perspective, messages are for one purpose only. Ie you would never need to validate the same message in two different ways. About the issue when you only want to retry some messages: See here what HTTPlug is doing. Sure, HTTP is simpler here. If you got something super complex, them maybe you should implement your own RetryPlugin. For the vast majority of the users I assume they want to retry all messages until they pass. At the moment Im 👎 for this PR. If one still believe that this is a really good feature, I see to problem implementing this in a third party library. The Messenger component is so awesome and flexible so one could easily create a |
@Nyholm can you elaborate on these ones? You actually did not mention any of these "red flags" 😉 Also, what do you mean by "classes with feature interfaces"? Last but not least, do you acknowledge that you saw that in the implementation, the middleware or handler won't receive the wrapped message, right? It's only if you implement a specific interface that you do so I don't see understand the issue.
Right now, without such feature proposed here, we simply can't (or I don't see how at least 😃) have a generic retry feature, unfortunately. In order to have such a feature (broader than just the AmqpExt adapter), we need to pass some metadata down to the adapters in some way (an interface on the message or via wrapping it). |
Of course. In PHP cache we have cache adapters for different types of storages. Some storage/adapter may support tagging, some support hierarchy cache. That is why we have "feature interfaces" like When we later wanted to decorate our adapters with a I do recognize that
No, I did not see that. So it only the bus that handle the WrappedMessage?
I think the logic of "how a message should be retired" should not belong to the message. The message itself should not care at all. It is the RetryMiddleware that should handle that. Of the top of my head, here is an example: class RetryMiddleware {
private $decider; // A service that decides if a message should be retried or not
private $publisher; // A service that pushes the message to a new(?) bus or different(?) queue
// ...
public function handle($message, callable $next)
{
try {
$result = $next($message);
} catch (\Throwable $e) {
if($this->decider->decide($message, $e)) {
$this->publisher->publish($message, $e);
}
throw $e;
}
return $result;
}
} |
Sorry if the description was unclear/minimalistic. I did open this RFC through a PR in order to give examples through code rather than a big description so we can discuss all its aspects right now through (partial) implementation. The idea is to make wrapper transparent in most cases when directly consuming a message like in middlewares. If a middleware needs to access the wrappers and not only the original message, it has to implement a specific marker interface, used by the bus to send either the message and its wrappers around, or just the message otherwise. That's what the So, right now, I think the red flags you mentioned actually won't be an issue for middlewares.
You can still implement feature interface on your messages. But that's precisely what I'd want to avoid actually, because I don't want my messages to be anything else than VO around the data they exist for. Starting implementing feature interfaces related to infrastructural concerns is not the responsibility of these objects, but rather on the callers's side.
We introduced
Even if as I said it should be transparent for middlewares, this requires the bus to be aware of such a feature to choose to provide either the wrapped or unwrapped message to each middlewares in the stack. Third-parties middlewares decorators would also need to be aware of such a feature and implement We should also consider that, currently, there is already a wrapper in the wild (ReceivedMessage) and it already requires a special treatment: #27066. Either the concept should be generalized, or either we might think about another way to handle it. |
Thank you for such a detailed answer @ogizanagi. I think that this is a needed feature as well and I'm 👍 to add it (the flat version though, for the reasons I've mentioned). |
Hey guys! As I have very little Message Bus experience, take my opinion as a bit of an outsider/noob :). I don't really have a good impression as to if this is or isn't a good idea, or the possible pitfalls. However, this note makes a lot of sense to me:
And, in general, even if it’s an edge case, the fact that there is currently no way to “attach” middleware configuration to a message (e.g. validation groups) seems not ideal, at the very least. I'd also prefer the flat approach. But, more for the reason that messages inside of messages make my head spin a little bit. And, I think there's no disadvantage to the flat version? |
IMO this should stay like this, the configuration should depend of the bus and message class but the caller shouldn't be able to override this configuration. In the example of the validation group this would allow anyone to pass invalid messages by setting the validation group to one with no constraints. |
I've pushed the new flat version. To answer @weaverryan, there isn't any disadvantage to it in comparison with the wrappers approach. For the serialization of flatten configurations, I'd expect each configuration to be |
@@ -29,7 +30,10 @@ public function receive(callable $handler): void | |||
{ | |||
$this->decoratedReceiver->receive(function ($message) use ($handler) { | |||
if (null !== $message) { | |||
$message = new ReceivedMessage($message); | |||
if (!$message instanceof ConfiguredMessage) { | |||
$message = new ConfiguredMessage($message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why didn't you use ConfiguredMessage::from()
here?
return $message instanceof self ? $message : new self($message); | ||
} | ||
|
||
public function with(ConfigurationInterface $configuration): self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me, with
should clone the object and return another one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just because of the name? 😄Or any other reason?
'headers' => array('type' => \get_class($message)), | ||
'body' => $this->serializer->serialize($originalMessage, $this->format, $context), | ||
'headers' => array('type' => \get_class($originalMessage)), | ||
'configurations' => $message instanceof ConfiguredMessage ? serialize($message->all()) : null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't necessarily be picked up by the adapters. I suggest we use headers for that. Typically, we have have the headers looking like that:
X-Message-Configuration-1: Symfony\Component\Messenger\Transport\Serialization\SerializerConfiguration:{"context": {"foo": "bar"}}
X-Message-Configuration-2: Symfony\Component\Messenger\Asynchronous\Transport\ReceivedMessage:{}
And we use Symfony Serializer to use some JSON so we ensure it's compatible with other systems reading it.
@@ -32,17 +34,22 @@ public function __construct(SenderLocatorInterface $senderLocator) | |||
*/ | |||
public function handle($message, callable $next) | |||
{ | |||
if ($message instanceof ReceivedMessage) { | |||
return $next($message->getMessage()); | |||
$configuredMessage = ConfiguredMessage::from($message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having these $configuredMessage
and $originalMessage
everywhere indeed looks weird. What about the following:
$message = ReceivedMessage::fromMessage($message);
if ($message->hasConfiguration(ReceivedMessage::class)) {
return $next($message);
}
if (!empty($senders = $this->senderLocator->getSendersForMessage($message->for($this->senderLocator)) {
// ...
$sender->send($message->for($sender));
}
The $message->for
method would return the message according to the presence of the ConfiguredMessageAwareInterface
interface.
Rebase needed after #27129 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor things to change. But 👌
return $message instanceof self ? $message : new self($message); | ||
} | ||
|
||
public function with(EnvelopeItemInterface $item): self |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As already mentioned earlier, I have the same reaction with with
. It makes sense to clone the message in it IMHO so it’s immutable.
One more thing: should every item go through transport? We could just ignore non-serializable items when |
But wouldn’t making it optionally serialisable be a source of potential
confusion and “tricky bugs”? (i.e. the middleware X doesn’t “work” anymore
because I forgot to add the Serializable on my EnvelopeItem)
…On Sat, 5 May 2018 at 15:03, Maxime Steinhausser ***@***.***> wrote:
One more thing: should every items go through transport? We could just
ignore non-serializable items when encoding. Also ReceivedMessage just is
a marker used on receiver's side so no point being serializable.
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#26945 (comment)>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AAxHESfpeFACi4cJmu-4pw_b6S86axmPks5tvaMQgaJpZM4TWWEB>
.
|
Yes, that's a fair point. So I hesitated proposing a |
I've opened PR in @ogizanagi's repository that solve this problem: https://github.com/ogizanagi/symfony/pull/1 , Mark transportable items and ensure that they can be properly serialized. You can cherry pick my commit if this kind of sollution is acceptable. |
Thank you @Koc ! Unfortunately, I chose not doing so on purpose because I believe it has the same drawbacks expressed in #26945 (comment). |
You cann't forget implement |
The point was you can forget implementing So that's not changing anything to what is expressed in #26945 (comment). Your suggestion is basically just using a dedicated Messenger interface rather than just The question is: is it either a DX or documentation issue to solve. |
…dler/g (ogizanagi) This PR was merged into the 4.1-dev branch. Discussion ---------- [Messenger] Fix missing s/tolerate_no_handler/allow_no_handler/g | Q | A | ------------- | --- | Branch? | master <!-- see below --> | Bug fix? | yes | New feature? | no <!-- don't forget to update src/**/CHANGELOG.md files --> | BC breaks? | no <!-- see https://symfony.com/bc --> | Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tests pass? | yes <!-- please add some, will be required by reviewers --> | Fixed tickets | N/A <!-- #-prefixed issue number(s), if any --> | License | MIT | Doc PR | N/A Spotted while rebasing #26945: some places weren't updated. Most are in tests and not that much important but should be updated for consistency. But moreover, the `AllowNoHandlerMiddleware` wiring in DI was still referencing the old class name. Commits ------- e7cfb59 [Messenger] Fix missing s/tolerate_no_handler/allow_no_handler/g
Let's go ahead with this one in 4.1 beta, it's definitely needed. |
It took time, but here we go, this is in now. Thank you very much @ogizanagi. |
…ing (ogizanagi) This PR was squashed before being merged into the 4.1-dev branch (closes #26945). Discussion ---------- [Messenger] Support configuring messages when dispatching | Q | A | ------------- | --- | Branch? | master <!-- see below --> | Bug fix? | no | New feature? | yes <!-- don't forget to update src/**/CHANGELOG.md files --> | BC breaks? | no <!-- see https://symfony.com/bc --> | Deprecations? | no <!-- don't forget to update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tests pass? | see CI checks <!-- please add some, will be required by reviewers --> | Fixed tickets | N/A <!-- #-prefixed issue number(s), if any --> | License | MIT | Doc PR | todo For now, this is mainly a RFC to get your opinion on such a feature (so it misses proper tests). Supporting wrapped messages out-of-the-box would mainly allow to easily create middlewares that should act only or be configured differently for specific messages (by using wrappers instead of making messages implements specific interfaces and without requiring dedicated buses). For instance, I might want to track specific messages and expose metrics about it. The Symfony Serializer transport (relates to #26900) and Validator middleware serve as guinea pigs for sample purpose here. But despite message validation usually is simple as it matches one use-case and won't require specific validation groups in most cases, I still think it can be useful sometimes. Also there is the case of the [`AllValidator` which currently requires using a `GroupSequence`](#26477) as a workaround, but that could also be specified directly in message metadata instead. ## Latest updates PR updated to use a flat version of configurations instead of wrappers, using an `Envelope` wrapper and a list of envelope items. Usage: ```php $message = Envelope::wrap(new DummyMessage('Hello')) ->with(new SerializerConfiguration(array(ObjectNormalizer::GROUPS => array('foo')))) ->with(new ValidationConfiguration(array('foo', 'bar'))) ; ``` ~~By default, Messenger protagonists receive the original message. But each of them can opt-in to receive the envelope instead, by implementing `EnvelopeReaderInterface`. Then, they can read configurations from it and forward the original message or the envelope to another party.~~ Senders, encoders/decoders & receivers always get an `Envelope`. Middlewares & handlers always get a message. But middlewares can opt-in to receive the envelope instead, by implementing `EnvelopeReaderInterface`. Commits ------- 749054a [Messenger] Support configuring messages when dispatching
Thanks everyone for the discussions & reviews, @sroze for the discussions on Slack and @andersonamuller who gave me a whole new point of view just by finding the proper naming for this 😄 |
Reverted after detecting an issue. Solution is probably found but that'll wait beta2 :) |
…ispatching (ogizanagi) This reverts commit 76b17b0.
…ispatching (ogizanagi) This reverts commit 76b17b0.
…ispatching (ogizanagi) This reverts commit 76b17b0.
… (with fix) (sroze, ogizanagi) This PR was merged into the 4.1 branch. Discussion ---------- [Messenger] Re-introduce wrapped message configuration (with fix) | Q | A | ------------- | --- | Branch? | 4.1 | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #26945 | License | MIT | Doc PR | ø The pull request was merged before beta1, but because it introduced a bug, it has been reverted. This adds back the merged PR but pushes a fix for the found bug. Commits ------- 21e49d2 [Messenger] Fix TraceableBus with envelope 599f32c Ensure the envelope is passed back and can be altered Ensure that the middlewares can also update the message within the envelope 7c33cb2 feature #26945 [Messenger] Support configuring messages when dispatching (ogizanagi)
* 4.1: [Messenger] Middleware factories support in config [HttpKernel] Make TraceableValueResolver $stopwatch mandatory [Messenger] Improve the profiler panel [Workflow] Added DefinitionBuilder::setMetadataStore(). [Messenger][DX] Uses custom method names for handlers [Messenger] remove autoconfiguration for Sender/ReceiverInterface [Messenger] Make sure default receiver name is set before command configuration [HttpKernel] Fix services are no longer injected into __invoke controllers method Rename tag attribute "name" by "alias" Autoconfiguring TransportFactoryInterface classes [Messenger] Fix new AMQP Transport test with Envelope Fixed return senders based on the message parents/interfaces [Messenger] Make sure Sender and Receiver locators have valid services [Workflow] add is deprecated since Symfony 4.1. Use addWorkflow() instead [Messenger] Fix TraceableBus with envelope Ensure the envelope is passed back and can be altered Ensure that the middlewares can also update the message within the envelope feature symfony#26945 [Messenger] Support configuring messages when dispatching (ogizanagi) Add more tests around the AMQP transport
For now, this is mainly a RFC to get your opinion on such a feature (so it misses proper tests).
Supporting wrapped messages out-of-the-box would mainly allow to easily create middlewares that should act only or be configured differently for specific messages (by using wrappers instead of making messages implements specific interfaces and without requiring dedicated buses). For instance, I might want to track specific messages and expose metrics about it.
The Symfony Serializer transport (relates to #26900) and Validator middleware serve as guinea pigs for sample purpose here. But despite message validation usually is simple as it matches one use-case and won't require specific validation groups in most cases, I still think it can be useful sometimes. Also there is the case of the
AllValidator
which currently requires using aGroupSequence
as a workaround, but that could also be specified directly in message metadata instead.Latest updates
PR updated to use a flat version of configurations instead of wrappers, using an
Envelope
wrapper and a list of envelope items.Usage:
By default, Messenger protagonists receive the original message. But each of them can opt-in to receive the envelope instead, by implementingEnvelopeReaderInterface
.Then, they can read configurations from it and forward the original message or the envelope to another party.
Senders, encoders/decoders & receivers always get an
Envelope
.Middlewares & handlers always get a message. But middlewares can opt-in to receive the envelope instead, by implementing
EnvelopeReaderInterface
.