Skip to content

Commit 546ad3d

Browse files
fix start state
1 parent bc8677e commit 546ad3d

File tree

1 file changed

+24
-12
lines changed

1 file changed

+24
-12
lines changed

src/rt/runtime.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -122,11 +122,12 @@ impl Runtime {
122122
// Get a list of new machines to start, if any need to be started.
123123
let machines = self.make_machines();
124124
for m in machines {
125+
// println!("{} -- looking for thread", k);
125126
idle = 0;
126127

127128
// println!("getting idle thread");
128-
let mut sched = self.sched.lock().unwrap();
129-
'inner: for thread in &sched.threads {
129+
let sched = self.sched.lock().unwrap();
130+
'inner: for (i, thread) in sched.threads.iter().enumerate() {
130131
// grab the first parked thread
131132
if thread
132133
.parked
@@ -135,18 +136,22 @@ impl Runtime {
135136
// transfer the machine
136137
thread
137138
.machine_sender
138-
.send(m)
139+
.send(m.clone())
139140
.expect("failed to send machine to thread");
140141
// unpark the thread
141142
thread.unparker.unpark();
143+
// println!("{} found thread to unpark {}", k, i);
142144
break 'inner;
143145
}
144146
}
145-
147+
let len = sched.threads.len();
148+
drop(sched);
146149
// no idle thread available, check if we can spawn one
147-
if sched.threads.len() < *MAXPROCS {
150+
if len < *MAXPROCS {
151+
let i = len;
152+
// println!("{} spawning thread {}", k, i);
148153
// we can spawn one, lets do it
149-
let parked = Arc::new(atomic::AtomicBool::new(true));
154+
let parked = Arc::new(atomic::AtomicBool::new(false));
150155
let parked2 = parked.clone();
151156
let (machine_sender, machine_recv) = crossbeam_channel::bounded(1);
152157
let parker = Parker::new();
@@ -159,7 +164,9 @@ impl Runtime {
159164
loop {
160165
while parked2.load(Ordering::Acquire) {
161166
parker.park();
167+
// TODO: shutdown if idle for too long
162168
}
169+
// println!("{} thread unparked {}", k, i);
163170
// when this thread is unparked, retrieve machine
164171
let m: Arc<Machine> =
165172
machine_recv.recv().expect("failed to receive machine");
@@ -169,18 +176,28 @@ impl Runtime {
169176
m.run(self);
170177
// when run ends, go into parked mode again
171178
parked2.store(false, Ordering::Relaxed);
179+
// println!("thread parked {}", i);
172180
}
173181
})
174182
})
175183
.expect("cannot start a machine thread");
176184

185+
let mut sched = self.sched.lock().unwrap();
186+
187+
// transfer the machine
188+
machine_sender
189+
.send(m)
190+
.expect("failed to send machine to thread");
191+
192+
// println!("started thread {}", i);
193+
177194
sched.threads.push(ThreadState {
178195
unparker,
179196
parked,
180197
machine_sender,
181198
});
199+
drop(sched);
182200
}
183-
drop(sched);
184201
}
185202

186203
// Sleep for a bit longer if the scheduler state hasn't changed in a while.
@@ -209,7 +226,6 @@ impl Runtime {
209226
let m = Arc::new(Machine::new(p));
210227
to_start.push(m.clone());
211228
sched.machines.push(m);
212-
assert!(sched.machines.len() <= *MAXPROCS);
213229
}
214230
}
215231

@@ -417,11 +433,7 @@ impl Machine {
417433
// println!("returning processor to pool");
418434
let mut sched = rt.sched.lock().unwrap();
419435
sched.processors.push(p);
420-
assert!(sched.processors.len() <= *MAXPROCS);
421-
// println!("machines {}", sched.machines.len());
422436
sched.machines.retain(|elem| !ptr::eq(&**elem, self));
423-
// println!("machines retained {}", sched.machines.len());
424-
assert!(sched.machines.len() <= *MAXPROCS);
425437
}
426438
}
427439
}

0 commit comments

Comments
 (0)