Skip to content

gh-91048: Add ability to list all pending asyncio tasks in a process remotely #132807

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 122 additions & 1 deletion Lib/test/test_external_inspection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import textwrap
import importlib
import sys
from test.support import os_helper, SHORT_TIMEOUT
from test.support import os_helper, SHORT_TIMEOUT, busy_retry
from test.support.script_helper import make_script

import subprocess
Expand All @@ -14,6 +14,7 @@
from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
from _testexternalinspection import get_stack_trace
from _testexternalinspection import get_async_stack_trace
from _testexternalinspection import get_all_awaited_by
except ImportError:
raise unittest.SkipTest(
"Test only runs when _testexternalinspection is available")
Expand Down Expand Up @@ -349,6 +350,126 @@ async def main():
]
self.assertEqual(stack_trace, expected_stack_trace)

@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
"Test only runs on Linux with process_vm_readv support")
def test_async_global_awaited_by(self):
script = textwrap.dedent("""\
import asyncio
import os
import random
import sys
from string import ascii_lowercase, digits
from test.support import socket_helper, SHORT_TIMEOUT

HOST = '127.0.0.1'
PORT = socket_helper.find_unused_port()
connections = 0

class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
global connections
connections += 1
self.transport = transport

def data_received(self, data):
self.transport.write(data)
self.transport.close()

async def echo_client(message):
reader, writer = await asyncio.open_connection(HOST, PORT)
writer.write(message.encode())
await writer.drain()

data = await reader.read(100)
assert message == data.decode()
writer.close()
await writer.wait_closed()
await asyncio.sleep(SHORT_TIMEOUT)

async def echo_client_spam(server):
async with asyncio.TaskGroup() as tg:
while connections < 1000:
msg = list(ascii_lowercase + digits)
random.shuffle(msg)
tg.create_task(echo_client("".join(msg)))
await asyncio.sleep(0)
# at least a 1000 tasks created
fifo_path = sys.argv[1]
with open(fifo_path, "w") as fifo:
fifo.write("ready")
# at this point all client tasks completed without assertion errors
# let's wrap up the test
server.close()
await server.wait_closed()

async def main():
loop = asyncio.get_running_loop()
server = await loop.create_server(EchoServerProtocol, HOST, PORT)
async with server:
async with asyncio.TaskGroup() as tg:
tg.create_task(server.serve_forever(), name="server task")
tg.create_task(echo_client_spam(server), name="echo client spam")

asyncio.run(main())
""")
stack_trace = None
with os_helper.temp_dir() as work_dir:
script_dir = os.path.join(work_dir, "script_pkg")
os.mkdir(script_dir)
fifo = f"{work_dir}/the_fifo"
os.mkfifo(fifo)
script_name = _make_test_script(script_dir, 'script', script)
try:
p = subprocess.Popen([sys.executable, script_name, str(fifo)])
with open(fifo, "r") as fifo_file:
response = fifo_file.read()
self.assertEqual(response, "ready")
for _ in busy_retry(SHORT_TIMEOUT):
try:
all_awaited_by = get_all_awaited_by(p.pid)
except RuntimeError as re:
# This call reads a linked list in another process with
# no synchronization. That occasionally leads to invalid
# reads. Here we avoid making the test flaky.
msg = str(re)
if msg.startswith("Task list appears corrupted"):
continue
elif msg.startswith("Invalid linked list structure reading remote memory"):
continue
elif msg.startswith("Unknown error reading memory"):
continue
elif msg.startswith("Unhandled frame owner"):
continue
raise # Unrecognized exception, safest not to ignore it
else:
break
# expected: a list of two elements: 1 thread, 1 interp
self.assertEqual(len(all_awaited_by), 2)
# expected: a tuple with the thread ID and the awaited_by list
self.assertEqual(len(all_awaited_by[0]), 2)
# expected: no tasks in the fallback per-interp task list
self.assertEqual(all_awaited_by[1], (0, []))
entries = all_awaited_by[0][1]
# expected: at least 1000 pending tasks
self.assertGreaterEqual(len(entries), 1000)
# the first three tasks stem from the code structure
self.assertIn(('Task-1', []), entries)
self.assertIn(('server task', [[['main'], 'Task-1', []]]), entries)
self.assertIn(('echo client spam', [[['main'], 'Task-1', []]]), entries)
# the final task will have some random number, but it should for
# sure be one of the echo client spam horde
self.assertEqual([[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]], entries[-1][1])
except PermissionError:
self.skipTest(
"Insufficient permissions to read the stack trace")
finally:
os.remove(fifo)
p.kill()
p.terminate()
p.wait(timeout=SHORT_TIMEOUT)

@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
"Test only runs on Linux and MacOS")
@unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add ability to externally inspect all pending asyncio tasks, even if no task
is currently entered on the event loop.
12 changes: 12 additions & 0 deletions Modules/_asynciomodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,17 @@ typedef struct _Py_AsyncioModuleDebugOffsets {
uint64_t task_is_task;
uint64_t task_awaited_by_is_set;
uint64_t task_coro;
uint64_t task_node;
} asyncio_task_object;
struct _asyncio_interpreter_state {
uint64_t size;
uint64_t asyncio_tasks_head;
} asyncio_interpreter_state;
struct _asyncio_thread_state {
uint64_t size;
uint64_t asyncio_running_loop;
uint64_t asyncio_running_task;
uint64_t asyncio_tasks_head;
} asyncio_thread_state;
} Py_AsyncioModuleDebugOffsets;

Expand All @@ -121,11 +127,17 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets _AsyncioDebug)
.task_is_task = offsetof(TaskObj, task_is_task),
.task_awaited_by_is_set = offsetof(TaskObj, task_awaited_by_is_set),
.task_coro = offsetof(TaskObj, task_coro),
.task_node = offsetof(TaskObj, task_node),
},
.asyncio_interpreter_state = {
.size = sizeof(PyInterpreterState),
.asyncio_tasks_head = offsetof(PyInterpreterState, asyncio_tasks_head),
},
.asyncio_thread_state = {
.size = sizeof(_PyThreadStateImpl),
.asyncio_running_loop = offsetof(_PyThreadStateImpl, asyncio_running_loop),
.asyncio_running_task = offsetof(_PyThreadStateImpl, asyncio_running_task),
.asyncio_tasks_head = offsetof(_PyThreadStateImpl, asyncio_tasks_head),
}};

/* State of the _asyncio module */
Expand Down
Loading
Loading