Skip to content

Commit 00b8366

Browse files
more work
1 parent 6306ad9 commit 00b8366

File tree

1 file changed

+29
-17
lines changed

1 file changed

+29
-17
lines changed

src/rt/runtime.rs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::cell::Cell;
2+
use std::cell::RefCell;
23
use std::io;
34
use std::iter;
45
use std::ptr;
@@ -13,7 +14,6 @@ use crossbeam_utils::{
1314
thread::scope,
1415
};
1516
use once_cell::sync::Lazy;
16-
use once_cell::unsync::OnceCell;
1717

1818
use crate::rt::Reactor;
1919
use crate::sync::Spinlock;
@@ -22,7 +22,7 @@ use crate::utils::{abort_on_panic, random};
2222

2323
thread_local! {
2424
/// A reference to the current machine, if the current thread runs tasks.
25-
static MACHINE: OnceCell<Arc<Machine>> = OnceCell::new();
25+
static MACHINE: RefCell<Option<Arc<Machine>>> = RefCell::new(None);
2626

2727
/// This flag is set to true whenever `task::yield_now()` is invoked.
2828
static YIELD_NOW: Cell<bool> = Cell::new(false);
@@ -105,7 +105,7 @@ impl Runtime {
105105
MACHINE.with(|machine| {
106106
// If the current thread is a worker thread, schedule it onto the current machine.
107107
// Otherwise, push it into the global task queue.
108-
match machine.get() {
108+
match &*machine.borrow() {
109109
None => {
110110
self.injector.push(task);
111111
self.notify();
@@ -130,12 +130,13 @@ impl Runtime {
130130

131131
// println!("getting idle thread");
132132
let sched = self.sched.lock().unwrap();
133-
'inner: for thread in sched.threads.iter() {
133+
'inner: for (i, thread) in sched.threads.iter().enumerate() {
134134
// grab the first parked thread
135135
if thread
136136
.parked
137137
.compare_and_swap(true, false, Ordering::Acquire)
138138
{
139+
println!("unpark thread {}", i);
139140
// transfer the machine
140141
thread
141142
.machine_sender
@@ -165,21 +166,31 @@ impl Runtime {
165166
.spawn(move |_| {
166167
abort_on_panic(|| {
167168
loop {
169+
println!("checking park loop {}", i);
168170
while parked2.load(Ordering::Acquire) {
169171
parker.park();
170172
// TODO: shutdown if idle for too long
171173
}
172-
// println!("thread unparked {}", i);
174+
println!("thread unparked {}", i);
173175
// when this thread is unparked, retrieve machine
174176
let m: Arc<Machine> =
175177
machine_recv.recv().expect("failed to receive machine");
178+
176179
// store it in the thread local
177-
let _ = MACHINE.with(|machine| machine.set(m.clone()));
180+
MACHINE.with(|machine| {
181+
*machine.borrow_mut() = Some(m.clone());
182+
});
178183
// run it
179184
m.run(self);
185+
180186
// when run ends, go into parked mode again
181-
parked2.store(false, Ordering::Relaxed);
182-
// println!("thread parked {}", i);
187+
{
188+
MACHINE.with(|machine| {
189+
*machine.borrow_mut() = None;
190+
});
191+
parked2.store(true, Ordering::Relaxed);
192+
println!("thread parked {}", i);
193+
}
183194
}
184195
})
185196
})
@@ -223,13 +234,14 @@ impl Runtime {
223234
// If no machine has been polling the reactor in a while, that means the runtime is
224235
// overloaded with work and we need to start another machine.
225236
if !sched.polling {
226-
if !sched.progress {
227-
if let Some(p) = sched.processors.pop() {
228-
let m = Arc::new(Machine::new(p));
229-
to_start.push(m.clone());
230-
sched.machines.push(m);
231-
}
237+
dbg!(sched.progress, sched.polling);
238+
// if !sched.progress {
239+
if let Some(p) = sched.processors.pop() {
240+
let m = Arc::new(Machine::new(p));
241+
to_start.push(m.clone());
242+
sched.machines.push(m);
232243
}
244+
// }
233245
sched.progress = false;
234246
}
235247

@@ -429,11 +441,11 @@ impl Machine {
429441
runs = 0;
430442
fails = 0;
431443
}
432-
// println!("thread break");
444+
println!("thread break");
433445

434446
// When shutting down the thread, take the processor out if still available.
435447
let opt_p = self.processor.lock().take();
436-
// println!("processor {:?}", opt_p.is_some());
448+
println!("processor {:?}", opt_p.is_some());
437449

438450
// Return the processor to the scheduler and remove the machine.
439451
if let Some(p) = opt_p {
@@ -442,7 +454,7 @@ impl Machine {
442454
sched.processors.push(p);
443455
sched.machines.retain(|elem| !ptr::eq(&**elem, self));
444456
}
445-
// println!("thread run stopped");
457+
println!("thread run stopped");
446458
}
447459
}
448460

0 commit comments

Comments
 (0)