diff --git a/sdk/core/src/seekable_stream.rs b/sdk/core/src/seekable_stream.rs index 0e96f5fc53..01c3cc6036 100644 --- a/sdk/core/src/seekable_stream.rs +++ b/sdk/core/src/seekable_stream.rs @@ -1,20 +1,21 @@ use bytes::Bytes; -use futures::io::AsyncRead; -use futures::stream::Stream; -use futures::task::Poll; +use dyn_clone::DynClone; +use futures::{io::AsyncRead, stream::Stream, task::Poll}; +use std::{pin::Pin, task::Context}; + +/// Amount of the stream to buffer in memory during streaming uploads +pub(crate) const DEFAULT_BUFFER_SIZE: usize = 1024 * 64; /// Enable a type implementing `AsyncRead` to be consumed as if it were /// a `Stream` of `Bytes`. #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] -pub trait SeekableStream: - AsyncRead + Unpin + std::fmt::Debug + Send + Sync + dyn_clone::DynClone -{ +pub trait SeekableStream: AsyncRead + Unpin + std::fmt::Debug + Send + Sync + DynClone { async fn reset(&mut self) -> crate::error::Result<()>; fn len(&self) -> usize; - fn is_empty(&self) -> bool { - self.len() == 0 + fn buffer_size(&self) -> usize { + DEFAULT_BUFFER_SIZE } } @@ -23,11 +24,8 @@ dyn_clone::clone_trait_object!(SeekableStream); impl Stream for dyn SeekableStream { type Item = crate::error::Result; - fn poll_next( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let mut buffer = vec![0_u8; 1024 * 64]; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut buffer = vec![0_u8; self.buffer_size()]; match self.poll_read(cx, &mut buffer) { Poll::Ready(Ok(0)) => Poll::Ready(None),