cmd_lib/
child.rs

1use crate::{info, warn};
2use crate::{process, CmdResult, FunResult};
3use os_pipe::PipeReader;
4use std::io::{BufRead, BufReader, Error, ErrorKind, Read, Result};
5use std::process::{Child, ExitStatus};
6use std::thread::JoinHandle;
7
8/// Representation of running or exited children processes, connected with pipes
9/// optionally.
10///
11/// Calling [`spawn!`](../cmd_lib/macro.spawn.html) macro will return `Result<CmdChildren>`
12pub struct CmdChildren {
13    children: Vec<CmdChild>,
14    ignore_error: bool,
15}
16
17impl CmdChildren {
18    pub(crate) fn new(children: Vec<CmdChild>, ignore_error: bool) -> Self {
19        Self {
20            children,
21            ignore_error,
22        }
23    }
24
25    pub(crate) fn into_fun_children(self) -> FunChildren {
26        FunChildren {
27            children: self.children,
28            ignore_error: self.ignore_error,
29        }
30    }
31
32    /// Waits for the children processes to exit completely, returning the status that they exited with.
33    pub fn wait(&mut self) -> CmdResult {
34        // wait for the last child result
35        let handle = self.children.pop().unwrap();
36        if let Err(e) = handle.wait(true) {
37            let _ = Self::wait_children(&mut self.children);
38            return Err(e);
39        }
40        Self::wait_children(&mut self.children)
41    }
42
43    fn wait_children(children: &mut Vec<CmdChild>) -> CmdResult {
44        let mut ret = Ok(());
45        while let Some(child_handle) = children.pop() {
46            if let Err(e) = child_handle.wait(false) {
47                ret = Err(e);
48            }
49        }
50        ret
51    }
52
53    /// Forces the children processes to exit.
54    pub fn kill(&mut self) -> CmdResult {
55        let mut ret = Ok(());
56        while let Some(child_handle) = self.children.pop() {
57            if let Err(e) = child_handle.kill() {
58                ret = Err(e);
59            }
60        }
61        ret
62    }
63
64    /// Returns the OS-assigned process identifiers associated with these children processes
65    pub fn pids(&self) -> Vec<u32> {
66        self.children.iter().filter_map(|x| x.pid()).collect()
67    }
68}
69
70/// Representation of running or exited children processes with output, connected with pipes
71/// optionally.
72///
73/// Calling [spawn_with_output!](../cmd_lib/macro.spawn_with_output.html) macro will return `Result<FunChildren>`
74pub struct FunChildren {
75    children: Vec<CmdChild>,
76    ignore_error: bool,
77}
78
79impl FunChildren {
80    /// Waits for the children processes to exit completely, returning the command result, stdout
81    /// content string and stderr content string.
82    pub fn wait_with_all(&mut self) -> (CmdResult, String, String) {
83        self.inner_wait_with_all(true)
84    }
85
86    /// Waits for the children processes to exit completely, returning the stdout output.
87    pub fn wait_with_output(&mut self) -> FunResult {
88        let (res, stdout, _) = self.inner_wait_with_all(false);
89        if let Err(e) = res {
90            if !self.ignore_error {
91                return Err(e);
92            }
93        }
94        Ok(stdout)
95    }
96
97    /// Waits for the children processes to exit completely, and read all bytes from stdout into `buf`.
98    pub fn wait_with_raw_output(&mut self, buf: &mut Vec<u8>) -> CmdResult {
99        // wait for the last child result
100        let handle = self.children.pop().unwrap();
101        let wait_last = handle.wait_with_raw_output(self.ignore_error, buf);
102        match wait_last {
103            Err(e) => {
104                let _ = CmdChildren::wait_children(&mut self.children);
105                Err(e)
106            }
107            Ok(_) => {
108                let ret = CmdChildren::wait_children(&mut self.children);
109                if self.ignore_error {
110                    Ok(())
111                } else {
112                    ret
113                }
114            }
115        }
116    }
117
118    /// Waits for the children processes to exit completely, pipe content will be processed by
119    /// provided function.
120    pub fn wait_with_pipe(&mut self, f: &mut dyn FnMut(Box<dyn Read>)) -> CmdResult {
121        let child = self.children.pop().unwrap();
122        let stderr_thread =
123            StderrThread::new(&child.cmd, &child.file, child.line, child.stderr, false);
124        match child.handle {
125            CmdChildHandle::Proc(mut proc) => {
126                if let Some(stdout) = child.stdout {
127                    f(Box::new(stdout));
128                    let _ = proc.kill();
129                }
130            }
131            CmdChildHandle::Thread(_) => {
132                if let Some(stdout) = child.stdout {
133                    f(Box::new(stdout));
134                }
135            }
136            CmdChildHandle::SyncFn => {
137                if let Some(stdout) = child.stdout {
138                    f(Box::new(stdout));
139                }
140            }
141        };
142        drop(stderr_thread);
143        CmdChildren::wait_children(&mut self.children)
144    }
145
146    /// Returns the OS-assigned process identifiers associated with these children processes.
147    pub fn pids(&self) -> Vec<u32> {
148        self.children.iter().filter_map(|x| x.pid()).collect()
149    }
150
151    fn inner_wait_with_all(&mut self, capture_stderr: bool) -> (CmdResult, String, String) {
152        // wait for the last child result
153        let last_handle = self.children.pop().unwrap();
154        let mut stdout_buf = Vec::new();
155        let mut stderr = String::new();
156        let last_res = last_handle.wait_with_all(capture_stderr, &mut stdout_buf, &mut stderr);
157        let res = CmdChildren::wait_children(&mut self.children);
158        let mut stdout: String = String::from_utf8_lossy(&stdout_buf).into();
159        if stdout.ends_with('\n') {
160            stdout.pop();
161        }
162        if res.is_err() && !self.ignore_error && process::pipefail_enabled() {
163            (res, stdout, stderr)
164        } else {
165            (last_res, stdout, stderr)
166        }
167    }
168}
169
170pub(crate) struct CmdChild {
171    handle: CmdChildHandle,
172    cmd: String,
173    file: String,
174    line: u32,
175    stdout: Option<PipeReader>,
176    stderr: Option<PipeReader>,
177}
178
179impl CmdChild {
180    pub(crate) fn new(
181        handle: CmdChildHandle,
182        cmd: String,
183        file: String,
184        line: u32,
185        stdout: Option<PipeReader>,
186        stderr: Option<PipeReader>,
187    ) -> Self {
188        Self {
189            file,
190            line,
191            handle,
192            cmd,
193            stdout,
194            stderr,
195        }
196    }
197
198    fn wait(mut self, is_last: bool) -> CmdResult {
199        let _stderr_thread =
200            StderrThread::new(&self.cmd, &self.file, self.line, self.stderr.take(), false);
201        let res = self.handle.wait(&self.cmd, &self.file, self.line);
202        if let Err(e) = res {
203            if is_last || process::pipefail_enabled() {
204                return Err(e);
205            }
206        }
207        Ok(())
208    }
209
210    fn wait_with_raw_output(self, ignore_error: bool, stdout_buf: &mut Vec<u8>) -> CmdResult {
211        let mut _stderr = String::new();
212        let res = self.wait_with_all(false, stdout_buf, &mut _stderr);
213        if ignore_error {
214            return Ok(());
215        }
216        res
217    }
218
219    fn wait_with_all(
220        mut self,
221        capture_stderr: bool,
222        stdout_buf: &mut Vec<u8>,
223        stderr_buf: &mut String,
224    ) -> CmdResult {
225        let mut stderr_thread = StderrThread::new(
226            &self.cmd,
227            &self.file,
228            self.line,
229            self.stderr.take(),
230            capture_stderr,
231        );
232        let mut stdout_res = Ok(());
233        if let Some(mut stdout) = self.stdout.take() {
234            if let Err(e) = stdout.read_to_end(stdout_buf) {
235                stdout_res = Err(e)
236            }
237        }
238        *stderr_buf = stderr_thread.join();
239        let wait_res = self.handle.wait(&self.cmd, &self.file, self.line);
240        wait_res.and(stdout_res)
241    }
242
243    fn kill(self) -> CmdResult {
244        self.handle.kill(&self.cmd, &self.file, self.line)
245    }
246
247    fn pid(&self) -> Option<u32> {
248        self.handle.pid()
249    }
250}
251
252pub(crate) enum CmdChildHandle {
253    Proc(Child),
254    Thread(JoinHandle<CmdResult>),
255    SyncFn,
256}
257
258impl CmdChildHandle {
259    fn wait(self, cmd: &str, file: &str, line: u32) -> CmdResult {
260        match self {
261            CmdChildHandle::Proc(mut proc) => {
262                let status = proc.wait();
263                match status {
264                    Err(e) => return Err(process::new_cmd_io_error(&e, cmd, file, line)),
265                    Ok(status) => {
266                        if !status.success() {
267                            return Err(Self::status_to_io_error(status, cmd, file, line));
268                        }
269                    }
270                }
271            }
272            CmdChildHandle::Thread(thread) => {
273                let status = thread.join();
274                match status {
275                    Ok(result) => {
276                        if let Err(e) = result {
277                            return Err(process::new_cmd_io_error(&e, cmd, file, line));
278                        }
279                    }
280                    Err(e) => {
281                        return Err(Error::new(
282                            ErrorKind::Other,
283                            format!(
284                                "Running [{cmd}] thread joined with error: {e:?} at {file}:{line}"
285                            ),
286                        ))
287                    }
288                }
289            }
290            CmdChildHandle::SyncFn => {}
291        }
292        Ok(())
293    }
294
295    fn status_to_io_error(status: ExitStatus, cmd: &str, file: &str, line: u32) -> Error {
296        if let Some(code) = status.code() {
297            Error::new(
298                ErrorKind::Other,
299                format!("Running [{cmd}] exited with error; status code: {code} at {file}:{line}"),
300            )
301        } else {
302            Error::new(
303                ErrorKind::Other,
304                format!(
305                    "Running [{cmd}] exited with error; terminated by {status} at {file}:{line}"
306                ),
307            )
308        }
309    }
310
311    fn kill(self, cmd: &str, file: &str, line: u32) -> CmdResult {
312        match self {
313            CmdChildHandle::Proc(mut proc) => proc.kill().map_err(|e| {
314                Error::new(
315                    e.kind(),
316                    format!("Killing process [{cmd}] failed with error: {e} at {file}:{line}"),
317                )
318            }),
319            CmdChildHandle::Thread(_thread) => Err(Error::new(
320                ErrorKind::Other,
321                format!("Killing thread [{cmd}] failed: not supported at {file}:{line}"),
322            )),
323            CmdChildHandle::SyncFn => Ok(()),
324        }
325    }
326
327    fn pid(&self) -> Option<u32> {
328        match self {
329            CmdChildHandle::Proc(proc) => Some(proc.id()),
330            _ => None,
331        }
332    }
333}
334
335struct StderrThread {
336    thread: Option<JoinHandle<String>>,
337    cmd: String,
338    file: String,
339    line: u32,
340}
341
342impl StderrThread {
343    fn new(cmd: &str, file: &str, line: u32, stderr: Option<PipeReader>, capture: bool) -> Self {
344        if let Some(stderr) = stderr {
345            let thread = std::thread::spawn(move || {
346                let mut output = String::new();
347                BufReader::new(stderr)
348                    .lines()
349                    .map_while(Result::ok)
350                    .for_each(|line| {
351                        if !capture {
352                            info!("{line}");
353                        } else {
354                            if !output.is_empty() {
355                                output.push('\n');
356                            }
357                            output.push_str(&line);
358                        }
359                    });
360                output
361            });
362            Self {
363                cmd: cmd.into(),
364                file: file.into(),
365                line,
366                thread: Some(thread),
367            }
368        } else {
369            Self {
370                cmd: cmd.into(),
371                file: file.into(),
372                line,
373                thread: None,
374            }
375        }
376    }
377
378    fn join(&mut self) -> String {
379        if let Some(thread) = self.thread.take() {
380            match thread.join() {
381                Err(e) => {
382                    warn!(
383                        "Running [{}] stderr thread joined with error: {:?} at {}:{}",
384                        self.cmd, e, self.file, self.line
385                    );
386                }
387                Ok(output) => return output,
388            }
389        }
390        "".into()
391    }
392}
393
394impl Drop for StderrThread {
395    fn drop(&mut self) {
396        self.join();
397    }
398}