From bc8677ed091f1f7d57e38f29ac34741d68fe2961 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 19 Apr 2020 01:19:56 +0200 Subject: [PATCH 1/6] use park + unpark instead of spawning --- src/rt/runtime.rs | 101 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 88 insertions(+), 13 deletions(-) diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index a0d88b983..9227d249d 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -8,7 +8,11 @@ use std::thread; use std::time::Duration; use crossbeam_deque::{Injector, Steal, Stealer, Worker}; -use crossbeam_utils::thread::scope; +use crossbeam_utils::{ + sync::{Parker, Unparker}, + thread::scope, +}; +use once_cell::sync::Lazy; use once_cell::unsync::OnceCell; use crate::rt::Reactor; @@ -24,10 +28,16 @@ thread_local! { static YIELD_NOW: Cell = Cell::new(false); } +/// Maximum number of OS threads = processors = machines +static MAXPROCS: Lazy = Lazy::new(|| num_cpus::get().max(1)); + struct Scheduler { /// Set to `true` while a machine is polling the reactor. polling: bool, + /// Available threads. + threads: Vec, + /// Idle processors. processors: Vec, @@ -35,6 +45,13 @@ struct Scheduler { machines: Vec>, } +struct ThreadState { + unparker: Unparker, + parked: Arc, + /// Used to transfer the machine into the thread. + machine_sender: crossbeam_channel::Sender>, +} + /// An async runtime. pub struct Runtime { /// The reactor. @@ -53,9 +70,9 @@ pub struct Runtime { impl Runtime { /// Creates a new runtime. pub fn new() -> Runtime { - let cpus = num_cpus::get().max(1); - let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect(); + let processors: Vec<_> = (0..*MAXPROCS).map(|_| Processor::new()).collect(); let stealers = processors.iter().map(|p| p.worker.stealer()).collect(); + let threads = Vec::with_capacity(*MAXPROCS); Runtime { reactor: Reactor::new().unwrap(), @@ -63,7 +80,8 @@ impl Runtime { stealers, sched: Mutex::new(Scheduler { processors, - machines: Vec::new(), + machines: Vec::with_capacity(*MAXPROCS), + threads, polling: false, }), } @@ -102,18 +120,67 @@ impl Runtime { loop { // Get a list of new machines to start, if any need to be started. 
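(Aside, illustrative only — not part of this diff.) The handoff this patch builds on can be sketched in isolation: a worker thread parks on a crossbeam `Parker`, the scheduler claims it by flipping a shared `AtomicBool`, transfers the work over a bounded channel, and wakes it with the matching `Unparker`. The `Job` type and the `main` wiring below are stand-ins for `Arc<Machine>` and the scheduler loop; only crossbeam-utils and crossbeam-channel are assumed.

    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;

    use crossbeam_channel::bounded;
    use crossbeam_utils::sync::Parker;

    type Job = Box<dyn FnOnce() + Send>;

    fn main() {
        let parked = Arc::new(AtomicBool::new(true));
        let parked2 = parked.clone();
        let (job_sender, job_recv) = bounded::<Job>(1);
        let parker = Parker::new();
        let unparker = parker.unparker().clone();

        // Reusable worker: sleep until claimed, then run whatever arrives on the channel.
        let worker = thread::spawn(move || {
            while parked2.load(Ordering::Acquire) {
                parker.park();
            }
            let job = job_recv.recv().expect("failed to receive job");
            job();
            // In the runtime this sits in a loop and the thread parks itself again;
            // the sketch simply exits after one job.
        });

        // Scheduler side: claim the parked thread, transfer the work, wake it up.
        if parked
            .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            job_sender
                .send(Box::new(|| println!("machine running")))
                .expect("failed to send job");
            unparker.unpark();
        }

        worker.join().unwrap();
    }

The hunk continues below, replacing the old per-machine thread spawn with this claim-or-spawn logic.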
- for m in self.make_machines() { + let machines = self.make_machines(); + for m in machines { idle = 0; - s.builder() - .name("async-std/machine".to_string()) - .spawn(move |_| { - abort_on_panic(|| { - let _ = MACHINE.with(|machine| machine.set(m.clone())); - m.run(self); + // println!("getting idle thread"); + let mut sched = self.sched.lock().unwrap(); + 'inner: for thread in &sched.threads { + // grab the first parked thread + if thread + .parked + .compare_and_swap(true, false, Ordering::Acquire) + { + // transfer the machine + thread + .machine_sender + .send(m) + .expect("failed to send machine to thread"); + // unpark the thread + thread.unparker.unpark(); + break 'inner; + } + } + + // no idle thread available, check if we can spawn one + if sched.threads.len() < *MAXPROCS { + // we can spawn one, lets do it + let parked = Arc::new(atomic::AtomicBool::new(true)); + let parked2 = parked.clone(); + let (machine_sender, machine_recv) = crossbeam_channel::bounded(1); + let parker = Parker::new(); + let unparker = parker.unparker().clone(); + + s.builder() + .name("async-std/machine".to_string()) + .spawn(move |_| { + abort_on_panic(|| { + loop { + while parked2.load(Ordering::Acquire) { + parker.park(); + } + // when this thread is unparked, retrieve machine + let m: Arc = + machine_recv.recv().expect("failed to receive machine"); + // store it in the thread local + let _ = MACHINE.with(|machine| machine.set(m.clone())); + // run it + m.run(self); + // when run ends, go into parked mode again + parked2.store(false, Ordering::Relaxed); + } + }) }) - }) - .expect("cannot start a machine thread"); + .expect("cannot start a machine thread"); + + sched.threads.push(ThreadState { + unparker, + parked, + machine_sender, + }); + } + drop(sched); } // Sleep for a bit longer if the scheduler state hasn't changed in a while. @@ -142,6 +209,7 @@ impl Runtime { let m = Arc::new(Machine::new(p)); to_start.push(m.clone()); sched.machines.push(m); + assert!(sched.machines.len() <= *MAXPROCS); } } @@ -326,6 +394,7 @@ impl Machine { }; // Unlock the schedule poll the reactor until new I/O events arrive. + // println!("polling start"); sched.polling = true; drop(sched); rt.reactor.poll(None).unwrap(); @@ -333,6 +402,7 @@ impl Machine { // Lock the scheduler again and re-register the machine. sched = rt.sched.lock().unwrap(); sched.polling = false; + //println!("polling stop"); sched.machines.push(m); runs = 0; @@ -344,9 +414,14 @@ impl Machine { // Return the processor to the scheduler and remove the machine. if let Some(p) = opt_p { + // println!("returning processor to pool"); let mut sched = rt.sched.lock().unwrap(); sched.processors.push(p); + assert!(sched.processors.len() <= *MAXPROCS); + // println!("machines {}", sched.machines.len()); sched.machines.retain(|elem| !ptr::eq(&**elem, self)); + // println!("machines retained {}", sched.machines.len()); + assert!(sched.machines.len() <= *MAXPROCS); } } } From 546ad3d2875b4c79ca4584d31f5b10a870d47bb8 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 19 Apr 2020 01:48:57 +0200 Subject: [PATCH 2/6] fix start state --- src/rt/runtime.rs | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index 9227d249d..efdc314bd 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -122,11 +122,12 @@ impl Runtime { // Get a list of new machines to start, if any need to be started. 
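(Aside, illustrative only.) The claim step in the dispatch loop below relies on `compare_and_swap(true, false, Ordering::Acquire)` returning the previous value: a `true` result means the flag really was set and this caller is the one that flipped it, so it now owns the parked thread. The same check written with the non-deprecated `compare_exchange`:

    use std::sync::atomic::{AtomicBool, Ordering};

    fn main() {
        let parked = AtomicBool::new(true);

        // Succeeds only if the flag was still `true`; the winner owns the thread.
        let claimed = parked
            .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
            .is_ok();
        assert!(claimed);

        // A second attempt loses the race: the flag is already `false`.
        let claimed_again = parked
            .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed)
            .is_ok();
        assert!(!claimed_again);
    }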
let machines = self.make_machines(); for m in machines { + // println!("{} -- looking for thread", k); idle = 0; // println!("getting idle thread"); - let mut sched = self.sched.lock().unwrap(); - 'inner: for thread in &sched.threads { + let sched = self.sched.lock().unwrap(); + 'inner: for (i, thread) in sched.threads.iter().enumerate() { // grab the first parked thread if thread .parked @@ -135,18 +136,22 @@ impl Runtime { // transfer the machine thread .machine_sender - .send(m) + .send(m.clone()) .expect("failed to send machine to thread"); // unpark the thread thread.unparker.unpark(); + // println!("{} found thread to unpark {}", k, i); break 'inner; } } - + let len = sched.threads.len(); + drop(sched); // no idle thread available, check if we can spawn one - if sched.threads.len() < *MAXPROCS { + if len < *MAXPROCS { + let i = len; + // println!("{} spawning thread {}", k, i); // we can spawn one, lets do it - let parked = Arc::new(atomic::AtomicBool::new(true)); + let parked = Arc::new(atomic::AtomicBool::new(false)); let parked2 = parked.clone(); let (machine_sender, machine_recv) = crossbeam_channel::bounded(1); let parker = Parker::new(); @@ -159,7 +164,9 @@ impl Runtime { loop { while parked2.load(Ordering::Acquire) { parker.park(); + // TODO: shutdown if idle for too long } + // println!("{} thread unparked {}", k, i); // when this thread is unparked, retrieve machine let m: Arc = machine_recv.recv().expect("failed to receive machine"); @@ -169,18 +176,28 @@ impl Runtime { m.run(self); // when run ends, go into parked mode again parked2.store(false, Ordering::Relaxed); + // println!("thread parked {}", i); } }) }) .expect("cannot start a machine thread"); + let mut sched = self.sched.lock().unwrap(); + + // transfer the machine + machine_sender + .send(m) + .expect("failed to send machine to thread"); + + // println!("started thread {}", i); + sched.threads.push(ThreadState { unparker, parked, machine_sender, }); + drop(sched); } - drop(sched); } // Sleep for a bit longer if the scheduler state hasn't changed in a while. @@ -209,7 +226,6 @@ impl Runtime { let m = Arc::new(Machine::new(p)); to_start.push(m.clone()); sched.machines.push(m); - assert!(sched.machines.len() <= *MAXPROCS); } } @@ -417,11 +433,7 @@ impl Machine { // println!("returning processor to pool"); let mut sched = rt.sched.lock().unwrap(); sched.processors.push(p); - assert!(sched.processors.len() <= *MAXPROCS); - // println!("machines {}", sched.machines.len()); sched.machines.retain(|elem| !ptr::eq(&**elem, self)); - // println!("machines retained {}", sched.machines.len()); - assert!(sched.machines.len() <= *MAXPROCS); } } } From 6306ad9df16409cb051a90fe858e57a38cbb62c4 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 19 Apr 2020 02:12:16 +0200 Subject: [PATCH 3/6] improve idle detection --- src/rt/runtime.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index efdc314bd..d4f3347c8 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -35,6 +35,8 @@ struct Scheduler { /// Set to `true` while a machine is polling the reactor. polling: bool, + progress: bool, + /// Available threads. 
threads: Vec, @@ -83,6 +85,7 @@ impl Runtime { machines: Vec::with_capacity(*MAXPROCS), threads, polling: false, + progress: false, }), } } @@ -127,7 +130,7 @@ impl Runtime { // println!("getting idle thread"); let sched = self.sched.lock().unwrap(); - 'inner: for (i, thread) in sched.threads.iter().enumerate() { + 'inner: for thread in sched.threads.iter() { // grab the first parked thread if thread .parked @@ -166,7 +169,7 @@ impl Runtime { parker.park(); // TODO: shutdown if idle for too long } - // println!("{} thread unparked {}", k, i); + // println!("thread unparked {}", i); // when this thread is unparked, retrieve machine let m: Arc = machine_recv.recv().expect("failed to receive machine"); @@ -189,8 +192,6 @@ impl Runtime { .send(m) .expect("failed to send machine to thread"); - // println!("started thread {}", i); - sched.threads.push(ThreadState { unparker, parked, @@ -222,11 +223,14 @@ impl Runtime { // If no machine has been polling the reactor in a while, that means the runtime is // overloaded with work and we need to start another machine. if !sched.polling { - if let Some(p) = sched.processors.pop() { - let m = Arc::new(Machine::new(p)); - to_start.push(m.clone()); - sched.machines.push(m); + if !sched.progress { + if let Some(p) = sched.processors.pop() { + let m = Arc::new(Machine::new(p)); + to_start.push(m.clone()); + sched.machines.push(m); + } } + sched.progress = false; } to_start @@ -420,13 +424,16 @@ impl Machine { sched.polling = false; //println!("polling stop"); sched.machines.push(m); + sched.progress = true; runs = 0; fails = 0; } + // println!("thread break"); // When shutting down the thread, take the processor out if still available. let opt_p = self.processor.lock().take(); + // println!("processor {:?}", opt_p.is_some()); // Return the processor to the scheduler and remove the machine. if let Some(p) = opt_p { @@ -435,6 +442,7 @@ impl Machine { sched.processors.push(p); sched.machines.retain(|elem| !ptr::eq(&**elem, self)); } + // println!("thread run stopped"); } } From 00b8366d5579918148a774cbb8f587becfbf65b4 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 19 Apr 2020 12:03:24 +0200 Subject: [PATCH 4/6] more work --- src/rt/runtime.rs | 46 +++++++++++++++++++++++++++++----------------- 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index d4f3347c8..dc9f056b0 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -1,4 +1,5 @@ use std::cell::Cell; +use std::cell::RefCell; use std::io; use std::iter; use std::ptr; @@ -13,7 +14,6 @@ use crossbeam_utils::{ thread::scope, }; use once_cell::sync::Lazy; -use once_cell::unsync::OnceCell; use crate::rt::Reactor; use crate::sync::Spinlock; @@ -22,7 +22,7 @@ use crate::utils::{abort_on_panic, random}; thread_local! { /// A reference to the current machine, if the current thread runs tasks. - static MACHINE: OnceCell> = OnceCell::new(); + static MACHINE: RefCell>> = RefCell::new(None); /// This flag is set to true whenever `task::yield_now()` is invoked. static YIELD_NOW: Cell = Cell::new(false); @@ -105,7 +105,7 @@ impl Runtime { MACHINE.with(|machine| { // If the current thread is a worker thread, schedule it onto the current machine. // Otherwise, push it into the global task queue. 
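(Aside, illustrative only.) The hunk below swaps the `MACHINE` thread-local from a write-once `OnceCell` to a `RefCell<Option<...>>`, because a pooled thread now has to be rebound to a different machine each time it is woken. The rebinding pattern in isolation, with a stand-in payload type instead of `Arc<Machine>`:

    use std::cell::RefCell;
    use std::sync::Arc;

    thread_local! {
        // Rebindable per-thread slot, analogous to `MACHINE`.
        static CURRENT: RefCell<Option<Arc<String>>> = RefCell::new(None);
    }

    fn main() {
        // Bind the slot while a "machine" runs on this thread.
        let first = Arc::new("machine-1".to_string());
        CURRENT.with(|slot| *slot.borrow_mut() = Some(first.clone()));

        // Code running on this thread can observe the binding.
        CURRENT.with(|slot| {
            if let Some(m) = &*slot.borrow() {
                println!("scheduling onto {}", m);
            }
        });

        // Clear it when the run ends so the thread can pick up another machine later.
        CURRENT.with(|slot| *slot.borrow_mut() = None);
    }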
- match machine.get() { + match &*machine.borrow() { None => { self.injector.push(task); self.notify(); @@ -130,12 +130,13 @@ impl Runtime { // println!("getting idle thread"); let sched = self.sched.lock().unwrap(); - 'inner: for thread in sched.threads.iter() { + 'inner: for (i, thread) in sched.threads.iter().enumerate() { // grab the first parked thread if thread .parked .compare_and_swap(true, false, Ordering::Acquire) { + println!("unpark thread {}", i); // transfer the machine thread .machine_sender @@ -165,21 +166,31 @@ impl Runtime { .spawn(move |_| { abort_on_panic(|| { loop { + println!("checking park loop {}", i); while parked2.load(Ordering::Acquire) { parker.park(); // TODO: shutdown if idle for too long } - // println!("thread unparked {}", i); + println!("thread unparked {}", i); // when this thread is unparked, retrieve machine let m: Arc = machine_recv.recv().expect("failed to receive machine"); + // store it in the thread local - let _ = MACHINE.with(|machine| machine.set(m.clone())); + MACHINE.with(|machine| { + *machine.borrow_mut() = Some(m.clone()); + }); // run it m.run(self); + // when run ends, go into parked mode again - parked2.store(false, Ordering::Relaxed); - // println!("thread parked {}", i); + { + MACHINE.with(|machine| { + *machine.borrow_mut() = None; + }); + parked2.store(true, Ordering::Relaxed); + println!("thread parked {}", i); + } } }) }) @@ -223,13 +234,14 @@ impl Runtime { // If no machine has been polling the reactor in a while, that means the runtime is // overloaded with work and we need to start another machine. if !sched.polling { - if !sched.progress { - if let Some(p) = sched.processors.pop() { - let m = Arc::new(Machine::new(p)); - to_start.push(m.clone()); - sched.machines.push(m); - } + dbg!(sched.progress, sched.polling); + // if !sched.progress { + if let Some(p) = sched.processors.pop() { + let m = Arc::new(Machine::new(p)); + to_start.push(m.clone()); + sched.machines.push(m); } + // } sched.progress = false; } @@ -429,11 +441,11 @@ impl Machine { runs = 0; fails = 0; } - // println!("thread break"); + println!("thread break"); // When shutting down the thread, take the processor out if still available. let opt_p = self.processor.lock().take(); - // println!("processor {:?}", opt_p.is_some()); + println!("processor {:?}", opt_p.is_some()); // Return the processor to the scheduler and remove the machine. 
if let Some(p) = opt_p { @@ -442,7 +454,7 @@ impl Machine { sched.processors.push(p); sched.machines.retain(|elem| !ptr::eq(&**elem, self)); } - // println!("thread run stopped"); + println!("thread run stopped"); } } From 77d36931129d16e0633bef29fad7c455d4c4325e Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 19 Apr 2020 17:56:37 +0200 Subject: [PATCH 5/6] add tracing ability --- Cargo.toml | 6 ++- src/rt/runtime.rs | 117 ++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 109 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d49eb957b..288af2308 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,13 +52,15 @@ alloc = [ "futures-core/alloc", "pin-project-lite", ] +tracing = [] [dependencies] async-attributes = { version = "1.1.1", optional = true } async-task = { version = "1.3.1", optional = true } broadcaster = { version = "1.0.0", optional = true } crossbeam-channel = { version = "0.4.2", optional = true } -crossbeam-deque = { version = "0.7.3", optional = true } +crossbeam-deque = { git = "https://github.com/stjepang/crossbeam", branch = "deque-len", optional = true} +# crossbeam-deque = { version = "0.7.3", optional = true } crossbeam-queue = { version = "0.2.0", optional = true } crossbeam-utils = { version = "0.7.2", optional = true } futures-core = { version = "0.3.4", optional = true, default-features = false } @@ -74,6 +76,7 @@ once_cell = { version = "1.3.1", optional = true } pin-project-lite = { version = "0.1.4", optional = true } pin-utils = { version = "0.1.0-alpha.4", optional = true } slab = { version = "0.4.2", optional = true } +log-update = "0.1.0" [dev-dependencies] femme = "1.3.0" @@ -89,3 +92,4 @@ required-features = ["unstable"] [[example]] name = "tcp-ipv4-and-6-echo" required-features = ["unstable"] + diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index dc9f056b0..fb8d35870 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -31,6 +31,9 @@ thread_local! { /// Maximum number of OS threads = processors = machines static MAXPROCS: Lazy = Lazy::new(|| num_cpus::get().max(1)); +/// Minimum number of machines that are kept exeuting, to avoid starvation. +const MIN_MACHINES: usize = 2; + struct Scheduler { /// Set to `true` while a machine is polling the reactor. polling: bool, @@ -67,6 +70,9 @@ pub struct Runtime { /// The scheduler state. 
sched: Mutex, + + #[cfg(feature = "tracing")] + poll_count: atomic::AtomicUsize, } impl Runtime { @@ -80,6 +86,8 @@ impl Runtime { reactor: Reactor::new().unwrap(), injector: Injector::new(), stealers, + #[cfg(feature = "tracing")] + poll_count: atomic::AtomicUsize::new(0), sched: Mutex::new(Scheduler { processors, machines: Vec::with_capacity(*MAXPROCS), @@ -121,6 +129,73 @@ impl Runtime { let mut idle = 0; let mut delay = 0; + #[cfg(feature = "tracing")] + s.builder() + .name("async-std/trace".to_string()) + .spawn(|_| { + use log_update::LogUpdate; + use std::io::stdout; + let mut log_update = LogUpdate::new(stdout()).unwrap(); + + loop { + let (thread_list, machine_list, processor_list, polling) = { + let sched = self.sched.lock().unwrap(); + let thread_list = sched + .threads + .iter() + .map(|t| { + if t.parked.load(Ordering::Relaxed) { + "_" + } else { + "|" + } + }) + .fold(String::new(), |mut s, curr| { + s += " "; + s += curr; + s + }); + let machine_list = sched + .machines + .iter() + .map(|m| match &*m.processor.lock() { + Some(p) => { + let len = p.worker.len() + p.slot.is_some() as usize; + len.to_string() + } + None => "_".to_string(), + }) + .fold(String::new(), |mut s, curr| { + s += " "; + s += &curr; + s + }); + let processor_list = sched + .processors + .iter() + .map(|p| { + let len = p.worker.len() + p.slot.is_some() as usize; + len.to_string() + }) + .fold(String::new(), |mut s, curr| { + s += " "; + s += &curr; + s + }); + (thread_list, machine_list, processor_list, sched.polling) + }; + let glen = self.injector.len(); + let polls = self.poll_count.load(Ordering::Relaxed); + let msg = format!( + "GlobalQueue: {}\nPolls: {} - {}\nThreads:\n{}\nMachines:\n{}\nProcessors:\n{}\n", + glen, polls,polling, thread_list, machine_list, processor_list + ); + log_update.render(&msg).unwrap(); + thread::sleep(Duration::from_millis(10)); + } + }) + .expect("failed to start tracing"); + loop { // Get a list of new machines to start, if any need to be started. let machines = self.make_machines(); @@ -136,7 +211,7 @@ impl Runtime { .parked .compare_and_swap(true, false, Ordering::Acquire) { - println!("unpark thread {}", i); + // println!("unpark thread {}", i); // transfer the machine thread .machine_sender @@ -166,12 +241,12 @@ impl Runtime { .spawn(move |_| { abort_on_panic(|| { loop { - println!("checking park loop {}", i); + // println!("checking park loop {}", i); while parked2.load(Ordering::Acquire) { parker.park(); // TODO: shutdown if idle for too long } - println!("thread unparked {}", i); + // println!("thread unparked {}", i); // when this thread is unparked, retrieve machine let m: Arc = machine_recv.recv().expect("failed to receive machine"); @@ -189,7 +264,7 @@ impl Runtime { *machine.borrow_mut() = None; }); parked2.store(true, Ordering::Relaxed); - println!("thread parked {}", i); + // println!("thread parked {}", i); } } }) @@ -233,13 +308,24 @@ impl Runtime { // If no machine has been polling the reactor in a while, that means the runtime is // overloaded with work and we need to start another machine. - if !sched.polling { - dbg!(sched.progress, sched.polling); + // + // Also ensure that there are at least 2 running machiens to avoid starvation. 
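(Aside, illustrative only.) The branch below now prefers to hand a spare processor to an already-registered machine that currently has no processor, and only allocates a fresh `Machine` when no idle one exists. Stripped of the runtime types, the lookup is a find-and-clone over `Arc`s; the sketch uses `std::sync::Mutex` in place of the runtime's spinlock and a `u32` in place of `Processor`:

    use std::sync::{Arc, Mutex};

    struct Machine {
        // Stand-in for `Spinlock<Option<Processor>>`.
        processor: Mutex<Option<u32>>,
    }

    fn next_idle_machine(machines: &[Arc<Machine>]) -> Option<Arc<Machine>> {
        machines
            .iter()
            .find(|m| m.processor.lock().unwrap().is_none())
            .cloned()
    }

    fn main() {
        let busy = Arc::new(Machine { processor: Mutex::new(Some(0)) });
        let idle = Arc::new(Machine { processor: Mutex::new(None) });
        let machines = vec![busy, idle.clone()];

        let found = next_idle_machine(&machines).expect("one machine should be idle");
        assert!(Arc::ptr_eq(&found, &idle));

        // Hand the spare processor to the idle machine before restarting it.
        *found.processor.lock().unwrap() = Some(7);
    }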
+ if !sched.polling || sched.machines.len() < MIN_MACHINES { + #[cfg(feature = "tracing")] + self.poll_count.fetch_add(1, Ordering::Relaxed); // if !sched.progress { + if let Some(p) = sched.processors.pop() { - let m = Arc::new(Machine::new(p)); - to_start.push(m.clone()); - sched.machines.push(m); + if let Some(m) = sched.machines.iter().find(|m| m.processor.lock().is_none()) { + // find idle m + *m.processor.lock() = Some(p); + to_start.push(m.clone()); + } else { + // no idle m + let m = Arc::new(Machine::new(p)); + to_start.push(m.clone()); + sched.machines.push(m); + } } // } sched.progress = false; @@ -412,7 +498,12 @@ impl Machine { // If another thread is already blocked on the reactor, there is no point in keeping // the current thread around since there is too little work to do. if sched.polling { - break; + if sched.machines.len() > MIN_MACHINES { + break; + } else { + // thread::sleep(Duration::from_micros(10)); + continue; + } } // Take out the machine associated with the current thread. @@ -441,11 +532,11 @@ impl Machine { runs = 0; fails = 0; } - println!("thread break"); + // println!("thread break"); // When shutting down the thread, take the processor out if still available. let opt_p = self.processor.lock().take(); - println!("processor {:?}", opt_p.is_some()); + // println!("processor {:?}", opt_p.is_some()); // Return the processor to the scheduler and remove the machine. if let Some(p) = opt_p { @@ -454,7 +545,7 @@ impl Machine { sched.processors.push(p); sched.machines.retain(|elem| !ptr::eq(&**elem, self)); } - println!("thread run stopped"); + // println!("thread run stopped"); } } From 124aa76c4eb005483dfe904cd37635e8082b76f1 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Sun, 19 Apr 2020 21:45:20 +0200 Subject: [PATCH 6/6] improve rescheduling --- src/rt/runtime.rs | 51 ++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/src/rt/runtime.rs b/src/rt/runtime.rs index fb8d35870..5f5a86f0d 100644 --- a/src/rt/runtime.rs +++ b/src/rt/runtime.rs @@ -50,6 +50,16 @@ struct Scheduler { machines: Vec>, } +impl Scheduler { + /// Get the next machine that has no work yet, if there is any. + fn next_idle_machine(&self) -> Option> { + self.machines + .iter() + .find(|m| !m.has_work()) + .map(|m| m.clone()) + } +} + struct ThreadState { unparker: Unparker, parked: Arc, @@ -258,8 +268,23 @@ impl Runtime { // run it m.run(self); - // when run ends, go into parked mode again + // when run ends { + // see if there are any available processors + let mut sched = self.sched.lock().unwrap(); + if let Some(p) = sched.processors.pop() { + // get a machine + if let Some(m) = sched.next_idle_machine(){ + *m.processor.lock() = Some(p); + MACHINE.with(|machine| { + machine.borrow_mut().replace(m); + }); + continue; + } + } + drop(sched); + + // go into parked mode, no work MACHINE.with(|machine| { *machine.borrow_mut() = None; }); @@ -314,9 +339,8 @@ impl Runtime { #[cfg(feature = "tracing")] self.poll_count.fetch_add(1, Ordering::Relaxed); // if !sched.progress { - if let Some(p) = sched.processors.pop() { - if let Some(m) = sched.machines.iter().find(|m| m.processor.lock().is_none()) { + if let Some(m) = sched.next_idle_machine() { // find idle m *m.processor.lock() = Some(p); to_start.push(m.clone()); @@ -370,6 +394,15 @@ impl Machine { } } + fn has_work(&self) -> bool { + if let Some(p) = &*self.processor.lock() { + // TODO: is this the right check? 
+ p.has_work() + } else { + false + } + } + /// Schedules a task onto the machine. fn schedule(&self, rt: &Runtime, task: Runnable) { match self.processor.lock().as_mut() { @@ -498,12 +531,7 @@ impl Machine { // If another thread is already blocked on the reactor, there is no point in keeping // the current thread around since there is too little work to do. if sched.polling { - if sched.machines.len() > MIN_MACHINES { - break; - } else { - // thread::sleep(Duration::from_micros(10)); - continue; - } + break; } // Take out the machine associated with the current thread. @@ -566,6 +594,11 @@ impl Processor { } } + /// Is there any available work for this processor? + fn has_work(&self) -> bool { + self.slot.is_some() || !self.worker.is_empty() + } + /// Schedules a task to run on this processor. fn schedule(&mut self, rt: &Runtime, task: Runnable) { match self.slot.replace(task) {
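(Aside, illustrative only.) The `has_work` checks added above need nothing beyond `Worker::is_empty` and the `slot` option; it is the `len()` used by the tracing output that requires the patched crossbeam-deque (`deque-len` branch) pinned in Cargo.toml. A minimal illustration against the stock deque API, with a stand-in task type:

    use crossbeam_deque::Worker;

    fn has_work(slot: &Option<u32>, worker: &Worker<u32>) -> bool {
        slot.is_some() || !worker.is_empty()
    }

    fn main() {
        let worker: Worker<u32> = Worker::new_fifo();
        let mut slot = None;

        assert!(!has_work(&slot, &worker));

        worker.push(1);
        assert!(has_work(&slot, &worker));

        // A task parked in the slot counts as work even when the deque is empty.
        let _ = worker.pop();
        slot = Some(2);
        assert!(has_work(&slot, &worker));
    }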