From 8ac1fcf62462e089a53a9fb8a9c488042bdb40af Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Wed, 30 Apr 2025 13:17:53 -0400 Subject: [PATCH 01/26] gh-132969 Fix exception/hang when shutdown(wait=False) and a task exited abnormally When shutdown is called with wait=False, the executor thread keeps running even after the ProcessPoolExecutor's state is reset. The executor then tries to replenish the worker processes pool resulting in an error and a potential hang when it comes across a worker that has died. Fixed the issue by having _adjust_process_count() return without doing anything if the ProcessPoolExecutor's state has been reset. Added unit tests to validate two scenarios: max_workers < num_tasks (exception) max_workers > num_tasks (exception + hang) --- Lib/concurrent/futures/process.py | 4 ++ .../test_concurrent_futures/test_shutdown.py | 47 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 76b7b2abe836d8..45a6cb8080f7ec 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -755,6 +755,10 @@ def _start_executor_manager_thread(self): self._executor_manager_thread_wakeup def _adjust_process_count(self): + # gh-132969 + if self._processes is None: + return + # if there's an idle process, we don't need to spawn a new one. if self._idle_worker_semaphore.acquire(blocking=False): return diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 7a4065afd46fc8..8d30b672d6f4b3 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -20,6 +20,16 @@ def sleep_and_print(t, msg): sys.stdout.flush() +def failing_task_132969(n: int) -> int: + raise ValueError("failing task") + + +def good_task_132969(n: int) -> int: + time.sleep(0.1 * n) + return n + + + class ExecutorShutdownTest: def test_run_after_shutdown(self): self.executor.shutdown() @@ -330,6 +340,43 @@ def test_shutdown_no_wait(self): # shutdown. assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + def _run_test_issue_132969(self, max_workers: int) -> int: + # max_workers=2 will repro exception + # max_workers=4 will repro exception and then hang + + import multiprocessing as mp + + # Repro conditions + # max_tasks_per_child=1 + # a task ends abnormally + # shutdown(wait=False) is called + executor = futures.ProcessPoolExecutor( + max_workers=max_workers, + max_tasks_per_child=1, + mp_context=mp.get_context("forkserver")) + f1 = executor.submit(good_task_132969, 1) + f2 = executor.submit(failing_task_132969, 2) + f3 = executor.submit(good_task_132969, 3) + result:int = 0 + try: + result += f1.result() + result += f2.result() + result += f3.result() + except ValueError: + # stop processing results upon first exception + pass + + executor.shutdown(wait=False) + return result + + def test_shutdown_len_exception_132969(self): + result = self._run_test_issue_132969(2) + self.assertEqual(result, 1) + + def test_shutdown_process_hang_132969(self): + result = self._run_test_issue_132969(4) + self.assertEqual(result, 1) + create_executor_tests(globals(), ProcessPoolShutdownTest, executor_mixins=(ProcessPoolForkMixin, From 177444507ba32d208e19fd0b63d5dbec7626aa8f Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Wed, 30 Apr 2025 19:32:19 +0000 Subject: [PATCH 02/26] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst diff --git a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst new file mode 100644 index 00000000000000..d0c89ad1790838 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst @@ -0,0 +1 @@ +Fixes error+hang when ProcessPoolExecutor shutdown called with wait=False and a task ended abnormally. From 0938eeb30f7019ad1b58d0af1cc8c9e089b229c7 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Wed, 30 Apr 2025 19:32:19 +0000 Subject: [PATCH 03/26] gh-132969 Skip forkserver-dependent test on Windows --- .../next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst diff --git a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst new file mode 100644 index 00000000000000..d0c89ad1790838 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst @@ -0,0 +1 @@ +Fixes error+hang when ProcessPoolExecutor shutdown called with wait=False and a task ended abnormally. From 99ab3211dee463ebe650e49d979952be4789d033 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Wed, 30 Apr 2025 16:57:34 -0400 Subject: [PATCH 04/26] gh-132969 Skip forkserver dependent tests on Windows --- Lib/test/test_concurrent_futures/test_shutdown.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 8d30b672d6f4b3..5de43af99b209f 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -341,6 +341,9 @@ def test_shutdown_no_wait(self): assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) def _run_test_issue_132969(self, max_workers: int) -> int: + if sys.platform == "win32": + raise unittest.SkipTest("skip test since forkserver is not available on Windows") + # max_workers=2 will repro exception # max_workers=4 will repro exception and then hang From c177c91664865e397fef44e9413a92efddfea429 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Thu, 8 May 2025 23:09:51 -0400 Subject: [PATCH 05/26] gh-132969 Added more verbiage to the explanation. --- .../next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst index d0c89ad1790838..590f6429fd1bcf 100644 --- a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst +++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst @@ -1 +1 @@ -Fixes error+hang when ProcessPoolExecutor shutdown called with wait=False and a task ended abnormally. +Prevent the `ProcessPoolExecutor` executor thread from tying to adjust worker processes after `shutdown` is called and object state has already been reset. Calling `shutdown` with `wait=False` resets the state of the `ProcessPoolExecutor` while still leaving the executor thread running. A combination of conditions including a worker process having terminated abormally and `shutdown(wait=False)` resulted in an exception when the executor thread tried to replace dead workers, and additionally the process could also potentially hang. From 2cd70b10970e17cabefb2a818b670fb3044132f0 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Thu, 8 May 2025 23:12:14 -0400 Subject: [PATCH 06/26] gh-132969 Add more verbiage explaining the change --- Lib/concurrent/futures/process.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 45a6cb8080f7ec..74cb3504a8e16a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -755,7 +755,8 @@ def _start_executor_manager_thread(self): self._executor_manager_thread_wakeup def _adjust_process_count(self): - # gh-132969 + # gh-132969: avoid error if shutdown(wait=False) is called and state is reset + # and leaving the executor still running if self._processes is None: return From 57265b5996ea9170c3d0fcecb2ef23d474c5c2cf Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Thu, 8 May 2025 23:13:15 -0400 Subject: [PATCH 07/26] gh-132969 Make test helper methods into private class methods, use @unittest.skipIf annotation to skip test on win32, add some more comments --- .../test_concurrent_futures/test_shutdown.py | 42 +++++++++++-------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 5de43af99b209f..e0672355285a2d 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -20,16 +20,6 @@ def sleep_and_print(t, msg): sys.stdout.flush() -def failing_task_132969(n: int) -> int: - raise ValueError("failing task") - - -def good_task_132969(n: int) -> int: - time.sleep(0.1 * n) - return n - - - class ExecutorShutdownTest: def test_run_after_shutdown(self): self.executor.shutdown() @@ -340,7 +330,18 @@ def test_shutdown_no_wait(self): # shutdown. assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) - def _run_test_issue_132969(self, max_workers: int) -> int: + @classmethod + def _failing_task_gh_132969(cls, n: int) -> int: + raise ValueError("failing task") + + @classmethod + def _good_task_gh_132969(cls, n: int) -> int: + time.sleep(0.1 * n) + return n + + + @unittest.skipIf(sys.platform == 'win32', 'Test does not run on Windows') + def _run_test_issue_gh_132969(self, max_workers: int) -> int: if sys.platform == "win32": raise unittest.SkipTest("skip test since forkserver is not available on Windows") @@ -357,9 +358,9 @@ def _run_test_issue_132969(self, max_workers: int) -> int: max_workers=max_workers, max_tasks_per_child=1, mp_context=mp.get_context("forkserver")) - f1 = executor.submit(good_task_132969, 1) - f2 = executor.submit(failing_task_132969, 2) - f3 = executor.submit(good_task_132969, 3) + f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1) + f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 2) + f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3) result:int = 0 try: result += f1.result() @@ -372,12 +373,17 @@ def _run_test_issue_132969(self, max_workers: int) -> int: executor.shutdown(wait=False) return result - def test_shutdown_len_exception_132969(self): - result = self._run_test_issue_132969(2) + def test_shutdown_gh_132969_case_1(self): + # gh-132969: test that exception "object of type 'NoneType' has no len()" + # is not raised when shutdown(wait=False) is called. + result = self._run_test_issue_gh_132969(2) self.assertEqual(result, 1) - def test_shutdown_process_hang_132969(self): - result = self._run_test_issue_132969(4) + def test_shutdown_gh_132969_case_2(self): + # gh-132969: test that process does not hang and + # exception "object of type 'NoneType' has no len()" is not raised + # when shutdown(wait=False) is called. + result = self._run_test_issue_gh_132969(4) self.assertEqual(result, 1) From 709d9fd0d265267aefd6a72aa56d7e1c131387ee Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Thu, 8 May 2025 23:26:17 -0400 Subject: [PATCH 08/26] gh-132969 Fix Sphinx Lint error --- .../next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst index 590f6429fd1bcf..7fd1f4b4f889f6 100644 --- a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst +++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst @@ -1 +1 @@ -Prevent the `ProcessPoolExecutor` executor thread from tying to adjust worker processes after `shutdown` is called and object state has already been reset. Calling `shutdown` with `wait=False` resets the state of the `ProcessPoolExecutor` while still leaving the executor thread running. A combination of conditions including a worker process having terminated abormally and `shutdown(wait=False)` resulted in an exception when the executor thread tried to replace dead workers, and additionally the process could also potentially hang. +Prevent the ProcessPoolExecutor executor thread from tying to adjust worker processes after shutdown is called and object state has already been reset. Calling shutdown with wait=False resets the state of the ProcessPoolExecutor while still leaving the executor thread running. A combination of conditions including a worker process having terminated abormally and shutdown(wait=False) resulted in an exception when the executor thread tried to replace dead workers, and additionally the process could also potentially hang. From 1a8919bbb3e56430182c56a20dd4f386ef9dab73 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Fri, 9 May 2025 10:09:56 -0400 Subject: [PATCH 09/26] gh-132969: make unittest for gh-132969 runnable on windows by swiching to "spawn" --- Lib/test/test_concurrent_futures/test_shutdown.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index e0672355285a2d..fd71aa25e55555 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -340,11 +340,7 @@ def _good_task_gh_132969(cls, n: int) -> int: return n - @unittest.skipIf(sys.platform == 'win32', 'Test does not run on Windows') def _run_test_issue_gh_132969(self, max_workers: int) -> int: - if sys.platform == "win32": - raise unittest.SkipTest("skip test since forkserver is not available on Windows") - # max_workers=2 will repro exception # max_workers=4 will repro exception and then hang @@ -357,7 +353,7 @@ def _run_test_issue_gh_132969(self, max_workers: int) -> int: executor = futures.ProcessPoolExecutor( max_workers=max_workers, max_tasks_per_child=1, - mp_context=mp.get_context("forkserver")) + mp_context=mp.get_context("spawn")) f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1) f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 2) f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3) @@ -373,12 +369,14 @@ def _run_test_issue_gh_132969(self, max_workers: int) -> int: executor.shutdown(wait=False) return result + @unittest.skipIf(sys.platform == 'win32', 'Test does not run on Windows') def test_shutdown_gh_132969_case_1(self): # gh-132969: test that exception "object of type 'NoneType' has no len()" # is not raised when shutdown(wait=False) is called. result = self._run_test_issue_gh_132969(2) self.assertEqual(result, 1) + @unittest.skipIf(sys.platform == 'win32', 'Test does not run on Windows') def test_shutdown_gh_132969_case_2(self): # gh-132969: test that process does not hang and # exception "object of type 'NoneType' has no len()" is not raised From 87eb62310d9ab0367409812bf4bae5b47adf3404 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Fri, 9 May 2025 14:30:27 -0400 Subject: [PATCH 10/26] gh-132969: Remove @unittest.skipIf annotations since spawn works on windows --- Lib/test/test_concurrent_futures/test_shutdown.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index fd71aa25e55555..9aa2ceac104984 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -369,14 +369,12 @@ def _run_test_issue_gh_132969(self, max_workers: int) -> int: executor.shutdown(wait=False) return result - @unittest.skipIf(sys.platform == 'win32', 'Test does not run on Windows') def test_shutdown_gh_132969_case_1(self): # gh-132969: test that exception "object of type 'NoneType' has no len()" # is not raised when shutdown(wait=False) is called. result = self._run_test_issue_gh_132969(2) self.assertEqual(result, 1) - @unittest.skipIf(sys.platform == 'win32', 'Test does not run on Windows') def test_shutdown_gh_132969_case_2(self): # gh-132969: test that process does not hang and # exception "object of type 'NoneType' has no len()" is not raised From ec2543a9a9a75ed76641511755222a5468205651 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Fri, 9 May 2025 14:38:44 -0400 Subject: [PATCH 11/26] gh-132969: Incorproate review suggestion to skip test when start_method is "fork" or ("forkserver" and windows) --- Lib/test/test_concurrent_futures/test_shutdown.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 9aa2ceac104984..ee4f9ccbe494e7 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -350,6 +350,11 @@ def _run_test_issue_gh_132969(self, max_workers: int) -> int: # max_tasks_per_child=1 # a task ends abnormally # shutdown(wait=False) is called + start_method = self.get_context().get_start_method() + if (start_method == "fork" or + (start_method == "forkserver" and sys.platform.startswith("win"))): + raise unittest.SkipTest("Excluding tests with start_method " + "'fork' and 'forkserver' on Windows") executor = futures.ProcessPoolExecutor( max_workers=max_workers, max_tasks_per_child=1, From 3b3721f317f8d9398669d6de0f06bcecbaa9753c Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Fri, 9 May 2025 14:57:50 -0400 Subject: [PATCH 12/26] gh-132969: Remove import of multiprocessing and use self.get_context() as the mp_context --- Lib/test/test_concurrent_futures/test_shutdown.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index ee4f9ccbe494e7..2cdd5ebf86c4ef 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -344,8 +344,6 @@ def _run_test_issue_gh_132969(self, max_workers: int) -> int: # max_workers=2 will repro exception # max_workers=4 will repro exception and then hang - import multiprocessing as mp - # Repro conditions # max_tasks_per_child=1 # a task ends abnormally @@ -358,7 +356,7 @@ def _run_test_issue_gh_132969(self, max_workers: int) -> int: executor = futures.ProcessPoolExecutor( max_workers=max_workers, max_tasks_per_child=1, - mp_context=mp.get_context("spawn")) + mp_context=self.get_context()) f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1) f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 2) f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3) From 7ef787245df87fdd95961ac815ca6cc606b10a41 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Fri, 9 May 2025 15:04:25 -0400 Subject: [PATCH 13/26] gh-132969: use self.skipTest instead of unittest.SkipTest, shorten message --- Lib/test/test_concurrent_futures/test_shutdown.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 2cdd5ebf86c4ef..199caf1293848a 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -351,8 +351,7 @@ def _run_test_issue_gh_132969(self, max_workers: int) -> int: start_method = self.get_context().get_start_method() if (start_method == "fork" or (start_method == "forkserver" and sys.platform.startswith("win"))): - raise unittest.SkipTest("Excluding tests with start_method " - "'fork' and 'forkserver' on Windows") + raise self.skipTest(f"Excluding test with {start_method}=") executor = futures.ProcessPoolExecutor( max_workers=max_workers, max_tasks_per_child=1, From efd0a1db706cc179e4679deaf993cf987626fd0b Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Fri, 9 May 2025 15:10:41 -0400 Subject: [PATCH 14/26] gh-132969: removed type annotations --- Lib/test/test_concurrent_futures/test_shutdown.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 199caf1293848a..9f7ccf1be25996 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -331,11 +331,11 @@ def test_shutdown_no_wait(self): assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) @classmethod - def _failing_task_gh_132969(cls, n: int) -> int: + def _failing_task_gh_132969(cls, n): raise ValueError("failing task") @classmethod - def _good_task_gh_132969(cls, n: int) -> int: + def _good_task_gh_132969(cls, n): time.sleep(0.1 * n) return n From b335d385ade212f3cd23b61255aaa4ad775df54e Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Fri, 9 May 2025 15:13:32 -0400 Subject: [PATCH 15/26] gh-132969: removed extraneous blank line --- Lib/test/test_concurrent_futures/test_shutdown.py | 1 - 1 file changed, 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 9f7ccf1be25996..e77107a257668c 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -339,7 +339,6 @@ def _good_task_gh_132969(cls, n): time.sleep(0.1 * n) return n - def _run_test_issue_gh_132969(self, max_workers: int) -> int: # max_workers=2 will repro exception # max_workers=4 will repro exception and then hang From b0607868d7a5ce6e7425b90bc93ca135ae2d8981 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Fri, 9 May 2025 15:42:20 -0400 Subject: [PATCH 16/26] gh-132969: fix call to self.skipTest --- Lib/test/test_concurrent_futures/test_shutdown.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index e77107a257668c..f5607aea2a8376 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -350,7 +350,7 @@ def _run_test_issue_gh_132969(self, max_workers: int) -> int: start_method = self.get_context().get_start_method() if (start_method == "fork" or (start_method == "forkserver" and sys.platform.startswith("win"))): - raise self.skipTest(f"Excluding test with {start_method}=") + self.skipTest(f"Excluding test with {start_method}=") executor = futures.ProcessPoolExecutor( max_workers=max_workers, max_tasks_per_child=1, From c6457c395eb874dbf4e00c69d1381a77833f69f2 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Mon, 12 May 2025 10:05:39 -0400 Subject: [PATCH 17/26] gh-132969: Run gh-132969 tests only with "spawn" mp_context --- Lib/test/test_concurrent_futures/test_shutdown.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index f5607aea2a8376..3bd74eb97570b6 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -348,9 +348,8 @@ def _run_test_issue_gh_132969(self, max_workers: int) -> int: # a task ends abnormally # shutdown(wait=False) is called start_method = self.get_context().get_start_method() - if (start_method == "fork" or - (start_method == "forkserver" and sys.platform.startswith("win"))): - self.skipTest(f"Excluding test with {start_method}=") + if start_method != ProcessPoolSpawnMixin.ctx: + self.skipTest(f"Skipping test for {start_method}=") executor = futures.ProcessPoolExecutor( max_workers=max_workers, max_tasks_per_child=1, From 65f30b0d23abbaed0043836470c7698c9a939ff3 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Mon, 12 May 2025 12:06:23 -0400 Subject: [PATCH 18/26] gh-132969: Join executor_manager_thread after exercising shutdown(wait=False) to prevent resource leak when running CI --- Lib/test/test_concurrent_futures/test_shutdown.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 3bd74eb97570b6..21c78d9f51e6f0 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -339,7 +339,7 @@ def _good_task_gh_132969(cls, n): time.sleep(0.1 * n) return n - def _run_test_issue_gh_132969(self, max_workers: int) -> int: + def _run_test_issue_gh_132969(self, max_workers): # max_workers=2 will repro exception # max_workers=4 will repro exception and then hang @@ -366,7 +366,12 @@ def _run_test_issue_gh_132969(self, max_workers: int) -> int: # stop processing results upon first exception pass + # Ensure that the executor cleans up after called + # shutdown with wait=False + executor_manager_thread = executor._executor_manager_thread executor.shutdown(wait=False) + time.sleep(0.2) + executor_manager_thread.join() return result def test_shutdown_gh_132969_case_1(self): From 54ae7b82b17418c7978b744e623a91ad6b3ccc6d Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Mon, 12 May 2025 13:21:25 -0400 Subject: [PATCH 19/26] gh-132969: fix typo in skip message --- Lib/test/test_concurrent_futures/test_shutdown.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 21c78d9f51e6f0..0a07d4dd846a8e 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -349,7 +349,7 @@ def _run_test_issue_gh_132969(self, max_workers): # shutdown(wait=False) is called start_method = self.get_context().get_start_method() if start_method != ProcessPoolSpawnMixin.ctx: - self.skipTest(f"Skipping test for {start_method}=") + self.skipTest(f"Skipping test for {start_method = }") executor = futures.ProcessPoolExecutor( max_workers=max_workers, max_tasks_per_child=1, From e132f5e1a73c11f397b8475041fe54f5cf71c872 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Mon, 12 May 2025 14:12:55 -0400 Subject: [PATCH 20/26] gh-132969: Tweak the news entry for grammar and clarity. --- .../next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst index 7fd1f4b4f889f6..62f03e4035ff0f 100644 --- a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst +++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst @@ -1 +1 @@ -Prevent the ProcessPoolExecutor executor thread from tying to adjust worker processes after shutdown is called and object state has already been reset. Calling shutdown with wait=False resets the state of the ProcessPoolExecutor while still leaving the executor thread running. A combination of conditions including a worker process having terminated abormally and shutdown(wait=False) resulted in an exception when the executor thread tried to replace dead workers, and additionally the process could also potentially hang. +Prevent the ProcessPoolExecutor executor thread, which remains running when shutdown is called with wait=False, from attempting to adjust the pool's worker processes after the object state has already been reset during shutdown. A combination of conditions, including a worker process having terminated abormally, resulted in an exception and a potential hang when the still-running executor thread attempted to replace dead workers within the pool. From 99088104ac8d9fce3554f1b5d0d8e11994532230 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Mon, 12 May 2025 14:33:19 -0400 Subject: [PATCH 21/26] gh-132969: Run test with "spawn" on all platforms and "forkserver" if not windows --- Lib/test/test_concurrent_futures/test_shutdown.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 0a07d4dd846a8e..6f20bb72771373 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -348,7 +348,8 @@ def _run_test_issue_gh_132969(self, max_workers): # a task ends abnormally # shutdown(wait=False) is called start_method = self.get_context().get_start_method() - if start_method != ProcessPoolSpawnMixin.ctx: + if (start_method == "fork" or + (start_method == "forkserver" and sys.platform.startswith("win"))): self.skipTest(f"Skipping test for {start_method = }") executor = futures.ProcessPoolExecutor( max_workers=max_workers, From f5f3dc219411f6430d3914e7f5670a2bb6dae790 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar <140011370+ogbiggles@users.noreply.github.com> Date: Mon, 2 Jun 2025 10:24:55 -0400 Subject: [PATCH 22/26] Update Lib/test/test_concurrent_futures/test_shutdown.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- Lib/test/test_concurrent_futures/test_shutdown.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 6f20bb72771373..99b315b47e2530 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -358,7 +358,7 @@ def _run_test_issue_gh_132969(self, max_workers): f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1) f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 2) f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3) - result:int = 0 + result = 0 try: result += f1.result() result += f2.result() From 0fe743bd9a87479a32e04e715b9691d0a4ec81e4 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar <140011370+ogbiggles@users.noreply.github.com> Date: Mon, 2 Jun 2025 10:28:37 -0400 Subject: [PATCH 23/26] Update Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com> --- .../2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst index 62f03e4035ff0f..2ffef4264be0b7 100644 --- a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst +++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst @@ -1 +1,7 @@ -Prevent the ProcessPoolExecutor executor thread, which remains running when shutdown is called with wait=False, from attempting to adjust the pool's worker processes after the object state has already been reset during shutdown. A combination of conditions, including a worker process having terminated abormally, resulted in an exception and a potential hang when the still-running executor thread attempted to replace dead workers within the pool. +Prevent the :class:`~concurrent.futures.ProcessPoolExecutor` executor thread, +which remains running when :meth:`shutdown(wait=False) +`, from +attempting to adjust the pool's worker processes after the object state has already been reset during shutdown. +A combination of conditions, including a worker process having terminated abormally, +resulted in an exception and a potential hang when the still-running executor thread +attempted to replace dead workers within the pool. From 310765e399f881d79d984c25cbdf34b007791c7f Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Mon, 2 Jun 2025 10:34:12 -0400 Subject: [PATCH 24/26] gh-132969: fixed typo in reference to shutdown method --- .../next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst index 2ffef4264be0b7..c474a4450eed7b 100644 --- a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst +++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst @@ -1,6 +1,6 @@ Prevent the :class:`~concurrent.futures.ProcessPoolExecutor` executor thread, which remains running when :meth:`shutdown(wait=False) -`, from +`, from attempting to adjust the pool's worker processes after the object state has already been reset during shutdown. A combination of conditions, including a worker process having terminated abormally, resulted in an exception and a potential hang when the still-running executor thread From 86d0c2a61efffcfaa0c782b99269434e7c036742 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Mon, 2 Jun 2025 16:20:38 -0400 Subject: [PATCH 25/26] gh-132969: fixed doc reference to shutdown method --- .../next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst index c474a4450eed7b..7364c425941233 100644 --- a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst +++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst @@ -1,6 +1,6 @@ Prevent the :class:`~concurrent.futures.ProcessPoolExecutor` executor thread, which remains running when :meth:`shutdown(wait=False) -`, from +`, from attempting to adjust the pool's worker processes after the object state has already been reset during shutdown. A combination of conditions, including a worker process having terminated abormally, resulted in an exception and a potential hang when the still-running executor thread From 4f468755218b1bf3a90ca7a72d0ac67cbeb97be1 Mon Sep 17 00:00:00 2001 From: Ajay Kamdar Date: Mon, 2 Jun 2025 17:30:45 -0400 Subject: [PATCH 26/26] gh-132969: Tweak comment for readability --- Lib/concurrent/futures/process.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 74cb3504a8e16a..a14650bf5fa47c 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -755,8 +755,8 @@ def _start_executor_manager_thread(self): self._executor_manager_thread_wakeup def _adjust_process_count(self): - # gh-132969: avoid error if shutdown(wait=False) is called and state is reset - # and leaving the executor still running + # gh-132969: avoid error when state is reset and executor is still running, + # which will happen when shutdown(wait=False) is called. if self._processes is None: return