-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Worker events + global retry functionality #30557
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
822b289
to
a2d30ac
Compare
I've now incorporated generic retry abilities from #27008. I think more details need to be considered, like #30558 (transport stamp) and how it will affect things. But, generally speaking, what big issues do people see? I'm pushing much more control into the |
d16c1a3
to
41844fc
Compare
41844fc
to
bf338e7
Compare
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
Outdated
Show resolved
Hide resolved
@@ -24,6 +24,8 @@ | |||
*/ | |||
class Connection | |||
{ | |||
public const ATTEMPT_COUNT_HEADER_NAME = 'symfony-messenger-attempts'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AMQP uses x-death
when a message is dead-letter-ed. Maybe we can reuse the same? (not sure if it conflicts though, would need to try).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm. I don't know much about this, but I don't think it'll work. Each time we retry, we're re-using the headers from the existing message. This has a nice effect that the x-death
header (which is an array) will have 0 items, then 1 item, then 2 items, etc - it'll increase with each "death". The "newest" death apparently (i've just tested) always becomes the 0
key - the others are "pushed back". If we try to insert an entry into x-death
, it just looks like there was a "previous" death, and Rabbit pushes a new item onto the 0 index and our entry is pushed back to 0.
It seems like we shouldn't be setting values onto x-death
.
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
Outdated
Show resolved
Hide resolved
Thank you @sroze for the review! I was working concurrently with your review, so I pushed a bunch of changes as you were commenting. I would love another look. I've replied to a few of your comments with the changes I've made. Specifically, the I also made the retry stuff configurable on a transport-by-transport basis using a service. Also, question: should we ever redeliver via Thanks! |
2f66cb6
to
8c851ca
Compare
* | ||
* @throws TransportException If there is an issue communicating with the transport | ||
*/ | ||
public function retryCurrentMessage(int $delay): void; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These all rely on being called from inside the receive()
loop, because that keeps temporary "state" about which message is currently being handled. That's a key change here, which simplifies a lot, but which I want to make sure won't cause issues.
This is ready for review! This represents a big change in how we handle the transports, so I really appreciate review! |
$this->dispatchEvent( | ||
WorkerMessageFailedEvent::class, | ||
new WorkerMessageFailedEvent($envelope, $this->receiverName, $throwable, $shouldRequeue) | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The events are currently dispatched before ack/reject/retry on the queue. That's subjective, and either order could, in theory, cause a situation where one fails and so the other doesn't run (e.g. some listener throws an exception, so the retry never happens, or, if we reverse, the retry fails due to a network connection, then the event is never dispatched). Not sure if we need to be thinking about this level of failure. Catching exceptions makes things harder to debug/know about when they go wrong.
src/Symfony/Bundle/FrameworkBundle/DependencyInjection/Configuration.php
Outdated
Show resolved
Hide resolved
@@ -140,8 +145,13 @@ protected function execute(InputInterface $input, OutputInterface $output): void | |||
throw new RuntimeException(sprintf('Bus "%s" does not exist.', $busName)); | |||
} | |||
|
|||
if (!$this->retryStrategyLocator->has($receiverName)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we decorate the receiver instead of having this logic in the command & worker then? 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably could, but should we? The code reads really clearly inside Worker
, and my thought is sort of that we're setting out the "core" logic that (unless you really want to) everyone gets. At this point, it includes a lot - event dispatching, retry logic (and a stamp being added for this).
So I guess I would say: someone needs to sell hard on the idea of making it a decorator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or, to say it differently:
- Do we really need this?
- If we do want it, could we only move some parts out (e.g. logging decorator, or event listener, event decorator) and leave others
$AMQPEnvelope = $this->connection->get(); | ||
if (null === $AMQPEnvelope) { | ||
try { | ||
$this->currentAmqpEnvelope = $this->connection->get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not fond of the idea of forcing the receivers to have such local state at all. Can't you have another stamp (non-serialisable), AMQP-specific, that contains this \AMQPMessage
? You'd call it AmqpStamp
and that's it, you can just get it from the Envelope
, no need of these *currentMessage
methods anymore, just "normal" methods taking the Envelope
as an argument.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, I've gone back-and-forth on this. Your transport-specific stamp is a nice idea. It would leak this information out to userland, like middleware or event listeners. What do you think about that?
Also, in the latest commit, on retry, I re-send the Envelope for normal encoding/decoding. This means (and is by design) that if a new Envelope was created with new stamps, those stamps will be serialized & sent. It would mean that this AMQP-specific stamp would be serialized & sent. You mentioned "non-serialisable"... do you basically mean: give it a "sleep" method so that if/when we serialize it, it'll just be an empty object (i.e. it won't cause an error).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side note: if we did this and made methods like reject()
require the Envelope
, we wouldn't be able to handle the MessageDecodingFailedException
in the Worker
- catching that and rejecting would need to remain the responsibility of each transport, because there wouldn't be any Envelope
that the Worker
could send back to the Receiver::reject()
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would leak this information out to userland, like middleware or event listeners. What do you think about that?
That's a good thing IMHO. This allows users to go deep into customising it.
This means (and is by design) that if a new Envelope was created with new stamps, those stamps will be serialized & sent.
Indeed, hence my (non-serializable) comment. We need a way to make them non-transportable, it's definitely a valid use-case. The sleep
method works but is only about the serialize
method... I'd imagine a NonSerializableStampInterface
actually.
if we did this and made methods like reject() require the Envelope, we wouldn't be able to handle the MessageDecodingFailedException in the Worker
Technically speaking we could actually create an empty envelope as part of the exception. It would make sense. But for the sake of this pull-request, let's keep it the responsibility of the transport :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've made the change to use a stamp (AmqpReceivedStamp
) and it's really, really nice - good suggestion. However, I also found out that serializing that stamp works just fine. The AMQPEnvelope
inside it is just a simple object that doesn't cause any problems.
So, should we still add this NonSerializableStampInterface
? Serializing the AMQPEnvelope
doesn't cause any problems, as we're always looking for $envelope->last(AMQPEnvelop::class)
. Also, to implement this, each serializer would actually need to create a new envelope with the NonSerializableStampInterface
filtered out. Totally doable, but is this something we actually need to add?
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
Outdated
Show resolved
Hide resolved
One last update: "retrying" is no longer a special situation - we basically just call |
d5a83b4
to
76f6378
Compare
Ready to go again! Last commits guarantee that redeliveries are only sent back to the same transport. |
There are still quite a few mentions of "queue" when we really mean "transport". I think we should try to be more technically correct. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fabulous. Last thing is the event name I think.
@fabpot can you update your review? 🙏
src/Symfony/Component/Messenger/Event/WorkerMessageHandlingEvent.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Receiver/ReceiverInterface.php
Outdated
Show resolved
Hide resolved
Last changes look good to me. Thank you! |
@weaverryan Can you squash so that I can merge? Thank you. |
Co-authored-by: Samuel ROZE <samuel.roze@gmail.com>
895562b
to
a989384
Compare
Squashed! |
Thank you @weaverryan. |
… (weaverryan) This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Worker events + global retry functionality | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | yes, on Messenger only | Deprecations? | no | Tests pass? | NEEDED | Fixed tickets | #29132, #27008, #27215 and part of #30540 | License | MIT | Doc PR | TODO This is an alternative to #29132 and #27008. There are several big things: 1) The `messenger:consume` does not die if a handler has an error 2) Events are dispatched before, after and on error a message being handled 3) Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling. 4) A generic retry system was added, which works with Amqp and future transports should support. It will work out of the box for users. Retrying works by putting the received `Envelope` back into the bus, but with the `ReceivedStamp` removed. The retry functionality has an integration test for AMQP. 5) Added a new `MessageDecodingFailedException` that transport Serializers should throw if `decode()` fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever. 6) A new `DelayStamp` was added, which is the first of (later) more stamps for configuring the transport layer (see #30558). BC breaks are documented in the CHANGELOG. Thanks! Commits ------- a989384 Adding global retry support, events & more to messenger transport
…nnect() does not work (sroze) This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Ensure an exception is thrown when the AMQP connect() does not work | Q | A | ------------- | --- | Branch? | master | Bug fix? | yes | New feature? | no | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | #30557 | License | MIT | Doc PR | ø This `connectionCredentials` instance escaped the renaming in #30557. Commits ------- 46b9476 Ensure an exception is thrown when the AMQP connect() does not work
…d (weaverryan) This PR was merged into the 4.3-dev branch. Discussion ---------- Dispatching two events when a message is sent & handled | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | none | License | MIT | Doc PR | TODO Alternative to #30646. This uses a more generic system, so you could do anything when a message is sent. The main use-case is when a message is dispatched by a 3rd party. I didn't try to add *exhaustive* events everywhere: I added an event for a very specific use-case: When a message is dispatched by a 3rd party, being able to add stamps (e.g. `DelayStamp` or a future `AmqpRoutingKeyStamp` before the message is sent. Example: ```php class MailerMessageSendToTransportEventSubscriber implements EventSubscriberInterface { public function onSendMessage(SendMessageToTransportsEvent $event) { $envelope = $event->getEnvelope(); if (!$envelope->getMessage() instanceof SomeMailerMessage) { return; } $event->setEnvelope($envelope->with(new AmpqRoutingKeyStamp('mailer-route'))); } public static function getSubscribedEvents() { return [SendMessageToTransportsEvent::class => 'onSendMessage']; } } ``` Along with #30557, we will now have the following events, regarding async messages: * Event when a message is sent to transports (this PR) * Event when a message is received from transport, but before handling it * Event when a message is received from transport and after handling it Commits ------- a7ad1b4 Dispatching two events when a message is sent & handled
This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Add a Doctrine transport | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | | License | MIT | Doc PR | symfony/symfony-docs#10616 | DoctrineBundle PR | doctrine/DoctrineBundle#868 As discussed with @sroze at PHPForum in Paris I've worked on adding a Doctrine transport to the Messenger component. Actually `AMQP` is the only supported transport and it could be a good thing to support multiple transports. Having a Doctrine transport could help users to start using the component IMHO (Almost all projects use a database). # How it works The code is splitted betwwen this PR and the one on the DoctrineBundle : doctrine/DoctrineBundle#868 ## Configuration To configure a Doctrine transport the dsn MUST have the format `doctrine://<entity_manager_name>` where `<entity_manager_name>` is the name of the entity manager (usually `default`) ```yml # config/packages/messenger.yaml framework: messenger: transports: my_transport: "doctrine://default?queue=important" ``` ## Table schema Dispatched messages are stored into a database table with the following schema: | Column | Type | Options | Description | |--------------|----------|--------------------------|-------------------------------------------------------------------| | id | bigint | AUTO_INCREMENT, NOT NULL | Primary key | | body | text | NOT NULL | Body of the message | | headers | text | NOT NULL | Headers of the message | | queue | varchar(32) | NOT NULL | Headers of the message | | created_at | datetime | NOT NULL | When the message was inserted onto the table. (automatically set) | | available_at | datetime | NOT NULL | When the message is available to be handled | | delivered_at | datetime | NULL | When the message was delivered to a worker | ## Message dispatching When dispatching a message a new row is inserted into the table. See `Symfony\Component\Messenger\Transport\Doctrine::publish` ## Message consuming The message is retrieved by the `Symfony\Component\Messenger\Transport\Doctrine\DoctrineReceiver`. It calls the `Symfony\Component\Messenger\Transport\Doctrine::get` method to get the next message to handle. ### Getting the next message * Start a transaction * Lock the table to get the first message to handle (The lock is done with the `SELECT ... FOR UPDATE` query) * Update the message in database to update the delivered_at columns * Commit the transaction ### Handling the message The retrieved message is then passed to the handler. If the message is correctly handled the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::ack` which delete the message from the table. If an error occured the receiver call the `Symfony\Component\Messenger\Transport\Doctrine::nack` method which update the message to set the delivered_at column to `null`. ## Message requeueing It may happen that a message is stuck in `delivered` state but the handler does not really handle the message (Database connection error, server crash, ...). To requeue messages the `DoctrineReceiver` call the `Symfony\Component\Messenger\Transport\Doctrine::requeueMessages`. This method update all the message with a `delivered_at` not null since more than the "redeliver timeout" (default to 3600 seconds) # TODO - [x] Add tests - [x] Create DOC PR - [x] PR on doctrine-bundle for transport factory - [x] Add a `available_at` column - [x] Add a `queue` column - [x] Implement the retry functionnality : See #30557 - [x] Rebase after #29476 Commits ------- 88d008c [Messenger] Add a Doctrine transport
This is an alternative to #29132 and #27008. There are several big things:
messenger:consume
does not die if a handler has an errorIt will work out of the box for users. Retrying works by putting the received
Envelope
back into the bus, but with theReceivedStamp
removed. The retry functionality has an integration test for AMQP.MessageDecodingFailedException
that transport Serializers should throw ifdecode()
fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever.DelayStamp
was added, which is the first of (later) more stamps for configuring the transport layer (see [Messenger][RFC] Generic TransportConfig stamp #30558).BC breaks are documented in the CHANGELOG.
Thanks!