diff --git a/vortex-io/src/read.rs b/vortex-io/src/read.rs index ed8ddecad39..3eb3afb4411 100644 --- a/vortex-io/src/read.rs +++ b/vortex-io/src/read.rs @@ -143,6 +143,8 @@ impl VortexReadAt for Arc { // } } +const DEFAULT_IN_MEMORY_CONCURRENCY: usize = 16; + impl VortexReadAt for ByteBuffer { fn size(&self) -> BoxFuture<'static, VortexResult> { let length = self.len() as u64; @@ -150,7 +152,7 @@ impl VortexReadAt for ByteBuffer { } fn concurrency(&self) -> usize { - 16 + DEFAULT_IN_MEMORY_CONCURRENCY } fn read_at( @@ -178,6 +180,56 @@ impl VortexReadAt for ByteBuffer { } } +/// An in-memory read source with a configurable concurrency limit. +#[derive(Clone, Debug)] +pub struct InMemoryReadAt { + buffer: ByteBuffer, + concurrency: usize, +} + +impl InMemoryReadAt { + pub fn new(buffer: ByteBuffer) -> Self { + Self { + buffer, + concurrency: DEFAULT_IN_MEMORY_CONCURRENCY, + } + } + + pub fn with_concurrency(mut self, concurrency: usize) -> Self { + self.concurrency = concurrency; + self + } + + pub fn into_inner(self) -> ByteBuffer { + self.buffer + } +} + +impl From for InMemoryReadAt { + fn from(buffer: ByteBuffer) -> Self { + Self::new(buffer) + } +} + +impl VortexReadAt for InMemoryReadAt { + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.buffer.size() + } + + fn concurrency(&self) -> usize { + self.concurrency + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + self.buffer.read_at(offset, length, alignment) + } +} + /// A wrapper that instruments a [`VortexReadAt`] with metrics. #[derive(Clone)] pub struct InstrumentedReadAt { @@ -302,6 +354,18 @@ mod tests { assert!(result.is_err()); } + #[test] + fn test_in_memory_read_at_default_concurrency() { + let read = InMemoryReadAt::new(ByteBuffer::from(vec![1, 2, 3])); + assert_eq!(read.concurrency(), 16); + } + + #[test] + fn test_in_memory_read_at_custom_concurrency() { + let read = InMemoryReadAt::new(ByteBuffer::from(vec![1, 2, 3])).with_concurrency(4); + assert_eq!(read.concurrency(), 4); + } + #[tokio::test] async fn test_arc_read_at() { let data = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));