Skip to content

Commit c99e1c6

Browse files
authored
Merge pull request #6025 from cre4ture/feature/dd_direct_progress
dd: handle SIGUSR1 directly. not just every 1sec
2 parents 393feaf + 174e9a0 commit c99e1c6

File tree

2 files changed

+195
-93
lines changed

2 files changed

+195
-93
lines changed

src/uu/dd/src/dd.rs

Lines changed: 113 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use nix::fcntl::FcntlArg::F_SETFL;
2121
#[cfg(any(target_os = "linux", target_os = "android"))]
2222
use nix::fcntl::OFlag;
2323
use parseargs::Parser;
24+
use progress::ProgUpdateType;
2425
use progress::{gen_prog_updater, ProgUpdate, ReadStat, StatusLevel, WriteStat};
2526
use uucore::io::OwnedFileDescriptorOrHandle;
2627

@@ -39,10 +40,8 @@ use std::os::unix::{
3940
#[cfg(windows)]
4041
use std::os::windows::{fs::MetadataExt, io::AsHandle};
4142
use std::path::Path;
42-
use std::sync::{
43-
atomic::{AtomicBool, Ordering::Relaxed},
44-
mpsc, Arc,
45-
};
43+
use std::sync::atomic::AtomicU8;
44+
use std::sync::{atomic::Ordering::Relaxed, mpsc, Arc};
4645
use std::thread;
4746
use std::time::{Duration, Instant};
4847

@@ -87,38 +86,65 @@ struct Settings {
8786

8887
/// A timer which triggers on a given interval
8988
///
90-
/// After being constructed with [`Alarm::with_interval`], [`Alarm::is_triggered`]
91-
/// will return true once per the given [`Duration`].
89+
/// After being constructed with [`Alarm::with_interval`], [`Alarm::get_trigger`]
90+
/// will return [`ALARM_TRIGGER_TIMER`] once per the given [`Duration`].
91+
/// Alarm can be manually triggered with closure returned by [`Alarm::manual_trigger_fn`].
92+
/// [`Alarm::get_trigger`] will return [`ALARM_TRIGGER_SIGNAL`] in this case.
9293
///
9394
/// Can be cloned, but the trigger status is shared across all instances so only
9495
/// the first caller each interval will yield true.
9596
///
9697
/// When all instances are dropped the background thread will exit on the next interval.
97-
#[derive(Debug, Clone)]
9898
pub struct Alarm {
9999
interval: Duration,
100-
trigger: Arc<AtomicBool>,
100+
trigger: Arc<AtomicU8>,
101101
}
102102

103+
pub const ALARM_TRIGGER_NONE: u8 = 0;
104+
pub const ALARM_TRIGGER_TIMER: u8 = 1;
105+
pub const ALARM_TRIGGER_SIGNAL: u8 = 2;
106+
103107
impl Alarm {
108+
/// use to construct alarm timer with duration
104109
pub fn with_interval(interval: Duration) -> Self {
105-
let trigger = Arc::new(AtomicBool::default());
110+
let trigger = Arc::new(AtomicU8::default());
106111

107112
let weak_trigger = Arc::downgrade(&trigger);
108113
thread::spawn(move || {
109114
while let Some(trigger) = weak_trigger.upgrade() {
110115
thread::sleep(interval);
111-
trigger.store(true, Relaxed);
116+
trigger.store(ALARM_TRIGGER_TIMER, Relaxed);
112117
}
113118
});
114119

115120
Self { interval, trigger }
116121
}
117122

118-
pub fn is_triggered(&self) -> bool {
119-
self.trigger.swap(false, Relaxed)
123+
/// Returns a closure that allows to manually trigger the alarm
124+
///
125+
/// This is useful for cases where more than one alarm even source exists
126+
/// In case of `dd` there is the SIGUSR1/SIGINFO case where we want to
127+
/// trigger an manual progress report.
128+
pub fn manual_trigger_fn(&self) -> Box<dyn Send + Sync + Fn()> {
129+
let weak_trigger = Arc::downgrade(&self.trigger);
130+
Box::new(move || {
131+
if let Some(trigger) = weak_trigger.upgrade() {
132+
trigger.store(ALARM_TRIGGER_SIGNAL, Relaxed);
133+
}
134+
})
135+
}
136+
137+
/// Use this function to poll for any pending alarm event
138+
///
139+
/// Returns `ALARM_TRIGGER_NONE` for no pending event.
140+
/// Returns `ALARM_TRIGGER_TIMER` if the event was triggered by timer
141+
/// Returns `ALARM_TRIGGER_SIGNAL` if the event was triggered manually
142+
/// by the closure returned from `manual_trigger_fn`
143+
pub fn get_trigger(&self) -> u8 {
144+
self.trigger.swap(ALARM_TRIGGER_NONE, Relaxed)
120145
}
121146

147+
// Getter function for the configured interval duration
122148
pub fn get_interval(&self) -> Duration {
123149
self.interval
124150
}
@@ -818,6 +844,30 @@ impl<'a> Output<'a> {
818844
}
819845
}
820846

847+
/// writes a block of data. optionally retries when first try didn't complete
848+
///
849+
/// this is needed by gnu-test: tests/dd/stats.s
850+
/// the write can be interrupted by a system signal.
851+
/// e.g. SIGUSR1 which is send to report status
852+
/// without retry, the data might not be fully written to destination.
853+
fn write_block(&mut self, chunk: &[u8]) -> io::Result<usize> {
854+
let full_len = chunk.len();
855+
let mut base_idx = 0;
856+
loop {
857+
match self.dst.write(&chunk[base_idx..]) {
858+
Ok(wlen) => {
859+
base_idx += wlen;
860+
// take iflags.fullblock as oflags shall not have this option
861+
if (base_idx >= full_len) || !self.settings.iflags.fullblock {
862+
return Ok(base_idx);
863+
}
864+
}
865+
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
866+
Err(e) => return Err(e),
867+
}
868+
}
869+
}
870+
821871
/// Write the given bytes one block at a time.
822872
///
823873
/// This may write partial blocks (for example, if the underlying
@@ -831,7 +881,7 @@ impl<'a> Output<'a> {
831881
let mut bytes_total = 0;
832882

833883
for chunk in buf.chunks(self.settings.obs) {
834-
let wlen = self.dst.write(chunk)?;
884+
let wlen = self.write_block(chunk)?;
835885
if wlen < self.settings.obs {
836886
writes_partial += 1;
837887
} else {
@@ -922,6 +972,29 @@ impl<'a> BlockWriter<'a> {
922972
}
923973
}
924974

975+
/// depending on the command line arguments, this function
976+
/// informs the OS to flush/discard the caches for input and/or output file.
977+
fn flush_caches_full_length(i: &Input, o: &Output) -> std::io::Result<()> {
978+
// TODO Better error handling for overflowing `len`.
979+
if i.settings.iflags.nocache {
980+
let offset = 0;
981+
#[allow(clippy::useless_conversion)]
982+
let len = i.src.len()?.try_into().unwrap();
983+
i.discard_cache(offset, len);
984+
}
985+
// Similarly, discard the system cache for the output file.
986+
//
987+
// TODO Better error handling for overflowing `len`.
988+
if i.settings.oflags.nocache {
989+
let offset = 0;
990+
#[allow(clippy::useless_conversion)]
991+
let len = o.dst.len()?.try_into().unwrap();
992+
o.discard_cache(offset, len);
993+
}
994+
995+
Ok(())
996+
}
997+
925998
/// Copy the given input data to this output, consuming both.
926999
///
9271000
/// This method contains the main loop for the `dd` program. Bytes
@@ -981,22 +1054,7 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
9811054
// requests that we inform the system that we no longer
9821055
// need the contents of the input file in a system cache.
9831056
//
984-
// TODO Better error handling for overflowing `len`.
985-
if i.settings.iflags.nocache {
986-
let offset = 0;
987-
#[allow(clippy::useless_conversion)]
988-
let len = i.src.len()?.try_into().unwrap();
989-
i.discard_cache(offset, len);
990-
}
991-
// Similarly, discard the system cache for the output file.
992-
//
993-
// TODO Better error handling for overflowing `len`.
994-
if i.settings.oflags.nocache {
995-
let offset = 0;
996-
#[allow(clippy::useless_conversion)]
997-
let len = o.dst.len()?.try_into().unwrap();
998-
o.discard_cache(offset, len);
999-
}
1057+
flush_caches_full_length(&i, &o)?;
10001058
return finalize(
10011059
BlockWriter::Unbuffered(o),
10021060
rstat,
@@ -1018,6 +1076,18 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
10181076
// This avoids the need to query the OS monotonic clock for every block.
10191077
let alarm = Alarm::with_interval(Duration::from_secs(1));
10201078

1079+
// The signal handler spawns an own thread that waits for signals.
1080+
// When the signal is received, it calls a handler function.
1081+
// We inject a handler function that manually triggers the alarm.
1082+
#[cfg(target_os = "linux")]
1083+
let signal_handler = progress::SignalHandler::install_signal_handler(alarm.manual_trigger_fn());
1084+
#[cfg(target_os = "linux")]
1085+
if let Err(e) = &signal_handler {
1086+
if Some(StatusLevel::None) != i.settings.status {
1087+
eprintln!("Internal dd Warning: Unable to register signal handler \n\t{e}");
1088+
}
1089+
}
1090+
10211091
// Index in the input file where we are reading bytes and in
10221092
// the output file where we are writing bytes.
10231093
//
@@ -1086,11 +1156,20 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
10861156
// error.
10871157
rstat += rstat_update;
10881158
wstat += wstat_update;
1089-
if alarm.is_triggered() {
1090-
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), false);
1091-
prog_tx.send(prog_update).unwrap_or(());
1159+
match alarm.get_trigger() {
1160+
ALARM_TRIGGER_NONE => {}
1161+
t @ ALARM_TRIGGER_TIMER | t @ ALARM_TRIGGER_SIGNAL => {
1162+
let tp = match t {
1163+
ALARM_TRIGGER_TIMER => ProgUpdateType::Periodic,
1164+
_ => ProgUpdateType::Signal,
1165+
};
1166+
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), tp);
1167+
prog_tx.send(prog_update).unwrap_or(());
1168+
}
1169+
_ => {}
10921170
}
10931171
}
1172+
10941173
finalize(o, rstat, wstat, start, &prog_tx, output_thread, truncate)
10951174
}
10961175

@@ -1118,12 +1197,13 @@ fn finalize<T>(
11181197

11191198
// Print the final read/write statistics.
11201199
let wstat = wstat + wstat_update;
1121-
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), true);
1200+
let prog_update = ProgUpdate::new(rstat, wstat, start.elapsed(), ProgUpdateType::Final);
11221201
prog_tx.send(prog_update).unwrap_or(());
11231202
// Wait for the output thread to finish
11241203
output_thread
11251204
.join()
11261205
.expect("Failed to join with the output thread.");
1206+
11271207
Ok(())
11281208
}
11291209

0 commit comments

Comments
 (0)