Skip to content

Commit bc8677e

Browse files
use park + unpark instead of spawning
1 parent b446cd0 commit bc8677e

File tree

1 file changed

+88
-13
lines changed

1 file changed

+88
-13
lines changed

src/rt/runtime.rs

Lines changed: 88 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@ use std::thread;
88
use std::time::Duration;
99

1010
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
11-
use crossbeam_utils::thread::scope;
11+
use crossbeam_utils::{
12+
sync::{Parker, Unparker},
13+
thread::scope,
14+
};
15+
use once_cell::sync::Lazy;
1216
use once_cell::unsync::OnceCell;
1317

1418
use crate::rt::Reactor;
@@ -24,17 +28,30 @@ thread_local! {
2428
static YIELD_NOW: Cell<bool> = Cell::new(false);
2529
}
2630

31+
/// Maximum number of OS threads = processors = machines
32+
static MAXPROCS: Lazy<usize> = Lazy::new(|| num_cpus::get().max(1));
33+
2734
struct Scheduler {
2835
/// Set to `true` while a machine is polling the reactor.
2936
polling: bool,
3037

38+
/// Available threads.
39+
threads: Vec<ThreadState>,
40+
3141
/// Idle processors.
3242
processors: Vec<Processor>,
3343

3444
/// Running machines.
3545
machines: Vec<Arc<Machine>>,
3646
}
3747

48+
struct ThreadState {
49+
unparker: Unparker,
50+
parked: Arc<atomic::AtomicBool>,
51+
/// Used to transfer the machine into the thread.
52+
machine_sender: crossbeam_channel::Sender<Arc<Machine>>,
53+
}
54+
3855
/// An async runtime.
3956
pub struct Runtime {
4057
/// The reactor.
@@ -53,17 +70,18 @@ pub struct Runtime {
5370
impl Runtime {
5471
/// Creates a new runtime.
5572
pub fn new() -> Runtime {
56-
let cpus = num_cpus::get().max(1);
57-
let processors: Vec<_> = (0..cpus).map(|_| Processor::new()).collect();
73+
let processors: Vec<_> = (0..*MAXPROCS).map(|_| Processor::new()).collect();
5874
let stealers = processors.iter().map(|p| p.worker.stealer()).collect();
75+
let threads = Vec::with_capacity(*MAXPROCS);
5976

6077
Runtime {
6178
reactor: Reactor::new().unwrap(),
6279
injector: Injector::new(),
6380
stealers,
6481
sched: Mutex::new(Scheduler {
6582
processors,
66-
machines: Vec::new(),
83+
machines: Vec::with_capacity(*MAXPROCS),
84+
threads,
6785
polling: false,
6886
}),
6987
}
@@ -102,18 +120,67 @@ impl Runtime {
102120

103121
loop {
104122
// Get a list of new machines to start, if any need to be started.
105-
for m in self.make_machines() {
123+
let machines = self.make_machines();
124+
for m in machines {
106125
idle = 0;
107126

108-
s.builder()
109-
.name("async-std/machine".to_string())
110-
.spawn(move |_| {
111-
abort_on_panic(|| {
112-
let _ = MACHINE.with(|machine| machine.set(m.clone()));
113-
m.run(self);
127+
// println!("getting idle thread");
128+
let mut sched = self.sched.lock().unwrap();
129+
'inner: for thread in &sched.threads {
130+
// grab the first parked thread
131+
if thread
132+
.parked
133+
.compare_and_swap(true, false, Ordering::Acquire)
134+
{
135+
// transfer the machine
136+
thread
137+
.machine_sender
138+
.send(m)
139+
.expect("failed to send machine to thread");
140+
// unpark the thread
141+
thread.unparker.unpark();
142+
break 'inner;
143+
}
144+
}
145+
146+
// no idle thread available, check if we can spawn one
147+
if sched.threads.len() < *MAXPROCS {
148+
// we can spawn one, lets do it
149+
let parked = Arc::new(atomic::AtomicBool::new(true));
150+
let parked2 = parked.clone();
151+
let (machine_sender, machine_recv) = crossbeam_channel::bounded(1);
152+
let parker = Parker::new();
153+
let unparker = parker.unparker().clone();
154+
155+
s.builder()
156+
.name("async-std/machine".to_string())
157+
.spawn(move |_| {
158+
abort_on_panic(|| {
159+
loop {
160+
while parked2.load(Ordering::Acquire) {
161+
parker.park();
162+
}
163+
// when this thread is unparked, retrieve machine
164+
let m: Arc<Machine> =
165+
machine_recv.recv().expect("failed to receive machine");
166+
// store it in the thread local
167+
let _ = MACHINE.with(|machine| machine.set(m.clone()));
168+
// run it
169+
m.run(self);
170+
// when run ends, go into parked mode again
171+
parked2.store(false, Ordering::Relaxed);
172+
}
173+
})
114174
})
115-
})
116-
.expect("cannot start a machine thread");
175+
.expect("cannot start a machine thread");
176+
177+
sched.threads.push(ThreadState {
178+
unparker,
179+
parked,
180+
machine_sender,
181+
});
182+
}
183+
drop(sched);
117184
}
118185

119186
// Sleep for a bit longer if the scheduler state hasn't changed in a while.
@@ -142,6 +209,7 @@ impl Runtime {
142209
let m = Arc::new(Machine::new(p));
143210
to_start.push(m.clone());
144211
sched.machines.push(m);
212+
assert!(sched.machines.len() <= *MAXPROCS);
145213
}
146214
}
147215

@@ -326,13 +394,15 @@ impl Machine {
326394
};
327395

328396
// Unlock the schedule poll the reactor until new I/O events arrive.
397+
// println!("polling start");
329398
sched.polling = true;
330399
drop(sched);
331400
rt.reactor.poll(None).unwrap();
332401

333402
// Lock the scheduler again and re-register the machine.
334403
sched = rt.sched.lock().unwrap();
335404
sched.polling = false;
405+
//println!("polling stop");
336406
sched.machines.push(m);
337407

338408
runs = 0;
@@ -344,9 +414,14 @@ impl Machine {
344414

345415
// Return the processor to the scheduler and remove the machine.
346416
if let Some(p) = opt_p {
417+
// println!("returning processor to pool");
347418
let mut sched = rt.sched.lock().unwrap();
348419
sched.processors.push(p);
420+
assert!(sched.processors.len() <= *MAXPROCS);
421+
// println!("machines {}", sched.machines.len());
349422
sched.machines.retain(|elem| !ptr::eq(&**elem, self));
423+
// println!("machines retained {}", sched.machines.len());
424+
assert!(sched.machines.len() <= *MAXPROCS);
350425
}
351426
}
352427
}

0 commit comments

Comments
 (0)