From e63c3c9c4767ea39ab34ebfb67a9636872ab917c Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Sat, 10 Jan 2026 21:21:35 +0000 Subject: [PATCH 01/11] LayoutReader2 Signed-off-by: Nicholas Gates --- vortex-layout/src/lib.rs | 1 + vortex-layout/src/v2/mod.rs | 7 ++ vortex-layout/src/v2/optimizer.rs | 13 +++ vortex-layout/src/v2/reader.rs | 49 ++++++++++ vortex-layout/src/v2/readers/flat.rs | 112 ++++++++++++++++++++++ vortex-layout/src/v2/readers/mod.rs | 5 + vortex-layout/src/v2/readers/scalar_fn.rs | 94 ++++++++++++++++++ vortex-layout/src/v2/stream.rs | 34 +++++++ 8 files changed, 315 insertions(+) create mode 100644 vortex-layout/src/v2/mod.rs create mode 100644 vortex-layout/src/v2/optimizer.rs create mode 100644 vortex-layout/src/v2/reader.rs create mode 100644 vortex-layout/src/v2/readers/flat.rs create mode 100644 vortex-layout/src/v2/readers/mod.rs create mode 100644 vortex-layout/src/v2/readers/scalar_fn.rs create mode 100644 vortex-layout/src/v2/stream.rs diff --git a/vortex-layout/src/lib.rs b/vortex-layout/src/lib.rs index cc62398ef99..83be9af45a9 100644 --- a/vortex-layout/src/lib.rs +++ b/vortex-layout/src/lib.rs @@ -28,6 +28,7 @@ pub mod session; mod strategy; #[cfg(test)] mod test; +pub mod v2; pub mod vtable; pub type LayoutContext = VTableContext; diff --git a/vortex-layout/src/v2/mod.rs b/vortex-layout/src/v2/mod.rs new file mode 100644 index 00000000000..fd8a30068f6 --- /dev/null +++ b/vortex-layout/src/v2/mod.rs @@ -0,0 +1,7 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod optimizer; +pub mod reader; +pub mod readers; +pub mod stream; diff --git a/vortex-layout/src/v2/optimizer.rs b/vortex-layout/src/v2/optimizer.rs new file mode 100644 index 00000000000..db5567f7b13 --- /dev/null +++ b/vortex-layout/src/v2/optimizer.rs @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_error::VortexResult; + +use crate::v2::reader::LayoutReader2; +use crate::v2::reader::LayoutReader2Ref; + +impl dyn LayoutReader2 + '_ { + pub fn optimize(&self) -> VortexResult { + todo!() + } +} diff --git a/vortex-layout/src/v2/reader.rs b/vortex-layout/src/v2/reader.rs new file mode 100644 index 00000000000..1aac4ded38c --- /dev/null +++ b/vortex-layout/src/v2/reader.rs @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; +use std::sync::Arc; + +use vortex_dtype::DType; +use vortex_error::VortexResult; + +use crate::v2::stream::SendableLayoutReaderStream; + +pub type LayoutReader2Ref = Arc; + +pub trait LayoutReader2: 'static + Send + Sync { + /// Returns the number of rows in the layout. + fn row_count(&self) -> u64; + + /// Returns the [`DType`] of the layout. + fn dtype(&self) -> &DType; + + /// Returns the number of child layouts. + fn nchildren(&self) -> usize; + + /// Returns the nth child reader of the layout. + fn child(&self, idx: usize) -> &LayoutReader2Ref; + + /// Execute the layout reader for the given range of data, returning a masked array stream. + fn execute(&self, row_range: Range) -> VortexResult; + + /// Attempt to reduce the layout reader to a more simple representation. + /// + /// Returns `Ok(None)` if no optimization is possible. + fn try_reduce(&self) -> VortexResult> { + _ = self; + Ok(None) + } + + /// Attempt to perform a reduction of the parent of this layout reader. + /// + /// Returns `Ok(None)` if no reduction is possible. + fn try_reduce_parent( + &self, + parent: &LayoutReader2Ref, + child_idx: usize, + ) -> VortexResult> { + _ = (self, parent, child_idx); + Ok(None) + } +} diff --git a/vortex-layout/src/v2/readers/flat.rs b/vortex-layout/src/v2/readers/flat.rs new file mode 100644 index 00000000000..825c474b2a9 --- /dev/null +++ b/vortex-layout/src/v2/readers/flat.rs @@ -0,0 +1,112 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use futures::future::BoxFuture; +use moka::future::FutureExt; +use vortex_array::ArrayRef; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use crate::layouts::SharedArrayFuture; +use crate::v2::reader::LayoutReader2; +use crate::v2::reader::LayoutReader2Ref; +use crate::v2::stream::LayoutReaderStream; +use crate::v2::stream::SendableLayoutReaderStream; + +pub struct FlatReader2 { + len: usize, + dtype: DType, + array_fut: SharedArrayFuture, +} + +impl LayoutReader2 for FlatReader2 { + fn row_count(&self) -> u64 { + self.len as u64 + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn nchildren(&self) -> usize { + 0 + } + + fn child(&self, _idx: usize) -> &LayoutReader2Ref { + unreachable!() + } + + fn execute(&self, row_range: Range) -> VortexResult { + // We need to share the same array future + let array_fut = self.array_fut.clone(); + + let start = usize::try_from(row_range.start).map_err(|_| { + vortex_err!("Row range start {} is too large for usize", row_range.start) + })?; + let end = usize::try_from(row_range.end) + .map_err(|_| vortex_err!("Row range end {} is too large for usize", row_range.end))?; + + if start > self.len || end > self.len || start > end { + vortex_bail!( + "Row range {:?} is out of bounds for array of length {}", + row_range, + self.len + ); + } + + Ok(Box::new(FlatLayoutReaderStream { + array_fut, + offset: start, + remaining: end - start, + })) + } +} + +struct FlatLayoutReaderStream { + array_fut: SharedArrayFuture, + offset: usize, + remaining: usize, +} + +impl LayoutReaderStream for FlatLayoutReaderStream { + fn next_chunk_len(&self) -> Option { + if self.remaining == 0 { + None + } else { + Some(self.remaining) + } + } + + fn next_chunk( + &mut self, + selection: &Mask, + ) -> VortexResult>> { + if selection.len() > self.remaining { + vortex_bail!( + "Selection mask length {} exceeds remaining rows {}", + selection.len(), + self.remaining + ); + } + + let array_fut = self.array_fut.clone(); + let offset = self.offset; + let selection = selection.clone(); + + self.offset += selection.len(); + self.remaining -= selection.len(); + + Ok(async move { + let array = array_fut.await?; + let sliced_array = array.slice(offset..offset + selection.len()); + let selected_array = sliced_array.filter(selection.clone())?; + Ok(selected_array) + } + .boxed()) + } +} diff --git a/vortex-layout/src/v2/readers/mod.rs b/vortex-layout/src/v2/readers/mod.rs new file mode 100644 index 00000000000..7239646c8dc --- /dev/null +++ b/vortex-layout/src/v2/readers/mod.rs @@ -0,0 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub mod flat; +pub mod scalar_fn; diff --git a/vortex-layout/src/v2/readers/scalar_fn.rs b/vortex-layout/src/v2/readers/scalar_fn.rs new file mode 100644 index 00000000000..13bb7c2fad8 --- /dev/null +++ b/vortex-layout/src/v2/readers/scalar_fn.rs @@ -0,0 +1,94 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use futures::future::BoxFuture; +use futures::future::try_join_all; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::ScalarFnArray; +use vortex_array::expr::ScalarFn; +use vortex_array::optimizer::ArrayOptimizer; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::v2::reader::LayoutReader2; +use crate::v2::reader::LayoutReader2Ref; +use crate::v2::stream::LayoutReaderStream; +use crate::v2::stream::SendableLayoutReaderStream; + +/// A [`LayoutReader2] for applying a scalar function to another layout. +pub struct ScalarFnReader { + scalar_fn: ScalarFn, + dtype: DType, + row_count: u64, + children: Vec, +} + +impl LayoutReader2 for ScalarFnReader { + fn row_count(&self) -> u64 { + self.row_count + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn nchildren(&self) -> usize { + self.children.len() + } + + fn child(&self, idx: usize) -> &LayoutReader2Ref { + &self.children[idx] + } + + fn execute(&self, row_range: Range) -> VortexResult { + let input_streams = self + .children + .iter() + .map(|child| child.execute(row_range.clone())) + .collect::>>()?; + + Ok(Box::new(ScalarFnArrayStream { + scalar_fn: self.scalar_fn.clone(), + input_streams, + })) + } +} + +struct ScalarFnArrayStream { + scalar_fn: ScalarFn, + input_streams: Vec, +} + +impl LayoutReaderStream for ScalarFnArrayStream { + fn next_chunk_len(&self) -> Option { + self.input_streams + .iter() + .map(|s| s.next_chunk_len()) + .min() + .flatten() + } + + fn next_chunk( + &mut self, + selection: &Mask, + ) -> VortexResult>> { + let scalar_fn = self.scalar_fn.clone(); + let len = selection.true_count(); + let futs = self + .input_streams + .iter_mut() + .map(|s| s.next_chunk(selection)) + .collect::>>()?; + + Ok(Box::pin(async move { + let input_arrays = try_join_all(futs).await?; + let array = ScalarFnArray::try_new(scalar_fn, input_arrays, len)?.into_array(); + let array = array.optimize()?; + Ok(array) + })) + } +} diff --git a/vortex-layout/src/v2/stream.rs b/vortex-layout/src/v2/stream.rs new file mode 100644 index 00000000000..acb4105ef4a --- /dev/null +++ b/vortex-layout/src/v2/stream.rs @@ -0,0 +1,34 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use futures::future::BoxFuture; +use vortex_array::ArrayRef; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +pub type SendableLayoutReaderStream = Box; + +/// A stream of data produced by a [`LayoutReader2`](crate::v2::reader::LayoutReader2). +/// +/// Layout readers are driven by requesting chunks of data using a given selection masks. +pub trait LayoutReaderStream { + /// Returns the length in rows of the next chunk in the stream. + /// + /// Returns [`None`] if the stream has ended. + fn next_chunk_len(&self) -> Option; + + /// Returns the next chunk of data given a selection mask of the requested length. + /// + /// The length of the provided selection mask must be `<=` the size returned from + /// [`LayoutReaderStream::next_chunk_len`]. + /// + /// The length of the returned chunk must be equal to the [`Mask::true_count`] of the selection + /// mask. + /// + /// The returned future has a `'static` lifetime allowing the calling to drive the stream + /// arbitrarily far without awaiting any data. + fn next_chunk( + &mut self, + selection: &Mask, + ) -> VortexResult>>; +} From ffd220bd1884c23d0ba893daed9112a8561e96fc Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Sat, 10 Jan 2026 21:38:52 +0000 Subject: [PATCH 02/11] LayoutReader2 Signed-off-by: Nicholas Gates --- vortex-layout/src/v2/readers/chunked.rs | 161 ++++++++++++++++++++++ vortex-layout/src/v2/readers/flat.rs | 6 + vortex-layout/src/v2/readers/mod.rs | 2 + vortex-layout/src/v2/readers/scalar_fn.rs | 6 + vortex-layout/src/v2/readers/struct_.rs | 99 +++++++++++++ vortex-layout/src/v2/stream.rs | 4 + 6 files changed, 278 insertions(+) create mode 100644 vortex-layout/src/v2/readers/chunked.rs create mode 100644 vortex-layout/src/v2/readers/struct_.rs diff --git a/vortex-layout/src/v2/readers/chunked.rs b/vortex-layout/src/v2/readers/chunked.rs new file mode 100644 index 00000000000..58697e899eb --- /dev/null +++ b/vortex-layout/src/v2/readers/chunked.rs @@ -0,0 +1,161 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use futures::future::BoxFuture; +use futures::future::try_join_all; +use moka::future::FutureExt; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::ChunkedArray; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use crate::v2::reader::LayoutReader2; +use crate::v2::reader::LayoutReader2Ref; +use crate::v2::stream::LayoutReaderStream; +use crate::v2::stream::SendableLayoutReaderStream; + +pub struct ChunkedReader2 { + row_count: u64, + dtype: DType, + chunks: Vec, +} + +impl LayoutReader2 for ChunkedReader2 { + fn row_count(&self) -> u64 { + self.row_count + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn nchildren(&self) -> usize { + self.chunks.len() + } + + fn child(&self, idx: usize) -> &LayoutReader2Ref { + &self.chunks[idx] + } + + fn execute(&self, row_range: Range) -> VortexResult { + let mut remaining_start = row_range.start; + let mut remaining_end = row_range.end; + let mut streams = Vec::new(); + + for chunk in &self.chunks { + let chunk_row_count = chunk.row_count(); + + if remaining_start >= chunk_row_count { + // This chunk is before the requested range + remaining_start -= chunk_row_count; + remaining_end -= chunk_row_count; + continue; + } + + let start_in_chunk = remaining_start; + let end_in_chunk = if remaining_end <= chunk_row_count { + remaining_end + } else { + chunk_row_count + }; + + streams.push(chunk.execute(start_in_chunk..end_in_chunk)?); + + remaining_start = 0; + if remaining_end <= chunk_row_count { + break; + } else { + remaining_end -= chunk_row_count; + } + } + + Ok(Box::new(ChunkedReaderStream { + dtype: self.dtype.clone(), + chunks: streams, + })) + } +} + +struct ChunkedReaderStream { + dtype: DType, + chunks: Vec, +} + +impl LayoutReaderStream for ChunkedReaderStream { + fn dtype(&self) -> &DType { + &self.dtype + } + + fn next_chunk_len(&self) -> Option { + self.chunks + .iter() + .map(|s| s.next_chunk_len()) + .find(|len| len.is_some()) + .flatten() + } + + fn next_chunk( + &mut self, + selection: &Mask, + ) -> VortexResult>> { + // Remove any chunks that are already exhausted + loop { + if self.chunks.is_empty() { + vortex_bail!("Early termination of chunked layout"); + } + if self.chunks[0].next_chunk_len().is_none() { + self.chunks.remove(0); + } else { + break; + } + } + + // Get the length of the next chunk + let mut next_len = self.chunks[0] + .next_chunk_len() + .ok_or_else(|| vortex_err!("Early termination of chunked layout"))?; + + if selection.len() <= next_len { + // The selection is smaller than the next chunk length, therefore we only need one chunk + return self.chunks[0].next_chunk(selection); + } + + // Otherwise, we need to gather from multiple chunks + let mut selection = selection.clone(); + let mut futs = vec![]; + while !selection.is_empty() { + if self.chunks.is_empty() { + vortex_bail!("Early termination of chunked layout"); + } + + // Slice off the right amount of selection for this chunk + let next_sel = selection.slice(..next_len); + selection = selection.slice(next_len..); + + let fut = self.chunks[0].next_chunk(&next_sel)?; + futs.push(fut); + + // Remove any chunks that are already exhausted + loop { + if self.chunks[0].next_chunk_len().is_none() { + self.chunks.remove(0); + } else { + break; + } + } + } + + let dtype = self.dtype.clone(); + Ok(async move { + let arrays = try_join_all(futs).await?; + Ok(ChunkedArray::try_new(arrays, dtype)?.into_array()) + } + .boxed()) + } +} diff --git a/vortex-layout/src/v2/readers/flat.rs b/vortex-layout/src/v2/readers/flat.rs index 825c474b2a9..95aa5920c46 100644 --- a/vortex-layout/src/v2/readers/flat.rs +++ b/vortex-layout/src/v2/readers/flat.rs @@ -60,6 +60,7 @@ impl LayoutReader2 for FlatReader2 { } Ok(Box::new(FlatLayoutReaderStream { + dtype: self.dtype.clone(), array_fut, offset: start, remaining: end - start, @@ -68,12 +69,17 @@ impl LayoutReader2 for FlatReader2 { } struct FlatLayoutReaderStream { + dtype: DType, array_fut: SharedArrayFuture, offset: usize, remaining: usize, } impl LayoutReaderStream for FlatLayoutReaderStream { + fn dtype(&self) -> &DType { + &self.dtype + } + fn next_chunk_len(&self) -> Option { if self.remaining == 0 { None diff --git a/vortex-layout/src/v2/readers/mod.rs b/vortex-layout/src/v2/readers/mod.rs index 7239646c8dc..6d020982f62 100644 --- a/vortex-layout/src/v2/readers/mod.rs +++ b/vortex-layout/src/v2/readers/mod.rs @@ -1,5 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +pub mod chunked; pub mod flat; pub mod scalar_fn; +pub mod struct_; diff --git a/vortex-layout/src/v2/readers/scalar_fn.rs b/vortex-layout/src/v2/readers/scalar_fn.rs index 13bb7c2fad8..5cf67c033eb 100644 --- a/vortex-layout/src/v2/readers/scalar_fn.rs +++ b/vortex-layout/src/v2/readers/scalar_fn.rs @@ -52,6 +52,7 @@ impl LayoutReader2 for ScalarFnReader { .collect::>>()?; Ok(Box::new(ScalarFnArrayStream { + dtype: self.dtype.clone(), scalar_fn: self.scalar_fn.clone(), input_streams, })) @@ -59,11 +60,16 @@ impl LayoutReader2 for ScalarFnReader { } struct ScalarFnArrayStream { + dtype: DType, scalar_fn: ScalarFn, input_streams: Vec, } impl LayoutReaderStream for ScalarFnArrayStream { + fn dtype(&self) -> &DType { + &self.dtype + } + fn next_chunk_len(&self) -> Option { self.input_streams .iter() diff --git a/vortex-layout/src/v2/readers/struct_.rs b/vortex-layout/src/v2/readers/struct_.rs new file mode 100644 index 00000000000..38b832337d5 --- /dev/null +++ b/vortex-layout/src/v2/readers/struct_.rs @@ -0,0 +1,99 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use futures::future::BoxFuture; +use futures::future::try_join_all; +use moka::future::FutureExt; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::StructArray; +use vortex_array::validity::Validity; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::v2::reader::LayoutReader2; +use crate::v2::reader::LayoutReader2Ref; +use crate::v2::stream::LayoutReaderStream; +use crate::v2::stream::SendableLayoutReaderStream; + +pub struct StructReader2 { + row_count: u64, + dtype: DType, + fields: Vec, +} + +impl LayoutReader2 for StructReader2 { + fn row_count(&self) -> u64 { + self.row_count + } + + fn dtype(&self) -> &DType { + &self.dtype + } + + fn nchildren(&self) -> usize { + self.fields.len() + } + + fn child(&self, idx: usize) -> &LayoutReader2Ref { + &self.fields[idx] + } + + fn execute(&self, row_range: Range) -> VortexResult { + let field_streams = self + .fields + .iter() + .map(|field| field.execute(row_range.clone())) + .collect::>>()?; + + Ok(Box::new(StructReaderStream { + dtype: self.dtype.clone(), + fields: field_streams, + })) + } +} + +struct StructReaderStream { + dtype: DType, + fields: Vec, +} + +impl LayoutReaderStream for StructReaderStream { + fn dtype(&self) -> &DType { + &self.dtype + } + + fn next_chunk_len(&self) -> Option { + self.fields + .iter() + .map(|s| s.next_chunk_len()) + .min() + .flatten() + } + + fn next_chunk( + &mut self, + selection: &Mask, + ) -> VortexResult>> { + let struct_fields = self.dtype.as_struct_fields().clone(); + let validity: Validity = self.dtype.nullability().into(); + let fields = self + .fields + .iter_mut() + .map(|s| s.next_chunk(selection)) + .collect::>>()?; + let len = selection.true_count(); + + Ok(async move { + let fields = try_join_all(fields).await?; + Ok( + StructArray::try_new_with_dtype(fields, struct_fields, len, validity.clone())? + .into_array(), + ) + } + .boxed()) + } +} diff --git a/vortex-layout/src/v2/stream.rs b/vortex-layout/src/v2/stream.rs index acb4105ef4a..7e57da7ba2b 100644 --- a/vortex-layout/src/v2/stream.rs +++ b/vortex-layout/src/v2/stream.rs @@ -3,6 +3,7 @@ use futures::future::BoxFuture; use vortex_array::ArrayRef; +use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_mask::Mask; @@ -12,6 +13,9 @@ pub type SendableLayoutReaderStream = Box &DType; + /// Returns the length in rows of the next chunk in the stream. /// /// Returns [`None`] if the stream has ended. From 80ecccc7a1413b434fe22a7de6948387366305c0 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Sun, 11 Jan 2026 10:26:10 +0000 Subject: [PATCH 03/11] LayoutReader2 Signed-off-by: Nicholas Gates --- vortex-layout/src/layout.rs | 16 ++++++++++++++++ vortex-layout/src/segments/source.rs | 5 +++++ vortex-layout/src/v2/reader.rs | 15 +++++++++++++++ vortex-layout/src/v2/readers/struct_.rs | 1 + vortex-layout/src/vtable.rs | 12 ++++++++++++ 5 files changed, 49 insertions(+) diff --git a/vortex-layout/src/layout.rs b/vortex-layout/src/layout.rs index 368d7c6bdc8..e22bf81f1a6 100644 --- a/vortex-layout/src/layout.rs +++ b/vortex-layout/src/layout.rs @@ -25,6 +25,8 @@ use crate::display::DisplayLayoutTree; use crate::display::display_tree_with_segment_sizes; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2::reader::LayoutReader2Ref; pub type LayoutId = ArcRef; @@ -76,6 +78,12 @@ pub trait Layout: 'static + Send + Sync + Debug + private::Sealed { segment_source: Arc, session: &VortexSession, ) -> VortexResult; + + fn new_reader2( + &self, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult; } pub trait IntoLayout { @@ -331,6 +339,14 @@ impl Layout for LayoutAdapter { ) -> VortexResult { V::new_reader(&self.0, name, segment_source, session) } + + fn new_reader2( + &self, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + V::new_reader2(&self.0, segment_source, session) + } } mod private { diff --git a/vortex-layout/src/segments/source.rs b/vortex-layout/src/segments/source.rs index a48a79b2889..3a5b3f84b99 100644 --- a/vortex-layout/src/segments/source.rs +++ b/vortex-layout/src/segments/source.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::sync::Arc; + use futures::future::BoxFuture; use vortex_array::buffer::BufferHandle; use vortex_error::VortexResult; @@ -9,6 +11,9 @@ use crate::segments::SegmentId; /// Static future resolving to a segment byte buffer. pub type SegmentFuture = BoxFuture<'static, VortexResult>; +/// A reference-counted segment source. +pub type SegmentSourceRef = Arc; + /// A trait for providing segment data to a [`crate::LayoutReader`]. pub trait SegmentSource: 'static + Send + Sync { /// Request a segment, returning a future that will eventually resolve to the segment data. diff --git a/vortex-layout/src/v2/reader.rs b/vortex-layout/src/v2/reader.rs index 1aac4ded38c..6dad4890594 100644 --- a/vortex-layout/src/v2/reader.rs +++ b/vortex-layout/src/v2/reader.rs @@ -13,6 +13,14 @@ pub type LayoutReader2Ref = Arc; pub trait LayoutReader2: 'static + Send + Sync { /// Returns the number of rows in the layout. + /// + /// TODO(ngates): if we relaxed this to be a cardinality estimate, we could support arbitrary + /// data streams including joins, group bys, scans, etc. The problem is, invoking execute with + /// some row range becomes weird... + /// Perhaps we borrow DataFusion's style of partitioning where we ask the reader to partition + /// into `n` and then pass the partition index to execute? Or perhaps we just pass `n` to the + /// execute call and have the reader return all `n` partitions at once? That would also make + /// sharing cached resources a lot easier. fn row_count(&self) -> u64; /// Returns the [`DType`] of the layout. @@ -25,6 +33,13 @@ pub trait LayoutReader2: 'static + Send + Sync { fn child(&self, idx: usize) -> &LayoutReader2Ref; /// Execute the layout reader for the given range of data, returning a masked array stream. + /// + /// TODO(ngates): this bit feels weird to me. + /// It's odd that we don't know when a particular reader is done executing. Meaning we don't + /// have a good lifetime for cached resources. The returned reader stream _does_ have a good + /// lifetime for caching (the duration of the stream), so perhaps we just say that layout + /// readers should not hold data and instead each call to execute should make its own segment + /// requests? Assuming we can de-dupe also within a segment source, this seems reasonable. fn execute(&self, row_range: Range) -> VortexResult; /// Attempt to reduce the layout reader to a more simple representation. diff --git a/vortex-layout/src/v2/readers/struct_.rs b/vortex-layout/src/v2/readers/struct_.rs index 38b832337d5..85c17855686 100644 --- a/vortex-layout/src/v2/readers/struct_.rs +++ b/vortex-layout/src/v2/readers/struct_.rs @@ -22,6 +22,7 @@ use crate::v2::stream::SendableLayoutReaderStream; pub struct StructReader2 { row_count: u64, dtype: DType, + // TODO(ngates): we should make this lazy? fields: Vec, } diff --git a/vortex-layout/src/vtable.rs b/vortex-layout/src/vtable.rs index 8012907693d..969fc02af0b 100644 --- a/vortex-layout/src/vtable.rs +++ b/vortex-layout/src/vtable.rs @@ -24,6 +24,8 @@ use crate::LayoutRef; use crate::children::LayoutChildren; use crate::segments::SegmentId; use crate::segments::SegmentSource; +use crate::segments::SegmentSourceRef; +use crate::v2::reader::LayoutReader2Ref; pub trait VTable: 'static + Sized + Send + Sync + Debug { type Layout: 'static + Send + Sync + Clone + Debug + Deref + IntoLayout; @@ -65,6 +67,16 @@ pub trait VTable: 'static + Sized + Send + Sync + Debug { session: &VortexSession, ) -> VortexResult; + /// Create a new v2 reader for the layout. + fn new_reader2( + layout: &Self::Layout, + segment_source: &SegmentSourceRef, + session: &VortexSession, + ) -> VortexResult { + let _ = (layout, segment_source, session); + vortex_bail!("new_reader2 not implemented for this layout") + } + #[cfg(gpu_unstable)] /// Create a new reader for the layout that uses a gpu device fn new_gpu_reader( From 5d62d4d01e850c25d8187f9634b5551c3ee937e9 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 14 Jan 2026 09:58:00 +0000 Subject: [PATCH 04/11] Add flag to run benchmarks under Samply Signed-off-by: Nicholas Gates --- Cargo.lock | 1 + vortex-scan/Cargo.toml | 1 + vortex-scan/src/lib.rs | 1 + vortex-scan/src/v2/mod.rs | 5 ++ vortex-scan/src/v2/reader.rs | 53 +++++++++++++++++++ vortex-scan/src/v2/source.rs | 98 ++++++++++++++++++++++++++++++++++++ 6 files changed, 159 insertions(+) create mode 100644 vortex-scan/src/v2/mod.rs create mode 100644 vortex-scan/src/v2/reader.rs create mode 100644 vortex-scan/src/v2/source.rs diff --git a/Cargo.lock b/Cargo.lock index 47cb5e7e05a..83c4e454abb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10675,6 +10675,7 @@ version = "0.1.0" dependencies = [ "arrow-array 57.1.0", "arrow-schema 57.1.0", + "async-trait", "bit-vec 0.8.0", "futures", "itertools 0.14.0", diff --git a/vortex-scan/Cargo.toml b/vortex-scan/Cargo.toml index d78e92fa147..41961760041 100644 --- a/vortex-scan/Cargo.toml +++ b/vortex-scan/Cargo.toml @@ -29,6 +29,7 @@ vortex-mask = { workspace = true } vortex-metrics = { workspace = true } vortex-session = { workspace = true } +async-trait = { workspace = true } bit-vec = { workspace = true } futures = { workspace = true } itertools = { workspace = true } diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index 52eb0dd47bb..ed62775f7b0 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -26,5 +26,6 @@ pub mod gpu; mod repeated_scan; #[cfg(test)] mod test; +pub mod v2; pub use repeated_scan::RepeatedScan; diff --git a/vortex-scan/src/v2/mod.rs b/vortex-scan/src/v2/mod.rs new file mode 100644 index 00000000000..af92f452a33 --- /dev/null +++ b/vortex-scan/src/v2/mod.rs @@ -0,0 +1,5 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub mod reader; +pub mod source; diff --git a/vortex-scan/src/v2/reader.rs b/vortex-scan/src/v2/reader.rs new file mode 100644 index 00000000000..916b7379c47 --- /dev/null +++ b/vortex-scan/src/v2/reader.rs @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use async_trait::async_trait; +use vortex_array::ArrayRef; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +/// A reader provides an interface for loading data from row-indexed layouts. +/// +/// Unlike a [`super::source::Source`], readers have a concrete row count allowing fixed +/// partitions over a known set of rows. Readers are driven by providing an input stream of +/// array data that can be used to provide arguments to parameterized filter and projection +/// expressions. +#[async_trait] +pub trait Reader: 'static + Send + Sync { + /// Get the data type of the layout being read. + fn dtype(&self) -> &DType; + + /// Returns the number of rows in the reader. + fn row_count(&self) -> u64; + + /// Creates a scan where an input stream is used to drive the output data. + /// + /// TODO(ngates): should this take a RowSelection? + async fn scan( + &self, + input_dtype: &DType, + row_offset: u64, + row_mask: Mask, + ) -> VortexResult; +} + +pub type ReaderScanRef = Box; + +/// A scan over a reader, producing output arrays given an input array to parameterize the filter +/// and projection expressions. +#[async_trait] +pub trait ReaderScan { + /// The data type of the returned data. + fn dtype(&self) -> &DType; + + /// The preferred maximum row count for the next batch. + /// + /// Returns [`None`] if there are no more batches. + fn next_batch_size(&mut self) -> Option; + + /// Returns the next batch of data given an input array. + /// + /// The returned batch must have the same number of rows as the input array. + async fn next_batch(&mut self, input: ArrayRef) -> VortexResult; +} diff --git a/vortex-scan/src/v2/source.rs b/vortex-scan/src/v2/source.rs new file mode 100644 index 00000000000..bfd7d6ee36e --- /dev/null +++ b/vortex-scan/src/v2/source.rs @@ -0,0 +1,98 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use futures::stream::BoxStream; +use vortex_array::expr::Expression; +use vortex_array::stream::SendableArrayStream; +use vortex_dtype::DType; +use vortex_error::VortexResult; + +/// Create a Vortex source from serialized configuration. +/// +/// Providers can be registered with Vortex under a specific +#[async_trait(?Send)] +pub trait SourceProvider: 'static { + /// URI schemes handled by this source provider. + /// + /// TODO(ngates): this might not be the right way to plugin sources. + fn schemes(&self) -> &[&str]; + + /// Initialize a new source. + async fn init_source(&self, uri: String) -> VortexResult; + + /// Serialize a source split to bytes. + async fn serialize_split(&self, split: &dyn Split) -> VortexResult>; + + /// Deserialize a source split from bytes. + async fn deserialize_split(&self, data: &[u8]) -> VortexResult; +} + +/// A reference-counted source. +pub type SourceRef = Arc; + +/// A source represents a streamable dataset that can be scanned with projection and filter +/// expressions. Each scan produces splits that can be executed in parallel to read data. +/// Each split can be serialized for remote execution. +#[async_trait] +pub trait Source: 'static + Send + Sync { + /// Returns the dtype of the source. + fn dtype(&self) -> &DType; + + /// Returns an estimate of the row count of the source. + fn row_count_estimate(&self) -> Estimate; + + /// Returns a scan over the source. + async fn scan(&self, scan_request: ScanRequest) -> VortexResult; +} + +#[derive(Debug, Clone, Default)] +pub struct ScanRequest { + pub projection: Option, + pub filter: Option, + pub limit: Option, +} + +pub type SourceScanRef = Box; + +#[async_trait] +pub trait SourceScan: 'static + Send + Sync { + /// The returned dtype of the scan. + fn dtype(&self) -> &DType; + + /// An estimate of the remaining splits. + fn remaining_splits_estimate(&self) -> Estimate; + + /// Returns the next batch of splits to be processed. + /// + /// This should not return _more_ than the max_batch_size splits, but may return fewer. + async fn next_splits(&mut self, max_splits: usize) -> VortexResult>; +} + +pub type SplitStream = BoxStream<'static, VortexResult>; +pub type SplitRef = Arc; + +pub trait Split: 'static + Send + Sync { + /// Downcast the split to a concrete type. + fn as_any(&self) -> &dyn Any; + + /// Executes the split. + fn execute(&self) -> VortexResult; + + /// Returns an estimate of the row count for this split. + fn row_count_estimate(&self) -> Estimate; + + /// Returns an estimate of the byte size for this split. + fn byte_size_estimate(&self) -> Estimate; +} + +#[derive(Default)] +pub enum Estimate { + Exact(T), + UpperBound(T), + #[default] + Unknown, +} From f7f83289770879965eebb9eaa2b94484982fcac2 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 14 Jan 2026 11:39:01 +0000 Subject: [PATCH 05/11] Add flag to run benchmarks under Samply Signed-off-by: Nicholas Gates --- vortex-scan/src/v2/reader.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/vortex-scan/src/v2/reader.rs b/vortex-scan/src/v2/reader.rs index 916b7379c47..6292933645e 100644 --- a/vortex-scan/src/v2/reader.rs +++ b/vortex-scan/src/v2/reader.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::ops::Range; + use async_trait::async_trait; use vortex_array::ArrayRef; use vortex_dtype::DType; @@ -21,15 +23,8 @@ pub trait Reader: 'static + Send + Sync { /// Returns the number of rows in the reader. fn row_count(&self) -> u64; - /// Creates a scan where an input stream is used to drive the output data. - /// - /// TODO(ngates): should this take a RowSelection? - async fn scan( - &self, - input_dtype: &DType, - row_offset: u64, - row_mask: Mask, - ) -> VortexResult; + /// Creates a scan over the given row range of the reader. + async fn scan(&self, row_range: Range) -> VortexResult; } pub type ReaderScanRef = Box; @@ -48,6 +43,7 @@ pub trait ReaderScan { /// Returns the next batch of data given an input array. /// - /// The returned batch must have the same number of rows as the input array. - async fn next_batch(&mut self, input: ArrayRef) -> VortexResult; + /// The returned batch must have the same number of rows as the [`Mask::true_count`]. + /// The provided mask will have at most [`next_batch_size`] rows. + async fn next_batch(&mut self, mask: Mask) -> VortexResult; } From 64fbb5db19285955df23d459e09f2bf30e92f525 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 14 Jan 2026 14:19:55 +0000 Subject: [PATCH 06/11] Scan API Signed-off-by: Nicholas Gates --- vortex-layout/src/layout.rs | 6 +- vortex-layout/src/v2/optimizer.rs | 8 +- vortex-layout/src/v2/reader.rs | 86 ++++++++++---------- vortex-layout/src/v2/readers/chunked.rs | 10 +-- vortex-layout/src/v2/readers/flat.rs | 8 +- vortex-layout/src/v2/readers/scalar_fn.rs | 12 +-- vortex-layout/src/v2/readers/struct_.rs | 10 +-- vortex-layout/src/v2/stream.rs | 2 +- vortex-layout/src/vtable.rs | 4 +- vortex-scan/src/lib.rs | 1 - vortex-scan/src/v2/mod.rs | 5 -- vortex-scan/src/v2/reader.rs | 49 ------------ vortex-scan/src/v2/source.rs | 98 ----------------------- 13 files changed, 74 insertions(+), 225 deletions(-) delete mode 100644 vortex-scan/src/v2/mod.rs delete mode 100644 vortex-scan/src/v2/reader.rs delete mode 100644 vortex-scan/src/v2/source.rs diff --git a/vortex-layout/src/layout.rs b/vortex-layout/src/layout.rs index e22bf81f1a6..79f5bd26c85 100644 --- a/vortex-layout/src/layout.rs +++ b/vortex-layout/src/layout.rs @@ -26,7 +26,7 @@ use crate::display::display_tree_with_segment_sizes; use crate::segments::SegmentId; use crate::segments::SegmentSource; use crate::segments::SegmentSourceRef; -use crate::v2::reader::LayoutReader2Ref; +use crate::v2::reader::ReaderRef; pub type LayoutId = ArcRef; @@ -83,7 +83,7 @@ pub trait Layout: 'static + Send + Sync + Debug + private::Sealed { &self, segment_source: &SegmentSourceRef, session: &VortexSession, - ) -> VortexResult; + ) -> VortexResult; } pub trait IntoLayout { @@ -344,7 +344,7 @@ impl Layout for LayoutAdapter { &self, segment_source: &SegmentSourceRef, session: &VortexSession, - ) -> VortexResult { + ) -> VortexResult { V::new_reader2(&self.0, segment_source, session) } } diff --git a/vortex-layout/src/v2/optimizer.rs b/vortex-layout/src/v2/optimizer.rs index db5567f7b13..5c2686654e6 100644 --- a/vortex-layout/src/v2/optimizer.rs +++ b/vortex-layout/src/v2/optimizer.rs @@ -3,11 +3,11 @@ use vortex_error::VortexResult; -use crate::v2::reader::LayoutReader2; -use crate::v2::reader::LayoutReader2Ref; +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; -impl dyn LayoutReader2 + '_ { - pub fn optimize(&self) -> VortexResult { +impl dyn Reader + '_ { + pub fn optimize(&self) -> VortexResult { todo!() } } diff --git a/vortex-layout/src/v2/reader.rs b/vortex-layout/src/v2/reader.rs index 6dad4890594..e8b56ceedb2 100644 --- a/vortex-layout/src/v2/reader.rs +++ b/vortex-layout/src/v2/reader.rs @@ -4,61 +4,63 @@ use std::ops::Range; use std::sync::Arc; +use async_trait::async_trait; +use vortex_array::ArrayRef; use vortex_dtype::DType; use vortex_error::VortexResult; +use vortex_mask::Mask; -use crate::v2::stream::SendableLayoutReaderStream; +pub type ReaderRef = Arc; -pub type LayoutReader2Ref = Arc; - -pub trait LayoutReader2: 'static + Send + Sync { - /// Returns the number of rows in the layout. - /// - /// TODO(ngates): if we relaxed this to be a cardinality estimate, we could support arbitrary - /// data streams including joins, group bys, scans, etc. The problem is, invoking execute with - /// some row range becomes weird... - /// Perhaps we borrow DataFusion's style of partitioning where we ask the reader to partition - /// into `n` and then pass the partition index to execute? Or perhaps we just pass `n` to the - /// execute call and have the reader return all `n` partitions at once? That would also make - /// sharing cached resources a lot easier. - fn row_count(&self) -> u64; - - /// Returns the [`DType`] of the layout. +/// A reader provides an interface for loading data from row-indexed layouts. +/// +/// Unlike a [`super::source::DataSource`], readers have a concrete row count allowing fixed +/// partitions over a known set of rows. Readers are driven by providing an input stream of +/// array data that can be used to provide arguments to parameterized filter and projection +/// expressions. +pub trait Reader: 'static + Send + Sync { + /// Get the data type of the layout being read. fn dtype(&self) -> &DType; - /// Returns the number of child layouts. - fn nchildren(&self) -> usize; - - /// Returns the nth child reader of the layout. - fn child(&self, idx: usize) -> &LayoutReader2Ref; - - /// Execute the layout reader for the given range of data, returning a masked array stream. - /// - /// TODO(ngates): this bit feels weird to me. - /// It's odd that we don't know when a particular reader is done executing. Meaning we don't - /// have a good lifetime for cached resources. The returned reader stream _does_ have a good - /// lifetime for caching (the duration of the stream), so perhaps we just say that layout - /// readers should not hold data and instead each call to execute should make its own segment - /// requests? Assuming we can de-dupe also within a segment source, this seems reasonable. - fn execute(&self, row_range: Range) -> VortexResult; + /// Returns the number of rows in the reader. + fn row_count(&self) -> u64; - /// Attempt to reduce the layout reader to a more simple representation. - /// - /// Returns `Ok(None)` if no optimization is possible. - fn try_reduce(&self) -> VortexResult> { - _ = self; + /// Reduces the reader, simplifying its internal structure if possible. + fn try_reduce(&self) -> VortexResult> { Ok(None) } - /// Attempt to perform a reduction of the parent of this layout reader. - /// - /// Returns `Ok(None)` if no reduction is possible. + /// Reduce the parent reader if possible, returning a new reader if successful. fn try_reduce_parent( &self, - parent: &LayoutReader2Ref, + parent: &ReaderRef, child_idx: usize, - ) -> VortexResult> { - _ = (self, parent, child_idx); + ) -> VortexResult> { + let _ = (parent, child_idx); Ok(None) } + + /// Creates a scan over the given row range of the reader. + fn scan(&self, row_range: Range) -> VortexResult; +} + +pub type ReaderScanRef = Box; + +/// A scan over a reader, producing output arrays given an input array to parameterize the filter +/// and projection expressions. +#[async_trait] +pub trait ReaderScan { + /// The data type of the returned data. + fn dtype(&self) -> &DType; + + /// The preferred maximum row count for the next batch. + /// + /// Returns [`None`] if there are no more batches. + fn next_batch_size(&mut self) -> Option; + + /// Returns the next batch of data given an input array. + /// + /// The returned batch must have the same number of rows as the [`Mask::true_count`]. + /// The provided mask will have at most [`next_batch_size`] rows. + async fn next_batch(&mut self, mask: Mask) -> VortexResult; } diff --git a/vortex-layout/src/v2/readers/chunked.rs b/vortex-layout/src/v2/readers/chunked.rs index 58697e899eb..0350a3f38fd 100644 --- a/vortex-layout/src/v2/readers/chunked.rs +++ b/vortex-layout/src/v2/readers/chunked.rs @@ -15,18 +15,18 @@ use vortex_error::vortex_bail; use vortex_error::vortex_err; use vortex_mask::Mask; -use crate::v2::reader::LayoutReader2; -use crate::v2::reader::LayoutReader2Ref; +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; use crate::v2::stream::LayoutReaderStream; use crate::v2::stream::SendableLayoutReaderStream; pub struct ChunkedReader2 { row_count: u64, dtype: DType, - chunks: Vec, + chunks: Vec, } -impl LayoutReader2 for ChunkedReader2 { +impl Reader for ChunkedReader2 { fn row_count(&self) -> u64 { self.row_count } @@ -39,7 +39,7 @@ impl LayoutReader2 for ChunkedReader2 { self.chunks.len() } - fn child(&self, idx: usize) -> &LayoutReader2Ref { + fn child(&self, idx: usize) -> &ReaderRef { &self.chunks[idx] } diff --git a/vortex-layout/src/v2/readers/flat.rs b/vortex-layout/src/v2/readers/flat.rs index 95aa5920c46..57ce3baeadb 100644 --- a/vortex-layout/src/v2/readers/flat.rs +++ b/vortex-layout/src/v2/readers/flat.rs @@ -13,8 +13,8 @@ use vortex_error::vortex_err; use vortex_mask::Mask; use crate::layouts::SharedArrayFuture; -use crate::v2::reader::LayoutReader2; -use crate::v2::reader::LayoutReader2Ref; +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; use crate::v2::stream::LayoutReaderStream; use crate::v2::stream::SendableLayoutReaderStream; @@ -24,7 +24,7 @@ pub struct FlatReader2 { array_fut: SharedArrayFuture, } -impl LayoutReader2 for FlatReader2 { +impl Reader for FlatReader2 { fn row_count(&self) -> u64 { self.len as u64 } @@ -37,7 +37,7 @@ impl LayoutReader2 for FlatReader2 { 0 } - fn child(&self, _idx: usize) -> &LayoutReader2Ref { + fn child(&self, _idx: usize) -> &ReaderRef { unreachable!() } diff --git a/vortex-layout/src/v2/readers/scalar_fn.rs b/vortex-layout/src/v2/readers/scalar_fn.rs index 5cf67c033eb..3bf8e679924 100644 --- a/vortex-layout/src/v2/readers/scalar_fn.rs +++ b/vortex-layout/src/v2/readers/scalar_fn.rs @@ -14,20 +14,20 @@ use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_mask::Mask; -use crate::v2::reader::LayoutReader2; -use crate::v2::reader::LayoutReader2Ref; +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; use crate::v2::stream::LayoutReaderStream; use crate::v2::stream::SendableLayoutReaderStream; -/// A [`LayoutReader2] for applying a scalar function to another layout. +/// A [`Reader] for applying a scalar function to another layout. pub struct ScalarFnReader { scalar_fn: ScalarFn, dtype: DType, row_count: u64, - children: Vec, + children: Vec, } -impl LayoutReader2 for ScalarFnReader { +impl Reader for ScalarFnReader { fn row_count(&self) -> u64 { self.row_count } @@ -40,7 +40,7 @@ impl LayoutReader2 for ScalarFnReader { self.children.len() } - fn child(&self, idx: usize) -> &LayoutReader2Ref { + fn child(&self, idx: usize) -> &ReaderRef { &self.children[idx] } diff --git a/vortex-layout/src/v2/readers/struct_.rs b/vortex-layout/src/v2/readers/struct_.rs index 85c17855686..bd1822fcbfa 100644 --- a/vortex-layout/src/v2/readers/struct_.rs +++ b/vortex-layout/src/v2/readers/struct_.rs @@ -14,8 +14,8 @@ use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_mask::Mask; -use crate::v2::reader::LayoutReader2; -use crate::v2::reader::LayoutReader2Ref; +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; use crate::v2::stream::LayoutReaderStream; use crate::v2::stream::SendableLayoutReaderStream; @@ -23,10 +23,10 @@ pub struct StructReader2 { row_count: u64, dtype: DType, // TODO(ngates): we should make this lazy? - fields: Vec, + fields: Vec, } -impl LayoutReader2 for StructReader2 { +impl Reader for StructReader2 { fn row_count(&self) -> u64 { self.row_count } @@ -39,7 +39,7 @@ impl LayoutReader2 for StructReader2 { self.fields.len() } - fn child(&self, idx: usize) -> &LayoutReader2Ref { + fn child(&self, idx: usize) -> &ReaderRef { &self.fields[idx] } diff --git a/vortex-layout/src/v2/stream.rs b/vortex-layout/src/v2/stream.rs index 7e57da7ba2b..b2cbf6c7cfa 100644 --- a/vortex-layout/src/v2/stream.rs +++ b/vortex-layout/src/v2/stream.rs @@ -9,7 +9,7 @@ use vortex_mask::Mask; pub type SendableLayoutReaderStream = Box; -/// A stream of data produced by a [`LayoutReader2`](crate::v2::reader::LayoutReader2). +/// A stream of data produced by a [`Reader`](crate::v2::reader::Reader). /// /// Layout readers are driven by requesting chunks of data using a given selection masks. pub trait LayoutReaderStream { diff --git a/vortex-layout/src/vtable.rs b/vortex-layout/src/vtable.rs index 969fc02af0b..a1aff45518c 100644 --- a/vortex-layout/src/vtable.rs +++ b/vortex-layout/src/vtable.rs @@ -25,7 +25,7 @@ use crate::children::LayoutChildren; use crate::segments::SegmentId; use crate::segments::SegmentSource; use crate::segments::SegmentSourceRef; -use crate::v2::reader::LayoutReader2Ref; +use crate::v2::reader::ReaderRef; pub trait VTable: 'static + Sized + Send + Sync + Debug { type Layout: 'static + Send + Sync + Clone + Debug + Deref + IntoLayout; @@ -72,7 +72,7 @@ pub trait VTable: 'static + Sized + Send + Sync + Debug { layout: &Self::Layout, segment_source: &SegmentSourceRef, session: &VortexSession, - ) -> VortexResult { + ) -> VortexResult { let _ = (layout, segment_source, session); vortex_bail!("new_reader2 not implemented for this layout") } diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index ed62775f7b0..52eb0dd47bb 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -26,6 +26,5 @@ pub mod gpu; mod repeated_scan; #[cfg(test)] mod test; -pub mod v2; pub use repeated_scan::RepeatedScan; diff --git a/vortex-scan/src/v2/mod.rs b/vortex-scan/src/v2/mod.rs deleted file mode 100644 index af92f452a33..00000000000 --- a/vortex-scan/src/v2/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -pub mod reader; -pub mod source; diff --git a/vortex-scan/src/v2/reader.rs b/vortex-scan/src/v2/reader.rs deleted file mode 100644 index 6292933645e..00000000000 --- a/vortex-scan/src/v2/reader.rs +++ /dev/null @@ -1,49 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::ops::Range; - -use async_trait::async_trait; -use vortex_array::ArrayRef; -use vortex_dtype::DType; -use vortex_error::VortexResult; -use vortex_mask::Mask; - -/// A reader provides an interface for loading data from row-indexed layouts. -/// -/// Unlike a [`super::source::Source`], readers have a concrete row count allowing fixed -/// partitions over a known set of rows. Readers are driven by providing an input stream of -/// array data that can be used to provide arguments to parameterized filter and projection -/// expressions. -#[async_trait] -pub trait Reader: 'static + Send + Sync { - /// Get the data type of the layout being read. - fn dtype(&self) -> &DType; - - /// Returns the number of rows in the reader. - fn row_count(&self) -> u64; - - /// Creates a scan over the given row range of the reader. - async fn scan(&self, row_range: Range) -> VortexResult; -} - -pub type ReaderScanRef = Box; - -/// A scan over a reader, producing output arrays given an input array to parameterize the filter -/// and projection expressions. -#[async_trait] -pub trait ReaderScan { - /// The data type of the returned data. - fn dtype(&self) -> &DType; - - /// The preferred maximum row count for the next batch. - /// - /// Returns [`None`] if there are no more batches. - fn next_batch_size(&mut self) -> Option; - - /// Returns the next batch of data given an input array. - /// - /// The returned batch must have the same number of rows as the [`Mask::true_count`]. - /// The provided mask will have at most [`next_batch_size`] rows. - async fn next_batch(&mut self, mask: Mask) -> VortexResult; -} diff --git a/vortex-scan/src/v2/source.rs b/vortex-scan/src/v2/source.rs deleted file mode 100644 index bfd7d6ee36e..00000000000 --- a/vortex-scan/src/v2/source.rs +++ /dev/null @@ -1,98 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::any::Any; -use std::sync::Arc; - -use async_trait::async_trait; -use futures::stream::BoxStream; -use vortex_array::expr::Expression; -use vortex_array::stream::SendableArrayStream; -use vortex_dtype::DType; -use vortex_error::VortexResult; - -/// Create a Vortex source from serialized configuration. -/// -/// Providers can be registered with Vortex under a specific -#[async_trait(?Send)] -pub trait SourceProvider: 'static { - /// URI schemes handled by this source provider. - /// - /// TODO(ngates): this might not be the right way to plugin sources. - fn schemes(&self) -> &[&str]; - - /// Initialize a new source. - async fn init_source(&self, uri: String) -> VortexResult; - - /// Serialize a source split to bytes. - async fn serialize_split(&self, split: &dyn Split) -> VortexResult>; - - /// Deserialize a source split from bytes. - async fn deserialize_split(&self, data: &[u8]) -> VortexResult; -} - -/// A reference-counted source. -pub type SourceRef = Arc; - -/// A source represents a streamable dataset that can be scanned with projection and filter -/// expressions. Each scan produces splits that can be executed in parallel to read data. -/// Each split can be serialized for remote execution. -#[async_trait] -pub trait Source: 'static + Send + Sync { - /// Returns the dtype of the source. - fn dtype(&self) -> &DType; - - /// Returns an estimate of the row count of the source. - fn row_count_estimate(&self) -> Estimate; - - /// Returns a scan over the source. - async fn scan(&self, scan_request: ScanRequest) -> VortexResult; -} - -#[derive(Debug, Clone, Default)] -pub struct ScanRequest { - pub projection: Option, - pub filter: Option, - pub limit: Option, -} - -pub type SourceScanRef = Box; - -#[async_trait] -pub trait SourceScan: 'static + Send + Sync { - /// The returned dtype of the scan. - fn dtype(&self) -> &DType; - - /// An estimate of the remaining splits. - fn remaining_splits_estimate(&self) -> Estimate; - - /// Returns the next batch of splits to be processed. - /// - /// This should not return _more_ than the max_batch_size splits, but may return fewer. - async fn next_splits(&mut self, max_splits: usize) -> VortexResult>; -} - -pub type SplitStream = BoxStream<'static, VortexResult>; -pub type SplitRef = Arc; - -pub trait Split: 'static + Send + Sync { - /// Downcast the split to a concrete type. - fn as_any(&self) -> &dyn Any; - - /// Executes the split. - fn execute(&self) -> VortexResult; - - /// Returns an estimate of the row count for this split. - fn row_count_estimate(&self) -> Estimate; - - /// Returns an estimate of the byte size for this split. - fn byte_size_estimate(&self) -> Estimate; -} - -#[derive(Default)] -pub enum Estimate { - Exact(T), - UpperBound(T), - #[default] - Unknown, -} From 2b5cb33a740b9a0deebe00a654831b8a220ec71a Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 14 Jan 2026 14:29:21 +0000 Subject: [PATCH 07/11] Scan API Signed-off-by: Nicholas Gates --- vortex-layout/src/v2/mod.rs | 1 - vortex-layout/src/v2/reader.rs | 31 +++++++++--------- vortex-layout/src/v2/readers/chunked.rs | 22 +++++-------- vortex-layout/src/v2/readers/flat.rs | 21 ++++--------- vortex-layout/src/v2/readers/scalar_fn.rs | 22 +++++-------- vortex-layout/src/v2/readers/struct_.rs | 22 +++++-------- vortex-layout/src/v2/stream.rs | 38 ----------------------- 7 files changed, 44 insertions(+), 113 deletions(-) delete mode 100644 vortex-layout/src/v2/stream.rs diff --git a/vortex-layout/src/v2/mod.rs b/vortex-layout/src/v2/mod.rs index fd8a30068f6..48e8c801039 100644 --- a/vortex-layout/src/v2/mod.rs +++ b/vortex-layout/src/v2/mod.rs @@ -4,4 +4,3 @@ mod optimizer; pub mod reader; pub mod readers; -pub mod stream; diff --git a/vortex-layout/src/v2/reader.rs b/vortex-layout/src/v2/reader.rs index e8b56ceedb2..6b6585a31f3 100644 --- a/vortex-layout/src/v2/reader.rs +++ b/vortex-layout/src/v2/reader.rs @@ -4,7 +4,7 @@ use std::ops::Range; use std::sync::Arc; -use async_trait::async_trait; +use futures::future::BoxFuture; use vortex_array::ArrayRef; use vortex_dtype::DType; use vortex_error::VortexResult; @@ -41,26 +41,29 @@ pub trait Reader: 'static + Send + Sync { } /// Creates a scan over the given row range of the reader. - fn scan(&self, row_range: Range) -> VortexResult; + fn execute(&self, row_range: Range) -> VortexResult; } -pub type ReaderScanRef = Box; +pub type ReaderStreamRef = Box; -/// A scan over a reader, producing output arrays given an input array to parameterize the filter -/// and projection expressions. -#[async_trait] -pub trait ReaderScan { +pub trait ReaderStream: 'static + Send + Sync { /// The data type of the returned data. fn dtype(&self) -> &DType; - /// The preferred maximum row count for the next batch. + /// The preferred maximum row count for the next chunk. /// - /// Returns [`None`] if there are no more batches. - fn next_batch_size(&mut self) -> Option; + /// Returns [`None`] if there are no more chunks. + fn next_chunk_len(&self) -> Option; - /// Returns the next batch of data given an input array. + /// Returns the next chunk of data given an input array. /// - /// The returned batch must have the same number of rows as the [`Mask::true_count`]. - /// The provided mask will have at most [`next_batch_size`] rows. - async fn next_batch(&mut self, mask: Mask) -> VortexResult; + /// The returned chunk must have the same number of rows as the [`Mask::true_count`]. + /// The provided mask will have at most [`next_chunk_len`] rows. + /// + /// The returned future has a `'static` lifetime allowing the calling to drive the stream + /// arbitrarily far without awaiting any data. + fn next_chunk( + &mut self, + mask: &Mask, + ) -> VortexResult>>; } diff --git a/vortex-layout/src/v2/readers/chunked.rs b/vortex-layout/src/v2/readers/chunked.rs index 0350a3f38fd..e4e5ca7088d 100644 --- a/vortex-layout/src/v2/readers/chunked.rs +++ b/vortex-layout/src/v2/readers/chunked.rs @@ -17,8 +17,8 @@ use vortex_mask::Mask; use crate::v2::reader::Reader; use crate::v2::reader::ReaderRef; -use crate::v2::stream::LayoutReaderStream; -use crate::v2::stream::SendableLayoutReaderStream; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; pub struct ChunkedReader2 { row_count: u64, @@ -27,23 +27,15 @@ pub struct ChunkedReader2 { } impl Reader for ChunkedReader2 { - fn row_count(&self) -> u64 { - self.row_count - } - fn dtype(&self) -> &DType { &self.dtype } - fn nchildren(&self) -> usize { - self.chunks.len() - } - - fn child(&self, idx: usize) -> &ReaderRef { - &self.chunks[idx] + fn row_count(&self) -> u64 { + self.row_count } - fn execute(&self, row_range: Range) -> VortexResult { + fn execute(&self, row_range: Range) -> VortexResult { let mut remaining_start = row_range.start; let mut remaining_end = row_range.end; let mut streams = Vec::new(); @@ -84,10 +76,10 @@ impl Reader for ChunkedReader2 { struct ChunkedReaderStream { dtype: DType, - chunks: Vec, + chunks: Vec, } -impl LayoutReaderStream for ChunkedReaderStream { +impl ReaderStream for ChunkedReaderStream { fn dtype(&self) -> &DType { &self.dtype } diff --git a/vortex-layout/src/v2/readers/flat.rs b/vortex-layout/src/v2/readers/flat.rs index 57ce3baeadb..b4f0bf0c3e7 100644 --- a/vortex-layout/src/v2/readers/flat.rs +++ b/vortex-layout/src/v2/readers/flat.rs @@ -14,9 +14,8 @@ use vortex_mask::Mask; use crate::layouts::SharedArrayFuture; use crate::v2::reader::Reader; -use crate::v2::reader::ReaderRef; -use crate::v2::stream::LayoutReaderStream; -use crate::v2::stream::SendableLayoutReaderStream; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; pub struct FlatReader2 { len: usize, @@ -25,23 +24,15 @@ pub struct FlatReader2 { } impl Reader for FlatReader2 { - fn row_count(&self) -> u64 { - self.len as u64 - } - fn dtype(&self) -> &DType { &self.dtype } - fn nchildren(&self) -> usize { - 0 - } - - fn child(&self, _idx: usize) -> &ReaderRef { - unreachable!() + fn row_count(&self) -> u64 { + self.len as u64 } - fn execute(&self, row_range: Range) -> VortexResult { + fn execute(&self, row_range: Range) -> VortexResult { // We need to share the same array future let array_fut = self.array_fut.clone(); @@ -75,7 +66,7 @@ struct FlatLayoutReaderStream { remaining: usize, } -impl LayoutReaderStream for FlatLayoutReaderStream { +impl ReaderStream for FlatLayoutReaderStream { fn dtype(&self) -> &DType { &self.dtype } diff --git a/vortex-layout/src/v2/readers/scalar_fn.rs b/vortex-layout/src/v2/readers/scalar_fn.rs index 3bf8e679924..009cfea7cf9 100644 --- a/vortex-layout/src/v2/readers/scalar_fn.rs +++ b/vortex-layout/src/v2/readers/scalar_fn.rs @@ -16,8 +16,8 @@ use vortex_mask::Mask; use crate::v2::reader::Reader; use crate::v2::reader::ReaderRef; -use crate::v2::stream::LayoutReaderStream; -use crate::v2::stream::SendableLayoutReaderStream; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; /// A [`Reader] for applying a scalar function to another layout. pub struct ScalarFnReader { @@ -28,23 +28,15 @@ pub struct ScalarFnReader { } impl Reader for ScalarFnReader { - fn row_count(&self) -> u64 { - self.row_count - } - fn dtype(&self) -> &DType { &self.dtype } - fn nchildren(&self) -> usize { - self.children.len() - } - - fn child(&self, idx: usize) -> &ReaderRef { - &self.children[idx] + fn row_count(&self) -> u64 { + self.row_count } - fn execute(&self, row_range: Range) -> VortexResult { + fn execute(&self, row_range: Range) -> VortexResult { let input_streams = self .children .iter() @@ -62,10 +54,10 @@ impl Reader for ScalarFnReader { struct ScalarFnArrayStream { dtype: DType, scalar_fn: ScalarFn, - input_streams: Vec, + input_streams: Vec, } -impl LayoutReaderStream for ScalarFnArrayStream { +impl ReaderStream for ScalarFnArrayStream { fn dtype(&self) -> &DType { &self.dtype } diff --git a/vortex-layout/src/v2/readers/struct_.rs b/vortex-layout/src/v2/readers/struct_.rs index bd1822fcbfa..97ddafa13e8 100644 --- a/vortex-layout/src/v2/readers/struct_.rs +++ b/vortex-layout/src/v2/readers/struct_.rs @@ -16,8 +16,8 @@ use vortex_mask::Mask; use crate::v2::reader::Reader; use crate::v2::reader::ReaderRef; -use crate::v2::stream::LayoutReaderStream; -use crate::v2::stream::SendableLayoutReaderStream; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; pub struct StructReader2 { row_count: u64, @@ -27,23 +27,15 @@ pub struct StructReader2 { } impl Reader for StructReader2 { - fn row_count(&self) -> u64 { - self.row_count - } - fn dtype(&self) -> &DType { &self.dtype } - fn nchildren(&self) -> usize { - self.fields.len() - } - - fn child(&self, idx: usize) -> &ReaderRef { - &self.fields[idx] + fn row_count(&self) -> u64 { + self.row_count } - fn execute(&self, row_range: Range) -> VortexResult { + fn execute(&self, row_range: Range) -> VortexResult { let field_streams = self .fields .iter() @@ -59,10 +51,10 @@ impl Reader for StructReader2 { struct StructReaderStream { dtype: DType, - fields: Vec, + fields: Vec, } -impl LayoutReaderStream for StructReaderStream { +impl ReaderStream for StructReaderStream { fn dtype(&self) -> &DType { &self.dtype } diff --git a/vortex-layout/src/v2/stream.rs b/vortex-layout/src/v2/stream.rs deleted file mode 100644 index b2cbf6c7cfa..00000000000 --- a/vortex-layout/src/v2/stream.rs +++ /dev/null @@ -1,38 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use futures::future::BoxFuture; -use vortex_array::ArrayRef; -use vortex_dtype::DType; -use vortex_error::VortexResult; -use vortex_mask::Mask; - -pub type SendableLayoutReaderStream = Box; - -/// A stream of data produced by a [`Reader`](crate::v2::reader::Reader). -/// -/// Layout readers are driven by requesting chunks of data using a given selection masks. -pub trait LayoutReaderStream { - /// Returns the [`DType`] of the data produced by the stream. - fn dtype(&self) -> &DType; - - /// Returns the length in rows of the next chunk in the stream. - /// - /// Returns [`None`] if the stream has ended. - fn next_chunk_len(&self) -> Option; - - /// Returns the next chunk of data given a selection mask of the requested length. - /// - /// The length of the provided selection mask must be `<=` the size returned from - /// [`LayoutReaderStream::next_chunk_len`]. - /// - /// The length of the returned chunk must be equal to the [`Mask::true_count`] of the selection - /// mask. - /// - /// The returned future has a `'static` lifetime allowing the calling to drive the stream - /// arbitrarily far without awaiting any data. - fn next_chunk( - &mut self, - selection: &Mask, - ) -> VortexResult>>; -} From 479cb8caaabe1f29f68b379fb33a887c6ea4dfd9 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 14 Jan 2026 15:29:03 +0000 Subject: [PATCH 08/11] Scan API Signed-off-by: Nicholas Gates --- vortex-file/src/file.rs | 9 ++++ vortex-file/src/tests.rs | 68 ++++++++++++------------- vortex-layout/src/v2/readers/chunked.rs | 7 ++- vortex-scan/src/lib.rs | 1 + vortex-scan/src/v2/mod.rs | 4 ++ vortex-scan/src/v2/scan.rs | 68 +++++++++++++++++++++++++ 6 files changed, 121 insertions(+), 36 deletions(-) create mode 100644 vortex-scan/src/v2/mod.rs create mode 100644 vortex-scan/src/v2/scan.rs diff --git a/vortex-file/src/file.rs b/vortex-file/src/file.rs index b2161f042c7..10a30cdf0f2 100644 --- a/vortex-file/src/file.rs +++ b/vortex-file/src/file.rs @@ -27,6 +27,7 @@ use vortex_layout::segments::SegmentSource; use vortex_metrics::VortexMetrics; use vortex_scan::ScanBuilder; use vortex_scan::SplitBy; +use vortex_scan::v2::scan::ScanBuilder2; use vortex_session::VortexSession; use vortex_utils::aliases::hash_map::HashMap; @@ -103,6 +104,14 @@ impl VortexFile { ) } + pub fn scan2(&self) -> VortexResult { + let reader_ref = self + .footer + .layout() + .new_reader2(&self.segment_source, &self.session)?; + Ok(ScanBuilder2::new(reader_ref, self.session.clone())) + } + #[cfg(gpu_unstable)] pub fn gpu_scan( &self, diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index 2bcc0b73739..47020ac32f9 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -61,7 +61,7 @@ use vortex_io::session::RuntimeSession; use vortex_layout::session::LayoutSession; use vortex_metrics::VortexMetrics; use vortex_scalar::Scalar; -use vortex_scan::ScanBuilder; +use vortex_scan::v2::scan::ScanBuilder2; use vortex_session::VortexSession; use crate::OpenOptionsSessionExt; @@ -118,7 +118,7 @@ async fn test_read_simple() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap(); @@ -198,7 +198,7 @@ async fn test_round_trip_many_types() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap() @@ -284,7 +284,7 @@ async fn test_read_projection() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let array = file - .scan() + .scan2() .unwrap() .with_projection(select(["strings"], root())) .into_array_stream() @@ -306,7 +306,7 @@ async fn test_read_projection() { assert_arrays_eq!(actual.as_ref(), expected.as_ref()); let array = file - .scan() + .scan2() .unwrap() .with_projection(select(["numbers"], root())) .into_array_stream() @@ -355,7 +355,7 @@ async fn unequal_batches() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap(); @@ -415,7 +415,7 @@ async fn write_chunked() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap(); @@ -445,7 +445,7 @@ async fn test_empty_varbin_array_roundtrip() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let result = file - .scan() + .scan2() .unwrap() .into_array_stream() .unwrap() @@ -475,7 +475,7 @@ async fn issue_5385_filter_casted_column() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(eq( cast( @@ -525,7 +525,7 @@ async fn filter_string() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(eq(get_item("name", root()), lit("Joseph"))) .into_array_stream() @@ -574,7 +574,7 @@ async fn filter_or() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(or( eq(get_item("name", root()), lit("Angela")), @@ -634,7 +634,7 @@ async fn filter_and() { .open_options() .open_buffer(buf) .unwrap() - .scan() + .scan2() .unwrap() .with_filter(and( gt(get_item("age", root()), lit(21)), @@ -688,7 +688,7 @@ async fn test_with_indices_simple() { // test no indices let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_row_indices(Buffer::::empty()) .into_array_stream() @@ -704,7 +704,7 @@ async fn test_with_indices_simple() { let kept_indices = [0_u64, 3, 99, 100, 101, 399, 400, 401, 499]; let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_row_indices(Buffer::from_iter(kept_indices)) .into_array_stream() @@ -724,7 +724,7 @@ async fn test_with_indices_simple() { // test all indices let actual_array = file - .scan() + .scan2() .unwrap() .with_row_indices((0u64..500).collect::>()) .into_array_stream() @@ -767,7 +767,7 @@ async fn test_with_indices_on_two_columns() { let kept_indices = [0_u64, 3, 7]; let array = file - .scan() + .scan2() .unwrap() .with_row_indices(Buffer::from_iter(kept_indices)) .into_array_stream() @@ -822,7 +822,7 @@ async fn test_with_indices_and_with_row_filter_simple() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_filter(gt(get_item("numbers", root()), lit(50_i16))) .with_row_indices(Buffer::empty()) @@ -839,7 +839,7 @@ async fn test_with_indices_and_with_row_filter_simple() { let kept_indices = [0u64, 3, 99, 100, 101, 399, 400, 401, 499]; let actual_kept_array = file - .scan() + .scan2() .unwrap() .with_filter(gt(get_item("numbers", root()), lit(50_i16))) .with_row_indices(Buffer::from_iter(kept_indices)) @@ -862,7 +862,7 @@ async fn test_with_indices_and_with_row_filter_simple() { // test all indices let actual_array = file - .scan() + .scan2() .unwrap() .with_filter(gt(get_item("numbers", root()), lit(50_i16))) .with_row_indices((0..500).collect::>()) @@ -925,7 +925,7 @@ async fn filter_string_chunked() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual_array = file - .scan() + .scan2() .unwrap() .with_filter(eq(get_item("name", root()), lit("Joseph"))) .into_array_stream() @@ -1013,7 +1013,7 @@ async fn test_pruning_with_or() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual_array = file - .scan() + .scan2() .unwrap() .with_filter(or( lt_eq(get_item("letter", root()), lit("J")), @@ -1086,7 +1086,7 @@ async fn test_repeated_projection() { let file = SESSION.open_options().open_buffer(buf).unwrap(); let actual = file - .scan() + .scan2() .unwrap() .with_projection(select(["strings", "strings"], root())) .into_array_stream() @@ -1126,7 +1126,7 @@ async fn chunked_file() -> VortexResult { #[tokio::test] async fn basic_file_roundtrip() -> VortexResult<()> { let vxf = chunked_file().await?; - let result = vxf.scan()?.into_array_stream()?.read_all().await?; + let result = vxf.scan2()?.into_array_stream()?.read_all().await?; let expected = buffer![0i32, 1, 2, 3, 4, 5, 6, 7, 8].into_array(); assert_arrays_eq!(result.as_ref(), expected.as_ref()); @@ -1170,7 +1170,7 @@ async fn file_excluding_dtype() -> VortexResult<()> { async fn file_take() -> VortexResult<()> { let vxf = chunked_file().await?; let result = vxf - .scan()? + .scan2()? .with_row_indices(buffer![0, 1, 8]) .into_array_stream()? .read_all() @@ -1208,7 +1208,7 @@ async fn write_nullable_top_level_struct() { async fn round_trip( array: &dyn Array, - f: impl Fn(ScanBuilder) -> VortexResult>, + f: impl Fn(ScanBuilder2) -> VortexResult, ) -> VortexResult { let mut writer = vec![]; SESSION @@ -1225,7 +1225,7 @@ async fn round_trip( assert_eq!(vxf.dtype(), array.dtype()); assert_eq!(vxf.row_count(), array.len() as u64); - f(vxf.scan()?)?.into_array_stream()?.read_all().await + f(vxf.scan2()?)?.into_array_stream()?.read_all().await } #[tokio::test] @@ -1304,7 +1304,7 @@ async fn test_into_tokio_array_stream() -> VortexResult<()> { .await?; let file = SESSION.open_options().open_buffer(buf)?; - let stream = file.scan().unwrap().into_array_stream()?; + let stream = file.scan2().unwrap().into_array_stream()?; let array = stream.read_all().await?; assert_eq!(array.len(), 8); @@ -1326,7 +1326,7 @@ async fn test_array_stream_no_double_dict_encode() -> VortexResult<()> { .write(&mut buf, array.to_array_stream()) .await?; let file = SESSION.open_options().open_buffer(buf)?; - let read_array = file.scan()?.into_array_stream()?.read_all().await?; + let read_array = file.scan2()?.into_array_stream()?.read_all().await?; let dict = read_array .as_opt::() @@ -1354,7 +1354,7 @@ async fn test_writer_basic_push() -> VortexResult<()> { assert_eq!(summary.row_count(), 4); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 4); assert_eq!(result.dtype(), &dtype); @@ -1384,7 +1384,7 @@ async fn test_writer_multiple_pushes() -> VortexResult<()> { assert_eq!(summary.row_count(), 9); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 9); let numbers = result.to_struct().field_by_name("numbers")?.clone(); @@ -1415,7 +1415,7 @@ async fn test_writer_push_stream() -> VortexResult<()> { assert_eq!(summary.row_count(), 6); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 6); let numbers = result.to_struct().field_by_name("numbers")?.clone(); @@ -1476,7 +1476,7 @@ async fn test_writer_empty_chunks() -> VortexResult<()> { assert_eq!(summary.row_count(), 2); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 2); let numbers = result.to_struct().field_by_name("numbers")?.clone(); @@ -1511,7 +1511,7 @@ async fn test_writer_mixed_push_and_stream() -> VortexResult<()> { assert_eq!(summary.row_count(), 6); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 6); let numbers = result.to_struct().field_by_name("numbers")?.clone(); @@ -1548,7 +1548,7 @@ async fn test_writer_with_complex_types() -> VortexResult<()> { assert_eq!(footer.row_count(), 3); let file = SESSION.open_options().open_buffer(buf)?; - let result = file.scan()?.into_array_stream()?.read_all().await?; + let result = file.scan2()?.into_array_stream()?.read_all().await?; assert_eq!(result.len(), 3); assert_eq!(result.dtype(), &dtype); diff --git a/vortex-layout/src/v2/readers/chunked.rs b/vortex-layout/src/v2/readers/chunked.rs index e4e5ca7088d..244421dca36 100644 --- a/vortex-layout/src/v2/readers/chunked.rs +++ b/vortex-layout/src/v2/readers/chunked.rs @@ -10,6 +10,7 @@ use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::arrays::ChunkedArray; use vortex_dtype::DType; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; @@ -135,11 +136,13 @@ impl ReaderStream for ChunkedReaderStream { // Remove any chunks that are already exhausted loop { + if self.chunks.is_empty() { + vortex_bail!("Early termination of chunked layout"); + } if self.chunks[0].next_chunk_len().is_none() { self.chunks.remove(0); - } else { - break; } + next_len = self.chunks[0].next_chunk_len().vortex_expect("non-none"); } } diff --git a/vortex-scan/src/lib.rs b/vortex-scan/src/lib.rs index 52eb0dd47bb..ed62775f7b0 100644 --- a/vortex-scan/src/lib.rs +++ b/vortex-scan/src/lib.rs @@ -26,5 +26,6 @@ pub mod gpu; mod repeated_scan; #[cfg(test)] mod test; +pub mod v2; pub use repeated_scan::RepeatedScan; diff --git a/vortex-scan/src/v2/mod.rs b/vortex-scan/src/v2/mod.rs new file mode 100644 index 00000000000..4f05171404d --- /dev/null +++ b/vortex-scan/src/v2/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +pub mod scan; diff --git a/vortex-scan/src/v2/scan.rs b/vortex-scan/src/v2/scan.rs new file mode 100644 index 00000000000..c806cf15e12 --- /dev/null +++ b/vortex-scan/src/v2/scan.rs @@ -0,0 +1,68 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::expr::Expression; +use vortex_array::expr::root; +use vortex_array::stream::SendableArrayStream; +use vortex_buffer::Buffer; +use vortex_error::VortexResult; +use vortex_layout::v2::reader::ReaderRef; +use vortex_session::VortexSession; + +use crate::Selection; + +pub struct ScanBuilder2 { + reader: ReaderRef, + projection: Expression, + filter: Option, + limit: Option, + row_selection: Selection, + session: VortexSession, +} + +impl ScanBuilder2 { + pub fn new(reader: ReaderRef, session: VortexSession) -> Self { + Self { + reader, + projection: root(), + filter: None, + limit: None, + row_selection: Selection::All, + session, + } + } + + pub fn with_filter(mut self, filter: Expression) -> Self { + self.filter = Some(filter); + self + } + + pub fn with_some_filter(mut self, filter: Option) -> Self { + self.filter = filter; + self + } + + pub fn with_projection(mut self, projection: Expression) -> Self { + self.projection = projection; + self + } + + pub fn with_limit(mut self, limit: u64) -> Self { + self.limit = Some(limit); + self + } + + pub fn with_row_selection(mut self, row_selection: Selection) -> Self { + self.row_selection = row_selection; + self + } + + pub fn with_row_indices(mut self, row_indices: Buffer) -> Self { + self.row_selection = Selection::IncludeByIndex(row_indices); + self + } + + pub fn into_array_stream(self) -> VortexResult { + todo!() + } +} From 9646a53274987ac1492f6c83661b3105eb579b72 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 14 Jan 2026 15:36:12 +0000 Subject: [PATCH 09/11] Scan API Signed-off-by: Nicholas Gates --- vortex-scan/src/v2/scan.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vortex-scan/src/v2/scan.rs b/vortex-scan/src/v2/scan.rs index c806cf15e12..946514096bb 100644 --- a/vortex-scan/src/v2/scan.rs +++ b/vortex-scan/src/v2/scan.rs @@ -66,3 +66,5 @@ impl ScanBuilder2 { todo!() } } + +struct Scan {} From c0645ca1bfacd565f1b8b5fe4260aa1231357f88 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Wed, 14 Jan 2026 22:16:37 +0000 Subject: [PATCH 10/11] Scan API Signed-off-by: Nicholas Gates --- Cargo.lock | 1 + vortex-array/src/expr/exprs/mod.rs | 2 + vortex-array/src/expr/exprs/stats.rs | 84 ++++++++++++++++ vortex-array/src/expr/stats/mod.rs | 2 +- vortex-array/src/expr/vtable.rs | 23 ++++- vortex-layout/src/v2/expression.rs | 52 ++++++++++ vortex-layout/src/v2/expressions/falsify.rs | 95 ++++++++++++++++++ vortex-layout/src/v2/expressions/mod.rs | 4 + vortex-layout/src/v2/matcher.rs | 16 +++ vortex-layout/src/v2/mod.rs | 3 + vortex-layout/src/v2/optimizer.rs | 7 +- vortex-layout/src/v2/reader.rs | 4 + vortex-layout/src/v2/readers/chunked.rs | 9 +- vortex-layout/src/v2/readers/constant.rs | 79 +++++++++++++++ vortex-layout/src/v2/readers/flat.rs | 9 +- vortex-layout/src/v2/readers/mod.rs | 2 + vortex-layout/src/v2/readers/scalar_fn.rs | 52 ++++++++++ vortex-layout/src/v2/readers/struct_.rs | 9 +- vortex-layout/src/v2/readers/zoned.rs | 104 ++++++++++++++++++++ vortex-scan/src/v2/scan.rs | 68 ++++++++++++- 20 files changed, 611 insertions(+), 14 deletions(-) create mode 100644 vortex-array/src/expr/exprs/stats.rs create mode 100644 vortex-layout/src/v2/expression.rs create mode 100644 vortex-layout/src/v2/expressions/falsify.rs create mode 100644 vortex-layout/src/v2/expressions/mod.rs create mode 100644 vortex-layout/src/v2/matcher.rs create mode 100644 vortex-layout/src/v2/readers/constant.rs create mode 100644 vortex-layout/src/v2/readers/zoned.rs diff --git a/Cargo.lock b/Cargo.lock index 11b61602873..c9462d2acdf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10683,6 +10683,7 @@ version = "0.1.0" dependencies = [ "arrow-array 57.2.0", "arrow-schema 57.2.0", + "async-trait", "bit-vec 0.8.0", "futures", "itertools 0.14.0", diff --git a/vortex-array/src/expr/exprs/mod.rs b/vortex-array/src/expr/exprs/mod.rs index c606b53f5a0..c8a5776a131 100644 --- a/vortex-array/src/expr/exprs/mod.rs +++ b/vortex-array/src/expr/exprs/mod.rs @@ -17,6 +17,7 @@ pub(crate) mod operators; pub(crate) mod pack; pub(crate) mod root; pub(crate) mod select; +pub(crate) mod stats; pub use between::*; pub use binary::*; @@ -34,3 +35,4 @@ pub use operators::*; pub use pack::*; pub use root::*; pub use select::*; +pub use stats::*; diff --git a/vortex-array/src/expr/exprs/stats.rs b/vortex-array/src/expr/exprs/stats.rs new file mode 100644 index 00000000000..2afefa98232 --- /dev/null +++ b/vortex-array/src/expr/exprs/stats.rs @@ -0,0 +1,84 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_vector::Datum; +use vortex_vector::Scalar; + +use crate::Array; +use crate::ArrayRef; +use crate::IntoArray; +use crate::arrays::ConstantArray; +use crate::expr::Arity; +use crate::expr::ChildName; +use crate::expr::ExecutionArgs; +use crate::expr::ExprId; +use crate::expr::Expression; +use crate::expr::SimplifyCtx; +use crate::expr::VTable; +use crate::expr::VTableExt; +use crate::expr::stats::Stat; + +/// Creates a new expression that returns a minimum bound of its input. +pub fn statistic(stat: Stat, child: Expression) -> Expression { + Statistic.new_expr(stat, vec![child]) +} + +pub struct Statistic; + +impl VTable for Statistic { + type Options = Stat; + + fn id(&self) -> ExprId { + ExprId::from("statistic") + } + + fn arity(&self, _options: &Self::Options) -> Arity { + Arity::Exact(1) + } + + fn child_name(&self, _options: &Self::Options, _child_idx: usize) -> ChildName { + ChildName::from("input") + } + + fn return_dtype(&self, stat: &Stat, arg_dtypes: &[DType]) -> VortexResult { + stat.dtype(&arg_dtypes[0]) + .ok_or_else(|| { + vortex_err!( + "statistic {:?} not supported for dtype {:?}", + stat, + arg_dtypes[0] + ) + }) + // We make all statistics types nullable in case there is no reduction rule to handle + // the statistic expression. + .map(|dt| dt.as_nullable()) + } + + fn evaluate( + &self, + _stat: &Stat, + expr: &Expression, + scope: &ArrayRef, + ) -> VortexResult { + let return_dtype = expr.return_dtype(scope.dtype())?; + Ok(ConstantArray::new(vortex_scalar::Scalar::null(return_dtype), scope.len()).into_array()) + } + + fn execute(&self, _stat: &Stat, args: ExecutionArgs) -> VortexResult { + Ok(Datum::Scalar(Scalar::null(&args.return_dtype))) + } + + fn simplify( + &self, + _options: &Self::Options, + _expr: &Expression, + _ctx: &dyn SimplifyCtx, + ) -> VortexResult> { + // FIXME(ngates): we really want to implement a reduction rule for all arrays? But it's an array. + // And it's a reduction rule. How do we do this without reduce_parent on everything..? + Ok(None) + } +} diff --git a/vortex-array/src/expr/stats/mod.rs b/vortex-array/src/expr/stats/mod.rs index cba33e2743c..e01f807e21d 100644 --- a/vortex-array/src/expr/stats/mod.rs +++ b/vortex-array/src/expr/stats/mod.rs @@ -216,7 +216,7 @@ impl Stat { }) } - pub fn name(&self) -> &str { + pub const fn name(&self) -> &'static str { match self { Self::IsConstant => "is_constant", Self::IsSorted => "is_sorted", diff --git a/vortex-array/src/expr/vtable.rs b/vortex-array/src/expr/vtable.rs index 3e0efafb6ae..fc77fb4857f 100644 --- a/vortex-array/src/expr/vtable.rs +++ b/vortex-array/src/expr/vtable.rs @@ -72,7 +72,20 @@ pub trait VTable: 'static + Sized + Send + Sync { options: &Self::Options, expr: &Expression, f: &mut Formatter<'_>, - ) -> fmt::Result; + ) -> fmt::Result { + write!(f, "{}(", expr.id())?; + for (i, child) in expr.children().iter().enumerate() { + if i > 0 { + write!(f, ", ")?; + } + child.fmt_sql(f)?; + } + let options = format!("{}", options); + if !options.is_empty() { + write!(f, ", options={}", options)?; + } + write!(f, ")") + } /// Compute the return [`DType`] of the expression if evaluated over the given input types. fn return_dtype(&self, options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult; @@ -143,6 +156,14 @@ pub trait VTable: 'static + Sized + Send + Sync { Ok(None) } + /// Falsify the expression, returning a new expression that is true whenever the original + /// expression is guaranteed to be false via stats. + fn falsify(&self, options: &Self::Options, expr: &Expression) -> Option { + _ = options; + _ = expr; + None + } + /// See [`Expression::stat_falsification`]. fn stat_falsification( &self, diff --git a/vortex-layout/src/v2/expression.rs b/vortex-layout/src/v2/expression.rs new file mode 100644 index 00000000000..76f639f987f --- /dev/null +++ b/vortex-layout/src/v2/expression.rs @@ -0,0 +1,52 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use itertools::Itertools; +use vortex_array::expr::Expression; +use vortex_array::expr::Literal; +use vortex_array::expr::Root; +use vortex_error::VortexResult; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::readers::constant::ConstantReader; +use crate::v2::readers::scalar_fn::ScalarFnReader; + +impl dyn Reader + '_ { + /// Apply the expression to this reader, producing a new reader in constant time. + pub fn apply(self: Arc, expr: &Expression) -> VortexResult { + // If the expression is a root, return self. + if expr.is::() { + return Ok(self); + } + + // Manually convert literals to ConstantArray. + if let Some(scalar) = expr.as_opt::() { + return Ok(Arc::new(ConstantReader::new( + scalar.clone(), + self.row_count(), + ))); + } + + let row_count = self.row_count(); + + // Otherwise, collect the child readers. + let children: Vec<_> = expr + .children() + .iter() + .map(|e| self.clone().apply(e)) + .try_collect()?; + + // And wrap the scalar function up in an array. + let reader: ReaderRef = Arc::new(ScalarFnReader::try_new( + expr.scalar_fn().clone(), + children, + row_count, + )?); + + // Optimize the resulting reader. + reader.optimize() + } +} diff --git a/vortex-layout/src/v2/expressions/falsify.rs b/vortex-layout/src/v2/expressions/falsify.rs new file mode 100644 index 00000000000..56cfcf8a77c --- /dev/null +++ b/vortex-layout/src/v2/expressions/falsify.rs @@ -0,0 +1,95 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Display; +use std::fmt::Formatter; + +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::ConstantArray; +use vortex_array::expr::Arity; +use vortex_array::expr::ChildName; +use vortex_array::expr::ExecutionArgs; +use vortex_array::expr::ExprId; +use vortex_array::expr::Expression; +use vortex_array::expr::VTable; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_vector::Datum; +use vortex_vector::Scalar; +use vortex_vector::bool::BoolScalar; + +/// An expression that evaluates to true when the predicate is provably false, without evaluating +/// it. +/// +/// Falsify typically reduces to operations over statistics expressions. For example, +/// the expression `falsify(col > 5)` may reduce to `col.max() <= 5`. +/// +/// If a falsify expression cannot be reduced, it evaluates to `false` for all inputs. +pub struct Falsify; + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct FalsifyOptions { + predicate: Expression, +} + +impl Display for FalsifyOptions { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "predicate={}", self.predicate) + } +} + +impl VTable for Falsify { + // FIXME(ngates): should the predicate be a child expression, or live like this in the options. + // It's a bit weird? Maybe it makes implementing the optimizer rules a little more fiddly? + // But it's weird to have a child expression that we know is never executed. + type Options = FalsifyOptions; + + fn id(&self) -> ExprId { + ExprId::from("falsify") + } + + fn arity(&self, _options: &Self::Options) -> Arity { + Arity::Exact(0) + } + + fn child_name(&self, _options: &Self::Options, _child_idx: usize) -> ChildName { + ChildName::from("predicate") + } + + fn fmt_sql( + &self, + _options: &Self::Options, + expr: &Expression, + f: &mut Formatter<'_>, + ) -> std::fmt::Result { + write!(f, "falsify(")?; + expr.child(0).fmt_sql(f)?; + write!(f, ")") + } + + fn return_dtype(&self, _options: &Self::Options, arg_dtypes: &[DType]) -> VortexResult { + if !arg_dtypes[0].is_boolean() { + vortex_bail!("falsify() requires a boolean argument"); + } + Ok(DType::Bool(Nullability::NonNullable)) + } + + // NOTE(ngates): do we prefer evaluate or execute semantics??? + fn evaluate( + &self, + _options: &Self::Options, + _expr: &Expression, + scope: &ArrayRef, + ) -> VortexResult { + // Unless falsify has been reduced by another expression, we cannot prove the predicate + // is false. Therefore, we return a constant false array. + Ok(ConstantArray::new(false, scope.len()).into_array()) + } + + fn execute(&self, _options: &Self::Options, _args: ExecutionArgs) -> VortexResult { + Ok(Datum::Scalar(Scalar::Bool(BoolScalar::new(Some(false))))) + } +} diff --git a/vortex-layout/src/v2/expressions/mod.rs b/vortex-layout/src/v2/expressions/mod.rs new file mode 100644 index 00000000000..41b859483a0 --- /dev/null +++ b/vortex-layout/src/v2/expressions/mod.rs @@ -0,0 +1,4 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod falsify; diff --git a/vortex-layout/src/v2/matcher.rs b/vortex-layout/src/v2/matcher.rs new file mode 100644 index 00000000000..dcd68f2dc1d --- /dev/null +++ b/vortex-layout/src/v2/matcher.rs @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::expr; + +use crate::v2::reader::Reader; +use crate::v2::readers::scalar_fn::ScalarFnReader; + +impl dyn Reader + '_ { + /// If this reader is a [`ScalarFnReader`], return its scalar function options + pub fn as_scalar_fn(&self) -> Option<&V::Options> { + self.as_any() + .downcast_ref::() + .and_then(|r| r.scalar_fn().as_opt::()) + } +} diff --git a/vortex-layout/src/v2/mod.rs b/vortex-layout/src/v2/mod.rs index 48e8c801039..2fe71dbffb7 100644 --- a/vortex-layout/src/v2/mod.rs +++ b/vortex-layout/src/v2/mod.rs @@ -1,6 +1,9 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +mod expression; +mod expressions; +mod matcher; mod optimizer; pub mod reader; pub mod readers; diff --git a/vortex-layout/src/v2/optimizer.rs b/vortex-layout/src/v2/optimizer.rs index 5c2686654e6..343179037ac 100644 --- a/vortex-layout/src/v2/optimizer.rs +++ b/vortex-layout/src/v2/optimizer.rs @@ -1,13 +1,16 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::sync::Arc; + use vortex_error::VortexResult; use crate::v2::reader::Reader; use crate::v2::reader::ReaderRef; impl dyn Reader + '_ { - pub fn optimize(&self) -> VortexResult { - todo!() + pub fn optimize(self: Arc) -> VortexResult { + // TODO(ngates): run the reduce rules + Ok(self) } } diff --git a/vortex-layout/src/v2/reader.rs b/vortex-layout/src/v2/reader.rs index 6b6585a31f3..589cbe8b322 100644 --- a/vortex-layout/src/v2/reader.rs +++ b/vortex-layout/src/v2/reader.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::any::Any; use std::ops::Range; use std::sync::Arc; @@ -19,6 +20,9 @@ pub type ReaderRef = Arc; /// array data that can be used to provide arguments to parameterized filter and projection /// expressions. pub trait Reader: 'static + Send + Sync { + /// Downcast the reader to a concrete type. + fn as_any(&self) -> &dyn Any; + /// Get the data type of the layout being read. fn dtype(&self) -> &DType; diff --git a/vortex-layout/src/v2/readers/chunked.rs b/vortex-layout/src/v2/readers/chunked.rs index 244421dca36..7eaa90e388a 100644 --- a/vortex-layout/src/v2/readers/chunked.rs +++ b/vortex-layout/src/v2/readers/chunked.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::any::Any; use std::ops::Range; use futures::future::BoxFuture; @@ -21,13 +22,17 @@ use crate::v2::reader::ReaderRef; use crate::v2::reader::ReaderStream; use crate::v2::reader::ReaderStreamRef; -pub struct ChunkedReader2 { +pub struct ChunkedReader { row_count: u64, dtype: DType, chunks: Vec, } -impl Reader for ChunkedReader2 { +impl Reader for ChunkedReader { + fn as_any(&self) -> &dyn Any { + self + } + fn dtype(&self) -> &DType { &self.dtype } diff --git a/vortex-layout/src/v2/readers/constant.rs b/vortex-layout/src/v2/readers/constant.rs new file mode 100644 index 00000000000..a15676b712b --- /dev/null +++ b/vortex-layout/src/v2/readers/constant.rs @@ -0,0 +1,79 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; + +use futures::future::BoxFuture; +use moka::future::FutureExt; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::arrays::ConstantArray; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_mask::Mask; +use vortex_scalar::Scalar; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderStream; +use crate::v2::reader::ReaderStreamRef; + +pub struct ConstantReader { + scalar: Scalar, + row_count: u64, +} + +impl ConstantReader { + pub fn new(scalar: Scalar, row_count: u64) -> Self { + Self { scalar, row_count } + } +} + +impl Reader for ConstantReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + self.scalar.dtype() + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn execute(&self, row_range: Range) -> VortexResult { + let remaining = row_range.end.saturating_sub(row_range.start); + Ok(Box::new(ConstantReaderStream { + scalar: self.scalar.clone(), + remaining, + })) + } +} + +struct ConstantReaderStream { + scalar: Scalar, + remaining: u64, +} + +impl ReaderStream for ConstantReaderStream { + fn dtype(&self) -> &DType { + self.scalar.dtype() + } + + fn next_chunk_len(&self) -> Option { + if self.remaining == 0 { + None + } else { + Some(usize::try_from(self.remaining).unwrap_or(usize::MAX)) + } + } + + fn next_chunk( + &mut self, + mask: &Mask, + ) -> VortexResult>> { + let array = ConstantArray::new(self.scalar.clone(), mask.true_count()).into_array(); + Ok(async move { Ok(array) }.boxed()) + } +} diff --git a/vortex-layout/src/v2/readers/flat.rs b/vortex-layout/src/v2/readers/flat.rs index b4f0bf0c3e7..fefeec326d3 100644 --- a/vortex-layout/src/v2/readers/flat.rs +++ b/vortex-layout/src/v2/readers/flat.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::any::Any; use std::ops::Range; use futures::future::BoxFuture; @@ -17,13 +18,17 @@ use crate::v2::reader::Reader; use crate::v2::reader::ReaderStream; use crate::v2::reader::ReaderStreamRef; -pub struct FlatReader2 { +pub struct FlatReader { len: usize, dtype: DType, array_fut: SharedArrayFuture, } -impl Reader for FlatReader2 { +impl Reader for FlatReader { + fn as_any(&self) -> &dyn Any { + self + } + fn dtype(&self) -> &DType { &self.dtype } diff --git a/vortex-layout/src/v2/readers/mod.rs b/vortex-layout/src/v2/readers/mod.rs index 6d020982f62..6d354cb392d 100644 --- a/vortex-layout/src/v2/readers/mod.rs +++ b/vortex-layout/src/v2/readers/mod.rs @@ -2,6 +2,8 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors pub mod chunked; +pub mod constant; pub mod flat; pub mod scalar_fn; pub mod struct_; +mod zoned; diff --git a/vortex-layout/src/v2/readers/scalar_fn.rs b/vortex-layout/src/v2/readers/scalar_fn.rs index 009cfea7cf9..86e2348760f 100644 --- a/vortex-layout/src/v2/readers/scalar_fn.rs +++ b/vortex-layout/src/v2/readers/scalar_fn.rs @@ -1,14 +1,19 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::any::Any; use std::ops::Range; +use std::sync::Arc; use futures::future::BoxFuture; use futures::future::try_join_all; use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::arrays::ScalarFnArray; +use vortex_array::expr::Expression; use vortex_array::expr::ScalarFn; +use vortex_array::expr::VTable; +use vortex_array::expr::VTableExt; use vortex_array::optimizer::ArrayOptimizer; use vortex_dtype::DType; use vortex_error::VortexResult; @@ -27,7 +32,37 @@ pub struct ScalarFnReader { children: Vec, } +impl ScalarFnReader { + pub fn try_new( + scalar_fn: ScalarFn, + children: Vec, + row_count: u64, + ) -> VortexResult { + let dtype = scalar_fn.return_dtype( + &children + .iter() + .map(|c| c.dtype().clone()) + .collect::>(), + )?; + + Ok(Self { + scalar_fn, + dtype, + row_count, + children, + }) + } + + pub fn scalar_fn(&self) -> &ScalarFn { + &self.scalar_fn + } +} + impl Reader for ScalarFnReader { + fn as_any(&self) -> &dyn Any { + self + } + fn dtype(&self) -> &DType { &self.dtype } @@ -90,3 +125,20 @@ impl ReaderStream for ScalarFnArrayStream { })) } } + +pub trait ScalarFnReaderExt: VTable { + /// Creates a [`ScalarFnReader`] applying this scalar function to the given children. + fn new_reader( + &'static self, + options: Self::Options, + children: Vec, + row_count: u64, + ) -> VortexResult { + Ok(Arc::new(ScalarFnReader::try_new( + self.bind(options), + children.into(), + row_count, + )?)) + } +} +impl ScalarFnReaderExt for V {} diff --git a/vortex-layout/src/v2/readers/struct_.rs b/vortex-layout/src/v2/readers/struct_.rs index 97ddafa13e8..d290065e16c 100644 --- a/vortex-layout/src/v2/readers/struct_.rs +++ b/vortex-layout/src/v2/readers/struct_.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::any::Any; use std::ops::Range; use futures::future::BoxFuture; @@ -19,14 +20,18 @@ use crate::v2::reader::ReaderRef; use crate::v2::reader::ReaderStream; use crate::v2::reader::ReaderStreamRef; -pub struct StructReader2 { +pub struct StructReader { row_count: u64, dtype: DType, // TODO(ngates): we should make this lazy? fields: Vec, } -impl Reader for StructReader2 { +impl Reader for StructReader { + fn as_any(&self) -> &dyn Any { + self + } + fn dtype(&self) -> &DType { &self.dtype } diff --git a/vortex-layout/src/v2/readers/zoned.rs b/vortex-layout/src/v2/readers/zoned.rs new file mode 100644 index 00000000000..5fec47880ec --- /dev/null +++ b/vortex-layout/src/v2/readers/zoned.rs @@ -0,0 +1,104 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::any::Any; +use std::ops::Range; +use std::sync::Arc; + +use vortex_array::expr::GetItem; +use vortex_array::expr::Statistic; +use vortex_array::expr::stats::Stat; +use vortex_dtype::DType; +use vortex_dtype::FieldName; +use vortex_error::VortexResult; + +use crate::v2::reader::Reader; +use crate::v2::reader::ReaderRef; +use crate::v2::reader::ReaderStreamRef; +use crate::v2::readers::scalar_fn::ScalarFnReaderExt; + +pub struct ZonedReader { + data: ReaderRef, + zone_map: ReaderRef, + zone_len: usize, + present_stats: Arc<[Stat]>, +} + +impl Reader for ZonedReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + self.data.dtype() + } + + fn row_count(&self) -> u64 { + self.data.row_count() + } + + fn try_reduce_parent( + &self, + parent: &ReaderRef, + _child_idx: usize, + ) -> VortexResult> { + if let Some(stat) = parent.as_scalar_fn::() { + if !self.present_stats.contains(stat) { + return Ok(None); + } + + // We know the statistic is present; so we return a new reader that pulls the value + // from the zone map. + let zoned_statistic = GetItem.new_reader( + // FIXME(ngates): construct the field name properly + FieldName::from(stat.name()), + vec![self.zone_map.clone()], + self.zone_map.row_count(), + )?; + + // We now need to explode the zoned_statistic to match the data reader's row count. + // We do this based on the zone map's zone length. + let exploded_statistic = Arc::new(ZonedExpansionReader { + zoned: zoned_statistic, + zone_len: self.zone_len, + row_count: self.data.row_count(), + }); + + return Ok(Some(exploded_statistic)); + } + + Ok(None) + } + + fn execute(&self, row_range: Range) -> VortexResult { + // By default, a zoned reader is just a pass-through. + self.data.execute(row_range) + } +} + +/// A reader that expands zoned statistics to match the data rows. +/// This repeats each row of the zone map `zone_len` times. +/// TODO(ngates): we could use a RunEndReader + Slice to do this? +struct ZonedExpansionReader { + zoned: ReaderRef, + zone_len: usize, + row_count: u64, +} + +impl Reader for ZonedExpansionReader { + fn as_any(&self) -> &dyn Any { + self + } + + fn dtype(&self) -> &DType { + self.zoned.dtype() + } + + fn row_count(&self) -> u64 { + self.row_count + } + + fn execute(&self, row_range: Range) -> VortexResult { + todo!() + } +} diff --git a/vortex-scan/src/v2/scan.rs b/vortex-scan/src/v2/scan.rs index 946514096bb..834ce905e77 100644 --- a/vortex-scan/src/v2/scan.rs +++ b/vortex-scan/src/v2/scan.rs @@ -1,10 +1,19 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::ops::Range; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use futures::Stream; +use vortex_array::ArrayRef; use vortex_array::expr::Expression; use vortex_array::expr::root; +use vortex_array::stream::ArrayStream; use vortex_array::stream::SendableArrayStream; use vortex_buffer::Buffer; +use vortex_dtype::DType; use vortex_error::VortexResult; use vortex_layout::v2::reader::ReaderRef; use vortex_session::VortexSession; @@ -16,17 +25,20 @@ pub struct ScanBuilder2 { projection: Expression, filter: Option, limit: Option, - row_selection: Selection, + row_range: Range, + row_selection: Selection, // NOTE: applies to the selected row range. session: VortexSession, } impl ScanBuilder2 { pub fn new(reader: ReaderRef, session: VortexSession) -> Self { + let row_range = 0..reader.row_count(); Self { reader, projection: root(), filter: None, limit: None, + row_range, row_selection: Selection::All, session, } @@ -52,19 +64,67 @@ impl ScanBuilder2 { self } + pub fn with_row_range(mut self, row_range: Range) -> Self { + self.row_range = row_range; + self + } + + /// Sets the row selection to use the given selection (relative to the row range). pub fn with_row_selection(mut self, row_selection: Selection) -> Self { self.row_selection = row_selection; self } + /// Sets the row selection to include only the given row indices (relative to the row range). pub fn with_row_indices(mut self, row_indices: Buffer) -> Self { self.row_selection = Selection::IncludeByIndex(row_indices); self } - pub fn into_array_stream(self) -> VortexResult { - todo!() + pub fn into_array_stream(self) -> VortexResult { + let projection = self.projection.optimize_recursive(self.reader.dtype())?; + let filter = self + .filter + .map(|f| f.optimize_recursive(self.reader.dtype())) + .transpose()?; + + let dtype = projection.return_dtype(self.reader.dtype())?; + + // So we wrap the reader for filtering. + let filter_reader = filter.as_ref().map(|f| self.reader.apply(&f)).transpose()?; + let projection_reader = self.reader.apply(&projection)?; + + // And finally, we wrap the reader for pruning. + let pruning_reader = filter + .as_ref() + .map(|f| { + // TODO(ngates): wrap filter in `falsify` expression. + let f = f.falsify()?; + self.reader.apply(&f) + }) + .transpose()?; + + let reader_stream = self.reader.execute(self.row_range)?; + + Ok(Scan { dtype }) } } -struct Scan {} +struct Scan { + dtype: DType, + stream: SendableArrayStream, +} + +impl ArrayStream for Scan { + fn dtype(&self) -> &DType { + &self.dtype + } +} + +impl Stream for Scan { + type Item = VortexResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + todo!() + } +} From 33ea76df1470211f8770e9bf00e9dc3edbc6073d Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Thu, 15 Jan 2026 09:37:02 +0000 Subject: [PATCH 11/11] Merge Signed-off-by: Nicholas Gates --- vortex-layout/src/v2/readers/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-layout/src/v2/readers/mod.rs b/vortex-layout/src/v2/readers/mod.rs index 6d354cb392d..46d3c793376 100644 --- a/vortex-layout/src/v2/readers/mod.rs +++ b/vortex-layout/src/v2/readers/mod.rs @@ -6,4 +6,4 @@ pub mod constant; pub mod flat; pub mod scalar_fn; pub mod struct_; -mod zoned; +pub mod zoned;