Skip to content

gh-124397: Add threading.iter_locked #133908

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions Doc/library/threading.rst
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,48 @@ This module defines the following functions:
of the result, even when terminated.


.. function:: iter_locked(iterable)

Convert an iterable into an iterator that performs iteration using locks.

The ``iter_locked`` makes non-atomic iterators atomic::

class non_atomic_iterator:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure non_atomic_iterator is a good example of something that exists in the wild, because it's not thread-safe on GIL-ful builds either. The thread can arbitrarily switch during execution of Python code, so this would need a lock anyway if concurrent iteration were a use case. For free-threading, __next__ itself should have its own locking to prevent races. How was this design reached?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The iter_locked allows the non_atomic_iterator to be used in a threaded-setting, whether in the normal or FT build. Do you have suggestions for other tests?


def __init__(self, it):
self.it = iter(it)

def __iter__(self):
return self

def __next__(self):
a = next(self.it)
b = next(self.it)
return a, b

atomic_iterator = iter_locked(non_atomic_iterator())

The ``iter_locked`` allows concurrent iteration over generator objects. For example::

def count():
i = 0
while True:
i += 1
yield i
concurrent_iterator = iter_locked(count())

The implementation is roughly equivalent to::

class iter_locked(Iterator):
def __init__(self, it):
self._it = iter(it)
self._lock = Lock()
def __next__(self):
with self._lock:
return next(self._it)

.. versionadded:: next

.. function:: main_thread()

Return the main :class:`Thread` object. In normal conditions, the
Expand Down
85 changes: 85 additions & 0 deletions Lib/test/test_free_threading/test_threading_iter_locked.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import time
import unittest
from threading import Thread, Barrier, iter_locked
from test.support import threading_helper


threading_helper.requires_working_threading(module=True)

class non_atomic_iterator:

def __init__(self, it):
self.it = iter(it)

def __iter__(self):
return self

def __next__(self):
a = next(self.it)
t = time.perf_counter() + 1e-6
while time.perf_counter() < t:
pass
b = next(self.it)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add time.sleep(0) (or with small non-zero argument) or Event.wait() with a timeout to force or significantly increase the chance of concurrent access.

return a, b

def count():
i = 0
while True:
i += 1
yield i

class iter_lockedThreading(unittest.TestCase):

@threading_helper.reap_threads
def test_iter_locked(self):
number_of_threads = 10
number_of_iterations = 8
barrier = Barrier(number_of_threads)
def work(it):
while True:
try:
a, b = next(it)
assert a + 1 == b
except StopIteration:
break

data = tuple(range(400))
for it in range(number_of_iterations):
iter_locked_iterator = iter_locked(non_atomic_iterator(data,))
worker_threads = []
for ii in range(number_of_threads):
worker_threads.append(
Thread(target=work, args=[iter_locked_iterator]))

with threading_helper.start_threads(worker_threads):
pass

barrier.reset()

@threading_helper.reap_threads
def test_iter_locked_generator(self):
number_of_threads = 5
number_of_iterations = 4
barrier = Barrier(number_of_threads)
def work(it):
barrier.wait()
for _ in range(1_000):
try:
next(it)
except StopIteration:
break

for it in range(number_of_iterations):
generator = iter_locked(count())
worker_threads = []
for ii in range(number_of_threads):
worker_threads.append(
Thread(target=work, args=[generator]))

with threading_helper.start_threads(worker_threads):
pass

barrier.reset()

if __name__ == "__main__":
unittest.main()
9 changes: 9 additions & 0 deletions Lib/test/test_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -2447,6 +2447,15 @@ def run_last():
self.assertIn("RuntimeError: can't register atexit after shutdown",
err.decode())

class IterLockedTests(unittest.TestCase):

def test_iter_locked(self):
for s in ("123", [], [1, 2, 3], tuple(), (1, 2, 3)):
expected = list(s)
actual = list(threading.iter_locked(s))
self.assertEqual(actual, expected)
for arg in [1, None, True, sys]:
self.assertRaises(TypeError, threading.iter_locked, arg)

if __name__ == "__main__":
unittest.main()
3 changes: 2 additions & 1 deletion Lib/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
'Barrier', 'BrokenBarrierError', 'Timer', 'ThreadError',
'setprofile', 'settrace', 'local', 'stack_size',
'excepthook', 'ExceptHookArgs', 'gettrace', 'getprofile',
'setprofile_all_threads','settrace_all_threads']
'setprofile_all_threads','settrace_all_threads', 'iter_locked']

# Rename some stuff so "from threading import *" is safe
_start_joinable_thread = _thread.start_joinable_thread
Expand All @@ -42,6 +42,7 @@
get_ident = _thread.get_ident
_get_main_thread_ident = _thread._get_main_thread_ident
_is_main_interpreter = _thread._is_main_interpreter
iter_locked = _thread.iter_locked
try:
get_native_id = _thread.get_native_id
_HAVE_THREAD_NATIVE_ID = True
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add :meth:`threading.iter_locked` to make concurrent iteration over an iterable execute using a lock.
113 changes: 112 additions & 1 deletion Modules/_threadmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ static struct PyModuleDef thread_module;
// Module state
typedef struct {
PyTypeObject *excepthook_type;
PyTypeObject *iter_locked_type;
PyTypeObject *lock_type;
PyTypeObject *rlock_type;
PyTypeObject *local_type;
Expand Down Expand Up @@ -68,10 +69,12 @@ static PF_SET_THREAD_DESCRIPTION pSetThreadDescription = NULL;

/*[clinic input]
module _thread
class _thread.iter_locked "iter_locked_object *" "clinic_state()->iter_locked_type"
class _thread.lock "lockobject *" "clinic_state()->lock_type"
class _thread.RLock "rlockobject *" "clinic_state()->rlock_type"

[clinic start generated code]*/
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=c5a0f8c492a0c263]*/
/*[clinic end generated code: output=da39a3ee5e6b4b0d input=1a4bf65233f83eae]*/

