-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Add a way to no ack message automatically #42873
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
89bb008
to
682e7ca
Compare
Hey! I think @tyx has recently worked with this code. Maybe they can help review this? Cheers! Carsonbot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this feature!.
Beware in your example, flushing in not an option on consumer stop
-WorkerStoppedEvent::class => 'flushIfNeeded',
+WorkerStoppedEvent::class => 'forceFlush',
src/Symfony/Component/Messenger/Handler/ConfigureAutoAckInterface.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
Outdated
Show resolved
Hide resolved
682e7ca
to
a77c2c5
Compare
src/Symfony/Component/Messenger/Handler/ConfigureAutoAckInterface.php
Outdated
Show resolved
Hide resolved
a77c2c5
to
934dde6
Compare
Hello @symfony/mergers The initial issue got 8 👍🏼 and, this PR got 5 👍🏼 and @jderusse liked this feature. So without feedback, I'm planning to merge this PR by the end of the month, before the feature freeze. |
934dde6
to
4b255b0
Compare
I have rebased the PR (to fix conflict on CHANGELOG.md file) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes handlers aware of the transport layer, which is something that we refused to do as of now, see eg #42005.
By doing so, this approach is also giving the responsibility of buffering to handlers. I'm wondering if handlers are the right place to do that. I don't think so for now.
Instead, what about adding a middleware that does the buffering, and communicates with the worker via a stamp as done here? Buffering would be enabled/disabled by either setting a limit on the number of messages per batch, or by injecting a buffering decision strategy if we want something super flexible.
BTW, how are we sure that messages are flushed eventually, when we don't know when messages can arrive? Should we inject a delayed message that would trigger the flush if nothing happened before it?
About naming, "DelayedAckStamp" made me think: "delayed? when?". Maybe this should be named "DisableAutoAckStamp"?
Hmm, indeed it might be possible. But ATM, I fail to see how the middleware could actually call the "code that process the buffer". Should we introduce a new method (via an interface) on the handler (Like it's done on monolog handler)? So basically, you want me to move the the
As you can see in the PR body, user need to register also a listener but a timer to flush after a configured time. But this could leave in the middleware
I agree 👍🏼 But I see a drawback with your idea: a new middleware implies a new bus because one doesn't want to buffer all message: https://symfony.com/doc/current/messenger.html#middleware So publishing a message will be more complexe because one will have to choose the right buses when dispatching the message. IMHO, it's error prone. So you want to move the "handlers [is] aware of the transport layer" to the publisher is aware of the transport layer. Is it really better? |
can't the middleware call the handler many times in a loop, once per buffered message?
I think we won't need any new interface if we do this. The stamp might be enough.
we could have a config option that gives the size of the buffer, and if the size is > 1, then we add this middleware? (or we remove it if it's <= 1? or we make it a no-op if we can't skip registering it). |
nope, because you don't know how it's the "last message". You need all messages at once.
You don't have access to the stamp in the handler. Adding it is what you don't want.
I don't understand. Middlerware a setup for a busses. not for a transport or handler. See the doc to better understand. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the PR description:
__invoke(HelloMessage $message, Envelope $envelope)
the envelope is not passed as 2nd arg so this doesn't work it would be in this PR :)
to be well thought as we rejected doing so in the past
$envelope->last(ReceivedStamp::class)->getTransportName() === 'async';
hardcoding the name of the transport in the class should be really avoided
I'm putting this on hold while looking for alternative approaches.
Here is another proposal on the topic: #43354 |
…las-grekas) This PR was merged into the 5.4 branch. Discussion ---------- [Messenger] allow processing messages in batches | Q | A | ------------- | --- | Branch? | 5.4 | Bug fix? | no | New feature? | yes | Deprecations? | no | Tickets | #36910 | License | MIT | Doc PR | - This replaces #42873 as it proposes an alternative approach to handling messages in batch. `BatchHandlerInterface` says it all: if a handler implements this interface, then it should expect a new `$ack` optional argument to be provided when `__invoke()` is called. When `$ack` is not provided, `__invoke()` is expected to handle the message synchronously as usual. But when `$ack` is provided, `__invoke()` is expected to buffer the message and its `$ack` function, and to return the number of pending messages in the batch. Batch handlers are responsible for deciding when they flush their buffers, calling the `$ack` functions while doing so. Best reviewed [ignoring whitespaces](https://github.com/symfony/symfony/pull/43354/files?w=1). Here is what a batch handler might look like: ```php class MyBatchHandler implements BatchHandlerInterface { use BatchHandlerTrait; public function __invoke(MyMessage $message, Acknowledger $ack = null) { return $this->handle($message, $ack); } private function process(array $jobs): void { foreach ($jobs as [$job, $ack]) { try { // [...] compute $result from $job $ack->ack($result); } catch (\Throwable $e) { $ack->nack($e); } } } } ``` By default, `$jobs` contains the messages to handle, but it can be anything as returned by `BatchHandlerTrait::schedule()` (eg a Symfony HttpClient response derived from the message, a promise, etc.). The size of the batch is controlled by `BatchHandlerTrait::shouldProcess()` (defaults to 10). The transport is acknowledged in batch, *after* the bus returned from dispatching (unlike what is done in #42873). This is especially important when considering transactions since we don't want to ack unless the transaction committed successfully. By default, pending batches are flushed when the worker is idle and when it stops. Commits ------- 81e52b2 [Messenger] allow processing messages in batches
This PR add a way to manually controller auto ACK from the handler.
It open the door to buffering.
Here is the code one must write to buffer message, and process the buffer each 10s. No more code is needed!
sponsored by arte.tv