@@ -122,11 +122,12 @@ impl Runtime {
122
122
// Get a list of new machines to start, if any need to be started.
123
123
let machines = self . make_machines ( ) ;
124
124
for m in machines {
125
+ // println!("{} -- looking for thread", k);
125
126
idle = 0 ;
126
127
127
128
// println!("getting idle thread");
128
- let mut sched = self . sched . lock ( ) . unwrap ( ) ;
129
- ' inner: for thread in & sched. threads {
129
+ let sched = self . sched . lock ( ) . unwrap ( ) ;
130
+ ' inner: for ( i , thread) in sched. threads . iter ( ) . enumerate ( ) {
130
131
// grab the first parked thread
131
132
if thread
132
133
. parked
@@ -135,18 +136,22 @@ impl Runtime {
135
136
// transfer the machine
136
137
thread
137
138
. machine_sender
138
- . send ( m)
139
+ . send ( m. clone ( ) )
139
140
. expect ( "failed to send machine to thread" ) ;
140
141
// unpark the thread
141
142
thread. unparker . unpark ( ) ;
143
+ // println!("{} found thread to unpark {}", k, i);
142
144
break ' inner;
143
145
}
144
146
}
145
-
147
+ let len = sched. threads . len ( ) ;
148
+ drop ( sched) ;
146
149
// no idle thread available, check if we can spawn one
147
- if sched. threads . len ( ) < * MAXPROCS {
150
+ if len < * MAXPROCS {
151
+ let i = len;
152
+ // println!("{} spawning thread {}", k, i);
148
153
// we can spawn one, lets do it
149
- let parked = Arc :: new ( atomic:: AtomicBool :: new ( true ) ) ;
154
+ let parked = Arc :: new ( atomic:: AtomicBool :: new ( false ) ) ;
150
155
let parked2 = parked. clone ( ) ;
151
156
let ( machine_sender, machine_recv) = crossbeam_channel:: bounded ( 1 ) ;
152
157
let parker = Parker :: new ( ) ;
@@ -159,7 +164,9 @@ impl Runtime {
159
164
loop {
160
165
while parked2. load ( Ordering :: Acquire ) {
161
166
parker. park ( ) ;
167
+ // TODO: shutdown if idle for too long
162
168
}
169
+ // println!("{} thread unparked {}", k, i);
163
170
// when this thread is unparked, retrieve machine
164
171
let m: Arc < Machine > =
165
172
machine_recv. recv ( ) . expect ( "failed to receive machine" ) ;
@@ -169,18 +176,28 @@ impl Runtime {
169
176
m. run ( self ) ;
170
177
// when run ends, go into parked mode again
171
178
parked2. store ( false , Ordering :: Relaxed ) ;
179
+ // println!("thread parked {}", i);
172
180
}
173
181
} )
174
182
} )
175
183
. expect ( "cannot start a machine thread" ) ;
176
184
185
+ let mut sched = self . sched . lock ( ) . unwrap ( ) ;
186
+
187
+ // transfer the machine
188
+ machine_sender
189
+ . send ( m)
190
+ . expect ( "failed to send machine to thread" ) ;
191
+
192
+ // println!("started thread {}", i);
193
+
177
194
sched. threads . push ( ThreadState {
178
195
unparker,
179
196
parked,
180
197
machine_sender,
181
198
} ) ;
199
+ drop ( sched) ;
182
200
}
183
- drop ( sched) ;
184
201
}
185
202
186
203
// Sleep for a bit longer if the scheduler state hasn't changed in a while.
@@ -209,7 +226,6 @@ impl Runtime {
209
226
let m = Arc :: new ( Machine :: new ( p) ) ;
210
227
to_start. push ( m. clone ( ) ) ;
211
228
sched. machines . push ( m) ;
212
- assert ! ( sched. machines. len( ) <= * MAXPROCS ) ;
213
229
}
214
230
}
215
231
@@ -417,11 +433,7 @@ impl Machine {
417
433
// println!("returning processor to pool");
418
434
let mut sched = rt. sched . lock ( ) . unwrap ( ) ;
419
435
sched. processors . push ( p) ;
420
- assert ! ( sched. processors. len( ) <= * MAXPROCS ) ;
421
- // println!("machines {}", sched.machines.len());
422
436
sched. machines . retain ( |elem| !ptr:: eq ( & * * elem, self ) ) ;
423
- // println!("machines retained {}", sched.machines.len());
424
- assert ! ( sched. machines. len( ) <= * MAXPROCS ) ;
425
437
}
426
438
}
427
439
}
0 commit comments