Skip to content

Commit

Permalink
allow configurable in-memory buffer size for SeekableStream
Browse files Browse the repository at this point in the history
SeekableStream makes use of an in-memory buffer during reads.  Right
now, this is currently set to 64k.  This change allows the buffer size
to be configurable.
  • Loading branch information
Brian Caswell committed Sep 7, 2023
1 parent 40579f6 commit 74bb907
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions sdk/core/src/seekable_stream.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
use bytes::Bytes;
use futures::io::AsyncRead;
use futures::stream::Stream;
use futures::task::Poll;
use dyn_clone::DynClone;
use futures::{io::AsyncRead, stream::Stream, task::Poll};
use std::{pin::Pin, task::Context};

/// Amount of the stream to buffer in memory during streaming uploads
pub(crate) const DEFAULT_BUFFER_SIZE: usize = 1024 * 64;

/// Enable a type implementing `AsyncRead` to be consumed as if it were
/// a `Stream` of `Bytes`.
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
pub trait SeekableStream:
AsyncRead + Unpin + std::fmt::Debug + Send + Sync + dyn_clone::DynClone
{
pub trait SeekableStream: AsyncRead + Unpin + std::fmt::Debug + Send + Sync + DynClone {
async fn reset(&mut self) -> crate::error::Result<()>;
fn len(&self) -> usize;

fn is_empty(&self) -> bool {
self.len() == 0
fn buffer_size(&self) -> usize {
DEFAULT_BUFFER_SIZE
}
}

Expand All @@ -23,11 +24,8 @@ dyn_clone::clone_trait_object!(SeekableStream);
impl Stream for dyn SeekableStream {
type Item = crate::error::Result<Bytes>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let mut buffer = vec![0_u8; 1024 * 64];
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut buffer = vec![0_u8; self.buffer_size()];

match self.poll_read(cx, &mut buffer) {
Poll::Ready(Ok(0)) => Poll::Ready(None),
Expand Down

0 comments on commit 74bb907

Please sign in to comment.