From 97c51be6b854f7754bf7a4db771b205fa326df72 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sun, 17 May 2020 16:19:24 -0500 Subject: [PATCH 01/11] Add _thread.start_new_thread --- vm/src/function.rs | 5 +++++ vm/src/stdlib/mod.rs | 2 +- vm/src/stdlib/thread.rs | 49 ++++++++++++++++++++++++++++++++++------- vm/src/vm.rs | 23 +++++++++++++++++-- 4 files changed, 68 insertions(+), 11 deletions(-) diff --git a/vm/src/function.rs b/vm/src/function.rs index 48cceb8f1d..67e2dc0455 100644 --- a/vm/src/function.rs +++ b/vm/src/function.rs @@ -269,6 +269,11 @@ impl KwArgs { self.0.remove(name) } } +impl From> for KwArgs { + fn from(map: HashMap) -> Self { + KwArgs(map) + } +} impl FromArgs for KwArgs where diff --git a/vm/src/stdlib/mod.rs b/vm/src/stdlib/mod.rs index 001e7eb1d4..5c75269e81 100644 --- a/vm/src/stdlib/mod.rs +++ b/vm/src/stdlib/mod.rs @@ -65,7 +65,7 @@ mod winreg; #[cfg(not(any(target_arch = "wasm32", target_os = "redox")))] mod zlib; -pub type StdlibInitFunc = Box PyObjectRef>; +pub type StdlibInitFunc = Box PyObjectRef + Send + Sync>; pub fn get_module_inits() -> HashMap { #[allow(unused_mut)] diff --git a/vm/src/stdlib/thread.rs b/vm/src/stdlib/thread.rs index d0bbbd67eb..651e93d754 100644 --- a/vm/src/stdlib/thread.rs +++ b/vm/src/stdlib/thread.rs @@ -1,7 +1,10 @@ /// Implementation of the _thread module -use crate::function::PyFuncArgs; +use crate::exceptions; +use crate::function::{Args, KwArgs, OptionalArg, PyFuncArgs}; +use crate::obj::objdict::PyDictRef; +use crate::obj::objtuple::PyTupleRef; use crate::obj::objtype::PyClassRef; -use crate::pyobject::{PyClassImpl, PyObjectRef, PyRef, PyResult, PyValue}; +use crate::pyobject::{PyCallable, PyClassImpl, PyObjectRef, PyRef, PyResult, PyValue}; use crate::vm::VirtualMachine; use parking_lot::{ @@ -145,25 +148,55 @@ impl PyRLock { } } -fn get_ident() -> u64 { - let id = std::thread::current().id(); +fn thread_get_ident() -> u64 { + thread_to_id(&std::thread::current()) +} + +fn thread_to_id(t: &std::thread::Thread) -> u64 { // TODO: use id.as_u64() once it's stable, until then, ThreadId is just a wrapper // around NonZeroU64, so this is safe - unsafe { std::mem::transmute(id) } + unsafe { std::mem::transmute(t.id()) } } -fn allocate_lock() -> PyLock { +fn thread_allocate_lock() -> PyLock { PyLock { mu: RawMutex::INIT } } +fn thread_start_new_thread( + func: PyCallable, + args: PyTupleRef, + kwargs: OptionalArg, + vm: &VirtualMachine, +) -> u64 { + let thread_vm = vm.new_thread(); + let handle = std::thread::spawn(move || { + let vm = &thread_vm; + let args = Args::from(args.as_slice().to_owned()); + let kwargs = KwArgs::from(kwargs.map_or_else(Default::default, |k| k.to_attributes())); + if let Err(exc) = func.invoke(PyFuncArgs::from((args, kwargs)), vm) { + let stderr = std::io::stderr(); + let mut stderr = stderr.lock(); + let repr = vm.to_repr(&func.into_object()).ok(); + let repr = repr + .as_ref() + .map_or("", |s| s.as_str()); + writeln!(stderr, "Exception ignored in thread started by: {}", repr) + .and_then(|()| exceptions::write_exception(&mut stderr, vm, &exc)) + .ok(); + } + }); + thread_to_id(&handle.thread()) +} + pub fn make_module(vm: &VirtualMachine) -> PyObjectRef { let ctx = &vm.ctx; py_module!(vm, "_thread", { "RLock" => PyRLock::make_class(ctx), "LockType" => PyLock::make_class(ctx), - "get_ident" => ctx.new_function(get_ident), - "allocate_lock" => ctx.new_function(allocate_lock), + "get_ident" => ctx.new_function(thread_get_ident), + "allocate_lock" => 
ctx.new_function(thread_allocate_lock), + "start_new_thread" => ctx.new_function(thread_start_new_thread), "TIMEOUT_MAX" => ctx.new_float(TIMEOUT_MAX), }) } diff --git a/vm/src/vm.rs b/vm/src/vm.rs index 6e41436cc4..32a7314ef8 100644 --- a/vm/src/vm.rs +++ b/vm/src/vm.rs @@ -56,7 +56,7 @@ use crate::sysmodule; pub struct VirtualMachine { pub builtins: PyObjectRef, pub sys_module: PyObjectRef, - pub ctx: PyContext, + pub ctx: Arc, pub frames: RefCell>, pub wasm_id: Option, pub exceptions: RefCell>, @@ -190,7 +190,7 @@ impl VirtualMachine { let mut vm = VirtualMachine { builtins: builtins.clone(), sys_module: sysmod.clone(), - ctx, + ctx: Arc::new(ctx), frames: RefCell::new(vec![]), wasm_id: None, exceptions: RefCell::new(vec![]), @@ -280,6 +280,25 @@ impl VirtualMachine { } } + pub(crate) fn new_thread(&self) -> VirtualMachine { + VirtualMachine { + builtins: self.builtins.clone(), + sys_module: self.sys_module.clone(), + ctx: self.ctx.clone(), + frames: RefCell::new(vec![]), + wasm_id: self.wasm_id.clone(), + exceptions: RefCell::new(vec![]), + import_func: self.import_func.clone(), + profile_func: RefCell::new(self.get_none()), + trace_func: RefCell::new(self.get_none()), + use_tracing: Cell::new(false), + recursion_limit: self.recursion_limit.clone(), + signal_handlers: None, + state: self.state.clone(), + initialized: self.initialized, + } + } + pub fn run_code_obj(&self, code: PyCodeRef, scope: Scope) -> PyResult { let frame = Frame::new(code, scope).into_ref(self); self.run_frame_full(frame) From 56cfd83cc2afd7c55353da24d8d7e9113e4a98a5 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sun, 17 May 2020 17:43:07 -0500 Subject: [PATCH 02/11] Add more _thread APIs --- Lib/_rp_thread.py | 7 --- Lib/threading.py | 1 - vm/src/stdlib/thread.rs | 109 ++++++++++++++++++++++++++-------------- vm/src/vm.rs | 3 ++ 4 files changed, 73 insertions(+), 47 deletions(-) delete mode 100644 Lib/_rp_thread.py diff --git a/Lib/_rp_thread.py b/Lib/_rp_thread.py deleted file mode 100644 index b843c3ec97..0000000000 --- a/Lib/_rp_thread.py +++ /dev/null @@ -1,7 +0,0 @@ -import _thread -import _dummy_thread - -for k in _dummy_thread.__all__ + ['_set_sentinel', 'stack_size']: - if k not in _thread.__dict__: - # print('Populating _thread.%s' % k) - setattr(_thread, k, getattr(_dummy_thread, k)) diff --git a/Lib/threading.py b/Lib/threading.py index 69c8e10eba..bb41456fb1 100644 --- a/Lib/threading.py +++ b/Lib/threading.py @@ -2,7 +2,6 @@ import os as _os import sys as _sys -import _rp_thread # Hack: Trigger populating of RustPython _thread with dummies import _thread from time import monotonic as _time diff --git a/vm/src/stdlib/thread.rs b/vm/src/stdlib/thread.rs index 651e93d754..6277d0e84b 100644 --- a/vm/src/stdlib/thread.rs +++ b/vm/src/stdlib/thread.rs @@ -4,15 +4,17 @@ use crate::function::{Args, KwArgs, OptionalArg, PyFuncArgs}; use crate::obj::objdict::PyDictRef; use crate::obj::objtuple::PyTupleRef; use crate::obj::objtype::PyClassRef; -use crate::pyobject::{PyCallable, PyClassImpl, PyObjectRef, PyRef, PyResult, PyValue}; +use crate::pyobject::{Either, PyCallable, PyClassImpl, PyObjectRef, PyRef, PyResult, PyValue}; use crate::vm::VirtualMachine; use parking_lot::{ lock_api::{RawMutex as RawMutexT, RawMutexTimed, RawReentrantMutex}, RawMutex, RawThreadId, }; -use std::fmt; +use std::cell::RefCell; +use std::io::Write; use std::time::Duration; +use std::{fmt, thread}; #[cfg(not(target_os = "windows"))] const PY_TIMEOUT_MAX: isize = std::isize::MAX; @@ -26,14 
+28,43 @@ const TIMEOUT_MAX: f64 = (PY_TIMEOUT_MAX / 1_000_000_000) as f64; struct AcquireArgs { #[pyarg(positional_or_keyword, default = "true")] waitflag: bool, - #[pyarg(positional_or_keyword, default = "-1.0")] - timeout: f64, + #[pyarg(positional_or_keyword, default = "Either::A(-1.0)")] + timeout: Either, +} + +macro_rules! acquire_lock_impl { + ($mu:expr, $args:expr, $vm:expr) => {{ + let (mu, args, vm) = ($mu, $args, $vm); + let timeout = match args.timeout { + Either::A(f) => f, + Either::B(i) => i as f64, + }; + match args.waitflag { + true if timeout == -1.0 => { + mu.lock(); + Ok(true) + } + true if timeout < 0.0 => { + Err(vm.new_value_error("timeout value must be positive".to_owned())) + } + true => { + // TODO: respect TIMEOUT_MAX here + Ok(mu.try_lock_for(Duration::from_secs_f64(timeout))) + } + false if timeout != -1.0 => { + Err(vm + .new_value_error("can't specify a timeout for a non-blocking call".to_owned())) + } + false => Ok(mu.try_lock()), + } + }}; } #[pyclass(name = "lock")] struct PyLock { mu: RawMutex, } +type PyLockRef = PyRef; impl PyValue for PyLock { fn class(vm: &VirtualMachine) -> PyClassRef { @@ -54,21 +85,7 @@ impl PyLock { #[pymethod(name = "__enter__")] #[allow(clippy::float_cmp, clippy::match_bool)] fn acquire(&self, args: AcquireArgs, vm: &VirtualMachine) -> PyResult { - match args.waitflag { - true if args.timeout == -1.0 => { - self.mu.lock(); - Ok(true) - } - true if args.timeout < 0.0 => { - Err(vm.new_value_error("timeout value must be positive".to_owned())) - } - true => Ok(self.mu.try_lock_for(Duration::from_secs_f64(args.timeout))), - false if args.timeout != -1.0 => { - Err(vm - .new_value_error("can't specify a timeout for a non-blocking call".to_owned())) - } - false => Ok(self.mu.try_lock()), - } + acquire_lock_impl!(&self.mu, args, vm) } #[pymethod] #[pymethod(name = "release_lock")] @@ -120,21 +137,7 @@ impl PyRLock { #[pymethod(name = "__enter__")] #[allow(clippy::float_cmp, clippy::match_bool)] fn acquire(&self, args: AcquireArgs, vm: &VirtualMachine) -> PyResult { - match args.waitflag { - true if args.timeout == -1.0 => { - self.mu.lock(); - Ok(true) - } - true if args.timeout < 0.0 => { - Err(vm.new_value_error("timeout value must be positive".to_owned())) - } - true => Ok(self.mu.try_lock_for(Duration::from_secs_f64(args.timeout))), - false if args.timeout != -1.0 => { - Err(vm - .new_value_error("can't specify a timeout for a non-blocking call".to_owned())) - } - false => Ok(self.mu.try_lock()), - } + acquire_lock_impl!(&self.mu, args, vm) } #[pymethod] #[pymethod(name = "release_lock")] @@ -149,10 +152,10 @@ impl PyRLock { } fn thread_get_ident() -> u64 { - thread_to_id(&std::thread::current()) + thread_to_id(&thread::current()) } -fn thread_to_id(t: &std::thread::Thread) -> u64 { +fn thread_to_id(t: &thread::Thread) -> u64 { // TODO: use id.as_u64() once it's stable, until then, ThreadId is just a wrapper // around NonZeroU64, so this is safe unsafe { std::mem::transmute(t.id()) } @@ -167,9 +170,14 @@ fn thread_start_new_thread( args: PyTupleRef, kwargs: OptionalArg, vm: &VirtualMachine, -) -> u64 { +) -> PyResult { let thread_vm = vm.new_thread(); - let handle = std::thread::spawn(move || { + let mut thread_builder = thread::Builder::new(); + let stacksize = vm.state.stacksize.load(); + if stacksize != 0 { + thread_builder = thread_builder.stack_size(stacksize); + } + let res = thread_builder.spawn(move || { let vm = &thread_vm; let args = Args::from(args.as_slice().to_owned()); let kwargs = 
KwArgs::from(kwargs.map_or_else(Default::default, |k| k.to_attributes())); @@ -184,8 +192,28 @@ fn thread_start_new_thread( .and_then(|()| exceptions::write_exception(&mut stderr, vm, &exc)) .ok(); } + SENTINELS.with(|sents| { + for lock in sents.replace(Default::default()) { + lock.release() + } + }) }); - thread_to_id(&handle.thread()) + res.map(|handle| thread_to_id(&handle.thread())) + .map_err(|err| super::os::convert_io_error(vm, err)) +} + +thread_local!(static SENTINELS: RefCell> = RefCell::default()); + +fn thread_set_sentinel(vm: &VirtualMachine) -> PyLockRef { + let lock = PyLock { mu: RawMutex::INIT }.into_ref(vm); + SENTINELS.with(|sents| sents.borrow_mut().push(lock.clone())); + lock +} + +fn thread_stack_size(size: OptionalArg, vm: &VirtualMachine) -> usize { + let size = size.unwrap_or(0); + // TODO: do validation on this to make sure it's not too small + vm.state.stacksize.swap(size) } pub fn make_module(vm: &VirtualMachine) -> PyObjectRef { @@ -197,6 +225,9 @@ pub fn make_module(vm: &VirtualMachine) -> PyObjectRef { "get_ident" => ctx.new_function(thread_get_ident), "allocate_lock" => ctx.new_function(thread_allocate_lock), "start_new_thread" => ctx.new_function(thread_start_new_thread), + "_set_sentinel" => ctx.new_function(thread_set_sentinel), + "stack_size" => ctx.new_function(thread_stack_size), + "error" => ctx.exceptions.runtime_error.clone(), "TIMEOUT_MAX" => ctx.new_float(TIMEOUT_MAX), }) } diff --git a/vm/src/vm.rs b/vm/src/vm.rs index 32a7314ef8..d0c685ce90 100644 --- a/vm/src/vm.rs +++ b/vm/src/vm.rs @@ -11,6 +11,7 @@ use std::sync::{Arc, Mutex, MutexGuard}; use std::{env, fmt}; use arr_macro::arr; +use crossbeam_utils::atomic::AtomicCell; use num_bigint::BigInt; use num_traits::ToPrimitive; use once_cell::sync::Lazy; @@ -74,6 +75,7 @@ pub struct PyGlobalState { pub settings: PySettings, pub stdlib_inits: HashMap, pub frozen: HashMap, + pub stacksize: AtomicCell, } pub const NSIG: usize = 64; @@ -204,6 +206,7 @@ impl VirtualMachine { settings, stdlib_inits, frozen, + stacksize: AtomicCell::new(0), }), initialized: false, }; From 0b98c185edb84dc516be186351f86e78782c6197 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sun, 17 May 2020 19:44:34 -0500 Subject: [PATCH 03/11] Disable _thread on wasm --- Lib/importlib/_bootstrap.py | 11 ++++++++++- vm/src/stdlib/mod.rs | 3 ++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/Lib/importlib/_bootstrap.py b/Lib/importlib/_bootstrap.py index 32deef10af..cb35633493 100644 --- a/Lib/importlib/_bootstrap.py +++ b/Lib/importlib/_bootstrap.py @@ -1149,12 +1149,21 @@ def _setup(sys_module, _imp_module): # Directly load built-in modules needed during bootstrap. 
self_module = sys.modules[__name__] - for builtin_name in ('_thread', '_warnings', '_weakref'): + for builtin_name in ('_warnings', '_weakref'): if builtin_name not in sys.modules: builtin_module = _builtin_from_name(builtin_name) else: builtin_module = sys.modules[builtin_name] setattr(self_module, builtin_name, builtin_module) + # _thread was part of the above loop, but other parts of the code allow for it + # to be None, so we handle it separately here + builtin_name = '_thread' + if builtin_name in sys.modules: + builtin_module = sys.modules[builtin_name] + else: + builtin_spec = BuiltinImporter.find_spec(builtin_name) + builtin_module = builtin_spec and _load_unlocked(builtin_spec) + setattr(self_module, builtin_name, builtin_module) def _install(sys_module, _imp_module): diff --git a/vm/src/stdlib/mod.rs b/vm/src/stdlib/mod.rs index 5c75269e81..2f830ed2c0 100644 --- a/vm/src/stdlib/mod.rs +++ b/vm/src/stdlib/mod.rs @@ -30,6 +30,7 @@ pub mod socket; mod string; #[cfg(feature = "rustpython-compiler")] mod symtable; +#[cfg(not(target_arch = "wasm32"))] mod thread; mod time_module; #[cfg(feature = "rustpython-parser")] @@ -89,7 +90,6 @@ pub fn get_module_inits() -> HashMap { "_random".to_owned() => Box::new(random::make_module), "_string".to_owned() => Box::new(string::make_module), "_struct".to_owned() => Box::new(pystruct::make_module), - "_thread".to_owned() => Box::new(thread::make_module), "time".to_owned() => Box::new(time_module::make_module), "_weakref".to_owned() => Box::new(weakref::make_module), "_imp".to_owned() => Box::new(imp::make_module), @@ -130,6 +130,7 @@ pub fn get_module_inits() -> HashMap { #[cfg(feature = "ssl")] modules.insert("_ssl".to_owned(), Box::new(ssl::make_module)); modules.insert("_subprocess".to_owned(), Box::new(subprocess::make_module)); + modules.insert("_thread".to_owned(), Box::new(thread::make_module)); #[cfg(not(target_os = "redox"))] modules.insert("zlib".to_owned(), Box::new(zlib::make_module)); modules.insert( From 2e536ec7b12978c1cb049c84e7becd338163b09a Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sun, 17 May 2020 23:50:06 -0500 Subject: [PATCH 04/11] Add some TODO comments --- vm/src/stdlib/thread.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/vm/src/stdlib/thread.rs b/vm/src/stdlib/thread.rs index 6277d0e84b..425a3e3f20 100644 --- a/vm/src/stdlib/thread.rs +++ b/vm/src/stdlib/thread.rs @@ -182,6 +182,7 @@ fn thread_start_new_thread( let args = Args::from(args.as_slice().to_owned()); let kwargs = KwArgs::from(kwargs.map_or_else(Default::default, |k| k.to_attributes())); if let Err(exc) = func.invoke(PyFuncArgs::from((args, kwargs)), vm) { + // TODO: sys.unraisablehook let stderr = std::io::stderr(); let mut stderr = stderr.lock(); let repr = vm.to_repr(&func.into_object()).ok(); From 4c5dc2982d15b0d0c479cd44a4d587d95695b53e Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Mon, 18 May 2020 13:18:04 -0500 Subject: [PATCH 05/11] Box signal_handlers to reduce size_of::() --- vm/src/vm.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vm/src/vm.rs b/vm/src/vm.rs index d0c685ce90..ebcf66afa4 100644 --- a/vm/src/vm.rs +++ b/vm/src/vm.rs @@ -66,7 +66,7 @@ pub struct VirtualMachine { pub trace_func: RefCell, pub use_tracing: Cell, pub recursion_limit: Cell, - pub signal_handlers: Option>, + pub signal_handlers: Option>>, pub state: Arc, pub initialized: bool, } @@ -201,7 +201,7 @@ impl VirtualMachine { trace_func, use_tracing: 
Cell::new(false), recursion_limit: Cell::new(if cfg!(debug_assertions) { 256 } else { 512 }), - signal_handlers: Some(signal_handlers), + signal_handlers: Some(Box::new(signal_handlers)), state: Arc::new(PyGlobalState { settings, stdlib_inits, From 304f40358684f8e4d108d232b055c51ea6221565 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sat, 23 May 2020 17:37:48 -0500 Subject: [PATCH 06/11] Fix WeakSet equality testing --- Lib/_weakrefset.py | 3 +++ vm/src/obj/objweakref.rs | 49 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/Lib/_weakrefset.py b/Lib/_weakrefset.py index 304c66f59b..7a84823622 100644 --- a/Lib/_weakrefset.py +++ b/Lib/_weakrefset.py @@ -194,3 +194,6 @@ def union(self, other): def isdisjoint(self, other): return len(self.intersection(other)) == 0 + + def __repr__(self): + return repr(self.data) diff --git a/vm/src/obj/objweakref.rs b/vm/src/obj/objweakref.rs index e39f643f57..64f43a4eec 100644 --- a/vm/src/obj/objweakref.rs +++ b/vm/src/obj/objweakref.rs @@ -1,23 +1,28 @@ use super::objtype::PyClassRef; use crate::function::{OptionalArg, PyFuncArgs}; use crate::pyobject::{ - PyClassImpl, PyContext, PyObject, PyObjectPayload, PyObjectRef, PyRef, PyResult, PyValue, + IdProtocol, PyClassImpl, PyContext, PyObject, PyObjectPayload, PyObjectRef, PyRef, PyResult, + PyValue, TypeProtocol, }; use crate::slots::SlotCall; use crate::vm::VirtualMachine; +use crate::pyhash::PyHash; +use crossbeam_utils::atomic::AtomicCell; use std::sync::{Arc, Weak}; #[pyclass] #[derive(Debug)] pub struct PyWeak { referent: Weak>, + hash: AtomicCell>, } impl PyWeak { pub fn downgrade(obj: &PyObjectRef) -> PyWeak { PyWeak { referent: Arc::downgrade(obj), + hash: AtomicCell::new(None), } } @@ -53,6 +58,48 @@ impl PyWeak { ) -> PyResult> { PyWeak::downgrade(&referent).into_ref_with_type(vm, cls) } + + #[pymethod(magic)] + fn hash(&self, vm: &VirtualMachine) -> PyResult { + match self.hash.load() { + Some(hash) => Ok(hash), + None => { + let obj = self + .upgrade() + .ok_or_else(|| vm.new_type_error("weak object has gone away".to_owned()))?; + let hash = vm._hash(&obj)?; + self.hash.store(Some(hash)); + Ok(hash) + } + } + } + + #[pymethod(magic)] + fn eq(&self, other: PyObjectRef, vm: &VirtualMachine) -> PyResult { + if let Some(other) = other.payload_if_subclass::(vm) { + self.upgrade() + .and_then(|s| other.upgrade().map(|o| (s, o))) + .map_or(Ok(false), |(a, b)| vm.bool_eq(a, b)) + .map(|b| vm.new_bool(b)) + } else { + Ok(vm.ctx.not_implemented()) + } + } + + #[pymethod(magic)] + fn repr(zelf: PyRef) -> String { + let id = zelf.get_id(); + if let Some(o) = zelf.upgrade() { + format!( + "", + id, + o.class().name, + o.get_id(), + ) + } else { + format!("", id) + } + } } pub fn init(context: &PyContext) { From 846aa96db83655a39c1bd70dd9002ece528d6343 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sat, 23 May 2020 18:44:47 -0500 Subject: [PATCH 07/11] Add test.{lock_tests,test_thread} from CPython 3.8.2 --- Lib/test/lock_tests.py | 949 ++++++++++++++++++++++++++++++++++++++++ Lib/test/test_thread.py | 266 +++++++++++ 2 files changed, 1215 insertions(+) create mode 100644 Lib/test/lock_tests.py create mode 100644 Lib/test/test_thread.py diff --git a/Lib/test/lock_tests.py b/Lib/test/lock_tests.py new file mode 100644 index 0000000000..7b1ad8eb6d --- /dev/null +++ b/Lib/test/lock_tests.py @@ -0,0 +1,949 @@ +""" +Various tests for synchronization primitives. 
+""" + +import sys +import time +from _thread import start_new_thread, TIMEOUT_MAX +import threading +import unittest +import weakref + +from test import support + + +def _wait(): + # A crude wait/yield function not relying on synchronization primitives. + time.sleep(0.01) + +class Bunch(object): + """ + A bunch of threads. + """ + def __init__(self, f, n, wait_before_exit=False): + """ + Construct a bunch of `n` threads running the same function `f`. + If `wait_before_exit` is True, the threads won't terminate until + do_finish() is called. + """ + self.f = f + self.n = n + self.started = [] + self.finished = [] + self._can_exit = not wait_before_exit + self.wait_thread = support.wait_threads_exit() + self.wait_thread.__enter__() + + def task(): + tid = threading.get_ident() + self.started.append(tid) + try: + f() + finally: + self.finished.append(tid) + while not self._can_exit: + _wait() + + try: + for i in range(n): + start_new_thread(task, ()) + except: + self._can_exit = True + raise + + def wait_for_started(self): + while len(self.started) < self.n: + _wait() + + def wait_for_finished(self): + while len(self.finished) < self.n: + _wait() + # Wait for threads exit + self.wait_thread.__exit__(None, None, None) + + def do_finish(self): + self._can_exit = True + + +class BaseTestCase(unittest.TestCase): + def setUp(self): + self._threads = support.threading_setup() + + def tearDown(self): + support.threading_cleanup(*self._threads) + support.reap_children() + + def assertTimeout(self, actual, expected): + # The waiting and/or time.monotonic() can be imprecise, which + # is why comparing to the expected value would sometimes fail + # (especially under Windows). + self.assertGreaterEqual(actual, expected * 0.6) + # Test nothing insane happened + self.assertLess(actual, expected * 10.0) + + +class BaseLockTests(BaseTestCase): + """ + Tests for both recursive and non-recursive locks. + """ + + def test_constructor(self): + lock = self.locktype() + del lock + + def test_repr(self): + lock = self.locktype() + self.assertRegex(repr(lock), "") + del lock + + def test_locked_repr(self): + lock = self.locktype() + lock.acquire() + self.assertRegex(repr(lock), "") + del lock + + def test_acquire_destroy(self): + lock = self.locktype() + lock.acquire() + del lock + + def test_acquire_release(self): + lock = self.locktype() + lock.acquire() + lock.release() + del lock + + def test_try_acquire(self): + lock = self.locktype() + self.assertTrue(lock.acquire(False)) + lock.release() + + def test_try_acquire_contended(self): + lock = self.locktype() + lock.acquire() + result = [] + def f(): + result.append(lock.acquire(False)) + Bunch(f, 1).wait_for_finished() + self.assertFalse(result[0]) + lock.release() + + def test_acquire_contended(self): + lock = self.locktype() + lock.acquire() + N = 5 + def f(): + lock.acquire() + lock.release() + + b = Bunch(f, N) + b.wait_for_started() + _wait() + self.assertEqual(len(b.finished), 0) + lock.release() + b.wait_for_finished() + self.assertEqual(len(b.finished), N) + + def test_with(self): + lock = self.locktype() + def f(): + lock.acquire() + lock.release() + def _with(err=None): + with lock: + if err is not None: + raise err + _with() + # Check the lock is unacquired + Bunch(f, 1).wait_for_finished() + self.assertRaises(TypeError, _with, TypeError) + # Check the lock is unacquired + Bunch(f, 1).wait_for_finished() + + def test_thread_leak(self): + # The lock shouldn't leak a Thread instance when used from a foreign + # (non-threading) thread. 
+ lock = self.locktype() + def f(): + lock.acquire() + lock.release() + n = len(threading.enumerate()) + # We run many threads in the hope that existing threads ids won't + # be recycled. + Bunch(f, 15).wait_for_finished() + if len(threading.enumerate()) != n: + # There is a small window during which a Thread instance's + # target function has finished running, but the Thread is still + # alive and registered. Avoid spurious failures by waiting a + # bit more (seen on a buildbot). + time.sleep(0.4) + self.assertEqual(n, len(threading.enumerate())) + + def test_timeout(self): + lock = self.locktype() + # Can't set timeout if not blocking + self.assertRaises(ValueError, lock.acquire, 0, 1) + # Invalid timeout values + self.assertRaises(ValueError, lock.acquire, timeout=-100) + self.assertRaises(OverflowError, lock.acquire, timeout=1e100) + self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1) + # TIMEOUT_MAX is ok + lock.acquire(timeout=TIMEOUT_MAX) + lock.release() + t1 = time.monotonic() + self.assertTrue(lock.acquire(timeout=5)) + t2 = time.monotonic() + # Just a sanity test that it didn't actually wait for the timeout. + self.assertLess(t2 - t1, 5) + results = [] + def f(): + t1 = time.monotonic() + results.append(lock.acquire(timeout=0.5)) + t2 = time.monotonic() + results.append(t2 - t1) + Bunch(f, 1).wait_for_finished() + self.assertFalse(results[0]) + self.assertTimeout(results[1], 0.5) + + def test_weakref_exists(self): + lock = self.locktype() + ref = weakref.ref(lock) + self.assertIsNotNone(ref()) + + def test_weakref_deleted(self): + lock = self.locktype() + ref = weakref.ref(lock) + del lock + self.assertIsNone(ref()) + + +class LockTests(BaseLockTests): + """ + Tests for non-recursive, weak locks + (which can be acquired and released from different threads). + """ + def test_reacquire(self): + # Lock needs to be released before re-acquiring. + lock = self.locktype() + phase = [] + + def f(): + lock.acquire() + phase.append(None) + lock.acquire() + phase.append(None) + + with support.wait_threads_exit(): + start_new_thread(f, ()) + while len(phase) == 0: + _wait() + _wait() + self.assertEqual(len(phase), 1) + lock.release() + while len(phase) == 1: + _wait() + self.assertEqual(len(phase), 2) + + def test_different_thread(self): + # Lock can be released from a different thread. + lock = self.locktype() + lock.acquire() + def f(): + lock.release() + b = Bunch(f, 1) + b.wait_for_finished() + lock.acquire() + lock.release() + + def test_state_after_timeout(self): + # Issue #11618: check that lock is in a proper state after a + # (non-zero) timeout. + lock = self.locktype() + lock.acquire() + self.assertFalse(lock.acquire(timeout=0.01)) + lock.release() + self.assertFalse(lock.locked()) + self.assertTrue(lock.acquire(blocking=False)) + + +class RLockTests(BaseLockTests): + """ + Tests for recursive locks. 
+ """ + def test_reacquire(self): + lock = self.locktype() + lock.acquire() + lock.acquire() + lock.release() + lock.acquire() + lock.release() + lock.release() + + def test_release_unacquired(self): + # Cannot release an unacquired lock + lock = self.locktype() + self.assertRaises(RuntimeError, lock.release) + lock.acquire() + lock.acquire() + lock.release() + lock.acquire() + lock.release() + lock.release() + self.assertRaises(RuntimeError, lock.release) + + def test_release_save_unacquired(self): + # Cannot _release_save an unacquired lock + lock = self.locktype() + self.assertRaises(RuntimeError, lock._release_save) + lock.acquire() + lock.acquire() + lock.release() + lock.acquire() + lock.release() + lock.release() + self.assertRaises(RuntimeError, lock._release_save) + + def test_different_thread(self): + # Cannot release from a different thread + lock = self.locktype() + def f(): + lock.acquire() + b = Bunch(f, 1, True) + try: + self.assertRaises(RuntimeError, lock.release) + finally: + b.do_finish() + b.wait_for_finished() + + def test__is_owned(self): + lock = self.locktype() + self.assertFalse(lock._is_owned()) + lock.acquire() + self.assertTrue(lock._is_owned()) + lock.acquire() + self.assertTrue(lock._is_owned()) + result = [] + def f(): + result.append(lock._is_owned()) + Bunch(f, 1).wait_for_finished() + self.assertFalse(result[0]) + lock.release() + self.assertTrue(lock._is_owned()) + lock.release() + self.assertFalse(lock._is_owned()) + + +class EventTests(BaseTestCase): + """ + Tests for Event objects. + """ + + def test_is_set(self): + evt = self.eventtype() + self.assertFalse(evt.is_set()) + evt.set() + self.assertTrue(evt.is_set()) + evt.set() + self.assertTrue(evt.is_set()) + evt.clear() + self.assertFalse(evt.is_set()) + evt.clear() + self.assertFalse(evt.is_set()) + + def _check_notify(self, evt): + # All threads get notified + N = 5 + results1 = [] + results2 = [] + def f(): + results1.append(evt.wait()) + results2.append(evt.wait()) + b = Bunch(f, N) + b.wait_for_started() + _wait() + self.assertEqual(len(results1), 0) + evt.set() + b.wait_for_finished() + self.assertEqual(results1, [True] * N) + self.assertEqual(results2, [True] * N) + + def test_notify(self): + evt = self.eventtype() + self._check_notify(evt) + # Another time, after an explicit clear() + evt.set() + evt.clear() + self._check_notify(evt) + + def test_timeout(self): + evt = self.eventtype() + results1 = [] + results2 = [] + N = 5 + def f(): + results1.append(evt.wait(0.0)) + t1 = time.monotonic() + r = evt.wait(0.5) + t2 = time.monotonic() + results2.append((r, t2 - t1)) + Bunch(f, N).wait_for_finished() + self.assertEqual(results1, [False] * N) + for r, dt in results2: + self.assertFalse(r) + self.assertTimeout(dt, 0.5) + # The event is set + results1 = [] + results2 = [] + evt.set() + Bunch(f, N).wait_for_finished() + self.assertEqual(results1, [True] * N) + for r, dt in results2: + self.assertTrue(r) + + def test_set_and_clear(self): + # Issue #13502: check that wait() returns true even when the event is + # cleared before the waiting thread is woken up. 
+ evt = self.eventtype() + results = [] + timeout = 0.250 + N = 5 + def f(): + results.append(evt.wait(timeout * 4)) + b = Bunch(f, N) + b.wait_for_started() + time.sleep(timeout) + evt.set() + evt.clear() + b.wait_for_finished() + self.assertEqual(results, [True] * N) + + def test_reset_internal_locks(self): + # ensure that condition is still using a Lock after reset + evt = self.eventtype() + with evt._cond: + self.assertFalse(evt._cond.acquire(False)) + evt._reset_internal_locks() + with evt._cond: + self.assertFalse(evt._cond.acquire(False)) + + +class ConditionTests(BaseTestCase): + """ + Tests for condition variables. + """ + + def test_acquire(self): + cond = self.condtype() + # Be default we have an RLock: the condition can be acquired multiple + # times. + cond.acquire() + cond.acquire() + cond.release() + cond.release() + lock = threading.Lock() + cond = self.condtype(lock) + cond.acquire() + self.assertFalse(lock.acquire(False)) + cond.release() + self.assertTrue(lock.acquire(False)) + self.assertFalse(cond.acquire(False)) + lock.release() + with cond: + self.assertFalse(lock.acquire(False)) + + def test_unacquired_wait(self): + cond = self.condtype() + self.assertRaises(RuntimeError, cond.wait) + + def test_unacquired_notify(self): + cond = self.condtype() + self.assertRaises(RuntimeError, cond.notify) + + def _check_notify(self, cond): + # Note that this test is sensitive to timing. If the worker threads + # don't execute in a timely fashion, the main thread may think they + # are further along then they are. The main thread therefore issues + # _wait() statements to try to make sure that it doesn't race ahead + # of the workers. + # Secondly, this test assumes that condition variables are not subject + # to spurious wakeups. The absence of spurious wakeups is an implementation + # detail of Condition Variables in current CPython, but in general, not + # a guaranteed property of condition variables as a programming + # construct. In particular, it is possible that this can no longer + # be conveniently guaranteed should their implementation ever change. + N = 5 + ready = [] + results1 = [] + results2 = [] + phase_num = 0 + def f(): + cond.acquire() + ready.append(phase_num) + result = cond.wait() + cond.release() + results1.append((result, phase_num)) + cond.acquire() + ready.append(phase_num) + result = cond.wait() + cond.release() + results2.append((result, phase_num)) + b = Bunch(f, N) + b.wait_for_started() + # first wait, to ensure all workers settle into cond.wait() before + # we continue. See issues #8799 and #30727. 
+ while len(ready) < 5: + _wait() + ready.clear() + self.assertEqual(results1, []) + # Notify 3 threads at first + cond.acquire() + cond.notify(3) + _wait() + phase_num = 1 + cond.release() + while len(results1) < 3: + _wait() + self.assertEqual(results1, [(True, 1)] * 3) + self.assertEqual(results2, []) + # make sure all awaken workers settle into cond.wait() + while len(ready) < 3: + _wait() + # Notify 5 threads: they might be in their first or second wait + cond.acquire() + cond.notify(5) + _wait() + phase_num = 2 + cond.release() + while len(results1) + len(results2) < 8: + _wait() + self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2) + self.assertEqual(results2, [(True, 2)] * 3) + # make sure all workers settle into cond.wait() + while len(ready) < 5: + _wait() + # Notify all threads: they are all in their second wait + cond.acquire() + cond.notify_all() + _wait() + phase_num = 3 + cond.release() + while len(results2) < 5: + _wait() + self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2) + self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2) + b.wait_for_finished() + + def test_notify(self): + cond = self.condtype() + self._check_notify(cond) + # A second time, to check internal state is still ok. + self._check_notify(cond) + + def test_timeout(self): + cond = self.condtype() + results = [] + N = 5 + def f(): + cond.acquire() + t1 = time.monotonic() + result = cond.wait(0.5) + t2 = time.monotonic() + cond.release() + results.append((t2 - t1, result)) + Bunch(f, N).wait_for_finished() + self.assertEqual(len(results), N) + for dt, result in results: + self.assertTimeout(dt, 0.5) + # Note that conceptually (that"s the condition variable protocol) + # a wait() may succeed even if no one notifies us and before any + # timeout occurs. Spurious wakeups can occur. + # This makes it hard to verify the result value. + # In practice, this implementation has no spurious wakeups. + self.assertFalse(result) + + def test_waitfor(self): + cond = self.condtype() + state = 0 + def f(): + with cond: + result = cond.wait_for(lambda : state==4) + self.assertTrue(result) + self.assertEqual(state, 4) + b = Bunch(f, 1) + b.wait_for_started() + for i in range(4): + time.sleep(0.01) + with cond: + state += 1 + cond.notify() + b.wait_for_finished() + + def test_waitfor_timeout(self): + cond = self.condtype() + state = 0 + success = [] + def f(): + with cond: + dt = time.monotonic() + result = cond.wait_for(lambda : state==4, timeout=0.1) + dt = time.monotonic() - dt + self.assertFalse(result) + self.assertTimeout(dt, 0.1) + success.append(None) + b = Bunch(f, 1) + b.wait_for_started() + # Only increment 3 times, so state == 4 is never reached. + for i in range(3): + time.sleep(0.01) + with cond: + state += 1 + cond.notify() + b.wait_for_finished() + self.assertEqual(len(success), 1) + + +class BaseSemaphoreTests(BaseTestCase): + """ + Common tests for {bounded, unbounded} semaphore objects. 
+ """ + + def test_constructor(self): + self.assertRaises(ValueError, self.semtype, value = -1) + self.assertRaises(ValueError, self.semtype, value = -sys.maxsize) + + def test_acquire(self): + sem = self.semtype(1) + sem.acquire() + sem.release() + sem = self.semtype(2) + sem.acquire() + sem.acquire() + sem.release() + sem.release() + + def test_acquire_destroy(self): + sem = self.semtype() + sem.acquire() + del sem + + def test_acquire_contended(self): + sem = self.semtype(7) + sem.acquire() + N = 10 + sem_results = [] + results1 = [] + results2 = [] + phase_num = 0 + def f(): + sem_results.append(sem.acquire()) + results1.append(phase_num) + sem_results.append(sem.acquire()) + results2.append(phase_num) + b = Bunch(f, 10) + b.wait_for_started() + while len(results1) + len(results2) < 6: + _wait() + self.assertEqual(results1 + results2, [0] * 6) + phase_num = 1 + for i in range(7): + sem.release() + while len(results1) + len(results2) < 13: + _wait() + self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7) + phase_num = 2 + for i in range(6): + sem.release() + while len(results1) + len(results2) < 19: + _wait() + self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6) + # The semaphore is still locked + self.assertFalse(sem.acquire(False)) + # Final release, to let the last thread finish + sem.release() + b.wait_for_finished() + self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1)) + + def test_try_acquire(self): + sem = self.semtype(2) + self.assertTrue(sem.acquire(False)) + self.assertTrue(sem.acquire(False)) + self.assertFalse(sem.acquire(False)) + sem.release() + self.assertTrue(sem.acquire(False)) + + def test_try_acquire_contended(self): + sem = self.semtype(4) + sem.acquire() + results = [] + def f(): + results.append(sem.acquire(False)) + results.append(sem.acquire(False)) + Bunch(f, 5).wait_for_finished() + # There can be a thread switch between acquiring the semaphore and + # appending the result, therefore results will not necessarily be + # ordered. + self.assertEqual(sorted(results), [False] * 7 + [True] * 3 ) + + def test_acquire_timeout(self): + sem = self.semtype(2) + self.assertRaises(ValueError, sem.acquire, False, timeout=1.0) + self.assertTrue(sem.acquire(timeout=0.005)) + self.assertTrue(sem.acquire(timeout=0.005)) + self.assertFalse(sem.acquire(timeout=0.005)) + sem.release() + self.assertTrue(sem.acquire(timeout=0.005)) + t = time.monotonic() + self.assertFalse(sem.acquire(timeout=0.5)) + dt = time.monotonic() - t + self.assertTimeout(dt, 0.5) + + def test_default_value(self): + # The default initial value is 1. + sem = self.semtype() + sem.acquire() + def f(): + sem.acquire() + sem.release() + b = Bunch(f, 1) + b.wait_for_started() + _wait() + self.assertFalse(b.finished) + sem.release() + b.wait_for_finished() + + def test_with(self): + sem = self.semtype(2) + def _with(err=None): + with sem: + self.assertTrue(sem.acquire(False)) + sem.release() + with sem: + self.assertFalse(sem.acquire(False)) + if err: + raise err + _with() + self.assertTrue(sem.acquire(False)) + sem.release() + self.assertRaises(TypeError, _with, TypeError) + self.assertTrue(sem.acquire(False)) + sem.release() + +class SemaphoreTests(BaseSemaphoreTests): + """ + Tests for unbounded semaphores. 
+ """ + + def test_release_unacquired(self): + # Unbounded releases are allowed and increment the semaphore's value + sem = self.semtype(1) + sem.release() + sem.acquire() + sem.acquire() + sem.release() + + +class BoundedSemaphoreTests(BaseSemaphoreTests): + """ + Tests for bounded semaphores. + """ + + def test_release_unacquired(self): + # Cannot go past the initial value + sem = self.semtype() + self.assertRaises(ValueError, sem.release) + sem.acquire() + sem.release() + self.assertRaises(ValueError, sem.release) + + +class BarrierTests(BaseTestCase): + """ + Tests for Barrier objects. + """ + N = 5 + defaultTimeout = 2.0 + + def setUp(self): + self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout) + def tearDown(self): + self.barrier.abort() + + def run_threads(self, f): + b = Bunch(f, self.N-1) + f() + b.wait_for_finished() + + def multipass(self, results, n): + m = self.barrier.parties + self.assertEqual(m, self.N) + for i in range(n): + results[0].append(True) + self.assertEqual(len(results[1]), i * m) + self.barrier.wait() + results[1].append(True) + self.assertEqual(len(results[0]), (i + 1) * m) + self.barrier.wait() + self.assertEqual(self.barrier.n_waiting, 0) + self.assertFalse(self.barrier.broken) + + def test_barrier(self, passes=1): + """ + Test that a barrier is passed in lockstep + """ + results = [[],[]] + def f(): + self.multipass(results, passes) + self.run_threads(f) + + def test_barrier_10(self): + """ + Test that a barrier works for 10 consecutive runs + """ + return self.test_barrier(10) + + def test_wait_return(self): + """ + test the return value from barrier.wait + """ + results = [] + def f(): + r = self.barrier.wait() + results.append(r) + + self.run_threads(f) + self.assertEqual(sum(results), sum(range(self.N))) + + def test_action(self): + """ + Test the 'action' callback + """ + results = [] + def action(): + results.append(True) + barrier = self.barriertype(self.N, action) + def f(): + barrier.wait() + self.assertEqual(len(results), 1) + + self.run_threads(f) + + def test_abort(self): + """ + Test that an abort will put the barrier in a broken state + """ + results1 = [] + results2 = [] + def f(): + try: + i = self.barrier.wait() + if i == self.N//2: + raise RuntimeError + self.barrier.wait() + results1.append(True) + except threading.BrokenBarrierError: + results2.append(True) + except RuntimeError: + self.barrier.abort() + pass + + self.run_threads(f) + self.assertEqual(len(results1), 0) + self.assertEqual(len(results2), self.N-1) + self.assertTrue(self.barrier.broken) + + def test_reset(self): + """ + Test that a 'reset' on a barrier frees the waiting threads + """ + results1 = [] + results2 = [] + results3 = [] + def f(): + i = self.barrier.wait() + if i == self.N//2: + # Wait until the other threads are all in the barrier. + while self.barrier.n_waiting < self.N-1: + time.sleep(0.001) + self.barrier.reset() + else: + try: + self.barrier.wait() + results1.append(True) + except threading.BrokenBarrierError: + results2.append(True) + # Now, pass the barrier again + self.barrier.wait() + results3.append(True) + + self.run_threads(f) + self.assertEqual(len(results1), 0) + self.assertEqual(len(results2), self.N-1) + self.assertEqual(len(results3), self.N) + + + def test_abort_and_reset(self): + """ + Test that a barrier can be reset after being broken. 
+ """ + results1 = [] + results2 = [] + results3 = [] + barrier2 = self.barriertype(self.N) + def f(): + try: + i = self.barrier.wait() + if i == self.N//2: + raise RuntimeError + self.barrier.wait() + results1.append(True) + except threading.BrokenBarrierError: + results2.append(True) + except RuntimeError: + self.barrier.abort() + pass + # Synchronize and reset the barrier. Must synchronize first so + # that everyone has left it when we reset, and after so that no + # one enters it before the reset. + if barrier2.wait() == self.N//2: + self.barrier.reset() + barrier2.wait() + self.barrier.wait() + results3.append(True) + + self.run_threads(f) + self.assertEqual(len(results1), 0) + self.assertEqual(len(results2), self.N-1) + self.assertEqual(len(results3), self.N) + + def test_timeout(self): + """ + Test wait(timeout) + """ + def f(): + i = self.barrier.wait() + if i == self.N // 2: + # One thread is late! + time.sleep(1.0) + # Default timeout is 2.0, so this is shorter. + self.assertRaises(threading.BrokenBarrierError, + self.barrier.wait, 0.5) + self.run_threads(f) + + def test_default_timeout(self): + """ + Test the barrier's default timeout + """ + # create a barrier with a low default timeout + barrier = self.barriertype(self.N, timeout=0.3) + def f(): + i = barrier.wait() + if i == self.N // 2: + # One thread is later than the default timeout of 0.3s. + time.sleep(1.0) + self.assertRaises(threading.BrokenBarrierError, barrier.wait) + self.run_threads(f) + + def test_single_thread(self): + b = self.barriertype(1) + b.wait() + b.wait() diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py new file mode 100644 index 0000000000..9f4801f47e --- /dev/null +++ b/Lib/test/test_thread.py @@ -0,0 +1,266 @@ +import os +import unittest +import random +from test import support +import _thread as thread +import time +import weakref + +from test import lock_tests + +NUMTASKS = 10 +NUMTRIPS = 3 +POLL_SLEEP = 0.010 # seconds = 10 ms + +_print_mutex = thread.allocate_lock() + +def verbose_print(arg): + """Helper function for printing out debugging output.""" + if support.verbose: + with _print_mutex: + print(arg) + + +class BasicThreadTest(unittest.TestCase): + + def setUp(self): + self.done_mutex = thread.allocate_lock() + self.done_mutex.acquire() + self.running_mutex = thread.allocate_lock() + self.random_mutex = thread.allocate_lock() + self.created = 0 + self.running = 0 + self.next_ident = 0 + + key = support.threading_setup() + self.addCleanup(support.threading_cleanup, *key) + + +class ThreadRunningTests(BasicThreadTest): + + def newtask(self): + with self.running_mutex: + self.next_ident += 1 + verbose_print("creating task %s" % self.next_ident) + thread.start_new_thread(self.task, (self.next_ident,)) + self.created += 1 + self.running += 1 + + def task(self, ident): + with self.random_mutex: + delay = random.random() / 10000.0 + verbose_print("task %s will run for %sus" % (ident, round(delay*1e6))) + time.sleep(delay) + verbose_print("task %s done" % ident) + with self.running_mutex: + self.running -= 1 + if self.created == NUMTASKS and self.running == 0: + self.done_mutex.release() + + def test_starting_threads(self): + with support.wait_threads_exit(): + # Basic test for thread creation. + for i in range(NUMTASKS): + self.newtask() + verbose_print("waiting for tasks to complete...") + self.done_mutex.acquire() + verbose_print("all tasks done") + + def test_stack_size(self): + # Various stack size tests. 
+ self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0") + + thread.stack_size(0) + self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default") + + @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix') + def test_nt_and_posix_stack_size(self): + try: + thread.stack_size(4096) + except ValueError: + verbose_print("caught expected ValueError setting " + "stack_size(4096)") + except thread.error: + self.skipTest("platform does not support changing thread stack " + "size") + + fail_msg = "stack_size(%d) failed - should succeed" + for tss in (262144, 0x100000, 0): + thread.stack_size(tss) + self.assertEqual(thread.stack_size(), tss, fail_msg % tss) + verbose_print("successfully set stack_size(%d)" % tss) + + for tss in (262144, 0x100000): + verbose_print("trying stack_size = (%d)" % tss) + self.next_ident = 0 + self.created = 0 + with support.wait_threads_exit(): + for i in range(NUMTASKS): + self.newtask() + + verbose_print("waiting for all tasks to complete") + self.done_mutex.acquire() + verbose_print("all tasks done") + + thread.stack_size(0) + + def test__count(self): + # Test the _count() function. + orig = thread._count() + mut = thread.allocate_lock() + mut.acquire() + started = [] + + def task(): + started.append(None) + mut.acquire() + mut.release() + + with support.wait_threads_exit(): + thread.start_new_thread(task, ()) + while not started: + time.sleep(POLL_SLEEP) + self.assertEqual(thread._count(), orig + 1) + # Allow the task to finish. + mut.release() + # The only reliable way to be sure that the thread ended from the + # interpreter's point of view is to wait for the function object to be + # destroyed. + done = [] + wr = weakref.ref(task, lambda _: done.append(None)) + del task + while not done: + time.sleep(POLL_SLEEP) + self.assertEqual(thread._count(), orig) + + def test_unraisable_exception(self): + def task(): + started.release() + raise ValueError("task failed") + + started = thread.allocate_lock() + with support.catch_unraisable_exception() as cm: + with support.wait_threads_exit(): + started.acquire() + thread.start_new_thread(task, ()) + started.acquire() + + self.assertEqual(str(cm.unraisable.exc_value), "task failed") + self.assertIs(cm.unraisable.object, task) + self.assertEqual(cm.unraisable.err_msg, + "Exception ignored in thread started by") + self.assertIsNotNone(cm.unraisable.exc_traceback) + + +class Barrier: + def __init__(self, num_threads): + self.num_threads = num_threads + self.waiting = 0 + self.checkin_mutex = thread.allocate_lock() + self.checkout_mutex = thread.allocate_lock() + self.checkout_mutex.acquire() + + def enter(self): + self.checkin_mutex.acquire() + self.waiting = self.waiting + 1 + if self.waiting == self.num_threads: + self.waiting = self.num_threads - 1 + self.checkout_mutex.release() + return + self.checkin_mutex.release() + + self.checkout_mutex.acquire() + self.waiting = self.waiting - 1 + if self.waiting == 0: + self.checkin_mutex.release() + return + self.checkout_mutex.release() + + +class BarrierTest(BasicThreadTest): + + def test_barrier(self): + with support.wait_threads_exit(): + self.bar = Barrier(NUMTASKS) + self.running = NUMTASKS + for i in range(NUMTASKS): + thread.start_new_thread(self.task2, (i,)) + verbose_print("waiting for tasks to end") + self.done_mutex.acquire() + verbose_print("tasks done") + + def task2(self, ident): + for i in range(NUMTRIPS): + if ident == 0: + # give it a good chance to enter the next + # barrier before the others are all out + # of 
the current one + delay = 0 + else: + with self.random_mutex: + delay = random.random() / 10000.0 + verbose_print("task %s will run for %sus" % + (ident, round(delay * 1e6))) + time.sleep(delay) + verbose_print("task %s entering %s" % (ident, i)) + self.bar.enter() + verbose_print("task %s leaving barrier" % ident) + with self.running_mutex: + self.running -= 1 + # Must release mutex before releasing done, else the main thread can + # exit and set mutex to None as part of global teardown; then + # mutex.release() raises AttributeError. + finished = self.running == 0 + if finished: + self.done_mutex.release() + +class LockTests(lock_tests.LockTests): + locktype = thread.allocate_lock + + +class TestForkInThread(unittest.TestCase): + def setUp(self): + self.read_fd, self.write_fd = os.pipe() + + @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork') + @support.reap_threads + def test_forkinthread(self): + status = "not set" + + def thread1(): + nonlocal status + + # fork in a thread + pid = os.fork() + if pid == 0: + # child + try: + os.close(self.read_fd) + os.write(self.write_fd, b"OK") + finally: + os._exit(0) + else: + # parent + os.close(self.write_fd) + pid, status = os.waitpid(pid, 0) + + with support.wait_threads_exit(): + thread.start_new_thread(thread1, ()) + self.assertEqual(os.read(self.read_fd, 2), b"OK", + "Unable to fork() in thread") + self.assertEqual(status, 0) + + def tearDown(self): + try: + os.close(self.read_fd) + except OSError: + pass + + try: + os.close(self.write_fd) + except OSError: + pass + + +if __name__ == "__main__": + unittest.main() From 3a8daf196618c5b7ceb6986c7b900c97aedb932b Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sat, 23 May 2020 18:46:53 -0500 Subject: [PATCH 08/11] Fix thread tests a bit --- Lib/test/support/__init__.py | 12 ++++++------ Lib/test/test_thread.py | 2 ++ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py index 4ea5c2be91..ee644b1269 100644 --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -3,11 +3,11 @@ if __name__ != 'test.support': raise ImportError('support must be imported from the test package') -# import asyncio.events +import asyncio.events import collections.abc import contextlib import errno -# import faulthandler +import faulthandler import fnmatch import functools # import gc @@ -16,7 +16,7 @@ import importlib import importlib.util import locale -# import logging.handlers +import logging.handlers # import nntplib import os import platform @@ -28,13 +28,13 @@ import subprocess import sys import sysconfig -# import tempfile +import tempfile import _thread -# import threading +import threading import time import types import unittest -# import urllib.error +import urllib.error import warnings from .testresult import get_test_runner diff --git a/Lib/test/test_thread.py b/Lib/test/test_thread.py index 9f4801f47e..8e9c5d1ee3 100644 --- a/Lib/test/test_thread.py +++ b/Lib/test/test_thread.py @@ -104,6 +104,7 @@ def test_nt_and_posix_stack_size(self): thread.stack_size(0) + @unittest.skip("TODO: RUSTPYTHON, weakref destructors") def test__count(self): # Test the _count() function. 
orig = thread._count() @@ -133,6 +134,7 @@ def task(): time.sleep(POLL_SLEEP) self.assertEqual(thread._count(), orig) + @unittest.skip("TODO: RUSTPYTHON, sys.unraisablehook") def test_unraisable_exception(self): def task(): started.release() From fc2f2d13f1529e1af8b2eb9ba2a21d4aae54e9db Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sat, 23 May 2020 18:49:19 -0500 Subject: [PATCH 09/11] Add TODO in snippets.test_threading --- tests/snippets/test_threading.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/snippets/test_threading.py b/tests/snippets/test_threading.py index e00d1835fd..41024b360e 100644 --- a/tests/snippets/test_threading.py +++ b/tests/snippets/test_threading.py @@ -5,17 +5,20 @@ def thread_function(name): - output.append("Thread %s: starting" % name) + output.append((name, 0)) time.sleep(2.0) - output.append("Thread %s: finishing" % name) + output.append((name, 1)) -output.append("Main : before creating thread") +output.append((0, 0)) x = threading.Thread(target=thread_function, args=(1, )) -output.append("Main : before running thread") +output.append((0, 1)) x.start() -output.append("Main : wait for the thread to finish") +output.append((0, 2)) x.join() -output.append("Main : all done") +output.append((0, 3)) assert len(output) == 6, output +# CPython has [(1, 0), (0, 2)] for the middle 2, but we have [(0, 2), (1, 0)] +# TODO: maybe fix this, if it turns out to be a problem? +# assert output == [(0, 0), (0, 1), (1, 0), (0, 2), (1, 1), (0, 3)] From a6afd9e855d62d250baa3eddd495cd8bfcef76ca Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sat, 23 May 2020 18:50:02 -0500 Subject: [PATCH 10/11] Add _thread._count, repr(lock), and respect TIMEOUT_MAX --- vm/src/stdlib/thread.rs | 70 ++++++++++++++++++++++++++++++++++------- vm/src/vm.rs | 2 ++ 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/vm/src/stdlib/thread.rs b/vm/src/stdlib/thread.rs index 425a3e3f20..1a3ec9a94d 100644 --- a/vm/src/stdlib/thread.rs +++ b/vm/src/stdlib/thread.rs @@ -4,7 +4,10 @@ use crate::function::{Args, KwArgs, OptionalArg, PyFuncArgs}; use crate::obj::objdict::PyDictRef; use crate::obj::objtuple::PyTupleRef; use crate::obj::objtype::PyClassRef; -use crate::pyobject::{Either, PyCallable, PyClassImpl, PyObjectRef, PyRef, PyResult, PyValue}; +use crate::pyobject::{ + Either, IdProtocol, PyCallable, PyClassImpl, PyObjectRef, PyRef, PyResult, PyValue, + TypeProtocol, +}; use crate::vm::VirtualMachine; use parking_lot::{ @@ -16,18 +19,20 @@ use std::io::Write; use std::time::Duration; use std::{fmt, thread}; +// PY_TIMEOUT_MAX is a value in microseconds #[cfg(not(target_os = "windows"))] -const PY_TIMEOUT_MAX: isize = std::isize::MAX; +const PY_TIMEOUT_MAX: isize = std::isize::MAX / 1_000; #[cfg(target_os = "windows")] -const PY_TIMEOUT_MAX: isize = 0xffffffff * 1_000_000; +const PY_TIMEOUT_MAX: isize = 0xffffffff * 1_000; -const TIMEOUT_MAX: f64 = (PY_TIMEOUT_MAX / 1_000_000_000) as f64; +// this is a value in seconds +const TIMEOUT_MAX: f64 = (PY_TIMEOUT_MAX / 1_000_000) as f64; #[derive(FromArgs)] struct AcquireArgs { #[pyarg(positional_or_keyword, default = "true")] - waitflag: bool, + blocking: bool, #[pyarg(positional_or_keyword, default = "Either::A(-1.0)")] timeout: Either, } @@ -39,7 +44,7 @@ macro_rules! 
acquire_lock_impl { Either::A(f) => f, Either::B(i) => i as f64, }; - match args.waitflag { + match args.blocking { true if timeout == -1.0 => { mu.lock(); Ok(true) @@ -48,7 +53,16 @@ macro_rules! acquire_lock_impl { Err(vm.new_value_error("timeout value must be positive".to_owned())) } true => { - // TODO: respect TIMEOUT_MAX here + // modified from std::time::Duration::from_secs_f64 to avoid a panic. + // TODO: put this in the Duration::try_from_object impl, maybe? + let micros = timeout * 1_000_000.0; + let nanos = timeout * 1_000_000_000.0; + if micros > PY_TIMEOUT_MAX as f64 || nanos < 0.0 || !nanos.is_finite() { + return Err(vm.new_overflow_error( + "timestamp too large to convert to Rust Duration".to_owned(), + )); + } + Ok(mu.try_lock_for(Duration::from_secs_f64(timeout))) } false if timeout != -1.0 => { @@ -59,6 +73,21 @@ macro_rules! acquire_lock_impl { } }}; } +macro_rules! repr_lock_impl { + ($zelf:expr) => {{ + let status = if $zelf.mu.is_locked() { + "locked" + } else { + "unlocked" + }; + format!( + "<{} {} object at {}>", + status, + $zelf.class().name, + $zelf.get_id() + ) + }}; +} #[pyclass(name = "lock")] struct PyLock { @@ -102,6 +131,11 @@ impl PyLock { fn locked(&self) -> bool { self.mu.is_locked() } + + #[pymethod(magic)] + fn repr(zelf: PyRef) -> String { + repr_lock_impl!(zelf) + } } type RawRMutex = RawReentrantMutex; @@ -149,6 +183,11 @@ impl PyRLock { fn exit(&self, _args: PyFuncArgs) { self.release() } + + #[pymethod(magic)] + fn repr(zelf: PyRef) -> String { + repr_lock_impl!(zelf) + } } fn thread_get_ident() -> u64 { @@ -195,12 +234,16 @@ fn thread_start_new_thread( } SENTINELS.with(|sents| { for lock in sents.replace(Default::default()) { - lock.release() + lock.mu.unlock() } - }) + }); + vm.state.thread_count.fetch_sub(1); }); - res.map(|handle| thread_to_id(&handle.thread())) - .map_err(|err| super::os::convert_io_error(vm, err)) + res.map(|handle| { + vm.state.thread_count.fetch_add(1); + thread_to_id(&handle.thread()) + }) + .map_err(|err| super::os::convert_io_error(vm, err)) } thread_local!(static SENTINELS: RefCell> = RefCell::default()); @@ -217,6 +260,10 @@ fn thread_stack_size(size: OptionalArg, vm: &VirtualMachine) -> usize { vm.state.stacksize.swap(size) } +fn thread_count(vm: &VirtualMachine) -> usize { + vm.state.thread_count.load() +} + pub fn make_module(vm: &VirtualMachine) -> PyObjectRef { let ctx = &vm.ctx; @@ -228,6 +275,7 @@ pub fn make_module(vm: &VirtualMachine) -> PyObjectRef { "start_new_thread" => ctx.new_function(thread_start_new_thread), "_set_sentinel" => ctx.new_function(thread_set_sentinel), "stack_size" => ctx.new_function(thread_stack_size), + "_count" => ctx.new_function(thread_count), "error" => ctx.exceptions.runtime_error.clone(), "TIMEOUT_MAX" => ctx.new_float(TIMEOUT_MAX), }) diff --git a/vm/src/vm.rs b/vm/src/vm.rs index ebcf66afa4..9bf5e2f700 100644 --- a/vm/src/vm.rs +++ b/vm/src/vm.rs @@ -76,6 +76,7 @@ pub struct PyGlobalState { pub stdlib_inits: HashMap, pub frozen: HashMap, pub stacksize: AtomicCell, + pub thread_count: AtomicCell, } pub const NSIG: usize = 64; @@ -207,6 +208,7 @@ impl VirtualMachine { stdlib_inits, frozen, stacksize: AtomicCell::new(0), + thread_count: AtomicCell::new(0), }), initialized: false, }; From 90223d85eedbe9b88736d198f894e9b9b8f10bb3 Mon Sep 17 00:00:00 2001 From: Noah <33094578+coolreader18@users.noreply.github.com> Date: Sun, 24 May 2020 21:03:04 -0500 Subject: [PATCH 11/11] Add TODO for parking_lot git dependency --- Cargo.lock | 6 +++--- vm/Cargo.toml | 2 +- 2 files changed, 4 
insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b67dc11b93..02cd179ca0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -835,7 +835,7 @@ dependencies = [ [[package]] name = "lock_api" version = "0.3.4" -source = "git+https://github.com/Amanieu/parking_lot#b338bb5ac6f517d270530e4cb2ed344d136661f5" +source = "git+https://github.com/Amanieu/parking_lot#ecaa94438e570c84f1a4c7db830916890f2ae44c" dependencies = [ "scopeguard", ] @@ -1093,7 +1093,7 @@ checksum = "a86ed3f5f244b372d6b1a00b72ef7f8876d0bc6a78a4c9985c53614041512063" [[package]] name = "parking_lot" version = "0.10.2" -source = "git+https://github.com/Amanieu/parking_lot#b338bb5ac6f517d270530e4cb2ed344d136661f5" +source = "git+https://github.com/Amanieu/parking_lot#ecaa94438e570c84f1a4c7db830916890f2ae44c" dependencies = [ "instant", "lock_api", @@ -1103,7 +1103,7 @@ dependencies = [ [[package]] name = "parking_lot_core" version = "0.7.2" -source = "git+https://github.com/Amanieu/parking_lot#b338bb5ac6f517d270530e4cb2ed344d136661f5" +source = "git+https://github.com/Amanieu/parking_lot#ecaa94438e570c84f1a4c7db830916890f2ae44c" dependencies = [ "cfg-if", "cloudabi", diff --git a/vm/Cargo.toml b/vm/Cargo.toml index cd050a914a..28b0cc435d 100644 --- a/vm/Cargo.toml +++ b/vm/Cargo.toml @@ -71,7 +71,7 @@ smallbox = "0.8" bstr = "0.2.12" crossbeam-utils = "0.7" generational-arena = "0.2" -parking_lot = { git = "https://github.com/Amanieu/parking_lot" } +parking_lot = { git = "https://github.com/Amanieu/parking_lot" } # TODO: use published version ## unicode stuff unicode_names2 = "0.4"
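
Taken together, the series leaves _thread exposing start_new_thread, allocate_lock, get_ident, stack_size, _set_sentinel, _count, error and TIMEOUT_MAX. A minimal usage sketch of that surface follows; it is illustrative only, and the worker function, loop counts and sleep values are assumptions rather than anything taken from the patches.

import _thread
import time

lock = _thread.allocate_lock()
done = []

def worker(n):
    # get_ident() reports the id derived from the OS thread (thread_to_id in thread.rs).
    lock.acquire()
    try:
        done.append((n, _thread.get_ident()))
    finally:
        lock.release()

for i in range(3):
    _thread.start_new_thread(worker, (i,))

# start_new_thread offers no join(), so poll until every worker has checked in.
while len(done) < 3:
    time.sleep(0.01)

# Bounded blocking acquire; TIMEOUT_MAX is the largest timeout the module accepts.
assert lock.acquire(timeout=min(1.0, _thread.TIMEOUT_MAX))
lock.release()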