Skip to content

Commit 24e3d3a

Browse files
committed
refactor(ecstore): Optimize memory usage for object integrity verification
Change the object integrity verification from reading all data to streaming processing to avoid memory overflow caused by large objects. Modify the TLS key log check to use environment variables directly instead of configuration constants. Add memory limits for object data reading in the AHM module. Signed-off-by: junxiang Mu <1948535941@qq.com>
1 parent ebad748 commit 24e3d3a

File tree

7 files changed

+79
-45
lines changed

7 files changed

+79
-45
lines changed

crates/ahm/src/heal/storage.rs

Lines changed: 53 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -148,27 +148,47 @@ impl HealStorageAPI for ECStoreHealStorage {
148148
async fn get_object_data(&self, bucket: &str, object: &str) -> Result<Option<Vec<u8>>> {
149149
debug!("Getting object data: {}/{}", bucket, object);
150150

151-
match (*self.ecstore)
151+
let reader = match (*self.ecstore)
152152
.get_object_reader(bucket, object, None, Default::default(), &Default::default())
153153
.await
154154
{
155-
Ok(mut reader) => match reader.read_all().await {
156-
Ok(data) => Ok(Some(data)),
155+
Ok(reader) => reader,
156+
Err(e) => {
157+
error!("Failed to get object: {}/{} - {}", bucket, object, e);
158+
return Err(Error::other(e));
159+
}
160+
};
161+
162+
// WARNING: Returning Vec<u8> for large objects is dangerous. To avoid OOM, cap the read size.
163+
// If needed, refactor callers to stream instead of buffering entire object.
164+
const MAX_READ_BYTES: usize = 16 * 1024 * 1024; // 16 MiB cap
165+
let mut buf = Vec::with_capacity(1024 * 1024);
166+
use tokio::io::AsyncReadExt as _;
167+
let mut n_read: usize = 0;
168+
let mut stream = reader.stream;
169+
loop {
170+
// Read in chunks
171+
let mut chunk = vec![0u8; 1024 * 1024];
172+
match stream.read(&mut chunk).await {
173+
Ok(0) => break,
174+
Ok(n) => {
175+
buf.extend_from_slice(&chunk[..n]);
176+
n_read += n;
177+
if n_read > MAX_READ_BYTES {
178+
warn!(
179+
"Object data exceeds cap ({} bytes), aborting full read to prevent OOM: {}/{}",
180+
MAX_READ_BYTES, bucket, object
181+
);
182+
return Ok(None);
183+
}
184+
}
157185
Err(e) => {
158186
error!("Failed to read object data: {}/{} - {}", bucket, object, e);
159-
Err(Error::other(e))
160-
}
161-
},
162-
Err(e) => {
163-
if matches!(e, rustfs_ecstore::error::StorageError::ObjectNotFound(_, _)) {
164-
debug!("Object data not found: {}/{}", bucket, object);
165-
Ok(None)
166-
} else {
167-
error!("Failed to get object: {}/{} - {}", bucket, object, e);
168-
Err(Error::other(e))
187+
return Err(Error::other(e));
169188
}
170189
}
171190
}
191+
Ok(Some(buf))
172192
}
173193

174194
async fn put_object_data(&self, bucket: &str, object: &str, data: &[u8]) -> Result<()> {
@@ -208,27 +228,34 @@ impl HealStorageAPI for ECStoreHealStorage {
208228
async fn verify_object_integrity(&self, bucket: &str, object: &str) -> Result<bool> {
209229
debug!("Verifying object integrity: {}/{}", bucket, object);
210230

211-
// Try to get object info and data to verify integrity
231+
// Check object metadata first
212232
match self.get_object_meta(bucket, object).await? {
213233
Some(obj_info) => {
214-
// Check if object has valid metadata
215234
if obj_info.size < 0 {
216235
warn!("Object has invalid size: {}/{}", bucket, object);
217236
return Ok(false);
218237
}
219238

220-
// Try to read object data to verify it's accessible
221-
match self.get_object_data(bucket, object).await {
222-
Ok(Some(_)) => {
223-
info!("Object integrity check passed: {}/{}", bucket, object);
224-
Ok(true)
225-
}
226-
Ok(None) => {
227-
warn!("Object data not found: {}/{}", bucket, object);
228-
Ok(false)
239+
// Stream-read the object to a sink to avoid loading into memory
240+
match (*self.ecstore)
241+
.get_object_reader(bucket, object, None, Default::default(), &Default::default())
242+
.await
243+
{
244+
Ok(reader) => {
245+
let mut stream = reader.stream;
246+
match tokio::io::copy(&mut stream, &mut tokio::io::sink()).await {
247+
Ok(_) => {
248+
info!("Object integrity check passed: {}/{}", bucket, object);
249+
Ok(true)
250+
}
251+
Err(e) => {
252+
warn!("Object stream read failed: {}/{} - {}", bucket, object, e);
253+
Ok(false)
254+
}
255+
}
229256
}
230-
Err(_) => {
231-
warn!("Object data read failed: {}/{}", bucket, object);
257+
Err(e) => {
258+
warn!("Failed to get object reader: {}/{} - {}", bucket, object, e);
232259
Ok(false)
233260
}
234261
}

crates/config/src/constants/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
pub(crate) mod app;
16-
pub(crate) mod env;
17-
pub(crate) mod tls;
15+
pub mod app;
16+
pub mod env;
17+
pub mod tls;

crates/ecstore/src/set_disk.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5354,9 +5354,10 @@ impl StorageAPI for SetDisks {
53545354

53555355
#[tracing::instrument(skip(self))]
53565356
async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
5357-
let mut get_object_reader =
5358-
<Self as ObjectIO>::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?;
5359-
let _ = get_object_reader.read_all().await?;
5357+
let get_object_reader = <Self as ObjectIO>::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?;
5358+
// Stream to sink to avoid loading entire object into memory during verification
5359+
let mut reader = get_object_reader.stream;
5360+
tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
53605361
Ok(())
53615362
}
53625363
}

crates/ecstore/src/sets.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -882,11 +882,15 @@ impl StorageAPI for Sets {
882882
unimplemented!()
883883
}
884884

885-
#[tracing::instrument(skip(self))]
885+
#[tracing::instrument(level = "debug", skip(self))]
886886
async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
887-
self.get_disks_by_key(object)
888-
.verify_object_integrity(bucket, object, opts)
889-
.await
887+
let gor = self.get_object_reader(bucket, object, None, HeaderMap::new(), opts).await?;
888+
let mut reader = gor.stream;
889+
890+
// Stream data to sink instead of reading all into memory to prevent OOM
891+
tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
892+
893+
Ok(())
890894
}
891895
}
892896

crates/ecstore/src/store.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2238,9 +2238,10 @@ impl StorageAPI for ECStore {
22382238
}
22392239

22402240
async fn verify_object_integrity(&self, bucket: &str, object: &str, opts: &ObjectOptions) -> Result<()> {
2241-
let mut get_object_reader =
2242-
<Self as ObjectIO>::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?;
2243-
let _ = get_object_reader.read_all().await?;
2241+
let get_object_reader = <Self as ObjectIO>::get_object_reader(self, bucket, object, None, HeaderMap::new(), opts).await?;
2242+
// Stream to sink to avoid loading entire object into memory during verification
2243+
let mut reader = get_object_reader.stream;
2244+
tokio::io::copy(&mut reader, &mut tokio::io::sink()).await?;
22442245
Ok(())
22452246
}
22462247
}

crates/notify/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ categories = ["web-programming", "development-tools", "filesystem"]
2626
documentation = "https://docs.rs/rustfs-notify/latest/rustfs_notify/"
2727

2828
[dependencies]
29-
rustfs-config = { workspace = true, features = ["notify"] }
29+
rustfs-config = { workspace = true, features = ["notify", "constants"] }
3030
rustfs-ecstore = { workspace = true }
3131
rustfs-utils = { workspace = true, features = ["path", "sys"] }
3232
async-trait = { workspace = true }

crates/utils/src/certs.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,13 @@ pub fn create_multi_cert_resolver(
196196

197197
/// Checks if TLS key logging is enabled.
198198
pub fn tls_key_log() -> bool {
199-
env::var(rustfs_config::ENV_TLS_KEYLOG)
199+
env::var("RUSTFS_TLS_KEYLOG")
200200
.map(|v| {
201-
v.eq_ignore_ascii_case(rustfs_config::EnableState::One.as_str())
202-
|| v.eq_ignore_ascii_case(rustfs_config::EnableState::On.as_str())
203-
|| v.eq_ignore_ascii_case(rustfs_config::EnableState::True.as_str())
204-
|| v.eq_ignore_ascii_case(rustfs_config::EnableState::Yes.as_str())
201+
let v = v.trim();
202+
v.eq_ignore_ascii_case("1")
203+
|| v.eq_ignore_ascii_case("on")
204+
|| v.eq_ignore_ascii_case("true")
205+
|| v.eq_ignore_ascii_case("yes")
205206
})
206207
.unwrap_or(false)
207208
}

0 commit comments

Comments
 (0)