
gh-109917: Fix concurrent.futures ProcessPoolExecutor locks #110137

Closed · wants to merge 4 commits
70 changes: 39 additions & 31 deletions Lib/concurrent/futures/process.py
@@ -66,37 +66,54 @@


class _ThreadWakeup:
# Constant overridden by tests to make them faster
_wakeup_msg = b'x'

def __init__(self):
self._closed = False
self._reader, self._writer = mp.Pipe(duplex=False)
# True if self._wakeup_msg was sent to self._writer once.
# Cleared by clear() method.
self._awaken = False
# Lock to protect _ThreadWakeup and make it thread-safe. The lock
# serializes method calls to make sure that the reader and the writer
# remain usable (i.e. not closed) during a method call.
self._lock = threading.Lock()

def close(self):
# Please note that we do not take the shutdown lock when
# calling clear() (to avoid deadlocking) so this method can
# only be called safely from the same thread as all calls to
# clear() even if you hold the shutdown lock. Otherwise we
# might try to read from the closed pipe.
if not self._closed:
with self._lock:
if self._closed:
return
self._closed = True
self._writer.close()
self._reader.close()

def wakeup(self):
if not self._closed:
self._writer.send_bytes(b"")
with self._lock:
if self._closed:
return
if self._awaken:
# gh-105829: Send a single message to not block if the pipe is
# full. wait_result_broken_or_wakeup() ignores the message anyway,
# it just calls clear().
return
self._awaken = True
self._writer.send_bytes(self._wakeup_msg)

def clear(self):
if not self._closed:
with self._lock:
if self._closed:
return
while self._reader.poll():
self._reader.recv_bytes()
self._awaken = False


def _python_exit():
global _global_shutdown
_global_shutdown = True
items = list(_threads_wakeups.items())
for _, thread_wakeup in items:
# call not protected by ProcessPoolExecutor._shutdown_lock
thread_wakeup.wakeup()
for t, _ in items:
t.join()
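
To make the locking scheme above concrete, here is a condensed, standalone sketch of the same technique: every method of the wakeup object is serialized by a lock, and an _awaken flag guarantees that at most one message ever sits in the pipe between two clear() calls, so wakeup() can never block on a full pipe. DemoWakeup and the demo below are illustrative only, not part of the patch.

import threading
import multiprocessing as mp

class DemoWakeup:
    """Condensed sketch of a lock-protected, single-message wakeup pipe."""
    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)
        self._awaken = False
        self._lock = threading.Lock()

    def wakeup(self):
        with self._lock:
            if not self._awaken:        # buffer at most one message
                self._awaken = True
                self._writer.send_bytes(b'x')

    def clear(self):
        with self._lock:
            while self._reader.poll():  # drain the pipe, then re-arm
                self._reader.recv_bytes()
            self._awaken = False

if __name__ == '__main__':
    w = DemoWakeup()
    # Far more wakeups than any pipe buffer could hold: none of them block,
    # because only the first call actually writes to the pipe.
    workers = [threading.Thread(target=lambda: [w.wakeup() for _ in range(50_000)])
               for _ in range(4)]
    for t in workers:
        t.start()
    for t in workers:
        t.join()
    w.clear()
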
@@ -167,10 +184,8 @@ def __init__(self, work_id, fn, args, kwargs):

class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
thread_wakeup):
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
self.pending_work_items = pending_work_items
self.shutdown_lock = shutdown_lock
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)

@@ -179,8 +194,7 @@ def _on_queue_feeder_error(self, e, obj):
tb = format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
with self.shutdown_lock:
self.thread_wakeup.wakeup()
self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this
# case, the executor_manager_thread fails all work_items
# with BrokenProcessPool
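
The error path above serializes the original traceback and chains it as the cause of the exception set on the future. A small self-contained illustration of that idiom, using a stand-in RemoteTraceback class rather than the private _RemoteTraceback:

import traceback

class RemoteTraceback(Exception):
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

def attach_remote_traceback(exc):
    # Keep the original traceback as text and chain it as the cause, the
    # same way _SafeQueue._on_queue_feeder_error() decorates the exception.
    tb = ''.join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    exc.__cause__ = RemoteTraceback('\n"""\n{}"""'.format(tb))
    return exc

if __name__ == '__main__':
    try:
        1 / 0
    except ZeroDivisionError as e:
        print(attach_remote_traceback(e).__cause__)
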
@@ -304,13 +318,10 @@ def __init__(self, executor):
# When the executor gets garbage collected, the weakref callback
# will wake up the queue management thread so that it can terminate
# if there is no pending work item.
def weakref_cb(_,
thread_wakeup=self.thread_wakeup,
shutdown_lock=self.shutdown_lock):
def weakref_cb(_, thread_wakeup=self.thread_wakeup):
mp.util.debug('Executor collected: triggering callback for'
' QueueManager wakeup')
with shutdown_lock:
thread_wakeup.wakeup()
thread_wakeup.wakeup()

self.executor_reference = weakref.ref(executor, weakref_cb)
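
For context, the weakref callback above fires when the executor object is garbage collected, and the default argument keeps the closure from holding the executor alive. A minimal illustration of the mechanism with made-up Manager/Executor stand-ins:

import weakref

class Manager:
    def __init__(self):
        self.awake = False
    def wakeup(self):
        self.awake = True

class Executor:
    pass

manager = Manager()
executor = Executor()

# The callback receives the dead weak reference; binding wakeup as a default
# argument avoids keeping the executor alive through the closure.
def on_collect(_ref, wakeup=manager.wakeup):
    wakeup()

ref = weakref.ref(executor, on_collect)
del executor          # last strong reference gone: on_collect() runs
assert manager.awake
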

@@ -438,11 +449,6 @@ def wait_result_broken_or_wakeup(self):
elif wakeup_reader in ready:
is_broken = False

# No need to hold the _shutdown_lock here because:
# 1. we're the only thread to use the wakeup reader
# 2. we're also the only thread to call thread_wakeup.close()
# 3. we want to avoid a possible deadlock when both reader and writer
# would block (gh-105829)
self.thread_wakeup.clear()

return result_item, is_broken, cause
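
A rough sketch of the reader side described here: the manager thread waits on both the result pipe and the wakeup pipe, and simply drains the wakeup pipe afterwards, which no longer needs the shutdown lock because clear() is serialized by the _ThreadWakeup lock. The helper below is illustrative, not the real _ExecutorManagerThread code:

import multiprocessing as mp
from multiprocessing.connection import wait

def wait_result_or_wakeup(result_reader, wakeup_reader):
    # Block until a worker result arrives or someone calls wakeup().
    ready = wait([result_reader, wakeup_reader])
    result_item = None
    if result_reader in ready:
        result_item = result_reader.recv()
    # Drain whatever is pending in the wakeup pipe (clear() does this
    # under the _ThreadWakeup lock in the real code).
    while wakeup_reader.poll():
        wakeup_reader.recv_bytes()
    return result_item

if __name__ == '__main__':
    result_r, result_w = mp.Pipe(duplex=False)
    wakeup_r, wakeup_w = mp.Pipe(duplex=False)
    wakeup_w.send_bytes(b'x')                       # simulate a wakeup()
    assert wait_result_or_wakeup(result_r, wakeup_r) is None
    result_w.send(42)                               # simulate a worker result
    assert wait_result_or_wakeup(result_r, wakeup_r) == 42
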
@@ -714,6 +720,12 @@ def __init__(self, max_workers=None, mp_context=None,
# Map of pids to processes
self._processes = {}

# The shutdown lock protects the ProcessPoolExecutor. It is used to
# shut down the pool and to mark the pool as broken in a reliable way.
# It makes sure that the pool remains usable during submit(). It is
# also used by _ExecutorManagerThread to adjust the process count when
# a worker process exits.
#
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_lock = threading.Lock()
@@ -729,11 +741,6 @@
# _result_queue to send wakeup signals to the executor_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
#
# _shutdown_lock must be locked to access _ThreadWakeup.close() and
# .wakeup(). Care must also be taken to not call clear or close from
# more than one thread since _ThreadWakeup.clear() is not protected by
# the _shutdown_lock
self._executor_manager_thread_wakeup = _ThreadWakeup()

# Create communication channels for the executor
@@ -744,7 +751,6 @@
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
shutdown_lock=self._shutdown_lock,
thread_wakeup=self._executor_manager_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
@@ -798,6 +804,8 @@ def _spawn_process(self):
self._processes[p.pid] = p

def submit(self, fn, /, *args, **kwargs):
# The lock makes sure that the pool remains usable while we are
# submitting the task.
with self._shutdown_lock:
if self._broken:
raise BrokenProcessPool(self._broken)
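
A condensed sketch of the submit()/shutdown() interplay the comment describes: the state checks, the enqueue and the wakeup all happen while holding the shutdown lock, so a concurrent shutdown() cannot interleave with them. MiniExecutor below is a simplified stand-in, not the real ProcessPoolExecutor:

import threading

class _DummyWakeup:
    def wakeup(self):
        pass

class MiniExecutor:
    def __init__(self):
        self._shutdown_lock = threading.Lock()
        self._shutdown = False
        self._broken = None
        self._wakeup = _DummyWakeup()
        self._pending = []

    def submit(self, fn, *args):
        # Check state, enqueue and wake up the manager atomically with
        # respect to shutdown().
        with self._shutdown_lock:
            if self._broken:
                raise RuntimeError(self._broken)
            if self._shutdown:
                raise RuntimeError('cannot submit after shutdown')
            self._pending.append((fn, args))
            self._wakeup.wakeup()

    def shutdown(self):
        with self._shutdown_lock:
            self._shutdown = True
            self._wakeup.wakeup()

if __name__ == '__main__':
    ex = MiniExecutor()
    ex.submit(print, 'hello')
    ex.shutdown()
    try:
        ex.submit(print, 'too late')
    except RuntimeError as exc:
        print(exc)
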
75 changes: 3 additions & 72 deletions Lib/test/test_concurrent_futures/test_deadlock.py
@@ -1,6 +1,4 @@
import contextlib
import queue
import signal
import sys
import time
import unittest
@@ -203,7 +201,7 @@ def test_shutdown_deadlock(self):
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
self.executor = executor # Allow clean up in _fail_on_deadlock
f = executor.submit(_crash, delay=.1)
executor.shutdown(wait=True)
with self.assertRaises(BrokenProcessPool):
@@ -216,7 +214,7 @@ def test_shutdown_deadlock_pickle(self):
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
self.executor = executor # Allow clean up in _fail_on_deadlock

# Start the executor and get the executor_manager_thread to collect
# the threads and avoid dangling thread that should be cleaned up
@@ -244,79 +242,12 @@ def test_crash_big_data(self):
data = "a" * support.PIPE_MAX_SIZE
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
self.executor = executor # Allow clean up in _fail_on_deadlock
with self.assertRaises(BrokenProcessPool):
list(executor.map(_crash_with_data, [data] * 10))

executor.shutdown(wait=True)

def test_gh105829_should_not_deadlock_if_wakeup_pipe_full(self):
# Issue #105829: The _ExecutorManagerThread wakeup pipe could
# fill up and block. See: https://github.com/python/cpython/issues/105829

# Lots of cargo culting while writing this test, apologies if
# something is really stupid...

self.executor.shutdown(wait=True)

if not hasattr(signal, 'alarm'):
raise unittest.SkipTest(
"Tested platform does not support the alarm signal")

def timeout(_signum, _frame):
import faulthandler
faulthandler.dump_traceback()

raise RuntimeError("timed out while submitting jobs?")

thread_run = futures.process._ExecutorManagerThread.run
def mock_run(self):
# Delay thread startup so the wakeup pipe can fill up and block
time.sleep(3)
thread_run(self)

class MockWakeup(_ThreadWakeup):
"""Mock wakeup object to force the wakeup to block"""
def __init__(self):
super().__init__()
self._dummy_queue = queue.Queue(maxsize=1)

def wakeup(self):
self._dummy_queue.put(None, block=True)
super().wakeup()

def clear(self):
try:
while True:
self._dummy_queue.get_nowait()
except queue.Empty:
super().clear()

with (unittest.mock.patch.object(futures.process._ExecutorManagerThread,
'run', mock_run),
unittest.mock.patch('concurrent.futures.process._ThreadWakeup',
MockWakeup)):
with self.executor_type(max_workers=2,
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock

job_num = 100
job_data = range(job_num)

# Need to use sigalarm for timeout detection because
# Executor.submit is not guarded by any timeout (both
# self._work_ids.put(self._queue_count) and
# self._executor_manager_thread_wakeup.wakeup() might
# timeout, maybe more?). In this specific case it was
# the wakeup call that deadlocked on a blocking pipe.
old_handler = signal.signal(signal.SIGALRM, timeout)
try:
signal.alarm(int(self.TIMEOUT))
self.assertEqual(job_num, len(list(executor.map(int, job_data))))
finally:
signal.alarm(0)
signal.signal(signal.SIGALRM, old_handler)


create_executor_tests(globals(), ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,
50 changes: 50 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
@@ -216,6 +216,56 @@ def mock_start_new_thread(func, *args):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()

def test_wakeup(self):
# gh-105829: Check that calling _ExecutorManagerThread wakeup() many
# times in ProcessPoolExecutor.submit() does not block if the
# _ThreadWakeup pipe becomes full.

def get_pipe_size(connection):
try:
import fcntl
if hasattr(fcntl, 'F_GETPIPE_SZ'):
return fcntl.fcntl(connection.fileno(),
fcntl.F_GETPIPE_SZ)
except ImportError:
pass

# Assume a 64 KiB pipe if we fail; this only makes the test take longer
return 65_536

executor = self.executor
with executor:
# Submit a job to start the executor manager thread
# future = self.executor.submit(str, 12)
# future.result()

# Wrap _ThreadWakeup.wakeup() to count how many times it has been
# called
thread_wakeup = executor._executor_manager_thread_wakeup
orig_wakeup = thread_wakeup.wakeup
nwakeup = 0
def wrap_wakeup():
nonlocal nwakeup
nwakeup += 1
orig_wakeup()
thread_wakeup.wakeup = wrap_wakeup

# Use a longer "wakeup message" to make the hang more likely
# and to speed up the test
njob = self.worker_count * 2 # at least 2 jobs per worker
pipe_size = get_pipe_size(thread_wakeup._writer)
msg_len = min(pipe_size // njob, 512)
thread_wakeup._wakeup_msg = b'x' * msg_len
msg_size = 4 + len(thread_wakeup._wakeup_msg)

njob = pipe_size // msg_size + 10 # Add some margin
job_data = range(njob)
if support.verbose:
print(f"run {njob:,} jobs")

self.assertEqual(len(list(executor.map(int, job_data))), njob)
self.assertGreaterEqual(nwakeup, njob)


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
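
The get_pipe_size() helper in the new test relies on fcntl.F_GETPIPE_SZ, which only exists where the platform provides it (Linux). A standalone version of the same probe, assuming the 64 KiB fallback used in the test:

import os

def pipe_capacity(fd, default=65_536):
    """Return the kernel pipe buffer size for fd, or a conservative default."""
    try:
        import fcntl
        return fcntl.fcntl(fd, fcntl.F_GETPIPE_SZ)
    except (ImportError, AttributeError, OSError):
        return default

if __name__ == '__main__':
    r, w = os.pipe()
    print(pipe_capacity(w))   # typically 65536 on Linux
    os.close(r)
    os.close(w)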