Skip to content

Commit

Permalink
fix(s2n-quic-transport): fix client connection migration when local a…
Browse files Browse the repository at this point in the history
…ddress handle changes
  • Loading branch information
camshaft committed Aug 10, 2023
1 parent 9110f50 commit 8dc45ef
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 2 deletions.
13 changes: 13 additions & 0 deletions quic/s2n-quic-core/src/path/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ pub trait Handle: 'static + Copy + Send + fmt::Debug {
/// Implementations should try to limit the cost of updating by checking the current value to
/// see if it needs updating.
fn maybe_update(&mut self, other: &Self);

/// Updates the local address of the handle in the case where the client experienced a rebind
fn update_local_address(&mut self, other: &Self);
}

macro_rules! impl_addr {
Expand Down Expand Up @@ -195,6 +198,11 @@ impl Handle for RemoteAddress {
fn maybe_update(&mut self, _other: &Self) {
// nothing to update
}

#[inline]
fn update_local_address(&mut self, _other: &Self) {
// we don't keep track of the local address so nothing to do
}
}

#[derive(Clone, Copy, Debug, Eq)]
Expand Down Expand Up @@ -250,6 +258,11 @@ impl Handle for Tuple {
self.local_address = other.local_address;
}
}

#[inline]
fn update_local_address(&mut self, other: &Self) {
self.local_address = other.local_address;
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
Expand Down
5 changes: 5 additions & 0 deletions quic/s2n-quic-core/src/xdp/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,9 @@ impl Handle for Tuple {
*self = *other;
}
}

#[inline]
fn update_local_address(&mut self, other: &Self) {
self.local_address = other.local_address;
}
}
5 changes: 5 additions & 0 deletions quic/s2n-quic-platform/src/message/msg/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,9 @@ impl path::Handle for Handle {
self.local_address = other.local_address;
}
}

