gh-91607: Fix several test_concurrent_futures tests to actually test what they claim #91600

Merged
merged 3 commits on Apr 16, 2022
105 changes: 49 additions & 56 deletions Lib/test/test_concurrent_futures.py
@@ -26,10 +26,10 @@
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
BrokenExecutor)
from concurrent.futures.process import BrokenProcessPool, _check_system_limits
from multiprocessing import get_context

import multiprocessing.process
import multiprocessing.util
import multiprocessing as mp


if support.check_sanitizer(address=True, memory=True):
@@ -130,7 +130,6 @@ def setUp(self):
self.executor = self.executor_type(
max_workers=self.worker_count,
**self.executor_kwargs)
self._prime_executor()

def tearDown(self):
self.executor.shutdown(wait=True)
@@ -144,15 +143,7 @@ def tearDown(self):
super().tearDown()

def get_context(self):
return get_context(self.ctx)

def _prime_executor(self):
# Make sure that the executor is ready to do work before running the
# tests. This should reduce the probability of timeouts in the tests.
futures = [self.executor.submit(time.sleep, 0.1)
for _ in range(self.worker_count)]
for f in futures:
f.result()
return mp.get_context(self.ctx)


class ThreadPoolMixin(ExecutorMixin):
@@ -275,9 +266,6 @@ def test_initializer(self):
with self.assertRaises(BrokenExecutor):
self.executor.submit(get_init_status)

def _prime_executor(self):
pass

@contextlib.contextmanager
def _assert_logged(self, msg):
if self.log_queue is not None:
@@ -364,14 +352,14 @@ def test_hang_issue12364(self):
f.result()

def test_cancel_futures(self):
executor = self.executor_type(max_workers=3)
fs = [executor.submit(time.sleep, .1) for _ in range(50)]
executor.shutdown(cancel_futures=True)
assert self.worker_count <= 5, "test needs few workers"
fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
self.executor.shutdown(cancel_futures=True)
# We can't guarantee the exact number of cancellations, but we can
# guarantee that *some* were cancelled. With setting max_workers to 3,
# most of the submitted futures should have been cancelled.
# guarantee that *some* were cancelled. With few workers, many of
# the submitted futures should have been cancelled.
cancelled = [fut for fut in fs if fut.cancelled()]
self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
self.assertGreater(len(cancelled), 20)

# Ensure the other futures were able to finish.
# Use "not fut.cancelled()" instead of "fut.done()" to include futures
@@ -384,33 +372,32 @@ def test_cancel_futures(self):
# Similar to the number of cancelled futures, we can't guarantee the
# exact number that completed. But, we can guarantee that at least
# one finished.
self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
self.assertGreater(len(others), 0)

def test_hang_issue39205(self):
def test_hang_gh83386(self):
"""shutdown(wait=False) doesn't hang at exit with running futures.

See https://bugs.python.org/issue39205.
See https://github.com/python/cpython/issues/83386.
"""
if self.executor_type == futures.ProcessPoolExecutor:
raise unittest.SkipTest(
"Hangs due to https://bugs.python.org/issue39205")
"Hangs, see https://github.com/python/cpython/issues/83386")

rc, out, err = assert_python_ok('-c', """if True:
from concurrent.futures import {executor_type}
from test.test_concurrent_futures import sleep_and_print
if __name__ == "__main__":
if {context!r}: multiprocessing.set_start_method({context!r})
t = {executor_type}(max_workers=3)
t.submit(sleep_and_print, 1.0, "apple")
t.shutdown(wait=False)
""".format(executor_type=self.executor_type.__name__))
""".format(executor_type=self.executor_type.__name__,
context=getattr(self, 'ctx', None)))
self.assertFalse(err)
self.assertEqual(out.strip(), b"apple")


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
def _prime_executor(self):
pass

def test_threads_terminate(self):
def acquire_lock(lock):
lock.acquire()
@@ -505,14 +492,11 @@ def test_cancel_futures_wait_false(self):


class ProcessPoolShutdownTest(ExecutorShutdownTest):
def _prime_executor(self):
pass

def test_processes_terminate(self):
def acquire_lock(lock):
lock.acquire()

mp_context = get_context()
mp_context = self.get_context()
sem = mp_context.Semaphore(0)
for _ in range(3):
self.executor.submit(acquire_lock, sem)
@@ -526,7 +510,8 @@ def acquire_lock(lock):
p.join()

def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(max_workers=5) as e:
with futures.ProcessPoolExecutor(
max_workers=5, mp_context=self.get_context()) as e:
processes = e._processes
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
@@ -535,7 +520,8 @@ def test_context_manager_shutdown(self):
p.join()

def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(max_workers=5)
executor = futures.ProcessPoolExecutor(
max_workers=5, mp_context=self.get_context())
res = executor.map(abs, range(-5, 5))
executor_manager_thread = executor._executor_manager_thread
processes = executor._processes
@@ -558,7 +544,8 @@ def test_del_shutdown(self):
def test_shutdown_no_wait(self):
# Ensure that the executor cleans up the processes when calling
# shutdown with wait=False
executor = futures.ProcessPoolExecutor(max_workers=5)
executor = futures.ProcessPoolExecutor(
max_workers=5, mp_context=self.get_context())
res = executor.map(abs, range(-5, 5))
processes = executor._processes
call_queue = executor._call_queue
@@ -935,7 +922,7 @@ def submit(pool):
pool.submit(submit, pool)

for _ in range(50):
with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers:
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
workers.submit(tuple)


@@ -1005,7 +992,7 @@ def test_traceback(self):
def test_ressources_gced_in_workers(self):
# Ensure that argument for a job are correctly gc-ed after the job
# is finished
mgr = get_context(self.ctx).Manager()
mgr = self.get_context().Manager()
obj = EventfulGCObj(mgr)
future = self.executor.submit(id, obj)
future.result()
@@ -1021,38 +1008,41 @@ def test_ressources_gced_in_workers(self):
mgr.join()

def test_saturation(self):
executor = self.executor_type(4)
mp_context = get_context()
executor = self.executor
mp_context = self.get_context()
sem = mp_context.Semaphore(0)
job_count = 15 * executor._max_workers
try:
for _ in range(job_count):
executor.submit(sem.acquire)
self.assertEqual(len(executor._processes), executor._max_workers)
for _ in range(job_count):
sem.release()
finally:
executor.shutdown()
for _ in range(job_count):
executor.submit(sem.acquire)
self.assertEqual(len(executor._processes), executor._max_workers)
for _ in range(job_count):
sem.release()

def test_idle_process_reuse_one(self):
executor = self.executor_type(4)
executor = self.executor
assert executor._max_workers >= 4
executor.submit(mul, 21, 2).result()
executor.submit(mul, 6, 7).result()
executor.submit(mul, 3, 14).result()
self.assertEqual(len(executor._processes), 1)
executor.shutdown()

def test_idle_process_reuse_multiple(self):
executor = self.executor_type(4)
executor = self.executor
assert executor._max_workers <= 5
executor.submit(mul, 12, 7).result()
executor.submit(mul, 33, 25)
executor.submit(mul, 25, 26).result()
executor.submit(mul, 18, 29)
self.assertLessEqual(len(executor._processes), 2)
executor.submit(mul, 1, 2).result()
executor.submit(mul, 0, 9)
self.assertLessEqual(len(executor._processes), 3)
executor.shutdown()

def test_max_tasks_per_child(self):
executor = self.executor_type(1, max_tasks_per_child=3)
# not using self.executor as we need to control construction.
# arguably this could go in another class w/o that mixin.
executor = self.executor_type(
1, mp_context=self.get_context(), max_tasks_per_child=3)
f1 = executor.submit(os.getpid)
original_pid = f1.result()
# The worker pid remains the same as the worker could be reused
@@ -1072,7 +1062,10 @@ def test_max_tasks_per_child(self):
executor.shutdown()

def test_max_tasks_early_shutdown(self):
executor = self.executor_type(3, max_tasks_per_child=1)
# not using self.executor as we need to control construction.
# arguably this could go in another class w/o that mixin.
executor = self.executor_type(
3, mp_context=self.get_context(), max_tasks_per_child=1)
futures = []
for i in range(6):
futures.append(executor.submit(mul, i, i))
@@ -1182,7 +1175,7 @@ def _check_crash(self, error, func, *args, ignore_stderr=False):
self.executor.shutdown(wait=True)

executor = self.executor_type(
max_workers=2, mp_context=get_context(self.ctx))
max_workers=2, mp_context=self.get_context())
res = executor.submit(func, *args)

if ignore_stderr:
@@ -1261,7 +1254,7 @@ def test_shutdown_deadlock(self):
# if a worker fails after the shutdown call.
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=get_context(self.ctx)) as executor:
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock
f = executor.submit(_crash, delay=.1)
executor.shutdown(wait=True)
@@ -1274,7 +1267,7 @@ def test_shutdown_deadlock_pickle(self):
# Reported in bpo-39104.
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=get_context(self.ctx)) as executor:
mp_context=self.get_context()) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock

# Start the executor and get the executor_manager_thread to collect
@@ -0,0 +1 @@
Fix ``test_concurrent_futures`` to test the correct multiprocessing start method context in several cases where the test logic mixed this up.
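
For context when reading the diff: the recurring pattern in the updated tests is to resolve the multiprocessing start method explicitly (via self.get_context(), i.e. mp.get_context(self.ctx)) and pass that context to the executor through mp_context, instead of calling the module-level get_context() and silently picking up the platform default. Below is a minimal standalone sketch of that pattern; it is not code from the PR, and the "spawn" start method and the square helper are arbitrary choices for illustration.

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def square(x):
    # Trivial worker; defined at module level so it can be pickled and
    # re-imported by child processes under the "spawn" start method.
    return x * x


if __name__ == "__main__":
    # Pick the start method explicitly and hand the resulting context to
    # the executor, mirroring what the tests now do via self.get_context().
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
        print(executor.submit(square, 7).result())  # prints 49

Creating helper objects such as semaphores or managers from that same context, as several of the updated tests do, keeps them consistent with the start method the executor's workers actually use.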