Skip to content

Commit

Permalink
src: device: Major rework on module structure
Browse files Browse the repository at this point in the history
  • Loading branch information
RaulTrombin committed Aug 6, 2024
1 parent bfc4e34 commit d5756d0
Show file tree
Hide file tree
Showing 5 changed files with 342 additions and 17 deletions.
210 changes: 210 additions & 0 deletions src/device/manager/continuous_mode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
use serde_json::json;
use tracing::{error, trace, warn};
use uuid::Uuid;

use crate::device::{
manager::{Answer, DeviceAnswer, ManagerError},
manager::{DeviceManager, DeviceSelection},
};

impl DeviceManager {
// Call the helpers specifically for each device type
pub async fn continuous_mode_start(
&mut self,
mut subscriber: tokio::sync::broadcast::Receiver<
bluerobotics_ping::message::ProtocolMessage,
>,
device_id: Uuid,
device_type: DeviceSelection,
) -> Option<tokio::task::JoinHandle<()>> {
let raw_handler = match self.get_device_handler(device_id).await {
Ok(handler) => handler.clone(),
Err(err) => {
trace!("Error during start_continuous_mode: Failed to get device handler: {err:?}");
return None;
}
};

let handler = match self.extract_handler(raw_handler) {
Ok(handler) => handler,
Err(err) => {
trace!("Error during start_continuous_mode: Failed to extract handler: {err:?}");
return None;
}
};

match device_type {
DeviceSelection::Ping1D => Some(tokio::spawn(async move {
loop {
match subscriber.recv().await {
Ok(msg) => {
Self::ping1d_continuous_mode_helper(msg, device_id);
}
Err(err) => {
Self::handle_error_continuous_mode(err, device_id);
break;
}
}
}
})),
DeviceSelection::Ping360 => {
Some(tokio::spawn(async move {
let handler = handler.clone();

// Attempt to send the Ping360 request and handle the result
let device_data = match handler
.send(crate::device::devices::PingRequest::Ping360(
crate::device::devices::Ping360Request::DeviceData,
))
.await
{
Ok(response) => match response {
crate::device::devices::PingAnswer::PingMessage(
bluerobotics_ping::Messages::Ping360(
bluerobotics_ping::ping360::Messages::DeviceData(msg),
),
) => msg,
msg => {
error!("Error during start_continuous_mode: unexpected message: {msg:?}");
return;
}
},
Err(err) => {
error!("Error during start_continuous_mode: Device Error: {err:?}");
return;
}
};

loop {
for n in 0..=399 {
// Handle timeout and errors
let result = tokio::time::timeout(
std::time::Duration::from_millis(1000),
handler.send(crate::device::devices::PingRequest::Ping360(
crate::device::devices::Ping360Request::Transducer(
bluerobotics_ping::ping360::TransducerStruct {
mode: device_data.mode,
gain_setting: device_data.gain_setting,
transmit_duration: device_data.transmit_duration,
sample_period: device_data.sample_period,
transmit_frequency: device_data.transmit_frequency,
number_of_samples: device_data.number_of_samples,
angle: n,
transmit: 1,
reserved: 0,
},
),
)),
)
.await;

match result {
Ok(Ok(answer)) => match answer {
crate::device::devices::PingAnswer::PingMessage(msg) => {
Self::ping360_continuous_mode_helper(msg, device_id)
}
msg => {
error!("Error during continuous_mode: Unexpected Message: {msg:?}");
return;
}
},
Ok(Err(err)) => {
error!("Error during continuous_mode: Device Error: {err:?}");
return;
}
Err(_err) => {
warn!("Error during continuous_mode: Answer delayed more than 1 s");
}
}
}
}
}))
}
DeviceSelection::Common | DeviceSelection::Auto => None,
}
}

// Execute some especial commands required for device enter in auto_send mode
pub async fn continuous_mode_startup_routine(
&self,
device_id: Uuid,
device_type: DeviceSelection,
) -> Result<(), ManagerError> {
if device_type == DeviceSelection::Ping1D {
let handler_request = self.get_device_handler(device_id).await?;
let handler = self.extract_handler(handler_request)?;

let id = <bluerobotics_ping::ping1d::ProfileStruct as bluerobotics_ping::message::MessageInfo>::id();
let _ = handler
.send(crate::device::devices::PingRequest::Ping1D(
crate::device::devices::Ping1DRequest::ContinuousStart(
bluerobotics_ping::ping1d::ContinuousStartStruct { id },
),
))
.await
.map_err(|err| {trace!("Something went wrong while executing continuous_mode_startup, details: {err:?}"); ManagerError::DeviceError(err)})?;
}
Ok(())
}

// Execute some especial commands required for device stop auto_send mode
pub async fn continuous_mode_shutdown_routine(
&self,
device_id: Uuid,
device_type: DeviceSelection,
) -> Result<(), ManagerError> {
let handler_request = self.get_device_handler(device_id).await?;
let handler = self.extract_handler(handler_request)?;

if device_type == DeviceSelection::Ping1D {
let id = <bluerobotics_ping::ping1d::ProfileStruct as bluerobotics_ping::message::MessageInfo>::id();
let _ = handler
.send(crate::device::devices::PingRequest::Ping1D(
crate::device::devices::Ping1DRequest::ContinuousStop(
bluerobotics_ping::ping1d::ContinuousStopStruct { id },
),
))
.await
.map_err(|err| {trace!("Something went wrong while executing broadcast_startup_routine, details: {err:?}"); ManagerError::DeviceError(err)})?;
}
Ok(())
}

// An inner helper focused on Ping1D, which uses Profile message to plot graphs
pub fn ping1d_continuous_mode_helper(
msg: bluerobotics_ping::message::ProtocolMessage,
device_id: Uuid,
) {
if msg.message_id == <bluerobotics_ping::ping1d::ProfileStruct as bluerobotics_ping::message::MessageInfo>::id() {
if let Ok(bluerobotics_ping::Messages::Ping1D(bluerobotics_ping::ping1d::Messages::Profile(_answer))) = bluerobotics_ping::Messages::try_from(&msg) {
let answer = Answer::DeviceMessage(DeviceAnswer {
answer: crate::device::devices::PingAnswer::PingMessage(
bluerobotics_ping::Messages::try_from(&msg).unwrap(),
),
device_id,
});
crate::server::protocols::v1::websocket::send_to_websockets(json!(answer), Some(device_id));
}
}
}

// An inner helper focused on Ping360, which uses DeviceData message to plot graphs
pub fn ping360_continuous_mode_helper(msg: bluerobotics_ping::Messages, device_id: Uuid) {
let answer = Answer::DeviceMessage(DeviceAnswer {
answer: crate::device::devices::PingAnswer::PingMessage(msg),
device_id,
});
crate::server::protocols::v1::websocket::send_to_websockets(json!(answer), Some(device_id));
}

// An inner helper that returns error to requester
pub fn handle_error_continuous_mode(
error: tokio::sync::broadcast::error::RecvError,
device_id: Uuid,
) {
let error = ManagerError::DeviceError(crate::device::devices::DeviceError::PingError(
bluerobotics_ping::error::PingError::TokioBroadcastError(error.to_string()),
));
crate::server::protocols::v1::websocket::send_to_websockets(json!(error), Some(device_id));
}
}
110 changes: 110 additions & 0 deletions src/device/manager/device_handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use tracing::{error, trace, warn};
use uuid::Uuid;

