-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger][AmqpExt] Add a retry mechanism for AMQP messages #27008
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
$queue->setName(str_replace('%attempt%', $attemptNumber, $name)); | ||
// $queue->setFlags(AMQP_DURABLE); | ||
$queue->setArguments(array( | ||
'x-message-ttl' => 30000, // 30 seconds |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This TTL should be configurable per attempt number.
$exchange = $this->amqpFactory->createExchange($this->channel()); | ||
$exchange->setName($retryConfiguration['name'] ?? 'retry'); | ||
$exchange->setType(AMQP_EX_TYPE_DIRECT); | ||
// $exchange->setFlags(AMQP_DURABLE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be removed.
{ | ||
$queue = $this->amqpFactory->createQueue($this->channel()); | ||
$queue->setName(str_replace('%attempt%', $attemptNumber, $name)); | ||
// $queue->setFlags(AMQP_DURABLE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be removed.
return false; | ||
} | ||
|
||
$routingKey = 'dead'; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be a configurable dead_routing_key
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dead
is a very strong word for humans. Could we use expired
instead? Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, that’s the term used by AMQP unfortunately. Maybe too_many_attempts
if really you want to change? 🤔
$queue = $this->amqpFactory->createQueue($this->channel()); | ||
$queue->setName($retryConfiguration['dead_queue']); | ||
$queue->declareQueue(); | ||
$queue->bind($exchange->getName(), 'dead'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be using the dead_routing_key
configuration.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO, this really need to be tested against a real implementation of AMQP (RabbitMQ).
I read very quickly the code and it seems this does not work well. I guess I should test it (by hand) to ensure work, but it looks like many case are not supported:
- retrying a message that come from an exchange in a fan out mode => the message will be dispatched is all queue that are bound. It should not
- retrying a message that come from an exchange in an header mode => the message will not be dispatched at all.
Then, I would be nice to be able to configure the type of retry strategy (linear / backoff), and offset (the 3/4 first iteration may be too close, and so we want to skip it) and of course the delay
Another things: all header should be casted to string.
From PHP, with the PECL, it's not really possible to create a typed header (in AMQP you can tell if the header is a string or an int). So it's better to use string
everywhere to avoid edge case. I some version of rabbitmq, we hit this issue. The exchange were created with "string" type. But the message where created with "int" type. So there were not matches. Really hard to find bugs ;)
Edited: I forgot to say, you can grad some code from #23315 where the retry mechanisme is really advanced / configurable. But the more I read code about Messenger, The more I think we should reconsider #23315. How I see things: I should remove everything about worker, keep only AMQP stuff, and update the Messenger to use AMQP. (ping @fabpot)
$queue = $this->amqpFactory->createQueue($this->channel()); | ||
$queue->setName(str_replace('%attempt%', $attemptNumber, $retryConfiguration['queue_name_pattern'] ?? 'retry_queue_%attempt%')); | ||
$queue->setArguments(array( | ||
'x-message-ttl' => $retryConfiguration['ttls'][$attemptNumber - 1] ?? 30000, // 30 seconds by default |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why did you add a s
in the key name ?
That's the setup I used to test this retry mechanism and it works well. I'd love you to try and see really in which case it doesn't work :)
If you believe you can't make the AMQP adapter much better by doing so, it is probably an interesting option (lots of work on your side though 😄). Though, there is a point in keeping the AMQP adapter simple and not having it "fully blown featured". Obviously, there is the maintenance cost point. But also, most people will use the default auto-setuped queues, right? I also believe that everybody willing to configure the queues and exchanges very precisely will very rarely be fully satisfied by auto-configuration (regardless of the amount of effort we put into it) and end up configuring their AMQP bits manually, that's what everybody has been doing so far at the end, and this retry mechanism PR allows you to configure the routing key pattern without auto-configuring it, so this use-case would work. |
Question about retries in general: If a consumed message is handled by multiple handlers but only one throws an exception - how do you handle this? It probably isn't desirable to retry the message on handlers that were successful. |
Rebase needed after #27129 |
That's a good question. You are right, it might not be desirable to retry on successful handlers but I believe that's the only (without a crazy amount of code) way to make it simple. On the other hand, I'd argue that something "critical" (a command in the CQRS point-of-view) will only have one handler. The things that will have multiple handlers are basically event subscribers and will be projections (or similar) and there should be no problem with having them running multiple times. |
ca47435
to
dfdb38c
Compare
PR rebased. |
I suggest we go ahead with this simple (optional) retry implementation for 4.1 and replace it with the one in the AMQP component later when introduced. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not much to say. Looks good to me.
src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Outdated
Show resolved
Hide resolved
} | ||
|
||
$maximumAttempts = $retryConfiguration['attempts'] ?? 3; | ||
$routingKey = str_replace('%attempt%', $attemptNumber, $retryConfiguration['routing_key_pattern'] ?? 'attempt_%attempt%'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we normalize all the options values before using it, in the constructor for instance?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, it's a good idea.
@sroze another potential problem could be an exception being thrown at a point where retrying the message would cause additional problems. What about an optional mechanism to have the subscriber tell the message bus/receiver to retry the message? This could also tell the bus to retry just this specific handler. // in your handler/subscriber
try {
// ... some logic that throws an exception
} catch (\Throwable $e) {
throw new RetryException(__CLASS__, $e); // or instead of __CLASS__ we can get the class from the exception's trace?
} This is probably a discussion for another place as it is a more general retry - I can open an issue if we want to discuss further. |
I'd say so. Though a very good point; I'd be great if you can open an issue, describe the issue and, cherry on top, if you have an idea on how to implement, propose something :) |
624ae8d
to
4ae654b
Compare
I don't think that fabbot's PoV is relevant here :) |
Documentation PR is here: symfony/symfony-docs/pull/9756 |
private $amqpRetryExchange; | ||
|
||
/** | ||
* Available options: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With so many options, it's IMO better to use a configuration class. Then the code self-explaining and more explicit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it would mean that we have something creating the configuration object from the DSNs. I'd say that keeping these options as they are now is good and that we don't need more code to maintain just for this configuration details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think it's fine like this (but not ideal, true). Otherwise we'll need something similar as the SecurityFactoryInterface
in order to make it extendable to any transport with similar use-cases.
src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpReceiver.php
Outdated
Show resolved
Hide resolved
Add some tests and remove the intrication between the receiver and the connection Configure each TTL individually and the dead routing key The `ttls` array is 0-indexed Update the retry based on feedback (`ttls` -> `ttl`, add options to documentation and normalises the default values) Catches failed retries and forward other messages' attributes
88594f9
to
d32d7b6
Compare
This pull-request is welcoming another round of review 💚 |
@sroze how can we programatically requeue message with specified delay from handler? For example for cases, when we cann't process this message now. |
In the given implementation we can't. I believe that this is out of scope for now anyway and that it will be handled with the AMQP component (#27140) which will provide more features such as this one 👍 |
@sroze Can we talk about this one this week IRL? |
@sroze do you think it would be possible to implement this in away so that it would standardize the retrying logic across the various transports? |
I've brought the AMQP retry logic from Sam and put it into #30557 - but also fitting it into a system where retrying has been made generic, so it can be implemented/included in future transports. I would LOVE review on that - I'm running full speed to get the features Messenger needs/deserves, but I need queue experts to help me avoid pitfalls. |
IMHO this one could be closed in favor or #30557 |
Closing in favor of #30557 |
… (weaverryan) This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Worker events + global retry functionality | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | yes, on Messenger only | Deprecations? | no | Tests pass? | NEEDED | Fixed tickets | #29132, #27008, #27215 and part of #30540 | License | MIT | Doc PR | TODO This is an alternative to #29132 and #27008. There are several big things: 1) The `messenger:consume` does not die if a handler has an error 2) Events are dispatched before, after and on error a message being handled 3) Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling. 4) A generic retry system was added, which works with Amqp and future transports should support. It will work out of the box for users. Retrying works by putting the received `Envelope` back into the bus, but with the `ReceivedStamp` removed. The retry functionality has an integration test for AMQP. 5) Added a new `MessageDecodingFailedException` that transport Serializers should throw if `decode()` fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever. 6) A new `DelayStamp` was added, which is the first of (later) more stamps for configuring the transport layer (see #30558). BC breaks are documented in the CHANGELOG. Thanks! Commits ------- a989384 Adding global retry support, events & more to messenger transport
…ndler (keulinho, sroze) This PR was merged into the 4.3-dev branch. Discussion ---------- [Messenger] Ensure message is handled only once per handler Add check to ensure that a message is only handled once per handler Add try...catch to run all handlers before throwing exception | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? |no | Tests pass? | yes | Fixed tickets | #27215 | License | MIT | Doc PR | Todo This would make error handling and retrying of messages much more easier. As statet here #27008 (comment) there is currently no way to retry a for all failed handlers if there are mutliple handlers and just some throw an exception. Also if an Exception in an handler occurs the execution chain is disrupted and the other handlers are never invoked. With this change it is easily possible to create an userland middleware that catches the `ChainedHandlerFailedException` and does some custom retry logic. If you ensure that the `HandledStamps` on the `Envelope` are preserved the message will be handled just by the failed handlers Commits ------- 2e5e910 Rename exception, add change log and a few other things e6e4cde Ensure message is handled only once per handler
One of the most asked features (and required to have an enterprise-grade management of errors). I chose the approach of an AmqpExt-only retry mechanism to ensure we have a great user experience with this one at least. There are discussions on-going (#26945) to have more generic things like that across adapters.
So what's the point?
When processing some messages, some error might happen. It might be that one of our 3rd party is down, that something is wrong with the last deployment, etc... All of these things will throw exceptions in the message handlers. The way of tackling this problem is very well described in @odolbeau's Swarrot Retry provider real-life example.
How to configure it
Same than for the rest, this can be configured both via the DSN or the
options
when creating the adapter. Here is a bunch of working DSNs with their corresponding effect: