@@ -50,6 +50,16 @@ struct Scheduler {
50
50
machines : Vec < Arc < Machine > > ,
51
51
}
52
52
53
+ impl Scheduler {
54
+ /// Get the next machine that has no work yet, if there is any.
55
+ fn next_idle_machine ( & self ) -> Option < Arc < Machine > > {
56
+ self . machines
57
+ . iter ( )
58
+ . find ( |m| !m. has_work ( ) )
59
+ . map ( |m| m. clone ( ) )
60
+ }
61
+ }
62
+
53
63
struct ThreadState {
54
64
unparker : Unparker ,
55
65
parked : Arc < atomic:: AtomicBool > ,
@@ -258,8 +268,23 @@ impl Runtime {
258
268
// run it
259
269
m. run ( self ) ;
260
270
261
- // when run ends, go into parked mode again
271
+ // when run ends
262
272
{
273
+ // see if there are any available processors
274
+ let mut sched = self . sched . lock ( ) . unwrap ( ) ;
275
+ if let Some ( p) = sched. processors . pop ( ) {
276
+ // get a machine
277
+ if let Some ( m) = sched. next_idle_machine ( ) {
278
+ * m. processor . lock ( ) = Some ( p) ;
279
+ MACHINE . with ( |machine| {
280
+ machine. borrow_mut ( ) . replace ( m) ;
281
+ } ) ;
282
+ continue ;
283
+ }
284
+ }
285
+ drop ( sched) ;
286
+
287
+ // go into parked mode, no work
263
288
MACHINE . with ( |machine| {
264
289
* machine. borrow_mut ( ) = None ;
265
290
} ) ;
@@ -314,9 +339,8 @@ impl Runtime {
314
339
#[ cfg( feature = "tracing" ) ]
315
340
self . poll_count . fetch_add ( 1 , Ordering :: Relaxed ) ;
316
341
// if !sched.progress {
317
-
318
342
if let Some ( p) = sched. processors . pop ( ) {
319
- if let Some ( m) = sched. machines . iter ( ) . find ( |m| m . processor . lock ( ) . is_none ( ) ) {
343
+ if let Some ( m) = sched. next_idle_machine ( ) {
320
344
// find idle m
321
345
* m. processor . lock ( ) = Some ( p) ;
322
346
to_start. push ( m. clone ( ) ) ;
@@ -370,6 +394,15 @@ impl Machine {
370
394
}
371
395
}
372
396
397
+ fn has_work ( & self ) -> bool {
398
+ if let Some ( p) = & * self . processor . lock ( ) {
399
+ // TODO: is this the right check?
400
+ p. has_work ( )
401
+ } else {
402
+ false
403
+ }
404
+ }
405
+
373
406
/// Schedules a task onto the machine.
374
407
fn schedule ( & self , rt : & Runtime , task : Runnable ) {
375
408
match self . processor . lock ( ) . as_mut ( ) {
@@ -498,12 +531,7 @@ impl Machine {
498
531
// If another thread is already blocked on the reactor, there is no point in keeping
499
532
// the current thread around since there is too little work to do.
500
533
if sched. polling {
501
- if sched. machines . len ( ) > MIN_MACHINES {
502
- break ;
503
- } else {
504
- // thread::sleep(Duration::from_micros(10));
505
- continue ;
506
- }
534
+ break ;
507
535
}
508
536
509
537
// Take out the machine associated with the current thread.
@@ -566,6 +594,11 @@ impl Processor {
566
594
}
567
595
}
568
596
597
+ /// Is there any available work for this processor?
598
+ fn has_work ( & self ) -> bool {
599
+ self . slot . is_some ( ) || !self . worker . is_empty ( )
600
+ }
601
+
569
602
/// Schedules a task to run on this processor.
570
603
fn schedule ( & mut self , rt : & Runtime , task : Runnable ) {
571
604
match self . slot . replace ( task) {
0 commit comments