Skip to content

[Messenger] Move container resetting after receiver acknowledging #43133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 27, 2021

Conversation

upyx
Copy link
Contributor

@upyx upyx commented Sep 22, 2021

Q A
Branch? 5.4
Bug fix? yes
New feature? no
Deprecations? no
Tickets Fix #43114
License MIT
Doc PR nope

It fixes the #43114 issue and similar potential bugs. The ResetServicesListener lean on events sequence: the AbstractWorkerMessageEvent to save the actual receiver name, then the WorkerRunningEvent to check that name and reset the container. It may look fragile, so let's discuss it in the issue.

@okwinza
Copy link
Contributor

okwinza commented Sep 22, 2021

Waiting until the worker is completely done handling a message before actually resetting services sounds good to me. 👍

Things to keep in mind:

  • Services should only be reset after the worker is done with a message.
  • Services should not be reset more once between each message.

As for the implementation part, this could also benefit from #42335 to easily check the worker transport name.
So you could just do something like this:

$event->getWorker()->getWorkerMetadata()->getTransportName() === $this->transportName;

lyrixx added a commit that referenced this pull request Sep 22, 2021
…kwinza)

This PR was merged into the 5.4 branch.

Discussion
----------

[Messenger] Add `WorkerMetadata` to `Worker` class.

| Q             | A
| ------------- | ---
| Branch?       | 5.4
| Bug fix?      | no
| New feature?  | yes
| Deprecations? | no
| Tickets       | Fixes #37736
| License       | MIT
| Doc PR        | -

At the moment, there is no clean way to access the values of `transportNames` or recently introduced `queueNames` that the worker was configured with, although such data might be quite useful for logging/monitoring or other tasks.

This PR attempts to fix that by adding a new and extensible way to provide additional information about a particular `Worker` object.

So far, the following PRs could benefit from this change:

- #43133
- #42723

**Use case example:**
----

- As I developer
- When a message was consumed from transport with name `async`.
- And the worker state is `idle`.
- Then I want to reset services.

**Before this PR**, the only solution not relying on using Reflection API would look like this:
```php
    private $servicesResetter;
    private $receiversName;
    private $actualReceiverName = null;

    public function __construct(ServicesResetter $servicesResetter, array $receiversName)
    {
        $this->servicesResetter = $servicesResetter;
        $this->receiversName = $receiversName;
    }

    public function saveReceiverName(AbstractWorkerMessageEvent $event): void
    {
        $this->actualReceiverName = $event->getReceiverName();
    }

    public function resetServices(WorkerRunningEvent $event): void
    {
        if (!$event->isWorkerIdle() && \in_array($this->actualReceiverName, $this->receiversName, true)) {
            $this->servicesResetter->reset();
        }

        $this->actualReceiverName = null;
    }

    public static function getSubscribedEvents(): array
    {
        return [
            WorkerMessageHandledEvent::class => ['saveReceiverName'],
            WorkerMessageFailedEvent::class => ['saveReceiverName'],
            WorkerRunningEvent::class => ['resetServices'],
        ];
    }
```
**With this PR**, one could simply use this to retrieve the transport name.
```php
$event->getWorker()->getWorkerMetadata()->getTransportName() === $this->transportName;
```
So the whole solution would look like this:
```php
    private $servicesResetter;
    private $receiversName;

    public function __construct(ServicesResetter $servicesResetter, array $receiversName)
    {
        $this->servicesResetter = $servicesResetter;
        $this->receiversName = $receiversName;
    }

    public function resetServices(WorkerRunningEvent $event): void
    {
        $actualTransportName = $event->getWorker()->getWorkerMetadata()->getTransportName();
        if (!$event->isWorkerIdle() || !in_array($actualTransportName, $this->receiversName, true)) {
            return;
        }

        $this->servicesResetter->reset();
    }

    public static function getSubscribedEvents(): array
    {
        return [
            WorkerRunningEvent::class => ['resetServices'],
        ];
    }
```

Commits
-------

583f85b [Messenger] Add WorkerMetadata to Worker class
@lyrixx
Copy link
Member

lyrixx commented Sep 22, 2021

I just merged #42335. Can you rebase to use the new feature? Thanks

And please, fix fabpot issue :)

@upyx
Copy link
Contributor Author

upyx commented Sep 24, 2021

$event->getWorker()->getWorkerMetadata()->getTransportName() === $this->transportName;

