-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Support for handling messages after current bus is finished #28849
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you look at Travis and giving a show-case of the configuration and consequence on the execution flow?
(PS: Yep, definitely like the idea much more than the recorder 👍)
src/Symfony/Component/Messenger/Middleware/HandleMessageInNewTransactionMiddleware.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Middleware/HandleMessageInNewTransactionMiddleware.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Middleware/HandleMessageInNewTransactionMiddleware.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Middleware/HandleMessageInNewTransactionMiddleware.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Middleware/HandleMessageInNewTransactionMiddleware.php
Outdated
Show resolved
Hide resolved
I don't know if I really understand the PR but it is weird to dispatch event in handlers. Normally, you should dispatch event after committing the main transaction. Then, you can dispatch all events raised by your domain object (entity) and subscribe them. Example: class User
{
private $messages = [];
public function __construct()
{
$this->messages[] = new UserRegistered(/*...*/);
}
public function messages()
{
return $this->messages;
}
}
// It should be wrapped into a transaction
class RegisterUserHandler
{
public function __invoke(RegisterUser $command)
{
$user = new User(/*...*/);
// If we use a collection oriented repository, events can be collected in a middleware. In the unit of work, you can find the aggregate User and get all messages.
$this->userRespoitory->add($user);
// If we use a collection oriented repository, you don't have a unit of work so your repository need to collect them.
$this->userRespoitory->save($user);
}
}
class DispatchEventMiddleware
{
public function handle(/*...*/)
{
// Your transaction has been committed then your dispatch your event
}
} Perhaps, |
@Nyholm : I gave this a shot in userland. This just needs some tweaks to comply the 4.2 Messenger interfaces (and sharing the same middleware instance across buses (#28849 (comment))), but it works great! Tell me if you want some help to take over. |
Thank you for testing this. Yes, please. This has been on my TODO list for ages now. Please help me. |
src/Symfony/Component/Messenger/Middleware/HandleMessageInNewTransactionMiddleware.php
Outdated
Show resolved
Hide resolved
fc3dc64
to
e00518d
Compare
I've pushed new changes and tests, trying to showcase a bit the feature in it. Status: Needs Review |
I have tried implementing this in userland with a test app and I can not figure out how to dispatch between buses. I have a command bus that is trying to dispatch to an event bus using the Transaction stamp. In my |
e00518d
to
4de9e68
Compare
I've rebased this PR on master. @gubler Did you add the |
@Nyholm Yes, I did. I have created a test project here that you can review and see if I messed anything up. I'm guessing its more likely I've made an error in the config or usage. |
@gubler : It seems you only declared the Which means the event you dispatch from the command handler to the event bus won't go through the middleware queuing it, so it's dispatched directly. You must declare it on each bus likely to have messages they handle queued by a transaction. Which means your event bus here. |
@ogizanagi I added the middleware to the event bus: https://github.com/gubler/messenger-transaction-test/blob/master/config/packages/messenger.yaml Now the |
@gubler: 😱 There was indeed an issue! The wrong Thanks for testing this and for the reproducer btw. |
Merged your changes into my test app and it looks like everything works now! Thank you very much for figuring this out 🎉 |
You guys are the best. Thank you @gubler and @ogizanagi to testing and fixing this PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comments. I've been playing with this, it works awesome!
src/Symfony/Component/Messenger/Stamp/DispatchAfterCurrentBus.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Middleware/DispatchAfterCurrentBusMiddleware.php
Show resolved
Hide resolved
$queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack()); | ||
} catch (\Exception $exception) { | ||
// Gather all exceptions | ||
$exceptions[] = $exception; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need to document that when you use this stamp, a totally different exception class will be thrown if your handler throws an exception. Just important to know if you're error handling.
src/Symfony/Component/Messenger/Exception/QueuedMessageHandlingException.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Exception/QueuedMessageHandlingException.php
Outdated
Show resolved
Hide resolved
126bc7c
to
97f9360
Compare
Thank you. I've updated the PR according to @weaverryan's suggestions and I've rebased to avoid the merge conflict. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 from me
This worked great when I tested it and I think the naming is clear now. Tobias added this as one of the default middleware because, even though not everyone will need to use it, it doesn't do anything unless you opt-in.
Minor conflict now on Still +1 from me |
97f9360
to
4567943
Compare
I fixed the conflict |
Please squash commits. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great 👍
Co-authored-by: Maxime Steinhausser <ogizanagi@users.noreply.github.com>
4567943
to
903355f
Compare
Thank you for the reviews. I've squashed my commits |
Thank you @Nyholm. |
…t bus is finished (Nyholm) This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Support for handling messages after current bus is finished | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | | License | MIT | Doc PR | symfony/symfony-docs#10015 This is a replacement for #27844. We achieve the same goals without introducing the new concept of "recorder". ```php class CreateUserHandler { private $em; private $eventBus; public function __construct(MessageBus $eventBus, EntityManagerInterface $em) { $this->eventBus = $eventBus; $this->em = $em; } public function __invoke(CreateUser $command) { $user = new User($command->getUuid(), $command->getName(), $command->getEmail()); $this->em->persist($user); $message = new UserCreatedEvent($command->getUuid(); $this->eventBus->dispatch((new Envelope($message))->with(new DispatchAfterCurrentBus())); } } ``` Note that this `DispatchAfterCurrentBusMiddleware` is added automatically as the first middleware. 2019-03-13: I updated the PR description. Commits ------- 903355f Support for handling messages after current bus is finished
Thank you for merging |
This is a replacement for #27844. We achieve the same goals without introducing the new concept of "recorder".
Note that this
DispatchAfterCurrentBusMiddleware
is added automatically as the first middleware.2019-03-13: I updated the PR description.