@@ -21,6 +21,7 @@ use nix::fcntl::FcntlArg::F_SETFL;
21
21
#[ cfg( any( target_os = "linux" , target_os = "android" ) ) ]
22
22
use nix:: fcntl:: OFlag ;
23
23
use parseargs:: Parser ;
24
+ use progress:: ProgUpdateType ;
24
25
use progress:: { gen_prog_updater, ProgUpdate , ReadStat , StatusLevel , WriteStat } ;
25
26
use uucore:: io:: OwnedFileDescriptorOrHandle ;
26
27
@@ -39,10 +40,8 @@ use std::os::unix::{
39
40
#[ cfg( windows) ]
40
41
use std:: os:: windows:: { fs:: MetadataExt , io:: AsHandle } ;
41
42
use std:: path:: Path ;
42
- use std:: sync:: {
43
- atomic:: { AtomicBool , Ordering :: Relaxed } ,
44
- mpsc, Arc ,
45
- } ;
43
+ use std:: sync:: atomic:: AtomicU8 ;
44
+ use std:: sync:: { atomic:: Ordering :: Relaxed , mpsc, Arc } ;
46
45
use std:: thread;
47
46
use std:: time:: { Duration , Instant } ;
48
47
@@ -87,38 +86,65 @@ struct Settings {
87
86
88
87
/// A timer which triggers on a given interval
89
88
///
90
- /// After being constructed with [`Alarm::with_interval`], [`Alarm::is_triggered`]
91
- /// will return true once per the given [`Duration`].
89
+ /// After being constructed with [`Alarm::with_interval`], [`Alarm::get_trigger`]
90
+ /// will return [`ALARM_TRIGGER_TIMER`] once per the given [`Duration`].
91
+ /// Alarm can be manually triggered with closure returned by [`Alarm::manual_trigger_fn`].
92
+ /// [`Alarm::get_trigger`] will return [`ALARM_TRIGGER_SIGNAL`] in this case.
92
93
///
93
94
/// Can be cloned, but the trigger status is shared across all instances so only
94
95
/// the first caller each interval will yield true.
95
96
///
96
97
/// When all instances are dropped the background thread will exit on the next interval.
97
- #[ derive( Debug , Clone ) ]
98
98
pub struct Alarm {
99
99
interval : Duration ,
100
- trigger : Arc < AtomicBool > ,
100
+ trigger : Arc < AtomicU8 > ,
101
101
}
102
102
103
+ pub const ALARM_TRIGGER_NONE : u8 = 0 ;
104
+ pub const ALARM_TRIGGER_TIMER : u8 = 1 ;
105
+ pub const ALARM_TRIGGER_SIGNAL : u8 = 2 ;
106
+
103
107
impl Alarm {
108
+ /// use to construct alarm timer with duration
104
109
pub fn with_interval ( interval : Duration ) -> Self {
105
- let trigger = Arc :: new ( AtomicBool :: default ( ) ) ;
110
+ let trigger = Arc :: new ( AtomicU8 :: default ( ) ) ;
106
111
107
112
let weak_trigger = Arc :: downgrade ( & trigger) ;
108
113
thread:: spawn ( move || {
109
114
while let Some ( trigger) = weak_trigger. upgrade ( ) {
110
115
thread:: sleep ( interval) ;
111
- trigger. store ( true , Relaxed ) ;
116
+ trigger. store ( ALARM_TRIGGER_TIMER , Relaxed ) ;
112
117
}
113
118
} ) ;
114
119
115
120
Self { interval, trigger }
116
121
}
117
122
118
- pub fn is_triggered ( & self ) -> bool {
119
- self . trigger . swap ( false , Relaxed )
123
+ /// Returns a closure that allows to manually trigger the alarm
124
+ ///
125
+ /// This is useful for cases where more than one alarm even source exists
126
+ /// In case of `dd` there is the SIGUSR1/SIGINFO case where we want to
127
+ /// trigger an manual progress report.
128
+ pub fn manual_trigger_fn ( & self ) -> Box < dyn Send + Sync + Fn ( ) > {
129
+ let weak_trigger = Arc :: downgrade ( & self . trigger ) ;
130
+ Box :: new ( move || {
131
+ if let Some ( trigger) = weak_trigger. upgrade ( ) {
132
+ trigger. store ( ALARM_TRIGGER_SIGNAL , Relaxed ) ;
133
+ }
134
+ } )
135
+ }
136
+
137
+ /// Use this function to poll for any pending alarm event
138
+ ///
139
+ /// Returns `ALARM_TRIGGER_NONE` for no pending event.
140
+ /// Returns `ALARM_TRIGGER_TIMER` if the event was triggered by timer
141
+ /// Returns `ALARM_TRIGGER_SIGNAL` if the event was triggered manually
142
+ /// by the closure returned from `manual_trigger_fn`
143
+ pub fn get_trigger ( & self ) -> u8 {
144
+ self . trigger . swap ( ALARM_TRIGGER_NONE , Relaxed )
120
145
}
121
146
147
+ // Getter function for the configured interval duration
122
148
pub fn get_interval ( & self ) -> Duration {
123
149
self . interval
124
150
}
@@ -818,6 +844,30 @@ impl<'a> Output<'a> {
818
844
}
819
845
}
820
846
847
+ /// writes a block of data. optionally retries when first try didn't complete
848
+ ///
849
+ /// this is needed by gnu-test: tests/dd/stats.s
850
+ /// the write can be interrupted by a system signal.
851
+ /// e.g. SIGUSR1 which is send to report status
852
+ /// without retry, the data might not be fully written to destination.
853
+ fn write_block ( & mut self , chunk : & [ u8 ] ) -> io:: Result < usize > {
854
+ let full_len = chunk. len ( ) ;
855
+ let mut base_idx = 0 ;
856
+ loop {
857
+ match self . dst . write ( & chunk[ base_idx..] ) {
858
+ Ok ( wlen) => {
859
+ base_idx += wlen;
860
+ // take iflags.fullblock as oflags shall not have this option
861
+ if ( base_idx >= full_len) || !self . settings . iflags . fullblock {
862
+ return Ok ( base_idx) ;
863
+ }
864
+ }
865
+ Err ( e) if e. kind ( ) == io:: ErrorKind :: Interrupted => continue ,
866
+ Err ( e) => return Err ( e) ,
867
+ }
868
+ }
869
+ }
870
+
821
871
/// Write the given bytes one block at a time.
822
872
///
823
873
/// This may write partial blocks (for example, if the underlying
@@ -831,7 +881,7 @@ impl<'a> Output<'a> {
831
881
let mut bytes_total = 0 ;
832
882
833
883
for chunk in buf. chunks ( self . settings . obs ) {
834
- let wlen = self . dst . write ( chunk) ?;
884
+ let wlen = self . write_block ( chunk) ?;
835
885
if wlen < self . settings . obs {
836
886
writes_partial += 1 ;
837
887
} else {
@@ -922,6 +972,29 @@ impl<'a> BlockWriter<'a> {
922
972
}
923
973
}
924
974
975
+ /// depending on the command line arguments, this function
976
+ /// informs the OS to flush/discard the caches for input and/or output file.
977
+ fn flush_caches_full_length ( i : & Input , o : & Output ) -> std:: io:: Result < ( ) > {
978
+ // TODO Better error handling for overflowing `len`.
979
+ if i. settings . iflags . nocache {
980
+ let offset = 0 ;
981
+ #[ allow( clippy:: useless_conversion) ]
982
+ let len = i. src . len ( ) ?. try_into ( ) . unwrap ( ) ;
983
+ i. discard_cache ( offset, len) ;
984
+ }
985
+ // Similarly, discard the system cache for the output file.
986
+ //
987
+ // TODO Better error handling for overflowing `len`.
988
+ if i. settings . oflags . nocache {
989
+ let offset = 0 ;
990
+ #[ allow( clippy:: useless_conversion) ]
991
+ let len = o. dst . len ( ) ?. try_into ( ) . unwrap ( ) ;
992
+ o. discard_cache ( offset, len) ;
993
+ }
994
+
995
+ Ok ( ( ) )
996
+ }
997
+
925
998
/// Copy the given input data to this output, consuming both.
926
999
///
927
1000
/// This method contains the main loop for the `dd` program. Bytes
@@ -981,22 +1054,7 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
981
1054
// requests that we inform the system that we no longer
982
1055
// need the contents of the input file in a system cache.
983
1056
//
984
- // TODO Better error handling for overflowing `len`.
985
- if i. settings . iflags . nocache {
986
- let offset = 0 ;
987
- #[ allow( clippy:: useless_conversion) ]
988
- let len = i. src . len ( ) ?. try_into ( ) . unwrap ( ) ;
989
- i. discard_cache ( offset, len) ;
990
- }
991
- // Similarly, discard the system cache for the output file.
992
- //
993
- // TODO Better error handling for overflowing `len`.
994
- if i. settings . oflags . nocache {
995
- let offset = 0 ;
996
- #[ allow( clippy:: useless_conversion) ]
997
- let len = o. dst . len ( ) ?. try_into ( ) . unwrap ( ) ;
998
- o. discard_cache ( offset, len) ;
999
- }
1057
+ flush_caches_full_length ( & i, & o) ?;
1000
1058
return finalize (
1001
1059
BlockWriter :: Unbuffered ( o) ,
1002
1060
rstat,
@@ -1018,6 +1076,18 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
1018
1076
// This avoids the need to query the OS monotonic clock for every block.
1019
1077
let alarm = Alarm :: with_interval ( Duration :: from_secs ( 1 ) ) ;
1020
1078
1079
+ // The signal handler spawns an own thread that waits for signals.
1080
+ // When the signal is received, it calls a handler function.
1081
+ // We inject a handler function that manually triggers the alarm.
1082
+ #[ cfg( target_os = "linux" ) ]
1083
+ let signal_handler = progress:: SignalHandler :: install_signal_handler ( alarm. manual_trigger_fn ( ) ) ;
1084
+ #[ cfg( target_os = "linux" ) ]
1085
+ if let Err ( e) = & signal_handler {
1086
+ if Some ( StatusLevel :: None ) != i. settings . status {
1087
+ eprintln ! ( "Internal dd Warning: Unable to register signal handler \n \t {e}" ) ;
1088
+ }
1089
+ }
1090
+
1021
1091
// Index in the input file where we are reading bytes and in
1022
1092
// the output file where we are writing bytes.
1023
1093
//
@@ -1086,11 +1156,20 @@ fn dd_copy(mut i: Input, o: Output) -> std::io::Result<()> {
1086
1156
// error.
1087
1157
rstat += rstat_update;
1088
1158
wstat += wstat_update;
1089
- if alarm. is_triggered ( ) {
1090
- let prog_update = ProgUpdate :: new ( rstat, wstat, start. elapsed ( ) , false ) ;
1091
- prog_tx. send ( prog_update) . unwrap_or ( ( ) ) ;
1159
+ match alarm. get_trigger ( ) {
1160
+ ALARM_TRIGGER_NONE => { }
1161
+ t @ ALARM_TRIGGER_TIMER | t @ ALARM_TRIGGER_SIGNAL => {
1162
+ let tp = match t {
1163
+ ALARM_TRIGGER_TIMER => ProgUpdateType :: Periodic ,
1164
+ _ => ProgUpdateType :: Signal ,
1165
+ } ;
1166
+ let prog_update = ProgUpdate :: new ( rstat, wstat, start. elapsed ( ) , tp) ;
1167
+ prog_tx. send ( prog_update) . unwrap_or ( ( ) ) ;
1168
+ }
1169
+ _ => { }
1092
1170
}
1093
1171
}
1172
+
1094
1173
finalize ( o, rstat, wstat, start, & prog_tx, output_thread, truncate)
1095
1174
}
1096
1175
@@ -1118,12 +1197,13 @@ fn finalize<T>(
1118
1197
1119
1198
// Print the final read/write statistics.
1120
1199
let wstat = wstat + wstat_update;
1121
- let prog_update = ProgUpdate :: new ( rstat, wstat, start. elapsed ( ) , true ) ;
1200
+ let prog_update = ProgUpdate :: new ( rstat, wstat, start. elapsed ( ) , ProgUpdateType :: Final ) ;
1122
1201
prog_tx. send ( prog_update) . unwrap_or ( ( ) ) ;
1123
1202
// Wait for the output thread to finish
1124
1203
output_thread
1125
1204
. join ( )
1126
1205
. expect ( "Failed to join with the output thread." ) ;
1206
+
1127
1207
Ok ( ( ) )
1128
1208
}
1129
1209
0 commit comments