[Messenger][AmqpExt] Add a retry mechanism for AMQP messages #27008

Closed
wants to merge 2 commits into from

Contributor

@sroze sroze commented Apr 22, 2018

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | ø
| License       | MIT
| Doc PR        | symfony/symfony-docs#9756

This is one of the most requested features (and is required for enterprise-grade error management). I chose an AmqpExt-only retry mechanism to ensure we have a great user experience with this adapter at least. There are ongoing discussions (#26945) about having more generic mechanisms like this across adapters.


So what's the point?

When processing messages, errors might happen. It might be that one of our third-party services is down, that something is wrong with the latest deployment, etc. All of these things will throw exceptions in the message handlers. The way to tackle this problem is very well described in @odolbeau's Swarrot Retry provider real-life example.

How to configure it

As with the other options, this can be configured either via the DSN or via the options passed when creating the adapter. Here are some working DSNs with their corresponding effects:

  1. Enable 3 retries before throwing an exception
amqp://guest:guest@localhost:5672/%2f/messages
    ?retry[attempts]=3
  2. Enable 3 retries before the message is queued in a "dead queue" (i.e. messages to be looked at manually later)
amqp://guest:guest@localhost:5672/%2f/messages
    ?retry[attempts]=3
    &retry[dead_queue]=dead_queue
  3. Enable 3 retries, a dead queue and custom settings
amqp://guest:guest@localhost:5672/%2f/messages
    ?retry[attempts]=3
    &retry[exchange_name]=retry
    &retry[routing_key_pattern]=retry_attempt_%25attempt%25
    &retry[queue_name_pattern]=retry_queue_%attempt%
    &retry[dead_queue]=dead_queue
    &retry[dead_routing_key]=dead_routing_key
  4. Enable 3 retries, with custom TTLs per attempt. Waits 10 seconds before retrying for the first attempt, 30 seconds for the 2nd attempt and 5 minutes for the 3rd one.
amqp://guest:guest@localhost:5672/%2f/messages
    ?retry[attempts]=3
    &retry[ttl][0]=10000
    &retry[ttl][1]=30000
    &retry[ttl][2]=300000
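
To make the DSN syntax above more concrete, here is a minimal sketch of how the bracketed `retry[...]` query parameters could be turned into a nested PHP array using the built-in `parse_url()` and `parse_str()` functions. The `parseRetryOptions()` helper is hypothetical and is not taken from this PR; the DSN is written on a single line here, whereas the examples above are wrapped for readability.

```php
<?php

// Hypothetical helper (not part of the PR): extracts the "retry" options
// from the query string of a DSN.
function parseRetryOptions(string $dsn): array
{
    $query = parse_url($dsn, PHP_URL_QUERY);
    if (!is_string($query) || '' === $query) {
        return [];
    }

    // parse_str() understands the bracket syntax and builds nested arrays.
    parse_str($query, $options);
    $retry = $options['retry'] ?? [];
    if (!is_array($retry)) {
        return [];
    }

    // Query string values are always strings, so cast the numeric ones.
    if (isset($retry['attempts'])) {
        $retry['attempts'] = (int) $retry['attempts'];
    }
    if (isset($retry['ttl']) && is_array($retry['ttl'])) {
        $retry['ttl'] = array_map('intval', $retry['ttl']);
    }

    return $retry;
}

$dsn = 'amqp://guest:guest@localhost:5672/%2f/messages'
    .'?retry[attempts]=3&retry[dead_queue]=dead_queue'
    .'&retry[ttl][0]=10000&retry[ttl][1]=30000&retry[ttl][2]=300000';

$retry = parseRetryOptions($dsn);
var_export($retry);
```

The TTL values are in milliseconds, matching the 10 second, 30 second and 5 minute waits described in the last DSN example.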

$queue->setName(str_replace('%attempt%', $attemptNumber, $name));
// $queue->setFlags(AMQP_DURABLE);
$queue->setArguments(array(
'x-message-ttl' => 30000, // 30 seconds
Contributor Author

This TTL should be configurable per attempt number.

$exchange = $this->amqpFactory->createExchange($this->channel());
$exchange->setName($retryConfiguration['name'] ?? 'retry');
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// $exchange->setFlags(AMQP_DURABLE);
Contributor Author

To be removed.

{
$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace('%attempt%', $attemptNumber, $name));
// $queue->setFlags(AMQP_DURABLE);
Contributor Author

To be removed.

return false;
}

$routingKey = 'dead';
Contributor Author

Should be a configurable dead_routing_key

Member

dead is a very strong word for humans. Could we use expired instead? Thanks!

Contributor Author

Well, that’s the term used by AMQP unfortunately. Maybe too_many_attempts if you really want to change it? 🤔

$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName($retryConfiguration['dead_queue']);
$queue->declareQueue();
$queue->bind($exchange->getName(), 'dead');
Contributor Author

Should be using the dead_routing_key configuration.

@sroze sroze added this to the 4.1 milestone Apr 23, 2018
Member

@lyrixx lyrixx left a comment

IMHO, this really needs to be tested against a real implementation of AMQP (RabbitMQ).

I read the code very quickly and it seems this does not work well. I guess I should test it (by hand) to make sure, but it looks like many cases are not supported:

  • retrying a message that comes from an exchange in fanout mode => the message will be dispatched to all queues that are bound. It should not.
  • retrying a message that comes from an exchange in headers mode => the message will not be dispatched at all.

Then, it would be nice to be able to configure the type of retry strategy (linear / backoff), an offset (the first 3 or 4 iterations may be too close together, so we may want to skip them) and, of course, the delay.
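
As an illustration of the strategy, offset and delay idea, here is a minimal sketch of how a delay could be computed per attempt. Everything in it is an assumption: the `retryDelayMs()` function, the strategy names, the reading of "backoff" as doubling the delay each time, and the reading of "offset" as a number of initial steps to skip. None of it comes from this PR or from #23315.

```php
<?php

// Hypothetical sketch: computes the wait (in milliseconds) before a given attempt.
// $attempt is 1-based; $offset skips that many of the first, closely spaced steps.
function retryDelayMs(string $strategy, int $attempt, int $baseDelayMs, int $offset = 0): int
{
    if ($attempt < 1 || $offset < 0) {
        throw new InvalidArgumentException('Attempt must be >= 1 and offset must be >= 0.');
    }

    $step = $attempt + $offset;

    switch ($strategy) {
        case 'linear':
            // 1x, 2x, 3x, ... the base delay
            return $baseDelayMs * $step;
        case 'backoff':
            // 1x, 2x, 4x, 8x, ... the base delay (assumed exponential backoff)
            return $baseDelayMs * (2 ** ($step - 1));
        default:
            throw new InvalidArgumentException(sprintf('Unknown strategy "%s".', $strategy));
    }
}

$linear = retryDelayMs('linear', 3, 1000);
$backoff = retryDelayMs('backoff', 3, 1000);
$backoffWithOffset = retryDelayMs('backoff', 1, 1000, 2);
```

With an offset of 2, the first attempt already waits as long as the third step of the plain backoff schedule, which is one way to skip iterations that would otherwise be too close together.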

Another thing: all headers should be cast to string.
From PHP, with the PECL extension, it's not really possible to create a typed header (in AMQP you can tell whether a header is a string or an int). So it's better to use strings everywhere to avoid edge cases. In some versions of RabbitMQ, we hit this issue: the exchanges were created with the "string" type, but the messages were created with the "int" type, so there were no matches. Really hard bugs to find ;)
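
A minimal sketch of that advice, assuming headers are held in a plain PHP array before being handed to the AMQP extension. The `stringifyHeaders()` helper and the header names are hypothetical and only serve as an illustration.

```php
<?php

// Hypothetical helper: casts every scalar header value to a string so that
// the same header is never published as an int in one place and as a
// string in another.
function stringifyHeaders(array $headers): array
{
    $normalized = [];
    foreach ($headers as $name => $value) {
        if (!is_scalar($value)) {
            throw new InvalidArgumentException(sprintf('Header "%s" must be a scalar value.', $name));
        }
        $normalized[$name] = (string) $value;
    }

    return $normalized;
}

// The header names below are made up for the example.
$headers = stringifyHeaders(['x-attempt' => 2, 'x-origin' => 'messages']);
```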


Edited: I forgot to say, you can grab some code from #23315, where the retry mechanism is really advanced / configurable. But the more I read the Messenger code, the more I think we should reconsider #23315. How I see things: I should remove everything about worker, keep only AMQP stuff, and update the Messenger to use AMQP. (ping @fabpot)

$queue = $this->amqpFactory->createQueue($this->channel());
$queue->setName(str_replace('%attempt%', $attemptNumber, $retryConfiguration['queue_name_pattern'] ?? 'retry_queue_%attempt%'));
$queue->setArguments(array(
'x-message-ttl' => $retryConfiguration['ttls'][$attemptNumber - 1] ?? 30000, // 30 seconds by default
Member

Why did you add an s to the key name?

@sroze
Contributor Author

sroze commented Apr 23, 2018

retrying a message that comes from an exchange in fanout mode => the message will be dispatched to all queues that are bound. It should not.

That's the setup I used to test this retry mechanism and it works well. I'd love for you to try it and see in which cases it really doesn't work :)

I think we should reconsider #23315. How I see things: I should remove everything about worker, keep only AMQP stuff, and update the Messenger to use AMQP.

If you believe you can make the AMQP adapter much better by doing so, it is probably an interesting option (lots of work on your side though 😄).

That said, there is a case for keeping the AMQP adapter simple rather than fully featured. Obviously, there is the maintenance cost. But also, most people will use the default, automatically set up queues, right? I also believe that anybody wanting to configure queues and exchanges very precisely will rarely be fully satisfied by auto-configuration (regardless of the amount of effort we put into it) and will end up configuring their AMQP bits manually, which is what everybody has been doing so far. This retry mechanism PR lets you configure the routing key pattern without auto-configuring it, so this use case would work.

@kbond
Member

kbond commented Apr 26, 2018

Question about retries in general: If a consumed message is handled by multiple handlers but only one throws an exception - how do you handle this? It probably isn't desirable to retry the message on handlers that were successful.

@nicolas-grekas
Member

Rebase needed after #27129

@nicolas-grekas nicolas-grekas changed the base branch from master to 4.1 May 7, 2018 15:06
@sroze
Contributor Author

sroze commented May 9, 2018

If a consumed message is handled by multiple handlers but only one throws an exception - how do you handle this? It probably isn't desirable to retry the message on handlers that were successful.

That's a good question. You are right, it might not be desirable to retry on successful handlers but I believe that's the only (without a crazy amount of code) way to make it simple. On the other hand, I'd argue that something "critical" (a command in the CQRS point-of-view) will only have one handler. The things that will have multiple handlers are basically event subscribers and will be projections (or similar) and there should be no problem with having them running multiple times.

@sroze sroze force-pushed the retry-amqp-messages branch 2 times, most recently from ca47435 to dfdb38c Compare May 9, 2018 08:27
@sroze
Contributor Author

sroze commented May 9, 2018

PR rebased.

@sroze
Contributor Author

sroze commented May 9, 2018

I suggest we go ahead with this simple (optional) retry implementation for 4.1 and replace it with the one in the AMQP component later when introduced.

ogizanagi previously approved these changes May 9, 2018
Contributor

@ogizanagi ogizanagi left a comment

Not much to say. Looks good to me.

}

$maximumAttempts = $retryConfiguration['attempts'] ?? 3;
$routingKey = str_replace('%attempt%', $attemptNumber, $retryConfiguration['routing_key_pattern'] ?? 'attempt_%attempt%');
Contributor

Could we normalize all the option values before using them, in the constructor for instance?

Contributor Author

Yep, it's a good idea.
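
A minimal sketch of what such a normalization step could look like, assuming the retry options arrive as a plain array. The `normalizeRetryOptions()` function is hypothetical; the default values are the ones visible in the diff hunks of this conversation (3 attempts, a `retry` exchange, `attempt_%attempt%` and `retry_queue_%attempt%` patterns, and a 30 second TTL).

```php
<?php

// Hypothetical helper: merges user-provided retry options with defaults once,
// so the rest of the code can read them without repeating "??" fallbacks.
function normalizeRetryOptions(array $retry): array
{
    $normalized = array_replace([
        'attempts' => 3,
        'exchange_name' => 'retry',
        'routing_key_pattern' => 'attempt_%attempt%',
        'queue_name_pattern' => 'retry_queue_%attempt%',
        'ttl' => [],
    ], $retry);

    $normalized['attempts'] = (int) $normalized['attempts'];
    if (!is_array($normalized['ttl'])) {
        $normalized['ttl'] = [];
    }

    // One TTL (in milliseconds) per attempt, 0-indexed, 30 seconds by default.
    $ttl = [];
    for ($i = 0; $i < $normalized['attempts']; ++$i) {
        $ttl[$i] = (int) ($normalized['ttl'][$i] ?? 30000);
    }
    $normalized['ttl'] = $ttl;

    return $normalized;
}

$options = normalizeRetryOptions(['attempts' => '2', 'ttl' => ['10000']]);
$routingKey = str_replace('%attempt%', (string) 1, $options['routing_key_pattern']);
```

Doing this once, for instance in the constructor, keeps the defaults in a single place instead of scattering them across the methods that declare exchanges and queues.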

@kbond
Member

kbond commented May 9, 2018

@sroze another potential problem could be an exception being thrown at a point where retrying the message would cause additional problems.

What about an optional mechanism to have the subscriber tell the message bus/receiver to retry the message? This could also tell the bus to retry just this specific handler.

// in your handler/subscriber
try {
    // ... some logic that throws an exception
} catch (\Throwable $e) {
    throw new RetryException(__CLASS__, $e); // or instead of __CLASS__ we can get the class from the exception's trace?
}

This is probably a discussion for another place as it is a more general retry - I can open an issue if we want to discuss further.

@sroze
Contributor Author

sroze commented May 9, 2018

This is probably a discussion for another place as it is a more general retry - I can open an issue if we want to discuss further.

I'd say so. It's a very good point though; it'd be great if you could open an issue, describe the problem and, cherry on top, if you have an idea of how to implement it, propose something :)

@sroze sroze force-pushed the retry-amqp-messages branch 2 times, most recently from 624ae8d to 4ae654b Compare May 10, 2018 15:13
@sroze
Contributor Author

sroze commented May 10, 2018

I don't think that fabbot's PoV is relevant here :)

@sroze
Contributor Author

sroze commented May 10, 2018

Documentation PR is here: symfony/symfony-docs/pull/9756

private $amqpRetryExchange;

/**
* Available options:
Contributor

With so many options, it's IMO better to use a configuration class. Then the code is self-explaining and more explicit.

Contributor Author

But it would mean that we need something to create the configuration object from the DSNs. I'd say that keeping these options as they are now is good and that we don't need more code to maintain just for these configuration details.

Contributor

@ogizanagi ogizanagi May 11, 2018

Yes, I think it's fine like this (but not ideal, true). Otherwise we'll need something similar to the SecurityFactoryInterface in order to make it extendable to any transport with similar use cases.

@sroze sroze added this to the next milestone May 21, 2018
sroze added 2 commits August 12, 2018 18:28
Add some tests and remove the intrication between the receiver and the connection
Configure each TTL individually and the dead routing key
The `ttls` array is 0-indexed
Update the retry based on feedback (`ttls` -> `ttl`, add options to documentation and normalises the default values)
Catches failed retries and forward other messages' attributes
@sroze sroze force-pushed the retry-amqp-messages branch from 88594f9 to d32d7b6 Compare August 12, 2018 17:32
@sroze sroze requested review from dunglas and xabbuh as code owners August 12, 2018 17:32
@sroze sroze changed the base branch from 4.1 to master August 12, 2018 17:32
@sroze
Contributor Author

sroze commented Aug 12, 2018

This pull-request is welcoming another round of review 💚

@Koc
Contributor

Koc commented Aug 12, 2018

@sroze how can we programmatically requeue a message with a specified delay from a handler? For example, for cases when we can't process the message right now.

@sroze
Contributor Author

sroze commented Aug 24, 2018

can we programmatically requeue a message with a specified delay from a handler?

In the given implementation we can't. I believe that this is out of scope for now anyway and that it will be handled with the AMQP component (#27140) which will provide more features such as this one 👍

@fabpot
Member

fabpot commented Oct 10, 2018

@sroze Can we talk about this one this week IRL?

@ragboyjr
Contributor

ragboyjr commented Dec 9, 2018

@sroze do you think it would be possible to implement this in a way that would standardize the retrying logic across the various transports?

@weaverryan
Member

I've brought the AMQP retry logic from Sam and put it into #30557 - but also fitting it into a system where retrying has been made generic, so it can be implemented/included in future transports.

I would LOVE review on that - I'm running full speed to get the features Messenger needs/deserves, but I need queue experts to help me avoid pitfalls.

@lyrixx
Member

lyrixx commented Mar 16, 2019

IMHO this one could be closed in favor of #30557

@fabpot
Member

fabpot commented Mar 17, 2019

Closing in favor of #30557

@fabpot fabpot closed this Mar 17, 2019
fabpot added a commit that referenced this pull request Mar 23, 2019
… (weaverryan)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Worker events + global retry functionality

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | yes, on Messenger only
| Deprecations? | no
| Tests pass?   | NEEDED
| Fixed tickets | #29132, #27008, #27215 and part of #30540
| License       | MIT
| Doc PR        | TODO

This is an alternative to #29132 and #27008. There are several big things:

1) The `messenger:consume` does not die if a handler has an error
2) Events are dispatched before and after a message is handled, and when handling fails
3) Logic is moved out of Amqp and into the Worker so that we can have some consistent features, like error handling.
4) A generic retry system was added, which works with Amqp and which future transports should support. It will work out of the box for users. Retrying works by putting the received `Envelope` back into the bus, but with the `ReceivedStamp` removed. The retry functionality has an integration test for AMQP.
5) Added a new `MessageDecodingFailedException` that transport Serializers should throw if `decode()` fails. It allows us to reject a message in this situation, as allowing it to fail but remain on the queue causes it to be retried forever.
6) A new `DelayStamp` was added, which is the first of (later) more stamps for configuring the transport layer (see #30558).

BC breaks are documented in the CHANGELOG.

Thanks!

Commits
-------

a989384 Adding global retry support, events & more to messenger transport
fabpot added a commit that referenced this pull request Apr 6, 2019
…ndler (keulinho, sroze)

This PR was merged into the 4.3-dev branch.

Discussion
----------

[Messenger] Ensure message is handled only once per handler

Add check to ensure that a message is only handled once per handler
Add try...catch to run all handlers before throwing exception

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #27215
| License       | MIT
| Doc PR        | Todo

This would make error handling and retrying of messages much easier. As stated here #27008 (comment), there is currently no way to retry a message for only the failed handlers if there are multiple handlers and just some of them throw an exception.
Also, if an exception occurs in a handler, the execution chain is interrupted and the other handlers are never invoked.
With this change, it is easy to create a userland middleware that catches the `ChainedHandlerFailedException` and applies some custom retry logic. If you ensure that the `HandledStamps` on the `Envelope` are preserved, the message will be handled only by the failed handlers.
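
The "handled only by the failed handlers" behaviour can be pictured with a small, framework-free sketch. It deliberately does not use Symfony's `Envelope` or stamp classes: the `runPendingHandlers()` function and the plain array of handler names standing in for the handled stamps are assumptions made for illustration only.

```php
<?php

// Hypothetical sketch: runs every handler that has not already succeeded,
// collects failures instead of stopping at the first one, and returns the
// updated list of handled names together with the failures.
function runPendingHandlers(array $handlers, array $handledNames, $message): array
{
    $failures = [];
    foreach ($handlers as $name => $handler) {
        if (in_array($name, $handledNames, true)) {
            continue; // already succeeded on a previous attempt
        }

        try {
            $handler($message);
            $handledNames[] = $name;
        } catch (\Throwable $e) {
            $failures[$name] = $e;
        }
    }

    return [$handledNames, $failures];
}

$calls = ['mailer' => 0, 'projection' => 0];
$handlers = [
    'mailer' => function ($message) use (&$calls) {
        ++$calls['mailer'];
    },
    'projection' => function ($message) use (&$calls) {
        // Fails on the first call only, to simulate a temporary outage.
        if (1 === ++$calls['projection']) {
            throw new \RuntimeException('Temporarily unavailable.');
        }
    },
];

// First attempt: "mailer" succeeds, "projection" fails.
[$handled, $failures] = runPendingHandlers($handlers, [], 'hello');
// Retry: only "projection" runs again because "mailer" is recorded as handled.
[$handled, $failures] = runPendingHandlers($handlers, $handled, 'hello');
```

Carrying the list of successful handlers along with the message is what prevents the retry from re-running handlers that already did their work, which was the concern raised earlier in this conversation.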

Commits
-------

2e5e910 Rename exception, add change log and a few other things
e6e4cde Ensure message is handled only once per handler
@nicolas-grekas nicolas-grekas modified the milestones: next, 4.3 Apr 30, 2019