From 1a39d4161bacb8a4f9a88c3b9f1f83d8e77e4e30 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Sat, 11 Jan 2025 15:18:45 +0000 Subject: [PATCH 01/17] switch to per thread tasks list --- Include/internal/pycore_tstate.h | 2 ++ Modules/_asynciomodule.c | 38 +++++++------------------------- Python/pystate.c | 7 +++++- 3 files changed, 16 insertions(+), 31 deletions(-) diff --git a/Include/internal/pycore_tstate.h b/Include/internal/pycore_tstate.h index b8bea72baeaaf5..06cfeb1247c499 100644 --- a/Include/internal/pycore_tstate.h +++ b/Include/internal/pycore_tstate.h @@ -23,9 +23,11 @@ typedef struct _PyThreadStateImpl { PyObject *asyncio_running_loop; // Strong reference + struct llist_node asyncio_tasks_head; struct _qsbr_thread_state *qsbr; // only used by free-threaded build struct llist_node mem_free_queue; // delayed free queue + #ifdef Py_GIL_DISABLED struct _gc_thread_state gc; struct _mimalloc_thread_state mimalloc; diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 48f0ef95934fa4..c38d501c8af00d 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -76,19 +76,8 @@ typedef struct { #define Future_Check(state, obj) PyObject_TypeCheck(obj, state->FutureType) #define Task_Check(state, obj) PyObject_TypeCheck(obj, state->TaskType) -#ifdef Py_GIL_DISABLED -# define ASYNCIO_STATE_LOCK(state) Py_BEGIN_CRITICAL_SECTION_MUT(&state->mutex) -# define ASYNCIO_STATE_UNLOCK(state) Py_END_CRITICAL_SECTION() -#else -# define ASYNCIO_STATE_LOCK(state) ((void)state) -# define ASYNCIO_STATE_UNLOCK(state) ((void)state) -#endif - /* State of the _asyncio module */ typedef struct { -#ifdef Py_GIL_DISABLED - PyMutex mutex; -#endif PyTypeObject *FutureIterType; PyTypeObject *TaskStepMethWrapper_Type; PyTypeObject *FutureType; @@ -135,11 +124,6 @@ typedef struct { /* Counter for autogenerated Task names */ uint64_t task_name_counter; - /* Head of circular linked-list of all tasks which are instances of `asyncio.Task` - or subclasses of it. Third party tasks implementations which don't inherit from - `asyncio.Task` are tracked separately using the `non_asyncio_tasks` WeakSet. - */ - struct llist_node asyncio_tasks_head; } asyncio_state; static inline asyncio_state * @@ -1997,16 +1981,15 @@ static PyMethodDef TaskWakeupDef = { static void register_task(asyncio_state *state, TaskObj *task) { - ASYNCIO_STATE_LOCK(state); assert(Task_Check(state, task)); if (task->task_node.next != NULL) { // already registered assert(task->task_node.prev != NULL); - goto exit; + return; } - llist_insert_tail(&state->asyncio_tasks_head, &task->task_node); -exit: - ASYNCIO_STATE_UNLOCK(state); + _PyThreadStateImpl *tstate = (_PyThreadStateImpl *) _PyThreadState_GET(); + struct llist_node *head = &tstate->asyncio_tasks_head; + llist_insert_tail(head, &task->task_node); } static int @@ -2018,16 +2001,13 @@ register_eager_task(asyncio_state *state, PyObject *task) static void unregister_task(asyncio_state *state, TaskObj *task) { - ASYNCIO_STATE_LOCK(state); assert(Task_Check(state, task)); if (task->task_node.next == NULL) { // not registered assert(task->task_node.prev == NULL); - goto exit; + return; } llist_remove(&task->task_node); -exit: - ASYNCIO_STATE_UNLOCK(state); } static int @@ -3767,10 +3747,10 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) return NULL; } int err = 0; - ASYNCIO_STATE_LOCK(state); struct llist_node *node; - - llist_for_each_safe(node, &state->asyncio_tasks_head) { + _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); + struct llist_node *head = &ts->asyncio_tasks_head; + llist_for_each_safe(node, head) { TaskObj *task = llist_data(node, TaskObj, task_node); // The linked list holds borrowed references to task // as such it is possible that the task is concurrently @@ -3788,7 +3768,6 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) } } } - ASYNCIO_STATE_UNLOCK(state); if (err) { return NULL; } @@ -4015,7 +3994,6 @@ module_exec(PyObject *mod) { asyncio_state *state = get_asyncio_state(mod); - llist_init(&state->asyncio_tasks_head); #define CREATE_TYPE(m, tp, spec, base) \ do { \ diff --git a/Python/pystate.c b/Python/pystate.c index c546b7c3a9f10e..a244e1d2b418df 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1519,7 +1519,7 @@ init_threadstate(_PyThreadStateImpl *_tstate, tstate->delete_later = NULL; llist_init(&_tstate->mem_free_queue); - + llist_init(&_tstate->asyncio_tasks_head); if (interp->stoptheworld.requested || _PyRuntime.stoptheworld.requested) { // Start in the suspended state if there is an ongoing stop-the-world. tstate->state = _Py_THREAD_SUSPENDED; @@ -1698,6 +1698,11 @@ PyThreadState_Clear(PyThreadState *tstate) Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop); + struct llist_node *node; + llist_for_each_safe(node, &((_PyThreadStateImpl *)tstate)->asyncio_tasks_head) { + llist_remove(node); + } + Py_CLEAR(tstate->dict); Py_CLEAR(tstate->async_exc); From 28735e7560f06b96918bb7b893b8d1c864a84b7e Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Sun, 12 Jan 2025 06:59:22 +0000 Subject: [PATCH 02/17] traverse linked lists of all threads --- Include/internal/pycore_pystate.h | 4 ++-- Modules/_asynciomodule.c | 40 ++++++++++++++++++------------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/Include/internal/pycore_pystate.h b/Include/internal/pycore_pystate.h index 1e73e541ef8de0..3812d6def6b6d0 100644 --- a/Include/internal/pycore_pystate.h +++ b/Include/internal/pycore_pystate.h @@ -182,8 +182,8 @@ extern void _PyEval_StartTheWorldAll(_PyRuntimeState *runtime); // Perform a stop-the-world pause for threads in the specified interpreter. // // NOTE: This is a no-op outside of Py_GIL_DISABLED builds. -extern void _PyEval_StopTheWorld(PyInterpreterState *interp); -extern void _PyEval_StartTheWorld(PyInterpreterState *interp); +extern PyAPI_FUNC(void) _PyEval_StopTheWorld(PyInterpreterState *interp); +extern PyAPI_FUNC(void) _PyEval_StartTheWorld(PyInterpreterState *interp); static inline void diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index c38d501c8af00d..9eb2a1b2ce5e39 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -3748,26 +3748,32 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) } int err = 0; struct llist_node *node; - _PyThreadStateImpl *ts = (_PyThreadStateImpl *)_PyThreadState_GET(); - struct llist_node *head = &ts->asyncio_tasks_head; - llist_for_each_safe(node, head) { - TaskObj *task = llist_data(node, TaskObj, task_node); - // The linked list holds borrowed references to task - // as such it is possible that the task is concurrently - // deallocated while added to this list. - // To protect against concurrent deallocations, - // we first try to incref the task which would fail - // if it is concurrently getting deallocated in another thread, - // otherwise it gets added to the list. - if (_Py_TryIncref((PyObject *)task)) { - if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) { - Py_DECREF(tasks); - Py_DECREF(loop); - err = 1; - break; + PyInterpreterState *interp = PyInterpreterState_Get(); + _PyEval_StopTheWorld(interp); + _PyThreadStateImpl *ts = (_PyThreadStateImpl *)PyInterpreterState_ThreadHead(interp); + while (ts) { + struct llist_node *head = &ts->asyncio_tasks_head; + llist_for_each_safe(node, head) { + TaskObj *task = llist_data(node, TaskObj, task_node); + // The linked list holds borrowed references to task + // as such it is possible that the task is concurrently + // deallocated while added to this list. + // To protect against concurrent deallocations, + // we first try to incref the task which would fail + // if it is concurrently getting deallocated in another thread, + // otherwise it gets added to the list. + if (_Py_TryIncref((PyObject *)task)) { + if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) { + Py_DECREF(tasks); + Py_DECREF(loop); + err = 1; + break; + } } } + ts = (_PyThreadStateImpl *)ts->base.next; } + _PyEval_StartTheWorld(interp); if (err) { return NULL; } From 305233018fe4ac478ff811ad0d2c00e95b41993b Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Tue, 14 Jan 2025 11:02:53 +0000 Subject: [PATCH 03/17] cleanup --- Include/internal/pycore_lock.h | 2 +- Modules/_asynciomodule.c | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Include/internal/pycore_lock.h b/Include/internal/pycore_lock.h index 8bcb23a6ce9f9d..7484b05d7f2446 100644 --- a/Include/internal/pycore_lock.h +++ b/Include/internal/pycore_lock.h @@ -52,7 +52,7 @@ typedef enum _PyLockFlags { // Lock a mutex with an optional timeout and additional options. See // _PyLockFlags for details. -extern PyLockStatus +extern PyAPI_FUNC(PyLockStatus) _PyMutex_LockTimed(PyMutex *m, PyTime_t timeout_ns, _PyLockFlags flags); // Lock a mutex with additional options. See _PyLockFlags for details. diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 9eb2a1b2ce5e39..8030944162c76f 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -3750,9 +3750,9 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) struct llist_node *node; PyInterpreterState *interp = PyInterpreterState_Get(); _PyEval_StopTheWorld(interp); - _PyThreadStateImpl *ts = (_PyThreadStateImpl *)PyInterpreterState_ThreadHead(interp); - while (ts) { - struct llist_node *head = &ts->asyncio_tasks_head; + _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { + _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p; + struct llist_node *head = &tstate->asyncio_tasks_head; llist_for_each_safe(node, head) { TaskObj *task = llist_data(node, TaskObj, task_node); // The linked list holds borrowed references to task @@ -3771,8 +3771,8 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) } } } - ts = (_PyThreadStateImpl *)ts->base.next; } + _Py_FOR_EACH_TSTATE_END(interp); _PyEval_StartTheWorld(interp); if (err) { return NULL; From acef8212b4c92bd0c177b951360a95153c0d7454 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Wed, 15 Jan 2025 13:25:26 +0000 Subject: [PATCH 04/17] add comments --- Include/internal/pycore_tstate.h | 3 +++ Modules/_asynciomodule.c | 17 ++++++++++++++--- Python/pystate.c | 2 ++ 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/Include/internal/pycore_tstate.h b/Include/internal/pycore_tstate.h index 06cfeb1247c499..b55ae3994ee034 100644 --- a/Include/internal/pycore_tstate.h +++ b/Include/internal/pycore_tstate.h @@ -23,6 +23,9 @@ typedef struct _PyThreadStateImpl { PyObject *asyncio_running_loop; // Strong reference + /* Head of circular linked-list of all tasks which are instances of `asyncio.Task` + or subclasses of it used in `asyncio.all_tasks`. + */ struct llist_node asyncio_tasks_head; struct _qsbr_thread_state *qsbr; // only used by free-threaded build struct llist_node mem_free_queue; // delayed free queue diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 8030944162c76f..65cada11422434 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -3747,9 +3747,16 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) return NULL; } int err = 0; - struct llist_node *node; PyInterpreterState *interp = PyInterpreterState_Get(); + // Stop the world and traverse the per-thread linked list + // of asyncio tasks of all threads and add them to the list. + // Stop the world pause is required so that no thread + // modifies it's linked list while being iterated here + // concurrently. + // This design allows for lock free register/unregister of tasks + // of loops running concurrently in different threads. _PyEval_StopTheWorld(interp); + struct llist_node *node; _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p; struct llist_node *head = &tstate->asyncio_tasks_head; @@ -3764,8 +3771,10 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) // otherwise it gets added to the list. if (_Py_TryIncref((PyObject *)task)) { if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) { - Py_DECREF(tasks); - Py_DECREF(loop); + // do not call any escaping function such as Py_DECREF + // while holding the runtime lock, instead set err=1 and + // call them after releasing the runtime lock + // and starting the world to avoid any deadlocks. err = 1; break; } @@ -3775,6 +3784,8 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) _Py_FOR_EACH_TSTATE_END(interp); _PyEval_StartTheWorld(interp); if (err) { + Py_DECREF(tasks); + Py_DECREF(loop); return NULL; } PyObject *scheduled_iter = PyObject_GetIter(state->non_asyncio_tasks); diff --git a/Python/pystate.c b/Python/pystate.c index a244e1d2b418df..c9ca7c3e9b66f4 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1699,6 +1699,8 @@ PyThreadState_Clear(PyThreadState *tstate) Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop); struct llist_node *node; + // Clear any lingering tasks so that `TaskObj_finalize` doesn't + // try to unregister task from a freed list. llist_for_each_safe(node, &((_PyThreadStateImpl *)tstate)->asyncio_tasks_head) { llist_remove(node); } From 237a089352b8ed226f66935df8a6ae733592efae Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Thu, 16 Jan 2025 17:36:42 +0000 Subject: [PATCH 05/17] add a per interp tasks list --- Include/internal/pycore_interp.h | 2 + Modules/_asynciomodule.c | 87 +++++++++++++++++++++++--------- Python/pystate.c | 14 ++--- 3 files changed, 73 insertions(+), 30 deletions(-) diff --git a/Include/internal/pycore_interp.h b/Include/internal/pycore_interp.h index a3c14dceffd7a0..bbef600203f706 100644 --- a/Include/internal/pycore_interp.h +++ b/Include/internal/pycore_interp.h @@ -228,6 +228,7 @@ struct _is { _PyIndexPool tlbc_indices; #endif + struct llist_node asyncio_tasks_head; // Per-interpreter state for the obmalloc allocator. For the main // interpreter and for all interpreters that don't have their // own obmalloc state, this points to the static structure in @@ -280,6 +281,7 @@ struct _is { struct _Py_interp_cached_objects cached_objects; struct _Py_interp_static_objects static_objects; + Py_ssize_t _interactive_src_count; /* the initial PyInterpreterState.threads.head */ diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 65cada11422434..0283c66fef867c 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -61,6 +61,9 @@ typedef struct TaskObj { PyObject *task_name; PyObject *task_context; struct llist_node task_node; +#ifdef Py_GIL_DISABLED + uintptr_t task_tid; +#endif } TaskObj; typedef struct { @@ -2002,11 +2005,31 @@ static void unregister_task(asyncio_state *state, TaskObj *task) { assert(Task_Check(state, task)); - if (task->task_node.next == NULL) { - // not registered - assert(task->task_node.prev == NULL); - return; +#ifdef Py_GIL_DISABLED + // check if we are in the same thread + // if so, we can avoid locking + if (task->task_tid == _Py_ThreadId()) { + if (task->task_node.next == NULL) { + // not registered + assert(task->task_node.prev == NULL); + return; + } + llist_remove(&task->task_node); + } + else { + // we are in a different thread + // stop the world then check and remove the task + PyThreadState *tstate = _PyThreadState_GET(); + _PyEval_StopTheWorld(tstate->interp); + if (task->task_node.next == NULL) { + // not registered + assert(task->task_node.prev == NULL); + } + else { + llist_remove(&task->task_node); + } } +#endif llist_remove(&task->task_node); } @@ -2162,6 +2185,9 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop, } Py_CLEAR(self->task_fut_waiter); +#ifdef Py_GIL_DISABLED + self->task_tid = _Py_ThreadId(); +#endif self->task_must_cancel = 0; self->task_log_destroy_pending = 1; self->task_num_cancels_requested = 0; @@ -3708,6 +3734,32 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo return 0; } +static inline int +add_tasks_from_head(struct llist_node *head, PyListObject *tasks) +{ + struct llist_node *node; + llist_for_each_safe(node, head) { + TaskObj *task = llist_data(node, TaskObj, task_node); + // The linked list holds borrowed references to task + // as such it is possible that the task is concurrently + // deallocated while added to this list. + // To protect against concurrent deallocations, + // we first try to incref the task which would fail + // if it is concurrently getting deallocated in another thread, + // otherwise it gets added to the list. + if (_Py_TryIncref((PyObject *)task)) { + if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) { + // do not call any escaping function such as Py_DECREF + // while holding the runtime lock, instead set err=1 and + // call them after releasing the runtime lock + // and starting the world to avoid any deadlocks. + return -1; + } + } + } + return 0; +} + /*********************** Module **************************/ /*[clinic input] @@ -3756,31 +3808,18 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) // This design allows for lock free register/unregister of tasks // of loops running concurrently in different threads. _PyEval_StopTheWorld(interp); - struct llist_node *node; _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p; struct llist_node *head = &tstate->asyncio_tasks_head; - llist_for_each_safe(node, head) { - TaskObj *task = llist_data(node, TaskObj, task_node); - // The linked list holds borrowed references to task - // as such it is possible that the task is concurrently - // deallocated while added to this list. - // To protect against concurrent deallocations, - // we first try to incref the task which would fail - // if it is concurrently getting deallocated in another thread, - // otherwise it gets added to the list. - if (_Py_TryIncref((PyObject *)task)) { - if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) { - // do not call any escaping function such as Py_DECREF - // while holding the runtime lock, instead set err=1 and - // call them after releasing the runtime lock - // and starting the world to avoid any deadlocks. - err = 1; - break; - } - } + if (add_tasks_from_head(head, (PyListObject *)tasks) < 0) { + err = 1; + break; } } + // traverse the linked list of the interpreter + if (err == 0 && add_tasks_from_head(&interp->asyncio_tasks_head, (PyListObject *)tasks) < 0) { + err = 1; + } _Py_FOR_EACH_TSTATE_END(interp); _PyEval_StartTheWorld(interp); if (err) { diff --git a/Python/pystate.c b/Python/pystate.c index c9ca7c3e9b66f4..3d97b1b9f0fd38 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -643,6 +643,7 @@ init_interpreter(PyInterpreterState *interp, _Py_brc_init_state(interp); #endif llist_init(&interp->mem_free_queue.head); + llist_init(&interp->asyncio_tasks_head); for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) { interp->monitors.tools[i] = 0; } @@ -1698,12 +1699,13 @@ PyThreadState_Clear(PyThreadState *tstate) Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_loop); - struct llist_node *node; - // Clear any lingering tasks so that `TaskObj_finalize` doesn't - // try to unregister task from a freed list. - llist_for_each_safe(node, &((_PyThreadStateImpl *)tstate)->asyncio_tasks_head) { - llist_remove(node); - } + + _PyEval_StopTheWorld(tstate->interp); + // merge any lingering tasks from thread state to interpreter's + // tasks list + llist_concat(&tstate->interp->asyncio_tasks_head, + &((_PyThreadStateImpl *)tstate)->asyncio_tasks_head); + _PyEval_StartTheWorld(tstate->interp); Py_CLEAR(tstate->dict); Py_CLEAR(tstate->async_exc); From c12f2719628c10da3c64df3b5a7b74766495cbd4 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Thu, 16 Jan 2025 17:47:46 +0000 Subject: [PATCH 06/17] fixup for regular builds --- Modules/_asynciomodule.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 0283c66fef867c..f04a9176928e2c 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -2029,8 +2029,14 @@ unregister_task(asyncio_state *state, TaskObj *task) llist_remove(&task->task_node); } } -#endif +#else + if (task->task_node.next == NULL) { + // not registered + assert(task->task_node.prev == NULL); + return; + } llist_remove(&task->task_node); +#endif } static int From d36270f6dd8d16ed8f649503f8aeecb66bf9f559 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Thu, 16 Jan 2025 17:49:39 +0000 Subject: [PATCH 07/17] fix missing start world --- Modules/_asynciomodule.c | 1 + 1 file changed, 1 insertion(+) diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index f04a9176928e2c..127bce84e393b1 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -2028,6 +2028,7 @@ unregister_task(asyncio_state *state, TaskObj *task) else { llist_remove(&task->task_node); } + _PyEval_StartTheWorld(tstate->interp); } #else if (task->task_node.next == NULL) { From d0fbbc21021ed70fc060aa728e5e95f67252bc4f Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Tue, 21 Jan 2025 11:27:43 +0000 Subject: [PATCH 08/17] add interp llist --- Include/internal/pycore_interp.h | 5 +- Modules/_asynciomodule.c | 88 +++++++++++++++++--------------- 2 files changed, 50 insertions(+), 43 deletions(-) diff --git a/Include/internal/pycore_interp.h b/Include/internal/pycore_interp.h index 1cd2d9ba5a9550..b5f8960904f4d7 100644 --- a/Include/internal/pycore_interp.h +++ b/Include/internal/pycore_interp.h @@ -227,7 +227,9 @@ struct _is { PyMutex weakref_locks[NUM_WEAKREF_LIST_LOCKS]; _PyIndexPool tlbc_indices; #endif - + // Per-interpreter list of tasks, any lingering tasks from thread + // states gets added here and removed from the corresponding + // thread state's list. struct llist_node asyncio_tasks_head; // Per-interpreter state for the obmalloc allocator. For the main // interpreter and for all interpreters that don't have their @@ -281,7 +283,6 @@ struct _is { struct _Py_interp_cached_objects cached_objects; struct _Py_interp_static_objects static_objects; - Py_ssize_t _interactive_src_count; /* the initial PyInterpreterState.threads.head */ diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 127bce84e393b1..a84f6d15db6f3b 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -62,6 +62,7 @@ typedef struct TaskObj { PyObject *task_context; struct llist_node task_node; #ifdef Py_GIL_DISABLED + // thread id of the thread where this task was created uintptr_t task_tid; #endif } TaskObj; @@ -2001,6 +2002,17 @@ register_eager_task(asyncio_state *state, PyObject *task) return PySet_Add(state->eager_tasks, task); } +static inline void +unregister_task_safe(TaskObj *task) +{ + if (task->task_node.next == NULL) { + // not registered + assert(task->task_node.prev == NULL); + return; + } + llist_remove(&task->task_node); +} + static void unregister_task(asyncio_state *state, TaskObj *task) { @@ -2009,34 +2021,18 @@ unregister_task(asyncio_state *state, TaskObj *task) // check if we are in the same thread // if so, we can avoid locking if (task->task_tid == _Py_ThreadId()) { - if (task->task_node.next == NULL) { - // not registered - assert(task->task_node.prev == NULL); - return; - } - llist_remove(&task->task_node); + unregister_task_safe(task); } else { // we are in a different thread // stop the world then check and remove the task PyThreadState *tstate = _PyThreadState_GET(); _PyEval_StopTheWorld(tstate->interp); - if (task->task_node.next == NULL) { - // not registered - assert(task->task_node.prev == NULL); - } - else { - llist_remove(&task->task_node); - } + unregister_task_safe(task); _PyEval_StartTheWorld(tstate->interp); } #else - if (task->task_node.next == NULL) { - // not registered - assert(task->task_node.prev == NULL); - return; - } - llist_remove(&task->task_node); + unregister_task_safe(task); #endif } @@ -3719,6 +3715,7 @@ _asyncio_current_task_impl(PyObject *module, PyObject *loop) static inline int add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *loop) { + assert(PySet_CheckExact(tasks)); PyObject *done = PyObject_CallMethodNoArgs(task, &_Py_ID(done)); if (done == NULL) { return -1; @@ -3742,8 +3739,16 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo } static inline int -add_tasks_from_head(struct llist_node *head, PyListObject *tasks) +add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks) { +#ifdef Py_GIL_DISABLED + assert(interp->stoptheworld.world_stopped); +#endif + // Start traversing from interpreter's linked list + struct llist_node *head = &interp->asyncio_tasks_head; + _PyThreadStateImpl *thead = (_PyThreadStateImpl *)interp->threads.head; + +traverse: struct llist_node *node; llist_for_each_safe(node, head) { TaskObj *task = llist_data(node, TaskObj, task_node); @@ -3756,14 +3761,17 @@ add_tasks_from_head(struct llist_node *head, PyListObject *tasks) // otherwise it gets added to the list. if (_Py_TryIncref((PyObject *)task)) { if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) { - // do not call any escaping function such as Py_DECREF - // while holding the runtime lock, instead set err=1 and - // call them after releasing the runtime lock - // and starting the world to avoid any deadlocks. + // do not call any escaping calls here while holding the runtime lock. return -1; } } } + // traverse the linked lists of thread states + if (thead != NULL) { + head = &thead->asyncio_tasks_head; + thead = (_PyThreadStateImpl *)thead->base.next; + goto traverse; + } return 0; } @@ -3805,31 +3813,29 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) Py_DECREF(loop); return NULL; } - int err = 0; PyInterpreterState *interp = PyInterpreterState_Get(); // Stop the world and traverse the per-thread linked list - // of asyncio tasks of all threads and add them to the list. + // of asyncio tasks of all threads and the interpreter's + // linked list and them to tasks list. + // The interpreter linked list is used for any lingering tasks + // whose thread state has been deallocated but the task is + // still alive. This can happen if task is referenced by a + // different thread, in which case the task is moved to the + // interpreter's linked list from the thread's linked list + // before deallocation. // Stop the world pause is required so that no thread // modifies it's linked list while being iterated here // concurrently. // This design allows for lock free register/unregister of tasks - // of loops running concurrently in different threads. + // of loops running concurrently in different threads (general case). _PyEval_StopTheWorld(interp); - _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { - _PyThreadStateImpl *tstate = (_PyThreadStateImpl *)p; - struct llist_node *head = &tstate->asyncio_tasks_head; - if (add_tasks_from_head(head, (PyListObject *)tasks) < 0) { - err = 1; - break; - } - } - // traverse the linked list of the interpreter - if (err == 0 && add_tasks_from_head(&interp->asyncio_tasks_head, (PyListObject *)tasks) < 0) { - err = 1; - } - _Py_FOR_EACH_TSTATE_END(interp); + HEAD_LOCK(interp->runtime); + int ret = add_tasks_interp(interp, (PyListObject *)tasks); + HEAD_UNLOCK(interp->runtime); _PyEval_StartTheWorld(interp); - if (err) { + if (ret < 0) { + // call any escaping calls after releasing the runtime lock + // and starting the world to avoid any deadlocks. Py_DECREF(tasks); Py_DECREF(loop); return NULL; From 5f5b95edd5d8813c71e60d2e75f34272024f6177 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Tue, 21 Jan 2025 12:31:09 +0000 Subject: [PATCH 09/17] add more tests --- Lib/test/test_asyncio/test_free_threading.py | 40 ++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/Lib/test/test_asyncio/test_free_threading.py b/Lib/test/test_asyncio/test_free_threading.py index 05106a2c2fe3f6..f5f65cdbb70758 100644 --- a/Lib/test/test_asyncio/test_free_threading.py +++ b/Lib/test/test_asyncio/test_free_threading.py @@ -1,5 +1,8 @@ import asyncio import unittest +import threading +import weakref +from test import support from threading import Thread from unittest import TestCase @@ -58,6 +61,43 @@ def runner(): with threading_helper.start_threads(threads): pass + def test_all_tasks_different_thread(self) -> None: + task = None + loop = None + started = threading.Event() + stop = threading.Event() + done = False + async def func(): + nonlocal task, loop, done + loop = asyncio.get_running_loop() + task = asyncio.current_task() + started.set() + while not stop.is_set(): + await asyncio.sleep(0) + + thread = Thread(target=lambda: asyncio.run(func())) + with threading_helper.start_threads([thread]): + started.wait() + self.assertSetEqual(asyncio.all_tasks(loop), {task}) + self.assertIs(task.get_loop(), loop) + stop.set() + + def test_all_tasks_different_thread_finalized(self) -> None: + task = None + loop = asyncio.EventLoop() + async def func(): + nonlocal task + task = asyncio.current_task() + + loop.run_until_complete(func()) + + self.assertEqual(self.all_tasks(loop), set()) + wr = weakref.ref(task) + del task + # task finalization in different thread shoudn't crash + support.gc_collect() + self.assertIsNone(wr()) + def test_run_coroutine_threadsafe(self) -> None: results = [] From 708b410b848e0ea35f0533d9ca7a02ff64d7f367 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Tue, 21 Jan 2025 13:45:26 +0000 Subject: [PATCH 10/17] fix test --- Lib/test/test_asyncio/test_free_threading.py | 31 ++++++++++---------- Tools/tsan/suppressions_free_threading.txt | 2 ++ 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/Lib/test/test_asyncio/test_free_threading.py b/Lib/test/test_asyncio/test_free_threading.py index f5f65cdbb70758..7fbd4bd0557f71 100644 --- a/Lib/test/test_asyncio/test_free_threading.py +++ b/Lib/test/test_asyncio/test_free_threading.py @@ -1,11 +1,11 @@ import asyncio -import unittest import threading +import unittest import weakref -from test import support from threading import Thread from unittest import TestCase +from test import support from test.support import threading_helper threading_helper.requires_working_threading(module=True) @@ -63,38 +63,37 @@ def runner(): def test_all_tasks_different_thread(self) -> None: task = None - loop = None + loop = asyncio.EventLoop() started = threading.Event() stop = threading.Event() - done = False async def func(): - nonlocal task, loop, done - loop = asyncio.get_running_loop() + nonlocal task task = asyncio.current_task() started.set() - while not stop.is_set(): - await asyncio.sleep(0) + stop.wait() + loop.call_soon_threadsafe(loop.stop) - thread = Thread(target=lambda: asyncio.run(func())) + loop.create_task(func()) + thread = Thread(target=loop.run_forever) with threading_helper.start_threads([thread]): started.wait() self.assertSetEqual(asyncio.all_tasks(loop), {task}) - self.assertIs(task.get_loop(), loop) stop.set() + loop.close() - def test_all_tasks_different_thread_finalized(self) -> None: + def test_task_different_thread_finalized(self) -> None: task = None - loop = asyncio.EventLoop() async def func(): nonlocal task task = asyncio.current_task() - loop.run_until_complete(func()) - - self.assertEqual(self.all_tasks(loop), set()) + thread = Thread(target=lambda: asyncio.run(func())) + thread.start() + thread.join() wr = weakref.ref(task) + del thread del task - # task finalization in different thread shoudn't crash + # task finalization in different thread shouldn't crash support.gc_collect() self.assertIsNone(wr()) diff --git a/Tools/tsan/suppressions_free_threading.txt b/Tools/tsan/suppressions_free_threading.txt index e5eb665ae212de..e7a4ee309c82d3 100644 --- a/Tools/tsan/suppressions_free_threading.txt +++ b/Tools/tsan/suppressions_free_threading.txt @@ -46,3 +46,5 @@ race_top:set_default_allocator_unlocked # https://gist.github.com/mpage/6962e8870606cfc960e159b407a0cb40 thread:pthread_create + +race_top:_gen_getframe \ No newline at end of file From 2c1e93e77f8ef395089309005e52e29652333241 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Tue, 21 Jan 2025 13:50:46 +0000 Subject: [PATCH 11/17] try fix compilation --- Modules/_asynciomodule.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index a84f6d15db6f3b..af27c5569ce6e4 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -3748,8 +3748,8 @@ add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks) struct llist_node *head = &interp->asyncio_tasks_head; _PyThreadStateImpl *thead = (_PyThreadStateImpl *)interp->threads.head; -traverse: struct llist_node *node; +traverse: llist_for_each_safe(node, head) { TaskObj *task = llist_data(node, TaskObj, task_node); // The linked list holds borrowed references to task From c0ce3617d999aca18591934bd3249ad9af32ad0f Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Tue, 21 Jan 2025 13:52:09 +0000 Subject: [PATCH 12/17] remove supp --- Tools/tsan/suppressions_free_threading.txt | 2 -- 1 file changed, 2 deletions(-) diff --git a/Tools/tsan/suppressions_free_threading.txt b/Tools/tsan/suppressions_free_threading.txt index e7a4ee309c82d3..e5eb665ae212de 100644 --- a/Tools/tsan/suppressions_free_threading.txt +++ b/Tools/tsan/suppressions_free_threading.txt @@ -46,5 +46,3 @@ race_top:set_default_allocator_unlocked # https://gist.github.com/mpage/6962e8870606cfc960e159b407a0cb40 thread:pthread_create - -race_top:_gen_getframe \ No newline at end of file From c45c9d7af855da45e30429119ed337314544d95b Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Fri, 24 Jan 2025 17:47:14 +0000 Subject: [PATCH 13/17] fix merge --- Lib/test/test_asyncio/test_free_threading.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Lib/test/test_asyncio/test_free_threading.py b/Lib/test/test_asyncio/test_free_threading.py index da5825a6001661..d0221d87062c5b 100644 --- a/Lib/test/test_asyncio/test_free_threading.py +++ b/Lib/test/test_asyncio/test_free_threading.py @@ -93,6 +93,9 @@ def check(): with threading_helper.start_threads(threads): pass + done.set() + runner.join() + def test_task_different_thread_finalized(self) -> None: task = None async def func(): From 5b33c45ed57705722b3c2185ff48497d10fe486d Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Wed, 29 Jan 2025 12:37:46 +0000 Subject: [PATCH 14/17] code review --- Include/internal/pycore_interp.h | 2 ++ Modules/_asynciomodule.c | 46 +++++++++++++++++++------------- Python/pystate.c | 4 +-- 3 files changed, 31 insertions(+), 21 deletions(-) diff --git a/Include/internal/pycore_interp.h b/Include/internal/pycore_interp.h index b5f8960904f4d7..bce40e7739f14c 100644 --- a/Include/internal/pycore_interp.h +++ b/Include/internal/pycore_interp.h @@ -231,6 +231,8 @@ struct _is { // states gets added here and removed from the corresponding // thread state's list. struct llist_node asyncio_tasks_head; + PyMutex asyncio_tasks_lock; + // Per-interpreter state for the obmalloc allocator. For the main // interpreter and for all interpreters that don't have their // own obmalloc state, this points to the static structure in diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index c3abad7148b200..d2c33b1b822aa5 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -3999,17 +3999,9 @@ add_one_task(asyncio_state *state, PyObject *tasks, PyObject *task, PyObject *lo } static inline int -add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks) +add_tasks_llist(struct llist_node *head, PyListObject *tasks) { -#ifdef Py_GIL_DISABLED - assert(interp->stoptheworld.world_stopped); -#endif - // Start traversing from interpreter's linked list - struct llist_node *head = &interp->asyncio_tasks_head; - _PyThreadStateImpl *thead = (_PyThreadStateImpl *)interp->threads.head; - struct llist_node *node; -traverse: llist_for_each_safe(node, head) { TaskObj *task = llist_data(node, TaskObj, task_node); // The linked list holds borrowed references to task @@ -4021,17 +4013,36 @@ add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks) // otherwise it gets added to the list. if (_Py_TryIncref((PyObject *)task)) { if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) { - // do not call any escaping calls here while holding the runtime lock. + // do not call any escaping calls here while the world is stopped. return -1; } } } - // traverse the linked lists of thread states - if (thead != NULL) { - head = &thead->asyncio_tasks_head; - thead = (_PyThreadStateImpl *)thead->base.next; - goto traverse; + return 0; +} + +static inline int +add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks) +{ +#ifdef Py_GIL_DISABLED + assert(interp->stoptheworld.world_stopped); +#endif + // Start traversing from interpreter's linked list + struct llist_node *head = &interp->asyncio_tasks_head; + + if (add_tasks_llist(head, tasks) < 0) { + return -1; + } + + // traverse the task lists of thread states + _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { + _PyThreadStateImpl *ts = (_PyThreadStateImpl *)p; + head = &ts->asyncio_tasks_head; + if (add_tasks_llist(head, tasks) < 0) { + return -1; + } } + return 0; } @@ -4089,13 +4100,10 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) // This design allows for lock free register/unregister of tasks // of loops running concurrently in different threads (general case). _PyEval_StopTheWorld(interp); - HEAD_LOCK(interp->runtime); int ret = add_tasks_interp(interp, (PyListObject *)tasks); - HEAD_UNLOCK(interp->runtime); _PyEval_StartTheWorld(interp); if (ret < 0) { - // call any escaping calls after releasing the runtime lock - // and starting the world to avoid any deadlocks. + // call any escaping calls after starting the world to avoid any deadlocks. Py_DECREF(tasks); Py_DECREF(loop); return NULL; diff --git a/Python/pystate.c b/Python/pystate.c index 44289c82b6d9ad..dfebaf27e909a5 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1702,12 +1702,12 @@ PyThreadState_Clear(PyThreadState *tstate) Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_task); - _PyEval_StopTheWorld(tstate->interp); + PyMutex_LockFlags(&tstate->interp->asyncio_tasks_lock, _Py_LOCK_DONT_DETACH); // merge any lingering tasks from thread state to interpreter's // tasks list llist_concat(&tstate->interp->asyncio_tasks_head, &((_PyThreadStateImpl *)tstate)->asyncio_tasks_head); - _PyEval_StartTheWorld(tstate->interp); + PyMutex_Unlock(&tstate->interp->asyncio_tasks_lock); Py_CLEAR(tstate->dict); Py_CLEAR(tstate->async_exc); From 1d7384346a91a20e6535c56c22d7e6e7c5992215 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Wed, 29 Jan 2025 12:59:55 +0000 Subject: [PATCH 15/17] cleanup --- Modules/_asynciomodule.c | 11 +++++++---- Python/pystate.c | 3 ++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index d2c33b1b822aa5..743cd97247f805 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -4012,7 +4012,7 @@ add_tasks_llist(struct llist_node *head, PyListObject *tasks) // if it is concurrently getting deallocated in another thread, // otherwise it gets added to the list. if (_Py_TryIncref((PyObject *)task)) { - if (_PyList_AppendTakeRef((PyListObject *)tasks, (PyObject *)task) < 0) { + if (_PyList_AppendTakeRef(tasks, (PyObject *)task) < 0) { // do not call any escaping calls here while the world is stopped. return -1; } @@ -4034,16 +4034,19 @@ add_tasks_interp(PyInterpreterState *interp, PyListObject *tasks) return -1; } + int ret = 0; // traverse the task lists of thread states _Py_FOR_EACH_TSTATE_BEGIN(interp, p) { _PyThreadStateImpl *ts = (_PyThreadStateImpl *)p; head = &ts->asyncio_tasks_head; if (add_tasks_llist(head, tasks) < 0) { - return -1; + ret = -1; + goto exit; } } - - return 0; +exit: + _Py_FOR_EACH_TSTATE_END(interp); + return ret; } /*********************** Module **************************/ diff --git a/Python/pystate.c b/Python/pystate.c index dfebaf27e909a5..67c2541e4ebe66 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -644,6 +644,7 @@ init_interpreter(PyInterpreterState *interp, #endif llist_init(&interp->mem_free_queue.head); llist_init(&interp->asyncio_tasks_head); + interp->asyncio_tasks_lock = (PyMutex){0}; for (int i = 0; i < _PY_MONITORING_UNGROUPED_EVENTS; i++) { interp->monitors.tools[i] = 0; } @@ -1702,7 +1703,7 @@ PyThreadState_Clear(PyThreadState *tstate) Py_CLEAR(((_PyThreadStateImpl *)tstate)->asyncio_running_task); - PyMutex_LockFlags(&tstate->interp->asyncio_tasks_lock, _Py_LOCK_DONT_DETACH); + PyMutex_Lock(&tstate->interp->asyncio_tasks_lock); // merge any lingering tasks from thread state to interpreter's // tasks list llist_concat(&tstate->interp->asyncio_tasks_head, From 4014e706628f55b879e318a150821316729459e2 Mon Sep 17 00:00:00 2001 From: Kumar Aditya Date: Wed, 29 Jan 2025 13:11:58 +0000 Subject: [PATCH 16/17] add comment for asyncio_tasks_lock --- Include/internal/pycore_interp.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Include/internal/pycore_interp.h b/Include/internal/pycore_interp.h index bce40e7739f14c..fc0444ccc25609 100644 --- a/Include/internal/pycore_interp.h +++ b/Include/internal/pycore_interp.h @@ -231,6 +231,8 @@ struct _is { // states gets added here and removed from the corresponding // thread state's list. struct llist_node asyncio_tasks_head; + // `asyncio_tasks_lock` is used when tasks are moved + // from thread's list to interpreter's list. PyMutex asyncio_tasks_lock; // Per-interpreter state for the obmalloc allocator. For the main From 83aec263d25c5101f0b03ff7123d92727fa5a3b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Langa?= Date: Thu, 6 Feb 2025 13:20:58 +0100 Subject: [PATCH 17/17] Reword explanation --- Modules/_asynciomodule.c | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 743cd97247f805..b2586feead647b 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -4089,19 +4089,20 @@ _asyncio_all_tasks_impl(PyObject *module, PyObject *loop) } PyInterpreterState *interp = PyInterpreterState_Get(); // Stop the world and traverse the per-thread linked list - // of asyncio tasks of all threads and the interpreter's - // linked list and them to tasks list. + // of asyncio tasks for every thread, as well as the + // interpreter's linked list, and add them to `tasks`. // The interpreter linked list is used for any lingering tasks - // whose thread state has been deallocated but the task is - // still alive. This can happen if task is referenced by a - // different thread, in which case the task is moved to the - // interpreter's linked list from the thread's linked list - // before deallocation. - // Stop the world pause is required so that no thread - // modifies it's linked list while being iterated here - // concurrently. - // This design allows for lock free register/unregister of tasks - // of loops running concurrently in different threads (general case). + // whose thread state has been deallocated while the task was + // still alive. This can happen if a task is referenced by + // a different thread, in which case the task is moved to + // the interpreter's linked list from the thread's linked + // list before deallocation. See PyThreadState_Clear. + // + // The stop-the-world pause is required so that no thread + // modifies its linked list while being iterated here + // in parallel. This design allows for lock-free + // register_task/unregister_task for loops running in parallel + // in different threads (the general case). _PyEval_StopTheWorld(interp); int ret = add_tasks_interp(interp, (PyListObject *)tasks); _PyEval_StartTheWorld(interp);