diff --git a/Cargo.lock b/Cargo.lock index ded20e538..58ce2e9d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -649,6 +649,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a89cbf775b137e9b968e67227ef7f775587cde3fd31b0d8599dbd0f598a48340" +dependencies = [ + "bytemuck", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1399,6 +1408,12 @@ version = "3.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" +[[package]] +name = "bytemuck" +version = "1.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3995eaeebcdf32f91f980d360f78732ddc061097ab4e39991ae7a6ace9194677" + [[package]] name = "byteorder" version = "1.5.0" @@ -3792,6 +3807,20 @@ dependencies = [ "rustc_version", ] +[[package]] +name = "figment" +version = "0.10.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8cb01cd46b0cf372153850f4c6c272d9cbea2da513e07538405148f95bd789f3" +dependencies = [ + "atomic", + "pear", + "serde", + "serde_json", + "uncased", + "version_check", +] + [[package]] name = "filetime" version = "0.2.25" @@ -5010,6 +5039,12 @@ dependencies = [ "cfb", ] +[[package]] +name = "inlinable_string" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8fae54786f62fb2918dcfae3d568594e50eb9b5c25bf04371af6fe7516452fb" + [[package]] name = "inotify" version = "0.11.0" @@ -6739,6 +6774,29 @@ dependencies = [ "hmac 0.12.1", ] +[[package]] +name = "pear" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdeeaa00ce488657faba8ebf44ab9361f9365a97bd39ffb8a60663f57ff4b467" +dependencies = [ + "inlinable_string", + "pear_codegen", + "yansi", +] + +[[package]] +name = "pear_codegen" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bab5b985dc082b345f812b7df84e1bef27e7207b39e448439ba8bd69c93f147" +dependencies = [ + "proc-macro2", + "proc-macro2-diagnostics", + "quote", + "syn 2.0.104", +] + [[package]] name = "pem" version = "3.0.5" @@ -7200,6 +7258,7 @@ dependencies = [ "quote", "syn 2.0.104", "version_check", + "yansi", ] [[package]] @@ -8062,6 +8121,7 @@ dependencies = [ "rust-embed", "rustfs-ahm", "rustfs-appauth", + "rustfs-audit-logger", "rustfs-common", "rustfs-config", "rustfs-ecstore", @@ -8142,6 +8202,24 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rustfs-audit-logger" +version = "0.0.5" +dependencies = [ + "async-trait", + "chrono", + "figment", + "reqwest", + "serde", + "serde_json", + "thiserror 2.0.12", + "tokio", + "tracing", + "tracing-core", + "url", + "uuid", +] + [[package]] name = "rustfs-checksums" version = "0.0.5" @@ -10782,6 +10860,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "uncased" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697" +dependencies = [ + "version_check", +] + [[package]] name = "unicase" version = "2.8.1" diff --git a/Cargo.toml b/Cargo.toml index 9f8daa5c6..d73819c61 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ members = [ "rustfs", # Core file system implementation "cli/rustfs-gui", # Graphical user interface client "crates/appauth", # Application authentication and authorization + "crates/audit-logger", # Audit logging system for file operations "crates/common", # Shared utilities and data structures "crates/config", # Configuration management "crates/crypto", # Cryptography and security features @@ -37,7 +38,7 @@ members = [ "crates/utils", # Utility functions and helpers "crates/workers", # Worker thread pools and task scheduling "crates/zip", # ZIP file handling and compression - "crates/ahm", + "crates/ahm", # Asynchronous Hash Map for concurrent data structures "crates/mcp", # MCP server for S3 operations ] resolver = "2" @@ -59,15 +60,11 @@ unsafe_code = "deny" [workspace.lints.clippy] all = "warn" -[patch.crates-io] -rustfs-utils = { path = "crates/utils" } -rustfs-filemeta = { path = "crates/filemeta" } -rustfs-rio = { path = "crates/rio" } - [workspace.dependencies] rustfs-ahm = { path = "crates/ahm", version = "0.0.5" } rustfs-s3select-api = { path = "crates/s3select-api", version = "0.0.5" } rustfs-appauth = { path = "crates/appauth", version = "0.0.5" } +rustfs-audit-logger = { path = "crates/audit-logger", version = "0.0.5" } rustfs-common = { path = "crates/common", version = "0.0.5" } rustfs-crypto = { path = "crates/crypto", version = "0.0.5" } rustfs-ecstore = { path = "crates/ecstore", version = "0.0.5" } @@ -278,7 +275,7 @@ zstd = "0.13.3" [workspace.metadata.cargo-shear] -ignored = ["rustfs", "rust-i18n", "rustfs-mcp"] +ignored = ["rustfs", "rust-i18n", "rustfs-mcp", "rustfs-audit-logger", "tokio-test"] [profile.wasm-dev] inherits = "dev" diff --git a/crates/audit-logger/Cargo.toml b/crates/audit-logger/Cargo.toml new file mode 100644 index 000000000..326c53c78 --- /dev/null +++ b/crates/audit-logger/Cargo.toml @@ -0,0 +1,43 @@ +# Copyright 2024 RustFS Team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +[package] +name = "rustfs-audit-logger" +edition.workspace = true +license.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true +homepage.workspace = true +description = "Audit logging system for RustFS, providing detailed logging of file operations and system events." +documentation = "https://docs.rs/audit-logger/latest/audit_logger/" +keywords = ["audit", "logging", "file-operations", "system-events", "RustFS"] +categories = ["web-programming", "development-tools::profiling", "asynchronous", "api-bindings", "development-tools::debugging"] + +[dependencies] +async-trait = { workspace = true } +chrono = { workspace = true } +reqwest = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +tracing = { workspace = true, features = ["std", "attributes"] } +tracing-core = { workspace = true } +tokio = { workspace = true, features = ["sync", "fs", "rt-multi-thread", "rt", "time", "macros"] } +url = { workspace = true } +uuid = { workspace = true } +thiserror = { workspace = true } +figment = { version = "0.10", features = ["json", "env"] } + +[lints] +workspace = true diff --git a/crates/audit-logger/examples/config.json b/crates/audit-logger/examples/config.json new file mode 100644 index 000000000..329f26adf --- /dev/null +++ b/crates/audit-logger/examples/config.json @@ -0,0 +1,34 @@ +{ + "console": { + "enabled": true + }, + "logger_webhook": { + "default": { + "enabled": true, + "endpoint": "http://localhost:3000/logs", + "auth_token": "secret-token-for-logs", + "batch_size": 5, + "queue_size": 1000, + "max_retry": 3, + "retry_interval": "2s" + } + }, + "audit_webhook": { + "splunk": { + "enabled": true, + "endpoint": "http://localhost:3000/audit", + "auth_token": "secret-token-for-audit", + "batch_size": 10 + } + }, + "audit_kafka": { + "default": { + "enabled": false, + "brokers": [ + "kafka1:9092", + "kafka2:9092" + ], + "topic": "minio-audit-events" + } + } +} \ No newline at end of file diff --git a/crates/audit-logger/examples/main.rs b/crates/audit-logger/examples/main.rs new file mode 100644 index 000000000..d051b80cc --- /dev/null +++ b/crates/audit-logger/examples/main.rs @@ -0,0 +1,65 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use figment::{ + Figment, + providers::{Env, Format, Json}, +}; +use logger::{AuditEntry, AuditLogger, Config, LogEntry, Trace}; +use tokio::time::{Duration, sleep}; + +#[tokio::main] +async fn main() { + // 1. 从文件和环境变量加载配置 + // 环境变量会覆盖文件中的设置 + // 例如:`AUDIT_WEBHOOK_DEFAULT_ENABLED=true AUDIT_WEBHOOK_DEFAULT_ENDPOINT=http://localhost:3000/logs` + let config: Config = Figment::new() + .merge(Json::file("config.json")) + .merge(Env::prefixed("").split("__")) + .extract() + .expect("Failed to load configuration"); + + println!("Loaded config: {:?}", config); + + // 2. 初始化记录器 + let logger = AuditLogger::new(&config); + + // 3. 发送一些日志 + println!("\n--- Sending logs ---"); + for i in 0..5 { + let log_entry = LogEntry { + deployment_id: "global-deployment-id".to_string(), + level: "INFO".to_string(), + message: format!("This is log message #{}", i), + trace: Some(Trace { + message: "An operation was performed".to_string(), + source: vec!["main.rs:45:main()".to_string()], + variables: Default::default(), + }), + time: chrono::Utc::now(), + request_id: uuid::Uuid::new_v4().to_string(), + }; + logger.log(log_entry).await; + + let audit_entry = AuditEntry::new("GetObject", "my-bucket", &format!("object-{}", i)); + logger.log(audit_entry).await; + + sleep(Duration::from_millis(100)).await; + } + println!("--- Finished sending logs ---\n"); + + // 4. 优雅地关闭 + // 这将确保所有缓冲的/队列中的日志在退出前被发送 + logger.shutdown().await; +} diff --git a/crates/audit-logger/src/entry/args.rs b/crates/audit-logger/src/entry/args.rs new file mode 100644 index 000000000..7711c9ceb --- /dev/null +++ b/crates/audit-logger/src/entry/args.rs @@ -0,0 +1,88 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::entry::ObjectVersion; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +/// Args - defines the arguments for API operations +/// Args is used to define the arguments for API operations. +/// +/// # Example +/// ``` +/// use rustfs_audit_logger::Args; +/// use std::collections::HashMap; +/// +/// let args = Args::new() +/// .set_bucket(Some("my-bucket".to_string())) +/// .set_object(Some("my-object".to_string())) +/// .set_version_id(Some("123".to_string())) +/// .set_metadata(Some(HashMap::new())); +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize, Default, Eq, PartialEq)] +pub struct Args { + #[serde(rename = "bucket", skip_serializing_if = "Option::is_none")] + pub bucket: Option, + #[serde(rename = "object", skip_serializing_if = "Option::is_none")] + pub object: Option, + #[serde(rename = "versionId", skip_serializing_if = "Option::is_none")] + pub version_id: Option, + #[serde(rename = "objects", skip_serializing_if = "Option::is_none")] + pub objects: Option>, + #[serde(rename = "metadata", skip_serializing_if = "Option::is_none")] + pub metadata: Option>, +} + +impl Args { + /// Create a new Args object + pub fn new() -> Self { + Args { + bucket: None, + object: None, + version_id: None, + objects: None, + metadata: None, + } + } + + /// Set the bucket + pub fn set_bucket(mut self, bucket: Option) -> Self { + self.bucket = bucket; + self + } + + /// Set the object + pub fn set_object(mut self, object: Option) -> Self { + self.object = object; + self + } + + /// Set the version ID + pub fn set_version_id(mut self, version_id: Option) -> Self { + self.version_id = version_id; + self + } + + /// Set the objects + pub fn set_objects(mut self, objects: Option>) -> Self { + self.objects = objects; + self + } + + /// Set the metadata + pub fn set_metadata(mut self, metadata: Option>) -> Self { + self.metadata = metadata; + self + } +} diff --git a/crates/audit-logger/src/entry/audit.rs b/crates/audit-logger/src/entry/audit.rs new file mode 100644 index 000000000..3fae37c2a --- /dev/null +++ b/crates/audit-logger/src/entry/audit.rs @@ -0,0 +1,467 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{BaseLogEntry, LogRecord, ObjectVersion}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; + +/// API details structure +/// ApiDetails is used to define the details of an API operation +/// +/// The `ApiDetails` structure contains the following fields: +/// - `name` - the name of the API operation +/// - `bucket` - the bucket name +/// - `object` - the object name +/// - `objects` - the list of objects +/// - `status` - the status of the API operation +/// - `status_code` - the status code of the API operation +/// - `input_bytes` - the input bytes +/// - `output_bytes` - the output bytes +/// - `header_bytes` - the header bytes +/// - `time_to_first_byte` - the time to first byte +/// - `time_to_first_byte_in_ns` - the time to first byte in nanoseconds +/// - `time_to_response` - the time to response +/// - `time_to_response_in_ns` - the time to response in nanoseconds +/// +/// The `ApiDetails` structure contains the following methods: +/// - `new` - create a new `ApiDetails` with default values +/// - `set_name` - set the name +/// - `set_bucket` - set the bucket +/// - `set_object` - set the object +/// - `set_objects` - set the objects +/// - `set_status` - set the status +/// - `set_status_code` - set the status code +/// - `set_input_bytes` - set the input bytes +/// - `set_output_bytes` - set the output bytes +/// - `set_header_bytes` - set the header bytes +/// - `set_time_to_first_byte` - set the time to first byte +/// - `set_time_to_first_byte_in_ns` - set the time to first byte in nanoseconds +/// - `set_time_to_response` - set the time to response +/// - `set_time_to_response_in_ns` - set the time to response in nanoseconds +/// +/// # Example +/// ``` +/// use rustfs_audit_logger::ApiDetails; +/// use rustfs_audit_logger::ObjectVersion; +/// +/// let api = ApiDetails::new() +/// .set_name(Some("GET".to_string())) +/// .set_bucket(Some("my-bucket".to_string())) +/// .set_object(Some("my-object".to_string())) +/// .set_objects(vec![ObjectVersion::new_with_object_name("my-object".to_string())]) +/// .set_status(Some("OK".to_string())) +/// .set_status_code(Some(200)) +/// .set_input_bytes(100) +/// .set_output_bytes(200) +/// .set_header_bytes(Some(50)) +/// .set_time_to_first_byte(Some("100ms".to_string())) +/// .set_time_to_first_byte_in_ns(Some("100000000ns".to_string())) +/// .set_time_to_response(Some("200ms".to_string())) +/// .set_time_to_response_in_ns(Some("200000000ns".to_string())); +/// ``` +#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)] +pub struct ApiDetails { + #[serde(rename = "name", skip_serializing_if = "Option::is_none")] + pub name: Option, + #[serde(rename = "bucket", skip_serializing_if = "Option::is_none")] + pub bucket: Option, + #[serde(rename = "object", skip_serializing_if = "Option::is_none")] + pub object: Option, + #[serde(rename = "objects", skip_serializing_if = "Vec::is_empty", default)] + pub objects: Vec, + #[serde(rename = "status", skip_serializing_if = "Option::is_none")] + pub status: Option, + #[serde(rename = "statusCode", skip_serializing_if = "Option::is_none")] + pub status_code: Option, + #[serde(rename = "rx")] + pub input_bytes: i64, + #[serde(rename = "tx")] + pub output_bytes: i64, + #[serde(rename = "txHeaders", skip_serializing_if = "Option::is_none")] + pub header_bytes: Option, + #[serde(rename = "timeToFirstByte", skip_serializing_if = "Option::is_none")] + pub time_to_first_byte: Option, + #[serde(rename = "timeToFirstByteInNS", skip_serializing_if = "Option::is_none")] + pub time_to_first_byte_in_ns: Option, + #[serde(rename = "timeToResponse", skip_serializing_if = "Option::is_none")] + pub time_to_response: Option, + #[serde(rename = "timeToResponseInNS", skip_serializing_if = "Option::is_none")] + pub time_to_response_in_ns: Option, +} + +impl ApiDetails { + /// Create a new `ApiDetails` with default values + pub fn new() -> Self { + ApiDetails { + name: None, + bucket: None, + object: None, + objects: Vec::new(), + status: None, + status_code: None, + input_bytes: 0, + output_bytes: 0, + header_bytes: None, + time_to_first_byte: None, + time_to_first_byte_in_ns: None, + time_to_response: None, + time_to_response_in_ns: None, + } + } + + /// Set the name + pub fn set_name(mut self, name: Option) -> Self { + self.name = name; + self + } + + /// Set the bucket + pub fn set_bucket(mut self, bucket: Option) -> Self { + self.bucket = bucket; + self + } + + /// Set the object + pub fn set_object(mut self, object: Option) -> Self { + self.object = object; + self + } + + /// Set the objects + pub fn set_objects(mut self, objects: Vec) -> Self { + self.objects = objects; + self + } + + /// Set the status + pub fn set_status(mut self, status: Option) -> Self { + self.status = status; + self + } + + /// Set the status code + pub fn set_status_code(mut self, status_code: Option) -> Self { + self.status_code = status_code; + self + } + + /// Set the input bytes + pub fn set_input_bytes(mut self, input_bytes: i64) -> Self { + self.input_bytes = input_bytes; + self + } + + /// Set the output bytes + pub fn set_output_bytes(mut self, output_bytes: i64) -> Self { + self.output_bytes = output_bytes; + self + } + + /// Set the header bytes + pub fn set_header_bytes(mut self, header_bytes: Option) -> Self { + self.header_bytes = header_bytes; + self + } + + /// Set the time to first byte + pub fn set_time_to_first_byte(mut self, time_to_first_byte: Option) -> Self { + self.time_to_first_byte = time_to_first_byte; + self + } + + /// Set the time to first byte in nanoseconds + pub fn set_time_to_first_byte_in_ns(mut self, time_to_first_byte_in_ns: Option) -> Self { + self.time_to_first_byte_in_ns = time_to_first_byte_in_ns; + self + } + + /// Set the time to response + pub fn set_time_to_response(mut self, time_to_response: Option) -> Self { + self.time_to_response = time_to_response; + self + } + + /// Set the time to response in nanoseconds + pub fn set_time_to_response_in_ns(mut self, time_to_response_in_ns: Option) -> Self { + self.time_to_response_in_ns = time_to_response_in_ns; + self + } +} + +/// Entry - audit entry logs +/// AuditLogEntry is used to define the structure of an audit log entry +/// +/// The `AuditLogEntry` structure contains the following fields: +/// - `base` - the base log entry +/// - `version` - the version of the audit log entry +/// - `deployment_id` - the deployment ID +/// - `event` - the event +/// - `entry_type` - the type of audit message +/// - `api` - the API details +/// - `remote_host` - the remote host +/// - `user_agent` - the user agent +/// - `req_path` - the request path +/// - `req_host` - the request host +/// - `req_claims` - the request claims +/// - `req_query` - the request query +/// - `req_header` - the request header +/// - `resp_header` - the response header +/// - `access_key` - the access key +/// - `parent_user` - the parent user +/// - `error` - the error +/// +/// The `AuditLogEntry` structure contains the following methods: +/// - `new` - create a new `AuditEntry` with default values +/// - `new_with_values` - create a new `AuditEntry` with version, time, event and api details +/// - `with_base` - set the base log entry +/// - `set_version` - set the version +/// - `set_deployment_id` - set the deployment ID +/// - `set_event` - set the event +/// - `set_entry_type` - set the entry type +/// - `set_api` - set the API details +/// - `set_remote_host` - set the remote host +/// - `set_user_agent` - set the user agent +/// - `set_req_path` - set the request path +/// - `set_req_host` - set the request host +/// - `set_req_claims` - set the request claims +/// - `set_req_query` - set the request query +/// - `set_req_header` - set the request header +/// - `set_resp_header` - set the response header +/// - `set_access_key` - set the access key +/// - `set_parent_user` - set the parent user +/// - `set_error` - set the error +/// +/// # Example +/// ``` +/// use rustfs_audit_logger::AuditLogEntry; +/// use rustfs_audit_logger::ApiDetails; +/// use std::collections::HashMap; +/// +/// let entry = AuditLogEntry::new() +/// .set_version("1.0".to_string()) +/// .set_deployment_id(Some("123".to_string())) +/// .set_event("event".to_string()) +/// .set_entry_type(Some("type".to_string())) +/// .set_api(ApiDetails::new()) +/// .set_remote_host(Some("remote-host".to_string())) +/// .set_user_agent(Some("user-agent".to_string())) +/// .set_req_path(Some("req-path".to_string())) +/// .set_req_host(Some("req-host".to_string())) +/// .set_req_claims(Some(HashMap::new())) +/// .set_req_query(Some(HashMap::new())) +/// .set_req_header(Some(HashMap::new())) +/// .set_resp_header(Some(HashMap::new())) +/// .set_access_key(Some("access-key".to_string())) +/// .set_parent_user(Some("parent-user".to_string())) +/// .set_error(Some("error".to_string())); +#[derive(Debug, Serialize, Deserialize, Clone, Default)] +pub struct AuditLogEntry { + #[serde(flatten)] + pub base: BaseLogEntry, + pub version: String, + #[serde(rename = "deploymentid", skip_serializing_if = "Option::is_none")] + pub deployment_id: Option, + pub event: String, + // Class of audit message - S3, admin ops, bucket management + #[serde(rename = "type", skip_serializing_if = "Option::is_none")] + pub entry_type: Option, + pub api: ApiDetails, + #[serde(rename = "remotehost", skip_serializing_if = "Option::is_none")] + pub remote_host: Option, + #[serde(rename = "userAgent", skip_serializing_if = "Option::is_none")] + pub user_agent: Option, + #[serde(rename = "requestPath", skip_serializing_if = "Option::is_none")] + pub req_path: Option, + #[serde(rename = "requestHost", skip_serializing_if = "Option::is_none")] + pub req_host: Option, + #[serde(rename = "requestClaims", skip_serializing_if = "Option::is_none")] + pub req_claims: Option>, + #[serde(rename = "requestQuery", skip_serializing_if = "Option::is_none")] + pub req_query: Option>, + #[serde(rename = "requestHeader", skip_serializing_if = "Option::is_none")] + pub req_header: Option>, + #[serde(rename = "responseHeader", skip_serializing_if = "Option::is_none")] + pub resp_header: Option>, + #[serde(rename = "accessKey", skip_serializing_if = "Option::is_none")] + pub access_key: Option, + #[serde(rename = "parentUser", skip_serializing_if = "Option::is_none")] + pub parent_user: Option, + #[serde(rename = "error", skip_serializing_if = "Option::is_none")] + pub error: Option, +} + +impl AuditLogEntry { + /// Create a new `AuditEntry` with default values + pub fn new() -> Self { + AuditLogEntry { + base: BaseLogEntry::new(), + version: String::new(), + deployment_id: None, + event: String::new(), + entry_type: None, + api: ApiDetails::new(), + remote_host: None, + user_agent: None, + req_path: None, + req_host: None, + req_claims: None, + req_query: None, + req_header: None, + resp_header: None, + access_key: None, + parent_user: None, + error: None, + } + } + + /// Create a new `AuditEntry` with version, time, event and api details + pub fn new_with_values(version: String, time: DateTime, event: String, api: ApiDetails) -> Self { + let mut base = BaseLogEntry::new(); + base.timestamp = time; + + AuditLogEntry { + base, + version, + deployment_id: None, + event, + entry_type: None, + api, + remote_host: None, + user_agent: None, + req_path: None, + req_host: None, + req_claims: None, + req_query: None, + req_header: None, + resp_header: None, + access_key: None, + parent_user: None, + error: None, + } + } + + /// Set the base log entry + pub fn with_base(mut self, base: BaseLogEntry) -> Self { + self.base = base; + self + } + + /// Set the version + pub fn set_version(mut self, version: String) -> Self { + self.version = version; + self + } + + /// Set the deployment ID + pub fn set_deployment_id(mut self, deployment_id: Option) -> Self { + self.deployment_id = deployment_id; + self + } + + /// Set the event + pub fn set_event(mut self, event: String) -> Self { + self.event = event; + self + } + + /// Set the entry type + pub fn set_entry_type(mut self, entry_type: Option) -> Self { + self.entry_type = entry_type; + self + } + + /// Set the API details + pub fn set_api(mut self, api: ApiDetails) -> Self { + self.api = api; + self + } + + /// Set the remote host + pub fn set_remote_host(mut self, remote_host: Option) -> Self { + self.remote_host = remote_host; + self + } + + /// Set the user agent + pub fn set_user_agent(mut self, user_agent: Option) -> Self { + self.user_agent = user_agent; + self + } + + /// Set the request path + pub fn set_req_path(mut self, req_path: Option) -> Self { + self.req_path = req_path; + self + } + + /// Set the request host + pub fn set_req_host(mut self, req_host: Option) -> Self { + self.req_host = req_host; + self + } + + /// Set the request claims + pub fn set_req_claims(mut self, req_claims: Option>) -> Self { + self.req_claims = req_claims; + self + } + + /// Set the request query + pub fn set_req_query(mut self, req_query: Option>) -> Self { + self.req_query = req_query; + self + } + + /// Set the request header + pub fn set_req_header(mut self, req_header: Option>) -> Self { + self.req_header = req_header; + self + } + + /// Set the response header + pub fn set_resp_header(mut self, resp_header: Option>) -> Self { + self.resp_header = resp_header; + self + } + + /// Set the access key + pub fn set_access_key(mut self, access_key: Option) -> Self { + self.access_key = access_key; + self + } + + /// Set the parent user + pub fn set_parent_user(mut self, parent_user: Option) -> Self { + self.parent_user = parent_user; + self + } + + /// Set the error + pub fn set_error(mut self, error: Option) -> Self { + self.error = error; + self + } +} + +impl LogRecord for AuditLogEntry { + fn to_json(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| String::from("{}")) + } + + fn get_timestamp(&self) -> DateTime { + self.base.timestamp + } +} diff --git a/crates/audit-logger/src/entry/base.rs b/crates/audit-logger/src/entry/base.rs new file mode 100644 index 000000000..b0ffd4bbf --- /dev/null +++ b/crates/audit-logger/src/entry/base.rs @@ -0,0 +1,106 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use std::collections::HashMap; + +/// Base log entry structure shared by all log types +/// This structure is used to serialize log entries to JSON +/// and send them to the log sinks +/// This structure is also used to deserialize log entries from JSON +/// This structure is also used to store log entries in the database +/// This structure is also used to query log entries from the database +/// +/// The `BaseLogEntry` structure contains the following fields: +/// - `timestamp` - the timestamp of the log entry +/// - `request_id` - the request ID of the log entry +/// - `message` - the message of the log entry +/// - `tags` - the tags of the log entry +/// +/// The `BaseLogEntry` structure contains the following methods: +/// - `new` - create a new `BaseLogEntry` with default values +/// - `message` - set the message +/// - `request_id` - set the request ID +/// - `tags` - set the tags +/// - `timestamp` - set the timestamp +/// +/// # Example +/// ``` +/// use rustfs_audit_logger::BaseLogEntry; +/// use chrono::{DateTime, Utc}; +/// use std::collections::HashMap; +/// +/// let timestamp = Utc::now(); +/// let request = Some("req-123".to_string()); +/// let message = Some("This is a log message".to_string()); +/// let tags = Some(HashMap::new()); +/// +/// let entry = BaseLogEntry::new() +/// .timestamp(timestamp) +/// .request_id(request) +/// .message(message) +/// .tags(tags); +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)] +pub struct BaseLogEntry { + #[serde(rename = "time")] + pub timestamp: DateTime, + + #[serde(rename = "requestID", skip_serializing_if = "Option::is_none")] + pub request_id: Option, + + #[serde(rename = "message", skip_serializing_if = "Option::is_none")] + pub message: Option, + + #[serde(rename = "tags", skip_serializing_if = "Option::is_none")] + pub tags: Option>, +} + +impl BaseLogEntry { + /// Create a new BaseLogEntry with default values + pub fn new() -> Self { + BaseLogEntry { + timestamp: Utc::now(), + request_id: None, + message: None, + tags: None, + } + } + + /// Set the message + pub fn message(mut self, message: Option) -> Self { + self.message = message; + self + } + + /// Set the request ID + pub fn request_id(mut self, request_id: Option) -> Self { + self.request_id = request_id; + self + } + + /// Set the tags + pub fn tags(mut self, tags: Option>) -> Self { + self.tags = tags; + self + } + + /// Set the timestamp + pub fn timestamp(mut self, timestamp: DateTime) -> Self { + self.timestamp = timestamp; + self + } +} diff --git a/crates/audit-logger/src/entry/mod.rs b/crates/audit-logger/src/entry/mod.rs new file mode 100644 index 000000000..12de8f594 --- /dev/null +++ b/crates/audit-logger/src/entry/mod.rs @@ -0,0 +1,157 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +pub(crate) mod args; +pub(crate) mod audit; +pub(crate) mod base; +pub(crate) mod unified; + +use serde::de::Error; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use tracing_core::Level; + +/// ObjectVersion is used across multiple modules +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +pub struct ObjectVersion { + #[serde(rename = "name")] + pub object_name: String, + #[serde(rename = "versionId", skip_serializing_if = "Option::is_none")] + pub version_id: Option, +} + +impl ObjectVersion { + /// Create a new ObjectVersion object + pub fn new() -> Self { + ObjectVersion { + object_name: String::new(), + version_id: None, + } + } + + /// Create a new ObjectVersion with object name + pub fn new_with_object_name(object_name: String) -> Self { + ObjectVersion { + object_name, + version_id: None, + } + } + + /// Set the object name + pub fn set_object_name(mut self, object_name: String) -> Self { + self.object_name = object_name; + self + } + + /// Set the version ID + pub fn set_version_id(mut self, version_id: Option) -> Self { + self.version_id = version_id; + self + } +} + +impl Default for ObjectVersion { + fn default() -> Self { + Self::new() + } +} + +/// Log kind/level enum +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub enum LogKind { + #[serde(rename = "INFO")] + #[default] + Info, + #[serde(rename = "WARNING")] + Warning, + #[serde(rename = "ERROR")] + Error, + #[serde(rename = "FATAL")] + Fatal, +} + +/// Trait for types that can be serialized to JSON and have a timestamp +/// This trait is used by `ServerLogEntry` to convert the log entry to JSON +/// and get the timestamp of the log entry +/// This trait is implemented by `ServerLogEntry` +/// +/// # Example +/// ``` +/// use rustfs_audit_logger::LogRecord; +/// use chrono::{DateTime, Utc}; +/// use rustfs_audit_logger::ServerLogEntry; +/// use tracing_core::Level; +/// +/// let log_entry = ServerLogEntry::new(Level::INFO, "api_handler".to_string()); +/// let json = log_entry.to_json(); +/// let timestamp = log_entry.get_timestamp(); +/// ``` +pub trait LogRecord { + fn to_json(&self) -> String; + fn get_timestamp(&self) -> chrono::DateTime; +} + +/// Wrapper for `tracing_core::Level` to implement `Serialize` and `Deserialize` +/// for `ServerLogEntry` +/// This is necessary because `tracing_core::Level` does not implement `Serialize` +/// and `Deserialize` +/// This is a workaround to allow `ServerLogEntry` to be serialized and deserialized +/// using `serde` +/// +/// # Example +/// ``` +/// use rustfs_audit_logger::SerializableLevel; +/// use tracing_core::Level; +/// +/// let level = Level::INFO; +/// let serializable_level = SerializableLevel::from(level); +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct SerializableLevel(pub Level); + +impl From for SerializableLevel { + fn from(level: Level) -> Self { + SerializableLevel(level) + } +} + +impl From for Level { + fn from(serializable_level: SerializableLevel) -> Self { + serializable_level.0 + } +} + +impl Serialize for SerializableLevel { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_str(self.0.as_str()) + } +} + +impl<'de> Deserialize<'de> for SerializableLevel { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + match s.as_str() { + "TRACE" => Ok(SerializableLevel(Level::TRACE)), + "DEBUG" => Ok(SerializableLevel(Level::DEBUG)), + "INFO" => Ok(SerializableLevel(Level::INFO)), + "WARN" => Ok(SerializableLevel(Level::WARN)), + "ERROR" => Ok(SerializableLevel(Level::ERROR)), + _ => Err(D::Error::custom("unknown log level")), + } + } +} diff --git a/crates/audit-logger/src/entry/unified.rs b/crates/audit-logger/src/entry/unified.rs new file mode 100644 index 000000000..5eb54df01 --- /dev/null +++ b/crates/audit-logger/src/entry/unified.rs @@ -0,0 +1,264 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{AuditLogEntry, BaseLogEntry, LogKind, LogRecord, SerializableLevel}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use tracing_core::Level; + +/// Server log entry with structured fields +/// ServerLogEntry is used to log structured log entries from the server +/// +/// The `ServerLogEntry` structure contains the following fields: +/// - `base` - the base log entry +/// - `level` - the log level +/// - `source` - the source of the log entry +/// - `user_id` - the user ID +/// - `fields` - the structured fields of the log entry +/// +/// The `ServerLogEntry` structure contains the following methods: +/// - `new` - create a new `ServerLogEntry` with specified level and source +/// - `with_base` - set the base log entry +/// - `user_id` - set the user ID +/// - `fields` - set the fields +/// - `add_field` - add a field +/// +/// # Example +/// ``` +/// use rustfs_audit_logger::ServerLogEntry; +/// use tracing_core::Level; +/// +/// let entry = ServerLogEntry::new(Level::INFO, "test_module".to_string()) +/// .user_id(Some("user-456".to_string())) +/// .add_field("operation".to_string(), "login".to_string()); +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ServerLogEntry { + #[serde(flatten)] + pub base: BaseLogEntry, + + pub level: SerializableLevel, + pub source: String, + + #[serde(rename = "userId", skip_serializing_if = "Option::is_none")] + pub user_id: Option, + + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub fields: Vec<(String, String)>, +} + +impl ServerLogEntry { + /// Create a new ServerLogEntry with specified level and source + pub fn new(level: Level, source: String) -> Self { + ServerLogEntry { + base: BaseLogEntry::new(), + level: SerializableLevel(level), + source, + user_id: None, + fields: Vec::new(), + } + } + + /// Set the base log entry + pub fn with_base(mut self, base: BaseLogEntry) -> Self { + self.base = base; + self + } + + /// Set the user ID + pub fn user_id(mut self, user_id: Option) -> Self { + self.user_id = user_id; + self + } + + /// Set fields + pub fn fields(mut self, fields: Vec<(String, String)>) -> Self { + self.fields = fields; + self + } + + /// Add a field + pub fn add_field(mut self, key: String, value: String) -> Self { + self.fields.push((key, value)); + self + } +} + +impl LogRecord for ServerLogEntry { + fn to_json(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| String::from("{}")) + } + + fn get_timestamp(&self) -> DateTime { + self.base.timestamp + } +} + +/// Console log entry structure +/// ConsoleLogEntry is used to log console log entries +/// The `ConsoleLogEntry` structure contains the following fields: +/// - `base` - the base log entry +/// - `level` - the log level +/// - `console_msg` - the console message +/// - `node_name` - the node name +/// - `err` - the error message +/// +/// The `ConsoleLogEntry` structure contains the following methods: +/// - `new` - create a new `ConsoleLogEntry` +/// - `new_with_console_msg` - create a new `ConsoleLogEntry` with console message and node name +/// - `with_base` - set the base log entry +/// - `set_level` - set the log level +/// - `set_node_name` - set the node name +/// - `set_console_msg` - set the console message +/// - `set_err` - set the error message +/// +/// # Example +/// ``` +/// use rustfs_audit_logger::ConsoleLogEntry; +/// +/// let entry = ConsoleLogEntry::new_with_console_msg("Test message".to_string(), "node-123".to_string()); +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConsoleLogEntry { + #[serde(flatten)] + pub base: BaseLogEntry, + + pub level: LogKind, + pub console_msg: String, + pub node_name: String, + + #[serde(skip)] + pub err: Option, +} + +impl ConsoleLogEntry { + /// Create a new ConsoleLogEntry + pub fn new() -> Self { + ConsoleLogEntry { + base: BaseLogEntry::new(), + level: LogKind::Info, + console_msg: String::new(), + node_name: String::new(), + err: None, + } + } + + /// Create a new ConsoleLogEntry with console message and node name + pub fn new_with_console_msg(console_msg: String, node_name: String) -> Self { + ConsoleLogEntry { + base: BaseLogEntry::new(), + level: LogKind::Info, + console_msg, + node_name, + err: None, + } + } + + /// Set the base log entry + pub fn with_base(mut self, base: BaseLogEntry) -> Self { + self.base = base; + self + } + + /// Set the log level + pub fn set_level(mut self, level: LogKind) -> Self { + self.level = level; + self + } + + /// Set the node name + pub fn set_node_name(mut self, node_name: String) -> Self { + self.node_name = node_name; + self + } + + /// Set the console message + pub fn set_console_msg(mut self, console_msg: String) -> Self { + self.console_msg = console_msg; + self + } + + /// Set the error message + pub fn set_err(mut self, err: Option) -> Self { + self.err = err; + self + } +} + +impl Default for ConsoleLogEntry { + fn default() -> Self { + Self::new() + } +} + +impl LogRecord for ConsoleLogEntry { + fn to_json(&self) -> String { + serde_json::to_string(self).unwrap_or_else(|_| String::from("{}")) + } + + fn get_timestamp(&self) -> DateTime { + self.base.timestamp + } +} + +/// Unified log entry type +/// UnifiedLogEntry is used to log different types of log entries +/// +/// The `UnifiedLogEntry` enum contains the following variants: +/// - `Server` - a server log entry +/// - `Audit` - an audit log entry +/// - `Console` - a console log entry +/// +/// The `UnifiedLogEntry` enum contains the following methods: +/// - `to_json` - convert the log entry to JSON +/// - `get_timestamp` - get the timestamp of the log entry +/// +/// # Example +/// ``` +/// use rustfs_audit_logger::{UnifiedLogEntry, ServerLogEntry}; +/// use tracing_core::Level; +/// +/// let server_entry = ServerLogEntry::new(Level::INFO, "test_module".to_string()); +/// let unified = UnifiedLogEntry::Server(server_entry); +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum UnifiedLogEntry { + #[serde(rename = "server")] + Server(ServerLogEntry), + + #[serde(rename = "audit")] + Audit(Box), + + #[serde(rename = "console")] + Console(ConsoleLogEntry), +} + +impl LogRecord for UnifiedLogEntry { + fn to_json(&self) -> String { + match self { + UnifiedLogEntry::Server(entry) => entry.to_json(), + UnifiedLogEntry::Audit(entry) => entry.to_json(), + UnifiedLogEntry::Console(entry) => entry.to_json(), + } + } + + fn get_timestamp(&self) -> DateTime { + match self { + UnifiedLogEntry::Server(entry) => entry.get_timestamp(), + UnifiedLogEntry::Audit(entry) => entry.get_timestamp(), + UnifiedLogEntry::Console(entry) => entry.get_timestamp(), + } + } +} diff --git a/crates/audit-logger/src/lib.rs b/crates/audit-logger/src/lib.rs new file mode 100644 index 000000000..d91b7217e --- /dev/null +++ b/crates/audit-logger/src/lib.rs @@ -0,0 +1,9 @@ +mod entry; +mod logger; +mod target; + +pub use entry::args::Args; +pub use entry::audit::{ApiDetails, AuditLogEntry}; +pub use entry::base::BaseLogEntry; +pub use entry::unified::{ConsoleLogEntry, ServerLogEntry, UnifiedLogEntry}; +pub use entry::{LogKind, LogRecord, ObjectVersion, SerializableLevel}; diff --git a/crates/audit-logger/src/logger/config.rs b/crates/audit-logger/src/logger/config.rs new file mode 100644 index 000000000..9e79d6cd7 --- /dev/null +++ b/crates/audit-logger/src/logger/config.rs @@ -0,0 +1,84 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::Deserialize; +use std::collections::HashMap; +use url::Url; + +#[derive(Deserialize, Debug, Default)] +pub struct Config { + #[serde(default)] + pub console: ConsoleConfig, + #[serde(default)] + pub logger_webhook: HashMap, + #[serde(default)] + pub audit_webhook: HashMap, + #[serde(default)] + pub audit_kafka: HashMap, +} + +#[derive(Deserialize, Debug)] +#[serde(default)] +pub struct ConsoleConfig { + pub enabled: bool, +} + +impl Default for ConsoleConfig { + fn default() -> Self { + Self { enabled: true } + } +} + +#[derive(Deserialize, Debug, Clone)] +pub struct HttpConfig { + pub enabled: bool, + pub endpoint: Url, + #[serde(default)] + pub auth_token: String, + #[serde(default = "default_batch_size")] + pub batch_size: usize, + #[serde(default = "default_queue_size")] + pub queue_size: usize, + #[serde(default = "default_max_retry")] + pub max_retry: u32, + #[serde(with = "humantime_serde")] + #[serde(default = "default_retry_interval")] + pub retry_interval: std::time::Duration, +} + +// 为 HttpConfig 提供别名以区分 logger 和 audit +pub type LoggerWebhookConfig = HttpConfig; +pub type AuditWebhookConfig = HttpConfig; + +#[derive(Deserialize, Debug, Clone)] +pub struct AuditKafkaConfig { + pub enabled: bool, + pub brokers: Vec, + pub topic: String, + // ... 其他 Kafka 特定字段 +} + +// 默认值函数 +fn default_batch_size() -> usize { + 10 +} +fn default_queue_size() -> usize { + 10000 +} +fn default_max_retry() -> u32 { + 5 +} +fn default_retry_interval() -> std::time::Duration { + std::time::Duration::from_secs(3) +} diff --git a/crates/audit-logger/src/logger/dispatch.rs b/crates/audit-logger/src/logger/dispatch.rs new file mode 100644 index 000000000..23f2ae592 --- /dev/null +++ b/crates/audit-logger/src/logger/dispatch.rs @@ -0,0 +1,80 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::logger::config::Config; +use crate::logger::entry::{AuditEntry, LogEntry}; +use crate::logger::{Loggable, Target, factory}; +use std::sync::Arc; +use tokio::task::JoinHandle; + +pub struct AuditLogger { + targets: Vec>, +} + +impl AuditLogger { + pub fn new(config: &Config) -> Self { + let targets = factory::create_targets_from_config(config); + Self { targets } + } + + pub async fn log(&self, entry: impl Loggable) { + let boxed_entry: Box = Box::new(entry); + let entry_arc = Arc::new(boxed_entry); + + let mut handles: Vec> = Vec::new(); + + for target in &self.targets { + let target_clone = target.clone(); + let entry_clone = Arc::clone(&entry_arc); + let handle = tokio::spawn(async move { + // 我们需要一个新的 Box,因为 Arc 不能直接转换为 Box + let entry_for_send = entry_clone.to_json().unwrap(); + let rehydrated_entry: Box = match serde_json::from_str::(&entry_for_send) { + Ok(log_entry) => Box::new(log_entry), + Err(_) => match serde_json::from_str::(&entry_for_send) { + Ok(audit_entry) => Box::new(audit_entry), + Err(_) => { + eprintln!("Failed to rehydrate log entry for target {}", target_clone.name()); + return; + } + }, + }; + + if let Err(e) = target_clone.send(rehydrated_entry).await { + eprintln!("Failed to send log to target {}: {}", target_clone.name(), e); + } + }); + handles.push(handle); + } + + for handle in handles { + let _ = handle.await; + } + } + + pub async fn shutdown(&self) { + println!("Shutting down all logger targets..."); + let mut handles = vec![]; + for target in &self.targets { + let target = target.clone(); + handles.push(tokio::spawn(async move { + target.shutdown().await; + })); + } + for handle in handles { + handle.await.unwrap(); + } + println!("All logger targets shut down."); + } +} diff --git a/crates/audit-logger/src/logger/entry.rs b/crates/audit-logger/src/logger/entry.rs new file mode 100644 index 000000000..880c28fdf --- /dev/null +++ b/crates/audit-logger/src/logger/entry.rs @@ -0,0 +1,106 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::{DateTime, Utc}; +use serde::Serialize; +use std::collections::HashMap; +use uuid::Uuid; + +/// 一个可以被序列化和发送的日志条目的 Trait +pub trait Loggable: Serialize + Send + Sync + 'static { + fn to_json(&self) -> Result { + serde_json::to_string(self) + } +} + +/// 标准日志条目 +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct LogEntry { + pub deployment_id: String, + pub level: String, + pub message: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub trace: Option, + pub time: DateTime, + pub request_id: String, +} + +impl Loggable for LogEntry {} + +/// 审计日志条目 +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AuditEntry { + pub version: String, + pub deployment_id: String, + pub time: DateTime, + pub trigger: String, + pub api: ApiDetails, + pub remote_host: String, + pub request_id: String, + pub user_agent: String, + pub access_key: String, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub tags: HashMap, +} + +impl Loggable for AuditEntry {} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Trace { + pub message: String, + pub source: Vec, + #[serde(skip_serializing_if = "HashMap::is_empty")] + pub variables: HashMap, +} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ApiDetails { + pub name: String, + pub bucket: String, + pub object: String, + pub status: String, + pub status_code: u16, + pub time_to_first_byte: String, + pub time_to_response: String, +} + +// 辅助函数来创建条目 +impl AuditEntry { + pub fn new(api_name: &str, bucket: &str, object: &str) -> Self { + AuditEntry { + version: "1".to_string(), + deployment_id: "global-deployment-id".to_string(), + time: Utc::now(), + trigger: "incoming".to_string(), + api: ApiDetails { + name: api_name.to_string(), + bucket: bucket.to_string(), + object: object.to_string(), + status: "OK".to_string(), + status_code: 200, + time_to_first_byte: "10ms".to_string(), + time_to_response: "50ms".to_string(), + }, + remote_host: "127.0.0.1".to_string(), + request_id: Uuid::new_v4().to_string(), + user_agent: "Rust-Client/1.0".to_string(), + access_key: "minioadmin".to_string(), + tags: HashMap::new(), + } + } +} diff --git a/crates/audit-logger/src/logger/factory.rs b/crates/audit-logger/src/logger/factory.rs new file mode 100644 index 000000000..7f8842711 --- /dev/null +++ b/crates/audit-logger/src/logger/factory.rs @@ -0,0 +1,56 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::Target; +use super::config::Config; +use super::http_target::HttpTarget; +use std::sync::Arc; + +pub fn create_targets_from_config(config: &Config) -> Vec> { + let mut targets: Vec> = Vec::new(); + + // Logger Webhook 目标 + for (name, cfg) in &config.logger_webhook { + if cfg.enabled { + println!("Initializing logger webhook target: {}", name); + let target = HttpTarget::new(format!("logger-webhook-{}", name), cfg.clone()); + targets.push(Arc::new(target)); + } + } + + // Audit Webhook 目标 + for (name, cfg) in &config.audit_webhook { + if cfg.enabled { + println!("Initializing audit webhook target: {}", name); + let target = HttpTarget::new(format!("audit-webhook-{}", name), cfg.clone()); + targets.push(Arc::new(target)); + } + } + + // Audit Kafka 目标 (存根) + for (name, cfg) in &config.audit_kafka { + if cfg.enabled { + println!("Initializing audit kafka target: {} (STUBBED)", name); + // let target = KafkaTarget::new(name.clone(), cfg.clone()); + // targets.push(Arc::new(target)); + } + } + + if config.console.enabled { + println!("Console logging is enabled."); + // 可以在这里添加一个 ConsoleTarget 实现 + } + + targets +} diff --git a/crates/audit-logger/src/logger/http_target.rs b/crates/audit-logger/src/logger/http_target.rs new file mode 100644 index 000000000..3ba144944 --- /dev/null +++ b/crates/audit-logger/src/logger/http_target.rs @@ -0,0 +1,188 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use super::{Loggable, Target}; +use crate::logger::config::HttpConfig; +use async_trait::async_trait; +use reqwest::Client; +use std::error::Error; +use std::sync::{ + Arc, + atomic::{AtomicBool, Ordering}, +}; +use tokio::sync::{Mutex, mpsc}; +use tokio::task::JoinHandle; +use tokio::time::{Duration, sleep}; + +pub struct HttpTarget { + name: String, + config: HttpConfig, + client: Client, + sender: mpsc::Sender>, + shutdown_signal: Arc, + worker_handle: Arc>>>, +} + +impl HttpTarget { + pub fn new(name: String, config: HttpConfig) -> Self { + let (sender, receiver) = mpsc::channel(config.queue_size); + let shutdown_signal = Arc::new(AtomicBool::new(false)); + + let target = Self { + name, + config, + client: Client::new(), + sender, + shutdown_signal, + worker_handle: Arc::new(Mutex::new(None)), + }; + + target.start_worker(receiver); + target + } + + fn start_worker(&self, mut receiver: mpsc::Receiver>) { + let client = self.client.clone(); + let config = self.config.clone(); + let endpoint = self.config.endpoint.clone(); + let shutdown_signal = self.shutdown_signal.clone(); + let name = self.name.clone(); + + let handle = tokio::spawn(async move { + let mut buffer: Vec> = Vec::with_capacity(config.batch_size); + let batch_timeout = Duration::from_secs(1); + + loop { + let should_shutdown = shutdown_signal.load(Ordering::SeqCst); + + match tokio::time::timeout(batch_timeout, receiver.recv()).await { + Ok(Some(entry)) => { + buffer.push(entry); + if buffer.len() >= config.batch_size { + Self::send_batch(&client, &endpoint, &config.auth_token, &mut buffer, &name).await; + } + } + Ok(None) => { + // Channel closed + break; + } + Err(_) => { + // Timeout + if !buffer.is_empty() { + Self::send_batch(&client, &endpoint, &config.auth_token, &mut buffer, &name).await; + } + } + } + + if should_shutdown && buffer.is_empty() { + break; + } + } + + // 发送剩余的日志 + if !buffer.is_empty() { + Self::send_batch(&client, &endpoint, &config.auth_token, &mut buffer, &name).await; + } + println!("Worker for target '{}' has shut down.", name); + }); + + // Store the handle so we can await it later + let worker_handle = self.worker_handle.clone(); + tokio::spawn(async move { + *worker_handle.lock().await = Some(handle); + }); + } + + async fn send_batch(client: &Client, endpoint: &url::Url, token: &str, buffer: &mut Vec>, name: &str) { + if buffer.is_empty() { + return; + } + + let entries_as_json: Vec<_> = buffer.iter().map(|e| e.to_json().unwrap()).collect(); + let body = format!("[{}]", entries_as_json.join(",")); + + let mut retries = 0; + loop { + let mut request = client + .post(endpoint.clone()) + .body(body.clone()) + .header("Content-Type", "application/json"); + if !token.is_empty() { + request = request.bearer_auth(token); + } + + match request.send().await { + Ok(resp) if resp.status().is_success() => { + // println!("Successfully sent batch of {} logs to {}", buffer.len(), name); + buffer.clear(); + return; + } + Ok(resp) => { + eprintln!("Error sending logs to {}: HTTP Status {}", name, resp.status()); + } + Err(e) => { + eprintln!("Network error sending logs to {}: {}", name, e); + } + } + + retries += 1; + if retries > default_max_retry() { + eprintln!("Failed to send batch to {} after {} retries. Dropping logs.", name, retries - 1); + buffer.clear(); + return; + } + sleep(default_retry_interval()).await; + } + } +} + +#[async_trait] +impl Target for HttpTarget { + async fn send(&self, entry: Box) -> Result<(), Box> { + if self.shutdown_signal.load(Ordering::SeqCst) { + return Err("Target is shutting down".into()); + } + + match self.sender.try_send(entry) { + Ok(_) => Ok(()), + Err(mpsc::error::TrySendError::Full(_)) => { + eprintln!("Log queue for target '{}' is full. Dropping log.", self.name); + Err("Queue full".into()) + } + Err(mpsc::error::TrySendError::Closed(_)) => { + eprintln!("Log channel for target '{}' is closed.", self.name); + Err("Channel closed".into()) + } + } + } + + fn name(&self) -> &str { + &self.name + } + + async fn shutdown(&self) { + println!("Initiating shutdown for target '{}'...", self.name); + self.shutdown_signal.store(true, Ordering::SeqCst); + + // 克隆 sender 并关闭它,这将导致 worker 中的 recv() 返回 None + let sender_clone = self.sender.clone(); + sender_clone.closed().await; + + if let Some(handle) = self.worker_handle.lock().await.take() { + if let Err(e) = handle.await { + eprintln!("Error waiting for worker of target '{}' to shut down: {}", self.name, e); + } + } + } +} diff --git a/crates/audit-logger/src/logger/mod.rs b/crates/audit-logger/src/logger/mod.rs new file mode 100644 index 000000000..37af7d47d --- /dev/null +++ b/crates/audit-logger/src/logger/mod.rs @@ -0,0 +1,36 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod config; +pub mod dispatch; +pub mod entry; +pub mod factory; +pub mod http_target; + +use crate::logger::entry::Loggable; +use async_trait::async_trait; +use std::error::Error; + +/// 通用日志目标 Trait +#[async_trait] +pub trait Target: Send + Sync { + /// 发送单个可日志化条目 + async fn send(&self, entry: Box) -> Result<(), Box>; + + /// 返回目标的唯一名称 + fn name(&self) -> &str; + + /// 优雅地关闭目标,确保所有缓冲的日志都被处理 + async fn shutdown(&self); +} diff --git a/crates/audit-logger/src/target/file.rs b/crates/audit-logger/src/target/file.rs new file mode 100644 index 000000000..6238cfff4 --- /dev/null +++ b/crates/audit-logger/src/target/file.rs @@ -0,0 +1,13 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. diff --git a/crates/audit-logger/src/target/mod.rs b/crates/audit-logger/src/target/mod.rs new file mode 100644 index 000000000..c55e62efa --- /dev/null +++ b/crates/audit-logger/src/target/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod file; +mod webhook; diff --git a/crates/audit-logger/src/target/webhook.rs b/crates/audit-logger/src/target/webhook.rs new file mode 100644 index 000000000..6238cfff4 --- /dev/null +++ b/crates/audit-logger/src/target/webhook.rs @@ -0,0 +1,13 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. diff --git a/crates/config/Cargo.toml b/crates/config/Cargo.toml index c9095b316..7cf5ef86a 100644 --- a/crates/config/Cargo.toml +++ b/crates/config/Cargo.toml @@ -31,8 +31,9 @@ const-str = { workspace = true, optional = true } workspace = true [features] -default = [] +default = ["constants"] +audit = ["dep:const-str", "constants"] constants = ["dep:const-str"] -notify = ["dep:const-str"] -observability = [] +notify = ["dep:const-str", "constants"] +observability = ["constants"] diff --git a/crates/config/src/audit/mod.rs b/crates/config/src/audit/mod.rs new file mode 100644 index 000000000..c1aac6a56 --- /dev/null +++ b/crates/config/src/audit/mod.rs @@ -0,0 +1,29 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Audit configuration module +//! //! This module defines the configuration for audit systems, including +//! webhook and other audit-related settings. +pub const AUDIT_WEBHOOK_SUB_SYS: &str = "audit_webhook"; + +pub const WEBHOOK_ENDPOINT: &str = "endpoint"; +pub const WEBHOOK_AUTH_TOKEN: &str = "auth_token"; +pub const WEBHOOK_CLIENT_CERT: &str = "client_cert"; +pub const WEBHOOK_CLIENT_KEY: &str = "client_key"; +pub const WEBHOOK_BATCH_SIZE: &str = "batch_size"; +pub const WEBHOOK_QUEUE_SIZE: &str = "queue_size"; +pub const WEBHOOK_QUEUE_DIR: &str = "queue_dir"; +pub const WEBHOOK_MAX_RETRY: &str = "max_retry"; +pub const WEBHOOK_RETRY_INTERVAL: &str = "retry_interval"; +pub const WEBHOOK_HTTP_TIMEOUT: &str = "http_timeout"; diff --git a/crates/config/src/constants/env.rs b/crates/config/src/constants/env.rs index 22eae738b..e78c2b90f 100644 --- a/crates/config/src/constants/env.rs +++ b/crates/config/src/constants/env.rs @@ -16,6 +16,13 @@ pub const DEFAULT_DELIMITER: &str = "_"; pub const ENV_PREFIX: &str = "RUSTFS_"; pub const ENV_WORD_DELIMITER: &str = "_"; +pub const DEFAULT_DIR: &str = "/opt/rustfs/events"; // Default directory for event store +pub const DEFAULT_LIMIT: u64 = 100000; // Default store limit + +/// Standard config keys and values. +pub const ENABLE_KEY: &str = "enable"; +pub const COMMENT_KEY: &str = "comment"; + /// Medium-drawn lines separator /// This is used to separate words in environment variable names. pub const ENV_WORD_DELIMITER_DASH: &str = "-"; diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index 1c2afecde..98bd42c14 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -20,6 +20,8 @@ pub use constants::app::*; pub use constants::env::*; #[cfg(feature = "constants")] pub use constants::tls::*; +#[cfg(feature = "audit")] +pub mod audit; #[cfg(feature = "notify")] pub mod notify; #[cfg(feature = "observability")] diff --git a/crates/config/src/notify/mod.rs b/crates/config/src/notify/mod.rs index 09d8f6f6a..ec86985d7 100644 --- a/crates/config/src/notify/mod.rs +++ b/crates/config/src/notify/mod.rs @@ -29,14 +29,6 @@ pub const NOTIFY_PREFIX: &str = "notify"; pub const NOTIFY_ROUTE_PREFIX: &str = const_str::concat!(NOTIFY_PREFIX, "_"); -/// Standard config keys and values. -pub const ENABLE_KEY: &str = "enable"; -pub const COMMENT_KEY: &str = "comment"; - -/// Enable values -pub const ENABLE_ON: &str = "on"; -pub const ENABLE_OFF: &str = "off"; - #[allow(dead_code)] pub const NOTIFY_SUB_SYSTEMS: &[&str] = &[NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS]; diff --git a/crates/config/src/notify/mqtt.rs b/crates/config/src/notify/mqtt.rs index ba5ed6b14..0282d82e0 100644 --- a/crates/config/src/notify/mqtt.rs +++ b/crates/config/src/notify/mqtt.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::notify::{COMMENT_KEY, ENABLE_KEY}; +use crate::{COMMENT_KEY, ENABLE_KEY}; // MQTT Keys pub const MQTT_BROKER: &str = "broker"; diff --git a/crates/config/src/notify/store.rs b/crates/config/src/notify/store.rs index 5e3dd51cb..ed838b05d 100644 --- a/crates/config/src/notify/store.rs +++ b/crates/config/src/notify/store.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub const DEFAULT_DIR: &str = "/opt/rustfs/events"; // Default directory for event store -pub const DEFAULT_LIMIT: u64 = 100000; // Default store limit pub const DEFAULT_EXT: &str = ".unknown"; // Default file extension pub const COMPRESS_EXT: &str = ".snappy"; // Extension for compressed files diff --git a/crates/config/src/notify/webhook.rs b/crates/config/src/notify/webhook.rs index b4fefb5f5..bbb787162 100644 --- a/crates/config/src/notify/webhook.rs +++ b/crates/config/src/notify/webhook.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::notify::{COMMENT_KEY, ENABLE_KEY}; +use crate::{COMMENT_KEY, ENABLE_KEY}; // Webhook Keys pub const WEBHOOK_ENDPOINT: &str = "endpoint"; diff --git a/crates/ecstore/Cargo.toml b/crates/ecstore/Cargo.toml index 1e3d815e5..182d7ad66 100644 --- a/crates/ecstore/Cargo.toml +++ b/crates/ecstore/Cargo.toml @@ -34,7 +34,7 @@ workspace = true default = [] [dependencies] -rustfs-config = { workspace = true, features = ["constants", "notify"] } +rustfs-config = { workspace = true, features = ["constants", "notify", "audit"] } async-trait.workspace = true bytes.workspace = true byteorder = { workspace = true } diff --git a/crates/ecstore/src/config/audit.rs b/crates/ecstore/src/config/audit.rs new file mode 100644 index 000000000..91fb4fe45 --- /dev/null +++ b/crates/ecstore/src/config/audit.rs @@ -0,0 +1,82 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::config::{KV, KVS}; +use rustfs_config::audit::{ + WEBHOOK_AUTH_TOKEN, WEBHOOK_BATCH_SIZE, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_HTTP_TIMEOUT, + WEBHOOK_MAX_RETRY, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_SIZE, WEBHOOK_RETRY_INTERVAL, +}; +use rustfs_config::{DEFAULT_DIR, DEFAULT_LIMIT, ENABLE_KEY, ENABLE_OFF}; +use std::sync::LazyLock; + +/// Default KVS for audit webhook settings. +pub const DEFAULT_AUDIT_WEBHOOK_KVS: LazyLock = LazyLock::new(|| { + KVS(vec![ + KV { + key: ENABLE_KEY.to_owned(), + value: ENABLE_OFF.to_owned(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_ENDPOINT.to_owned(), + value: "".to_owned(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_AUTH_TOKEN.to_owned(), + value: "".to_owned(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_CLIENT_CERT.to_owned(), + value: "".to_owned(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_CLIENT_KEY.to_owned(), + value: "".to_owned(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_BATCH_SIZE.to_owned(), + value: "1".to_owned(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_QUEUE_SIZE.to_owned(), + value: DEFAULT_LIMIT.to_string(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_QUEUE_DIR.to_owned(), + value: DEFAULT_DIR.to_owned(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_MAX_RETRY.to_owned(), + value: "0".to_owned(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_RETRY_INTERVAL.to_owned(), + value: "3s".to_owned(), + hidden_if_empty: false, + }, + KV { + key: WEBHOOK_HTTP_TIMEOUT.to_owned(), + value: "5s".to_owned(), + hidden_if_empty: false, + }, + ]) +}); diff --git a/crates/ecstore/src/config/mod.rs b/crates/ecstore/src/config/mod.rs index e957b7d89..f4562b388 100644 --- a/crates/ecstore/src/config/mod.rs +++ b/crates/ecstore/src/config/mod.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod audit; pub mod com; #[allow(dead_code)] pub mod heal; @@ -21,8 +22,9 @@ pub mod storageclass; use crate::error::Result; use crate::store::ECStore; use com::{STORAGE_CLASS_SUB_SYS, lookup_configs, read_config_without_migrate}; +use rustfs_config::COMMENT_KEY; use rustfs_config::DEFAULT_DELIMITER; -use rustfs_config::notify::{COMMENT_KEY, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS}; +use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::LazyLock; diff --git a/crates/ecstore/src/config/notify.rs b/crates/ecstore/src/config/notify.rs index 2e1dedc2d..bc3c7e42f 100644 --- a/crates/ecstore/src/config/notify.rs +++ b/crates/ecstore/src/config/notify.rs @@ -14,10 +14,11 @@ use crate::config::{KV, KVS}; use rustfs_config::notify::{ - COMMENT_KEY, DEFAULT_DIR, DEFAULT_LIMIT, ENABLE_KEY, ENABLE_OFF, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, - MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, - WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, + MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, + MQTT_TOPIC, MQTT_USERNAME, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, + WEBHOOK_QUEUE_LIMIT, }; +use rustfs_config::{COMMENT_KEY, DEFAULT_DIR, DEFAULT_LIMIT, ENABLE_KEY, ENABLE_OFF}; use std::sync::LazyLock; /// The default configuration collection of webhooks, diff --git a/crates/notify/examples/full_demo.rs b/crates/notify/examples/full_demo.rs index 4eb93df9b..af1d9b4e3 100644 --- a/crates/notify/examples/full_demo.rs +++ b/crates/notify/examples/full_demo.rs @@ -13,10 +13,10 @@ // limitations under the License. use rustfs_config::notify::{ - DEFAULT_LIMIT, DEFAULT_TARGET, ENABLE_KEY, ENABLE_ON, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, - MQTT_TOPIC, MQTT_USERNAME, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, - WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, + DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME, + NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, }; +use rustfs_config::{DEFAULT_LIMIT, ENABLE_KEY, ENABLE_ON}; use rustfs_ecstore::config::{Config, KV, KVS}; use rustfs_notify::arn::TargetID; use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger}; diff --git a/crates/notify/examples/full_demo_one.rs b/crates/notify/examples/full_demo_one.rs index 5d06eef9f..88b8d24fe 100644 --- a/crates/notify/examples/full_demo_one.rs +++ b/crates/notify/examples/full_demo_one.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Using Global Accessories use rustfs_config::notify::{ - DEFAULT_LIMIT, DEFAULT_TARGET, ENABLE_KEY, ENABLE_ON, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, - MQTT_TOPIC, MQTT_USERNAME, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, - WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, + DEFAULT_TARGET, MQTT_BROKER, MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_TOPIC, MQTT_USERNAME, + NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, }; +// Using Global Accessories +use rustfs_config::{DEFAULT_LIMIT, ENABLE_KEY, ENABLE_ON}; use rustfs_ecstore::config::{Config, KV, KVS}; use rustfs_notify::arn::TargetID; use rustfs_notify::{BucketNotificationConfig, Event, EventName, LogLevel, NotificationError, init_logger}; diff --git a/crates/notify/src/factory.rs b/crates/notify/src/factory.rs index ada2de487..fc30bf721 100644 --- a/crates/notify/src/factory.rs +++ b/crates/notify/src/factory.rs @@ -19,11 +19,11 @@ use crate::{ use async_trait::async_trait; use rumqttc::QoS; use rustfs_config::notify::{ - DEFAULT_DIR, DEFAULT_LIMIT, ENV_NOTIFY_MQTT_KEYS, ENV_NOTIFY_WEBHOOK_KEYS, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, - MQTT_PASSWORD, MQTT_QOS, MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, - NOTIFY_MQTT_KEYS, NOTIFY_WEBHOOK_KEYS, WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, - WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, + ENV_NOTIFY_MQTT_KEYS, ENV_NOTIFY_WEBHOOK_KEYS, MQTT_BROKER, MQTT_KEEP_ALIVE_INTERVAL, MQTT_PASSWORD, MQTT_QOS, + MQTT_QUEUE_DIR, MQTT_QUEUE_LIMIT, MQTT_RECONNECT_INTERVAL, MQTT_TOPIC, MQTT_USERNAME, NOTIFY_MQTT_KEYS, NOTIFY_WEBHOOK_KEYS, + WEBHOOK_AUTH_TOKEN, WEBHOOK_CLIENT_CERT, WEBHOOK_CLIENT_KEY, WEBHOOK_ENDPOINT, WEBHOOK_QUEUE_DIR, WEBHOOK_QUEUE_LIMIT, }; +use rustfs_config::{DEFAULT_DIR, DEFAULT_LIMIT}; use rustfs_ecstore::config::KVS; use std::collections::HashSet; use std::time::Duration; diff --git a/crates/notify/src/store.rs b/crates/notify/src/store.rs index bf54e1e2d..8d09319fe 100644 --- a/crates/notify/src/store.rs +++ b/crates/notify/src/store.rs @@ -13,7 +13,8 @@ // limitations under the License. use crate::error::StoreError; -use rustfs_config::notify::{COMPRESS_EXT, DEFAULT_EXT, DEFAULT_LIMIT}; +use rustfs_config::DEFAULT_LIMIT; +use rustfs_config::notify::{COMPRESS_EXT, DEFAULT_EXT}; use serde::{Serialize, de::DeserializeOwned}; use snap::raw::{Decoder, Encoder}; use std::sync::{Arc, RwLock}; diff --git a/crates/rio/Cargo.toml b/crates/rio/Cargo.toml index 875b5fc28..835f23e28 100644 --- a/crates/rio/Cargo.toml +++ b/crates/rio/Cargo.toml @@ -45,4 +45,4 @@ serde_json.workspace = true md-5 = { workspace = true } [dev-dependencies] -tokio-test = { workspace = true } +tokio-test = { workspace = true } \ No newline at end of file diff --git a/rustfs/Cargo.toml b/rustfs/Cargo.toml index 6d753eff6..c24b9bded 100644 --- a/rustfs/Cargo.toml +++ b/rustfs/Cargo.toml @@ -49,8 +49,9 @@ rustfs-config = { workspace = true, features = ["constants", "notify"] } rustfs-notify = { workspace = true } rustfs-obs = { workspace = true } rustfs-utils = { workspace = true, features = ["full"] } -rustfs-protos.workspace = true +rustfs-protos = { workspace = true } rustfs-s3select-query = { workspace = true } +rustfs-audit-logger = { workspace = true } atoi = { workspace = true } atomic_enum = { workspace = true } axum.workspace = true diff --git a/rustfs/src/admin/handlers/event.rs b/rustfs/src/admin/handlers/event.rs index 999597449..765e3338b 100644 --- a/rustfs/src/admin/handlers/event.rs +++ b/rustfs/src/admin/handlers/event.rs @@ -18,7 +18,8 @@ use crate::admin::router::Operation; use crate::auth::{check_key_valid, get_session_token}; use http::{HeaderMap, StatusCode}; use matchit::Params; -use rustfs_config::notify::{ENABLE_KEY, ENABLE_ON, NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS}; +use rustfs_config::notify::{NOTIFY_MQTT_SUB_SYS, NOTIFY_WEBHOOK_SUB_SYS}; +use rustfs_config::{ENABLE_KEY, EnableState}; use rustfs_notify::EventName; use rustfs_notify::rules::{BucketNotificationConfig, PatternRules}; use s3s::header::CONTENT_LENGTH; @@ -78,7 +79,7 @@ impl Operation for SetNotificationTarget { .map_err(|e| s3_error!(InvalidArgument, "invalid json body for target config: {}", e))?; // If there is an enable key, add an enable key value to "on" if !kvs_map.contains_key(ENABLE_KEY) { - kvs_map.insert(ENABLE_KEY.to_string(), ENABLE_ON.to_string()); + kvs_map.insert(ENABLE_KEY.to_string(), EnableState::On.to_string()); } let kvs = rustfs_ecstore::config::KVS( diff --git a/rustfs/src/admin/mod.rs b/rustfs/src/admin/mod.rs index 0449b290c..2fde47f97 100644 --- a/rustfs/src/admin/mod.rs +++ b/rustfs/src/admin/mod.rs @@ -25,7 +25,10 @@ use handlers::{ sts, tier, user, }; -use crate::admin::handlers::event::{ListNotificationTargets, RemoveNotificationTarget, SetNotificationTarget}; +use crate::admin::handlers::event::{ + GetBucketNotification, ListNotificationTargets, RemoveBucketNotification, RemoveNotificationTarget, SetBucketNotification, + SetNotificationTarget, +}; use handlers::{GetReplicationMetricsHandler, ListRemoteTargetHandler, RemoveRemoteTargetHandler, SetRemoteTargetHandler}; use hyper::Method; use router::{AdminOperation, S3Router}; @@ -389,5 +392,23 @@ fn register_user_route(r: &mut S3Router) -> std::io::Result<()> AdminOperation(&RemoveNotificationTarget {}), )?; + r.insert( + Method::POST, + format!("{}{}", ADMIN_PREFIX, "/v3/target-set-bucket").as_str(), + AdminOperation(&SetBucketNotification {}), + )?; + + r.insert( + Method::POST, + format!("{}{}", ADMIN_PREFIX, "/v3/target-get-bucket").as_str(), + AdminOperation(&GetBucketNotification {}), + )?; + + r.insert( + Method::POST, + format!("{}{}", ADMIN_PREFIX, "/v3/target-remove-bucket").as_str(), + AdminOperation(&RemoveBucketNotification {}), + )?; + Ok(()) } diff --git a/rustfs/src/config/mod.rs b/rustfs/src/config/mod.rs index 038f5f765..ddc248cec 100644 --- a/rustfs/src/config/mod.rs +++ b/rustfs/src/config/mod.rs @@ -72,7 +72,7 @@ pub struct Opt { #[arg(long, default_value_t = rustfs_config::DEFAULT_OBS_ENDPOINT.to_string(), env = "RUSTFS_OBS_ENDPOINT")] pub obs_endpoint: String, - /// tls path for rustfs api and console. + /// tls path for rustfs API and console. #[arg(long, env = "RUSTFS_TLS_PATH")] pub tls_path: Option, diff --git a/rustfs/src/server/audit.rs b/rustfs/src/server/audit.rs new file mode 100644 index 000000000..439d7a7c5 --- /dev/null +++ b/rustfs/src/server/audit.rs @@ -0,0 +1,13 @@ +// Copyright 2024 RustFS Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. diff --git a/rustfs/src/server/mod.rs b/rustfs/src/server/mod.rs index 5efd86170..3b86e513a 100644 --- a/rustfs/src/server/mod.rs +++ b/rustfs/src/server/mod.rs @@ -12,10 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod audit; mod http; mod hybrid; mod layer; mod service_state; + pub(crate) use http::start_http_server; pub(crate) use service_state::SHUTDOWN_TIMEOUT; pub(crate) use service_state::ServiceState;