Skip to content

Commit

Permalink
feat(s2n-quic-dc): mtu probing event subscriber (#2374)
Browse files Browse the repository at this point in the history
  • Loading branch information
WesleyRosenblum authored Nov 9, 2024
1 parent 56ce95a commit cb41b35
Show file tree
Hide file tree
Showing 7 changed files with 754 additions and 90 deletions.
3 changes: 3 additions & 0 deletions quic/s2n-quic/src/provider/dc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
//! Provides dc support

mod confirm;
mod mtu_confirm;

use s2n_quic_core::dc::Disabled;

// these imports are only accessible if the unstable feature is enabled
#[allow(unused_imports)]
pub use confirm::ConfirmComplete;
#[allow(unused_imports)]
pub use mtu_confirm::MtuConfirmComplete;
#[allow(unused_imports)]
pub use s2n_quic_core::dc::{ApplicationParams, ConnectionInfo, Endpoint, Path};

pub trait Provider {
Expand Down
117 changes: 117 additions & 0 deletions quic/s2n-quic/src/provider/dc/mtu_confirm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use crate::Connection;
use s2n_quic_core::{
ensure,
event::{
api as events,
api::{ConnectionInfo, ConnectionMeta, MtuUpdated, Subscriber},
},
};
use std::io;
use tokio::sync::watch;

/// Event subscriber that lets an s2n-quic client or server negotiating dc wait for
/// post-handshake MTU probing to finish.
///
/// The subscriber keeps a per-connection `MtuConfirmContext`; applications call
/// `MtuConfirmComplete::wait_ready` on a connection to wait on that context.
pub struct MtuConfirmComplete;

impl MtuConfirmComplete {
    /// Suspends the calling task until the given connection has either finished MTU probing
    /// or been closed.
    ///
    /// # Errors
    ///
    /// Returns an `io::ErrorKind::Other` error when:
    /// * the `MtuConfirmContext` cannot be queried from `conn` (presumably because this
    ///   subscriber was not registered on the endpoint; confirm against
    ///   `query_event_context_mut`), or
    /// * the state channel reports an error before `State::Ready` was observed.
    pub async fn wait_ready(conn: &mut Connection) -> io::Result<()> {
        // obtain a fresh receiver for the per-connection state channel
        let subscribe = |context: &mut MtuConfirmContext| context.sender.subscribe();
        let mut updates = conn
            .query_event_context_mut(subscribe)
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;

        loop {
            // Evaluate the current state in its own statement so the borrow of the channel
            // value is released before the task awaits the next change.
            let is_ready = matches!(*updates.borrow_and_update(), State::Ready);
            if is_ready {
                // let the application know it can proceed
                return Ok(());
            }

            // still waiting: sleep until the state changes, failing if it never can
            updates.changed().await.map_err(|_| {
                io::Error::new(io::ErrorKind::Other, "never reached terminal state")
            })?;
        }
    }
}

/// Per-connection context created by the `MtuConfirmComplete` subscriber.
///
/// It owns the sending half of a `watch` channel carrying the MTU probing `State`;
/// `MtuConfirmComplete::wait_ready` subscribes to this sender to observe state changes.
pub struct MtuConfirmContext {
    /// Publishes the current `State` to every subscribed receiver.
    sender: watch::Sender<State>,
}

impl Default for MtuConfirmContext {
    /// Creates a context whose channel starts out holding `State::default()`.
    ///
    /// Only the sending half of the channel is kept; receivers are created on demand via
    /// `sender.subscribe()`.
    fn default() -> Self {
        Self {
            sender: watch::channel(State::default()).0,
        }
    }
}

impl MtuConfirmContext {
    /// Publishes `state` as the new value of the channel, replacing whatever was stored.
    fn update(&mut self, state: State) {
        // the previously stored state is of no interest to the caller
        let _previous = self.sender.send_replace(state);
    }
}

impl Drop for MtuConfirmContext {
    /// Moves a still-`Waiting` state to `Ready` while the context is being dropped, so the
    /// application is notified that the connection is closing.
    fn drop(&mut self) {
        self.sender.send_modify(|current| match *current {
            State::Waiting => *current = State::Ready,
            // already in the terminal state; leave the value untouched
            State::Ready => {}
        });
    }
}

/// Progress of MTU probing as published on a connection's `MtuConfirmContext` channel.
#[derive(Default)]
enum State {
    /// Initial state: neither a completed MTU search nor a connection close has been seen.
    #[default]
    Waiting,
    /// Terminal state: the MTU search completed, or the connection closed or was dropped.
    Ready,
}

impl Subscriber for MtuConfirmComplete {
    type ConnectionContext = MtuConfirmContext;

    /// Builds a fresh context, starting in `State::Waiting`, for each new connection.
    #[inline]
    fn create_connection_context(
        &mut self,
        _: &ConnectionMeta,
        _info: &ConnectionInfo,
    ) -> Self::ConnectionContext {
        Default::default()
    }

    /// Marks the context `Ready` when the connection closes while still `Waiting`.
    #[inline]
    fn on_connection_closed(
        &mut self,
        context: &mut Self::ConnectionContext,
        _meta: &ConnectionMeta,
        _event: &events::ConnectionClosed,
    ) {
        // nothing to do once the terminal state has been published
        ensure!(matches!(*context.sender.borrow(), State::Waiting));

        // The connection closed before MTU probing completed
        context.update(State::Ready);
    }

    /// Marks the context `Ready` on the first MTU update that reports a completed search.
    #[inline]
    fn on_mtu_updated(
        &mut self,
        context: &mut Self::ConnectionContext,
        _meta: &ConnectionMeta,
        event: &MtuUpdated,
    ) {
        // nothing to do once the terminal state has been published
        ensure!(matches!(*context.sender.borrow(), State::Waiting));

        // intermediate updates leave the state untouched
        ensure!(event.search_complete);

        context.update(State::Ready)
    }
}
60 changes: 54 additions & 6 deletions quic/s2n-quic/src/tests/dc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ use s2n_quic_core::{
crypto::tls,
dc::testing::MockDcEndpoint,
event::{
api::{DatagramDropReason, DcState, EndpointDatagramDropped, EndpointMeta, Subject},
api::{
ConnectionMeta, DatagramDropReason, DcState, EndpointDatagramDropped, EndpointMeta,
MtuUpdated, Subject,
},
metrics::aggregate,
Timestamp,
},
Expand Down Expand Up @@ -318,7 +321,10 @@ fn self_test<S: ServerProviders, C: ClientProviders>(
let metrics = aggregate::testing::Registry::snapshot();

let server_event = (
(dc::ConfirmComplete, metrics.subscriber("server")),
(
(dc::ConfirmComplete, dc::MtuConfirmComplete),
metrics.subscriber("server"),
),
(tracing_events(), server_subscriber),
);

Expand All @@ -344,13 +350,17 @@ fn self_test<S: ServerProviders, C: ClientProviders>(
}
} else {
assert!(result.is_ok());
assert!(dc::MtuConfirmComplete::wait_ready(&mut conn).await.is_ok());
}
}
}
});