@okwinza
It won't work as expected because the WorkerMetadata has an array of transport names. The part of transports can be not resettable. We can either restrict using resettable and not resettable transports in the same worker or rely on the event's sequence. I'd prefer to use new metadata, and I think it will be good to restrict resetting services on a receiver when the different one doesn't expect that behavior.

So it'll take a while to implement.

@lyrixx
Copy link
Member

lyrixx commented Sep 24, 2021

What about moving the ack before the dispatch ?

Edit: this is a bad idea. But I think the best solution is to create a new event (same for nack)

@upyx
Copy link
Contributor Author

upyx commented Sep 24, 2021

@lyrixx:

What about moving the ack before the dispatch ?

Edit: this is a bad idea. But I think the best solution is to create a new event (same for nack)

We have the event already. We have metadata, transport names, and other things.

I've been investigating the topic more. I've realized that resetting services is a worker's job rather than a transport's because service leak is a consumer problem and isn't dependent on transport. And there aren't any consumers on the "sync" transport.

In my opinion, the best alternative is resetting services after every message. The only drawback is decreasing performance, so we can provide an option to disable it. It also will break compatibility, so we should enable that option by default and add an inversed one.

I consider removing the reset_on_message parameter from a configuration and adding two options to the messenger:consume command: --reset-services-on-message and --no-reset-services-on-message. It's also good to deprecate running the command without explicitly indicating which one to use and change defaults in 6.0.

@upyx upyx marked this pull request as draft September 24, 2021 21:15
@upyx upyx force-pushed the fix-container-resetting branch 2 times, most recently from 987baef to 214a63a Compare September 24, 2021 23:31
@upyx
Copy link
Contributor Author

upyx commented Sep 24, 2021

I've made a working draft as an example. If it's OK, I'll continue.

Copy link
Member

@lyrixx lyrixx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it. But I realize we can simplify the work for end user.
We can move the command configuration to the bundle:
Like that, user don't have to update all consumer recipies (ansible, helm, etc). Only the code needs to be updated.

framework:
    messenger:
        reset_on_message: true # add a deprecation if not set
        transports: ....

WDYT?

I agree with you on the fact that is should be the default, and I don't see a use case where a transport should be reset but not another one.

And BTW Fab wanted to reset the container all the time too: #41163 (comment)

@okwinza
Copy link
Contributor

okwinza commented Sep 25, 2021

framework:
    messenger:
        reset_on_message: true # add a deprecation if not set
        transports: ....

Can't we just set it to false by default for BC and then change it to true and make it mandatory in 6.0?
Also it might be useful to add a runtime --no-reset option to messenger:consume as well.
WDYT?

@lyrixx
Copy link
Member

lyrixx commented Sep 25, 2021

Can't we just set it to false by default for BC and then change it to true and make it mandatory in 6.0?

it's not the best way to handle migration path.
5.4: deprecation if not set, telling it'll be true in 6.0, default to false for BC
6.0: default to true, no deprecation, optin

Also it might be useful to add a runtime --no-reset option to messenger:consume as well.

do you have a use case for overriding the value? (less code == less bug)

@okwinza
Copy link
Contributor

okwinza commented Sep 25, 2021

5.4: deprecation if not set, telling it'll be true in 6.0, default to false for BC
6.0: default to true, no deprecation, optin

👍 Agreed.

Also it might be useful to add a runtime --no-reset option to messenger:consume as well.

do you have a use case for overriding the value? (less code == less bug)

Since container resetting affects conformance, it might make sense to be able to override it for additional performance gain in some particular consumers.

But this is just an idea and it's totally out of scope of this PR and should be added later down the road, if at all.

@lyrixx
Copy link
Member

lyrixx commented Sep 25, 2021

Okay let's go 👍 do you wanna to finish the pr ?

Thanks for discussion 😊

@upyx
Copy link
Contributor Author

upyx commented Sep 25, 2021

@lyrixx
The only use case for disabling service resetting is to gain performance for specific consumers; therefore, the option should be in a consumer but not in configuration.

less code == less bug

Absolutely! So we do not need a configuration option (in prospect). What we need are backward compatibility and migration strategy. Some users may use "service leak" as a side effect for their scenarios; however, it's a poor solution. So the decision we should make: how to migrate it.

I think you are right about a deployment. A new plan:
5.4:

  • add the messenger.reset_on_message config with default false
  • deprecate empty messenger.reset_on_message with hint about defaults
  • add --no-reset (or --no-reset-services or --no-reset-services-on-message ?) option to messenger:consume command.

