diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 30556fbb345490..dc708fb44c19e9 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -124,10 +124,13 @@ And:: executor.submit(wait_on_future) -.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='') +.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', max_queue_size=0) An :class:`Executor` subclass that uses a pool of at most *max_workers* - threads to execute calls asynchronously. + threads to execute calls asynchronously. If all worker threads are busy it + buffers new work items using a queue of maximum size *max_queue_size*. When + this maximum size is reached, map() and submit() will block until more work + items have been processed. .. versionchanged:: 3.5 If *max_workers* is ``None`` or @@ -142,6 +145,12 @@ And:: control the threading.Thread names for worker threads created by the pool for easier debugging. + .. versionadded:: 3.7 + The *max_queue_size* argument was added to allow users to control + the maximum queue size to not accidentally run out of memory when + new work items are added at a higher rate than what the worker + threads can handle. + .. _threadpoolexecutor-example: ThreadPoolExecutor Example @@ -441,4 +450,3 @@ Exception classes in a non-clean fashion (for example, if it was killed from the outside). .. versionadded:: 3.3 - diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 0b5d5373ffdc0a..eadccf64e3fb4d 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -88,13 +88,15 @@ class ThreadPoolExecutor(_base.Executor): # Used to assign unique thread names when thread_name_prefix is not supplied. _counter = itertools.count().__next__ - def __init__(self, max_workers=None, thread_name_prefix=''): + def __init__(self, max_workers=None, thread_name_prefix='', max_queue_size=0): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. + max_queue_size: The maximum number of work items to buffer before + submit() blocks, defaults to 0 (infinite). """ if max_workers is None: # Use this number because ThreadPoolExecutor is often @@ -104,7 +106,7 @@ def __init__(self, max_workers=None, thread_name_prefix=''): raise ValueError("max_workers must be greater than 0") self._max_workers = max_workers - self._work_queue = queue.Queue() + self._work_queue = queue.Queue(maxsize=max_queue_size) self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index ed8ad41f8e6cb5..94522b70db800a 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -238,6 +238,29 @@ def test_thread_names_default(self): self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') t.join() + def test_default_max_queue_size(self): + with futures.ThreadPoolExecutor() as executor: + self.assertEqual(executor._work_queue.maxsize, 0) + + def test_custom_max_queue_size(self): + qsize = 1 + with futures.ThreadPoolExecutor(max_queue_size=qsize) as executor: + # test custom queue size was passed down + self.assertEqual(executor._work_queue.maxsize, qsize) + + # test executor works with custom size + n = 10 + + def process_item(item): + return item + 1 + + fs = [executor.submit(process_item, i) for i in range(n)] + expected_results = [process_item(i) for i in range(n)] + + for f in futures.as_completed(fs, timeout=10): + result = f.result() + self.assertTrue(result in expected_results, result) + class ProcessPoolShutdownTest(ExecutorShutdownTest): def _prime_executor(self): diff --git a/Misc/NEWS.d/next/Library/2017-10-30-22-18-28.bpo-29595.Zd7P54.rst b/Misc/NEWS.d/next/Library/2017-10-30-22-18-28.bpo-29595.Zd7P54.rst new file mode 100644 index 00000000000000..84dba4f2f2988a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-10-30-22-18-28.bpo-29595.Zd7P54.rst @@ -0,0 +1,4 @@ +The ThreadPoolExecutor constructor accepts a `max_queue_size` keyword to +limit the underlying queue size. This helps keeping the memory consumption +in acceptable bounds when work items are submitted faster than they can be +processed.