Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
b8c4bac
fix
likewu Jul 5, 2025
f080c98
fix
likewu Jul 5, 2025
82dca58
fix
likewu Jul 5, 2025
c322a2a
fix delete-marker expiration. add api_restore.
likewu Jul 6, 2025
60ef1b7
fix
likewu Jul 6, 2025
29287d8
time retry object upload
likewu Jul 7, 2025
96a3315
lock file
likewu Jul 7, 2025
986ecd5
make fmt
likewu Jul 8, 2025
78b4592
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Jul 8, 2025
2fa631f
fix
likewu Jul 8, 2025
59f6968
restore object
likewu Jul 9, 2025
3122584
fix
likewu Jul 10, 2025
f578890
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Jul 10, 2025
fc8c3b7
fix
likewu Jul 11, 2025
ca92042
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Jul 11, 2025
4c8db60
serde-rs-xml -> quick-xml
likewu Jul 11, 2025
e128a46
fix
likewu Jul 14, 2025
d88f166
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Jul 20, 2025
2603e6a
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Jul 20, 2025
83743dd
checksum
likewu Jul 22, 2025
61ce97a
fix
likewu Jul 25, 2025
af0e5c1
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Jul 26, 2025
fc1c446
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Jul 26, 2025
86e82ff
fix
likewu Jul 28, 2025
969bdc2
fix
likewu Jul 28, 2025
c88a008
fix
likewu Jul 28, 2025
72d0927
fix
likewu Jul 29, 2025
c36a0ae
fix
likewu Jul 29, 2025
22dc267
fix
likewu Jul 29, 2025
18cee82
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Jul 29, 2025
76e1088
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Aug 4, 2025
6964363
ilm env
likewu Aug 6, 2025
0dd1214
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Aug 8, 2025
de1da2e
scanner_item prefix object_name
likewu Aug 17, 2025
e529566
oi
likewu Aug 18, 2025
aefb7f1
fix
likewu Aug 18, 2025
81beada
fix retry
likewu Aug 23, 2025
f10e471
fix
likewu Aug 23, 2025
5654494
fix
likewu Aug 24, 2025
e8898d4
object_path
likewu Aug 26, 2025
118c624
object_name
likewu Aug 26, 2025
650baeb
fi version_purge_status
likewu Aug 26, 2025
dc7be7f
old_dir None
likewu Aug 26, 2025
9848a98
fix
likewu Aug 26, 2025
c3c626a
fix
likewu Aug 26, 2025
0abcb0f
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Aug 26, 2025
c29b555
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Aug 28, 2025
d5c4463
fix test
likewu Aug 30, 2025
b2534fd
fix
likewu Aug 30, 2025
f992bc5
fix test case
likewu Aug 30, 2025
7ae668c
fix
likewu Aug 30, 2025
8344112
Merge branch 'main' of github.com:rustfs/rustfs into feature-up/ilm
houseme Aug 30, 2025
8015c94
Merge branch 'main' of https://github.com/rustfs/rustfs into feature-…
likewu Aug 30, 2025
dfd36d7
Merge branch 'feature-up/ilm' of https://github.com/rustfs/rustfs int…
likewu Aug 30, 2025
7d18309
fix
houseme Aug 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@
"sourceLanguages": [
"rust"
],
},
{
"name": "Debug executable target/debug/test",
"type": "lldb",
"request": "launch",
"program": "${workspaceFolder}/target/debug/deps/lifecycle_integration_test-5eb7590b8f3bea55",
"args": [],
"cwd": "${workspaceFolder}",
//"stopAtEntry": false,
//"preLaunchTask": "cargo build",
"sourceLanguages": [
"rust"
],
}
]
}
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/ahm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
serde = { workspace = true, features = ["derive"] }
time.workspace = true
serde_json = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
Expand Down
85 changes: 79 additions & 6 deletions crates/ahm/src/scanner/data_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::{
};

use ecstore::{
disk::{DiskAPI, DiskStore, WalkDirOptions},
disk::{Disk, DiskAPI, DiskStore, WalkDirOptions},
set_disk::SetDisks,
};
use rustfs_ecstore::{self as ecstore, StorageAPI, data_usage::store_data_usage_in_backend};
Expand All @@ -38,9 +38,11 @@ use crate::{
};

use rustfs_common::data_usage::DataUsageInfo;
use rustfs_common::data_usage::SizeSummary;
use rustfs_common::metrics::{Metric, Metrics, globalMetrics};
use rustfs_ecstore::bucket::versioning::VersioningApi;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::cmd::bucket_targets::VersioningConfig;

use rustfs_ecstore::disk::RUSTFS_META_BUCKET;

