Skip to content

[Messenger] Support for handling messages after current bus is finished #28849

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 19, 2019

Conversation

Nyholm
Copy link
Member

@Nyholm Nyholm commented Oct 13, 2018

Q A
Branch? master
Bug fix? no
New feature? yes
BC breaks? no
Deprecations? no
Tests pass? yes
Fixed tickets
License MIT
Doc PR symfony/symfony-docs#10015

This is a replacement for #27844. We achieve the same goals without introducing the new concept of "recorder".

class CreateUserHandler
{
    private $em;
    private $eventBus;

    public function __construct(MessageBus $eventBus, EntityManagerInterface $em)
    {
        $this->eventBus = $eventBus;
        $this->em = $em;
    }

    public function __invoke(CreateUser $command)
    {
        $user = new User($command->getUuid(), $command->getName(), $command->getEmail());
        $this->em->persist($user);

        $message = new UserCreatedEvent($command->getUuid();
        $this->eventBus->dispatch((new Envelope($message))->with(new DispatchAfterCurrentBus()));
    }
}

Note that this DispatchAfterCurrentBusMiddleware is added automatically as the first middleware.

2019-03-13: I updated the PR description.

sroze
sroze previously requested changes Oct 21, 2018
Copy link
Contributor

@sroze sroze left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you look at Travis and giving a show-case of the configuration and consequence on the execution flow?

(PS: Yep, definitely like the idea much more than the recorder 👍)

@arnolanglade
Copy link

I don't know if I really understand the PR but it is weird to dispatch event in handlers. Normally, you should dispatch event after committing the main transaction. Then, you can dispatch all events raised by your domain object (entity) and subscribe them.

Example:

class User 
{
    private $messages = [];
    public function __construct()
    {
        $this->messages[] = new UserRegistered(/*...*/);
    }
    
