Skip to content

Update thread/threading/concurrent from CPython 3.10.5 #3921

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 36 additions & 38 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,14 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
A named 2-tuple of sets. The first set, named 'done', contains the
futures that completed (is finished or cancelled) before the wait
completed. The second set, named 'not_done', contains uncompleted
futures.
futures. Duplicate futures given to *fs* are removed and will be
returned only once.
"""
fs = set(fs)
with _AcquireFutures(fs):
done = set(f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
not_done = set(fs) - done

done = {f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
not_done = fs - done
if (return_when == FIRST_COMPLETED) and done:
return DoneAndNotDoneFutures(done, not_done)
elif (return_when == FIRST_EXCEPTION) and done:
Expand All @@ -309,7 +310,7 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
f._waiters.remove(waiter)

done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, set(fs) - done)
return DoneAndNotDoneFutures(done, fs - done)

class Future(object):
"""Represents the result of an asynchronous computation."""
Expand Down Expand Up @@ -380,13 +381,17 @@ def running(self):
return self._state == RUNNING

def done(self):
"""Return True of the future was cancelled or finished executing."""
"""Return True if the future was cancelled or finished executing."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

def __get_result(self):
if self._exception:
raise self._exception
try:
raise self._exception
finally:
# Break a reference cycle with the exception in self._exception
self = None
else:
return self._result

Expand Down Expand Up @@ -426,20 +431,24 @@ def result(self, timeout=None):
timeout.
Exception: If the call raised then that exception will be raised.
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()

self._condition.wait(timeout)

if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
else:
raise TimeoutError()
try:
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()

self._condition.wait(timeout)

if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
else:
raise TimeoutError()
finally:
# Break a reference cycle with the exception in self._exception
self = None

def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
Expand Down Expand Up @@ -550,7 +559,7 @@ def set_exception(self, exception):
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""

def submit(*args, **kwargs):
def submit(self, fn, /, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
Expand All @@ -559,21 +568,7 @@ def submit(*args, **kwargs):
Returns:
A Future representing the given call.
"""
if len(args) >= 2:
pass
elif not args:
raise TypeError("descriptor 'submit' of 'Executor' object "
"needs an argument")
elif 'fn' in kwargs:
import warnings
warnings.warn("Passing 'fn' as keyword argument is deprecated",
DeprecationWarning, stacklevel=2)
else:
raise TypeError('submit expected at least 1 positional argument, '
'got %d' % (len(args)-1))

raise NotImplementedError()
submit.__text_signature__ = '($self, fn, /, *args, **kwargs)'

def map(self, fn, *iterables, timeout=None, chunksize=1):
"""Returns an iterator equivalent to map(fn, iter).
Expand Down Expand Up @@ -619,7 +614,7 @@ def result_iterator():
future.cancel()
return result_iterator()

def shutdown(self, wait=True):
def shutdown(self, wait=True, *, cancel_futures=False):
"""Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other
Expand All @@ -629,6 +624,9 @@ def shutdown(self, wait=True):
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
executor have been reclaimed.
cancel_futures: If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be
cancelled.
"""
pass

Expand Down
Loading