Skip to content

gh-128479: fix asyncio staggered race leaking tasks, and logging unhandled exception.append exception #128475

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 48 additions & 24 deletions Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,27 @@ async def staggered_race(coro_fns, delay, *, loop=None):
enum_coro_fns = enumerate(coro_fns)
winner_result = None
winner_index = None
unhandled_exceptions = []
exceptions = []
running_tasks = []
running_tasks = set()
on_completed_fut = None

def task_done(task):
running_tasks.discard(task)
if (
on_completed_fut is not None
and not on_completed_fut.done()
and not running_tasks
):
on_completed_fut.set_result(None)

if task.cancelled():
return

exc = task.exception()
if exc is None:
return
unhandled_exceptions.append(exc)

async def run_one_coro(ok_to_start, previous_failed) -> None:
# in eager tasks this waits for the calling task to append this task
Expand All @@ -91,11 +110,11 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
this_failed = locks.Event()
next_ok_to_start = locks.Event()
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
running_tasks.append(next_task)
running_tasks.add(next_task)
next_task.add_done_callback(task_done)
# next_task has been appended to running_tasks so next_task is ok to
# start.
next_ok_to_start.set()
assert len(running_tasks) == this_index + 2
# Prepare place to put this coroutine's exceptions if not won
exceptions.append(None)
assert len(exceptions) == this_index + 1
Expand All @@ -120,31 +139,36 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
# up as done() == True, cancelled() == False, exception() ==
# asyncio.CancelledError. This behavior is specified in
# https://bugs.python.org/issue30048
for i, t in enumerate(running_tasks):
if i != this_index:
current_task = tasks.current_task(loop)
for t in running_tasks:
if t is not current_task:
t.cancel()

ok_to_start = locks.Event()
first_task = loop.create_task(run_one_coro(ok_to_start, None))
running_tasks.append(first_task)
# first_task has been appended to running_tasks so first_task is ok to start.
ok_to_start.set()
propagate_cancellation_error = None
try:
# Wait for a growing list of tasks to all finish: poor man's version of
# curio's TaskGroup or trio's nursery
done_count = 0
while done_count != len(running_tasks):
done, _ = await tasks.wait(running_tasks)
done_count = len(done)
ok_to_start = locks.Event()
first_task = loop.create_task(run_one_coro(ok_to_start, None))
running_tasks.add(first_task)
first_task.add_done_callback(task_done)
# first_task has been appended to running_tasks so first_task is ok to start.
ok_to_start.set()
propagate_cancellation_error = None
# Make sure no tasks are left running if we leave this function
while running_tasks:
on_completed_fut = loop.create_future()
try:
await on_completed_fut
except exceptions_mod.CancelledError as ex:
propagate_cancellation_error = ex
for task in running_tasks:
task.cancel(*ex.args)
on_completed_fut = None
if __debug__ and unhandled_exceptions:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably this should just always raise it, not sure why this was suppressed in the original code

# If run_one_coro raises an unhandled exception, it's probably a
# programming error, and I want to see it.
if __debug__:
for d in done:
if d.done() and not d.cancelled() and d.exception():
raise d.exception()
raise ExceptionGroup("staggered race failed", unhandled_exceptions)
if propagate_cancellation_error is not None:
raise propagate_cancellation_error
return winner_result, winner_index, exceptions
finally:
del exceptions
# Make sure no tasks are left running if we leave this function
for t in running_tasks:
t.cancel()
del exceptions, propagate_cancellation_error, unhandled_exceptions
27 changes: 27 additions & 0 deletions Lib/test/test_asyncio/test_staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,30 @@ async def do_set():
self.assertIsNone(excs[0], None)
self.assertIsInstance(excs[1], asyncio.CancelledError)
self.assertIsInstance(excs[2], asyncio.CancelledError)


async def test_cancelled(self):
log = []
with self.assertRaises(TimeoutError):
async with asyncio.timeout(None) as cs_outer, asyncio.timeout(None) as cs_inner:
async def coro_fn():
cs_inner.reschedule(-1)
await asyncio.sleep(0)
try:
await asyncio.sleep(0)
except asyncio.CancelledError:
log.append("cancelled 1")

cs_outer.reschedule(-1)
await asyncio.sleep(0)
try:
await asyncio.sleep(0)
except asyncio.CancelledError:
log.append("cancelled 2")
try:
await staggered_race([coro_fn], delay=None)
except asyncio.CancelledError:
log.append("cancelled 3")
raise

self.assertListEqual(log, ["cancelled 1", "cancelled 2", "cancelled 3"])
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix :func:`!asyncio.staggered.staggered_race` leaking tasks and issuing an unhandled exception.
Loading