Skip to content

Commit e2b340a

Browse files
elprans1st1
authored andcommitted
bpo-32751: Wait for task cancellation in asyncio.wait_for() (GH-7216)
Currently, asyncio.wait_for(fut), upon reaching the timeout deadline, cancels the future and returns immediately. This is problematic for when *fut* is a Task, because it will be left running for an arbitrary amount of time. This behavior is iself surprising and may lead to related bugs such as the one described in bpo-33638: condition = asyncio.Condition() async with condition: await asyncio.wait_for(condition.wait(), timeout=0.5) Currently, instead of raising a TimeoutError, the above code will fail with `RuntimeError: cannot wait on un-acquired lock`, because `__aexit__` is reached _before_ `condition.wait()` finishes its cancellation and re-acquires the condition lock. To resolve this, make `wait_for` await for the task cancellation. The tradeoff here is that the `timeout` promise may be broken if the task decides to handle its cancellation in a slow way. This represents a behavior change and should probably not be back-patched to 3.6 and earlier.
1 parent 863b674 commit e2b340a

File tree

5 files changed

+100
-3
lines changed

5 files changed

+100
-3
lines changed

Doc/library/asyncio-task.rst

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -790,7 +790,9 @@ Task functions
790790

791791
Returns result of the Future or coroutine. When a timeout occurs, it
792792
cancels the task and raises :exc:`asyncio.TimeoutError`. To avoid the task
793-
cancellation, wrap it in :func:`shield`.
793+
cancellation, wrap it in :func:`shield`. The function will wait until
794+
the future is actually cancelled, so the total wait time may exceed
795+
the *timeout*.
794796

795797
If the wait is cancelled, the future *fut* is also cancelled.
796798

@@ -800,3 +802,8 @@ Task functions
800802

801803
.. versionchanged:: 3.4.3
802804
If the wait is cancelled, the future *fut* is now also cancelled.
805+
806+
.. versionchanged:: 3.7
807+
When *fut* is cancelled due to a timeout, ``wait_for`` now waits
808+
for *fut* to be cancelled. Previously,
809+
it raised :exc:`~asyncio.TimeoutError` immediately.

Lib/asyncio/tasks.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -412,14 +412,17 @@ async def wait_for(fut, timeout, *, loop=None):
412412
return fut.result()
413413
else:
414414
fut.remove_done_callback(cb)
415-
fut.cancel()
415+
# We must ensure that the task is not running
416+
# after wait_for() returns.
417+
# See https://bugs.python.org/issue32751
418+
await _cancel_and_wait(fut, loop=loop)
416419
raise futures.TimeoutError()
417420
finally:
418421
timeout_handle.cancel()
419422

420423

421424
async def _wait(fs, timeout, return_when, loop):
422-
"""Internal helper for wait() and wait_for().
425+
"""Internal helper for wait().
423426
424427
The fs argument must be a collection of Futures.
425428
"""
@@ -461,6 +464,22 @@ def _on_completion(f):
461464
return done, pending
462465

463466

467+
async def _cancel_and_wait(fut, loop):
468+
"""Cancel the *fut* future or task and wait until it completes."""
469+
470+
waiter = loop.create_future()
471+
cb = functools.partial(_release_waiter, waiter)
472+
fut.add_done_callback(cb)
473+
474+
try:
475+
fut.cancel()
476+
# We cannot wait on *fut* directly to make
477+
# sure _cancel_and_wait itself is reliably cancellable.
478+
await waiter
479+
finally:
480+
fut.remove_done_callback(cb)
481+
482+
464483
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
465484
def as_completed(fs, *, loop=None, timeout=None):
466485
"""Return an iterator whose values are coroutines.

Lib/test/test_asyncio/test_locks.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,19 @@ def test_ambiguous_loops(self):
807807
with self.assertRaises(ValueError):
808808
asyncio.Condition(lock, loop=loop)
809809

810+
def test_timeout_in_block(self):
811+
loop = asyncio.new_event_loop()
812+
self.addCleanup(loop.close)
813+
814+
async def task_timeout():
815+
condition = asyncio.Condition(loop=loop)
816+
async with condition:
817+
with self.assertRaises(asyncio.TimeoutError):
818+
await asyncio.wait_for(condition.wait(), timeout=0.5,
819+
loop=loop)
820+
821+
loop.run_until_complete(task_timeout())
822+
810823

811824
class SemaphoreTests(test_utils.TestCase):
812825

Lib/test/test_asyncio/test_tasks.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,62 @@ def gen():
789789
res = loop.run_until_complete(task)
790790
self.assertEqual(res, "ok")
791791

792+
def test_wait_for_waits_for_task_cancellation(self):
793+
loop = asyncio.new_event_loop()
794+
self.addCleanup(loop.close)
795+
796+
task_done = False
797+
798+
async def foo():
799+
async def inner():
800+
nonlocal task_done
801+
try:
802+
await asyncio.sleep(0.2, loop=loop)
803+
finally:
804+
task_done = True
805+
806+
inner_task = self.new_task(loop, inner())
807+
808+
with self.assertRaises(asyncio.TimeoutError):
809+
await asyncio.wait_for(inner_task, timeout=0.1, loop=loop)
810+
811+
self.assertTrue(task_done)
812+
813+
loop.run_until_complete(foo())
814+
815+
def test_wait_for_self_cancellation(self):
816+
loop = asyncio.new_event_loop()
817+
self.addCleanup(loop.close)
818+
819+
async def foo():
820+
async def inner():
821+
try:
822+
await asyncio.sleep(0.3, loop=loop)
823+
except asyncio.CancelledError:
824+
try:
825+
await asyncio.sleep(0.3, loop=loop)
826+
except asyncio.CancelledError:
827+
await asyncio.sleep(0.3, loop=loop)
828+
829+
return 42
830+
831+
inner_task = self.new_task(loop, inner())
832+
833+
wait = asyncio.wait_for(inner_task, timeout=0.1, loop=loop)
834+
835+
# Test that wait_for itself is properly cancellable
836+
# even when the initial task holds up the initial cancellation.
837+
task = self.new_task(loop, wait)
838+
await asyncio.sleep(0.2, loop=loop)
839+
task.cancel()
840+
841+
with self.assertRaises(asyncio.CancelledError):
842+
await task
843+
844+
self.assertEqual(await inner_task, 42)
845+
846+
loop.run_until_complete(foo())
847+
792848
def test_wait(self):
793849

794850
def gen():
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
When cancelling the task due to a timeout, :meth:`asyncio.wait_for` will now
2+
wait until the cancellation is complete.

0 commit comments

Comments
 (0)