diff --git a/Cargo.toml b/Cargo.toml
index d49eb957b..288af2308 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -52,13 +52,15 @@ alloc = [
   "futures-core/alloc",
   "pin-project-lite",
 ]
+tracing = []
 
 [dependencies]
 async-attributes = { version = "1.1.1", optional = true }
 async-task = { version = "1.3.1", optional = true }
 broadcaster = { version = "1.0.0", optional = true }
 crossbeam-channel = { version = "0.4.2", optional = true }
-crossbeam-deque = { version = "0.7.3", optional = true }
+crossbeam-deque = { git = "https://github.com/stjepang/crossbeam", branch = "deque-len", optional = true }
+# crossbeam-deque = { version = "0.7.3", optional = true }
 crossbeam-queue = { version = "0.2.0", optional = true }
 crossbeam-utils = { version = "0.7.2", optional = true }
 futures-core = { version = "0.3.4", optional = true, default-features = false }
@@ -74,6 +76,7 @@ once_cell = { version = "1.3.1", optional = true }
 pin-project-lite = { version = "0.1.4", optional = true }
 pin-utils = { version = "0.1.0-alpha.4", optional = true }
 slab = { version = "0.4.2", optional = true }
+log-update = "0.1.0"
 
 [dev-dependencies]
 femme = "1.3.0"
@@ -89,3 +92,4 @@ required-features = ["unstable"]
 
 [[example]]
 name = "tcp-ipv4-and-6-echo"
 required-features = ["unstable"]
+
diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs
index a0d88b983..5f5a86f0d 100644
--- a/src/rt/runtime.rs
+++ b/src/rt/runtime.rs
@@ -1,4 +1,5 @@
 use std::cell::Cell;
+use std::cell::RefCell;
 use std::io;
 use std::iter;
 use std::ptr;
@@ -8,8 +9,11 @@ use std::thread;
 use std::time::Duration;
 
 use crossbeam_deque::{Injector, Steal, Stealer, Worker};
-use crossbeam_utils::thread::scope;
-use once_cell::unsync::OnceCell;
+use crossbeam_utils::{
+    sync::{Parker, Unparker},
+    thread::scope,
+};
+use once_cell::sync::Lazy;
 
 use crate::rt::Reactor;
 use crate::sync::Spinlock;
@@ -18,16 +22,27 @@ use crate::utils::{abort_on_panic, random};
 
 thread_local! {
     /// A reference to the current machine, if the current thread runs tasks.
-    static MACHINE: OnceCell<Arc<Machine>> = OnceCell::new();
+    static MACHINE: RefCell<Option<Arc<Machine>>> = RefCell::new(None);
 
     /// This flag is set to true whenever `task::yield_now()` is invoked.
     static YIELD_NOW: Cell<bool> = Cell::new(false);
 }
 
+/// Maximum number of OS threads = processors = machines.
+static MAXPROCS: Lazy<usize> = Lazy::new(|| num_cpus::get().max(1));
+
+/// Minimum number of machines that are kept executing, to avoid starvation.
+const MIN_MACHINES: usize = 2;
+
 struct Scheduler {
     /// Set to `true` while a machine is polling the reactor.
     polling: bool,
 
+    /// Set to `true` when a machine makes progress; cleared by `make_machines`.
+    progress: bool,
+
+    /// Available threads.
+    threads: Vec<ThreadState>,
+
     /// Idle processors.
     processors: Vec<Processor>,
 
@@ -35,6 +50,23 @@ struct Scheduler {
     machines: Vec<Arc<Machine>>,
 }
 
+impl Scheduler {
+    /// Get the next machine that has no work yet, if there is any.
+    fn next_idle_machine(&self) -> Option<Arc<Machine>> {
+        self.machines.iter().find(|m| !m.has_work()).cloned()
+    }
+}
+
+struct ThreadState {
+    /// Unparks the thread when a machine is ready for it.
+    unparker: Unparker,
+    /// Set to `true` while the thread is parked.
+    parked: Arc<atomic::AtomicBool>,
+    /// Used to transfer the machine into the thread.
+    machine_sender: crossbeam_channel::Sender<Arc<Machine>>,
+}
+
 /// An async runtime.
 pub struct Runtime {
     /// The reactor.
@@ -48,23 +80,30 @@ pub struct Runtime {
     /// The scheduler state.
     sched: Mutex<Scheduler>,
+
+    #[cfg(feature = "tracing")]
+    poll_count: atomic::AtomicUsize,
 }
 
 impl Runtime {
     /// Creates a new runtime.
     pub fn new() -> Runtime {
-        let cpus = num_cpus::get().max(1);
-        let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect();
+        let processors: Vec<_> = (0..*MAXPROCS).map(|_| Processor::new()).collect();
         let stealers = processors.iter().map(|p| p.worker.stealer()).collect();
+        let threads = Vec::with_capacity(*MAXPROCS);
 
         Runtime {
             reactor: Reactor::new().unwrap(),
             injector: Injector::new(),
             stealers,
+            #[cfg(feature = "tracing")]
+            poll_count: atomic::AtomicUsize::new(0),
             sched: Mutex::new(Scheduler {
                 processors,
-                machines: Vec::new(),
+                machines: Vec::with_capacity(*MAXPROCS),
+                threads,
                 polling: false,
+                progress: false,
             }),
         }
     }
@@ -84,7 +123,7 @@ impl Runtime {
         MACHINE.with(|machine| {
             // If the current thread is a worker thread, schedule it onto the current machine.
             // Otherwise, push it into the global task queue.
-            match machine.get() {
+            match &*machine.borrow() {
                 None => {
                     self.injector.push(task);
                     self.notify();
@@ -100,20 +139,177 @@ impl Runtime {
             let mut idle = 0;
             let mut delay = 0;
 
+            // Start a thread that periodically re-renders scheduler statistics.
+            #[cfg(feature = "tracing")]
+            s.builder()
+                .name("async-std/trace".to_string())
+                .spawn(|_| {
+                    use log_update::LogUpdate;
+                    use std::io::stdout;
+
+                    let mut log_update = LogUpdate::new(stdout()).unwrap();
+
+                    loop {
+                        let (thread_list, machine_list, processor_list, polling) = {
+                            let sched = self.sched.lock().unwrap();
+                            // One marker per thread: `_` if parked, `|` if running.
+                            let thread_list = sched
+                                .threads
+                                .iter()
+                                .map(|t| {
+                                    if t.parked.load(Ordering::Relaxed) {
+                                        "_"
+                                    } else {
+                                        "|"
+                                    }
+                                })
+                                .fold(String::new(), |mut s, curr| {
+                                    s += " ";
+                                    s += curr;
+                                    s
+                                });
+                            // Queue length of each machine's processor, or `_` if it has none.
+                            let machine_list = sched
+                                .machines
+                                .iter()
+                                .map(|m| match &*m.processor.lock() {
+                                    Some(p) => {
+                                        let len = p.worker.len() + p.slot.is_some() as usize;
+                                        len.to_string()
+                                    }
+                                    None => "_".to_string(),
+                                })
+                                .fold(String::new(), |mut s, curr| {
+                                    s += " ";
+                                    s += &curr;
+                                    s
+                                });
+                            // Queue length of each idle processor.
+                            let processor_list = sched
+                                .processors
+                                .iter()
+                                .map(|p| {
+                                    let len = p.worker.len() + p.slot.is_some() as usize;
+                                    len.to_string()
+                                })
+                                .fold(String::new(), |mut s, curr| {
+                                    s += " ";
+                                    s += &curr;
+                                    s
+                                });
+                            (thread_list, machine_list, processor_list, sched.polling)
+                        };
+                        let glen = self.injector.len();
+                        let polls = self.poll_count.load(Ordering::Relaxed);
+                        let msg = format!(
+                            "GlobalQueue: {}\nPolls: {} - {}\nThreads:\n{}\nMachines:\n{}\nProcessors:\n{}\n",
+                            glen, polls, polling, thread_list, machine_list, processor_list
+                        );
+                        log_update.render(&msg).unwrap();
+                        thread::sleep(Duration::from_millis(10));
+                    }
+                })
+                .expect("failed to start tracing");
+
             loop {
                 // Get a list of new machines to start, if any need to be started.
-                for m in self.make_machines() {
+                let machines = self.make_machines();
+                for m in machines {
                     idle = 0;
 
-                    s.builder()
-                        .name("async-std/machine".to_string())
-                        .spawn(move |_| {
-                            abort_on_panic(|| {
-                                let _ = MACHINE.with(|machine| machine.set(m.clone()));
-                                m.run(self);
+                    // Grab the first parked thread, if any, and hand it the machine.
+                    let sched = self.sched.lock().unwrap();
+                    'inner: for thread in sched.threads.iter() {
+                        if thread
+                            .parked
+                            .compare_and_swap(true, false, Ordering::Acquire)
+                        {
+                            // Transfer the machine, then unpark the thread.
+                            thread
+                                .machine_sender
+                                .send(m.clone())
+                                .expect("failed to send machine to thread");
+                            thread.unparker.unpark();
+                            break 'inner;
+                        }
+                    }
+                    let len = sched.threads.len();
+                    drop(sched);
+
+                    // No idle thread available; check if we can spawn one.
+                    if len < *MAXPROCS {
+                        let parked = Arc::new(atomic::AtomicBool::new(false));
+                        let parked2 = parked.clone();
+                        let (machine_sender, machine_recv) = crossbeam_channel::bounded(1);
+                        let parker = Parker::new();
+                        let unparker = parker.unparker().clone();
+
+                        s.builder()
+                            .name("async-std/machine".to_string())
+                            .spawn(move |_| {
+                                abort_on_panic(|| {
+                                    loop {
+                                        while parked2.load(Ordering::Acquire) {
+                                            parker.park();
+                                            // TODO: shut down if idle for too long
+                                        }
+
+                                        // When this thread is unparked, retrieve the machine
+                                        // and store it in the thread local.
+                                        let m: Arc<Machine> =
+                                            machine_recv.recv().expect("failed to receive machine");
+                                        MACHINE.with(|machine| {
+                                            *machine.borrow_mut() = Some(m.clone());
+                                        });
+
+                                        // Run it.
+                                        m.run(self);
+
+                                        // When the run ends, see if there is an idle processor
+                                        // and an idle machine that can take it over.
+                                        {
+                                            let mut sched = self.sched.lock().unwrap();
+                                            if let Some(p) = sched.processors.pop() {
+                                                if let Some(m) = sched.next_idle_machine() {
+                                                    *m.processor.lock() = Some(p);
+                                                    MACHINE.with(|machine| {
+                                                        machine.borrow_mut().replace(m);
+                                                    });
+                                                    continue;
+                                                }
+                                                // No idle machine; return the processor to the
+                                                // scheduler so it isn't lost.
+                                                sched.processors.push(p);
+                                            }
+                                            drop(sched);
+
+                                            // No work; go into parked mode.
+                                            MACHINE.with(|machine| {
+                                                *machine.borrow_mut() = None;
+                                            });
+                                            parked2.store(true, Ordering::Relaxed);
+                                        }
+                                    }
+                                })
                             })
-                        })
-                        .expect("cannot start a machine thread");
+                            .expect("cannot start a machine thread");
+
+                        let mut sched = self.sched.lock().unwrap();
+
+                        // Transfer the machine and register the new thread.
+                        machine_sender
+                            .send(m)
+                            .expect("failed to send machine to thread");
+
+                        sched.threads.push(ThreadState {
+                            unparker,
+                            parked,
+                            machine_sender,
+                        });
+                        drop(sched);
+                    }
                 }
 
                 // Sleep for a bit longer if the scheduler state hasn't changed in a while.
@@ -137,12 +333,26 @@ impl Runtime {
 
         // If no machine has been polling the reactor in a while, that means the runtime is
         // overloaded with work and we need to start another machine.
-        if !sched.polling {
+        //
+        // Also ensure that there are at least `MIN_MACHINES` running machines, to avoid
+        // starvation.
+        if !sched.polling || sched.machines.len() < MIN_MACHINES {
+            #[cfg(feature = "tracing")]
+            self.poll_count.fetch_add(1, Ordering::Relaxed);
+
+            // if !sched.progress {
             if let Some(p) = sched.processors.pop() {
-                let m = Arc::new(Machine::new(p));
-                to_start.push(m.clone());
-                sched.machines.push(m);
+                if let Some(m) = sched.next_idle_machine() {
+                    // Reuse an idle machine.
+                    *m.processor.lock() = Some(p);
+                    to_start.push(m.clone());
+                } else {
+                    // No idle machine; create a new one.
+                    let m = Arc::new(Machine::new(p));
+                    to_start.push(m.clone());
+                    sched.machines.push(m);
+                }
             }
+            // }
+            sched.progress = false;
         }
 
         to_start
@@ -184,6 +394,15 @@ impl Machine {
         }
     }
 
+    /// Returns `true` if this machine's processor has any available work.
+    fn has_work(&self) -> bool {
+        if let Some(p) = &*self.processor.lock() {
+            // TODO: is this the right check?
+            p.has_work()
+        } else {
+            false
+        }
+    }
+
     /// Schedules a task onto the machine.
     fn schedule(&self, rt: &Runtime, task: Runnable) {
         match self.processor.lock().as_mut() {
@@ -333,21 +553,27 @@ impl Machine {
 
             // Lock the scheduler again and re-register the machine.
             sched = rt.sched.lock().unwrap();
             sched.polling = false;
             sched.machines.push(m);
+            sched.progress = true;
 
             runs = 0;
             fails = 0;
         }
 
         // When shutting down the thread, take the processor out if still available.
         let opt_p = self.processor.lock().take();
 
         // Return the processor to the scheduler and remove the machine.
         if let Some(p) = opt_p {
             let mut sched = rt.sched.lock().unwrap();
             sched.processors.push(p);
             sched.machines.retain(|elem| !ptr::eq(&**elem, self));
         }
     }
 }
@@ -368,6 +594,11 @@ impl Processor {
         }
     }
 
+    /// Is there any available work for this processor?
+    fn has_work(&self) -> bool {
+        self.slot.is_some() || !self.worker.is_empty()
+    }
+
     /// Schedules a task to run on this processor.
     fn schedule(&mut self, rt: &Runtime, task: Runnable) {
         match self.slot.replace(task) {
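
Note on the thread/machine handoff above: each worker thread owns a `Parker`, advertises itself through a shared `AtomicBool`, and receives its machine over a `bounded(1)` channel; the scheduler claims a parked thread with a CAS so two schedulers can never wake the same thread for different machines. The following is a minimal, self-contained sketch of that protocol using the same crossbeam primitives as the patch. `Job` stands in for `Arc<Machine>`, and `spawn_worker`/`assign` are illustrative names, not async-std API; it also uses `compare_exchange`, the non-deprecated equivalent of the patch's `compare_and_swap`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

use crossbeam_channel::{bounded, Sender};
use crossbeam_utils::sync::{Parker, Unparker};

// `Job` stands in for `Arc<Machine>` in the patch.
type Job = Box<dyn FnOnce() + Send + 'static>;

struct ThreadState {
    unparker: Unparker,
    parked: Arc<AtomicBool>,
    job_sender: Sender<Job>,
}

fn spawn_worker() -> ThreadState {
    // Start in the "running" state: the thread immediately blocks on `recv`,
    // so the first job can be sent without unparking.
    let parked = Arc::new(AtomicBool::new(false));
    let parked2 = parked.clone();
    let (job_sender, job_recv) = bounded::<Job>(1);
    let parker = Parker::new();
    let unparker = parker.unparker().clone();

    thread::spawn(move || loop {
        // Sleep until the scheduler flips `parked` back to false. The parker's
        // token makes an unpark that races with `park()` wake us anyway.
        while parked2.load(Ordering::Acquire) {
            parker.park();
        }
        // Rendezvous: receive the next job, or shut down if the sender is gone.
        let job = match job_recv.recv() {
            Ok(job) => job,
            Err(_) => break,
        };
        job();
        // No more work: park ourselves until the scheduler wakes us.
        parked2.store(true, Ordering::Release);
    });

    ThreadState { unparker, parked, job_sender }
}

/// Hand `job` to some parked thread, claiming it with a CAS; returns the job
/// if every thread is busy, in which case the caller may spawn a new one.
fn assign(threads: &[ThreadState], job: Job) -> Result<(), Job> {
    for t in threads {
        if t.parked
            .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            t.job_sender.send(job).expect("worker thread is gone");
            t.unparker.unpark();
            return Ok(());
        }
    }
    Err(job)
}

fn main() {
    let worker = spawn_worker();
    worker
        .job_sender
        .send(Box::new(|| println!("first job runs on the fresh thread")))
        .unwrap();

    // Wait until the worker parks itself, then hand it a second job.
    while !worker.parked.load(Ordering::Acquire) {
        thread::yield_now();
    }
    assign(std::slice::from_ref(&worker), Box::new(|| println!("second job")))
        .ok()
        .expect("worker should have been parked");
    thread::sleep(std::time::Duration::from_millis(50));
}
```

The `bounded(1)` channel is what makes the handoff safe: the sender never blocks because a thread is only claimed when it is parked with an empty mailbox, and the receiver blocks until exactly one machine arrives.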
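
For reference, here is the work-availability predicate that `next_idle_machine` relies on, sketched against the stock crossbeam-deque 0.7 API (`is_empty` only); the queue-length counters in the tracing view additionally need `Worker::len`/`Injector::len`, which is presumably why Cargo.toml pins the `deque-len` branch instead of the commented-out 0.7.3 release. `Task` and `pop` are illustrative, not the patch's types.

```rust
use crossbeam_deque::Worker;

struct Task(u32);

struct Processor {
    /// Fast slot for the task most recently scheduled on this processor.
    slot: Option<Task>,
    /// The local work-stealing deque.
    worker: Worker<Task>,
}

impl Processor {
    fn new() -> Processor {
        Processor { slot: None, worker: Worker::new_fifo() }
    }

    /// Is there any available work for this processor? Mirrors the patch's
    /// `Processor::has_work`: the slot counts as work, as does a non-empty deque.
    fn has_work(&self) -> bool {
        self.slot.is_some() || !self.worker.is_empty()
    }

    /// Pop the next task: the fast slot first, then the deque.
    fn pop(&mut self) -> Option<Task> {
        self.slot.take().or_else(|| self.worker.pop())
    }
}

fn main() {
    let mut p = Processor::new();
    assert!(!p.has_work());

    p.worker.push(Task(1));
    p.slot = Some(Task(2));
    assert!(p.has_work());

    // The slot takes priority over the deque.
    assert_eq!(p.pop().map(|t| t.0), Some(2));
    assert_eq!(p.pop().map(|t| t.0), Some(1));
    assert!(!p.has_work());
}
```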