Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zenoh driver #82

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

patrickelectric
Copy link
Member

No description provided.

Signed-off-by: Patrick José Pereira <[email protected]>
Signed-off-by: Patrick José Pereira <[email protected]>
Copy link
Member

@joaoantoniocardoso joaoantoniocardoso left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, here are some suggestions based on our current patterns

Comment on lines +210 to +218
result = Zenoh::send_task(&context, session.clone()) => {
if let Err(e) = result {
error!("Error in rest sender task: {e:?}");
}
}
result = Zenoh::receive_task(&context, session) => {
if let Err(e) = result {
error!("Error in rest receive task: {e:?}");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
result = Zenoh::send_task(&context, session.clone()) => {
if let Err(e) = result {
error!("Error in rest sender task: {e:?}");
}
}
result = Zenoh::receive_task(&context, session) => {
if let Err(e) = result {
error!("Error in rest receive task: {e:?}");
}
result = Zenoh::send_task(&context, session.clone()) => {
if let Err(error) = result {
error!("Error in send task: {error:?}");
}
}
result = Zenoh::receive_task(&context, session) => {
if let Err(error) = result {
error!("Error in receive task: {error:?}");
}

Comment on lines +260 to +276
fn cli_example_legacy(&self) -> Vec<String> {
let first_schema = self.valid_schemes()[0];

vec![format!("{first_schema}:<IP>:<PORT>")]
}

fn cli_example_url(&self) -> Vec<String> {
let first_schema = &self.valid_schemes()[0];
vec![format!("{first_schema}://<IP>:<PORT>").to_string()]
}

fn create_endpoint_from_url(&self, url: &url::Url) -> Option<Arc<dyn Driver>> {
println!("{}", &url);
let _host = url.host_str().unwrap();
let _port = url.port().unwrap();
Some(Arc::new(Zenoh::builder("Zenoh").build()))
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fn cli_example_legacy(&self) -> Vec<String> {
let first_schema = self.valid_schemes()[0];
vec![format!("{first_schema}:<IP>:<PORT>")]
}
fn cli_example_url(&self) -> Vec<String> {
let first_schema = &self.valid_schemes()[0];
vec![format!("{first_schema}://<IP>:<PORT>").to_string()]
}
fn create_endpoint_from_url(&self, url: &url::Url) -> Option<Arc<dyn Driver>> {
println!("{}", &url);
let _host = url.host_str().unwrap();
let _port = url.port().unwrap();
Some(Arc::new(Zenoh::builder("Zenoh").build()))
}
fn cli_example_legacy(&self) -> Vec<String> {
let first_schema = self.valid_schemes()[0];
vec![format!("{first_schema}:<IP>:<PORT>")]
}
fn cli_example_url(&self) -> Vec<String> {
let first_schema = &self.valid_schemes()[0];
vec![format!("{first_schema}://<IP>:<PORT>").to_string()]
}
fn create_endpoint_from_url(&self, url: &url::Url) -> Option<Arc<dyn Driver>> {
println!("{}", &url);
let _host = url.host_str().unwrap();
let _port = url.port().unwrap();
Some(Arc::new(Zenoh::builder("Zenoh").build()))
}

This won't impede this to be merged, but ideally, the examples should follow how we are using the URL... As in.. we are ignoring host and port, but the example shows :.

@@ -0,0 +1,277 @@
use std::{f64::consts::E, sync::Arc};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
use std::{f64::consts::E, sync::Arc};
use std::sync::Arc;

(unused import)

Comment on lines +92 to +119
while let Ok(sample) = subscriber.recv_async().await {
if let Ok(content) = json5::from_str::<MAVLinkMessage<mavlink::ardupilotmega::MavMessage>>(
std::str::from_utf8(&sample.payload().to_bytes()).unwrap(),
) {
let mut message_raw = mavlink::MAVLinkV2MessageRaw::new();
message_raw.serialize_message(content.header, &content.message);
let bus_message = Arc::new(Protocol::new("zenoh", Packet::from(message_raw)));
context.stats.write().await.stats.update_input(&bus_message);

for future in context.on_message_input.call_all(bus_message.clone()) {
if let Err(error) = future.await {
debug!(
"Dropping message: on_message_input callback returned error: {error:?}"
);
continue;
}
}

if let Err(error) = context.hub_sender.send(bus_message) {
error!("Failed to send message to hub: {error:?}");
}
return Ok(());
}
return Err(anyhow::anyhow!(
"Failed to parse message, not a valid MAVLinkMessage: {sample:?}"
));
}
return Err(anyhow::anyhow!("Failed to receive message from zenoh"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while let Ok(sample) = subscriber.recv_async().await {
if let Ok(content) = json5::from_str::<MAVLinkMessage<mavlink::ardupilotmega::MavMessage>>(
std::str::from_utf8(&sample.payload().to_bytes()).unwrap(),
) {
let mut message_raw = mavlink::MAVLinkV2MessageRaw::new();
message_raw.serialize_message(content.header, &content.message);
let bus_message = Arc::new(Protocol::new("zenoh", Packet::from(message_raw)));
context.stats.write().await.stats.update_input(&bus_message);
for future in context.on_message_input.call_all(bus_message.clone()) {
if let Err(error) = future.await {
debug!(
"Dropping message: on_message_input callback returned error: {error:?}"
);
continue;
}
}
if let Err(error) = context.hub_sender.send(bus_message) {
error!("Failed to send message to hub: {error:?}");
}
return Ok(());
}
return Err(anyhow::anyhow!(
"Failed to parse message, not a valid MAVLinkMessage: {sample:?}"
));
}
return Err(anyhow::anyhow!("Failed to receive message from zenoh"));
while let Ok(sample) = subscriber.recv_async().await {
let Ok(content) = json5::from_str::<MAVLinkMessage<mavlink::ardupilotmega::MavMessage>>(
std::str::from_utf8(&sample.payload().to_bytes()).unwrap(),
) else {
debug!("Failed to parse message, not a valid MAVLinkMessage: {sample:?}");
continue;
};
let bus_message = {
let mut message_raw = mavlink::MAVLinkV2MessageRaw::new();
message_raw.serialize_message(content.header, &content.message);
Arc::new(Protocol::new("zenoh", Packet::from(message_raw)))
};
trace!("Received message: {bus_message:?}");
context.stats.write().await.stats.update_input(&bus_message);
for future in context.on_message_input.call_all(bus_message.clone()) {
if let Err(error) = future.await {
debug!("Dropping message: on_message_input callback returned error: {error:?}");
continue;
}
}
if let Err(error) = context.hub_sender.send(bus_message) {
error!("Failed to send message to hub: {error:?}");
continue;
}
trace!("Message sent to hub");
}
debug!("Driver receiver task stopped!");
Ok(())

I believe

Comment on lines +126 to +175
loop {
match hub_receiver.recv().await {
Ok(message) => {
context.stats.write().await.stats.update_output(&message);
for future in context.on_message_output.call_all(message.clone()) {
if let Err(error) = future.await {
debug!("Dropping message: on_message_output callback returned error: {error:?}");
continue;
}
}

let mut bytes =
mavlink::async_peek_reader::AsyncPeekReader::new(message.as_slice());
let Ok((header, message)) = mavlink::read_v2_msg_async::<
mavlink::ardupilotmega::MavMessage,
_,
>(&mut bytes)
.await
else {
continue;
};

let message_name = message.message_name();
let mavlink_message = MAVLinkMessage { header, message };
let json_string = match json5::to_string(&mavlink_message) {
Ok(json) => json,
Err(error) => {
error!(
"Failed to transform mavlink message {message_name} to json: {error:?}"
);
continue;
}
};
let topic_name = "mavlink/out";
if let Err(error) = session.put(topic_name, json_string.clone()).await {
error!("Failed to send message to {topic_name}: {error:?}");
}
let topic_name = format!(
"mavlink/{}/{}/{}",
header.system_id, header.component_id, message_name
);
if let Err(error) = session.put(topic_name.clone(), json_string).await {
error!("Failed to send message to {topic_name}: {error:?}");
}
}
Err(error) => {
error!("Failed to receive message from hub: {error:?}");
}
}
}
Copy link
Member

@joaoantoniocardoso joaoantoniocardoso Nov 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
loop {
match hub_receiver.recv().await {
Ok(message) => {
context.stats.write().await.stats.update_output(&message);
for future in context.on_message_output.call_all(message.clone()) {
if let Err(error) = future.await {
debug!("Dropping message: on_message_output callback returned error: {error:?}");
continue;
}
}
let mut bytes =
mavlink::async_peek_reader::AsyncPeekReader::new(message.as_slice());
let Ok((header, message)) = mavlink::read_v2_msg_async::<
mavlink::ardupilotmega::MavMessage,
_,
>(&mut bytes)
.await
else {
continue;
};
let message_name = message.message_name();
let mavlink_message = MAVLinkMessage { header, message };
let json_string = match json5::to_string(&mavlink_message) {
Ok(json) => json,
Err(error) => {
error!(
"Failed to transform mavlink message {message_name} to json: {error:?}"
);
continue;
}
};
let topic_name = "mavlink/out";
if let Err(error) = session.put(topic_name, json_string.clone()).await {
error!("Failed to send message to {topic_name}: {error:?}");
}
let topic_name = format!(
"mavlink/{}/{}/{}",
header.system_id, header.component_id, message_name
);
if let Err(error) = session.put(topic_name.clone(), json_string).await {
error!("Failed to send message to {topic_name}: {error:?}");
}
}
Err(error) => {
error!("Failed to receive message from hub: {error:?}");
}
}
}
loop {
let message = match hub_receiver.recv().await {
Ok(message) => message,
Err(broadcast::error::RecvError::Closed) => {
error!("Hub channel closed!");
break;
}
Err(broadcast::error::RecvError::Lagged(count)) => {
warn!("Channel lagged by {count} messages.");
continue;
}
};
if message.origin.eq("zenoh") {
continue; // Don't do loopback
}
context.stats.write().await.stats.update_output(&message);
for future in context.on_message_output.call_all(message.clone()) {
if let Err(error) = future.await {
debug!(
"Dropping message: on_message_output callback returned error: {error:?}"
);
continue;
}
}
let mut bytes = mavlink::async_peek_reader::AsyncPeekReader::new(message.as_slice());
let Ok((header, message)) =
mavlink::read_v2_msg_async::<mavlink::ardupilotmega::MavMessage, _>(&mut bytes)
.await
else {
continue;
};
let message_name = message.message_name();
let mavlink_message = MAVLinkMessage { header, message };
let json_string = &match json5::to_string(&mavlink_message) {
Ok(json) => json,
Err(error) => {
error!("Failed to transform mavlink message {message_name} to json: {error:?}");
continue;
}
};
let topic_name = "mavlink/out";
if let Err(error) = session.put(topic_name, json_string).await {
error!("Failed to send message to {topic_name}: {error:?}");
} else {
trace!("Message sent to {topic_name}: {json_string:?}");
}
let topic_name = &format!(
"mavlink/{}/{}/{}",
header.system_id, header.component_id, message_name
);
if let Err(error) = session.put(topic_name, json_string).await {
error!("Failed to send message to {topic_name}: {error:?}");
} else {
trace!("Message sent to {topic_name}: {json_string:?}");
}
}
debug!("Driver sender task stopped!");
Ok(())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants