Skip to content

Commit

Permalink
Rust AMQP modifications for C++ message sender; also added a bunch of…
Browse files Browse the repository at this point in the history
… terminal state functions. (#1839)

* Rust message sender updates for C++
* Added terminal states for claims based security and message sender.
  • Loading branch information
LarryOsterman authored Oct 9, 2024
1 parent 42c5ce8 commit cf8e183
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 50 deletions.
8 changes: 8 additions & 0 deletions sdk/core/azure_core_amqp/src/cbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ pub trait AmqpClaimsBasedSecurityApis {
///
fn attach(&self) -> impl std::future::Future<Output = Result<()>>;

/// Asynchronously detaches the Claims-Based Security (CBS) node from the AMQP session.
/// This method is responsible for cleaning up the AMQP links used for CBS operations.
fn detach(self) -> impl std::future::Future<Output = Result<()>>;

/// Asynchronously authorizes an AMQP path using the provided secret.
///
/// The authorization is valid until the specified `expires_on` time. The path is typically a URI that represents an AMQP resource. The secret is typically a SAS token. The `expires_on` time is the time at which the authorization expires.
Expand Down Expand Up @@ -63,6 +67,7 @@ impl<'a> AmqpClaimsBasedSecurity<'a> {
})
}
}

impl<'a> AmqpClaimsBasedSecurityApis for AmqpClaimsBasedSecurity<'a> {
async fn authorize_path(
&self,
Expand All @@ -77,4 +82,7 @@ impl<'a> AmqpClaimsBasedSecurityApis for AmqpClaimsBasedSecurity<'a> {
async fn attach(&self) -> Result<()> {
self.implementation.attach().await
}
async fn detach(self) -> Result<()> {
self.implementation.detach().await
}
}
14 changes: 13 additions & 1 deletion sdk/core/azure_core_amqp/src/fe2o3/cbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT license.
// cspell:: words amqp servicebus sastoken

use super::error::{AmqpManagement, AmqpManagementAttach};
use super::error::{AmqpLinkDetach, AmqpManagement, AmqpManagementAttach};
use crate::{cbs::AmqpClaimsBasedSecurityApis, session::AmqpSession};
use async_std::sync::Mutex;
use azure_core::error::Result;
Expand Down Expand Up @@ -53,6 +53,18 @@ impl<'a> AmqpClaimsBasedSecurityApis for Fe2o3ClaimsBasedSecurity<'a> {
Ok(())
}

async fn detach(mut self) -> Result<()> {
let cbs = self.cbs.take().ok_or_else(|| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Claims Based Security was not set.",
)
})?;
let cbs = cbs.into_inner();
cbs.close().await.map_err(AmqpLinkDetach::from)?;
Ok(())
}

