diff --git a/Doc/library/threading.rst b/Doc/library/threading.rst index 7edcdcabdce1df..e5ca2303244e99 100644 --- a/Doc/library/threading.rst +++ b/Doc/library/threading.rst @@ -208,6 +208,48 @@ This module defines the following functions: of the result, even when terminated. +.. function:: iter_locked(iterable) + + Convert an iterable into an iterator whose ``__next__`` calls are serialized by a lock. + + ``iter_locked`` makes a non-atomic iterator atomic:: + + class non_atomic_iterator: + + def __init__(self, it): + self.it = iter(it) + + def __iter__(self): + return self + + def __next__(self): + a = next(self.it) + b = next(self.it) + return a, b + + atomic_iterator = iter_locked(non_atomic_iterator(range(10))) + + ``iter_locked`` also allows concurrent iteration over generator objects. For example:: + + def count(): + i = 0 + while True: + i += 1 + yield i + concurrent_iterator = iter_locked(count()) + + The implementation is roughly equivalent to:: + + class iter_locked(Iterator): + def __init__(self, it): + self._it = iter(it) + self._lock = Lock() + def __next__(self): + with self._lock: + return next(self._it) + + .. versionadded:: next + .. function:: main_thread() Return the main :class:`Thread` object. 
In normal conditions, the diff --git a/Lib/test/test_free_threading/test_threading_iter_locked.py b/Lib/test/test_free_threading/test_threading_iter_locked.py new file mode 100644 index 00000000000000..30c26b328b5d76 --- /dev/null +++ b/Lib/test/test_free_threading/test_threading_iter_locked.py @@ -0,0 +1,85 @@ +import time +import unittest +from threading import Thread, Barrier, iter_locked +from test.support import threading_helper + + +threading_helper.requires_working_threading(module=True) + +class non_atomic_iterator: + + def __init__(self, it): + self.it = iter(it) + + def __iter__(self): + return self + + def __next__(self): + a = next(self.it) + t = time.perf_counter() + 1e-6 + while time.perf_counter() < t: + pass + b = next(self.it) + return a, b + +def count(): + i = 0 + while True: + i += 1 + yield i + +class iter_lockedThreading(unittest.TestCase): + + @threading_helper.reap_threads + def test_iter_locked(self): + number_of_threads = 10 + number_of_iterations = 8 + barrier = Barrier(number_of_threads) + def work(it): + while True: + try: + a, b = next(it) + assert a + 1 == b + except StopIteration: + break + + data = tuple(range(400)) + for it in range(number_of_iterations): + iter_locked_iterator = iter_locked(non_atomic_iterator(data,)) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[iter_locked_iterator])) + + with threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + + @threading_helper.reap_threads + def test_iter_locked_generator(self): + number_of_threads = 5 + number_of_iterations = 4 + barrier = Barrier(number_of_threads) + def work(it): + barrier.wait() + for _ in range(1_000): + try: + next(it) + except StopIteration: + break + + for it in range(number_of_iterations): + generator = iter_locked(count()) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[generator])) + + with 
threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + +if __name__ == "__main__": + unittest.main() diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py index 59b3a749d2fffa..e92b037382bc5a 100644 --- a/Lib/test/test_threading.py +++ b/Lib/test/test_threading.py @@ -2447,6 +2447,15 @@ def run_last(): self.assertIn("RuntimeError: can't register atexit after shutdown", err.decode()) +class IterLockedTests(unittest.TestCase): + + def test_iter_locked(self): + for s in ("123", [], [1, 2, 3], tuple(), (1, 2, 3)): + expected = list(s) + actual = list(threading.iter_locked(s)) + self.assertEqual(actual, expected) + for arg in [1, None, True, sys]: + self.assertRaises(TypeError, threading.iter_locked, arg) if __name__ == "__main__": unittest.main() diff --git a/Lib/threading.py b/Lib/threading.py index b6c451d1fbaabd..63753f8675cb7f 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -29,7 +29,7 @@ 'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError', 'setprofile', 'settrace', 'local', 'stack_size', 'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile', - 'setprofile_all_threads','settrace_all_threads'] + 'setprofile_all_threads','settrace_all_threads', 'iter_locked'] # Rename some stuff so "from threading import *" is safe _start_joinable_thread = _thread.start_joinable_thread @@ -42,6 +42,7 @@ get_ident = _thread.get_ident _get_main_thread_ident = _thread._get_main_thread_ident _is_main_interpreter = _thread._is_main_interpreter +iter_locked = _thread.iter_locked try: get_native_id = _thread.get_native_id _HAVE_THREAD_NATIVE_ID = True diff --git a/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst b/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst new file mode 100644 index 00000000000000..c415cd8ece6202 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-05-11-19-54-05.gh-issue-124397.MwtP6-.rst @@ -0,0 +1 @@ +Add :func:`threading.iter_locked` to make concurrent iteration over an 
iterable execute using a lock. diff --git a/Modules/_threadmodule.c b/Modules/_threadmodule.c index 150a266b521736..f092b2f7c422ad 100644 --- a/Modules/_threadmodule.c +++ b/Modules/_threadmodule.c @@ -27,6 +27,7 @@ static struct PyModuleDef thread_module; // Module state typedef struct { PyTypeObject *excepthook_type; + PyTypeObject *iter_locked_type; PyTypeObject *lock_type; PyTypeObject *rlock_type; PyTypeObject *local_type; @@ -68,10 +69,12 @@ static PF_SET_THREAD_DESCRIPTION pSetThreadDescription = NULL; /*[clinic input] module _thread +class _thread.iter_locked "iter_locked_object *" "clinic_state()->iter_locked_type" class _thread.lock "lockobject *" "clinic_state()->lock_type" class _thread.RLock "rlockobject *" "clinic_state()->rlock_type" + [clinic start generated code]*/ -/*[clinic end generated code: output=da39a3ee5e6b4b0d input=c5a0f8c492a0c263]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=1a4bf65233f83eae]*/ #define clinic_state() get_thread_state_by_cls(type) #include "clinic/_threadmodule.c.h" @@ -751,6 +754,103 @@ static PyType_Spec ThreadHandle_Type_spec = { ThreadHandle_Type_slots, }; +/* iter_locked object **************************************************************/ + +typedef struct { + PyObject_HEAD + PyObject *it; + PyMutex lock; +} iter_locked_object; + +#define iter_locked_object_CAST(op) ((iter_locked_object *)(op)) + +/*[clinic input] +@classmethod +_thread.iter_locked.__new__ + iterable: object + / +Make an iterator thread-safe. +[clinic start generated code]*/ + +static PyObject * +_thread_iter_locked_impl(PyTypeObject *type, PyObject *iterable) +/*[clinic end generated code: output=4a8ad5a25f7c09ba input=ae6124177726e809]*/ +{ + /* Get iterator. 
*/ + PyObject *it = PyObject_GetIter(iterable); + if (it == NULL) + return NULL; + + iter_locked_object *il = (iter_locked_object *)type->tp_alloc(type, 0); + if (il == NULL) { + Py_DECREF(it); + return NULL; + } + il->it = it; + il->lock = (PyMutex){0}; + + return (PyObject *)il; +} + +static void +iter_locked_dealloc(PyObject *op) +{ + iter_locked_object *il = iter_locked_object_CAST(op); + PyTypeObject *tp = Py_TYPE(il); + PyObject_GC_UnTrack(il); + Py_DECREF(il->it); + tp->tp_free(il); + Py_DECREF(tp); +} + +static int +iter_locked_traverse(PyObject *op, visitproc visit, void *arg) +{ + iter_locked_object *lz = iter_locked_object_CAST(op); + Py_VISIT(Py_TYPE(lz)); + Py_VISIT(lz->it); + return 0; +} + +static PyObject * +iter_locked_next(PyObject *op) +{ + iter_locked_object *lz = iter_locked_object_CAST(op); + PyObject *result = NULL; + + // we cannot use Py_BEGIN_CRITICAL_SECTION as it is not available in the normal build + PyMutex_Lock(&(lz->lock)); + + int v = PyIter_NextItem(lz->it, &result); + if (v == -1) { + /* Note: StopIteration is already cleared by PyIter_Next() */ + /* If PyErr_Occurred() we will also return NULL*/ + } + PyMutex_Unlock(&(lz->lock)); + + return result; +} + +static PyType_Slot iter_locked_slots[] = { + {Py_tp_dealloc, iter_locked_dealloc}, + {Py_tp_getattro, PyObject_GenericGetAttr}, + {Py_tp_doc, (void *)_thread_iter_locked__doc__}, + {Py_tp_traverse, iter_locked_traverse}, + {Py_tp_iter, PyObject_SelfIter}, + {Py_tp_iternext, iter_locked_next}, + {Py_tp_new, _thread_iter_locked}, + {Py_tp_free, PyObject_GC_Del}, + {0, NULL}, +}; + +static PyType_Spec iter_locked_spec = { + .name = "threading.iter_locked", + .basicsize = sizeof(iter_locked_object), + .flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE | + Py_TPFLAGS_IMMUTABLETYPE), + .slots = iter_locked_slots, +}; + /* Lock objects */ typedef struct { @@ -2652,6 +2752,15 @@ thread_module_exec(PyObject *module) return -1; } + // iter_locked + 
state->iter_locked_type = (PyTypeObject *)PyType_FromModuleAndSpec(module, &iter_locked_spec, NULL); + if (state->iter_locked_type == NULL) { + return -1; + } + if (PyModule_AddType(module, state->iter_locked_type) < 0) { + return -1; + } + // Lock state->lock_type = (PyTypeObject *)PyType_FromModuleAndSpec(module, &lock_type_spec, NULL); if (state->lock_type == NULL) { @@ -2758,6 +2867,7 @@ thread_module_traverse(PyObject *module, visitproc visit, void *arg) { thread_module_state *state = get_thread_state(module); Py_VISIT(state->excepthook_type); + Py_VISIT(state->iter_locked_type); Py_VISIT(state->lock_type); Py_VISIT(state->rlock_type); Py_VISIT(state->local_type); @@ -2771,6 +2881,7 @@ thread_module_clear(PyObject *module) { thread_module_state *state = get_thread_state(module); Py_CLEAR(state->excepthook_type); + Py_CLEAR(state->iter_locked_type); Py_CLEAR(state->lock_type); Py_CLEAR(state->rlock_type); Py_CLEAR(state->local_type); diff --git a/Modules/clinic/_threadmodule.c.h b/Modules/clinic/_threadmodule.c.h index fd8e32a2261d38..a4481f9888f852 100644 --- a/Modules/clinic/_threadmodule.c.h +++ b/Modules/clinic/_threadmodule.c.h @@ -6,7 +6,37 @@ preserve # include "pycore_gc.h" // PyGC_Head # include "pycore_runtime.h" // _Py_ID() #endif -#include "pycore_modsupport.h" // _PyArg_NoKeywords() +#include "pycore_modsupport.h" // _PyArg_CheckPositional() + +PyDoc_STRVAR(_thread_iter_locked__doc__, +"iter_locked(iterable, /)\n" +"--\n" +"\n" +"Make an iterator thread-safe."); + +static PyObject * +_thread_iter_locked_impl(PyTypeObject *type, PyObject *iterable); + +static PyObject * +_thread_iter_locked(PyTypeObject *type, PyObject *args, PyObject *kwargs) +{ + PyObject *return_value = NULL; + PyTypeObject *base_tp = clinic_state()->iter_locked_type; + PyObject *iterable; + + if ((type == base_tp || type->tp_init == base_tp->tp_init) && + !_PyArg_NoKeywords("iter_locked", kwargs)) { + goto exit; + } + if (!_PyArg_CheckPositional("iter_locked", 
PyTuple_GET_SIZE(args), 1, 1)) { + goto exit; + } + iterable = PyTuple_GET_ITEM(args, 0); + return_value = _thread_iter_locked_impl(type, iterable); + +exit: + return return_value; +} static PyObject * lock_new_impl(PyTypeObject *type); @@ -149,4 +179,4 @@ _thread_set_name(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyOb #ifndef _THREAD_SET_NAME_METHODDEF #define _THREAD_SET_NAME_METHODDEF #endif /* !defined(_THREAD_SET_NAME_METHODDEF) */ -/*[clinic end generated code: output=b381ec5e313198e7 input=a9049054013a1b77]*/ +/*[clinic end generated code: output=74504384dbd84291 input=a9049054013a1b77]*/