diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py index d32c3f07f7..cf119ac643 100644 --- a/Lib/concurrent/futures/_base.py +++ b/Lib/concurrent/futures/_base.py @@ -284,13 +284,14 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): A named 2-tuple of sets. The first set, named 'done', contains the futures that completed (is finished or cancelled) before the wait completed. The second set, named 'not_done', contains uncompleted - futures. + futures. Duplicate futures given to *fs* are removed and will be + returned only once. """ + fs = set(fs) with _AcquireFutures(fs): - done = set(f for f in fs - if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - not_done = set(fs) - done - + done = {f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]} + not_done = fs - done if (return_when == FIRST_COMPLETED) and done: return DoneAndNotDoneFutures(done, not_done) elif (return_when == FIRST_EXCEPTION) and done: @@ -309,7 +310,7 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): f._waiters.remove(waiter) done.update(waiter.finished_futures) - return DoneAndNotDoneFutures(done, set(fs) - done) + return DoneAndNotDoneFutures(done, fs - done) class Future(object): """Represents the result of an asynchronous computation.""" @@ -380,13 +381,17 @@ def running(self): return self._state == RUNNING def done(self): - """Return True of the future was cancelled or finished executing.""" + """Return True if the future was cancelled or finished executing.""" with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] def __get_result(self): if self._exception: - raise self._exception + try: + raise self._exception + finally: + # Break a reference cycle with the exception in self._exception + self = None else: return self._result @@ -426,20 +431,24 @@ def result(self, timeout=None): timeout. Exception: If the call raised then that exception will be raised. """ - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self.__get_result() - else: - raise TimeoutError() + try: + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + finally: + # Break a reference cycle with the exception in self._exception + self = None def exception(self, timeout=None): """Return the exception raised by the call that the future represents. @@ -550,7 +559,7 @@ def set_exception(self, exception): class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" - def submit(*args, **kwargs): + def submit(self, fn, /, *args, **kwargs): """Submits a callable to be executed with the given arguments. Schedules the callable to be executed as fn(*args, **kwargs) and returns @@ -559,21 +568,7 @@ def submit(*args, **kwargs): Returns: A Future representing the given call. 
""" - if len(args) >= 2: - pass - elif not args: - raise TypeError("descriptor 'submit' of 'Executor' object " - "needs an argument") - elif 'fn' in kwargs: - import warnings - warnings.warn("Passing 'fn' as keyword argument is deprecated", - DeprecationWarning, stacklevel=2) - else: - raise TypeError('submit expected at least 1 positional argument, ' - 'got %d' % (len(args)-1)) - raise NotImplementedError() - submit.__text_signature__ = '($self, fn, /, *args, **kwargs)' def map(self, fn, *iterables, timeout=None, chunksize=1): """Returns an iterator equivalent to map(fn, iter). @@ -619,7 +614,7 @@ def result_iterator(): future.cancel() return result_iterator() - def shutdown(self, wait=True): + def shutdown(self, wait=True, *, cancel_futures=False): """Clean-up the resources associated with the Executor. It is safe to call this method several times. Otherwise, no other @@ -629,6 +624,9 @@ def shutdown(self, wait=True): wait: If True then shutdown will not return until all running futures have finished executing and the resources used by the executor have been reclaimed. + cancel_futures: If True then shutdown will cancel all pending + futures. Futures that are completed or running will not be + cancelled. """ pass diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 2b2b78eedd..57941e485d 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -45,11 +45,9 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' -import atexit import os from concurrent.futures import _base import queue -from queue import Full import multiprocessing as mp import multiprocessing.connection from multiprocessing.queues import Queue @@ -60,19 +58,6 @@ import sys import traceback -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a -# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, -# allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpreter shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads/processes finish. _threads_wakeups = weakref.WeakKeyDictionary() _global_shutdown = False @@ -80,18 +65,23 @@ class _ThreadWakeup: def __init__(self): + self._closed = False self._reader, self._writer = mp.Pipe(duplex=False) def close(self): - self._writer.close() - self._reader.close() + if not self._closed: + self._closed = True + self._writer.close() + self._reader.close() def wakeup(self): - self._writer.send_bytes(b"") + if not self._closed: + self._writer.send_bytes(b"") def clear(self): - while self._reader.poll(): - self._reader.recv_bytes() + if not self._closed: + while self._reader.poll(): + self._reader.recv_bytes() def _python_exit(): @@ -99,10 +89,17 @@ def _python_exit(): _global_shutdown = True items = list(_threads_wakeups.items()) for _, thread_wakeup in items: + # call not protected by ProcessPoolExecutor._shutdown_lock thread_wakeup.wakeup() for t, _ in items: t.join() +# Register for `_python_exit()` to be called just before joining all +# non-daemon threads. 
This is used instead of `atexit.register()` for +# compatibility with subinterpreters, which no longer support daemon threads. +# See bpo-39812 for context. +threading._register_atexit(_python_exit) + # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for # work while a larger number will make Future.cancel() succeed less frequently @@ -129,6 +126,9 @@ def __init__(self, exc, tb): tb = traceback.format_exception(type(exc), exc, tb) tb = ''.join(tb) self.exc = exc + # Traceback object needs to be garbage-collected as its frames + # contain references to all the objects in the exception scope + self.exc.__traceback__ = None self.tb = '\n"""\n%s"""' % tb def __reduce__(self): return _rebuild_exc, (self.exc, self.tb) @@ -160,8 +160,11 @@ def __init__(self, work_id, fn, args, kwargs): class _SafeQueue(Queue): """Safe Queue set exception to the future object linked to a job""" - def __init__(self, max_size=0, *, ctx, pending_work_items): + def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock, + thread_wakeup): self.pending_work_items = pending_work_items + self.shutdown_lock = shutdown_lock + self.thread_wakeup = thread_wakeup super().__init__(max_size, ctx=ctx) def _on_queue_feeder_error(self, e, obj): @@ -169,8 +172,11 @@ def _on_queue_feeder_error(self, e, obj): tb = traceback.format_exception(type(e), e, e.__traceback__) e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) work_item = self.pending_work_items.pop(obj.work_id, None) - # work_item can be None if another process terminated. In this case, - # the queue_manager_thread fails all work_items with BrokenProcessPool + with self.shutdown_lock: + self.thread_wakeup.wakeup() + # work_item can be None if another process terminated. In this + # case, the executor_manager_thread fails all work_items + # with BrokenProcessPool if work_item is not None: work_item.future.set_exception(e) else: @@ -186,6 +192,7 @@ def _get_chunks(*iterables, chunksize): return yield chunk + def _process_chunk(fn, chunk): """ Processes a chunk of an iterable passed to map. @@ -249,120 +256,132 @@ def _process_worker(call_queue, result_queue, initializer, initargs): del call_item -def _add_call_item_to_queue(pending_work_items, - work_ids, - call_queue): - """Fills call_queue with _WorkItems from pending_work_items. +class _ExecutorManagerThread(threading.Thread): + """Manages the communication between this process and the worker processes. - This function never blocks. + The manager is run in a local thread. Args: - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids - are consumed and the corresponding _WorkItems from - pending_work_items are transformed into _CallItems and put in - call_queue. - call_queue: A multiprocessing.Queue that will be filled with _CallItems - derived from _WorkItems. + executor: A reference to the ProcessPoolExecutor that owns + this thread. A weakref will be own by the manager as well as + references to internal objects used to introspect the state of + the executor. 
""" - while True: - if call_queue.full(): - return - try: - work_id = work_ids.get(block=False) - except queue.Empty: - return - else: - work_item = pending_work_items[work_id] - - if work_item.future.set_running_or_notify_cancel(): - call_queue.put(_CallItem(work_id, - work_item.fn, - work_item.args, - work_item.kwargs), - block=True) - else: - del pending_work_items[work_id] - continue + def __init__(self, executor): + # Store references to necessary internals of the executor. + + # A _ThreadWakeup to allow waking up the queue_manager_thread from the + # main Thread and avoid deadlocks caused by permanently locked queues. + self.thread_wakeup = executor._executor_manager_thread_wakeup + self.shutdown_lock = executor._shutdown_lock + + # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used + # to determine if the ProcessPoolExecutor has been garbage collected + # and that the manager can exit. + # When the executor gets garbage collected, the weakref callback + # will wake up the queue management thread so that it can terminate + # if there is no pending work item. + def weakref_cb(_, + thread_wakeup=self.thread_wakeup, + shutdown_lock=self.shutdown_lock): + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') + with shutdown_lock: + thread_wakeup.wakeup() -def _queue_management_worker(executor_reference, - processes, - pending_work_items, - work_ids_queue, - call_queue, - result_queue, - thread_wakeup): - """Manages the communication between this process and the worker processes. + self.executor_reference = weakref.ref(executor, weakref_cb) - This function is run in a local thread. + # A list of the ctx.Process instances used as workers. + self.processes = executor._processes - Args: - executor_reference: A weakref.ref to the ProcessPoolExecutor that owns - this thread. Used to determine if the ProcessPoolExecutor has been - garbage collected and that this function can exit. - process: A list of the ctx.Process instances used as - workers. - pending_work_items: A dict mapping work ids to _WorkItems e.g. - {5: <_WorkItem...>, 6: <_WorkItem...>, ...} - work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A ctx.Queue that will be filled with _CallItems - derived from _WorkItems for processing by the process workers. - result_queue: A ctx.SimpleQueue of _ResultItems generated by the - process workers. - thread_wakeup: A _ThreadWakeup to allow waking up the - queue_manager_thread from the main Thread and avoid deadlocks - caused by permanently locked queues. - """ - executor = None + # A ctx.Queue that will be filled with _CallItems derived from + # _WorkItems for processing by the process workers. + self.call_queue = executor._call_queue - def shutting_down(): - return (_global_shutdown or executor is None - or executor._shutdown_thread) + # A ctx.SimpleQueue of _ResultItems generated by the process workers. + self.result_queue = executor._result_queue - def shutdown_worker(): - # This is an upper bound on the number of children alive. - n_children_alive = sum(p.is_alive() for p in processes.values()) - n_children_to_stop = n_children_alive - n_sentinels_sent = 0 - # Send the right number of sentinels, to make sure all children are - # properly terminated. 
- while n_sentinels_sent < n_children_to_stop and n_children_alive > 0: - for i in range(n_children_to_stop - n_sentinels_sent): - try: - call_queue.put_nowait(None) - n_sentinels_sent += 1 - except Full: - break - n_children_alive = sum(p.is_alive() for p in processes.values()) + # A queue.Queue of work ids e.g. Queue([5, 6, ...]). + self.work_ids_queue = executor._work_ids - # Release the queue's resources as soon as possible. - call_queue.close() - # If .join() is not called on the created processes then - # some ctx.Queue methods may deadlock on Mac OS X. - for p in processes.values(): - p.join() + # A dict mapping work ids to _WorkItems e.g. + # {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + self.pending_work_items = executor._pending_work_items - result_reader = result_queue._reader - wakeup_reader = thread_wakeup._reader - readers = [result_reader, wakeup_reader] + super().__init__() - while True: - _add_call_item_to_queue(pending_work_items, - work_ids_queue, - call_queue) + def run(self): + # Main loop for the executor manager thread. + + while True: + self.add_call_item_to_queue() + result_item, is_broken, cause = self.wait_result_broken_or_wakeup() + + if is_broken: + self.terminate_broken(cause) + return + if result_item is not None: + self.process_result_item(result_item) + # Delete reference to result_item to avoid keeping references + # while waiting on new results. + del result_item + + # attempt to increment idle process count + executor = self.executor_reference() + if executor is not None: + executor._idle_worker_semaphore.release() + del executor + + if self.is_shutting_down(): + self.flag_executor_shutting_down() + + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not self.pending_work_items: + self.join_executor_internals() + return + + def add_call_item_to_queue(self): + # Fills call_queue with _WorkItems from pending_work_items. + # This function never blocks. + while True: + if self.call_queue.full(): + return + try: + work_id = self.work_ids_queue.get(block=False) + except queue.Empty: + return + else: + work_item = self.pending_work_items[work_id] + + if work_item.future.set_running_or_notify_cancel(): + self.call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: + del self.pending_work_items[work_id] + continue + + def wait_result_broken_or_wakeup(self): # Wait for a result to be ready in the result_queue while checking # that all worker processes are still running, or for a wake up # signal send. The wake up signals come either from new tasks being # submitted, from the executor being shutdown/gc-ed, or from the # shutdown of the python interpreter. - worker_sentinels = [p.sentinel for p in processes.values()] + result_reader = self.result_queue._reader + assert not self.thread_wakeup._closed + wakeup_reader = self.thread_wakeup._reader + readers = [result_reader, wakeup_reader] + worker_sentinels = [p.sentinel for p in list(self.processes.values())] ready = mp.connection.wait(readers + worker_sentinels) cause = None is_broken = True + result_item = None if result_reader in ready: try: result_item = result_reader.recv() @@ -372,79 +391,138 @@ def shutdown_worker(): elif wakeup_reader in ready: is_broken = False - result_item = None - thread_wakeup.clear() - if is_broken: - # Mark the process pool broken so that submits fail right now. 
- executor = executor_reference() - if executor is not None: - executor._broken = ('A child process terminated ' - 'abruptly, the process pool is not ' - 'usable anymore') - executor._shutdown_thread = True - executor = None - bpe = BrokenProcessPool("A process in the process pool was " - "terminated abruptly while the future was " - "running or pending.") - if cause is not None: - bpe.__cause__ = _RemoteTraceback( - f"\n'''\n{''.join(cause)}'''") - # All futures in flight must be marked failed - for work_id, work_item in pending_work_items.items(): - work_item.future.set_exception(bpe) - # Delete references to object. See issue16284 - del work_item - pending_work_items.clear() - # Terminate remaining workers forcibly: the queues or their - # locks may be in a dirty state and block forever. - for p in processes.values(): - p.terminate() - shutdown_worker() - return + + with self.shutdown_lock: + self.thread_wakeup.clear() + + return result_item, is_broken, cause + + def process_result_item(self, result_item): + # Process the received a result_item. This can be either the PID of a + # worker that exited gracefully or a _ResultItem + if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) - assert shutting_down() - p = processes.pop(result_item) + assert self.is_shutting_down() + p = self.processes.pop(result_item) p.join() - if not processes: - shutdown_worker() + if not self.processes: + self.join_executor_internals() return - elif result_item is not None: - work_item = pending_work_items.pop(result_item.work_id, None) + else: + # Received a _ResultItem so mark the future as completed. + work_item = self.pending_work_items.pop(result_item.work_id, None) # work_item can be None if another process terminated (see above) if work_item is not None: if result_item.exception: work_item.future.set_exception(result_item.exception) else: work_item.future.set_result(result_item.result) - # Delete references to object. See issue16284 - del work_item - # Delete reference to result_item - del result_item - # Check whether we should start shutting down. - executor = executor_reference() + def is_shutting_down(self): + # Check whether we should start shutting down the executor. + executor = self.executor_reference() # No more work items can be added if: # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - if shutting_down(): - try: - # Flag the executor as shutting down as early as possible if it - # is not gc-ed yet. - if executor is not None: - executor._shutdown_thread = True - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_worker() - return - except Full: - # This is not a problem: we will eventually be woken up (in - # result_queue.get()) and be able to send a sentinel again. - pass - executor = None + return (_global_shutdown or executor is None + or executor._shutdown_thread) + + def terminate_broken(self, cause): + # Terminate the executor because it is in a broken state. The cause + # argument can be used to display more information on the error that + # lead the executor into becoming broken. + + # Mark the process pool broken so that submits fail right now. 
+ executor = self.executor_reference() + if executor is not None: + executor._broken = ('A child process terminated ' + 'abruptly, the process pool is not ' + 'usable anymore') + executor._shutdown_thread = True + executor = None + + # All pending tasks are to be marked failed with the following + # BrokenProcessPool error + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") + + # Mark pending tasks as failed. + for work_id, work_item in self.pending_work_items.items(): + work_item.future.set_exception(bpe) + # Delete references to object. See issue16284 + del work_item + self.pending_work_items.clear() + + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. + for p in self.processes.values(): + p.terminate() + + # clean up resources + self.join_executor_internals() + + def flag_executor_shutting_down(self): + # Flag the executor as shutting down and cancel remaining tasks if + # requested as early as possible if it is not gc-ed yet. + executor = self.executor_reference() + if executor is not None: + executor._shutdown_thread = True + # Cancel pending work items if requested. + if executor._cancel_pending_futures: + # Cancel all pending futures and update pending_work_items + # to only have futures that are currently running. + new_pending_work_items = {} + for work_id, work_item in self.pending_work_items.items(): + if not work_item.future.cancel(): + new_pending_work_items[work_id] = work_item + self.pending_work_items = new_pending_work_items + # Drain work_ids_queue since we no longer need to + # add items to the call queue. + while True: + try: + self.work_ids_queue.get_nowait() + except queue.Empty: + break + # Make sure we do this only once to not waste time looping + # on running processes over and over. + executor._cancel_pending_futures = False + + def shutdown_workers(self): + n_children_to_stop = self.get_n_children_alive() + n_sentinels_sent = 0 + # Send the right number of sentinels, to make sure all children are + # properly terminated. + while (n_sentinels_sent < n_children_to_stop + and self.get_n_children_alive() > 0): + for i in range(n_children_to_stop - n_sentinels_sent): + try: + self.call_queue.put_nowait(None) + n_sentinels_sent += 1 + except queue.Full: + break + + def join_executor_internals(self): + self.shutdown_workers() + # Release the queue's resources as soon as possible. + self.call_queue.close() + self.call_queue.join_thread() + with self.shutdown_lock: + self.thread_wakeup.close() + # If .join() is not called on the created processes then + # some ctx.Queue methods may deadlock on Mac OS X. + for p in self.processes.values(): + p.join() + + def get_n_children_alive(self): + # This is an upper bound on the number of children alive. + return sum(p.is_alive() for p in self.processes.values()) _system_limits_checked = False @@ -457,6 +535,14 @@ def _check_system_limits(): if _system_limited: raise NotImplementedError(_system_limited) _system_limits_checked = True + try: + import multiprocessing.synchronize + except ImportError: + _system_limited = ( + "This Python build lacks multiprocessing.synchronize, usually due " + "to named semaphores being unavailable on this platform." 
+ ) + raise NotImplementedError(_system_limited) try: nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") except (AttributeError, ValueError): @@ -529,13 +615,17 @@ def __init__(self, max_workers=None, mp_context=None, mp_context = mp.get_context() self._mp_context = mp_context + # https://github.com/python/cpython/issues/90622 + self._safe_to_dynamically_spawn_children = ( + self._mp_context.get_start_method(allow_none=False) != "fork") + if initializer is not None and not callable(initializer): raise TypeError("initializer must be a callable") self._initializer = initializer self._initargs = initargs # Management thread - self._queue_management_thread = None + self._executor_manager_thread = None # Map of pids to processes self._processes = {} @@ -543,9 +633,21 @@ def __init__(self, max_workers=None, mp_context=None, # Shutdown is a two-step process. self._shutdown_thread = False self._shutdown_lock = threading.Lock() + self._idle_worker_semaphore = threading.Semaphore(0) self._broken = False self._queue_count = 0 self._pending_work_items = {} + self._cancel_pending_futures = False + + # _ThreadWakeup is a communication channel used to interrupt the wait + # of the main loop of executor_manager_thread from another thread (e.g. + # when calling executor.submit or executor.shutdown). We do not use the + # _result_queue to send wakeup signals to the executor_manager_thread + # as it could result in a deadlock if a worker process dies with the + # _result_queue write lock still acquired. + # + # _shutdown_lock must be locked to access _ThreadWakeup. + self._executor_manager_thread_wakeup = _ThreadWakeup() # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to @@ -554,7 +656,9 @@ def __init__(self, max_workers=None, mp_context=None, queue_size = self._max_workers + EXTRA_QUEUED_CALLS self._call_queue = _SafeQueue( max_size=queue_size, ctx=self._mp_context, - pending_work_items=self._pending_work_items) + pending_work_items=self._pending_work_items, + shutdown_lock=self._shutdown_lock, + thread_wakeup=self._executor_manager_thread_wakeup) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed # processes anyway, so silence the tracebacks. @@ -562,68 +666,50 @@ def __init__(self, max_workers=None, mp_context=None, self._result_queue = mp_context.SimpleQueue() self._work_ids = queue.Queue() - # _ThreadWakeup is a communication channel used to interrupt the wait - # of the main loop of queue_manager_thread from another thread (e.g. - # when calling executor.submit or executor.shutdown). We do not use the - # _result_queue to send the wakeup signal to the queue_manager_thread - # as it could result in a deadlock if a worker process dies with the - # _result_queue write lock still acquired. - self._queue_management_thread_wakeup = _ThreadWakeup() - - def _start_queue_management_thread(self): - if self._queue_management_thread is None: - # When the executor gets garbarge collected, the weakref callback - # will wake up the queue management thread so that it can terminate - # if there is no pending work item. - def weakref_cb(_, - thread_wakeup=self._queue_management_thread_wakeup): - mp.util.debug('Executor collected: triggering callback for' - ' QueueManager wakeup') - thread_wakeup.wakeup() + def _start_executor_manager_thread(self): + if self._executor_manager_thread is None: # Start the processes so that their sentinels are known. 
- self._adjust_process_count() - self._queue_management_thread = threading.Thread( - target=_queue_management_worker, - args=(weakref.ref(self, weakref_cb), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue, - self._queue_management_thread_wakeup), - name="QueueManagerThread") - self._queue_management_thread.daemon = True - self._queue_management_thread.start() - _threads_wakeups[self._queue_management_thread] = \ - self._queue_management_thread_wakeup + if not self._safe_to_dynamically_spawn_children: # ie, using fork. + self._launch_processes() + self._executor_manager_thread = _ExecutorManagerThread(self) + self._executor_manager_thread.start() + _threads_wakeups[self._executor_manager_thread] = \ + self._executor_manager_thread_wakeup def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_workers): - p = self._mp_context.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue, - self._initializer, - self._initargs)) - p.start() - self._processes[p.pid] = p - - def submit(*args, **kwargs): - if len(args) >= 2: - self, fn, *args = args - elif not args: - raise TypeError("descriptor 'submit' of 'ProcessPoolExecutor' object " - "needs an argument") - elif 'fn' in kwargs: - fn = kwargs.pop('fn') - self, *args = args - import warnings - warnings.warn("Passing 'fn' as keyword argument is deprecated", - DeprecationWarning, stacklevel=2) - else: - raise TypeError('submit expected at least 1 positional argument, ' - 'got %d' % (len(args)-1)) + # if there's an idle process, we don't need to spawn a new one. + if self._idle_worker_semaphore.acquire(blocking=False): + return + process_count = len(self._processes) + if process_count < self._max_workers: + # Assertion disabled as this codepath is also used to replace a + # worker that unexpectedly dies, even when using the 'fork' start + # method. That means there is still a potential deadlock bug. If a + # 'fork' mp_context worker dies, we'll be forking a new one when + # we know a thread is running (self._executor_manager_thread). 
+ #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622' + self._spawn_process() + + def _launch_processes(self): + # https://github.com/python/cpython/issues/90622 + assert not self._executor_manager_thread, ( + 'Processes cannot be fork()ed after the thread has started, ' + 'deadlock in the child processes could result.') + for _ in range(len(self._processes), self._max_workers): + self._spawn_process() + + def _spawn_process(self): + p = self._mp_context.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._initializer, + self._initargs)) + p.start() + self._processes[p.pid] = p + + def submit(self, fn, /, *args, **kwargs): with self._shutdown_lock: if self._broken: raise BrokenProcessPool(self._broken) @@ -640,11 +726,12 @@ def submit(*args, **kwargs): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._queue_management_thread_wakeup.wakeup() + self._executor_manager_thread_wakeup.wakeup() - self._start_queue_management_thread() + if self._safe_to_dynamically_spawn_children: + self._adjust_process_count() + self._start_executor_manager_thread() return f - submit.__text_signature__ = _base.Executor.submit.__text_signature__ submit.__doc__ = _base.Executor.submit.__doc__ def map(self, fn, *iterables, timeout=None, chunksize=1): @@ -676,29 +763,24 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): timeout=timeout) return _chain_from_iterable_of_lists(results) - def shutdown(self, wait=True): + def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: + self._cancel_pending_futures = cancel_futures self._shutdown_thread = True - if self._queue_management_thread: - # Wake up queue management thread - self._queue_management_thread_wakeup.wakeup() - if wait: - self._queue_management_thread.join() + if self._executor_manager_thread_wakeup is not None: + # Wake up queue management thread + self._executor_manager_thread_wakeup.wakeup() + + if self._executor_manager_thread is not None and wait: + self._executor_manager_thread.join() # To reduce the risk of opening too many files, remove references to # objects that use file descriptors. - self._queue_management_thread = None - if self._call_queue is not None: - self._call_queue.close() - if wait: - self._call_queue.join_thread() - self._call_queue = None + self._executor_manager_thread = None + self._call_queue = None + if self._result_queue is not None and wait: + self._result_queue.close() self._result_queue = None self._processes = None - - if self._queue_management_thread_wakeup: - self._queue_management_thread_wakeup.close() - self._queue_management_thread_wakeup = None + self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ - -atexit.register(_python_exit) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 74cacd885a..51c942f51a 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -5,7 +5,6 @@ __author__ = 'Brian Quinlan (brian@sweetapp.com)' -import atexit from concurrent.futures import _base import itertools import queue @@ -14,33 +13,34 @@ import weakref import os -# Workers are created as daemon threads. This is done to allow the interpreter -# to exit when there are still idle threads in a ThreadPoolExecutor's thread -# pool (i.e. shutdown() was not called). 
However, allowing workers to die with -# the interpreter has two undesirable properties: -# - The workers would still be running during interpreter shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. -# writing to a file. -# -# To work around this problem, an exit handler is installed which tells the -# workers to exit when their work queues are empty and then waits until the -# threads finish. _threads_queues = weakref.WeakKeyDictionary() _shutdown = False +# Lock that ensures that new workers are not created while the interpreter is +# shutting down. Must be held while mutating _threads_queues and _shutdown. +_global_shutdown_lock = threading.Lock() def _python_exit(): global _shutdown - _shutdown = True + with _global_shutdown_lock: + _shutdown = True items = list(_threads_queues.items()) for t, q in items: q.put(None) for t, q in items: t.join() -atexit.register(_python_exit) +# Register for `_python_exit()` to be called just before joining all +# non-daemon threads. This is used instead of `atexit.register()` for +# compatibility with subinterpreters, which no longer support daemon threads. +# See bpo-39812 for context. +threading._register_atexit(_python_exit) + +# At fork, reinitialize the `_global_shutdown_lock` lock in the child process +if hasattr(os, 'register_at_fork'): + os.register_at_fork(before=_global_shutdown_lock.acquire, + after_in_child=_global_shutdown_lock._at_fork_reinit, + after_in_parent=_global_shutdown_lock.release) class _WorkItem(object): @@ -158,23 +158,8 @@ def __init__(self, max_workers=None, thread_name_prefix='', self._initializer = initializer self._initargs = initargs - def submit(*args, **kwargs): - if len(args) >= 2: - self, fn, *args = args - elif not args: - raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object " - "needs an argument") - elif 'fn' in kwargs: - fn = kwargs.pop('fn') - self, *args = args - import warnings - warnings.warn("Passing 'fn' as keyword argument is deprecated", - DeprecationWarning, stacklevel=2) - else: - raise TypeError('submit expected at least 1 positional argument, ' - 'got %d' % (len(args)-1)) - - with self._shutdown_lock: + def submit(self, fn, /, *args, **kwargs): + with self._shutdown_lock, _global_shutdown_lock: if self._broken: raise BrokenThreadPool(self._broken) @@ -190,7 +175,6 @@ def submit(*args, **kwargs): self._work_queue.put(w) self._adjust_thread_count() return f - submit.__text_signature__ = _base.Executor.submit.__text_signature__ submit.__doc__ = _base.Executor.submit.__doc__ def _adjust_thread_count(self): @@ -212,7 +196,6 @@ def weakref_cb(_, q=self._work_queue): self._work_queue, self._initializer, self._initargs)) - t.daemon = True t.start() self._threads.add(t) _threads_queues[t] = self._work_queue @@ -230,9 +213,22 @@ def _initializer_failed(self): if work_item is not None: work_item.future.set_exception(BrokenThreadPool(self._broken)) - def shutdown(self, wait=True): + def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: self._shutdown = True + if cancel_futures: + # Drain all work items from the queue, and then cancel their + # associated futures. 
+ while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.cancel() + + # Send a wake-up to prevent threads calling + # _work_queue.get(block=True) from permanently blocking. self._work_queue.put(None) if wait: for t in self._threads: diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index a8a403ce47..f55cf3656e 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -133,6 +133,7 @@ def task(): del task while not done: time.sleep(POLL_SLEEP) + support.gc_collect() # For PyPy or other GCs. self.assertEqual(thread._count(), orig) def test_unraisable_exception(self): @@ -227,30 +228,31 @@ def setUp(self): @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork') @threading_helper.reap_threads def test_forkinthread(self): - status = "not set" + pid = None - def thread1(): - nonlocal status + def fork_thread(read_fd, write_fd): + nonlocal pid # fork in a thread pid = os.fork() - if pid == 0: - # child - try: - os.close(self.read_fd) - os.write(self.write_fd, b"OK") - finally: - os._exit(0) - else: - # parent - os.close(self.write_fd) - pid, status = os.waitpid(pid, 0) + if pid: + # parent process + return + + # child process + try: + os.close(read_fd) + os.write(write_fd, b"OK") + finally: + os._exit(0) with threading_helper.wait_threads_exit(): - thread.start_new_thread(thread1, ()) - self.assertEqual(os.read(self.read_fd, 2), b"OK", - "Unable to fork() in thread") - self.assertEqual(status, 0) + thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd)) + self.assertEqual(os.read(self.read_fd, 2), b"OK") + os.close(self.write_fd) + + self.assertIsNotNone(pid) + support.wait_process(pid, exitcode=0) def tearDown(self): try: diff --git a/Lib/test/test_threadedtempfile.py b/Lib/test/test_threadedtempfile.py index 9095637327..b088f5baf7 100644 --- a/Lib/test/test_threadedtempfile.py +++ b/Lib/test/test_threadedtempfile.py @@ -19,9 +19,10 @@ import unittest import io import threading -import sys from traceback import print_exc +import sys # XXX: RUSTPYTHON + NUM_THREADS = 20 FILES_PER_THREAD = 50 diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index c9b0b5eda6..0f6eceda73 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -18,7 +18,10 @@ import os import subprocess import signal +import textwrap +import traceback +from unittest import mock from test import lock_tests from test import support @@ -29,6 +32,14 @@ # on platforms known to behave badly. platforms_to_skip = ('netbsd5', 'hp-ux11') +# Is Python built with Py_DEBUG macro defined? +Py_DEBUG = hasattr(sys, 'gettotalrefcount') + + +def restore_default_excepthook(testcase): + testcase.addCleanup(setattr, threading, 'excepthook', threading.excepthook) + threading.excepthook = threading.__excepthook__ + # A trivial mutable counter. 
class Counter(object): @@ -85,6 +96,39 @@ def tearDown(self): class ThreadTests(BaseTestCase): + @cpython_only + def test_name(self): + def func(): pass + + thread = threading.Thread(name="myname1") + self.assertEqual(thread.name, "myname1") + + # Convert int name to str + thread = threading.Thread(name=123) + self.assertEqual(thread.name, "123") + + # target name is ignored if name is specified + thread = threading.Thread(target=func, name="myname2") + self.assertEqual(thread.name, "myname2") + + with mock.patch.object(threading, '_counter', return_value=2): + thread = threading.Thread(name="") + self.assertEqual(thread.name, "Thread-2") + + with mock.patch.object(threading, '_counter', return_value=3): + thread = threading.Thread() + self.assertEqual(thread.name, "Thread-3") + + with mock.patch.object(threading, '_counter', return_value=5): + thread = threading.Thread(target=func) + self.assertEqual(thread.name, "Thread-5 (func)") + + @cpython_only + def test_disallow_instantiation(self): + # Ensure that the type disallows instantiation (bpo-43916) + lock = threading.Lock() + test.support.check_disallow_instantiation(self, type(lock)) + # Create a bunch of threads, let each do some work, wait until all are # done. def test_various_ops(self): @@ -125,9 +169,9 @@ def test_various_ops(self): def test_ident_of_no_threading_threads(self): # The ident still must work for the main thread and dummy threads. - self.assertIsNotNone(threading.currentThread().ident) + self.assertIsNotNone(threading.current_thread().ident) def f(): - ident.append(threading.currentThread().ident) + ident.append(threading.current_thread().ident) done.set() done = threading.Event() ident = [] @@ -265,7 +309,7 @@ def run(self): self.assertEqual(result, 1) # one thread state modified if verbose: print(" waiting for worker to say it caught the exception") - worker_saw_exception.wait(timeout=10) + worker_saw_exception.wait(timeout=support.SHORT_TIMEOUT) self.assertTrue(t.finished) if verbose: print(" all OK -- joining worker") @@ -288,7 +332,7 @@ def fail_new_thread(*args): finally: threading._start_new_thread = _start_new_thread - def test_finalize_runnning_thread(self): + def test_finalize_running_thread(self): # Issue 1402: the PyGILState_Ensure / _Release functions may be called # very late on python exit: on deallocation of a running thread for # example. 
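The `test_name` hunk above pins down the default thread-naming scheme this sync brings in: unnamed threads are named "Thread-N", and when only a target is given the callable's name is appended. A minimal sketch of the behavior under test (the counter values are illustrative; they depend on how many threads the process has already created):

    import threading

    def func():
        pass

    t1 = threading.Thread(name="myname")
    print(t1.name)    # "myname" -- an explicit name always wins

    t2 = threading.Thread()
    print(t2.name)    # e.g. "Thread-1"

    t3 = threading.Thread(target=func)
    print(t3.name)    # e.g. "Thread-2 (func)" -- the target's name is appended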
@@ -402,6 +446,8 @@ def _run(self, other_ref, yet_another): if self.should_raise: raise SystemExit + restore_default_excepthook(self) + cyclic_object = RunSelfFunction(should_raise=False) weak_cyclic_object = weakref.ref(cyclic_object) cyclic_object.thread.join() @@ -422,15 +468,32 @@ def test_old_threading_api(self): # Just a quick sanity check to make sure the old method names are # still present t = threading.Thread() - t.isDaemon() - t.setDaemon(True) - t.getName() - t.setName("name") - with self.assertWarnsRegex(DeprecationWarning, 'use is_alive()'): - t.isAlive() + with self.assertWarnsRegex(DeprecationWarning, + r'get the daemon attribute'): + t.isDaemon() + with self.assertWarnsRegex(DeprecationWarning, + r'set the daemon attribute'): + t.setDaemon(True) + with self.assertWarnsRegex(DeprecationWarning, + r'get the name attribute'): + t.getName() + with self.assertWarnsRegex(DeprecationWarning, + r'set the name attribute'): + t.setName("name") + e = threading.Event() - e.isSet() - threading.activeCount() + with self.assertWarnsRegex(DeprecationWarning, 'use is_set()'): + e.isSet() + + cond = threading.Condition() + cond.acquire() + with self.assertWarnsRegex(DeprecationWarning, 'use notify_all()'): + cond.notifyAll() + + with self.assertWarnsRegex(DeprecationWarning, 'use active_count()'): + threading.activeCount() + with self.assertWarnsRegex(DeprecationWarning, 'use current_thread()'): + threading.currentThread() def test_repr_daemon(self): t = threading.Thread() @@ -446,6 +509,34 @@ def test_daemon_param(self): t = threading.Thread(daemon=True) self.assertTrue(t.daemon) + @unittest.skipUnless(hasattr(os, 'fork'), 'needs os.fork()') + def test_fork_at_exit(self): + # bpo-42350: Calling os.fork() after threading._shutdown() must + # not log an error. 
+ code = textwrap.dedent(""" + import atexit + import os + import sys + from test.support import wait_process + + # Import the threading module to register its "at fork" callback + import threading + + def exit_handler(): + pid = os.fork() + if not pid: + print("child process ok", file=sys.stderr, flush=True) + # child process + else: + wait_process(pid, exitcode=0) + + # exit_handler() will be called after threading._shutdown() + atexit.register(exit_handler) + """) + _, out, err = assert_python_ok("-c", code) + self.assertEqual(out, b'') + self.assertEqual(err.rstrip(), b'child process ok') + @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()') def test_dummy_thread_after_fork(self): # Issue #14308: a dummy thread in the active list doesn't mess up @@ -492,9 +583,7 @@ def test_is_alive_after_fork(self): else: t.join() - pid, status = os.waitpid(pid, 0) - self.assertTrue(os.WIFEXITED(status)) - self.assertEqual(10, os.WEXITSTATUS(status)) + support.wait_process(pid, exitcode=10) def test_main_thread(self): main = threading.main_thread() @@ -514,6 +603,7 @@ def f(): def test_main_thread_after_fork(self): code = """if 1: import os, threading + from test import support pid = os.fork() if pid == 0: @@ -522,7 +612,7 @@ def test_main_thread_after_fork(self): print(main.ident == threading.current_thread().ident) print(main.ident == threading.get_ident()) else: - os.waitpid(pid, 0) + support.wait_process(pid, exitcode=0) """ _, out, err = assert_python_ok("-c", code) data = out.decode().replace('\r', '') @@ -535,8 +625,9 @@ def test_main_thread_after_fork(self): def test_main_thread_after_fork_from_nonmain_thread(self): code = """if 1: import os, threading, sys + from test import support - def f(): + def func(): pid = os.fork() if pid == 0: main = threading.main_thread() @@ -547,16 +638,16 @@ def f(): # we have to flush before exit. sys.stdout.flush() else: - os.waitpid(pid, 0) + support.wait_process(pid, exitcode=0) - th = threading.Thread(target=f) + th = threading.Thread(target=func) th.start() th.join() """ _, out, err = assert_python_ok("-c", code) data = out.decode().replace('\r', '') self.assertEqual(err, b"") - self.assertEqual(data, "Thread-1\nTrue\nTrue\n") + self.assertEqual(data, "Thread-1 (func)\nTrue\nTrue\n") # TODO: RUSTPYTHON @unittest.expectedFailure @@ -647,7 +738,7 @@ def f(): finish.release() # When the thread ends, the state_lock can be successfully # acquired. - self.assertTrue(tstate_lock.acquire(timeout=5), False) + self.assertTrue(tstate_lock.acquire(timeout=support.SHORT_TIMEOUT), False) # But is_alive() is still True: we hold _tstate_lock now, which # prevents is_alive() from knowing the thread's end-of-life C code # is done. @@ -744,6 +835,27 @@ def callback(): finally: sys.settrace(old_trace) + def test_gettrace(self): + def noop_trace(frame, event, arg): + # no operation + return noop_trace + old_trace = threading.gettrace() + try: + threading.settrace(noop_trace) + trace_func = threading.gettrace() + self.assertEqual(noop_trace,trace_func) + finally: + threading.settrace(old_trace) + + def test_getprofile(self): + def fn(*args): pass + old_profile = threading.getprofile() + try: + threading.setprofile(fn) + self.assertEqual(fn, threading.getprofile()) + finally: + threading.setprofile(old_profile) + @cpython_only def test_shutdown_locks(self): for daemon in (False, True): @@ -768,6 +880,95 @@ def test_shutdown_locks(self): # Daemon threads must never add it to _shutdown_locks. 
self.assertNotIn(tstate_lock, threading._shutdown_locks) + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_locals_at_exit(self): + # bpo-19466: thread locals must not be deleted before destructors + # are called + rc, out, err = assert_python_ok("-c", """if 1: + import threading + + class Atexit: + def __del__(self): + print("thread_dict.atexit = %r" % thread_dict.atexit) + + thread_dict = threading.local() + thread_dict.atexit = "value" + + atexit = Atexit() + """) + self.assertEqual(out.rstrip(), b"thread_dict.atexit = 'value'") + + def test_boolean_target(self): + # bpo-41149: A thread that had a boolean value of False would not + # run, regardless of whether it was callable. The correct behaviour + # is for a thread to do nothing if its target is None, and to call + # the target otherwise. + class BooleanTarget(object): + def __init__(self): + self.ran = False + def __bool__(self): + return False + def __call__(self): + self.ran = True + + target = BooleanTarget() + thread = threading.Thread(target=target) + thread.start() + thread.join() + self.assertTrue(target.ran) + + def test_leak_without_join(self): + # bpo-37788: Test that a thread which is not joined explicitly + # does not leak. Test written for reference leak checks. + def noop(): pass + with threading_helper.wait_threads_exit(): + threading.Thread(target=noop).start() + # Thread.join() is not called + + @unittest.skipUnless(Py_DEBUG, 'need debug build (Py_DEBUG)') + def test_debug_deprecation(self): + # bpo-44584: The PYTHONTHREADDEBUG environment variable is deprecated + rc, out, err = assert_python_ok("-Wdefault", "-c", "pass", + PYTHONTHREADDEBUG="1") + msg = (b'DeprecationWarning: The threading debug ' + b'(PYTHONTHREADDEBUG environment variable) ' + b'is deprecated and will be removed in Python 3.12') + self.assertIn(msg, err) + + def test_import_from_another_thread(self): + # bpo-1596321: If the threading module is first import from a thread + # different than the main thread, threading._shutdown() must handle + # this case without logging an error at Python exit. + code = textwrap.dedent(''' + import _thread + import sys + + event = _thread.allocate_lock() + event.acquire() + + def import_threading(): + import threading + event.release() + + if 'threading' in sys.modules: + raise Exception('threading is already imported') + + _thread.start_new_thread(import_threading, ()) + + # wait until the threading module is imported + event.acquire() + event.release() + + if 'threading' not in sys.modules: + raise Exception('threading is not imported') + + # don't wait until the thread completes + ''') + rc, out, err = assert_python_ok("-c", code) + self.assertEqual(out, b'') + self.assertEqual(err, b'') + class ThreadJoinOnShutdown(BaseTestCase): @@ -807,11 +1008,15 @@ def test_1_join_on_shutdown(self): def test_2_join_in_forked_process(self): # Like the test above, but from a forked interpreter script = """if 1: + from test import support + childpid = os.fork() if childpid != 0: - os.waitpid(childpid, 0) + # parent process + support.wait_process(childpid, exitcode=0) sys.exit(0) + # child process t = threading.Thread(target=joiningfunc, args=(threading.current_thread(),)) t.start() @@ -826,13 +1031,17 @@ def test_3_join_in_forked_from_thread(self): # In the forked process, the main Thread object must be marked as stopped. 
script = """if 1: + from test import support + main_thread = threading.current_thread() def worker(): childpid = os.fork() if childpid != 0: - os.waitpid(childpid, 0) + # parent process + support.wait_process(childpid, exitcode=0) sys.exit(0) + # child process t = threading.Thread(target=joiningfunc, args=(main_thread,)) print('end of main') @@ -895,9 +1104,9 @@ def do_fork_and_wait(): # just fork a child process and wait it pid = os.fork() if pid > 0: - os.waitpid(pid, 0) + support.wait_process(pid, exitcode=50) else: - os._exit(0) + os._exit(50) # start a bunch of threads that will fork() child processes threads = [] @@ -924,28 +1133,32 @@ def test_clear_threads_states_after_fork(self): if pid == 0: # check that threads states have been cleared if len(sys._current_frames()) == 1: - os._exit(0) + os._exit(51) else: - os._exit(1) + os._exit(52) else: - _, status = os.waitpid(pid, 0) - self.assertEqual(0, status) + support.wait_process(pid, exitcode=51) for t in threads: t.join() class SubinterpThreadingTests(BaseTestCase): + def pipe(self): + r, w = os.pipe() + self.addCleanup(os.close, r) + self.addCleanup(os.close, w) + if hasattr(os, 'set_blocking'): + os.set_blocking(r, False) + return (r, w) # TODO: RUSTPYTHON @unittest.expectedFailure def test_threads_join(self): # Non-daemon threads should be joined at subinterpreter shutdown # (issue #18808) - r, w = os.pipe() - self.addCleanup(os.close, r) - self.addCleanup(os.close, w) - code = r"""if 1: + r, w = self.pipe() + code = textwrap.dedent(r""" import os import random import threading @@ -963,7 +1176,7 @@ def f(): threading.Thread(target=f).start() random_sleep() - """ % (w,) + """ % (w,)) ret = test.support.run_in_subinterp(code) self.assertEqual(ret, 0) # The thread was joined properly. @@ -976,10 +1189,8 @@ def test_threads_join_2(self): # Python code returned but before the thread state is deleted. # To achieve this, we register a thread-local object which sleeps # a bit when deallocated. - r, w = os.pipe() - self.addCleanup(os.close, r) - self.addCleanup(os.close, w) - code = r"""if 1: + r, w = self.pipe() + code = textwrap.dedent(r""" import os import random import threading @@ -1004,7 +1215,7 @@ def f(): threading.Thread(target=f).start() random_sleep() - """ % (w,) + """ % (w,)) ret = test.support.run_in_subinterp(code) self.assertEqual(ret, 0) # The thread was joined properly. @@ -1012,7 +1223,7 @@ def f(): @cpython_only def test_daemon_threads_fatal_error(self): - subinterp_code = r"""if 1: + subinterp_code = f"""if 1: import os import threading import time @@ -1020,7 +1231,7 @@ def test_daemon_threads_fatal_error(self): def f(): # Make sure the daemon thread is still running when # Py_EndInterpreter is called. 
- time.sleep(10) + time.sleep({test.support.SHORT_TIMEOUT}) threading.Thread(target=f, daemon=True).start() """ script = r"""if 1: @@ -1202,6 +1413,22 @@ def run(self): # explicitly break the reference cycle to not leak a dangling thread thread.exc = None + def test_multithread_modify_file_noerror(self): + # See issue25872 + def modify_file(): + with open(os_helper.TESTFN, 'w', encoding='utf-8') as fp: + fp.write(' ') + traceback.format_stack() + + self.addCleanup(os_helper.unlink, os_helper.TESTFN) + threads = [ + threading.Thread(target=modify_file) + for i in range(100) + ] + for t in threads: + t.start() + t.join() + class ThreadRunFail(threading.Thread): def run(self): @@ -1209,6 +1436,10 @@ def run(self): class ExceptHookTests(BaseTestCase): + def setUp(self): + restore_default_excepthook(self) + super().setUp() + def test_excepthook(self): with support.captured_output("stderr") as stderr: thread = ThreadRunFail(name="excepthook thread") @@ -1297,6 +1528,27 @@ def sys_hook(exc_type, exc_value, exc_traceback): 'Exception in threading.excepthook:\n') self.assertEqual(err_str, 'threading_hook failed') + def test_original_excepthook(self): + def run_thread(): + with support.captured_output("stderr") as output: + thread = ThreadRunFail(name="excepthook thread") + thread.start() + thread.join() + return output.getvalue() + + def threading_hook(args): + print("Running a thread failed", file=sys.stderr) + + default_output = run_thread() + with support.swap_attr(threading, 'excepthook', threading_hook): + custom_hook_output = run_thread() + threading.excepthook = threading.__excepthook__ + recovered_output = run_thread() + + self.assertEqual(default_output, recovered_output) + self.assertNotEqual(default_output, custom_hook_output) + self.assertEqual(custom_hook_output, "Running a thread failed\n") + class TimerTests(BaseTestCase): @@ -1349,6 +1601,11 @@ def test_release_save_unacquired(self): class EventTests(lock_tests.EventTests): eventtype = staticmethod(threading.Event) + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_reset_internal_locks(): # TODO: RUSTPYTHON; remove this when done + super().test_reset_internal_locks() + class ConditionAsRLockTests(lock_tests.RLockTests): # Condition uses an RLock by default and exports its API. locktype = staticmethod(threading.Condition) @@ -1375,6 +1632,8 @@ class MiscTestCase(unittest.TestCase): # TODO: RUSTPYTHON @unittest.expectedFailure def test__all__(self): + restore_default_excepthook(self) + extra = {"ThreadError"} not_exported = {'currentThread', 'activeCount'} support.check__all__(self, threading, ('threading', '_thread'), @@ -1382,6 +1641,30 @@ def test__all__(self): class InterruptMainTests(unittest.TestCase): + def check_interrupt_main_with_signal_handler(self, signum): + def handler(signum, frame): + 1/0 + + old_handler = signal.signal(signum, handler) + self.addCleanup(signal.signal, signum, old_handler) + + with self.assertRaises(ZeroDivisionError): + _thread.interrupt_main() + + def check_interrupt_main_noerror(self, signum): + handler = signal.getsignal(signum) + try: + # No exception should arise. + signal.signal(signum, signal.SIG_IGN) + _thread.interrupt_main(signum) + + signal.signal(signum, signal.SIG_DFL) + _thread.interrupt_main(signum) + finally: + # Restore original handler + signal.signal(signum, handler) + + @unittest.skip("TODO: RUSTPYTHON; flaky") def test_interrupt_main_subthread(self): # Calling start_new_thread with a function that executes interrupt_main # should raise KeyboardInterrupt upon completion. 
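The `check_interrupt_main_with_signal_handler` helper above relies on `_thread.interrupt_main` now accepting a signal number instead of always simulating SIGINT. A minimal sketch of that use, run from the main thread so the simulated signal is handled promptly (assumes a build with this patch applied and `signal.SIGTERM` available):

    import _thread
    import signal

    def handler(signum, frame):
        1/0    # any exception works; mirrors the probe the tests use

    old_handler = signal.signal(signal.SIGTERM, handler)
    try:
        # Simulate SIGTERM arriving; the handler runs in the main thread at
        # the next bytecode boundary, so ZeroDivisionError surfaces here.
        _thread.interrupt_main(signal.SIGTERM)
    except ZeroDivisionError:
        print("handler ran for the simulated signal")
    finally:
        signal.signal(signal.SIGTERM, old_handler)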
@@ -1399,18 +1682,99 @@ def test_interrupt_main_mainthread(self): with self.assertRaises(KeyboardInterrupt): _thread.interrupt_main() + def test_interrupt_main_with_signal_handler(self): + self.check_interrupt_main_with_signal_handler(signal.SIGINT) + self.check_interrupt_main_with_signal_handler(signal.SIGTERM) + def test_interrupt_main_noerror(self): - handler = signal.getsignal(signal.SIGINT) - try: - # No exception should arise. - signal.signal(signal.SIGINT, signal.SIG_IGN) - _thread.interrupt_main() + self.check_interrupt_main_noerror(signal.SIGINT) + self.check_interrupt_main_noerror(signal.SIGTERM) + + def test_interrupt_main_invalid_signal(self): + self.assertRaises(ValueError, _thread.interrupt_main, -1) + self.assertRaises(ValueError, _thread.interrupt_main, signal.NSIG) + self.assertRaises(ValueError, _thread.interrupt_main, 1000000) + + @threading_helper.reap_threads + def test_can_interrupt_tight_loops(self): + cont = [True] + started = [False] + interrupted = [False] + + def worker(started, cont, interrupted): + iterations = 100_000_000 + started[0] = True + while cont[0]: + if iterations: + iterations -= 1 + else: + return + pass + interrupted[0] = True - signal.signal(signal.SIGINT, signal.SIG_DFL) - _thread.interrupt_main() - finally: - # Restore original handler - signal.signal(signal.SIGINT, handler) + t = threading.Thread(target=worker,args=(started, cont, interrupted)) + t.start() + while not started[0]: + pass + cont[0] = False + t.join() + self.assertTrue(interrupted[0]) + + +class AtexitTests(unittest.TestCase): + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_atexit_output(self): + rc, out, err = assert_python_ok("-c", """if True: + import threading + + def run_last(): + print('parrot') + + threading._register_atexit(run_last) + """) + + self.assertFalse(err) + self.assertEqual(out.strip(), b'parrot') + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_atexit_called_once(self): + rc, out, err = assert_python_ok("-c", """if True: + import threading + from unittest.mock import Mock + + mock = Mock() + threading._register_atexit(mock) + mock.assert_not_called() + # force early shutdown to ensure it was called once + threading._shutdown() + mock.assert_called_once() + """) + + self.assertFalse(err) + + # TODO: RUSTPYTHON + @unittest.expectedFailure + def test_atexit_after_shutdown(self): + # The only way to do this is by registering an atexit within + # an atexit, which is intended to raise an exception. + rc, out, err = assert_python_ok("-c", """if True: + import threading + + def func(): + pass + + def run_last(): + threading._register_atexit(func) + + threading._register_atexit(run_last) + """) + + self.assertTrue(err) + self.assertIn("RuntimeError: can't register atexit after shutdown", + err.decode()) if __name__ == "__main__": diff --git a/Lib/test/test_threading_local.py b/Lib/test/test_threading_local.py index 5485d3e0ee..3443e3875d 100644 --- a/Lib/test/test_threading_local.py +++ b/Lib/test/test_threading_local.py @@ -4,7 +4,7 @@ from test import support from test.support import threading_helper import weakref -# import gc +import gc # Modules under test import _thread @@ -38,7 +38,7 @@ def _local_refs(self, n): t.join() del t - # gc.collect() + support.gc_collect() # For PyPy or other GCs. self.assertEqual(len(weaklist), n) # XXX _threading_local keeps the local of the last stopped thread alive. @@ -47,7 +47,7 @@ def _local_refs(self, n): # Assignment to the same thread local frees it sometimes (!) 
local.someothervar = None - # gc.collect() + support.gc_collect() # For PyPy or other GCs. deadlist = [weak for weak in weaklist if weak() is None] self.assertIn(len(deadlist), (n-1, n), (n, len(deadlist))) @@ -90,7 +90,7 @@ def f(): # 2) GC the cycle (triggers threadmodule.c::local_clear # before local_dealloc) del cycle - # gc.collect() + support.gc_collect() # For PyPy or other GCs. e1.set() e2.wait() @@ -110,8 +110,6 @@ def f(): self.assertTrue(passed) - # TODO: RUSTPYTHON, __new__ vs __init__ cooperation - @unittest.expectedFailure def test_arguments(self): # Issue 1522237 class MyLocal(self._local): @@ -195,13 +193,19 @@ class X: x.local.x = x wr = weakref.ref(x) del x - # gc.collect() + support.gc_collect() # For PyPy or other GCs. self.assertIsNone(wr()) class ThreadLocalTest(unittest.TestCase, BaseLocalTest): _local = _thread._local + # TODO: RUSTPYTHON, __new__ vs __init__ cooperation + @unittest.expectedFailure + def test_arguments(self): + super().test_arguments() + + class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest): _local = _threading_local.local @@ -210,7 +214,7 @@ def test_main(): suite = unittest.TestSuite() suite.addTest(DocTestSuite('_threading_local')) suite.addTest(unittest.makeSuite(ThreadLocalTest)) - # suite.addTest(unittest.makeSuite(PyThreadingLocalTest)) + suite.addTest(unittest.makeSuite(PyThreadingLocalTest)) local_orig = _threading_local.local def setUp(test): diff --git a/Lib/threading.py b/Lib/threading.py index 813dae2aa9..668126523d 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -3,6 +3,7 @@ import os as _os import sys as _sys import _thread +import functools from time import monotonic as _time from _weakrefset import WeakSet @@ -27,7 +28,7 @@ 'Event', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size', - 'excepthook', 'ExceptHookArgs'] + 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile'] # Rename some stuff so "from threading import *" is safe _start_new_thread = _thread.start_new_thread @@ -64,6 +65,10 @@ def setprofile(func): global _profile_hook _profile_hook = func +def getprofile(): + """Get the profiler function as set by threading.setprofile().""" + return _profile_hook + def settrace(func): """Set a trace function for all threads started from the threading module. @@ -74,6 +79,10 @@ def settrace(func): global _trace_hook _trace_hook = func +def gettrace(): + """Get the trace function as set by threading.settrace().""" + return _trace_hook + # Synchronization classes Lock = _allocate_lock @@ -121,6 +130,11 @@ def __repr__(self): hex(id(self)) ) + def _at_fork_reinit(self): + self._block._at_fork_reinit() + self._owner = None + self._count = 0 + def acquire(self, blocking=True, timeout=-1): """Acquire a lock, blocking or non-blocking. @@ -243,6 +257,10 @@ def __init__(self, lock=None): pass self._waiters = _deque() + def _at_fork_reinit(self): + self._lock._at_fork_reinit() + self._waiters.clear() + def __enter__(self): return self._lock.__enter__() @@ -261,7 +279,7 @@ def _acquire_restore(self, x): def _is_owned(self): # Return True if lock is owned by current_thread. # This method is called only if _lock doesn't have _is_owned().
- if self._lock.acquire(0): + if self._lock.acquire(False): self._lock.release() return False else: return True @@ -350,14 +368,21 @@ def notify(self, n=1): """ if not self._is_owned(): raise RuntimeError("cannot notify on un-acquired lock") - all_waiters = self._waiters - waiters_to_notify = _deque(_islice(all_waiters, n)) - if not waiters_to_notify: - return - for waiter in waiters_to_notify: - waiter.release() + waiters = self._waiters + while waiters and n > 0: + waiter = waiters[0] + try: + waiter.release() + except RuntimeError: + # gh-92530: The previous call of notify() released the lock, + # but was interrupted before removing it from the queue. + # It can happen if a signal handler raises an exception, + # like CTRL+C which raises KeyboardInterrupt. + pass + else: + n -= 1 try: - all_waiters.remove(waiter) + waiters.remove(waiter) except ValueError: pass @@ -370,7 +395,16 @@ def notify_all(self): """ self.notify(len(self._waiters)) - notifyAll = notify_all + def notifyAll(self): + """Wake up all threads waiting on this condition. + + This method is deprecated, use notify_all() instead. + + """ + import warnings + warnings.warn('notifyAll() is deprecated, use notify_all() instead', + DeprecationWarning, stacklevel=2) + self.notify_all() class Semaphore: @@ -438,16 +472,19 @@ def acquire(self, blocking=True, timeout=None): __enter__ = acquire - def release(self): - """Release a semaphore, incrementing the internal counter by one. + def release(self, n=1): + """Release a semaphore, incrementing the internal counter by one or more. When the counter is zero on entry and another thread is waiting for it to become larger than zero again, wake up that thread. """ + if n < 1: + raise ValueError('n must be one or more') with self._cond: - self._value += 1 - self._cond.notify() + self._value += n + for i in range(n): + self._cond.notify() def __exit__(self, t, v, tb): self.release() @@ -474,8 +511,8 @@ def __init__(self, value=1): Semaphore.__init__(self, value) self._initial_value = value - def release(self): - """Release a semaphore, incrementing the internal counter by one. + def release(self, n=1): + """Release a semaphore, incrementing the internal counter by one or more. When the counter is zero on entry and another thread is waiting for it to become larger than zero again, wake up that thread. @@ -484,11 +521,14 @@ def release(self): raise a ValueError. """ + if n < 1: + raise ValueError('n must be one or more') with self._cond: - if self._value >= self._initial_value: + if self._value + n > self._initial_value: raise ValueError("Semaphore released too many times") - self._value += 1 - self._cond.notify() + self._value += n + for i in range(n): + self._cond.notify() class Event: @@ -506,15 +546,24 @@ def __init__(self): self._cond = Condition(Lock()) self._flag = False - def _reset_internal_locks(self): - # private! called by Thread._reset_internal_locks by _after_fork() - self._cond.__init__(Lock()) + def _at_fork_reinit(self): + # Private method called by Thread._reset_internal_locks() + self._cond._at_fork_reinit() def is_set(self): """Return true if and only if the internal flag is true.""" return self._flag - isSet = is_set + def isSet(self): + """Return true if and only if the internal flag is true. + + This method is deprecated, use is_set() instead. + + """ + import warnings + warnings.warn('isSet() is deprecated, use is_set() instead', + DeprecationWarning, stacklevel=2) + return self.is_set() def set(self): """Set the internal flag to true.
@@ -592,7 +641,7 @@ def __init__(self, parties, action=None, timeout=None): self._action = action self._timeout = timeout self._parties = parties - self._state = 0 #0 filling, 1, draining, -1 resetting, -2 broken + self._state = 0 # 0 filling, 1 draining, -1 resetting, -2 broken self._count = 0 def wait(self, timeout=None): @@ -729,22 +778,39 @@ class BrokenBarrierError(RuntimeError): # Helper to generate new thread names -_counter = _count().__next__ -_counter() # Consume 0 so first non-main thread has id 1. -def _newname(template="Thread-%d"): - return template % _counter() - -# Active thread administration -_active_limbo_lock = _allocate_lock() +_counter = _count(1).__next__ +def _newname(name_template): + return name_template % _counter() + +# Active thread administration. +# +# bpo-44422: Use a reentrant lock to allow reentrant calls to functions like +# threading.enumerate(). +_active_limbo_lock = RLock() _active = {} # maps thread id to Thread object _limbo = {} _dangling = WeakSet() + # Set of Thread._tstate_lock locks of non-daemon threads used by _shutdown() # to wait until all Python thread states get deleted: # see Thread._set_tstate_lock(). _shutdown_locks_lock = _allocate_lock() _shutdown_locks = set() +def _maintain_shutdown_locks(): + """ + Drop any shutdown locks that don't correspond to running threads anymore. + + Calling this from time to time avoids an ever-growing _shutdown_locks + set when Thread objects are not joined explicitly. See bpo-37788. + + This must be called with _shutdown_locks_lock acquired. + """ + # If a lock was released, the corresponding thread has exited + to_remove = [lock for lock in _shutdown_locks if not lock.locked()] + _shutdown_locks.difference_update(to_remove) + + # Main class for threads class Thread: @@ -784,8 +850,19 @@ class is implemented. assert group is None, "group argument must be None for now" if kwargs is None: kwargs = {} + if name: + name = str(name) + else: + name = _newname("Thread-%d") + if target is not None: + try: + target_name = target.__name__ + name += f" ({target_name})" + except AttributeError: + pass + self._target = target - self._name = str(name or _newname()) + self._name = name self._args = args self._kwargs = kwargs if daemon is not None: @@ -808,9 +885,14 @@ class is implemented. def _reset_internal_locks(self, is_alive): # private! Called by _after_fork() to reset our internal locks as # they may be in an invalid state leading to a deadlock or crash. - self._started._reset_internal_locks() + self._started._at_fork_reinit() if is_alive: - self._set_tstate_lock() + # bpo-42350: If the fork happens when the thread is already stopped + # (ex: after threading._shutdown() has been called), _tstate_lock + # is None. Do nothing in this case. + if self._tstate_lock is not None: + self._tstate_lock._at_fork_reinit() + self._tstate_lock.acquire() else: # The thread isn't alive after fork: it doesn't have a tstate # anymore. 
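Two user-visible changes land in the hunks above: Semaphore.release() and BoundedSemaphore.release() grow an n parameter so one call can wake several waiters, and default thread names now embed the target's __name__. A minimal sketch of both, not part of the patch; the printed thread number depends on how many threads were created before:

import threading

sem = threading.BoundedSemaphore(3)
sem.acquire(); sem.acquire(); sem.acquire()   # counter: 3 -> 0
sem.release(n=2)                              # counter: 0 -> 2, notifies two waiters
sem.release()                                 # counter: 2 -> 3 (back to initial value)
try:
    sem.release()                             # would exceed the initial value
except ValueError as exc:
    print(exc)                                # "Semaphore released too many times"

def download():
    pass

t = threading.Thread(target=download)
print(t.name)                                 # e.g. "Thread-1 (download)"

The BoundedSemaphore over-release check uses self._value + n, so a single release(n=...) that would overshoot the initial value is rejected as a whole rather than partially applied.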
@@ -846,6 +928,7 @@ def start(self): if self._started.is_set(): raise RuntimeError("threads can only be started once") + with _active_limbo_lock: _limbo[self] = self try: @@ -866,7 +949,7 @@ def run(self): """ try: - if self._target: + if self._target is not None: self._target(*self._args, **self._kwargs) finally: # Avoid a refcycle if the thread is running a function with @@ -910,6 +993,7 @@ def _set_tstate_lock(self): if not self.daemon: with _shutdown_locks_lock: + _maintain_shutdown_locks() _shutdown_locks.add(self._tstate_lock) def _bootstrap_inner(self): @@ -965,7 +1049,8 @@ def _stop(self): self._tstate_lock = None if not self.daemon: with _shutdown_locks_lock: - _shutdown_locks.discard(lock) + # Remove our lock and other released locks from _shutdown_locks + _maintain_shutdown_locks() def _delete(self): "Remove current thread from the dict of currently running threads." @@ -1022,11 +1107,24 @@ def _wait_for_tstate_lock(self, block=True, timeout=-1): # If the lock is acquired, the C code is done, and self._stop() is # called. That sets ._is_stopped to True, and ._tstate_lock to None. lock = self._tstate_lock - if lock is None: # already determined that the C code is done + if lock is None: + # already determined that the C code is done assert self._is_stopped - elif lock.acquire(block, timeout): - lock.release() - self._stop() + return + + try: + if lock.acquire(block, timeout): + lock.release() + self._stop() + except: + if lock.locked(): + # bpo-45274: lock.acquire() acquired the lock, but the function + # was interrupted with an exception before reaching the + # lock.release(). It can happen if a signal handler raises an + # exception, like CTRL+C which raises KeyboardInterrupt. + lock.release() + self._stop() + raise @property def name(self): @@ -1072,8 +1170,8 @@ def is_alive(self): """Return whether the thread is alive. This method returns True just before the run() method starts until just - after the run() method terminates. The module function enumerate() - returns a list of all alive threads. + after the run() method terminates. See also the module function + enumerate(). """ assert self._initialized, "Thread.__init__() not called" @@ -1082,16 +1180,6 @@ def is_alive(self): self._wait_for_tstate_lock(False) return not self._is_stopped - def isAlive(self): - """Return whether the thread is alive. - - This method is deprecated, use is_alive() instead. - """ - import warnings - warnings.warn('isAlive() is deprecated, use is_alive() instead', - DeprecationWarning, stacklevel=2) - return self.is_alive() - @property def daemon(self): """A boolean value indicating whether this thread is a daemon thread. @@ -1116,15 +1204,47 @@ def daemon(self, daemonic): self._daemonic = daemonic def isDaemon(self): + """Return whether this thread is a daemon. + + This method is deprecated, use the daemon attribute instead. + + """ + import warnings + warnings.warn('isDaemon() is deprecated, get the daemon attribute instead', + DeprecationWarning, stacklevel=2) return self.daemon def setDaemon(self, daemonic): + """Set whether this thread is a daemon. + + This method is deprecated, use the .daemon property instead. + + """ + import warnings + warnings.warn('setDaemon() is deprecated, set the daemon attribute instead', + DeprecationWarning, stacklevel=2) self.daemon = daemonic def getName(self): + """Return a string used for identification purposes only. + + This method is deprecated, use the name attribute instead. 
+ + """ + import warnings + warnings.warn('getName() is deprecated, get the name attribute instead', + DeprecationWarning, stacklevel=2) return self.name def setName(self, name): + """Set the name string for this thread. + + This method is deprecated, use the name attribute instead. + + """ + import warnings + warnings.warn('setName() is deprecated, set the name attribute instead', + DeprecationWarning, stacklevel=2) self.name = name @@ -1174,6 +1294,10 @@ def excepthook(args, /): stderr.flush() +# Original value of threading.excepthook +__excepthook__ = excepthook + + def _make_invoke_excepthook(): # Create a local namespace to ensure that variables remain alive # when _invoke_excepthook() is called, even if it is called late during @@ -1315,7 +1439,16 @@ def current_thread(): except KeyError: return _DummyThread() -currentThread = current_thread +def currentThread(): + """Return the current Thread object, corresponding to the caller's thread of control. + + This function is deprecated, use current_thread() instead. + + """ + import warnings + warnings.warn('currentThread() is deprecated, use current_thread() instead', + DeprecationWarning, stacklevel=2) + return current_thread() def active_count(): """Return the number of Thread objects currently alive. @@ -1327,7 +1460,16 @@ def active_count(): with _active_limbo_lock: return len(_active) + len(_limbo) -activeCount = active_count +def activeCount(): + """Return the number of Thread objects currently alive. + + This function is deprecated, use active_count() instead. + + """ + import warnings + warnings.warn('activeCount() is deprecated, use active_count() instead', + DeprecationWarning, stacklevel=2) + return active_count() def _enumerate(): # Same as enumerate(), but without the lock. Internal use only. @@ -1344,6 +1486,27 @@ def enumerate(): with _active_limbo_lock: return list(_active.values()) + list(_limbo.values()) + +_threading_atexits = [] +_SHUTTING_DOWN = False + +def _register_atexit(func, *arg, **kwargs): + """CPython internal: register *func* to be called before joining threads. + + The registered *func* is called with its arguments just before all + non-daemon threads are joined in `_shutdown()`. It provides a similar + purpose to `atexit.register()`, but its functions are called prior to + threading shutdown instead of interpreter shutdown. + + For similarity to atexit, the registered functions are called in reverse. + """ + if _SHUTTING_DOWN: + raise RuntimeError("can't register atexit after shutdown") + + call = functools.partial(func, *arg, **kwargs) + _threading_atexits.append(call) + + from _thread import stack_size # Create the main thread object, @@ -1365,14 +1528,30 @@ def _shutdown(): # _shutdown() was already called return + global _SHUTTING_DOWN + _SHUTTING_DOWN = True + + # Call registered threading atexit functions before threads are joined. + # Order is reversed, similar to atexit. + for atexit_call in reversed(_threading_atexits): + atexit_call() + # Main thread - tlock = _main_thread._tstate_lock - # The main thread isn't finished yet, so its thread state lock can't have - # been released. - assert tlock is not None - assert tlock.locked() - tlock.release() - _main_thread._stop() + if _main_thread.ident == get_ident(): + tlock = _main_thread._tstate_lock + # The main thread isn't finished yet, so its thread state lock can't + # have been released. 
+ assert tlock is not None + assert tlock.locked() + tlock.release() + _main_thread._stop() + else: + # bpo-1596321: _shutdown() must be called in the main thread. + # If the threading module was not imported by the main thread, + # _main_thread is the thread which imported the threading module. + # In this case, ignore _main_thread, with behavior similar to threads + # spawned by C libraries or using _thread.start_new_thread(). + pass # Join all non-daemon threads while True: @@ -1384,7 +1563,7 @@ def _shutdown(): break for lock in locks: - # mimick Thread.join() + # mimic Thread.join() lock.acquire() lock.release() @@ -1417,7 +1596,7 @@ def _after_fork(): # by another (non-forked) thread. http://bugs.python.org/issue874900 global _active_limbo_lock, _main_thread global _shutdown_locks_lock, _shutdown_locks - _active_limbo_lock = _allocate_lock() + _active_limbo_lock = RLock() # fork() only copied the current thread; clear references to others. new_active = {}
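Closing out, a sketch (not part of the patch) of the two module-level additions above: threading.__excepthook__ keeps the original hook so user code can restore it after customization, and the internal threading._register_atexit() runs its callables in reverse order of registration, before non-daemon threads are joined in _shutdown():

import threading

# Swap in a custom hook, then restore the stock one.
def quiet_hook(args):
    pass  # e.g. suppress tracebacks from expected thread failures

threading.excepthook = quiet_hook
# ... run threads whose exceptions should be suppressed ...
threading.excepthook = threading.__excepthook__  # back to the default

# Internal helper: callables run in reverse registration order at shutdown.
threading._register_atexit(print, "runs second")
threading._register_atexit(print, "runs first")
# Interpreter exit prints "runs first" then "runs second", before
# non-daemon threads are joined by threading._shutdown().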