use crate::device::{
devices::{self, DeviceActorHandler},
manager::{Answer, Device, DeviceManager, DeviceSelection, DeviceStatus, ManagerError},
};

impl DeviceManager {
pub fn check_device_uuid(&self, device_id: Uuid) -> Result<(), ManagerError> {
if self.device.contains_key(&device_id) {
return Ok(());
}
error!(
"Getting device handler for device: {:?} : Error, device doesn't exist",
device_id
);
Err(ManagerError::DeviceNotExist(device_id))
}

pub fn get_device(&self, device_id: Uuid) -> Result<&Device, ManagerError> {
let device = self
.device
.get(&device_id)
.ok_or(ManagerError::DeviceNotExist(device_id))?;
Ok(device)
}

pub async fn get_device_handler(&self, device_id: Uuid) -> Result<Answer, ManagerError> {
self.check_device_uuid(device_id)?;

trace!(
"Getting device handler for device: {:?} : Success",
device_id
);

// Fail-fast if device is stopped
self.check_device_status(
device_id,
&[DeviceStatus::ContinuousMode, DeviceStatus::Running],
)?;

let handler: DeviceActorHandler = self.get_device(device_id)?.handler.clone();

Ok(Answer::InnerDeviceHandler(handler))
}

pub fn check_device_status(
&self,
device_id: Uuid,
valid_statuses: &[DeviceStatus],
) -> Result<(), ManagerError> {
let status = &self.get_device(device_id)?.status;
if !valid_statuses.contains(status) {
return Err(ManagerError::DeviceStatus(status.clone(), device_id));
}
Ok(())
}

pub fn get_mut_device(&mut self, device_id: Uuid) -> Result<&mut Device, ManagerError> {
let device = self
.device
.get_mut(&device_id)
.ok_or(ManagerError::DeviceNotExist(device_id))?;
Ok(device)
}

pub fn get_device_type(&self, device_id: Uuid) -> Result<DeviceSelection, ManagerError> {
let device_type = self.device.get(&device_id).unwrap().device_type.clone();
Ok(device_type)
}

pub fn extract_handler(
&self,
device_handler: Answer,
) -> Result<DeviceActorHandler, ManagerError> {
match device_handler {
Answer::InnerDeviceHandler(handler) => Ok(handler),
answer => Err(ManagerError::Other(format!(
"Unreachable: extract_handler helper, detail: {answer:?}"
))),
}
}

pub async fn get_subscriber(
&self,
device_id: Uuid,
) -> Result<
tokio::sync::broadcast::Receiver<bluerobotics_ping::message::ProtocolMessage>,
ManagerError,
> {
let handler_request = self.get_device_handler(device_id).await?;
let handler = self.extract_handler(handler_request)?;

let subscriber = handler
.send(devices::PingRequest::GetSubscriber)
.await
.map_err(|err| {
warn!("Something went wrong while executing get_subscriber, details: {err:?}");
ManagerError::DeviceError(err)
})?;

match subscriber {
devices::PingAnswer::Subscriber(subscriber) => Ok(subscriber),
_ => Err(ManagerError::Other(
"Unreachable: get_subscriber helper".to_string(),
)),
}
}
}
13 changes: 8 additions & 5 deletions src/device/manager.rs → src/device/manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/// Specially for DeviceManager to retrieve checks and structures from Devices stored in it's hashmap collection
pub mod continuous_mode;
/// Specially for continuous_mode methods, startup, shutdown, handle and errors routines for each device type
pub mod device_handle;