async fn authorize_path(
&self,
path: impl Into<String> + Debug,
Expand Down
30 changes: 23 additions & 7 deletions sdk/core/azure_core_amqp/src/fe2o3/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ use crate::session::AmqpSession;
use async_std::sync::Mutex;
use azure_core::Result;
use std::borrow::BorrowMut;
use std::sync::{Arc, OnceLock};
use std::sync::OnceLock;

use super::error::{
AmqpDeliveryRejected, AmqpNotAccepted, AmqpSenderAttach, AmqpSenderSend, Fe2o3AmqpError,
AmqpDeliveryRejected, AmqpLinkDetach, AmqpNotAccepted, AmqpSenderAttach, AmqpSenderSend,
Fe2o3AmqpError,
};

#[derive(Debug, Default)]
pub(crate) struct Fe2o3AmqpSender {
sender: OnceLock<Arc<Mutex<fe2o3_amqp::Sender>>>,
sender: OnceLock<Mutex<fe2o3_amqp::Sender>>,
}

impl AmqpSenderApis for Fe2o3AmqpSender {
Expand Down Expand Up @@ -67,7 +68,7 @@ impl AmqpSenderApis for Fe2o3AmqpSender {
.attach(session.implementation.get()?.lock().await.borrow_mut())
.await
.map_err(AmqpSenderAttach::from)?;
self.sender.set(Arc::new(Mutex::new(sender))).map_err(|_| {
self.sender.set(Mutex::new(sender)).map_err(|_| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Could not set message sender.",
Expand All @@ -76,7 +77,22 @@ impl AmqpSenderApis for Fe2o3AmqpSender {
Ok(())
}

async fn max_message_size(&self) -> azure_core::Result<Option<u64>> {
async fn detach(mut self) -> Result<()> {
let sender = self.sender.take().ok_or_else(|| {
azure_core::Error::message(
azure_core::error::ErrorKind::Other,
"Message Sender not set.",
)
})?;
sender
.into_inner()
.detach()
.await
.map_err(|e| AmqpLinkDetach::from(e.1))?;
Ok(())
}

fn max_message_size(&self) -> azure_core::Result<Option<u64>> {
Ok(self
.sender
.get()
Expand All @@ -86,8 +102,7 @@ impl AmqpSenderApis for Fe2o3AmqpSender {
"Message Sender not set.",
)
})?
.lock()
.await
.lock_blocking()
.max_message_size())
}

Expand Down Expand Up @@ -124,6 +139,7 @@ impl AmqpSenderApis for Fe2o3AmqpSender {
})?
.lock()
.await
.borrow_mut()
.send(sendable)
.await
.map_err(AmqpSenderSend::from)?;
Expand Down
14 changes: 10 additions & 4 deletions sdk/core/azure_core_amqp/src/fe2o3/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::{
borrow::BorrowMut,
sync::{Arc, OnceLock},
};
use tracing::debug;
use tracing::{debug, trace};

#[derive(Debug, Clone, Default)]
pub(crate) struct Fe2o3AmqpSession {
Expand All @@ -23,7 +23,7 @@ pub(crate) struct Fe2o3AmqpSession {

impl Drop for Fe2o3AmqpSession {
fn drop(&mut self) {
debug!("Dropping Fe2o3AmqpSession.");
debug!("Dropping Fe2o3AmqpSession: {:?}.", self.session);
}
}

Expand Down Expand Up @@ -119,11 +119,17 @@ impl AmqpSessionApis for Fe2o3AmqpSession {
}

async fn end(&self) -> Result<()> {
self.session
let mut session = self
.session
.get()
.ok_or_else(|| Error::message(ErrorKind::Other, "Session Handle was not set"))?
.lock()
.await
.await;
if session.is_ended() {
trace!("Session already ended, returning.");
return Ok(());
}
session
.end()
.await
.map_err(super::error::AmqpSession::from)?;
Expand Down
9 changes: 8 additions & 1 deletion sdk/core/azure_core_amqp/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ impl<'a> AmqpClaimsBasedSecurityApis for NoopAmqpClaimsBasedSecurity<'a> {
async fn attach(&self) -> Result<()> {
unimplemented!();
}
async fn detach(self) -> Result<()> {
unimplemented!();
}
async fn authorize_path(
&self,
path: impl Into<String> + std::fmt::Debug,
Expand Down Expand Up @@ -146,7 +149,11 @@ impl AmqpSenderApis for NoopAmqpSender {
unimplemented!();
}

async fn max_message_size(&self) -> Result<Option<u64>> {
async fn detach(self) -> Result<()> {
unimplemented!();
}

fn max_message_size(&self) -> Result<Option<u64>> {
unimplemented!();
}

Expand Down
65 changes: 40 additions & 25 deletions sdk/core/azure_core_amqp/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type SenderImplementation = super::fe2o3::sender::Fe2o3AmqpSender;
#[cfg(any(not(feature = "fe2o3-amqp"), target_arch = "wasm32"))]
type SenderImplementation = super::noop::NoopAmqpSender;

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct AmqpSenderOptions {
pub(super) sender_settle_mode: Option<SenderSettleMode>,
pub(super) receiver_settle_mode: Option<ReceiverSettleMode>,
Expand All @@ -40,7 +40,8 @@ pub trait AmqpSenderApis {
target: impl Into<AmqpTarget>,
options: Option<AmqpSenderOptions>,
) -> impl std::future::Future<Output = Result<()>>;
fn max_message_size(&self) -> impl std::future::Future<Output = Result<Option<u64>>>;
fn detach(self) -> impl std::future::Future<Output = Result<()>>;
fn max_message_size(&self) -> Result<Option<u64>>;
fn send(
&self,
message: impl Into<AmqpMessage> + std::fmt::Debug,
Expand All @@ -65,8 +66,12 @@ impl AmqpSenderApis for AmqpSender {
.attach(session, name, target, options)
.await
}
async fn max_message_size(&self) -> Result<Option<u64>> {
self.implementation.max_message_size().await
async fn detach(self) -> Result<()> {
self.implementation.detach().await
}

fn max_message_size(&self) -> Result<Option<u64>> {
self.implementation.max_message_size()
}
async fn send(
&self,
Expand All @@ -86,13 +91,13 @@ impl AmqpSender {
}

/// Options for sending an AMQP message.
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct AmqpSendOptions {
/// The message format.
pub(crate) message_format: Option<u32>,
pub message_format: Option<u32>,

/// The message priority.
pub(crate) settled: Option<bool>,
pub settled: Option<bool>,
}

impl AmqpSendOptions {
Expand Down Expand Up @@ -129,6 +134,7 @@ pub mod builders {
}
}

#[derive(Clone)]
pub struct AmqpSenderOptionsBuilder {
options: AmqpSenderOptions,
}
Expand All @@ -139,57 +145,66 @@ pub mod builders {
options: Default::default(),
}
}
#[allow(dead_code)]
pub fn with_sender_settle_mode(mut self, sender_settle_mode: SenderSettleMode) -> Self {

pub fn with_sender_settle_mode(
&mut self,
sender_settle_mode: SenderSettleMode,
) -> &mut Self {
self.options.sender_settle_mode = Some(sender_settle_mode);
self
}
#[allow(dead_code)]

pub fn with_receiver_settle_mode(
mut self,
&mut self,
receiver_settle_mode: ReceiverSettleMode,
) -> Self {
) -> &mut Self {
self.options.receiver_settle_mode = Some(receiver_settle_mode);
self
}
#[allow(dead_code)]
pub fn with_source(mut self, source: impl Into<AmqpSource>) -> Self {

pub fn with_source(&mut self, source: impl Into<AmqpSource>) -> &mut Self {
self.options.source = Some(source.into());
self
}
#[allow(dead_code)]
pub fn with_offered_capabilities(mut self, offered_capabilities: Vec<AmqpSymbol>) -> Self {
pub fn with_offered_capabilities(
&mut self,
offered_capabilities: Vec<AmqpSymbol>,
) -> &mut Self {
self.options.offered_capabilities = Some(offered_capabilities);
self
}
#[allow(dead_code)]
pub fn with_desired_capabilities(mut self, desired_capabilities: Vec<AmqpSymbol>) -> Self {
pub fn with_desired_capabilities(
&mut self,
desired_capabilities: Vec<AmqpSymbol>,
) -> &mut Self {
self.options.desired_capabilities = Some(desired_capabilities);
self
}
#[allow(dead_code)]

pub fn with_properties(
mut self,
&mut self,
properties: impl Into<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
) -> Self {
) -> &mut Self {
let properties_map: AmqpOrderedMap<AmqpSymbol, AmqpValue> =
properties.into().iter().collect();

self.options.properties = Some(properties_map);
self
}
#[allow(dead_code)]
pub fn with_initial_delivery_count(mut self, initial_delivery_count: u32) -> Self {

pub fn with_initial_delivery_count(&mut self, initial_delivery_count: u32) -> &mut Self {
self.options.initial_delivery_count = Some(initial_delivery_count);
self
}
pub fn with_max_message_size(mut self, max_message_size: u64) -> Self {

pub fn with_max_message_size(&mut self, max_message_size: u64) -> &mut Self {
self.options.max_message_size = Some(max_message_size);
self
}

pub fn build(self) -> AmqpSenderOptions {
self.options
pub fn build(&mut self) -> AmqpSenderOptions {
self.options.clone()
}
}
}
Expand Down
18 changes: 6 additions & 12 deletions sdk/eventhubs/azure_messaging_eventhubs/src/producer/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,12 @@ impl<'a> EventDataBatch<'a> {

pub(crate) async fn attach(&mut self) -> Result<()> {
let sender = self.producer.ensure_sender(self.get_batch_path()).await?;
self.max_size_in_bytes =
sender
.lock()
.await
.max_message_size()
.await?
.ok_or_else(|| {
Error::message(
azure_core::error::ErrorKind::Other,
"No message size available.",
)
})?;
self.max_size_in_bytes = sender.lock().await.max_message_size()?.ok_or_else(|| {
Error::message(
azure_core::error::ErrorKind::Other,
"No message size available.",
)
})?;
Ok(())
}

Expand Down

0 comments on commit cf8e183

Please sign in to comment.