Skip to content

Commit

Permalink
gh-129195: use future_add_to_awaited_by/future_discard_from_awaited_b…
Browse files Browse the repository at this point in the history
…y in staggered race
  • Loading branch information
graingert committed Jan 24, 2025
1 parent 732670d commit ddeeee9
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from . import exceptions as exceptions_mod
from . import locks
from . import tasks
from . import futures


async def staggered_race(coro_fns, delay, *, loop=None):
Expand Down Expand Up @@ -63,6 +64,7 @@ async def staggered_race(coro_fns, delay, *, loop=None):
"""
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
loop = loop or events.get_running_loop()
parent_task = tasks.current_task(loop)
enum_coro_fns = enumerate(coro_fns)
winner_result = None
winner_index = None
Expand All @@ -73,6 +75,7 @@ async def staggered_race(coro_fns, delay, *, loop=None):

def task_done(task):
running_tasks.discard(task)
futures.future_discard_from_awaited_by(task, parent_task)
if (
on_completed_fut is not None
and not on_completed_fut.done()
Expand Down Expand Up @@ -110,6 +113,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
this_failed = locks.Event()
next_ok_to_start = locks.Event()
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
futures.future_add_to_awaited_by(task, parent_task)
running_tasks.add(next_task)
next_task.add_done_callback(task_done)
# next_task has been appended to running_tasks so next_task is ok to
Expand Down Expand Up @@ -148,6 +152,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
try:
ok_to_start = locks.Event()
first_task = loop.create_task(run_one_coro(ok_to_start, None))
futures.future_add_to_awaited_by(task, parent_task)
running_tasks.add(first_task)
first_task.add_done_callback(task_done)
# first_task has been appended to running_tasks so first_task is ok to start.
Expand All @@ -171,4 +176,4 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
raise propagate_cancellation_error
return winner_result, winner_index, exceptions
finally:
del exceptions, propagate_cancellation_error, unhandled_exceptions
del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task

0 comments on commit ddeeee9

Please sign in to comment.