Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ async def staggered_race(coro_fns, delay, *, loop=None):
winner_index = None
exceptions = []
running_tasks = []
waiting_tasks = []

async def run_one_coro(previous_failed) -> None:
# Wait for the previous task to finish, or for delay seconds
Expand All @@ -78,6 +79,7 @@ async def run_one_coro(previous_failed) -> None:
# cancelled, otherwise there will be a "Task destroyed but it is
# pending" later.
await tasks.wait_for(previous_failed.wait(), delay)
waiting_tasks.append(previous_failed)
# Get the next coroutine to run
try:
this_index, coro_fn = next(enum_coro_fns)
Expand All @@ -87,7 +89,11 @@ async def run_one_coro(previous_failed) -> None:
this_failed = locks.Event()
next_task = loop.create_task(run_one_coro(this_failed))
running_tasks.append(next_task)
assert len(running_tasks) == this_index + 2
running_tasks.append(next_task)
if len(waiting_tasks) != len(running_tasks):
task = running_tasks.pop(len(waiting_tasks) - 1)
if task.cancelled():
task.cancel()
# Prepare place to put this coroutine's exceptions if not won
exceptions.append(None)
assert len(exceptions) == this_index + 1
Expand Down
Loading