@@ -130,7 +130,6 @@ def setUp(self):
130
130
self .executor = self .executor_type (
131
131
max_workers = self .worker_count ,
132
132
** self .executor_kwargs )
133
- self ._prime_executor ()
134
133
135
134
def tearDown (self ):
136
135
self .executor .shutdown (wait = True )
@@ -146,14 +145,6 @@ def tearDown(self):
146
145
def get_context (self ):
147
146
return get_context (self .ctx )
148
147
149
- def _prime_executor (self ):
150
- # Make sure that the executor is ready to do work before running the
151
- # tests. This should reduce the probability of timeouts in the tests.
152
- futures = [self .executor .submit (time .sleep , 0.1 )
153
- for _ in range (self .worker_count )]
154
- for f in futures :
155
- f .result ()
156
-
157
148
158
149
class ThreadPoolMixin (ExecutorMixin ):
159
150
executor_type = futures .ThreadPoolExecutor
@@ -275,9 +266,6 @@ def test_initializer(self):
275
266
with self .assertRaises (BrokenExecutor ):
276
267
self .executor .submit (get_init_status )
277
268
278
- def _prime_executor (self ):
279
- pass
280
-
281
269
@contextlib .contextmanager
282
270
def _assert_logged (self , msg ):
283
271
if self .log_queue is not None :
@@ -364,14 +352,14 @@ def test_hang_issue12364(self):
364
352
f .result ()
365
353
366
354
def test_cancel_futures (self ):
367
- executor = self .executor_type ( max_workers = 3 )
368
- fs = [executor .submit (time .sleep , .1 ) for _ in range (50 )]
369
- executor .shutdown (cancel_futures = True )
355
+ assert self .worker_count <= 5 , "test needs few workers"
356
+ fs = [self . executor .submit (time .sleep , .1 ) for _ in range (50 )]
357
+ self . executor .shutdown (cancel_futures = True )
370
358
# We can't guarantee the exact number of cancellations, but we can
371
- # guarantee that *some* were cancelled. With setting max_workers to 3,
372
- # most of the submitted futures should have been cancelled.
359
+ # guarantee that *some* were cancelled. With few workers, many of
360
+ # the submitted futures should have been cancelled.
373
361
cancelled = [fut for fut in fs if fut .cancelled ()]
374
- self .assertTrue (len (cancelled ) >= 35 , msg = f" { len ( cancelled ) = } " )
362
+ self .assertGreater (len (cancelled ), 20 )
375
363
376
364
# Ensure the other futures were able to finish.
377
365
# Use "not fut.cancelled()" instead of "fut.done()" to include futures
@@ -384,33 +372,32 @@ def test_cancel_futures(self):
384
372
# Similar to the number of cancelled futures, we can't guarantee the
385
373
# exact number that completed. But, we can guarantee that at least
386
374
# one finished.
387
- self .assertTrue (len (others ) > 0 , msg = f" { len ( others ) = } " )
375
+ self .assertGreater (len (others ), 0 )
388
376
389
- def test_hang_issue39205 (self ):
377
+ def test_hang_gh83386 (self ):
390
378
"""shutdown(wait=False) doesn't hang at exit with running futures.
391
379
392
- See https://bugs. python.org/issue39205 .
380
+ See https://github.com/ python/cpython/issues/83386 .
393
381
"""
394
382
if self .executor_type == futures .ProcessPoolExecutor :
395
383
raise unittest .SkipTest (
396
- "Hangs due to https://bugs. python.org/issue39205 " )
384
+ "Hangs, see https://github.com/ python/cpython/issues/83386 " )
397
385
398
386
rc , out , err = assert_python_ok ('-c' , """if True:
399
387
from concurrent.futures import {executor_type}
400
388
from test.test_concurrent_futures import sleep_and_print
401
389
if __name__ == "__main__":
390
+ if {context!r}: multiprocessing.set_start_method({context!r})
402
391
t = {executor_type}(max_workers=3)
403
392
t.submit(sleep_and_print, 1.0, "apple")
404
393
t.shutdown(wait=False)
405
- """ .format (executor_type = self .executor_type .__name__ ))
394
+ """ .format (executor_type = self .executor_type .__name__ ,
395
+ context = getattr (self , 'ctx' , None )))
406
396
self .assertFalse (err )
407
397
self .assertEqual (out .strip (), b"apple" )
408
398
409
399
410
400
class ThreadPoolShutdownTest (ThreadPoolMixin , ExecutorShutdownTest , BaseTestCase ):
411
- def _prime_executor (self ):
412
- pass
413
-
414
401
def test_threads_terminate (self ):
415
402
def acquire_lock (lock ):
416
403
lock .acquire ()
@@ -505,14 +492,11 @@ def test_cancel_futures_wait_false(self):
505
492
506
493
507
494
class ProcessPoolShutdownTest (ExecutorShutdownTest ):
508
- def _prime_executor (self ):
509
- pass
510
-
511
495
def test_processes_terminate (self ):
512
496
def acquire_lock (lock ):
513
497
lock .acquire ()
514
498
515
- mp_context = get_context ()
499
+ mp_context = self . get_context ()
516
500
sem = mp_context .Semaphore (0 )
517
501
for _ in range (3 ):
518
502
self .executor .submit (acquire_lock , sem )
@@ -526,7 +510,8 @@ def acquire_lock(lock):
526
510
p .join ()
527
511
528
512
def test_context_manager_shutdown (self ):
529
- with futures .ProcessPoolExecutor (max_workers = 5 ) as e :
513
+ with futures .ProcessPoolExecutor (
514
+ max_workers = 5 , mp_context = self .get_context ()) as e :
530
515
processes = e ._processes
531
516
self .assertEqual (list (e .map (abs , range (- 5 , 5 ))),
532
517
[5 , 4 , 3 , 2 , 1 , 0 , 1 , 2 , 3 , 4 ])
@@ -535,7 +520,8 @@ def test_context_manager_shutdown(self):
535
520
p .join ()
536
521
537
522
def test_del_shutdown (self ):
538
- executor = futures .ProcessPoolExecutor (max_workers = 5 )
523
+ executor = futures .ProcessPoolExecutor (
524
+ max_workers = 5 , mp_context = self .get_context ())
539
525
res = executor .map (abs , range (- 5 , 5 ))
540
526
executor_manager_thread = executor ._executor_manager_thread
541
527
processes = executor ._processes
@@ -558,7 +544,8 @@ def test_del_shutdown(self):
558
544
def test_shutdown_no_wait (self ):
559
545
# Ensure that the executor cleans up the processes when calling
560
546
# shutdown with wait=False
561
- executor = futures .ProcessPoolExecutor (max_workers = 5 )
547
+ executor = futures .ProcessPoolExecutor (
548
+ max_workers = 5 , mp_context = self .get_context ())
562
549
res = executor .map (abs , range (- 5 , 5 ))
563
550
processes = executor ._processes
564
551
call_queue = executor ._call_queue
@@ -1021,38 +1008,39 @@ def test_ressources_gced_in_workers(self):
1021
1008
mgr .join ()
1022
1009
1023
1010
def test_saturation (self ):
1024
- executor = self .executor_type ( 4 )
1025
- mp_context = get_context ()
1011
+ executor = self .executor
1012
+ mp_context = self . get_context ()
1026
1013
sem = mp_context .Semaphore (0 )
1027
1014
job_count = 15 * executor ._max_workers
1028
- try :
1029
- for _ in range (job_count ):
1030
- executor .submit (sem .acquire )
1031
- self .assertEqual (len (executor ._processes ), executor ._max_workers )
1032
- for _ in range (job_count ):
1033
- sem .release ()
1034
- finally :
1035
- executor .shutdown ()
1015
+ for _ in range (job_count ):
1016
+ executor .submit (sem .acquire )
1017
+ self .assertEqual (len (executor ._processes ), executor ._max_workers )
1018
+ for _ in range (job_count ):
1019
+ sem .release ()
1036
1020
1037
1021
def test_idle_process_reuse_one (self ):
1038
- executor = self .executor_type (4 )
1022
+ executor = self .executor
1023
+ assert executor ._max_workers >= 4
1039
1024
executor .submit (mul , 21 , 2 ).result ()
1040
1025
executor .submit (mul , 6 , 7 ).result ()
1041
1026
executor .submit (mul , 3 , 14 ).result ()
1042
1027
self .assertEqual (len (executor ._processes ), 1 )
1043
- executor .shutdown ()
1044
1028
1045
1029
def test_idle_process_reuse_multiple (self ):
1046
- executor = self .executor_type (4 )
1030
+ executor = self .executor
1031
+ assert executor ._max_workers <= 5
1047
1032
executor .submit (mul , 12 , 7 ).result ()
1048
1033
executor .submit (mul , 33 , 25 )
1049
1034
executor .submit (mul , 25 , 26 ).result ()
1050
1035
executor .submit (mul , 18 , 29 )
1051
- self .assertLessEqual (len (executor ._processes ), 2 )
1036
+ executor .submit (mul , 1 , 2 ).result ()
1037
+ executor .submit (mul , 0 , 9 )
1038
+ self .assertLessEqual (len (executor ._processes ), 3 )
1052
1039
executor .shutdown ()
1053
1040
1054
1041
def test_max_tasks_per_child (self ):
1055
- executor = self .executor_type (1 , max_tasks_per_child = 3 )
1042
+ executor = self .executor_type (
1043
+ 1 , mp_context = self .get_context (), max_tasks_per_child = 3 )
1056
1044
f1 = executor .submit (os .getpid )
1057
1045
original_pid = f1 .result ()
1058
1046
# The worker pid remains the same as the worker could be reused
@@ -1072,7 +1060,8 @@ def test_max_tasks_per_child(self):
1072
1060
executor .shutdown ()
1073
1061
1074
1062
def test_max_tasks_early_shutdown (self ):
1075
- executor = self .executor_type (3 , max_tasks_per_child = 1 )
1063
+ executor = self .executor_type (
1064
+ 3 , mp_context = self .get_context (), max_tasks_per_child = 1 )
1076
1065
futures = []
1077
1066
for i in range (6 ):
1078
1067
futures .append (executor .submit (mul , i , i ))
0 commit comments