diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 8adba36a387ad0..978a748df7fa3c 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -26,10 +26,10 @@ PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, BrokenExecutor) from concurrent.futures.process import BrokenProcessPool, _check_system_limits -from multiprocessing import get_context import multiprocessing.process import multiprocessing.util +import multiprocessing as mp if support.check_sanitizer(address=True, memory=True): @@ -130,7 +130,6 @@ def setUp(self): self.executor = self.executor_type( max_workers=self.worker_count, **self.executor_kwargs) - self._prime_executor() def tearDown(self): self.executor.shutdown(wait=True) @@ -144,15 +143,7 @@ def tearDown(self): super().tearDown() def get_context(self): - return get_context(self.ctx) - - def _prime_executor(self): - # Make sure that the executor is ready to do work before running the - # tests. This should reduce the probability of timeouts in the tests. - futures = [self.executor.submit(time.sleep, 0.1) - for _ in range(self.worker_count)] - for f in futures: - f.result() + return mp.get_context(self.ctx) class ThreadPoolMixin(ExecutorMixin): @@ -275,9 +266,6 @@ def test_initializer(self): with self.assertRaises(BrokenExecutor): self.executor.submit(get_init_status) - def _prime_executor(self): - pass - @contextlib.contextmanager def _assert_logged(self, msg): if self.log_queue is not None: @@ -364,14 +352,14 @@ def test_hang_issue12364(self): f.result() def test_cancel_futures(self): - executor = self.executor_type(max_workers=3) - fs = [executor.submit(time.sleep, .1) for _ in range(50)] - executor.shutdown(cancel_futures=True) + assert self.worker_count <= 5, "test needs few workers" + fs = [self.executor.submit(time.sleep, .1) for _ in range(50)] + self.executor.shutdown(cancel_futures=True) # We can't guarantee the exact number of cancellations, but we can - # guarantee that *some* were cancelled. With setting max_workers to 3, - # most of the submitted futures should have been cancelled. + # guarantee that *some* were cancelled. With few workers, many of + # the submitted futures should have been cancelled. cancelled = [fut for fut in fs if fut.cancelled()] - self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}") + self.assertGreater(len(cancelled), 20) # Ensure the other futures were able to finish. # Use "not fut.cancelled()" instead of "fut.done()" to include futures @@ -384,33 +372,32 @@ def test_cancel_futures(self): # Similar to the number of cancelled futures, we can't guarantee the # exact number that completed. But, we can guarantee that at least # one finished. - self.assertTrue(len(others) > 0, msg=f"{len(others)=}") + self.assertGreater(len(others), 0) - def test_hang_issue39205(self): + def test_hang_gh83386(self): """shutdown(wait=False) doesn't hang at exit with running futures. - See https://bugs.python.org/issue39205. + See https://github.com/python/cpython/issues/83386. """ if self.executor_type == futures.ProcessPoolExecutor: raise unittest.SkipTest( - "Hangs due to https://bugs.python.org/issue39205") + "Hangs, see https://github.com/python/cpython/issues/83386") rc, out, err = assert_python_ok('-c', """if True: from concurrent.futures import {executor_type} from test.test_concurrent_futures import sleep_and_print if __name__ == "__main__": + if {context!r}: multiprocessing.set_start_method({context!r}) t = {executor_type}(max_workers=3) t.submit(sleep_and_print, 1.0, "apple") t.shutdown(wait=False) - """.format(executor_type=self.executor_type.__name__)) + """.format(executor_type=self.executor_type.__name__, + context=getattr(self, 'ctx', None))) self.assertFalse(err) self.assertEqual(out.strip(), b"apple") class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase): - def _prime_executor(self): - pass - def test_threads_terminate(self): def acquire_lock(lock): lock.acquire() @@ -505,14 +492,11 @@ def test_cancel_futures_wait_false(self): class ProcessPoolShutdownTest(ExecutorShutdownTest): - def _prime_executor(self): - pass - def test_processes_terminate(self): def acquire_lock(lock): lock.acquire() - mp_context = get_context() + mp_context = self.get_context() sem = mp_context.Semaphore(0) for _ in range(3): self.executor.submit(acquire_lock, sem) @@ -526,7 +510,8 @@ def acquire_lock(lock): p.join() def test_context_manager_shutdown(self): - with futures.ProcessPoolExecutor(max_workers=5) as e: + with futures.ProcessPoolExecutor( + max_workers=5, mp_context=self.get_context()) as e: processes = e._processes self.assertEqual(list(e.map(abs, range(-5, 5))), [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) @@ -535,7 +520,8 @@ def test_context_manager_shutdown(self): p.join() def test_del_shutdown(self): - executor = futures.ProcessPoolExecutor(max_workers=5) + executor = futures.ProcessPoolExecutor( + max_workers=5, mp_context=self.get_context()) res = executor.map(abs, range(-5, 5)) executor_manager_thread = executor._executor_manager_thread processes = executor._processes @@ -558,7 +544,8 @@ def test_del_shutdown(self): def test_shutdown_no_wait(self): # Ensure that the executor cleans up the processes when calling # shutdown with wait=False - executor = futures.ProcessPoolExecutor(max_workers=5) + executor = futures.ProcessPoolExecutor( + max_workers=5, mp_context=self.get_context()) res = executor.map(abs, range(-5, 5)) processes = executor._processes call_queue = executor._call_queue @@ -935,7 +922,7 @@ def submit(pool): pool.submit(submit, pool) for _ in range(50): - with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers: + with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers: workers.submit(tuple) @@ -1005,7 +992,7 @@ def test_traceback(self): def test_ressources_gced_in_workers(self): # Ensure that argument for a job are correctly gc-ed after the job # is finished - mgr = get_context(self.ctx).Manager() + mgr = self.get_context().Manager() obj = EventfulGCObj(mgr) future = self.executor.submit(id, obj) future.result() @@ -1021,38 +1008,41 @@ def test_ressources_gced_in_workers(self): mgr.join() def test_saturation(self): - executor = self.executor_type(4) - mp_context = get_context() + executor = self.executor + mp_context = self.get_context() sem = mp_context.Semaphore(0) job_count = 15 * executor._max_workers - try: - for _ in range(job_count): - executor.submit(sem.acquire) - self.assertEqual(len(executor._processes), executor._max_workers) - for _ in range(job_count): - sem.release() - finally: - executor.shutdown() + for _ in range(job_count): + executor.submit(sem.acquire) + self.assertEqual(len(executor._processes), executor._max_workers) + for _ in range(job_count): + sem.release() def test_idle_process_reuse_one(self): - executor = self.executor_type(4) + executor = self.executor + assert executor._max_workers >= 4 executor.submit(mul, 21, 2).result() executor.submit(mul, 6, 7).result() executor.submit(mul, 3, 14).result() self.assertEqual(len(executor._processes), 1) - executor.shutdown() def test_idle_process_reuse_multiple(self): - executor = self.executor_type(4) + executor = self.executor + assert executor._max_workers <= 5 executor.submit(mul, 12, 7).result() executor.submit(mul, 33, 25) executor.submit(mul, 25, 26).result() executor.submit(mul, 18, 29) - self.assertLessEqual(len(executor._processes), 2) + executor.submit(mul, 1, 2).result() + executor.submit(mul, 0, 9) + self.assertLessEqual(len(executor._processes), 3) executor.shutdown() def test_max_tasks_per_child(self): - executor = self.executor_type(1, max_tasks_per_child=3) + # not using self.executor as we need to control construction. + # arguably this could go in another class w/o that mixin. + executor = self.executor_type( + 1, mp_context=self.get_context(), max_tasks_per_child=3) f1 = executor.submit(os.getpid) original_pid = f1.result() # The worker pid remains the same as the worker could be reused @@ -1072,7 +1062,10 @@ def test_max_tasks_per_child(self): executor.shutdown() def test_max_tasks_early_shutdown(self): - executor = self.executor_type(3, max_tasks_per_child=1) + # not using self.executor as we need to control construction. + # arguably this could go in another class w/o that mixin. + executor = self.executor_type( + 3, mp_context=self.get_context(), max_tasks_per_child=1) futures = [] for i in range(6): futures.append(executor.submit(mul, i, i)) @@ -1182,7 +1175,7 @@ def _check_crash(self, error, func, *args, ignore_stderr=False): self.executor.shutdown(wait=True) executor = self.executor_type( - max_workers=2, mp_context=get_context(self.ctx)) + max_workers=2, mp_context=self.get_context()) res = executor.submit(func, *args) if ignore_stderr: @@ -1261,7 +1254,7 @@ def test_shutdown_deadlock(self): # if a worker fails after the shutdown call. self.executor.shutdown(wait=True) with self.executor_type(max_workers=2, - mp_context=get_context(self.ctx)) as executor: + mp_context=self.get_context()) as executor: self.executor = executor # Allow clean up in fail_on_deadlock f = executor.submit(_crash, delay=.1) executor.shutdown(wait=True) @@ -1274,7 +1267,7 @@ def test_shutdown_deadlock_pickle(self): # Reported in bpo-39104. self.executor.shutdown(wait=True) with self.executor_type(max_workers=2, - mp_context=get_context(self.ctx)) as executor: + mp_context=self.get_context()) as executor: self.executor = executor # Allow clean up in fail_on_deadlock # Start the executor and get the executor_manager_thread to collect diff --git a/Misc/NEWS.d/next/Tests/2022-04-16-17-54-05.gh-issue-91607.FnXjtW.rst b/Misc/NEWS.d/next/Tests/2022-04-16-17-54-05.gh-issue-91607.FnXjtW.rst new file mode 100644 index 00000000000000..32839a826a41ea --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2022-04-16-17-54-05.gh-issue-91607.FnXjtW.rst @@ -0,0 +1 @@ +Fix ``test_concurrent_futures`` to test the correct multiprocessing start method context in several cases where the test logic mixed this up.