#define clinic_state() get_thread_state_by_cls(type)
#include "clinic/_threadmodule.c.h"
Expand Down Expand Up @@ -751,6 +754,103 @@ static PyType_Spec ThreadHandle_Type_spec = {
ThreadHandle_Type_slots,
};

/* iter_locked object **************************************************************/

typedef struct {
PyObject_HEAD
PyObject *it;
PyMutex lock;
} iter_locked_object;

#define iter_locked_object_CAST(op) ((iter_locked_object *)(op))

/*[clinic input]
@classmethod
_thread.iter_locked.__new__
iterable: object
/
Make an iterator thread-safe.
[clinic start generated code]*/

static PyObject *
_thread_iter_locked_impl(PyTypeObject *type, PyObject *iterable)
/*[clinic end generated code: output=4a8ad5a25f7c09ba input=ae6124177726e809]*/
{
/* Get iterator. */
PyObject *it = PyObject_GetIter(iterable);
if (it == NULL)
return NULL;

iter_locked_object *il = (iter_locked_object *)type->tp_alloc(type, 0);
if (il == NULL) {
Py_DECREF(it);
return NULL;
}
il->it = it;
il->lock = (PyMutex){0};

return (PyObject *)il;
}

static void
iter_locked_dealloc(PyObject *op)
{
iter_locked_object *il = iter_locked_object_CAST(op);
PyTypeObject *tp = Py_TYPE(il);
PyObject_GC_UnTrack(il);
Py_DECREF(il->it);
tp->tp_free(il);
Py_DECREF(tp);
}

static int
iter_locked_traverse(PyObject *op, visitproc visit, void *arg)
{
iter_locked_object *lz = iter_locked_object_CAST(op);
Py_VISIT(Py_TYPE(lz));
Py_VISIT(lz->it);
return 0;
}

static PyObject *
iter_locked_next(PyObject *op)
{
iter_locked_object *lz = iter_locked_object_CAST(op);
PyObject *result = NULL;

// we cannot use Py_BEGIN_CRITICAL_SECTION as it is not available in the normal build
PyMutex_Lock(&(lz->lock));

int v = PyIter_NextItem(lz->it, &result);
if (v == -1) {
/* Note: StopIteration is already cleared by PyIter_Next() */
/* If PyErr_Occurred() we will also return NULL*/
}
PyMutex_Unlock(&(lz->lock));

return result;
}

static PyType_Slot iter_locked_slots[] = {
{Py_tp_dealloc, iter_locked_dealloc},
{Py_tp_getattro, PyObject_GenericGetAttr},
{Py_tp_doc, (void *)_thread_iter_locked__doc__},
{Py_tp_traverse, iter_locked_traverse},
{Py_tp_iter, PyObject_SelfIter},
{Py_tp_iternext, iter_locked_next},
{Py_tp_new, _thread_iter_locked},
{Py_tp_free, PyObject_GC_Del},
{0, NULL},
};

static PyType_Spec iter_locked_spec = {
.name = "threading.iter_locked",
.basicsize = sizeof(iter_locked_object),
.flags = (Py_TPFLAGS_DEFAULT | Py_TPFLAGS_HAVE_GC | Py_TPFLAGS_BASETYPE |
Py_TPFLAGS_IMMUTABLETYPE),
.slots = iter_locked_slots,
};

/* Lock objects */

typedef struct {
Expand Down Expand Up @@ -2652,6 +2752,15 @@ thread_module_exec(PyObject *module)
return -1;
}

// iter_locked
state->iter_locked_type = (PyTypeObject *)PyType_FromModuleAndSpec(module, &iter_locked_spec, NULL);
if (state->iter_locked_type == NULL) {
return -1;
}
if (PyModule_AddType(module, state->iter_locked_type) < 0) {
return -1;
}

// Lock
state->lock_type = (PyTypeObject *)PyType_FromModuleAndSpec(module, &lock_type_spec, NULL);
if (state->lock_type == NULL) {
Expand Down Expand Up @@ -2758,6 +2867,7 @@ thread_module_traverse(PyObject *module, visitproc visit, void *arg)
{
thread_module_state *state = get_thread_state(module);
Py_VISIT(state->excepthook_type);
Py_VISIT(state->iter_locked_type);
Py_VISIT(state->lock_type);
Py_VISIT(state->rlock_type);
Py_VISIT(state->local_type);
Expand All @@ -2771,6 +2881,7 @@ thread_module_clear(PyObject *module)
{
thread_module_state *state = get_thread_state(module);
Py_CLEAR(state->excepthook_type);
Py_CLEAR(state->iter_locked_type);
Py_CLEAR(state->lock_type);
Py_CLEAR(state->rlock_type);
Py_CLEAR(state->local_type);
Expand Down
34 changes: 32 additions & 2 deletions Modules/clinic/_threadmodule.c.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading