Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-dc): mtu probing event subscriber #2374

Merged
merged 5 commits into from
Nov 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions quic/s2n-quic/src/provider/dc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
//! Provides dc support

mod confirm;
mod mtu_confirm;

use s2n_quic_core::dc::Disabled;

// these imports are only accessible if the unstable feature is enabled
#[allow(unused_imports)]
pub use confirm::ConfirmComplete;
#[allow(unused_imports)]
pub use mtu_confirm::MtuConfirmComplete;
#[allow(unused_imports)]
pub use s2n_quic_core::dc::{ApplicationParams, ConnectionInfo, Endpoint, Path};

pub trait Provider {
Expand Down
117 changes: 117 additions & 0 deletions quic/s2n-quic/src/provider/dc/mtu_confirm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::Connection;
use s2n_quic_core::{
ensure,
event::{
api as events,
api::{ConnectionInfo, ConnectionMeta, MtuUpdated, Subscriber},
},
};
use std::io;
use tokio::sync::watch;

/// `event::Subscriber` used for ensuring an s2n-quic client or server negotiating dc
/// waits for post-handshake MTU probing to complete
pub struct MtuConfirmComplete;

impl MtuConfirmComplete {
/// Blocks the task until the provided connection has either completed MTU probing or closed
pub async fn wait_ready(conn: &mut Connection) -> io::Result<()> {
let mut receiver = conn
.query_event_context_mut(|context: &mut MtuConfirmContext| context.sender.subscribe())
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;

loop {
match &*receiver.borrow_and_update() {
// if we're ready then let the application know
State::Ready => return Ok(()),
State::Waiting => {}
}

if receiver.changed().await.is_err() {
return Err(io::Error::new(
io::ErrorKind::Other,
"never reached terminal state",
));
}
}
}
}

pub struct MtuConfirmContext {
sender: watch::Sender<State>,
}

impl Default for MtuConfirmContext {
fn default() -> Self {
let (sender, _receiver) = watch::channel(State::default());
Self { sender }
}
}

impl MtuConfirmContext {
/// Updates the state on the context
fn update(&mut self, state: State) {
self.sender.send_replace(state);
}
}

impl Drop for MtuConfirmContext {
// make sure the application is notified that we're closing the connection
fn drop(&mut self) {
self.sender.send_modify(|state| {
if matches!(state, State::Waiting) {
*state = State::Ready
}
});
}
}

#[derive(Default)]
enum State {
#[default]
Waiting,
Ready,
}

impl Subscriber for MtuConfirmComplete {
type ConnectionContext = MtuConfirmContext;

#[inline]
fn create_connection_context(
&mut self,
_: &ConnectionMeta,
_info: &ConnectionInfo,
) -> Self::ConnectionContext {
MtuConfirmContext::default()
}

#[inline]
fn on_connection_closed(
&mut self,
context: &mut Self::ConnectionContext,
_meta: &ConnectionMeta,
_event: &events::ConnectionClosed,
) {
ensure!(matches!(*context.sender.borrow(), State::Waiting));

// The connection closed before MTU probing completed
context.update(State::Ready);
}

#[inline]
fn on_mtu_updated(
&mut self,
context: &mut Self::ConnectionContext,
_meta: &ConnectionMeta,
event: &MtuUpdated,
) {
ensure!(matches!(*context.sender.borrow(), State::Waiting));

if event.search_complete {
context.update(State::Ready)
}
}
}
60 changes: 54 additions & 6 deletions quic/s2n-quic/src/tests/dc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use s2n_quic_core::{
crypto::tls,
dc::testing::MockDcEndpoint,
event::{
api::{DatagramDropReason, DcState, EndpointDatagramDropped, EndpointMeta, Subject},
api::{
ConnectionMeta, DatagramDropReason, DcState, EndpointDatagramDropped, EndpointMeta,
MtuUpdated, Subject,
},
metrics::aggregate,
Timestamp,
},
Expand Down Expand Up @@ -318,7 +321,10 @@ fn self_test<S: ServerProviders, C: ClientProviders>(
let metrics = aggregate::testing::Registry::snapshot();

let server_event = (
(dc::ConfirmComplete, metrics.subscriber("server")),
(
(dc::ConfirmComplete, dc::MtuConfirmComplete),
metrics.subscriber("server"),
),
(tracing_events(), server_subscriber),
);

Expand All @@ -344,13 +350,17 @@ fn self_test<S: ServerProviders, C: ClientProviders>(
}
} else {
assert!(result.is_ok());
assert!(dc::MtuConfirmComplete::wait_ready(&mut conn).await.is_ok());
}
}
}
});

