diff --git a/src/uu/sort/src/ext_sort.rs b/src/uu/sort/src/ext_sort.rs index 18309881229..57e434e99b2 100644 --- a/src/uu/sort/src/ext_sort.rs +++ b/src/uu/sort/src/ext_sort.rs @@ -98,12 +98,12 @@ fn reader_writer< )?; match read_result { ReadResult::WroteChunksToFile { tmp_files } => { - let merger = merge::merge_with_file_limit::<_, _, Tmp>( + merge::merge_with_file_limit::<_, _, Tmp>( tmp_files.into_iter().map(|c| c.reopen()), settings, + output, tmp_dir, )?; - merger.write_all(settings, output)?; } ReadResult::SortedSingleChunk(chunk) => { if settings.unique { diff --git a/src/uu/sort/src/merge.rs b/src/uu/sort/src/merge.rs index d6872ec80e3..300733d1e36 100644 --- a/src/uu/sort/src/merge.rs +++ b/src/uu/sort/src/merge.rs @@ -25,7 +25,6 @@ use std::{ }; use compare::Compare; -use itertools::Itertools; use uucore::error::UResult; use crate::{ @@ -67,58 +66,63 @@ fn replace_output_file_in_input_files( /// /// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used. /// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it. -pub fn merge<'a>( +pub fn merge( files: &mut [OsString], - settings: &'a GlobalSettings, - output: Option<&str>, + settings: &GlobalSettings, + output: Output, tmp_dir: &mut TmpDirWrapper, -) -> UResult> { - replace_output_file_in_input_files(files, output, tmp_dir)?; +) -> UResult<()> { + replace_output_file_in_input_files(files, output.as_output_name(), tmp_dir)?; + let files = files + .iter() + .map(|file| open(file).map(|file| PlainMergeInput { inner: file })); if settings.compress_prog.is_none() { - merge_with_file_limit::<_, _, WriteablePlainTmpFile>( - files - .iter() - .map(|file| open(file).map(|file| PlainMergeInput { inner: file })), - settings, - tmp_dir, - ) + merge_with_file_limit::<_, _, WriteablePlainTmpFile>(files, settings, output, tmp_dir) } else { - merge_with_file_limit::<_, _, WriteableCompressedTmpFile>( - files - .iter() - .map(|file| open(file).map(|file| PlainMergeInput { inner: file })), - settings, - tmp_dir, - ) + merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(files, settings, output, tmp_dir) } } // Merge already sorted `MergeInput`s. pub fn merge_with_file_limit< - 'a, M: MergeInput + 'static, F: ExactSizeIterator>, Tmp: WriteableTmpFile + 'static, >( files: F, - settings: &'a GlobalSettings, + settings: &GlobalSettings, + output: Output, tmp_dir: &mut TmpDirWrapper, -) -> UResult> { - if files.len() > settings.merge_batch_size { - let mut remaining_files = files.len(); - let batches = files.chunks(settings.merge_batch_size); - let mut batches = batches.into_iter(); +) -> UResult<()> { + if files.len() <= settings.merge_batch_size { + let merger = merge_without_limit(files, settings); + merger?.write_all(settings, output) + } else { let mut temporary_files = vec![]; - while remaining_files != 0 { - // Work around the fact that `Chunks` is not an `ExactSizeIterator`. - remaining_files = remaining_files.saturating_sub(settings.merge_batch_size); - let merger = merge_without_limit(batches.next().unwrap(), settings)?; + let mut batch = vec![]; + for file in files { + batch.push(file); + if batch.len() >= settings.merge_batch_size { + assert_eq!(batch.len(), settings.merge_batch_size); + let merger = merge_without_limit(batch.into_iter(), settings)?; + batch = vec![]; + + let mut tmp_file = + Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; + merger.write_all_to(settings, tmp_file.as_write())?; + temporary_files.push(tmp_file.finished_writing()?); + } + } + // Merge any remaining files that didn't get merged in a full batch above. + if !batch.is_empty() { + assert!(batch.len() < settings.merge_batch_size); + let merger = merge_without_limit(batch.into_iter(), settings)?; + let mut tmp_file = Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?; merger.write_all_to(settings, tmp_file.as_write())?; temporary_files.push(tmp_file.finished_writing()?); } - assert!(batches.next().is_none()); merge_with_file_limit::<_, _, Tmp>( temporary_files .into_iter() @@ -127,10 +131,9 @@ pub fn merge_with_file_limit< dyn FnMut(Tmp::Closed) -> UResult<::Reopened>, >), settings, + output, tmp_dir, ) - } else { - merge_without_limit(files, settings) } } @@ -260,7 +263,7 @@ struct PreviousLine { } /// Merges files together. This is **not** an iterator because of lifetime problems. -pub struct FileMerger<'a> { +struct FileMerger<'a> { heap: binary_heap_plus::BinaryHeap>, request_sender: Sender<(usize, RecycledChunk)>, prev: Option, @@ -269,12 +272,12 @@ pub struct FileMerger<'a> { impl FileMerger<'_> { /// Write the merged contents to the output file. - pub fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> { + fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> { let mut out = output.into_write(); self.write_all_to(settings, &mut out) } - pub fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> { + fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> { while self.write_next(settings, out) {} drop(self.request_sender); self.reader_join_handle.join().unwrap() diff --git a/src/uu/sort/src/sort.rs b/src/uu/sort/src/sort.rs index c2e752bdf6a..8b6fcbb2514 100644 --- a/src/uu/sort/src/sort.rs +++ b/src/uu/sort/src/sort.rs @@ -1567,8 +1567,7 @@ fn exec( tmp_dir: &mut TmpDirWrapper, ) -> UResult<()> { if settings.merge { - let file_merger = merge::merge(files, settings, output.as_output_name(), tmp_dir)?; - file_merger.write_all(settings, output) + merge::merge(files, settings, output, tmp_dir) } else if settings.check { if files.len() > 1 { Err(UUsageError::new(2, "only one file allowed with -c")) diff --git a/tests/by-util/test_sort.rs b/tests/by-util/test_sort.rs index 97bfc6a74d0..62aa07dae5d 100644 --- a/tests/by-util/test_sort.rs +++ b/tests/by-util/test_sort.rs @@ -3,7 +3,7 @@ // For the full copyright and license information, please view the LICENSE // file that was distributed with this source code. -// spell-checker:ignore (words) ints +// spell-checker:ignore (words) ints (linux) NOFILE #![allow(clippy::cast_possible_wrap)] use std::time::Duration; @@ -1084,6 +1084,31 @@ fn test_merge_batch_size() { .stdout_only_fixture("merge_ints_interleaved.expected"); } +#[test] +#[cfg(any(target_os = "linux", target_os = "android"))] +fn test_merge_batch_size_with_limit() { + use rlimit::Resource; + // Currently need... + // 3 descriptors for stdin, stdout, stderr + // 2 descriptors for CTRL+C handling logic (to be reworked at some point) + // 2 descriptors for the input files (i.e. batch-size of 2). + let limit_fd = 3 + 2 + 2; + TestScenario::new(util_name!()) + .ucmd() + .limit(Resource::NOFILE, limit_fd, limit_fd) + .arg("--batch-size=2") + .arg("-m") + .arg("--unique") + .arg("merge_ints_interleaved_1.txt") + .arg("merge_ints_interleaved_2.txt") + .arg("merge_ints_interleaved_3.txt") + .arg("merge_ints_interleaved_3.txt") + .arg("merge_ints_interleaved_2.txt") + .arg("merge_ints_interleaved_1.txt") + .succeeds() + .stdout_only_fixture("merge_ints_interleaved.expected"); +} + #[test] fn test_sigpipe_panic() { let mut cmd = new_ucmd!();