diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index cb0b8be55ecced..0dee8303ba24fb 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -747,6 +747,11 @@ def _start_executor_manager_thread(self): self._executor_manager_thread_wakeup def _adjust_process_count(self): + # gh-132969: avoid error when state is reset and executor is still running, + # which will happen when shutdown(wait=False) is called. + if self._processes is None: + return + # if there's an idle process, we don't need to spawn a new one. if self._idle_worker_semaphore.acquire(blocking=False): return diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py b/Lib/test/test_concurrent_futures/test_shutdown.py index 7a4065afd46fc8..99b315b47e2530 100644 --- a/Lib/test/test_concurrent_futures/test_shutdown.py +++ b/Lib/test/test_concurrent_futures/test_shutdown.py @@ -330,6 +330,64 @@ def test_shutdown_no_wait(self): # shutdown. assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) + @classmethod + def _failing_task_gh_132969(cls, n): + raise ValueError("failing task") + + @classmethod + def _good_task_gh_132969(cls, n): + time.sleep(0.1 * n) + return n + + def _run_test_issue_gh_132969(self, max_workers): + # max_workers=2 will repro exception + # max_workers=4 will repro exception and then hang + + # Repro conditions + # max_tasks_per_child=1 + # a task ends abnormally + # shutdown(wait=False) is called + start_method = self.get_context().get_start_method() + if (start_method == "fork" or + (start_method == "forkserver" and sys.platform.startswith("win"))): + self.skipTest(f"Skipping test for {start_method = }") + executor = futures.ProcessPoolExecutor( + max_workers=max_workers, + max_tasks_per_child=1, + mp_context=self.get_context()) + f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1) + f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 2) + f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3) + result = 0 + try: + result += f1.result() + result += f2.result() + result += f3.result() + except ValueError: + # stop processing results upon first exception + pass + + # Ensure that the executor cleans up after called + # shutdown with wait=False + executor_manager_thread = executor._executor_manager_thread + executor.shutdown(wait=False) + time.sleep(0.2) + executor_manager_thread.join() + return result + + def test_shutdown_gh_132969_case_1(self): + # gh-132969: test that exception "object of type 'NoneType' has no len()" + # is not raised when shutdown(wait=False) is called. + result = self._run_test_issue_gh_132969(2) + self.assertEqual(result, 1) + + def test_shutdown_gh_132969_case_2(self): + # gh-132969: test that process does not hang and + # exception "object of type 'NoneType' has no len()" is not raised + # when shutdown(wait=False) is called. + result = self._run_test_issue_gh_132969(4) + self.assertEqual(result, 1) + create_executor_tests(globals(), ProcessPoolShutdownTest, executor_mixins=(ProcessPoolForkMixin, diff --git a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst new file mode 100644 index 00000000000000..7364c425941233 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst @@ -0,0 +1,7 @@ +Prevent the :class:`~concurrent.futures.ProcessPoolExecutor` executor thread, +which remains running when :meth:`shutdown(wait=False) +`, from +attempting to adjust the pool's worker processes after the object state has already been reset during shutdown. +A combination of conditions, including a worker process having terminated abormally, +resulted in an exception and a potential hang when the still-running executor thread +attempted to replace dead workers within the pool.