use paperclip::actix::Apiv2Schema;
use serde::{Deserialize, Serialize};
use std::{
Expand Down Expand Up @@ -194,17 +199,15 @@ impl DeviceManager {
let result = self.continuous_mode(uuid).await;
if let Err(e) = actor_request.respond_to.send(result) {
error!(
"DeviceManager: Failed to return EnableContinuousMode response: {:?}",
e
"DeviceManager: Failed to return EnableContinuousMode response: {e:?}"
);
}
}
Request::DisableContinuousMode(uuid) => {
let result = self.continuous_mode_off(uuid).await;
if let Err(e) = actor_request.respond_to.send(result) {
error!(
"DeviceManager: Failed to return DisableContinuousMode response: {:?}",
e
"DeviceManager: Failed to return DisableContinuousMode response: {e:?}"
);
}
}
Expand Down Expand Up @@ -397,7 +400,7 @@ impl DeviceManager {
Some(device) => {
let device_info = device.info();
drop(device);
trace!("Device delete id {:?}: Success", device_id);
trace!("Device delete id {device_id:?}: Success");
Ok(Answer::DeviceInfo(vec![device_info]))
}
None => {
Expand Down
25 changes: 13 additions & 12 deletions src/device/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
/// The `devices` module defines the pattern for managing devices and their handlers.
/// It includes the `Device` and `DeviceHandler` structures.
///
/// The `DeviceHandler` can forward requests defined in the `PingRequest` enum.
pub mod devices;
pub mod manager;

// The Device module consists of two main modules: devices and manager.
//
// Manager:
// The Manager module includes two primary structures: Manager and its Handler.
// This design allows the Manager to receive and process requests from multiple, distinct threads.
// The ManagerHandler can forward requests defined in the Request enum, creating a Device if necessary.
// If a device is stopped or encounters an error during execution, the user can recover the device and make it available again.
//
// Device:
// Each device follows the same pattern, consisting of a Device and its Handler.
// The DeviceHandler can forward requests defined in the PingRequest enum.
/// The `manager` module provides the `Manager` and `ManagerHandler` structures.
///
/// The `Manager` can handle requests from multiple threads. The `ManagerHandler`
/// is capable of forwarding requests defined in the `Request` enum and can create
/// devices as needed.
///
/// If a device is stopped or encounters an error during execution, it can be recovered
/// and made available again.
pub mod manager;
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use tracing::info;
extern crate lazy_static;

mod cli;
/// The Device module consists of two main modules: devices and manager.
mod device;
mod logger;
mod server;
Expand Down

0 comments on commit d5756d0

Please sign in to comment.