From 66c0031a08e6499486cc05db737ace0fb0a8de2d Mon Sep 17 00:00:00 2001 From: Nikolaus Piccolotto Date: Fri, 17 Feb 2017 20:26:04 +0100 Subject: [PATCH 1/9] expose max_queue_size in threadpoolexecutor --- Lib/concurrent/futures/thread.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 03d276b63f63ca..4286d30bfbaf2e 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -81,7 +81,7 @@ def _worker(executor_reference, work_queue): _base.LOGGER.critical('Exception in worker', exc_info=True) class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers=None, thread_name_prefix=''): + def __init__(self, max_workers=None, thread_name_prefix='', max_queue_size=0): """Initializes a new ThreadPoolExecutor instance. Args: @@ -97,7 +97,7 @@ def __init__(self, max_workers=None, thread_name_prefix=''): raise ValueError("max_workers must be greater than 0") self._max_workers = max_workers - self._work_queue = queue.Queue() + self._work_queue = queue.Queue(maxsize=max_queue_size) self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() From 693689c74b7dcdfad2ea6d1a2dbdcee2160e9896 Mon Sep 17 00:00:00 2001 From: Nikolaus Piccolotto Date: Sat, 18 Feb 2017 13:23:26 +0100 Subject: [PATCH 2/9] tests --- Lib/concurrent/futures/thread.py | 6 ++++++ Lib/test/test_concurrent_futures.py | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 4286d30bfbaf2e..2e922f7d221e56 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -88,6 +88,8 @@ def __init__(self, max_workers=None, thread_name_prefix='', max_queue_size=0): max_workers: The maximum number of threads that can be used to execute the given calls. thread_name_prefix: An optional name prefix to give our threads. + max_queue_size: The maximum number of work items to buffer before + submit() blocks, defaults to 0 (infinite). """ if max_workers is None: # Use this number because ThreadPoolExecutor is often @@ -96,6 +98,10 @@ def __init__(self, max_workers=None, thread_name_prefix='', max_queue_size=0): if max_workers <= 0: raise ValueError("max_workers must be greater than 0") + # check that max_queue_size is a positive integer + if type(max_queue_size) is not int or max_queue_size < 0: + raise ValueError("max_queue_size must be equal or greater 0") + self._max_workers = max_workers self._work_queue = queue.Queue(maxsize=max_queue_size) self._threads = set() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 23e95b212447c8..13f6dbeefb135e 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -178,6 +178,25 @@ def test_thread_names_default(self): self.assertRegex(t.name, r'^.*ThreadPoolExecutor.*_[0-4]$') t.join() + def test_default_max_queue_size(self): + executor = futures.ThreadPoolExecutor() + self.assertEqual(executor._work_queue.maxsize, 0) + + def test_custom_max_queue_size(self): + for i in range(0, 10): + executor = futures.ThreadPoolExecutor(max_queue_size=i) + self.assertEqual(executor._work_queue.maxsize, i) + + def test_negative_max_queue_size(self): + for i in range(-1, -10): + with self.assertRaises(ValueError): + futures.ThreadPoolExecutor(max_queue_size=i) + + def test_max_queue_size_not_int(self): + for bad_value in [None, "5", False, True, 3.14]: + with self.assertRaises(ValueError): + futures.ThreadPoolExecutor(max_queue_size=bad_value) + class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase): def _prime_executor(self): From e76e9a7a1b2b78d3374229bf139f0bbedce0e561 Mon Sep 17 00:00:00 2001 From: Nikolaus Piccolotto Date: Sat, 18 Feb 2017 13:28:55 +0100 Subject: [PATCH 3/9] wording --- Lib/test/test_concurrent_futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 13f6dbeefb135e..4ca86607bfe105 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -192,7 +192,7 @@ def test_negative_max_queue_size(self): with self.assertRaises(ValueError): futures.ThreadPoolExecutor(max_queue_size=i) - def test_max_queue_size_not_int(self): + def test_invalid_max_queue_size(self): for bad_value in [None, "5", False, True, 3.14]: with self.assertRaises(ValueError): futures.ThreadPoolExecutor(max_queue_size=bad_value) From c2079a74f723c37d9b7b219d4343f3074422246c Mon Sep 17 00:00:00 2001 From: Nikolaus Piccolotto Date: Sun, 19 Feb 2017 13:12:44 +0100 Subject: [PATCH 4/9] review fixes --- Lib/concurrent/futures/thread.py | 4 ---- Lib/test/test_concurrent_futures.py | 15 +++++---------- 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 2e922f7d221e56..96140c11eb0239 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -98,10 +98,6 @@ def __init__(self, max_workers=None, thread_name_prefix='', max_queue_size=0): if max_workers <= 0: raise ValueError("max_workers must be greater than 0") - # check that max_queue_size is a positive integer - if type(max_queue_size) is not int or max_queue_size < 0: - raise ValueError("max_queue_size must be equal or greater 0") - self._max_workers = max_workers self._work_queue = queue.Queue(maxsize=max_queue_size) self._threads = set() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 4ca86607bfe105..dbb8ab28878373 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -183,19 +183,14 @@ def test_default_max_queue_size(self): self.assertEqual(executor._work_queue.maxsize, 0) def test_custom_max_queue_size(self): - for i in range(0, 10): + # test custom valid and invalid values + for i in range(-10, 10): executor = futures.ThreadPoolExecutor(max_queue_size=i) self.assertEqual(executor._work_queue.maxsize, i) - - def test_negative_max_queue_size(self): - for i in range(-1, -10): - with self.assertRaises(ValueError): - futures.ThreadPoolExecutor(max_queue_size=i) - - def test_invalid_max_queue_size(self): + # provided value is forwarded without checking it for bad_value in [None, "5", False, True, 3.14]: - with self.assertRaises(ValueError): - futures.ThreadPoolExecutor(max_queue_size=bad_value) + executor = futures.ThreadPoolExecutor(max_queue_size=bad_value) + self.assertEqual(executor._work_queue.maxsize, bad_value) class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase): From 6103533b364c19c48f5d61eb1dbef8cfc71c9727 Mon Sep 17 00:00:00 2001 From: Nikolaus Piccolotto Date: Mon, 20 Feb 2017 21:17:12 +0100 Subject: [PATCH 5/9] update docs --- Doc/library/concurrent.futures.rst | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index d85576b8bedd8e..0107d48ad0311a 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -124,10 +124,11 @@ And:: executor.submit(wait_on_future) -.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='') +.. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='', max_queue_size=0) An :class:`Executor` subclass that uses a pool of at most *max_workers* - threads to execute calls asynchronously. + threads to execute calls asynchronously. If all worker threads are busy it + buffers new work items using a queue of maximum size *max_queue_size*. .. versionchanged:: 3.5 If *max_workers* is ``None`` or @@ -141,6 +142,12 @@ And:: The *thread_name_prefix* argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging. + + .. versionadded:: 3.7 + The *max_queue_size* argument was added to allow users to control + the maximum queue size to not accidentally run out of memory when + new work items are added at a higher rate than what the worker + threads can handle. .. _threadpoolexecutor-example: From edcf3f91871a4f2d1b589a61bea7f3feb178ba3a Mon Sep 17 00:00:00 2001 From: Nikolaus Piccolotto Date: Mon, 27 Feb 2017 15:58:06 +0100 Subject: [PATCH 6/9] fix whitespace --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 0107d48ad0311a..b563f1ec787095 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -142,7 +142,7 @@ And:: The *thread_name_prefix* argument was added to allow users to control the threading.Thread names for worker threads created by the pool for easier debugging. - + .. versionadded:: 3.7 The *max_queue_size* argument was added to allow users to control the maximum queue size to not accidentally run out of memory when From f10a6f9ed4ca2e9384d3d4e64d0da39724377ae0 Mon Sep 17 00:00:00 2001 From: Nikolaus Piccolotto Date: Mon, 30 Oct 2017 20:23:22 +0100 Subject: [PATCH 7/9] Document blocking behavior --- Doc/library/concurrent.futures.rst | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 6fcf6c098f9711..dc708fb44c19e9 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -128,7 +128,9 @@ And:: An :class:`Executor` subclass that uses a pool of at most *max_workers* threads to execute calls asynchronously. If all worker threads are busy it - buffers new work items using a queue of maximum size *max_queue_size*. + buffers new work items using a queue of maximum size *max_queue_size*. When + this maximum size is reached, map() and submit() will block until more work + items have been processed. .. versionchanged:: 3.5 If *max_workers* is ``None`` or @@ -448,4 +450,3 @@ Exception classes in a non-clean fashion (for example, if it was killed from the outside). .. versionadded:: 3.3 - From 23389910bfffcdb83f6e8cfbe6adc044bd8a54c0 Mon Sep 17 00:00:00 2001 From: Nikolaus Piccolotto Date: Mon, 30 Oct 2017 22:07:33 +0100 Subject: [PATCH 8/9] Add highlevel test for threadpool queue size --- Lib/test/test_concurrent_futures.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index cbeab539fb2a29..94522b70db800a 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -239,18 +239,27 @@ def test_thread_names_default(self): t.join() def test_default_max_queue_size(self): - executor = futures.ThreadPoolExecutor() - self.assertEqual(executor._work_queue.maxsize, 0) + with futures.ThreadPoolExecutor() as executor: + self.assertEqual(executor._work_queue.maxsize, 0) def test_custom_max_queue_size(self): - # test custom valid and invalid values - for i in range(-10, 10): - executor = futures.ThreadPoolExecutor(max_queue_size=i) - self.assertEqual(executor._work_queue.maxsize, i) - # provided value is forwarded without checking it - for bad_value in [None, "5", False, True, 3.14]: - executor = futures.ThreadPoolExecutor(max_queue_size=bad_value) - self.assertEqual(executor._work_queue.maxsize, bad_value) + qsize = 1 + with futures.ThreadPoolExecutor(max_queue_size=qsize) as executor: + # test custom queue size was passed down + self.assertEqual(executor._work_queue.maxsize, qsize) + + # test executor works with custom size + n = 10 + + def process_item(item): + return item + 1 + + fs = [executor.submit(process_item, i) for i in range(n)] + expected_results = [process_item(i) for i in range(n)] + + for f in futures.as_completed(fs, timeout=10): + result = f.result() + self.assertTrue(result in expected_results, result) class ProcessPoolShutdownTest(ExecutorShutdownTest): From 9d1a24b837464323a0063a26a56e3b8383072860 Mon Sep 17 00:00:00 2001 From: Nikolaus Piccolotto Date: Mon, 30 Oct 2017 22:19:01 +0100 Subject: [PATCH 9/9] Add news entry --- .../next/Library/2017-10-30-22-18-28.bpo-29595.Zd7P54.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2017-10-30-22-18-28.bpo-29595.Zd7P54.rst diff --git a/Misc/NEWS.d/next/Library/2017-10-30-22-18-28.bpo-29595.Zd7P54.rst b/Misc/NEWS.d/next/Library/2017-10-30-22-18-28.bpo-29595.Zd7P54.rst new file mode 100644 index 00000000000000..84dba4f2f2988a --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-10-30-22-18-28.bpo-29595.Zd7P54.rst @@ -0,0 +1,4 @@ +The ThreadPoolExecutor constructor accepts a `max_queue_size` keyword to +limit the underlying queue size. This helps keeping the memory consumption +in acceptable bounds when work items are submitted faster than they can be +processed.