From 798f9776a5ca39d43819942330fc4b5823c54af0 Mon Sep 17 00:00:00 2001 From: Larry Osterman Date: Tue, 15 Oct 2024 14:58:26 -0700 Subject: [PATCH] Implement rust management APIs (#1854) * Implement rust management APIs --- sdk/core/azure_core_amqp/src/connection.rs | 32 +++++++----------- .../azure_core_amqp/src/fe2o3/management.rs | 20 +++++++++-- sdk/core/azure_core_amqp/src/management.rs | 4 +++ sdk/core/azure_core_amqp/src/noop.rs | 6 ++-- sdk/core/azure_core_amqp/src/sender.rs | 33 +++++++------------ 5 files changed, 49 insertions(+), 46 deletions(-) diff --git a/sdk/core/azure_core_amqp/src/connection.rs b/sdk/core/azure_core_amqp/src/connection.rs index 898c2a532e..c918e3cb81 100644 --- a/sdk/core/azure_core_amqp/src/connection.rs +++ b/sdk/core/azure_core_amqp/src/connection.rs @@ -30,7 +30,6 @@ impl AmqpConnectionOptions { pub fn builder() -> builders::AmqpConnectionOptionsBuilder { builders::AmqpConnectionOptionsBuilder::new() } - pub fn max_frame_size(&self) -> Option { self.max_frame_size } @@ -124,47 +123,38 @@ pub mod builders { options: Default::default(), } } - pub fn build(&mut self) -> AmqpConnectionOptions { - self.options.clone() + pub fn build(self) -> AmqpConnectionOptions { + self.options } - pub fn with_max_frame_size(&mut self, max_frame_size: u32) -> &mut Self { + pub fn with_max_frame_size(mut self, max_frame_size: u32) -> Self { self.options.max_frame_size = Some(max_frame_size); self } - pub fn with_channel_max(&mut self, channel_max: u16) -> &mut Self { + pub fn with_channel_max(mut self, channel_max: u16) -> Self { self.options.channel_max = Some(channel_max); self } - pub fn with_idle_timeout(&mut self, idle_timeout: Duration) -> &mut Self { + pub fn with_idle_timeout(mut self, idle_timeout: Duration) -> Self { self.options.idle_timeout = Some(idle_timeout); self } - pub fn with_outgoing_locales(&mut self, outgoing_locales: Vec) -> &mut Self { + pub fn with_outgoing_locales(mut self, outgoing_locales: Vec) -> Self { self.options.outgoing_locales = Some(outgoing_locales); self } - pub fn with_incoming_locales(&mut self, incoming_locales: Vec) -> &mut Self { + pub fn with_incoming_locales(mut self, incoming_locales: Vec) -> Self { self.options.incoming_locales = Some(incoming_locales); self } - pub fn with_offered_capabilities( - &mut self, - offered_capabilities: Vec, - ) -> &mut Self { + pub fn with_offered_capabilities(mut self, offered_capabilities: Vec) -> Self { self.options.offered_capabilities = Some(offered_capabilities); self } - pub fn with_desired_capabilities( - &mut self, - desired_capabilities: Vec, - ) -> &mut Self { + pub fn with_desired_capabilities(mut self, desired_capabilities: Vec) -> Self { self.options.desired_capabilities = Some(desired_capabilities); self } - pub fn with_properties( - &mut self, - properties: impl Into>, - ) -> &mut Self + pub fn with_properties(mut self, properties: impl Into>) -> Self where K: Into + Debug + Default + PartialEq, V: Into + Debug + Default, @@ -177,7 +167,7 @@ pub mod builders { self.options.properties = Some(properties_map); self } - pub fn with_buffer_size(&mut self, buffer_size: usize) -> &mut Self { + pub fn with_buffer_size(mut self, buffer_size: usize) -> Self { self.options.buffer_size = Some(buffer_size); self } diff --git a/sdk/core/azure_core_amqp/src/fe2o3/management.rs b/sdk/core/azure_core_amqp/src/fe2o3/management.rs index 1cacaba8b7..775da86113 100644 --- a/sdk/core/azure_core_amqp/src/fe2o3/management.rs +++ b/sdk/core/azure_core_amqp/src/fe2o3/management.rs @@ -11,13 +11,17 @@ use crate::{ }; use async_std::sync::Mutex; -use azure_core::{credentials::AccessToken, error::Result}; +use azure_core::{ + credentials::AccessToken, + error::{ErrorKind, Result}, + Error, +}; use fe2o3_amqp_management::operations::ReadResponse; use fe2o3_amqp_types::{messaging::ApplicationProperties, primitives::SimpleValue}; use std::sync::{Arc, OnceLock}; use tracing::debug; -use super::error::{AmqpManagement, AmqpManagementAttach}; +use super::error::{AmqpLinkDetach, AmqpManagement, AmqpManagementAttach}; #[derive(Debug)] pub(crate) struct Fe2o3AmqpManagement { @@ -67,6 +71,18 @@ impl AmqpManagementApis for Fe2o3AmqpManagement { })?; Ok(()) } + + async fn detach(mut self) -> Result<()> { + // Detach the management client from the session. + let management = self + .management + .take() + .ok_or_else(|| Error::message(ErrorKind::Other, "Unattached management node."))?; + let management = management.into_inner(); + management.close().await.map_err(AmqpLinkDetach::from)?; + Ok(()) + } + async fn call( &self, operation_type: impl Into, diff --git a/sdk/core/azure_core_amqp/src/management.rs b/sdk/core/azure_core_amqp/src/management.rs index ae91373345..450fb34ab1 100644 --- a/sdk/core/azure_core_amqp/src/management.rs +++ b/sdk/core/azure_core_amqp/src/management.rs @@ -17,6 +17,7 @@ type ManagementImplementation = super::noop::NoopAmqpManagement; pub trait AmqpManagementApis { fn attach(&self) -> impl std::future::Future>; + fn detach(self) -> impl std::future::Future>; #[allow(unused_variables)] fn call( @@ -35,6 +36,9 @@ impl AmqpManagementApis for AmqpManagement { async fn attach(&self) -> Result<()> { self.implementation.attach().await } + async fn detach(self) -> Result<()> { + self.implementation.detach().await + } async fn call( &self, operation_type: impl Into, diff --git a/sdk/core/azure_core_amqp/src/noop.rs b/sdk/core/azure_core_amqp/src/noop.rs index db8106736c..cd84163fbd 100644 --- a/sdk/core/azure_core_amqp/src/noop.rs +++ b/sdk/core/azure_core_amqp/src/noop.rs @@ -123,6 +123,10 @@ impl AmqpManagementApis for NoopAmqpManagement { unimplemented!(); } + async fn detach(self) -> Result<()> { + unimplemented!(); + } + async fn call( &self, operation_type: impl Into, @@ -148,11 +152,9 @@ impl AmqpSenderApis for NoopAmqpSender { ) -> Result<()> { unimplemented!(); } - async fn detach(self) -> Result<()> { unimplemented!(); } - fn max_message_size(&self) -> Result> { unimplemented!(); } diff --git a/sdk/core/azure_core_amqp/src/sender.rs b/sdk/core/azure_core_amqp/src/sender.rs index 822fae7c3d..4379480b31 100644 --- a/sdk/core/azure_core_amqp/src/sender.rs +++ b/sdk/core/azure_core_amqp/src/sender.rs @@ -146,46 +146,37 @@ pub mod builders { } } - pub fn with_sender_settle_mode( - &mut self, - sender_settle_mode: SenderSettleMode, - ) -> &mut Self { + pub fn with_sender_settle_mode(mut self, sender_settle_mode: SenderSettleMode) -> Self { self.options.sender_settle_mode = Some(sender_settle_mode); self } pub fn with_receiver_settle_mode( - &mut self, + mut self, receiver_settle_mode: ReceiverSettleMode, - ) -> &mut Self { + ) -> Self { self.options.receiver_settle_mode = Some(receiver_settle_mode); self } - pub fn with_source(&mut self, source: impl Into) -> &mut Self { + pub fn with_source(mut self, source: impl Into) -> Self { self.options.source = Some(source.into()); self } - pub fn with_offered_capabilities( - &mut self, - offered_capabilities: Vec, - ) -> &mut Self { + pub fn with_offered_capabilities(mut self, offered_capabilities: Vec) -> Self { self.options.offered_capabilities = Some(offered_capabilities); self } #[allow(dead_code)] - pub fn with_desired_capabilities( - &mut self, - desired_capabilities: Vec, - ) -> &mut Self { + pub fn with_desired_capabilities(mut self, desired_capabilities: Vec) -> Self { self.options.desired_capabilities = Some(desired_capabilities); self } pub fn with_properties( - &mut self, + mut self, properties: impl Into>, - ) -> &mut Self { + ) -> Self { let properties_map: AmqpOrderedMap = properties.into().iter().collect(); @@ -193,18 +184,18 @@ pub mod builders { self } - pub fn with_initial_delivery_count(&mut self, initial_delivery_count: u32) -> &mut Self { + pub fn with_initial_delivery_count(mut self, initial_delivery_count: u32) -> Self { self.options.initial_delivery_count = Some(initial_delivery_count); self } - pub fn with_max_message_size(&mut self, max_message_size: u64) -> &mut Self { + pub fn with_max_message_size(mut self, max_message_size: u64) -> Self { self.options.max_message_size = Some(max_message_size); self } - pub fn build(&mut self) -> AmqpSenderOptions { - self.options.clone() + pub fn build(self) -> AmqpSenderOptions { + self.options } } }