diff --git a/Cargo.lock b/Cargo.lock index ded20e53..b69b8246 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8123,6 +8123,7 @@ dependencies = [ "serial_test", "tempfile", "thiserror 2.0.12", + "time", "tokio", "tokio-util", "tracing", diff --git a/crates/ahm/Cargo.toml b/crates/ahm/Cargo.toml index d94f32aa..0e7be349 100644 --- a/crates/ahm/Cargo.toml +++ b/crates/ahm/Cargo.toml @@ -22,6 +22,7 @@ tokio = { workspace = true, features = ["full"] } tokio-util = { workspace = true } tracing = { workspace = true } serde = { workspace = true, features = ["derive"] } +time.workspace = true serde_json = { workspace = true } thiserror = { workspace = true } uuid = { workspace = true, features = ["v4", "serde"] } diff --git a/crates/ahm/src/scanner/data_scanner.rs b/crates/ahm/src/scanner/data_scanner.rs index 3aa4b9df..e7dde0a0 100644 --- a/crates/ahm/src/scanner/data_scanner.rs +++ b/crates/ahm/src/scanner/data_scanner.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::path::{Path, PathBuf}; use std::{ collections::HashMap, sync::Arc, @@ -38,10 +39,13 @@ use crate::{ }; use rustfs_common::data_usage::DataUsageInfo; +use rustfs_common::data_usage::SizeSummary; use rustfs_common::metrics::{Metric, Metrics, globalMetrics}; +use rustfs_ecstore::bucket::versioning::VersioningApi; +use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys; use rustfs_ecstore::cmd::bucket_targets::VersioningConfig; - use rustfs_ecstore::disk::RUSTFS_META_BUCKET; +use rustfs_utils::path::path_to_bucket_object_with_base_path; /// Custom scan mode enum for AHM scanner #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] @@ -1263,11 +1267,91 @@ impl Scanner { } else { // Apply lifecycle actions if let Some(lifecycle_config) = &lifecycle_config { - let mut scanner_item = - ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone()); - if let Err(e) = scanner_item.apply_actions(&entry.name, entry.clone()).await { - error!("Failed to apply lifecycle actions for {}/{}: {}", bucket, entry.name, e); + let sub_path = entry.path(); + let ent_name = Path::new(&folder.name).join(&sub_path); + + let vcfg = BucketVersioningSys::get(bucket).await.ok(); + + let mut scanner_item = ScannerItem { + //path: Path::new(&self.root).join(&ent_name).to_string_lossy().to_string(), + bucket: bucket.to_string(), + prefix: Path::new(&prefix) + .parent() + .unwrap_or(Path::new("")) + .to_string_lossy() + .to_string(), + object_name: ent_name + .file_name() + .map(|name| name.to_string_lossy().into_owned()) + .unwrap_or_default(), + lifecycle: Some(lifecycle_config.clone()), + versioning: versioning_config.clone(), + }; + //ScannerItem::new(bucket.to_string(), Some(lifecycle_config.clone()), versioning_config.clone()); + let fivs = match entry.clone().file_info_versions(&scanner_item.bucket) { + Ok(fivs) => fivs, + Err(err) => { + stop_fn(); + return Err(Error::other("skip this file").into()); + } + }; + let mut size_s = SizeSummary::default(); + let obj_infos = match scanner_item.apply_versions_actions(&fivs.versions).await { + Ok(obj_infos) => obj_infos, + Err(err) => { + stop_fn(); + return Err(Error::other("skip this file").into()); + } + }; + + let versioned = if let Some(vcfg) = vcfg.as_ref() { + vcfg.versioned(scanner_item.object_path().to_str().unwrap_or_default()) + } else { + false + }; + + let mut obj_deleted = false; + for info in obj_infos.iter() { + let sz: i64; + (obj_deleted, sz) = scanner_item.apply_actions(info, &mut size_s).await; + + /*if obj_deleted { + break; + }*/ + + let actual_sz = match info.get_actual_size() { + Ok(size) => size, + Err(_) => continue, + }; + + if info.delete_marker { + size_s.delete_markers += 1; + } + + if info.version_id.is_some() && sz == actual_sz { + size_s.versions += 1; + } + + size_s.total_size += sz as usize; + + if info.delete_marker { + continue; + } } + + for free_version in fivs.free_versions.iter() { + let _obj_info = rustfs_ecstore::store_api::ObjectInfo::from_file_info( + free_version, + &scanner_item.bucket, + &scanner_item.object_path().to_string_lossy(), + versioned, + ); + } + + // todo: global trace + /*if obj_deleted { + return Err(Error::other(ERR_IGNORE_FILE_CONTRIB).into()); + }*/ } // Store object metadata for later analysis diff --git a/crates/ahm/src/scanner/lifecycle.rs b/crates/ahm/src/scanner/lifecycle.rs index 5d33399d..bf3fbfd8 100644 --- a/crates/ahm/src/scanner/lifecycle.rs +++ b/crates/ahm/src/scanner/lifecycle.rs @@ -12,67 +12,197 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use time::OffsetDateTime; +use crate::error::{Error, Result}; +use rustfs_common::data_usage::SizeSummary; use rustfs_common::metrics::IlmAction; -use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_audit::LcEventSrc; -use rustfs_ecstore::bucket::lifecycle::bucket_lifecycle_ops::{apply_lifecycle_action, eval_action_from_lifecycle}; +use rustfs_ecstore::bucket::lifecycle::{ + bucket_lifecycle_audit::LcEventSrc, + bucket_lifecycle_ops::{GLOBAL_ExpiryState, GLOBAL_TransitionState, apply_lifecycle_action, eval_action_from_lifecycle}, + lifecycle, + lifecycle::Lifecycle, +}; use rustfs_ecstore::bucket::metadata_sys::get_object_lock_config; +use rustfs_ecstore::bucket::object_lock::objectlock_sys::{BucketObjectLockSys, enforce_retention_for_deletion}; +use rustfs_ecstore::bucket::versioning::VersioningApi; +use rustfs_ecstore::bucket::versioning_sys::BucketVersioningSys; use rustfs_ecstore::cmd::bucket_targets::VersioningConfig; -use rustfs_ecstore::store_api::ObjectInfo; +use rustfs_ecstore::store_api::{ObjectInfo, ObjectToDelete}; +use rustfs_filemeta::FileInfo; use rustfs_filemeta::FileMetaVersion; use rustfs_filemeta::metacache::MetaCacheEntry; +use rustfs_utils::path::path_join; use s3s::dto::BucketLifecycleConfiguration as LifecycleConfig; use tracing::info; +static SCANNER_EXCESS_OBJECT_VERSIONS: AtomicU64 = AtomicU64::new(100); +static SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE: AtomicU64 = AtomicU64::new(1024 * 1024 * 1024 * 1024); // 1 TB + #[derive(Clone)] pub struct ScannerItem { - bucket: String, - lifecycle: Option>, - versioning: Option>, + pub bucket: String, + pub prefix: String, + pub object_name: String, + pub lifecycle: Option>, + pub versioning: Option>, } impl ScannerItem { pub fn new(bucket: String, lifecycle: Option>, versioning: Option>) -> Self { Self { bucket, + prefix: "".to_string(), + object_name: "".to_string(), lifecycle, versioning, } } - pub async fn apply_actions(&mut self, object: &str, mut meta: MetaCacheEntry) -> anyhow::Result<()> { - info!("apply_actions called for object: {}", object); - if self.lifecycle.is_none() { - info!("No lifecycle config for object: {}", object); - return Ok(()); + pub async fn apply_versions_actions(&self, fivs: &[FileInfo]) -> Result> { + let obj_infos = self.apply_newer_noncurrent_version_limit(fivs).await?; + if obj_infos.len() >= SCANNER_EXCESS_OBJECT_VERSIONS.load(Ordering::SeqCst) as usize { + // todo } - info!("Lifecycle config exists for object: {}", object); - let file_meta = match meta.xl_meta() { - Ok(meta) => meta, - Err(e) => { - tracing::error!("Failed to get xl_meta for {}: {}", object, e); - return Ok(()); - } + let mut cumulative_size = 0; + for obj_info in obj_infos.iter() { + cumulative_size += obj_info.size; + } + + if cumulative_size >= SCANNER_EXCESS_OBJECT_VERSIONS_TOTAL_SIZE.load(Ordering::SeqCst) as i64 { + //todo + } + + Ok(obj_infos) + } + + pub async fn apply_newer_noncurrent_version_limit(&self, fivs: &[FileInfo]) -> Result> { + let lock_enabled = if let Some(rcfg) = BucketObjectLockSys::get(&self.bucket).await { + rcfg.mode.is_some() + } else { + false }; + let _vcfg = BucketVersioningSys::get(&self.bucket).await?; - let latest_version = file_meta.versions.first().cloned().unwrap_or_default(); - let file_meta_version = FileMetaVersion::try_from(latest_version.meta.as_slice()).unwrap_or_default(); - - let obj_info = ObjectInfo { - bucket: self.bucket.clone(), - name: object.to_string(), - version_id: latest_version.header.version_id, - mod_time: latest_version.header.mod_time, - size: file_meta_version.object.as_ref().map_or(0, |o| o.size), - user_defined: serde_json::from_slice(file_meta.data.as_slice()).unwrap_or_default(), - ..Default::default() + let versioned = match BucketVersioningSys::get(&self.bucket).await { + Ok(vcfg) => vcfg.versioned(self.object_path().to_str().unwrap_or_default()), + Err(_) => false, }; + let mut object_infos = Vec::with_capacity(fivs.len()); + + if self.lifecycle.is_none() { + for info in fivs.iter() { + object_infos.push(ObjectInfo::from_file_info( + info, + &self.bucket, + &self.object_path().to_string_lossy(), + versioned, + )); + } + return Ok(object_infos); + } + + let event = self + .lifecycle + .as_ref() + .expect("lifecycle err.") + .clone() + .noncurrent_versions_expiration_limit(&lifecycle::ObjectOpts { + name: self.object_path().to_string_lossy().to_string(), + ..Default::default() + }) + .await; + let lim = event.newer_noncurrent_versions; + if lim == 0 || fivs.len() <= lim + 1 { + for fi in fivs.iter() { + object_infos.push(ObjectInfo::from_file_info( + fi, + &self.bucket, + &self.object_path().to_string_lossy(), + versioned, + )); + } + return Ok(object_infos); + } + + let overflow_versions = &fivs[lim + 1..]; + for fi in fivs[..lim + 1].iter() { + object_infos.push(ObjectInfo::from_file_info( + fi, + &self.bucket, + &self.object_path().to_string_lossy(), + versioned, + )); + } + + let mut to_del = Vec::::with_capacity(overflow_versions.len()); + for fi in overflow_versions.iter() { + let obj = ObjectInfo::from_file_info(fi, &self.bucket, &self.object_path().to_string_lossy(), versioned); + if lock_enabled && enforce_retention_for_deletion(&obj) { + //if enforce_retention_for_deletion(&obj) { + /*if self.debug { + if obj.version_id.is_some() { + info!("lifecycle: {} v({}) is locked, not deleting\n", obj.name, obj.version_id.expect("err")); + } else { + info!("lifecycle: {} is locked, not deleting\n", obj.name); + } + }*/ + object_infos.push(obj); + continue; + } + + if OffsetDateTime::now_utc().unix_timestamp() + < lifecycle::expected_expiry_time(obj.successor_mod_time.expect("err"), event.noncurrent_days as i32) + .unix_timestamp() + { + object_infos.push(obj); + continue; + } + + to_del.push(ObjectToDelete { + object_name: obj.name, + version_id: obj.version_id, + }); + } + + if !to_del.is_empty() { + let mut expiry_state = GLOBAL_ExpiryState.write().await; + expiry_state.enqueue_by_newer_noncurrent(&self.bucket, to_del, event).await; + } + + Ok(object_infos) + } + + pub async fn apply_actions(&mut self, oi: &ObjectInfo, _size_s: &mut SizeSummary) -> (bool, i64) { + let (action, _size) = self.apply_lifecycle(oi).await; + + info!( + "apply_actions {} {} {:?} {:?}", + oi.bucket.clone(), + oi.name.clone(), + oi.version_id.clone(), + oi.user_defined.clone() + ); - self.apply_lifecycle(&obj_info).await; + // Create a mutable clone if you need to modify fields + /*let mut oi = oi.clone(); + oi.replication_status = ReplicationStatusType::from( + oi.user_defined + .get("x-amz-bucket-replication-status") + .unwrap_or(&"PENDING".to_string()), + ); + info!("apply status is: {:?}", oi.replication_status); + self.heal_replication(&oi, _size_s).await;*/ - Ok(()) + if action.delete_all() { + return (true, 0); + } + + (false, oi.size) } async fn apply_lifecycle(&mut self, oi: &ObjectInfo) -> (IlmAction, i64) { @@ -122,4 +252,8 @@ impl ScannerItem { apply_lifecycle_action(&lc_evt, &LcEventSrc::Scanner, oi).await; (lc_evt.action, new_size) } + + pub fn object_path(&self) -> PathBuf { + path_join(&[PathBuf::from(self.prefix.clone()), PathBuf::from(self.object_name.clone())]) + } } diff --git a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs index 6fe4c835..50e87a12 100644 --- a/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs +++ b/crates/ecstore/src/bucket/lifecycle/bucket_lifecycle_ops.rs @@ -1,4 +1,3 @@ -#![allow(unused_imports)] // Copyright 2024 RustFS Team // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,6 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. +#![allow(unused_imports)] #![allow(unused_variables)] #![allow(unused_mut)] #![allow(unused_assignments)] @@ -587,7 +587,7 @@ impl TransitionState { pub async fn init_background_expiry(api: Arc) { let mut workers = num_cpus::get() / 2; //globalILMConfig.getExpirationWorkers() - if let Ok(env_expiration_workers) = env::var("_RUSTFS_EXPIRATION_WORKERS") { + if let Ok(env_expiration_workers) = env::var("_RUSTFS_ILM_EXPIRATION_WORKERS") { if let Ok(num_expirations) = env_expiration_workers.parse::() { workers = num_expirations; } diff --git a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs index f1d4a311..82eb4871 100644 --- a/crates/ecstore/src/bucket/lifecycle/lifecycle.rs +++ b/crates/ecstore/src/bucket/lifecycle/lifecycle.rs @@ -25,6 +25,7 @@ use s3s::dto::{ use std::cmp::Ordering; use std::env; use std::fmt::Display; +use std::sync::Arc; use time::macros::{datetime, offset}; use time::{self, Duration, OffsetDateTime}; use tracing::info; @@ -138,7 +139,7 @@ pub trait Lifecycle { async fn eval(&self, obj: &ObjectOpts) -> Event; async fn eval_inner(&self, obj: &ObjectOpts, now: OffsetDateTime) -> Event; //fn set_prediction_headers(&self, w: http.ResponseWriter, obj: ObjectOpts); - async fn noncurrent_versions_expiration_limit(&self, obj: &ObjectOpts) -> Event; + async fn noncurrent_versions_expiration_limit(self: Arc, obj: &ObjectOpts) -> Event; } #[async_trait::async_trait] @@ -538,7 +539,7 @@ impl Lifecycle for BucketLifecycleConfiguration { Event::default() } - async fn noncurrent_versions_expiration_limit(&self, obj: &ObjectOpts) -> Event { + async fn noncurrent_versions_expiration_limit(self: Arc, obj: &ObjectOpts) -> Event { if let Some(filter_rules) = self.filter_rules(obj).await { for rule in filter_rules.iter() { if let Some(ref noncurrent_version_expiration) = rule.noncurrent_version_expiration { @@ -626,7 +627,7 @@ pub fn expected_expiry_time(mod_time: OffsetDateTime, days: i32) -> OffsetDateTi .to_offset(offset!(-0:00:00)) .saturating_add(Duration::days(days as i64)); let mut hour = 3600; - if let Ok(env_ilm_hour) = env::var("_RUSTFS_ILM_HOUR") { + if let Ok(env_ilm_hour) = env::var("_RUSTFS_ILM_PROCESS_TIME") { if let Ok(num_hour) = env_ilm_hour.parse::() { hour = num_hour; }