1
1
use std:: cell:: Cell ;
2
+ use std:: cell:: RefCell ;
2
3
use std:: io;
3
4
use std:: iter;
4
5
use std:: ptr;
@@ -13,7 +14,6 @@ use crossbeam_utils::{
13
14
thread:: scope,
14
15
} ;
15
16
use once_cell:: sync:: Lazy ;
16
- use once_cell:: unsync:: OnceCell ;
17
17
18
18
use crate :: rt:: Reactor ;
19
19
use crate :: sync:: Spinlock ;
@@ -22,7 +22,7 @@ use crate::utils::{abort_on_panic, random};
22
22
23
23
thread_local ! {
24
24
/// A reference to the current machine, if the current thread runs tasks.
25
- static MACHINE : OnceCell < Arc <Machine >> = OnceCell :: new( ) ;
25
+ static MACHINE : RefCell < Option < Arc <Machine >>> = RefCell :: new( None ) ;
26
26
27
27
/// This flag is set to true whenever `task::yield_now()` is invoked.
28
28
static YIELD_NOW : Cell <bool > = Cell :: new( false ) ;
@@ -105,7 +105,7 @@ impl Runtime {
105
105
MACHINE . with ( |machine| {
106
106
// If the current thread is a worker thread, schedule it onto the current machine.
107
107
// Otherwise, push it into the global task queue.
108
- match machine. get ( ) {
108
+ match & * machine. borrow ( ) {
109
109
None => {
110
110
self . injector . push ( task) ;
111
111
self . notify ( ) ;
@@ -130,12 +130,13 @@ impl Runtime {
130
130
131
131
// println!("getting idle thread");
132
132
let sched = self . sched . lock ( ) . unwrap ( ) ;
133
- ' inner: for thread in sched. threads . iter ( ) {
133
+ ' inner: for ( i , thread) in sched. threads . iter ( ) . enumerate ( ) {
134
134
// grab the first parked thread
135
135
if thread
136
136
. parked
137
137
. compare_and_swap ( true , false , Ordering :: Acquire )
138
138
{
139
+ println ! ( "unpark thread {}" , i) ;
139
140
// transfer the machine
140
141
thread
141
142
. machine_sender
@@ -165,21 +166,31 @@ impl Runtime {
165
166
. spawn ( move |_| {
166
167
abort_on_panic ( || {
167
168
loop {
169
+ println ! ( "checking park loop {}" , i) ;
168
170
while parked2. load ( Ordering :: Acquire ) {
169
171
parker. park ( ) ;
170
172
// TODO: shutdown if idle for too long
171
173
}
172
- // println!("thread unparked {}", i);
174
+ println ! ( "thread unparked {}" , i) ;
173
175
// when this thread is unparked, retrieve machine
174
176
let m: Arc < Machine > =
175
177
machine_recv. recv ( ) . expect ( "failed to receive machine" ) ;
178
+
176
179
// store it in the thread local
177
- let _ = MACHINE . with ( |machine| machine. set ( m. clone ( ) ) ) ;
180
+ MACHINE . with ( |machine| {
181
+ * machine. borrow_mut ( ) = Some ( m. clone ( ) ) ;
182
+ } ) ;
178
183
// run it
179
184
m. run ( self ) ;
185
+
180
186
// when run ends, go into parked mode again
181
- parked2. store ( false , Ordering :: Relaxed ) ;
182
- // println!("thread parked {}", i);
187
+ {
188
+ MACHINE . with ( |machine| {
189
+ * machine. borrow_mut ( ) = None ;
190
+ } ) ;
191
+ parked2. store ( true , Ordering :: Relaxed ) ;
192
+ println ! ( "thread parked {}" , i) ;
193
+ }
183
194
}
184
195
} )
185
196
} )
@@ -223,13 +234,14 @@ impl Runtime {
223
234
// If no machine has been polling the reactor in a while, that means the runtime is
224
235
// overloaded with work and we need to start another machine.
225
236
if !sched. polling {
226
- if ! sched. progress {
227
- if let Some ( p ) = sched. processors . pop ( ) {
228
- let m = Arc :: new ( Machine :: new ( p ) ) ;
229
- to_start . push ( m . clone ( ) ) ;
230
- sched . machines . push ( m) ;
231
- }
237
+ dbg ! ( sched. progress, sched . polling ) ;
238
+ // if ! sched.progress {
239
+ if let Some ( p ) = sched . processors . pop ( ) {
240
+ let m = Arc :: new ( Machine :: new ( p ) ) ;
241
+ to_start . push ( m. clone ( ) ) ;
242
+ sched . machines . push ( m ) ;
232
243
}
244
+ // }
233
245
sched. progress = false ;
234
246
}
235
247
@@ -429,11 +441,11 @@ impl Machine {
429
441
runs = 0 ;
430
442
fails = 0 ;
431
443
}
432
- // println!("thread break");
444
+ println ! ( "thread break" ) ;
433
445
434
446
// When shutting down the thread, take the processor out if still available.
435
447
let opt_p = self . processor . lock ( ) . take ( ) ;
436
- // println!("processor {:?}", opt_p.is_some());
448
+ println ! ( "processor {:?}" , opt_p. is_some( ) ) ;
437
449
438
450
// Return the processor to the scheduler and remove the machine.
439
451
if let Some ( p) = opt_p {
@@ -442,7 +454,7 @@ impl Machine {
442
454
sched. processors . push ( p) ;
443
455
sched. machines . retain ( |elem| !ptr:: eq ( & * * elem, self ) ) ;
444
456
}
445
- // println!("thread run stopped");
457
+ println ! ( "thread run stopped" ) ;
446
458
}
447
459
}
448
460
0 commit comments