diff --git a/sdk/core/azure_core_amqp/src/cbs.rs b/sdk/core/azure_core_amqp/src/cbs.rs index 6f3aa579c7..c66fd76918 100644 --- a/sdk/core/azure_core_amqp/src/cbs.rs +++ b/sdk/core/azure_core_amqp/src/cbs.rs @@ -27,6 +27,10 @@ pub trait AmqpClaimsBasedSecurityApis { /// fn attach(&self) -> impl std::future::Future>; + /// Asynchronously detaches the Claims-Based Security (CBS) node from the AMQP session. + /// This method is responsible for cleaning up the AMQP links used for CBS operations. + fn detach(self) -> impl std::future::Future>; + /// Asynchronously authorizes an AMQP path using the provided secret. /// /// The authorization is valid until the specified `expires_on` time. The path is typically a URI that represents an AMQP resource. The secret is typically a SAS token. The `expires_on` time is the time at which the authorization expires. @@ -63,6 +67,7 @@ impl<'a> AmqpClaimsBasedSecurity<'a> { }) } } + impl<'a> AmqpClaimsBasedSecurityApis for AmqpClaimsBasedSecurity<'a> { async fn authorize_path( &self, @@ -77,4 +82,7 @@ impl<'a> AmqpClaimsBasedSecurityApis for AmqpClaimsBasedSecurity<'a> { async fn attach(&self) -> Result<()> { self.implementation.attach().await } + async fn detach(self) -> Result<()> { + self.implementation.detach().await + } } diff --git a/sdk/core/azure_core_amqp/src/fe2o3/cbs.rs b/sdk/core/azure_core_amqp/src/fe2o3/cbs.rs index 722c52d12d..5fcf423360 100644 --- a/sdk/core/azure_core_amqp/src/fe2o3/cbs.rs +++ b/sdk/core/azure_core_amqp/src/fe2o3/cbs.rs @@ -2,7 +2,7 @@ // Licensed under the MIT license. // cspell:: words amqp servicebus sastoken -use super::error::{AmqpManagement, AmqpManagementAttach}; +use super::error::{AmqpLinkDetach, AmqpManagement, AmqpManagementAttach}; use crate::{cbs::AmqpClaimsBasedSecurityApis, session::AmqpSession}; use async_std::sync::Mutex; use azure_core::error::Result; @@ -53,6 +53,18 @@ impl<'a> AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'a> { Ok(()) } + async fn detach(mut self) -> Result<()> { + let cbs = self.cbs.take().ok_or_else(|| { + azure_core::Error::message( + azure_core::error::ErrorKind::Other, + "Claims Based Security was not set.", + ) + })?; + let cbs = cbs.into_inner(); + cbs.close().await.map_err(AmqpLinkDetach::from)?; + Ok(()) + } + async fn authorize_path( &self, path: impl Into + Debug, diff --git a/sdk/core/azure_core_amqp/src/fe2o3/sender.rs b/sdk/core/azure_core_amqp/src/fe2o3/sender.rs index 88323a3af9..09bb880bfe 100644 --- a/sdk/core/azure_core_amqp/src/fe2o3/sender.rs +++ b/sdk/core/azure_core_amqp/src/fe2o3/sender.rs @@ -8,15 +8,16 @@ use crate::session::AmqpSession; use async_std::sync::Mutex; use azure_core::Result; use std::borrow::BorrowMut; -use std::sync::{Arc, OnceLock}; +use std::sync::OnceLock; use super::error::{ - AmqpDeliveryRejected, AmqpNotAccepted, AmqpSenderAttach, AmqpSenderSend, Fe2o3AmqpError, + AmqpDeliveryRejected, AmqpLinkDetach, AmqpNotAccepted, AmqpSenderAttach, AmqpSenderSend, + Fe2o3AmqpError, }; #[derive(Debug, Default)] pub(crate) struct Fe2o3AmqpSender { - sender: OnceLock>>, + sender: OnceLock>, } impl AmqpSenderApis for Fe2o3AmqpSender { @@ -67,7 +68,7 @@ impl AmqpSenderApis for Fe2o3AmqpSender { .attach(session.implementation.get()?.lock().await.borrow_mut()) .await .map_err(AmqpSenderAttach::from)?; - self.sender.set(Arc::new(Mutex::new(sender))).map_err(|_| { + self.sender.set(Mutex::new(sender)).map_err(|_| { azure_core::Error::message( azure_core::error::ErrorKind::Other, "Could not set message sender.", @@ -76,7 +77,22 @@ impl AmqpSenderApis for Fe2o3AmqpSender { Ok(()) } - async fn max_message_size(&self) -> azure_core::Result> { + async fn detach(mut self) -> Result<()> { + let sender = self.sender.take().ok_or_else(|| { + azure_core::Error::message( + azure_core::error::ErrorKind::Other, + "Message Sender not set.", + ) + })?; + sender + .into_inner() + .detach() + .await + .map_err(|e| AmqpLinkDetach::from(e.1))?; + Ok(()) + } + + fn max_message_size(&self) -> azure_core::Result> { Ok(self .sender .get() @@ -86,8 +102,7 @@ impl AmqpSenderApis for Fe2o3AmqpSender { "Message Sender not set.", ) })? - .lock() - .await + .lock_blocking() .max_message_size()) } @@ -124,6 +139,7 @@ impl AmqpSenderApis for Fe2o3AmqpSender { })? .lock() .await + .borrow_mut() .send(sendable) .await .map_err(AmqpSenderSend::from)?; diff --git a/sdk/core/azure_core_amqp/src/fe2o3/session.rs b/sdk/core/azure_core_amqp/src/fe2o3/session.rs index 634b20bf2a..b6f9a27b14 100644 --- a/sdk/core/azure_core_amqp/src/fe2o3/session.rs +++ b/sdk/core/azure_core_amqp/src/fe2o3/session.rs @@ -14,7 +14,7 @@ use std::{ borrow::BorrowMut, sync::{Arc, OnceLock}, }; -use tracing::debug; +use tracing::{debug, trace}; #[derive(Debug, Clone, Default)] pub(crate) struct Fe2o3AmqpSession { @@ -23,7 +23,7 @@ pub(crate) struct Fe2o3AmqpSession { impl Drop for Fe2o3AmqpSession { fn drop(&mut self) { - debug!("Dropping Fe2o3AmqpSession."); + debug!("Dropping Fe2o3AmqpSession: {:?}.", self.session); } } @@ -119,11 +119,17 @@ impl AmqpSessionApis for Fe2o3AmqpSession { } async fn end(&self) -> Result<()> { - self.session + let mut session = self + .session .get() .ok_or_else(|| Error::message(ErrorKind::Other, "Session Handle was not set"))? .lock() - .await + .await; + if session.is_ended() { + trace!("Session already ended, returning."); + return Ok(()); + } + session .end() .await .map_err(super::error::AmqpSession::from)?; diff --git a/sdk/core/azure_core_amqp/src/noop.rs b/sdk/core/azure_core_amqp/src/noop.rs index 3525537e33..db8106736c 100644 --- a/sdk/core/azure_core_amqp/src/noop.rs +++ b/sdk/core/azure_core_amqp/src/noop.rs @@ -96,6 +96,9 @@ impl<'a> AmqpClaimsBasedSecurityApis for NoopAmqpClaimsBasedSecurity<'a> { async fn attach(&self) -> Result<()> { unimplemented!(); } + async fn detach(self) -> Result<()> { + unimplemented!(); + } async fn authorize_path( &self, path: impl Into + std::fmt::Debug, @@ -146,7 +149,11 @@ impl AmqpSenderApis for NoopAmqpSender { unimplemented!(); } - async fn max_message_size(&self) -> Result> { + async fn detach(self) -> Result<()> { + unimplemented!(); + } + + fn max_message_size(&self) -> Result> { unimplemented!(); } diff --git a/sdk/core/azure_core_amqp/src/sender.rs b/sdk/core/azure_core_amqp/src/sender.rs index bd256a18f7..822fae7c3d 100644 --- a/sdk/core/azure_core_amqp/src/sender.rs +++ b/sdk/core/azure_core_amqp/src/sender.rs @@ -14,7 +14,7 @@ type SenderImplementation = super::fe2o3::sender::Fe2o3AmqpSender; #[cfg(any(not(feature = "fe2o3-amqp"), target_arch = "wasm32"))] type SenderImplementation = super::noop::NoopAmqpSender; -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct AmqpSenderOptions { pub(super) sender_settle_mode: Option, pub(super) receiver_settle_mode: Option, @@ -40,7 +40,8 @@ pub trait AmqpSenderApis { target: impl Into, options: Option, ) -> impl std::future::Future>; - fn max_message_size(&self) -> impl std::future::Future>>; + fn detach(self) -> impl std::future::Future>; + fn max_message_size(&self) -> Result>; fn send( &self, message: impl Into + std::fmt::Debug, @@ -65,8 +66,12 @@ impl AmqpSenderApis for AmqpSender { .attach(session, name, target, options) .await } - async fn max_message_size(&self) -> Result> { - self.implementation.max_message_size().await + async fn detach(self) -> Result<()> { + self.implementation.detach().await + } + + fn max_message_size(&self) -> Result> { + self.implementation.max_message_size() } async fn send( &self, @@ -86,13 +91,13 @@ impl AmqpSender { } /// Options for sending an AMQP message. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct AmqpSendOptions { /// The message format. - pub(crate) message_format: Option, + pub message_format: Option, /// The message priority. - pub(crate) settled: Option, + pub settled: Option, } impl AmqpSendOptions { @@ -129,6 +134,7 @@ pub mod builders { } } + #[derive(Clone)] pub struct AmqpSenderOptionsBuilder { options: AmqpSenderOptions, } @@ -139,57 +145,66 @@ pub mod builders { options: Default::default(), } } - #[allow(dead_code)] - pub fn with_sender_settle_mode(mut self, sender_settle_mode: SenderSettleMode) -> Self { + + pub fn with_sender_settle_mode( + &mut self, + sender_settle_mode: SenderSettleMode, + ) -> &mut Self { self.options.sender_settle_mode = Some(sender_settle_mode); self } - #[allow(dead_code)] + pub fn with_receiver_settle_mode( - mut self, + &mut self, receiver_settle_mode: ReceiverSettleMode, - ) -> Self { + ) -> &mut Self { self.options.receiver_settle_mode = Some(receiver_settle_mode); self } - #[allow(dead_code)] - pub fn with_source(mut self, source: impl Into) -> Self { + + pub fn with_source(&mut self, source: impl Into) -> &mut Self { self.options.source = Some(source.into()); self } - #[allow(dead_code)] - pub fn with_offered_capabilities(mut self, offered_capabilities: Vec) -> Self { + pub fn with_offered_capabilities( + &mut self, + offered_capabilities: Vec, + ) -> &mut Self { self.options.offered_capabilities = Some(offered_capabilities); self } #[allow(dead_code)] - pub fn with_desired_capabilities(mut self, desired_capabilities: Vec) -> Self { + pub fn with_desired_capabilities( + &mut self, + desired_capabilities: Vec, + ) -> &mut Self { self.options.desired_capabilities = Some(desired_capabilities); self } - #[allow(dead_code)] + pub fn with_properties( - mut self, + &mut self, properties: impl Into>, - ) -> Self { + ) -> &mut Self { let properties_map: AmqpOrderedMap = properties.into().iter().collect(); self.options.properties = Some(properties_map); self } - #[allow(dead_code)] - pub fn with_initial_delivery_count(mut self, initial_delivery_count: u32) -> Self { + + pub fn with_initial_delivery_count(&mut self, initial_delivery_count: u32) -> &mut Self { self.options.initial_delivery_count = Some(initial_delivery_count); self } - pub fn with_max_message_size(mut self, max_message_size: u64) -> Self { + + pub fn with_max_message_size(&mut self, max_message_size: u64) -> &mut Self { self.options.max_message_size = Some(max_message_size); self } - pub fn build(self) -> AmqpSenderOptions { - self.options + pub fn build(&mut self) -> AmqpSenderOptions { + self.options.clone() } } } diff --git a/sdk/eventhubs/azure_messaging_eventhubs/src/producer/batch.rs b/sdk/eventhubs/azure_messaging_eventhubs/src/producer/batch.rs index edb5133b2c..d5eaad127e 100644 --- a/sdk/eventhubs/azure_messaging_eventhubs/src/producer/batch.rs +++ b/sdk/eventhubs/azure_messaging_eventhubs/src/producer/batch.rs @@ -83,18 +83,12 @@ impl<'a> EventDataBatch<'a> { pub(crate) async fn attach(&mut self) -> Result<()> { let sender = self.producer.ensure_sender(self.get_batch_path()).await?; - self.max_size_in_bytes = - sender - .lock() - .await - .max_message_size() - .await? - .ok_or_else(|| { - Error::message( - azure_core::error::ErrorKind::Other, - "No message size available.", - ) - })?; + self.max_size_in_bytes = sender.lock().await.max_message_size()?.ok_or_else(|| { + Error::message( + azure_core::error::ErrorKind::Other, + "No message size available.", + ) + })?; Ok(()) }