-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Move container resetting after receiver acknowledging #43133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Waiting until the worker is completely done handling a message before actually resetting services sounds good to me. 👍 Things to keep in mind:
As for the implementation part, this could also benefit from #42335 to easily check the worker transport name. $event->getWorker()->getWorkerMetadata()->getTransportName() === $this->transportName; |
…kwinza) This PR was merged into the 5.4 branch. Discussion ---------- [Messenger] Add `WorkerMetadata` to `Worker` class. | Q | A | ------------- | --- | Branch? | 5.4 | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | Fixes #37736 | License | MIT | Doc PR | - At the moment, there is no clean way to access the values of `transportNames` or recently introduced `queueNames` that the worker was configured with, although such data might be quite useful for logging/monitoring or other tasks. This PR attempts to fix that by adding a new and extensible way to provide additional information about a particular `Worker` object. So far, the following PRs could benefit from this change: - #43133 - #42723 **Use case example:** ---- - As I developer - When a message was consumed from transport with name `async`. - And the worker state is `idle`. - Then I want to reset services. **Before this PR**, the only solution not relying on using Reflection API would look like this: ```php private $servicesResetter; private $receiversName; private $actualReceiverName = null; public function __construct(ServicesResetter $servicesResetter, array $receiversName) { $this->servicesResetter = $servicesResetter; $this->receiversName = $receiversName; } public function saveReceiverName(AbstractWorkerMessageEvent $event): void { $this->actualReceiverName = $event->getReceiverName(); } public function resetServices(WorkerRunningEvent $event): void { if (!$event->isWorkerIdle() && \in_array($this->actualReceiverName, $this->receiversName, true)) { $this->servicesResetter->reset(); } $this->actualReceiverName = null; } public static function getSubscribedEvents(): array { return [ WorkerMessageHandledEvent::class => ['saveReceiverName'], WorkerMessageFailedEvent::class => ['saveReceiverName'], WorkerRunningEvent::class => ['resetServices'], ]; } ``` **With this PR**, one could simply use this to retrieve the transport name. ```php $event->getWorker()->getWorkerMetadata()->getTransportName() === $this->transportName; ``` So the whole solution would look like this: ```php private $servicesResetter; private $receiversName; public function __construct(ServicesResetter $servicesResetter, array $receiversName) { $this->servicesResetter = $servicesResetter; $this->receiversName = $receiversName; } public function resetServices(WorkerRunningEvent $event): void { $actualTransportName = $event->getWorker()->getWorkerMetadata()->getTransportName(); if (!$event->isWorkerIdle() || !in_array($actualTransportName, $this->receiversName, true)) { return; } $this->servicesResetter->reset(); } public static function getSubscribedEvents(): array { return [ WorkerRunningEvent::class => ['resetServices'], ]; } ``` Commits ------- 583f85b [Messenger] Add WorkerMetadata to Worker class
I just merged #42335. Can you rebase to use the new feature? Thanks And please, fix fabpot issue :) |
@okwinza So it'll take a while to implement. |
What about moving the ack before the dispatch ? Edit: this is a bad idea. But I think the best solution is to create a new event (same for nack) |
We have the event already. We have metadata, transport names, and other things. I've been investigating the topic more. I've realized that resetting services is a worker's job rather than a transport's because service leak is a consumer problem and isn't dependent on transport. And there aren't any consumers on the "sync" transport. In my opinion, the best alternative is resetting services after every message. The only drawback is decreasing performance, so we can provide an option to disable it. It also will break compatibility, so we should enable that option by default and add an inversed one. I consider removing the |
987baef
to
214a63a
Compare
I've made a working draft as an example. If it's OK, I'll continue. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like it. But I realize we can simplify the work for end user.
We can move the command configuration to the bundle:
Like that, user don't have to update all consumer recipies (ansible, helm, etc). Only the code needs to be updated.
framework:
messenger:
reset_on_message: true # add a deprecation if not set
transports: ....
WDYT?
I agree with you on the fact that is should be the default, and I don't see a use case where a transport should be reset but not another one.
And BTW Fab wanted to reset the container all the time too: #41163 (comment)
src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Outdated
Show resolved
Hide resolved
Can't we just set it to |
it's not the best way to handle migration path.
do you have a use case for overriding the value? (less code == less bug) |
👍 Agreed.
Since container resetting affects conformance, it might make sense to be able to override it for additional performance gain in some particular consumers. But this is just an idea and it's totally out of scope of this PR and should be added later down the road, if at all. |
Okay let's go 👍 do you wanna to finish the pr ? Thanks for discussion 😊 |
@lyrixx
Absolutely! So we do not need a configuration option (in prospect). What we need are backward compatibility and migration strategy. Some users may use "service leak" as a side effect for their scenarios; however, it's a poor solution. So the decision we should make: how to migrate it. I think you are right about a deployment. A new plan:
6.0:
|
214a63a
to
baa42f9
Compare
baa42f9
to
b75dd03
Compare
b75dd03
to
fc85aef
Compare
I've realized that services must not be reset on idle. It's not optional because of counters and sequences that can be used. Possible memory leaks in transports should be fixed in transports. |
What are you refering to? I think something is not clear here. you (@upyx and @okwinza) talks about performance penalty. Even if this could be true, this is not always the case. I'm not sure, but it looks like you think all symfony (and app) services will be unset. This is not the case. Service will be reset, it means the For reference:
|
@lyrixx I think it would be good to add documentation that describes what the "services resetting" is and what it is for, how to use the |
if the sequence is in APCu or Redis, it's safe. IMHO, store a state between 2 messages in PHP memory (so in a service) is a bad idea. It does not work when a worker restart or when there are many worker. So this is not a valid use case since it's harmful. The plan you propose in this comment is really good. Let's stick to it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very good work. Thanks 👍🏼
Thank you @upyx. |
if (null === $config['reset_on_message']) { | ||
trigger_deprecation('symfony/framework-bundle', '5.4', 'Not setting the "framework.messenger.reset_on_message" configuration option is deprecated, it will default to "true" in version 6.0.'); | ||
|
||
$container->getDefinition('console.command.messenger_consume_messages')->replaceArgument(5, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will debug further and create a proper issue/PR later but this service may not exist:
Symfony\Component\DependencyInjection\Exception\ServiceNotFoundException: You have requested a non-existent service "console.command.messenger_consume_messages".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, I've found where it is. It's possible when messenger
is installed without console
. I will fix it, but I don't know how to test that fix because of class_exists(Application::class)
expression.
yes, and a PR on the doc too :) |
…nsole (upyx) This PR was squashed before being merged into the 5.4 branch. Discussion ---------- Fix framework configuration when messenger uses without console | Q | A | ------------- | --- | Branch? | 5.4 | Bug fix? | yes | New feature? | no | Deprecations? | yes | Tickets | Fix #43133 (review) | License | MIT | Doc PR | later We hurried, an addition to #43133 It fixes #43133 (review) It adds the forgotten CHANGELOG & UPGRADE I'm not sure about the CHANGELOG format. I've done as people do but... [it says](https://symfony.com/doc/current/contributing/code/conventions.html#writing-a-changelog-entry) "New entries must be added on top of the list." However, nobody does it 🤷 ~I don't know how to test the fix, because of `class_exists()` call that cannot be easily mocked.~ `src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php:245`: ```php if (class_exists(Application::class)) { $loader->load('console.php'); /* ... */ } ``` Commits ------- d098ef8 Fix framework configuration when messenger uses without console
It fixes the #43114 issue and similar potential bugs. The
ResetServicesListener
lean on events sequence: theAbstractWorkerMessageEvent
to save the actual receiver name, then theWorkerRunningEvent
to check that name and reset the container. It may look fragile, so let's discuss it in the issue.