[Messenger] Worker events + global retry functionality #30557

Merged
merged 1 commit into from
Mar 23, 2019

Conversation

weaverryan
Member

@weaverryan weaverryan commented Mar 13, 2019

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | yes, on Messenger only
| Deprecations? | no
| Tests pass?   | NEEDED
| Fixed tickets | #29132, #27008, #27215 and part of #30540
| License       | MIT
| Doc PR        | TODO

This is an alternative to #29132 and #27008. There are several big things:

  1. The messenger:consume command no longer dies if a handler throws an error.
  2. Events are dispatched before a message is handled, after it is handled, and when handling fails.
  3. Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling.
  4. A generic retry system was added. It works with AMQP, and future transports should support it too.
    It will work out of the box for users. Retrying works by putting the received Envelope back into the bus, but with the ReceivedStamp removed. The retry functionality has an integration test for AMQP.
  5. Added a new MessageDecodingFailedException that transport Serializers should throw if decode() fails. It allows us to reject a message in this situation; letting it fail but remain on the queue would cause it to be retried forever.
  6. A new DelayStamp was added. It is the first of several planned stamps for configuring the transport layer (see [Messenger][RFC] Generic TransportConfig stamp #30558).
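
Points 1 and 2 can be pictured with a small, framework-free sketch. Everything in it is hypothetical (`SketchWorker`, the event name strings, the listener signature); it only illustrates the control flow of "dispatch, handle, report failure, keep going" and is not the Worker class from this PR.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch of the worker behaviour described above.
 * Not the Symfony Worker: class, event names and listener signature are made up.
 */
final class SketchWorker
{
    /** @var callable(string, object, ?\Throwable): void */
    private $listener;

    public function __construct(callable $listener)
    {
        $this->listener = $listener;
    }

    /**
     * @param iterable<object>       $messages messages "received" from a transport
     * @param callable(object): void $handler
     *
     * @return int number of messages whose handler failed
     */
    public function run(iterable $messages, callable $handler): int
    {
        $failures = 0;

        foreach ($messages as $message) {
            ($this->listener)('received', $message, null);

            try {
                $handler($message);
            } catch (\Throwable $e) {
                // A failing handler is reported, then the loop moves on.
                ++$failures;
                ($this->listener)('failed', $message, $e);

                continue;
            }

            ($this->listener)('handled', $message, null);
        }

        return $failures;
    }
}

// Usage: the first message fails, the second is still handled.
$log = [];
$worker = new SketchWorker(
    static function (string $event, object $message, ?\Throwable $error) use (&$log): void {
        $log[] = $event.':'.$message->id;
    }
);

$failures = $worker->run(
    [(object) ['id' => 1], (object) ['id' => 2]],
    static function (object $message): void {
        if (1 === $message->id) {
            throw new \RuntimeException('Handler failed.');
        }
    }
);
```

In this sketch the listener runs before any ack/reject decision would be made, which mirrors the ordering question raised later in the review.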

BC breaks are documented in the CHANGELOG.

Thanks!

@weaverryan weaverryan requested a review from sroze as a code owner March 13, 2019 19:58
@weaverryan weaverryan mentioned this pull request Mar 13, 2019
36 tasks
@nicolas-grekas nicolas-grekas added this to the next milestone Mar 14, 2019
@weaverryan weaverryan changed the title [WIP] Events for Messenger worker and not failing [WIP][Messenger] Worker events + global retry functionality Mar 15, 2019
@weaverryan
Member Author

I've now incorporated generic retry abilities from #27008. I think more details need to be considered, like #30558 (transport stamp) and how it will affect things.

But, generally speaking, what big issues do people see? I'm pushing much more control into the Worker, and making the transport a bit more agnostic to sending/receiving/acking/nacking/retrying.

@@ -24,6 +24,8 @@
*/
class Connection
{
public const ATTEMPT_COUNT_HEADER_NAME = 'symfony-messenger-attempts';
Contributor

AMQP uses x-death when a message is dead-letter-ed. Maybe we can reuse the same? (not sure if it conflicts though, would need to try).

Member Author

Hmm. I don't know much about this, but I don't think it'll work. Each time we retry, we're re-using the headers from the existing message. This has the nice effect that the x-death header (which is an array) will have 0 items, then 1 item, then 2 items, etc. It increases with each "death". The "newest" death apparently (I've just tested) always becomes the 0 key, and the others are "pushed back". If we try to insert an entry into x-death, it just looks like there was a "previous" death: Rabbit pushes a new item onto the 0 index and our entry is pushed back to index 1.

It seems like we shouldn't be setting values onto x-death.

@weaverryan
Member Author

weaverryan commented Mar 18, 2019

Thank you @sroze for the review! I was working concurrently with your review, so I pushed a bunch of changes as you were commenting. I would love another look. I've replied to a few of your comments with the changes I've made. Specifically, the QueuedMetadata thing is now gone, replaced with a Stamp. Also (and this is important), I've given the AmqpReceiver some temporary state. The issue is that the AMQPEnvelope is needed to do things like ack, nack, etc. And because these methods are now called by Worker, I previously was passing this "message" back out to the Worker... which was unfortunate, because this is really an internal detail to each transport. I fixed that by tracking the current AMQPEnvelope being handled inside AmqpReceiver, which allows us to drop passing the AMQPEnvelope object around.

I also made the retry stuff configurable on a transport-by-transport basis using a service.

Also, question: should we ever redeliver via nack() onto the same queue? Like, if, for some reason, redelivery onto the DLX fails, should we nack() retry? Or is that just madness - trying to gracefully fail when something has gone terribly wrong.

Thanks!

*
* @throws TransportException If there is an issue communicating with the transport
*/
public function retryCurrentMessage(int $delay): void;
Member Author

These all rely on being called from inside the receive() loop, because that keeps temporary "state" about which message is currently being handled. That's a key change here, which simplifies a lot, but which I want to make sure won't cause issues.

@weaverryan
Member Author

This is ready for review! This represents a big change in how we handle the transports, so I really appreciate review!

$this->dispatchEvent(
WorkerMessageFailedEvent::class,
new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRequeue)
);
Member Author

The events are currently dispatched before ack/reject/retry on the queue. That's subjective, and either order could, in theory, cause a situation where one fails and so the other doesn't run (e.g. some listener throws an exception, so the retry never happens, or, if we reverse, the retry fails due to a network connection, then the event is never dispatched). Not sure if we need to be thinking about this level of failure. Catching exceptions makes things harder to debug/know about when they go wrong.

@@ -140,8 +145,13 @@ protected function execute(InputInterface $input, OutputInterface $output): void
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName));
}

if (!$this->retryStrategyLocator->has($receiverName)) {
Contributor

Shouldn't we decorate the receiver instead of having this logic in the command & worker then? 🤔

Member Author

We probably could, but should we? The code reads really clearly inside Worker, and my thought is sort of that we're setting out the "core" logic that (unless you really want to) everyone gets. At this point, it includes a lot - event dispatching, retry logic (and a stamp being added for this).

So I guess I would say: someone needs to sell hard on the idea of making it a decorator.

Member Author

Or, to say it differently:

  • Do we really need this?
  • If we do want it, could we only move some parts out (e.g. logging decorator, or event listener, event decorator) and leave others

$AMQPEnvelope = $this->connection->get();
if (null === $AMQPEnvelope) {
try {
$this->currentAmqpEnvelope = $this->connection->get();
Contributor

I'm not fond of the idea of forcing the receivers to have such local state at all. Can't you have another stamp (non-serialisable), AMQP-specific, that contains this \AMQPMessage? You'd call it AmqpStamp and that's it, you can just get it from the Envelope, no need of these *currentMessage methods anymore, just "normal" methods taking the Envelope as an argument.

Member Author

Indeed, I've gone back-and-forth on this. Your transport-specific stamp is a nice idea. It would leak this information out to userland, like middleware or event listeners. What do you think about that?

Also, in the latest commit, on retry, I re-send the Envelope for normal encoding/decoding. This means (and is by design) that if a new Envelope was created with new stamps, those stamps will be serialized & sent. It would mean that this AMQP-specific stamp would be serialized & sent. You mentioned "non-serialisable"... do you basically mean: give it a "sleep" method so that if/when we serialize it, it'll just be an empty object (i.e. it won't cause an error)?

Member Author

Side note: if we did this and made methods like reject() require the Envelope, we wouldn't be able to handle the MessageDecodingFailedException in the Worker - catching that and rejecting would need to remain the responsibility of each transport, because there wouldn't be any Envelope that the Worker could send back to the Receiver::reject() method.

Contributor

> It would leak this information out to userland, like middleware or event listeners. What do you think about that?

That's a good thing IMHO. This allows users to go deep into customising it.

> This means (and is by design) that if a new Envelope was created with new stamps, those stamps will be serialized & sent.

Indeed, hence my (non-serializable) comment. We need a way to make them non-transportable, it's definitely a valid use-case. The sleep method works but is only about the serialize method... I'd imagine a NonSerializableStampInterface actually.

> if we did this and made methods like reject() require the Envelope, we wouldn't be able to handle the MessageDecodingFailedException in the Worker

Technically speaking we could actually create an empty envelope as part of the exception. It would make sense. But for the sake of this pull-request, let's keep it the responsibility of the transport :)
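
To make the `NonSerializableStampInterface` idea concrete, here is a minimal sketch of how a serializer might drop such stamps before encoding. It assumes the marker interface proposed above (which this PR does not add); `RawTransportMessageStamp`, `RetryCountSketchStamp` and `filterSerializableStamps()` are hypothetical names used only for illustration.

```php
<?php

declare(strict_types=1);

/** Marker interface as proposed in the comment above; hypothetical here. */
interface NonSerializableStampInterface
{
}

/** Hypothetical transport-specific stamp holding a raw, transport-level message. */
final class RawTransportMessageStamp implements NonSerializableStampInterface
{
    /** @var object */
    public $rawMessage;

    public function __construct(object $rawMessage)
    {
        $this->rawMessage = $rawMessage;
    }
}

/** Hypothetical ordinary stamp that should travel with the message. */
final class RetryCountSketchStamp
{
    /** @var int */
    public $retryCount;

    public function __construct(int $retryCount)
    {
        $this->retryCount = $retryCount;
    }
}

/**
 * Returns only the stamps a serializer should encode.
 *
 * @param object[] $stamps
 *
 * @return object[]
 */
function filterSerializableStamps(array $stamps): array
{
    return array_values(array_filter($stamps, static function (object $stamp): bool {
        return !$stamp instanceof NonSerializableStampInterface;
    }));
}

// Usage: the raw transport stamp is dropped, the retry stamp is kept.
$kept = filterSerializableStamps([
    new RawTransportMessageStamp(new \stdClass()),
    new RetryCountSketchStamp(2),
]);
```

The cost noted in the thread still applies: each serializer would have to run a filter like this before encoding.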

Member Author

I've made the change to use a stamp (AmqpReceivedStamp) and it's really, really nice - good suggestion. However, I also found out that serializing that stamp works just fine. The AMQPEnvelope inside it is just a simple object that doesn't cause any problems.

So, should we still add this NonSerializableStampInterface? Serializing the AMQPEnvelope doesn't cause any problems, as we're always looking for $envelope->last(AMQPEnvelope::class). Also, to implement this, each serializer would actually need to create a new envelope with the NonSerializableStampInterface filtered out. Totally doable, but is this something we actually need to add?

@weaverryan
Member Author

One last update: "retrying" is no longer a special situation - we basically just call SenderInterface::send() with whatever the delay is, then "ack" the old message. That is basically what we were doing before, but the difference is that we're now re-serializing the Envelope on re-send, instead of sending a duplicate of the original message. That allows us to add stamps to the Envelope, which is really powerful because we can, for example, manage the RetryCountStamp in a way where the transport doesn't need to do anything with tracking how many retries have happened.
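
The "send again with a delay, then ack the old message" flow can be sketched without any framework code. All names here are hypothetical (`SketchEnvelope`, `retrySketch()`, the `retry_count` and `delay_ms` keys), and the doubling delay and the limit of three retries are assumptions for the example, not the strategy shipped in this PR.

```php
<?php

declare(strict_types=1);

/** Hypothetical stand-in for an envelope: a message plus a map of integer stamps. */
final class SketchEnvelope
{
    /** @var object */
    public $message;

    /** @var array<string, int> */
    public $stamps;

    public function __construct(object $message, array $stamps = [])
    {
        $this->message = $message;
        $this->stamps = $stamps;
    }

    /** Returns a copy with one stamp added or replaced. */
    public function with(string $name, int $value): self
    {
        $stamps = $this->stamps;
        $stamps[$name] = $value;

        return new self($this->message, $stamps);
    }
}

/**
 * Resends a failed envelope with an incremented retry count and a delay.
 * Returns the resent envelope, or null once $maxRetries is reached.
 */
function retrySketch(
    SketchEnvelope $failed,
    callable $send,
    callable $ack,
    int $maxRetries = 3,
    int $baseDelayMs = 1000
): ?SketchEnvelope {
    $retryCount = $failed->stamps['retry_count'] ?? 0;

    if ($retryCount >= $maxRetries) {
        // Out of retries: the caller would reject the message instead.
        return null;
    }

    // Assumed policy: double the delay on each attempt.
    $delayMs = $baseDelayMs * (2 ** $retryCount);

    $retry = $failed
        ->with('retry_count', $retryCount + 1)
        ->with('delay_ms', $delayMs);

    $send($retry); // the sender re-serializes the envelope, stamps included
    $ack($failed); // the original delivery is acknowledged, not requeued

    return $retry;
}

// Usage: two consecutive failures of the same message.
$sent = [];
$acked = [];
$send = static function (SketchEnvelope $envelope) use (&$sent): void {
    $sent[] = $envelope;
};
$ack = static function (SketchEnvelope $envelope) use (&$acked): void {
    $acked[] = $envelope;
};

$first = new SketchEnvelope((object) ['id' => 7]);
$second = retrySketch($first, $send, $ack);
$third = retrySketch($second, $send, $ack);
```

Because the count travels in the envelope, the transport in this sketch never has to track how many retries have happened.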

@weaverryan
Member Author

Ready to go again!

Last commits guarantee that redeliveries are only sent back to the same transport.

@Nyholm
Member

Nyholm commented Mar 22, 2019

There are still quite a few mentions of "queue" when we really mean "transport". I think we should try to be more technically correct.

Contributor

@sroze sroze left a comment

Fabulous. Last thing is the event name I think.

@fabpot can you update your review? 🙏

@weaverryan
Member Author

Last changes look good to me. Thank you!

@fabpot
Member

fabpot commented Mar 23, 2019

@weaverryan Can you squash so that I can merge? Thank you.

Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>
@weaverryan
Member Author

Squashed!

@fabpot
Member

fabpot commented Mar 23, 2019

Thank you @weaverryan.

@fabpot fabpot merged commit a989384 into symfony:master Mar 23, 2019
fabpot added a commit that referenced this pull request Mar 23, 2019
… (weaverryan)

This PR was merged into the 4.3-dev branch.

Commits
-------

a989384 Adding global retry support, events & more to messenger transport
@weaverryan weaverryan deleted the worker-events branch March 23, 2019 14:02
sroze added a commit that referenced this pull request Mar 23, 2019
…nnect() does not work (sroze)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Ensure an exception is thrown when the AMQP connect() does not work

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #30557
| License       | MIT
| Doc PR        | ø

This `connectionCredentials` instance escaped the renaming in #30557.

Commits
-------

46b9476 Ensure an exception is thrown when the AMQP connect() does not work
fabpot added a commit that referenced this pull request Mar 23, 2019
…d (weaverryan)

This PR was merged into the 4.3-dev branch.

Discussion
----------

Dispatching two events when a message is sent & handled

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | none
| License       | MIT
| Doc PR        | TODO

Alternative to #30646. This uses a more generic system, so you could do anything when a message is sent. The main use-case is when a message is dispatched by a 3rd party.

I didn't try to add *exhaustive* events everywhere: I added an event for a very specific use-case:

When a message is dispatched by a 3rd party, being able to add stamps (e.g. `DelayStamp` or a future `AmqpRoutingKeyStamp`) before the message is sent. Example:

```php
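use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\Messenger\Event\SendMessageToTransportsEvent;

// SomeMailerMessage and AmqpRoutingKeyStamp are placeholders; the stamp is a possible future addition.
class MailerMessageSendToTransportEventSubscriber implements EventSubscriberInterface
{
    public function onSendMessage(SendMessageToTransportsEvent $event)
    {
        $envelope = $event->getEnvelope();
        // Only touch envelopes that carry the mailer message.
        if (!$envelope->getMessage() instanceof SomeMailerMessage) {
            return;
        }

        // with() returns a copy of the envelope carrying the extra stamp.
        $event->setEnvelope($envelope->with(new AmqpRoutingKeyStamp('mailer-route')));
    }

    public static function getSubscribedEvents()
    {
        return [SendMessageToTransportsEvent::class => 'onSendMessage'];
    }
}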
```

Along with #30557, we will now have the following events, regarding async messages:
* Event when a message is sent to transports (this PR)
* Event when a message is received from transport, but before handling it
* Event when a message is received from transport and after handling it

Commits
-------

a7ad1b4 Dispatching two events when a message is sent & handled
sroze added a commit that referenced this pull request Mar 31, 2019
This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Add a Doctrine transport

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets |
| License       | MIT
| Doc PR        | symfony/symfony-docs#10616
| DoctrineBundle PR | doctrine/DoctrineBundle#868

As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component.

Currently, `AMQP` is the only supported transport, and it would be good to support more. Having a Doctrine transport could help users start using the component, IMHO (almost all projects use a database).

# How it works

The code is split between this PR and the one on the DoctrineBundle: doctrine/DoctrineBundle#868

## Configuration

To configure a Doctrine transport, the DSN MUST have the format `doctrine://<entity_manager_name>`, where `<entity_manager_name>` is the name of the entity manager (usually `default`):
```yml
        # config/packages/messenger.yaml
        framework:
            messenger:
                transports:
                    my_transport: "doctrine://default?queue=important"
```

## Table schema

Dispatched messages are stored into a database table with the following schema:

| Column       | Type        | Options                  | Description                                                       |
|--------------|-------------|--------------------------|-------------------------------------------------------------------|
| id           | bigint      | AUTO_INCREMENT, NOT NULL | Primary key                                                       |
| body         | text        | NOT NULL                 | Body of the message                                               |
| headers      | text        | NOT NULL                 | Headers of the message                                            |
| queue        | varchar(32) | NOT NULL                 | Name of the queue the message belongs to                          |
| created_at   | datetime    | NOT NULL                 | When the message was inserted into the table (automatically set)  |
| available_at | datetime    | NOT NULL                 | When the message is available to be handled                       |
| delivered_at | datetime    | NULL                     | When the message was delivered to a worker                        |

## Message dispatching

When a message is dispatched, a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish`.

## Message consuming

The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle.

### Getting the next message

* Start a transaction
* Select and lock the first message to handle (the lock is taken with a `SELECT ... FOR UPDATE` query)
* Update the message in the database to set its `delivered_at` column
* Commit the transaction
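
The four steps above can be sketched with plain PDO. This is an illustration, not the code of this PR: the table name `messenger_messages` and the function `claimNextMessage()` are assumptions, and the `$lockClause` parameter exists only so the sketch can also run on databases without `FOR UPDATE` support (such as SQLite) by passing an empty string.

```php
<?php

declare(strict_types=1);

/**
 * Claims the next available message for a queue.
 *
 * Assumptions: a table named "messenger_messages" with the columns from the
 * schema above, and a PDO connection configured to throw exceptions.
 *
 * @return array<string, mixed>|null the claimed row, or null if nothing is available
 */
function claimNextMessage(\PDO $pdo, string $queue, string $now, string $lockClause = 'FOR UPDATE'): ?array
{
    // Step 1: start a transaction.
    $pdo->beginTransaction();

    try {
        // Step 2: select (and lock) the first undelivered, available message.
        $select = $pdo->prepare(
            'SELECT id, body, headers FROM messenger_messages'
            .' WHERE queue = :queue AND delivered_at IS NULL AND available_at <= :now'
            .' ORDER BY available_at ASC, id ASC LIMIT 1 '.$lockClause
        );
        $select->execute(['queue' => $queue, 'now' => $now]);
        $row = $select->fetch(\PDO::FETCH_ASSOC);

        if (false === $row) {
            $pdo->commit();

            return null;
        }

        // Step 3: mark the message as delivered.
        $update = $pdo->prepare('UPDATE messenger_messages SET delivered_at = :now WHERE id = :id');
        $update->execute(['now' => $now, 'id' => $row['id']]);

        // Step 4: commit, which releases the lock.
        $pdo->commit();

        return $row;
    } catch (\Throwable $e) {
        $pdo->rollBack();

        throw $e;
    }
}
```

A second worker calling the function with the same arguments gets the next row, or `null` if none is left, because delivered rows no longer match `delivered_at IS NULL`.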

### Handling the message

The retrieved message is then passed to the handler. If the message is handled correctly, the receiver calls `Symfony\Component\Messenger\Transport\Doctrine::ack`, which deletes the message from the table.

If an error occurred, the receiver calls the `Symfony\Component\Messenger\Transport\Doctrine::nack` method, which updates the message to set the `delivered_at` column to `null`.

## Message requeueing

A message may get stuck in the `delivered` state without the handler actually handling it (database connection error, server crash, ...). To requeue such messages, the `DoctrineReceiver` calls `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method updates every message whose `delivered_at` is not null and older than the "redeliver timeout" (3600 seconds by default).
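
A minimal sketch of that requeue step, again with plain PDO. The table name `messenger_messages` and the function `requeueStuckMessages()` are assumptions for illustration. The sketch resets `delivered_at` to `NULL` on the assumption that this is how a message becomes available again, and it expects `delivered_at` to be stored as `Y-m-d H:i:s` strings in the same time zone as `$now`.

```php
<?php

declare(strict_types=1);

/**
 * Makes messages available again when they were delivered more than
 * $redeliverTimeout seconds ago and never acknowledged.
 *
 * @return int number of messages put back
 */
function requeueStuckMessages(\PDO $pdo, \DateTimeImmutable $now, int $redeliverTimeout = 3600): int
{
    // Anything delivered before this moment is considered stuck.
    $limit = $now
        ->modify(sprintf('-%d seconds', $redeliverTimeout))
        ->format('Y-m-d H:i:s');

    $update = $pdo->prepare(
        'UPDATE messenger_messages SET delivered_at = NULL'
        .' WHERE delivered_at IS NOT NULL AND delivered_at < :limit'
    );
    $update->execute(['limit' => $limit]);

    return $update->rowCount();
}
```

A receiver could call this before fetching the next message, so that work abandoned by a crashed worker is picked up again after the timeout.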

# TODO

- [x] Add tests
- [x] Create DOC PR
- [x] PR on doctrine-bundle for transport factory
- [x] Add a `available_at` column
- [x] Add a `queue` column
- [x] Implement the retry functionality: see #30557
- [x] Rebase after #29476

Commits
-------

88d008c [Messenger] Add a Doctrine transport
@nicolas-grekas nicolas-grekas modified the milestones: next, 4.3 Apr 30, 2019
@fabpot fabpot mentioned this pull request May 9, 2019