diff --git a/vortex-array/src/expr/analysis/immediate_access.rs b/vortex-array/src/expr/analysis/immediate_access.rs index 6dc26a7c950..7eb61c3fd37 100644 --- a/vortex-array/src/expr/analysis/immediate_access.rs +++ b/vortex-array/src/expr/analysis/immediate_access.rs @@ -11,6 +11,7 @@ use crate::expr::analysis::AnnotationFn; use crate::expr::analysis::Annotations; use crate::expr::descendent_annotations; use crate::expr::exprs::get_item::GetItem; +use crate::expr::exprs::get_item_list::GetItemList; use crate::expr::exprs::root::Root; use crate::expr::exprs::select::Select; @@ -28,7 +29,17 @@ pub fn annotate_scope_access(scope: &StructFields) -> impl AnnotationFn() { return vec![field_name.clone()]; } - } else if expr.is::() { + } + + if expr.is::() { + if let Some(field_name) = expr.child(0).as_opt::() { + if expr.child(0).child(0).is::() { + return vec![field_name.clone()]; + } + } + } + + if expr.is::() { return scope.names().iter().cloned().collect(); } diff --git a/vortex-array/src/expr/exprs/get_item.rs b/vortex-array/src/expr/exprs/get_item.rs index 4437004040b..011c19e25d7 100644 --- a/vortex-array/src/expr/exprs/get_item.rs +++ b/vortex-array/src/expr/exprs/get_item.rs @@ -33,10 +33,12 @@ use crate::expr::Pack; use crate::expr::ReduceCtx; use crate::expr::ReduceNode; use crate::expr::ReduceNodeRef; +use crate::expr::SimplifyCtx; use crate::expr::StatsCatalog; use crate::expr::VTable; use crate::expr::VTableExt; use crate::expr::exprs::root::root; +use crate::expr::get_item_list; use crate::expr::lit; use crate::expr::stats::Stat; @@ -85,23 +87,21 @@ impl VTable for GetItem { } fn return_dtype(&self, field_name: &FieldName, arg_dtypes: &[DType]) -> VortexResult { - let struct_dtype = &arg_dtypes[0]; - let field_dtype = struct_dtype - .as_struct_fields_opt() - .and_then(|st| st.field(field_name)) - .ok_or_else(|| { + let input_dtype = &arg_dtypes[0]; + + // Struct field access: `$.a`. + if let Some(struct_fields) = input_dtype.as_struct_fields_opt() { + let field_dtype = struct_fields.field(field_name).ok_or_else(|| { vortex_err!("Couldn't find the {} field in the input scope", field_name) })?; - // Match here to avoid cloning the dtype if nullability doesn't need to change - if matches!( - (struct_dtype.nullability(), field_dtype.nullability()), - (Nullability::Nullable, Nullability::NonNullable) - ) { - return Ok(field_dtype.with_nullability(Nullability::Nullable)); + return Ok(field_dtype.union_nullability(input_dtype.nullability())); } - Ok(field_dtype) + Err(vortex_err!( + "Expected struct dtype for child of GetItem expression, got {}", + input_dtype + )) } fn evaluate( @@ -110,34 +110,75 @@ impl VTable for GetItem { expr: &Expression, scope: &ArrayRef, ) -> VortexResult { - let input = expr.children()[0].evaluate(scope)?.to_struct(); - let field = input.field_by_name(field_name).cloned()?; + let input = expr.children()[0].evaluate(scope)?; - match input.dtype().nullability() { - Nullability::NonNullable => Ok(field), - Nullability::Nullable => mask(&field, &input.validity_mask().not()), + // Struct field access: `$.a`. + if input.dtype().is_struct() { + let input = input.to_struct(); + let field = input.field_by_name(field_name).cloned()?; + + return match input.dtype().nullability() { + Nullability::NonNullable => Ok(field), + Nullability::Nullable => mask(&field, &input.validity_mask().not()), + }; } + + Err(vortex_err!( + "Expected struct scope for GetItem evaluation, got {}", + input.dtype() + )) } fn execute(&self, field_name: &FieldName, mut args: ExecutionArgs) -> VortexResult { - let struct_dtype = args.dtypes[0] - .as_struct_fields_opt() - .ok_or_else(|| vortex_err!("Expected struct dtype for child of GetItem expression"))?; - let field_idx = struct_dtype - .find(field_name) - .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; - - match args.datums.pop().vortex_expect("missing input") { - Datum::Scalar(s) => { - let mut field = s.as_struct().field(field_idx); - field.mask_validity(s.is_valid()); - Ok(Datum::Scalar(field)) - } - Datum::Vector(v) => { - let mut field = v.as_struct().fields()[field_idx].clone(); - field.mask_validity(v.validity()); - Ok(Datum::Vector(field)) - } + let input_dtype = &args.dtypes[0]; + + // Struct field access: `$.a`. + if let Some(struct_dtype) = input_dtype.as_struct_fields_opt() { + let field_idx = struct_dtype + .find(field_name) + .ok_or_else(|| vortex_err!("Field {} not found in struct dtype", field_name))?; + + return match args.datums.pop().vortex_expect("missing input") { + Datum::Scalar(s) => { + let mut field = s.as_struct().field(field_idx); + field.mask_validity(s.is_valid()); + Ok(Datum::Scalar(field)) + } + Datum::Vector(v) => { + let mut field = v.as_struct().fields()[field_idx].clone(); + field.mask_validity(v.validity()); + Ok(Datum::Vector(field)) + } + }; + } + + Err(vortex_err!( + "Expected struct dtype for child of GetItem expression, got {}", + input_dtype + )) + } + + fn simplify( + &self, + field_name: &FieldName, + expr: &Expression, + ctx: &dyn SimplifyCtx, + ) -> VortexResult> { + let child = expr.child(0); + let child_dtype = ctx.return_dtype(child)?; + + let element_dtype = match child_dtype { + DType::List(element_dtype, _) => Some(element_dtype), + DType::FixedSizeList(element_dtype, ..) => Some(element_dtype), + _ => None, + }; + + if let Some(element_dtype) = element_dtype + && element_dtype.as_struct_fields_opt().is_some() + { + Ok(Some(get_item_list(field_name.clone(), child.clone()))) + } else { + Ok(None) } } @@ -261,6 +302,8 @@ pub fn get_item(field: impl Into, child: Expression) -> Expression { #[cfg(test)] mod tests { + use std::sync::Arc; + use vortex_buffer::buffer; use vortex_dtype::DType; use vortex_dtype::FieldNames; @@ -272,9 +315,12 @@ mod tests { use crate::Array; use crate::IntoArray; + use crate::arrays::FixedSizeListArray; + use crate::arrays::ListArray; use crate::arrays::StructArray; use crate::expr::exprs::binary::checked_add; use crate::expr::exprs::get_item::get_item; + use crate::expr::exprs::get_item_list::get_item_list; use crate::expr::exprs::literal::lit; use crate::expr::exprs::pack::pack; use crate::expr::exprs::root::root; @@ -322,6 +368,182 @@ mod tests { ); } + #[test] + fn get_item_list_of_struct() { + let element_dtype = Arc::new(DType::Struct( + [ + ("a", DType::Primitive(PType::I32, NonNullable)), + ("b", DType::Utf8(NonNullable)), + ] + .into_iter() + .collect(), + NonNullable, + )); + + let row_count = 4; + let items = ListArray::from_iter_opt_slow::( + [ + Some(vec![ + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(1i32, NonNullable), + Scalar::utf8("x", NonNullable), + ], + ), + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(2i32, NonNullable), + Scalar::utf8("y", NonNullable), + ], + ), + ]), + Some(Vec::new()), + None, + Some(vec![Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(3i32, NonNullable), + Scalar::utf8("z", NonNullable), + ], + )]), + ], + element_dtype, + ) + .unwrap(); + + let ids = buffer![0i32, 1, 2, 3].into_array(); + + let data = StructArray::new( + FieldNames::from(["id", "items"]), + vec![ids, items], + row_count, + Validity::NonNullable, + ) + .into_array(); + + // Regression for nested field projection on list-of-struct: `items.a`. + let projection = get_item_list("a", get_item("items", root())); + let out = data.apply(&projection).expect("apply"); + + assert_eq!( + out.dtype(), + &DType::List( + Arc::new(DType::Primitive(PType::I32, NonNullable)), + Nullability::Nullable + ) + ); + + assert_eq!( + out.scalar_at(0).as_list().elements().unwrap().to_vec(), + vec![ + Scalar::primitive(1i32, NonNullable), + Scalar::primitive(2i32, NonNullable), + ] + ); + assert!(out.scalar_at(1).as_list().elements().unwrap().is_empty()); + assert!(out.scalar_at(2).is_null()); + assert_eq!( + out.scalar_at(3).as_list().elements().unwrap().to_vec(), + vec![Scalar::primitive(3i32, NonNullable)] + ); + } + + #[test] + fn get_item_fixed_size_list_of_struct() { + let n_lists: usize = 3; + let list_size: u32 = 2; + let n_elements = n_lists * list_size as usize; + + let struct_elems = StructArray::try_new( + FieldNames::from(["a", "b"]), + vec![ + buffer![1i32, 2, 3, 4, 5, 6].into_array(), + buffer![10i64, 20, 30, 40, 50, 60].into_array(), + ], + n_elements, + Validity::from_iter([true, false, true, true, false, true]), + ) + .unwrap() + .into_array(); + + let items = FixedSizeListArray::try_new( + struct_elems, + list_size, + Validity::from_iter([true, false, true]), + n_lists, + ) + .unwrap() + .into_array(); + + let ids = buffer![0i32, 1, 2].into_array(); + + let data = StructArray::new( + FieldNames::from(["id", "items"]), + vec![ids, items], + n_lists, + Validity::NonNullable, + ) + .into_array(); + + // FixedSizeList-of-struct projection: `items.a`, including struct-level nulls inside the list. + let projection = get_item_list("a", get_item("items", root())); + let out = data.apply(&projection).expect("apply"); + + assert_eq!( + out.dtype(), + &DType::FixedSizeList( + Arc::new(DType::Primitive(PType::I32, Nullability::Nullable)), + list_size, + Nullability::Nullable + ) + ); + + assert_eq!( + out.scalar_at(0).as_list().elements().unwrap().to_vec(), + vec![ + Scalar::primitive(1i32, Nullability::Nullable), + Scalar::null(DType::Primitive(PType::I32, Nullability::Nullable)), + ] + ); + assert!(out.scalar_at(1).is_null()); + assert_eq!( + out.scalar_at(2).as_list().elements().unwrap().to_vec(), + vec![ + Scalar::null(DType::Primitive(PType::I32, Nullability::Nullable)), + Scalar::primitive(6i32, Nullability::Nullable), + ] + ); + } + + #[test] + fn get_item_list_of_struct_desugars_to_get_item_list_on_optimize() { + let scope_dtype = DType::Struct( + [ + ("id", DType::from(PType::I32)), + ( + "items", + DType::List( + Arc::new(DType::Struct( + [("a", DType::from(PType::I32))].into_iter().collect(), + NonNullable, + )), + Nullability::Nullable, + ), + ), + ] + .into_iter() + .collect(), + NonNullable, + ); + + let expr = get_item("a", get_item("items", root())); + let optimized = expr.optimize_recursive(&scope_dtype).unwrap(); + + assert!(optimized.is::()); + } + #[test] fn test_pack_get_item_rule() { // Create: pack(a: lit(1), b: lit(2)).get_item("b") diff --git a/vortex-array/src/expr/exprs/get_item_list.rs b/vortex-array/src/expr/exprs/get_item_list.rs new file mode 100644 index 00000000000..72d0c56d48e --- /dev/null +++ b/vortex-array/src/expr/exprs/get_item_list.rs @@ -0,0 +1,273 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::fmt::Formatter; +use std::ops::Not; +use std::sync::Arc; + +use vortex_dtype::DType; +use vortex_dtype::FieldName; +use vortex_dtype::Nullability; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_vector::Datum; +use vortex_vector::ScalarOps; +use vortex_vector::VectorMutOps; +use vortex_vector::VectorOps; +use vortex_vector::fixed_size_list::FixedSizeListVector; +use vortex_vector::listview::ListViewVector; + +use crate::ArrayRef; +use crate::IntoArray; +use crate::ToCanonical; +use crate::arrays::FixedSizeListArray; +use crate::arrays::ListViewArray; +use crate::compute::mask; +use crate::expr::Arity; +use crate::expr::ChildName; +use crate::expr::ExecutionArgs; +use crate::expr::ExprId; +use crate::expr::Expression; +use crate::expr::VTable; +use crate::expr::VTableExt; + +/// UNSTABLE: project a struct field from each element of a list. +/// +/// Semantics: +/// `get_item_list(col, array) == map(lambda x: get_item(col, x), array)`. +/// +/// This is a temporary internal expression used to support nested projections like `items.a` on +/// `list` and `fixed_size_list` without a general `map` expression. +/// +/// Do not serialize or persist this expression. It is not a stable part of the expression wire +/// format and may be removed or replaced by a proper `map`. +pub struct GetItemList; + +fn project_struct_elements_field(elements: &ArrayRef, field_name: &FieldName) -> VortexResult { + if !elements.dtype().is_struct() { + vortex_bail!( + "Expected list element struct dtype for GetItemList evaluation, got {}", + elements.dtype() + ); + } + + let elements = elements.to_struct(); + let field = elements.field_by_name(field_name).cloned()?; + + match elements.dtype().nullability() { + Nullability::NonNullable => Ok(field), + Nullability::Nullable => mask(&field, &elements.validity_mask().not()), + } +} + +fn project_struct_elements_vector( + elements: &vortex_vector::Vector, + field_idx: usize, +) -> VortexResult { + let vortex_vector::Vector::Struct(elements) = elements else { + vortex_bail!("Expected list element struct vector for GetItemList execution"); + }; + + let mut field = elements + .fields() + .get(field_idx) + .cloned() + .vortex_expect("field index must be valid for struct vector"); + field.mask_validity(elements.validity()); + Ok(field) +} + +impl VTable for GetItemList { + type Options = FieldName; + + fn id(&self) -> ExprId { + ExprId::from("vortex.get_item_list") + } + + fn serialize(&self, _field_name: &FieldName) -> VortexResult>> { + vortex_bail!("UNSTABLE expression vortex.get_item_list must not be serialized") + } + + fn deserialize(&self, _metadata: &[u8]) -> VortexResult { + vortex_bail!("UNSTABLE expression vortex.get_item_list must not be deserialized") + } + + fn arity(&self, _field_name: &FieldName) -> Arity { + Arity::Exact(1) + } + + fn child_name(&self, _field_name: &FieldName, child_idx: usize) -> ChildName { + match child_idx { + 0 => ChildName::from("list"), + _ => unreachable!( + "Invalid child index {} for GetItemList expression", + child_idx + ), + } + } + + fn fmt_sql( + &self, + field_name: &FieldName, + expr: &Expression, + f: &mut Formatter<'_>, + ) -> std::fmt::Result { + expr.child(0).fmt_sql(f)?; + write!(f, ".{}", field_name) + } + + fn return_dtype(&self, field_name: &FieldName, arg_dtypes: &[DType]) -> VortexResult { + let list_dtype = &arg_dtypes[0]; + + let (element_dtype, list_nullability, list_size) = match list_dtype { + DType::List(element_dtype, list_nullability) => { + (element_dtype.as_ref(), *list_nullability, None) + } + DType::FixedSizeList(element_dtype, list_size, list_nullability) => { + (element_dtype.as_ref(), *list_nullability, Some(*list_size)) + } + _ => vortex_bail!( + "Expected list dtype for child of GetItemList expression, got {}", + list_dtype + ), + }; + + let Some(struct_fields) = element_dtype.as_struct_fields_opt() else { + vortex_bail!( + "Expected list element struct dtype for GetItemList, got {}", + element_dtype + ); + }; + + let Some(field_dtype) = struct_fields.field(field_name) else { + vortex_bail!( + "Couldn't find the {} field in the list element struct dtype", + field_name + ); + }; + + let projected = field_dtype.union_nullability(element_dtype.nullability()); + + Ok(match list_size { + Some(list_size) => { + DType::FixedSizeList(Arc::new(projected), list_size, list_nullability) + } + None => DType::List(Arc::new(projected), list_nullability), + }) + } + + fn execute(&self, field_name: &FieldName, mut args: ExecutionArgs) -> VortexResult { + let list_dtype = &args.dtypes[0]; + + let element_dtype = match list_dtype { + DType::List(element_dtype, _) => element_dtype.as_ref(), + DType::FixedSizeList(element_dtype, ..) => element_dtype.as_ref(), + _ => vortex_bail!( + "Expected list dtype for child of GetItemList expression, got {}", + list_dtype + ), + }; + + let Some(struct_fields) = element_dtype.as_struct_fields_opt() else { + vortex_bail!( + "Expected list element struct dtype for GetItemList, got {}", + element_dtype + ); + }; + + let Some(field_idx) = struct_fields.find(field_name) else { + vortex_bail!("Field {} not found in struct dtype", field_name); + }; + + let input = args.datums.pop().vortex_expect("missing list"); + + let project_list_vector = |list_vec: &vortex_vector::Vector| -> VortexResult { + match list_vec { + vortex_vector::Vector::List(list) => { + let elements = project_struct_elements_vector(list.elements().as_ref(), field_idx)?; + Ok(vortex_vector::Vector::List(ListViewVector::new( + Arc::new(elements), + list.offsets().clone(), + list.sizes().clone(), + list.validity().clone(), + ))) + } + vortex_vector::Vector::FixedSizeList(list) => { + let elements = + project_struct_elements_vector(list.elements().as_ref(), field_idx)?; + Ok(vortex_vector::Vector::FixedSizeList(FixedSizeListVector::new( + Arc::new(elements), + list.list_size(), + list.validity().clone(), + ))) + } + _ => vortex_bail!("Expected list scope for GetItemList execution"), + } + }; + + Ok(match input { + Datum::Scalar(s) => { + let vec = s.repeat(1).freeze(); + let projected = project_list_vector(&vec)?; + Datum::Scalar(projected.scalar_at(0)) + } + Datum::Vector(v) => Datum::Vector(project_list_vector(&v)?), + }) + } + + fn evaluate( + &self, + field_name: &FieldName, + expr: &Expression, + scope: &ArrayRef, + ) -> VortexResult { + let input = expr.child(0).evaluate(scope)?; + + match input.dtype() { + DType::List(..) => { + let list = input.to_listview(); + let field = project_struct_elements_field(list.elements(), field_name)?; + + Ok(ListViewArray::try_new( + field, + list.offsets().clone(), + list.sizes().clone(), + list.validity()?, + )? + .into_array()) + } + DType::FixedSizeList(..) => { + let list = input.to_fixed_size_list(); + let field = project_struct_elements_field(list.elements(), field_name)?; + + Ok(FixedSizeListArray::try_new( + field, + list.list_size(), + list.validity()?, + list.len(), + )? + .into_array()) + } + _ => vortex_bail!( + "Expected list scope for GetItemList evaluation, got {}", + input.dtype() + ), + } + } + + fn is_null_sensitive(&self, _field_name: &FieldName) -> bool { + true + } + + fn is_fallible(&self, _field_name: &FieldName) -> bool { + false + } +} + +/// Creates an expression that projects a struct field from each element of a list. +#[doc(hidden)] +pub fn get_item_list(field: impl Into, list: Expression) -> Expression { + GetItemList.new_expr(field.into(), vec![list]) +} + diff --git a/vortex-array/src/expr/exprs/mod.rs b/vortex-array/src/expr/exprs/mod.rs index c606b53f5a0..748d439dd93 100644 --- a/vortex-array/src/expr/exprs/mod.rs +++ b/vortex-array/src/expr/exprs/mod.rs @@ -6,6 +6,7 @@ pub(crate) mod binary; pub(crate) mod cast; pub(crate) mod dynamic; pub(crate) mod get_item; +pub(crate) mod get_item_list; pub(crate) mod is_null; pub(crate) mod like; pub(crate) mod list_contains; @@ -23,6 +24,7 @@ pub use binary::*; pub use cast::*; pub use dynamic::*; pub use get_item::*; +pub use get_item_list::*; pub use is_null::*; pub use like::*; pub use list_contains::*; diff --git a/vortex-array/src/expr/session.rs b/vortex-array/src/expr/session.rs index f86f860e002..8baf0e2d8f0 100644 --- a/vortex-array/src/expr/session.rs +++ b/vortex-array/src/expr/session.rs @@ -10,6 +10,7 @@ use crate::expr::exprs::between::Between; use crate::expr::exprs::binary::Binary; use crate::expr::exprs::cast::Cast; use crate::expr::exprs::get_item::GetItem; +use crate::expr::exprs::get_item_list::GetItemList; use crate::expr::exprs::is_null::IsNull; use crate::expr::exprs::like::Like; use crate::expr::exprs::list_contains::ListContains; @@ -55,6 +56,7 @@ impl Default for ExprSession { ExprVTable::new_static(&Binary), ExprVTable::new_static(&Cast), ExprVTable::new_static(&GetItem), + ExprVTable::new_static(&GetItemList), ExprVTable::new_static(&IsNull), ExprVTable::new_static(&Like), ExprVTable::new_static(&ListContains), diff --git a/vortex-array/src/expr/vtable.rs b/vortex-array/src/expr/vtable.rs index e15d539944c..01bbb666352 100644 --- a/vortex-array/src/expr/vtable.rs +++ b/vortex-array/src/expr/vtable.rs @@ -709,6 +709,7 @@ mod tests { use crate::expr::exprs::cast::cast; use crate::expr::exprs::get_item::col; use crate::expr::exprs::get_item::get_item; + use crate::expr::exprs::get_item_list::get_item_list; use crate::expr::exprs::is_null::is_null; use crate::expr::exprs::list_contains::list_contains; use crate::expr::exprs::literal::lit; @@ -791,4 +792,16 @@ mod tests { Ok(()) } + + #[test] + fn get_item_list_is_not_serializable() { + let expr = get_item_list("field", root()); + let err = expr + .serialize_proto() + .expect_err("get_item_list must not be serializable"); + assert!( + err.to_string().contains("must not be serialized"), + "unexpected error: {err}" + ); + } } diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index c6c2badc528..cff6ed67212 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -196,8 +196,8 @@ impl FileOpener for VortexOpener { )?; // The schema of the stream returned from the vortex scan. - let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|_e| { - exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan") + let scan_dtype = scan_projection.return_dtype(vxf.dtype()).map_err(|e| { + exec_datafusion_err!("Couldn't get the dtype for the underlying Vortex scan: {e}") })?; let stream_schema = scan_dtype.to_arrow_schema().map_err(|_e| { exec_datafusion_err!("Couldn't get the schema for the underlying Vortex scan") diff --git a/vortex-duckdb/src/convert/table_filter.rs b/vortex-duckdb/src/convert/table_filter.rs index bab62cbe838..568f6098775 100644 --- a/vortex-duckdb/src/convert/table_filter.rs +++ b/vortex-duckdb/src/convert/table_filter.rs @@ -40,7 +40,7 @@ pub fn try_from_table_filter( TableFilterClass::ConjunctionAnd(conj_and) => { let Some(children) = conj_and .children() - .map(|child| try_from_table_filter(&child, col, scope_dtype)) + .map(|child| try_from_table_filter(&child, &col, scope_dtype)) .try_collect::<_, Option>, _>()? else { return Ok(None); @@ -52,7 +52,7 @@ pub fn try_from_table_filter( TableFilterClass::ConjunctionOr(disjuction_or) => { let Some(children) = disjuction_or .children() - .map(|child| try_from_table_filter(&child, col, scope_dtype)) + .map(|child| try_from_table_filter(&child, &col, scope_dtype)) .try_collect::<_, Option>, _>()? else { return Ok(None); @@ -67,7 +67,7 @@ pub fn try_from_table_filter( } TableFilterClass::Optional(child) => { // Optional expressions are optional not yet supported. - return try_from_table_filter(&child, col, scope_dtype).or_else(|_err| { + return try_from_table_filter(&child, &col, scope_dtype).or_else(|_err| { // Failed to convert the optional expression, but it's optional, so who cares? Ok(None) }); diff --git a/vortex-file/tests/test_write_table.rs b/vortex-file/tests/test_write_table.rs index 388e52294e2..0fde610182f 100644 --- a/vortex-file/tests/test_write_table.rs +++ b/vortex-file/tests/test_write_table.rs @@ -11,13 +11,19 @@ use futures::pin_mut; use vortex_array::Array; use vortex_array::IntoArray; use vortex_array::ToCanonical; +use vortex_array::arrays::ListArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray; +use vortex_array::assert_arrays_eq; +use vortex_array::expr::get_item; +use vortex_array::expr::root; use vortex_array::expr::session::ExprSession; use vortex_array::session::ArraySession; use vortex_array::validity::Validity; use vortex_buffer::ByteBuffer; use vortex_dtype::FieldNames; +use vortex_dtype::Nullability; +use vortex_dtype::PType; use vortex_dtype::field_path; use vortex_file::OpenOptionsSessionExt; use vortex_file::WriteOptionsSessionExt; @@ -27,6 +33,7 @@ use vortex_layout::layouts::flat::writer::FlatLayoutStrategy; use vortex_layout::layouts::table::TableStrategy; use vortex_layout::session::LayoutSession; use vortex_metrics::VortexMetrics; +use vortex_scalar::Scalar; use vortex_session::VortexSession; static SESSION: LazyLock = LazyLock::new(|| { @@ -114,3 +121,129 @@ async fn test_file_roundtrip() { assert!(raw.nbytes() > compressed.nbytes()); } } + +#[tokio::test] +async fn test_list_of_struct_nested_projection() { + // A list of structs should support nested field projection (e.g. `items.a`) without requiring + // users to pre-flatten their schemas. + + let element_dtype = Arc::new(vortex_dtype::DType::Struct( + [ + ( + "a", + vortex_dtype::DType::Primitive(PType::I32, Nullability::NonNullable), + ), + ("b", vortex_dtype::DType::Utf8(Nullability::NonNullable)), + ] + .into_iter() + .collect(), + Nullability::NonNullable, + )); + + let row_count = 4; + let items = ListArray::from_iter_opt_slow::( + [ + Some(vec![ + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(1i32, Nullability::NonNullable), + Scalar::utf8("x", Nullability::NonNullable), + ], + ), + Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(2i32, Nullability::NonNullable), + Scalar::utf8("y", Nullability::NonNullable), + ], + ), + ]), + Some(Vec::new()), + None, + Some(vec![Scalar::struct_( + (*element_dtype).clone(), + vec![ + Scalar::primitive(3i32, Nullability::NonNullable), + Scalar::utf8("z", Nullability::NonNullable), + ], + )]), + ], + element_dtype.clone(), + ) + .unwrap(); + + let ids = PrimitiveArray::from_iter( + (0..row_count).map(|i| i32::try_from(i).expect("row id fits in i32")), + ) + .into_array(); + + let data = StructArray::new( + FieldNames::from(["id", "items"]), + vec![ids, items.clone()], + row_count, + Validity::NonNullable, + ) + .into_array(); + + // Keep the writer intentionally simple (flat) so the layout shape is deterministic. + let writer = Arc::new(TableStrategy::default()); + + let mut bytes = Vec::new(); + SESSION + .write_options() + .with_strategy(writer) + .write(&mut bytes, data.to_array_stream()) + .await + .expect("write"); + + let bytes = ByteBuffer::from(bytes); + let vxf = SESSION.open_options().open_buffer(bytes).expect("open"); + + // Project `items.a` by applying `get_item("a", ...)` to a `List`. + let projection = get_item("a".to_string(), get_item("items".to_string(), root())); + + let mut stream = vxf + .scan() + .expect("scan") + .with_projection(projection.clone()) + .into_stream() + .expect("into_stream"); + + let out = stream.next().await.expect("first batch").expect("batch"); + + let expected = ListArray::from_iter_opt_slow::( + [ + Some(vec![ + Scalar::primitive(1i32, Nullability::NonNullable), + Scalar::primitive(2i32, Nullability::NonNullable), + ]), + Some(Vec::new()), + None, + Some(vec![Scalar::primitive(3i32, Nullability::NonNullable)]), + ], + Arc::new(vortex_dtype::DType::Primitive( + PType::I32, + Nullability::NonNullable, + )), + ) + .unwrap() + .into_array(); + let simplified = projection.optimize_recursive(data.dtype()).unwrap(); + let expected_dtype = simplified.return_dtype(data.dtype()).unwrap(); + assert_eq!(out.dtype(), &expected_dtype); + assert_arrays_eq!(expected, out); + + // Verify the list column is not stored as a single flat blob layout. + // This is the root cause of poor nested support described in #4889. + let root_layout = vxf.footer().layout(); + let items_layout = (0..root_layout.nchildren()) + .find_map(|idx| match root_layout.child_type(idx) { + vortex_layout::LayoutChildType::Field(ref name) if name.as_ref() == "items" => { + Some(root_layout.child(idx).expect("items child")) + } + _ => None, + }) + .expect("items layout"); + assert_eq!(items_layout.encoding_id().as_ref(), "vortex.list"); +} diff --git a/vortex-layout/src/layouts/list/mod.rs b/vortex-layout/src/layouts/list/mod.rs new file mode 100644 index 00000000000..cbb9e532324 --- /dev/null +++ b/vortex-layout/src/layouts/list/mod.rs @@ -0,0 +1,223 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +mod reader; +pub mod writer; + +use std::sync::Arc; + +use reader::ListReader; +use vortex_array::ArrayContext; +use vortex_array::DeserializeMetadata; +use vortex_array::EmptyMetadata; +use vortex_dtype::DType; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_ensure; +use vortex_error::vortex_err; +use vortex_session::SessionExt; +use vortex_session::VortexSession; + +use crate::LayoutChildType; +use crate::LayoutEncodingRef; +use crate::LayoutId; +use crate::LayoutReaderRef; +use crate::LayoutRef; +use crate::VTable; +use crate::children::LayoutChildren; +use crate::children::OwnedLayoutChildren; +use crate::segments::SegmentId; +use crate::segments::SegmentSource; +use crate::vtable; + +vtable!(List); + +impl VTable for ListVTable { + type Layout = ListLayout; + type Encoding = ListLayoutEncoding; + type Metadata = EmptyMetadata; + + fn id(_encoding: &Self::Encoding) -> LayoutId { + LayoutId::new_ref("vortex.list") + } + + fn encoding(_layout: &Self::Layout) -> LayoutEncodingRef { + LayoutEncodingRef::new_ref(ListLayoutEncoding.as_ref()) + } + + fn row_count(layout: &Self::Layout) -> u64 { + layout.row_count + } + + fn dtype(layout: &Self::Layout) -> &DType { + &layout.dtype + } + + fn metadata(_layout: &Self::Layout) -> Self::Metadata { + EmptyMetadata + } + + fn segment_ids(_layout: &Self::Layout) -> Vec { + vec![] + } + + fn nchildren(layout: &Self::Layout) -> usize { + let validity_children = layout.dtype.is_nullable() as usize; + match &layout.dtype { + DType::List(..) => 2 + validity_children, // offsets + elements + DType::FixedSizeList(..) => 1 + validity_children, // elements + _ => 0, + } + } + + fn child(layout: &Self::Layout, index: usize) -> VortexResult { + let is_nullable = layout.dtype.is_nullable(); + let offsets_dtype = DType::Primitive(PType::U64, Nullability::NonNullable); + + let child_dtype = match (&layout.dtype, is_nullable, index) { + // validity + (_, true, 0) => DType::Bool(Nullability::NonNullable), + + // variable-size list + (DType::List(..), false, 0) => offsets_dtype, + (DType::List(element_dtype, _), false, 1) => (*element_dtype.as_ref()).clone(), + (DType::List(..), true, 1) => offsets_dtype, + (DType::List(element_dtype, _), true, 2) => (*element_dtype.as_ref()).clone(), + + // fixed-size list + (DType::FixedSizeList(element_dtype, ..), false, 0) => { + (*element_dtype.as_ref()).clone() + } + (DType::FixedSizeList(element_dtype, ..), true, 1) => (*element_dtype.as_ref()).clone(), + + _ => return Err(vortex_err!("Invalid child index {index} for list layout")), + }; + + layout.children.child(index, &child_dtype) + } + + fn child_type(layout: &Self::Layout, idx: usize) -> LayoutChildType { + let is_nullable = layout.dtype.is_nullable(); + + if is_nullable && idx == 0 { + return LayoutChildType::Auxiliary("validity".into()); + } + + match &layout.dtype { + DType::List(..) => { + let offsets_idx = if is_nullable { 1 } else { 0 }; + if idx == offsets_idx { + LayoutChildType::Auxiliary("offsets".into()) + } else { + LayoutChildType::Auxiliary("elements".into()) + } + } + DType::FixedSizeList(..) => LayoutChildType::Auxiliary("elements".into()), + _ => unreachable!( + "ListLayout only supports List and FixedSizeList dtypes, got {}", + layout.dtype() + ), + } + } + + fn new_reader( + layout: &Self::Layout, + name: Arc, + segment_source: Arc, + session: &VortexSession, + ) -> VortexResult { + Ok(Arc::new(ListReader::try_new( + layout.clone(), + name, + segment_source, + session.session(), + )?)) + } + + fn build( + _encoding: &Self::Encoding, + dtype: &DType, + row_count: u64, + _metadata: &::Output, + _segment_ids: Vec, + children: &dyn LayoutChildren, + _ctx: ArrayContext, + ) -> VortexResult { + vortex_ensure!( + matches!(dtype, DType::List(..) | DType::FixedSizeList(..)), + "Expected list dtype, got {}", + dtype + ); + + let expected_children = match dtype { + DType::List(..) => 2 + (dtype.is_nullable() as usize), + DType::FixedSizeList(..) => 1 + (dtype.is_nullable() as usize), + _ => unreachable!(), + }; + vortex_ensure!( + children.nchildren() == expected_children, + "List layout has {} children, expected {}", + children.nchildren(), + expected_children + ); + + Ok(ListLayout { + row_count, + dtype: dtype.clone(), + children: children.to_arc(), + }) + } + + fn with_children(layout: &mut Self::Layout, children: Vec) -> VortexResult<()> { + let expected_children = match layout.dtype { + DType::List(..) => 2 + (layout.dtype.is_nullable() as usize), + DType::FixedSizeList(..) => 1 + (layout.dtype.is_nullable() as usize), + _ => vortex_bail!("Expected list dtype, got {}", layout.dtype), + }; + vortex_ensure!( + children.len() == expected_children, + "ListLayout expects {} children, got {}", + expected_children, + children.len() + ); + layout.children = OwnedLayoutChildren::layout_children(children); + Ok(()) + } +} + +#[derive(Debug)] +pub struct ListLayoutEncoding; + +#[derive(Clone, Debug)] +pub struct ListLayout { + row_count: u64, + dtype: DType, + children: Arc, +} + +impl ListLayout { + pub fn new(row_count: u64, dtype: DType, children: Vec) -> Self { + Self { + row_count, + dtype, + children: OwnedLayoutChildren::layout_children(children), + } + } + + #[inline] + pub fn row_count(&self) -> u64 { + self.row_count + } + + #[inline] + pub fn dtype(&self) -> &DType { + &self.dtype + } + + #[inline] + pub fn children(&self) -> &Arc { + &self.children + } +} diff --git a/vortex-layout/src/layouts/list/reader.rs b/vortex-layout/src/layouts/list/reader.rs new file mode 100644 index 00000000000..66ecad3dd5f --- /dev/null +++ b/vortex-layout/src/layouts/list/reader.rs @@ -0,0 +1,509 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::collections::BTreeSet; +use std::ops::BitAnd; +use std::ops::Range; +use std::sync::Arc; + +use futures::try_join; +use vortex_array::Array; +use vortex_array::IntoArray; +use vortex_array::MaskFuture; +use vortex_array::ToCanonical; +use vortex_array::VortexSessionExecute; +use vortex_array::arrays::FixedSizeListArray; +use vortex_array::arrays::ListArray; +use vortex_array::expr::Expression; +use vortex_array::expr::Select; +use vortex_array::expr::get_item; +use vortex_array::expr::root; +use vortex_dtype::DType; +use vortex_dtype::FieldMask; +use vortex_dtype::FieldName; +use vortex_dtype::Nullability; +use vortex_dtype::PType; +use vortex_error::VortexExpect; +use vortex_error::VortexResult; +use vortex_error::vortex_bail; +use vortex_error::vortex_err; +use vortex_mask::Mask; +use vortex_session::VortexSession; + +use crate::ArrayFuture; +use crate::LayoutReader; +use crate::LayoutReaderRef; +use crate::LazyReaderChildren; +use crate::layouts::list::ListLayout; +use crate::segments::SegmentSource; + +pub struct ListReader { + layout: ListLayout, + name: Arc, + lazy_children: LazyReaderChildren, + session: VortexSession, +} + +impl ListReader { + pub(super) fn try_new( + layout: ListLayout, + name: Arc, + segment_source: Arc, + session: VortexSession, + ) -> VortexResult { + let mut dtypes: Vec = Vec::new(); + let mut names: Vec> = Vec::new(); + + if layout.dtype().is_nullable() { + dtypes.push(DType::Bool(Nullability::NonNullable)); + names.push(Arc::from("validity")); + } + + match layout.dtype() { + DType::List(element_dtype, _) => { + dtypes.push(DType::Primitive(PType::U64, Nullability::NonNullable)); + names.push(Arc::from("offsets")); + dtypes.push((**element_dtype).clone()); + names.push(Arc::from("elements")); + } + DType::FixedSizeList(element_dtype, ..) => { + dtypes.push((**element_dtype).clone()); + names.push(Arc::from("elements")); + } + _ => vortex_bail!("Expected list dtype, got {}", layout.dtype()), + } + + let lazy_children = LazyReaderChildren::new( + layout.children().clone(), + dtypes, + names, + segment_source, + session.clone(), + ); + + Ok(Self { + layout, + name, + lazy_children, + session, + }) + } + + fn validity(&self) -> VortexResult> { + self.dtype() + .is_nullable() + .then(|| self.lazy_children.get(0)) + .transpose() + } + + fn offsets(&self) -> VortexResult<&LayoutReaderRef> { + let idx = if self.dtype().is_nullable() { 1 } else { 0 }; + self.lazy_children.get(idx) + } + + fn elements(&self) -> VortexResult<&LayoutReaderRef> { + let idx = match self.dtype() { + DType::List(..) => { + if self.dtype().is_nullable() { + 2 + } else { + 1 + } + } + DType::FixedSizeList(..) => { + if self.dtype().is_nullable() { + 1 + } else { + 0 + } + } + _ => return Err(vortex_err!("Expected list dtype, got {}", self.dtype())), + }; + self.lazy_children.get(idx) + } + + /// Creates a future that will produce a slice of this list array. + /// + /// The produced slice may have a projection applied to its elements. + fn list_slice_future( + &self, + row_range: Range, + element_expr: &Expression, + ) -> VortexResult { + let dtype = self.dtype().clone(); + let validity_fut = self + .validity()? + .map(|reader| { + let len = usize::try_from(row_range.end - row_range.start) + .vortex_expect("row range must fit in usize"); + reader.projection_evaluation(&row_range, &root(), MaskFuture::new_true(len)) + }) + .transpose()?; + + match dtype { + DType::List(_, list_nullability) => { + let offsets_reader = self.offsets()?.clone(); + let elements_reader = self.elements()?.clone(); + let row_range_clone = row_range.clone(); + let element_expr = element_expr.clone(); + + Ok(Box::pin(async move { + let row_len = usize::try_from(row_range_clone.end - row_range_clone.start) + .vortex_expect("row range must fit in usize"); + + let offsets_row_range = row_range_clone.start..row_range_clone.end + 1; + let offsets_len = row_len + 1; + let offsets_fut = offsets_reader.projection_evaluation( + &offsets_row_range, + &root(), + MaskFuture::new_true(offsets_len), + )?; + + let (offsets, validity) = try_join!(offsets_fut, async move { + match validity_fut { + Some(v) => v.await.map(Some), + None => Ok(None), + } + })?; + + let offsets = offsets.to_primitive(); + let offsets_slice = offsets.as_slice::(); + let base = *offsets_slice.first().unwrap_or(&0u64); + let end = *offsets_slice.last().unwrap_or(&base); + + let elements_row_range = base..end; + let elements_len = usize::try_from(end - base) + .vortex_expect("element range must fit in usize"); + let elements = elements_reader.projection_evaluation( + &elements_row_range, + &element_expr, + MaskFuture::new_true(elements_len), + )?; + + let elements = elements.await?; + + let normalized_offsets = vortex_array::arrays::PrimitiveArray::from_iter( + offsets_slice.iter().map(|v| *v - base), + ) + .into_array(); + + let validity = match (list_nullability, validity) { + (Nullability::NonNullable, _) => { + vortex_array::validity::Validity::NonNullable + } + (Nullability::Nullable, Some(v)) => { + vortex_array::validity::Validity::Array(v) + } + (Nullability::Nullable, None) => vortex_array::validity::Validity::AllValid, + }; + + Ok(ListArray::try_new(elements, normalized_offsets, validity)?.into_array()) + })) + } + DType::FixedSizeList(_, list_size, list_nullability) => { + let elements_reader = self.elements()?.clone(); + let row_range_clone = row_range.clone(); + let element_expr = element_expr.clone(); + + Ok(Box::pin(async move { + let row_len_u64 = row_range_clone.end - row_range_clone.start; + let row_len = + usize::try_from(row_len_u64).vortex_expect("row range must fit in usize"); + + let list_size_u64 = u64::from(list_size); + let element_start = row_range_clone + .start + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element start overflow"))?; + let element_end = row_range_clone + .end + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element end overflow"))?; + + let elements_row_range = element_start..element_end; + let elements_len = usize::try_from(element_end - element_start) + .vortex_expect("element range must fit in usize"); + let elements_fut = elements_reader.projection_evaluation( + &elements_row_range, + &element_expr, + MaskFuture::new_true(elements_len), + )?; + + let (elements, validity) = try_join!(elements_fut, async move { + match validity_fut { + Some(v) => v.await.map(Some), + None => Ok(None), + } + })?; + + let validity = match (list_nullability, validity) { + (Nullability::NonNullable, _) => { + vortex_array::validity::Validity::NonNullable + } + (Nullability::Nullable, Some(v)) => { + vortex_array::validity::Validity::Array(v) + } + (Nullability::Nullable, None) => vortex_array::validity::Validity::AllValid, + }; + + Ok( + FixedSizeListArray::try_new(elements, list_size, validity, row_len)? + .into_array(), + ) + })) + } + _ => Err(vortex_err!("Expected list dtype, got {}", dtype)), + } + } +} + +impl LayoutReader for ListReader { + fn name(&self) -> &Arc { + &self.name + } + + fn dtype(&self) -> &DType { + self.layout.dtype() + } + + fn row_count(&self) -> u64 { + self.layout.row_count() + } + + fn register_splits( + &self, + field_mask: &[FieldMask], + row_range: &Range, + splits: &mut BTreeSet, + ) -> VortexResult<()> { + splits.insert(row_range.end); + + match self.dtype() { + DType::FixedSizeList(_, list_size, _) => { + let list_size_u64 = u64::from(*list_size); + + let element_start = row_range + .start + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element start overflow"))?; + let element_end = row_range + .end + .checked_mul(list_size_u64) + .ok_or_else(|| vortex_err!("FixedSizeList element end overflow"))?; + + let element_range = element_start..element_end; + let mut element_splits = BTreeSet::new(); + self.elements()?.register_splits( + field_mask, + &element_range, + &mut element_splits, + )?; + + // Convert element splits back to row splits, but only when the element split + // is aligned to a row boundary. + for element_split in element_splits { + if element_split % list_size_u64 != 0 { + continue; + } + + let row_split = element_split / list_size_u64; + if row_split > row_range.start && row_split < row_range.end { + splits.insert(row_split); + } + } + } + // TODO(a10y): Variable-size lists can only be split "naturally" based on elements, + // but mapping element splits back to row ranges requires offsets (or precomputed + // row->element boundary metadata) that we don't have at split-planning time. + DType::List(..) => {} + _ => {} + } + + Ok(()) + } + + fn pruning_evaluation( + &self, + _row_range: &Range, + _expr: &Expression, + mask: Mask, + ) -> VortexResult { + Ok(MaskFuture::ready(mask)) + } + + fn filter_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + let row_range = row_range.clone(); + let expr = expr.clone(); + let session = self.session.clone(); + + let list_fut = self.list_slice_future(row_range.clone(), &root())?; + + Ok(MaskFuture::new( + usize::try_from(row_range.end - row_range.start) + .vortex_expect("row range must fit in usize"), + async move { + let (array, mask) = try_join!(list_fut, mask)?; + if mask.all_false() { + return Ok(mask); + } + + let array = array.apply(&expr)?; + let mut ctx = session.create_execution_ctx(); + let array_mask = array.execute::(&mut ctx)?; + + Ok(mask.bitand(&array_mask)) + }, + )) + } + + fn projection_evaluation( + &self, + row_range: &Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + // If the expression is a simple element projection, we can push it down to the elements. + // + // NOTE: `vortex.get_item_list` is a temporary list-of-struct projection expression; + // when pushing down we construct the element projection and pass it into the elements reader. + let (is_pushdown, element_expr) = if expr.id().as_ref() == "vortex.get_item_list" + && expr.child(0).id().as_ref() == "vortex.root" + { + let field_name = expr + .options() + .as_any() + .downcast_ref::() + .vortex_expect("vortex.get_item_list options must be a FieldName"); + (true, get_item(field_name.clone(), root())) + } else if expr.is::