6.0:

  • change default of messenger.reset_on_message to true
  • deprecate messenger.reset_on_message to remove it in 7.0

@lyrixx @okwinza do you agree?

@upyx upyx force-pushed the fix-container-resetting branch from 214a63a to baa42f9 Compare September 25, 2021 18:11
@upyx upyx marked this pull request as ready for review September 25, 2021 18:25
@upyx upyx force-pushed the fix-container-resetting branch from baa42f9 to b75dd03 Compare September 25, 2021 18:25
@upyx upyx force-pushed the fix-container-resetting branch from b75dd03 to fc85aef Compare September 26, 2021 13:45
@upyx
Copy link
Contributor Author

upyx commented Sep 26, 2021

I've realized that services must not be reset on idle. It's not optional because of counters and sequences that can be used. Possible memory leaks in transports should be fixed in transports.

@lyrixx
Copy link
Member

lyrixx commented Sep 27, 2021

I've realized that services must not be reset on idle. It's not optional because of counters and sequences that can be used.

What are you refering to?

I think something is not clear here. you (@upyx and @okwinza) talks about performance penalty. Even if this could be true, this is not always the case. I'm not sure, but it looks like you think all symfony (and app) services will be unset. This is not the case. Service will be reset, it means the ServicesResetter service will take all services tagged kernel.reset and call reset() method on it. On the current application I'm working one (SF 5.3, APIP, Doctrine). There are 41 services impacted in dev.

For reference:

 ---------------------------------------------------------- ---------------------- ----------------------------------------------------------------------------------------
  Service ID                                                 method                 Class name                                                                             
 ---------------------------------------------------------- ---------------------- ----------------------------------------------------------------------------------------
  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx        reset                  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx        reset                  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx        reset                  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx        reset                  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx        reset                  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx        reset                  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx        reset                  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx        reset                  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
  Symfony\Bridge\Monolog\Processor\ConsoleCommandProcessor   reset                  Symfony\Bridge\Monolog\Processor\ConsoleCommandProcessor
  Symfony\Component\Cache\Psr16Cache                         reset                  Symfony\Component\Cache\Psr16Cache
  cache.app                                                  reset                  Symfony\Component\Cache\Adapter\FilesystemAdapter
  cache.system                                               reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  cache.validator                                            reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  cache.serializer                                           reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  cache.annotations                                          reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  cache.property_info                                        reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  cache.messenger.restart_workers_signal                     reset                  Symfony\Component\Cache\Adapter\FilesystemAdapter
  agent_rules.cache                                          reset                  Symfony\Component\Cache\Adapter\RedisAdapter
  cache.array                                                reset                  Symfony\Component\Cache\Adapter\ArrayAdapter
  doctrine.result_cache_pool                                 reset                  Symfony\Component\Cache\Adapter\FilesystemAdapter
  doctrine.system_cache_pool                                 reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  form.choice_list_factory.cached                            reset                  Symfony\Component\Form\ChoiceList\Factory\CachingFactoryDecorator
  messenger.transport.in_memory.factory                      reset                  Symfony\Component\Messenger\Transport\InMemoryTransportFactory
  cache.validator_expression_language                        reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  debug.stopwatch                                            reset                  Symfony\Component\Stopwatch\Stopwatch
  security.token_storage                                     disableUsageTracking   Symfony\Component\Security\Core\Authentication\Token\Storage\UsageTrackingTokenStorage
   (same service as previous, another tag)                   setToken
  cache.security_expression_language                         reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  monolog.handler.main                                       reset                  Monolog\Handler\FingersCrossedHandler
  monolog.handler.console                                    reset                  Symfony\Bridge\Monolog\Handler\ConsoleHandler
  webpack_encore.tag_renderer                                reset                  Symfony\WebpackEncoreBundle\Asset\TagRenderer
  cache.webpack_encore                                       reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  webpack_encore.entrypoint_lookup[_default]                 reset                  Symfony\WebpackEncoreBundle\Asset\EntrypointLookup
  doctrine                                                   reset                  Doctrine\Bundle\DoctrineBundle\Registry
  form.type.entity                                           reset                  Symfony\Bridge\Doctrine\Form\Type\EntityType
  cache.easyadmin                                            reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  api_platform.cache.route_name_resolver                     reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  api_platform.cache.identifiers_extractor                   reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  api_platform.cache.subresource_operation_factory           reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  api_platform.cache.metadata.resource                       reset                  Symfony\Component\Cache\Adapter\AdapterInterface
  api_platform.cache.metadata.property                       reset                  Symfony\Component\Cache\Adapter\AdapterInterface
 ---------------------------------------------------------- ---------------------- ----------------------------------------------------------------------------------------