#[inline]
fn update_local_address(&mut self, other: &Self) {
self.local_address = other.local_address;
}
}
22 changes: 20 additions & 2 deletions quic/s2n-quic-transport/src/path/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,20 @@ impl<Config: endpoint::Config> Manager<Config> {
) -> Result<(Id, bool), DatagramDropReason> {
let valid_initial_received = self.valid_initial_received();

if let Some((id, path)) = self.path_mut(path_handle) {
for (idx, path) in self.paths.iter_mut().enumerate() {
let is_match = if Config::ENDPOINT_TYPE.is_client() {
// only compare remote addresses in the case of a local rebind
path.handle.remote_address() == path_handle.remote_address()
} else {
path.handle.eq(path_handle)
};

if !is_match {
continue;
}

let id = path_id(idx as _);

let source_cid_changed = datagram.source_connection_id.map_or(false, |scid| {
scid != path.peer_connection_id && valid_initial_received
});
Expand All @@ -261,6 +274,11 @@ impl<Config: endpoint::Config> Manager<Config> {
// update the address if it was resolved
path.handle.maybe_update(path_handle);

// make sure the client is always using the latest local address
if Config::ENDPOINT_TYPE.is_client() {
path.handle.update_local_address(path_handle);
}

let unblocked = path.on_bytes_received(datagram.payload_len);
return Ok((id, unblocked));
}
Expand All @@ -270,7 +288,7 @@ impl<Config: endpoint::Config> Manager<Config> {
//# the client MUST discard these packets.
if Config::ENDPOINT_TYPE.is_client() {
return Err(DatagramDropReason::UnknownServerAddress);
};
}

//= https://www.rfc-editor.org/rfc/rfc9000#section-9
//# The design of QUIC relies on endpoints retaining a stable address
Expand Down
1 change: 1 addition & 0 deletions quic/s2n-quic/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod setup;
use setup::*;

mod blackhole;
mod connection_migration;
mod interceptor;
mod mtu;
mod no_tls;
Expand Down
131 changes: 131 additions & 0 deletions quic/s2n-quic/src/tests/connection_migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::*;

fn run_test<F>(mut on_rebind: F)
where
F: FnMut(SocketAddr) -> SocketAddr + Send + 'static,
{
let model = Model::default();
let rtt = Duration::from_millis(10);
let rebind_rate = rtt * 2;
// we currently only support 4 migrations
let rebind_count = 4;

model.set_delay(rtt / 2);

let expected_paths = Arc::new(Mutex::new(vec![]));
let expected_paths_pub = expected_paths.clone();

let on_socket = move |socket: io::Socket| {
spawn(async move {
let mut local_addr = socket.local_addr().unwrap();
for _ in 0..rebind_count {
local_addr = on_rebind(local_addr);
delay(rebind_rate).await;
if let Ok(mut paths) = expected_paths_pub.lock() {
paths.push(local_addr);
}
socket.rebind(local_addr);
}
});
};

let active_paths = recorder::ActivePathUpdated::new();
let active_path_sub = active_paths.clone();

test(model, move |handle| {
let server = Server::builder()
.with_io(handle.builder().build()?)?
.with_tls(SERVER_CERTS)?
.with_event((events(), active_path_sub))?
.start()?;

let client_io = handle.builder().on_socket(on_socket).build()?;

let client = Client::builder()
.with_io(client_io)?
.with_tls(certificates::CERT_PEM)?
.with_event(events())?
.start()?;

let addr = start_server(server)?;
primary::spawn(async move {
let connect = Connect::new(addr).with_server_name("localhost");
let mut conn = client.connect(connect).await.unwrap();
let mut stream = conn.open_bidirectional_stream().await.unwrap();

stream.send(Bytes::from_static(b"A")).await.unwrap();

delay(rebind_rate / 2).await;

for _ in 0..rebind_count {
stream.send(Bytes::from_static(b"B")).await.unwrap();
delay(rebind_rate).await;
}

stream.finish().unwrap();

let chunk = stream
.receive()
.await
.unwrap()
.expect("a chunk should be available");
assert_eq!(&chunk[..], &b"ABBBB"[..]);

assert!(
stream.receive().await.unwrap().is_none(),
"stream should be finished"
);
});

Ok(addr)
})
.unwrap();

assert_eq!(
&*active_paths.events().lock().unwrap(),
&*expected_paths.lock().unwrap()
);
}

/// Rebinds the IP of an address
fn rebind_ip(mut addr: SocketAddr) -> SocketAddr {
let ip = match addr.ip() {
std::net::IpAddr::V4(ip) => {
let mut v = u32::from_be_bytes(ip.octets());
v += 1;
std::net::Ipv4Addr::from(v).into()
}
std::net::IpAddr::V6(ip) => {
let mut v = u128::from_be_bytes(ip.octets());
v += 1;
std::net::Ipv6Addr::from(v).into()
}
};
addr.set_ip(ip);
addr
}

/// Rebinds the port of an address
fn rebind_port(mut addr: SocketAddr) -> SocketAddr {
let port = addr.port() + 1;
addr.set_port(port);
addr
}

#[test]
fn ip_rebind_test() {
run_test(rebind_ip);
}

#[test]
fn port_rebind_test() {
run_test(rebind_port);
}

#[test]
fn ip_and_port_rebind_test() {
run_test(|addr| rebind_ip(rebind_port(addr)));
}
10 changes: 10 additions & 0 deletions quic/s2n-quic/src/tests/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,13 @@ event_recorder!(
HandshakeStatusUpdated,
on_handshake_status_updated
);
event_recorder!(
ActivePathUpdated,
ActivePathUpdated,
on_active_path_updated,
SocketAddr,
|event: &events::ActivePathUpdated, storage: &mut Vec<SocketAddr>| {
let addr = (&event.active.remote_addr).into();
storage.push(addr);
}
);

0 comments on commit 8dc45ef

Please sign in to comment.