Skip to content

Commit bb69eda

Browse files
committed
Update the _thread module to have actual mutexes
1 parent 4647731 commit bb69eda

File tree

4 files changed

+240
-35
lines changed

4 files changed

+240
-35
lines changed

Cargo.lock

Lines changed: 40 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Lib/logging/__init__.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@
3737
'warn', 'warning', 'getLogRecordFactory', 'setLogRecordFactory',
3838
'lastResort', 'raiseExceptions']
3939

40-
# TODO: import threading
41-
import _thread
40+
import threading
4241

4342
__author__ = "Vinay Sajip <vinay_sajip@red-dove.com>"
4443
__status__ = "production"
@@ -208,7 +207,7 @@ def _checkLevel(level):
208207
#the lock would already have been acquired - so we need an RLock.
209208
#The same argument applies to Loggers and Manager.loggerDict.
210209
#
211-
_lock = _thread.RLock()
210+
_lock = threading.RLock()
212211

213212
def _acquireLock():
214213
"""
@@ -844,7 +843,7 @@ def createLock(self):
844843
"""
845844
Acquire a thread lock for serializing access to the underlying I/O.
846845
"""
847-
self.lock = _thread.RLock()
846+
self.lock = threading.RLock()
848847
_register_at_fork_acquire_release(self)
849848

850849
def acquire(self):

vm/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ smallbox = "0.8"
7171
bstr = "0.2.12"
7272
crossbeam-utils = "0.7"
7373
generational-arena = "0.2"
74+
parking_lot = "0.10"
7475

7576
## unicode stuff
7677
unicode_names2 = "0.4"

vm/src/stdlib/thread.rs

Lines changed: 196 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
1-
/// Implementation of the _thread module, currently noop implementation as RustPython doesn't yet
2-
/// support threading
1+
/// Implementation of the _thread module
32
use crate::function::PyFuncArgs;
4-
use crate::pyobject::{PyObjectRef, PyResult};
3+
use crate::obj::objtype::PyClassRef;
4+
use crate::pyobject::{PyClassImpl, PyObjectRef, PyResult, PyValue};
55
use crate::vm::VirtualMachine;
66

7+
use parking_lot::{
8+
lock_api::{GetThreadId, RawMutex as RawMutexT, RawMutexTimed},
9+
RawMutex, RawThreadId,
10+
};
11+
use std::cell::Cell;
12+
use std::fmt;
13+
use std::sync::atomic::{AtomicUsize, Ordering};
14+
use std::time::Duration;
15+
716
#[cfg(not(target_os = "windows"))]
817
const PY_TIMEOUT_MAX: isize = std::isize::MAX;
918

@@ -12,49 +21,205 @@ const PY_TIMEOUT_MAX: isize = 0xffffffff * 1_000_000;
1221

1322
const TIMEOUT_MAX: f64 = (PY_TIMEOUT_MAX / 1_000_000_000) as f64;
1423

15-
fn rlock_acquire(vm: &VirtualMachine, _args: PyFuncArgs) -> PyResult {
16-
Ok(vm.get_none())
24+
#[pyimpl]
25+
trait LockProtocol: PyValue {
26+
type RawMutex: RawMutexT + RawMutexTimed<Duration = Duration>;
27+
fn mutex(&self) -> &Self::RawMutex;
28+
29+
#[pymethod]
30+
#[pymethod(name = "acquire_lock")]
31+
#[pymethod(name = "__enter__")]
32+
fn acquire(&self, args: AcquireArgs, vm: &VirtualMachine) -> PyResult<bool> {
33+
let mu = self.mutex();
34+
match args.waitflag {
35+
true if args.timeout == -1.0 => {
36+
mu.lock();
37+
Ok(true)
38+
}
39+
true if args.timeout < 0.0 => {
40+
Err(vm.new_value_error("timeout value must be positive".to_owned()))
41+
}
42+
true => Ok(mu.try_lock_for(Duration::from_secs_f64(args.timeout))),
43+
false if args.timeout != -1.0 => {
44+
Err(vm
45+
.new_value_error("can't specify a timeout for a non-blocking call".to_owned()))
46+
}
47+
false => Ok(mu.try_lock()),
48+
}
49+
}
50+
#[pymethod]
51+
#[pymethod(name = "release_lock")]
52+
fn release(&self) {
53+
self.mutex().unlock()
54+
}
55+
56+
#[pymethod(magic)]
57+
fn exit(&self, _args: PyFuncArgs) {
58+
self.release()
59+
}
60+
}
61+
#[derive(FromArgs)]
62+
struct AcquireArgs {
63+
#[pyarg(positional_or_keyword, default = "true")]
64+
waitflag: bool,
65+
#[pyarg(positional_or_keyword, default = "-1.0")]
66+
timeout: f64,
67+
}
68+
69+
#[pyclass(name = "lock")]
70+
struct PyLock {
71+
mu: RawMutex,
72+
}
73+
74+
impl PyValue for PyLock {
75+
fn class(vm: &VirtualMachine) -> PyClassRef {
76+
vm.class("_thread", "LockType")
77+
}
78+
}
79+
80+
impl fmt::Debug for PyLock {
81+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
82+
f.pad("PyLock")
83+
}
84+
}
85+
86+
impl LockProtocol for PyLock {
87+
type RawMutex = RawMutex;
88+
fn mutex(&self) -> &RawMutex {
89+
&self.mu
90+
}
91+
}
92+
93+
#[pyimpl(with(LockProtocol))]
94+
impl PyLock {
95+
// TODO: locked(), might require something to change in parking_lot
96+
}
97+
98+
// Copied from lock_api
99+
// TODO: open a PR to make this public in lock_api
100+
struct RawReentrantMutex<R, G> {
101+
owner: AtomicUsize,
102+
lock_count: Cell<usize>,
103+
mutex: R,
104+
get_thread_id: G,
17105
}
18106

19-
fn rlock_release(_zelf: PyObjectRef) {}
107+
impl<R: RawMutexT, G: GetThreadId> RawReentrantMutex<R, G> {
108+
#[inline]
109+
fn lock_internal<F: FnOnce() -> bool>(&self, try_lock: F) -> bool {
110+
let id = self.get_thread_id.nonzero_thread_id().get();
111+
if self.owner.load(Ordering::Relaxed) == id {
112+
self.lock_count.set(
113+
self.lock_count
114+
.get()
115+
.checked_add(1)
116+
.expect("ReentrantMutex lock count overflow"),
117+
);
118+
} else {
119+
if !try_lock() {
120+
return false;
121+
}
122+
self.owner.store(id, Ordering::Relaxed);
123+
debug_assert_eq!(self.lock_count.get(), 0);
124+
self.lock_count.set(1);
125+
}
126+
true
127+
}
128+
}
129+
130+
unsafe impl<R: RawMutexT, G: GetThreadId> RawMutexT for RawReentrantMutex<R, G> {
131+
const INIT: Self = RawReentrantMutex {
132+
owner: AtomicUsize::new(0),
133+
lock_count: Cell::new(0),
134+
mutex: R::INIT,
135+
get_thread_id: G::INIT,
136+
};
137+
138+
type GuardMarker = R::GuardMarker;
139+
140+
#[inline]
141+
fn lock(&self) {
142+
self.lock_internal(|| {
143+
self.mutex.lock();
144+
true
145+
});
146+
}
147+
148+
#[inline]
149+
fn try_lock(&self) -> bool {
150+
self.lock_internal(|| self.mutex.try_lock())
151+
}
152+
153+
#[inline]
154+
fn unlock(&self) {
155+
let lock_count = self.lock_count.get() - 1;
156+
self.lock_count.set(lock_count);
157+
if lock_count == 0 {
158+
self.owner.store(0, Ordering::Relaxed);
159+
self.mutex.unlock();
160+
}
161+
}
162+
}
163+
164+
unsafe impl<R: RawMutexTimed, G: GetThreadId> RawMutexTimed for RawReentrantMutex<R, G> {
165+
type Instant = R::Instant;
166+
type Duration = R::Duration;
167+
#[inline]
168+
fn try_lock_until(&self, timeout: R::Instant) -> bool {
169+
self.lock_internal(|| self.mutex.try_lock_until(timeout))
170+
}
20171

21-
fn rlock_enter(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult {
22-
arg_check!(vm, args, required = [(instance, None)]);
23-
Ok(instance.clone())
172+
#[inline]
173+
fn try_lock_for(&self, timeout: R::Duration) -> bool {
174+
self.lock_internal(|| self.mutex.try_lock_for(timeout))
175+
}
24176
}
25177

26-
fn rlock_exit(
27-
// The context manager protocol requires these, but we don't use them
28-
_instance: PyObjectRef,
29-
_exception_type: PyObjectRef,
30-
_exception_value: PyObjectRef,
31-
_traceback: PyObjectRef,
32-
vm: &VirtualMachine,
33-
) -> PyResult {
34-
Ok(vm.get_none())
178+
type RawRMutex = RawReentrantMutex<RawMutex, RawThreadId>;
179+
#[pyclass(name = "RLock")]
180+
struct PyRLock {
181+
mu: RawRMutex,
35182
}
36183

37-
fn get_ident(_vm: &VirtualMachine) -> u32 {
38-
1
184+
impl PyValue for PyRLock {
185+
fn class(vm: &VirtualMachine) -> PyClassRef {
186+
vm.class("_thread", "RLock")
187+
}
39188
}
40189

41-
fn allocate_lock(vm: &VirtualMachine) -> PyResult {
42-
let lock_class = vm.class("_thread", "RLock");
43-
vm.invoke(&lock_class.into_object(), vec![])
190+
impl fmt::Debug for PyRLock {
191+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192+
f.pad("PyLock")
193+
}
194+
}
195+
196+
impl LockProtocol for PyRLock {
197+
type RawMutex = RawRMutex;
198+
fn mutex(&self) -> &Self::RawMutex {
199+
&self.mu
200+
}
201+
}
202+
203+
#[pyimpl(with(LockProtocol))]
204+
impl PyRLock {}
205+
206+
fn get_ident() -> u64 {
207+
let id = std::thread::current().id();
208+
// TODO: use id.as_u64() once it's stable, until then, ThreadId is just a wrapper
209+
// around NonZeroU64, so this is safe
210+
unsafe { std::mem::transmute(id) }
211+
}
212+
213+
fn allocate_lock() -> PyLock {
214+
PyLock { mu: RawMutex::INIT }
44215
}
45216

46217
pub fn make_module(vm: &VirtualMachine) -> PyObjectRef {
47218
let ctx = &vm.ctx;
48219

49-
let rlock_type = py_class!(ctx, "_thread.RLock", ctx.object(), {
50-
"acquire" => ctx.new_method(rlock_acquire),
51-
"release" => ctx.new_method(rlock_release),
52-
"__enter__" => ctx.new_method(rlock_enter),
53-
"__exit__" => ctx.new_method(rlock_exit),
54-
});
55-
56220
py_module!(vm, "_thread", {
57-
"RLock" => rlock_type,
221+
"RLock" => PyRLock::make_class(ctx),
222+
"LockType" => PyLock::make_class(ctx),
58223
"get_ident" => ctx.new_function(get_ident),
59224
"allocate_lock" => ctx.new_function(allocate_lock),
60225
"TIMEOUT_MAX" => ctx.new_float(TIMEOUT_MAX),

0 commit comments

Comments
 (0)