bpo-29595: Expose max_queue_size in ThreadPoolExecutor #143

Closed
14 changes: 11 additions & 3 deletions Doc/library/concurrent.futures.rst
@@ -124,10 +124,13 @@ And::
executor.submit(wait_on_future)


-.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
+.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', max_queue_size=0)

An :class:`Executor` subclass that uses a pool of at most *max_workers*
threads to execute calls asynchronously.
threads to execute calls asynchronously. If all worker threads are busy it
buffers new work items using a queue of maximum size *max_queue_size*. When
this maximum size is reached, map() and submit() will block until more work
items have been processed.
Member: Can you mention that the default (0) means an unbounded queue?
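The reviewer's point can be illustrated with the stdlib `queue` module, whose `maxsize` semantics the proposed parameter simply forwards to `queue.Queue` (a sketch, independent of this patch):

```python
import queue

# The default maxsize=0 means "unbounded": put() never blocks and
# put_nowait() never raises queue.Full.
unbounded = queue.Queue()          # same as queue.Queue(maxsize=0)
for i in range(10_000):
    unbounded.put_nowait(i)

# A positive maxsize bounds the queue: once full, put() blocks and
# put_nowait() raises queue.Full.
bounded = queue.Queue(maxsize=2)
bounded.put_nowait("a")
bounded.put_nowait("b")
try:
    bounded.put_nowait("c")
    overflowed = False
except queue.Full:
    overflowed = True
```

This is exactly why the documentation should call out that 0 is not "no buffering" but "no limit".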


.. versionchanged:: 3.5
If *max_workers* is ``None`` or
@@ -142,6 +145,12 @@ And::
control the threading.Thread names for worker threads created by
the pool for easier debugging.

.. versionadded:: 3.7
Member: For a new function parameter, this should be versionchanged (versionadded means the function itself is new). Unfortunately, it seems we made the same mistake above for thread_name_prefix...
The *max_queue_size* argument was added so users can bound the internal
work queue and avoid accidentally running out of memory when new work
items are submitted faster than the worker threads can process them.
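As a sketch of the backpressure described above: since *max_queue_size* was never merged, the example below approximates it on stock CPython by swapping the executor's private `_work_queue` for a bounded `queue.Queue` before any task is submitted (this relies on an implementation detail that may change between versions):

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

executor = ThreadPoolExecutor(max_workers=2)
# Private attribute: approximates the proposed max_queue_size=4 behavior.
executor._work_queue = queue.Queue(maxsize=4)

def work(i):
    time.sleep(0.01)          # simulate slow workers so the queue can fill
    return i * 2

# Once the queue is full, submit() blocks instead of buffering an unbounded
# backlog: the producer is throttled to the pace of the workers.
futs = [executor.submit(work, i) for i in range(20)]
results = sorted(f.result() for f in as_completed(futs))
executor.shutdown()
```

All 20 tasks still complete; the only observable difference is that the submitting thread pauses while the backlog is at its limit.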

.. _threadpoolexecutor-example:

ThreadPoolExecutor Example
@@ -441,4 +450,3 @@ Exception classes
in a non-clean fashion (for example, if it was killed from the outside).

.. versionadded:: 3.3

6 changes: 4 additions & 2 deletions Lib/concurrent/futures/thread.py
@@ -88,13 +88,15 @@ class ThreadPoolExecutor(_base.Executor):
# Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().__next__

-def __init__(self, max_workers=None, thread_name_prefix=''):
+def __init__(self, max_workers=None, thread_name_prefix='', max_queue_size=0):
"""Initializes a new ThreadPoolExecutor instance.

Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
max_queue_size: The maximum number of work items to buffer before
submit() blocks. Defaults to 0 (unbounded).
"""
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
@@ -104,7 +106,7 @@ def __init__(self, max_workers=None, thread_name_prefix=''):
raise ValueError("max_workers must be greater than 0")

self._max_workers = max_workers
-self._work_queue = queue.Queue()
+self._work_queue = queue.Queue(maxsize=max_queue_size)
self._threads = set()
Contributor: maxsize is the sole arg to the initializer of Queue, so you could just supply max_queue_size since it's also pretty descriptive.

Author: > is the sole arg to the initializer

What if this changes? Is it then still safe to just pass max_queue_size as a positional argument?

self._shutdown = False
self._shutdown_lock = threading.Lock()
23 changes: 23 additions & 0 deletions Lib/test/test_concurrent_futures.py
@@ -238,6 +238,29 @@ def test_thread_names_default(self):
self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
t.join()

def test_default_max_queue_size(self):
Member: I would add a higher-level test, i.e. create an executor with a non-zero queue size, submit a number of tasks larger than the queue size, and check that they all get executed in the end.

Author: Good idea :)

with futures.ThreadPoolExecutor() as executor:
self.assertEqual(executor._work_queue.maxsize, 0)

def test_custom_max_queue_size(self):
qsize = 1
with futures.ThreadPoolExecutor(max_queue_size=qsize) as executor:
# test custom queue size was passed down
self.assertEqual(executor._work_queue.maxsize, qsize)

# test executor works with custom size
n = 10

def process_item(item):
return item + 1
Member: Perhaps add a small sleep() here to help the queue reach its limit at some point?


fs = [executor.submit(process_item, i) for i in range(n)]
expected_results = [process_item(i) for i in range(n)]

for f in futures.as_completed(fs, timeout=10):
Member: Or, simply, self.assertEqual(sorted(f.result() for f in futures.as_completed(...)), sorted(expected_results))

result = f.result()
self.assertTrue(result in expected_results, result)
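Combining the reviewers' suggestions, the simplified test might look roughly like the sketch below. ThreadPoolResultsTest and its method name are hypothetical, and max_queue_size is omitted because the proposed parameter is not part of the stdlib executor:

```python
import unittest
from concurrent import futures

class ThreadPoolResultsTest(unittest.TestCase):
    # Hypothetical stand-in for the PR's test; on a patched interpreter the
    # executor would also be given a small max_queue_size here.
    def test_all_submitted_items_complete(self):
        n = 10

        def process_item(item):
            return item + 1

        with futures.ThreadPoolExecutor(max_workers=2) as executor:
            fs = [executor.submit(process_item, i) for i in range(n)]
            expected_results = [process_item(i) for i in range(n)]
            # One assertion covers completeness and correctness of results.
            self.assertEqual(
                sorted(f.result() for f in futures.as_completed(fs, timeout=10)),
                sorted(expected_results),
            )
```

Comparing sorted result lists sidesteps the per-item membership check while still verifying that every submitted task was executed.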


class ProcessPoolShutdownTest(ExecutorShutdownTest):
def _prime_executor(self):
@@ -0,0 +1,4 @@
The ThreadPoolExecutor constructor accepts a `max_queue_size` keyword to
limit the underlying queue size. This helps keep memory consumption within
acceptable bounds when work items are submitted faster than they can be
processed.