diff --git a/vortex-duckdb/src/duckdb/table_function/mod.rs b/vortex-duckdb/src/duckdb/table_function/mod.rs
index 95a09cce68b..c7b2a4a784e 100644
--- a/vortex-duckdb/src/duckdb/table_function/mod.rs
+++ b/vortex-duckdb/src/duckdb/table_function/mod.rs
@@ -157,11 +157,12 @@ impl Connection {
             .map(|logical_type| logical_type.as_ptr())
             .collect::<Vec<_>>();
 
-        let param_names = T::named_parameters();
-        let (param_names_ptrs, param_types_ptr) = param_names
-            .into_iter()
-            .map(|(name, logical_type)| (name.as_ptr(), logical_type.as_ptr()))
-            .unzip::<_, _, Vec<_>, Vec<_>>();
+        // Keep the named parameters alive while we use their pointers
+        let named_params = T::named_parameters();
+        let (named_param_names, named_param_types): (Vec<_>, Vec<_>) =
+            named_params.into_iter().unzip();
+        let param_names_ptrs: Vec<_> = named_param_names.iter().map(|n| n.as_ptr()).collect();
+        let param_types_ptr: Vec<_> = named_param_types.iter().map(|t| t.as_ptr()).collect();
 
         let vtab = cpp::duckdb_vx_tfunc_vtab_t {
             name: name.as_ptr(),
diff --git a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs
index f6062362b46..9a40a6542cb 100644
--- a/vortex-duckdb/src/e2e_test/vortex_scan_test.rs
+++ b/vortex-duckdb/src/e2e_test/vortex_scan_test.rs
@@ -924,3 +924,246 @@ fn test_vortex_encodings_roundtrip() {
     let fixed_child_values = fixed_child.as_slice_with_len::(10); // 10 total child elements
     assert_eq!(fixed_child_values, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
 }
+
+/// Helper function to write a vortex file to a specific path
+async fn write_vortex_file_to_path(path: &Path, field_name: &str, array: impl IntoArray) {
+    let struct_array = StructArray::from_fields(&[(field_name, array.into_array())]).unwrap();
+    let mut file = async_fs::File::create(path).await.unwrap();
+    SESSION
+        .write_options()
+        .write(&mut file, struct_array.to_array_stream())
+        .await
+        .unwrap();
+}
+
+#[test]
+fn test_vortex_scan_hive_partitioning() {
+    // Create a directory structure with hive-style partitioning:
+    // tempdir/
+    //   month=01/
+    //     data.vortex (contains value 100)
+    //   month=02/
+    //     data.vortex (contains value 200)
+    //   month=03/
+    //     data.vortex (contains value 300)
+    let tempdir = tempfile::tempdir().unwrap();
+
+    RUNTIME.block_on(async {
+        for (month, value) in [("01", 100i32), ("02", 200), ("03", 300)] {
+            let partition_dir = tempdir.path().join(format!("month={month}"));
+            std::fs::create_dir(&partition_dir).unwrap();
+            let file_path = partition_dir.join("data.vortex");
+            write_vortex_file_to_path(&file_path, "value", buffer![value]).await;
+        }
+    });
+
+    let conn = database_connection();
+    let glob_pattern = format!("{}/**/data.vortex", tempdir.path().display());
+
+    // Query with hive_partitioning enabled
+    let result = conn
+        .query(&format!(
+            "SELECT value, month FROM read_vortex('{glob_pattern}', hive_partitioning = true) ORDER BY value"
+        ))
+        .unwrap();
+
+    let mut values = Vec::new();
+    let mut months = Vec::new();
+
+    for chunk in result {
+        let value_vec = chunk.get_vector(0);
+        let value_slice = value_vec.as_slice_with_len::<i32>(chunk.len().as_());
+        values.extend(value_slice.iter().copied());
+
+        let mut month_vec = chunk.get_vector(1);
+        let month_slice = unsafe { month_vec.as_slice_mut::(chunk.len().as_()) };
+        for month in month_slice.iter_mut() {
+            months.push(String::from_duckdb_value(month));
+        }
+    }
+
+    // Verify the values and months are correct and matched
+    assert_eq!(values, vec![100, 200, 300]);
+    assert_eq!(months, vec!["01", "02", "03"]);
+}
+
+#[test]
+fn test_vortex_scan_hive_partitioning_multiple_partitions() {
+    // Create a directory structure with multiple hive partition levels:
+    // tempdir/
+    //   year=2023/
+    //     month=01/
+    //       data.vortex (contains value 1)
+    //   year=2023/
+    //     month=02/
+    //       data.vortex (contains value 2)
+    //   year=2024/
+    //     month=01/
+    //       data.vortex (contains value 3)
+    let tempdir = tempfile::tempdir().unwrap();
+
+    RUNTIME.block_on(async {
+        for (year, month, value) in [("2023", "01", 1i32), ("2023", "02", 2), ("2024", "01", 3)] {
+            let partition_dir = tempdir.path().join(format!("year={year}/month={month}"));
+            std::fs::create_dir_all(&partition_dir).unwrap();
+            let file_path = partition_dir.join("data.vortex");
+            write_vortex_file_to_path(&file_path, "value", buffer![value]).await;
+        }
+    });
+
+    let conn = database_connection();
+    let glob_pattern = format!("{}/**/data.vortex", tempdir.path().display());
+
+    // Query with hive_partitioning enabled across both partition levels
+    let result = conn
+        .query(&format!(
+            "SELECT value, year, month FROM vortex_scan('{glob_pattern}', hive_partitioning = true) ORDER BY value"
+        ))
+        .unwrap();
+
+    let mut values = Vec::new();
+    let mut years = Vec::new();
+    let mut months = Vec::new();
+
+    for chunk in result {
+        let value_vec = chunk.get_vector(0);
+        let value_slice = value_vec.as_slice_with_len::<i32>(chunk.len().as_());
+        values.extend(value_slice.iter().copied());
+
+        let mut year_vec = chunk.get_vector(1);
+        let year_slice = unsafe { year_vec.as_slice_mut::(chunk.len().as_()) };
+        for year in year_slice.iter_mut() {
+            years.push(String::from_duckdb_value(year));
+        }
+
+        let mut month_vec = chunk.get_vector(2);
+        let month_slice = unsafe { month_vec.as_slice_mut::(chunk.len().as_()) };
+        for month in month_slice.iter_mut() {
+            months.push(String::from_duckdb_value(month));
+        }
+    }
+
+    // Verify all partition columns are correctly extracted
+    assert_eq!(values, vec![1, 2, 3]);
+    assert_eq!(years, vec!["2023", "2023", "2024"]);
+    assert_eq!(months, vec!["01", "02", "01"]);
+}
+
+#[test]
+fn test_vortex_scan_hive_partitioning_filter_on_file_column() {
+    // Test that we can filter on file columns when using hive partitioning
+    let tempdir = tempfile::tempdir().unwrap();
+
+    RUNTIME.block_on(async {
+        for (month, value) in [("01", 100i32), ("02", 200), ("03", 300)] {
+            let partition_dir = tempdir.path().join(format!("month={month}"));
+            std::fs::create_dir(&partition_dir).unwrap();
+            let file_path = partition_dir.join("data.vortex");
+            write_vortex_file_to_path(&file_path, "value", buffer![value]).await;
+        }
+    });
+
+    let conn = database_connection();
+    let glob_pattern = format!("{}/**/data.vortex", tempdir.path().display());
+
+    // Query with a filter on a file column (not a partition column)
+    let result = conn
+        .query(&format!(
+            "SELECT value, month FROM vortex_scan('{glob_pattern}', hive_partitioning = true) WHERE value = 200"
+        ))
+        .unwrap();
+
+    let mut values = Vec::new();
+    let mut months = Vec::new();
+
+    for chunk in result {
+        let value_vec = chunk.get_vector(0);
+        let value_slice = value_vec.as_slice_with_len::<i32>(chunk.len().as_());
+        values.extend(value_slice.iter().copied());
+
+        let mut month_vec = chunk.get_vector(1);
+        let month_slice = unsafe { month_vec.as_slice_mut::(chunk.len().as_()) };
+        for month in month_slice.iter_mut() {
+            months.push(String::from_duckdb_value(month));
+        }
+    }
+
+    // Only value=200 (month=02) should be returned
+    assert_eq!(values, vec![200]);
+    assert_eq!(months, vec!["02"]);
+}
+
+// TODO: Filtering on partition columns is not yet supported.
+// The filter pushdown mechanism passes partition column filters incorrectly.
+// For now, users can filter on partition columns by using a subquery:
+// SELECT * FROM (SELECT ... FROM vortex_scan(...)) WHERE partition_col = 'value'
+#[test]
+#[ignore = "partition column filter pushdown not yet supported"]
+fn test_vortex_scan_hive_partitioning_filter_on_partition_column() {
+    // Test that we can filter on hive partition columns
+    let tempdir = tempfile::tempdir().unwrap();
+
+    RUNTIME.block_on(async {
+        for (month, value) in [("01", 100i32), ("02", 200), ("03", 300)] {
+            let partition_dir = tempdir.path().join(format!("month={month}"));
+            std::fs::create_dir(&partition_dir).unwrap();
+            let file_path = partition_dir.join("data.vortex");
+            write_vortex_file_to_path(&file_path, "value", buffer![value]).await;
+        }
+    });
+
+    let conn = database_connection();
+    let glob_pattern = format!("{}/**/data.vortex", tempdir.path().display());
+
+    // Query with a filter on the partition column
+    let result = conn
+        .query(&format!(
+            "SELECT value, month FROM vortex_scan('{glob_pattern}', hive_partitioning = true) WHERE month = '02'"
+        ))
+        .unwrap();
+
+    let mut values = Vec::new();
+    let mut months = Vec::new();
+
+    for chunk in result {
+        let value_vec = chunk.get_vector(0);
+        let value_slice = value_vec.as_slice_with_len::<i32>(chunk.len().as_());
+        values.extend(value_slice.iter().copied());
+
+        let mut month_vec = chunk.get_vector(1);
+        let month_slice = unsafe { month_vec.as_slice_mut::(chunk.len().as_()) };
+        for month in month_slice.iter_mut() {
+            months.push(String::from_duckdb_value(month));
+        }
+    }
+
+    // Only month=02 should be returned
+    assert_eq!(values, vec![200]);
+    assert_eq!(months, vec!["02"]);
+}
+
+#[test]
+fn test_vortex_scan_without_hive_partitioning() {
+    // Verify that without hive_partitioning, partition columns are not added
+    let tempdir = tempfile::tempdir().unwrap();
+
+    RUNTIME.block_on(async {
+        let partition_dir = tempdir.path().join("month=01");
+        std::fs::create_dir(&partition_dir).unwrap();
+        let file_path = partition_dir.join("data.vortex");
+        write_vortex_file_to_path(&file_path, "value", buffer![100i32]).await;
+    });
+
+    let conn = database_connection();
+    let glob_pattern = format!("{}/**/data.vortex", tempdir.path().display());
+
+    // Query without hive_partitioning - should only have the 'value' column
+    let result = conn
+        .query(&format!("SELECT * FROM vortex_scan('{glob_pattern}')"))
+        .unwrap();
+
+    let chunk = result.into_iter().next().unwrap();
+
+    // Should only have 1 column (value), not 2 (value, month)
+    assert_eq!(chunk.column_count(), 1);
+}
diff --git a/vortex-duckdb/src/scan.rs b/vortex-duckdb/src/scan.rs
index f48ce764fc5..ec72c9f21cf 100644
--- a/vortex-duckdb/src/scan.rs
+++ b/vortex-duckdb/src/scan.rs
@@ -27,6 +27,8 @@ use vortex::VortexSessionDefault;
 use vortex::array::ArrayRef;
 use vortex::array::Canonical;
 use vortex::array::ExecutionCtx;
+use vortex::array::IntoArray;
+use vortex::array::arrays::ConstantArray;
 use vortex::array::arrays::ScalarFnVTable;
 use vortex::array::arrays::StructArray;
 use vortex::array::arrays::StructVTable;
@@ -47,6 +49,7 @@ use vortex::file::VortexFile;
 use vortex::file::VortexOpenOptions;
 use vortex::io::runtime::BlockingRuntime;
 use vortex::io::runtime::current::ThreadSafeIterator;
+use vortex::scalar::Scalar;
 use vortex::session::VortexSession;
 use vortex_utils::aliases::hash_set::HashSet;
@@ -71,6 +74,15 @@
 use crate::exporter::ConversionCache;
 use crate::utils::glob::expand_glob;
 use crate::utils::object_store::s3_store;
+/// Represents a hive partition column extracted from file paths.
+#[derive(Clone, Debug)]
+pub struct HivePartitionColumn {
+    /// The name of the partition column (e.g., "month" from "month=01").
+    pub name: String,
+    /// The values for each file URL in the same order as file_urls.
+    pub values: Vec<String>,
+}
+
 pub struct VortexBindData {
     first_file: VortexFile,
     filter_exprs: Vec,
     file_urls: Vec<Url>,
     column_names: Vec<String>,
     column_types: Vec<LogicalType>,
     max_threads: u64,
+    /// Whether hive partitioning is enabled.
+    hive_partitioning: bool,
+    /// Hive partition columns extracted from file paths.
+    hive_partition_columns: Vec<HivePartitionColumn>,
+    /// The number of columns that come from the file (excludes partition columns).
+    file_column_count: usize,
 }
 
 impl Clone for VortexBindData {
@@ -91,6 +109,9 @@
             column_names: self.column_names.clone(),
             column_types: self.column_types.clone(),
             max_threads: self.max_threads,
+            hive_partitioning: self.hive_partitioning,
+            hive_partition_columns: self.hive_partition_columns.clone(),
+            file_column_count: self.file_column_count,
         }
     }
 }
@@ -102,6 +123,9 @@
             .field("column_names", &self.column_names)
             .field("column_types", &self.column_types)
             .field("filter_expr", &self.filter_exprs)
+            .field("hive_partitioning", &self.hive_partitioning)
+            .field("hive_partition_columns", &self.hive_partition_columns)
+            .field("file_column_count", &self.file_column_count)
             .finish()
     }
 }
