From 6313b3d37359b6ee42d933242761d46cb2a7aaf6 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sat, 10 May 2025 23:23:18 +0200 Subject: [PATCH 01/10] Add iter_locked --- Doc/library/threading.rst | 14 ++ .../test_threading_iter_locked.py | 81 ++++++++++++ Lib/test/test_threading.py | 14 ++ Modules/_threadmodule.c | 121 +++++++++++++++++- Modules/clinic/_threadmodule.c.h | 34 ++++- 5 files changed, 260 insertions(+), 4 deletions(-) create mode 100644 Lib/test/test_free_threading/test_threading_iter_locked.py diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index 249c0a5cb035c3..11dd44e8ca27d0 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -144,6 +144,20 @@ This module defines the following functions: of the result, even when terminated. +.. function:: locked_iter(iterable) + + Make an iterator thread-safe. + + Roughly equivalent to:: + + class locked_iter(Iterator): + def __init__(self, it): + self._it = iter(it) + self._lock = Lock() + def __next__(self): + with self._lock: + return next(self._it) + .. function:: main_thread() Return the main :class:`Thread` object. In normal conditions, the diff --git a/Lib/test/test_free_threading/test_threading_iter_locked.py b/Lib/test/test_free_threading/test_threading_iter_locked.py new file mode 100644 index 00000000000000..77c8d63e7360e7 --- /dev/null +++ b/Lib/test/test_free_threading/test_threading_iter_locked.py @@ -0,0 +1,81 @@ +import unittest +from threading import Thread, Barrier, iter_locked +from test.support import threading_helper + + +threading_helper.requires_working_threading(module=True) + +class non_atomic_iterator: + + def __init__(self, it): + self.it = iter(it) + + def __iter__(self): + return self + + def __next__(self): + a = next(self.it) + b = next(self.it) + return a, b + +def count(): + i = 0 + while True: + i += 1 + yield i + +class iter_lockedThreading(unittest.TestCase): + + @threading_helper.reap_threads + def test_iter_locked(self): + number_of_threads = 10 + number_of_iterations = 10 + barrier = Barrier(number_of_threads) + def work(it): + while True: + try: + a, b = next(it) + assert a + 1 == b + except StopIteration: + break + + data = tuple(range(400)) + for it in range(number_of_iterations): + iter_locked_iterator = iter_locked(non_atomic_iterator(data,)) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[iter_locked_iterator])) + + with threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + + @threading_helper.reap_threads + def test_iter_locked_generator(self): + number_of_threads = 5 + number_of_iterations = 4 + barrier = Barrier(number_of_threads) + def work(it): + barrier.wait() + for _ in range(1_000): + try: + next(it) + except StopIteration: + break + + for it in range(number_of_iterations): + generator = iter_locked(count()) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[generator])) + + with threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index abe63c10c0ac7c..668cb91072f530 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -2416,6 +2416,20 @@ def run_last(): self.assertIn("RuntimeError: can't register atexit after shutdown", err.decode()) +class LockedIterTests(unittest.TestCase): + + def test_locked_iter(self): + for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): + for g in (G, I, Ig, S, L, R): + seq = list(g(s)) + expected = seq + actual = list(serialize(g(s))) + self.assertEqual(actual, expected) + self.assertRaises(TypeError, serialize, X(s)) + self.assertRaises(TypeError, serialize, N(s)) + self.assertRaises(ZeroDivisionError, list, serialize(E(s))) + for arg in [1, True, sys]: + self.assertRaises(TypeError, serialize, arg) if __name__ == "__main__": unittest.main() diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 9776a32755db68..7feca87bf11ab7 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -19,7 +19,6 @@ # include // SIGINT #endif -#include "clinic/_threadmodule.c.h" // ThreadError is just an alias to PyExc_RuntimeError #define ThreadError PyExc_RuntimeError @@ -30,6 +29,7 @@ static struct PyModuleDef thread_module; // Module state typedef struct { PyTypeObject *excepthook_type; + PyTypeObject *iter_locked_type; PyTypeObject *lock_type; PyTypeObject *local_type; PyTypeObject *local_dummy_type; @@ -48,6 +48,17 @@ get_thread_state(PyObject *module) return (thread_module_state *)state; } +static inline thread_module_state * +find_state_by_type(PyTypeObject *tp) +{ + PyObject *mod = PyType_GetModuleByDef(tp, &thread_module); + assert(mod != NULL); + return get_thread_state(mod); +} + +#define clinic_state() (find_state_by_type(type)) +#include "clinic/_threadmodule.c.h" +#undef clinic_state #ifdef MS_WINDOWS typedef HRESULT (WINAPI *PF_GET_THREAD_DESCRIPTION)(HANDLE, PCWSTR*); @@ -59,8 +70,10 @@ static PF_SET_THREAD_DESCRIPTION pSetThreadDescription = NULL; /*[clinic input] module _thread +class _thread.iter_locked "iter_locked_object *" "clinic_state()->iter_locked_type" + [clinic start generated code]*/ -/*[clinic end generated code: output=da39a3ee5e6b4b0d input=be8dbe5cc4b16df7]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=6c78d729dec7bf7e]*/ // _ThreadHandle type @@ -731,6 +744,99 @@ static PyType_Spec ThreadHandle_Type_spec = { ThreadHandle_Type_slots, }; +/* iter_locked object **************************************************************/ + +typedef struct { + PyObject_HEAD + PyObject *it; +} iter_locked_object; + +#define iter_locked_object_CAST(op) ((iter_locked_object *)(op)) + +/*[clinic input] +@classmethod +_thread.iter_locked.__new__ + iterable: object + / +Make an iterator thread-safe. +[clinic start generated code]*/ + +static PyObject * +_thread_iter_locked_impl(PyTypeObject *type, PyObject *iterable) +/*[clinic end generated code: output=4a8ad5a25f7c09ba input=ae6124177726e809]*/ +{ + /* Get iterator. */ + PyObject *it = PyObject_GetIter(iterable); + if (it == NULL) + return NULL; + + iter_locked_object *lz = (iter_locked_object *)type->tp_alloc(type, 0); + lz->it = it; + + return (PyObject *)lz; +} + +static void +iter_locked_dealloc(PyObject *op) +{ + iter_locked_object *lz = iter_locked_object_CAST(op); + PyTypeObject *tp = Py_TYPE(lz); + PyObject_GC_UnTrack(lz); + Py_XDECREF(lz->it); + tp->tp_free(lz); + Py_DECREF(tp); +} + +static int +iter_locked_traverse(PyObject *op, visitproc visit, void *arg) +{ + iter_locked_object *lz = iter_locked_object_CAST(op); + Py_VISIT(Py_TYPE(lz)); + Py_VISIT(lz->it); + return 0; +} + +static PyObject * +iter_locked_next(PyObject *op) +{ + iter_locked_object *lz = iter_locked_object_CAST(op); + PyObject *result = NULL; + + Py_BEGIN_CRITICAL_SECTION(lz->it); // or lock on op? + PyObject *it = lz->it; + if (it != NULL) { + result = PyIter_Next(lz->it); + if (result == NULL) { + /* Note: StopIteration is already cleared by PyIter_Next() */ + if (PyErr_Occurred()) + return NULL; + Py_CLEAR(lz->it); + } + } + Py_END_CRITICAL_SECTION(); + return result; +} + +static PyType_Slot iter_locked_slots[] = { + {Py_tp_dealloc, iter_locked_dealloc}, + {Py_tp_getattro, PyObject_GenericGetAttr}, + {Py_tp_doc, (void *)_thread_iter_locked__doc__}, + {Py_tp_traverse, iter_locked_traverse}, + {Py_tp_iter, PyObject_SelfIter}, + {Py_tp_iternext, iter_locked_next}, + {Py_tp_new, _thread_iter_locked}, + {Py_tp_free, PyObject_GC_Del}, + {0, NULL}, +}; + +static PyType_Spec iter_locked_spec = { + .name = "threading.iter_locked", + .basicsize = sizeof(iter_locked_object), + .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE | + Py_TPFLAGS_IMMUTABLETYPE), + .slots = iter_locked_slots, +}; + /* Lock objects */ typedef struct { @@ -2631,6 +2737,15 @@ thread_module_exec(PyObject *module) return -1; } + // iter_locked + state->iter_locked_type = (PyTypeObject *)PyType_FromModuleAndSpec(module, &iter_locked_spec, NULL); + if (state->iter_locked_type == NULL) { + return -1; + } + if (PyModule_AddType(module, state->iter_locked_type) < 0) { + return -1; + } + // Lock state->lock_type = (PyTypeObject *)PyType_FromModuleAndSpec(module, &lock_type_spec, NULL); if (state->lock_type == NULL) { @@ -2739,6 +2854,7 @@ thread_module_traverse(PyObject *module, visitproc visit, void *arg) { thread_module_state *state = get_thread_state(module); Py_VISIT(state->excepthook_type); + Py_CLEAR(state->iter_locked_type); Py_VISIT(state->lock_type); Py_VISIT(state->local_type); Py_VISIT(state->local_dummy_type); @@ -2751,6 +2867,7 @@ thread_module_clear(PyObject *module) { thread_module_state *state = get_thread_state(module); Py_CLEAR(state->excepthook_type); + Py_CLEAR(state->iter_locked_type); Py_CLEAR(state->lock_type); Py_CLEAR(state->local_type); Py_CLEAR(state->local_dummy_type); diff --git a/Modules/clinic/_threadmodule.c.h b/Modules/clinic/_threadmodule.c.h index 8930e54170caf4..5f4a9d3e23a771 100644 --- a/Modules/clinic/_threadmodule.c.h +++ b/Modules/clinic/_threadmodule.c.h @@ -6,7 +6,37 @@ preserve # include "pycore_gc.h" // PyGC_Head # include "pycore_runtime.h" // _Py_ID() #endif -#include "pycore_modsupport.h" // _PyArg_UnpackKeywords() +#include "pycore_modsupport.h" // _PyArg_CheckPositional() + +PyDoc_STRVAR(_thread_iter_locked__doc__, +"iter_locked(iterable, /)\n" +"--\n" +"\n" +"Make an iterator thread-safe."); + +static PyObject * +_thread_iter_locked_impl(PyTypeObject *type, PyObject *iterable); + +static PyObject * +_thread_iter_locked(PyTypeObject *type, PyObject *args, PyObject *kwargs) +{ + PyObject *return_value = NULL; + PyTypeObject *base_tp = clinic_state()->iter_locked_type; + PyObject *iterable; + + if ((type == base_tp || type->tp_init == base_tp->tp_init) && + !_PyArg_NoKeywords("iter_locked", kwargs)) { + goto exit; + } + if (!_PyArg_CheckPositional("iter_locked", PyTuple_GET_SIZE(args), 1, 1)) { + goto exit; + } + iterable = PyTuple_GET_ITEM(args, 0); + return_value = _thread_iter_locked_impl(type, iterable); + +exit: + return return_value; +} #if (defined(HAVE_PTHREAD_GETNAME_NP) || defined(HAVE_PTHREAD_GET_NAME_NP) || defined(MS_WINDOWS)) @@ -103,4 +133,4 @@ _thread_set_name(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyOb #ifndef _THREAD_SET_NAME_METHODDEF #define _THREAD_SET_NAME_METHODDEF #endif /* !defined(_THREAD_SET_NAME_METHODDEF) */ -/*[clinic end generated code: output=e978dc4615b9bc35 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=a890435ac29024b2 input=a9049054013a1b77]*/ From 5ced0a988243a25a91b0cbfda41bc27de6c684f5 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 11 May 2025 21:30:06 +0200 Subject: [PATCH 02/10] cleanup --- .../test_threading_iter_locked.py | 1 + Lib/test/test_threading.py | 19 ++++----- Lib/threading.py | 3 +- Modules/_threadmodule.c | 42 +++++++++---------- Modules/clinic/_threadmodule.c.h | 4 +- 5 files changed, 32 insertions(+), 37 deletions(-) diff --git a/Lib/test/test_free_threading/test_threading_iter_locked.py b/Lib/test/test_free_threading/test_threading_iter_locked.py index 77c8d63e7360e7..8f3ac77b71ce74 100644 --- a/Lib/test/test_free_threading/test_threading_iter_locked.py +++ b/Lib/test/test_free_threading/test_threading_iter_locked.py @@ -41,6 +41,7 @@ def work(it): data = tuple(range(400)) for it in range(number_of_iterations): + print(f'test_iter_locked {it=}') iter_locked_iterator = iter_locked(non_atomic_iterator(data,)) worker_threads = [] for ii in range(number_of_threads): diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 668cb91072f530..8ffa215b71669b 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -2418,18 +2418,13 @@ def run_last(): class LockedIterTests(unittest.TestCase): - def test_locked_iter(self): - for s in ("123", "", range(1000), ('do', 1.2), range(2000,2200,5)): - for g in (G, I, Ig, S, L, R): - seq = list(g(s)) - expected = seq - actual = list(serialize(g(s))) - self.assertEqual(actual, expected) - self.assertRaises(TypeError, serialize, X(s)) - self.assertRaises(TypeError, serialize, N(s)) - self.assertRaises(ZeroDivisionError, list, serialize(E(s))) - for arg in [1, True, sys]: - self.assertRaises(TypeError, serialize, arg) + def test_iter_locked(self): + for s in ("123", [], [1, 2, 3], tuple(), (1, 2, 3)): + expected = list(s) + actual = list(threading.iter_locked(s)) + self.assertEqual(actual, expected) + for arg in [1, None, True, sys]: + self.assertRaises(TypeError, threading.iter_locked, arg) if __name__ == "__main__": unittest.main() diff --git a/Lib/threading.py b/Lib/threading.py index 39a1a7f4cdfda0..9ea7e25927674c 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -29,7 +29,7 @@ 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size', 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile', - 'setprofile_all_threads','settrace_all_threads'] + 'setprofile_all_threads','settrace_all_threads', 'iter_locked'] # Rename some stuff so "from threading import *" is safe _start_joinable_thread = _thread.start_joinable_thread @@ -42,6 +42,7 @@ get_ident = _thread.get_ident _get_main_thread_ident = _thread._get_main_thread_ident _is_main_interpreter = _thread._is_main_interpreter +iter_locked = _thread.iter_locked try: get_native_id = _thread.get_native_id _HAVE_THREAD_NATIVE_ID = True diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 7feca87bf11ab7..8b69e0be6c9676 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -56,9 +56,7 @@ find_state_by_type(PyTypeObject *tp) return get_thread_state(mod); } -#define clinic_state() (find_state_by_type(type)) #include "clinic/_threadmodule.c.h" -#undef clinic_state #ifdef MS_WINDOWS typedef HRESULT (WINAPI *PF_GET_THREAD_DESCRIPTION)(HANDLE, PCWSTR*); @@ -70,10 +68,10 @@ static PF_SET_THREAD_DESCRIPTION pSetThreadDescription = NULL; /*[clinic input] module _thread -class _thread.iter_locked "iter_locked_object *" "clinic_state()->iter_locked_type" +class _thread.iter_locked "iter_locked_object *" "find_state_by_type(type)->iter_locked_type" [clinic start generated code]*/ -/*[clinic end generated code: output=da39a3ee5e6b4b0d input=6c78d729dec7bf7e]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=cc495aee1743488d]*/ // _ThreadHandle type @@ -770,20 +768,24 @@ _thread_iter_locked_impl(PyTypeObject *type, PyObject *iterable) if (it == NULL) return NULL; - iter_locked_object *lz = (iter_locked_object *)type->tp_alloc(type, 0); - lz->it = it; + iter_locked_object *il = (iter_locked_object *)type->tp_alloc(type, 0); + if (il == NULL) { + Py_DECREF(it); + return NULL; + } + il->it = it; - return (PyObject *)lz; + return (PyObject *)il; } static void iter_locked_dealloc(PyObject *op) { - iter_locked_object *lz = iter_locked_object_CAST(op); - PyTypeObject *tp = Py_TYPE(lz); - PyObject_GC_UnTrack(lz); - Py_XDECREF(lz->it); - tp->tp_free(lz); + iter_locked_object *il = iter_locked_object_CAST(op); + PyTypeObject *tp = Py_TYPE(il); + PyObject_GC_UnTrack(il); + Py_DECREF(il->it); + tp->tp_free(il); Py_DECREF(tp); } @@ -802,16 +804,12 @@ iter_locked_next(PyObject *op) iter_locked_object *lz = iter_locked_object_CAST(op); PyObject *result = NULL; - Py_BEGIN_CRITICAL_SECTION(lz->it); // or lock on op? + Py_BEGIN_CRITICAL_SECTION(op); // lock on op or lz->it? PyObject *it = lz->it; - if (it != NULL) { - result = PyIter_Next(lz->it); - if (result == NULL) { - /* Note: StopIteration is already cleared by PyIter_Next() */ - if (PyErr_Occurred()) - return NULL; - Py_CLEAR(lz->it); - } + result = PyIter_Next(it); + if (result == NULL) { + /* Note: StopIteration is already cleared by PyIter_Next() */ + /* If PyErr_Occurred() we will also return NULL*/ } Py_END_CRITICAL_SECTION(); return result; @@ -2854,7 +2852,7 @@ thread_module_traverse(PyObject *module, visitproc visit, void *arg) { thread_module_state *state = get_thread_state(module); Py_VISIT(state->excepthook_type); - Py_CLEAR(state->iter_locked_type); + Py_VISIT(state->iter_locked_type); Py_VISIT(state->lock_type); Py_VISIT(state->local_type); Py_VISIT(state->local_dummy_type); diff --git a/Modules/clinic/_threadmodule.c.h b/Modules/clinic/_threadmodule.c.h index 5f4a9d3e23a771..05c2d538ea45b6 100644 --- a/Modules/clinic/_threadmodule.c.h +++ b/Modules/clinic/_threadmodule.c.h @@ -21,7 +21,7 @@ static PyObject * _thread_iter_locked(PyTypeObject *type, PyObject *args, PyObject *kwargs) { PyObject *return_value = NULL; - PyTypeObject *base_tp = clinic_state()->iter_locked_type; + PyTypeObject *base_tp = find_state_by_type(type)->iter_locked_type; PyObject *iterable; if ((type == base_tp || type->tp_init == base_tp->tp_init) && @@ -133,4 +133,4 @@ _thread_set_name(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyOb #ifndef _THREAD_SET_NAME_METHODDEF #define _THREAD_SET_NAME_METHODDEF #endif /* !defined(_THREAD_SET_NAME_METHODDEF) */ -/*[clinic end generated code: output=a890435ac29024b2 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=9555ccecaeb48d69 input=a9049054013a1b77]*/ From 110332e7ca0754f28871e7955f0f71208b32fd5a Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 11 May 2025 21:44:58 +0200 Subject: [PATCH 03/10] cleanup --- Doc/library/threading.rst | 8 +++++--- .../test_free_threading/test_threading_iter_locked.py | 1 - Lib/test/test_threading.py | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index 11dd44e8ca27d0..53bbef9df59b7f 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -144,13 +144,13 @@ This module defines the following functions: of the result, even when terminated. -.. function:: locked_iter(iterable) +.. function:: iter_locked(iterable) - Make an iterator thread-safe. + Make an iterable thread-safe. Roughly equivalent to:: - class locked_iter(Iterator): + class iter_locked(Iterator): def __init__(self, it): self._it = iter(it) self._lock = Lock() @@ -158,6 +158,8 @@ This module defines the following functions: with self._lock: return next(self._it) + .. versionadded:: next + .. function:: main_thread() Return the main :class:`Thread` object. In normal conditions, the diff --git a/Lib/test/test_free_threading/test_threading_iter_locked.py b/Lib/test/test_free_threading/test_threading_iter_locked.py index 8f3ac77b71ce74..77c8d63e7360e7 100644 --- a/Lib/test/test_free_threading/test_threading_iter_locked.py +++ b/Lib/test/test_free_threading/test_threading_iter_locked.py @@ -41,7 +41,6 @@ def work(it): data = tuple(range(400)) for it in range(number_of_iterations): - print(f'test_iter_locked {it=}') iter_locked_iterator = iter_locked(non_atomic_iterator(data,)) worker_threads = [] for ii in range(number_of_threads): diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 8ffa215b71669b..873dfc4b2ab4c7 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -2416,7 +2416,7 @@ def run_last(): self.assertIn("RuntimeError: can't register atexit after shutdown", err.decode()) -class LockedIterTests(unittest.TestCase): +class IterLockedTests(unittest.TestCase): def test_iter_locked(self): for s in ("123", [], [1, 2, 3], tuple(), (1, 2, 3)): From b6756311cb5e104ec7dcb87586ca3902c346d15c Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sun, 11 May 2025 19:54:11 +0000 Subject: [PATCH 04/10] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst diff --git a/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst b/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst new file mode 100644 index 00000000000000..8c6dbadabd8f2e --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst @@ -0,0 +1 @@ +Add :meth:`threading.iter_locked` to make iterators thread-safe. From 4406050fd6b81f8f130015c222a7a6c73a936c13 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Mon, 12 May 2025 12:07:38 +0200 Subject: [PATCH 05/10] update docs --- Doc/library/threading.rst | 30 +++++++++++++++++-- ...-05-11-19-54-05.gh-issue-124397.MwtP6-.rst | 2 +- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index 53bbef9df59b7f..a090d45f623ba7 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -146,9 +146,35 @@ This module defines the following functions: .. function:: iter_locked(iterable) - Make an iterable thread-safe. + Convert an iterable into an iterator that performs iteration using locks. - Roughly equivalent to:: + The ``iter_locked`` makes non-atomic iterators atomic:: + + class non_atomic_iterator: + + def __init__(self, it): + self.it = iter(it) + + def __iter__(self): + return self + + def __next__(self): + a = next(self.it) + b = next(self.it) + return a, b + + atomic_iterator = iter_locked(non_atomic_iterator()) + + The ``iter_locked`` allows concurrent iteration over generator objects. For example:: + + def count(): + i = 0 + while True: + i += 1 + yield i + concurrent_iterator = iter_locked(count()) + + The implementation is roughly equivalent to:: class iter_locked(Iterator): def __init__(self, it): diff --git a/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst b/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst index 8c6dbadabd8f2e..c415cd8ece6202 100644 --- a/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst +++ b/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst @@ -1 +1 @@ -Add :meth:`threading.iter_locked` to make iterators thread-safe. +Add :meth:`threading.iter_locked` to make concurrent iteration over an iterable execute using a lock. From d30ace8e7746d3f6c3ed3c8e3cf19999fab8ad03 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 1 Jun 2025 21:32:34 +0200 Subject: [PATCH 06/10] review comments --- .../test_free_threading/test_threading_iter_locked.py | 2 ++ Modules/_threadmodule.c | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_free_threading/test_threading_iter_locked.py b/Lib/test/test_free_threading/test_threading_iter_locked.py index 77c8d63e7360e7..d43e1727189d76 100644 --- a/Lib/test/test_free_threading/test_threading_iter_locked.py +++ b/Lib/test/test_free_threading/test_threading_iter_locked.py @@ -1,3 +1,4 @@ +import time import unittest from threading import Thread, Barrier, iter_locked from test.support import threading_helper @@ -15,6 +16,7 @@ def __iter__(self): def __next__(self): a = next(self.it) + time.sleep(0) b = next(self.it) return a, b diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 8b69e0be6c9676..20c24a5f88e412 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -747,6 +747,7 @@ static PyType_Spec ThreadHandle_Type_spec = { typedef struct { PyObject_HEAD PyObject *it; + PyMutex lock; } iter_locked_object; #define iter_locked_object_CAST(op) ((iter_locked_object *)(op)) @@ -774,6 +775,7 @@ _thread_iter_locked_impl(PyTypeObject *type, PyObject *iterable) return NULL; } il->it = it; + il->lock = (PyMutex){0}; return (PyObject *)il; } @@ -804,14 +806,17 @@ iter_locked_next(PyObject *op) iter_locked_object *lz = iter_locked_object_CAST(op); PyObject *result = NULL; - Py_BEGIN_CRITICAL_SECTION(op); // lock on op or lz->it? + // we cannot use Py_BEGIN_CRITICAL_SECTION as it is not available in the normal build + PyMutex_Lock(&(lz->lock)); + PyObject *it = lz->it; result = PyIter_Next(it); if (result == NULL) { /* Note: StopIteration is already cleared by PyIter_Next() */ /* If PyErr_Occurred() we will also return NULL*/ } - Py_END_CRITICAL_SECTION(); + PyMutex_Unlock(&(lz->lock)); + return result; } From baec5bce294d5b8ce2cf1b4ede0e2ac58bbd5567 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 1 Jun 2025 21:45:21 +0200 Subject: [PATCH 07/10] use explicit lock --- Modules/_threadmodule.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 20c24a5f88e412..c89bdd008b3264 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -809,9 +809,8 @@ iter_locked_next(PyObject *op) // we cannot use Py_BEGIN_CRITICAL_SECTION as it is not available in the normal build PyMutex_Lock(&(lz->lock)); - PyObject *it = lz->it; - result = PyIter_Next(it); - if (result == NULL) { + int v = PyIter_NextItem(lz->it, &result); + if (v == -1) { /* Note: StopIteration is already cleared by PyIter_Next() */ /* If PyErr_Occurred() we will also return NULL*/ } From 7315c4916be445426cdb95cc8fe785d4a6bf92d4 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 1 Jun 2025 22:46:23 +0200 Subject: [PATCH 08/10] fix import --- Modules/_threadmodule.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 9b8b4da5e6ebd1..cce5a8fb2c49bd 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -67,8 +67,6 @@ find_state_by_type(PyTypeObject *tp) return get_thread_state(mod); } -#include "clinic/_threadmodule.c.h" - #ifdef MS_WINDOWS typedef HRESULT (WINAPI *PF_GET_THREAD_DESCRIPTION)(HANDLE, PCWSTR*); From b7c9d77f5e5c7de61e31156e0010e9d2db0af8ba Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 1 Jun 2025 22:50:49 +0200 Subject: [PATCH 09/10] cleanup after merge to main --- Modules/_threadmodule.c | 13 ++----------- Modules/clinic/_threadmodule.c.h | 4 ++-- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index cce5a8fb2c49bd..f092b2f7c422ad 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -18,7 +18,6 @@ # include // SIGINT #endif - // ThreadError is just an alias to PyExc_RuntimeError #define ThreadError PyExc_RuntimeError @@ -59,14 +58,6 @@ get_thread_state_by_cls(PyTypeObject *cls) return get_thread_state(module); } -static inline thread_module_state * -find_state_by_type(PyTypeObject *tp) -{ - PyObject *mod = PyType_GetModuleByDef(tp, &thread_module); - assert(mod != NULL); - return get_thread_state(mod); -} - #ifdef MS_WINDOWS typedef HRESULT (WINAPI *PF_GET_THREAD_DESCRIPTION)(HANDLE, PCWSTR*); @@ -78,12 +69,12 @@ static PF_SET_THREAD_DESCRIPTION pSetThreadDescription = NULL; /*[clinic input] module _thread -class _thread.iter_locked "iter_locked_object *" "find_state_by_type(type)->iter_locked_type" +class _thread.iter_locked "iter_locked_object *" "clinic_state()->iter_locked_type" class _thread.lock "lockobject *" "clinic_state()->lock_type" class _thread.RLock "rlockobject *" "clinic_state()->rlock_type" [clinic start generated code]*/ -/*[clinic end generated code: output=da39a3ee5e6b4b0d input=6afac42991bc8469]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=1a4bf65233f83eae]*/ #define clinic_state() get_thread_state_by_cls(type) #include "clinic/_threadmodule.c.h" diff --git a/Modules/clinic/_threadmodule.c.h b/Modules/clinic/_threadmodule.c.h index c192a7752ecaec..a4481f9888f852 100644 --- a/Modules/clinic/_threadmodule.c.h +++ b/Modules/clinic/_threadmodule.c.h @@ -21,7 +21,7 @@ static PyObject * _thread_iter_locked(PyTypeObject *type, PyObject *args, PyObject *kwargs) { PyObject *return_value = NULL; - PyTypeObject *base_tp = find_state_by_type(type)->iter_locked_type; + PyTypeObject *base_tp = clinic_state()->iter_locked_type; PyObject *iterable; if ((type == base_tp || type->tp_init == base_tp->tp_init) && @@ -179,4 +179,4 @@ _thread_set_name(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyOb #ifndef _THREAD_SET_NAME_METHODDEF #define _THREAD_SET_NAME_METHODDEF #endif /* !defined(_THREAD_SET_NAME_METHODDEF) */ -/*[clinic end generated code: output=4b4734946ddeabac input=a9049054013a1b77]*/ +/*[clinic end generated code: output=74504384dbd84291 input=a9049054013a1b77]*/ From bb98c5213ed745303d7b6ce80ed2a678bf626a59 Mon Sep 17 00:00:00 2001 From: Pieter Eendebak Date: Sun, 1 Jun 2025 22:57:56 +0200 Subject: [PATCH 10/10] reduce duration of test --- Lib/test/test_free_threading/test_threading_iter_locked.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_free_threading/test_threading_iter_locked.py b/Lib/test/test_free_threading/test_threading_iter_locked.py index d43e1727189d76..30c26b328b5d76 100644 --- a/Lib/test/test_free_threading/test_threading_iter_locked.py +++ b/Lib/test/test_free_threading/test_threading_iter_locked.py @@ -16,7 +16,9 @@ def __iter__(self): def __next__(self): a = next(self.it) - time.sleep(0) + t = time.perf_counter() + 1e-6 + while time.perf_counter() < t: + pass b = next(self.it) return a, b @@ -31,7 +33,7 @@ class iter_lockedThreading(unittest.TestCase): @threading_helper.reap_threads def test_iter_locked(self): number_of_threads = 10 - number_of_iterations = 10 + number_of_iterations = 8 barrier = Barrier(number_of_threads) def work(it): while True: