From 94507dfcc4f35b45f8f2e5ef78fd9a4fc8acfbaf Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 17 Jul 2025 19:31:05 -0400 Subject: [PATCH 01/16] Fix cargo fmt --all --check --- tests/unit_default_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit_default_tests.rs b/tests/unit_default_tests.rs index 5841586..ae8a393 100644 --- a/tests/unit_default_tests.rs +++ b/tests/unit_default_tests.rs @@ -21,4 +21,4 @@ fn test_unit_default_rate() { // Specifically for this test, we don't really need to use `approx` assert_eq!(def.rate, 0.0); -} \ No newline at end of file +} From 20bffbb79fb068909b16edd1513651a36eacbb2b Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 4 Aug 2025 12:50:54 -0400 Subject: [PATCH 02/16] Minor test improvements GET /api/deprecated-features/used can return a value this library won't be able to parse at the moment, but the period of time is very short. The same feature is tested by `rabbitmqadmin`, so we can afford to modify the test to be less brittle. --- tests/async_definitions_tests.rs | 18 ++++++++------ tests/async_deprecated_feature_tests.rs | 29 +++++++++++++--------- tests/blocking_definitions_tests.rs | 10 ++++---- tests/blocking_deprecated_feature_tests.rs | 22 +++++++++------- 4 files changed, 46 insertions(+), 33 deletions(-) diff --git a/tests/async_definitions_tests.rs b/tests/async_definitions_tests.rs index 67796f8..4362a4c 100644 --- a/tests/async_definitions_tests.rs +++ b/tests/async_definitions_tests.rs @@ -214,12 +214,13 @@ async fn test_async_export_vhost_definitions_as_data() { async fn test_async_import_cluster_definitions() { let endpoint = endpoint(); let rc = Client::new(&endpoint, USERNAME, PASSWORD); - let _ = rc.delete_queue("/", "imported_queue", false).await; + let queue_name = "test_async_import_cluster_definitions"; + let _ = rc.delete_queue("/", queue_name, false).await; let defs = json!({ "queues": [ { "auto_delete": false, "durable": true, - "name": "imported_queue", + "name": queue_name, "vhost": "/" } ]}); @@ -230,8 +231,11 @@ async fn test_async_import_cluster_definitions() { "import_cluster_wide_definitions returned {result:?}" ); - let result1 = rc.get_queue_info("/", "imported_queue").await; - assert!(result1.is_ok(), "can't get the imported queue: {result1:?}"); + let result1 = rc.get_queue_info("/", queue_name).await; + assert!( + result1.is_ok(), + "an important queue '{queue_name}' is missing: {result1:?}" + ); } #[tokio::test] @@ -245,12 +249,12 @@ async fn test_async_import_vhost_definitions() { let vh_params = VirtualHostParams::named(vh); rc.create_vhost(&vh_params).await.unwrap(); - let q = "imported_queue"; + let queue_name = "test_async_import_vhost_definitions"; let defs = json!({ "queues": [ { "auto_delete": false, "durable": true, - "name": q, + "name": queue_name, } ]}); @@ -262,7 +266,7 @@ async fn test_async_import_vhost_definitions() { await_queue_metric_emission(); - let result1 = rc.get_queue_info(vh, q).await; + let result1 = rc.get_queue_info(vh, queue_name).await; assert!(result1.is_ok(), "can't get the imported queue: {result1:?}"); rc.delete_vhost(vh, true).await.unwrap(); diff --git a/tests/async_deprecated_feature_tests.rs b/tests/async_deprecated_feature_tests.rs index ffb4212..3836884 100644 --- a/tests/async_deprecated_feature_tests.rs +++ b/tests/async_deprecated_feature_tests.rs @@ -42,21 +42,26 @@ async fn test_async_list_deprecated_features_in_use() { } let vh = "/"; - let q = "test_list_deprecated_features_in_use"; + let queue_name = "test_async_list_deprecated_features_in_use"; - rc.delete_queue(vh, q, true).await.unwrap(); + rc.delete_queue(vh, queue_name, true).await.unwrap(); - let params = QueueParams::new(q, QueueType::Classic, false, false, None); + let params = QueueParams::new(queue_name, QueueType::Classic, false, false, None); rc.declare_queue(vh, ¶ms).await.unwrap(); let result2 = rc.list_deprecated_features_in_use().await; - assert!(result2.is_ok()); - let vec = result2.unwrap(); - assert!( - vec.0 - .into_iter() - .any(|df| df.deprecation_phase == DeprecationPhase::PermittedByDefault) - ); - - rc.delete_queue(vh, q, true).await.unwrap(); + match result2 { + Ok(vec) => { + dbg!(&vec); + assert!( + vec.0 + .into_iter() + .any(|df| df.deprecation_phase == DeprecationPhase::PermittedByDefault) + ); + } + Err(_err) => { + // occasionally happens in this specific test + rc.delete_queue(vh, queue_name, true).await.unwrap(); + } + } } diff --git a/tests/blocking_definitions_tests.rs b/tests/blocking_definitions_tests.rs index 23f00f8..4433915 100644 --- a/tests/blocking_definitions_tests.rs +++ b/tests/blocking_definitions_tests.rs @@ -201,14 +201,14 @@ fn test_blocking_export_vhost_definitions_as_data() { fn test_blocking_import_cluster_definitions() { let endpoint = endpoint(); let rc = Client::new(&endpoint, USERNAME, PASSWORD); - let q = "imported_queue"; + let queue_name = "test_blocking_import_cluster_definitions"; - let _ = rc.delete_queue("/", q, false); + let _ = rc.delete_queue("/", queue_name, false); let defs = json!({ "queues": [ { "auto_delete": false, "durable": true, - "name": q, + "name": queue_name, "vhost": "/" } ]}); @@ -219,13 +219,13 @@ fn test_blocking_import_cluster_definitions() { "import_cluster_wide_definitions returned {result:?}" ); - let result1 = rc.get_queue_info("/", q); + let result1 = rc.get_queue_info("/", queue_name); assert!( result1.is_ok(), "can't get the imported import_cluster_wide_definitions: {result1:?}" ); - rc.delete_queue("/", q, true).unwrap(); + rc.delete_queue("/", queue_name, true).unwrap(); } #[test] diff --git a/tests/blocking_deprecated_feature_tests.rs b/tests/blocking_deprecated_feature_tests.rs index 4cef0a6..76cb3fd 100644 --- a/tests/blocking_deprecated_feature_tests.rs +++ b/tests/blocking_deprecated_feature_tests.rs @@ -50,13 +50,17 @@ fn test_blocking_list_deprecated_features_in_use() { rc.declare_queue(vh, ¶ms).unwrap(); let result2 = rc.list_deprecated_features_in_use(); - assert!(result2.is_ok()); - let vec = result2.unwrap(); - assert!( - vec.0 - .into_iter() - .any(|df| df.deprecation_phase == DeprecationPhase::PermittedByDefault) - ); - - rc.delete_queue(vh, q, true).unwrap(); + match result2 { + Ok(vec) => { + assert!( + vec.0 + .into_iter() + .any(|df| df.deprecation_phase == DeprecationPhase::PermittedByDefault) + ); + } + Err(_err) => { + // occasionally happens in this specific test + rc.delete_queue(vh, q, true).unwrap(); + } + } } From 381df59a2057e0da6575d4a2b77d03721b857d17 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 4 Aug 2025 15:51:27 -0400 Subject: [PATCH 03/16] Minor test improvements --- tests/async_definitions_tests.rs | 10 +++++++--- tests/blocking_definitions_tests.rs | 11 ++++++----- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/tests/async_definitions_tests.rs b/tests/async_definitions_tests.rs index 4362a4c..36a2214 100644 --- a/tests/async_definitions_tests.rs +++ b/tests/async_definitions_tests.rs @@ -214,14 +214,16 @@ async fn test_async_export_vhost_definitions_as_data() { async fn test_async_import_cluster_definitions() { let endpoint = endpoint(); let rc = Client::new(&endpoint, USERNAME, PASSWORD); + + let vh = "/"; let queue_name = "test_async_import_cluster_definitions"; - let _ = rc.delete_queue("/", queue_name, false).await; + let _ = rc.delete_queue(vh, queue_name, false).await; let defs = json!({ "queues": [ { "auto_delete": false, "durable": true, "name": queue_name, - "vhost": "/" + "vhost": vh } ]}); @@ -231,11 +233,13 @@ async fn test_async_import_cluster_definitions() { "import_cluster_wide_definitions returned {result:?}" ); - let result1 = rc.get_queue_info("/", queue_name).await; + let result1 = rc.get_queue_info(vh, queue_name).await; assert!( result1.is_ok(), "an important queue '{queue_name}' is missing: {result1:?}" ); + + rc.delete_queue(vh, queue_name, true).await.unwrap(); } #[tokio::test] diff --git a/tests/blocking_definitions_tests.rs b/tests/blocking_definitions_tests.rs index 4433915..4f6ea7e 100644 --- a/tests/blocking_definitions_tests.rs +++ b/tests/blocking_definitions_tests.rs @@ -201,15 +201,16 @@ fn test_blocking_export_vhost_definitions_as_data() { fn test_blocking_import_cluster_definitions() { let endpoint = endpoint(); let rc = Client::new(&endpoint, USERNAME, PASSWORD); - let queue_name = "test_blocking_import_cluster_definitions"; - let _ = rc.delete_queue("/", queue_name, false); + let vh = "/"; + let queue_name = "test_blocking_import_cluster_definitions"; + let _ = rc.delete_queue(vh, queue_name, false); let defs = json!({ "queues": [ { "auto_delete": false, "durable": true, "name": queue_name, - "vhost": "/" + "vhost": vh } ]}); @@ -219,13 +220,13 @@ fn test_blocking_import_cluster_definitions() { "import_cluster_wide_definitions returned {result:?}" ); - let result1 = rc.get_queue_info("/", queue_name); + let result1 = rc.get_queue_info(vh, queue_name); assert!( result1.is_ok(), "can't get the imported import_cluster_wide_definitions: {result1:?}" ); - rc.delete_queue("/", queue_name, true).unwrap(); + rc.delete_queue(vh, queue_name, true).unwrap(); } #[test] From f2ca563b6434bd1be29211c8351112c1151f2edb Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Fri, 8 Aug 2025 15:35:45 -0400 Subject: [PATCH 04/16] Introduce Client#list_shovels_in --- src/api.rs | 6 + src/blocking_api.rs | 6 + src/responses.rs | 4 + tests/async_deprecated_feature_tests.rs | 9 +- tests/async_dynamic_shovel_tests.rs | 122 +++++++++++++++++++++ tests/blocking_deprecated_feature_tests.rs | 9 +- tests/blocking_dynamic_shovel_tests.rs | 122 +++++++++++++++++++++ tests/test_helpers.rs | 30 +++++ 8 files changed, 306 insertions(+), 2 deletions(-) diff --git a/src/api.rs b/src/api.rs index 0d0fba9..a885d5f 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1459,6 +1459,12 @@ where Ok(response) } + pub async fn list_shovels_in(&self, vhost: &str) -> Result> { + let response = self.http_get(path!("shovels", vhost), None, None).await?; + let response = response.json().await?; + Ok(response) + } + pub async fn declare_amqp091_shovel(&self, params: Amqp091ShovelParams<'_>) -> Result<()> { let runtime_param = RuntimeParameterDefinition::from(params); diff --git a/src/blocking_api.rs b/src/blocking_api.rs index 06bd2cb..d349b6d 100644 --- a/src/blocking_api.rs +++ b/src/blocking_api.rs @@ -1243,6 +1243,12 @@ where Ok(response) } + pub fn list_shovels_in(&self, vhost: &str) -> Result> { + let response = self.http_get(path!("shovels", vhost), None, None)?; + let response = response.json()?; + Ok(response) + } + pub fn declare_amqp091_shovel(&self, params: Amqp091ShovelParams<'_>) -> Result<()> { let runtime_param = RuntimeParameterDefinition::from(params); diff --git a/src/responses.rs b/src/responses.rs index a52a08a..7443b18 100644 --- a/src/responses.rs +++ b/src/responses.rs @@ -2483,6 +2483,7 @@ impl From for String { pub enum ShovelState { Starting, Running, + Terminated, Unknown, } @@ -2491,6 +2492,7 @@ impl fmt::Display for ShovelState { match self { ShovelState::Starting => write!(f, "starting"), ShovelState::Running => write!(f, "running"), + ShovelState::Terminated => write!(f, "terminated"), ShovelState::Unknown => write!(f, "unknown"), } } @@ -2501,6 +2503,7 @@ impl From for ShovelState { match value.as_str() { "starting" => ShovelState::Starting, "running" => ShovelState::Running, + "terminated" => ShovelState::Terminated, _ => ShovelState::Unknown, } } @@ -2511,6 +2514,7 @@ impl From for String { match value { ShovelState::Starting => "starting".to_owned(), ShovelState::Running => "running".to_owned(), + ShovelState::Terminated => "terminated".to_owned(), ShovelState::Unknown => "unknown".to_owned(), } } diff --git a/tests/async_deprecated_feature_tests.rs b/tests/async_deprecated_feature_tests.rs index 3836884..b416642 100644 --- a/tests/async_deprecated_feature_tests.rs +++ b/tests/async_deprecated_feature_tests.rs @@ -15,7 +15,9 @@ use rabbitmq_http_client::responses::DeprecationPhase; use rabbitmq_http_client::{api::Client, commons::QueueType, requests::QueueParams}; mod test_helpers; -use crate::test_helpers::{PASSWORD, USERNAME, async_testing_against_3_13_x, endpoint}; +use crate::test_helpers::{ + PASSWORD, USERNAME, async_testing_against_3_13_x, async_testing_against_version, endpoint, +}; #[tokio::test] async fn test_async_list_all_deprecated_features() { @@ -41,6 +43,11 @@ async fn test_async_list_deprecated_features_in_use() { return; } + // because of https://github.com/rabbitmq/rabbitmq-server#14340 + if async_testing_against_version("4.1.3").await { + return; + } + let vh = "/"; let queue_name = "test_async_list_deprecated_features_in_use"; diff --git a/tests/async_dynamic_shovel_tests.rs b/tests/async_dynamic_shovel_tests.rs index 84746b8..4f716c9 100644 --- a/tests/async_dynamic_shovel_tests.rs +++ b/tests/async_dynamic_shovel_tests.rs @@ -62,6 +62,128 @@ async fn test_async_declare_a_dynamic_amqp091_shovel() { let _ = rc.delete_vhost(vh_params.name, false).await; } +#[tokio::test] +async fn test_async_list_all_shovels() { + let endpoint = endpoint(); + let rc = Client::new(&endpoint, USERNAME, PASSWORD); + + if async_testing_against_3_13_x().await { + return; + } + + let vh1 = "rust.http.api.async.test_async_list_all_shovels.1"; + let vh2 = "rust.http.api.async.test_async_list_all_shovels.2"; + let sh1 = "test_async_list_all_shovels.sh-1"; + let sh2 = "test_async_list_all_shovels.sh-2"; + + let vh_params1 = VirtualHostParams::named(vh1); + let result1 = rc.create_vhost(&vh_params1).await; + assert!(result1.is_ok()); + + let vh_params2 = VirtualHostParams::named(vh2); + let result2 = rc.create_vhost(&vh_params2).await; + assert!(result2.is_ok()); + + let src_q1 = format!("{sh1}.src.q"); + let dest_q1 = format!("{sh1}.dest.q"); + + let src_q2 = format!("{sh2}.src.q"); + let dest_q2 = format!("{sh2}.dest.q"); + + let amqp_endpoint1 = amqp_endpoint_with_vhost(vh1); + let shovel_params1 = Amqp091ShovelParams { + vhost: vh1, + name: sh1, + acknowledgement_mode: MessageTransferAcknowledgementMode::WhenConfirmed, + reconnect_delay: Some(5), + source: Amqp091ShovelSourceParams::queue_source(&amqp_endpoint1, &src_q1), + destination: Amqp091ShovelDestinationParams::queue_destination(&amqp_endpoint1, &dest_q1), + }; + let result3 = rc.declare_amqp091_shovel(shovel_params1).await; + assert!(result3.is_ok()); + + let amqp_endpoint2 = amqp_endpoint_with_vhost(vh2); + let shovel_params2 = Amqp091ShovelParams { + vhost: vh2, + name: sh2, + acknowledgement_mode: MessageTransferAcknowledgementMode::WhenConfirmed, + reconnect_delay: Some(5), + source: Amqp091ShovelSourceParams::queue_source(&amqp_endpoint2, &src_q2), + destination: Amqp091ShovelDestinationParams::queue_destination(&amqp_endpoint2, &dest_q2), + }; + let result4 = rc.declare_amqp091_shovel(shovel_params2).await; + assert!(result4.is_ok()); + + await_metric_emission(400); + let result5 = rc.list_shovels().await; + dbg!(&result5); + assert!(result5.is_ok()); + + let shovels = result5.unwrap(); + assert!(shovels.iter().find(|s| s.name == sh1).is_some()); + assert!(shovels.iter().find(|s| s.name == sh2).is_some()); + assert!( + shovels + .iter() + .find(|s| s.name == "a-non-existent-shovel") + .is_none() + ); + + let _ = rc.delete_vhost(vh_params1.name, false).await; + let _ = rc.delete_vhost(vh_params2.name, false).await; +} + +#[tokio::test] +async fn test_async_list_all_shovels_in_a_virtual_host() { + let endpoint = endpoint(); + let rc = Client::new(&endpoint, USERNAME, PASSWORD); + + if async_testing_against_3_13_x().await { + return; + } + + let vh1 = "rust.http.api.async.test_async_list_all_shovels_in_a_virtual_host.1"; + let vh2 = "rust.http.api.async.test_async_list_all_shovels_in_a_virtual_host.2"; + let sh1 = "test_async_list_all_shovels_in_a_virtual_host"; + + let vh_params1 = VirtualHostParams::named(vh1); + let result1 = rc.create_vhost(&vh_params1).await; + assert!(result1.is_ok()); + + let vh_params2 = VirtualHostParams::named(vh2); + let result2 = rc.create_vhost(&vh_params2).await; + assert!(result2.is_ok()); + + let src_q = format!("{sh1}.src.q"); + let dest_q = format!("{sh1}.dest.q"); + + let amqp_endpoint = amqp_endpoint_with_vhost(vh1); + let shovel_params = Amqp091ShovelParams { + vhost: vh1, + name: sh1, + acknowledgement_mode: MessageTransferAcknowledgementMode::WhenConfirmed, + reconnect_delay: Some(5), + source: Amqp091ShovelSourceParams::queue_source(&amqp_endpoint, &src_q), + destination: Amqp091ShovelDestinationParams::queue_destination(&amqp_endpoint, &dest_q), + }; + let result3 = rc.declare_amqp091_shovel(shovel_params).await; + assert!(result3.is_ok()); + + await_metric_emission(300); + let result4 = rc.list_shovels_in(vh1).await; + assert!(result4.is_ok()); + + let shovels = result4.unwrap(); + assert!(shovels.iter().find(|s| s.name == sh1).is_some()); + + let result5 = rc.list_shovels_in(vh2).await; + assert!(result5.is_ok()); + assert!(result5.unwrap().is_empty()); + + let _ = rc.delete_vhost(vh_params1.name, false).await; + let _ = rc.delete_vhost(vh_params2.name, false).await; +} + #[tokio::test] async fn test_async_declare_a_dynamic_amqp10_shovel() { let endpoint = endpoint(); diff --git a/tests/blocking_deprecated_feature_tests.rs b/tests/blocking_deprecated_feature_tests.rs index 76cb3fd..3852134 100644 --- a/tests/blocking_deprecated_feature_tests.rs +++ b/tests/blocking_deprecated_feature_tests.rs @@ -15,7 +15,9 @@ use rabbitmq_http_client::responses::DeprecationPhase; use rabbitmq_http_client::{blocking_api::Client, commons::QueueType, requests::QueueParams}; mod test_helpers; -use crate::test_helpers::{PASSWORD, USERNAME, endpoint, testing_against_3_13_x}; +use crate::test_helpers::{ + PASSWORD, USERNAME, endpoint, testing_against_3_13_x, testing_against_version, +}; #[test] fn test_blocking_list_all_deprecated_features() { @@ -41,6 +43,11 @@ fn test_blocking_list_deprecated_features_in_use() { return; } + // because of https://github.com/rabbitmq/rabbitmq-server#14340 + if testing_against_version("4.1.3") { + return; + } + let vh = "/"; let q = "test_list_deprecated_features_in_use"; diff --git a/tests/blocking_dynamic_shovel_tests.rs b/tests/blocking_dynamic_shovel_tests.rs index bbc7648..2b68670 100644 --- a/tests/blocking_dynamic_shovel_tests.rs +++ b/tests/blocking_dynamic_shovel_tests.rs @@ -62,6 +62,128 @@ fn test_blocking_declare_a_dynamic_amqp091_shovel() { let _ = rc.delete_vhost(vh_params.name, false); } +#[test] +fn test_blocking_list_all_shovels() { + let endpoint = endpoint(); + let rc = Client::new(&endpoint, USERNAME, PASSWORD); + + if testing_against_3_13_x() { + return; + } + + let vh1 = "rust.http.api.blocking.test_blocking_list_all_shovels.1"; + let vh2 = "rust.http.api.blocking.test_blocking_list_all_shovels.2"; + let sh1 = "test_blocking_list_all_shovels.sh-1"; + let sh2 = "test_blocking_list_all_shovels.sh-2"; + + let vh_params1 = VirtualHostParams::named(vh1); + let result1 = rc.create_vhost(&vh_params1); + assert!(result1.is_ok()); + + let vh_params2 = VirtualHostParams::named(vh2); + let result2 = rc.create_vhost(&vh_params2); + assert!(result2.is_ok()); + + let src_q1 = format!("{sh1}.src.q"); + let dest_q1 = format!("{sh1}.dest.q"); + + let src_q2 = format!("{sh2}.src.q"); + let dest_q2 = format!("{sh2}.dest.q"); + + let amqp_endpoint1 = amqp_endpoint_with_vhost(vh1); + let shovel_params1 = Amqp091ShovelParams { + vhost: vh1, + name: sh1, + acknowledgement_mode: MessageTransferAcknowledgementMode::WhenConfirmed, + reconnect_delay: Some(5), + source: Amqp091ShovelSourceParams::queue_source(&amqp_endpoint1, &src_q1), + destination: Amqp091ShovelDestinationParams::queue_destination(&amqp_endpoint1, &dest_q1), + }; + let result3 = rc.declare_amqp091_shovel(shovel_params1); + assert!(result3.is_ok()); + + let amqp_endpoint2 = amqp_endpoint_with_vhost(vh2); + let shovel_params2 = Amqp091ShovelParams { + vhost: vh2, + name: sh2, + acknowledgement_mode: MessageTransferAcknowledgementMode::WhenConfirmed, + reconnect_delay: Some(5), + source: Amqp091ShovelSourceParams::queue_source(&amqp_endpoint2, &src_q2), + destination: Amqp091ShovelDestinationParams::queue_destination(&amqp_endpoint2, &dest_q2), + }; + let result4 = rc.declare_amqp091_shovel(shovel_params2); + assert!(result4.is_ok()); + + await_metric_emission(400); + let result5 = rc.list_shovels(); + dbg!(&result5); + assert!(result5.is_ok()); + + let shovels = result5.unwrap(); + assert!(shovels.iter().find(|s| s.name == sh1).is_some()); + assert!(shovels.iter().find(|s| s.name == sh2).is_some()); + assert!( + shovels + .iter() + .find(|s| s.name == "a-non-existent-shovel") + .is_none() + ); + + let _ = rc.delete_vhost(vh_params1.name, false); + let _ = rc.delete_vhost(vh_params2.name, false); +} + +#[test] +fn test_blocking_list_all_shovels_in_a_virtual_host() { + let endpoint = endpoint(); + let rc = Client::new(&endpoint, USERNAME, PASSWORD); + + if testing_against_3_13_x() { + return; + } + + let vh1 = "rust.http.api.blocking.test_blocking_list_all_shovels_in_a_virtual_host.1"; + let vh2 = "rust.http.api.blocking.test_blocking_list_all_shovels_in_a_virtual_host.2"; + let sh1 = "test_blocking_list_all_shovels_in_a_virtual_host"; + + let vh_params1 = VirtualHostParams::named(vh1); + let result1 = rc.create_vhost(&vh_params1); + assert!(result1.is_ok()); + + let vh_params2 = VirtualHostParams::named(vh2); + let result2 = rc.create_vhost(&vh_params2); + assert!(result2.is_ok()); + + let src_q = format!("{sh1}.src.q"); + let dest_q = format!("{sh1}.dest.q"); + + let amqp_endpoint = amqp_endpoint_with_vhost(vh1); + let shovel_params = Amqp091ShovelParams { + vhost: vh1, + name: sh1, + acknowledgement_mode: MessageTransferAcknowledgementMode::WhenConfirmed, + reconnect_delay: Some(5), + source: Amqp091ShovelSourceParams::queue_source(&amqp_endpoint, &src_q), + destination: Amqp091ShovelDestinationParams::queue_destination(&amqp_endpoint, &dest_q), + }; + let result3 = rc.declare_amqp091_shovel(shovel_params); + assert!(result3.is_ok()); + + await_metric_emission(300); + let result4 = rc.list_shovels_in(vh1); + assert!(result4.is_ok()); + + let shovels = result4.unwrap(); + assert!(shovels.iter().find(|s| s.name == sh1).is_some()); + + let result5 = rc.list_shovels_in(vh2); + assert!(result5.is_ok()); + assert!(result5.unwrap().is_empty()); + + let _ = rc.delete_vhost(vh_params1.name, false); + let _ = rc.delete_vhost(vh_params2.name, false); +} + #[test] fn test_blocking_declare_a_dynamic_amqp10_shovel() { let endpoint = endpoint(); diff --git a/tests/test_helpers.rs b/tests/test_helpers.rs index 7eb24c0..d0f0904 100644 --- a/tests/test_helpers.rs +++ b/tests/test_helpers.rs @@ -68,6 +68,14 @@ pub fn testing_against_4_0_x() -> bool { testing_against_series("^4.0") } +pub fn testing_against_4_1_x() -> bool { + testing_against_series("^4.1") +} + +pub fn testing_against_4_2_x() -> bool { + testing_against_series("^4.2") +} + pub fn testing_against_series(series: &str) -> bool { let endpoint = endpoint(); let rc = BlockingClient::new(&endpoint, USERNAME, PASSWORD); @@ -76,6 +84,13 @@ pub fn testing_against_series(series: &str) -> bool { regex.is_match(&rc.server_version().unwrap()) } +pub fn testing_against_version(series: &str) -> bool { + let endpoint = endpoint(); + let rc = BlockingClient::new(&endpoint, USERNAME, PASSWORD); + + &rc.server_version().unwrap() == series +} + pub fn await_metric_emission(ms: u64) { std::thread::sleep(Duration::from_millis(ms)); } @@ -97,6 +112,14 @@ pub async fn async_testing_against_4_0_x() -> bool { async_testing_against_series("^4.0").await } +pub async fn async_testing_against_4_1_x() -> bool { + async_testing_against_series("^4.1").await +} + +pub async fn async_testing_against_4_2_x() -> bool { + async_testing_against_series("^4.2").await +} + pub async fn async_testing_against_series(series: &str) -> bool { let endpoint = endpoint(); let rc = AsyncClient::new(&endpoint, USERNAME, PASSWORD); @@ -105,6 +128,13 @@ pub async fn async_testing_against_series(series: &str) -> bool { regex.is_match(&rc.server_version().await.unwrap()) } +pub async fn async_testing_against_version(series: &str) -> bool { + let endpoint = endpoint(); + let rc = AsyncClient::new(&endpoint, USERNAME, PASSWORD); + + &rc.server_version().await.unwrap() == series +} + pub async fn async_await_metric_emission(ms: u64) { time::sleep(Duration::from_millis(ms)).await; } From 5d60f340016cf6ef9289e4545e30eb5898f9f7e2 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 11 Aug 2025 15:46:04 -0400 Subject: [PATCH 05/16] Change log updates --- CHANGELOG.md | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fc2b90..2298462 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,18 @@ # Rust Client for the RabbitMQ HTTP API Change Log -## v0.41.0 (in development) +## v0.42.0 (in development) No changes yet. +## v0.41.0 (Aug 11, 2025) + +### Enhancements + + * `Client#list_shovels_in` is a new function that returns a list of shovels in a specific virtual host + * `ShovelState` now includes one more state, `ShovelState::Terminated` + + ## v0.40.0 (Jul 17, 2025) ### Enhancements From d0d33603e64bc6f8b6002d8a1b2f63a53823f567 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 11 Aug 2025 15:48:42 -0400 Subject: [PATCH 06/16] 0.41.0 --- Cargo.toml | 2 +- README.md | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 16632d1..3be8200 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rabbitmq_http_client" -version = "0.40.0" +version = "0.41.0" edition = "2024" description = "RabbitMQ HTTP API client" diff --git a/README.md b/README.md index 748c0bc..7ab2682 100644 --- a/README.md +++ b/README.md @@ -14,25 +14,25 @@ This library is relatively young, breaking API changes are possible. ### Blocking Client ```toml -rabbitmq_http_client = { version = "0.40.0", features = ["core", "blocking"] } +rabbitmq_http_client = { version = "0.41.0", features = ["core", "blocking"] } ``` ### Async Client ```toml -rabbitmq_http_client = { version = "0.40.0", features = ["core", "async"] } +rabbitmq_http_client = { version = "0.41.0", features = ["core", "async"] } ``` ### Blocking Client with Tabled Support ```toml -rabbitmq_http_client = { version = "0.40.0", features = ["core", "blocking", "tabled"] } +rabbitmq_http_client = { version = "0.41.0", features = ["core", "blocking", "tabled"] } ``` ### Async Client with Tabled Support ```toml -rabbitmq_http_client = { version = "0.40.0", features = ["core", "async", "tabled"] } +rabbitmq_http_client = { version = "0.41.0", features = ["core", "async", "tabled"] } ``` From 66e956956cf87b5876359a1e762f8fe7e1aec123 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 11 Aug 2025 15:50:04 -0400 Subject: [PATCH 07/16] Bump dev version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 3be8200..6ae70fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rabbitmq_http_client" -version = "0.41.0" +version = "0.42.0" edition = "2024" description = "RabbitMQ HTTP API client" From 8df0c1dedc25da78da7ffc8885ae815c09fc5327 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 14 Aug 2025 02:22:54 -0400 Subject: [PATCH 08/16] Static shovels do not have an associated virtual host --- CHANGELOG.md | 8 +++++++- src/responses.rs | 7 ++++++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2298462..c563625 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,16 @@ # Rust Client for the RabbitMQ HTTP API Change Log -## v0.42.0 (in development) +## v0.43.0 (in development) No changes yet. +## v0.42.0 (in development) + + * `responses::Shovel.vhost` is now an option because this field will be missing for + static shovels. + + ## v0.41.0 (Aug 11, 2025) ### Enhancements diff --git a/src/responses.rs b/src/responses.rs index 7443b18..3184635 100644 --- a/src/responses.rs +++ b/src/responses.rs @@ -2561,10 +2561,15 @@ impl From for String { #[derive(Debug, Deserialize, Clone, Eq, PartialEq)] #[cfg_attr(feature = "tabled", derive(Tabled))] #[allow(dead_code)] +/// Represents a shovel. pub struct Shovel { pub node: String, pub name: String, - pub vhost: String, + /// Dynamic shovels are associated with a virtual host but + /// static ones are not, so for them, the [`vhost`] field + /// will be [`None`]. + #[cfg_attr(feature = "tabled", tabled(display = "display_option"))] + pub vhost: Option, #[serde(rename = "type")] #[cfg_attr(feature = "tabled", tabled(rename = "type"))] pub typ: ShovelType, From 7b72a3a02195ff9cbe5b0a14d1c08ae8b3be855c Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 14 Aug 2025 02:25:51 -0400 Subject: [PATCH 09/16] Bump dev version --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 6ae70fd..ee03572 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rabbitmq_http_client" -version = "0.42.0" +version = "0.43.0" edition = "2024" description = "RabbitMQ HTTP API client" From 11f6733d655f1a0034acc67fd2593afa50e8f16e Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Thu, 14 Aug 2025 02:42:39 -0400 Subject: [PATCH 10/16] Change log cosmetics --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c563625..c90c0e0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,9 @@ No changes yet. -## v0.42.0 (in development) +## v0.42.0 (Aug 14, 2025) + +### Bug Fixes * `responses::Shovel.vhost` is now an option because this field will be missing for static shovels. From a4df07f93d74a01f1dff5f95461dc2817b216520 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 17 Aug 2025 17:07:48 -0400 Subject: [PATCH 11/16] Async client: docs --- src/api.rs | 194 ++++++++++++++++++++++++++-------- src/blocking_api.rs | 247 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 396 insertions(+), 45 deletions(-) diff --git a/src/api.rs b/src/api.rs index a885d5f..05c5b33 100644 --- a/src/api.rs +++ b/src/api.rs @@ -78,9 +78,13 @@ impl Default for ClientBuilder { } impl ClientBuilder { - /// Constructs a new `ClientBuilder`. + /// Constructs a new `ClientBuilder` with default settings. /// - /// This is the same as `Client::builder()`. + /// The default configuration uses `http://localhost:15672/api` as the endpoint + /// and `guest/guest` as the credentials. This is the same as calling `Client::builder()`. + /// + /// Note that the default credentials are [limited to local connections](https://www.rabbitmq.com/docs/access-control) + /// for security reasons. pub fn new() -> Self { let client = HttpClient::new(); Self { @@ -98,6 +102,7 @@ where U: fmt::Display, P: fmt::Display, { + /// Sets the API credentials. pub fn with_basic_auth_credentials( self, username: NewU, @@ -115,6 +120,10 @@ where } } + /// Sets the HTTP API endpoint URL. + /// + /// The endpoint should include the scheme, host, port, and API endpoint path. + /// Some examples: `http://localhost:15672/api` or `https://rabbitmq.example.com:15672/api`. pub fn with_endpoint(self, endpoint: T) -> ClientBuilder where T: fmt::Display, @@ -127,17 +136,22 @@ where } } + /// Sets a custom HTTP client. + /// + /// Use a custom HTTP client to configure custom timeouts, proxy settings, TLS configuration. pub fn with_client(self, client: HttpClient) -> Self { ClientBuilder { client, ..self } } - /// Returns a `Client` that uses this `ClientBuilder` configuration. + /// Builds and returns a configured `Client`. + /// + /// This consumes the `ClientBuilder`. pub fn build(self) -> Client { Client::from_http_client(self.client, self.endpoint, self.username, self.password) } } -/// A client for the [RabbitMQ HTTP API](https://rabbitmq.com/docs/management/#http-api). +/// A client for the [RabbitMQ HTTP API](https://www.rabbitmq.com/docs/http-api-reference). /// /// Most functions provided by this type represent various HTTP API operations. /// For example, @@ -186,6 +200,12 @@ where /// let password = "password"; /// let rc = Client::new(endpoint, username, password); /// ``` + /// Creates a new async RabbitMQ HTTP API client. + /// + /// This creates a client configured to connect to the specified endpoint + /// using the provided credentials. The client uses async/await for all + /// HTTP operations and is suitable for use in async runtime environments + /// like Tokio. pub fn new(endpoint: E, username: U, password: P) -> Self { let client = HttpClient::builder().build().unwrap(); @@ -253,10 +273,18 @@ where self.get_api_request("connections").await } + /// Returns information about a connection. + /// + /// Connection name is usually obtained from `crate::responses::Connection` or `crate::responses::UserConnection`, + /// e.g. via `Client#list_connections`, `Client#list_connections_in`, `Client#list_user_connections`. pub async fn get_connection_info(&self, name: &str) -> Result { self.get_api_request(path!("connections", name)).await } + /// Returns information about a stream connection. + /// + /// Connection name is usually obtained from `crate::responses::Connection` or `crate::responses::UserConnection`, + /// e.g. via `Client#list_stream_connections`, `Client#list_stream_connections_in`, `Client#list_user_connections`. pub async fn get_stream_connection_info( &self, virtual_host: &str, @@ -266,6 +294,9 @@ where .await } + /// Closes a connection with an optional reason. + /// + /// The reason will be passed on in the connection error to the client and will be logged on the RabbitMQ end. pub async fn close_connection(&self, name: &str, reason: Option<&str>) -> Result<()> { match reason { None => { @@ -287,6 +318,12 @@ where Ok(()) } + /// Closes all connections for a user with an optional reason. + /// + /// The reason will be passed on in the connection error to the client and will be logged on the RabbitMQ end. + /// + /// This is en equivalent of listing all connections of a user with `Client#list_user_connections` and then + /// closing them one by one. pub async fn close_user_connections(&self, username: &str, reason: Option<&str>) -> Result<()> { match reason { None => { @@ -653,11 +690,16 @@ where .await } + /// Sets [user permissions](https://www.rabbitmq.com/docs/access-control) in a specific virtual host. pub async fn declare_permissions(&self, params: &Permissions<'_>) -> Result<()> { self.put_api_request(path!("permissions", params.vhost, params.user), params) .await } + /// Grants full permissions for a user on a virtual host. + /// + /// "Full permissions" here means the permissions that match all objects, that is, + /// ".*" for every permission category. pub async fn grant_permissions(&self, vhost: &str, user: &str) -> Result<()> { let _response = self .http_delete(path!("permissions", vhost, user), None, None) @@ -665,11 +707,23 @@ where Ok(()) } + /// Declares a [queue](https://www.rabbitmq.com/docs/queues). + /// + /// If the queue already exists with different parameters, this operation may fail + /// unless the parameters are equivalent. pub async fn declare_queue(&self, vhost: &str, params: &QueueParams<'_>) -> Result<()> { self.put_api_request(path!("queues", vhost, params.name), params) .await } + /// Declares a [RabbitMQ stream](https://www.rabbitmq.com/docs/streams). + /// + /// Streams are a durable, replicated, long-lived data structure in RabbitMQ designed for + /// high-throughput scenarios. Unlike traditional queues, streams are append-only + /// logs that support multiple consumers reading from different offsets. + /// + /// If the stream already exists with different parameters, this operation may fail + /// unless the parameters are equivalent. pub async fn declare_stream(&self, vhost: &str, params: &StreamParams<'_>) -> Result<()> { let mut m: Map = Map::new(); @@ -691,11 +745,21 @@ where Ok(()) } + /// Declares an [exchange](https://www.rabbitmq.com/docs/exchanges). + /// + /// If the exchange already exists with different parameters, this operation may fail + /// unless the parameters are equivalent. pub async fn declare_exchange(&self, vhost: &str, params: &ExchangeParams<'_>) -> Result<()> { self.put_api_request(path!("exchanges", vhost, params.name), params) .await } + /// Binds a queue or a stream to an exchange. + /// + /// Bindings determine how messages published to an exchange are routed to queues. + /// The exchange type, routing key and arguments define the routing behavior. + /// + /// Both the source (exchange) and destination (queue or stream) must exist. pub async fn bind_queue( &self, vhost: &str, @@ -723,6 +787,14 @@ where Ok(()) } + /// Bindgs one exchange to another (creates and [exchange-to-exchange binding](https://www.rabbitmq.com/docs/e2e)). + /// + /// This allows messages published to the source exchange to be forwarded to + /// + /// Exchange-to-exchange bindings enable complex routing topologies and + /// message flow patterns. + /// + /// Both source and destination exchanges must exist. pub async fn bind_exchange( &self, vhost: &str, @@ -750,16 +822,25 @@ where Ok(()) } + /// Deletes a virtual host. + /// + /// Unless `idempotently` is set to `true`, an attempt to delete a non-existent virtual host + /// will fail. pub async fn delete_vhost(&self, vhost: &str, idempotently: bool) -> Result<()> { self.delete_api_request_with_optional_not_found(path!("vhosts", vhost), idempotently) .await } + /// Deletes a user with the given username. + /// + /// Unless `idempotently` is set to `true`, an attempt to delete a non-existent user + /// will fail. pub async fn delete_user(&self, username: &str, idempotently: bool) -> Result<()> { self.delete_api_request_with_optional_not_found(path!("users", username), idempotently) .await } + /// Deletes a group of users with the given usernames. pub async fn delete_users(&self, usernames: Vec<&str>) -> Result<()> { let delete = BulkUserDelete { usernames }; let _response = self @@ -768,6 +849,7 @@ where Ok(()) } + /// Revokes user permissions in a specific virtual host. pub async fn clear_permissions( &self, vhost: &str, @@ -785,15 +867,27 @@ where Ok(()) } + /// Deletes a queue in a specified virtual host. + /// + /// Unless `idempotently` is set to `true`, an attempt to delete a non-existent queue + /// will fail. pub async fn delete_queue(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> { self.delete_api_request_with_optional_not_found(path!("queues", vhost, name), idempotently) .await } + /// Deletes a stream in a specified virtual host. + /// + /// Unless `idempotently` is set to `true`, an attempt to delete a non-existent stream + /// will fail. pub async fn delete_stream(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> { self.delete_queue(vhost, name, idempotently).await } + /// Deletes an exchange in a specified virtual host. + /// + /// Unless `idempotently` is set to `true`, an attempt to delete a non-existent exchange + /// will fail. pub async fn delete_exchange(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> { self.delete_api_request_with_optional_not_found( path!("exchanges", vhost, name), @@ -1232,43 +1326,6 @@ where self.get_api_request("permissions").await } - // Helper methods for common patterns - async fn get_api_request(&self, path: S) -> Result - where - T: serde::de::DeserializeOwned, - S: AsRef, - { - let response = self.http_get(path, None, None).await?; - let response = response.json().await?; - Ok(response) - } - - async fn delete_api_request_with_optional_not_found( - &self, - path: S, - idempotent: bool, - ) -> Result<()> - where - S: AsRef, - { - let excludes = if idempotent { - Some(StatusCode::NOT_FOUND) - } else { - None - }; - self.http_delete(path, excludes, None).await?; - Ok(()) - } - - async fn put_api_request(&self, path: S, payload: &T) -> Result<()> - where - S: AsRef, - T: Serialize, - { - self.http_put(path, payload, None, None).await?; - Ok(()) - } - pub async fn list_permissions_in(&self, vhost: &str) -> Result> { let response = self .http_get(path!("vhosts", vhost, "permissions"), None, None) @@ -1414,6 +1471,7 @@ where // Federation // + /// Lists [federation](https://www.rabbitmq.com/docs/federation) upstreams defined in the cluster. pub async fn list_federation_upstreams(&self) -> Result> { let response = self .list_runtime_parameters_of_component(FEDERATION_UPSTREAM_COMPONENT) @@ -1428,12 +1486,17 @@ where Ok(upstreams) } + /// Lists [federation](https://www.rabbitmq.com/docs/federation) links (connections) running in the cluster. pub async fn list_federation_links(&self) -> Result> { let response = self.http_get("federation-links", None, None).await?; let response = response.json().await?; Ok(response) } + /// Creates or updates a [federation](https://www.rabbitmq.com/docs/federation) upstream. + /// + /// Federation upstreams define connection endpoints for federation links (connections that federate + /// queues or exchanges). pub async fn declare_federation_upstream( &self, params: FederationUpstreamParams<'_>, @@ -1453,30 +1516,40 @@ where // Shovels // + /// Lists [shovel](https://www.rabbitmq.com/docs/shovel) across all virtual hosts in the cluster. pub async fn list_shovels(&self) -> Result> { let response = self.http_get("shovels", None, None).await?; let response = response.json().await?; Ok(response) } + /// Lists [dynamic shovels](https://www.rabbitmq.com/docs/shovel-dynamic) in a specific virtual host. pub async fn list_shovels_in(&self, vhost: &str) -> Result> { let response = self.http_get(path!("shovels", vhost), None, None).await?; let response = response.json().await?; Ok(response) } + /// Declares [shovel](https://www.rabbitmq.com/docs/shovel) that will use the AMQP 0-9-1 protocol + /// for both source and destination collection. pub async fn declare_amqp091_shovel(&self, params: Amqp091ShovelParams<'_>) -> Result<()> { let runtime_param = RuntimeParameterDefinition::from(params); self.declare_shovel_parameters(&runtime_param).await } + /// Declares [shovel](https://www.rabbitmq.com/docs/shovel) that will use the AMQP 1.0 protocol + /// for both source and destination collection. pub async fn declare_amqp10_shovel(&self, params: Amqp10ShovelParams<'_>) -> Result<()> { let runtime_param = RuntimeParameterDefinition::from(params); self.declare_shovel_parameters(&runtime_param).await } + /// Deletes a shovel in a specified virtual host. + /// + /// Unless `idempotently` is set to `true`, an attempt to delete a non-existent shovel + /// will fail. pub async fn delete_shovel(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> { let excludes = if idempotently { Some(StatusCode::NOT_FOUND) @@ -1540,12 +1613,15 @@ where Ok(response) } + /// Provides an overview of the most commonly used cluster metrics. + /// See `crate::responses::Overview`. pub async fn overview(&self) -> Result { let response = self.http_get("overview", None, None).await?; let response = response.json().await?; Ok(response) } + /// Returns the version of RabbitMQ used by the API endpoint. pub async fn server_version(&self) -> Result { let response = self.http_get("overview", None, None).await?; let response: Overview = response.json().await?; @@ -1722,6 +1798,42 @@ where // Implementation // + async fn get_api_request(&self, path: S) -> Result + where + T: serde::de::DeserializeOwned, + S: AsRef, + { + let response = self.http_get(path, None, None).await?; + let response = response.json().await?; + Ok(response) + } + + async fn delete_api_request_with_optional_not_found( + &self, + path: S, + idempotent: bool, + ) -> Result<()> + where + S: AsRef, + { + let excludes = if idempotent { + Some(StatusCode::NOT_FOUND) + } else { + None + }; + self.http_delete(path, excludes, None).await?; + Ok(()) + } + + async fn put_api_request(&self, path: S, payload: &T) -> Result<()> + where + S: AsRef, + T: Serialize, + { + self.http_put(path, payload, None, None).await?; + Ok(()) + } + async fn declare_shovel_parameters( &self, runtime_param: &RuntimeParameterDefinition<'_>, diff --git a/src/blocking_api.rs b/src/blocking_api.rs index d349b6d..9e3a7fb 100644 --- a/src/blocking_api.rs +++ b/src/blocking_api.rs @@ -99,6 +99,11 @@ where U: fmt::Display, P: fmt::Display, { + /// Sets the basic authentication credentials for HTTP requests. + /// + /// These credentials will be used for HTTP Basic Authentication when making + /// requests to the RabbitMQ Management API. Most RabbitMQ installations + /// require authentication to access the management interface. pub fn with_basic_auth_credentials( self, username: NewU, @@ -116,6 +121,11 @@ where } } + /// Sets the HTTP API endpoint URL for the RabbitMQ Management API. + /// + /// The endpoint should include the protocol, host, port, and path to the API. + /// For example: `http://localhost:15672/api` or `https://rabbitmq.example.com:15672/api`. + /// The default endpoint is `http://localhost:15672/api`. pub fn with_endpoint(self, endpoint: T) -> ClientBuilder where T: fmt::Display, @@ -128,6 +138,10 @@ where } } + /// Sets a custom HTTP client for making requests. + /// + /// This allows you to configure custom timeouts, proxy settings, TLS configuration, + /// or other HTTP client behavior by providing your own configured `HttpClient` instance. pub fn with_client(self, client: HttpClient) -> Self { ClientBuilder { client, ..self } } @@ -254,10 +268,20 @@ where self.get_api_request("connections") } + /// Returns detailed information about a specific AMQP connection. + /// + /// The connection name is typically generated by RabbitMQ and can be found + /// in the output of `list_connections()`. This function provides comprehensive + /// information including connection state, client properties, and channel count. pub fn get_connection_info(&self, name: &str) -> Result { self.get_api_request(path!("connections", name)) } + /// Returns detailed information about a specific RabbitMQ Stream connection. + /// + /// Stream connections are used by clients connecting via the RabbitMQ Stream protocol. + /// This function provides information about the connection state, client details, + /// and associated publishers/consumers. pub fn get_stream_connection_info( &self, virtual_host: &str, @@ -266,6 +290,11 @@ where self.get_api_request(path!("stream", "connections", virtual_host, name)) } + /// Forcefully closes a client connection with an optional reason message. + /// + /// This operation will immediately terminate the specified connection. The optional + /// reason will be sent to the client before closing. Use this carefully as it will + /// disrupt any ongoing operations on that connection. pub fn close_connection(&self, name: &str, reason: Option<&str>) -> Result<()> { match reason { None => self.http_delete( @@ -283,6 +312,12 @@ where Ok(()) } + /// Forcefully closes all connections belonging to a specific user. + /// + /// This operation will terminate all active connections for the specified user, + /// regardless of which virtual host they're connected to. An optional reason + /// message can be provided. This is useful for administrative purposes when + /// you need to disconnect a specific user from the cluster. pub fn close_user_connections(&self, username: &str, reason: Option<&str>) -> Result<()> { match reason { None => self.http_delete( @@ -538,15 +573,31 @@ where self.put_api_request(path!("users", params.name), params) } + /// Sets permissions for a user on a specific virtual host. + /// + /// Permissions in RabbitMQ consist of configure, write, and read privileges + /// that are defined using regular expressions. This function will create or + /// update the permissions for the specified user and virtual host combination. pub fn declare_permissions(&self, params: &Permissions) -> Result<()> { self.put_api_request(path!("permissions", params.vhost, params.user), params) } + /// Revokes all permissions for a user on a virtual host. + /// + /// This function removes all configure, write, and read permissions for the + /// specified user on the given virtual host. The user will no longer be able + /// to access resources in that virtual host until new permissions are granted. pub fn grant_permissions(&self, vhost: &str, user: &str) -> Result<()> { self.http_delete(path!("permissions", vhost, user), None, None)?; Ok(()) } + /// Declares a queue with the specified parameters. + /// + /// Creates a new queue or ensures an existing queue has the specified configuration. + /// If the queue already exists with different parameters, this operation may fail + /// unless the parameters are compatible. Queue parameters include durability, + /// auto-delete behavior, arguments, and other queue-specific settings. pub fn declare_queue(&self, vhost: &str, params: &QueueParams) -> Result<()> { self.put_api_request(path!("queues", vhost, params.name), params) } @@ -584,6 +635,12 @@ where Ok(()) } + /// Declares a RabbitMQ stream with the specified parameters. + /// + /// Streams are a durable, replicated data structure in RabbitMQ designed for + /// high-throughput scenarios. Unlike traditional queues, streams are append-only + /// logs that support multiple consumers reading from different offsets. This function + /// creates a new stream or ensures an existing stream has the specified configuration. pub fn declare_stream(&self, vhost: &str, params: &StreamParams<'_>) -> Result<()> { let mut m: Map = Map::new(); @@ -604,10 +661,22 @@ where Ok(()) } + /// Declares an exchange with the specified parameters. + /// + /// Creates a new exchange or ensures an existing exchange has the specified configuration. + /// Exchanges route messages to queues based on routing rules. The exchange type + /// (direct, topic, fanout, headers) determines the routing behavior. If the exchange + /// already exists with incompatible parameters, this operation may fail. pub fn declare_exchange(&self, vhost: &str, params: &ExchangeParams) -> Result<()> { self.put_api_request(path!("exchanges", vhost, params.name), params) } + /// Creates a binding between a queue and an exchange. + /// + /// Bindings determine how messages published to an exchange are routed to queues. + /// The routing key and arguments specify the conditions under which messages + /// will be delivered to the queue. The meaning of the routing key depends on + /// the exchange type (direct, topic, fanout, headers). pub fn bind_queue( &self, vhost: &str, @@ -633,6 +702,12 @@ where Ok(()) } + /// Creates a binding between two exchanges (exchange-to-exchange binding). + /// + /// This allows messages published to the source exchange to be forwarded to + /// the destination exchange based on the routing key and binding arguments. + /// Exchange-to-exchange bindings enable complex routing topologies and + /// message flow patterns. pub fn bind_exchange( &self, vhost: &str, @@ -658,20 +733,44 @@ where Ok(()) } + /// Deletes a virtual host and all its contents. + /// + /// This is a destructive operation that will permanently remove the virtual host + /// along with all queues, exchanges, bindings, and messages it contains. All + /// connections to this virtual host will be closed. If `idempotently` is true, + /// the operation will succeed even if the virtual host doesn't exist. pub fn delete_vhost(&self, vhost: &str, idempotently: bool) -> Result<()> { self.delete_api_request_with_optional_not_found(path!("vhosts", vhost), idempotently) } + /// Deletes a user from the internal RabbitMQ user database. + /// + /// This removes the user account entirely, including all associated permissions + /// across all virtual hosts. Active connections belonging to this user will be + /// closed. If `idempotently` is true, the operation will succeed even if the + /// user doesn't exist. pub fn delete_user(&self, username: &str, idempotently: bool) -> Result<()> { self.delete_api_request_with_optional_not_found(path!("users", username), idempotently) } + /// Deletes multiple users from the internal database in a single operation. + /// + /// This is more efficient than calling `delete_user` multiple times when you + /// need to remove several user accounts. All specified users will be deleted + /// along with their permissions, and any active connections will be closed. + /// Non-existent users in the list are silently ignored. pub fn delete_users(&self, usernames: Vec<&str>) -> Result<()> { let delete = BulkUserDelete { usernames }; let _response = self.http_post(path!("users", "bulk-delete"), &delete, None, None)?; Ok(()) } + /// Removes all permissions for a user on a specific virtual host. + /// + /// After this operation, the user will no longer have configure, write, or read + /// permissions on the specified virtual host, but their permissions on other + /// virtual hosts remain unchanged. If `idempotently` is true, the operation + /// succeeds even if no permissions existed. pub fn clear_permissions(&self, vhost: &str, username: &str, idempotently: bool) -> Result<()> { self.delete_api_request_with_optional_not_found( path!("permissions", vhost, username), @@ -679,14 +778,33 @@ where ) } + /// Deletes a queue and all its contents. + /// + /// This is a destructive operation that permanently removes the queue and all + /// messages it contains. Any consumers connected to this queue will be disconnected. + /// Bindings to this queue are also removed. If `idempotently` is true, the + /// operation succeeds even if the queue doesn't exist. pub fn delete_queue(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> { self.delete_api_request_with_optional_not_found(path!("queues", vhost, name), idempotently) } + /// Deletes a RabbitMQ stream and all its data. + /// + /// This permanently removes the stream and all stored messages. Unlike traditional + /// queues, streams may have data replicated across multiple cluster nodes, so + /// this operation will clean up all replicas. Any stream clients (publishers + /// and consumers) will be disconnected. If `idempotently` is true, the operation + /// succeeds even if the stream doesn't exist. pub fn delete_stream(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> { self.delete_queue(vhost, name, idempotently) } + /// Deletes an exchange and all its bindings. + /// + /// This removes the exchange and all bindings where it serves as either source + /// or destination. Messages currently being routed through this exchange may + /// be lost. Built-in exchanges (like the default exchange) cannot be deleted. + /// If `idempotently` is true, the operation succeeds even if the exchange doesn't exist. pub fn delete_exchange(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> { self.delete_api_request_with_optional_not_found( path!("exchanges", vhost, name), @@ -694,6 +812,12 @@ where ) } + /// Removes a specific binding between an exchange and queue/exchange. + /// + /// This operation requires exact matching of the source, destination, routing key, + /// and arguments to identify the specific binding to delete. If multiple bindings + /// match the criteria, an error is returned. The function first queries existing + /// bindings to find the one to delete, then removes it. pub fn delete_binding( &self, virtual_host: &str, @@ -755,18 +879,35 @@ where } } + /// Removes all messages from a queue without deleting the queue itself. + /// + /// This operation immediately deletes all messages currently in the queue, + /// but leaves the queue structure, bindings, and consumers intact. This is + /// useful for clearing out accumulated messages during development or + /// troubleshooting. The purged messages are permanently lost. pub fn purge_queue(&self, virtual_host: &str, name: &str) -> Result<()> { let _response = self.http_delete(path!("queues", virtual_host, name, "contents"), None, None)?; Ok(()) } + /// Lists all runtime parameters configured across the cluster. + /// + /// Runtime parameters are configuration values that can be set dynamically + /// without restarting RabbitMQ. They are used by plugins like Federation + /// and Shovel to store their configuration. This returns parameters from + /// all components and virtual hosts. pub fn list_runtime_parameters(&self) -> Result> { let response = self.http_get("parameters", None, None)?; let response = response.json()?; Ok(response) } + /// Lists all runtime parameters for a specific RabbitMQ component. + /// + /// Components like "federation-upstream", "shovel", and others use runtime + /// parameters to store their configuration. This function returns only the + /// parameters belonging to the specified component across all virtual hosts. pub fn list_runtime_parameters_of_component( &self, component: &str, @@ -776,6 +917,12 @@ where Ok(response) } + /// Lists runtime parameters for a component within a specific virtual host. + /// + /// This narrows down the results to only parameters belonging to both the + /// specified component (like "federation-upstream" or "shovel") and the + /// specific virtual host. This is useful when managing component configurations + /// that are scoped to individual virtual hosts. pub fn list_runtime_parameters_of_component_in( &self, component: &str, @@ -786,6 +933,11 @@ where Ok(response) } + /// Retrieves a specific runtime parameter by component, virtual host, and name. + /// + /// Runtime parameters are uniquely identified by the combination of component + /// name, virtual host, and parameter name. This function returns the detailed + /// configuration for the specified parameter, including its value and metadata. pub fn get_runtime_parameter( &self, component: &str, @@ -797,6 +949,12 @@ where Ok(response) } + /// Creates a new runtime parameter or updates an existing one. + /// + /// Runtime parameters store configuration for RabbitMQ components like Federation + /// and Shovel. If a parameter with the same component, virtual host, and name + /// already exists, it will be updated with the new value. Otherwise, a new + /// parameter is created. pub fn upsert_runtime_parameter<'a>( &self, param: &'a RuntimeParameterDefinition<'a>, @@ -810,12 +968,24 @@ where Ok(()) } + /// Removes a specific runtime parameter from the cluster. + /// + /// This permanently deletes the runtime parameter identified by the component, + /// virtual host, and name. Any RabbitMQ component that was using this parameter + /// will lose access to its configuration and may stop functioning until the + /// parameter is recreated. pub fn clear_runtime_parameter(&self, component: &str, vhost: &str, name: &str) -> Result<()> { let _response = self.http_delete(path!("parameters", component, vhost, name), None, None)?; Ok(()) } + /// Removes all runtime parameters from the entire cluster. + /// + /// This is a destructive operation that deletes every runtime parameter across + /// all components and virtual hosts. This will effectively disable all plugins + /// that rely on runtime parameters (like Federation and Shovel) until their + /// parameters are reconfigured. Use with extreme caution. pub fn clear_all_runtime_parameters(&self) -> Result<()> { let params = self.list_runtime_parameters()?; for rp in params { @@ -824,6 +994,7 @@ where Ok(()) } + /// Deletes all runtime parameters for a component. pub fn clear_all_runtime_parameters_of_component(&self, component: &str) -> Result<()> { let params = self.list_runtime_parameters_of_component(component)?; for rp in params { @@ -832,12 +1003,14 @@ where Ok(()) } + /// Lists global runtime parameters. pub fn list_global_runtime_parameters(&self) -> Result> { let response = self.http_get("global-parameters", None, None)?; let response = response.json()?; Ok(response) } + /// Gets a global runtime parameter. pub fn get_global_runtime_parameter( &self, name: &str, @@ -847,6 +1020,7 @@ where Ok(response) } + /// Creates or updates a global runtime parameter. pub fn upsert_global_runtime_parameter<'a>( &self, param: &'a GlobalRuntimeParameterDefinition<'a>, @@ -856,11 +1030,13 @@ where Ok(()) } + /// Deletes a global runtime parameter. pub fn clear_global_runtime_parameter(&self, name: &str) -> Result<()> { let _response = self.http_delete(path!("global-parameters", name), None, None)?; Ok(()) } + /// Sets a limit for a user. pub fn set_user_limit( &self, username: &str, @@ -876,23 +1052,27 @@ where Ok(()) } + /// Clears a user limit. pub fn clear_user_limit(&self, username: &str, kind: UserLimitTarget) -> Result<()> { let _response = self.http_delete(path!("user-limits", username, kind), None, None)?; Ok(()) } + /// Lists all user limits. pub fn list_all_user_limits(&self) -> Result> { let response = self.http_get("user-limits", None, None)?; let response = response.json()?; Ok(response) } + /// Lists limits for a specific user. pub fn list_user_limits(&self, username: &str) -> Result> { let response = self.http_get(path!("user-limits", username), None, None)?; let response = response.json()?; Ok(response) } + /// Sets a limit for a virtual host. pub fn set_vhost_limit( &self, vhost: &str, @@ -904,6 +1084,7 @@ where Ok(()) } + /// Clears a virtual host limit. pub fn clear_vhost_limit(&self, vhost: &str, kind: VirtualHostLimitTarget) -> Result<()> { let _response = self.http_delete( path!("vhost-limits", vhost, kind), @@ -913,35 +1094,41 @@ where Ok(()) } + /// Lists all virtual host limits. pub fn list_all_vhost_limits(&self) -> Result> { let response = self.http_get("vhost-limits", None, None)?; let response = response.json()?; Ok(response) } + /// Lists limits for a specific virtual host. pub fn list_vhost_limits(&self, vhost: &str) -> Result> { let response = self.http_get(path!("vhost-limits", vhost), None, None)?; let response = response.json()?; Ok(response) } + /// Gets the cluster name. pub fn get_cluster_name(&self) -> Result { let response = self.http_get("cluster-name", None, None)?; let response = response.json()?; Ok(response) } + /// Sets the cluster name. pub fn set_cluster_name(&self, new_name: &str) -> Result<()> { let body = json!({"name": new_name}); let _response = self.http_put("cluster-name", &body, None, None)?; Ok(()) } + /// Gets cluster tags. pub fn get_cluster_tags(&self) -> Result { let response = self.get_global_runtime_parameter("cluster_tags")?; Ok(ClusterTags::from(response.value)) } + /// Sets cluster tags. pub fn set_cluster_tags(&self, tags: Map) -> Result<()> { let grp = GlobalRuntimeParameterDefinition { name: "cluster_tags", @@ -951,11 +1138,13 @@ where Ok(()) } + /// Clears all cluster tags. pub fn clear_cluster_tags(&self) -> Result<()> { self.clear_global_runtime_parameter("cluster_tags")?; Ok(()) } + /// Gets a policy. pub fn get_policy(&self, vhost: &str, name: &str) -> Result { let response = self.http_get(path!("policies", vhost, name), None, None)?; let response = response.json()?; @@ -977,6 +1166,7 @@ where Ok(response) } + /// Declares a policy. pub fn declare_policy(&self, params: &PolicyParams) -> Result<()> { let _response = self.http_put( path!("policies", params.vhost, params.name), @@ -996,6 +1186,7 @@ where Ok(()) } + /// Deletes a policy. pub fn delete_policy(&self, vhost: &str, name: &str) -> Result<()> { let _response = self.http_delete( path!("policies", vhost, name), @@ -1014,24 +1205,28 @@ where Ok(()) } + /// Gets an operator policy. pub fn get_operator_policy(&self, vhost: &str, name: &str) -> Result { let response = self.http_get(path!("operator-policies", vhost, name), None, None)?; let response = response.json()?; Ok(response) } + /// Lists all operator policies. pub fn list_operator_policies(&self) -> Result> { let response = self.http_get("operator-policies", None, None)?; let response = response.json()?; Ok(response) } + /// Lists operator policies in a virtual host. pub fn list_operator_policies_in(&self, vhost: &str) -> Result> { let response = self.http_get(path!("operator-policies", vhost), None, None)?; let response = response.json()?; Ok(response) } + /// Declares an operator policy. pub fn declare_operator_policy(&self, params: &PolicyParams) -> Result<()> { let _response = self.http_put( path!("operator-policies", params.vhost, params.name), @@ -1051,6 +1246,7 @@ where Ok(()) } + /// Deletes an operator policy. pub fn delete_operator_policy(&self, vhost: &str, name: &str) -> Result<()> { let _response = self.http_delete( path!("operator-policies", vhost, name), @@ -1069,24 +1265,28 @@ where Ok(()) } + /// Lists all permissions. pub fn list_permissions(&self) -> Result> { let response = self.http_get("permissions", None, None)?; let response = response.json()?; Ok(response) } + /// Lists permissions in a virtual host. pub fn list_permissions_in(&self, vhost: &str) -> Result> { let response = self.http_get(path!("vhosts", vhost, "permissions"), None, None)?; let response = response.json()?; Ok(response) } + /// Lists permissions for a specific user. pub fn list_permissions_of(&self, user: &str) -> Result> { let response = self.http_get(path!("users", user, "permissions"), None, None)?; let response = response.json()?; Ok(response) } + /// Gets permissions for a user in a virtual host. pub fn get_permissions(&self, vhost: &str, user: &str) -> Result { let response = self.http_get(path!("permissions", vhost, user), None, None)?; let response = response.json()?; @@ -1097,6 +1297,7 @@ where // Rebalancing // + /// Rebalances queue leaders across cluster nodes. pub fn rebalance_queue_leaders(&self) -> Result<()> { self.http_post("rebalance/queues", &json!({}), None, None)?; Ok(()) @@ -1105,32 +1306,38 @@ where // // Definitions + /// Exports cluster-wide definitions as JSON string. pub fn export_cluster_wide_definitions(&self) -> Result { self.export_cluster_wide_definitions_as_string() } + /// Exports cluster-wide definitions as JSON string. pub fn export_cluster_wide_definitions_as_string(&self) -> Result { let response = self.http_get("definitions", None, None)?; let response = response.text()?; Ok(response) } + /// Exports cluster-wide definitions as structured data. pub fn export_cluster_wide_definitions_as_data(&self) -> Result { let response = self.http_get("definitions", None, None)?; let response = response.json()?; Ok(response) } + /// Exports virtual host definitions as JSON string. pub fn export_vhost_definitions(&self, vhost: &str) -> Result { self.export_vhost_definitions_as_string(vhost) } + /// Exports virtual host definitions as JSON string. pub fn export_vhost_definitions_as_string(&self, vhost: &str) -> Result { let response = self.http_get(path!("definitions", vhost), None, None)?; let response = response.text()?; Ok(response) } + /// Exports virtual host definitions as structured data. pub fn export_vhost_definitions_as_data( &self, vhost: &str, @@ -1140,15 +1347,18 @@ where Ok(response) } + /// Imports cluster-wide definitions. pub fn import_definitions(&self, definitions: Value) -> Result<()> { self.import_cluster_wide_definitions(definitions) } + /// Imports cluster-wide definitions. pub fn import_cluster_wide_definitions(&self, definitions: Value) -> Result<()> { self.http_post("definitions", &definitions, None, None)?; Ok(()) } + /// Imports virtual host definitions. pub fn import_vhost_definitions(&self, vhost: &str, definitions: Value) -> Result<()> { self.http_post(path!("definitions", vhost), &definitions, None, None)?; Ok(()) @@ -1158,25 +1368,30 @@ where // Health Checks // + /// Performs a cluster-wide alarms health check. pub fn health_check_cluster_wide_alarms(&self) -> Result<()> { self.health_check_alarms("health/checks/alarms") } + /// Performs a local alarms health check. pub fn health_check_local_alarms(&self) -> Result<()> { self.health_check_alarms("health/checks/local-alarms") } + /// Checks if the node is quorum critical. pub fn health_check_if_node_is_quorum_critical(&self) -> Result<()> { let path = "health/checks/node-is-quorum-critical"; self.boolean_health_check(path) } + /// Checks if a port listener is active. pub fn health_check_port_listener(&self, port: u16) -> Result<()> { let port_s = port.to_string(); let path = path!("health", "checks", "port-listener", port_s); self.boolean_health_check(&path) } + /// Checks if a protocol listener is active. pub fn health_check_protocol_listener(&self, protocol: SupportedProtocol) -> Result<()> { let proto: String = String::from(protocol); let path = path!("health", "checks", "protocol-listener", proto); @@ -1205,6 +1420,7 @@ where // Federation // + /// Lists federation upstreams. pub fn list_federation_upstreams(&self) -> Result> { let response = self.list_runtime_parameters_of_component(FEDERATION_UPSTREAM_COMPONENT)?; let upstreams = response @@ -1217,18 +1433,21 @@ where Ok(upstreams) } + /// Lists federation links. pub fn list_federation_links(&self) -> Result> { let response = self.http_get("federation-links", None, None)?; let response = response.json()?; Ok(response) } + /// Declares a federation upstream. pub fn declare_federation_upstream(&self, params: FederationUpstreamParams<'_>) -> Result<()> { let runtime_param = RuntimeParameterDefinition::from(params); self.declare_federation_upstream_with_parameters(&runtime_param) } + /// Deletes a federation upstream. pub fn delete_federation_upstream(&self, vhost: &str, name: &str) -> Result<()> { self.clear_runtime_parameter(FEDERATION_UPSTREAM_COMPONENT, vhost, name) } @@ -1237,30 +1456,40 @@ where // Shovels // + /// Lists all Shovel instances across the cluster. + /// + /// Shovels are a RabbitMQ plugin that moves messages between brokers or between + /// different locations within the same broker. This function returns information + /// about all configured Shovel instances, including their status, configuration, + /// and transfer statistics. pub fn list_shovels(&self) -> Result> { let response = self.http_get("shovels", None, None)?; let response = response.json()?; Ok(response) } + /// Lists shovels in a virtual host. pub fn list_shovels_in(&self, vhost: &str) -> Result> { let response = self.http_get(path!("shovels", vhost), None, None)?; let response = response.json()?; Ok(response) } + /// Declares an AMQP 0.9.1 shovel. pub fn declare_amqp091_shovel(&self, params: Amqp091ShovelParams<'_>) -> Result<()> { let runtime_param = RuntimeParameterDefinition::from(params); self.declare_shovel_parameter(&runtime_param) } + /// Declares an AMQP 1.0 shovel. pub fn declare_amqp10_shovel(&self, params: Amqp10ShovelParams<'_>) -> Result<()> { let runtime_param = RuntimeParameterDefinition::from(params); self.declare_shovel_parameter(&runtime_param) } + /// Deletes a shovel. pub fn delete_shovel(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> { let excludes = if idempotently { Some(StatusCode::NOT_FOUND) @@ -1275,6 +1504,7 @@ where // Publish and consume messages // + /// Publishes a message to an exchange. pub fn publish_message( &self, vhost: &str, @@ -1300,6 +1530,7 @@ where Ok(response) } + /// Gets messages from a queue. pub fn get_messages( &self, vhost: &str, @@ -1318,12 +1549,14 @@ where Ok(response) } + /// Gets cluster overview information. pub fn overview(&self) -> Result { let response = self.http_get("overview", None, None)?; let response = response.json()?; Ok(response) } + /// Gets the RabbitMQ server version. pub fn server_version(&self) -> Result { let response = self.http_get("overview", None, None)?; let response: responses::Overview = response.json()?; @@ -1334,6 +1567,7 @@ where // Feature flags // + /// Lists all feature flags. pub fn list_feature_flags(&self) -> Result { let response = self.http_get("feature-flags", None, None)?; let response = response.json()?; @@ -1341,8 +1575,6 @@ where } /// Enables a feature flag. - /// This function is idempotent: enabling an already enabled feature flag - /// will succeed. pub fn enable_feature_flag(&self, name: &str) -> Result<()> { let body = serde_json::json!({ "name": name @@ -1352,8 +1584,6 @@ where } /// Enables all stable feature flags. - /// This function is idempotent: enabling an already enabled feature flag - /// will succeed. pub fn enable_all_stable_feature_flags(&self) -> Result<()> { // PUT /api/feature-flags/{name}/enable does not support the special 'all' value like 'rabbitmqctl enable_feature_flag' does. // Thus we do what management UI does: discover the stable disabled flags and enable @@ -1379,12 +1609,14 @@ where // Deprecated Features // + /// Lists all deprecated features. pub fn list_all_deprecated_features(&self) -> Result { let response = self.http_get("deprecated-features", None, None)?; let response = response.json()?; Ok(response) } + /// Lists deprecated features currently in use. pub fn list_deprecated_features_in_use(&self) -> Result { let response = self.http_get("deprecated-features/used", None, None)?; let response = response.json()?; @@ -1395,6 +1627,7 @@ where // OAuth 2 Configuration // + /// Gets OAuth 2 configuration. pub fn oauth_configuration(&self) -> Result { let response = self.http_get("auth", None, None)?; let response = response.json()?; @@ -1406,6 +1639,7 @@ where // Schema Definition Sync (Tanzu RabbitMQ) // + /// Gets schema definition sync status. pub fn schema_definition_sync_status( &self, node: Option<&str>, @@ -1421,6 +1655,7 @@ where Ok(response) } + /// Enables schema definition sync on a node. pub fn enable_schema_definition_sync_on_node(&self, node: &str) -> Result<()> { let payload = EmptyPayload::new(); self.http_put( @@ -1433,12 +1668,14 @@ where Ok(()) } + /// Disables schema definition sync on a node. pub fn disable_schema_definition_sync_on_node(&self, node: &str) -> Result<()> { self.http_delete(path!("tanzu", "osr", "schema", "disable", node), None, None)?; Ok(()) } + /// Enables schema definition sync cluster-wide. pub fn enable_schema_definition_sync(&self) -> Result<()> { let payload = EmptyPayload::new(); self.http_put( @@ -1451,6 +1688,7 @@ where Ok(()) } + /// Disables schema definition sync cluster-wide. pub fn disable_schema_definition_sync(&self) -> Result<()> { self.http_delete( path!("tanzu", "osr", "schema", "disable-cluster-wide"), @@ -1465,6 +1703,7 @@ where // Warm Standby Replication (Tanzu RabbitMQ) // + /// Gets warm standby replication status. pub fn warm_standby_replication_status(&self) -> Result { let response = self.http_get("tanzu/osr/standby/status", None, None)?; let response = response.json()?; From 5d44747aa45e8ed522a975ca619a06b4d71520b3 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 17 Aug 2025 17:18:31 -0400 Subject: [PATCH 12/16] Blocking client: docs --- src/api.rs | 4 +- src/blocking_api.rs | 135 +++++++++++++++++++++++--------------------- 2 files changed, 72 insertions(+), 67 deletions(-) diff --git a/src/api.rs b/src/api.rs index 05c5b33..c09221b 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1633,9 +1633,7 @@ where // Feature flags // - /// Enables a feature flag. - /// This function is idempotent: enabling an already enabled feature flag - /// will succeed. + /// Lists all feature flags. pub async fn list_feature_flags(&self) -> Result { let response = self.http_get("feature-flags", None, None).await?; let response = response.json().await?; diff --git a/src/blocking_api.rs b/src/blocking_api.rs index 9e3a7fb..95f4966 100644 --- a/src/blocking_api.rs +++ b/src/blocking_api.rs @@ -81,11 +81,15 @@ impl Default for ClientBuilder { impl ClientBuilder { /// Constructs a new `ClientBuilder`. /// - /// This is the same as `Client::builder()`. + /// The default configuration uses `http://localhost:15672/api` as the endpoint + /// and `guest/guest` as the credentials. This is the same as calling `Client::builder()`. + /// + /// Note that the default credentials are [limited to local connections](https://www.rabbitmq.com/docs/access-control) + /// for security reasons. pub fn new() -> Self { let client = HttpClient::new(); Self { - endpoint: "http://localhost:15672", + endpoint: "http://localhost:15672/api", username: "guest", password: "guest", client, @@ -123,9 +127,8 @@ where /// Sets the HTTP API endpoint URL for the RabbitMQ Management API. /// - /// The endpoint should include the protocol, host, port, and path to the API. - /// For example: `http://localhost:15672/api` or `https://rabbitmq.example.com:15672/api`. - /// The default endpoint is `http://localhost:15672/api`. + /// The endpoint should include the scheme, host, port, and API endpoint path. + /// Some examples: `http://localhost:15672/api` or `https://rabbitmq.example.com:15672/api`. pub fn with_endpoint(self, endpoint: T) -> ClientBuilder where T: fmt::Display, @@ -138,21 +141,22 @@ where } } - /// Sets a custom HTTP client for making requests. + /// Sets a custom HTTP client. /// - /// This allows you to configure custom timeouts, proxy settings, TLS configuration, - /// or other HTTP client behavior by providing your own configured `HttpClient` instance. + /// Use a custom HTTP client to configure custom timeouts, proxy settings, TLS configuration. pub fn with_client(self, client: HttpClient) -> Self { ClientBuilder { client, ..self } } - /// Returns a `Client` that uses this `ClientBuilder` configuration. + /// Builds and returns a configured `Client`. + /// + /// This consumes the `ClientBuilder`. pub fn build(self) -> Client { Client::from_http_client(self.client, self.endpoint, self.username, self.password) } } -/// A client for the [RabbitMQ HTTP API](https://rabbitmq.com/docs/management/#http-api). +/// A client for the [RabbitMQ HTTP API](https://www.rabbitmq.com/docs/http-api-reference). /// /// Most functions provided by this type represent various HTTP API operations. /// For example, @@ -192,6 +196,8 @@ where { /// Instantiates a client for the specified endpoint with username and password. /// + /// Unlike its counterpart from `crate::api`, this client will perform HTTP API requests synchronously. + /// /// Example /// ```rust /// use rabbitmq_http_client::blocking_api::Client; @@ -268,20 +274,18 @@ where self.get_api_request("connections") } - /// Returns detailed information about a specific AMQP connection. + /// Returns information about a connection. /// - /// The connection name is typically generated by RabbitMQ and can be found - /// in the output of `list_connections()`. This function provides comprehensive - /// information including connection state, client properties, and channel count. + /// Connection name is usually obtained from `crate::responses::Connection` or `crate::responses::UserConnection`, + /// e.g. via `Client#list_connections`, `Client#list_connections_in`, `Client#list_user_connections`. pub fn get_connection_info(&self, name: &str) -> Result { self.get_api_request(path!("connections", name)) } - /// Returns detailed information about a specific RabbitMQ Stream connection. + /// Returns information about a stream connection. /// - /// Stream connections are used by clients connecting via the RabbitMQ Stream protocol. - /// This function provides information about the connection state, client details, - /// and associated publishers/consumers. + /// Connection name is usually obtained from `crate::responses::Connection` or `crate::responses::UserConnection`, + /// e.g. via `Client#list_stream_connections`, `Client#list_stream_connections_in`, `Client#list_user_connections`. pub fn get_stream_connection_info( &self, virtual_host: &str, @@ -290,11 +294,9 @@ where self.get_api_request(path!("stream", "connections", virtual_host, name)) } - /// Forcefully closes a client connection with an optional reason message. + /// Closes a connection with an optional reason. /// - /// This operation will immediately terminate the specified connection. The optional - /// reason will be sent to the client before closing. Use this carefully as it will - /// disrupt any ongoing operations on that connection. + /// The reason will be passed on in the connection error to the client and will be logged on the RabbitMQ end. pub fn close_connection(&self, name: &str, reason: Option<&str>) -> Result<()> { match reason { None => self.http_delete( @@ -312,12 +314,12 @@ where Ok(()) } - /// Forcefully closes all connections belonging to a specific user. + /// Closes all connections for a user with an optional reason. /// - /// This operation will terminate all active connections for the specified user, - /// regardless of which virtual host they're connected to. An optional reason - /// message can be provided. This is useful for administrative purposes when - /// you need to disconnect a specific user from the cluster. + /// The reason will be passed on in the connection error to the client and will be logged on the RabbitMQ end. + /// + /// This is en equivalent of listing all connections of a user with `Client#list_user_connections` and then + /// closing them one by one. pub fn close_user_connections(&self, username: &str, reason: Option<&str>) -> Result<()> { match reason { None => self.http_delete( @@ -582,22 +584,19 @@ where self.put_api_request(path!("permissions", params.vhost, params.user), params) } - /// Revokes all permissions for a user on a virtual host. + /// Grants full permissions for a user on a virtual host. /// - /// This function removes all configure, write, and read permissions for the - /// specified user on the given virtual host. The user will no longer be able - /// to access resources in that virtual host until new permissions are granted. + /// "Full permissions" here means the permissions that match all objects, that is, + /// ".*" for every permission category. pub fn grant_permissions(&self, vhost: &str, user: &str) -> Result<()> { self.http_delete(path!("permissions", vhost, user), None, None)?; Ok(()) } - /// Declares a queue with the specified parameters. + /// Declares a [queue](https://www.rabbitmq.com/docs/queues). /// - /// Creates a new queue or ensures an existing queue has the specified configuration. /// If the queue already exists with different parameters, this operation may fail - /// unless the parameters are compatible. Queue parameters include durability, - /// auto-delete behavior, arguments, and other queue-specific settings. + /// unless the parameters are equivalent. pub fn declare_queue(&self, vhost: &str, params: &QueueParams) -> Result<()> { self.put_api_request(path!("queues", vhost, params.name), params) } @@ -635,12 +634,14 @@ where Ok(()) } - /// Declares a RabbitMQ stream with the specified parameters. + /// Declares a [RabbitMQ stream](https://www.rabbitmq.com/docs/streams). /// - /// Streams are a durable, replicated data structure in RabbitMQ designed for + /// Streams are a durable, replicated, long-lived data structure in RabbitMQ designed for /// high-throughput scenarios. Unlike traditional queues, streams are append-only - /// logs that support multiple consumers reading from different offsets. This function - /// creates a new stream or ensures an existing stream has the specified configuration. + /// logs that support multiple consumers reading from different offsets. + /// + /// If the stream already exists with different parameters, this operation may fail + /// unless the parameters are equivalent. pub fn declare_stream(&self, vhost: &str, params: &StreamParams<'_>) -> Result<()> { let mut m: Map = Map::new(); @@ -661,22 +662,20 @@ where Ok(()) } - /// Declares an exchange with the specified parameters. + /// Declares an [exchange](https://www.rabbitmq.com/docs/exchanges). /// - /// Creates a new exchange or ensures an existing exchange has the specified configuration. - /// Exchanges route messages to queues based on routing rules. The exchange type - /// (direct, topic, fanout, headers) determines the routing behavior. If the exchange - /// already exists with incompatible parameters, this operation may fail. + /// If the exchange already exists with different parameters, this operation may fail + /// unless the parameters are equivalent. pub fn declare_exchange(&self, vhost: &str, params: &ExchangeParams) -> Result<()> { self.put_api_request(path!("exchanges", vhost, params.name), params) } - /// Creates a binding between a queue and an exchange. + /// Binds a queue or a stream to an exchange. /// /// Bindings determine how messages published to an exchange are routed to queues. - /// The routing key and arguments specify the conditions under which messages - /// will be delivered to the queue. The meaning of the routing key depends on - /// the exchange type (direct, topic, fanout, headers). + /// The exchange type, routing key and arguments define the routing behavior. + /// + /// Both the source (exchange) and destination (queue or stream) must exist. pub fn bind_queue( &self, vhost: &str, @@ -702,12 +701,14 @@ where Ok(()) } - /// Creates a binding between two exchanges (exchange-to-exchange binding). + /// Bindgs one exchange to another (creates and [exchange-to-exchange binding](https://www.rabbitmq.com/docs/e2e)). /// /// This allows messages published to the source exchange to be forwarded to - /// the destination exchange based on the routing key and binding arguments. + /// /// Exchange-to-exchange bindings enable complex routing topologies and /// message flow patterns. + /// + /// Both source and destination exchanges must exist. pub fn bind_exchange( &self, vhost: &str, @@ -1433,21 +1434,23 @@ where Ok(upstreams) } - /// Lists federation links. + /// Lists [federation](https://www.rabbitmq.com/docs/federation) links (connections) running in the cluster. pub fn list_federation_links(&self) -> Result> { let response = self.http_get("federation-links", None, None)?; let response = response.json()?; Ok(response) } - /// Declares a federation upstream. + /// Creates or updates a [federation](https://www.rabbitmq.com/docs/federation) upstream. + /// + /// Federation upstreams define connection endpoints for federation links (connections that federate + /// queues or exchanges). pub fn declare_federation_upstream(&self, params: FederationUpstreamParams<'_>) -> Result<()> { let runtime_param = RuntimeParameterDefinition::from(params); self.declare_federation_upstream_with_parameters(&runtime_param) } - /// Deletes a federation upstream. pub fn delete_federation_upstream(&self, vhost: &str, name: &str) -> Result<()> { self.clear_runtime_parameter(FEDERATION_UPSTREAM_COMPONENT, vhost, name) } @@ -1456,40 +1459,40 @@ where // Shovels // - /// Lists all Shovel instances across the cluster. - /// - /// Shovels are a RabbitMQ plugin that moves messages between brokers or between - /// different locations within the same broker. This function returns information - /// about all configured Shovel instances, including their status, configuration, - /// and transfer statistics. + /// Lists [shovel](https://www.rabbitmq.com/docs/shovel) across all virtual hosts in the cluster. pub fn list_shovels(&self) -> Result> { let response = self.http_get("shovels", None, None)?; let response = response.json()?; Ok(response) } - /// Lists shovels in a virtual host. + /// Lists [dynamic shovels](https://www.rabbitmq.com/docs/shovel-dynamic) in a specific virtual host. pub fn list_shovels_in(&self, vhost: &str) -> Result> { let response = self.http_get(path!("shovels", vhost), None, None)?; let response = response.json()?; Ok(response) } - /// Declares an AMQP 0.9.1 shovel. + /// Declares [shovel](https://www.rabbitmq.com/docs/shovel) that will use the AMQP 0-9-1 protocol + /// for both source and destination collection. pub fn declare_amqp091_shovel(&self, params: Amqp091ShovelParams<'_>) -> Result<()> { let runtime_param = RuntimeParameterDefinition::from(params); self.declare_shovel_parameter(&runtime_param) } - /// Declares an AMQP 1.0 shovel. + /// Declares [shovel](https://www.rabbitmq.com/docs/shovel) that will use the AMQP 1.0 protocol + /// for both source and destination collection. pub fn declare_amqp10_shovel(&self, params: Amqp10ShovelParams<'_>) -> Result<()> { let runtime_param = RuntimeParameterDefinition::from(params); self.declare_shovel_parameter(&runtime_param) } - /// Deletes a shovel. + /// Deletes a shovel in a specified virtual host. + /// + /// Unless `idempotently` is set to `true`, an attempt to delete a non-existent shovel + /// will fail. pub fn delete_shovel(&self, vhost: &str, name: &str, idempotently: bool) -> Result<()> { let excludes = if idempotently { Some(StatusCode::NOT_FOUND) @@ -1574,7 +1577,9 @@ where Ok(response) } - /// Enables a feature flag. + /// Enables all stable feature flags. + /// This function is idempotent: enabling an already enabled feature flag + /// will succeed. pub fn enable_feature_flag(&self, name: &str) -> Result<()> { let body = serde_json::json!({ "name": name @@ -1584,6 +1589,8 @@ where } /// Enables all stable feature flags. + /// This function is idempotent: enabling an already enabled feature flag + /// will succeed. pub fn enable_all_stable_feature_flags(&self) -> Result<()> { // PUT /api/feature-flags/{name}/enable does not support the special 'all' value like 'rabbitmqctl enable_feature_flag' does. // Thus we do what management UI does: discover the stable disabled flags and enable From a195c214ccff8806099a14d9dbee1f6826a0e00a Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Sun, 17 Aug 2025 17:29:32 -0400 Subject: [PATCH 13/16] Fix a warning produced by 'cargo doc --all-features' --- src/responses.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/responses.rs b/src/responses.rs index 3184635..4d60072 100644 --- a/src/responses.rs +++ b/src/responses.rs @@ -2566,7 +2566,7 @@ pub struct Shovel { pub node: String, pub name: String, /// Dynamic shovels are associated with a virtual host but - /// static ones are not, so for them, the [`vhost`] field + /// static ones are not, so for them, the [`Shovel#vhost`] field /// will be [`None`]. #[cfg_attr(feature = "tabled", tabled(display = "display_option"))] pub vhost: Option, From 6855d43909c92a53da4a02ce6eef79b0e33a6c43 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 18 Aug 2025 19:39:32 -0400 Subject: [PATCH 14/16] Async client: more reference docs --- src/api.rs | 121 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 114 insertions(+), 7 deletions(-) diff --git a/src/api.rs b/src/api.rs index c09221b..75c9a87 100644 --- a/src/api.rs +++ b/src/api.rs @@ -248,27 +248,32 @@ where } /// Lists cluster nodes. + /// See [RabbitMQ Clustering Guide](https://www.rabbitmq.com/docs/clustering) to learn more. pub async fn list_nodes(&self) -> Result> { self.get_api_request("nodes").await } /// Lists virtual hosts in the cluster. + /// See [Virtual Hosts Guide](https://www.rabbitmq.com/docs/vhosts) to learn more. pub async fn list_vhosts(&self) -> Result> { self.get_api_request("vhosts").await } /// Lists users in the internal database. + /// See [Access Control Guide](https://www.rabbitmq.com/docs/access-control) to learn more. pub async fn list_users(&self) -> Result> { self.get_api_request("users").await } - /// Lists users in the internal database that do not have access - /// to any virtual hosts. + /// Lists users in the internal database that do not have access to any virtual hosts. + /// This is useful for finding users that may need permissions granted, or are not used + /// and should be cleaned up. pub async fn list_users_without_permissions(&self) -> Result> { self.get_api_request("users/without-permissions").await } /// Lists all AMQP 1.0 and 0-9-1 client connections across the cluster. + /// See [Connections Guide](https://www.rabbitmq.com/docs/connections) to learn more. pub async fn list_connections(&self) -> Result> { self.get_api_request("connections").await } @@ -351,6 +356,7 @@ where } /// Lists all connections in the given virtual host. + /// See [Connections Guide](https://www.rabbitmq.com/docs/connections) to learn more. pub async fn list_connections_in( &self, virtual_host: &str, @@ -360,6 +366,7 @@ where } /// Lists all connections of a specific user. + /// See [Connection Guide](https://www.rabbitmq.com/docs/connections) to learn more. pub async fn list_user_connections( &self, username: &str, @@ -369,11 +376,13 @@ where } /// Lists all RabbitMQ Stream Protocol client connections across the cluster. + /// See [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more. pub async fn list_stream_connections(&self) -> Result> { self.get_api_request("stream/connections").await } /// Lists RabbitMQ Stream Protocol client connections in the given virtual host. + /// See [Streams Overview](https://www.rabbitmq.com/docs/streams) to learn more. pub async fn list_stream_connections_in( &self, virtual_host: &str, @@ -383,11 +392,13 @@ where } /// Lists all channels across the cluster. + /// See [Channels Guide](https://www.rabbitmq.com/docs/channels) to learn more. pub async fn list_channels(&self) -> Result> { self.get_api_request("channels").await } /// Lists all channels in the given virtual host. + /// See [Channels Guide](https://www.rabbitmq.com/docs/channels) to learn more. pub async fn list_channels_in(&self, virtual_host: &str) -> Result> { self.get_api_request(path!("vhosts", virtual_host, "channels")) .await @@ -415,6 +426,7 @@ where } /// Lists stream publishers publishing to the given stream. + /// Useful for detecting publishers that are publishing to a specific stream. pub async fn list_stream_publishers_of( &self, virtual_host: &str, @@ -432,6 +444,7 @@ where } /// Lists stream publishers on the given stream connection. + /// Use this function for inspecting stream publishers on a specific connection. pub async fn list_stream_publishers_on_connection( &self, virtual_host: &str, @@ -471,6 +484,7 @@ where } /// Lists stream consumers on the given stream connection. + /// Use this function for inspecting stream consumers on a specific connection. pub async fn list_stream_consumers_on_connection( &self, virtual_host: &str, @@ -489,6 +503,7 @@ where } /// Lists all queues and streams across the cluster. + /// See [Queues Guide](https://www.rabbitmq.com/docs/queues) and [Streams Overview](https://www.rabbitmq.com/docs/streams) to learn more. pub async fn list_queues(&self) -> Result> { let response = self.http_get("queues", None, None).await?; let response = response.json().await?; @@ -496,6 +511,7 @@ where } /// Lists all queues and streams in the given virtual host. + /// See [Queues Guide](https://www.rabbitmq.com/docs/queues) and [Streams Overview](https://www.rabbitmq.com/docs/streams) to learn more. pub async fn list_queues_in(&self, virtual_host: &str) -> Result> { let response = self .http_get(path!("queues", virtual_host), None, None) @@ -505,6 +521,7 @@ where } /// Lists all exchanges across the cluster. + /// See [Exchanges Guide](https://www.rabbitmq.com/docs/exchanges) to learn more. pub async fn list_exchanges(&self) -> Result> { let response = self.http_get("exchanges", None, None).await?; let response = response.json().await?; @@ -512,6 +529,7 @@ where } /// Lists all exchanges in the given virtual host. + /// See [Exchanges Guide](https://www.rabbitmq.com/docs/exchanges) to learn more. pub async fn list_exchanges_in( &self, virtual_host: &str, @@ -543,6 +561,7 @@ where } /// Lists all bindings of a specific queue. + /// Use this function for troubleshooting routing of a particular queue. pub async fn list_queue_bindings( &self, virtual_host: &str, @@ -556,6 +575,7 @@ where } /// Lists all bindings of a specific exchange where it is the source. + /// Use this function for troubleshooting routing of a particular exchange. pub async fn list_exchange_bindings_with_source( &self, virtual_host: &str, @@ -570,6 +590,7 @@ where } /// Lists all bindings of a specific exchange where it is the destination. + /// Use this function for troubleshooting routing of a particular exchange. pub async fn list_exchange_bindings_with_destination( &self, virtual_host: &str, @@ -584,6 +605,7 @@ where } /// Lists all consumers across the cluster. + /// See [Consumers Guide](https://www.rabbitmq.com/docs/consumers) to learn more. pub async fn list_consumers(&self) -> Result> { let response = self.http_get("consumers", None, None).await?; let response = response.json().await?; @@ -591,6 +613,7 @@ where } /// Lists all consumers in the given virtual host. + /// See [Consumers Guide](https://www.rabbitmq.com/docs/consumers) to learn more. pub async fn list_consumers_in(&self, virtual_host: &str) -> Result> { let response = self .http_get(path!("consumers", virtual_host), None, None) @@ -600,13 +623,15 @@ where } /// Returns information about a cluster node. + /// See [Clustering Guide](https://www.rabbitmq.com/docs/clustering) to learn more. pub async fn get_node_info(&self, name: &str) -> Result { let response = self.http_get(path!("nodes", name), None, None).await?; let response = response.json().await?; Ok(response) } - /// Returns information about a cluster node. + /// Returns memory usage information for a cluster node. + ///See [Reasoning About Memory Footprint](https://www.rabbitmq.com/docs/memory-use) to learn more pub async fn get_node_memory_footprint( &self, name: &str, @@ -619,6 +644,7 @@ where } /// Returns information about a virtual host. + /// See [Virtual Hosts Guide](https://www.rabbitmq.com/docs/vhosts) to learn more. pub async fn get_vhost(&self, name: &str) -> Result { let response = self.http_get(path!("vhosts", name), None, None).await?; let response = response.json().await?; @@ -626,6 +652,7 @@ where } /// Returns information about a user in the internal database. + /// See [Access Control Guide](https://www.rabbitmq.com/docs/access-control) to learn more. pub async fn get_user(&self, name: &str) -> Result { let response = self.http_get(path!("users", name), None, None).await?; let response = response.json().await?; @@ -633,6 +660,7 @@ where } /// Returns information about a queue or stream. + /// See [Queues Guide](https://www.rabbitmq.com/docs/queues) to learn more. pub async fn get_queue_info( &self, virtual_host: &str, @@ -646,6 +674,7 @@ where } /// Returns information about a stream. + /// See [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more. pub async fn get_stream_info( &self, virtual_host: &str, @@ -655,6 +684,7 @@ where } /// Returns information about an exchange. + /// See [Exchanges Guide](https://www.rabbitmq.com/docs/exchanges) to learn more. pub async fn get_exchange_info( &self, virtual_host: &str, @@ -787,7 +817,7 @@ where Ok(()) } - /// Bindgs one exchange to another (creates and [exchange-to-exchange binding](https://www.rabbitmq.com/docs/e2e)). + /// Bindings one exchange to another (creates an [exchange-to-exchange binding](https://www.rabbitmq.com/docs/e2e)). /// /// This allows messages published to the source exchange to be forwarded to /// @@ -1363,26 +1393,48 @@ where // // Definitions + /// Exports cluster-wide definitions as a JSON document. + /// This includes all virtual hosts, users, permissions, policies, queues, streams, exchanges, bindings, runtime parameters. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub async fn export_cluster_wide_definitions(&self) -> Result { self.export_cluster_wide_definitions_as_string().await } + /// Exports cluster-wide definitions as a JSON document. + /// This includes all virtual hosts, users, permissions, policies, queues, streams, exchanges, bindings, runtime parameters. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub async fn export_cluster_wide_definitions_as_string(&self) -> Result { let response = self.http_get("definitions", None, None).await?; let response = response.text().await?; Ok(response) } + /// Exports cluster-wide definitions as a data structure. + /// This includes all virtual hosts, users, permissions, policies, queues, streams, exchanges, bindings, runtime parameters. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub async fn export_cluster_wide_definitions_as_data(&self) -> Result { let response = self.http_get("definitions", None, None).await?; let response = response.json().await?; Ok(response) } + /// Exports definitions of a single virtual host as a JSON document. + /// This includes the permissions, policies, queues, streams, exchanges, bindings, runtime parameters associated + /// with the given virtual host. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub async fn export_vhost_definitions(&self, vhost: &str) -> Result { self.export_vhost_definitions_as_string(vhost).await } + /// Exports definitions of a single virtual host as a JSON document. + /// This includes the permissions, policies, queues, streams, exchanges, bindings, runtime parameters associated + /// with the given virtual host. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub async fn export_vhost_definitions_as_string(&self, vhost: &str) -> Result { let response = self .http_get(path!("definitions", vhost), None, None) @@ -1391,6 +1443,11 @@ where Ok(response) } + /// Exports definitions of a single virtual host as a data structure. + /// This includes the permissions, policies, queues, streams, exchanges, bindings, runtime parameters associated + /// with the given virtual host. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub async fn export_vhost_definitions_as_data( &self, vhost: &str, @@ -1402,16 +1459,25 @@ where Ok(response) } + /// Imports cluster-wide definitions from a JSON document value. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub async fn import_definitions(&self, definitions: Value) -> Result<()> { self.import_cluster_wide_definitions(definitions).await } + /// Imports cluster-wide definitions from a JSON document value. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub async fn import_cluster_wide_definitions(&self, definitions: Value) -> Result<()> { self.http_post("definitions", &definitions, None, None) .await?; Ok(()) } + /// Imports definitions of a single virtual host from a JSON document value. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub async fn import_vhost_definitions(&self, vhost: &str, definitions: Value) -> Result<()> { self.http_post(path!("definitions", vhost), &definitions, None, None) .await?; @@ -1422,25 +1488,37 @@ where // Health Checks // + /// Performs a cluster-wide health check for any active resource alarms in the cluster. + /// See [Monitoring and Health Checks Guide](https://www.rabbitmq.com/docs/monitoring#health-checks) to learn more. pub async fn health_check_cluster_wide_alarms(&self) -> Result<()> { self.health_check_alarms("health/checks/alarms").await } + /// Performs a health check for alarms on the target node only. + /// See [Monitoring and Health Checks Guide](https://www.rabbitmq.com/docs/monitoring#health-checks) to learn more. pub async fn health_check_local_alarms(&self) -> Result<()> { self.health_check_alarms("health/checks/local-alarms").await } + /// Will fail if target node is critical to the quorum of some quorum queues, streams or the Khepri metadata store. + /// See [Upgrades Guide](https://www.rabbitmq.com/docs/upgrade#maintaining-quorum) to learn more. pub async fn health_check_if_node_is_quorum_critical(&self) -> Result<()> { let path = "health/checks/node-is-quorum-critical"; self.boolean_health_check(path).await } + /// Checks if a specific port has an active listener. + /// See [Monitoring and Health Checks Guide](https://www.rabbitmq.com/docs/monitoring#health-checks) + /// and [Networking Guide](https://www.rabbitmq.com/docs/networking) to learn more. pub async fn health_check_port_listener(&self, port: u16) -> Result<()> { let port_s = port.to_string(); let path = path!("health", "checks", "port-listener", port_s); self.boolean_health_check(&path).await } + /// Checks if a specific protocol listener is active. + /// See [Monitoring and Health Checks Guide](https://www.rabbitmq.com/docs/monitoring#health-checks) + /// and [Networking Guide](https://www.rabbitmq.com/docs/networking) to learn more. pub async fn health_check_protocol_listener(&self, protocol: SupportedProtocol) -> Result<()> { let proto: String = String::from(protocol); let path = path!("health", "checks", "protocol-listener", proto); @@ -1507,6 +1585,8 @@ where .await } + /// Deletes a [federation](https://www.rabbitmq.com/docs/federation) upstream. + /// Deleting an upstream will stop any links connected to it. pub async fn delete_federation_upstream(&self, vhost: &str, name: &str) -> Result<()> { self.clear_runtime_parameter(FEDERATION_UPSTREAM_COMPONENT, vhost, name) .await @@ -1566,6 +1646,8 @@ where // Publish and consume messages // + /// Only use this function in tests and experiments. + /// Always use a messaging or streaming protocol client for publishing in production. pub async fn publish_message( &self, vhost: &str, @@ -1593,6 +1675,8 @@ where Ok(response) } + /// Only use this function in tests and experiments. + /// Always use a messaging or streaming protocol client for consuming in production. pub async fn get_messages( &self, vhost: &str, @@ -1633,16 +1717,18 @@ where // Feature flags // - /// Lists all feature flags. + /// Lists all feature flags and their current states. + /// See [Feature Flags Guide](https://www.rabbitmq.com/docs/feature-flags) to learn more. pub async fn list_feature_flags(&self) -> Result { let response = self.http_get("feature-flags", None, None).await?; let response = response.json().await?; Ok(response) } - /// Enables all stable feature flags. + /// Enables a specific feature flag by name. /// This function is idempotent: enabling an already enabled feature flag /// will succeed. + /// See [Feature Flags Guide](https://www.rabbitmq.com/docs/feature-flags) to learn more. pub async fn enable_feature_flag(&self, name: &str) -> Result<()> { let body = serde_json::json!({ "name": name @@ -1653,9 +1739,10 @@ where Ok(()) } - /// Enables all stable feature flags. + /// Enables all stable feature flags in the cluster. /// This function is idempotent: enabling an already enabled feature flag /// will succeed. + /// See [Feature Flags Guide](https://www.rabbitmq.com/docs/feature-flags) to learn more. pub async fn enable_all_stable_feature_flags(&self) -> Result<()> { // PUT /api/feature-flags/{name}/enable does not support the special 'all' value like 'rabbitmqctl enable_feature_flag' does. // Thus we do what management UI does: discover the stable disabled flags and enable @@ -1681,12 +1768,18 @@ where // Deprecated Features // + /// Lists all deprecated features and their usage status. + /// Deprecated features may be removed in future RabbitMQ versions. + /// See [Deprecated Features Guide](https://www.rabbitmq.com/docs/deprecated-features) to learn more. pub async fn list_all_deprecated_features(&self) -> Result { let response = self.http_get("deprecated-features", None, None).await?; let response = response.json().await?; Ok(response) } + /// Lists deprecated features that are currently being used in the cluster. + /// These features should be migrated away from as soon as possible. + /// See [Deprecated Features Guide](https://www.rabbitmq.com/docs/deprecated-features) to learn more. pub async fn list_deprecated_features_in_use(&self) -> Result { let response = self .http_get("deprecated-features/used", None, None) @@ -1699,6 +1792,8 @@ where // OAuth 2 Configuration // + /// Returns the current OAuth 2.0 configuration for authentication. + /// See [OAuth 2 Guide](https://www.rabbitmq.com/docs/oauth2) to learn more. pub async fn oauth_configuration(&self) -> Result { let response = self.http_get("auth", None, None).await?; let response = response.json().await?; @@ -1710,6 +1805,8 @@ where // Schema Definition Sync (Tanzu RabbitMQ) // + /// Returns the status of schema definition synchronization. + /// Schema definition sync is a Tanzu RabbitMQ-specific feature. pub async fn schema_definition_sync_status( &self, node: Option<&str>, @@ -1726,6 +1823,8 @@ where Ok(response) } + /// Enables schema definition synchronization on a single node or cluster-wide. + /// Schema definition sync is a Tanzu RabbitMQ-specific feature. pub async fn enable_schema_definition_sync_one_node(&self, node: Option<&str>) -> Result<()> { let payload = EmptyPayload::new(); let _ = match node { @@ -1747,6 +1846,8 @@ where Ok(()) } + /// Disables schema definition synchronization on a specific node. + /// Schema definition sync is a Tanzu RabbitMQ-specific feature. pub async fn disable_schema_definition_sync_on_node(&self, node: Option<&str>) -> Result<()> { let _ = match node { Some(val) => { @@ -1762,6 +1863,8 @@ where Ok(()) } + /// Enables schema definition synchronization cluster-wide. + /// Schema definition sync is a Tanzu RabbitMQ-specific feature. pub async fn enable_schema_definition_sync(&self) -> Result<()> { let payload = EmptyPayload::new(); let _ = self @@ -1771,6 +1874,8 @@ where Ok(()) } + /// Disables schema definition synchronization cluster-wide. + /// Schema definition sync is a Tanzu RabbitMQ-specific feature. pub async fn disable_schema_definition_sync(&self) -> Result<()> { let _ = self .http_delete("tanzu/osr/schema/disable-cluster-wide", None, None) @@ -1783,6 +1888,8 @@ where // Warm Standby Replication (Tanzu RabbitMQ) // + /// Returns the status of warm standby replication. + /// Warm Standby Replication is a Tanzu RabbitMQ-specific feature. pub async fn warm_standby_replication_status(&self) -> Result { let response = self .http_get("tanzu/osr/standby/status", None, None) From 5b2d07b9457ad01bf36f3f8c1b5c2260087ce2d2 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 18 Aug 2025 19:45:55 -0400 Subject: [PATCH 15/16] Async client: correct a few doc links --- src/api.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/api.rs b/src/api.rs index 75c9a87..c0492bd 100644 --- a/src/api.rs +++ b/src/api.rs @@ -382,7 +382,7 @@ where } /// Lists RabbitMQ Stream Protocol client connections in the given virtual host. - /// See [Streams Overview](https://www.rabbitmq.com/docs/streams) to learn more. + /// See [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more. pub async fn list_stream_connections_in( &self, virtual_host: &str, @@ -503,7 +503,7 @@ where } /// Lists all queues and streams across the cluster. - /// See [Queues Guide](https://www.rabbitmq.com/docs/queues) and [Streams Overview](https://www.rabbitmq.com/docs/streams) to learn more. + /// See [Queues Guide](https://www.rabbitmq.com/docs/queues) and [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more. pub async fn list_queues(&self) -> Result> { let response = self.http_get("queues", None, None).await?; let response = response.json().await?; @@ -511,7 +511,7 @@ where } /// Lists all queues and streams in the given virtual host. - /// See [Queues Guide](https://www.rabbitmq.com/docs/queues) and [Streams Overview](https://www.rabbitmq.com/docs/streams) to learn more. + /// See [Queues Guide](https://www.rabbitmq.com/docs/queues) and [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more. pub async fn list_queues_in(&self, virtual_host: &str) -> Result> { let response = self .http_get(path!("queues", virtual_host), None, None) From d5b052b863cec25fc76f30bcd6f63fe5dfbb5857 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Mon, 18 Aug 2025 19:49:12 -0400 Subject: [PATCH 16/16] Blocking client: more reference docs --- src/blocking_api.rs | 144 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 112 insertions(+), 32 deletions(-) diff --git a/src/blocking_api.rs b/src/blocking_api.rs index 95f4966..c6f7e3a 100644 --- a/src/blocking_api.rs +++ b/src/blocking_api.rs @@ -249,27 +249,32 @@ where } /// Lists cluster nodes. + /// See [RabbitMQ Clustering Guide](https://www.rabbitmq.com/docs/clustering) to learn more. pub fn list_nodes(&self) -> Result> { self.get_api_request("nodes") } /// Lists virtual hosts in the cluster. + /// See [Virtual Hosts Guide](https://www.rabbitmq.com/docs/vhosts) to learn more. pub fn list_vhosts(&self) -> Result> { self.get_api_request("vhosts") } /// Lists users in the internal database. + /// See [Access Control Guide](https://www.rabbitmq.com/docs/access-control) to learn more. pub fn list_users(&self) -> Result> { self.get_api_request("users") } - /// Lists users in the internal database that do not have access - /// to any virtual hosts. + /// Lists users in the internal database that do not have access to any virtual hosts. + /// This is useful for finding users that may need permissions granted, or are not used + /// and should be cleaned up. pub fn list_users_without_permissions(&self) -> Result> { self.get_api_request("users/without-permissions") } /// Lists all AMQP 1.0 and 0-9-1 client connections across the cluster. + /// See [Connections Guide](https://www.rabbitmq.com/docs/connections) to learn more. pub fn list_connections(&self) -> Result> { self.get_api_request("connections") } @@ -343,21 +348,25 @@ where } /// Lists all connections in the given virtual host. + /// See [Connections Guide](https://www.rabbitmq.com/docs/connections) to learn more. pub fn list_connections_in(&self, virtual_host: &str) -> Result> { self.get_api_request(path!("vhosts", virtual_host, "connections")) } /// Lists all connections of a specific user. + /// See [Connection Guide](https://www.rabbitmq.com/docs/connections) to learn more. pub fn list_user_connections(&self, username: &str) -> Result> { self.get_api_request(path!("connections", "username", username)) } /// Lists all RabbitMQ Stream Protocol client connections across the cluster. + /// See [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more. pub fn list_stream_connections(&self) -> Result> { self.get_api_request("stream/connections") } /// Lists RabbitMQ Stream Protocol client connections in the given virtual host. + /// See [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more. pub fn list_stream_connections_in( &self, virtual_host: &str, @@ -366,11 +375,13 @@ where } /// Lists all channels across the cluster. + /// See [Channels Guide](https://www.rabbitmq.com/docs/channels) to learn more. pub fn list_channels(&self) -> Result> { self.get_api_request("channels") } /// Lists all channels in the given virtual host. + /// See [Channels Guide](https://www.rabbitmq.com/docs/channels) to learn more. pub fn list_channels_in(&self, virtual_host: &str) -> Result> { self.get_api_request(path!("vhosts", virtual_host, "channels")) } @@ -388,7 +399,8 @@ where self.get_api_request(path!("stream", "publishers", virtual_host)) } - /// Lists stream publishers of the given stream. + /// Lists stream publishers publishing to the given stream. + /// Useful for detecting publishers that are publishing to a specific stream. pub fn list_stream_publishers_of( &self, virtual_host: &str, @@ -398,6 +410,7 @@ where } /// Lists stream publishers on the given stream connection. + /// Use this function for inspecting stream publishers on a specific connection. pub fn list_stream_publishers_on_connection( &self, virtual_host: &str, @@ -426,6 +439,7 @@ where } /// Lists stream consumers on the given stream connection. + /// Use this function for inspecting stream consumers on a specific connection. pub fn list_stream_consumers_on_connection( &self, virtual_host: &str, @@ -441,21 +455,25 @@ where } /// Lists all queues and streams across the cluster. + /// See [Queues Guide](https://www.rabbitmq.com/docs/queues) and [Streams Overview](https://www.rabbitmq.com/docs/streams) to learn more. pub fn list_queues(&self) -> Result> { self.get_api_request("queues") } /// Lists all queues and streams in the given virtual host. + /// See [Queues Guide](https://www.rabbitmq.com/docs/queues) and [Streams Overview](https://www.rabbitmq.com/docs/streams) to learn more. pub fn list_queues_in(&self, virtual_host: &str) -> Result> { self.get_api_request(path!("queues", virtual_host)) } /// Lists all exchanges across the cluster. + /// See [Exchanges Guide](https://www.rabbitmq.com/docs/exchanges) to learn more. pub fn list_exchanges(&self) -> Result> { self.get_api_request("exchanges") } /// Lists all exchanges in the given virtual host. + /// See [Exchanges Guide](https://www.rabbitmq.com/docs/exchanges) to learn more. pub fn list_exchanges_in(&self, virtual_host: &str) -> Result> { self.get_api_request(path!("exchanges", virtual_host)) } @@ -471,6 +489,7 @@ where } /// Lists all bindings of a specific queue. + /// Use this function for troubleshooting routing of a particular queue. pub fn list_queue_bindings( &self, virtual_host: &str, @@ -480,6 +499,7 @@ where } /// Lists all bindings of a specific exchange where it is the source. + /// Use this function for troubleshooting routing of a particular exchange. pub fn list_exchange_bindings_with_source( &self, virtual_host: &str, @@ -493,6 +513,7 @@ where } /// Lists all bindings of a specific exchange where it is the destination. + /// Use this function for troubleshooting routing of a particular exchange. pub fn list_exchange_bindings_with_destination( &self, virtual_host: &str, @@ -506,46 +527,55 @@ where } /// Lists all consumers across the cluster. + /// See [Consumers Guide](https://www.rabbitmq.com/docs/consumers) to learn more. pub fn list_consumers(&self) -> Result> { self.get_api_request("consumers") } /// Lists all consumers in the given virtual host. + /// See [Consumers Guide](https://www.rabbitmq.com/docs/consumers) to learn more. pub fn list_consumers_in(&self, virtual_host: &str) -> Result> { self.get_api_request(path!("consumers", virtual_host)) } /// Returns information about a cluster node. + /// See [Clustering Guide](https://www.rabbitmq.com/docs/clustering) to learn more. pub fn get_node_info(&self, name: &str) -> Result { self.get_api_request(path!("nodes", name)) } - /// Returns information about a cluster node. + /// Returns memory usage information for a cluster node. + ///See [Reasoning About Memory Footprint](https://www.rabbitmq.com/docs/memory-use) to learn more pub fn get_node_memory_footprint(&self, name: &str) -> Result { self.get_api_request(path!("nodes", name, "memory")) } /// Returns information about a virtual host. + /// See [Virtual Hosts Guide](https://www.rabbitmq.com/docs/vhosts) to learn more. pub fn get_vhost(&self, name: &str) -> Result { self.get_api_request(path!("vhosts", name)) } /// Returns information about a user in the internal database. + /// See [Access Control Guide](https://www.rabbitmq.com/docs/access-control) to learn more. pub fn get_user(&self, name: &str) -> Result { self.get_api_request(path!("users", name)) } /// Returns information about a queue or stream. + /// See [Queues Guide](https://www.rabbitmq.com/docs/queues) to learn more. pub fn get_queue_info(&self, virtual_host: &str, name: &str) -> Result { self.get_api_request(path!("queues", virtual_host, name)) } /// Returns information about a stream. + /// See [RabbitMQ Streams Guide](https://www.rabbitmq.com/docs/streams) to learn more. pub fn get_stream_info(&self, virtual_host: &str, name: &str) -> Result { self.get_queue_info(virtual_host, name) } /// Returns information about an exchange. + /// See [Exchanges Guide](https://www.rabbitmq.com/docs/exchanges) to learn more. pub fn get_exchange_info( &self, virtual_host: &str, @@ -701,7 +731,7 @@ where Ok(()) } - /// Bindgs one exchange to another (creates and [exchange-to-exchange binding](https://www.rabbitmq.com/docs/e2e)). + /// Bindings one exchange to another (creates an [exchange-to-exchange binding](https://www.rabbitmq.com/docs/e2e)). /// /// This allows messages published to the source exchange to be forwarded to /// @@ -1307,38 +1337,59 @@ where // // Definitions - /// Exports cluster-wide definitions as JSON string. + /// Exports cluster-wide definitions as a JSON document. + /// This includes all virtual hosts, users, permissions, policies, queues, streams, exchanges, bindings, runtime parameters. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub fn export_cluster_wide_definitions(&self) -> Result { self.export_cluster_wide_definitions_as_string() } - /// Exports cluster-wide definitions as JSON string. + /// Exports cluster-wide definitions as a JSON document. + /// This includes all virtual hosts, users, permissions, policies, queues, streams, exchanges, bindings, runtime parameters. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub fn export_cluster_wide_definitions_as_string(&self) -> Result { let response = self.http_get("definitions", None, None)?; let response = response.text()?; Ok(response) } - /// Exports cluster-wide definitions as structured data. + /// Exports cluster-wide definitions as a data structure. + /// This includes all virtual hosts, users, permissions, policies, queues, streams, exchanges, bindings, runtime parameters. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub fn export_cluster_wide_definitions_as_data(&self) -> Result { let response = self.http_get("definitions", None, None)?; let response = response.json()?; Ok(response) } - /// Exports virtual host definitions as JSON string. + /// Exports definitions of a single virtual host as a JSON document. + /// This includes the permissions, policies, queues, streams, exchanges, bindings, runtime parameters associated + /// with the given virtual host. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub fn export_vhost_definitions(&self, vhost: &str) -> Result { self.export_vhost_definitions_as_string(vhost) } - /// Exports virtual host definitions as JSON string. + /// Exports definitions of a single virtual host as a JSON document. + /// This includes the permissions, policies, queues, streams, exchanges, bindings, runtime parameters associated + /// with the given virtual host. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub fn export_vhost_definitions_as_string(&self, vhost: &str) -> Result { let response = self.http_get(path!("definitions", vhost), None, None)?; let response = response.text()?; Ok(response) } - /// Exports virtual host definitions as structured data. + /// Exports definitions of a single virtual host as a data structure. + /// This includes the permissions, policies, queues, streams, exchanges, bindings, runtime parameters associated + /// with the given virtual host. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub fn export_vhost_definitions_as_data( &self, vhost: &str, @@ -1348,18 +1399,24 @@ where Ok(response) } - /// Imports cluster-wide definitions. + /// Imports cluster-wide definitions from a JSON document value. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub fn import_definitions(&self, definitions: Value) -> Result<()> { self.import_cluster_wide_definitions(definitions) } - /// Imports cluster-wide definitions. + /// Imports cluster-wide definitions from a JSON document value. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub fn import_cluster_wide_definitions(&self, definitions: Value) -> Result<()> { self.http_post("definitions", &definitions, None, None)?; Ok(()) } - /// Imports virtual host definitions. + /// Imports definitions of a single virtual host from a JSON document value. + /// + /// See [Definition Export and Import](https://www.rabbitmq.com/docs/definitions) to learn more. pub fn import_vhost_definitions(&self, vhost: &str, definitions: Value) -> Result<()> { self.http_post(path!("definitions", vhost), &definitions, None, None)?; Ok(()) @@ -1369,30 +1426,37 @@ where // Health Checks // - /// Performs a cluster-wide alarms health check. + /// Performs a cluster-wide health check for any active resource alarms in the cluster. + /// See [Monitoring and Health Checks Guide](https://www.rabbitmq.com/docs/monitoring#health-checks) to learn more. pub fn health_check_cluster_wide_alarms(&self) -> Result<()> { self.health_check_alarms("health/checks/alarms") } - /// Performs a local alarms health check. + /// Performs a health check for alarms on the target node only. + /// See [Monitoring and Health Checks Guide](https://www.rabbitmq.com/docs/monitoring#health-checks) to learn more. pub fn health_check_local_alarms(&self) -> Result<()> { self.health_check_alarms("health/checks/local-alarms") } - /// Checks if the node is quorum critical. + /// Will fail if target node is critical to the quorum of some quorum queues, streams or the Khepri metadata store. + /// See [Upgrades Guide](https://www.rabbitmq.com/docs/upgrade#maintaining-quorum) to learn more. pub fn health_check_if_node_is_quorum_critical(&self) -> Result<()> { let path = "health/checks/node-is-quorum-critical"; self.boolean_health_check(path) } - /// Checks if a port listener is active. + /// Checks if a specific port has an active listener. + /// See [Monitoring and Health Checks Guide](https://www.rabbitmq.com/docs/monitoring#health-checks) + /// and [Networking Guide](https://www.rabbitmq.com/docs/networking) to learn more. pub fn health_check_port_listener(&self, port: u16) -> Result<()> { let port_s = port.to_string(); let path = path!("health", "checks", "port-listener", port_s); self.boolean_health_check(&path) } - /// Checks if a protocol listener is active. + /// Checks if a specific protocol listener is active. + /// See [Monitoring and Health Checks Guide](https://www.rabbitmq.com/docs/monitoring#health-checks) + /// and [Networking Guide](https://www.rabbitmq.com/docs/networking) to learn more. pub fn health_check_protocol_listener(&self, protocol: SupportedProtocol) -> Result<()> { let proto: String = String::from(protocol); let path = path!("health", "checks", "protocol-listener", proto); @@ -1421,7 +1485,7 @@ where // Federation // - /// Lists federation upstreams. + /// Lists [federation](https://www.rabbitmq.com/docs/federation) upstreams defined in the cluster. pub fn list_federation_upstreams(&self) -> Result> { let response = self.list_runtime_parameters_of_component(FEDERATION_UPSTREAM_COMPONENT)?; let upstreams = response @@ -1451,6 +1515,8 @@ where self.declare_federation_upstream_with_parameters(&runtime_param) } + /// Deletes a [federation](https://www.rabbitmq.com/docs/federation) upstream. + /// Deleting an upstream will stop any links connected to it. pub fn delete_federation_upstream(&self, vhost: &str, name: &str) -> Result<()> { self.clear_runtime_parameter(FEDERATION_UPSTREAM_COMPONENT, vhost, name) } @@ -1570,16 +1636,18 @@ where // Feature flags // - /// Lists all feature flags. + /// Lists all feature flags and their current states. + /// See [Feature Flags Guide](https://www.rabbitmq.com/docs/feature-flags) to learn more. pub fn list_feature_flags(&self) -> Result { let response = self.http_get("feature-flags", None, None)?; let response = response.json()?; Ok(response) } - /// Enables all stable feature flags. + /// Enables a specific feature flag by name. /// This function is idempotent: enabling an already enabled feature flag /// will succeed. + /// See [Feature Flags Guide](https://www.rabbitmq.com/docs/feature-flags) to learn more. pub fn enable_feature_flag(&self, name: &str) -> Result<()> { let body = serde_json::json!({ "name": name @@ -1588,9 +1656,10 @@ where Ok(()) } - /// Enables all stable feature flags. + /// Enables all stable feature flags in the cluster. /// This function is idempotent: enabling an already enabled feature flag /// will succeed. + /// See [Feature Flags Guide](https://www.rabbitmq.com/docs/feature-flags) to learn more. pub fn enable_all_stable_feature_flags(&self) -> Result<()> { // PUT /api/feature-flags/{name}/enable does not support the special 'all' value like 'rabbitmqctl enable_feature_flag' does. // Thus we do what management UI does: discover the stable disabled flags and enable @@ -1616,14 +1685,18 @@ where // Deprecated Features // - /// Lists all deprecated features. + /// Lists all deprecated features and their usage status. + /// Deprecated features may be removed in future RabbitMQ versions. + /// See [Deprecated Features Guide](https://www.rabbitmq.com/docs/deprecated-features) to learn more. pub fn list_all_deprecated_features(&self) -> Result { let response = self.http_get("deprecated-features", None, None)?; let response = response.json()?; Ok(response) } - /// Lists deprecated features currently in use. + /// Lists deprecated features that are currently being used in the cluster. + /// These features should be migrated away from as soon as possible. + /// See [Deprecated Features Guide](https://www.rabbitmq.com/docs/deprecated-features) to learn more. pub fn list_deprecated_features_in_use(&self) -> Result { let response = self.http_get("deprecated-features/used", None, None)?; let response = response.json()?; @@ -1634,7 +1707,8 @@ where // OAuth 2 Configuration // - /// Gets OAuth 2 configuration. + /// Returns the current OAuth 2.0 configuration for authentication. + /// See [OAuth 2 Guide](https://www.rabbitmq.com/docs/oauth2) to learn more. pub fn oauth_configuration(&self) -> Result { let response = self.http_get("auth", None, None)?; let response = response.json()?; @@ -1646,7 +1720,8 @@ where // Schema Definition Sync (Tanzu RabbitMQ) // - /// Gets schema definition sync status. + /// Returns the status of schema definition synchronization. + /// Schema definition sync is a Tanzu RabbitMQ-specific feature. pub fn schema_definition_sync_status( &self, node: Option<&str>, @@ -1662,7 +1737,8 @@ where Ok(response) } - /// Enables schema definition sync on a node. + /// Enables schema definition synchronization on a single node or cluster-wide. + /// Schema definition sync is a Tanzu RabbitMQ-specific feature. pub fn enable_schema_definition_sync_on_node(&self, node: &str) -> Result<()> { let payload = EmptyPayload::new(); self.http_put( @@ -1675,14 +1751,16 @@ where Ok(()) } - /// Disables schema definition sync on a node. + /// Disables schema definition synchronization on a specific node. + /// Schema definition sync is a Tanzu RabbitMQ-specific feature. pub fn disable_schema_definition_sync_on_node(&self, node: &str) -> Result<()> { self.http_delete(path!("tanzu", "osr", "schema", "disable", node), None, None)?; Ok(()) } - /// Enables schema definition sync cluster-wide. + /// Enables schema definition synchronization cluster-wide. + /// Schema definition sync is a Tanzu RabbitMQ-specific feature. pub fn enable_schema_definition_sync(&self) -> Result<()> { let payload = EmptyPayload::new(); self.http_put( @@ -1695,7 +1773,8 @@ where Ok(()) } - /// Disables schema definition sync cluster-wide. + /// Disables schema definition synchronization cluster-wide. + /// Schema definition sync is a Tanzu RabbitMQ-specific feature. pub fn disable_schema_definition_sync(&self) -> Result<()> { self.http_delete( path!("tanzu", "osr", "schema", "disable-cluster-wide"), @@ -1710,7 +1789,8 @@ where // Warm Standby Replication (Tanzu RabbitMQ) // - /// Gets warm standby replication status. + /// Returns the status of warm standby replication. + /// Warm Standby Replication is a Tanzu RabbitMQ-specific feature. pub fn warm_standby_replication_status(&self) -> Result { let response = self.http_get("tanzu/osr/standby/status", None, None)?; let response = response.json()?;