let client_event = (
(dc::ConfirmComplete, metrics.subscriber("client")),
(
(dc::ConfirmComplete, dc::MtuConfirmComplete),
metrics.subscriber("client"),
),
(tracing_events(), client_subscriber),
);

Expand Down Expand Up @@ -387,9 +397,10 @@ fn self_test<S: ServerProviders, C: ClientProviders>(
.unwrap()
.clone();
assert_dc_complete(&client_events);
// wait briefly so the ack for the `DC_STATELESS_RESET_TOKENS` frame from the server is sent
// before the client closes the connection. This is only necessary to confirm the `dc::State`
// on the server moves to `DcState::Complete`

assert!(dc::MtuConfirmComplete::wait_ready(&mut conn).await.is_ok());

// wait briefly for MTU probing to complete on the server
delay(Duration::from_millis(100)).await;
}
}
Expand Down Expand Up @@ -449,6 +460,12 @@ fn self_test<S: ServerProviders, C: ClientProviders>(
.duration_since_start()
);

let client_mtu_events = client_events.mtu_updated_events.lock().unwrap().clone();
let server_mtu_events = server_events.mtu_updated_events.lock().unwrap().clone();

assert_mtu_probing_completed(&client_mtu_events);
assert_mtu_probing_completed(&server_mtu_events);

Ok((client_events, server_events))
}

Expand All @@ -466,6 +483,14 @@ fn assert_dc_complete(events: &[DcStateChangedEvent]) {
assert!(matches!(events[2].state, DcState::Complete { .. }));
}

fn assert_mtu_probing_completed(events: &[MtuUpdatedEvent]) {
assert!(!events.is_empty());
let last_event = events.last().unwrap();
assert!(last_event.search_complete);
// 1472 = default MaxMtu (1500) - headers
assert_eq!(1472, last_event.mtu);
}