@@ -146,9 +170,11 @@ fn extract_schema_from_vortex_file(
 }
 
 /// Creates a projection expression based on the table initialization input.
+/// Only includes file columns (excludes hive partition columns).
 fn extract_projection_expr(init: &TableInitInput) -> Expression {
     let projection_ids = init.projection_ids().unwrap_or(&[]);
     let column_ids = init.column_ids();
+    let file_column_count = init.bind_data().file_column_count;
 
     select(
         projection_ids
@@ -158,6 +184,8 @@
                 let val: usize = column_ids[idx].as_();
                 val
             })
+            // Only include columns that are file columns (not partition columns)
+            .filter(|&idx| idx < file_column_count)
             .map(|idx| {
                 init.bind_data()
                     .column_names
@@ -171,27 +199,36 @@
 }
 
 /// Creates a table filter expression from the table filter set.
+/// Only includes filters on file columns (excludes hive partition columns).
 fn extract_table_filter_expr(
     init: &TableInitInput,
-    column_ids: &[u64],
+    _column_ids: &[u64],
 ) -> VortexResult> {
+    let file_column_count = init.bind_data().file_column_count;
     let mut table_filter_exprs: HashSet = if let Some(filter) = init.table_filter_set() {
         filter
             .into_iter()
-            .map(|(idx, ex)| {
-                let idx_u: usize = idx.as_();
-                let col_idx: usize = column_ids[idx_u].as_();
+            .filter_map(|(col_idx, ex)| {
+                // col_idx is the column index in the table
+                let col_idx_usize: usize = col_idx.as_();
+
+                // Skip filters on partition columns - they don't exist in the file
+                // DuckDB will apply these filters on the result set
+                if col_idx_usize >= file_column_count {
+                    return None;
+                }
+
                 let name = init
                     .bind_data()
                     .column_names
-                    .get(col_idx)
+                    .get(col_idx_usize)
                     .vortex_expect("exists");
-                try_from_table_filter(
+                Some(try_from_table_filter(
                     &ex,
                     &col(name.as_str()),
                     init.bind_data().first_file.dtype(),
-                )
+                ))
             })
             .collect::>>>()?
            .unwrap_or_else(HashSet::new)
@@ -233,6 +270,84 @@ async fn open_file(url: Url, options: VortexOpenOptions) -> VortexResult
+fn extract_hive_partitions(url: &Url) -> Vec<(String, String)> {
+    let path = url.path();
+    let mut partitions = Vec::new();
+
+    for segment in path.split('/') {
+        if let Some((key, value)) = segment.split_once('=') {
+            // Only add if both key and value are non-empty
+            if !key.is_empty() && !value.is_empty() {
+                partitions.push((key.to_string(), value.to_string()));
+            }
+        }
+    }
+
+    partitions
+}
+
+/// Extracts hive partition columns from a list of file URLs.
+///
+/// Returns a list of HivePartitionColumn, where each column has values for all files.
+/// All files must have the same partition keys in the same order.
+fn extract_hive_partition_columns(file_urls: &[Url]) -> VortexResult<Vec<HivePartitionColumn>> {
+    if file_urls.is_empty() {
+        return Ok(vec![]);
+    }
+
+    // Extract partitions from all files
+    let all_partitions: Vec<Vec<(String, String)>> =
+        file_urls.iter().map(extract_hive_partitions).collect();
+
+    // Use the first file to determine the partition schema
+    let first_partitions = &all_partitions[0];
+
+    // Validate all files have the same partition keys
+    for (idx, partitions) in all_partitions.iter().enumerate() {
+        if partitions.len() != first_partitions.len() {
+            vortex_bail!(
+                "Hive partition mismatch: file {} has {} partitions but expected {}",
+                file_urls[idx],
+                partitions.len(),
+                first_partitions.len()
+            );
+        }
+
+        for (i, (key, _)) in partitions.iter().enumerate() {
+            if key != &first_partitions[i].0 {
+                vortex_bail!(
+                    "Hive partition key mismatch: file {} has key '{}' but expected '{}'",
+                    file_urls[idx],
+                    key,
+                    first_partitions[i].0
+                );
+            }
+        }
+    }
+
+    // Build HivePartitionColumn for each partition key
+    let mut columns: Vec<HivePartitionColumn> = first_partitions
+        .iter()
+        .map(|(key, _)| HivePartitionColumn {
+            name: key.clone(),
+            values: Vec::with_capacity(file_urls.len()),
+        })
+        .collect();
+
+    // Populate values for each column
+    for partitions in all_partitions {
+        for (i, (_, value)) in partitions.into_iter().enumerate() {
+            columns[i].values.push(value);
+        }
+    }
+
+    Ok(columns)
+}
+
 impl TableFunction for VortexTableFunction {
     type BindData = VortexBindData;
     type GlobalState = VortexGlobalData;
@@ -249,6 +364,14 @@
         vec![LogicalType::varchar()]
     }
 
+    fn named_parameters() -> Vec<(CString, LogicalType)> {
+        vec![(
+            #[expect(clippy::unwrap_used, reason = "static string known to be valid")]
+            CString::new("hive_partitioning").unwrap(),
+            LogicalType::bool(),
+        )]
+    }
+
     fn bind(
         ctx: &ClientContext,
         input: &BindInput,
@@ -258,6 +381,12 @@
             .get_parameter(0)
             .ok_or_else(|| vortex_err!("Missing file glob parameter"))?;
 
+        // Read the hive_partitioning named parameter (defaults to false)
+        let hive_partitioning = input
+            .get_named_parameter(c"hive_partitioning")
+            .map(|v| matches!(v.as_ref().extract(), ExtractedValue::Boolean(true)))
+            .unwrap_or(false);
+
         // Read the vortex_max_threads setting from DuckDB configuration
         let max_threads_cstr = CString::new("vortex_max_threads")
            .map_err(|e| vortex_err!("Invalid setting name: {}", e))?;
@@ -294,13 +423,31 @@
             VortexResult::Ok(file)
         })?;
 
-        let (column_names, column_types) = extract_schema_from_vortex_file(&first_file)?;
+        let (mut column_names, mut column_types) = extract_schema_from_vortex_file(&first_file)?;
+
+        // Track the number of file columns before adding partition columns
+        let file_column_count = column_names.len();
+
+        // Extract hive partition columns if enabled
+        let hive_partition_columns = if hive_partitioning {
+            extract_hive_partition_columns(&file_urls)?
+        } else {
+            vec![]
+        };
 
         // Add result columns based on the extracted schema.
         for (column_name, column_type) in column_names.iter().zip(&column_types) {
             result.add_result_column(column_name, column_type);
         }
 
+        // Add hive partition columns to the result schema
+        for partition_col in &hive_partition_columns {
+            // Partition columns are always VARCHAR type
+            result.add_result_column(&partition_col.name, &LogicalType::varchar());
+            column_names.push(partition_col.name.clone());
+            column_types.push(LogicalType::varchar());
+        }
+
         Ok(VortexBindData {
             file_urls,
             first_file,
@@ -308,12 +455,15 @@
             column_names,
             column_types,
             max_threads: max_threads as u64,
+            hive_partitioning,
+            hive_partition_columns,
+            file_column_count,
         })
     }
 
     fn scan(
         _client_context: &ClientContext,
-        _bind_data: &Self::BindData,
+        bind_data: &Self::BindData,
         local_state: &mut Self::LocalState,
         global_state: &mut Self::GlobalState,
         chunk: &mut DataChunk,
@@ -327,7 +477,7 @@
         let (array_result, conversion_cache) = result?;
         let array_result = array_result.optimize_recursive()?;
 
-        let array_result = if let Some(array) = array_result.as_opt::() {
+        let mut array_result = if let Some(array) = array_result.as_opt::() {
             array.clone()
         } else if let Some(array) = array_result.as_opt::()
             && let Some(pack_options) = array.scalar_fn().as_opt::()
@@ -344,6 +494,26 @@
                 .into_struct()
         };
 
+        // Append hive partition columns if enabled
+        if bind_data.hive_partitioning && !bind_data.hive_partition_columns.is_empty() {
+            #[expect(
+                clippy::cast_possible_truncation,
+                reason = "file index fits in usize on supported platforms"
+            )]
+            let file_idx = conversion_cache.instance_id() as usize;
+            let row_count = array_result.len();
+
+            for partition_col in &bind_data.hive_partition_columns {
+                let partition_value = &partition_col.values[file_idx];
+                let constant_array =
+                    ConstantArray::new(Scalar::from(partition_value.as_str()), row_count);
+                array_result = array_result.with_column(
+                    partition_col.name.as_str(),
+                    constant_array.into_array(),
+                )?;
+            }
+        }
+
         local_state.exporter = Some(ArrayExporter::try_new(
             &array_result,
             &conversion_cache,
@@ -471,9 +641,18 @@
         bind_data: &mut Self::BindData,
         expr: &duckdb::Expression,
     ) -> VortexResult<bool> {
+        // Don't push down complex filters when hive partitioning is enabled.
+        // This avoids issues with filters referencing partition columns that don't
+        // exist in the file. DuckDB will apply these filters on the result set.
+        // TODO(joe): Improve this to only skip filters that reference partition columns.
+        if bind_data.hive_partitioning {
+            return Ok(false);
+        }
+
         let Some(expr) = try_from_bound_expression(expr)? else {
             return Ok(false);
         };
+
         bind_data.filter_exprs.push(expr);
         // It seems like there is a regression in the DuckDB planner we actually delete filters??
         // TODO(joe): file and issue and fix.