Skip to content

gh-110205: Fix asyncio ThreadedChildWatcher._join_threads() #110884

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Oct 27, 2023
11 changes: 2 additions & 9 deletions Lib/asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,14 +1371,7 @@ def is_active(self):
return True

def close(self):
self._join_threads()

def _join_threads(self):
"""Internal: Join all non-daemon threads"""
threads = [thread for thread in list(self._threads.values())
if thread.is_alive() and not thread.daemon]
for thread in threads:
thread.join()
pass

def __enter__(self):
return self
Expand All @@ -1397,7 +1390,7 @@ def __del__(self, _warn=warnings.warn):
def add_child_handler(self, pid, callback, *args):
loop = events.get_running_loop()
thread = threading.Thread(target=self._do_waitpid,
name=f"waitpid-{next(self._pid_counter)}",
name=f"asyncio-waitpid-{next(self._pid_counter)}",
args=(loop, pid, callback, args),
daemon=True)
self._threads[pid] = thread
Expand Down
11 changes: 8 additions & 3 deletions Lib/test/test_asyncio/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ def close_loop(loop):
else:
loop._default_executor.shutdown(wait=True)
loop.close()

policy = support.maybe_get_event_loop_policy()
if policy is not None:
try:
Expand All @@ -557,9 +558,13 @@ def close_loop(loop):
pass
else:
if isinstance(watcher, asyncio.ThreadedChildWatcher):
threads = list(watcher._threads.values())
for thread in threads:
thread.join()
# Wait for subprocess to finish, but not forever
for thread in list(watcher._threads.values()):
thread.join(timeout=support.SHORT_TIMEOUT)
if thread.is_alive():
raise RuntimeError(f"thread {thread} still alive: "
"subprocess still running")


def set_event_loop(self, loop, *, cleanup=True):
if loop is None:
Expand Down