Skip to content

[3.13] gh-132969: Fix error/hang when shutdown(wait=False) and task exited abnormally (GH-133222) #135343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: 3.13
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -747,6 +747,11 @@ def _start_executor_manager_thread(self):
self._executor_manager_thread_wakeup

def _adjust_process_count(self):
# gh-132969: avoid error when state is reset and executor is still running,
# which will happen when shutdown(wait=False) is called.
if self._processes is None:
return

# if there's an idle process, we don't need to spawn a new one.
if self._idle_worker_semaphore.acquire(blocking=False):
return
Expand Down
58 changes: 58 additions & 0 deletions Lib/test/test_concurrent_futures/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,64 @@ def test_shutdown_no_wait(self):
# shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])

@classmethod
def _failing_task_gh_132969(cls, n):
# Task helper for the gh-132969 tests: ignores n and always raises
# ValueError, standing in for the "task ends abnormally" repro condition.
raise ValueError("failing task")

@classmethod
def _good_task_gh_132969(cls, n):
# Task helper for the gh-132969 tests: sleeps for 0.1 * n seconds and then
# returns n unchanged.
time.sleep(0.1 * n)
return n

def _run_test_issue_gh_132969(self, max_workers):
# Shared driver for the gh-132969 regression tests.
#
# Submits a succeeding task, a failing task and another succeeding task to
# a ProcessPoolExecutor created with max_tasks_per_child=1, collects
# results until the first ValueError, then calls shutdown(wait=False) and
# joins the executor manager thread.
#
# Returns the sum of the results gathered before the first ValueError;
# with the tasks submitted here that is 1 (only f1's result).
#
# max_workers=2 will repro exception
# max_workers=4 will repro exception and then hang

# Repro conditions
# max_tasks_per_child=1
# a task ends abnormally
# shutdown(wait=False) is called
start_method = self.get_context().get_start_method()
# The test is skipped for the "fork" start method, and for "forkserver"
# when running on a Windows platform.
if (start_method == "fork" or
(start_method == "forkserver" and sys.platform.startswith("win"))):
self.skipTest(f"Skipping test for {start_method = }")
executor = futures.ProcessPoolExecutor(
max_workers=max_workers,
max_tasks_per_child=1,
mp_context=self.get_context())
f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1)
f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 2)
f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3)
result = 0
try:
result += f1.result()
result += f2.result()
result += f3.result()
except ValueError:
# stop processing results upon first exception
pass

# Ensure that the executor cleans up after shutdown() has been
# called with wait=False.
# Keep a reference to the manager thread before shutting down so it can
# still be joined afterwards (presumably shutdown() resets the executor's
# internal state -- verify against process.py).
executor_manager_thread = executor._executor_manager_thread
executor.shutdown(wait=False)
time.sleep(0.2)
# join() has no timeout here, so a hung manager thread (the max_workers=4
# scenario described above) shows up as this call never returning.
executor_manager_thread.join()
return result

def test_shutdown_gh_132969_case_1(self):
# gh-132969: test that exception "object of type 'NoneType' has no len()"
# is not raised when shutdown(wait=False) is called.
# Runs the shared scenario with max_workers=2; the expected result is 1
# because only the first task's value is accumulated before the failing
# task's ValueError stops result collection.
result = self._run_test_issue_gh_132969(2)
self.assertEqual(result, 1)

def test_shutdown_gh_132969_case_2(self):
# gh-132969: test that process does not hang and
# exception "object of type 'NoneType' has no len()" is not raised
# when shutdown(wait=False) is called.
# Runs the shared scenario with max_workers=4; the expected result is 1
# because only the first task's value is accumulated before the failing
# task's ValueError stops result collection.
result = self._run_test_issue_gh_132969(4)
self.assertEqual(result, 1)


create_executor_tests(globals(), ProcessPoolShutdownTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Prevent the :class:`~concurrent.futures.ProcessPoolExecutor` executor thread,
which remains running when :meth:`shutdown(wait=False)
<concurrent.futures.Executor.shutdown>` is called, from
attempting to adjust the pool's worker processes after the object state has already been reset during shutdown.
A combination of conditions, including a worker process having terminated abnormally,
resulted in an exception and a potential hang when the still-running executor thread
attempted to replace dead workers within the pool.
Loading