/// Custom scan mode enum for AHM scanner
Expand Down Expand Up @@ -1282,10 +1284,81 @@ impl Scanner {
} else {
// Apply lifecycle actions
if let Some(lifecycle_config) = &lifecycle_config {
let mut scanner_item =
ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone());
if let Err(e) = scanner_item.apply_actions(&entry.name, entry.clone()).await {
error!("Failed to apply lifecycle actions for {}/{}: {}", bucket, entry.name, e);
if let Disk::Local(_local_disk) = &**disk {
let vcfg = BucketVersioningSys::get(bucket).await.ok();

let mut scanner_item = ScannerItem {
bucket: bucket.to_string(),
object_name: entry.name.clone(),
lifecycle: Some(lifecycle_config.clone()),
versioning: versioning_config.clone(),
};
//ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone());
let fivs = match entry.clone().file_info_versions(&scanner_item.bucket) {
Ok(fivs) => fivs,
Err(_err) => {
stop_fn();
return Err(Error::other("skip this file"));
}
};
let mut size_s = SizeSummary::default();
let obj_infos = match scanner_item.apply_versions_actions(&fivs.versions).await {
Ok(obj_infos) => obj_infos,
Err(_err) => {
stop_fn();
return Err(Error::other("skip this file"));
}
};

let versioned = if let Some(vcfg) = vcfg.as_ref() {
vcfg.versioned(&scanner_item.object_name)
} else {
false
};

#[allow(unused_assignments)]
let mut obj_deleted = false;
for info in obj_infos.iter() {
let sz: i64;
(obj_deleted, sz) = scanner_item.apply_actions(info, &mut size_s).await;

if obj_deleted {
break;
}

let actual_sz = match info.get_actual_size() {
Ok(size) => size,
Err(_) => continue,
};

if info.delete_marker {
size_s.delete_markers += 1;
}

if info.version_id.is_some() && sz == actual_sz {
size_s.versions += 1;
}

size_s.total_size += sz as usize;

if info.delete_marker {
continue;
}
}

for free_version in fivs.free_versions.iter() {
let _obj_info = rustfs_ecstore::store_api::ObjectInfo::from_file_info(
free_version,
&scanner_item.bucket,
&scanner_item.object_name,
versioned,
);
}

// todo: global trace
/*if obj_deleted {
return Err(Error::other(ERR_IGNORE_FILE_CONTRIB).into());
}*/
}
}

Expand Down
175 changes: 142 additions & 33 deletions crates/ahm/src/scanner/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,66 +13,175 @@
// limitations under the License.

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use time::OffsetDateTime;

use crate::error::Result;
use rustfs_common::data_usage::SizeSummary;
use rustfs_common::metrics::IlmAction;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc;
use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::{apply_lifecycle_action, eval_action_from_lifecycle};
use rustfs_ecstore::bucket::lifecycle::{
bucket_lifecycle_audit::LcEventSrc,
bucket_lifecycle_ops::{GLOBAL_ExpiryState, apply_lifecycle_action, eval_action_from_lifecycle},
lifecycle,
lifecycle::Lifecycle,
};
use rustfs_ecstore::bucket::metadata_sys::get_object_lock_config;
use rustfs_ecstore::bucket::object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion};
use rustfs_ecstore::bucket::versioning::VersioningApi;
use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys;
use rustfs_ecstore::cmd::bucket_targets::VersioningConfig;
use rustfs_ecstore::store_api::ObjectInfo;
use rustfs_filemeta::FileMetaVersion;
use rustfs_filemeta::metacache::MetaCacheEntry;
use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete};
use rustfs_filemeta::FileInfo;
use s3s::dto::BucketLifecycleConfiguration as LifecycleConfig;
use tracing::info;

static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100);
static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(1024 * 1024 * 1024 * 1024); // 1 TB

#[derive(Clone)]
pub struct ScannerItem {
bucket: String,
lifecycle: Option<Arc<LifecycleConfig>>,
versioning: Option<Arc<VersioningConfig>>,
pub bucket: String,
pub object_name: String,
pub lifecycle: Option<Arc<LifecycleConfig>>,
pub versioning: Option<Arc<VersioningConfig>>,
}

