Skip to content

gh-131788: make resource_tracker re-entrant safe #131787

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Aug 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 93 additions & 70 deletions Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
import threading
import warnings
from collections import deque

from . import spawn
from . import util
Expand Down Expand Up @@ -62,6 +63,7 @@ def __init__(self):
self._fd = None
self._pid = None
self._exitcode = None
self._reentrant_messages = deque()

def _reentrant_call_error(self):
# gh-109629: this happens if an explicit call to the ResourceTracker
Expand Down Expand Up @@ -98,7 +100,7 @@ def _stop_locked(
# This shouldn't happen (it might when called by a finalizer)
# so we check for it anyway.
if self._lock._recursion_count() > 1:
return self._reentrant_call_error()
raise self._reentrant_call_error()
if self._fd is None:
# not running
return
Expand Down Expand Up @@ -128,69 +130,99 @@ def ensure_running(self):

This can be run from any process. Usually a child process will use
the resource created by its parent.'''
return self._ensure_running_and_write()

def _teardown_dead_process(self):
os.close(self._fd)

# Clean-up to avoid dangling processes.
try:
# _pid can be None if this process is a child from another
# python process, which has started the resource_tracker.
if self._pid is not None:
os.waitpid(self._pid, 0)
except ChildProcessError:
# The resource_tracker has already been terminated.
pass
self._fd = None
self._pid = None
self._exitcode = None

warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')

def _launch(self):
fds_to_pass = []
try:
fds_to_pass.append(sys.stderr.fileno())
except Exception:
pass
r, w = os.pipe()
try:
fds_to_pass.append(r)
# process will out live us, so no need to wait on pid
exe = spawn.get_executable()
args = [
exe,
*util._args_from_interpreter_flags(),
'-c',
f'from multiprocessing.resource_tracker import main;main({r})',
]
# bpo-33613: Register a signal mask that will block the signals.
# This signal mask will be inherited by the child that is going
# to be spawned and will protect the child from a race condition
# that can make the child die before it registers signal handlers
# for SIGINT and SIGTERM. The mask is unregistered after spawning
# the child.
prev_sigmask = None
try:
if _HAVE_SIGMASK:
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
pid = util.spawnv_passfds(exe, args, fds_to_pass)
finally:
if prev_sigmask is not None:
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
except:
os.close(w)
raise
else:
self._fd = w
self._pid = pid
finally:
os.close(r)

def _ensure_running_and_write(self, msg=None):
with self._lock:
if self._lock._recursion_count() > 1:
# The code below is certainly not reentrant-safe, so bail out
return self._reentrant_call_error()
if msg is None:
raise self._reentrant_call_error()
return self._reentrant_messages.append(msg)

if self._fd is not None:
# resource tracker was launched before, is it still running?
if self._check_alive():
# => still alive
return
# => dead, launch it again
os.close(self._fd)

# Clean-up to avoid dangling processes.
if msg is None:
to_send = b'PROBE:0:noop\n'
else:
to_send = msg
try:
# _pid can be None if this process is a child from another
# python process, which has started the resource_tracker.
if self._pid is not None:
os.waitpid(self._pid, 0)
except ChildProcessError:
# The resource_tracker has already been terminated.
pass
self._fd = None
self._pid = None
self._exitcode = None
self._write(to_send)
except OSError:
self._teardown_dead_process()
self._launch()

warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')
msg = None # message was sent in probe
else:
self._launch()

fds_to_pass = []
while True:
try:
fds_to_pass.append(sys.stderr.fileno())
except Exception:
pass
cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
r, w = os.pipe()
try:
fds_to_pass.append(r)
# process will out live us, so no need to wait on pid
exe = spawn.get_executable()
args = [exe] + util._args_from_interpreter_flags()
args += ['-c', cmd % r]
# bpo-33613: Register a signal mask that will block the signals.
# This signal mask will be inherited by the child that is going
# to be spawned and will protect the child from a race condition
# that can make the child die before it registers signal handlers
# for SIGINT and SIGTERM. The mask is unregistered after spawning
# the child.
prev_sigmask = None
try:
if _HAVE_SIGMASK:
prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
pid = util.spawnv_passfds(exe, args, fds_to_pass)
finally:
if prev_sigmask is not None:
signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
except:
os.close(w)
raise
else:
self._fd = w
self._pid = pid
finally:
os.close(r)
reentrant_msg = self._reentrant_messages.popleft()
except IndexError:
break
self._write(reentrant_msg)
if msg is not None:
self._write(msg)

def _check_alive(self):
'''Check that the pipe has not been closed by sending a probe.'''
Expand All @@ -211,27 +243,18 @@ def unregister(self, name, rtype):
'''Unregister name of resource with resource tracker.'''
self._send('UNREGISTER', name, rtype)

def _write(self, msg):
nbytes = os.write(self._fd, msg)
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"

def _send(self, cmd, name, rtype):
try:
self.ensure_running()
except ReentrantCallError:
# The code below might or might not work, depending on whether
# the resource tracker was already running and still alive.
# Better warn the user.
# (XXX is warnings.warn itself reentrant-safe? :-)
warnings.warn(
f"ResourceTracker called reentrantly for resource cleanup, "
f"which is unsupported. "
f"The {rtype} object {name!r} might leak.")
msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
if len(msg) > 512:
# posix guarantees that writes to a pipe of less than PIPE_BUF
# bytes are atomic, and that PIPE_BUF >= 512
raise ValueError('msg too long')
nbytes = os.write(self._fd, msg)
assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
nbytes, len(msg))

self._ensure_running_and_write(msg)

_resource_tracker = ResourceTracker()
ensure_running = _resource_tracker.ensure_running
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Make ``ResourceTracker.send`` from :mod:`multiprocessing` re-entrant safe
Loading