[Messenger] allow processing messages in batches #43354
e97dbbd
to
effa72f
Compare
Could you add a full example?
src/Symfony/Component/Messenger/Handler/BatchHandlerInterface.php
eca72b4
to
7892621
Compare
This PR is ready, tests included 🎉
1e2d542
to
9cf1bc7
Compare
@lyrixx I think you were very interested in this feature :)
This misses a CHANGELOG entry.
9cf1bc7
to
4d3e050
Compare
CHANGELOG added.
LGTM
I'm super interested in this feature as well, and the PR looks great. I'm just wondering whether it will allow for "grab whatever pending messages exist and process them in a batch (N max, but don't wait for more if there are fewer than N)"? Will the --time-limit worker option trigger a batch flush?
4d3e050
to
456f94c
Compare
Yes, PR just updated. Batches will be flushed when the worker is idle or when it is stopped.
638d008
to
fe89ec0
Compare
PR updated to inform handlers if flushing is called while the worker is idle (in which case the handler can decide not to flush) or if the flush happens while the worker is stopping.
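To make the idle-versus-stopping distinction concrete, here is a minimal standalone sketch in plain PHP 8. It does not use the Messenger component: the class `BufferingHandler`, its `buffer()` and `pendingCount()` methods, the `$minIdleBatch` threshold and the boolean `$force` flag are all illustrative assumptions, not the PR's actual code.

```php
<?php

declare(strict_types=1);

// Hypothetical handler: every name here is illustrative, not part of Symfony.
final class BufferingHandler
{
    /** @var list<string> messages waiting to be processed */
    private array $pending = [];

    /** @var list<list<string>> batches that were actually flushed */
    public array $flushed = [];

    public function __construct(private int $minIdleBatch = 3)
    {
    }

    public function buffer(string $message): int
    {
        $this->pending[] = $message;

        return \count($this->pending);
    }

    /**
     * $force = false: the worker is merely idle, so flushing is optional.
     * $force = true: the worker is stopping, so everything must go out now.
     */
    public function flush(bool $force): void
    {
        if ([] === $this->pending) {
            return;
        }
        if (!$force && \count($this->pending) < $this->minIdleBatch) {
            return; // idle and the batch is still small: keep waiting
        }

        $this->flushed[] = $this->pending;
        $this->pending = [];
    }

    public function pendingCount(): int
    {
        return \count($this->pending);
    }
}

$handler = new BufferingHandler(minIdleBatch: 3);
$handler->buffer('a');
$handler->buffer('b');

$handler->flush(false); // worker idle: this handler chooses to wait
$pendingAfterIdle = $handler->pendingCount();

$handler->flush(true); // worker stopping: the flush is mandatory
$pendingAfterStop = $handler->pendingCount();
```

In this sketch the handler skips an idle flush while fewer than three messages are buffered, but never skips the flush that happens on shutdown.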
I left some comments, but I have not finished reviewing.
    $result = null !== $ack ? 0 : null;
    $this->jobs[] = [$this->schedule($message), $ack ?? $ack = new Acknowledger(get_debug_type($this))];

    if (null !== $result && !$this->shouldFlush()) {
        return \count($this->jobs);
    }
    $this->flush(true);

    return $result ?? $ack->getResult();
Something like this is more readable:
    - $result = null !== $ack ? 0 : null;
    - $this->jobs[] = [$this->schedule($message), $ack ?? $ack = new Acknowledger(get_debug_type($this))];
    - if (null !== $result && !$this->shouldFlush()) {
    -     return \count($this->jobs);
    - }
    - $this->flush(true);
    - return $result ?? $ack->getResult();
    + if ($ack === null) {
    +     $ack = new Acknowledger(get_debug_type($this));
    +     $this->jobs[] = [$this->schedule($message), $ack];
    +     $this->flush(true);
    +     return $ack->getResult();
    + }
    + $this->jobs[] = [$this->schedule($message), $ack];
    + if (!$this->shouldFlush()) {
    +     return \count($this->jobs);
    + }
    + $this->flush(true);
    + return 0;
BTW, the code when $ack === null seems broken: you create a new Acknowledger, so the $ack argument in it is null, so it's an empty callable, so it never really acks on the transport?
I think this part can be simplified.
Patch applied, thanks.
This logic is the answer to your question:
> as soon as I implement the interface, the handler is forced to be used in batch mode?
When $ack is null, HandleMessageMiddleware will not add the NoAutoAckStamp, and the worker will ack as usual. The batch handler is thus compatible with non-batch mode.
src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
One question: as soon as I implement the interface, the handler is forced to be used in batch mode?
e6789e9
to
ce13bc6
Compare
This is nice, but I left some comments on the public API.
Then we could merge it, and I'll have to play with it on a real project to see how it behaves.
Thanks for the hard work.
    $ack = new Acknowledger(get_debug_type($this));
    $this->jobs[] = [$this->schedule($message), $ack];
    $this->flush(true);
I still want to simplify this PR and I think something could be done here.
This trait is named BatchHandlerTrait, so its purpose is to handle batches of messages. I understand that you want to preserve the contract in the HandlerInterface. But this very method must not allow passing $ack=null.
So basically, if one wants to use the handler in a batch way and also in a non-batch way (which is not possible BTW), it does not make sense to call handle() when $ack === null (in $handler::__invoke()).
So this trait must throw an exception when $ack is null. And it's up to the developer to check whether $ack is null or not, and if it's null they must not call this method.
Finally, all cases where $ack === null could be removed (here, and in Acknowledger).
- the code will be simpler, and easier to read / learn
- the contract of HandlerInterface::__invoke($message) (where $ack = null) is preserved
- I'm happy
And by doing so, we can rename handle() (which is very generic and not really intuitive) to schedule() (see my comment on the current schedule method).
I really like the following code / API:
    class Handler implements BatchHandlerInterface
    {
        public function __invoke(Message $message, $ack)
        {
            if ($ack) {
                return $this->schedule(new Foobar($message), $ack);
            }
            // sync processing
        }
    }
Don't you?
I don't agree with throwing. The handler should be usable in non-batch mode per LSP and throwing would break that. The current design makes it a no brainer: implement process() and no need to care about this concern. Your proposal would require ppl to write extra logic if they want to follow LSP. But LSP is not an option to me.
> I don't agree with throwing. The handler should be usable in non-batch mode per LSP and throwing would break that.
Yes, I totally agree with you on that point.
> The current design makes it a no brainer
I disagree with you. You may think it's easy, but people read code under the hood, and ATM the code is not simple.
> Your proposal would require ppl to write extra logic
Yes, one more line of code. Explicit is better than implicit.
> But LSP is not an option to me.
I don't understand why LSP may be broken. Can you elaborate?
The boilerplate to implement both modes is in the trait. That's how it should be. Not doing so means requiring every consumer of the trait to add the same boring logic. That'd fail the purpose of the trait, and more critically, that'll break LSP when ppl won't implement the "else" in your snippet. Since we know ppl are lazy, or make mistakes, this is a recipe for more bugs.
About LSP, the principle is not about php interfaces but about abstractions that should be swappable. If an implementation fails to implement non-batch mode, eg by throwing, then it's not swappable and LSP is broken.
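The swappability argument can be illustrated with a small standalone sketch in plain PHP 8. Everything in it is hypothetical: `handleSynchronously()` stands in for any caller that passes only the message, and the two handler classes are stripped-down illustrations, not the trait from this PR.

```php
<?php

declare(strict_types=1);

// Hypothetical caller that knows nothing about batching: it passes only the message.
function handleSynchronously(callable $handler, string $message): mixed
{
    return $handler($message);
}

// Batch-capable handler that still implements non-batch mode.
final class SwappableBatchHandler
{
    /** @var list<array{string, callable}> buffered [message, ack] pairs */
    private array $jobs = [];

    public function __invoke(string $message, ?callable $ack = null): mixed
    {
        if (null === $ack) {
            return strrev($message); // synchronous path
        }
        $this->jobs[] = [$message, $ack];

        return \count($this->jobs);
    }
}

// Batch-only handler: it refuses to work without $ack.
final class BatchOnlyHandler
{
    public function __invoke(string $message, ?callable $ack = null): mixed
    {
        if (null === $ack) {
            throw new \LogicException('This handler only works in batch mode.');
        }

        return 1;
    }
}

// A plain, non-batch handler for comparison.
$plain = fn (string $message): string => strrev($message);

$results = [];
foreach ([$plain, new SwappableBatchHandler(), new BatchOnlyHandler()] as $handler) {
    try {
        $results[] = handleSynchronously($handler, 'abc');
    } catch (\LogicException $e) {
        $results[] = 'failed: '.$e->getMessage();
    }
}
```

The first two handlers can replace each other without the caller noticing, while the batch-only one breaks a caller that was written against the non-batch contract.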
src/Symfony/Component/Messenger/Middleware/HandleMessageMiddleware.php
ce13bc6
to
f9874c5
Compare
@lyrixx thanks for the review, I should have addressed your comments.
f9874c5
to
b878719
Compare
b878719
to
81e52b2
Compare
Thank you @nicolas-grekas.
This replaces #42873 as it proposes an alternative approach to handling messages in batch.
BatchHandlerInterface says it all: if a handler implements this interface, then it should expect a new optional $ack argument to be provided when __invoke() is called. When $ack is not provided, __invoke() is expected to handle the message synchronously as usual. But when $ack is provided, __invoke() is expected to buffer the message and its $ack function, and to return the number of pending messages in the batch.
Batch handlers are responsible for deciding when they flush their buffers, calling the $ack functions while doing so.
Best reviewed ignoring whitespaces.
Here is what a batch handler might look like:
The size of the batch is controlled by BatchHandlerTrait::shouldFlush() (defaults to 10).
The transport is acknowledged in batch, after the bus returned from dispatching (unlike what is done in #42873). This is especially important when considering transactions since we don't want to ack unless the transaction committed successfully.
By default, pending batches are flushed when the worker is idle and when it stops.
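The contract described above can be modelled in plain PHP 8 without the Messenger component. The following is a sketch under stated assumptions: `DemoAck` and `UppercaseBatchHandler` are hypothetical stand-ins (not Symfony's Acknowledger or BatchHandlerTrait), and the batch size is a constructor argument purely for the demo.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for the $ack function handed to the handler.
// It only records the outcome; it is not Symfony's Acknowledger.
final class DemoAck
{
    public bool $acked = false;
    public mixed $result = null;

    public function ack(mixed $result = null): void
    {
        $this->acked = true;
        $this->result = $result;
    }
}

final class UppercaseBatchHandler
{
    /** @var list<array{string, DemoAck}> buffered [message, ack] pairs */
    private array $jobs = [];

    public function __construct(private int $batchSize = 10)
    {
    }

    public function __invoke(string $message, ?DemoAck $ack = null): mixed
    {
        if (null === $ack) {
            // Non-batch mode: handle synchronously and return the result.
            return strtoupper($message);
        }

        // Batch mode: buffer the message together with its $ack.
        $this->jobs[] = [$message, $ack];
        if (\count($this->jobs) < $this->batchSize) {
            return \count($this->jobs); // number of pending messages
        }

        $this->flush();

        return 0; // nothing is pending after a flush
    }

    public function flush(): void
    {
        $jobs = $this->jobs;
        $this->jobs = [];

        // Acknowledge each message only once it has been processed.
        foreach ($jobs as [$message, $ack]) {
            $ack->ack(strtoupper($message));
        }
    }
}

$handler = new UppercaseBatchHandler(batchSize: 2);
$first = new DemoAck();
$second = new DemoAck();

$sync = $handler('sync');                     // no $ack: handled immediately
$pendingAfterFirst = $handler('a', $first);   // buffered, not yet acked
$pendingAfterSecond = $handler('b', $second); // batch is full: flushed and acked
```

Calling the handler without `$ack` returns the result directly, while calls with `$ack` return the pending count until the batch fills up and both messages are acknowledged together.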