-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Adding MessageCountAwareInterface to get transport message count #30757
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
6759738
to
b3fe876
Compare
src/Symfony/Component/Messenger/Transport/Receiver/MessageCountAwareReceiverInterface.php
Outdated
Show resolved
Hide resolved
I don't think we should add it to the core without knowing the actually use-case tbh. |
dc21b88
to
adda867
Compare
Very fair. So, let me try to be more specific... even though I admit, I'm "planning ahead" a bit here. One easy thing to look at is Larave's Horizon, which takes advantage of the "size" of a queue for (A) displaying a health dashboard and (B) for automatically spinning up/down more/less workers for each transport, based on the size. Without this feature (which is pretty simple), creating something like that as a 3rd party bundle, for example, will not be possible. Rebased and comments addressed! |
b7a472b
to
decd53f
Compare
@vincenttouzet could you check the Doctrine part of this PR out? There is an integration test, but would love your +1 technically for it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Except the comment on the fetch / fetchColumn this looks good to me and very promising !
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpTransport.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Doctrine/Connection.php
Outdated
Show resolved
Hide resolved
7292b2b
to
acddcae
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(it'd be nice updating the 1st commit message with the new name of the interface)
acddcae
to
fc5b0cf
Compare
Thank you @weaverryan. |
…ransport message count (weaverryan) This PR was squashed before being merged into the 4.3-dev branch (closes #30757). Discussion ---------- [Messenger] Adding MessageCountAwareInterface to get transport message count | Q | A | ------------- | --- | Branch? | master | Bug fix? | no | New feature? | yes | BC breaks? | no | Deprecations? | no | Tests pass? | yes | Fixed tickets | none | License | MIT | Doc PR | symfony/symfony-docs#11236 This adds a new optional interface that receivers should implement to give an approximate number of the messages "waiting" to be handled. Why? Because, with this, you could design a system that dynamically adds/removes worker processes if a specific transport is getting slammed and needs help. Creating that system could be something we discuss for core later, but this at least makes it possible - and means it could be implemented by the user or in a bundle... which I might do if we don't get it in core ;). Commits ------- fc5b0cf [Messenger] Adding MessageCountAwareInterface to get transport message count
|
||
$sender->send($first = new Envelope(new DummyMessage('First'))); | ||
$sender->send($second = new Envelope(new DummyMessage('Second'))); | ||
$sender->send($second = new Envelope(new DummyMessage('Third'))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This adds a new optional interface that receivers should implement to give an approximate number of the messages "waiting" to be handled. Why? Because, with this, you could design a system that dynamically adds/removes worker processes if a specific transport is getting slammed and needs help. Creating that system could be something we discuss for core later, but this at least makes it possible - and means it could be implemented by the user or in a bundle... which I might do if we don't get it in core ;).