let client_event = (
(dc::ConfirmComplete, metrics.subscriber("client")),
(
(dc::ConfirmComplete, dc::MtuConfirmComplete),
metrics.subscriber("client"),
),
(tracing_events(), client_subscriber),
);

Expand Down Expand Up @@ -387,9 +397,10 @@ fn self_test<S: ServerProviders, C: ClientProviders>(
.unwrap()
.clone();
assert_dc_complete(&client_events);
// wait briefly so the ack for the `DC_STATELESS_RESET_TOKENS` frame from the server is sent
// before the client closes the connection. This is only necessary to confirm the `dc::State`
// on the server moves to `DcState::Complete`

assert!(dc::MtuConfirmComplete::wait_ready(&mut conn).await.is_ok());

// wait briefly for MTU probing to complete on the server
delay(Duration::from_millis(100)).await;
}
}
Expand Down Expand Up @@ -449,6 +460,12 @@ fn self_test<S: ServerProviders, C: ClientProviders>(
.duration_since_start()
);

let client_mtu_events = client_events.mtu_updated_events.lock().unwrap().clone();
let server_mtu_events = server_events.mtu_updated_events.lock().unwrap().clone();

assert_mtu_probing_completed(&client_mtu_events);
assert_mtu_probing_completed(&server_mtu_events);

Ok((client_events, server_events))
}

Expand All @@ -466,6 +483,14 @@ fn assert_dc_complete(events: &[DcStateChangedEvent]) {
assert!(matches!(events[2].state, DcState::Complete { .. }));
}

/// Asserts that the recorded MTU events end with a completed search at the expected MTU.
///
/// Panics if `events` is empty, if the final event does not have `search_complete` set, or
/// if its `mtu` differs from 1472.
fn assert_mtu_probing_completed(events: &[MtuUpdatedEvent]) {
    assert!(!events.is_empty());
    // only the most recent event matters; earlier ones are intermediate probe results
    let (final_update, _earlier) = events.split_last().unwrap();
    assert!(final_update.search_complete);
    // 1472 = default MaxMtu (1500) - headers
    assert_eq!(1472, final_update.mtu);
}

