From 405f9df8ad5d98fba89653a75987ab4745890ada Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Sat, 17 Jan 2026 12:02:07 +0800 Subject: [PATCH 1/8] Fix nested projection for list-of-struct columns --- vortex-array/src/expr/exprs/get_item.rs | 297 ++++++++++++++++--- vortex-file/tests/test_write_table.rs | 122 ++++++++ vortex-layout/src/layouts/list/mod.rs | 221 ++++++++++++++ vortex-layout/src/layouts/list/reader.rs | 361 +++++++++++++++++++++++ vortex-layout/src/layouts/list/writer.rs | 304 +++++++++++++++++++ vortex-layout/src/layouts/mod.rs | 1 + vortex-layout/src/layouts/table.rs | 21 +- vortex-layout/src/session.rs | 2 + 8 files changed, 1285 insertions(+), 44 deletions(-) create mode 100644 vortex-layout/src/layouts/list/mod.rs create mode 100644 vortex-layout/src/layouts/list/reader.rs create mode 100644 vortex-layout/src/layouts/list/writer.rs diff --git a/vortex-array/src/expr/exprs/get_item.rs b/vortex-array/src/expr/exprs/get_item.rs index 4437004040b..23c8df71b1c 100644 --- a/vortex-array/src/expr/exprs/get_item.rs +++ b/vortex-array/src/expr/exprs/get_item.rs @@ -3,6 +3,7 @@ use std::fmt::Formatter; use std::ops::Not; +use std::sync::Arc; use prost::Message; use vortex_dtype::DType; @@ -14,11 +15,19 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_proto::expr as pb; use vortex_vector::Datum; +use vortex_vector::Scalar as VScalar; use vortex_vector::ScalarOps; use vortex_vector::VectorOps; +use vortex_vector::fixed_size_list::FixedSizeListScalar; +use vortex_vector::fixed_size_list::FixedSizeListVector; +use vortex_vector::listview::ListViewScalar; +use vortex_vector::listview::ListViewVector; use crate::ArrayRef; +use crate::IntoArray; use crate::ToCanonical; +use crate::arrays::FixedSizeListArray; +use crate::arrays::ListViewArray; use crate::builtins::ExprBuiltins; use crate::compute::mask; use crate::expr::Arity; @@ -39,6 +48,7 @@ use crate::expr::VTableExt; use crate::expr::exprs::root::root; use crate::expr::lit; use crate::expr::stats::Stat; +use crate::vtable::ValidityHelper; pub struct GetItem; @@ -85,23 +95,89 @@ impl VTable for GetItem { } fn return_dtype(&self, field_name: &FieldName, arg_dtypes: &[DType]) -> VortexResult { - let struct_dtype = &arg_dtypes[0]; - let field_dtype = struct_dtype - .as_struct_fields_opt() - .and_then(|st| st.field(field_name)) - .ok_or_else(|| { - vortex_err!("Couldn't find the {} field in the input scope", field_name) - })?; - - // Match here to avoid cloning the dtype if nullability doesn't need to change - if matches!( - (struct_dtype.nullability(), field_dtype.nullability()), - (Nullability::Nullable, Nullability::NonNullable) - ) { - return Ok(field_dtype.with_nullability(Nullability::Nullable)); + let input_dtype = &arg_dtypes[0]; + + // Struct field access: `$.a`. + if let Some(struct_fields) = input_dtype.as_struct_fields_opt() { + let field_dtype = struct_fields + .field(field_name) + .ok_or_else(|| { + vortex_err!("Couldn't find the {} field in the input scope", field_name) + })? + .clone(); + + // Match here to avoid cloning the dtype if nullability doesn't need to change + if matches!( + (input_dtype.nullability(), field_dtype.nullability()), + (Nullability::Nullable, Nullability::NonNullable) + ) { + return Ok(field_dtype.with_nullability(Nullability::Nullable)); + } + + return Ok(field_dtype); } - Ok(field_dtype) + // List-of-struct field access: `$.items.a` where `items: list`. + match input_dtype { + DType::List(element_dtype, list_nullability) => { + let element_dtype = element_dtype.as_ref(); + let struct_fields = element_dtype.as_struct_fields_opt().ok_or_else(|| { + vortex_err!( + "Expected list element dtype to be a struct for GetItem, got {}", + element_dtype + ) + })?; + let mut field_dtype = struct_fields + .field(field_name) + .ok_or_else(|| { + vortex_err!("Couldn't find the {} field in the input scope", field_name) + })? + .clone(); + + // If the struct elements are nullable, we must propagate that nullability to the + // extracted field. + if matches!( + (element_dtype.nullability(), field_dtype.nullability()), + (Nullability::Nullable, Nullability::NonNullable) + ) { + field_dtype = field_dtype.with_nullability(Nullability::Nullable); + } + + Ok(DType::List(Arc::new(field_dtype), *list_nullability)) + } + DType::FixedSizeList(element_dtype, list_size, list_nullability) => { + let element_dtype = element_dtype.as_ref(); + let struct_fields = element_dtype.as_struct_fields_opt().ok_or_else(|| { + vortex_err!( + "Expected fixed-size list element dtype to be a struct for GetItem, got {}", + element_dtype + ) + })?; + let mut field_dtype = struct_fields + .field(field_name) + .ok_or_else(|| { + vortex_err!("Couldn't find the {} field in the input scope", field_name) + })? + .clone(); + + if matches!( + (element_dtype.nullability(), field_dtype.nullability()), + (Nullability::Nullable, Nullability::NonNullable) + ) { + field_dtype = field_dtype.with_nullability(Nullability::Nullable); + } + + Ok(DType::FixedSizeList( + Arc::new(field_dtype), + *list_size, + *list_nullability, + )) + } + _ => Err(vortex_err!( + "Expected struct or list-of-struct dtype for child of GetItem expression, got {}", + input_dtype + )), + } } fn evaluate( @@ -110,34 +186,183 @@ impl VTable for GetItem { expr: &Expression, scope: &ArrayRef, ) -> VortexResult { - let input = expr.children()[0].evaluate(scope)?.to_struct(); - let field = input.field_by_name(field_name).cloned()?; + let input = expr.children()[0].evaluate(scope)?; + + // Struct field access: `$.a`. + if input.dtype().is_struct() { + let input = input.to_struct(); + let field = input.field_by_name(field_name).cloned()?; + + return match input.dtype().nullability() { + Nullability::NonNullable => Ok(field), + Nullability::Nullable => mask(&field, &input.validity_mask().not()), + }; + } - match input.dtype().nullability() { - Nullability::NonNullable => Ok(field), - Nullability::Nullable => mask(&field, &input.validity_mask().not()), + // List-of-struct field access: `$.items.a` where `items: list`. + match input.dtype() { + DType::List(..) => { + let list = input.to_listview(); + let struct_elems = list.elements().to_struct(); + let mut field = struct_elems.field_by_name(field_name).cloned()?; + + if struct_elems.dtype().is_nullable() { + field = mask(&field, &struct_elems.validity_mask().not())?; + } + + Ok(ListViewArray::try_new( + field, + list.offsets().clone(), + list.sizes().clone(), + list.validity().clone(), + )? + .into_array()) + } + DType::FixedSizeList(..) => { + let list = input.to_fixed_size_list(); + let struct_elems = list.elements().to_struct(); + let mut field = struct_elems.field_by_name(field_name).cloned()?; + + if struct_elems.dtype().is_nullable() { + field = mask(&field, &struct_elems.validity_mask().not())?; + } + + Ok(FixedSizeListArray::try_new( + field, + list.list_size(), + list.validity().clone(), + list.len(), + )? + .into_array()) + } + _ => Err(vortex_err!( + "Expected struct or list-of-struct scope for GetItem evaluation, got {}", + input.dtype() + )), } } fn execute(&self, field_name: &FieldName, mut args: ExecutionArgs) -> VortexResult { - let struct_dtype = args.dtypes[0] - .as_struct_fields_opt() - .ok_or_else(|| vortex_err!("Expected struct dtype for child of GetItem expression"))?; - let field_idx = struct_dtype - .find(field_name) - .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; - - match args.datums.pop().vortex_expect("missing input") { - Datum::Scalar(s) => { - let mut field = s.as_struct().field(field_idx); - field.mask_validity(s.is_valid()); - Ok(Datum::Scalar(field)) + let input_dtype = &args.dtypes[0]; + + // Struct field access: `$.a`. + if let Some(struct_dtype) = input_dtype.as_struct_fields_opt() { + let field_idx = struct_dtype + .find(field_name) + .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; + + return match args.datums.pop().vortex_expect("missing input") { + Datum::Scalar(s) => { + let mut field = s.as_struct().field(field_idx); + field.mask_validity(s.is_valid()); + Ok(Datum::Scalar(field)) + } + Datum::Vector(v) => { + let mut field = v.as_struct().fields()[field_idx].clone(); + field.mask_validity(v.validity()); + Ok(Datum::Vector(field)) + } + }; + } + + // List-of-struct field access: `$.items.a` where `items: list`. + match input_dtype { + DType::List(element_dtype, _) => { + let struct_dtype = element_dtype + .as_struct_fields_opt() + .ok_or_else(|| vortex_err!("Expected list element dtype to be struct"))?; + let field_idx = struct_dtype + .find(field_name) + .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; + + match args.datums.pop().vortex_expect("missing input") { + Datum::Scalar(s) => { + if !s.is_valid() { + return Ok(Datum::Scalar(VScalar::null(&args.return_dtype))); + } + + let list = s.into_list(); + let list_vec = list.value().clone(); + let (elements, offsets, sizes, validity) = list_vec.into_parts(); + + let struct_elements = elements.as_ref().as_struct(); + let mut field = struct_elements.fields()[field_idx].clone(); + field.mask_validity(struct_elements.validity()); + + Ok(Datum::Scalar( + ListViewScalar::new(ListViewVector::try_new( + Arc::new(field), + offsets, + sizes, + validity, + )?) + .into(), + )) + } + Datum::Vector(v) => { + let list: ListViewVector = v.into_list(); + let (elements, offsets, sizes, validity) = list.into_parts(); + let struct_elements = elements.as_ref().as_struct(); + let mut field = struct_elements.fields()[field_idx].clone(); + field.mask_validity(struct_elements.validity()); + + Ok(Datum::Vector( + ListViewVector::try_new(Arc::new(field), offsets, sizes, validity)? + .into(), + )) + } + } } - Datum::Vector(v) => { - let mut field = v.as_struct().fields()[field_idx].clone(); - field.mask_validity(v.validity()); - Ok(Datum::Vector(field)) + DType::FixedSizeList(element_dtype, list_size, _) => { + let struct_dtype = element_dtype + .as_struct_fields_opt() + .ok_or_else(|| vortex_err!("Expected fixed-size list element dtype to be struct"))?; + let field_idx = struct_dtype + .find(field_name) + .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; + + match args.datums.pop().vortex_expect("missing input") { + Datum::Scalar(s) => { + if !s.is_valid() { + return Ok(Datum::Scalar(VScalar::null(&args.return_dtype))); + } + + let list = s.into_fixed_size_list(); + let list_vec = list.value().clone(); + let (elements, _vector_list_size, validity) = list_vec.into_parts(); + + let struct_elements = elements.as_ref().as_struct(); + let mut field = struct_elements.fields()[field_idx].clone(); + field.mask_validity(struct_elements.validity()); + + Ok(Datum::Scalar( + FixedSizeListScalar::new(FixedSizeListVector::try_new( + Arc::new(field), + *list_size, + validity, + )?) + .into(), + )) + } + Datum::Vector(v) => { + let list = v.into_fixed_size_list(); + let (elements, _vector_list_size, validity) = list.into_parts(); + + let struct_elements = elements.as_ref().as_struct(); + let mut field = struct_elements.fields()[field_idx].clone(); + field.mask_validity(struct_elements.validity()); + + Ok(Datum::Vector( + FixedSizeListVector::try_new(Arc::new(field), *list_size, validity)? + .into(), + )) + } + } } + _ => Err(vortex_err!( + "Expected struct or list-of-struct dtype for child of GetItem expression, got {}", + input_dtype + )), } } diff --git a/vortex-file/tests/test_write_table.rs b/vortex-file/tests/test_write_table.rs index 388e52294e2..c468b2f65f3 100644 --- a/vortex-file/tests/test_write_table.rs +++ b/vortex-file/tests/test_write_table.rs @@ -11,13 +11,18 @@ use futures::pin_mut; use vortex_array::Array; use vortex_array::IntoArray; use vortex_array::ToCanonical; +use vortex_array::arrays::ListArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray; +use vortex_array::expr::get_item; +use vortex_array::expr::root; use vortex_array::expr::session::ExprSession; use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::ByteBuffer; use vortex_dtype::FieldNames; +use vortex_dtype::Nullability; +use vortex_dtype::PType; use vortex_dtype::field_path; use vortex_file::OpenOptionsSessionExt; use vortex_file::WriteOptionsSessionExt; @@ -28,6 +33,7 @@ use vortex_layout::layouts::table::TableStrategy; use vortex_layout::session::LayoutSession; use vortex_metrics::VortexMetrics; use vortex_session::VortexSession; +use vortex_scalar::Scalar; static SESSION: LazyLock = LazyLock::new(|| { let mut session = VortexSession::empty() @@ -114,3 +120,119 @@ async fn test_file_roundtrip() { assert!(raw.nbytes() > compressed.nbytes()); } } + +#[tokio::test] +async fn test_list_of_struct_nested_projection() { + // A list of structs should support nested field projection (e.g. `items.a`) without requiring + // users to pre-flatten their schemas. + + let element_dtype = Arc::new(vortex_dtype::DType::Struct( + [ + ("a", vortex_dtype::DType::Primitive(PType::I32, Nullability::NonNullable)), + ("b", vortex_dtype::DType::Utf8(Nullability::NonNullable)), + ] + .into_iter() + .collect(), + Nullability::NonNullable, + )); + + let row_count = 4; + let items = ListArray::from_iter_opt_slow::( + [ + Some(vec![ + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(1i32, Nullability::NonNullable), + Scalar::utf8("x", Nullability::NonNullable), + ], + ), + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(2i32, Nullability::NonNullable), + Scalar::utf8("y", Nullability::NonNullable), + ], + ), + ]), + Some(Vec::new()), + None, + Some(vec![Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(3i32, Nullability::NonNullable), + Scalar::utf8("z", Nullability::NonNullable), + ], + )]), + ], + element_dtype, + ) + .unwrap(); + + let ids = PrimitiveArray::from_iter((0..row_count).map(|i| i as i32)).into_array(); + + let data = StructArray::new( + FieldNames::from(["id", "items"]), + vec![ids, items], + row_count, + Validity::NonNullable, + ) + .into_array(); + + // Keep the writer intentionally simple (flat) so the layout shape is deterministic. + let writer = Arc::new(TableStrategy::default()); + + let mut bytes = Vec::new(); + SESSION + .write_options() + .with_strategy(writer) + .write(&mut bytes, data.to_array_stream()) + .await + .expect("write"); + + let bytes = ByteBuffer::from(bytes); + let vxf = SESSION.open_options().open_buffer(bytes).expect("open"); + + // Project `items.a` by applying `get_item("a", ...)` to a `List`. + let projection = get_item("a".to_string(), get_item("items".to_string(), root())); + + let mut stream = vxf + .scan() + .expect("scan") + .with_projection(projection) + .into_stream() + .expect("into_stream"); + + let out = stream.next().await.expect("first batch").expect("batch"); + + // The output is a nullable list with the same outer validity/offsets as `items`. + assert_eq!(out.len(), row_count); + assert!(matches!(out.dtype(), vortex_dtype::DType::List(_, Nullability::Nullable))); + + assert_eq!( + out.scalar_at(0).as_list().elements().unwrap().to_vec(), + vec![ + Scalar::primitive(1i32, Nullability::NonNullable), + Scalar::primitive(2i32, Nullability::NonNullable), + ] + ); + assert!(out.scalar_at(1).as_list().elements().unwrap().is_empty()); + assert!(out.scalar_at(2).is_null()); + assert_eq!( + out.scalar_at(3).as_list().elements().unwrap().to_vec(), + vec![Scalar::primitive(3i32, Nullability::NonNullable)] + ); + + // Verify the list column is not stored as a single flat blob layout. + // This is the root cause of poor nested support described in #4889. + let root_layout = vxf.footer().layout(); + let items_layout = (0..root_layout.nchildren()) + .find_map(|idx| match root_layout.child_type(idx) { + vortex_layout::LayoutChildType::Field(ref name) if name.as_ref() == "items" => { + Some(root_layout.child(idx).expect("items child")) + } + _ => None, + }) + .expect("items layout"); + assert_eq!(items_layout.encoding_id().as_ref(), "vortex.list"); +} diff --git a/vortex-layout/src/layouts/list/mod.rs b/vortex-layout/src/layouts/list/mod.rs new file mode 100644 index 00000000000..d54c3d4ba9c --- /dev/null +++ b/vortex-layout/src/layouts/list/mod.rs @@ -0,0 +1,221 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod reader; +pub mod writer; + +use std::sync::Arc; + +use reader::ListReader; +use vortex_array::ArrayContext; +use vortex_array::DeserializeMetadata; +use vortex_array::EmptyMetadata; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_session::SessionExt; +use vortex_session::VortexSession; + +use crate::LayoutChildType; +use crate::LayoutEncodingRef; +use crate::LayoutId; +use crate::LayoutReaderRef; +use crate::LayoutRef; +use crate::VTable; +use crate::children::LayoutChildren; +use crate::children::OwnedLayoutChildren; +use crate::segments::SegmentId; +use crate::segments::SegmentSource; +use crate::vtable; + +vtable!(List); + +impl VTable for ListVTable { + type Layout = ListLayout; + type Encoding = ListLayoutEncoding; + type Metadata = EmptyMetadata; + + fn id(_encoding: &Self::Encoding) -> LayoutId { + LayoutId::new_ref("vortex.list") + } + + fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { + LayoutEncodingRef::new_ref(ListLayoutEncoding.as_ref()) + } + + fn row_count(layout: &Self::Layout) -> u64 { + layout.row_count + } + + fn dtype(layout: &Self::Layout) -> &DType { + &layout.dtype + } + + fn metadata(_layout: &Self::Layout) -> Self::Metadata { + EmptyMetadata + } + + fn segment_ids(_layout: &Self::Layout) -> Vec { + vec![] + } + + fn nchildren(layout: &Self::Layout) -> usize { + let validity_children = layout.dtype.is_nullable() as usize; + match layout.dtype { + DType::List(..) => 2 + validity_children, // offsets + elements + DType::FixedSizeList(..) => 1 + validity_children, // elements + _ => 0, + } + } + + fn child(layout: &Self::Layout, index: usize) -> VortexResult { + let is_nullable = layout.dtype.is_nullable(); + let offsets_dtype = DType::Primitive(PType::U64, Nullability::NonNullable); + + let child_dtype = match (&layout.dtype, is_nullable, index) { + // validity + (_, true, 0) => DType::Bool(Nullability::NonNullable), + + // variable-size list + (DType::List(element_dtype, _), false, 0) => offsets_dtype, + (DType::List(element_dtype, _), false, 1) => (*element_dtype.as_ref()).clone(), + (DType::List(element_dtype, _), true, 1) => offsets_dtype, + (DType::List(element_dtype, _), true, 2) => (*element_dtype.as_ref()).clone(), + + // fixed-size list + (DType::FixedSizeList(element_dtype, ..), false, 0) => { + (*element_dtype.as_ref()).clone() + } + (DType::FixedSizeList(element_dtype, ..), true, 1) => (*element_dtype.as_ref()).clone(), + + _ => return Err(vortex_err!("Invalid child index {index} for list layout")), + }; + + layout.children.child(index, &child_dtype) + } + + fn child_type(layout: &Self::Layout, idx: usize) -> LayoutChildType { + let is_nullable = layout.dtype.is_nullable(); + + if is_nullable && idx == 0 { + return LayoutChildType::Auxiliary("validity".into()); + } + + match layout.dtype { + DType::List(..) => { + let offsets_idx = if is_nullable { 1 } else { 0 }; + if idx == offsets_idx { + LayoutChildType::Auxiliary("offsets".into()) + } else { + LayoutChildType::Auxiliary("elements".into()) + } + } + DType::FixedSizeList(..) => LayoutChildType::Auxiliary("elements".into()), + _ => LayoutChildType::Auxiliary("unknown".into()), + } + } + + fn new_reader( + layout: &Self::Layout, + name: Arc, + segment_source: Arc, + session: &VortexSession, + ) -> VortexResult { + Ok(Arc::new(ListReader::try_new( + layout.clone(), + name, + segment_source, + session.session(), + )?)) + } + + fn build( + _encoding: &Self::Encoding, + dtype: &DType, + row_count: u64, + _metadata: &::Output, + _segment_ids: Vec, + children: &dyn LayoutChildren, + _ctx: ArrayContext, + ) -> VortexResult { + vortex_ensure!( + matches!(dtype, DType::List(..) | DType::FixedSizeList(..)), + "Expected list dtype, got {}", + dtype + ); + + let expected_children = match dtype { + DType::List(..) => 2 + (dtype.is_nullable() as usize), + DType::FixedSizeList(..) => 1 + (dtype.is_nullable() as usize), + _ => unreachable!(), + }; + vortex_ensure!( + children.nchildren() == expected_children, + "List layout has {} children, expected {}", + children.nchildren(), + expected_children + ); + + Ok(ListLayout { + row_count, + dtype: dtype.clone(), + children: children.to_arc(), + }) + } + + fn with_children(layout: &mut Self::Layout, children: Vec) -> VortexResult<()> { + let expected_children = match layout.dtype { + DType::List(..) => 2 + (layout.dtype.is_nullable() as usize), + DType::FixedSizeList(..) => 1 + (layout.dtype.is_nullable() as usize), + _ => vortex_bail!("Expected list dtype, got {}", layout.dtype), + }; + vortex_ensure!( + children.len() == expected_children, + "ListLayout expects {} children, got {}", + expected_children, + children.len() + ); + layout.children = OwnedLayoutChildren::layout_children(children); + Ok(()) + } +} + +#[derive(Debug)] +pub struct ListLayoutEncoding; + +#[derive(Clone, Debug)] +pub struct ListLayout { + row_count: u64, + dtype: DType, + children: Arc, +} + +impl ListLayout { + pub fn new(row_count: u64, dtype: DType, children: Vec) -> Self { + Self { + row_count, + dtype, + children: OwnedLayoutChildren::layout_children(children), + } + } + + #[inline] + pub fn row_count(&self) -> u64 { + self.row_count + } + + #[inline] + pub fn dtype(&self) -> &DType { + &self.dtype + } + + #[inline] + pub fn children(&self) -> &Arc { + &self.children + } +} + diff --git a/vortex-layout/src/layouts/list/reader.rs b/vortex-layout/src/layouts/list/reader.rs new file mode 100644 index 00000000000..ccec676b1d8 --- /dev/null +++ b/vortex-layout/src/layouts/list/reader.rs @@ -0,0 +1,361 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::collections::BTreeSet; +use std::ops::BitAnd; +use std::ops::Range; +use std::sync::Arc; + +use futures::future::BoxFuture; +use futures::try_join; +use vortex_array::Array; +use vortex_array::ArrayRef; +use vortex_array::IntoArray; +use vortex_array::MaskFuture; +use vortex_array::ToCanonical; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::FixedSizeListArray; +use vortex_array::arrays::ListArray; +use vortex_array::expr::Expression; +use vortex_array::expr::root; +use vortex_dtype::DType; +use vortex_dtype::FieldMask; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_mask::Mask; +use vortex_session::VortexSession; + +use crate::ArrayFuture; +use crate::LayoutReader; +use crate::LayoutReaderRef; +use crate::LazyReaderChildren; +use crate::layouts::list::ListLayout; +use crate::segments::SegmentSource; + +pub struct ListReader { + layout: ListLayout, + name: Arc, + lazy_children: LazyReaderChildren, + session: VortexSession, +} + +impl ListReader { + pub(super) fn try_new( + layout: ListLayout, + name: Arc, + segment_source: Arc, + session: VortexSession, + ) -> VortexResult { + let mut dtypes: Vec = Vec::new(); + let mut names: Vec> = Vec::new(); + + if layout.dtype().is_nullable() { + dtypes.push(DType::Bool(Nullability::NonNullable)); + names.push(Arc::from("validity")); + } + + match layout.dtype() { + DType::List(element_dtype, _) => { + dtypes.push(DType::Primitive(PType::U64, Nullability::NonNullable)); + names.push(Arc::from("offsets")); + dtypes.push((**element_dtype).clone()); + names.push(Arc::from("elements")); + } + DType::FixedSizeList(element_dtype, ..) => { + dtypes.push((**element_dtype).clone()); + names.push(Arc::from("elements")); + } + _ => vortex_bail!("Expected list dtype, got {}", layout.dtype()), + } + + let lazy_children = LazyReaderChildren::new( + layout.children().clone(), + dtypes, + names, + segment_source, + session.clone(), + ); + + Ok(Self { + layout, + name, + lazy_children, + session, + }) + } + + fn validity(&self) -> VortexResult> { + self.dtype() + .is_nullable() + .then(|| self.lazy_children.get(0)) + .transpose() + } + + fn offsets(&self) -> VortexResult<&LayoutReaderRef> { + let idx = if self.dtype().is_nullable() { 1 } else { 0 }; + self.lazy_children.get(idx) + } + + fn elements(&self) -> VortexResult<&LayoutReaderRef> { + let idx = match self.dtype() { + DType::List(..) => { + if self.dtype().is_nullable() { + 2 + } else { + 1 + } + } + DType::FixedSizeList(..) => { + if self.dtype().is_nullable() { + 1 + } else { + 0 + } + } + _ => return Err(vortex_err!("Expected list dtype, got {}", self.dtype())), + }; + self.lazy_children.get(idx) + } + + fn list_slice_future( + &self, + row_range: Range, + element_expr: &Expression, + ) -> VortexResult { + let dtype = self.dtype().clone(); + let validity_fut = self + .validity()? + .map(|reader| { + let len = usize::try_from(row_range.end - row_range.start) + .vortex_expect("row range must fit in usize"); + reader.projection_evaluation(&row_range, &root(), MaskFuture::new_true(len)) + }) + .transpose()?; + + match dtype { + DType::List(element_dtype, list_nullability) => { + let offsets_reader = self.offsets()?.clone(); + let elements_reader = self.elements()?.clone(); + let row_range_clone = row_range.clone(); + let element_expr = element_expr.clone(); + + Ok(Box::pin(async move { + let row_len = usize::try_from(row_range_clone.end - row_range_clone.start) + .vortex_expect("row range must fit in usize"); + + let offsets_row_range = row_range_clone.start..row_range_clone.end + 1; + let offsets_len = row_len + 1; + let offsets_fut = offsets_reader.projection_evaluation( + &offsets_row_range, + &root(), + MaskFuture::new_true(offsets_len), + )?; + + let (offsets, validity) = try_join!( + offsets_fut, + async move { + match validity_fut { + Some(v) => v.await.map(Some), + None => Ok(None), + } + } + )?; + + let offsets = offsets.to_primitive(); + let offsets_slice = offsets.as_slice::(); + let base = *offsets_slice.first().unwrap_or(&0u64); + let end = *offsets_slice.last().unwrap_or(&base); + + let elements_row_range = base..end; + let elements_len = usize::try_from(end - base) + .vortex_expect("element range must fit in usize"); + let elements = elements_reader.projection_evaluation( + &elements_row_range, + &element_expr, + MaskFuture::new_true(elements_len), + )?; + + let elements = elements.await?; + + let normalized_offsets = + vortex_array::arrays::PrimitiveArray::from_iter( + offsets_slice.iter().map(|v| *v - base), + ) + .into_array(); + + let validity = match (list_nullability, validity) { + (Nullability::NonNullable, _) => vortex_array::validity::Validity::NonNullable, + (Nullability::Nullable, Some(v)) => { + vortex_array::validity::Validity::Array(v) + } + (Nullability::Nullable, None) => vortex_array::validity::Validity::AllValid, + _ => vortex_array::validity::Validity::NonNullable, + }; + + Ok( + ListArray::try_new(elements, normalized_offsets, validity)? + .into_array(), + ) + })) + } + DType::FixedSizeList(element_dtype, list_size, list_nullability) => { + let elements_reader = self.elements()?.clone(); + let row_range_clone = row_range.clone(); + let element_expr = element_expr.clone(); + + Ok(Box::pin(async move { + let row_len_u64 = row_range_clone.end - row_range_clone.start; + let row_len = usize::try_from(row_len_u64) + .vortex_expect("row range must fit in usize"); + + let list_size_u64 = u64::from(list_size); + let element_start = row_range_clone + .start + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element start overflow"))?; + let element_end = row_range_clone + .end + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element end overflow"))?; + + let elements_row_range = element_start..element_end; + let elements_len = usize::try_from(element_end - element_start) + .vortex_expect("element range must fit in usize"); + let elements_fut = elements_reader.projection_evaluation( + &elements_row_range, + &element_expr, + MaskFuture::new_true(elements_len), + )?; + + let (elements, validity) = try_join!( + elements_fut, + async move { + match validity_fut { + Some(v) => v.await.map(Some), + None => Ok(None), + } + } + )?; + + let validity = match (list_nullability, validity) { + (Nullability::NonNullable, _) => vortex_array::validity::Validity::NonNullable, + (Nullability::Nullable, Some(v)) => { + vortex_array::validity::Validity::Array(v) + } + (Nullability::Nullable, None) => vortex_array::validity::Validity::AllValid, + _ => vortex_array::validity::Validity::NonNullable, + }; + + Ok( + FixedSizeListArray::try_new(elements, list_size, validity, row_len)? + .into_array(), + ) + })) + } + _ => Err(vortex_err!("Expected list dtype, got {}", dtype)), + } + } +} + +impl LayoutReader for ListReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + self.layout.dtype() + } + + fn row_count(&self) -> u64 { + self.layout.row_count() + } + + fn register_splits( + &self, + _field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + splits.insert(row_range.end); + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let row_range = row_range.clone(); + let expr = expr.clone(); + let session = self.session.clone(); + + let list_fut = self.list_slice_future(row_range.clone(), &root())?; + + Ok(MaskFuture::new( + usize::try_from(row_range.end - row_range.start) + .vortex_expect("row range must fit in usize"), + async move { + let (array, mask) = try_join!(list_fut, mask)?; + if mask.all_false() { + return Ok(mask); + } + + let array = array.apply(&expr)?; + let mut ctx = session.create_execution_ctx(); + let array_mask = array.execute::(&mut ctx)?; + + Ok(mask.bitand(&array_mask)) + }, + )) + } + + fn projection_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + // If the expression is a simple column access or select, we can push it down to the elements. + let is_pushdown = matches!( + expr.vtable().id().as_ref(), + "vortex.get_item" | "vortex.select" + ); + + let row_range = row_range.clone(); + let expr = expr.clone(); + let root_expr = root(); + let list_fut = self.list_slice_future( + row_range.clone(), + if is_pushdown { &expr } else { &root_expr }, + )?; + + Ok(Box::pin(async move { + let (mut array, mask) = try_join!(list_fut, mask)?; + + // Apply the selection mask before applying the expression, matching `FlatReader`. + if !mask.all_true() { + array = array.filter(mask)?; + } + + if is_pushdown { + Ok(array) + } else { + array.apply(&expr) + } + })) + } +} diff --git a/vortex-layout/src/layouts/list/writer.rs b/vortex-layout/src/layouts/list/writer.rs new file mode 100644 index 00000000000..0f24b782449 --- /dev/null +++ b/vortex-layout/src/layouts/list/writer.rs @@ -0,0 +1,304 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; + +use async_trait::async_trait; +use futures::StreamExt; +use futures::future::try_join_all; +use futures::pin_mut; +use itertools::Itertools; +use vortex_array::Array; +use vortex_array::ArrayContext; +use vortex_array::IntoArray; +use vortex_array::ToCanonical; +use vortex_array::arrays::list_from_list_view; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_dtype::match_each_integer_ptype; +use vortex_error::VortexError; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_io::kanal_ext::KanalExt; +use vortex_io::runtime::Handle; + +use crate::IntoLayout as _; +use crate::LayoutRef; +use crate::LayoutStrategy; +use crate::layouts::list::ListLayout; +use crate::segments::SegmentSinkRef; +use crate::sequence::SendableSequentialStream; +use crate::sequence::SequencePointer; +use crate::sequence::SequentialStreamAdapter; +use crate::sequence::SequentialStreamExt; + +/// A write strategy that performs component shredding for list types. +/// +/// - Variable-size lists are written as: +/// - optional validity (is_valid: bool) +/// - offsets (u64, length = rows + 1) +/// - elements (concatenated) +/// - Fixed-size lists are written as: +/// - optional validity (is_valid: bool) +/// - elements (concatenated) +#[derive(Clone)] +pub struct ListStrategy { + validity: Arc, + offsets: Arc, + elements: Arc, +} + +impl ListStrategy { + pub fn new( + validity: Arc, + offsets: Arc, + elements: Arc, + ) -> Self { + Self { + validity, + offsets, + elements, + } + } +} + +#[async_trait] +impl LayoutStrategy for ListStrategy { + async fn write_stream( + &self, + ctx: ArrayContext, + segment_sink: SegmentSinkRef, + stream: SendableSequentialStream, + mut eof: SequencePointer, + handle: Handle, + ) -> VortexResult { + let dtype = stream.dtype().clone(); + + let is_nullable = dtype.is_nullable(); + let offsets_dtype = DType::Primitive(PType::U64, Nullability::NonNullable); + + let (stream_count, column_dtypes): (usize, Vec) = match &dtype { + DType::List(element_dtype, _) => { + let mut dtypes = Vec::new(); + if is_nullable { + dtypes.push(DType::Bool(Nullability::NonNullable)); + } + dtypes.push(offsets_dtype.clone()); + dtypes.push((**element_dtype).clone()); + (dtypes.len(), dtypes) + } + DType::FixedSizeList(element_dtype, ..) => { + let mut dtypes = Vec::new(); + if is_nullable { + dtypes.push(DType::Bool(Nullability::NonNullable)); + } + dtypes.push((**element_dtype).clone()); + (dtypes.len(), dtypes) + } + _ => { + vortex_bail!("ListStrategy expected list dtype, got {}", dtype); + } + }; + + let (column_streams_tx, column_streams_rx): (Vec<_>, Vec<_>) = + (0..stream_count).map(|_| kanal::bounded_async(1)).unzip(); + + let total_rows = Arc::new(AtomicU64::new(0)); + + // Spawn a task to fan out chunk components to their respective transposed streams. + { + let total_rows = total_rows.clone(); + let dtype = dtype.clone(); + handle + .spawn(async move { + let mut base_elements: u64 = 0; + let mut first_offsets = true; + + pin_mut!(stream); + while let Some(result) = stream.next().await { + match result { + Ok((sequence_id, chunk)) => { + total_rows.fetch_add(chunk.len() as u64, Ordering::SeqCst); + + let mut sequence_pointer = sequence_id.descend(); + + // validity (optional) + if is_nullable { + let validity = chunk.validity_mask().into_array(); + let _ = column_streams_tx[0] + .send(Ok((sequence_pointer.advance(), validity))) + .await; + } + + match &dtype { + DType::List(..) => { + let list_view = chunk.to_listview(); + let list = match list_from_list_view(list_view) { + Ok(list) => list, + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx + .send(Err(VortexError::from(e.clone()))) + .await; + } + break; + } + }; + + // Build global u64 offsets, dropping the leading 0 for all but the first chunk. + let offsets = list.offsets().to_primitive(); + let offsets_slice_u64: VortexResult> = + match_each_integer_ptype!(offsets.ptype(), |T| { + offsets + .as_slice::() + .iter() + .map(|v| { + u64::try_from(*v).map_err(|_| { + vortex_err!( + "List offsets must be convertible to u64" + ) + }) + }) + .collect() + }); + let offsets_slice_u64 = match offsets_slice_u64 { + Ok(v) => v, + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx + .send(Err(VortexError::from(e.clone()))) + .await; + } + break; + } + }; + + let mut adjusted: Vec = Vec::with_capacity( + offsets_slice_u64 + .len() + .saturating_sub((!first_offsets) as usize), + ); + for (i, v) in offsets_slice_u64.into_iter().enumerate() { + if !first_offsets && i == 0 { + continue; + } + adjusted.push(v + base_elements); + } + + let offsets_arr = + vortex_array::arrays::PrimitiveArray::from_iter( + adjusted, + ) + .into_array(); + + // offsets index depends on nullable validity child + let offsets_idx = if is_nullable { 1 } else { 0 }; + let elements_idx = offsets_idx + 1; + + let _ = column_streams_tx[offsets_idx] + .send(Ok((sequence_pointer.advance(), offsets_arr))) + .await; + let _ = column_streams_tx[elements_idx] + .send(Ok(( + sequence_pointer.advance(), + list.elements().clone(), + ))) + .await; + + base_elements += list.elements().len() as u64; + first_offsets = false; + } + DType::FixedSizeList(..) => { + let list = chunk.to_fixed_size_list(); + + let elements_idx = if is_nullable { 1 } else { 0 }; + let _ = column_streams_tx[elements_idx] + .send(Ok(( + sequence_pointer.advance(), + list.elements().clone(), + ))) + .await; + } + _ => unreachable!(), + } + } + Err(e) => { + let e: Arc = Arc::new(e); + for tx in column_streams_tx.iter() { + let _ = tx.send(Err(VortexError::from(e.clone()))).await; + } + break; + } + } + } + }) + .detach(); + } + + let layout_futures: Vec<_> = column_dtypes + .into_iter() + .zip_eq(column_streams_rx) + .enumerate() + .map(|(index, (dtype, recv))| { + let column_stream = + SequentialStreamAdapter::new(dtype.clone(), recv.into_stream().boxed()) + .sendable(); + let child_eof = eof.split_off(); + handle.spawn_nested(|h| { + let validity = self.validity.clone(); + let offsets = self.offsets.clone(); + let elements = self.elements.clone(); + let ctx = ctx.clone(); + let segment_sink = segment_sink.clone(); + async move { + if is_nullable && index == 0 { + validity + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } else if matches!(dtype, DType::Primitive(PType::U64, _)) { + offsets + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } else { + elements + .write_stream(ctx, segment_sink, column_stream, child_eof, h) + .await + } + } + }) + }) + .collect(); + + let children = try_join_all(layout_futures).await?; + + let row_count = total_rows.load(Ordering::SeqCst); + + // Basic invariant: for variable-size lists, offsets must have row_count + 1 entries. + if matches!(dtype, DType::List(..)) { + let offsets_layout = if is_nullable { + &children[1] + } else { + &children[0] + }; + vortex_ensure!( + offsets_layout.row_count() == row_count + 1, + "ListLayout offsets row_count {} does not match list row_count + 1 ({})", + offsets_layout.row_count(), + row_count + 1 + ); + } + + Ok(ListLayout::new(row_count, dtype, children).into_layout()) + } + + fn buffered_bytes(&self) -> u64 { + self.elements.buffered_bytes() + } +} diff --git a/vortex-layout/src/layouts/mod.rs b/vortex-layout/src/layouts/mod.rs index 61193c54b6e..e33d33cf7e8 100644 --- a/vortex-layout/src/layouts/mod.rs +++ b/vortex-layout/src/layouts/mod.rs @@ -17,6 +17,7 @@ pub mod compressed; pub mod dict; pub mod file_stats; pub mod flat; +pub mod list; pub(crate) mod partitioned; pub mod repartition; pub mod row_idx; diff --git a/vortex-layout/src/layouts/table.rs b/vortex-layout/src/layouts/table.rs index 3ccb9c7b04a..3104d71e40a 100644 --- a/vortex-layout/src/layouts/table.rs +++ b/vortex-layout/src/layouts/table.rs @@ -34,6 +34,7 @@ use crate::IntoLayout; use crate::LayoutRef; use crate::LayoutStrategy; use crate::layouts::flat::writer::FlatLayoutStrategy; +use crate::layouts::list::writer::ListStrategy; use crate::layouts::struct_::StructLayout; use crate::segments::SegmentSinkRef; use crate::sequence::SendableSequentialStream; @@ -97,8 +98,7 @@ impl TableStrategy { /// /// ```no_run /// # use std::sync::Arc; - /// # use vortex_dtype::{field_path, Field, FieldPath}; - /// # use vortex_layout::layouts::compact::CompactCompressor; + /// # use vortex_dtype::field_path; /// # use vortex_layout::layouts::compressed::CompressingStrategy; /// # use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; /// # use vortex_layout::layouts::table::TableStrategy; @@ -106,19 +106,15 @@ impl TableStrategy { /// // A strategy for compressing data using the balanced BtrBlocks compressor. /// let compress_btrblocks = CompressingStrategy::new_btrblocks(FlatLayoutStrategy::default(), true); /// - /// // A strategy that compresses data using ZSTD - /// let compress_compact = CompressingStrategy::new_compact(FlatLayoutStrategy::default(), CompactCompressor::default()); - /// /// // Our combined strategy uses no compression for validity buffers, BtrBlocks compression - /// // for most columns, and will use ZSTD compression for a nested binary column that we know - /// // is never filtered in. + /// // for most columns, and will leave a nested binary column uncompressed. /// let strategy = TableStrategy::new( /// Arc::new(FlatLayoutStrategy::default()), /// Arc::new(compress_btrblocks), /// ) /// .with_field_writer( /// field_path!(request.body.bytes), - /// Arc::new(compress_compact), + /// Arc::new(FlatLayoutStrategy::default()), /// ); /// ``` pub fn with_field_writer( @@ -336,6 +332,15 @@ impl LayoutStrategy for TableStrategy { if dtype.is_struct() { // Step into the field path for struct columns Arc::new(self.descend(&field)) + } else if matches!(dtype, DType::List(..) | DType::FixedSizeList(..)) { + // Component shredding for lists: descend into the element type. + let elements = + Arc::new(self.descend(&field).descend(&Field::ElementType)); + Arc::new(ListStrategy::new( + self.validity.clone(), + self.fallback.clone(), + elements, + )) } else { // Use fallback for leaf columns self.fallback.clone() diff --git a/vortex-layout/src/session.rs b/vortex-layout/src/session.rs index 0250a855084..54ce228e459 100644 --- a/vortex-layout/src/session.rs +++ b/vortex-layout/src/session.rs @@ -9,6 +9,7 @@ use crate::LayoutEncodingRef; use crate::layouts::chunked::ChunkedLayoutEncoding; use crate::layouts::dict::DictLayoutEncoding; use crate::layouts::flat::FlatLayoutEncoding; +use crate::layouts::list::ListLayoutEncoding; use crate::layouts::struct_::StructLayoutEncoding; use crate::layouts::zoned::ZonedLayoutEncoding; @@ -45,6 +46,7 @@ impl Default for LayoutSession { layouts.register_many([ LayoutEncodingRef::new_ref(ChunkedLayoutEncoding.as_ref()), LayoutEncodingRef::new_ref(FlatLayoutEncoding.as_ref()), + LayoutEncodingRef::new_ref(ListLayoutEncoding.as_ref()), LayoutEncodingRef::new_ref(StructLayoutEncoding.as_ref()), LayoutEncodingRef::new_ref(ZonedLayoutEncoding.as_ref()), LayoutEncodingRef::new_ref(DictLayoutEncoding.as_ref()), From 0e943379777b05ca3dcfb838a9cfc4a7a1aa72a4 Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Sat, 17 Jan 2026 12:34:55 +0800 Subject: [PATCH 2/8] Add unit regression for list-of-struct get_item --- vortex-array/src/expr/exprs/get_item.rs | 87 +++++++++++++++++++++++++ vortex-file/tests/test_write_table.rs | 17 +---- 2 files changed, 89 insertions(+), 15 deletions(-) diff --git a/vortex-array/src/expr/exprs/get_item.rs b/vortex-array/src/expr/exprs/get_item.rs index 23c8df71b1c..29e58e63a44 100644 --- a/vortex-array/src/expr/exprs/get_item.rs +++ b/vortex-array/src/expr/exprs/get_item.rs @@ -486,6 +486,8 @@ pub fn get_item(field: impl Into, child: Expression) -> Expression { #[cfg(test)] mod tests { + use std::sync::Arc; + use vortex_buffer::buffer; use vortex_dtype::DType; use vortex_dtype::FieldNames; @@ -497,6 +499,7 @@ mod tests { use crate::Array; use crate::IntoArray; + use crate::arrays::ListArray; use crate::arrays::StructArray; use crate::expr::exprs::binary::checked_add; use crate::expr::exprs::get_item::get_item; @@ -547,6 +550,90 @@ mod tests { ); } + #[test] + fn get_item_list_of_struct() { + let element_dtype = Arc::new(DType::Struct( + [ + ("a", DType::Primitive(PType::I32, NonNullable)), + ("b", DType::Utf8(NonNullable)), + ] + .into_iter() + .collect(), + NonNullable, + )); + + let row_count = 4; + let items = ListArray::from_iter_opt_slow::( + [ + Some(vec![ + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(1i32, NonNullable), + Scalar::utf8("x", NonNullable), + ], + ), + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(2i32, NonNullable), + Scalar::utf8("y", NonNullable), + ], + ), + ]), + Some(Vec::new()), + None, + Some(vec![Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(3i32, NonNullable), + Scalar::utf8("z", NonNullable), + ], + )]), + ], + element_dtype, + ) + .unwrap(); + + let ids = buffer![0i32, 1, 2, 3].into_array(); + + let data = StructArray::new( + FieldNames::from(["id", "items"]), + vec![ids, items], + row_count, + Validity::NonNullable, + ) + .into_array(); + + // Regression for nested field projection on list-of-struct: `items.a`. + // The key path here is `Array::apply(...)`, which must be able to infer the correct return + // dtype for `GetItem` when its input is a list with struct elements. + let projection = get_item("a", get_item("items", root())); + let out = data.apply(&projection).expect("apply"); + + assert_eq!( + out.dtype(), + &DType::List( + Arc::new(DType::Primitive(PType::I32, NonNullable)), + Nullability::Nullable + ) + ); + + assert_eq!( + out.scalar_at(0).as_list().elements().unwrap().to_vec(), + vec![ + Scalar::primitive(1i32, NonNullable), + Scalar::primitive(2i32, NonNullable), + ] + ); + assert!(out.scalar_at(1).as_list().elements().unwrap().is_empty()); + assert!(out.scalar_at(2).is_null()); + assert_eq!( + out.scalar_at(3).as_list().elements().unwrap().to_vec(), + vec![Scalar::primitive(3i32, NonNullable)] + ); + } + #[test] fn test_pack_get_item_rule() { // Create: pack(a: lit(1), b: lit(2)).get_item("b") diff --git a/vortex-file/tests/test_write_table.rs b/vortex-file/tests/test_write_table.rs index c468b2f65f3..cd97f829566 100644 --- a/vortex-file/tests/test_write_table.rs +++ b/vortex-file/tests/test_write_table.rs @@ -205,24 +205,11 @@ async fn test_list_of_struct_nested_projection() { let out = stream.next().await.expect("first batch").expect("batch"); - // The output is a nullable list with the same outer validity/offsets as `items`. + // Smoke-check the projected shape; detailed value semantics are covered by unit tests in + // `vortex-array`. assert_eq!(out.len(), row_count); assert!(matches!(out.dtype(), vortex_dtype::DType::List(_, Nullability::Nullable))); - assert_eq!( - out.scalar_at(0).as_list().elements().unwrap().to_vec(), - vec![ - Scalar::primitive(1i32, Nullability::NonNullable), - Scalar::primitive(2i32, Nullability::NonNullable), - ] - ); - assert!(out.scalar_at(1).as_list().elements().unwrap().is_empty()); - assert!(out.scalar_at(2).is_null()); - assert_eq!( - out.scalar_at(3).as_list().elements().unwrap().to_vec(), - vec![Scalar::primitive(3i32, Nullability::NonNullable)] - ); - // Verify the list column is not stored as a single flat blob layout. // This is the root cause of poor nested support described in #4889. let root_layout = vxf.footer().layout(); From 11f06b6b4c6c87bb2d3602d2336bf03f7a799364 Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Sat, 17 Jan 2026 13:55:47 +0800 Subject: [PATCH 3/8] Fix lints in list layout and projections --- vortex-array/src/expr/exprs/get_item.rs | 33 +++++------- vortex-file/tests/test_write_table.rs | 17 ++++-- vortex-layout/src/layouts/list/mod.rs | 7 ++- vortex-layout/src/layouts/list/reader.rs | 66 ++++++++++------------- vortex-layout/src/layouts/list/writer.rs | 69 +++++++++++++++++++++--- 5 files changed, 116 insertions(+), 76 deletions(-) diff --git a/vortex-array/src/expr/exprs/get_item.rs b/vortex-array/src/expr/exprs/get_item.rs index 29e58e63a44..dceea0f8a6b 100644 --- a/vortex-array/src/expr/exprs/get_item.rs +++ b/vortex-array/src/expr/exprs/get_item.rs @@ -99,12 +99,9 @@ impl VTable for GetItem { // Struct field access: `$.a`. if let Some(struct_fields) = input_dtype.as_struct_fields_opt() { - let field_dtype = struct_fields - .field(field_name) - .ok_or_else(|| { - vortex_err!("Couldn't find the {} field in the input scope", field_name) - })? - .clone(); + let field_dtype = struct_fields.field(field_name).ok_or_else(|| { + vortex_err!("Couldn't find the {} field in the input scope", field_name) + })?; // Match here to avoid cloning the dtype if nullability doesn't need to change if matches!( @@ -127,12 +124,9 @@ impl VTable for GetItem { element_dtype ) })?; - let mut field_dtype = struct_fields - .field(field_name) - .ok_or_else(|| { - vortex_err!("Couldn't find the {} field in the input scope", field_name) - })? - .clone(); + let mut field_dtype = struct_fields.field(field_name).ok_or_else(|| { + vortex_err!("Couldn't find the {} field in the input scope", field_name) + })?; // If the struct elements are nullable, we must propagate that nullability to the // extracted field. @@ -153,12 +147,9 @@ impl VTable for GetItem { element_dtype ) })?; - let mut field_dtype = struct_fields - .field(field_name) - .ok_or_else(|| { - vortex_err!("Couldn't find the {} field in the input scope", field_name) - })? - .clone(); + let mut field_dtype = struct_fields.field(field_name).ok_or_else(|| { + vortex_err!("Couldn't find the {} field in the input scope", field_name) + })?; if matches!( (element_dtype.nullability(), field_dtype.nullability()), @@ -314,9 +305,9 @@ impl VTable for GetItem { } } DType::FixedSizeList(element_dtype, list_size, _) => { - let struct_dtype = element_dtype - .as_struct_fields_opt() - .ok_or_else(|| vortex_err!("Expected fixed-size list element dtype to be struct"))?; + let struct_dtype = element_dtype.as_struct_fields_opt().ok_or_else(|| { + vortex_err!("Expected fixed-size list element dtype to be struct") + })?; let field_idx = struct_dtype .find(field_name) .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; diff --git a/vortex-file/tests/test_write_table.rs b/vortex-file/tests/test_write_table.rs index cd97f829566..706e65a619a 100644 --- a/vortex-file/tests/test_write_table.rs +++ b/vortex-file/tests/test_write_table.rs @@ -32,8 +32,8 @@ use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; use vortex_layout::layouts::table::TableStrategy; use vortex_layout::session::LayoutSession; use vortex_metrics::VortexMetrics; -use vortex_session::VortexSession; use vortex_scalar::Scalar; +use vortex_session::VortexSession; static SESSION: LazyLock = LazyLock::new(|| { let mut session = VortexSession::empty() @@ -128,7 +128,10 @@ async fn test_list_of_struct_nested_projection() { let element_dtype = Arc::new(vortex_dtype::DType::Struct( [ - ("a", vortex_dtype::DType::Primitive(PType::I32, Nullability::NonNullable)), + ( + "a", + vortex_dtype::DType::Primitive(PType::I32, Nullability::NonNullable), + ), ("b", vortex_dtype::DType::Utf8(Nullability::NonNullable)), ] .into_iter() @@ -169,7 +172,10 @@ async fn test_list_of_struct_nested_projection() { ) .unwrap(); - let ids = PrimitiveArray::from_iter((0..row_count).map(|i| i as i32)).into_array(); + let ids = PrimitiveArray::from_iter( + (0..row_count).map(|i| i32::try_from(i).expect("row id fits in i32")), + ) + .into_array(); let data = StructArray::new( FieldNames::from(["id", "items"]), @@ -208,7 +214,10 @@ async fn test_list_of_struct_nested_projection() { // Smoke-check the projected shape; detailed value semantics are covered by unit tests in // `vortex-array`. assert_eq!(out.len(), row_count); - assert!(matches!(out.dtype(), vortex_dtype::DType::List(_, Nullability::Nullable))); + assert!(matches!( + out.dtype(), + vortex_dtype::DType::List(_, Nullability::Nullable) + )); // Verify the list column is not stored as a single flat blob layout. // This is the root cause of poor nested support described in #4889. diff --git a/vortex-layout/src/layouts/list/mod.rs b/vortex-layout/src/layouts/list/mod.rs index d54c3d4ba9c..9a551779d23 100644 --- a/vortex-layout/src/layouts/list/mod.rs +++ b/vortex-layout/src/layouts/list/mod.rs @@ -66,7 +66,7 @@ impl VTable for ListVTable { fn nchildren(layout: &Self::Layout) -> usize { let validity_children = layout.dtype.is_nullable() as usize; match layout.dtype { - DType::List(..) => 2 + validity_children, // offsets + elements + DType::List(..) => 2 + validity_children, // offsets + elements DType::FixedSizeList(..) => 1 + validity_children, // elements _ => 0, } @@ -81,9 +81,9 @@ impl VTable for ListVTable { (_, true, 0) => DType::Bool(Nullability::NonNullable), // variable-size list - (DType::List(element_dtype, _), false, 0) => offsets_dtype, + (DType::List(..), false, 0) => offsets_dtype, (DType::List(element_dtype, _), false, 1) => (*element_dtype.as_ref()).clone(), - (DType::List(element_dtype, _), true, 1) => offsets_dtype, + (DType::List(..), true, 1) => offsets_dtype, (DType::List(element_dtype, _), true, 2) => (*element_dtype.as_ref()).clone(), // fixed-size list @@ -218,4 +218,3 @@ impl ListLayout { &self.children } } - diff --git a/vortex-layout/src/layouts/list/reader.rs b/vortex-layout/src/layouts/list/reader.rs index ccec676b1d8..6106f5ea72a 100644 --- a/vortex-layout/src/layouts/list/reader.rs +++ b/vortex-layout/src/layouts/list/reader.rs @@ -6,10 +6,8 @@ use std::ops::BitAnd; use std::ops::Range; use std::sync::Arc; -use futures::future::BoxFuture; use futures::try_join; use vortex_array::Array; -use vortex_array::ArrayRef; use vortex_array::IntoArray; use vortex_array::MaskFuture; use vortex_array::ToCanonical; @@ -137,7 +135,7 @@ impl ListReader { .transpose()?; match dtype { - DType::List(element_dtype, list_nullability) => { + DType::List(_, list_nullability) => { let offsets_reader = self.offsets()?.clone(); let elements_reader = self.elements()?.clone(); let row_range_clone = row_range.clone(); @@ -155,15 +153,12 @@ impl ListReader { MaskFuture::new_true(offsets_len), )?; - let (offsets, validity) = try_join!( - offsets_fut, - async move { - match validity_fut { - Some(v) => v.await.map(Some), - None => Ok(None), - } + let (offsets, validity) = try_join!(offsets_fut, async move { + match validity_fut { + Some(v) => v.await.map(Some), + None => Ok(None), } - )?; + })?; let offsets = offsets.to_primitive(); let offsets_slice = offsets.as_slice::(); @@ -181,36 +176,33 @@ impl ListReader { let elements = elements.await?; - let normalized_offsets = - vortex_array::arrays::PrimitiveArray::from_iter( - offsets_slice.iter().map(|v| *v - base), - ) - .into_array(); + let normalized_offsets = vortex_array::arrays::PrimitiveArray::from_iter( + offsets_slice.iter().map(|v| *v - base), + ) + .into_array(); let validity = match (list_nullability, validity) { - (Nullability::NonNullable, _) => vortex_array::validity::Validity::NonNullable, + (Nullability::NonNullable, _) => { + vortex_array::validity::Validity::NonNullable + } (Nullability::Nullable, Some(v)) => { vortex_array::validity::Validity::Array(v) } (Nullability::Nullable, None) => vortex_array::validity::Validity::AllValid, - _ => vortex_array::validity::Validity::NonNullable, }; - Ok( - ListArray::try_new(elements, normalized_offsets, validity)? - .into_array(), - ) + Ok(ListArray::try_new(elements, normalized_offsets, validity)?.into_array()) })) } - DType::FixedSizeList(element_dtype, list_size, list_nullability) => { + DType::FixedSizeList(_, list_size, list_nullability) => { let elements_reader = self.elements()?.clone(); let row_range_clone = row_range.clone(); let element_expr = element_expr.clone(); Ok(Box::pin(async move { let row_len_u64 = row_range_clone.end - row_range_clone.start; - let row_len = usize::try_from(row_len_u64) - .vortex_expect("row range must fit in usize"); + let row_len = + usize::try_from(row_len_u64).vortex_expect("row range must fit in usize"); let list_size_u64 = u64::from(list_size); let element_start = row_range_clone @@ -231,23 +223,21 @@ impl ListReader { MaskFuture::new_true(elements_len), )?; - let (elements, validity) = try_join!( - elements_fut, - async move { - match validity_fut { - Some(v) => v.await.map(Some), - None => Ok(None), - } + let (elements, validity) = try_join!(elements_fut, async move { + match validity_fut { + Some(v) => v.await.map(Some), + None => Ok(None), } - )?; + })?; let validity = match (list_nullability, validity) { - (Nullability::NonNullable, _) => vortex_array::validity::Validity::NonNullable, + (Nullability::NonNullable, _) => { + vortex_array::validity::Validity::NonNullable + } (Nullability::Nullable, Some(v)) => { vortex_array::validity::Validity::Array(v) } (Nullability::Nullable, None) => vortex_array::validity::Validity::AllValid, - _ => vortex_array::validity::Validity::NonNullable, }; Ok( @@ -338,10 +328,8 @@ impl LayoutReader for ListReader { let row_range = row_range.clone(); let expr = expr.clone(); let root_expr = root(); - let list_fut = self.list_slice_future( - row_range.clone(), - if is_pushdown { &expr } else { &root_expr }, - )?; + let list_fut = + self.list_slice_future(row_range, if is_pushdown { &expr } else { &root_expr })?; Ok(Box::pin(async move { let (mut array, mask) = try_join!(list_fut, mask)?; diff --git a/vortex-layout/src/layouts/list/writer.rs b/vortex-layout/src/layouts/list/writer.rs index 0f24b782449..4551ac0c10b 100644 --- a/vortex-layout/src/layouts/list/writer.rs +++ b/vortex-layout/src/layouts/list/writer.rs @@ -18,7 +18,6 @@ use vortex_array::arrays::list_from_list_view; use vortex_dtype::DType; use vortex_dtype::Nullability; use vortex_dtype::PType; -use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexError; use vortex_error::VortexResult; use vortex_error::vortex_bail; @@ -154,19 +153,73 @@ impl LayoutStrategy for ListStrategy { // Build global u64 offsets, dropping the leading 0 for all but the first chunk. let offsets = list.offsets().to_primitive(); let offsets_slice_u64: VortexResult> = - match_each_integer_ptype!(offsets.ptype(), |T| { - offsets - .as_slice::() + match offsets.ptype() { + PType::U8 => Ok(offsets + .as_slice::() .iter() - .map(|v| { - u64::try_from(*v).map_err(|_| { + .map(|&v| u64::from(v)) + .collect()), + PType::U16 => Ok(offsets + .as_slice::() + .iter() + .map(|&v| u64::from(v)) + .collect()), + PType::U32 => Ok(offsets + .as_slice::() + .iter() + .map(|&v| u64::from(v)) + .collect()), + PType::U64 => Ok(offsets + .as_slice::() + .to_vec()), + PType::I8 => offsets + .as_slice::() + .iter() + .map(|&v| { + u64::try_from(v).map_err(|_| { + vortex_err!( + "List offsets must be convertible to u64" + ) + }) + }) + .collect(), + PType::I16 => offsets + .as_slice::() + .iter() + .map(|&v| { + u64::try_from(v).map_err(|_| { + vortex_err!( + "List offsets must be convertible to u64" + ) + }) + }) + .collect(), + PType::I32 => offsets + .as_slice::() + .iter() + .map(|&v| { + u64::try_from(v).map_err(|_| { + vortex_err!( + "List offsets must be convertible to u64" + ) + }) + }) + .collect(), + PType::I64 => offsets + .as_slice::() + .iter() + .map(|&v| { + u64::try_from(v).map_err(|_| { vortex_err!( "List offsets must be convertible to u64" ) }) }) - .collect() - }); + .collect(), + other => Err(vortex_err!( + "List offsets must be an integer type, got {other}" + )), + }; let offsets_slice_u64 = match offsets_slice_u64 { Ok(v) => v, Err(e) => { From 586550e0535815f088b6624042e81f4e72f34998 Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Tue, 20 Jan 2026 18:30:07 +0800 Subject: [PATCH 4/8] Refactor list projection internals --- vortex-array/src/expr/exprs/get_item.rs | 159 ++++++++++------------- vortex-layout/src/layouts/list/mod.rs | 9 +- vortex-layout/src/layouts/list/writer.rs | 115 ++++++++-------- 3 files changed, 125 insertions(+), 158 deletions(-) diff --git a/vortex-array/src/expr/exprs/get_item.rs b/vortex-array/src/expr/exprs/get_item.rs index dceea0f8a6b..c2559b41ddf 100644 --- a/vortex-array/src/expr/exprs/get_item.rs +++ b/vortex-array/src/expr/exprs/get_item.rs @@ -52,6 +52,37 @@ use crate::vtable::ValidityHelper; pub struct GetItem; +fn propagate_nullability(parent_nullability: Nullability, field_dtype: DType) -> DType { + if matches!( + (parent_nullability, field_dtype.nullability()), + (Nullability::Nullable, Nullability::NonNullable) + ) { + return field_dtype.with_nullability(Nullability::Nullable); + } + + field_dtype +} + +fn list_struct_field_dtype( + element_dtype: &DType, + field_name: &FieldName, + list_kind: &'static str, +) -> VortexResult { + let struct_fields = element_dtype.as_struct_fields_opt().ok_or_else(|| { + vortex_err!( + "Expected {list_kind} element dtype to be a struct for GetItem, got {element_dtype}", + ) + })?; + let field_dtype = struct_fields + .field(field_name) + .ok_or_else(|| vortex_err!("Couldn't find the {} field in the input scope", field_name))?; + + Ok(propagate_nullability( + element_dtype.nullability(), + field_dtype, + )) +} + impl VTable for GetItem { type Options = FieldName; @@ -103,61 +134,22 @@ impl VTable for GetItem { vortex_err!("Couldn't find the {} field in the input scope", field_name) })?; - // Match here to avoid cloning the dtype if nullability doesn't need to change - if matches!( - (input_dtype.nullability(), field_dtype.nullability()), - (Nullability::Nullable, Nullability::NonNullable) - ) { - return Ok(field_dtype.with_nullability(Nullability::Nullable)); - } - - return Ok(field_dtype); + return Ok(propagate_nullability( + input_dtype.nullability(), + field_dtype, + )); } // List-of-struct field access: `$.items.a` where `items: list`. match input_dtype { DType::List(element_dtype, list_nullability) => { - let element_dtype = element_dtype.as_ref(); - let struct_fields = element_dtype.as_struct_fields_opt().ok_or_else(|| { - vortex_err!( - "Expected list element dtype to be a struct for GetItem, got {}", - element_dtype - ) - })?; - let mut field_dtype = struct_fields.field(field_name).ok_or_else(|| { - vortex_err!("Couldn't find the {} field in the input scope", field_name) - })?; - - // If the struct elements are nullable, we must propagate that nullability to the - // extracted field. - if matches!( - (element_dtype.nullability(), field_dtype.nullability()), - (Nullability::Nullable, Nullability::NonNullable) - ) { - field_dtype = field_dtype.with_nullability(Nullability::Nullable); - } - + let field_dtype = + list_struct_field_dtype(element_dtype.as_ref(), field_name, "list")?; Ok(DType::List(Arc::new(field_dtype), *list_nullability)) } DType::FixedSizeList(element_dtype, list_size, list_nullability) => { - let element_dtype = element_dtype.as_ref(); - let struct_fields = element_dtype.as_struct_fields_opt().ok_or_else(|| { - vortex_err!( - "Expected fixed-size list element dtype to be a struct for GetItem, got {}", - element_dtype - ) - })?; - let mut field_dtype = struct_fields.field(field_name).ok_or_else(|| { - vortex_err!("Couldn't find the {} field in the input scope", field_name) - })?; - - if matches!( - (element_dtype.nullability(), field_dtype.nullability()), - (Nullability::Nullable, Nullability::NonNullable) - ) { - field_dtype = field_dtype.with_nullability(Nullability::Nullable); - } - + let field_dtype = + list_struct_field_dtype(element_dtype.as_ref(), field_name, "fixed-size list")?; Ok(DType::FixedSizeList( Arc::new(field_dtype), *list_size, @@ -266,41 +258,30 @@ impl VTable for GetItem { .find(field_name) .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; + let project_list = |list: ListViewVector| -> VortexResult { + let (elements, offsets, sizes, validity) = list.into_parts(); + + let struct_elements = elements.as_ref().as_struct(); + let mut field = struct_elements.fields()[field_idx].clone(); + field.mask_validity(struct_elements.validity()); + + ListViewVector::try_new(Arc::new(field), offsets, sizes, validity) + }; + match args.datums.pop().vortex_expect("missing input") { Datum::Scalar(s) => { if !s.is_valid() { return Ok(Datum::Scalar(VScalar::null(&args.return_dtype))); } - let list = s.into_list(); - let list_vec = list.value().clone(); - let (elements, offsets, sizes, validity) = list_vec.into_parts(); - - let struct_elements = elements.as_ref().as_struct(); - let mut field = struct_elements.fields()[field_idx].clone(); - field.mask_validity(struct_elements.validity()); - Ok(Datum::Scalar( - ListViewScalar::new(ListViewVector::try_new( - Arc::new(field), - offsets, - sizes, - validity, - )?) - .into(), + ListViewScalar::new(project_list(s.into_list().value().clone())?) + .into(), )) } Datum::Vector(v) => { let list: ListViewVector = v.into_list(); - let (elements, offsets, sizes, validity) = list.into_parts(); - let struct_elements = elements.as_ref().as_struct(); - let mut field = struct_elements.fields()[field_idx].clone(); - field.mask_validity(struct_elements.validity()); - - Ok(Datum::Vector( - ListViewVector::try_new(Arc::new(field), offsets, sizes, validity)? - .into(), - )) + Ok(Datum::Vector(project_list(list)?.into())) } } } @@ -312,41 +293,33 @@ impl VTable for GetItem { .find(field_name) .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; + let project_list = + |list: FixedSizeListVector| -> VortexResult { + let (elements, _vector_list_size, validity) = list.into_parts(); + + let struct_elements = elements.as_ref().as_struct(); + let mut field = struct_elements.fields()[field_idx].clone(); + field.mask_validity(struct_elements.validity()); + + FixedSizeListVector::try_new(Arc::new(field), *list_size, validity) + }; + match args.datums.pop().vortex_expect("missing input") { Datum::Scalar(s) => { if !s.is_valid() { return Ok(Datum::Scalar(VScalar::null(&args.return_dtype))); } - let list = s.into_fixed_size_list(); - let list_vec = list.value().clone(); - let (elements, _vector_list_size, validity) = list_vec.into_parts(); - - let struct_elements = elements.as_ref().as_struct(); - let mut field = struct_elements.fields()[field_idx].clone(); - field.mask_validity(struct_elements.validity()); - Ok(Datum::Scalar( - FixedSizeListScalar::new(FixedSizeListVector::try_new( - Arc::new(field), - *list_size, - validity, + FixedSizeListScalar::new(project_list( + s.into_fixed_size_list().value().clone(), )?) .into(), )) } Datum::Vector(v) => { let list = v.into_fixed_size_list(); - let (elements, _vector_list_size, validity) = list.into_parts(); - - let struct_elements = elements.as_ref().as_struct(); - let mut field = struct_elements.fields()[field_idx].clone(); - field.mask_validity(struct_elements.validity()); - - Ok(Datum::Vector( - FixedSizeListVector::try_new(Arc::new(field), *list_size, validity)? - .into(), - )) + Ok(Datum::Vector(project_list(list)?.into())) } } } diff --git a/vortex-layout/src/layouts/list/mod.rs b/vortex-layout/src/layouts/list/mod.rs index 9a551779d23..cbb9e532324 100644 --- a/vortex-layout/src/layouts/list/mod.rs +++ b/vortex-layout/src/layouts/list/mod.rs @@ -65,7 +65,7 @@ impl VTable for ListVTable { fn nchildren(layout: &Self::Layout) -> usize { let validity_children = layout.dtype.is_nullable() as usize; - match layout.dtype { + match &layout.dtype { DType::List(..) => 2 + validity_children, // offsets + elements DType::FixedSizeList(..) => 1 + validity_children, // elements _ => 0, @@ -105,7 +105,7 @@ impl VTable for ListVTable { return LayoutChildType::Auxiliary("validity".into()); } - match layout.dtype { + match &layout.dtype { DType::List(..) => { let offsets_idx = if is_nullable { 1 } else { 0 }; if idx == offsets_idx { @@ -115,7 +115,10 @@ impl VTable for ListVTable { } } DType::FixedSizeList(..) => LayoutChildType::Auxiliary("elements".into()), - _ => LayoutChildType::Auxiliary("unknown".into()), + _ => unreachable!( + "ListLayout only supports List and FixedSizeList dtypes, got {}", + layout.dtype() + ), } } diff --git a/vortex-layout/src/layouts/list/writer.rs b/vortex-layout/src/layouts/list/writer.rs index 4551ac0c10b..d75513b313e 100644 --- a/vortex-layout/src/layouts/list/writer.rs +++ b/vortex-layout/src/layouts/list/writer.rs @@ -36,6 +36,34 @@ use crate::sequence::SequencePointer; use crate::sequence::SequentialStreamAdapter; use crate::sequence::SequentialStreamExt; +trait ToU64 { + fn to_u64(self) -> u64; +} + +impl ToU64 for u8 { + fn to_u64(self) -> u64 { + u64::from(self) + } +} + +impl ToU64 for u16 { + fn to_u64(self) -> u64 { + u64::from(self) + } +} + +impl ToU64 for u32 { + fn to_u64(self) -> u64 { + u64::from(self) + } +} + +impl ToU64 for u64 { + fn to_u64(self) -> u64 { + self + } +} + /// A write strategy that performs component shredding for list types. /// /// - Variable-size lists are written as: @@ -154,68 +182,31 @@ impl LayoutStrategy for ListStrategy { let offsets = list.offsets().to_primitive(); let offsets_slice_u64: VortexResult> = match offsets.ptype() { - PType::U8 => Ok(offsets - .as_slice::() - .iter() - .map(|&v| u64::from(v)) - .collect()), - PType::U16 => Ok(offsets - .as_slice::() - .iter() - .map(|&v| u64::from(v)) - .collect()), - PType::U32 => Ok(offsets - .as_slice::() - .iter() - .map(|&v| u64::from(v)) - .collect()), - PType::U64 => Ok(offsets - .as_slice::() - .to_vec()), - PType::I8 => offsets - .as_slice::() - .iter() - .map(|&v| { - u64::try_from(v).map_err(|_| { - vortex_err!( - "List offsets must be convertible to u64" - ) - }) - }) - .collect(), - PType::I16 => offsets - .as_slice::() - .iter() - .map(|&v| { - u64::try_from(v).map_err(|_| { - vortex_err!( - "List offsets must be convertible to u64" - ) - }) - }) - .collect(), - PType::I32 => offsets - .as_slice::() - .iter() - .map(|&v| { - u64::try_from(v).map_err(|_| { - vortex_err!( - "List offsets must be convertible to u64" - ) - }) - }) - .collect(), - PType::I64 => offsets - .as_slice::() - .iter() - .map(|&v| { - u64::try_from(v).map_err(|_| { - vortex_err!( - "List offsets must be convertible to u64" - ) - }) - }) - .collect(), + ptype if ptype.is_unsigned_int() => vortex_dtype::match_each_unsigned_integer_ptype!(ptype, |T| { + Ok(offsets + .as_slice::() + .iter() + .map(|&v| v.to_u64()) + .collect()) + }), + ptype if ptype.is_signed_int() => { + vortex_dtype::match_each_signed_integer_ptype!( + ptype, + |T| { + offsets + .as_slice::() + .iter() + .map(|&v| { + u64::try_from(v).map_err(|_| { + vortex_err!( + "List offsets must be convertible to u64" + ) + }) + }) + .collect() + } + ) + } other => Err(vortex_err!( "List offsets must be an integer type, got {other}" )), From 3e0575a201f0e32e26e27c34788f44e5409d8acf Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Tue, 20 Jan 2026 20:19:51 +0800 Subject: [PATCH 5/8] Refactor GetItem list projections --- vortex-array/src/expr/exprs/get_item.rs | 160 ++++++++++++++++++------ 1 file changed, 120 insertions(+), 40 deletions(-) diff --git a/vortex-array/src/expr/exprs/get_item.rs b/vortex-array/src/expr/exprs/get_item.rs index c2559b41ddf..7ae0a4a0814 100644 --- a/vortex-array/src/expr/exprs/get_item.rs +++ b/vortex-array/src/expr/exprs/get_item.rs @@ -83,6 +83,29 @@ fn list_struct_field_dtype( )) } +fn project_struct_field_from_elements( + elements: &ArrayRef, + field_name: &FieldName, +) -> VortexResult { + let struct_elems = elements.to_struct(); + let mut field = struct_elems.field_by_name(field_name).cloned()?; + + if struct_elems.dtype().is_nullable() { + field = mask(&field, &struct_elems.validity_mask().not())?; + } + + Ok(field) +} + +fn project_struct_field_vector( + struct_elements: &vortex_vector::struct_::StructVector, + field_idx: usize, +) -> vortex_vector::Vector { + let mut field = struct_elements.fields()[field_idx].clone(); + field.mask_validity(struct_elements.validity()); + field +} + impl VTable for GetItem { type Options = FieldName; @@ -186,12 +209,7 @@ impl VTable for GetItem { match input.dtype() { DType::List(..) => { let list = input.to_listview(); - let struct_elems = list.elements().to_struct(); - let mut field = struct_elems.field_by_name(field_name).cloned()?; - - if struct_elems.dtype().is_nullable() { - field = mask(&field, &struct_elems.validity_mask().not())?; - } + let field = project_struct_field_from_elements(list.elements(), field_name)?; Ok(ListViewArray::try_new( field, @@ -203,12 +221,7 @@ impl VTable for GetItem { } DType::FixedSizeList(..) => { let list = input.to_fixed_size_list(); - let struct_elems = list.elements().to_struct(); - let mut field = struct_elems.field_by_name(field_name).cloned()?; - - if struct_elems.dtype().is_nullable() { - field = mask(&field, &struct_elems.validity_mask().not())?; - } + let field = project_struct_field_from_elements(list.elements(), field_name)?; Ok(FixedSizeListArray::try_new( field, @@ -249,21 +262,30 @@ impl VTable for GetItem { } // List-of-struct field access: `$.items.a` where `items: list`. - match input_dtype { - DType::List(element_dtype, _) => { - let struct_dtype = element_dtype - .as_struct_fields_opt() - .ok_or_else(|| vortex_err!("Expected list element dtype to be struct"))?; - let field_idx = struct_dtype - .find(field_name) - .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; + let (element_dtype, list_kind): (&DType, &'static str) = match input_dtype { + DType::List(element_dtype, _) => (element_dtype.as_ref(), "list"), + DType::FixedSizeList(element_dtype, ..) => (element_dtype.as_ref(), "fixed-size list"), + _ => { + return Err(vortex_err!( + "Expected struct or list-of-struct dtype for child of GetItem expression, got {}", + input_dtype + )); + } + }; + + let struct_dtype = element_dtype.as_struct_fields_opt().ok_or_else(|| { + vortex_err!("Expected {list_kind} element dtype to be struct for GetItem") + })?; + let field_idx = struct_dtype + .find(field_name) + .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; + match input_dtype { + DType::List(..) => { let project_list = |list: ListViewVector| -> VortexResult { let (elements, offsets, sizes, validity) = list.into_parts(); - - let struct_elements = elements.as_ref().as_struct(); - let mut field = struct_elements.fields()[field_idx].clone(); - field.mask_validity(struct_elements.validity()); + let field = + project_struct_field_vector(elements.as_ref().as_struct(), field_idx); ListViewVector::try_new(Arc::new(field), offsets, sizes, validity) }; @@ -285,21 +307,12 @@ impl VTable for GetItem { } } } - DType::FixedSizeList(element_dtype, list_size, _) => { - let struct_dtype = element_dtype.as_struct_fields_opt().ok_or_else(|| { - vortex_err!("Expected fixed-size list element dtype to be struct") - })?; - let field_idx = struct_dtype - .find(field_name) - .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; - + DType::FixedSizeList(_, list_size, _) => { let project_list = |list: FixedSizeListVector| -> VortexResult { let (elements, _vector_list_size, validity) = list.into_parts(); - - let struct_elements = elements.as_ref().as_struct(); - let mut field = struct_elements.fields()[field_idx].clone(); - field.mask_validity(struct_elements.validity()); + let field = + project_struct_field_vector(elements.as_ref().as_struct(), field_idx); FixedSizeListVector::try_new(Arc::new(field), *list_size, validity) }; @@ -323,10 +336,9 @@ impl VTable for GetItem { } } } - _ => Err(vortex_err!( - "Expected struct or list-of-struct dtype for child of GetItem expression, got {}", - input_dtype - )), + _ => unreachable!( + "Expected {list_kind} dtype for child of GetItem expression, got {input_dtype}", + ), } } @@ -463,6 +475,7 @@ mod tests { use crate::Array; use crate::IntoArray; + use crate::arrays::FixedSizeListArray; use crate::arrays::ListArray; use crate::arrays::StructArray; use crate::expr::exprs::binary::checked_add; @@ -598,6 +611,73 @@ mod tests { ); } + #[test] + fn get_item_fixed_size_list_of_struct() { + let n_lists: usize = 3; + let list_size: u32 = 2; + let n_elements = n_lists * list_size as usize; + + let struct_elems = StructArray::try_new( + FieldNames::from(["a", "b"]), + vec![ + buffer![1i32, 2, 3, 4, 5, 6].into_array(), + buffer![10i64, 20, 30, 40, 50, 60].into_array(), + ], + n_elements, + Validity::from_iter([true, false, true, true, false, true]), + ) + .unwrap() + .into_array(); + + let items = FixedSizeListArray::try_new( + struct_elems, + list_size, + Validity::from_iter([true, false, true]), + n_lists, + ) + .unwrap() + .into_array(); + + let ids = buffer![0i32, 1, 2].into_array(); + + let data = StructArray::new( + FieldNames::from(["id", "items"]), + vec![ids, items], + n_lists, + Validity::NonNullable, + ) + .into_array(); + + // FixedSizeList-of-struct projection: `items.a`, including struct-level nulls inside the list. + let projection = get_item("a", get_item("items", root())); + let out = data.apply(&projection).expect("apply"); + + assert_eq!( + out.dtype(), + &DType::FixedSizeList( + Arc::new(DType::Primitive(PType::I32, Nullability::Nullable)), + list_size, + Nullability::Nullable + ) + ); + + assert_eq!( + out.scalar_at(0).as_list().elements().unwrap().to_vec(), + vec![ + Scalar::primitive(1i32, Nullability::Nullable), + Scalar::null(DType::Primitive(PType::I32, Nullability::Nullable)), + ] + ); + assert!(out.scalar_at(1).is_null()); + assert_eq!( + out.scalar_at(2).as_list().elements().unwrap().to_vec(), + vec![ + Scalar::null(DType::Primitive(PType::I32, Nullability::Nullable)), + Scalar::primitive(6i32, Nullability::Nullable), + ] + ); + } + #[test] fn test_pack_get_item_rule() { // Create: pack(a: lit(1), b: lit(2)).get_item("b") From 87d9553fae88c9a4d4c236433771f41293bb7e96 Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Tue, 20 Jan 2026 23:54:37 +0800 Subject: [PATCH 6/8] Use map expression for list-of-struct projection --- vortex-array/src/expr/exprs/get_item.rs | 253 ++++++--------------- vortex-array/src/expr/exprs/map.rs | 196 ++++++++++++++++ vortex-array/src/expr/exprs/mod.rs | 2 + vortex-array/src/expr/session.rs | 2 + vortex-array/src/expr/vtable.rs | 3 + vortex-datafusion/src/persistent/opener.rs | 11 +- vortex-duckdb/src/convert/table_filter.rs | 7 +- vortex-layout/src/layouts/list/reader.rs | 24 +- vortex-scan/src/scan_builder.rs | 5 +- 9 files changed, 304 insertions(+), 199 deletions(-) create mode 100644 vortex-array/src/expr/exprs/map.rs diff --git a/vortex-array/src/expr/exprs/get_item.rs b/vortex-array/src/expr/exprs/get_item.rs index 7ae0a4a0814..2ecee147db3 100644 --- a/vortex-array/src/expr/exprs/get_item.rs +++ b/vortex-array/src/expr/exprs/get_item.rs @@ -3,7 +3,6 @@ use std::fmt::Formatter; use std::ops::Not; -use std::sync::Arc; use prost::Message; use vortex_dtype::DType; @@ -15,19 +14,11 @@ use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_proto::expr as pb; use vortex_vector::Datum; -use vortex_vector::Scalar as VScalar; use vortex_vector::ScalarOps; use vortex_vector::VectorOps; -use vortex_vector::fixed_size_list::FixedSizeListScalar; -use vortex_vector::fixed_size_list::FixedSizeListVector; -use vortex_vector::listview::ListViewScalar; -use vortex_vector::listview::ListViewVector; use crate::ArrayRef; -use crate::IntoArray; use crate::ToCanonical; -use crate::arrays::FixedSizeListArray; -use crate::arrays::ListViewArray; use crate::builtins::ExprBuiltins; use crate::compute::mask; use crate::expr::Arity; @@ -42,13 +33,14 @@ use crate::expr::Pack; use crate::expr::ReduceCtx; use crate::expr::ReduceNode; use crate::expr::ReduceNodeRef; +use crate::expr::SimplifyCtx; use crate::expr::StatsCatalog; use crate::expr::VTable; use crate::expr::VTableExt; use crate::expr::exprs::root::root; use crate::expr::lit; +use crate::expr::map; use crate::expr::stats::Stat; -use crate::vtable::ValidityHelper; pub struct GetItem; @@ -63,49 +55,6 @@ fn propagate_nullability(parent_nullability: Nullability, field_dtype: DType) -> field_dtype } -fn list_struct_field_dtype( - element_dtype: &DType, - field_name: &FieldName, - list_kind: &'static str, -) -> VortexResult { - let struct_fields = element_dtype.as_struct_fields_opt().ok_or_else(|| { - vortex_err!( - "Expected {list_kind} element dtype to be a struct for GetItem, got {element_dtype}", - ) - })?; - let field_dtype = struct_fields - .field(field_name) - .ok_or_else(|| vortex_err!("Couldn't find the {} field in the input scope", field_name))?; - - Ok(propagate_nullability( - element_dtype.nullability(), - field_dtype, - )) -} - -fn project_struct_field_from_elements( - elements: &ArrayRef, - field_name: &FieldName, -) -> VortexResult { - let struct_elems = elements.to_struct(); - let mut field = struct_elems.field_by_name(field_name).cloned()?; - - if struct_elems.dtype().is_nullable() { - field = mask(&field, &struct_elems.validity_mask().not())?; - } - - Ok(field) -} - -fn project_struct_field_vector( - struct_elements: &vortex_vector::struct_::StructVector, - field_idx: usize, -) -> vortex_vector::Vector { - let mut field = struct_elements.fields()[field_idx].clone(); - field.mask_validity(struct_elements.validity()); - field -} - impl VTable for GetItem { type Options = FieldName; @@ -163,27 +112,10 @@ impl VTable for GetItem { )); } - // List-of-struct field access: `$.items.a` where `items: list`. - match input_dtype { - DType::List(element_dtype, list_nullability) => { - let field_dtype = - list_struct_field_dtype(element_dtype.as_ref(), field_name, "list")?; - Ok(DType::List(Arc::new(field_dtype), *list_nullability)) - } - DType::FixedSizeList(element_dtype, list_size, list_nullability) => { - let field_dtype = - list_struct_field_dtype(element_dtype.as_ref(), field_name, "fixed-size list")?; - Ok(DType::FixedSizeList( - Arc::new(field_dtype), - *list_size, - *list_nullability, - )) - } - _ => Err(vortex_err!( - "Expected struct or list-of-struct dtype for child of GetItem expression, got {}", - input_dtype - )), - } + Err(vortex_err!( + "Expected struct dtype for child of GetItem expression, got {}", + input_dtype + )) } fn evaluate( @@ -205,37 +137,10 @@ impl VTable for GetItem { }; } - // List-of-struct field access: `$.items.a` where `items: list`. - match input.dtype() { - DType::List(..) => { - let list = input.to_listview(); - let field = project_struct_field_from_elements(list.elements(), field_name)?; - - Ok(ListViewArray::try_new( - field, - list.offsets().clone(), - list.sizes().clone(), - list.validity().clone(), - )? - .into_array()) - } - DType::FixedSizeList(..) => { - let list = input.to_fixed_size_list(); - let field = project_struct_field_from_elements(list.elements(), field_name)?; - - Ok(FixedSizeListArray::try_new( - field, - list.list_size(), - list.validity().clone(), - list.len(), - )? - .into_array()) - } - _ => Err(vortex_err!( - "Expected struct or list-of-struct scope for GetItem evaluation, got {}", - input.dtype() - )), - } + Err(vortex_err!( + "Expected struct scope for GetItem evaluation, got {}", + input.dtype() + )) } fn execute(&self, field_name: &FieldName, mut args: ExecutionArgs) -> VortexResult { @@ -261,84 +166,36 @@ impl VTable for GetItem { }; } - // List-of-struct field access: `$.items.a` where `items: list`. - let (element_dtype, list_kind): (&DType, &'static str) = match input_dtype { - DType::List(element_dtype, _) => (element_dtype.as_ref(), "list"), - DType::FixedSizeList(element_dtype, ..) => (element_dtype.as_ref(), "fixed-size list"), - _ => { - return Err(vortex_err!( - "Expected struct or list-of-struct dtype for child of GetItem expression, got {}", - input_dtype - )); - } + Err(vortex_err!( + "Expected struct dtype for child of GetItem expression, got {}", + input_dtype + )) + } + + fn simplify( + &self, + field_name: &FieldName, + expr: &Expression, + ctx: &dyn SimplifyCtx, + ) -> VortexResult> { + let child = expr.child(0); + let child_dtype = ctx.return_dtype(child)?; + + let element_dtype = match child_dtype { + DType::List(element_dtype, _) => Some(element_dtype), + DType::FixedSizeList(element_dtype, ..) => Some(element_dtype), + _ => None, }; - let struct_dtype = element_dtype.as_struct_fields_opt().ok_or_else(|| { - vortex_err!("Expected {list_kind} element dtype to be struct for GetItem") - })?; - let field_idx = struct_dtype - .find(field_name) - .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; - - match input_dtype { - DType::List(..) => { - let project_list = |list: ListViewVector| -> VortexResult { - let (elements, offsets, sizes, validity) = list.into_parts(); - let field = - project_struct_field_vector(elements.as_ref().as_struct(), field_idx); - - ListViewVector::try_new(Arc::new(field), offsets, sizes, validity) - }; - - match args.datums.pop().vortex_expect("missing input") { - Datum::Scalar(s) => { - if !s.is_valid() { - return Ok(Datum::Scalar(VScalar::null(&args.return_dtype))); - } - - Ok(Datum::Scalar( - ListViewScalar::new(project_list(s.into_list().value().clone())?) - .into(), - )) - } - Datum::Vector(v) => { - let list: ListViewVector = v.into_list(); - Ok(Datum::Vector(project_list(list)?.into())) - } - } - } - DType::FixedSizeList(_, list_size, _) => { - let project_list = - |list: FixedSizeListVector| -> VortexResult { - let (elements, _vector_list_size, validity) = list.into_parts(); - let field = - project_struct_field_vector(elements.as_ref().as_struct(), field_idx); - - FixedSizeListVector::try_new(Arc::new(field), *list_size, validity) - }; - - match args.datums.pop().vortex_expect("missing input") { - Datum::Scalar(s) => { - if !s.is_valid() { - return Ok(Datum::Scalar(VScalar::null(&args.return_dtype))); - } - - Ok(Datum::Scalar( - FixedSizeListScalar::new(project_list( - s.into_fixed_size_list().value().clone(), - )?) - .into(), - )) - } - Datum::Vector(v) => { - let list = v.into_fixed_size_list(); - Ok(Datum::Vector(project_list(list)?.into())) - } - } - } - _ => unreachable!( - "Expected {list_kind} dtype for child of GetItem expression, got {input_dtype}", - ), + if let Some(element_dtype) = element_dtype + && element_dtype.as_struct_fields_opt().is_some() + { + Ok(Some(map( + get_item(field_name.clone(), root()), + child.clone(), + ))) + } else { + Ok(None) } } @@ -481,6 +338,7 @@ mod tests { use crate::expr::exprs::binary::checked_add; use crate::expr::exprs::get_item::get_item; use crate::expr::exprs::literal::lit; + use crate::expr::exprs::map::map; use crate::expr::exprs::pack::pack; use crate::expr::exprs::root::root; use crate::validity::Validity; @@ -583,9 +441,7 @@ mod tests { .into_array(); // Regression for nested field projection on list-of-struct: `items.a`. - // The key path here is `Array::apply(...)`, which must be able to infer the correct return - // dtype for `GetItem` when its input is a list with struct elements. - let projection = get_item("a", get_item("items", root())); + let projection = map(get_item("a", root()), get_item("items", root())); let out = data.apply(&projection).expect("apply"); assert_eq!( @@ -649,7 +505,7 @@ mod tests { .into_array(); // FixedSizeList-of-struct projection: `items.a`, including struct-level nulls inside the list. - let projection = get_item("a", get_item("items", root())); + let projection = map(get_item("a", root()), get_item("items", root())); let out = data.apply(&projection).expect("apply"); assert_eq!( @@ -678,6 +534,33 @@ mod tests { ); } + #[test] + fn get_item_list_of_struct_desugars_to_map_on_optimize() { + let scope_dtype = DType::Struct( + [ + ("id", DType::from(PType::I32)), + ( + "items", + DType::List( + Arc::new(DType::Struct( + [("a", DType::from(PType::I32))].into_iter().collect(), + NonNullable, + )), + Nullability::Nullable, + ), + ), + ] + .into_iter() + .collect(), + NonNullable, + ); + + let expr = get_item("a", get_item("items", root())); + let optimized = expr.optimize_recursive(&scope_dtype).unwrap(); + + assert!(optimized.is::()); + } + #[test] fn test_pack_get_item_rule() { // Create: pack(a: lit(1), b: lit(2)).get_item("b") diff --git a/vortex-array/src/expr/exprs/map.rs b/vortex-array/src/expr/exprs/map.rs new file mode 100644 index 00000000000..88d0383fee4 --- /dev/null +++ b/vortex-array/src/expr/exprs/map.rs @@ -0,0 +1,196 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Formatter; +use std::sync::Arc; + +use prost::Message; +use vortex_dtype::DType; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_proto::expr as pb; + +use crate::ArrayRef; +use crate::IntoArray; +use crate::ToCanonical; +use crate::arrays::FixedSizeListArray; +use crate::arrays::ListViewArray; +use crate::expr::Arity; +use crate::expr::ChildName; +use crate::expr::ExprId; +use crate::expr::Expression; +use crate::expr::SimplifyCtx; +use crate::expr::VTable; +use crate::expr::VTableExt; +use crate::expr::exprs::get_item::GetItem; +use crate::expr::exprs::root::Root; +use crate::expr::proto::ExprSerializeProtoExt; +use crate::expr::proto::deserialize_expr_proto; +use crate::expr::session::ExprSession; +use crate::vtable::ValidityHelper; + +/// An expression that maps a scalar expression over the elements of a list. +/// +/// The map "lambda" is represented by `element_expr`, which is evaluated with the list elements as +/// its root scope. +pub struct Map; + +impl VTable for Map { + /// The element expression to apply to each list element. + type Options = Expression; + + fn id(&self) -> ExprId { + ExprId::from("vortex.map") + } + + fn serialize(&self, element_expr: &Expression) -> VortexResult>> { + Ok(Some(element_expr.serialize_proto()?.encode_to_vec())) + } + + fn deserialize(&self, metadata: &[u8]) -> VortexResult { + let element_expr_pb = pb::Expr::decode(metadata)?; + let registry = ExprSession::default().registry().clone(); + deserialize_expr_proto(&element_expr_pb, ®istry) + } + + fn arity(&self, _element_expr: &Expression) -> Arity { + Arity::Exact(1) + } + + fn child_name(&self, _element_expr: &Expression, child_idx: usize) -> ChildName { + match child_idx { + 0 => ChildName::from("list"), + _ => unreachable!("Invalid child index {} for Map expression", child_idx), + } + } + + fn fmt_sql( + &self, + element_expr: &Expression, + expr: &Expression, + f: &mut Formatter<'_>, + ) -> std::fmt::Result { + // If the mapping is a simple struct field projection, format as a dotted path (e.g. `$.items.a`) + // to match user expectations and existing `get_item` formatting. + if let Some(field_name) = element_expr.as_opt::() + && element_expr.child(0).is::() + { + expr.child(0).fmt_sql(f)?; + return write!(f, ".{}", field_name); + } + + write!(f, "map(")?; + element_expr.fmt_sql(f)?; + write!(f, ", ")?; + expr.child(0).fmt_sql(f)?; + write!(f, ")") + } + + fn return_dtype(&self, element_expr: &Expression, arg_dtypes: &[DType]) -> VortexResult { + let list_dtype = &arg_dtypes[0]; + match list_dtype { + DType::List(element_dtype, list_nullability) => Ok(DType::List( + Arc::new(element_expr.return_dtype(element_dtype.as_ref())?), + *list_nullability, + )), + DType::FixedSizeList(element_dtype, list_size, list_nullability) => { + Ok(DType::FixedSizeList( + Arc::new(element_expr.return_dtype(element_dtype.as_ref())?), + *list_size, + *list_nullability, + )) + } + _ => Err(vortex_err!( + "Expected list dtype for child of Map expression, got {}", + list_dtype + )), + } + } + + fn evaluate( + &self, + element_expr: &Expression, + expr: &Expression, + scope: &ArrayRef, + ) -> VortexResult { + let input = expr.child(0).evaluate(scope)?; + + match input.dtype() { + DType::List(..) => { + let list = input.to_listview(); + let mapped = element_expr.evaluate(list.elements())?; + + Ok(ListViewArray::try_new( + mapped, + list.offsets().clone(), + list.sizes().clone(), + list.validity().clone(), + )? + .into_array()) + } + DType::FixedSizeList(..) => { + let list = input.to_fixed_size_list(); + let mapped = element_expr.evaluate(list.elements())?; + + Ok(FixedSizeListArray::try_new( + mapped, + list.list_size(), + list.validity().clone(), + list.len(), + )? + .into_array()) + } + _ => Err(vortex_err!( + "Expected list scope for Map evaluation, got {}", + input.dtype() + )), + } + } + + fn simplify( + &self, + element_expr: &Expression, + expr: &Expression, + ctx: &dyn SimplifyCtx, + ) -> VortexResult> { + let list = expr.child(0); + let list_dtype = ctx.return_dtype(list)?; + + let element_dtype = match &list_dtype { + DType::List(element_dtype, _) => Some(element_dtype.as_ref()), + DType::FixedSizeList(element_dtype, ..) => Some(element_dtype.as_ref()), + _ => None, + }; + + let Some(element_dtype) = element_dtype else { + return Ok(None); + }; + + // Optimize the element expression in the element scope (not the outer list scope). + let optimized_element_expr = element_expr.optimize_recursive(element_dtype)?; + + // map(identity, list) == list + if optimized_element_expr.is::() { + return Ok(Some(list.clone())); + } + + if &optimized_element_expr != element_expr { + Ok(Some(map(optimized_element_expr, list.clone()))) + } else { + Ok(None) + } + } + + fn is_null_sensitive(&self, _element_expr: &Expression) -> bool { + true + } + + fn is_fallible(&self, element_expr: &Expression) -> bool { + element_expr.signature().is_fallible() + } +} + +/// Creates an expression that maps `element_expr` over each element of `list`. +pub fn map(element_expr: Expression, list: Expression) -> Expression { + Map.new_expr(element_expr, vec![list]) +} diff --git a/vortex-array/src/expr/exprs/mod.rs b/vortex-array/src/expr/exprs/mod.rs index c606b53f5a0..16c0c5bb8c6 100644 --- a/vortex-array/src/expr/exprs/mod.rs +++ b/vortex-array/src/expr/exprs/mod.rs @@ -10,6 +10,7 @@ pub(crate) mod is_null; pub(crate) mod like; pub(crate) mod list_contains; pub(crate) mod literal; +pub(crate) mod map; pub(crate) mod mask; pub(crate) mod merge; pub(crate) mod not; @@ -27,6 +28,7 @@ pub use is_null::*; pub use like::*; pub use list_contains::*; pub use literal::*; +pub use map::*; pub use mask::*; pub use merge::*; pub use not::*; diff --git a/vortex-array/src/expr/session.rs b/vortex-array/src/expr/session.rs index f86f860e002..fb18564b536 100644 --- a/vortex-array/src/expr/session.rs +++ b/vortex-array/src/expr/session.rs @@ -14,6 +14,7 @@ use crate::expr::exprs::is_null::IsNull; use crate::expr::exprs::like::Like; use crate::expr::exprs::list_contains::ListContains; use crate::expr::exprs::literal::Literal; +use crate::expr::exprs::map::Map; use crate::expr::exprs::merge::Merge; use crate::expr::exprs::not::Not; use crate::expr::exprs::pack::Pack; @@ -58,6 +59,7 @@ impl Default for ExprSession { ExprVTable::new_static(&IsNull), ExprVTable::new_static(&Like), ExprVTable::new_static(&ListContains), + ExprVTable::new_static(&Map), ExprVTable::new_static(&Literal), ExprVTable::new_static(&Merge), ExprVTable::new_static(&Not), diff --git a/vortex-array/src/expr/vtable.rs b/vortex-array/src/expr/vtable.rs index e15d539944c..225c66644dd 100644 --- a/vortex-array/src/expr/vtable.rs +++ b/vortex-array/src/expr/vtable.rs @@ -712,6 +712,7 @@ mod tests { use crate::expr::exprs::is_null::is_null; use crate::expr::exprs::list_contains::list_contains; use crate::expr::exprs::literal::lit; + use crate::expr::exprs::map::map; use crate::expr::exprs::merge::merge; use crate::expr::exprs::not::not; use crate::expr::exprs::pack::pack; @@ -771,6 +772,8 @@ mod tests { ))] // List contains expressions #[case(list_contains(col("list_col"), lit("item")))] + // List map expressions + #[case(map(get_item("field", root()), root()))] // Pack expressions - creating struct from fields #[case(pack([("field1", col("a")), ("field2", col("b"))], vortex_dtype::Nullability::NonNullable ))] diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index c6c2badc528..db05595463f 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -196,8 +196,15 @@ impl FileOpener for VortexOpener { )?; // The schema of the stream returned from the vortex scan. - let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|_e| { - exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan") + let scan_projection = scan_projection + .optimize_recursive(vxf.dtype()) + .map_err(|e| { + exec_datafusion_err!( + "Couldn't simplify the underlying Vortex scan projection: {e}" + ) + })?; + let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|e| { + exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan: {e}") })?; let stream_schema = scan_dtype.to_arrow_schema().map_err(|_e| { exec_datafusion_err!("Couldn't get the schema for the underlying Vortex scan") diff --git a/vortex-duckdb/src/convert/table_filter.rs b/vortex-duckdb/src/convert/table_filter.rs index bab62cbe838..e53525f4446 100644 --- a/vortex-duckdb/src/convert/table_filter.rs +++ b/vortex-duckdb/src/convert/table_filter.rs @@ -31,6 +31,7 @@ pub fn try_from_table_filter( col: &Expression, scope_dtype: &DType, ) -> VortexResult> { + let col = col.optimize_recursive(scope_dtype)?; Ok(Some(match value.as_class() { TableFilterClass::ConstantComparison(const_) => { let scalar: Scalar = const_.value.try_into()?; @@ -40,7 +41,7 @@ pub fn try_from_table_filter( TableFilterClass::ConjunctionAnd(conj_and) => { let Some(children) = conj_and .children() - .map(|child| try_from_table_filter(&child, col, scope_dtype)) + .map(|child| try_from_table_filter(&child, &col, scope_dtype)) .try_collect::<_, Option>, _>()? else { return Ok(None); @@ -52,7 +53,7 @@ pub fn try_from_table_filter( TableFilterClass::ConjunctionOr(disjuction_or) => { let Some(children) = disjuction_or .children() - .map(|child| try_from_table_filter(&child, col, scope_dtype)) + .map(|child| try_from_table_filter(&child, &col, scope_dtype)) .try_collect::<_, Option>, _>()? else { return Ok(None); @@ -67,7 +68,7 @@ pub fn try_from_table_filter( } TableFilterClass::Optional(child) => { // Optional expressions are optional not yet supported. - return try_from_table_filter(&child, col, scope_dtype).or_else(|_err| { + return try_from_table_filter(&child, &col, scope_dtype).or_else(|_err| { // Failed to convert the optional expression, but it's optional, so who cares? Ok(None) }); diff --git a/vortex-layout/src/layouts/list/reader.rs b/vortex-layout/src/layouts/list/reader.rs index 6106f5ea72a..32037b189d5 100644 --- a/vortex-layout/src/layouts/list/reader.rs +++ b/vortex-layout/src/layouts/list/reader.rs @@ -15,6 +15,8 @@ use vortex_array::VortexSessionExecute; use vortex_array::arrays::FixedSizeListArray; use vortex_array::arrays::ListArray; use vortex_array::expr::Expression; +use vortex_array::expr::Map; +use vortex_array::expr::Root; use vortex_array::expr::root; use vortex_dtype::DType; use vortex_dtype::FieldMask; @@ -319,17 +321,23 @@ impl LayoutReader for ListReader { expr: &Expression, mask: MaskFuture, ) -> VortexResult { - // If the expression is a simple column access or select, we can push it down to the elements. - let is_pushdown = matches!( - expr.vtable().id().as_ref(), - "vortex.get_item" | "vortex.select" - ); + // If the expression is a simple element projection, we can push it down to the elements. + // + // NOTE: `vortex.map` is an explicit "map over elements" expression; when pushing down we + // pass the element expression into the elements reader. + let (is_pushdown, element_expr) = if let Some(element_expr) = expr.as_opt::() + && expr.child(0).is::() + { + (true, element_expr.clone()) + } else if expr.vtable().id().as_ref() == "vortex.select" { + (true, expr.clone()) + } else { + (false, root()) + }; let row_range = row_range.clone(); let expr = expr.clone(); - let root_expr = root(); - let list_fut = - self.list_slice_future(row_range, if is_pushdown { &expr } else { &root_expr })?; + let list_fut = self.list_slice_future(row_range, &element_expr)?; Ok(Box::pin(async move { let (mut array, mask) = try_join!(list_fut, mask)?; diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index a8b562d134b..47831574509 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -187,7 +187,10 @@ impl ScanBuilder { /// The [`DType`] returned by the scan, after applying the projection. pub fn dtype(&self) -> VortexResult { - self.projection.return_dtype(self.layout_reader.dtype()) + let projection = self + .projection + .optimize_recursive(self.layout_reader.dtype())?; + projection.return_dtype(self.layout_reader.dtype()) } /// The session used by the scan. From 66f104ee1f2259789f89c3155ad7b65dfbc92941 Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Wed, 21 Jan 2026 12:33:14 +0800 Subject: [PATCH 7/8] Improve ListReader split planning for FixedSizeList --- vortex-layout/src/layouts/list/reader.rs | 145 ++++++++++++++++++++++- 1 file changed, 144 insertions(+), 1 deletion(-) diff --git a/vortex-layout/src/layouts/list/reader.rs b/vortex-layout/src/layouts/list/reader.rs index 32037b189d5..dc31645e1d5 100644 --- a/vortex-layout/src/layouts/list/reader.rs +++ b/vortex-layout/src/layouts/list/reader.rs @@ -268,11 +268,53 @@ impl LayoutReader for ListReader { fn register_splits( &self, - _field_mask: &[FieldMask], + field_mask: &[FieldMask], row_range: &Range, splits: &mut BTreeSet, ) -> VortexResult<()> { splits.insert(row_range.end); + + match self.dtype() { + DType::FixedSizeList(_, list_size, _) => { + let list_size_u64 = u64::from(*list_size); + + let element_start = row_range + .start + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element start overflow"))?; + let element_end = row_range + .end + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element end overflow"))?; + + let element_range = element_start..element_end; + let mut element_splits = BTreeSet::new(); + self.elements()?.register_splits( + field_mask, + &element_range, + &mut element_splits, + )?; + + // Convert element splits back to row splits, but only when the element split + // is aligned to a row boundary. + for element_split in element_splits { + if element_split % list_size_u64 != 0 { + continue; + } + + let row_split = element_split / list_size_u64; + if row_split > row_range.start && row_split < row_range.end { + splits.insert(row_split); + } + } + } + // TODO(a10y): Variable-size lists can only be split "naturally" based on elements, + // but mapping element splits back to row ranges requires offsets (or precomputed + // row->element boundary metadata) that we don't have at split-planning time. + DType::List(..) => {} + _ => {} + } + Ok(()) } @@ -355,3 +397,104 @@ impl LayoutReader for ListReader { })) } } + +#[cfg(test)] +mod tests { + use std::collections::BTreeSet; + use std::sync::Arc; + + use futures::stream; + use vortex_array::Array; + use vortex_array::ArrayContext; + use vortex_array::IntoArray; + use vortex_array::arrays::FixedSizeListArray; + use vortex_buffer::buffer; + use vortex_dtype::Nullability::NonNullable; + use vortex_dtype::PType; + use vortex_io::runtime::single::block_on; + + use crate::LayoutStrategy; + use crate::layouts::chunked::writer::ChunkedLayoutStrategy; + use crate::layouts::flat::writer::FlatLayoutStrategy; + use crate::layouts::list::writer::ListStrategy; + use crate::segments::TestSegments; + use crate::sequence::SequenceId; + use crate::sequence::SequentialStreamAdapter; + use crate::sequence::SequentialStreamExt as _; + use crate::test::SESSION; + + #[test] + fn register_splits_fixed_size_list_maps_element_splits_to_rows() { + let ctx = ArrayContext::empty(); + + let segments = Arc::new(TestSegments::default()); + + let list_size: u32 = 2; + + let chunk1_elements = buffer![1i32, 2, 3, 4].into_array(); + let chunk1 = FixedSizeListArray::try_new( + chunk1_elements, + list_size, + vortex_array::validity::Validity::NonNullable, + 2, + ) + .unwrap() + .into_array(); + + let chunk2_elements = buffer![5i32, 6, 7, 8].into_array(); + let chunk2 = FixedSizeListArray::try_new( + chunk2_elements, + list_size, + vortex_array::validity::Validity::NonNullable, + 2, + ) + .unwrap() + .into_array(); + + let list_dtype = chunk1.dtype().clone(); + + let elements_strategy = Arc::new(ChunkedLayoutStrategy::new(FlatLayoutStrategy::default())); + let strategy = ListStrategy::new( + Arc::new(FlatLayoutStrategy::default()), + Arc::new(FlatLayoutStrategy::default()), + elements_strategy, + ); + + let (mut sequence_id, eof) = SequenceId::root().split(); + let layout = block_on(|handle| { + strategy.write_stream( + ctx, + segments.clone(), + SequentialStreamAdapter::new( + vortex_dtype::DType::FixedSizeList( + Arc::new(vortex_dtype::DType::Primitive(PType::I32, NonNullable)), + list_size, + NonNullable, + ), + stream::iter([ + Ok((sequence_id.advance(), chunk1)), + Ok((sequence_id.advance(), chunk2)), + ]), + ) + .sendable(), + eof, + handle, + ) + }) + .unwrap(); + + // Sanity check we produced the expected fixed-size list shape. + assert_eq!(layout.row_count(), 4); + assert_eq!(layout.dtype(), &list_dtype); + + // The elements child is chunked with a split at element index 4, which should map to row 2. + let reader = layout.new_reader("".into(), segments, &SESSION).unwrap(); + let mut splits = BTreeSet::new(); + reader + .register_splits(&[], &(0..layout.row_count()), &mut splits) + .unwrap(); + + assert!(splits.contains(&2), "splits = {splits:?}"); + assert!(splits.contains(&layout.row_count())); + } +} From 51cfa4f76d0a49d7435b4574a09c70f46a025615 Mon Sep 17 00:00:00 2001 From: godnight10061 Date: Thu, 22 Jan 2026 10:19:19 +0800 Subject: [PATCH 8/8] Use get_item_list for list-of-struct projection --- vortex-array/src/expr/exprs/get_item.rs | 17 +- vortex-array/src/expr/exprs/get_item_list.rs | 200 +++++++++++++++++++ vortex-array/src/expr/exprs/map.rs | 196 ------------------ vortex-array/src/expr/exprs/mod.rs | 4 +- vortex-array/src/expr/session.rs | 4 +- vortex-array/src/expr/vtable.rs | 6 +- vortex-layout/src/layouts/list/reader.rs | 11 +- 7 files changed, 220 insertions(+), 218 deletions(-) create mode 100644 vortex-array/src/expr/exprs/get_item_list.rs delete mode 100644 vortex-array/src/expr/exprs/map.rs diff --git a/vortex-array/src/expr/exprs/get_item.rs b/vortex-array/src/expr/exprs/get_item.rs index 2ecee147db3..a03b823ca0b 100644 --- a/vortex-array/src/expr/exprs/get_item.rs +++ b/vortex-array/src/expr/exprs/get_item.rs @@ -38,8 +38,8 @@ use crate::expr::StatsCatalog; use crate::expr::VTable; use crate::expr::VTableExt; use crate::expr::exprs::root::root; +use crate::expr::get_item_list; use crate::expr::lit; -use crate::expr::map; use crate::expr::stats::Stat; pub struct GetItem; @@ -190,10 +190,7 @@ impl VTable for GetItem { if let Some(element_dtype) = element_dtype && element_dtype.as_struct_fields_opt().is_some() { - Ok(Some(map( - get_item(field_name.clone(), root()), - child.clone(), - ))) + Ok(Some(get_item_list(field_name.clone(), child.clone()))) } else { Ok(None) } @@ -337,8 +334,8 @@ mod tests { use crate::arrays::StructArray; use crate::expr::exprs::binary::checked_add; use crate::expr::exprs::get_item::get_item; + use crate::expr::exprs::get_item_list::get_item_list; use crate::expr::exprs::literal::lit; - use crate::expr::exprs::map::map; use crate::expr::exprs::pack::pack; use crate::expr::exprs::root::root; use crate::validity::Validity; @@ -441,7 +438,7 @@ mod tests { .into_array(); // Regression for nested field projection on list-of-struct: `items.a`. - let projection = map(get_item("a", root()), get_item("items", root())); + let projection = get_item_list("a", get_item("items", root())); let out = data.apply(&projection).expect("apply"); assert_eq!( @@ -505,7 +502,7 @@ mod tests { .into_array(); // FixedSizeList-of-struct projection: `items.a`, including struct-level nulls inside the list. - let projection = map(get_item("a", root()), get_item("items", root())); + let projection = get_item_list("a", get_item("items", root())); let out = data.apply(&projection).expect("apply"); assert_eq!( @@ -535,7 +532,7 @@ mod tests { } #[test] - fn get_item_list_of_struct_desugars_to_map_on_optimize() { + fn get_item_list_of_struct_desugars_to_get_item_list_on_optimize() { let scope_dtype = DType::Struct( [ ("id", DType::from(PType::I32)), @@ -558,7 +555,7 @@ mod tests { let expr = get_item("a", get_item("items", root())); let optimized = expr.optimize_recursive(&scope_dtype).unwrap(); - assert!(optimized.is::()); + assert!(optimized.is::()); } #[test] diff --git a/vortex-array/src/expr/exprs/get_item_list.rs b/vortex-array/src/expr/exprs/get_item_list.rs new file mode 100644 index 00000000000..31397edd6c5 --- /dev/null +++ b/vortex-array/src/expr/exprs/get_item_list.rs @@ -0,0 +1,200 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Formatter; +use std::ops::Not; +use std::sync::Arc; + +use prost::Message; +use vortex_dtype::DType; +use vortex_dtype::FieldName; +use vortex_dtype::Nullability; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_proto::expr as pb; + +use crate::ArrayRef; +use crate::IntoArray; +use crate::ToCanonical; +use crate::arrays::FixedSizeListArray; +use crate::arrays::ListViewArray; +use crate::compute::mask; +use crate::expr::Arity; +use crate::expr::ChildName; +use crate::expr::ExprId; +use crate::expr::Expression; +use crate::expr::VTable; +use crate::expr::VTableExt; + +pub struct GetItemList; + +fn propagate_nullability(parent_nullability: Nullability, field_dtype: DType) -> DType { + if matches!( + (parent_nullability, field_dtype.nullability()), + (Nullability::Nullable, Nullability::NonNullable) + ) { + return field_dtype.with_nullability(Nullability::Nullable); + } + + field_dtype +} + +impl VTable for GetItemList { + type Options = FieldName; + + fn id(&self) -> ExprId { + ExprId::from("vortex.get_item_list") + } + + fn serialize(&self, field_name: &FieldName) -> VortexResult>> { + Ok(Some( + pb::GetItemOpts { + path: field_name.to_string(), + } + .encode_to_vec(), + )) + } + + fn deserialize(&self, metadata: &[u8]) -> VortexResult { + let opts = pb::GetItemOpts::decode(metadata)?; + Ok(FieldName::from(opts.path)) + } + + fn arity(&self, _field_name: &FieldName) -> Arity { + Arity::Exact(1) + } + + fn child_name(&self, _field_name: &FieldName, child_idx: usize) -> ChildName { + match child_idx { + 0 => ChildName::from("list"), + _ => unreachable!( + "Invalid child index {} for GetItemList expression", + child_idx + ), + } + } + + fn fmt_sql( + &self, + field_name: &FieldName, + expr: &Expression, + f: &mut Formatter<'_>, + ) -> std::fmt::Result { + expr.child(0).fmt_sql(f)?; + write!(f, ".{}", field_name) + } + + fn return_dtype(&self, field_name: &FieldName, arg_dtypes: &[DType]) -> VortexResult { + let list_dtype = &arg_dtypes[0]; + + let (element_dtype, list_nullability, list_size) = match list_dtype { + DType::List(element_dtype, list_nullability) => { + (element_dtype.as_ref(), *list_nullability, None) + } + DType::FixedSizeList(element_dtype, list_size, list_nullability) => { + (element_dtype.as_ref(), *list_nullability, Some(*list_size)) + } + _ => { + return Err(vortex_err!( + "Expected list dtype for child of GetItemList expression, got {}", + list_dtype + )); + } + }; + + let struct_fields = element_dtype.as_struct_fields_opt().ok_or_else(|| { + vortex_err!( + "Expected list element struct dtype for GetItemList, got {}", + element_dtype + ) + })?; + + let field_dtype = struct_fields.field(field_name).ok_or_else(|| { + vortex_err!( + "Couldn't find the {} field in the list element struct dtype", + field_name + ) + })?; + + let projected = propagate_nullability(element_dtype.nullability(), field_dtype); + + Ok(match list_size { + Some(list_size) => { + DType::FixedSizeList(Arc::new(projected), list_size, list_nullability) + } + None => DType::List(Arc::new(projected), list_nullability), + }) + } + + fn evaluate( + &self, + field_name: &FieldName, + expr: &Expression, + scope: &ArrayRef, + ) -> VortexResult { + let input = expr.child(0).evaluate(scope)?; + + match input.dtype() { + DType::List(..) => { + let list = input.to_listview(); + + let elements = list.elements(); + let elements = elements.to_struct(); + + let field = elements.field_by_name(field_name).cloned()?; + let field = match elements.dtype().nullability() { + Nullability::NonNullable => field, + Nullability::Nullable => mask(&field, &elements.validity_mask().not())?, + }; + + Ok(ListViewArray::try_new( + field, + list.offsets().clone(), + list.sizes().clone(), + list.validity()?, + )? + .into_array()) + } + DType::FixedSizeList(..) => { + let list = input.to_fixed_size_list(); + + let elements = list.elements(); + let elements = elements.to_struct(); + + let field = elements.field_by_name(field_name).cloned()?; + let field = match elements.dtype().nullability() { + Nullability::NonNullable => field, + Nullability::Nullable => mask(&field, &elements.validity_mask().not())?, + }; + + Ok(FixedSizeListArray::try_new( + field, + list.list_size(), + list.validity()?, + list.len(), + )? + .into_array()) + } + _ => Err(vortex_err!( + "Expected list scope for GetItemList evaluation, got {}", + input.dtype() + )), + } + } + + fn is_null_sensitive(&self, _field_name: &FieldName) -> bool { + true + } + + fn is_fallible(&self, _field_name: &FieldName) -> bool { + false + } +} + +/// Creates an expression that projects a struct field from each element of a list. +/// +/// This is a temporary, specialized form of `map(get_item(field, x), list)` used to support +/// nested projections like `items.a` without a general higher-order `map` expression. +pub fn get_item_list(field: impl Into, list: Expression) -> Expression { + GetItemList.new_expr(field.into(), vec![list]) +} diff --git a/vortex-array/src/expr/exprs/map.rs b/vortex-array/src/expr/exprs/map.rs deleted file mode 100644 index 88d0383fee4..00000000000 --- a/vortex-array/src/expr/exprs/map.rs +++ /dev/null @@ -1,196 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -use std::fmt::Formatter; -use std::sync::Arc; - -use prost::Message; -use vortex_dtype::DType; -use vortex_error::VortexResult; -use vortex_error::vortex_err; -use vortex_proto::expr as pb; - -use crate::ArrayRef; -use crate::IntoArray; -use crate::ToCanonical; -use crate::arrays::FixedSizeListArray; -use crate::arrays::ListViewArray; -use crate::expr::Arity; -use crate::expr::ChildName; -use crate::expr::ExprId; -use crate::expr::Expression; -use crate::expr::SimplifyCtx; -use crate::expr::VTable; -use crate::expr::VTableExt; -use crate::expr::exprs::get_item::GetItem; -use crate::expr::exprs::root::Root; -use crate::expr::proto::ExprSerializeProtoExt; -use crate::expr::proto::deserialize_expr_proto; -use crate::expr::session::ExprSession; -use crate::vtable::ValidityHelper; - -/// An expression that maps a scalar expression over the elements of a list. -/// -/// The map "lambda" is represented by `element_expr`, which is evaluated with the list elements as -/// its root scope. -pub struct Map; - -impl VTable for Map { - /// The element expression to apply to each list element. - type Options = Expression; - - fn id(&self) -> ExprId { - ExprId::from("vortex.map") - } - - fn serialize(&self, element_expr: &Expression) -> VortexResult>> { - Ok(Some(element_expr.serialize_proto()?.encode_to_vec())) - } - - fn deserialize(&self, metadata: &[u8]) -> VortexResult { - let element_expr_pb = pb::Expr::decode(metadata)?; - let registry = ExprSession::default().registry().clone(); - deserialize_expr_proto(&element_expr_pb, ®istry) - } - - fn arity(&self, _element_expr: &Expression) -> Arity { - Arity::Exact(1) - } - - fn child_name(&self, _element_expr: &Expression, child_idx: usize) -> ChildName { - match child_idx { - 0 => ChildName::from("list"), - _ => unreachable!("Invalid child index {} for Map expression", child_idx), - } - } - - fn fmt_sql( - &self, - element_expr: &Expression, - expr: &Expression, - f: &mut Formatter<'_>, - ) -> std::fmt::Result { - // If the mapping is a simple struct field projection, format as a dotted path (e.g. `$.items.a`) - // to match user expectations and existing `get_item` formatting. - if let Some(field_name) = element_expr.as_opt::() - && element_expr.child(0).is::() - { - expr.child(0).fmt_sql(f)?; - return write!(f, ".{}", field_name); - } - - write!(f, "map(")?; - element_expr.fmt_sql(f)?; - write!(f, ", ")?; - expr.child(0).fmt_sql(f)?; - write!(f, ")") - } - - fn return_dtype(&self, element_expr: &Expression, arg_dtypes: &[DType]) -> VortexResult { - let list_dtype = &arg_dtypes[0]; - match list_dtype { - DType::List(element_dtype, list_nullability) => Ok(DType::List( - Arc::new(element_expr.return_dtype(element_dtype.as_ref())?), - *list_nullability, - )), - DType::FixedSizeList(element_dtype, list_size, list_nullability) => { - Ok(DType::FixedSizeList( - Arc::new(element_expr.return_dtype(element_dtype.as_ref())?), - *list_size, - *list_nullability, - )) - } - _ => Err(vortex_err!( - "Expected list dtype for child of Map expression, got {}", - list_dtype - )), - } - } - - fn evaluate( - &self, - element_expr: &Expression, - expr: &Expression, - scope: &ArrayRef, - ) -> VortexResult { - let input = expr.child(0).evaluate(scope)?; - - match input.dtype() { - DType::List(..) => { - let list = input.to_listview(); - let mapped = element_expr.evaluate(list.elements())?; - - Ok(ListViewArray::try_new( - mapped, - list.offsets().clone(), - list.sizes().clone(), - list.validity().clone(), - )? - .into_array()) - } - DType::FixedSizeList(..) => { - let list = input.to_fixed_size_list(); - let mapped = element_expr.evaluate(list.elements())?; - - Ok(FixedSizeListArray::try_new( - mapped, - list.list_size(), - list.validity().clone(), - list.len(), - )? - .into_array()) - } - _ => Err(vortex_err!( - "Expected list scope for Map evaluation, got {}", - input.dtype() - )), - } - } - - fn simplify( - &self, - element_expr: &Expression, - expr: &Expression, - ctx: &dyn SimplifyCtx, - ) -> VortexResult> { - let list = expr.child(0); - let list_dtype = ctx.return_dtype(list)?; - - let element_dtype = match &list_dtype { - DType::List(element_dtype, _) => Some(element_dtype.as_ref()), - DType::FixedSizeList(element_dtype, ..) => Some(element_dtype.as_ref()), - _ => None, - }; - - let Some(element_dtype) = element_dtype else { - return Ok(None); - }; - - // Optimize the element expression in the element scope (not the outer list scope). - let optimized_element_expr = element_expr.optimize_recursive(element_dtype)?; - - // map(identity, list) == list - if optimized_element_expr.is::() { - return Ok(Some(list.clone())); - } - - if &optimized_element_expr != element_expr { - Ok(Some(map(optimized_element_expr, list.clone()))) - } else { - Ok(None) - } - } - - fn is_null_sensitive(&self, _element_expr: &Expression) -> bool { - true - } - - fn is_fallible(&self, element_expr: &Expression) -> bool { - element_expr.signature().is_fallible() - } -} - -/// Creates an expression that maps `element_expr` over each element of `list`. -pub fn map(element_expr: Expression, list: Expression) -> Expression { - Map.new_expr(element_expr, vec![list]) -} diff --git a/vortex-array/src/expr/exprs/mod.rs b/vortex-array/src/expr/exprs/mod.rs index 16c0c5bb8c6..748d439dd93 100644 --- a/vortex-array/src/expr/exprs/mod.rs +++ b/vortex-array/src/expr/exprs/mod.rs @@ -6,11 +6,11 @@ pub(crate) mod binary; pub(crate) mod cast; pub(crate) mod dynamic; pub(crate) mod get_item; +pub(crate) mod get_item_list; pub(crate) mod is_null; pub(crate) mod like; pub(crate) mod list_contains; pub(crate) mod literal; -pub(crate) mod map; pub(crate) mod mask; pub(crate) mod merge; pub(crate) mod not; @@ -24,11 +24,11 @@ pub use binary::*; pub use cast::*; pub use dynamic::*; pub use get_item::*; +pub use get_item_list::*; pub use is_null::*; pub use like::*; pub use list_contains::*; pub use literal::*; -pub use map::*; pub use mask::*; pub use merge::*; pub use not::*; diff --git a/vortex-array/src/expr/session.rs b/vortex-array/src/expr/session.rs index fb18564b536..8baf0e2d8f0 100644 --- a/vortex-array/src/expr/session.rs +++ b/vortex-array/src/expr/session.rs @@ -10,11 +10,11 @@ use crate::expr::exprs::between::Between; use crate::expr::exprs::binary::Binary; use crate::expr::exprs::cast::Cast; use crate::expr::exprs::get_item::GetItem; +use crate::expr::exprs::get_item_list::GetItemList; use crate::expr::exprs::is_null::IsNull; use crate::expr::exprs::like::Like; use crate::expr::exprs::list_contains::ListContains; use crate::expr::exprs::literal::Literal; -use crate::expr::exprs::map::Map; use crate::expr::exprs::merge::Merge; use crate::expr::exprs::not::Not; use crate::expr::exprs::pack::Pack; @@ -56,10 +56,10 @@ impl Default for ExprSession { ExprVTable::new_static(&Binary), ExprVTable::new_static(&Cast), ExprVTable::new_static(&GetItem), + ExprVTable::new_static(&GetItemList), ExprVTable::new_static(&IsNull), ExprVTable::new_static(&Like), ExprVTable::new_static(&ListContains), - ExprVTable::new_static(&Map), ExprVTable::new_static(&Literal), ExprVTable::new_static(&Merge), ExprVTable::new_static(&Not), diff --git a/vortex-array/src/expr/vtable.rs b/vortex-array/src/expr/vtable.rs index 225c66644dd..7274b2f6958 100644 --- a/vortex-array/src/expr/vtable.rs +++ b/vortex-array/src/expr/vtable.rs @@ -709,10 +709,10 @@ mod tests { use crate::expr::exprs::cast::cast; use crate::expr::exprs::get_item::col; use crate::expr::exprs::get_item::get_item; + use crate::expr::exprs::get_item_list::get_item_list; use crate::expr::exprs::is_null::is_null; use crate::expr::exprs::list_contains::list_contains; use crate::expr::exprs::literal::lit; - use crate::expr::exprs::map::map; use crate::expr::exprs::merge::merge; use crate::expr::exprs::not::not; use crate::expr::exprs::pack::pack; @@ -772,8 +772,8 @@ mod tests { ))] // List contains expressions #[case(list_contains(col("list_col"), lit("item")))] - // List map expressions - #[case(map(get_item("field", root()), root()))] + // Unstable list-of-struct projection expressions + #[case(get_item_list("field", root()))] // Pack expressions - creating struct from fields #[case(pack([("field1", col("a")), ("field2", col("b"))], vortex_dtype::Nullability::NonNullable ))] diff --git a/vortex-layout/src/layouts/list/reader.rs b/vortex-layout/src/layouts/list/reader.rs index dc31645e1d5..a5c973a75fe 100644 --- a/vortex-layout/src/layouts/list/reader.rs +++ b/vortex-layout/src/layouts/list/reader.rs @@ -15,8 +15,9 @@ use vortex_array::VortexSessionExecute; use vortex_array::arrays::FixedSizeListArray; use vortex_array::arrays::ListArray; use vortex_array::expr::Expression; -use vortex_array::expr::Map; +use vortex_array::expr::GetItemList; use vortex_array::expr::Root; +use vortex_array::expr::get_item; use vortex_array::expr::root; use vortex_dtype::DType; use vortex_dtype::FieldMask; @@ -365,12 +366,12 @@ impl LayoutReader for ListReader { ) -> VortexResult { // If the expression is a simple element projection, we can push it down to the elements. // - // NOTE: `vortex.map` is an explicit "map over elements" expression; when pushing down we - // pass the element expression into the elements reader. - let (is_pushdown, element_expr) = if let Some(element_expr) = expr.as_opt::() + // NOTE: `vortex.get_item_list` is a temporary list-of-struct projection expression; + // when pushing down we construct the element projection and pass it into the elements reader. + let (is_pushdown, element_expr) = if let Some(field_name) = expr.as_opt::() && expr.child(0).is::() { - (true, element_expr.clone()) + (true, get_item(field_name.clone(), root())) } else if expr.vtable().id().as_ref() == "vortex.select" { (true, expr.clone()) } else {