bpo-29595: Expose max_queue_size in ThreadPoolExecutor #143
Changes from all commits: 66c0031, 693689c, e76e9a7, c2079a7, 6103533, edcf3f9, cde3edd, f10a6f9, 2338991, 9d1a24b
Documentation diff:

```diff
@@ -124,10 +124,13 @@ And::
        executor.submit(wait_on_future)

-.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
+.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', max_queue_size=0)

    An :class:`Executor` subclass that uses a pool of at most *max_workers*
-   threads to execute calls asynchronously.
+   threads to execute calls asynchronously. If all worker threads are busy it
+   buffers new work items using a queue of maximum size *max_queue_size*. When
+   this maximum size is reached, map() and submit() will block until more work
+   items have been processed.

    .. versionchanged:: 3.5
       If *max_workers* is ``None`` or
```
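The blocking behaviour described in this documentation change comes from the standard `queue.Queue`: once a bounded queue is full, a further `put()` waits until a worker frees a slot. A minimal sketch of that underlying mechanism, using only the stdlib `queue` module (since *max_queue_size* is only proposed in this patch, not available in released Python):

```python
import queue

# A bounded queue like the one the proposed max_queue_size would create.
q = queue.Queue(maxsize=2)
q.put("task-1")
q.put("task-2")
print(q.full())  # True

# A non-blocking put raises queue.Full; a blocking put (the default, and
# what a bounded executor.submit() would effectively do) waits instead.
try:
    q.put("task-3", block=False)
except queue.Full:
    print("queue is full")

q.get()          # a worker taking an item frees a slot
q.put("task-3")  # now succeeds immediately
print(q.qsize())  # 2
```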
```diff
@@ -142,6 +145,12 @@ And::
        control the threading.Thread names for worker threads created by
        the pool for easier debugging.

+   .. versionadded:: 3.7
+      The *max_queue_size* argument was added to allow users to control
+      the maximum queue size to not accidentally run out of memory when
+      new work items are added at a higher rate than what the worker
+      threads can handle.
+
 .. _threadpoolexecutor-example:

 ThreadPoolExecutor Example
```

> **Review comment** (on `.. versionadded:: 3.7`): For a new function parameter, this should be …
```diff
@@ -441,4 +450,3 @@ Exception classes
     in a non-clean fashion (for example, if it was killed from the outside).

 .. versionadded:: 3.3
-
```
Implementation diff:

```diff
@@ -88,13 +88,15 @@ class ThreadPoolExecutor(_base.Executor):
     # Used to assign unique thread names when thread_name_prefix is not supplied.
     _counter = itertools.count().__next__

-    def __init__(self, max_workers=None, thread_name_prefix=''):
+    def __init__(self, max_workers=None, thread_name_prefix='', max_queue_size=0):
         """Initializes a new ThreadPoolExecutor instance.

         Args:
             max_workers: The maximum number of threads that can be used to
                 execute the given calls.
             thread_name_prefix: An optional name prefix to give our threads.
+            max_queue_size: The maximum number of work items to buffer before
+                submit() blocks, defaults to 0 (infinite).
         """
         if max_workers is None:
             # Use this number because ThreadPoolExecutor is often
```
```diff
@@ -104,7 +106,7 @@ def __init__(self, max_workers=None, thread_name_prefix=''):
             raise ValueError("max_workers must be greater than 0")

         self._max_workers = max_workers
-        self._work_queue = queue.Queue()
+        self._work_queue = queue.Queue(maxsize=max_queue_size)
         self._threads = set()
         self._shutdown = False
         self._shutdown_lock = threading.Lock()
```

> **Review comment** (on `self._work_queue = queue.Queue(maxsize=max_queue_size)`): What if this changes? Is it then still safe to just pass …
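The review question above touches on a real fragility: without this patch, a bounded executor is commonly achieved by swapping the private `_work_queue` attribute after construction, which silently breaks if the internals change. A hedged sketch of that workaround against current CPython (the `BoundedThreadPoolExecutor` name and the reliance on `_work_queue` are assumptions of this sketch, not public API):

```python
import concurrent.futures as futures
import queue

class BoundedThreadPoolExecutor(futures.ThreadPoolExecutor):
    """Bound the internal work queue by swapping it after construction.

    Caveat (the reviewer's point): this relies on the private
    _work_queue attribute, which may change in a future release.
    """
    def __init__(self, max_workers=None, max_queue_size=0):
        super().__init__(max_workers=max_workers)
        # Safe in current CPython only because worker threads are spawned
        # lazily on the first submit(), so nothing references the old
        # (unbounded) queue yet -- an implementation detail.
        self._work_queue = queue.Queue(maxsize=max_queue_size)

# Usage: submit() now blocks whenever the bounded queue is full.
with BoundedThreadPoolExecutor(max_workers=2, max_queue_size=4) as ex:
    fs = [ex.submit(lambda i=i: i * i) for i in range(8)]
    results = sorted(f.result() for f in fs)
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```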
Test diff:

```diff
@@ -238,6 +238,29 @@ def test_thread_names_default(self):
             self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
             t.join()

+    def test_default_max_queue_size(self):
+        with futures.ThreadPoolExecutor() as executor:
+            self.assertEqual(executor._work_queue.maxsize, 0)
+
+    def test_custom_max_queue_size(self):
+        qsize = 1
+        with futures.ThreadPoolExecutor(max_queue_size=qsize) as executor:
+            # test custom queue size was passed down
+            self.assertEqual(executor._work_queue.maxsize, qsize)
+
+            # test executor works with custom size
+            n = 10
+
+            def process_item(item):
+                return item + 1
+
+            fs = [executor.submit(process_item, i) for i in range(n)]
+            expected_results = [process_item(i) for i in range(n)]
+
+            for f in futures.as_completed(fs, timeout=10):
+                result = f.result()
+                self.assertTrue(result in expected_results, result)
+
+
 class ProcessPoolShutdownTest(ExecutorShutdownTest):
     def _prime_executor(self):
```

> **Review comment** (on `test_default_max_queue_size`): I would add a higher-level test, i.e. create an executor with a non-zero queue size, submit a number of tasks larger than the queue size, and check that they all get executed in the end.
>
> **Reply:** Good idea :)

> **Review comment** (on `def process_item(item):`): Perhaps add a small sleep() here to help the queue reach its limit at some point?

> **Review comment** (on the `as_completed` loop): Or, simply, …
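The higher-level test suggested in review (submit more items than the queue can hold, with a small sleep so the queue actually fills, then check everything gets processed) can be sketched today without the patched executor, using a plain bounded `queue.Queue` and worker threads; the names (`worker`, `n_items`, etc.) are illustrative:

```python
import queue
import threading
import time

qsize, n_items, n_workers = 2, 10, 3
work = queue.Queue(maxsize=qsize)   # bounded, like max_queue_size=2
results = []
lock = threading.Lock()

def worker():
    while True:
        item = work.get()
        if item is None:            # sentinel: shut this worker down
            break
        time.sleep(0.01)            # small sleep so the queue fills up
        with lock:
            results.append(item + 1)

threads = [threading.Thread(target=worker) for _ in range(n_workers)]
for t in threads:
    t.start()
for i in range(n_items):
    work.put(i)                     # blocks whenever the queue is full
for _ in threads:
    work.put(None)                  # one sentinel per worker
for t in threads:
    t.join()

print(sorted(results))  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
```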
NEWS entry diff:

```diff
@@ -0,0 +1,4 @@
+The ThreadPoolExecutor constructor accepts a `max_queue_size` keyword to
+limit the underlying queue size. This helps keep memory consumption within
+acceptable bounds when work items are submitted faster than they can be
+processed.
```

> **Review comment:** Can you mention that the default (0) means an unbounded queue?
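As the reviewer notes, the proposed default of 0 means an unbounded queue; this matches how `queue.Queue` already treats a `maxsize` of zero (or any value less than or equal to zero). A quick check against the stdlib:

```python
import queue

q = queue.Queue(maxsize=0)  # maxsize <= 0 means an unbounded queue
for i in range(10_000):
    q.put(i, block=False)   # never raises queue.Full
print(q.full())   # False: an unbounded queue is never "full"
print(q.qsize())  # 10000
```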