diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index c4d0ca81e7034a..c53092f6e34b32 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -20,6 +20,7 @@ import sys import threading import warnings +from collections import deque from . import spawn from . import util @@ -62,6 +63,7 @@ def __init__(self): self._fd = None self._pid = None self._exitcode = None + self._reentrant_messages = deque() def _reentrant_call_error(self): # gh-109629: this happens if an explicit call to the ResourceTracker @@ -98,7 +100,7 @@ def _stop_locked( # This shouldn't happen (it might when called by a finalizer) # so we check for it anyway. if self._lock._recursion_count() > 1: - return self._reentrant_call_error() + raise self._reentrant_call_error() if self._fd is None: # not running return @@ -128,69 +130,99 @@ def ensure_running(self): This can be run from any process. Usually a child process will use the resource created by its parent.''' + return self._ensure_running_and_write() + + def _teardown_dead_process(self): + os.close(self._fd) + + # Clean-up to avoid dangling processes. + try: + # _pid can be None if this process is a child from another + # python process, which has started the resource_tracker. + if self._pid is not None: + os.waitpid(self._pid, 0) + except ChildProcessError: + # The resource_tracker has already been terminated. + pass + self._fd = None + self._pid = None + self._exitcode = None + + warnings.warn('resource_tracker: process died unexpectedly, ' + 'relaunching. Some resources might leak.') + + def _launch(self): + fds_to_pass = [] + try: + fds_to_pass.append(sys.stderr.fileno()) + except Exception: + pass + r, w = os.pipe() + try: + fds_to_pass.append(r) + # process will out live us, so no need to wait on pid + exe = spawn.get_executable() + args = [ + exe, + *util._args_from_interpreter_flags(), + '-c', + f'from multiprocessing.resource_tracker import main;main({r})', + ] + # bpo-33613: Register a signal mask that will block the signals. + # This signal mask will be inherited by the child that is going + # to be spawned and will protect the child from a race condition + # that can make the child die before it registers signal handlers + # for SIGINT and SIGTERM. The mask is unregistered after spawning + # the child. + prev_sigmask = None + try: + if _HAVE_SIGMASK: + prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) + pid = util.spawnv_passfds(exe, args, fds_to_pass) + finally: + if prev_sigmask is not None: + signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask) + except: + os.close(w) + raise + else: + self._fd = w + self._pid = pid + finally: + os.close(r) + + def _ensure_running_and_write(self, msg=None): with self._lock: if self._lock._recursion_count() > 1: # The code below is certainly not reentrant-safe, so bail out - return self._reentrant_call_error() + if msg is None: + raise self._reentrant_call_error() + return self._reentrant_messages.append(msg) + if self._fd is not None: # resource tracker was launched before, is it still running? - if self._check_alive(): - # => still alive - return - # => dead, launch it again - os.close(self._fd) - - # Clean-up to avoid dangling processes. + if msg is None: + to_send = b'PROBE:0:noop\n' + else: + to_send = msg try: - # _pid can be None if this process is a child from another - # python process, which has started the resource_tracker. - if self._pid is not None: - os.waitpid(self._pid, 0) - except ChildProcessError: - # The resource_tracker has already been terminated. - pass - self._fd = None - self._pid = None - self._exitcode = None + self._write(to_send) + except OSError: + self._teardown_dead_process() + self._launch() - warnings.warn('resource_tracker: process died unexpectedly, ' - 'relaunching. Some resources might leak.') + msg = None # message was sent in probe + else: + self._launch() - fds_to_pass = [] + while True: try: - fds_to_pass.append(sys.stderr.fileno()) - except Exception: - pass - cmd = 'from multiprocessing.resource_tracker import main;main(%d)' - r, w = os.pipe() - try: - fds_to_pass.append(r) - # process will out live us, so no need to wait on pid - exe = spawn.get_executable() - args = [exe] + util._args_from_interpreter_flags() - args += ['-c', cmd % r] - # bpo-33613: Register a signal mask that will block the signals. - # This signal mask will be inherited by the child that is going - # to be spawned and will protect the child from a race condition - # that can make the child die before it registers signal handlers - # for SIGINT and SIGTERM. The mask is unregistered after spawning - # the child. - prev_sigmask = None - try: - if _HAVE_SIGMASK: - prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS) - pid = util.spawnv_passfds(exe, args, fds_to_pass) - finally: - if prev_sigmask is not None: - signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask) - except: - os.close(w) - raise - else: - self._fd = w - self._pid = pid - finally: - os.close(r) + reentrant_msg = self._reentrant_messages.popleft() + except IndexError: + break + self._write(reentrant_msg) + if msg is not None: + self._write(msg) def _check_alive(self): '''Check that the pipe has not been closed by sending a probe.''' @@ -211,27 +243,18 @@ def unregister(self, name, rtype): '''Unregister name of resource with resource tracker.''' self._send('UNREGISTER', name, rtype) + def _write(self, msg): + nbytes = os.write(self._fd, msg) + assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}" + def _send(self, cmd, name, rtype): - try: - self.ensure_running() - except ReentrantCallError: - # The code below might or might not work, depending on whether - # the resource tracker was already running and still alive. - # Better warn the user. - # (XXX is warnings.warn itself reentrant-safe? :-) - warnings.warn( - f"ResourceTracker called reentrantly for resource cleanup, " - f"which is unsupported. " - f"The {rtype} object {name!r} might leak.") - msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii') + msg = f"{cmd}:{name}:{rtype}\n".encode("ascii") if len(msg) > 512: # posix guarantees that writes to a pipe of less than PIPE_BUF # bytes are atomic, and that PIPE_BUF >= 512 raise ValueError('msg too long') - nbytes = os.write(self._fd, msg) - assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format( - nbytes, len(msg)) + self._ensure_running_and_write(msg) _resource_tracker = ResourceTracker() ensure_running = _resource_tracker.ensure_running diff --git a/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst b/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst new file mode 100644 index 00000000000000..525802405bd8bd --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst @@ -0,0 +1 @@ +Make ``ResourceTracker.send`` from :mod:`multiprocessing` re-entrant safe