gh-115999: Make list and tuple iteration more thread-safe. #128637

Merged 17 commits on Feb 19, 2025
Lib/test/test_free_threading/test_iteration.py (127 additions, 0 deletions)
@@ -0,0 +1,127 @@
import threading
import unittest
from test import support

# The race conditions these tests were written for only happen every now and
# then, even with the current numbers. To find rare race conditions, bumping
# these up will help, but it makes the test runtime highly variable under
# free-threading. Overhead is much higher under ThreadSanitizer, but it's
# also much better at detecting certain races, so we don't need as many
# items/threads.
if support.check_sanitizer(thread=True):
    NUMITEMS = 1000
    NUMTHREADS = 2
else:
    NUMITEMS = 100000
    NUMTHREADS = 5
NUMMUTATORS = 2

class ContendedTupleIterationTest(unittest.TestCase):
    def make_testdata(self, n):
        return tuple(range(n))

    def assert_iterator_results(self, results, expected):
        # Most iterators are not atomic (yet?) so they can skip or duplicate
        # items, but they should not invent new items (like the range
        # iterator currently does).
        extra_items = set(results) - set(expected)
        self.assertEqual(set(), extra_items)

    def run_threads(self, func, *args, numthreads=NUMTHREADS):
        threads = []
        for _ in range(numthreads):
            t = threading.Thread(target=func, args=args)
            t.start()
            threads.append(t)
        return threads

    def test_iteration(self):
        """Test iteration over a shared container"""
        seq = self.make_testdata(NUMITEMS)
        results = []
        start = threading.Barrier(NUMTHREADS)
        def worker():
            idx = 0
            start.wait()
            for item in seq:
                idx += 1
            results.append(idx)
        threads = self.run_threads(worker)
        for t in threads:
            t.join()
        # Each thread has its own iterator, so results should be entirely predictable.
        self.assertEqual(results, [NUMITEMS] * NUMTHREADS)

    def test_shared_iterator(self):
        """Test iteration over a shared iterator"""
        seq = self.make_testdata(NUMITEMS)
        it = iter(seq)
        results = []
        start = threading.Barrier(NUMTHREADS)
        def worker():
            items = []
            start.wait()
            # We want a tight loop, so put items in the shared list at the end.
            for item in it:
                items.append(item)
            results.extend(items)
        threads = self.run_threads(worker)
        for t in threads:
            t.join()
        self.assert_iterator_results(results, seq)

class ContendedListIterationTest(ContendedTupleIterationTest):
    def make_testdata(self, n):
        return list(range(n))

    def test_iteration_while_mutating(self):
        """Test iteration over a shared mutating container."""
        seq = self.make_testdata(NUMITEMS)
        results = []
        start = threading.Barrier(NUMTHREADS + NUMMUTATORS)
        endmutate = threading.Event()
        def mutator():
            orig = seq[:]
            # Make changes big enough to cause resizing of the list, with
            # items shifted around for good measure.
            replacement = (orig * 3)[NUMITEMS//2:]
            start.wait()
            while not endmutate.is_set():
                seq.extend(replacement)
                seq[:0] = orig
                seq.__imul__(2)
                seq.extend(seq)
                seq[:] = orig
        def worker():
            items = []
            start.wait()
            # We want a tight loop, so put items in the shared list at the end.
            for item in seq:
                items.append(item)
            results.extend(items)
        mutators = ()
        try:
            threads = self.run_threads(worker)
            mutators = self.run_threads(mutator, numthreads=NUMMUTATORS)
            for t in threads:
                t.join()
        finally:
            endmutate.set()
            for m in mutators:
                m.join()
        self.assert_iterator_results(results, list(seq))


class ContendedRangeIterationTest(ContendedTupleIterationTest):
    def make_testdata(self, n):
        return range(n)

    def assert_iterator_results(self, results, expected):
        # Range iterators that are shared between threads will (right now)
        # sometimes produce items after the end of the range, sometimes
        # _far_ after the end of the range. That should be fixed, but for
        # now, let's just check they're integers that could have resulted
        # from stepping beyond the range bounds.
        extra_items = set(results) - set(expected)
        for item in extra_items:
            self.assertEqual((item - expected.start) % expected.step, 0)
Objects/listobject.c (18 additions, 10 deletions)
@@ -357,7 +357,7 @@ list_get_item_ref(PyListObject *op, Py_ssize_t i)
         return NULL;
     }
     Py_ssize_t cap = list_capacity(ob_item);
-    assert(cap != -1 && cap >= size);
+    assert(cap != -1);
     if (!valid_index(i, cap)) {
         return NULL;
     }
@@ -784,7 +784,8 @@ list_repeat_lock_held(PyListObject *a, Py_ssize_t n)
             _Py_RefcntAdd(*src, n);
             *dest++ = *src++;
         }
-
+        // TODO: _Py_memory_repeat calls are not safe for shared lists in
+        // GIL_DISABLED builds. (See issue #129069)
         _Py_memory_repeat((char *)np->ob_item, sizeof(PyObject *)*output_size,
                           sizeof(PyObject *)*input_size);
     }
@@ -919,6 +920,8 @@ list_ass_slice_lock_held(PyListObject *a, Py_ssize_t ilow, Py_ssize_t ihigh, PyO
     if (d < 0) { /* Delete -d items */
         Py_ssize_t tail;
         tail = (Py_SIZE(a) - ihigh) * sizeof(PyObject *);
+        // TODO: these memmove/memcpy calls are not safe for shared lists in
+        // GIL_DISABLED builds. (See issue #129069)
         memmove(&item[ihigh+d], &item[ihigh], tail);
         if (list_resize(a, Py_SIZE(a) + d) < 0) {
             memmove(&item[ihigh], &item[ihigh+d], tail);
@@ -932,12 +935,14 @@ list_ass_slice_lock_held(PyListObject *a, Py_ssize_t ilow, Py_ssize_t ihigh, PyO
         if (list_resize(a, k+d) < 0)
             goto Error;
         item = a->ob_item;
+        // TODO: these memmove/memcpy calls are not safe for shared lists in
+        // GIL_DISABLED builds. (See issue #129069)
         memmove(&item[ihigh+d], &item[ihigh],
             (k - ihigh)*sizeof(PyObject *));
     }
     for (k = 0; k < n; k++, ilow++) {
         PyObject *w = vitem[k];
-        item[ilow] = Py_XNewRef(w);
+        FT_ATOMIC_STORE_PTR_RELEASE(item[ilow], Py_XNewRef(w));
     }
     for (k = norig - 1; k >= 0; --k)
         Py_XDECREF(recycle[k]);
@@ -1017,6 +1022,8 @@ list_inplace_repeat_lock_held(PyListObject *self, Py_ssize_t n)
     for (Py_ssize_t j = 0; j < input_size; j++) {
         _Py_RefcntAdd(items[j], n-1);
     }
+    // TODO: _Py_memory_repeat calls are not safe for shared lists in
+    // GIL_DISABLED builds. (See issue #129069)
     _Py_memory_repeat((char *)items, sizeof(PyObject *)*output_size,
                       sizeof(PyObject *)*input_size);
     return 0;
@@ -3993,7 +4000,7 @@ listiter_setstate(PyObject *self, PyObject *state)
             index = -1;
         else if (index > PyList_GET_SIZE(it->it_seq))
             index = PyList_GET_SIZE(it->it_seq); /* iterator exhausted */
-        it->it_index = index;
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index);
     }
     Py_RETURN_NONE;
 }
@@ -4145,7 +4152,7 @@ listreviter_setstate(PyObject *self, PyObject *state)
             index = -1;
         else if (index > PyList_GET_SIZE(it->it_seq) - 1)
             index = PyList_GET_SIZE(it->it_seq) - 1;
-        it->it_index = index;
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index);
     }
     Py_RETURN_NONE;
 }
@@ -4162,18 +4169,19 @@ listiter_reduce_general(void *_it, int forward)
      * call must be before access of iterator pointers.
      * see issue #101765 */
-
     /* the objects are not the same, index is of different types! */
     if (forward) {
         iter = _PyEval_GetBuiltin(&_Py_ID(iter));
         _PyListIterObject *it = (_PyListIterObject *)_it;
-        if (it->it_index >= 0) {
-            return Py_BuildValue("N(O)n", iter, it->it_seq, it->it_index);
+        Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+        if (idx >= 0) {
+            return Py_BuildValue("N(O)n", iter, it->it_seq, idx);
         }
     } else {
         iter = _PyEval_GetBuiltin(&_Py_ID(reversed));
         listreviterobject *it = (listreviterobject *)_it;
-        if (it->it_index >= 0) {
-            return Py_BuildValue("N(O)n", iter, it->it_seq, it->it_index);
+        Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+        if (idx >= 0) {
+            return Py_BuildValue("N(O)n", iter, it->it_seq, idx);
         }
     }
     /* empty iterator, create an empty list */
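A note on the FT_ATOMIC_* wrappers used throughout these hunks: they come from CPython's internal pycore_pyatomic_ft_wrappers.h, and roughly speaking they are plain loads and stores in default builds and relaxed/release atomics in free-threaded builds. A minimal sketch of the expected expansion (an approximation, not the actual header, assuming the _Py_atomic_* helpers from pyatomic.h):

// Simplified sketch of the FT_ATOMIC_* wrappers; approximation, not the real
// pycore_pyatomic_ft_wrappers.h, assuming the _Py_atomic_* helpers exist as
// declared in pyatomic.h.
#ifdef Py_GIL_DISABLED
#define FT_ATOMIC_LOAD_SSIZE_RELAXED(value) \
    _Py_atomic_load_ssize_relaxed(&value)
#define FT_ATOMIC_STORE_SSIZE_RELAXED(value, new_value) \
    _Py_atomic_store_ssize_relaxed(&value, new_value)
#define FT_ATOMIC_STORE_PTR_RELEASE(value, new_value) \
    _Py_atomic_store_ptr_release(&value, new_value)
#else
// With the GIL, plain accesses are already safe.
#define FT_ATOMIC_LOAD_SSIZE_RELAXED(value) value
#define FT_ATOMIC_STORE_SSIZE_RELAXED(value, new_value) value = new_value
#define FT_ATOMIC_STORE_PTR_RELEASE(value, new_value) value = new_value
#endif

In other words, the iterator index updates above cost nothing extra in default builds; only GIL_DISABLED builds pay for the atomic operations.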
Objects/tupleobject.c (23 additions, 6 deletions)
@@ -1014,18 +1014,23 @@ tupleiter_next(PyObject *self)
 
     assert(it != NULL);
     seq = it->it_seq;
+#ifndef Py_GIL_DISABLED
     if (seq == NULL)
         return NULL;
+#endif
     assert(PyTuple_Check(seq));
 
-    if (it->it_index < PyTuple_GET_SIZE(seq)) {
-        item = PyTuple_GET_ITEM(seq, it->it_index);
-        ++it->it_index;
+    Py_ssize_t index = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+    if (index < PyTuple_GET_SIZE(seq)) {
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index + 1);
+        item = PyTuple_GET_ITEM(seq, index);
         return Py_NewRef(item);
     }
 
+#ifndef Py_GIL_DISABLED
     it->it_seq = NULL;
     Py_DECREF(seq);
+#endif
     return NULL;
 }

@@ -1034,8 +1039,15 @@ tupleiter_len(PyObject *self, PyObject *Py_UNUSED(ignored))
 {
     _PyTupleIterObject *it = _PyTupleIterObject_CAST(self);
     Py_ssize_t len = 0;
+#ifdef Py_GIL_DISABLED
+    Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+    Py_ssize_t seq_len = PyTuple_GET_SIZE(it->it_seq);
+    if (idx < seq_len)
+        len = seq_len - idx;
+#else
     if (it->it_seq)
         len = PyTuple_GET_SIZE(it->it_seq) - it->it_index;
+#endif
     return PyLong_FromSsize_t(len);
 }

@@ -1051,10 +1063,15 @@ tupleiter_reduce(PyObject *self, PyObject *Py_UNUSED(ignored))
      * see issue #101765 */
     _PyTupleIterObject *it = _PyTupleIterObject_CAST(self);
 
+#ifdef Py_GIL_DISABLED
+    Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+    if (idx < PyTuple_GET_SIZE(it->it_seq))
+        return Py_BuildValue("N(O)n", iter, it->it_seq, idx);
+#else
     if (it->it_seq)
         return Py_BuildValue("N(O)n", iter, it->it_seq, it->it_index);
-    else
-        return Py_BuildValue("N(())", iter);
+#endif
+    return Py_BuildValue("N(())", iter);
 }
 
 static PyObject *
@@ -1069,7 +1086,7 @@ tupleiter_setstate(PyObject *self, PyObject *state)
             index = 0;
         else if (index > PyTuple_GET_SIZE(it->it_seq))
             index = PyTuple_GET_SIZE(it->it_seq); /* exhausted iterator */
-        it->it_index = index;
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index);
     }
     Py_RETURN_NONE;
 }
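Why relaxed atomics are enough here: the index load and store can interleave between threads, so a shared iterator may yield an item twice or skip one, but the bounds check always compares the locally loaded index against the immutable tuple size, so no out-of-range read is possible. That matches exactly what the new tests assert (skips and duplicates tolerated, no invented items). A standalone C11 analogue of the pattern (toy types, not CPython code):

// Standalone analogue (C11 atomics, toy types) of the relaxed index handling
// in tupleiter_next above; illustrative only, not CPython code.
#include <stdatomic.h>
#include <stddef.h>

typedef struct {
    atomic_size_t index;  // analogue of it->it_index
    size_t size;          // analogue of PyTuple_GET_SIZE(seq); never changes
    const int *items;     // analogue of the tuple's item array
} toy_iter;

// Returns 1 and writes *out if an item was produced, 0 when exhausted.
static int toy_next(toy_iter *it, int *out)
{
    size_t idx = atomic_load_explicit(&it->index, memory_order_relaxed);
    if (idx < it->size) {
        // Another thread may store the same idx + 1 concurrently; that can
        // only duplicate or skip items, never read out of bounds, because
        // the access below uses the locally loaded idx.
        atomic_store_explicit(&it->index, idx + 1, memory_order_relaxed);
        *out = it->items[idx];
        return 1;
    }
    return 0;
}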
Tools/tsan/suppressions_free_threading.txt (9 additions, 2 deletions)
@@ -18,12 +18,10 @@ race:set_allocator_unlocked
 # https://gist.github.com/swtaarrs/08dfe7883b4c975c31ecb39388987a67
 race:free_threadstate
 
-
 # These warnings trigger directly in a CPython function.
 
 race_top:assign_version_tag
 race_top:_multiprocessing_SemLock_acquire_impl
-race_top:list_get_item_ref
 race_top:_Py_slot_tp_getattr_hook
 race_top:add_threadstate
 race_top:dump_traceback
@@ -54,3 +52,12 @@ race_top:update_one_slot
 
 # https://gist.github.com/mpage/6962e8870606cfc960e159b407a0cb40
 thread:pthread_create
+
+# Range iteration is not thread-safe yet (issue #129068)
+race_top:rangeiter_next
+
+# List resizing happens through different paths ending in memcpy or memmove
+# (for efficiency), which will probably need to be rewritten as explicit loops
+# of ptr-sized copies to be thread-safe. (Issue #129069)
+race:list_ass_slice_lock_held
+race:list_inplace_repeat_lock_held
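The last suppression comment points at the remaining gap: slice assignment and in-place repeat still move PyObject* slots with memmove/memcpy, which ThreadSanitizer flags because a lock-free reader could observe a torn pointer. A sketch of the kind of rewrite the comment suggests for issue #129069: copy one pointer-sized slot at a time with a release store, so readers only ever see whole pointers. The helper name and signature below are hypothetical, not part of this PR, and a real fix would also have to sort out resize ordering and reference counting:

// Hypothetical sketch for issue #129069 (not part of this PR): shift n
// PyObject* slots right by `shift` places one pointer at a time, so a
// concurrent lock-free reader never observes a torn pointer value.
// FT_ATOMIC_STORE_PTR_RELEASE is the internal wrapper already used in
// list_ass_slice_lock_held above (Py_BUILD_CORE only).
static void
shift_items_right(PyObject **items, Py_ssize_t n, Py_ssize_t shift)
{
    // Walk backwards so the overlapping source and destination ranges
    // don't clobber each other (memmove would do the same internally).
    for (Py_ssize_t i = n - 1; i >= 0; i--) {
        FT_ATOMIC_STORE_PTR_RELEASE(items[i + shift], items[i]);
    }
}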