Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps: h3-0.0.6 (w/ tracing feature as h3-quinn) #2263

Merged
merged 2 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion quic/s2n-quic-h3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ publish = false
[dependencies]
bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false }
h3 = "0.0.5"
h3 = "0.0.6"
s2n-quic = { path = "../s2n-quic" }
s2n-quic-core = { path = "../s2n-quic-core" }
tracing = { version = "0.1", optional = true }

[features]
tracing = ["dep:tracing"]
59 changes: 42 additions & 17 deletions quic/s2n-quic-h3/src/s2n_quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use std::{
task::{self, Poll},
};

#[cfg(feature = "tracing")]
use tracing::instrument;

pub struct Connection {
conn: s2n_quic::connection::Handle,
bidi_acceptor: s2n_quic::connection::BidirectionalStreamAcceptor,
Expand Down Expand Up @@ -66,27 +69,27 @@ impl<B> quic::Connection<B> for Connection
where
B: Buf,
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type OpenStreams = OpenStreams;
type Error = ConnectionError;
type AcceptError = ConnectionError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_accept_recv(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::RecvStream>, Self::Error>> {
) -> Poll<Result<Option<Self::RecvStream>, Self::AcceptError>> {
let recv = match ready!(self.recv_acceptor.poll_accept_receive_stream(cx))? {
Some(x) => x,
None => return Poll::Ready(Ok(None)),
};
Poll::Ready(Ok(Some(Self::RecvStream::new(recv))))
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_accept_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::BidiStream>, Self::Error>> {
) -> Poll<Result<Option<Self::BidiStream>, Self::AcceptError>> {
let (recv, send) = match ready!(self.bidi_acceptor.poll_accept_bidirectional_stream(cx))? {
Some(x) => x.split(),
None => return Poll::Ready(Ok(None)),
Expand All @@ -97,28 +100,40 @@ where
})))
}

fn opener(&self) -> Self::OpenStreams {
OpenStreams {
conn: self.conn.clone(),
}
}
}

impl<B> quic::OpenStreams<B> for Connection
where
B: Buf,
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type OpenError = ConnectionError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into()
}

fn opener(&self) -> Self::OpenStreams {
OpenStreams {
conn: self.conn.clone(),
}
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self.conn.close(
code.value()
Expand All @@ -138,25 +153,27 @@ where
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type Error = ConnectionError;
type OpenError = ConnectionError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self.conn.close(
code.value()
Expand Down Expand Up @@ -271,6 +288,7 @@ impl quic::RecvStream for RecvStream {
type Buf = Bytes;
type Error = ReadError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_data(
&mut self,
cx: &mut task::Context<'_>,
Expand All @@ -279,13 +297,15 @@ impl quic::RecvStream for RecvStream {
Ok(buf).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn stop_sending(&mut self, error_code: u64) {
let _ = self.stream.stop_sending(
s2n_quic::application::Error::new(error_code)
.expect("s2n-quic supports error codes up to 2^62-1"),
);
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn recv_id(&self) -> StreamId {
self.stream.id().try_into().expect("invalid stream id")
}
Expand Down Expand Up @@ -369,6 +389,7 @@ where
{
type Error = SendStreamError;

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
// try to flush the current chunk if we have one
Expand Down Expand Up @@ -409,6 +430,7 @@ where
// Poll::Ready(Ok(()))
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
if self.buf.is_some() {
return Err(Self::Error::NotReady);
Expand All @@ -427,19 +449,22 @@ where
// Ok(())
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// ensure all chunks are flushed to the QUIC stream before finishing
ready!(self.poll_ready(cx))?;
self.stream.finish()?;
Ok(()).into()
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn reset(&mut self, reset_code: u64) {
let _ = self
.stream
.reset(reset_code.try_into().unwrap_or_else(|_| VarInt::MAX.into()));
}

#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn send_id(&self) -> StreamId {
self.stream.id().try_into().expect("invalid stream id")
}
Expand Down
Loading