
Commit 2b25f54

Fix test_concurrent_futures to actually test what it says.
Many ProcessPoolExecutor-based tests were ignoring the mp_context and using the default start method instead, which meant we lacked proper test coverage across all of them. This change also removes the old _prime_executor() worker delay-seeding code: it appears to serve no purpose, it adds 20-30 seconds of latency to an already long test, and it interfered with refactoring the tests to reuse the executor that setUp already creates instead of needlessly creating their own.
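
To see why the missing mp_context mattered, here is a minimal, self-contained sketch (not code from the commit; the helper name run_with is illustrative): constructing a ProcessPoolExecutor without mp_context always exercises the interpreter's default start method for the platform, so a test meant for "spawn" or "forkserver" was silently running under the default instead.

# Minimal sketch (not from the commit) of why mp_context matters.
import concurrent.futures as futures
from multiprocessing import get_context

def run_with(ctx_name):
    # Passing mp_context pins the start method; omitting it silently uses the
    # platform default (e.g. "fork" on Linux, "spawn" on macOS and Windows),
    # which is what the unfixed tests were doing.
    mp_context = get_context(ctx_name)
    with futures.ProcessPoolExecutor(max_workers=2,
                                     mp_context=mp_context) as ex:
        return ex.submit(pow, 2, 10).result()

if __name__ == "__main__":
    for name in ("fork", "spawn", "forkserver"):
        try:
            print(name, run_with(name))
        except ValueError:
            # Not every start method is available on every platform.
            print(name, "unavailable on this platform")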
1 parent 63f32fa commit 2b25f54

File tree

1 file changed: +38 −49 lines changed


Lib/test/test_concurrent_futures.py

Lines changed: 38 additions & 49 deletions
@@ -130,7 +130,6 @@ def setUp(self):
         self.executor = self.executor_type(
             max_workers=self.worker_count,
             **self.executor_kwargs)
-        self._prime_executor()
 
     def tearDown(self):
         self.executor.shutdown(wait=True)
@@ -146,14 +145,6 @@ def tearDown(self):
     def get_context(self):
         return get_context(self.ctx)
 
-    def _prime_executor(self):
-        # Make sure that the executor is ready to do work before running the
-        # tests. This should reduce the probability of timeouts in the tests.
-        futures = [self.executor.submit(time.sleep, 0.1)
-                   for _ in range(self.worker_count)]
-        for f in futures:
-            f.result()
-
 
 class ThreadPoolMixin(ExecutorMixin):
     executor_type = futures.ThreadPoolExecutor
@@ -275,9 +266,6 @@ def test_initializer(self):
         with self.assertRaises(BrokenExecutor):
             self.executor.submit(get_init_status)
 
-    def _prime_executor(self):
-        pass
-
     @contextlib.contextmanager
     def _assert_logged(self, msg):
         if self.log_queue is not None:
@@ -364,14 +352,14 @@ def test_hang_issue12364(self):
             f.result()
 
     def test_cancel_futures(self):
-        executor = self.executor_type(max_workers=3)
-        fs = [executor.submit(time.sleep, .1) for _ in range(50)]
-        executor.shutdown(cancel_futures=True)
+        assert self.worker_count <= 5, "test needs few workers"
+        fs = [self.executor.submit(time.sleep, .1) for _ in range(50)]
+        self.executor.shutdown(cancel_futures=True)
         # We can't guarantee the exact number of cancellations, but we can
-        # guarantee that *some* were cancelled. With setting max_workers to 3,
-        # most of the submitted futures should have been cancelled.
+        # guarantee that *some* were cancelled. With few workers, many of
+        # the submitted futures should have been cancelled.
         cancelled = [fut for fut in fs if fut.cancelled()]
-        self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")
+        self.assertGreater(len(cancelled), 20)
 
         # Ensure the other futures were able to finish.
         # Use "not fut.cancelled()" instead of "fut.done()" to include futures
@@ -384,33 +372,32 @@ def test_cancel_futures(self):
         # Similar to the number of cancelled futures, we can't guarantee the
         # exact number that completed. But, we can guarantee that at least
         # one finished.
-        self.assertTrue(len(others) > 0, msg=f"{len(others)=}")
+        self.assertGreater(len(others), 0)
 
-    def test_hang_issue39205(self):
+    def test_hang_gh83386(self):
         """shutdown(wait=False) doesn't hang at exit with running futures.
 
-        See https://bugs.python.org/issue39205.
+        See https://github.com/python/cpython/issues/83386.
         """
         if self.executor_type == futures.ProcessPoolExecutor:
             raise unittest.SkipTest(
-                "Hangs due to https://bugs.python.org/issue39205")
+                "Hangs, see https://github.com/python/cpython/issues/83386")
 
         rc, out, err = assert_python_ok('-c', """if True:
             from concurrent.futures import {executor_type}
             from test.test_concurrent_futures import sleep_and_print
             if __name__ == "__main__":
+                if {context!r}: multiprocessing.set_start_method({context!r})
                 t = {executor_type}(max_workers=3)
                 t.submit(sleep_and_print, 1.0, "apple")
                 t.shutdown(wait=False)
-            """.format(executor_type=self.executor_type.__name__))
+            """.format(executor_type=self.executor_type.__name__,
+                       context=getattr(self, 'ctx', None)))
         self.assertFalse(err)
         self.assertEqual(out.strip(), b"apple")
 
 
 class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
-    def _prime_executor(self):
-        pass
-
     def test_threads_terminate(self):
         def acquire_lock(lock):
             lock.acquire()
@@ -505,14 +492,11 @@ def test_cancel_futures_wait_false(self):
 
 
 class ProcessPoolShutdownTest(ExecutorShutdownTest):
-    def _prime_executor(self):
-        pass
-
     def test_processes_terminate(self):
         def acquire_lock(lock):
             lock.acquire()
 
-        mp_context = get_context()
+        mp_context = self.get_context()
         sem = mp_context.Semaphore(0)
         for _ in range(3):
             self.executor.submit(acquire_lock, sem)
@@ -526,7 +510,8 @@ def acquire_lock(lock):
             p.join()
 
     def test_context_manager_shutdown(self):
-        with futures.ProcessPoolExecutor(max_workers=5) as e:
+        with futures.ProcessPoolExecutor(
+                max_workers=5, mp_context=self.get_context()) as e:
             processes = e._processes
             self.assertEqual(list(e.map(abs, range(-5, 5))),
                              [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
@@ -535,7 +520,8 @@ def test_context_manager_shutdown(self):
             p.join()
 
     def test_del_shutdown(self):
-        executor = futures.ProcessPoolExecutor(max_workers=5)
+        executor = futures.ProcessPoolExecutor(
+            max_workers=5, mp_context=self.get_context())
         res = executor.map(abs, range(-5, 5))
         executor_manager_thread = executor._executor_manager_thread
         processes = executor._processes
@@ -558,7 +544,8 @@ def test_del_shutdown(self):
     def test_shutdown_no_wait(self):
         # Ensure that the executor cleans up the processes when calling
         # shutdown with wait=False
-        executor = futures.ProcessPoolExecutor(max_workers=5)
+        executor = futures.ProcessPoolExecutor(
+            max_workers=5, mp_context=self.get_context())
         res = executor.map(abs, range(-5, 5))
         processes = executor._processes
         call_queue = executor._call_queue
@@ -1021,38 +1008,39 @@ def test_ressources_gced_in_workers(self):
         mgr.join()
 
     def test_saturation(self):
-        executor = self.executor_type(4)
-        mp_context = get_context()
+        executor = self.executor
+        mp_context = self.get_context()
         sem = mp_context.Semaphore(0)
         job_count = 15 * executor._max_workers
-        try:
-            for _ in range(job_count):
-                executor.submit(sem.acquire)
-            self.assertEqual(len(executor._processes), executor._max_workers)
-            for _ in range(job_count):
-                sem.release()
-        finally:
-            executor.shutdown()
+        for _ in range(job_count):
+            executor.submit(sem.acquire)
+        self.assertEqual(len(executor._processes), executor._max_workers)
+        for _ in range(job_count):
+            sem.release()
 
     def test_idle_process_reuse_one(self):
-        executor = self.executor_type(4)
+        executor = self.executor
+        assert executor._max_workers >= 4
         executor.submit(mul, 21, 2).result()
         executor.submit(mul, 6, 7).result()
         executor.submit(mul, 3, 14).result()
         self.assertEqual(len(executor._processes), 1)
-        executor.shutdown()
 
     def test_idle_process_reuse_multiple(self):
-        executor = self.executor_type(4)
+        executor = self.executor
+        assert executor._max_workers <= 5
         executor.submit(mul, 12, 7).result()
         executor.submit(mul, 33, 25)
         executor.submit(mul, 25, 26).result()
         executor.submit(mul, 18, 29)
-        self.assertLessEqual(len(executor._processes), 2)
+        executor.submit(mul, 1, 2).result()
+        executor.submit(mul, 0, 9)
+        self.assertLessEqual(len(executor._processes), 3)
         executor.shutdown()
 
     def test_max_tasks_per_child(self):
-        executor = self.executor_type(1, max_tasks_per_child=3)
+        executor = self.executor_type(
+            1, mp_context=self.get_context(), max_tasks_per_child=3)
         f1 = executor.submit(os.getpid)
         original_pid = f1.result()
         # The worker pid remains the same as the worker could be reused
@@ -1072,7 +1060,8 @@ def test_max_tasks_per_child(self):
         executor.shutdown()
 
     def test_max_tasks_early_shutdown(self):
-        executor = self.executor_type(3, max_tasks_per_child=1)
+        executor = self.executor_type(
+            3, mp_context=self.get_context(), max_tasks_per_child=1)
         futures = []
         for i in range(6):
             futures.append(executor.submit(mul, i, i))
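
The refactoring leans on the mixin pattern visible in the first two hunks: setUp builds one executor per test class with the class's chosen start method, and tests reuse self.executor and self.get_context() rather than constructing their own default-context executors. A simplified sketch of that pattern follows (class names and wiring here are illustrative assumptions, not the exact hierarchy in Lib/test/test_concurrent_futures.py):

# Illustrative sketch only -- class names and wiring are assumptions, not the
# exact hierarchy in Lib/test/test_concurrent_futures.py.
import unittest
import concurrent.futures as futures
from multiprocessing import get_context

class ExecutorMixinSketch:
    worker_count = 5
    ctx = None          # subclasses pin a start method, e.g. "spawn"

    def get_context(self):
        return get_context(self.ctx)

    def setUp(self):
        kwargs = {}
        if self.ctx is not None:
            # The point of the commit: the chosen start method must be passed
            # through to the executor, or the default context gets tested.
            kwargs["mp_context"] = self.get_context()
        self.executor = futures.ProcessPoolExecutor(
            max_workers=self.worker_count, **kwargs)

    def tearDown(self):
        self.executor.shutdown(wait=True)

class SpawnPoolTestSketch(ExecutorMixinSketch, unittest.TestCase):
    ctx = "spawn"

    def test_submit(self):
        self.assertEqual(self.executor.submit(pow, 2, 5).result(), 32)

if __name__ == "__main__":
    unittest.main()

Subclassing with ctx = "fork" or ctx = "forkserver" (where available) then yields one concrete test class per start method, which is the coverage the commit message says was missing.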
