
perf: avoid transmitting parity shards when the object is good #322

Merged
merged 3 commits into rustfs:main from optimize-get-object
Aug 2, 2025

Conversation

zzhpro
Contributor

@zzhpro zzhpro commented Jul 31, 2025

Type of Change

  • New Feature
  • Bug Fix
  • Documentation
  • Performance Improvement
  • Test/CI
  • Refactor
  • Other:

Related Issues

#46
#50

Summary of Changes

When executing GetObject, read and transmit only the data shards as long as the object is intact; read the parity shards only when some of the data shards are broken.
I've added some unit tests and verified this implementation on a three-node RustFS setup: the object remains available when one of the nodes is down or when one part file is lost.
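
The read path is sketched below. This is only a minimal illustration of the idea, not the actual RustFS code: `ShardReader`, `Error`, `read_shard`, and `reconstruct` are hypothetical placeholders standing in for the real erasure-coding types and decode API.

    // Minimal sketch only: `ShardReader`, `Error`, `read_shard`, and
    // `reconstruct` are hypothetical placeholders, not the RustFS API.
    async fn read_object_shards(
        readers: &mut [Option<ShardReader>],
        data_shards: usize,
        parity_shards: usize,
    ) -> Result<Vec<Option<Vec<u8>>>, Error> {
        let total = data_shards + parity_shards;
        let mut shards: Vec<Option<Vec<u8>>> = vec![None; total];
        let mut broken = Vec::new();

        // First pass: read only the data shards.
        for i in 0..data_shards {
            match read_shard(&mut readers[i]).await {
                Ok(buf) => shards[i] = Some(buf),
                Err(_) => broken.push(i),
            }
        }

        // Fall back to the parity shards only when some data shards are
        // broken, then reconstruct the missing data shards.
        if !broken.is_empty() {
            for i in data_shards..total {
                if let Ok(buf) = read_shard(&mut readers[i]).await {
                    shards[i] = Some(buf);
                }
            }
            reconstruct(&mut shards, data_shards, parity_shards)?;
        }

        Ok(shards)
    }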

Checklist

  • I have read and followed the CONTRIBUTING.md guidelines
  • Code is formatted with cargo fmt --all
  • Passed cargo clippy --all-targets --all-features -- -D warnings
  • Passed cargo check --all-targets
  • Added/updated necessary tests
  • Documentation updated (if needed)
  • CI/CD passed (if applicable)

Impact

  • Breaking change (compatibility)
  • Requires doc/config/deployment update
  • Other impact:

Additional Notes


Thank you for your contribution! Please ensure your PR follows the community standards (CODE_OF_CONDUCT.md) and sign the CLA if this is your first contribution.

@CLAassistant

CLAassistant commented Jul 31, 2025

CLA assistant check
All committers have signed the CLA.

@weisd weisd requested a review from shiroleeee August 1, 2025 01:32
@shiroleeee
Contributor

You can try using futures::stream::FuturesUnordered to simplify the concurrency control. For example, as in the snippet below, we collect an async block for reading from each reader into a Vec, then bound the concurrency by limiting how many of them are pushed into the FuturesUnordered at once. When an error occurs, you can immediately push an additional async block for execution instead of waiting for every task in the current batch to finish before starting the next round.

        use futures::stream::{FuturesUnordered, StreamExt};

        // Build one read future per available reader; each future yields
        // its shard index together with the read result.
        let mut futures = Vec::with_capacity(num_readers);
        for (i, reader) in self.readers.iter_mut().enumerate() {
            if let Some(r) = reader {
                futures.push(async move {
                    let mut buf = vec![0u8; shard_size];
                    match r.read(&mut buf).await {
                        Ok(n) => {
                            buf.truncate(n);
                            (i, Ok(buf))
                        }
                        Err(e) => (i, Err(Error::from(e))),
                    }
                });
            }
        }

        if futures.len() > self.data_shards {
            // Start with only `data_shards` reads in flight.
            let mut sets = FuturesUnordered::new();
            let mut fns_iter = futures.into_iter();
            for _ in 0..self.data_shards {
                if let Some(f) = fns_iter.next() {
                    sets.push(f);
                }
            }

            while let Some((i, result)) = sets.next().await {
                match result {
                    Ok(v) => {
                        // Keep the successfully read shard, e.g. shards[i] = Some(v).
                    }
                    Err(e) => {
                        // error!("Error reading shard {}: {}", i, e);
                        // A read failed: immediately schedule the next remaining
                        // (parity) reader instead of waiting for the whole batch.
                        if let Some(f) = fns_iter.next() {
                            sets.push(f);
                        }
                    }
                }
            }
        }

@zzhpro
Contributor Author

zzhpro commented Aug 1, 2025

@shiroleeee thanks for your advice, I'll improve this.

@zzhpro zzhpro force-pushed the optimize-get-object branch from cc37fa5 to 5048dc2 Compare August 1, 2025 08:56
@shiroleeee
Contributor

You've done great work! However, I think the logic for deciding whether enough data shards have been obtained could be simplified further. For example, as shown below, we can simply check whether the number of successful reads has reached the number of data shards.

            let mut success = 0;
            while let Some((i, result)) = sets.next().await {
                match result {
                    Ok(v) => {
                        shards[i] = Some(v);
                        success += 1;
                    }
                    Err(e) => {
                        errs[i] = Some(e);
                        // On failure, pull in the next remaining (parity) reader.
                        if let Some(future) = fut_iter.next() {
                            sets.push(future);
                        }
                        continue;
                    }
                }

                // Stop as soon as enough data shards have been read.
                if success == self.data_shards {
                    break;
                }
            }

@zzhpro
Contributor Author

zzhpro commented Aug 2, 2025

@shiroleeee You're right. Your implementation is actually equivalent to mine while being simpler. I've applied the changes in the new commit.

@weisd weisd merged commit b119894 into rustfs:main Aug 2, 2025
12 checks passed