1use crate::{info, warn};
2use crate::{process, CmdResult, FunResult};
3use os_pipe::PipeReader;
4use std::io::{BufRead, BufReader, Error, ErrorKind, Read, Result};
5use std::process::{Child, ExitStatus};
6use std::thread::JoinHandle;
7
8pub struct CmdChildren {
13 children: Vec<CmdChild>,
14 ignore_error: bool,
15}
16
17impl CmdChildren {
18 pub(crate) fn new(children: Vec<CmdChild>, ignore_error: bool) -> Self {
19 Self {
20 children,
21 ignore_error,
22 }
23 }
24
25 pub(crate) fn into_fun_children(self) -> FunChildren {
26 FunChildren {
27 children: self.children,
28 ignore_error: self.ignore_error,
29 }
30 }
31
32 pub fn wait(&mut self) -> CmdResult {
34 let handle = self.children.pop().unwrap();
36 if let Err(e) = handle.wait(true) {
37 let _ = Self::wait_children(&mut self.children);
38 return Err(e);
39 }
40 Self::wait_children(&mut self.children)
41 }
42
43 fn wait_children(children: &mut Vec<CmdChild>) -> CmdResult {
44 let mut ret = Ok(());
45 while let Some(child_handle) = children.pop() {
46 if let Err(e) = child_handle.wait(false) {
47 ret = Err(e);
48 }
49 }
50 ret
51 }
52
53 pub fn kill(&mut self) -> CmdResult {
55 let mut ret = Ok(());
56 while let Some(child_handle) = self.children.pop() {
57 if let Err(e) = child_handle.kill() {
58 ret = Err(e);
59 }
60 }
61 ret
62 }
63
64 pub fn pids(&self) -> Vec<u32> {
66 self.children.iter().filter_map(|x| x.pid()).collect()
67 }
68}
69
70pub struct FunChildren {
75 children: Vec<CmdChild>,
76 ignore_error: bool,
77}
78
79impl FunChildren {
80 pub fn wait_with_all(&mut self) -> (CmdResult, String, String) {
83 self.inner_wait_with_all(true)
84 }
85
86 pub fn wait_with_output(&mut self) -> FunResult {
88 let (res, stdout, _) = self.inner_wait_with_all(false);
89 if let Err(e) = res {
90 if !self.ignore_error {
91 return Err(e);
92 }
93 }
94 Ok(stdout)
95 }
96
97 pub fn wait_with_raw_output(&mut self, buf: &mut Vec<u8>) -> CmdResult {
99 let handle = self.children.pop().unwrap();
101 let wait_last = handle.wait_with_raw_output(self.ignore_error, buf);
102 match wait_last {
103 Err(e) => {
104 let _ = CmdChildren::wait_children(&mut self.children);
105 Err(e)
106 }
107 Ok(_) => {
108 let ret = CmdChildren::wait_children(&mut self.children);
109 if self.ignore_error {
110 Ok(())
111 } else {
112 ret
113 }
114 }
115 }
116 }
117
118 pub fn wait_with_pipe(&mut self, f: &mut dyn FnMut(Box<dyn Read>)) -> CmdResult {
121 let child = self.children.pop().unwrap();
122 let stderr_thread =
123 StderrThread::new(&child.cmd, &child.file, child.line, child.stderr, false);
124 match child.handle {
125 CmdChildHandle::Proc(mut proc) => {
126 if let Some(stdout) = child.stdout {
127 f(Box::new(stdout));
128 let _ = proc.kill();
129 }
130 }
131 CmdChildHandle::Thread(_) => {
132 if let Some(stdout) = child.stdout {
133 f(Box::new(stdout));
134 }
135 }
136 CmdChildHandle::SyncFn => {
137 if let Some(stdout) = child.stdout {
138 f(Box::new(stdout));
139 }
140 }
141 };
142 drop(stderr_thread);
143 CmdChildren::wait_children(&mut self.children)
144 }
145
146 pub fn pids(&self) -> Vec<u32> {
148 self.children.iter().filter_map(|x| x.pid()).collect()
149 }
150
151 fn inner_wait_with_all(&mut self, capture_stderr: bool) -> (CmdResult, String, String) {
152 let last_handle = self.children.pop().unwrap();
154 let mut stdout_buf = Vec::new();
155 let mut stderr = String::new();
156 let last_res = last_handle.wait_with_all(capture_stderr, &mut stdout_buf, &mut stderr);
157 let res = CmdChildren::wait_children(&mut self.children);
158 let mut stdout: String = String::from_utf8_lossy(&stdout_buf).into();
159 if stdout.ends_with('\n') {
160 stdout.pop();
161 }
162 if res.is_err() && !self.ignore_error && process::pipefail_enabled() {
163 (res, stdout, stderr)
164 } else {
165 (last_res, stdout, stderr)
166 }
167 }
168}
169
170pub(crate) struct CmdChild {
171 handle: CmdChildHandle,
172 cmd: String,
173 file: String,
174 line: u32,
175 stdout: Option<PipeReader>,
176 stderr: Option<PipeReader>,
177}
178
179impl CmdChild {
180 pub(crate) fn new(
181 handle: CmdChildHandle,
182 cmd: String,
183 file: String,
184 line: u32,
185 stdout: Option<PipeReader>,
186 stderr: Option<PipeReader>,
187 ) -> Self {
188 Self {
189 file,
190 line,
191 handle,
192 cmd,
193 stdout,
194 stderr,
195 }
196 }
197
198 fn wait(mut self, is_last: bool) -> CmdResult {
199 let _stderr_thread =
200 StderrThread::new(&self.cmd, &self.file, self.line, self.stderr.take(), false);
201 let res = self.handle.wait(&self.cmd, &self.file, self.line);
202 if let Err(e) = res {
203 if is_last || process::pipefail_enabled() {
204 return Err(e);
205 }
206 }
207 Ok(())
208 }
209
210 fn wait_with_raw_output(self, ignore_error: bool, stdout_buf: &mut Vec<u8>) -> CmdResult {
211 let mut _stderr = String::new();
212 let res = self.wait_with_all(false, stdout_buf, &mut _stderr);
213 if ignore_error {
214 return Ok(());
215 }
216 res
217 }
218
219 fn wait_with_all(
220 mut self,
221 capture_stderr: bool,
222 stdout_buf: &mut Vec<u8>,
223 stderr_buf: &mut String,
224 ) -> CmdResult {
225 let mut stderr_thread = StderrThread::new(
226 &self.cmd,
227 &self.file,
228 self.line,
229 self.stderr.take(),
230 capture_stderr,
231 );
232 let mut stdout_res = Ok(());
233 if let Some(mut stdout) = self.stdout.take() {
234 if let Err(e) = stdout.read_to_end(stdout_buf) {
235 stdout_res = Err(e)
236 }
237 }
238 *stderr_buf = stderr_thread.join();
239 let wait_res = self.handle.wait(&self.cmd, &self.file, self.line);
240 wait_res.and(stdout_res)
241 }
242
243 fn kill(self) -> CmdResult {
244 self.handle.kill(&self.cmd, &self.file, self.line)
245 }
246
247 fn pid(&self) -> Option<u32> {
248 self.handle.pid()
249 }
250}
251
252pub(crate) enum CmdChildHandle {
253 Proc(Child),
254 Thread(JoinHandle<CmdResult>),
255 SyncFn,
256}
257
258impl CmdChildHandle {
259 fn wait(self, cmd: &str, file: &str, line: u32) -> CmdResult {
260 match self {
261 CmdChildHandle::Proc(mut proc) => {
262 let status = proc.wait();
263 match status {
264 Err(e) => return Err(process::new_cmd_io_error(&e, cmd, file, line)),
265 Ok(status) => {
266 if !status.success() {
267 return Err(Self::status_to_io_error(status, cmd, file, line));
268 }
269 }
270 }
271 }
272 CmdChildHandle::Thread(thread) => {
273 let status = thread.join();
274 match status {
275 Ok(result) => {
276 if let Err(e) = result {
277 return Err(process::new_cmd_io_error(&e, cmd, file, line));
278 }
279 }
280 Err(e) => {
281 return Err(Error::new(
282 ErrorKind::Other,
283 format!(
284 "Running [{cmd}] thread joined with error: {e:?} at {file}:{line}"
285 ),
286 ))
287 }
288 }
289 }
290 CmdChildHandle::SyncFn => {}
291 }
292 Ok(())
293 }
294
295 fn status_to_io_error(status: ExitStatus, cmd: &str, file: &str, line: u32) -> Error {
296 if let Some(code) = status.code() {
297 Error::new(
298 ErrorKind::Other,
299 format!("Running [{cmd}] exited with error; status code: {code} at {file}:{line}"),
300 )
301 } else {
302 Error::new(
303 ErrorKind::Other,
304 format!(
305 "Running [{cmd}] exited with error; terminated by {status} at {file}:{line}"
306 ),
307 )
308 }
309 }
310
311 fn kill(self, cmd: &str, file: &str, line: u32) -> CmdResult {
312 match self {
313 CmdChildHandle::Proc(mut proc) => proc.kill().map_err(|e| {
314 Error::new(
315 e.kind(),
316 format!("Killing process [{cmd}] failed with error: {e} at {file}:{line}"),
317 )
318 }),
319 CmdChildHandle::Thread(_thread) => Err(Error::new(
320 ErrorKind::Other,
321 format!("Killing thread [{cmd}] failed: not supported at {file}:{line}"),
322 )),
323 CmdChildHandle::SyncFn => Ok(()),
324 }
325 }
326
327 fn pid(&self) -> Option<u32> {
328 match self {
329 CmdChildHandle::Proc(proc) => Some(proc.id()),
330 _ => None,
331 }
332 }
333}
334
335struct StderrThread {
336 thread: Option<JoinHandle<String>>,
337 cmd: String,
338 file: String,
339 line: u32,
340}
341
342impl StderrThread {
343 fn new(cmd: &str, file: &str, line: u32, stderr: Option<PipeReader>, capture: bool) -> Self {
344 if let Some(stderr) = stderr {
345 let thread = std::thread::spawn(move || {
346 let mut output = String::new();
347 BufReader::new(stderr)
348 .lines()
349 .map_while(Result::ok)
350 .for_each(|line| {
351 if !capture {
352 info!("{line}");
353 } else {
354 if !output.is_empty() {
355 output.push('\n');
356 }
357 output.push_str(&line);
358 }
359 });
360 output
361 });
362 Self {
363 cmd: cmd.into(),
364 file: file.into(),
365 line,
366 thread: Some(thread),
367 }
368 } else {
369 Self {
370 cmd: cmd.into(),
371 file: file.into(),
372 line,
373 thread: None,
374 }
375 }
376 }
377
378 fn join(&mut self) -> String {
379 if let Some(thread) = self.thread.take() {
380 match thread.join() {
381 Err(e) => {
382 warn!(
383 "Running [{}] stderr thread joined with error: {:?} at {}:{}",
384 self.cmd, e, self.file, self.line
385 );
386 }
387 Ok(output) => return output,
388 }
389 }
390 "".into()
391 }
392}
393
394impl Drop for StderrThread {
395 fn drop(&mut self) {
396 self.join();
397 }
398}