gh-128002: use per threads tasks linked list in asyncio #128869

Merged: 22 commits, Feb 6, 2025

7 changes: 7 additions & 0 deletions Include/internal/pycore_interp.h
@@ -227,6 +227,13 @@ struct _is {
PyMutex weakref_locks[NUM_WEAKREF_LIST_LOCKS];
_PyIndexPool tlbc_indices;
#endif
// Per-interpreter list of tasks: any lingering tasks from a thread
// state get added here and removed from the corresponding
// thread state's list.
struct llist_node asyncio_tasks_head;
// `asyncio_tasks_lock` is held while tasks are moved
// from a thread's list to the interpreter's list.
PyMutex asyncio_tasks_lock;

// Per-interpreter state for the obmalloc allocator. For the main
// interpreter and for all interpreters that don't have their
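
Both new fields use CPython's intrusive `struct llist_node` pattern: the link node is embedded in the object itself, so insertion and removal are O(1) with no extra allocation, and `llist_data` recovers the containing object from its node. The toy helpers below are simplified stand-ins written for this note (the real ones live in Include/internal/pycore_llist.h); they only sketch the shape of the pattern that `asyncio_tasks_head` and `TaskObj.task_node` rely on.

#include <stddef.h>
#include <stdio.h>

/* Simplified stand-ins for the helpers in Include/internal/pycore_llist.h. */
struct llist_node { struct llist_node *next, *prev; };

static void llist_init(struct llist_node *head) { head->next = head->prev = head; }
static int  llist_empty(struct llist_node *head) { return head->next == head; }

static void llist_insert_tail(struct llist_node *head, struct llist_node *node) {
    node->prev = head->prev;
    node->next = head;
    head->prev->next = node;
    head->prev = node;
}

static void llist_remove(struct llist_node *node) {
    node->prev->next = node->next;
    node->next->prev = node->prev;
    node->next = node->prev = NULL;   /* NULL links mean "not registered" */
}

/* Recover the containing object from its embedded node (what llist_data does). */
#define llist_data(node, type, member) \
    ((type *)((char *)(node) - offsetof(type, member)))

typedef struct { int id; struct llist_node task_node; } toy_task;

int main(void) {
    struct llist_node tasks_head;                   /* e.g. a per-thread asyncio_tasks_head */
    llist_init(&tasks_head);

    toy_task a = {1, {NULL, NULL}}, b = {2, {NULL, NULL}};
    llist_insert_tail(&tasks_head, &a.task_node);   /* register_task */
    llist_insert_tail(&tasks_head, &b.task_node);

    for (struct llist_node *n = tasks_head.next; n != &tasks_head; n = n->next) {
        printf("task %d\n", llist_data(n, toy_task, task_node)->id);
    }

    llist_remove(&a.task_node);                     /* unregister_task */
    llist_remove(&b.task_node);
    printf("list empty again: %d\n", llist_empty(&tasks_head));
    return 0;
}

A NULL `next` pointer is the same "not registered" convention that `register_task` and `unregister_task` check in the module diff below.
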
2 changes: 1 addition & 1 deletion Include/internal/pycore_lock.h
@@ -52,7 +52,7 @@ typedef enum _PyLockFlags {

// Lock a mutex with an optional timeout and additional options. See
// _PyLockFlags for details.
extern PyLockStatus
extern PyAPI_FUNC(PyLockStatus)
_PyMutex_LockTimed(PyMutex *m, PyTime_t timeout_ns, _PyLockFlags flags);

// Lock a mutex with additional options. See _PyLockFlags for details.
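
The only change above is wrapping the return type in `PyAPI_FUNC`, which marks the symbol as exported from the core so that code built outside of it (such as the `_asyncio` extension when it is compiled as a shared module) can call it. A hedged illustration of the declaration pattern, with a hypothetical function name:

/* Hypothetical internal declaration.  Without PyAPI_FUNC the symbol is not
 * exported from the core binary, so a separately built extension that calls
 * it fails to resolve it at load time; with PyAPI_FUNC it is exported.      */
extern PyAPI_FUNC(int) _PyExample_Internal(void);
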
4 changes: 2 additions & 2 deletions Include/internal/pycore_pystate.h
@@ -182,8 +182,8 @@ extern void _PyEval_StartTheWorldAll(_PyRuntimeState *runtime);
// Perform a stop-the-world pause for threads in the specified interpreter.
//
// NOTE: This is a no-op outside of Py_GIL_DISABLED builds.
extern void _PyEval_StopTheWorld(PyInterpreterState *interp);
extern void _PyEval_StartTheWorld(PyInterpreterState *interp);
extern PyAPI_FUNC(void) _PyEval_StopTheWorld(PyInterpreterState *interp);
extern PyAPI_FUNC(void) _PyEval_StartTheWorld(PyInterpreterState *interp);


static inline void
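
These two functions are likewise now exported so the `_asyncio` module can pause and resume all threads. A minimal sketch of the calling pattern, assuming CPython internal headers; `walk_all_task_lists` is a placeholder name invented for this note, and the constraint in the comment mirrors the one `_asyncio_all_tasks_impl` follows later in this diff:

/* Sketch only; assumes CPython internal headers (pycore_pystate.h) and a
 * hypothetical traversal helper walk_all_task_lists().                   */
static void
snapshot_under_stop_the_world(void)
{
    PyInterpreterState *interp = PyInterpreterState_Get();

    _PyEval_StopTheWorld(interp);
    /* Every other thread is paused here, so per-thread structures can be
     * read without locks.  Avoid escaping calls (anything that can run
     * arbitrary Python code or block) until the world is started again. */
    walk_all_task_lists(interp);
    _PyEval_StartTheWorld(interp);
}
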
5 changes: 5 additions & 0 deletions Include/internal/pycore_tstate.h
@@ -24,9 +24,14 @@ typedef struct _PyThreadStateImpl {
PyObject *asyncio_running_loop; // Strong reference
PyObject *asyncio_running_task; // Strong reference

/* Head of the circular linked list of all tasks created in this thread that
   are instances of `asyncio.Task` (or a subclass of it); consulted by
   `asyncio.all_tasks`.
*/
struct llist_node asyncio_tasks_head;
struct _qsbr_thread_state *qsbr; // only used by free-threaded build
struct llist_node mem_free_queue; // delayed free queue


#ifdef Py_GIL_DISABLED
struct _gc_thread_state gc;
struct _mimalloc_thread_state mimalloc;
19 changes: 18 additions & 1 deletion Lib/test/test_asyncio/test_free_threading.py
@@ -3,7 +3,8 @@
import unittest
from threading import Thread
from unittest import TestCase

import weakref
from test import support
from test.support import threading_helper

threading_helper.requires_working_threading(module=True)
@@ -95,6 +96,22 @@ def check():
done.set()
runner.join()

def test_task_different_thread_finalized(self) -> None:
task = None
async def func():
nonlocal task
task = asyncio.current_task()

thread = Thread(target=lambda: asyncio.run(func()))
thread.start()
thread.join()
wr = weakref.ref(task)
del thread
del task
# task finalization in a different thread shouldn't crash
support.gc_collect()
self.assertIsNone(wr())

def test_run_coroutine_threadsafe(self) -> None:
results = []

165 changes: 112 additions & 53 deletions Modules/_asynciomodule.c
@@ -67,6 +67,10 @@ typedef struct TaskObj {
PyObject *task_name;
PyObject *task_context;
struct llist_node task_node;
#ifdef Py_GIL_DISABLED
// thread id of the thread where this task was created
uintptr_t task_tid;
#endif
} TaskObj;

typedef struct {
@@ -94,14 +98,6 @@ typedef struct {
|| PyObject_TypeCheck(obj, state->FutureType) \
|| PyObject_TypeCheck(obj, state->TaskType))

#ifdef Py_GIL_DISABLED
# define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex)
# define ASYNCIO_STATE_UNLOCK(state) Py_END_CRITICAL_SECTION()
#else
# define ASYNCIO_STATE_LOCK(state) ((void)state)
# define ASYNCIO_STATE_UNLOCK(state) ((void)state)
#endif

typedef struct _Py_AsyncioModuleDebugOffsets {
struct _asyncio_task_object {
uint64_t size;
@@ -135,9 +131,6 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets AsyncioDebug)

/* State of the _asyncio module */
typedef struct {
#ifdef Py_GIL_DISABLED
PyMutex mutex;
#endif
PyTypeObject *FutureIterType;
PyTypeObject *TaskStepMethWrapper_Type;
PyTypeObject *FutureType;
@@ -184,11 +177,6 @@ typedef struct {
/* Counter for autogenerated Task names */
uint64_t task_name_counter;

/* Head of circular linked-list of all tasks which are instances of `asyncio.Task`
or subclasses of it. Third party tasks implementations which don't inherit from
`asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet.
*/
struct llist_node asyncio_tasks_head;
} asyncio_state;

static inline asyncio_state *
@@ -2181,16 +2169,15 @@ static PyMethodDef TaskWakeupDef = {
static void
register_task(asyncio_state *state, TaskObj *task)
{
ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next != NULL) {
// already registered
assert(task->task_node.prev != NULL);
goto exit;
return;
}
llist_insert_tail(&state->asyncio_tasks_head, &task->task_node);
exit:
ASYNCIO_STATE_UNLOCK(state);
_PyThreadStateImpl *tstate = (_PyThreadStateImpl *) _PyThreadState_GET();
struct llist_node *head = &tstate->asyncio_tasks_head;
llist_insert_tail(head, &task->task_node);
}

static int
@@ -2199,19 +2186,38 @@ register_eager_task(asyncio_state *state, PyObject *task)
return PySet_Add(state->eager_tasks, task);
}

static void
unregister_task(asyncio_state *state, TaskObj *task)
static inline void
unregister_task_safe(TaskObj *task)
{
ASYNCIO_STATE_LOCK(state);
assert(Task_Check(state, task));
if (task->task_node.next == NULL) {
// not registered
assert(task->task_node.prev == NULL);
goto exit;
return;
}
llist_remove(&task->task_node);
exit:
ASYNCIO_STATE_UNLOCK(state);
}

static void
unregister_task(asyncio_state *state, TaskObj *task)
{
assert(Task_Check(state, task));
#ifdef Py_GIL_DISABLED
// Check whether we are on the thread that created the task;
// if so, we can unlink it from that thread's list without synchronization.
if (task->task_tid == _Py_ThreadId()) {
unregister_task_safe(task);
}
else {
// We are on a different thread:
// stop the world, then check and remove the task.
PyThreadState *tstate = _PyThreadState_GET();
_PyEval_StopTheWorld(tstate->interp);
unregister_task_safe(task);
_PyEval_StartTheWorld(tstate->interp);
}
#else
unregister_task_safe(task);
#endif
}

static int
@@ -2425,6 +2431,9 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop,
}

Py_CLEAR(self->task_fut_waiter);
#ifdef Py_GIL_DISABLED
self->task_tid = _Py_ThreadId();
#endif
self->task_must_cancel = 0;
self->task_log_destroy_pending = 1;
self->task_num_cancels_requested = 0;
@@ -3968,6 +3977,7 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop)
static inline int
add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop)
{
assert(PySet_CheckExact(tasks));
PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done));
if (done == NULL) {
return -1;
@@ -3990,6 +4000,57 @@
return 0;
}

static inline int
add_tasks_llist(struct llist_node *head, PyListObject *tasks)
{
struct llist_node *node;
llist_for_each_safe(node, head) {
TaskObj *task = llist_data(node, TaskObj, task_node);
// The linked list holds borrowed references to tasks,
// so a task may be concurrently deallocated while it is
// still on this list.
// To protect against that, we first try to incref the task,
// which fails if the task is concurrently being deallocated
// in another thread; only on success is it appended to `tasks`.
if (_Py_TryIncref((PyObject *)task)) {
if (_PyList_AppendTakeRef(tasks, (PyObject *)task) < 0) {
// Do not make any escaping calls here while the world is stopped.
return -1;
}
}
}
return 0;
}

static inline int
add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks)
{
#ifdef Py_GIL_DISABLED
assert(interp->stoptheworld.world_stopped);
#endif
// Start by traversing the interpreter's linked list.
struct llist_node *head = &interp->asyncio_tasks_head;

if (add_tasks_llist(head, tasks) < 0) {
return -1;
}

int ret = 0;
// traverse the task lists of thread states
_Py_FOR_EACH_TSTATE_BEGIN(interp, p) {
_PyThreadStateImpl *ts = (_PyThreadStateImpl *)p;
head = &ts->asyncio_tasks_head;
if (add_tasks_llist(head, tasks) < 0) {
ret = -1;
goto exit;
}
}
exit:
_Py_FOR_EACH_TSTATE_END(interp);
return ret;
}

/*********************** Module **************************/

/*[clinic input]
@@ -4028,30 +4089,29 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop)
Py_DECREF(loop);
return NULL;
}
int err = 0;
ASYNCIO_STATE_LOCK(state);
struct llist_node *node;

llist_for_each_safe(node, &state->asyncio_tasks_head) {
TaskObj *task = llist_data(node, TaskObj, task_node);
// The linked list holds borrowed references to task
// as such it is possible that the task is concurrently
// deallocated while added to this list.
// To protect against concurrent deallocations,
// we first try to incref the task which would fail
// if it is concurrently getting deallocated in another thread,
// otherwise it gets added to the list.
if (_Py_TryIncref((PyObject *)task)) {
if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) {
Py_DECREF(tasks);
Py_DECREF(loop);
err = 1;
break;
}
}
}
ASYNCIO_STATE_UNLOCK(state);
if (err) {
PyInterpreterState *interp = PyInterpreterState_Get();
// Stop the world and traverse the per-thread linked list
// of asyncio tasks for every thread, as well as the
// interpreter's linked list, and add them to `tasks`.
// The interpreter linked list is used for any lingering tasks
// whose thread state has been deallocated while the task was
// still alive. This can happen if a task is referenced by
// a different thread, in which case the task is moved to
// the interpreter's linked list from the thread's linked
// list before deallocation. See PyThreadState_Clear.
//
// The stop-the-world pause is required so that no thread
// modifies its linked list while it is being iterated here.
// This design keeps register_task/unregister_task lock-free
// for event loops running in parallel on different threads
// (the common case).
_PyEval_StopTheWorld(interp);
int ret = add_tasks_interp(interp, (PyListObject *)tasks);
_PyEval_StartTheWorld(interp);
if (ret < 0) {
// Make any escaping calls only after starting the world, to avoid deadlocks.
Py_DECREF(tasks);
Py_DECREF(loop);
return NULL;
}
PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks);
@@ -4323,7 +4383,6 @@
{
asyncio_state *state = get_asyncio_state(mod);

llist_init(&state->asyncio_tasks_head);

#define CREATE_TYPE(m, tp, spec, base) \
do { \
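
The `unregister_task` change above hinges on a creator-thread check: `TaskObj.task_tid` records `_Py_ThreadId()` at construction, teardown on that same thread unlinks the task with no synchronization, and teardown from any other thread pays for a stop-the-world pause. The self-contained pthread sketch below (illustrative names only, not CPython code) shows just that decision; the slow branch merely reports that the expensive path would be taken:

#include <pthread.h>
#include <stdio.h>

typedef struct {
    pthread_t creator;   /* plays the role of TaskObj.task_tid (_Py_ThreadId()) */
} tracked_obj;

static void tracked_register(tracked_obj *o) {
    o->creator = pthread_self();
}

static void tracked_unregister(tracked_obj *o) {
    if (pthread_equal(o->creator, pthread_self())) {
        /* Fast path: we are on the creating thread, so nobody else can be
         * touching that thread's task list; unlink without synchronization. */
        printf("same-thread teardown: lock-free unlink\n");
    }
    else {
        /* Slow path: the list belongs to another thread.  The PR pauses all
         * threads (_PyEval_StopTheWorld) before unlinking; this sketch only
         * reports that the expensive path would be taken. */
        printf("cross-thread teardown: would stop the world\n");
    }
}

static tracked_obj a, b;

static void *other_thread(void *arg) {
    (void)arg;
    tracked_unregister(&b);          /* b's last reference is dropped here */
    return NULL;
}

int main(void) {
    tracked_register(&a);            /* both "tasks" created on the main thread */
    tracked_register(&b);

    tracked_unregister(&a);          /* torn down where it was created: fast path */

    pthread_t t;
    pthread_create(&t, NULL, other_thread, NULL);
    pthread_join(&t, NULL);          /* b torn down on another thread: slow path */
    return 0;
}

The fast path is safe because only the creating thread ever mutates its own task list outside of a stop-the-world pause, and the `all_tasks()` traversal that reads other threads' lists performs such a pause itself.
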
12 changes: 11 additions & 1 deletion Python/pystate.c
@@ -643,6 +643,8 @@ init_interpreter(PyInterpreterState *interp,
_Py_brc_init_state(interp);
#endif
llist_init(&interp->mem_free_queue.head);
llist_init(&interp->asyncio_tasks_head);
interp->asyncio_tasks_lock = (PyMutex){0};
for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) {
interp->monitors.tools[i] = 0;
}
@@ -1512,7 +1514,7 @@ init_threadstate(_PyThreadStateImpl *_tstate,
tstate->delete_later = NULL;

llist_init(&_tstate->mem_free_queue);

llist_init(&_tstate->asyncio_tasks_head);
if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) {
// Start in the suspended state if there is an ongoing stop-the-world.
tstate->state = _Py_THREAD_SUSPENDED;
@@ -1692,6 +1694,14 @@ PyThreadState_Clear(PyThreadState *tstate)
Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop);
Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_task);


PyMutex_Lock(&tstate->interp->asyncio_tasks_lock);
// Merge any lingering tasks from this thread state into the
// interpreter's task list.
llist_concat(&tstate->interp->asyncio_tasks_head,
&((_PyThreadStateImpl *)tstate)->asyncio_tasks_head);
PyMutex_Unlock(&tstate->interp->asyncio_tasks_lock);

Py_CLEAR(tstate->dict);
Py_CLEAR(tstate->async_exc);

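`PyThreadState_Clear` above splices the dying thread's remaining tasks onto the interpreter's list, holding `asyncio_tasks_lock` presumably because more than one thread state can be cleared concurrently. Below is a self-contained sketch of the splice, assuming `llist_concat(dst, src)` moves every node of `src` to the tail of `dst` and leaves `src` empty; the helpers are re-implemented here for illustration rather than taken from pycore_llist.h:

#include <stdio.h>

struct llist_node { struct llist_node *next, *prev; };

static void llist_init(struct llist_node *h) { h->next = h->prev = h; }
static int  llist_empty(struct llist_node *h) { return h->next == h; }

static void llist_insert_tail(struct llist_node *h, struct llist_node *n) {
    n->prev = h->prev;
    n->next = h;
    h->prev->next = n;
    h->prev = n;
}

/* Splice every node of `src` onto the tail of `dst` and leave `src` empty,
 * which is the behaviour PyThreadState_Clear relies on above. */
static void llist_concat(struct llist_node *dst, struct llist_node *src) {
    if (llist_empty(src)) {
        return;
    }
    struct llist_node *first = src->next, *last = src->prev;
    last->next = dst;
    first->prev = dst->prev;
    dst->prev->next = first;
    dst->prev = last;
    llist_init(src);
}

int main(void) {
    struct llist_node interp_head, thread_head, a, b;
    llist_init(&interp_head);              /* interp->asyncio_tasks_head */
    llist_init(&thread_head);              /* dying thread's tasks head  */
    llist_insert_tail(&thread_head, &a);   /* two still-referenced "tasks" */
    llist_insert_tail(&thread_head, &b);

    llist_concat(&interp_head, &thread_head);   /* what PyThreadState_Clear does */

    printf("thread list empty: %d, interpreter list empty: %d\n",
           llist_empty(&thread_head), llist_empty(&interp_head));
    return 0;
}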