impl ScannerItem {
pub fn new(bucket: String, lifecycle: Option<Arc<LifecycleConfig>>, versioning: Option<Arc<VersioningConfig>>) -> Self {
Self {
bucket,
object_name: "".to_string(),
lifecycle,
versioning,
}
}

pub async fn apply_actions(&mut self, object: &str, mut meta: MetaCacheEntry) -> anyhow::Result<()> {
info!("apply_actions called for object: {}", object);
if self.lifecycle.is_none() {
info!("No lifecycle config for object: {}", object);
return Ok(());
pub async fn apply_versions_actions(&self, fivs: &[FileInfo]) -> Result<Vec<ObjectInfo>> {
let obj_infos = self.apply_newer_noncurrent_version_limit(fivs).await?;
if obj_infos.len() >= SCANNER_EXCESS_OBJECT_VERSIONS.load(Ordering::SeqCst) as usize {
// todo
}
info!("Lifecycle config exists for object: {}", object);

let file_meta = match meta.xl_meta() {
Ok(meta) => meta,
Err(e) => {
tracing::error!("Failed to get xl_meta for {}: {}", object, e);
return Ok(());
}
let mut cumulative_size = 0;
for obj_info in obj_infos.iter() {
cumulative_size += obj_info.size;
}

if cumulative_size >= SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE.load(Ordering::SeqCst) as i64 {
//todo
}

Ok(obj_infos)
}

pub async fn apply_newer_noncurrent_version_limit(&self, fivs: &[FileInfo]) -> Result<Vec<ObjectInfo>> {
let lock_enabled = if let Some(rcfg) = BucketObjectLockSys::get(&self.bucket).await {
rcfg.mode.is_some()
} else {
false
};
let _vcfg = BucketVersioningSys::get(&self.bucket).await?;

let latest_version = file_meta.versions.first().cloned().unwrap_or_default();
let file_meta_version = FileMetaVersion::try_from(latest_version.meta.as_slice()).unwrap_or_default();

let obj_info = ObjectInfo {
bucket: self.bucket.clone(),
name: object.to_string(),
version_id: latest_version.header.version_id,
mod_time: latest_version.header.mod_time,
size: file_meta_version.object.as_ref().map_or(0, |o| o.size),
user_defined: serde_json::from_slice(file_meta.data.as_slice()).unwrap_or_default(),
..Default::default()
let versioned = match BucketVersioningSys::get(&self.bucket).await {
Ok(vcfg) => vcfg.versioned(&self.object_name),
Err(_) => false,
};
let mut object_infos = Vec::with_capacity(fivs.len());

if self.lifecycle.is_none() {
for info in fivs.iter() {
object_infos.push(ObjectInfo::from_file_info(info, &self.bucket, &self.object_name, versioned));
}
return Ok(object_infos);
}

self.apply_lifecycle(&obj_info).await;
let event = self
.lifecycle
.as_ref()
.expect("lifecycle err.")
.clone()
.noncurrent_versions_expiration_limit(&lifecycle::ObjectOpts {
name: self.object_name.clone(),
..Default::default()
})
.await;
let lim = event.newer_noncurrent_versions;
if lim == 0 || fivs.len() <= lim + 1 {
for fi in fivs.iter() {
object_infos.push(ObjectInfo::from_file_info(fi, &self.bucket, &self.object_name, versioned));
}
return Ok(object_infos);
}

let overflow_versions = &fivs[lim + 1..];
for fi in fivs[..lim + 1].iter() {
object_infos.push(ObjectInfo::from_file_info(fi, &self.bucket, &self.object_name, versioned));
}

let mut to_del = Vec::<ObjectToDelete>::with_capacity(overflow_versions.len());
for fi in overflow_versions.iter() {
let obj = ObjectInfo::from_file_info(fi, &self.bucket, &self.object_name, versioned);
if lock_enabled && enforce_retention_for_deletion(&obj) {
//if enforce_retention_for_deletion(&obj) {
/*if self.debug {
if obj.version_id.is_some() {
info!("lifecycle: {} v({}) is locked, not deleting\n", obj.name, obj.version_id.expect("err"));
} else {
info!("lifecycle: {} is locked, not deleting\n", obj.name);
}
}*/
object_infos.push(obj);
continue;
}

if OffsetDateTime::now_utc().unix_timestamp()
< lifecycle::expected_expiry_time(obj.successor_mod_time.expect("err"), event.noncurrent_days as i32)
.unix_timestamp()
{
object_infos.push(obj);
continue;
}

to_del.push(ObjectToDelete {
object_name: obj.name,
version_id: obj.version_id,
});
}

if !to_del.is_empty() {
let mut expiry_state = GLOBAL_ExpiryState.write().await;
expiry_state.enqueue_by_newer_noncurrent(&self.bucket, to_del, event).await;
}

Ok(object_infos)
}

pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) {
let (action, _size) = self.apply_lifecycle(oi).await;

info!(
"apply_actions {} {} {:?} {:?}",
oi.bucket.clone(),
oi.name.clone(),
oi.version_id.clone(),
oi.user_defined.clone()
);

// Create a mutable clone if you need to modify fields
/*let mut oi = oi.clone();
oi.replication_status = ReplicationStatusType::from(
oi.user_defined
.get("x-amz-bucket-replication-status")
.unwrap_or(&"PENDING".to_string()),
);
info!("apply status is: {:?}", oi.replication_status);
self.heal_replication(&oi, _size_s).await;*/

if action.delete_all() {
return (true, 0);
}

Ok(())
(false, oi.size)
}

async fn apply_lifecycle(&mut self, oi: &ObjectInfo) -> (IlmAction, i64) {
Expand Down
Loading