bpo-29595: Expose max_queue_size in ThreadPoolExecutor #143
Conversation
Hello, and thanks for your contribution! I'm a bot set up to make sure that the project can legally accept your contribution by verifying you have signed the PSF contributor agreement (CLA). Unfortunately we couldn't find an account corresponding to your GitHub username on bugs.python.org (b.p.o) to verify you have signed the CLA. This is necessary for legal reasons before we can look at your contribution. Please follow these steps to help rectify the issue: …

Thanks again for your contribution, and we look forward to looking at it!
Hi, can you create an issue about this in the issue tracker? https://bugs.python.org
I hope I did this right: http://bugs.python.org/issue29595 By the way, I signed the CLA 🎉
Although the keyword …
Yes, that's a good point. I'd probably check for not None and > 0. Would you rather silently not set it, or raise an exception?
I'll let the more experienced devs weigh in on this, but IMHO "explicit is better than implicit" and I'd probably raise an exception to alert the client code that an invalid value was passed for a keyword arg. If you fail fast, this kind of nonsensical argument will most likely be caught early instead of puzzling people when it causes bizarre behaviour in production (due to the implicit setting of keyword args).
Updated the PR and added tests. Should I update the docs somewhere?
Lib/concurrent/futures/thread.py (outdated)

```diff
@@ -96,8 +98,12 @@ def __init__(self, max_workers=None, thread_name_prefix=''):
         if max_workers <= 0:
             raise ValueError("max_workers must be greater than 0")

+        # check that max_queue_size is a positive integer
+        if type(max_queue_size) is not int or max_queue_size < 0:
+            raise ValueError("max_queue_size must be equal or greater 0")
```
Slight rewording to "greater or equal to 0" is possibly better.
I would remove this altogether again, as you suggested.
```diff
         self._max_workers = max_workers
-        self._work_queue = queue.Queue()
+        self._work_queue = queue.Queue(maxsize=max_queue_size)
         self._threads = set()
```
`maxsize` is the sole arg to the initializer of `Queue`, so you could just supply `max_queue_size` since it's also pretty descriptive.
> is the sole arg to the initializer

What if this changes? Is it then still safe to just pass `max_queue_size` as a positional argument?
Lib/concurrent/futures/thread.py (outdated)

```diff
@@ -96,8 +98,12 @@ def __init__(self, max_workers=None, thread_name_prefix=''):
         if max_workers <= 0:
             raise ValueError("max_workers must be greater than 0")

+        # check that max_queue_size is a positive integer
+        if type(max_queue_size) is not int or max_queue_size < 0:
```
Should it strictly be an `int`? Classes that inherit from `int` won't pass this condition unless `isinstance` is used. Either way, I'm not sure if doing any check for the type is a good idea, since the `Queue` class generally doesn't (and fails when you `put` something in it).
I'm also fine if we leave verification to the `Queue`.

Also, should I add the same feature to the ProcessPoolExecutor?
Since you've changed the signature of a class, the relevant docs need the required updates too :-) I'd suggest not changing anything else until a core dev also comes through and provides a review and an opinion on the change. If one doesn't come around for a while, "ping" the issue on the issue tracker to try and get attention to it.
Updated the docs and waiting for a core dev.

Fixed the build and pinging for authoritative feedback.

Any updates? It's been a while and I also pinged it on the issue tracker to no avail...
If this feature is approved, we should give some thought to making it a keyword-only argument. That would improve code clarity and leave the positional arguments open for future expansion if needed. Also, if we ever need to make a change, keyword arguments are easier to deprecate than positional arguments (if other positional arguments had been added in the interim).
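For illustration, a keyword-only variant of the proposed signature could look like the sketch below (the `*` separator is this suggestion, not part of the PR as submitted):

```python
import queue

class ThreadPoolExecutor:
    # Sketch only -- the real executor carries much more machinery.
    def __init__(self, max_workers=None, thread_name_prefix='',
                 *, max_queue_size=0):
        # The bare * makes max_queue_size keyword-only, keeping the
        # positional slots free for future parameters and making call
        # sites self-documenting; 0 keeps the queue unbounded.
        self._work_queue = queue.Queue(maxsize=max_queue_size)

# ThreadPoolExecutor(8, max_queue_size=100)   # accepted
# ThreadPoolExecutor(8, '', 100)              # TypeError: too many positional arguments
```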
I think this is a nice feature addition overall.

Yes, that would be reasonable, as it is vulnerable to the same problem.
Doc/library/concurrent.futures.rst (outdated)

```diff
    An :class:`Executor` subclass that uses a pool of at most *max_workers*
-   threads to execute calls asynchronously.
+   threads to execute calls asynchronously. If all worker threads are busy it
+   buffers new work items using a queue of maximum size *max_queue_size*.
```
Please document that `map` and `submit` will block if the queue size is reached.
Lib/test/test_concurrent_futures.py (outdated)

```python
        for i in range(-10, 10):
            executor = futures.ThreadPoolExecutor(max_queue_size=i)
            self.assertEqual(executor._work_queue.maxsize, i)
            # provided value is forwarded without checking it
```
I am surprised by this. @rhettinger, is there any reason `queue.Queue` doesn't check for the size type?

```python
>>> q = Queue(maxsize="foo")
>>> q.put(1)
Traceback (most recent call last):
  File "<ipython-input-4-f7b62bda2b8c>", line 1, in <module>
    q.put(1)
  File "/home/antoine/miniconda3/envs/dask35/lib/python3.5/queue.py", line 127, in put
    if self.maxsize > 0:
TypeError: unorderable types: str() > int()
```
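(On Python 3.6 and later the same experiment still fails only at `put()` time, just with the newer comparison wording; a quick check:)

```python
import queue

q = queue.Queue(maxsize="foo")   # the constructor accepts it without validation
try:
    q.put(1)                     # maxsize is only compared here
except TypeError as exc:
    print(exc)   # '>' not supported between instances of 'str' and 'int'
```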
Lib/test/test_concurrent_futures.py (outdated)

```python
    def test_custom_max_queue_size(self):
        # test custom valid and invalid values
        for i in range(-10, 10):
            executor = futures.ThreadPoolExecutor(max_queue_size=i)
```
What does it mean to have a negative queue size?
Nothing really, but since the `Queue` constructor doesn't check for validity of arguments I wanted to add a test for invalid arguments. (To be sure that I didn't break this behavior.)
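Worth noting (documented stdlib behaviour, not discussed further in the thread): `queue.Queue` treats any `maxsize` less than or equal to zero as unbounded, so the negative values in that test behave exactly like the default:

```python
import queue

q = queue.Queue(maxsize=-5)   # maxsize <= 0 means "infinite" per the queue docs
for i in range(1000):
    q.put_nowait(i)           # never raises queue.Full
print(q.qsize())              # 1000
```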
```diff
@@ -178,6 +178,20 @@ def test_thread_names_default(self):
             self.assertRegex(t.name, r'^.*ThreadPoolExecutor.*_[0-4]$')
             t.join()

+    def test_default_max_queue_size(self):
```
I would add a higher-level test, i.e. create an executor with a non-zero queue size, submit a number of tasks larger than the queue size, and check that they all get executed in the end.
Good idea :)
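Such a higher-level test could look roughly like this (a sketch assuming the PR's proposed `max_queue_size` parameter; the sleep keeps workers busy so the bounded queue actually fills):

```python
import time
from concurrent import futures

def process_item(item):
    time.sleep(0.01)   # slow the workers so the queue backs up
    return item + 1

def test_all_tasks_complete_with_bounded_queue():
    n = 50
    with futures.ThreadPoolExecutor(max_workers=2, max_queue_size=5) as executor:
        fs = [executor.submit(process_item, i) for i in range(n)]
        results = sorted(f.result() for f in futures.as_completed(fs, timeout=60))
    assert results == [i + 1 for i in range(n)]
```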
I think I have everything for the ThreadPoolExecutor, at least. As for the ProcessPoolExecutor, it seems like the …

Regarding multiprocessing.Pool, I can mechanically apply the same change, but I have really no idea what's going on there.
```
   threads to execute calls asynchronously. If all worker threads are busy it
   buffers new work items using a queue of maximum size *max_queue_size*. When
   this maximum size is reached, map() and submit() will block until more work
   items have been processed.
```
Can you mention that the default (0) means an unbounded queue?
```diff
@@ -142,6 +145,12 @@ And::
       control the threading.Thread names for worker threads created by
       the pool for easier debugging.

+   .. versionadded:: 3.7
```
For a new function parameter, this should be `versionchanged` (`versionadded` means the function itself is new). Unfortunately, it seems we made the same mistake above for `thread_name_prefix`...
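The corrected markup would presumably read something like this (a sketch of the reST directive; the wording is illustrative):

```rst
   .. versionchanged:: 3.7
      Added the *max_queue_size* argument.
```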
```python
        fs = [executor.submit(process_item, i) for i in range(n)]
        expected_results = [process_item(i) for i in range(n)]

        for f in futures.as_completed(fs, timeout=10):
```
Or, simply, `self.assertEqual(sorted(f.result() for f in futures.as_completed(...)), sorted(expected_results))`
```python
        n = 10

        def process_item(item):
            return item + 1
```
Perhaps add a small `sleep()` here to help the queue reach its limit at some point?
I agree.
The recent changes by @pitrou have replaced the …

After some investigation, one possibility would be to revert to using a …

For the API change, my only concern is that the …

@prayerslayer, are you still interested in working on this issue? If not, let me know; I am interested in taking over from here.
Using an RLock does not make a routine signal-safe. The typical situation is: …

(you can replace "signal" with "cyclic garbage collection" above for the same effect)

In other words, the problem is not the lock, but the operations that are protected by the lock. SimpleQueue was designed specifically for …
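A constructed illustration of the hazard (not from the thread): a reentrant lock happily lets a signal handler back into a critical section whose invariants are still mid-update.

```python
import signal
import threading

_lock = threading.RLock()
_items = []

def append_pair(x):
    with _lock:
        _items.append(x)        # a signal can arrive here...
        _items.append(x + 1)    # ...before the "items come in pairs" invariant holds again

def handler(signum, frame):
    # Signal handlers run in the main thread, possibly *inside* append_pair.
    # Because the RLock is reentrant, this acquire succeeds instead of
    # deadlocking -- and observes half-updated state.
    with _lock:
        assert len(_items) % 2 == 0, "invariant broken by reentrant access"

signal.signal(signal.SIGALRM, handler)
```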
Thanks for the explanation, I did not think about it as a signal interruption. Thus, to get this kind of API change, we need to implement a size mechanism in the SimpleQueue.
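One naive shape such a size mechanism could take (a sketch only; note the semaphore reintroduces lock-based blocking on `put`, which works against the signal-safety `SimpleQueue` was chosen for):

```python
import threading
from queue import SimpleQueue

class BoundedSimpleQueue:
    """SimpleQueue plus a semaphore-enforced upper bound (illustrative sketch)."""

    def __init__(self, maxsize):
        self._queue = SimpleQueue()
        self._slots = threading.Semaphore(maxsize)

    def put(self, item):
        self._slots.acquire()      # blocks while maxsize items are in flight
        self._queue.put(item)

    def get(self):
        item = self._queue.get()
        self._slots.release()      # free a slot once an item is consumed
        return item
```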
Hi everyone. I stumbled on this pull request after messing around with this concept myself for the last few days. I may not be testing this correctly, but here's what I'm doing:

```python
from time import time, strftime, sleep, gmtime
from random import randint
from concurrent.futures import ThreadPoolExecutor, as_completed
import queue


class ThreadPoolExecutorWithQueueLimit(ThreadPoolExecutor):
    def __init__(self, max_queue_size, *args, **kwargs):
        super(ThreadPoolExecutorWithQueueLimit, self).__init__(*args, **kwargs)
        self._work_queue = queue.Queue(maxsize=max_queue_size)


def nap(nap_length):
    sleep(nap_length)
    return nap_length


if __name__ == '__main__':
    startTime = time()
    range_size = 100
    max_pool_size = 10
    max_worker_count = 100
    with ThreadPoolExecutorWithQueueLimit(max_queue_size=max_pool_size,
                                          max_workers=max_worker_count) as pool_executor:
        pool = {}
        for i in range(range_size):
            function_call = pool_executor.submit(nap, randint(0, 2))
            pool[function_call] = i
        for completed_function in as_completed(pool):
            result = completed_function.result()
            i = pool[completed_function]
            print('{} completed @ {} and slept for {}'.format(
                str(i).zfill(4),
                strftime("%H:%M:%S", gmtime()),
                result))
    print('==--- Script took {} seconds. ---=='.format(
        round(time() - startTime)))
```

In this case the queue fills with all 100 threads even though I've set it to 10. Am I completely off base with my expectations? What have I done wrong?
I'm interested in getting this merged somehow, but since I didn't do anything for a year it's probably unrealistic that I'll finish it at all. Please go ahead :)
Hello, I posted a similar patch for …

I'd like to add my own 2 cents in regard to this feature, as @pitrou suggested to implement it in …

Allowing the user to set a maximum size for the internal queue communicates a false sense of control over the number of tasks which will be submitted to a …

The reason behind this flaw is that several jobs are pulled from the internal job queue and shovelled down the …

As the size of the jobs may vary, as well as the size of the …

I experienced this issue when releasing …

A much simpler and explicit way to achieve this goal is by using a Semaphore. This can be easily implemented both for …

Note that the above mentioned issue affects only process-based pools; thread pools will actually work as expected. I could easily reproduce the issue with …
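The semaphore recipe sketched in that comment could look roughly like this (illustrative names, not code from this PR): a `BoundedSemaphore` caps the number of in-flight tasks, and each future releases a slot when it completes.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import BoundedSemaphore

class BoundedSubmitExecutor:
    """Wrap an executor so at most `max_pending` tasks are in flight (sketch)."""

    def __init__(self, max_workers, max_pending):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        self._semaphore = BoundedSemaphore(max_pending)

    def submit(self, fn, *args, **kwargs):
        self._semaphore.acquire()          # blocks once max_pending is reached
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except Exception:
            self._semaphore.release()      # don't leak a slot if submit fails
            raise
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        self._executor.shutdown(wait=wait)
```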
@noxdafox Using a semaphore sounds reasonable. Feel free to post a PR.
@pitrou the question is: do we need to integrate this feature in the pools, considering how trivial it is to achieve such functionality with a few lines of code? IMHO this fits more as a recipe or gist than a core functionality of the pool. I am more concerned about the slow creep of features leading to a complex class to maintain in the future.
I agree with the maintainability concern. The …
I agree that it is a bad idea to change the size of the … However, I am unsure what the expected behavior should be on two points of this API change: …

Overall, this addition seems to also complicate the call to …
This gist shows how trivial it is for a user to achieve such functionality. There would be several other questions to sort out to properly add this feature to the Pools. What about …? IMHO the benefit of having this feature built-in is not worth its maintenance.
Since @prayerslayer said "it's probably unrealistic that [he will] finish" this PR, I'm going to close it (obviously people can open their own PRs to implement this functionality if they choose to).
Hi!

Please forgive this blunt pull request; I wanted to open an issue first and ask, but couldn't.

The situation I ran into recently was that I used `ThreadPoolExecutor` to parallelize AWS API calls; I had to move data from one S3 bucket to another (~150M objects). Contrary to what I expected, the maximum size of the underlying queue doesn't have a non-zero value by default. Thus my process ended up consuming gigabytes of memory, because it put more items into the queue than the threads were able to work off: the queue just kept growing. (It ran on K8s and the pod was rightfully killed eventually.)

Of course there are ways to work around this. One could use more threads, to some extent. Or you could use your own queue with a defined maximum size. But I think it's more work for users of Python than necessary.

So this pull request exposes a `max_queue_size` parameter for the `ThreadPoolExecutor` which will be forwarded to `queue.Queue()`. It defaults to `0`, so backward compatibility is ensured. I am happy to add tests if you'd give me further instructions. I would've done this already, but I'm largely unfamiliar with this project as well as the language 😅

I hope you find this as useful as I would, and I am looking forward to reading your thoughts about it!
https://bugs.python.org/issue29595
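Hypothetical usage of the proposed parameter, with `copy_object` and `object_keys` standing in for the S3 workload described above (both are made-up names):

```python
from concurrent.futures import ThreadPoolExecutor

def copy_object(key):
    ...  # copy one S3 object from the source to the destination bucket

object_keys = iter([])  # placeholder for ~150M keys streamed from S3

# max_queue_size is the parameter this PR proposes; it is not part of the
# stdlib ThreadPoolExecutor. Once 1000 work items are buffered, submit()
# blocks instead of letting the queue grow without bound.
with ThreadPoolExecutor(max_workers=32, max_queue_size=1000) as executor:
    for key in object_keys:
        executor.submit(copy_object, key)
```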