Skip to content

[3.11] gh-95166: cancel map waited on future on timeout (GH-95169) #95364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, fs - done)


def _result_or_cancel(fut, timeout=None):
try:
try:
return fut.result(timeout)
finally:
fut.cancel()
finally:
# Break a reference cycle with the exception in self._exception
del fut


class Future(object):
"""Represents the result of an asynchronous computation."""

Expand Down Expand Up @@ -604,9 +616,9 @@ def result_iterator():
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
yield _result_or_cancel(fs.pop())
else:
yield fs.pop().result(end_time - time.monotonic())
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
finally:
for future in fs:
future.cancel()
Expand Down
27 changes: 27 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,33 @@ def submit(pool):
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
workers.submit(tuple)

def test_executor_map_current_future_cancel(self):
stop_event = threading.Event()
log = []

def log_n_wait(ident):
log.append(f"{ident=} started")
try:
stop_event.wait()
finally:
log.append(f"{ident=} stopped")

with self.executor_type(max_workers=1) as pool:
# submit work to saturate the pool
fut = pool.submit(log_n_wait, ident="first")
try:
with contextlib.closing(
pool.map(log_n_wait, ["second", "third"], timeout=0)
) as gen:
with self.assertRaises(TimeoutError):
next(gen)
finally:
stop_event.set()
fut.result()
# ident='second' is cancelled as a result of raising a TimeoutError
# ident='third' is cancelled because it remained in the collection of futures
self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])


class ProcessPoolExecutorTest(ExecutorTest):

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix :meth:`concurrent.futures.Executor.map` to cancel the currently waiting on future on an error - e.g. TimeoutError or KeyboardInterrupt.