Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s2n-quic-dc): update MTU on dc path when MTU is updated #2327

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
30 changes: 21 additions & 9 deletions dc/s2n-quic-dc/src/path/secret/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl Map {
state.mark_live(self.state.cleaner.epoch());

let (sealer, credentials) = state.uni_sealer();
Some((sealer, credentials, state.parameters))
Some((sealer, credentials, state.parameters.clone()))
}

pub fn open_once(
Expand All @@ -362,7 +362,7 @@ impl Map {

let keys = state.bidi_local(features);

Some((keys, state.parameters))
Some((keys, state.parameters.clone()))
}

pub fn pair_for_credentials(
Expand All @@ -373,7 +373,7 @@ impl Map {
) -> Option<(Bidirectional, ApplicationParams)> {
let state = self.pre_authentication(credentials, control_out)?;

let params = state.parameters;
let params = state.parameters.clone();
let keys = state.bidi_remote(self.clone(), credentials, features);

Some((keys, params))
Expand Down Expand Up @@ -696,13 +696,15 @@ impl Entry {
secret: schedule::Secret,
sender: sender::State,
receiver: receiver::State,
mut parameters: ApplicationParams,
parameters: ApplicationParams,
rehandshake_time: Duration,
) -> Self {
// clamp max datagram size to a well-known value
parameters.max_datagram_size = parameters
.max_datagram_size
.min(crate::stream::MAX_DATAGRAM_SIZE as _);
let max_datagram_size = parameters.max_datagram_size.load(Ordering::Relaxed);
parameters.max_datagram_size.store(
max_datagram_size.min(crate::stream::MAX_DATAGRAM_SIZE as _),
Ordering::Relaxed,
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call, I could see Clippy catching this someday


assert!(rehandshake_time.as_secs() <= u32::MAX as u64);
Self {
Expand Down Expand Up @@ -941,7 +943,7 @@ impl HandshakingPath {
Self {
peer: connection_info.remote_address.clone().into(),
dc_version: connection_info.dc_version,
parameters: connection_info.application_params,
parameters: connection_info.application_params.clone(),
endpoint_type,
secret: None,
map,
Expand Down Expand Up @@ -1035,12 +1037,22 @@ impl dc::Path for HandshakingPath {
.expect("peer tokens are only received after secrets are ready"),
sender,
receiver,
self.parameters,
self.parameters.clone(),
self.map.state.rehandshake_period,
);
let entry = Arc::new(entry);
self.map.insert(entry);
}

fn on_mtu_updated(&mut self, mtu: u16) {
let peers_guard = self.map.state.peers.guard();
if let Some(entry) = self.map.state.peers.get(&self.peer, &peers_guard) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced this is right. If we detect a new MTU with a new handshake, this is updating a ~random path secret's MTU -- not the one associated with that handshake. IMO we should treat the ApplicationParams as immutable after insertion into the map as much as possible.

(In part, that's because it seems likely that we'll want to have a separate storage area for them and de-duplicate, since they're rather large, and this sort of thing will make that harder since the indirection/pointer then becomes mutable).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is updating a ~random path secret's MTU

it would be a random path secret, but its not a random path. The intention of this is to allow for higher MTUs even when there was some trouble confirming an MTU during the handshake. The MTU probing mechanism we have for the during the handshake is very conservative (to ensure the handshake can complete), if there is a single lost packet before the configured MTU can be confirmed, we drop down to the base MTU. This mechanism allows for us to continue with the more traditional MTU probing after we've already allowed the handshake to complete.

IMO we should treat the ApplicationParams as immutable after insertion into the map

would it be feasible/better to re-insert into the map then?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-inserting is going to be problematic too -- at least currently that would panic! since you're inserting a duplicate path secret ID.

I think we shoudl stick with this PR as-is, probably, but long-term we might want to have a separate(?) map for MTUs -- in particular I think we'd end up clamping to ~1200 byte MTUs or w/e for any in-progress handshakes, which feels a bit unfortunate. It feels true that we probably don't have any good testing in place for dynamically changing MTUs. OTOH, right now MTU is not yet used for anything I think, since it is only needed for dcQUIC streams over UDP, which aren't yet supported.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the MTU value to control TCP packet sizes, so there's a little less efficiency there. But we don't update MTUs mid-stream so only after the MTU probing is complete the flows would inherit the update.

entry
.parameters
.max_datagram_size
.store(mtu, Ordering::Relaxed);
}
}
}

#[cfg(test)]
Expand Down
7 changes: 5 additions & 2 deletions dc/s2n-quic-dc/src/stream/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use s2n_quic_core::{
inet::{ExplicitCongestionNotification, SocketAddress},
varint::VarInt,
};
use std::{io, sync::Arc};
use std::{
io,
sync::{atomic::Ordering, Arc},
};
use tracing::{debug_span, Instrument as _};

type Result<T = (), E = io::Error> = core::result::Result<T, E>;
Expand Down Expand Up @@ -193,7 +196,7 @@ where
let flow = flow::non_blocking::State::new(flow_offset);

let path = send::path::Info {
max_datagram_size: parameters.max_datagram_size,
max_datagram_size: parameters.max_datagram_size.load(Ordering::Relaxed),
send_quantum,
ecn: ExplicitCongestionNotification::Ect0,
next_expected_control_packet: VarInt::ZERO,
Expand Down
7 changes: 5 additions & 2 deletions dc/s2n-quic-dc/src/stream/send/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ use s2n_quic_core::{
varint::VarInt,
};
use slotmap::SlotMap;
use std::collections::{BinaryHeap, VecDeque};
use std::{
collections::{BinaryHeap, VecDeque},
sync::atomic::Ordering,
};
use tracing::{debug, trace};

pub mod probe;
Expand Down Expand Up @@ -118,7 +121,7 @@ pub struct PeerActivity {
impl State {
#[inline]
pub fn new(stream_id: stream::Id, params: &ApplicationParams) -> Self {
let max_datagram_size = params.max_datagram_size;
let max_datagram_size = params.max_datagram_size.load(Ordering::Relaxed);
let initial_max_data = params.remote_max_data;
let local_max_data = params.local_send_max_data;

Expand Down
24 changes: 20 additions & 4 deletions quic/s2n-quic-core/src/dc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use crate::{
transport::parameters::{DcSupportedVersions, InitialFlowControlLimits},
varint::VarInt,
};
use core::time::Duration;
use core::{
sync::atomic::{AtomicU16, Ordering},
time::Duration,
};

mod disabled;
mod traits;
Expand Down Expand Up @@ -91,25 +94,38 @@ impl<'a> DatagramInfo<'a> {
}

/// Various settings relevant to the dc path
#[derive(Clone, Copy, Debug)]
#[derive(Debug)]
#[non_exhaustive]
pub struct ApplicationParams {
pub max_datagram_size: u16,
pub max_datagram_size: AtomicU16,
pub remote_max_data: VarInt,
pub local_send_max_data: VarInt,
pub local_recv_max_data: VarInt,
pub max_idle_timeout: Option<Duration>,
pub max_ack_delay: Duration,
}

impl Clone for ApplicationParams {
fn clone(&self) -> Self {
Self {
max_datagram_size: AtomicU16::new(self.max_datagram_size.load(Ordering::Relaxed)),
remote_max_data: Default::default(),
local_send_max_data: Default::default(),
local_recv_max_data: Default::default(),
max_idle_timeout: None,
max_ack_delay: Default::default(),
}
}
}

impl ApplicationParams {
pub fn new(
max_datagram_size: u16,
peer_flow_control_limits: &InitialFlowControlLimits,
limits: &Limits,
) -> Self {
Self {
max_datagram_size,
max_datagram_size: AtomicU16::new(max_datagram_size),
remote_max_data: peer_flow_control_limits.max_data,
local_send_max_data: limits.initial_stream_limits().max_data_bidi_local,
local_recv_max_data: limits.initial_stream_limits().max_data_bidi_remote,
Expand Down
4 changes: 4 additions & 0 deletions quic/s2n-quic-core/src/dc/disabled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ impl Path for () {
) {
unimplemented!()
}

fn on_mtu_updated(&mut self, _mtu: u16) {
unimplemented!()
}
}
16 changes: 13 additions & 3 deletions quic/s2n-quic-core/src/dc/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
};
use core::time::Duration;
use std::sync::{
atomic::{AtomicU8, Ordering},
atomic::{AtomicU16, AtomicU8, Ordering},
Arc,
};

Expand All @@ -36,14 +36,19 @@ pub struct MockDcPath {
pub on_peer_stateless_reset_tokens_count: u8,
pub stateless_reset_tokens: Vec<stateless_reset::Token>,
pub peer_stateless_reset_tokens: Vec<stateless_reset::Token>,
pub mtu: u16,
}

impl dc::Endpoint for MockDcEndpoint {
type Path = MockDcPath;

fn new_path(&mut self, _connection_info: &ConnectionInfo) -> Option<Self::Path> {
fn new_path(&mut self, connection_info: &ConnectionInfo) -> Option<Self::Path> {
Some(MockDcPath {
stateless_reset_tokens: self.stateless_reset_tokens.clone(),
mtu: connection_info
.application_params
.max_datagram_size
.load(Ordering::Relaxed),
..Default::default()
})
}
Expand Down Expand Up @@ -76,10 +81,15 @@ impl dc::Path for MockDcPath {
self.peer_stateless_reset_tokens
.extend(stateless_reset_tokens);
}

fn on_mtu_updated(&mut self, mtu: u16) {
self.mtu = mtu
}
}

#[allow(clippy::declare_interior_mutable_const)]
pub const TEST_APPLICATION_PARAMS: ApplicationParams = ApplicationParams {
max_datagram_size: 1472,
max_datagram_size: AtomicU16::new(1472),
remote_max_data: VarInt::from_u32(1u32 << 25),
local_send_max_data: VarInt::from_u32(1u32 << 25),
local_recv_max_data: VarInt::from_u32(1u32 << 25),
Expand Down
10 changes: 10 additions & 0 deletions quic/s2n-quic-core/src/dc/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ pub trait Path: 'static + Send {
&mut self,
stateless_reset_tokens: impl Iterator<Item = &'a stateless_reset::Token>,
);

/// Called when the MTU has been updated for the path
fn on_mtu_updated(&mut self, mtu: u16);
}

impl<P: Path> Path for Option<P> {
Expand All @@ -69,4 +72,11 @@ impl<P: Path> Path for Option<P> {
path.on_peer_stateless_reset_tokens(stateless_reset_tokens)
}
}

#[inline]
fn on_mtu_updated(&mut self, max_datagram_size: u16) {
if let Some(path) = self {
path.on_mtu_updated(max_datagram_size)
}
}
}
43 changes: 34 additions & 9 deletions quic/s2n-quic-core/src/path/mtu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,12 @@ impl Builder {
}
}

#[derive(Eq, PartialEq, Debug)]
pub enum MtuResult {
NoChange,
MtuUpdated(u16),
}

#[derive(Clone, Debug)]
pub struct Controller {
state: State,
Expand Down Expand Up @@ -608,7 +614,7 @@ impl Controller {
congestion_controller: &mut CC,
path_id: path::Id,
publisher: &mut Pub,
) {
) -> MtuResult {
if self.state.is_early_search_requested() && sent_bytes > self.base_plpmtu {
if self.is_next_probe_size_above_threshold() {
// Early probing has succeeded, but the max MTU is higher still so
Expand All @@ -629,10 +635,13 @@ impl Controller {
}

// no need to process anything in the disabled state
ensure!(self.state != State::Disabled);
ensure!(self.state != State::Disabled, MtuResult::NoChange);

// MTU probes are only sent in application data space
ensure!(packet_number.space().is_application_data());
ensure!(
packet_number.space().is_application_data(),
MtuResult::NoChange
);

if sent_bytes >= self.plpmtu
&& self
Expand Down Expand Up @@ -671,8 +680,12 @@ impl Controller {
cause: MtuUpdatedCause::ProbeAcknowledged,
search_complete: self.state.is_search_complete(),
});

return MtuResult::MtuUpdated(self.plpmtu);
}
}

MtuResult::NoChange
}

//= https://www.rfc-editor.org/rfc/rfc8899#section-3
Expand All @@ -690,12 +703,13 @@ impl Controller {
congestion_controller: &mut CC,
path_id: path::Id,
publisher: &mut Pub,
) {
) -> MtuResult {
// MTU probes are only sent in the application data space, but since early packet
// spaces will use the `InitialMtu` prior to MTU probing being enabled, we need
// to check for potentially MTU-related packet loss if an early search has been requested
ensure!(
self.state.is_early_search_requested() || packet_number.space().is_application_data()
self.state.is_early_search_requested() || packet_number.space().is_application_data(),
MtuResult::NoChange
);

match &self.state {
Expand Down Expand Up @@ -725,7 +739,9 @@ impl Controller {
mtu: self.plpmtu,
cause: MtuUpdatedCause::InitialMtuPacketLost,
search_complete: self.state.is_search_complete(),
})
});

return MtuResult::MtuUpdated(self.plpmtu);
}
State::Searching(probe_pn, _) if *probe_pn == packet_number => {
// The MTU probe was lost
Expand Down Expand Up @@ -763,10 +779,17 @@ impl Controller {
}

if self.black_hole_counter > BLACK_HOLE_THRESHOLD {
self.on_black_hole_detected(now, congestion_controller, path_id, publisher);
return self.on_black_hole_detected(
now,
congestion_controller,
path_id,
publisher,
);
}
}
}

MtuResult::NoChange
}

/// Gets the currently validated maximum QUIC datagram size
Expand Down Expand Up @@ -837,7 +860,7 @@ impl Controller {
congestion_controller: &mut CC,
path_id: path::Id,
publisher: &mut Pub,
) {
) -> MtuResult {
self.black_hole_counter = Default::default();
self.largest_acked_mtu_sized_packet = None;
// Reset the plpmtu back to the base_plpmtu and notify the congestion controller
Expand All @@ -856,7 +879,9 @@ impl Controller {
mtu: self.plpmtu,
cause: MtuUpdatedCause::Blackhole,
search_complete: self.state.is_search_complete(),
})
});

MtuResult::MtuUpdated(self.plpmtu)
}

/// Arm the PMTU Raise Timer if there is still room to increase the
Expand Down
Loading
Loading