Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-129195: use future_add_to_awaited_by/future_discard_from_awaited_by in asyncio.staggered.staggered_race #129253

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Lib/asyncio/staggered.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from . import exceptions as exceptions_mod
from . import locks
from . import tasks
from . import futures


async def staggered_race(coro_fns, delay, *, loop=None):
Expand Down Expand Up @@ -63,6 +64,7 @@ async def staggered_race(coro_fns, delay, *, loop=None):
"""
# TODO: when we have aiter() and anext(), allow async iterables in coro_fns.
loop = loop or events.get_running_loop()
parent_task = tasks.current_task(loop)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if we're not running in a task? Should we assert that this is non-None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one possible outcome is that there's no task or running loop:

import asyncio.staggered

async def main():
    async def asyncfn():
        pass

    await asyncio.staggered.staggered_race([asyncfn, asyncfn], delay=None)

main().__await__().send(None)

we get a nice traceback, saying that:

./python demo.py
Traceback (most recent call last):
  File "/home/graingert/projects/cpython/demo.py", line 9, in <module>
    main().__await__().send(None)
    ~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/home/graingert/projects/cpython/demo.py", line 7, in main
    await asyncio.staggered.staggered_race([asyncfn, asyncfn], delay=None)
  File "/home/graingert/projects/cpython/Lib/asyncio/staggered.py", line 66, in staggered_race
    loop = loop or events.get_running_loop()
                   ~~~~~~~~~~~~~~~~~~~~~~~^^
RuntimeError: no running event loop

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you manually pass a loop, and step the coroutine outside of a task:

import asyncio.staggered

async def main(loop):
    async def asyncfn():
        print("hello")

    await asyncio.staggered.staggered_race([asyncfn, asyncfn], delay=None, loop=loop)
    return "world"

loop = asyncio.EventLoop()
coro = main(loop).__await__()
f = coro.send(None)
loop.run_until_complete(f)
try:
    coro.send(None)
except StopIteration as e:
    print(e.value)

by some miracle it all works, this is because future_add_to_awaited_by and future_discard_from_awaited_by is noop if any arg is not a future (eg is None) and we don't have any further use of the parent task

./python demo.py
hello
world

so we should probably leave it like that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's worth testing this usecase though

enum_coro_fns = enumerate(coro_fns)
winner_result = None
winner_index = None
Expand All @@ -73,6 +75,7 @@ async def staggered_race(coro_fns, delay, *, loop=None):

def task_done(task):
running_tasks.discard(task)
futures.future_discard_from_awaited_by(task, parent_task)
if (
on_completed_fut is not None
and not on_completed_fut.done()
Expand Down Expand Up @@ -110,6 +113,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
this_failed = locks.Event()
next_ok_to_start = locks.Event()
next_task = loop.create_task(run_one_coro(next_ok_to_start, this_failed))
futures.future_add_to_awaited_by(next_task, parent_task)
running_tasks.add(next_task)
next_task.add_done_callback(task_done)
# next_task has been appended to running_tasks so next_task is ok to
Expand Down Expand Up @@ -148,6 +152,7 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
try:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above this--for cancelling all the coroutines, future_discard_from_awaited_by should get called, right?

Copy link
Contributor Author

@graingert graingert Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah add_done_callback callbacks are called on cancellation, so it will get called

ok_to_start = locks.Event()
first_task = loop.create_task(run_one_coro(ok_to_start, None))
futures.future_add_to_awaited_by(first_task, parent_task)
running_tasks.add(first_task)
first_task.add_done_callback(task_done)
# first_task has been appended to running_tasks so first_task is ok to start.
Expand All @@ -171,4 +176,4 @@ async def run_one_coro(ok_to_start, previous_failed) -> None:
raise propagate_cancellation_error
return winner_result, winner_index, exceptions
finally:
del exceptions, propagate_cancellation_error, unhandled_exceptions
del exceptions, propagate_cancellation_error, unhandled_exceptions, parent_task
63 changes: 63 additions & 0 deletions Lib/test/test_external_inspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,69 @@ async def main():
]
self.assertEqual(stack_trace, expected_stack_trace)

@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support")
def test_async_staggered_race_remote_stack_trace(self):
# Spawn a process with some realistic Python code
script = textwrap.dedent("""\
import asyncio.staggered
import time
import sys

async def deep():
await asyncio.sleep(0)
fifo_path = sys.argv[1]
with open(fifo_path, "w") as fifo:
fifo.write("ready")
time.sleep(10000)

async def c1():
await asyncio.sleep(0)
await deep()

async def c2():
await asyncio.sleep(10000)

async def main():
await asyncio.staggered.staggered_race(
[c1, c2],
delay=None,
)

asyncio.run(main())
""")
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)
fifo = f"{work_dir}/the_fifo"
os.mkfifo(fifo)
script_name = _make_test_script(script_dir, 'script', script)
try:
p = subprocess.Popen([sys.executable, script_name, str(fifo)])
with open(fifo, "r") as fifo_file:
response = fifo_file.read()
self.assertEqual(response, "ready")
stack_trace = get_async_stack_trace(p.pid)
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace")
finally:
os.remove(fifo)
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)

# sets are unordered, so we want to sort "awaited_by"s
stack_trace[2].sort(key=lambda x: x[1])

expected_stack_trace = [
['deep', 'c1', 'run_one_coro'], 'Task-2', [[['main'], 'Task-1', []]]
]
self.assertEqual(stack_trace, expected_stack_trace)

@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Support reporting call graph information from :func:`!asyncio.staggered.staggered_race`
graingert marked this conversation as resolved.
Show resolved Hide resolved
Loading