Skip to content

Commit 124aa76

Browse files
improve rescheduling
1 parent 77d3693 commit 124aa76

File tree

1 file changed

+42
-9
lines changed

1 file changed

+42
-9
lines changed

src/rt/runtime.rs

Lines changed: 42 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,16 @@ struct Scheduler {
5050
machines: Vec<Arc<Machine>>,
5151
}
5252

53+
impl Scheduler {
54+
/// Get the next machine that has no work yet, if there is any.
55+
fn next_idle_machine(&self) -> Option<Arc<Machine>> {
56+
self.machines
57+
.iter()
58+
.find(|m| !m.has_work())
59+
.map(|m| m.clone())
60+
}
61+
}
62+
5363
struct ThreadState {
5464
unparker: Unparker,
5565
parked: Arc<atomic::AtomicBool>,
@@ -258,8 +268,23 @@ impl Runtime {
258268
// run it
259269
m.run(self);
260270

261-
// when run ends, go into parked mode again
271+
// when run ends
262272
{
273+
// see if there are any available processors
274+
let mut sched = self.sched.lock().unwrap();
275+
if let Some(p) = sched.processors.pop() {
276+
// get a machine
277+
if let Some(m) = sched.next_idle_machine(){
278+
*m.processor.lock() = Some(p);
279+
MACHINE.with(|machine| {
280+
machine.borrow_mut().replace(m);
281+
});
282+
continue;
283+
}
284+
}
285+
drop(sched);
286+
287+
// go into parked mode, no work
263288
MACHINE.with(|machine| {
264289
*machine.borrow_mut() = None;
265290
});
@@ -314,9 +339,8 @@ impl Runtime {
314339
#[cfg(feature = "tracing")]
315340
self.poll_count.fetch_add(1, Ordering::Relaxed);
316341
// if !sched.progress {
317-
318342
if let Some(p) = sched.processors.pop() {
319-
if let Some(m) = sched.machines.iter().find(|m| m.processor.lock().is_none()) {
343+
if let Some(m) = sched.next_idle_machine() {
320344
// find idle m
321345
*m.processor.lock() = Some(p);
322346
to_start.push(m.clone());
@@ -370,6 +394,15 @@ impl Machine {
370394
}
371395
}
372396

397+
fn has_work(&self) -> bool {
398+
if let Some(p) = &*self.processor.lock() {
399+
// TODO: is this the right check?
400+
p.has_work()
401+
} else {
402+
false
403+
}
404+
}
405+
373406
/// Schedules a task onto the machine.
374407
fn schedule(&self, rt: &Runtime, task: Runnable) {
375408
match self.processor.lock().as_mut() {
@@ -498,12 +531,7 @@ impl Machine {
498531
// If another thread is already blocked on the reactor, there is no point in keeping
499532
// the current thread around since there is too little work to do.
500533
if sched.polling {
501-
if sched.machines.len() > MIN_MACHINES {
502-
break;
503-
} else {
504-
// thread::sleep(Duration::from_micros(10));
505-
continue;
506-
}
534+
break;
507535
}
508536

509537
// Take out the machine associated with the current thread.
@@ -566,6 +594,11 @@ impl Processor {
566594
}
567595
}
568596

597+
/// Is there any available work for this processor?
598+
fn has_work(&self) -> bool {
599+
self.slot.is_some() || !self.worker.is_empty()
600+
}
601+
569602
/// Schedules a task to run on this processor.
570603
fn schedule(&mut self, rt: &Runtime, task: Runnable) {
571604
match self.slot.replace(task) {

0 commit comments

Comments
 (0)