Skip to content

Commit

Permalink
refactor(s2n-quic-core): improve reassembler error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
camshaft committed May 3, 2024
1 parent b085808 commit ea9be85
Show file tree
Hide file tree
Showing 9 changed files with 621 additions and 230 deletions.
3 changes: 3 additions & 0 deletions quic/s2n-quic-core/src/buffer/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ pub use incremental::Incremental;
pub use limit::Limit;
pub use storage::Storage;

#[cfg(any(test, feature = "testing"))]
pub mod testing;

/// A buffer that can be read with a tracked offset and final position.
pub trait Reader: Storage {
/// Returns the currently read offset for the stream
Expand Down
33 changes: 25 additions & 8 deletions quic/s2n-quic-core/src/buffer/reader/storage/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Apache-2.0

use crate::buffer::{
reader::{storage::Chunk, Storage},
reader::{
storage::{Chunk, Infallible as _},
Storage,
},
writer,
};
use bytes::{Bytes, BytesMut};
Expand All @@ -18,7 +21,15 @@ impl Storage for BytesMut {
#[inline]
fn read_chunk(&mut self, watermark: usize) -> Result<Chunk, Self::Error> {
let len = self.len().min(watermark);
Ok(self.split_to(len).into())

ensure!(len > 0, Ok(Self::new().into()));

// if this is reading the entire thing then swap `self` for an empty value
if self.capacity() == len {
Ok(core::mem::replace(self, Self::new()).into())
} else {
Ok(self.split_to(len).into())
}
}

#[inline]
Expand All @@ -37,11 +48,15 @@ impl Storage for BytesMut {
let watermark = self.len().min(dest.remaining_capacity());

if Dest::SPECIALIZES_BYTES_MUT {
let buffer = self.split_to(watermark);
dest.put_bytes_mut(buffer);
let Chunk::BytesMut(chunk) = self.infallible_read_chunk(watermark) else {
unsafe { assume!(false) }
};
dest.put_bytes_mut(chunk);
} else if Dest::SPECIALIZES_BYTES {
let buffer = self.split_to(watermark);
dest.put_bytes(buffer.freeze());
let Chunk::BytesMut(chunk) = self.infallible_read_chunk(watermark) else {
unsafe { assume!(false) }
};
dest.put_bytes(chunk.freeze());
} else {
// copy bytes into the destination buf
dest.put_slice(&self[..watermark]);
Expand Down Expand Up @@ -83,8 +98,10 @@ impl Storage for Bytes {
let watermark = self.len().min(dest.remaining_capacity());

if Dest::SPECIALIZES_BYTES {
let buffer = self.split_to(watermark);
dest.put_bytes(buffer);
let Chunk::Bytes(chunk) = self.infallible_read_chunk(watermark) else {
unsafe { assume!(false) }
};
dest.put_bytes(chunk);
} else {
// copy bytes into the destination buf
dest.put_slice(&self[..watermark]);
Expand Down
124 changes: 124 additions & 0 deletions quic/s2n-quic-core/src/buffer/reader/testing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use super::{storage::Infallible as _, Reader, Storage, VarInt};
use core::convert::Infallible;

pub struct Fallible<'a, R, E>
where
R: ?Sized + Storage<Error = Infallible>,
E: 'static + Clone,
{
inner: &'a mut R,
error: Option<E>,
}

impl<'a, R, E> Fallible<'a, R, E>
where
R: ?Sized + Storage<Error = Infallible>,
E: 'static + Clone,
{
#[inline]
pub fn new(inner: &'a mut R) -> Self {
Self { inner, error: None }
}

#[inline]
pub fn with_error(mut self, error: E) -> Self {
self.error = Some(error);
self
}

#[inline]
pub fn set_error(&mut self, error: Option<E>) {
self.error = error;
}

#[inline]
fn check_error(&self) -> Result<(), E> {
if let Some(error) = self.error.as_ref() {
Err(error.clone())
} else {
Ok(())
}
}
}

impl<'a, R, E> Storage for Fallible<'a, R, E>
where
R: ?Sized + Storage<Error = Infallible>,
E: 'static + Clone,
{
type Error = E;

#[inline]
fn buffered_len(&self) -> usize {
self.inner.buffered_len()
}

#[inline]
fn buffer_is_empty(&self) -> bool {
self.inner.buffer_is_empty()
}

#[inline]
fn read_chunk(&mut self, watermark: usize) -> Result<super::storage::Chunk<'_>, Self::Error> {
self.check_error()?;
let chunk = self.inner.infallible_read_chunk(watermark);
Ok(chunk)
}

#[inline]
fn partial_copy_into<Dest>(
&mut self,
dest: &mut Dest,
) -> Result<super::storage::Chunk<'_>, Self::Error>
where
Dest: crate::buffer::writer::Storage + ?Sized,
{
self.check_error()?;
let chunk = self.inner.infallible_partial_copy_into(dest);
Ok(chunk)
}

#[inline]
fn copy_into<Dest>(&mut self, dest: &mut Dest) -> Result<(), Self::Error>
where
Dest: crate::buffer::writer::Storage + ?Sized,
{
self.check_error()?;
self.inner.infallible_copy_into(dest);
Ok(())
}
}

impl<'a, R, E> Reader for Fallible<'a, R, E>
where
R: ?Sized + Reader<Error = Infallible>,
E: 'static + Clone,
{
#[inline]
fn current_offset(&self) -> VarInt {
self.inner.current_offset()
}

#[inline]
fn final_offset(&self) -> Option<VarInt> {
self.inner.final_offset()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::buffer::reader::storage::Chunk;
use s2n_codec::DecoderError;

#[test]
fn fallible_test() {
let mut reader = Chunk::Slice(b"hello");
let mut reader = Fallible::new(&mut reader).with_error(DecoderError::UnexpectedEof(1));

assert!(reader.read_chunk(1).is_err());
}
}
Loading

0 comments on commit ea9be85

Please sign in to comment.