    public function messages() 
    {
        return $this->messages;
    }
}

// It should be wrapped into a transaction
class RegisterUserHandler
{
    public function __invoke(RegisterUser $command)
    {
        $user = new User(/*...*/);
        
        // If we use a collection oriented repository, events can be collected in a middleware. In the unit of work, you can find the aggregate User and get all messages.  
        $this->userRespoitory->add($user);

        // If we use a collection oriented repository, you don't have a unit of work so your repository need to collect them.
        $this->userRespoitory->save($user);
    }
}

class DispatchEventMiddleware
{
    public function handle(/*...*/)
    {
        // Your transaction has been committed then your dispatch your event
    }
}

Perhaps, $this->messageBus->dispatch(/*...*/) should not dispatch event/message synchronously?

@ogizanagi
Copy link
Contributor

@Nyholm : I gave this a shot in userland. This just needs some tweaks to comply the 4.2 Messenger interfaces (and sharing the same middleware instance across buses (#28849 (comment))), but it works great!
I'm not sure about the terms used, though. Transaction may be a bit confusing with the doctrine transaction middleware. No strong opinion, maybe I'm just used to the lock term in use with Tactician.

Tell me if you want some help to take over.

@Nyholm
Copy link
Member Author

Nyholm commented Nov 29, 2018

Thank you for testing this.

Yes, please. This has been on my TODO list for ages now. Please help me.
I will try to fix this during the holidays but if you (or someone) have time before that, feel free.

@ogizanagi ogizanagi force-pushed the messenger-transaction branch 2 times, most recently from fc3dc64 to e00518d Compare December 12, 2018 09:18
@ogizanagi
Copy link
Contributor

I've pushed new changes and tests, trying to showcase a bit the feature in it.
I may have a look at the docs, too.

Status: Needs Review

@gubler
Copy link

gubler commented Feb 9, 2019

I have tried implementing this in userland with a test app and I can not figure out how to dispatch between buses. I have a command bus that is trying to dispatch to an event bus using the Transaction stamp. In my messenger.yml I have the transaction middleware declared for both buses. Is there something else I need to configure? My service for the middleware is declared with abstract: false as I saw the comment that otherwise separate middlewares would be generated.

@Nyholm Nyholm force-pushed the messenger-transaction branch from e00518d to 4de9e68 Compare February 22, 2019 13:53
@Nyholm
Copy link
Member Author

Nyholm commented Feb 22, 2019

I've rebased this PR on master.
Status: Needs Review

@gubler Did you add the HandleMessageInNewTransactionMiddleware before the Doctrine transaction middleware?

@gubler
Copy link

gubler commented Feb 23, 2019

@Nyholm Yes, I did. I have created a test project here that you can review and see if I messed anything up.

I'm guessing its more likely I've made an error in the config or usage.

@ogizanagi
Copy link
Contributor

ogizanagi commented Feb 23, 2019

@gubler : It seems you only declared the handle_message_in_new_transaction middleware on command bus: https://github.com/gubler/messenger-transaction-test/blob/54b225f5df9a68a502d8e74d7acbcba2c4da94e5/config/packages/messenger.yaml#L9

Which means the event you dispatch from the command handler to the event bus won't go through the middleware queuing it, so it's dispatched directly.

You must declare it on each bus likely to have messages they handle queued by a transaction. Which means your event bus here.

@gubler
Copy link

gubler commented Feb 23, 2019

@ogizanagi I added the middleware to the event bus: https://github.com/gubler/messenger-transaction-test/blob/master/config/packages/messenger.yaml

Now the BookCreatedSubscriber does not execute at all, whether the NewBookHandler succeeds or errors.

@ogizanagi
Copy link
Contributor

@gubler: 😱 There was indeed an issue! The wrong $stack was passed to the next middleware when resuming dispatching the queued items. Which means the handler middleware was not called, as the middleware stack used was empty (because the main dispatch call's stack was used).

Thanks for testing this and for the reproducer btw.
Could you give a try to the last commit? 🙏

@gubler
Copy link

gubler commented Feb 24, 2019

Merged your changes into my test app and it looks like everything works now!

Thank you very much for figuring this out 🎉

@Nyholm
Copy link
Member Author

Nyholm commented Feb 24, 2019

You guys are the best. Thank you @gubler and @ogizanagi to testing and fixing this PR.

Copy link
Member

@weaverryan weaverryan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comments. I've been playing with this, it works awesome!

$queueItem->getStack()->next()->handle($queueItem->getEnvelope(), $queueItem->getStack());
} catch (\Exception $exception) {
// Gather all exceptions
$exceptions[] = $exception;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need to document that when you use this stamp, a totally different exception class will be thrown if your handler throws an exception. Just important to know if you're error handling.

@Nyholm Nyholm force-pushed the messenger-transaction branch from 126bc7c to 97f9360 Compare March 17, 2019 10:12
@Nyholm
Copy link
Member Author

Nyholm commented Mar 17, 2019

Thank you. I've updated the PR according to @weaverryan's suggestions and I've rebased to avoid the merge conflict.

Copy link
Member

@weaverryan weaverryan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 from me

This worked great when I tested it and I think the naming is clear now. Tobias added this as one of the default middleware because, even though not everyone will need to use it, it doesn't do anything unless you opt-in.

@weaverryan
Copy link
Member

Minor conflict now on composer.json file.

Still +1 from me

@Nyholm Nyholm force-pushed the messenger-transaction branch from 97f9360 to 4567943 Compare March 18, 2019 21:26
@Nyholm
Copy link
Member Author

Nyholm commented Mar 18, 2019

I fixed the conflict

@nicolas-grekas
Copy link
Member

Please squash commits.

Copy link
Contributor

@sroze sroze left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great 👍

Co-authored-by: Maxime Steinhausser <ogizanagi@users.noreply.github.com>
@Nyholm Nyholm changed the title [Messenger] Support for handling messages in different transaction [Messenger] Support for handling messages after current bus is finished Mar 19, 2019
@Nyholm Nyholm force-pushed the messenger-transaction branch from 4567943 to 903355f Compare March 19, 2019 04:16
@Nyholm
Copy link
Member Author

Nyholm commented Mar 19, 2019

Thank you for the reviews. I've squashed my commits

@sroze
Copy link
Contributor

sroze commented Mar 19, 2019

Thank you @Nyholm.

@sroze sroze merged commit 903355f into symfony:master Mar 19, 2019
sroze added a commit that referenced this pull request Mar 19, 2019
…t bus is finished (Nyholm)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Support for handling messages after current bus is finished

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        | symfony/symfony-docs#10015

This is a replacement for #27844. We achieve the same goals without introducing the new concept of "recorder".

```php
class CreateUserHandler
{
    private $em;
    private $eventBus;

    public function __construct(MessageBus $eventBus, EntityManagerInterface $em)
    {
        $this->eventBus = $eventBus;
        $this->em = $em;
    }

    public function __invoke(CreateUser $command)
    {
        $user = new User($command->getUuid(), $command->getName(), $command->getEmail());
        $this->em->persist($user);

        $message = new UserCreatedEvent($command->getUuid();
        $this->eventBus->dispatch((new Envelope($message))->with(new DispatchAfterCurrentBus()));
    }
}
```

Note that this `DispatchAfterCurrentBusMiddleware` is added automatically as the first middleware.

2019-03-13: I updated the PR description.

Commits
-------

903355f Support for handling messages after current bus is finished
@Nyholm
Copy link
Member Author

Nyholm commented Mar 19, 2019

Thank you for merging

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

8 participants