fn convert_io_result(io_result: std::io::Result<()>) -> Option<connection::Error> {
io_result
.err()?
Expand All @@ -482,9 +507,16 @@ struct DcStateChangedEvent {
state: DcState,
}

#[derive(Clone)]
struct MtuUpdatedEvent {
mtu: u16,
search_complete: bool,
}

#[derive(Clone, Default)]
struct DcRecorder {
pub dc_state_changed_events: Arc<Mutex<Vec<DcStateChangedEvent>>>,
pub mtu_updated_events: Arc<Mutex<Vec<MtuUpdatedEvent>>>,
pub endpoint_datagram_dropped_events: Arc<Mutex<Vec<EndpointDatagramDropped>>>,
}
impl DcRecorder {
Expand Down Expand Up @@ -524,6 +556,22 @@ impl events::Subscriber for DcRecorder {
store(event, &mut buffer);
}

fn on_mtu_updated(
&mut self,
context: &mut Self::ConnectionContext,
_meta: &ConnectionMeta,
event: &MtuUpdated,
) {
let store = |event: &events::MtuUpdated, storage: &mut Vec<MtuUpdatedEvent>| {
storage.push(MtuUpdatedEvent {
mtu: event.mtu,
search_complete: event.search_complete,
});
};
let mut buffer = context.mtu_updated_events.lock().unwrap();
store(event, &mut buffer);
}

fn on_endpoint_datagram_dropped(
&mut self,
_meta: &EndpointMeta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,19 +338,6 @@ measure#recovery_metrics.bytes_in_flight=[REDACTED]
count#recovery_metrics.congestion_limited=false
count#platform_event_loop_sleep=1
timer#platform_event_loop_sleep.processing_duration=1µs
count#connection_closed=1
timer#connection_closed.latency=299.999ms
count#connection_closed.error|CLOSED=1
count#frame_sent=1
count#frame_sent.packet|ONE_RTT=1
count#frame_sent.frame|CONNECTION_CLOSE=1
count#frame_sent=1
count#frame_sent.packet|ONE_RTT=1
count#frame_sent.frame|PADDING=1
count#packet_sent=1
count#packet_sent.kind|ONE_RTT=1
count#packet_sent.bytes.total=[REDACTED]b
measure#packet_sent.bytes=[REDACTED]b
count#platform_event_loop_wakeup=1
count#platform_tx=1
count#platform_tx.packets.total=2
Expand All @@ -364,16 +351,63 @@ measure#platform_tx.errors=0
count#platform_tx.errors.dropped.total=0
measure#platform_tx.errors.dropped=0
count#platform_rx=1
count#platform_rx.packets.total=0
measure#platform_rx.packets=0
count#platform_rx.syscalls.total=1
measure#platform_rx.syscalls=1
count#platform_rx.packets.total=2
measure#platform_rx.packets=2
count#platform_rx.syscalls.total=2
measure#platform_rx.syscalls=2
count#platform_rx.syscalls.blocked.total=1
measure#platform_rx.syscalls.blocked=1
count#platform_rx.errors.total=0
measure#platform_rx.errors=0
count#platform_rx.errors.dropped.total=0
measure#platform_rx.errors.dropped=0
count#datagram_received=1
count#datagram_received.bytes.total=[REDACTED]b
measure#datagram_received.bytes=[REDACTED]b
count#packet_received=1
count#packet_received.kind|ONE_RTT=1
count#connection_id_updated=1
count#frame_received=1
count#frame_received.packet|ONE_RTT=1
count#frame_received.frame|ACK=1
count#ack_range_received=1
count#ack_range_received.packet|ONE_RTT=1
count#mtu_updated=1
measure#mtu_updated.mtu=1472b
count#mtu_updated.cause|PROBE_ACKNOWLEDGED=1
count#mtu_updated.search_complete=true
count#recovery_metrics=1
measure#recovery_metrics.min_rtt=99.999ms
measure#recovery_metrics.smoothed_rtt=99.99923ms
measure#recovery_metrics.latest_rtt=100ms
measure#recovery_metrics.rtt_variance=28.125122ms
measure#recovery_metrics.max_ack_delay=25ms
measure#recovery_metrics.pto_count=0
measure#recovery_metrics.congestion_window=14720
measure#recovery_metrics.bytes_in_flight=[REDACTED]
count#recovery_metrics.congestion_limited=false
count#frame_received=1
count#frame_received.packet|ONE_RTT=1
count#frame_received.frame|RETIRE_CONNECTION_ID=1
count#frame_received=1
count#frame_received.packet|ONE_RTT=1
count#frame_received.frame|PADDING=1
count#datagram_received=1
count#datagram_received.bytes.total=[REDACTED]b
measure#datagram_received.bytes=[REDACTED]b
count#packet_received=1
count#packet_received.kind|ONE_RTT=1
count#frame_received=1
count#frame_received.packet|ONE_RTT=1
count#frame_received.frame|CONNECTION_CLOSE=1
count#connection_closed=1
timer#connection_closed.latency=299.999ms
count#connection_closed.error|CLOSED=1
count#platform_event_loop_sleep=1
timer#platform_event_loop_sleep.processing_duration=1µs
count#platform_event_loop_wakeup=1
count#platform_event_loop_sleep=1
timer#platform_event_loop_sleep.processing_duration=1µs
=== server ===
count#platform_event_loop_started=1
count#platform_event_loop_wakeup=1
Expand Down
Loading
Loading