From d414ed266bfc747d03ea9044ea0f662e2b388f1c Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 13 Oct 2023 02:21:48 +0200 Subject: [PATCH 01/10] gh-110205: Fix asyncio ThreadedChildWatcher._join_threads() ThreadedChildWatcher._join_threads() now clears references to completed threads. test_asyncio.utils.TestCase now calls _join_threads() of the watcher, uses SHORT_TIMEOUT to join a thread, and then raises an exception if there are still running threads. Rename also ThreadedChildWatcher threads to add "asyncio-" prefix to the name. --- Lib/asyncio/unix_events.py | 10 +++++++--- Lib/test/test_asyncio/utils.py | 9 ++++++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 65f0923264d14e..d7362be2535752 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1373,12 +1373,16 @@ def is_active(self): def close(self): self._join_threads() - def _join_threads(self): + def _join_threads(self, timeout=None): """Internal: Join all non-daemon threads""" threads = [thread for thread in list(self._threads.values()) if thread.is_alive() and not thread.daemon] for thread in threads: - thread.join() + thread.join(timeout) + + # Clear references to terminated threads + self.threads = [thread for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon] def __enter__(self): return self @@ -1397,7 +1401,7 @@ def __del__(self, _warn=warnings.warn): def add_child_handler(self, pid, callback, *args): loop = events.get_running_loop() thread = threading.Thread(target=self._do_waitpid, - name=f"waitpid-{next(self._pid_counter)}", + name=f"asyncio-waitpid-{next(self._pid_counter)}", args=(loop, pid, callback, args), daemon=True) self._threads[pid] = thread diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index e1101bf42eb24e..a952fb2970c38e 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -546,6 +546,7 @@ def close_loop(loop): else: loop._default_executor.shutdown(wait=True) loop.close() + policy = support.maybe_get_event_loop_policy() if policy is not None: try: @@ -557,9 +558,11 @@ def close_loop(loop): pass else: if isinstance(watcher, asyncio.ThreadedChildWatcher): - threads = list(watcher._threads.values()) - for thread in threads: - thread.join() + watcher._join_threads(timeout=support.SHORT_TIMEOUT) + threads = watcher._threads + if threads: + self.fail(f"watcher still has running threads: " + f"{threads}") def set_event_loop(self, loop, *, cleanup=True): if loop is None: From f161daafc1d9be2a96c6486f3924dabfb63eda76 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 13 Oct 2023 11:00:09 +0200 Subject: [PATCH 02/10] Don't create a new list --- Lib/asyncio/unix_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index d7362be2535752..dfa03946653c4a 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1381,8 +1381,8 @@ def _join_threads(self, timeout=None): thread.join(timeout) # Clear references to terminated threads - self.threads = [thread for thread in list(self._threads.values()) - if thread.is_alive() and not thread.daemon] + self.threads[:] = [thread for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon] def __enter__(self): return self From dbbad2d163f835e06514a465d33b3d34c0d42c58 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Fri, 13 Oct 2023 16:21:35 +0200 Subject: [PATCH 03/10] Fix typo: self.threads => self._threads --- Lib/asyncio/unix_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index dfa03946653c4a..a089087023a629 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1381,8 +1381,8 @@ def _join_threads(self, timeout=None): thread.join(timeout) # Clear references to terminated threads - self.threads[:] = [thread for thread in list(self._threads.values()) - if thread.is_alive() and not thread.daemon] + self._threads[:] = [thread for thread in list(self._threads.values()) + if thread.is_alive() and not thread.daemon] def __enter__(self): return self From 85b79b2ad222a54e7d68778154dc1bf68969967f Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Sat, 14 Oct 2023 23:35:13 +0200 Subject: [PATCH 04/10] Fix code to clear references to terminated threads --- Lib/asyncio/unix_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index a089087023a629..f732af7d6c4ca0 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1381,8 +1381,8 @@ def _join_threads(self, timeout=None): thread.join(timeout) # Clear references to terminated threads - self._threads[:] = [thread for thread in list(self._threads.values()) - if thread.is_alive() and not thread.daemon] + self._threads = {key: thread for key, thread in self._threads.items() + if thread.daemon or thread.is_alive()} def __enter__(self): return self From a8ffeee577bc677711c2cfb5ffaed38a0d455b99 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Sun, 15 Oct 2023 00:04:38 +0200 Subject: [PATCH 05/10] Make close_loop an instance method so it can call self.fail() --- Lib/test/test_asyncio/utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index a952fb2970c38e..02fb2876a9d942 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -538,8 +538,7 @@ def get_function_source(func): class TestCase(unittest.TestCase): - @staticmethod - def close_loop(loop): + def close_loop(self, loop): if loop._default_executor is not None: if not loop.is_closed(): loop.run_until_complete(loop.shutdown_default_executor()) From bda45c8aaf44dc94ea5070078fac0c23b2dd6218 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Sun, 15 Oct 2023 00:15:18 +0200 Subject: [PATCH 06/10] Filter out dead or daemon threads --- Lib/test/test_asyncio/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index 02fb2876a9d942..d9edd50265479b 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -558,7 +558,8 @@ def close_loop(self, loop): else: if isinstance(watcher, asyncio.ThreadedChildWatcher): watcher._join_threads(timeout=support.SHORT_TIMEOUT) - threads = watcher._threads + threads = {key: thread for key, thread in watcher._threads.items() + if thread.is_alive() and not thread.daemon} if threads: self.fail(f"watcher still has running threads: " f"{threads}") From 566fb5294ec9fdf93aa01c6ed246229f9c544be9 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Sun, 15 Oct 2023 02:20:09 +0200 Subject: [PATCH 07/10] Back to basics --- Lib/asyncio/unix_events.py | 9 +-------- Lib/test/test_asyncio/utils.py | 11 ++++------- 2 files changed, 5 insertions(+), 15 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 4bbff6fa89b269..b81b4666883bbc 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1371,14 +1371,7 @@ def is_active(self): return True def close(self): - self._join_threads() - - def _join_threads(self, timeout=None): - """Internal: Join all non-daemon threads""" - threads = [thread for thread in list(self._threads.values()) - if thread.is_alive() and not thread.daemon] - for thread in threads: - thread.join(timeout) + pass # Clear references to terminated threads self._threads = {key: thread for key, thread in self._threads.items() diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index d9edd50265479b..1838915fe28261 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -538,7 +538,8 @@ def get_function_source(func): class TestCase(unittest.TestCase): - def close_loop(self, loop): + @staticmethod + def close_loop(loop): if loop._default_executor is not None: if not loop.is_closed(): loop.run_until_complete(loop.shutdown_default_executor()) @@ -557,12 +558,8 @@ def close_loop(self, loop): pass else: if isinstance(watcher, asyncio.ThreadedChildWatcher): - watcher._join_threads(timeout=support.SHORT_TIMEOUT) - threads = {key: thread for key, thread in watcher._threads.items() - if thread.is_alive() and not thread.daemon} - if threads: - self.fail(f"watcher still has running threads: " - f"{threads}") + for thread in list(watcher._threads.values()): + thread.join(timeout=support.SHORT_TIMEOUT) def set_event_loop(self, loop, *, cleanup=True): if loop is None: From f8090c14f1c65af9bfdf326929804ade56c393ae Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Sun, 15 Oct 2023 09:43:08 +0200 Subject: [PATCH 08/10] Delete stray code --- Lib/asyncio/unix_events.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index b81b4666883bbc..c9441918ecfcfd 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1373,10 +1373,6 @@ def is_active(self): def close(self): pass - # Clear references to terminated threads - self._threads = {key: thread for key, thread in self._threads.items() - if thread.daemon or thread.is_alive()} - def __enter__(self): return self From aed1a89f569bf3b5c52bed08944f6d397c39045d Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Sun, 15 Oct 2023 11:24:30 +0200 Subject: [PATCH 09/10] Add a comment --- Lib/test/test_asyncio/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index 1838915fe28261..c6e419da9f4295 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -558,6 +558,7 @@ def close_loop(loop): pass else: if isinstance(watcher, asyncio.ThreadedChildWatcher): + # Wait for subprocess to finish, but not forever for thread in list(watcher._threads.values()): thread.join(timeout=support.SHORT_TIMEOUT) From 50cab381b1c757c1bd0603fffbd82c442e984d82 Mon Sep 17 00:00:00 2001 From: Guido van Rossum Date: Wed, 25 Oct 2023 21:09:56 -0700 Subject: [PATCH 10/10] Raise error if watcher thread won't die --- Lib/test/test_asyncio/utils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py index b527a427c833b1..44943e1fa7bc4e 100644 --- a/Lib/test/test_asyncio/utils.py +++ b/Lib/test/test_asyncio/utils.py @@ -561,6 +561,10 @@ def close_loop(loop): # Wait for subprocess to finish, but not forever for thread in list(watcher._threads.values()): thread.join(timeout=support.SHORT_TIMEOUT) + if thread.is_alive(): + raise RuntimeError(f"thread {thread} still alive: " + "subprocess still running") + def set_event_loop(self, loop, *, cleanup=True): if loop is None: