-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Allow to limit consumer to specific queues #38973
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Outdated
Show resolved
Hide resolved
Thank you. I like the idea and I think your scenario makes a lot of sense. What bothers me a little is that an object implementing the |
thanks for the feedback. glad that you agree on the use case.
me too. but the alternative would be to add an optional parameter to the what do you think about the other approach of extend the configuration to allow configuring receivers for just some queues? i don't know how complicated this will get to implement with the configuration... |
src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Outdated
Show resolved
Hide resolved
just thought of another solution where we could pass the queue names as an option to the worker but without BC breaks or ugly hacks: the QueueAwareInterface could provide a getFromQueues method that has the queues parameter and the worker would call that other method when the queues option is set... that would probably be the cleanest solution if we want to allow specifying the queues. |
I've been thinking of this issue time to time. I find it really hard =) I like you last suggestion. It sounds clean and simple.. I would kindly ask you to move towards that direction and it may be easier to think/discuss when we can look and point at code? |
c1d54a8
to
7e41960
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like you last suggestion. It sounds clean and simple.. I would kindly ask you to move towards that direction and it may be easier to think/discuss when we can look and point at code?
i just pushed those changes. i like it better than my initial attempt.
src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php
Show resolved
Hide resolved
|
||
while (false === $this->shouldStop) { | ||
$envelopeHandled = false; | ||
foreach ($this->receivers as $transportName => $receiver) { | ||
$envelopes = $receiver->get(); | ||
if ($queueNames) { | ||
if (!$receiver instanceof QueueReceiverInterface) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we do this check too often i think, but not sure if it matters. putting it outside of this loop is more code, but maybe also more readable.
but this is a detail, lets decide if we like the approach before going into implementation details.
I like it. This looks way more simple. |
5fbabc2
to
115669e
Compare
ok, this is now ready imho. the build error is confusing to me, it claims that the interface i am adding is not found. not sure what is happening here. |
src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php
Outdated
Show resolved
Hide resolved
@Nyholm how do we proceed here? i would love to get this merged in the next couple of weeks if possible ;-) |
afdc751
to
3e282ff
Compare
src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you David.
I like the approach and I think it should be merged. There are a few things minor stuff we should fix first though.
src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Transport/Receiver/QueueReceiverInterface.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Command/ConsumeMessagesCommand.php
Outdated
Show resolved
Hide resolved
85528c7
to
85fb3f5
Compare
112dfce
to
e03768e
Compare
Thank you. Could you update composer.json for the AMQP messenger bridge? The deps=low tests are failing. It should require 5.3 |
e03768e
to
19f0502
Compare
oh, indeed. cool that the build system detects this. should be fixed now. shall i squash commits, or do you do that while merging? |
hm, why does only the PHP 7.3 build fail, but the others work? i fixed the composer.json as you said, tobias and it seems to work for the other builds. |
@Nyholm do you have an idea why the php 7.3 specifically failed and not the others? |
I think that is fine. 7.3 tests "deps=high". |
@Nyholm Yes, I would expect that job to pass once this PR is merged and picked up by the monorepo split. |
I think we can make it work by adding |
19f0502
to
91e6ffd
Compare
@Nyholm the idea by @OskarStark worked, the build is now green 😄 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @dbu Im happy with this.
91e6ffd
to
eb919bf
Compare
eb919bf
to
9af1e20
Compare
Thank you for this. I added a small changelog while merging. |
…(dbu) This PR was merged into the 5.3-dev branch. Discussion ---------- [Messenger] message consume command --queue parameter Feature introduced in symfony/symfony#38973 fix #14919 I was considering whether to provide a full, meaningful example, but don't see where this would fit. Do you see a place where that should go? Otherwise i will write up the example as a blog post. Commits ------- 77007c1 message consume command --queue parameter
(Note: I am aware that there are other solutions for #38630 that might be more elegant. Our usecase does not use fanout, the reason why we need the functionality is different)
Description
We have a large application where one part is creating messages for products that need reindexing. A transport decorator decides before queueing whether this is a large effort or a small effort, based on some metric. Based on that, it adds a routing key which is then used in rabbitmq to put the message into the "small" or "large" queue.
We need two separate consumer processes that consume the small and the large queue, for separate scaling and such.
I looked into how we could achieve that. One option is to offer another option in the consume command. That would need to be forwarded to the receiver somehow, i added an interface for it now. The current PR is an illustration of the idea. If you specify a queue that the receiver does not have, things will fail in an inlegeant. If you specify multiple receivers, you can't specify the queue per receiver (though that starts being an odd usecase imho, you could then run two consumers instead)
Another option could be to allow configuring multiple receivers for the same transport that get the queue name(s) injected into their constructor. Then you can consume them separately. This currently needs a ton of configuration and some custom code to work. I can look at doing a PR to make this approach simpler, if you prefer it over the option to the consume command...