[Messenger] Be able to start a worker for multiple queues with custom consumption priorities #45882
I have also found the need for more precise prioritisation of queue consumption. See this PR: #44294. It's quite different, but perhaps these efforts can be combined somehow?
There are two points here:

- My proposition for point 1: reorder receivers in the Command, by parsing their names and priorities, whenever the receivers argument contains a transport with a custom priority.
- My proposition for point 2: allow a strategy.
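For illustration, point 1 could boil down to a small sort step before the receivers reach the Worker. The name:priority argument syntax below is my assumption, not a settled design:

```php
<?php
// Hypothetical: receiver arguments of the form "transport:priority" are
// reordered highest-priority first; bare names default to priority 0.
$receivers = ['async_low:1', 'failed', 'async_high:10'];

$priorityOf = static fn (string $r): int =>
    str_contains($r, ':') ? (int) substr($r, strrpos($r, ':') + 1) : 0;

usort($receivers, static fn (string $a, string $b): int => $priorityOf($b) <=> $priorityOf($a));

// $receivers is now ['async_high:10', 'async_low:1', 'failed']
```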
I am no supporter of priorities, but rather of weights: a configuration of weights would let a busy queue dominate without completely starving the lower-priority ones. That being said, I don't favour putting either priorities or weights on the command line, but would rather implement this in a custom Transport / Strategy.
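To illustrate the difference (purely a sketch, none of this is proposed API): a weighted pick keeps every queue alive, whereas a strict priority list can starve the tail.

```php
<?php
// Hypothetical weighted selection: a queue with weight 5 is polled roughly
// five times as often as a queue with weight 1, but the light queue is
// never starved completely, unlike with strict priorities.
function pickQueue(array $weights): string
{
    $ticket = random_int(1, array_sum($weights));

    foreach ($weights as $queue => $weight) {
        if (($ticket -= $weight) <= 0) {
            return $queue;
        }
    }

    throw new LogicException('Weights must be positive integers.');
}

echo pickQueue(['async_high' => 5, 'async_low' => 1]);
```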
Fair point, @buffcode, regarding the potential queue starvation. I would like to inquire about your proposition to implement the solution as a custom Transport/Strategy. Hi, @upyx. I would like to discuss the feedback you provided in this ticket. Like buffcode above, you proposed using a custom Transport or Strategy. A question to both of you: could you demonstrate what it would take to make the Worker execute/process the queue tasks in the priority order mentioned in my original post, please?
What exactly is meant by the proposition to introduce a custom Transport/Strategy? I would like to add that I personally (and arguably not only me) seek a solution where I can make the Worker(s) execute tasks in the priority I require by a mere change in the supervisord.conf config file, i.e. by a mere change in the desired Workers' CLI invocation. Let's picture that I have 5 workers across 3 machines, all of which are meant to consume tasks in a different order. Does your proposition involve 5 different Transport PHP classes? Thank you.
Hi there! As was mentioned, there are many queue-consuming strategies: priorities, grouped priorities, weights, counters, etc. It's hardly possible to implement all of them at the same time. And it isn't necessary. We can provide some abstraction for that, and let users write their own logic. I'll try to make a proof of concept in a week.
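Such an abstraction could be as small as a single interface that decides the polling order for each iteration of the worker loop. A sketch, with entirely hypothetical naming:

```php
<?php
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

// Hypothetical seam: the Worker asks the strategy which receivers to poll
// next, and in what order; priorities, grouped priorities, weights and
// counters would all become implementations of this one interface.
interface ReceiverOrderingStrategy
{
    /**
     * @param array<string, ReceiverInterface> $receivers transport name => receiver
     *
     * @return iterable<string, ReceiverInterface> receivers in polling order
     */
    public function order(array $receivers): iterable;
}
```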
Hello. Yes, an abstraction for retrieving tasks would satisfy all parties, I agree. Perhaps the relevant code region could be "extracted" and adapted into an abstract method. Thanks for looking into coming up with a proof of concept of a solution. Much appreciated, and I'm looking forward to it.
@d-ph: Yes! That's the main idea. I've written some code to explain the concept in more depth. I haven't run it 🙂 So, what should be done to implement custom ordering:
Uh, a lot of work... However, most of it can be automated by compile passes.
Two caveats: 1. Not tested. 2. No error handling.
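To make the aggregate idea concrete, here is a minimal sketch of a receiver that wraps other receivers; this is my reconstruction of the concept, not the PoC code itself:

```php
<?php
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

// Hypothetical aggregate: the Worker sees a single receiver, but it is
// really a collection of receivers, like a directory being a "file of files".
final class AggregateReceiver implements ReceiverInterface
{
    /** @param ReceiverInterface[] $receivers in consumption order */
    public function __construct(private readonly array $receivers)
    {
    }

    public function get(): iterable
    {
        foreach ($this->receivers as $receiver) {
            yield from $receiver->get(); // generators keep this lazy
        }
    }

    public function ack(Envelope $envelope): void
    {
        // Real code must route the ack back to the child receiver that
        // produced the envelope (e.g. via a stamp); omitted in this sketch.
    }

    public function reject(Envelope $envelope): void
    {
        // Same routing concern as ack(); omitted.
    }
}
```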
Hi @upyx. I have finally found a bit of time to check your solution; apologies for the delay. Now I see what you meant by an aggregate of receivers. The solution uses the Aggregator Pattern, similar to how on a filesystem both a file and a directory are considered a file (the second being a collection/aggregate of files). Correct me if I'm wrong, but the solution appears to eagerly "acquire" all pending envelopes from a single receiver, judging by one line in the aggregate. On top of that, and excuse me for being frank here, I dislike how complex the proposed solution is.
I spent a few moments putting together a solution that uses the Strategy Pattern instead. All it does is offer a way for a dev to replace the procedure inside the worker's "main loop"; a sketch of the shape follows below.
Again, apologies for making it difficult. Looking forward to your answers and opinions.
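For reference, the Strategy Pattern variant might reduce to one interface that owns a single pass of the worker's main loop. All names here are placeholders, not the code from the actual proposal:

```php
<?php
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;

// Hypothetical seam: the Worker keeps signal handling, sleeping and stop
// conditions to itself, and delegates one loop iteration to the strategy.
interface WorkerConsumptionStrategy
{
    /**
     * @param array<string, ReceiverInterface> $receivers transport name => receiver
     * @param callable(Envelope, string): void $handle    processes one envelope
     *
     * @return bool whether at least one envelope was handled in this pass
     */
    public function consumeOnePass(array $receivers, callable $handle): bool;
}
```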
@d-ph you are on the right track. And yes, using aggregation here is slightly overkill 🙁 But the reason for the high complexity is the overcomplicated Middlewares and the Worker, with features like "redo failed messages". Originally it looked simple, and it would be nice if it still looked simple. But... we wanted new features and forgot about the architecture 🥲 From the current point of view, your solution is better.
Nested generators pause on every received message and continue after the message has been handled. "This line" gets a lazy generator, not all of the pending envelopes at once.
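A tiny self-contained illustration of that laziness (not the PoC code itself):

```php
<?php
// Each yield pauses the producer; the next message is only fetched when the
// consumer asks for it, so nesting generators does not force eager loading.
function inner(): Generator
{
    foreach (['a', 'b'] as $msg) {
        echo "fetched {$msg}\n";
        yield $msg;
    }
}

function outer(): Generator
{
    yield from inner();
}

foreach (outer() as $msg) {
    echo "handled {$msg}\n";
}
// Prints: fetched a, handled a, fetched b, handled b (one message at a time).
```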
OK. So shall we wait for the resolution of your PR (#47090) before we attempt to move on with this ticket? I'm personally not in a great hurry, so I can wait a few weeks.
Yes, you're absolutely right. My bad.
Tough luck. So would you say that the solution proposed in my PR is sound for the time being (assuming nobody else raises any further concerns, of course)?
Thanks for the feedback. May I ask you to share your opinion on the proposed change, please? The bottom line is that one would be able to start a worker with a single CLI invocation and get a predictable "task consumption pattern" out of it, along these lines:
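Something like the following, to give the shape of it; the option names and the JSON layout are placeholders, not the actual proposal:

```bash
php bin/console messenger:consume async_high_a async_high_b async_low \
    --consumption-strategy=priority-groups \
    --consumption-strategy-options='{"groups":[["async_high_a","async_high_b"],["async_low"]]}'

# Resulting pattern (hypothetical): drain async_high_a and async_high_b for
# as long as either yields tasks; only when both are empty, take one pass at
# async_low, then re-check the high-priority group first.
```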
Would you say that implementation looks alright to you? Thank you.
I like the proposition, because it seems non-BC and open to custom implementations. What seems to be missing is a … I dislike using JSON on the CLI, but that's personal flavour, and I don't have a better idea than just passing the option string down to the strategies and letting them handle it themselves (maybe the strategy doesn't have a complex configuration, or has just a single option). In the future it might be interesting to pass a reference to a fixed config (for switching between pre-configured configurations without writing or updating them on the CLI), or even a config provider, so those configurations can become truly dynamic (e.g. when the system load is high, exclude some queues from processing but keep others running).
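The "pass the option string down" idea could be as small as this sketch; the class and method names are invented for illustration:

```php
<?php
// Hypothetical: the command forwards the raw option string untouched, and
// each strategy parses whatever format it expects. This one expects JSON;
// a simpler strategy could accept a bare scalar or a comma-separated list.
final class PriorityGroupsOptions
{
    /** @param string[][] $groups ordered priority groups of transport names */
    private function __construct(public readonly array $groups)
    {
    }

    public static function fromCliString(string $options): self
    {
        $decoded = json_decode($options, true, 512, JSON_THROW_ON_ERROR);

        return new self($decoded['groups'] ?? []);
    }
}
```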
Yeah, it's just a proof of concept at this stage. I'll add it to the MessengerPass and the FrameworkBundle's resources/ next. I also still need to see whether it works when actually used in a project.
I had the same qualms with it (especially having to escape the quotation marks). JSON is still, however, a well-adopted format in webdev, and it does solve the problem of "how to pass an arbitrary hashmap to the command as one argument". Your point about passing an arbitrary string to the strategy as its options, so it may decide how to deserialise it, is valid. I guess going with JSON enforces a predictable standard (with its own cost). Yeah, a grey area here; not worth a long debate when there's already a consensus.
Good ideas. I personally see little value in the fixed-config proposition, since in my mind the strategy options for each invocation of a worker would be "bespoke"/custom to that worker, but I certainly see value in the config-provider idea that would consume tasks from multiple queues based on system load and other runtime parameters. Thank you for your feedback.

Hi guys, just a quick question to you two, if you have the time: may I know if my proposition satisfies your use cases?
Hi, I definitely like your proposition (better than the one I had), so I hope it can be shaped into a nice PR. I also like the approach of a reference to a fixed config or a config provider (the same way the …).
Description
Hello,
The documentation says that when a worker is started with multiple transports, their order defines a strict priority: the worker always consumes waiting messages from the first transport, and only moves on to the next one once the previous ones are empty.
I would like to propose adding a way to run the messenger:consume worker in which the end developer can define the priorities by which the worker should consume queue tasks (in particular: define that two or more queues have the same priority). Consider a messenger:consume invocation that shows the proposed feature in practice; it would start a worker that consumes tasks in the priority sketched below.
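A hypothetical shape for such an invocation (the comma-grouping syntax is purely illustrative, not the proposed design):

```bash
# Transports joined by a comma share one priority group:
#   group 1: async_high_a + async_high_b, group 2: async_low
php bin/console messenger:consume async_high_a,async_high_b async_low

# The worker would keep consuming from async_high_a and async_high_b for as
# long as either has tasks, and only consult async_low once both are empty.
```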
It all boils down to a single if-statement in the Worker's consumption loop (linked in the original discussion): today the loop breaks, and restarts the scan from the top, as soon as at least one task was consumed from a receiver. Instead, the worker should keep consuming tasks from the same "priority group level" before breaking the consumption loop.
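Sketched against a simplified version of the Worker's loop; the $receiverGroups structure is hypothetical, since the real code iterates over flat receivers and breaks after the first one that handled an envelope:

```php
// Simplified sketch of the proposed loop change: instead of restarting the
// scan after the first receiver that produced an envelope, finish the whole
// priority group, then restart from the highest-priority group.
foreach ($receiverGroups as $group) { // groups ordered highest-priority first
    $groupHandledEnvelope = false;

    foreach ($group as $transportName => $receiver) {
        foreach ($receiver->get() as $envelope) {
            $groupHandledEnvelope = true;
            $this->handleMessage($envelope, $transportName);
        }
    }

    // At least one task came out of this group: restart the scan from the
    // top so that higher-priority groups are always checked first.
    if ($groupHandledEnvelope) {
        break;
    }
}
```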
Example
A few more examples on top of the one mentioned above: one invocation with plain transport names, which would be implicitly equivalent to the current behaviour (each transport forming its own priority group), and one that would start the worker with several explicit "priority groups".
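Hypothetical reconstructions of those two examples, using the same illustrative syntax as above:

```bash
# Plain names: implicitly equivalent to one priority group per transport,
# i.e. the current behaviour.
php bin/console messenger:consume async_high async_low
#   groups: [async_high], [async_low]

# Explicit groups: the first two transports are drained before the third,
# and the third before the fourth.
php bin/console messenger:consume async_a,async_b async_c async_d
#   groups: [async_a, async_b], [async_c], [async_d]
```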