Skip to content

[Messenger] Allow to limit consumer to specific queues #38973

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 2, 2021

Conversation

dbu
Copy link
Contributor

@dbu dbu commented Nov 3, 2020

Q A
Branch? 5.x for features
Bug fix? no
New feature? yes (TODO: changelog)
Deprecations? no
Tickets Fix #38630 (i think)
License MIT
Doc PR symfony/symfony-docs#14921

(Note: I am aware that there are other solutions for #38630 that might be more elegant. Our usecase does not use fanout, the reason why we need the functionality is different)

Description
We have a large application where one part is creating messages for products that need reindexing. A transport decorator decides before queueing whether this is a large effort or a small effort, based on some metric. Based on that, it adds a routing key which is then used in rabbitmq to put the message into the "small" or "large" queue.

We need two separate consumer processes that consume the small and the large queue, for separate scaling and such.

I looked into how we could achieve that. One option is to offer another option in the consume command. That would need to be forwarded to the receiver somehow, i added an interface for it now. The current PR is an illustration of the idea. If you specify a queue that the receiver does not have, things will fail in an inlegeant. If you specify multiple receivers, you can't specify the queue per receiver (though that starts being an odd usecase imho, you could then run two consumers instead)

Another option could be to allow configuring multiple receivers for the same transport that get the queue name(s) injected into their constructor. Then you can consume them separately. This currently needs a ton of configuration and some custom code to work. I can look at doing a PR to make this approach simpler, if you prefer it over the option to the consume command...

@Nyholm
Copy link
Member

Nyholm commented Nov 3, 2020

Thank you.

I like the idea and I think your scenario makes a lot of sense. What bothers me a little is that an object implementing the QueueAwareInterface will be stateful.

@dbu
Copy link
Contributor Author

dbu commented Nov 4, 2020

thanks for the feedback. glad that you agree on the use case.

What bothers me a little is that an object implementing the QueueAwareInterface will be stateful.

me too. but the alternative would be to add an optional parameter to the ReceiverInterface::get, but that is a BC break: https://3v4l.org/54WYq . what would work would be a marker interface and then handle the parameter in get even when it is not part of the interface: https://3v4l.org/pkVNp but that seems quite ugly. and we would need to also make the Worker aware of the queue option.

what do you think about the other approach of extend the configuration to allow configuring receivers for just some queues? i don't know how complicated this will get to implement with the configuration...

@jderusse jderusse added this to the 5.x milestone Nov 4, 2020
@dbu
Copy link
Contributor Author

dbu commented Nov 11, 2020

just thought of another solution where we could pass the queue names as an option to the worker but without BC breaks or ugly hacks: the QueueAwareInterface could provide a getFromQueues method that has the queues parameter and the worker would call that other method when the queues option is set... that would probably be the cleanest solution if we want to allow specifying the queues.

@Nyholm
Copy link
Member

Nyholm commented Nov 11, 2020

I've been thinking of this issue time to time. I find it really hard =)

I like you last suggestion. It sounds clean and simple.. I would kindly ask you to move towards that direction and it may be easier to think/discuss when we can look and point at code?

@dbu dbu force-pushed the messenger-consume-single-queue branch from c1d54a8 to 7e41960 Compare November 11, 2020 10:36
Copy link
Contributor Author

@dbu dbu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like you last suggestion. It sounds clean and simple.. I would kindly ask you to move towards that direction and it may be easier to think/discuss when we can look and point at code?

i just pushed those changes. i like it better than my initial attempt.


while (false === $this->shouldStop) {
$envelopeHandled = false;
foreach ($this->receivers as $transportName => $receiver) {
$envelopes = $receiver->get();
if ($queueNames) {
if (!$receiver instanceof QueueReceiverInterface) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do this check too often i think, but not sure if it matters. putting it outside of this loop is more code, but maybe also more readable.

but this is a detail, lets decide if we like the approach before going into implementation details.

@dbu
Copy link
Contributor Author

dbu commented Nov 12, 2020

@Nyholm @sroze what do you think of this approach? if you are happy with the direction, i will look at the test failures, add some tests for it and tweak the details and do a documentation pull request. (but first i'd like to know if you agree with the approach in general)

@Nyholm
Copy link
Member

Nyholm commented Nov 12, 2020

I like it.

This looks way more simple.
👍

@dbu dbu force-pushed the messenger-consume-single-queue branch 2 times, most recently from 5fbabc2 to 115669e Compare November 16, 2020 09:31
@dbu
Copy link
Contributor Author

dbu commented Nov 16, 2020

ok, this is now ready imho.

the build error is confusing to me, it claims that the interface i am adding is not found. not sure what is happening here.

@dbu
Copy link
Contributor Author

dbu commented Dec 7, 2020

@Nyholm how do we proceed here? i would love to get this merged in the next couple of weeks if possible ;-)

@dbu dbu force-pushed the messenger-consume-single-queue branch 2 times, most recently from afdc751 to 3e282ff Compare December 7, 2020 08:53
Copy link
Member

@Nyholm Nyholm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you David.

I like the approach and I think it should be merged. There are a few things minor stuff we should fix first though.

@carsonbot carsonbot changed the title Allow to limit consumer to specific queues [Messenger] Allow to limit consumer to specific queues Dec 31, 2020
@dbu dbu force-pushed the messenger-consume-single-queue branch from 85528c7 to 85fb3f5 Compare January 12, 2021 08:57
@dbu dbu force-pushed the messenger-consume-single-queue branch from 112dfce to e03768e Compare January 12, 2021 09:23
@Nyholm
Copy link
Member

Nyholm commented Jan 12, 2021

Thank you. Could you update composer.json for the AMQP messenger bridge? The deps=low tests are failing. It should require 5.3

https://github.com/symfony/symfony/blob/5.x/src/Symfony/Component/Messenger/Bridge/Amqp/composer.json#L21

@dbu dbu force-pushed the messenger-consume-single-queue branch from e03768e to 19f0502 Compare January 12, 2021 11:16
@dbu
Copy link
Contributor Author

dbu commented Jan 12, 2021

Could you update composer.json for the AMQP messenger bridge? The deps=low tests are failing. It should require 5.3

oh, indeed. cool that the build system detects this. should be fixed now.

shall i squash commits, or do you do that while merging?

@dbu
Copy link
Contributor Author

dbu commented Jan 13, 2021

hm, why does only the PHP 7.3 build fail, but the others work? i fixed the composer.json as you said, tobias and it seems to work for the other builds.

@dbu
Copy link
Contributor Author

dbu commented Jan 22, 2021

@Nyholm do you have an idea why the php 7.3 specifically failed and not the others?

@Nyholm
Copy link
Member

Nyholm commented Jan 22, 2021

I think that is fine. 7.3 tests "deps=high".
@derrabus could you confirm that it is safe to merge this even though one travis job is failing?

@derrabus
Copy link
Member

@Nyholm Yes, I would expect that job to pass once this PR is merged and picked up by the monorepo split.

@dbu
Copy link
Contributor Author

dbu commented Feb 1, 2021

@Nyholm Yes, I would expect that job to pass once this PR is merged and picked up by the monorepo split.

@Nyholm ok to merge then, or should i still change something?

@OskarStark
Copy link
Contributor

I think we can make it work by adding source here: https://github.com/symfony/symfony/blob/5.x/.github/composer-config.json

@dbu dbu force-pushed the messenger-consume-single-queue branch from 19f0502 to 91e6ffd Compare February 2, 2021 12:10
@dbu
Copy link
Contributor Author

dbu commented Feb 2, 2021

@Nyholm the idea by @OskarStark worked, the build is now green 😄

Copy link
Member

@Nyholm Nyholm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @dbu Im happy with this.

@dbu dbu force-pushed the messenger-consume-single-queue branch from 91e6ffd to eb919bf Compare February 2, 2021 13:58
@Nyholm Nyholm force-pushed the messenger-consume-single-queue branch from eb919bf to 9af1e20 Compare February 2, 2021 20:34
@Nyholm
Copy link
Member

Nyholm commented Feb 2, 2021

Thank you for this. I added a small changelog while merging.

@Nyholm Nyholm merged commit 84faecf into symfony:5.x Feb 2, 2021
@dbu dbu deleted the messenger-consume-single-queue branch February 3, 2021 07:42
OskarStark added a commit to symfony/symfony-docs that referenced this pull request Feb 4, 2021
…(dbu)

This PR was merged into the 5.3-dev branch.

Discussion
----------

[Messenger] message consume command --queue parameter

Feature introduced in symfony/symfony#38973

fix #14919

I was considering whether to provide a full, meaningful example, but don't see where this would fit. Do you see a place where that should go? Otherwise i will write up the example as a blog post.

Commits
-------

77007c1 message consume command --queue parameter
@fabpot fabpot mentioned this pull request Apr 18, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Messenger] Consume a queue from fanout exchange
8 participants