@upyx
Copy link
Contributor Author

upyx commented Sep 27, 2021

@lyrixx
I meant that what services do on reset. It can be a custom service that increases some counter on every message. For example, Monolog's UidProcessor generates random UID on every reset. I'm not too fond of randoms (because of collisions), so I'd like to store a sequence in APCu or Redis. It isn't a hot queue (5 messages per day), and I'd like to make numbers as short as possible, and the sequence is short. So forwarding sequence on idle would quickly lead to sequence overflow. It's a simplified example just for clarity. Performance isn't an issue when the queue is idle. We mustn't reset services on idle not because of the cost but because of side effects.

I think it would be good to add documentation that describes what the "services resetting" is and what it is for, how to use the kernel.reset tag and the ResetInterface, where to enable autoconfigure, and where don't.

@lyrixx
Copy link
Member

lyrixx commented Sep 27, 2021

so I'd like to store a sequence in APCu or Redis

if the sequence is in APCu or Redis, it's safe.

IMHO, store a state between 2 messages in PHP memory (so in a service) is a bad idea. It does not work when a worker restart or when there are many worker.

So this is not a valid use case since it's harmful.

The plan you propose in this comment is really good. Let's stick to it.

Copy link
Member

@lyrixx lyrixx left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good work. Thanks 👍🏼

@fabpot
Copy link
Member

fabpot commented Sep 27, 2021

Thank you @upyx.

@fabpot fabpot merged commit 8653b33 into symfony:5.4 Sep 27, 2021
@upyx
Copy link
Contributor Author

upyx commented Sep 27, 2021

@lyrixx @fabpot you are welcome 🤝 🤝

Should I make another PR to the 6.0 branch in which deprecations are removed?

if (null === $config['reset_on_message']) {
trigger_deprecation('symfony/framework-bundle', '5.4', 'Not setting the "framework.messenger.reset_on_message" configuration option is deprecated, it will default to "true" in version 6.0.');

$container->getDefinition('console.command.messenger_consume_messages')->replaceArgument(5, null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will debug further and create a proper issue/PR later but this service may not exist:

Symfony\Component\DependencyInjection\Exception\ServiceNotFoundException: You have requested a non-existent service "console.command.messenger_consume_messages".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I've found where it is. It's possible when messenger is installed without console. I will fix it, but I don't know how to test that fix because of class_exists(Application::class) expression.

@lyrixx
Copy link
Member

lyrixx commented Sep 27, 2021

Should I make another PR to the 6.0 branch in which deprecations are removed?

yes, and a PR on the doc too :)

@upyx
Copy link
Contributor Author

upyx commented Sep 27, 2021

Should I make another PR to the 6.0 branch in which deprecations are removed?

yes, and a PR on the doc too :)

#43202 has some fixes
#43203 is a draft for 6.0

fabpot added a commit that referenced this pull request Sep 29, 2021
…nsole (upyx)

This PR was squashed before being merged into the 5.4 branch.

Discussion
----------

Fix framework configuration when messenger uses without console

| Q             | A
| ------------- | ---
| Branch?       | 5.4
| Bug fix?      | yes
| New feature?  | no
| Deprecations? | yes
| Tickets       | Fix #43133 (review)
| License       | MIT
| Doc PR        |  later

We hurried, an addition to #43133

It fixes #43133 (review)
It adds the forgotten CHANGELOG & UPGRADE

I'm not sure about the CHANGELOG format. I've done as people do but... [it says](https://symfony.com/doc/current/contributing/code/conventions.html#writing-a-changelog-entry) "New entries must be added on top of the list." However, nobody does it 🤷

~I don't know how to test the fix, because of `class_exists()` call that cannot be easily mocked.~

`src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php:245`:
```php
if (class_exists(Application::class)) {
    $loader->load('console.php');
    /* ... */
}

```

Commits
-------

d098ef8 Fix framework configuration when messenger uses without console
@upyx upyx deleted the fix-container-resetting branch October 6, 2021 22:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Too early resetting services in messenger
6 participants