Skip to content

Commit

Permalink
Implement rust management APIs (#1854)
Browse files Browse the repository at this point in the history
* Implement rust management APIs
  • Loading branch information
LarryOsterman authored Oct 15, 2024
1 parent e96116b commit 798f977
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 46 deletions.
32 changes: 11 additions & 21 deletions sdk/core/azure_core_amqp/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ impl AmqpConnectionOptions {
pub fn builder() -> builders::AmqpConnectionOptionsBuilder {
builders::AmqpConnectionOptionsBuilder::new()
}

pub fn max_frame_size(&self) -> Option<u32> {
self.max_frame_size
}
Expand Down Expand Up @@ -124,47 +123,38 @@ pub mod builders {
options: Default::default(),
}
}
pub fn build(&mut self) -> AmqpConnectionOptions {
self.options.clone()
pub fn build(self) -> AmqpConnectionOptions {
self.options
}
pub fn with_max_frame_size(&mut self, max_frame_size: u32) -> &mut Self {
pub fn with_max_frame_size(mut self, max_frame_size: u32) -> Self {
self.options.max_frame_size = Some(max_frame_size);
self
}
pub fn with_channel_max(&mut self, channel_max: u16) -> &mut Self {
pub fn with_channel_max(mut self, channel_max: u16) -> Self {
self.options.channel_max = Some(channel_max);
self
}
pub fn with_idle_timeout(&mut self, idle_timeout: Duration) -> &mut Self {
pub fn with_idle_timeout(mut self, idle_timeout: Duration) -> Self {
self.options.idle_timeout = Some(idle_timeout);
self
}
pub fn with_outgoing_locales(&mut self, outgoing_locales: Vec<String>) -> &mut Self {
pub fn with_outgoing_locales(mut self, outgoing_locales: Vec<String>) -> Self {
self.options.outgoing_locales = Some(outgoing_locales);
self
}
pub fn with_incoming_locales(&mut self, incoming_locales: Vec<String>) -> &mut Self {
pub fn with_incoming_locales(mut self, incoming_locales: Vec<String>) -> Self {
self.options.incoming_locales = Some(incoming_locales);
self
}
pub fn with_offered_capabilities(
&mut self,
offered_capabilities: Vec<AmqpSymbol>,
) -> &mut Self {
pub fn with_offered_capabilities(mut self, offered_capabilities: Vec<AmqpSymbol>) -> Self {
self.options.offered_capabilities = Some(offered_capabilities);
self
}
pub fn with_desired_capabilities(
&mut self,
desired_capabilities: Vec<AmqpSymbol>,
) -> &mut Self {
pub fn with_desired_capabilities(mut self, desired_capabilities: Vec<AmqpSymbol>) -> Self {
self.options.desired_capabilities = Some(desired_capabilities);
self
}
pub fn with_properties<K, V>(
&mut self,
properties: impl Into<AmqpOrderedMap<K, V>>,
) -> &mut Self
pub fn with_properties<K, V>(mut self, properties: impl Into<AmqpOrderedMap<K, V>>) -> Self
where
K: Into<AmqpSymbol> + Debug + Default + PartialEq,
V: Into<AmqpValue> + Debug + Default,
Expand All @@ -177,7 +167,7 @@ pub mod builders {
self.options.properties = Some(properties_map);
self
}
pub fn with_buffer_size(&mut self, buffer_size: usize) -> &mut Self {
pub fn with_buffer_size(mut self, buffer_size: usize) -> Self {
self.options.buffer_size = Some(buffer_size);
self
}
Expand Down
20 changes: 18 additions & 2 deletions sdk/core/azure_core_amqp/src/fe2o3/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ use crate::{
};

use async_std::sync::Mutex;
use azure_core::{credentials::AccessToken, error::Result};
use azure_core::{
credentials::AccessToken,
error::{ErrorKind, Result},
Error,
};
use fe2o3_amqp_management::operations::ReadResponse;
use fe2o3_amqp_types::{messaging::ApplicationProperties, primitives::SimpleValue};
use std::sync::{Arc, OnceLock};
use tracing::debug;

use super::error::{AmqpManagement, AmqpManagementAttach};
use super::error::{AmqpLinkDetach, AmqpManagement, AmqpManagementAttach};

#[derive(Debug)]
pub(crate) struct Fe2o3AmqpManagement {
Expand Down Expand Up @@ -67,6 +71,18 @@ impl AmqpManagementApis for Fe2o3AmqpManagement {
})?;
Ok(())
}

async fn detach(mut self) -> Result<()> {
// Detach the management client from the session.
let management = self
.management
.take()
.ok_or_else(|| Error::message(ErrorKind::Other, "Unattached management node."))?;
let management = management.into_inner();
management.close().await.map_err(AmqpLinkDetach::from)?;
Ok(())
}

async fn call(
&self,
operation_type: impl Into<String>,
Expand Down
4 changes: 4 additions & 0 deletions sdk/core/azure_core_amqp/src/management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type ManagementImplementation = super::noop::NoopAmqpManagement;

pub trait AmqpManagementApis {
fn attach(&self) -> impl std::future::Future<Output = Result<()>>;
fn detach(self) -> impl std::future::Future<Output = Result<()>>;

#[allow(unused_variables)]
fn call(
Expand All @@ -35,6 +36,9 @@ impl AmqpManagementApis for AmqpManagement {
async fn attach(&self) -> Result<()> {
self.implementation.attach().await
}
async fn detach(self) -> Result<()> {
self.implementation.detach().await
}
async fn call(
&self,
operation_type: impl Into<String>,
Expand Down
6 changes: 4 additions & 2 deletions sdk/core/azure_core_amqp/src/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ impl AmqpManagementApis for NoopAmqpManagement {
unimplemented!();
}

async fn detach(self) -> Result<()> {
unimplemented!();
}

async fn call(
&self,
operation_type: impl Into<String>,
Expand All @@ -148,11 +152,9 @@ impl AmqpSenderApis for NoopAmqpSender {
) -> Result<()> {
unimplemented!();
}

async fn detach(self) -> Result<()> {
unimplemented!();
}

fn max_message_size(&self) -> Result<Option<u64>> {
unimplemented!();
}
Expand Down
33 changes: 12 additions & 21 deletions sdk/core/azure_core_amqp/src/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,65 +146,56 @@ pub mod builders {
}
}

pub fn with_sender_settle_mode(
&mut self,
sender_settle_mode: SenderSettleMode,
) -> &mut Self {
pub fn with_sender_settle_mode(mut self, sender_settle_mode: SenderSettleMode) -> Self {
self.options.sender_settle_mode = Some(sender_settle_mode);
self
}

pub fn with_receiver_settle_mode(
&mut self,
mut self,
receiver_settle_mode: ReceiverSettleMode,
) -> &mut Self {
) -> Self {
self.options.receiver_settle_mode = Some(receiver_settle_mode);
self
}

pub fn with_source(&mut self, source: impl Into<AmqpSource>) -> &mut Self {
pub fn with_source(mut self, source: impl Into<AmqpSource>) -> Self {
self.options.source = Some(source.into());
self
}
pub fn with_offered_capabilities(
&mut self,
offered_capabilities: Vec<AmqpSymbol>,
) -> &mut Self {
pub fn with_offered_capabilities(mut self, offered_capabilities: Vec<AmqpSymbol>) -> Self {
self.options.offered_capabilities = Some(offered_capabilities);
self
}
#[allow(dead_code)]
pub fn with_desired_capabilities(
&mut self,
desired_capabilities: Vec<AmqpSymbol>,
) -> &mut Self {
pub fn with_desired_capabilities(mut self, desired_capabilities: Vec<AmqpSymbol>) -> Self {
self.options.desired_capabilities = Some(desired_capabilities);
self
}

pub fn with_properties(
&mut self,
mut self,
properties: impl Into<AmqpOrderedMap<AmqpSymbol, AmqpValue>>,
) -> &mut Self {
) -> Self {
let properties_map: AmqpOrderedMap<AmqpSymbol, AmqpValue> =
properties.into().iter().collect();

self.options.properties = Some(properties_map);
self
}

pub fn with_initial_delivery_count(&mut self, initial_delivery_count: u32) -> &mut Self {
pub fn with_initial_delivery_count(mut self, initial_delivery_count: u32) -> Self {
self.options.initial_delivery_count = Some(initial_delivery_count);
self
}

pub fn with_max_message_size(&mut self, max_message_size: u64) -> &mut Self {
pub fn with_max_message_size(mut self, max_message_size: u64) -> Self {
self.options.max_message_size = Some(max_message_size);
self
}

pub fn build(&mut self) -> AmqpSenderOptions {
self.options.clone()
pub fn build(self) -> AmqpSenderOptions {
self.options
}
}
}
Expand Down

0 comments on commit 798f977

Please sign in to comment.