sort: Rework merge batching logic #6957

Merged · 1 commit · Dec 21, 2024
src/uu/sort/src/ext_sort.rs (4 changes: 2 additions & 2 deletions)

```diff
@@ -98,12 +98,12 @@ fn reader_writer<
     )?;
     match read_result {
         ReadResult::WroteChunksToFile { tmp_files } => {
-            let merger = merge::merge_with_file_limit::<_, _, Tmp>(
+            merge::merge_with_file_limit::<_, _, Tmp>(
                 tmp_files.into_iter().map(|c| c.reopen()),
                 settings,
+                output,
                 tmp_dir,
             )?;
-            merger.write_all(settings, output)?;
         }
         ReadResult::SortedSingleChunk(chunk) => {
             if settings.unique {
```
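The call-site change above is the PR in miniature: `merge_with_file_limit` used to hand back a `FileMerger` that the caller then wrote out in a second step; it now takes the `Output` and performs the write itself. A toy model of that API shift (simplified types, not the crate's real signatures):

```rust
use std::io::{self, Write};

// Old shape: produce a result the caller must still remember to write out.
fn merge_returning(lines: &[&str]) -> String {
    lines.join("\n") + "\n"
}

// New shape: take the output up front and finish the job in one call.
fn merge_writing(lines: &[&str], out: &mut impl Write) -> io::Result<()> {
    out.write_all(merge_returning(lines).as_bytes())
}

fn main() -> io::Result<()> {
    let stdout = io::stdout();
    // Before: two steps at every call site.
    stdout.lock().write_all(merge_returning(&["a", "b"]).as_bytes())?;
    // After: one step; no intermediate value to forget.
    merge_writing(&["a", "b"], &mut stdout.lock())
}
```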
src/uu/sort/src/merge.rs (77 changes: 40 additions & 37 deletions)

```diff
@@ -25,7 +25,6 @@ use std::{
 };
 
 use compare::Compare;
-use itertools::Itertools;
 use uucore::error::UResult;
 
 use crate::{
@@ -67,58 +66,63 @@ fn replace_output_file_in_input_files(
 ///
 /// If `settings.merge_batch_size` is greater than the length of `files`, intermediate files will be used.
 /// If `settings.compress_prog` is `Some`, intermediate files will be compressed with it.
-pub fn merge<'a>(
+pub fn merge(
     files: &mut [OsString],
-    settings: &'a GlobalSettings,
-    output: Option<&str>,
+    settings: &GlobalSettings,
+    output: Output,
     tmp_dir: &mut TmpDirWrapper,
-) -> UResult<FileMerger<'a>> {
-    replace_output_file_in_input_files(files, output, tmp_dir)?;
+) -> UResult<()> {
+    replace_output_file_in_input_files(files, output.as_output_name(), tmp_dir)?;
+    let files = files
+        .iter()
+        .map(|file| open(file).map(|file| PlainMergeInput { inner: file }));
     if settings.compress_prog.is_none() {
-        merge_with_file_limit::<_, _, WriteablePlainTmpFile>(
-            files
-                .iter()
-                .map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
-            settings,
-            tmp_dir,
-        )
+        merge_with_file_limit::<_, _, WriteablePlainTmpFile>(files, settings, output, tmp_dir)
     } else {
-        merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(
-            files
-                .iter()
-                .map(|file| open(file).map(|file| PlainMergeInput { inner: file })),
-            settings,
-            tmp_dir,
-        )
+        merge_with_file_limit::<_, _, WriteableCompressedTmpFile>(files, settings, output, tmp_dir)
     }
 }
 
 // Merge already sorted `MergeInput`s.
 pub fn merge_with_file_limit<
-    'a,
     M: MergeInput + 'static,
     F: ExactSizeIterator<Item = UResult<M>>,
     Tmp: WriteableTmpFile + 'static,
 >(
     files: F,
-    settings: &'a GlobalSettings,
+    settings: &GlobalSettings,
+    output: Output,
     tmp_dir: &mut TmpDirWrapper,
-) -> UResult<FileMerger<'a>> {
-    if files.len() > settings.merge_batch_size {
-        let mut remaining_files = files.len();
-        let batches = files.chunks(settings.merge_batch_size);
-        let mut batches = batches.into_iter();
+) -> UResult<()> {
+    if files.len() <= settings.merge_batch_size {
+        let merger = merge_without_limit(files, settings);
+        merger?.write_all(settings, output)
+    } else {
         let mut temporary_files = vec![];
-        while remaining_files != 0 {
-            // Work around the fact that `Chunks` is not an `ExactSizeIterator`.
-            remaining_files = remaining_files.saturating_sub(settings.merge_batch_size);
-            let merger = merge_without_limit(batches.next().unwrap(), settings)?;
+        let mut batch = vec![];
+        for file in files {
+            batch.push(file);
+            if batch.len() >= settings.merge_batch_size {
+                assert_eq!(batch.len(), settings.merge_batch_size);
+                let merger = merge_without_limit(batch.into_iter(), settings)?;
+                batch = vec![];
+
+                let mut tmp_file =
+                    Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
+                merger.write_all_to(settings, tmp_file.as_write())?;
+                temporary_files.push(tmp_file.finished_writing()?);
+            }
+        }
+        // Merge any remaining files that didn't get merged in a full batch above.
+        if !batch.is_empty() {
+            assert!(batch.len() < settings.merge_batch_size);
+            let merger = merge_without_limit(batch.into_iter(), settings)?;
+
             let mut tmp_file =
                 Tmp::create(tmp_dir.next_file()?, settings.compress_prog.as_deref())?;
             merger.write_all_to(settings, tmp_file.as_write())?;
             temporary_files.push(tmp_file.finished_writing()?);
         }
-        assert!(batches.next().is_none());
         merge_with_file_limit::<_, _, Tmp>(
             temporary_files
                 .into_iter()
@@ -127,10 +131,9 @@ pub fn merge_with_file_limit<
                         dyn FnMut(Tmp::Closed) -> UResult<<Tmp::Closed as ClosedTmpFile>::Reopened>,
                     >),
             settings,
+            output,
             tmp_dir,
         )
-    } else {
-        merge_without_limit(files, settings)
     }
 }
 
@@ -260,7 +263,7 @@ struct PreviousLine {
 }
 
 /// Merges files together. This is **not** an iterator because of lifetime problems.
-pub struct FileMerger<'a> {
+struct FileMerger<'a> {
     heap: binary_heap_plus::BinaryHeap<MergeableFile, FileComparator<'a>>,
     request_sender: Sender<(usize, RecycledChunk)>,
     prev: Option<PreviousLine>,
@@ -269,12 +272,12 @@ pub struct FileMerger<'a> {
 
 impl FileMerger<'_> {
     /// Write the merged contents to the output file.
-    pub fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
+    fn write_all(self, settings: &GlobalSettings, output: Output) -> UResult<()> {
         let mut out = output.into_write();
        self.write_all_to(settings, &mut out)
     }
 
-    pub fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
+    fn write_all_to(mut self, settings: &GlobalSettings, out: &mut impl Write) -> UResult<()> {
         while self.write_next(settings, out) {}
         drop(self.request_sender);
         self.reader_join_handle.join().unwrap()
```
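The heart of the rework is the batching loop above: the old code used `Itertools::chunks`, whose `Chunks` iterator is not an `ExactSizeIterator`, forcing the `remaining_files` bookkeeping; the new code collects files into a plain `Vec`, flushes each full batch to a temporary file, handles the leftover partial batch after the loop, and recurses on the temporary files until a single batch suffices. A standalone sketch of that batching pattern (generic names, not the crate's API):

```rust
/// Collect items into fixed-size batches, flushing each full batch and then
/// the final partial one — mirroring the loop in `merge_with_file_limit`.
fn for_each_batch<T>(
    items: impl IntoIterator<Item = T>,
    batch_size: usize,
    mut flush: impl FnMut(Vec<T>),
) {
    let mut batch = Vec::new();
    for item in items {
        batch.push(item);
        if batch.len() >= batch_size {
            // Hand off the full batch and start a fresh one.
            flush(std::mem::take(&mut batch));
        }
    }
    // Any remainder that never filled a whole batch.
    if !batch.is_empty() {
        flush(batch);
    }
}

fn main() {
    for_each_batch(1..=7, 3, |batch| println!("{batch:?}"));
    // [1, 2, 3]
    // [4, 5, 6]
    // [7]
}
```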
src/uu/sort/src/sort.rs (3 changes: 1 addition & 2 deletions)

```diff
@@ -1567,8 +1567,7 @@ fn exec(
     tmp_dir: &mut TmpDirWrapper,
 ) -> UResult<()> {
     if settings.merge {
-        let file_merger = merge::merge(files, settings, output.as_output_name(), tmp_dir)?;
-        file_merger.write_all(settings, output)
+        merge::merge(files, settings, output, tmp_dir)
     } else if settings.check {
         if files.len() > 1 {
             Err(UUsageError::new(2, "only one file allowed with -c"))
```
tests/by-util/test_sort.rs (27 changes: 26 additions & 1 deletion)

```diff
@@ -3,7 +3,7 @@
 // For the full copyright and license information, please view the LICENSE
 // file that was distributed with this source code.
 
-// spell-checker:ignore (words) ints
+// spell-checker:ignore (words) ints (linux) NOFILE
 #![allow(clippy::cast_possible_wrap)]
 
 use std::time::Duration;
@@ -1084,6 +1084,31 @@ fn test_merge_batch_size() {
         .stdout_only_fixture("merge_ints_interleaved.expected");
 }
 
+#[test]
+#[cfg(any(target_os = "linux", target_os = "android"))]
+fn test_merge_batch_size_with_limit() {
+    use rlimit::Resource;
+    // Currently need...
+    // 3 descriptors for stdin, stdout, stderr
+    // 2 descriptors for CTRL+C handling logic (to be reworked at some point)
+    // 2 descriptors for the input files (i.e. batch-size of 2).
+    let limit_fd = 3 + 2 + 2;
+    TestScenario::new(util_name!())
+        .ucmd()
+        .limit(Resource::NOFILE, limit_fd, limit_fd)
+        .arg("--batch-size=2")
+        .arg("-m")
+        .arg("--unique")
+        .arg("merge_ints_interleaved_1.txt")
+        .arg("merge_ints_interleaved_2.txt")
+        .arg("merge_ints_interleaved_3.txt")
+        .arg("merge_ints_interleaved_3.txt")
+        .arg("merge_ints_interleaved_2.txt")
+        .arg("merge_ints_interleaved_1.txt")
+        .succeeds()
+        .stdout_only_fixture("merge_ints_interleaved.expected");
+}
+
 #[test]
 fn test_sigpipe_panic() {
     let mut cmd = new_ucmd!();
```
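The new test pins the process's open-file limit so that a batch size of 2 is actually exercised under descriptor pressure: 3 descriptors for stdin/stdout/stderr, 2 for the Ctrl+C handling logic, and 2 for the input files, giving a limit of 7. Roughly what the `.limit(Resource::NOFILE, ...)` builder arranges for the spawned `sort`, sketched with the `rlimit` crate's `Resource::set` (a mirror of the idea, not the test harness itself):

```rust
use rlimit::Resource;

fn main() -> std::io::Result<()> {
    // 3 (stdin/stdout/stderr) + 2 (Ctrl+C handling) + 2 (input files)
    let limit_fd = 3 + 2 + 2;
    // Cap both the soft and hard NOFILE limits for this process, as the
    // test does for the child `sort` process.
    Resource::NOFILE.set(limit_fd, limit_fd)?;
    Ok(())
}
```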