From de5c33e800ffff14c235ed0ae7d695222f84dcca Mon Sep 17 00:00:00 2001 From: Cameron Bytheway Date: Fri, 3 May 2024 19:27:17 -0600 Subject: [PATCH 1/2] refactor(s2n-quic-core): improve reassembler error handling (#2197) --- quic/s2n-quic-core/src/buffer/reader.rs | 3 + .../src/buffer/reader/storage/bytes.rs | 33 ++- .../src/buffer/reader/testing.rs | 124 ++++++++ quic/s2n-quic-core/src/buffer/reassembler.rs | 183 ++++++------ .../src/buffer/reassembler/reader.rs | 111 ++++--- .../src/buffer/reassembler/slot.rs | 275 +++++++++++++++--- .../src/buffer/reassembler/tests.rs | 135 ++++++--- quic/s2n-quic-core/src/lib.rs | 10 + quic/s2n-quic-core/src/stream/testing.rs | 8 +- 9 files changed, 644 insertions(+), 238 deletions(-) create mode 100644 quic/s2n-quic-core/src/buffer/reader/testing.rs diff --git a/quic/s2n-quic-core/src/buffer/reader.rs b/quic/s2n-quic-core/src/buffer/reader.rs index 655c132894..6db06fa251 100644 --- a/quic/s2n-quic-core/src/buffer/reader.rs +++ b/quic/s2n-quic-core/src/buffer/reader.rs @@ -17,6 +17,9 @@ pub use incremental::Incremental; pub use limit::Limit; pub use storage::Storage; +#[cfg(any(test, feature = "testing"))] +pub mod testing; + /// A buffer that can be read with a tracked offset and final position. pub trait Reader: Storage { /// Returns the currently read offset for the stream diff --git a/quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs b/quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs index fe3e88f1a0..8dcf0b46f2 100644 --- a/quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs +++ b/quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs @@ -2,7 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 use crate::buffer::{ - reader::{storage::Chunk, Storage}, + reader::{ + storage::{Chunk, Infallible as _}, + Storage, + }, writer, }; use bytes::{Bytes, BytesMut}; @@ -18,7 +21,15 @@ impl Storage for BytesMut { #[inline] fn read_chunk(&mut self, watermark: usize) -> Result { let len = self.len().min(watermark); - Ok(self.split_to(len).into()) + + ensure!(len > 0, Ok(Self::new().into())); + + // if this is reading the entire thing then swap `self` for an empty value + if self.capacity() == len { + Ok(core::mem::replace(self, Self::new()).into()) + } else { + Ok(self.split_to(len).into()) + } } #[inline] @@ -37,11 +48,15 @@ impl Storage for BytesMut { let watermark = self.len().min(dest.remaining_capacity()); if Dest::SPECIALIZES_BYTES_MUT { - let buffer = self.split_to(watermark); - dest.put_bytes_mut(buffer); + let Chunk::BytesMut(chunk) = self.infallible_read_chunk(watermark) else { + unsafe { assume!(false) } + }; + dest.put_bytes_mut(chunk); } else if Dest::SPECIALIZES_BYTES { - let buffer = self.split_to(watermark); - dest.put_bytes(buffer.freeze()); + let Chunk::BytesMut(chunk) = self.infallible_read_chunk(watermark) else { + unsafe { assume!(false) } + }; + dest.put_bytes(chunk.freeze()); } else { // copy bytes into the destination buf dest.put_slice(&self[..watermark]); @@ -83,8 +98,10 @@ impl Storage for Bytes { let watermark = self.len().min(dest.remaining_capacity()); if Dest::SPECIALIZES_BYTES { - let buffer = self.split_to(watermark); - dest.put_bytes(buffer); + let Chunk::Bytes(chunk) = self.infallible_read_chunk(watermark) else { + unsafe { assume!(false) } + }; + dest.put_bytes(chunk); } else { // copy bytes into the destination buf dest.put_slice(&self[..watermark]); diff --git a/quic/s2n-quic-core/src/buffer/reader/testing.rs b/quic/s2n-quic-core/src/buffer/reader/testing.rs new file mode 100644 index 0000000000..7b757226de --- /dev/null +++ b/quic/s2n-quic-core/src/buffer/reader/testing.rs @@ -0,0 +1,124 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use super::{storage::Infallible as _, Reader, Storage, VarInt}; +use core::convert::Infallible; + +pub struct Fallible<'a, R, E> +where + R: ?Sized + Storage, + E: 'static + Clone, +{ + inner: &'a mut R, + error: Option, +} + +impl<'a, R, E> Fallible<'a, R, E> +where + R: ?Sized + Storage, + E: 'static + Clone, +{ + #[inline] + pub fn new(inner: &'a mut R) -> Self { + Self { inner, error: None } + } + + #[inline] + pub fn with_error(mut self, error: E) -> Self { + self.error = Some(error); + self + } + + #[inline] + pub fn set_error(&mut self, error: Option) { + self.error = error; + } + + #[inline] + fn check_error(&self) -> Result<(), E> { + if let Some(error) = self.error.as_ref() { + Err(error.clone()) + } else { + Ok(()) + } + } +} + +impl<'a, R, E> Storage for Fallible<'a, R, E> +where + R: ?Sized + Storage, + E: 'static + Clone, +{ + type Error = E; + + #[inline] + fn buffered_len(&self) -> usize { + self.inner.buffered_len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.inner.buffer_is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + self.check_error()?; + let chunk = self.inner.infallible_read_chunk(watermark); + Ok(chunk) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result, Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.check_error()?; + let chunk = self.inner.infallible_partial_copy_into(dest); + Ok(chunk) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + self.check_error()?; + self.inner.infallible_copy_into(dest); + Ok(()) + } +} + +impl<'a, R, E> Reader for Fallible<'a, R, E> +where + R: ?Sized + Reader, + E: 'static + Clone, +{ + #[inline] + fn current_offset(&self) -> VarInt { + self.inner.current_offset() + } + + #[inline] + fn final_offset(&self) -> Option { + self.inner.final_offset() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::buffer::reader::storage::Chunk; + use s2n_codec::DecoderError; + + #[test] + fn fallible_test() { + let mut reader = Chunk::Slice(b"hello"); + let mut reader = Fallible::new(&mut reader).with_error(DecoderError::UnexpectedEof(1)); + + assert!(reader.read_chunk(1).is_err()); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reassembler.rs b/quic/s2n-quic-core/src/buffer/reassembler.rs index 3c643952ce..343acb8b25 100644 --- a/quic/s2n-quic-core/src/buffer/reassembler.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler.rs @@ -4,7 +4,10 @@ //! This module contains data structures for buffering incoming streams. use crate::{ - buffer::{Error, Reader}, + buffer::{ + reader::storage::{Chunk, Infallible as _}, + Error, Reader, + }, varint::VarInt, }; use alloc::collections::{vec_deque, VecDeque}; @@ -91,6 +94,17 @@ struct Cursors { final_offset: u64, } +impl Cursors { + #[inline] + fn final_size(&self) -> Option { + if self.final_offset == UNKNOWN_FINAL_SIZE { + None + } else { + Some(self.final_offset) + } + } +} + impl Default for Cursors { #[inline] fn default() -> Self { @@ -126,11 +140,7 @@ impl Reassembler { /// Returns the final size of the stream, if known #[inline] pub fn final_size(&self) -> Option { - if self.cursors.final_offset == UNKNOWN_FINAL_SIZE { - None - } else { - Some(self.cursors.final_offset) - } + self.cursors.final_size() } /// Returns the amount of bytes available for reading. @@ -267,11 +277,10 @@ impl Reassembler { // start from the back with the assumption that most data arrives in order for idx in (0..self.slots.len()).rev() { let Some(slot) = self.slots.get(idx) else { - debug_assert!(false); unsafe { // SAFETY: `idx` should always be in bounds, since it's generated by the range // `0..slots.len()` - core::hint::unreachable_unchecked() + assume!(false); } }; @@ -324,10 +333,14 @@ impl Reassembler { while !reader.buffer_is_empty() { let Some(slot) = self.slots.get_mut(idx) else { - debug_assert!(false); - unsafe { core::hint::unreachable_unchecked() } + unsafe { + // SAFETY: `idx` should always be in bounds, since it's provided by a range + // that was bound to `slots.len()` + assume!(false); + } }; + // try filling the slot with the reader let filled = slot.try_write_reader(reader, &mut filled_slot)?; idx += 1; @@ -336,17 +349,12 @@ impl Reassembler { idx += 1; } + // if the reader is empty then we're done copying ensure!(!reader.buffer_is_empty(), break); - if let Some(next) = self.slots.get(idx) { - // the next slot is able to handle the reader - if next.start() <= reader.current_offset().as_u64() { - continue; - } - } + // we need to start allocating new slots + self.write_reader_with_alloc(reader, &mut idx, &mut filled_slot)?; - let slot = self.allocate_slot(reader); - self.insert(idx, slot); continue; } @@ -358,16 +366,50 @@ impl Reassembler { Ok(()) } + #[inline(always)] + fn write_reader_with_alloc( + &mut self, + reader: &mut R, + idx: &mut usize, + filled_slot: &mut bool, + ) -> Result<(), R::Error> + where + R: Reader + ?Sized, + { + while !reader.buffer_is_empty() { + if let Some(next) = self.slots.get(*idx) { + // the next slot is able to handle the reader so yield + ensure!(next.start() > reader.current_offset().as_u64(), break); + } + + // allocate a new slot for the reader + let mut slot = self.allocate_slot(reader); + + // try filling the slot with the reader + let filled = slot.try_write_reader(reader, filled_slot)?; + + // insert the newly allocated slot if the reader succeeded + self.insert(*idx, slot); + + *idx += 1; + if let Some(slot) = filled { + self.insert(*idx, slot); + *idx += 1; + } + } + + Ok(()) + } + #[inline] fn unsplit_range(&mut self, range: core::ops::Range) { // try to merge all of the slots that were modified for idx in range.rev() { let Some(slot) = self.slots.get(idx) else { - debug_assert!(false); unsafe { // SAFETY: `idx` should always be in bounds, since it's provided by a range // that was bound to `slots.len()` - core::hint::unreachable_unchecked() + assume!(false); } }; @@ -390,8 +432,11 @@ impl Reassembler { if let Some(next) = self.slots.remove(idx + 1) { self.slots[idx].unsplit(next); } else { - debug_assert!(false, "idx + 1 was checked above"); - unsafe { core::hint::unreachable_unchecked() } + unsafe { + // SAFETY: `idx` should always be in bounds, since it's provided by a range + // that was bound to `slots.len()` + assume!(false, "idx + 1 was checked above"); + } } } } @@ -409,39 +454,35 @@ impl Reassembler { .cursors .start_offset .checked_add(len.as_u64()) + .and_then(|v| VarInt::new(v).ok()) .ok_or(Error::OutOfRange)?; if let Some(final_size) = self.final_size() { - ensure!(final_size >= new_start_offset, Err(Error::InvalidFin)); + ensure!( + final_size >= new_start_offset.as_u64(), + Err(Error::InvalidFin) + ); } // record the maximum offset that we've seen - self.cursors.max_recv_offset = self.cursors.max_recv_offset.max(new_start_offset); + self.cursors.max_recv_offset = self.cursors.max_recv_offset.max(new_start_offset.as_u64()); // update the current start offset - self.cursors.start_offset = new_start_offset; + self.cursors.start_offset = new_start_offset.as_u64(); // clear out the slots to the new start offset while let Some(mut slot) = self.slots.pop_front() { // the new offset consumes the slot so drop and continue - if slot.end_allocated() < new_start_offset { + if slot.end_allocated() < new_start_offset.as_u64() { continue; } - match new_start_offset.checked_sub(slot.start()) { - None | Some(0) => { - // the slot starts after/on the new offset so put it back and break out - self.slots.push_front(slot); - } - Some(len) => { - // the slot overlaps with the new boundary so modify it and put it back if - // needed - slot.skip(len); - - if !slot.should_drop() { - self.slots.push_front(slot); - } - } + // skip to the new offset + slot.skip_until(new_start_offset).unwrap(); + + // put the slot back if it's still needed + if !slot.should_drop() { + self.slots.push_front(slot); } break; @@ -467,70 +508,20 @@ impl Reassembler { /// Pops a buffer from the front of the receive queue if available #[inline] pub fn pop(&mut self) -> Option { - self.pop_transform(|buffer, is_final_offset| { - let chunk = if is_final_offset || buffer.len() == buffer.capacity() { - core::mem::take(buffer) - } else { - buffer.split() - }; - let len = chunk.len(); - (chunk, len) - }) + self.pop_watermarked(usize::MAX) } /// Pops a buffer from the front of the receive queue, who's length is always guaranteed to be /// less than the provided `watermark`. #[inline] pub fn pop_watermarked(&mut self, watermark: usize) -> Option { - self.pop_transform(|buffer, is_final_offset| { - // make sure the buffer doesn't exceed the watermark - let watermark = watermark.min(buffer.len()); - - // if the watermark is 0 then don't needlessly increment refcounts - ensure!(watermark > 0, (BytesMut::new(), 0)); - - if watermark == buffer.len() && is_final_offset { - return (core::mem::take(buffer), watermark); - } - - (buffer.split_to(watermark), watermark) - }) - } - - /// Pops a buffer from the front of the receive queue as long as the `transform` function returns a - /// non-empty buffer. - #[inline] - fn pop_transform (O, usize), O>( - &mut self, - transform: F, - ) -> Option { - let slot = self.slots.front_mut()?; - - // make sure the slot has some data - ensure!(slot.is_occupied(self.cursors.start_offset), None); - - let is_final_offset = self.cursors.final_offset == slot.end(); - let buffer = slot.data_mut(); - - let (out, len) = transform(buffer, is_final_offset); - - // filter out empty buffers - ensure!(len > 0, None); - - slot.add_start(len); - - if slot.should_drop() { - // remove empty buffers - self.slots.pop_front(); - } - - probe::pop(self.cursors.start_offset, len); - - self.cursors.start_offset += len as u64; + let Chunk::BytesMut(chunk) = self.infallible_read_chunk(watermark) else { + unsafe { assume!(false) } + }; - self.invariants(); + ensure!(!chunk.is_empty(), None); - Some(out) + Some(chunk) } /// Returns the amount of data that had already been consumed from the diff --git a/quic/s2n-quic-core/src/buffer/reassembler/reader.rs b/quic/s2n-quic-core/src/buffer/reassembler/reader.rs index edc77719a8..0ad49194cb 100644 --- a/quic/s2n-quic-core/src/buffer/reassembler/reader.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/reader.rs @@ -29,11 +29,40 @@ impl Storage for Reassembler { #[inline] fn read_chunk(&mut self, watermark: usize) -> Result { - if let Some(chunk) = self.pop_watermarked(watermark) { - return Ok(chunk.into()); + let Some(slot) = self.slots.front_mut() else { + return Ok(BytesMut::new().into()); + }; + + // make sure the slot has some data + ensure!( + slot.is_occupied(self.cursors.start_offset), + Ok(BytesMut::new().into()) + ); + + // if we have a final size and this slot overlaps it then return the entire thing + let chunk = if self.cursors.final_size().map_or(false, |final_size| { + final_size <= slot.end_allocated() && watermark >= slot.buffered_len() + }) { + slot.consume() + } else { + let Chunk::BytesMut(chunk) = slot.read_chunk(watermark)? else { + unsafe { assume!(false) } + }; + chunk + }; + + if slot.should_drop() { + // remove empty buffers + self.slots.pop_front(); } - Ok(Default::default()) + super::probe::pop(self.cursors.start_offset, chunk.len()); + + self.cursors.start_offset += chunk.len() as u64; + + self.invariants(); + + Ok(chunk.into()) } #[inline] @@ -55,34 +84,25 @@ impl Storage for Reassembler { debug_assert!(remaining > 0); - match self.pop_watermarked(watermark) { - Some(chunk) => { - debug_assert!(!chunk.is_empty(), "pop should never return an empty chunk"); - debug_assert!( - chunk.len() <= watermark, - "chunk should never exceed watermark" - ); - - // flush the previous chunk if needed - if !prev.is_empty() { - dest.put_bytes_mut(prev); - } - - // if the chunk is exactly the same size as the watermark, then return it - if chunk.len() == watermark { - return Ok(chunk.into()); - } - - // store the chunk for another iteration, in case we can pull more - prev = chunk; - } - None if prev.is_empty() => { - return Ok(Default::default()); - } - None => { - return Ok(prev.into()); - } + let Chunk::BytesMut(chunk) = self.infallible_read_chunk(watermark) else { + unsafe { assume!(false) } + }; + + // if the chunk is empty then return the previous value + ensure!(!chunk.is_empty(), Ok(prev.into())); + + // flush the previous chunk if needed + if !prev.is_empty() { + dest.put_bytes_mut(prev); } + + // if the chunk is exactly the same size as the watermark, then return it + if chunk.len() == watermark { + return Ok(chunk.into()); + } + + // store the chunk for another iteration, in case we can pull more + prev = chunk; } } @@ -91,19 +111,38 @@ impl Storage for Reassembler { where Dest: writer::Storage + ?Sized, { + // if the destination wants bytes then use the partial copy logic instead + if Dest::SPECIALIZES_BYTES || Dest::SPECIALIZES_BYTES_MUT { + let mut chunk = self.infallible_partial_copy_into(dest); + chunk.infallible_copy_into(dest); + return Ok(()); + } + loop { // ensure we have enough capacity in the destination buf ensure!(dest.has_remaining_capacity(), Ok(())); - let transform = |buffer: &mut BytesMut, _is_final_offset| { - let mut dest = dest.track_write(); - buffer.infallible_copy_into(&mut dest); - ((), dest.written_len()) + let Some(slot) = self.slots.front_mut() else { + return Ok(()); }; - if self.pop_transform(transform).is_none() { - return Ok(()); + // make sure the slot has some data + ensure!(slot.is_occupied(self.cursors.start_offset), Ok(())); + + // avoid refcounting if the destination wants slices + let mut dest = dest.track_write(); + slot.infallible_copy_into(&mut dest); + + if slot.should_drop() { + // remove empty buffers + self.slots.pop_front(); } + + super::probe::pop(self.cursors.start_offset, dest.written_len()); + + self.cursors.start_offset += dest.written_len() as u64; + + self.invariants(); } } } diff --git a/quic/s2n-quic-core/src/buffer/reassembler/slot.rs b/quic/s2n-quic-core/src/buffer/reassembler/slot.rs index da9898cc57..fa3c6e47fa 100644 --- a/quic/s2n-quic-core/src/buffer/reassembler/slot.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/slot.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - buffer::{writer::Storage as _, Reader}, + buffer::{reader, writer::Storage as _, Reader}, varint::VarInt, }; use bytes::{Buf, BufMut, BytesMut}; @@ -38,7 +38,7 @@ impl Slot { } #[inline(always)] - pub fn try_write_reader( + pub fn try_write_reader( &mut self, reader: &mut R, filled_slot: &mut bool, @@ -61,54 +61,90 @@ impl Slot { ensure!(!reader.buffer_is_empty(), Ok(None)); + // read the current offset + let start = reader.current_offset().as_u64(); + // make sure this slot owns this range of data - ensure!( - reader.current_offset().as_u64() < self.end_allocated(), - Ok(None) - ); - - // if the current offsets match just do a straight copy - if reader.current_offset().as_u64() == end { - self.write_reader_end(reader, filled_slot)?; + ensure!(start < self.end_allocated(), Ok(None)); + + // if the current offsets match just do a straight copy on to the end of the buffer + if start == end { + self.write_reader_append(reader, filled_slot)?; self.invariants(); return Ok(None); } - // split off the unfilled chunk from the filled chunk and return this filled one + // copy and split off the filled data into another slot + let filled = self.write_reader_split(reader, filled_slot)?; - // find the split point between the buffers - let unfilled_len = reader.current_offset().as_u64() - self.start(); + self.invariants(); + filled.invariants(); - // create a new mid slot - let start = reader.current_offset().as_u64(); - let data = unsafe { - assume!(self.data.len() < unfilled_len as usize,); - self.data.split_off(unfilled_len as usize) - }; + Ok(Some(filled)) + } - let mut filled = Self { - start, - end: self.end, - data, - }; + #[inline(always)] + fn write_reader_split( + &mut self, + reader: &mut R, + filled_slot: &mut bool, + ) -> Result + where + R: Reader + ?Sized, + { + let reader_start = reader.current_offset().as_u64(); - // copy the data to the buffer - if let Err(err) = filled.write_reader_end(reader, filled_slot) { - // revert the split since the reader failed - self.data.unsplit(filled.data); - return Err(err); + unsafe { + assume!(reader_start > self.end()); } + let offset = reader_start - self.end(); - self.end = start; + let chunk = self.data.spare_capacity_mut(); - self.invariants(); + unsafe { + // SAFETY: the data buffer should have at least one byte of spare capacity if we got to + // this point + assume!(chunk.len() as u64 > offset); + } + + let chunk = &mut chunk[offset as usize..]; + let mut chunk = bytes::buf::UninitSlice::uninit(chunk); + let chunk_len = chunk.len(); + let mut chunk = chunk.track_write(); + reader.copy_into(&mut chunk)?; + let filled_len = chunk.written_len(); + + super::probe::write(reader_start, filled_len); + + let filled = unsafe { + // SAFETY: we should not have written more than the spare capacity + let offset = offset as usize; + + assume!(self.data.len() + offset <= self.data.capacity()); + let mut filled = self.data.split_off(self.data.len() + offset); + + assume!(filled.is_empty()); + assume!(filled_len <= filled.capacity() - filled.len()); + filled.advance_mut(filled_len); + filled + }; + *filled_slot |= chunk_len == filled_len; + + let filled = Self { + start: reader_start, + end: self.end, + data: filled, + }; filled.invariants(); - Ok(Some(filled)) + self.end = reader_start; + self.invariants(); + + Ok(filled) } #[inline(always)] - fn write_reader_end( + fn write_reader_append( &mut self, reader: &mut R, filled_slot: &mut bool, @@ -179,22 +215,11 @@ impl Slot { &self.data } - #[inline(always)] - pub fn data_mut(&mut self) -> &mut BytesMut { - &mut self.data - } - #[inline(always)] pub fn start(&self) -> u64 { self.start } - #[inline(always)] - pub fn add_start(&mut self, len: usize) { - self.start += len as u64; - self.invariants() - } - #[inline(always)] pub fn end(&self) -> u64 { self.start + self.data.len() as u64 @@ -205,6 +230,13 @@ impl Slot { self.end } + #[inline(always)] + pub fn consume(&mut self) -> BytesMut { + let data = core::mem::replace(&mut self.data, BytesMut::new()); + self.start = self.end; + data + } + #[inline] pub fn skip(&mut self, len: u64) { // trim off the data buffer @@ -238,10 +270,157 @@ impl Slot { #[inline(always)] fn invariants(&self) { if cfg!(debug_assertions) { - assert!(self.data.capacity() <= 1 << 16, "{:?}", self); - assert!(self.start() <= self.end(), "{:?}", self); - assert!(self.start() <= self.end_allocated(), "{:?}", self); - assert!(self.end() <= self.end_allocated(), "{:?}", self); + assert!(self.data.capacity() <= 1 << 16, "{self:?}"); + assert!(self.start() <= self.end(), "{self:?}"); + assert!(self.start() <= self.end_allocated(), "{self:?}"); + assert!(self.end() <= self.end_allocated(), "{self:?}"); + assert_eq!( + self.data.capacity() as u64, + self.end_allocated() - self.start(), + "{self:?}" + ); + assert_eq!( + self.data.len() as u64, + self.end() - self.start(), + "{self:?}" + ); } } } + +impl reader::Storage for Slot { + type Error = core::convert::Infallible; + + #[inline] + fn buffered_len(&self) -> usize { + self.data.buffered_len() + } + + #[inline] + fn buffer_is_empty(&self) -> bool { + self.data.buffer_is_empty() + } + + #[inline] + fn read_chunk(&mut self, watermark: usize) -> Result, Self::Error> { + let chunk = self.data.read_chunk(watermark)?; + self.start += chunk.len() as u64; + Ok(chunk) + } + + #[inline] + fn partial_copy_into( + &mut self, + dest: &mut Dest, + ) -> Result, Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let mut dest = dest.track_write(); + let chunk = self.data.partial_copy_into(&mut dest)?; + self.start += dest.written_len() as u64; + self.start += chunk.len() as u64; + Ok(chunk) + } + + #[inline] + fn copy_into(&mut self, dest: &mut Dest) -> Result<(), Self::Error> + where + Dest: crate::buffer::writer::Storage + ?Sized, + { + let mut dest = dest.track_write(); + self.data.copy_into(&mut dest)?; + self.start += dest.written_len() as u64; + self.invariants(); + Ok(()) + } +} + +impl Reader for Slot { + #[inline] + fn current_offset(&self) -> VarInt { + unsafe { VarInt::new_unchecked(self.start) } + } + + #[inline] + fn final_offset(&self) -> Option { + Some(unsafe { VarInt::new_unchecked(self.end) }) + } + + #[inline] + fn skip_until(&mut self, offset: VarInt) -> Result<(), Self::Error> { + if let Some(len) = offset.as_u64().checked_sub(self.current_offset().as_u64()) { + self.skip(len); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{buffer::reader::testing::Fallible, stream::testing::Data}; + use bolero::{check, TypeGenerator}; + + #[derive(Clone, Copy, Debug, TypeGenerator)] + struct Params { + slot_offset: VarInt, + slot_filled: u16, + slot_len: u16, + skip_len: u16, + reader_offset: VarInt, + reader_len: u16, + is_error: bool, + } + + impl Params { + fn run(&self) { + let mut slot = self.slot(); + let mut reader = self.reader(&slot); + let mut reader = reader.with_read_limit(self.reader_len as _); + + if self.is_error { + let mut reader = Fallible::new(&mut reader).with_error(()); + let _ = slot.try_write_reader(&mut reader, &mut false); + } else { + let _ = slot.try_write_reader(&mut reader, &mut false); + } + } + + fn slot(&self) -> Slot { + let start = self.slot_offset.as_u64(); + let end = start + self.slot_len as u64; + let end = VarInt::MAX.as_u64().min(end); + let len = end - start; + let mut bytes = BytesMut::with_capacity(len as _); + + // fill some bytes + let filled_len = len.min(self.slot_filled as _) as usize; + bytes.resize(filled_len, 0); + + let mut slot = Slot::new(start, end, bytes); + + // skip some bytes + let skip_len = len.min(self.skip_len as u64); + slot.skip(skip_len); + + slot + } + + fn reader(&self, slot: &Slot) -> Data { + let mut reader = Data::new(u64::MAX); + // the reader needs to be at least start at the same offset as the slot + let start = self.reader_offset.as_u64().max(slot.start); + reader.seek_forward(start); + reader + } + } + + #[test] + fn try_write_test() { + check!().with_type::().for_each(|params| { + params.run(); + }); + } +} diff --git a/quic/s2n-quic-core/src/buffer/reassembler/tests.rs b/quic/s2n-quic-core/src/buffer/reassembler/tests.rs index c968ee66e7..668487802c 100644 --- a/quic/s2n-quic-core/src/buffer/reassembler/tests.rs +++ b/quic/s2n-quic-core/src/buffer/reassembler/tests.rs @@ -3,6 +3,10 @@ use super::*; use crate::{ + buffer::{ + reader::{testing::Fallible, Storage as _}, + writer::Storage as _, + }, stream::testing::Data, varint::{VarInt, MAX_VARINT_VALUE}, }; @@ -15,6 +19,7 @@ enum Op { #[generator(0..=Data::MAX_CHUNK_LEN)] len: usize, is_fin: bool, + is_error: bool, }, Pop { watermark: Option, @@ -24,51 +29,87 @@ enum Op { }, } -#[test] -#[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time -fn model_test() { - check!().with_type::>().for_each(|ops| { - let mut buffer = Reassembler::new(); - let mut recv = Data::new(u64::MAX); +#[derive(Debug)] +struct Model { + buffer: Reassembler, + recv: Data, +} + +impl Default for Model { + fn default() -> Self { + Self { + buffer: Reassembler::new(), + recv: Data::new(u64::MAX), + } + } +} + +impl Model { + fn apply_all(&mut self, ops: &[Op]) { for op in ops { - match *op { - Op::Write { - offset, - len, - is_fin, - } => { - let chunk = Data::send_one_at(offset.as_u64(), len); - if is_fin { - let _ = buffer.write_at_fin(offset, &chunk); - } else { - let _ = buffer.write_at(offset, &chunk); - } + self.apply(op); + } + } + + fn apply(&mut self, op: &Op) { + let Self { buffer, recv } = self; + + match *op { + Op::Write { + offset, + len, + is_fin, + is_error, + } => { + let mut send = if is_fin { + Data::new(offset.as_u64() + len as u64) + } else { + Data::new(u64::MAX) + }; + send.seek_forward(offset.as_u64()); + let mut send = send.with_read_limit(len); + + // inject errors + if is_error { + let mut send = Fallible::new(&mut send).with_error(()); + let _ = buffer.write_reader(&mut send); + } else { + let _ = buffer.write_reader(&mut send); } - Op::Pop { watermark } => { - if let Some(watermark) = watermark { - if let Some(chunk) = buffer.pop_watermarked(watermark as _) { - assert!(chunk.len() <= watermark as usize); - recv.receive(&[&chunk]); - } - } else if let Some(chunk) = buffer.pop() { - assert!(!chunk.is_empty(), "popped chunks should never be empty"); - recv.receive(&[&chunk]); - } + } + Op::Pop { watermark } => { + if let Some(watermark) = watermark { + let mut recv = recv.with_write_limit(watermark as _); + let _ = buffer.copy_into(&mut recv); + } else { + let _ = buffer.copy_into(recv); } - Op::Skip { len } => { - let consumed_len = buffer.consumed_len(); - if buffer.skip(len).is_ok() { - let new_consumed_len = buffer.consumed_len(); - assert_eq!(new_consumed_len, consumed_len + len.as_u64()); - recv.seek_forward(len.as_u64()); - } + } + Op::Skip { len } => { + let consumed_len = buffer.consumed_len(); + if buffer.skip(len).is_ok() { + let new_consumed_len = buffer.consumed_len(); + assert_eq!(new_consumed_len, consumed_len + len.as_u64()); + recv.seek_forward(len.as_u64()); } } } + } + fn finish(&mut self) { // make sure a cleared buffer is the same as a new one - buffer.reset(); - assert_eq!(buffer, Reassembler::new()); + self.buffer.reset(); + assert_eq!(self.buffer, Reassembler::new()); + } +} + +#[test] +#[cfg_attr(miri, ignore)] // This test is too expensive for miri to complete in a reasonable amount of time +fn model_test() { + check!().with_type::>().for_each(|ops| { + let mut model = Model::default(); + model.apply_all(ops); + model.finish(); }) } @@ -868,16 +909,16 @@ fn write_partial_fin_test() { for buf in [&mut buffer, &mut oracle] { let mut chunks = vec![]; let mut actual_len = 0; - let mut allocated_len = 0; - - // use pop_transform so we can take the entire buffer and get an accurate `capacity` value - while let Some(chunk) = buf.pop_transform(|chunk, _is_final_chunk| { - let chunk = core::mem::take(chunk); - let len = chunk.len(); - (chunk, len) - }) { + + // look at how many bytes we actually allocated + let allocated_len: u64 = buf + .slots + .iter() + .map(|slot| slot.end_allocated() - slot.start()) + .sum(); + + while let Some(chunk) = buf.pop() { actual_len += chunk.len(); - allocated_len += chunk.capacity(); chunks.push(chunk); } @@ -911,7 +952,7 @@ fn write_partial_fin_test() { ); if reverse { - let ideal_allocation = (partial_size + fin_size) as usize; + let ideal_allocation = (partial_size + fin_size) as u64; assert_eq!( actual_results.2, ideal_allocation, "if the chunks were reversed, the allocation should be ideal" diff --git a/quic/s2n-quic-core/src/lib.rs b/quic/s2n-quic-core/src/lib.rs index 4ec0e3cf23..c033ee1fa8 100644 --- a/quic/s2n-quic-core/src/lib.rs +++ b/quic/s2n-quic-core/src/lib.rs @@ -17,6 +17,16 @@ extern crate alloc; /// may optimize based on false assumptions and behave incorrectly. #[macro_export] macro_rules! assume { + (false) => { + assume!(false, "assumption failed") + }; + (false $(, $fmtarg:expr)* $(,)?) => {{ + if cfg!(not(debug_assertions)) { + core::hint::unreachable_unchecked(); + } + + panic!($($fmtarg),*) + }}; ($cond:expr) => { $crate::assume!($cond, "assumption failed: {}", stringify!($cond)); }; diff --git a/quic/s2n-quic-core/src/stream/testing.rs b/quic/s2n-quic-core/src/stream/testing.rs index 3f5c824d87..98b530b2d3 100644 --- a/quic/s2n-quic-core/src/stream/testing.rs +++ b/quic/s2n-quic-core/src/stream/testing.rs @@ -155,7 +155,7 @@ impl reader::Storage for Data { #[inline] fn buffered_len(&self) -> usize { - (self.len - self.offset).try_into().unwrap() + (self.len - self.offset).try_into().unwrap_or(usize::MAX) } #[inline] @@ -189,12 +189,14 @@ impl reader::Storage for Data { impl reader::Reader for Data { #[inline] fn current_offset(&self) -> crate::varint::VarInt { - self.offset().try_into().unwrap() + self.offset() + .try_into() + .unwrap_or(crate::varint::VarInt::MAX) } #[inline] fn final_offset(&self) -> Option { - Some(self.len.try_into().unwrap()) + self.len.try_into().ok() } } From 6dd41e09195bc22dbac93a48f8ab35f8063726dc Mon Sep 17 00:00:00 2001 From: Wesley Rosenblum <55108558+WesleyRosenblum@users.noreply.github.com> Date: Fri, 3 May 2024 19:12:30 -0700 Subject: [PATCH 2/2] build: fix clippy warnings for 1.78 (#2199) * build: fix clippy warnings for 1.78 * remove unused packet_number_space method * allow dead code * remove unreachables --- dc/s2n-quic-dc/src/msg/addr.rs | 2 -- quic/s2n-quic-core/src/interval_set/mod.rs | 2 +- quic/s2n-quic-qns/src/tls.rs | 9 +++++---- quic/s2n-quic-sim/src/batch.rs | 2 ++ quic/s2n-quic-transport/src/connection/open_token.rs | 6 ++++++ .../src/stream/controller/local_initiated.rs | 2 +- quic/s2n-quic-transport/src/stream/stream_container.rs | 2 +- quic/s2n-quic-transport/src/transmission/application.rs | 5 ----- .../src/transmission/connection_close.rs | 4 ---- quic/s2n-quic-transport/src/transmission/early.rs | 4 ---- quic/s2n-quic-transport/src/transmission/mod.rs | 7 +------ quic/s2n-quic/src/provider.rs | 6 ++++++ 12 files changed, 23 insertions(+), 28 deletions(-) diff --git a/dc/s2n-quic-dc/src/msg/addr.rs b/dc/s2n-quic-dc/src/msg/addr.rs index d003d68504..2ce9d6ca57 100644 --- a/dc/s2n-quic-dc/src/msg/addr.rs +++ b/dc/s2n-quic-dc/src/msg/addr.rs @@ -71,7 +71,6 @@ impl Addr { } _ => unsafe { assume!(false, "invalid remote address"); - unreachable!() }, } } @@ -113,7 +112,6 @@ impl Addr { } _ => unsafe { assume!(false, "invalid remote address"); - unreachable!() }, } } diff --git a/quic/s2n-quic-core/src/interval_set/mod.rs b/quic/s2n-quic-core/src/interval_set/mod.rs index 440bd21613..158f432e07 100644 --- a/quic/s2n-quic-core/src/interval_set/mod.rs +++ b/quic/s2n-quic-core/src/interval_set/mod.rs @@ -375,7 +375,7 @@ impl IntervalSet { #[inline] pub fn union(&mut self, other: &Self) -> Result<(), IntervalSetError> { if self.intervals.is_empty() { - self.intervals = other.intervals.clone(); + self.intervals.clone_from(&other.intervals); return Ok(()); } diff --git a/quic/s2n-quic-qns/src/tls.rs b/quic/s2n-quic-qns/src/tls.rs index ddbadc095f..fea0553272 100644 --- a/quic/s2n-quic-qns/src/tls.rs +++ b/quic/s2n-quic-qns/src/tls.rs @@ -164,14 +164,15 @@ impl Default for TlsProviders { } } -impl ToString for TlsProviders { - fn to_string(&self) -> String { - match self { +impl core::fmt::Display for TlsProviders { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let str = match self { #[cfg(unix)] TlsProviders::S2N => String::from("s2n-tls"), TlsProviders::Rustls => String::from("rustls"), TlsProviders::Null => String::from("null"), - } + }; + write!(f, "{}", str) } } diff --git a/quic/s2n-quic-sim/src/batch.rs b/quic/s2n-quic-sim/src/batch.rs index 43761a8534..1a574a9767 100644 --- a/quic/s2n-quic-sim/src/batch.rs +++ b/quic/s2n-quic-sim/src/batch.rs @@ -44,6 +44,8 @@ impl Batch { plan.run(out, &command, self.skip_run, &mut reports)?; } + // See https://github.com/rust-lang/rust-clippy/pull/12756 + #[allow(clippy::assigning_clones)] for (_title, report) in reports.iter_mut() { *report = report.strip_prefix(out).unwrap().to_owned(); } diff --git a/quic/s2n-quic-transport/src/connection/open_token.rs b/quic/s2n-quic-transport/src/connection/open_token.rs index 0c7d21e05b..9a3840af95 100644 --- a/quic/s2n-quic-transport/src/connection/open_token.rs +++ b/quic/s2n-quic-transport/src/connection/open_token.rs @@ -30,6 +30,12 @@ impl Pair { } } +impl Default for Pair { + fn default() -> Self { + Self::new() + } +} + #[derive(Debug)] pub(crate) struct Token(Option); diff --git a/quic/s2n-quic-transport/src/stream/controller/local_initiated.rs b/quic/s2n-quic-transport/src/stream/controller/local_initiated.rs index cc4f0d90b7..086837e8f1 100644 --- a/quic/s2n-quic-transport/src/stream/controller/local_initiated.rs +++ b/quic/s2n-quic-transport/src/stream/controller/local_initiated.rs @@ -99,7 +99,7 @@ impl LocalInitiated InterestLists { waiting_for_stream_flow_control_credits ); - if !interests.retained != node.done_streams_link.is_linked() { + if interests.retained == node.done_streams_link.is_linked() { if !interests.retained { self.done_streams.push_back(node.clone()); } else { diff --git a/quic/s2n-quic-transport/src/transmission/application.rs b/quic/s2n-quic-transport/src/transmission/application.rs index 78e01a1fe9..a7338824ef 100644 --- a/quic/s2n-quic-transport/src/transmission/application.rs +++ b/quic/s2n-quic-transport/src/transmission/application.rs @@ -14,7 +14,6 @@ use crate::{ transmission::{self, Mode, Provider as _}, }; use core::ops::RangeInclusive; -use s2n_quic_core::packet::number::PacketNumberSpace; pub enum Payload<'a, Config: endpoint::Config> { Normal(Normal<'a, Config>), @@ -84,10 +83,6 @@ impl<'a, Config: endpoint::Config> super::Payload for Payload<'a, Config> { Payload::PathValidationOnly(inner) => inner.on_transmit(context), } } - - fn packet_number_space(&self) -> PacketNumberSpace { - PacketNumberSpace::ApplicationData - } } impl<'a, Config: endpoint::Config> transmission::interest::Provider for Payload<'a, Config> { diff --git a/quic/s2n-quic-transport/src/transmission/connection_close.rs b/quic/s2n-quic-transport/src/transmission/connection_close.rs index 219a9830aa..3f6836d9cb 100644 --- a/quic/s2n-quic-transport/src/transmission/connection_close.rs +++ b/quic/s2n-quic-transport/src/transmission/connection_close.rs @@ -19,10 +19,6 @@ impl<'a> super::Payload for Payload<'a> { fn on_transmit(&mut self, context: &mut W) { context.write_frame(self.connection_close); } - - fn packet_number_space(&self) -> PacketNumberSpace { - self.packet_number_space - } } impl<'a> transmission::interest::Provider for Payload<'a> { diff --git a/quic/s2n-quic-transport/src/transmission/early.rs b/quic/s2n-quic-transport/src/transmission/early.rs index 02a351d396..e199e5e13a 100644 --- a/quic/s2n-quic-transport/src/transmission/early.rs +++ b/quic/s2n-quic-transport/src/transmission/early.rs @@ -48,10 +48,6 @@ impl<'a, Config: endpoint::Config> super::Payload for Payload<'a, Config> { self.ack_manager.on_transmit_complete(context); } } - - fn packet_number_space(&self) -> PacketNumberSpace { - self.packet_number_space - } } impl<'a, Config: endpoint::Config> transmission::interest::Provider for Payload<'a, Config> { diff --git a/quic/s2n-quic-transport/src/transmission/mod.rs b/quic/s2n-quic-transport/src/transmission/mod.rs index 8692519c1a..840538c689 100644 --- a/quic/s2n-quic-transport/src/transmission/mod.rs +++ b/quic/s2n-quic-transport/src/transmission/mod.rs @@ -23,18 +23,13 @@ use s2n_codec::{encoder::scatter, Encoder}; use s2n_quic_core::{ event, frame::Padding, - packet::{ - encoding::PacketPayloadEncoder, - number::{PacketNumber, PacketNumberSpace}, - stateless_reset, - }, + packet::{encoding::PacketPayloadEncoder, number::PacketNumber, stateless_reset}, time::Timestamp, }; pub trait Payload: interest::Provider { fn size_hint(&self, payload_range: RangeInclusive) -> usize; fn on_transmit(&mut self, context: &mut W); - fn packet_number_space(&self) -> PacketNumberSpace; } pub struct Transmission<'a, 'sub, Config: endpoint::Config, P: Payload> { diff --git a/quic/s2n-quic/src/provider.rs b/quic/s2n-quic/src/provider.rs index caa2609c21..dc827470e5 100644 --- a/quic/s2n-quic/src/provider.rs +++ b/quic/s2n-quic/src/provider.rs @@ -18,14 +18,18 @@ pub mod stateless_reset_token; pub mod tls; // These providers are not currently exposed to applications +#[allow(dead_code)] pub(crate) mod connection_close_formatter; +#[allow(dead_code)] pub(crate) mod path_migration; +#[allow(dead_code)] pub(crate) mod sync; cfg_if!( if #[cfg(any(test, feature = "unstable-provider-packet-interceptor"))] { pub mod packet_interceptor; } else { + #[allow(dead_code)] pub(crate) mod packet_interceptor; } ); @@ -34,6 +38,7 @@ cfg_if!( if #[cfg(any(test, feature = "unstable-provider-random"))] { pub mod random; } else { + #[allow(dead_code)] pub(crate) mod random; } ); @@ -42,6 +47,7 @@ cfg_if!( if #[cfg(any(test, feature = "unstable-provider-datagram"))] { pub mod datagram; } else { + #[allow(dead_code)] pub(crate) mod datagram; } );