fn convert_io_result(io_result: std::io::Result<()>) -> Option<connection::Error> {
io_result
.err()?
Expand All @@ -482,9 +507,16 @@ struct DcStateChangedEvent {
state: DcState,
}

/// Owned copy of the `MtuUpdated` event fields that the dc tests assert on.
#[derive(Clone)]
struct MtuUpdatedEvent {
    /// MTU value reported by the event.
    mtu: u16,
    /// Whether the event reported the MTU search as complete.
    search_complete: bool,
}

#[derive(Clone, Default)]
struct DcRecorder {
pub dc_state_changed_events: Arc<Mutex<Vec<DcStateChangedEvent>>>,
pub mtu_updated_events: Arc<Mutex<Vec<MtuUpdatedEvent>>>,
pub endpoint_datagram_dropped_events: Arc<Mutex<Vec<EndpointDatagramDropped>>>,
}
impl DcRecorder {
Expand Down Expand Up @@ -524,6 +556,22 @@ impl events::Subscriber for DcRecorder {
store(event, &mut buffer);
}

/// Appends the event's `mtu` and `search_complete` values to the context's
/// `mtu_updated_events` buffer.
fn on_mtu_updated(
    &mut self,
    context: &mut Self::ConnectionContext,
    _meta: &ConnectionMeta,
    event: &MtuUpdated,
) {
    // copy out just the fields the tests inspect
    let recorded = MtuUpdatedEvent {
        mtu: event.mtu,
        search_complete: event.search_complete,
    };
    context.mtu_updated_events.lock().unwrap().push(recorded);
}

fn on_endpoint_datagram_dropped(
&mut self,
_meta: &EndpointMeta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,19 +338,6 @@ measure#recovery_metrics.bytes_in_flight=[REDACTED]
count#recovery_metrics.congestion_limited=false
count#platform_event_loop_sleep=1
timer#platform_event_loop_sleep.processing_duration=1µs
count#connection_closed=1
timer#connection_closed.latency=299.999ms
count#connection_closed.error|CLOSED=1
count#frame_sent=1
count#frame_sent.packet|ONE_RTT=1
count#frame_sent.frame|CONNECTION_CLOSE=1
count#frame_sent=1
count#frame_sent.packet|ONE_RTT=1
count#frame_sent.frame|PADDING=1
count#packet_sent=1
count#packet_sent.kind|ONE_RTT=1
count#packet_sent.bytes.total=[REDACTED]b
measure#packet_sent.bytes=[REDACTED]b
count#platform_event_loop_wakeup=1
count#platform_tx=1
count#platform_tx.packets.total=2
Expand All @@ -364,16 +351,63 @@ measure#platform_tx.errors=0
count#platform_tx.errors.dropped.total=0
measure#platform_tx.errors.dropped=0
count#platform_rx=1
count#platform_rx.packets.total=0
measure#platform_rx.packets=0
count#platform_rx.syscalls.total=1
measure#platform_rx.syscalls=1
count#platform_rx.packets.total=2
measure#platform_rx.packets=2
count#platform_rx.syscalls.total=2
measure#platform_rx.syscalls=2
count#platform_rx.syscalls.blocked.total=1
measure#platform_rx.syscalls.blocked=1
count#platform_rx.errors.total=0
measure#platform_rx.errors=0
count#platform_rx.errors.dropped.total=0
measure#platform_rx.errors.dropped=0
count#datagram_received=1
count#datagram_received.bytes.total=[REDACTED]b
measure#datagram_received.bytes=[REDACTED]b
count#packet_received=1
count#packet_received.kind|ONE_RTT=1
count#connection_id_updated=1
count#frame_received=1
count#frame_received.packet|ONE_RTT=1
count#frame_received.frame|ACK=1
count#ack_range_received=1
count#ack_range_received.packet|ONE_RTT=1
count#mtu_updated=1
measure#mtu_updated.mtu=1472b
count#mtu_updated.cause|PROBE_ACKNOWLEDGED=1
count#mtu_updated.search_complete=true
count#recovery_metrics=1
measure#recovery_metrics.min_rtt=99.999ms
measure#recovery_metrics.smoothed_rtt=99.99923ms
measure#recovery_metrics.latest_rtt=100ms
measure#recovery_metrics.rtt_variance=28.125122ms
measure#recovery_metrics.max_ack_delay=25ms
measure#recovery_metrics.pto_count=0
measure#recovery_metrics.congestion_window=14720
measure#recovery_metrics.bytes_in_flight=[REDACTED]
count#recovery_metrics.congestion_limited=false
count#frame_received=1
count#frame_received.packet|ONE_RTT=1
count#frame_received.frame|RETIRE_CONNECTION_ID=1
count#frame_received=1
count#frame_received.packet|ONE_RTT=1
count#frame_received.frame|PADDING=1
count#datagram_received=1
count#datagram_received.bytes.total=[REDACTED]b
measure#datagram_received.bytes=[REDACTED]b
count#packet_received=1
count#packet_received.kind|ONE_RTT=1
count#frame_received=1
count#frame_received.packet|ONE_RTT=1
count#frame_received.frame|CONNECTION_CLOSE=1
count#connection_closed=1
timer#connection_closed.latency=299.999ms
count#connection_closed.error|CLOSED=1
count#platform_event_loop_sleep=1
timer#platform_event_loop_sleep.processing_duration=1µs
count#platform_event_loop_wakeup=1
count#platform_event_loop_sleep=1
timer#platform_event_loop_sleep.processing_duration=1µs
=== server ===
count#platform_event_loop_started=1
count#platform_event_loop_wakeup=1
Expand Down
Loading

0 comments on commit cb41b35

Please sign in to comment.