Skip to content

Commit 9b03e9a

Browse files
Merge pull request #1919 from palaviv/threading-stdlib
Convert stdlib object to thread safe
2 parents 7a3df36 + 25913a6 commit 9b03e9a

File tree

6 files changed

+289
-233
lines changed

6 files changed

+289
-233
lines changed

Lib/test/test_array.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,6 @@ def test_iterator_pickle(self):
341341
a.fromlist(data2)
342342
self.assertEqual(list(it), [])
343343

344-
# TODO: RUSTPYTHON
345-
@unittest.expectedFailure
346344
def test_exhausted_iterator(self):
347345
a = array.array(self.typecode, self.example)
348346
self.assertEqual(list(a), list(self.example))

vm/src/stdlib/array.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,11 @@ use crate::pyobject::{
1010
};
1111
use crate::VirtualMachine;
1212

13-
use std::cell::Cell;
1413
use std::fmt;
1514
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
1615

16+
use crossbeam_utils::atomic::AtomicCell;
17+
1718
struct ArrayTypeSpecifierError {
1819
_priv: (),
1920
}
@@ -421,7 +422,7 @@ impl PyArray {
421422
#[pymethod(name = "__iter__")]
422423
fn iter(zelf: PyRef<Self>) -> PyArrayIter {
423424
PyArrayIter {
424-
position: Cell::new(0),
425+
position: AtomicCell::new(0),
425426
array: zelf,
426427
}
427428
}
@@ -430,10 +431,12 @@ impl PyArray {
430431
#[pyclass]
431432
#[derive(Debug)]
432433
pub struct PyArrayIter {
433-
position: Cell<usize>,
434+
position: AtomicCell<usize>,
434435
array: PyArrayRef,
435436
}
436437

438+
impl ThreadSafe for PyArrayIter {}
439+
437440
impl PyValue for PyArrayIter {
438441
fn class(vm: &VirtualMachine) -> PyClassRef {
439442
vm.class("array", "arrayiterator")
@@ -444,14 +447,9 @@ impl PyValue for PyArrayIter {
444447
impl PyArrayIter {
445448
#[pymethod(name = "__next__")]
446449
fn next(&self, vm: &VirtualMachine) -> PyResult {
447-
if self.position.get() < self.array.borrow_value().len() {
448-
let ret = self
449-
.array
450-
.borrow_value()
451-
.getitem_by_idx(self.position.get(), vm)
452-
.unwrap()?;
453-
self.position.set(self.position.get() + 1);
454-
Ok(ret)
450+
let pos = self.position.fetch_add(1);
451+
if let Some(item) = self.array.borrow_value().getitem_by_idx(pos, vm) {
452+
Ok(item?)
455453
} else {
456454
Err(objiter::new_stop_iteration(vm))
457455
}

vm/src/stdlib/collections.rs

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -6,23 +6,27 @@ mod _collections {
66
use crate::obj::{objiter, objtype::PyClassRef};
77
use crate::pyobject::{
88
IdProtocol, PyArithmaticValue::*, PyClassImpl, PyComparisonValue, PyIterable, PyObjectRef,
9-
PyRef, PyResult, PyValue,
9+
PyRef, PyResult, PyValue, ThreadSafe,
1010
};
1111
use crate::sequence;
1212
use crate::vm::ReprGuard;
1313
use crate::VirtualMachine;
1414
use itertools::Itertools;
15-
use std::cell::{Cell, RefCell};
1615
use std::collections::VecDeque;
16+
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
17+
18+
use crossbeam_utils::atomic::AtomicCell;
1719

1820
#[pyclass(name = "deque")]
19-
#[derive(Debug, Clone)]
21+
#[derive(Debug)]
2022
struct PyDeque {
21-
deque: RefCell<VecDeque<PyObjectRef>>,
22-
maxlen: Cell<Option<usize>>,
23+
deque: RwLock<VecDeque<PyObjectRef>>,
24+
maxlen: AtomicCell<Option<usize>>,
2325
}
2426
type PyDequeRef = PyRef<PyDeque>;
2527

28+
impl ThreadSafe for PyDeque {}
29+
2630
impl PyValue for PyDeque {
2731
fn class(vm: &VirtualMachine) -> PyClassRef {
2832
vm.class("_collections", "deque")
@@ -36,8 +40,12 @@ mod _collections {
3640
}
3741

3842
impl PyDeque {
39-
fn borrow_deque<'a>(&'a self) -> impl std::ops::Deref<Target = VecDeque<PyObjectRef>> + 'a {
40-
self.deque.borrow()
43+
fn borrow_deque(&self) -> RwLockReadGuard<'_, VecDeque<PyObjectRef>> {
44+
self.deque.read().unwrap()
45+
}
46+
47+
fn borrow_deque_mut(&self) -> RwLockWriteGuard<'_, VecDeque<PyObjectRef>> {
48+
self.deque.write().unwrap()
4149
}
4250
}
4351

@@ -51,8 +59,8 @@ mod _collections {
5159
vm: &VirtualMachine,
5260
) -> PyResult<PyRef<Self>> {
5361
let py_deque = PyDeque {
54-
deque: RefCell::default(),
55-
maxlen: maxlen.into(),
62+
deque: RwLock::default(),
63+
maxlen: AtomicCell::new(maxlen),
5664
};
5765
if let OptionalArg::Present(iter) = iter {
5866
py_deque.extend(iter, vm)?;
@@ -62,36 +70,39 @@ mod _collections {
6270

6371
#[pymethod]
6472
fn append(&self, obj: PyObjectRef) {
65-
let mut deque = self.deque.borrow_mut();
66-
if self.maxlen.get() == Some(deque.len()) {
73+
let mut deque = self.borrow_deque_mut();
74+
if self.maxlen.load() == Some(deque.len()) {
6775
deque.pop_front();
6876
}
6977
deque.push_back(obj);
7078
}
7179

7280
#[pymethod]
7381
fn appendleft(&self, obj: PyObjectRef) {
74-
let mut deque = self.deque.borrow_mut();
75-
if self.maxlen.get() == Some(deque.len()) {
82+
let mut deque = self.borrow_deque_mut();
83+
if self.maxlen.load() == Some(deque.len()) {
7684
deque.pop_back();
7785
}
7886
deque.push_front(obj);
7987
}
8088

8189
#[pymethod]
8290
fn clear(&self) {
83-
self.deque.borrow_mut().clear()
91+
self.borrow_deque_mut().clear()
8492
}
8593

8694
#[pymethod]
8795
fn copy(&self) -> Self {
88-
self.clone()
96+
PyDeque {
97+
deque: RwLock::new(self.borrow_deque().clone()),
98+
maxlen: AtomicCell::new(self.maxlen.load()),
99+
}
89100
}
90101

91102
#[pymethod]
92103
fn count(&self, obj: PyObjectRef, vm: &VirtualMachine) -> PyResult<usize> {
93104
let mut count = 0;
94-
for elem in self.deque.borrow().iter() {
105+
for elem in self.borrow_deque().iter() {
95106
if vm.identical_or_equal(elem, &obj)? {
96107
count += 1;
97108
}
@@ -124,7 +135,7 @@ mod _collections {
124135
stop: OptionalArg<usize>,
125136
vm: &VirtualMachine,
126137
) -> PyResult<usize> {
127-
let deque = self.deque.borrow();
138+
let deque = self.borrow_deque();
128139
let start = start.unwrap_or(0);
129140
let stop = stop.unwrap_or_else(|| deque.len());
130141
for (i, elem) in deque.iter().skip(start).take(stop - start).enumerate() {
@@ -141,9 +152,9 @@ mod _collections {
141152

142153
#[pymethod]
143154
fn insert(&self, idx: i32, obj: PyObjectRef, vm: &VirtualMachine) -> PyResult<()> {
144-
let mut deque = self.deque.borrow_mut();
155+
let mut deque = self.borrow_deque_mut();
145156

146-
if self.maxlen.get() == Some(deque.len()) {
157+
if self.maxlen.load() == Some(deque.len()) {
147158
return Err(vm.new_index_error("deque already at its maximum size".to_owned()));
148159
}
149160

@@ -166,23 +177,21 @@ mod _collections {
166177

167178
#[pymethod]
168179
fn pop(&self, vm: &VirtualMachine) -> PyResult {
169-
self.deque
170-
.borrow_mut()
180+
self.borrow_deque_mut()
171181
.pop_back()
172182
.ok_or_else(|| vm.new_index_error("pop from an empty deque".to_owned()))
173183
}
174184

175185
#[pymethod]
176186
fn popleft(&self, vm: &VirtualMachine) -> PyResult {
177-
self.deque
178-
.borrow_mut()
187+
self.borrow_deque_mut()
179188
.pop_front()
180189
.ok_or_else(|| vm.new_index_error("pop from an empty deque".to_owned()))
181190
}
182191

183192
#[pymethod]
184193
fn remove(&self, obj: PyObjectRef, vm: &VirtualMachine) -> PyResult {
185-
let mut deque = self.deque.borrow_mut();
194+
let mut deque = self.borrow_deque_mut();
186195
let mut idx = None;
187196
for (i, elem) in deque.iter().enumerate() {
188197
if vm.identical_or_equal(elem, &obj)? {
@@ -196,13 +205,13 @@ mod _collections {
196205

197206
#[pymethod]
198207
fn reverse(&self) {
199-
self.deque
200-
.replace_with(|deque| deque.iter().cloned().rev().collect());
208+
let rev: VecDeque<_> = self.borrow_deque().iter().cloned().rev().collect();
209+
*self.borrow_deque_mut() = rev;
201210
}
202211

203212
#[pymethod]
204213
fn rotate(&self, mid: OptionalArg<isize>) {
205-
let mut deque = self.deque.borrow_mut();
214+
let mut deque = self.borrow_deque_mut();
206215
let mid = mid.unwrap_or(1);
207216
if mid < 0 {
208217
deque.rotate_left(-mid as usize);
@@ -213,26 +222,25 @@ mod _collections {
213222

214223
#[pyproperty]
215224
fn maxlen(&self) -> Option<usize> {
216-
self.maxlen.get()
225+
self.maxlen.load()
217226
}
218227

219228
#[pyproperty(setter)]
220229
fn set_maxlen(&self, maxlen: Option<usize>) {
221-
self.maxlen.set(maxlen);
230+
self.maxlen.store(maxlen);
222231
}
223232

224233
#[pymethod(name = "__repr__")]
225234
fn repr(zelf: PyRef<Self>, vm: &VirtualMachine) -> PyResult<String> {
226235
let repr = if let Some(_guard) = ReprGuard::enter(zelf.as_object()) {
227236
let elements = zelf
228-
.deque
229-
.borrow()
237+
.borrow_deque()
230238
.iter()
231239
.map(|obj| vm.to_repr(obj))
232240
.collect::<Result<Vec<_>, _>>()?;
233241
let maxlen = zelf
234242
.maxlen
235-
.get()
243+
.load()
236244
.map(|maxlen| format!(", maxlen={}", maxlen))
237245
.unwrap_or_default();
238246
format!("deque([{}]{})", elements.into_iter().format(", "), maxlen)
@@ -336,29 +344,29 @@ mod _collections {
336344

337345
#[pymethod(name = "__mul__")]
338346
fn mul(&self, n: isize) -> Self {
339-
let deque: &VecDeque<_> = &self.deque.borrow();
347+
let deque: &VecDeque<_> = &self.borrow_deque();
340348
let mul = sequence::seq_mul(deque, n);
341-
let skipped = if let Some(maxlen) = self.maxlen.get() {
349+
let skipped = if let Some(maxlen) = self.maxlen.load() {
342350
mul.len() - maxlen
343351
} else {
344352
0
345353
};
346354
let deque = mul.skip(skipped).cloned().collect();
347355
PyDeque {
348-
deque: RefCell::new(deque),
349-
maxlen: self.maxlen.clone(),
356+
deque: RwLock::new(deque),
357+
maxlen: AtomicCell::new(self.maxlen.load()),
350358
}
351359
}
352360

353361
#[pymethod(name = "__len__")]
354362
fn len(&self) -> usize {
355-
self.deque.borrow().len()
363+
self.borrow_deque().len()
356364
}
357365

358366
#[pymethod(name = "__iter__")]
359367
fn iter(zelf: PyRef<Self>) -> PyDequeIterator {
360368
PyDequeIterator {
361-
position: Cell::new(0),
369+
position: AtomicCell::new(0),
362370
deque: zelf,
363371
}
364372
}
@@ -367,10 +375,12 @@ mod _collections {
367375
#[pyclass(name = "_deque_iterator")]
368376
#[derive(Debug)]
369377
struct PyDequeIterator {
370-
position: Cell<usize>,
378+
position: AtomicCell<usize>,
371379
deque: PyDequeRef,
372380
}
373381

382+
impl ThreadSafe for PyDequeIterator {}
383+
374384
impl PyValue for PyDequeIterator {
375385
fn class(vm: &VirtualMachine) -> PyClassRef {
376386
vm.class("_collections", "_deque_iterator")
@@ -381,9 +391,10 @@ mod _collections {
381391
impl PyDequeIterator {
382392
#[pymethod(name = "__next__")]
383393
fn next(&self, vm: &VirtualMachine) -> PyResult {
384-
if self.position.get() < self.deque.deque.borrow().len() {
385-
let ret = self.deque.deque.borrow()[self.position.get()].clone();
386-
self.position.set(self.position.get() + 1);
394+
let pos = self.position.fetch_add(1);
395+
let deque = self.deque.borrow_deque();
396+
if pos < deque.len() {
397+
let ret = deque[pos].clone();
387398
Ok(ret)
388399
} else {
389400
Err(objiter::new_stop_iteration(vm))

vm/src/stdlib/csv.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
use std::cell::RefCell;
21
use std::fmt::{self, Debug, Formatter};
2+
use std::sync::RwLock;
33

44
use csv as rust_csv;
55
use itertools::join;
@@ -10,7 +10,7 @@ use crate::obj::objiter;
1010
use crate::obj::objstr::{self, PyString};
1111
use crate::obj::objtype::PyClassRef;
1212
use crate::pyobject::{IntoPyObject, TryFromObject, TypeProtocol};
13-
use crate::pyobject::{PyClassImpl, PyIterable, PyObjectRef, PyRef, PyResult, PyValue};
13+
use crate::pyobject::{PyClassImpl, PyIterable, PyObjectRef, PyRef, PyResult, PyValue, ThreadSafe};
1414
use crate::types::create_type;
1515
use crate::VirtualMachine;
1616

@@ -126,9 +126,11 @@ impl ReadState {
126126

127127
#[pyclass(name = "Reader")]
128128
struct Reader {
129-
state: RefCell<ReadState>,
129+
state: RwLock<ReadState>,
130130
}
131131

132+
impl ThreadSafe for Reader {}
133+
132134
impl Debug for Reader {
133135
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
134136
write!(f, "_csv.reader")
@@ -143,7 +145,7 @@ impl PyValue for Reader {
143145

144146
impl Reader {
145147
fn new(iter: PyIterable<PyObjectRef>, config: ReaderOption) -> Self {
146-
let state = RefCell::new(ReadState::new(iter, config));
148+
let state = RwLock::new(ReadState::new(iter, config));
147149
Reader { state }
148150
}
149151
}
@@ -152,13 +154,13 @@ impl Reader {
152154
impl Reader {
153155
#[pymethod(name = "__iter__")]
154156
fn iter(this: PyRef<Self>, vm: &VirtualMachine) -> PyResult {
155-
this.state.borrow_mut().cast_to_reader(vm)?;
157+
this.state.write().unwrap().cast_to_reader(vm)?;
156158
this.into_pyobject(vm)
157159
}
158160

159161
#[pymethod(name = "__next__")]
160162
fn next(&self, vm: &VirtualMachine) -> PyResult {
161-
let mut state = self.state.borrow_mut();
163+
let mut state = self.state.write().unwrap();
162164
state.cast_to_reader(vm)?;
163165

164166
if let ReadState::CsvIter(ref mut reader) = &mut *state {

0 commit comments

Comments
 (0)