@@ -8,7 +8,11 @@ use std::thread;
8
8
use std:: time:: Duration ;
9
9
10
10
use crossbeam_deque:: { Injector , Steal , Stealer , Worker } ;
11
- use crossbeam_utils:: thread:: scope;
11
+ use crossbeam_utils:: {
12
+ sync:: { Parker , Unparker } ,
13
+ thread:: scope,
14
+ } ;
15
+ use once_cell:: sync:: Lazy ;
12
16
use once_cell:: unsync:: OnceCell ;
13
17
14
18
use crate :: rt:: Reactor ;
@@ -24,17 +28,30 @@ thread_local! {
24
28
static YIELD_NOW : Cell <bool > = Cell :: new( false ) ;
25
29
}
26
30
31
+ /// Maximum number of OS threads = processors = machines
32
+ static MAXPROCS : Lazy < usize > = Lazy :: new ( || num_cpus:: get ( ) . max ( 1 ) ) ;
33
+
27
34
struct Scheduler {
28
35
/// Set to `true` while a machine is polling the reactor.
29
36
polling : bool ,
30
37
38
+ /// Available threads.
39
+ threads : Vec < ThreadState > ,
40
+
31
41
/// Idle processors.
32
42
processors : Vec < Processor > ,
33
43
34
44
/// Running machines.
35
45
machines : Vec < Arc < Machine > > ,
36
46
}
37
47
48
+ struct ThreadState {
49
+ unparker : Unparker ,
50
+ parked : Arc < atomic:: AtomicBool > ,
51
+ /// Used to transfer the machine into the thread.
52
+ machine_sender : crossbeam_channel:: Sender < Arc < Machine > > ,
53
+ }
54
+
38
55
/// An async runtime.
39
56
pub struct Runtime {
40
57
/// The reactor.
@@ -53,17 +70,18 @@ pub struct Runtime {
53
70
impl Runtime {
54
71
/// Creates a new runtime.
55
72
pub fn new ( ) -> Runtime {
56
- let cpus = num_cpus:: get ( ) . max ( 1 ) ;
57
- let processors: Vec < _ > = ( 0 ..cpus) . map ( |_| Processor :: new ( ) ) . collect ( ) ;
73
+ let processors: Vec < _ > = ( 0 ..* MAXPROCS ) . map ( |_| Processor :: new ( ) ) . collect ( ) ;
58
74
let stealers = processors. iter ( ) . map ( |p| p. worker . stealer ( ) ) . collect ( ) ;
75
+ let threads = Vec :: with_capacity ( * MAXPROCS ) ;
59
76
60
77
Runtime {
61
78
reactor : Reactor :: new ( ) . unwrap ( ) ,
62
79
injector : Injector :: new ( ) ,
63
80
stealers,
64
81
sched : Mutex :: new ( Scheduler {
65
82
processors,
66
- machines : Vec :: new ( ) ,
83
+ machines : Vec :: with_capacity ( * MAXPROCS ) ,
84
+ threads,
67
85
polling : false ,
68
86
} ) ,
69
87
}
@@ -102,18 +120,67 @@ impl Runtime {
102
120
103
121
loop {
104
122
// Get a list of new machines to start, if any need to be started.
105
- for m in self . make_machines ( ) {
123
+ let machines = self . make_machines ( ) ;
124
+ for m in machines {
106
125
idle = 0 ;
107
126
108
- s. builder ( )
109
- . name ( "async-std/machine" . to_string ( ) )
110
- . spawn ( move |_| {
111
- abort_on_panic ( || {
112
- let _ = MACHINE . with ( |machine| machine. set ( m. clone ( ) ) ) ;
113
- m. run ( self ) ;
127
+ // println!("getting idle thread");
128
+ let mut sched = self . sched . lock ( ) . unwrap ( ) ;
129
+ ' inner: for thread in & sched. threads {
130
+ // grab the first parked thread
131
+ if thread
132
+ . parked
133
+ . compare_and_swap ( true , false , Ordering :: Acquire )
134
+ {
135
+ // transfer the machine
136
+ thread
137
+ . machine_sender
138
+ . send ( m)
139
+ . expect ( "failed to send machine to thread" ) ;
140
+ // unpark the thread
141
+ thread. unparker . unpark ( ) ;
142
+ break ' inner;
143
+ }
144
+ }
145
+
146
+ // no idle thread available, check if we can spawn one
147
+ if sched. threads . len ( ) < * MAXPROCS {
148
+ // we can spawn one, lets do it
149
+ let parked = Arc :: new ( atomic:: AtomicBool :: new ( true ) ) ;
150
+ let parked2 = parked. clone ( ) ;
151
+ let ( machine_sender, machine_recv) = crossbeam_channel:: bounded ( 1 ) ;
152
+ let parker = Parker :: new ( ) ;
153
+ let unparker = parker. unparker ( ) . clone ( ) ;
154
+
155
+ s. builder ( )
156
+ . name ( "async-std/machine" . to_string ( ) )
157
+ . spawn ( move |_| {
158
+ abort_on_panic ( || {
159
+ loop {
160
+ while parked2. load ( Ordering :: Acquire ) {
161
+ parker. park ( ) ;
162
+ }
163
+ // when this thread is unparked, retrieve machine
164
+ let m: Arc < Machine > =
165
+ machine_recv. recv ( ) . expect ( "failed to receive machine" ) ;
166
+ // store it in the thread local
167
+ let _ = MACHINE . with ( |machine| machine. set ( m. clone ( ) ) ) ;
168
+ // run it
169
+ m. run ( self ) ;
170
+ // when run ends, go into parked mode again
171
+ parked2. store ( false , Ordering :: Relaxed ) ;
172
+ }
173
+ } )
114
174
} )
115
- } )
116
- . expect ( "cannot start a machine thread" ) ;
175
+ . expect ( "cannot start a machine thread" ) ;
176
+
177
+ sched. threads . push ( ThreadState {
178
+ unparker,
179
+ parked,
180
+ machine_sender,
181
+ } ) ;
182
+ }
183
+ drop ( sched) ;
117
184
}
118
185
119
186
// Sleep for a bit longer if the scheduler state hasn't changed in a while.
@@ -142,6 +209,7 @@ impl Runtime {
142
209
let m = Arc :: new ( Machine :: new ( p) ) ;
143
210
to_start. push ( m. clone ( ) ) ;
144
211
sched. machines . push ( m) ;
212
+ assert ! ( sched. machines. len( ) <= * MAXPROCS ) ;
145
213
}
146
214
}
147
215
@@ -326,13 +394,15 @@ impl Machine {
326
394
} ;
327
395
328
396
// Unlock the schedule poll the reactor until new I/O events arrive.
397
+ // println!("polling start");
329
398
sched. polling = true ;
330
399
drop ( sched) ;
331
400
rt. reactor . poll ( None ) . unwrap ( ) ;
332
401
333
402
// Lock the scheduler again and re-register the machine.
334
403
sched = rt. sched . lock ( ) . unwrap ( ) ;
335
404
sched. polling = false ;
405
+ //println!("polling stop");
336
406
sched. machines . push ( m) ;
337
407
338
408
runs = 0 ;
@@ -344,9 +414,14 @@ impl Machine {
344
414
345
415
// Return the processor to the scheduler and remove the machine.
346
416
if let Some ( p) = opt_p {
417
+ // println!("returning processor to pool");
347
418
let mut sched = rt. sched . lock ( ) . unwrap ( ) ;
348
419
sched. processors . push ( p) ;
420
+ assert ! ( sched. processors. len( ) <= * MAXPROCS ) ;
421
+ // println!("machines {}", sched.machines.len());
349
422
sched. machines . retain ( |elem| !ptr:: eq ( & * * elem, self ) ) ;
423
+ // println!("machines retained {}", sched.machines.len());
424
+ assert ! ( sched. machines. len( ) <= * MAXPROCS ) ;
350
425
}
351
426
}
352
427
}
0 commit comments