Skip to content

[Messenger] Batch handlers are not flushed on worker stop #49026

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
AdamKatzDev opened this issue Jan 18, 2023 · 4 comments
Closed

[Messenger] Batch handlers are not flushed on worker stop #49026

AdamKatzDev opened this issue Jan 18, 2023 · 4 comments

Comments

@AdamKatzDev
Copy link
Contributor

Symfony version(s) affected

5.4, 6.3

Description

The problem is that worker can't flush batch handlers without feeding a message into them.
Not much to write here without diving deep into details, see the sections below.
This bug can cause multiple hard to track issues in other parts of the system, that depends on the way the queue is processed.

How to reproduce

The bug can be reproduced with any batch handler that does not flush itself without shouldFlush being true.

Possible Solution

TLDR: to fix write a listener that flushes batch handlers and calls Worker::ack method on every WorkerRunningEvent(not required, see the explanation below)/WorkerStoppedEvent event.
Use a hack like https://ocramius.github.io/blog/accessing-private-php-class-members-without-reflection/ to access Worker::ack.

Currently the Worker class waits for new messages to flush handlers. In general it makes sense, there is no reason to flush batches if there are no new messages since the simplest shouldFlush method that checks current batch size won't change.
But the class also doesn't flush the handlers on stop. And that is an issue.

But there is another problem with the current approach.
For example if we have a handler with the following shouldFlush method:

private function shouldFlush(): bool
{
    return $this->batchSize <= \count($this->jobs) || random_int(0, 1) /* some external condition, e.g. timer */;
}

The handler implements shouldFlush method that depends on an external condition that could trigger flush. This might be useful for jobs that are processed in real time. In the current worker implementation the batch might be hanging in memory for a long time if the rate at which the processed messages are generated is low.

To fix the bug and also to enable the previously described use case a listener can be written that flushes batch handlers and calls Worker::ack method on every WorkerRunningEvent/WorkerStoppedEvent event. Alas, this is a hack and not a proper solution that might be integrated into the Messenger component.

Additional Context

No response

@AdamKatzDev
Copy link
Contributor Author

I've provided a workaround for this issue here. But as I mentioned this is not something that could be contributed as a PR.

@carsonbot
Copy link

Hey, thanks for your report!
There has not been a lot of activity here for a while. Is this bug still relevant? Have you managed to find a workaround?

@carsonbot
Copy link

Friendly reminder that this issue exists. If I don't hear anything I'll close this.

@carsonbot
Copy link

Hey,

I didn't hear anything so I'm going to close it. Feel free to comment if this is still relevant, I can always reopen!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants