@@ -652,6 +652,10 @@ def __init__(self, max_workers=None, mp_context=None,
652
652
mp_context = mp .get_context ()
653
653
self ._mp_context = mp_context
654
654
655
+ # https://github.com/python/cpython/issues/90622
656
+ self ._safe_to_dynamically_spawn_children = (
657
+ self ._mp_context .get_start_method (allow_none = False ) != "fork" )
658
+
655
659
if initializer is not None and not callable (initializer ):
656
660
raise TypeError ("initializer must be a callable" )
657
661
self ._initializer = initializer
@@ -714,6 +718,8 @@ def __init__(self, max_workers=None, mp_context=None,
714
718
def _start_executor_manager_thread (self ):
715
719
if self ._executor_manager_thread is None :
716
720
# Start the processes so that their sentinels are known.
721
+ if not self ._safe_to_dynamically_spawn_children : # ie, using fork.
722
+ self ._launch_processes ()
717
723
self ._executor_manager_thread = _ExecutorManagerThread (self )
718
724
self ._executor_manager_thread .start ()
719
725
_threads_wakeups [self ._executor_manager_thread ] = \
@@ -726,15 +732,32 @@ def _adjust_process_count(self):
726
732
727
733
process_count = len (self ._processes )
728
734
if process_count < self ._max_workers :
729
- p = self ._mp_context .Process (
730
- target = _process_worker ,
731
- args = (self ._call_queue ,
732
- self ._result_queue ,
733
- self ._initializer ,
734
- self ._initargs ,
735
- self ._max_tasks_per_child ))
736
- p .start ()
737
- self ._processes [p .pid ] = p
735
+ # Assertion disabled as this codepath is also used to replace a
736
+ # worker that unexpectedly dies, even when using the 'fork' start
737
+ # method. That means there is still a potential deadlock bug. If a
738
+ # 'fork' mp_context worker dies, we'll be forking a new one when
739
+ # we know a thread is running (self._executor_manager_thread).
740
+ #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622'
741
+ self ._spawn_process ()
742
+
743
+ def _launch_processes (self ):
744
+ # https://github.com/python/cpython/issues/90622
745
+ assert not self ._executor_manager_thread , (
746
+ 'Processes cannot be fork()ed after the thread has started, '
747
+ 'deadlock in the child processes could result.' )
748
+ for _ in range (len (self ._processes ), self ._max_workers ):
749
+ self ._spawn_process ()
750
+
751
+ def _spawn_process (self ):
752
+ p = self ._mp_context .Process (
753
+ target = _process_worker ,
754
+ args = (self ._call_queue ,
755
+ self ._result_queue ,
756
+ self ._initializer ,
757
+ self ._initargs ,
758
+ self ._max_tasks_per_child ))
759
+ p .start ()
760
+ self ._processes [p .pid ] = p
738
761
739
762
def submit (self , fn , / , * args , ** kwargs ):
740
763
with self ._shutdown_lock :
@@ -755,7 +778,8 @@ def submit(self, fn, /, *args, **kwargs):
755
778
# Wake up queue management thread
756
779
self ._executor_manager_thread_wakeup .wakeup ()
757
780
758
- self ._adjust_process_count ()
781
+ if self ._safe_to_dynamically_spawn_children :
782
+ self ._adjust_process_count ()
759
783
self ._start_executor_manager_thread ()
760
784
return f
761
785
submit .__doc__ = _base .Executor .submit .__doc__
0 commit comments