11 changes: 6 additions & 5 deletions vortex-duckdb/src/duckdb/table_function/mod.rs
@@ -157,11 +157,12 @@ impl Connection {
             .map(|logical_type| logical_type.as_ptr())
             .collect::<Vec<_>>();

-        let param_names = T::named_parameters();
-        let (param_names_ptrs, param_types_ptr) = param_names
-            .into_iter()
-            .map(|(name, logical_type)| (name.as_ptr(), logical_type.as_ptr()))
-            .unzip::<_, _, Vec<_>, Vec<_>>();
+        // Keep the named parameters alive while we use their pointers
+        let named_params = T::named_parameters();
+        let (named_param_names, named_param_types): (Vec<_>, Vec<_>) =
+            named_params.into_iter().unzip();
+        let param_names_ptrs: Vec<_> = named_param_names.iter().map(|n| n.as_ptr()).collect();
+        let param_types_ptr: Vec<_> = named_param_types.iter().map(|t| t.as_ptr()).collect();

         let vtab = cpp::duckdb_vx_tfunc_vtab_t {
             name: name.as_ptr(),
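Why this change matters: in the removed code, `name` and `logical_type` were moved into the closure, so the pointers returned by `as_ptr()` pointed into values that were dropped as soon as each closure call returned, leaving `param_names_ptrs` and `param_types_ptr` full of dangling pointers. A minimal sketch of the hazard, using std::ffi::CString as a stand-in for the crate's name and logical-type handles (illustrative names, not the crate's API):

use std::ffi::CString;
use std::os::raw::c_char;

// Unsound: each `name` is dropped at the end of the closure body,
// so every collected pointer is dangling before `collect` even finishes.
fn dangling(params: Vec<CString>) -> Vec<*const c_char> {
    params.into_iter().map(|name| name.as_ptr()).collect()
}

// Sound: the owned CStrings outlive the pointers, because the caller
// keeps the slice alive for as long as the pointers are in use.
fn sound(params: &[CString]) -> Vec<*const c_char> {
    params.iter().map(|name| name.as_ptr()).collect()
}

The new code follows the second shape: `named_param_names` and `named_param_types` own the values, and the pointer vectors borrow from them while the vtab is built.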
243 changes: 243 additions & 0 deletions vortex-duckdb/src/e2e_test/vortex_scan_test.rs
@@ -924,3 +924,246 @@ fn test_vortex_encodings_roundtrip() {
let fixed_child_values = fixed_child.as_slice_with_len::<i32>(10); // 10 total child elements
assert_eq!(fixed_child_values, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
}

/// Helper function to write a vortex file to a specific path
async fn write_vortex_file_to_path(path: &Path, field_name: &str, array: impl IntoArray) {
let struct_array = StructArray::from_fields(&[(field_name, array.into_array())]).unwrap();
let mut file = async_fs::File::create(path).await.unwrap();
SESSION
.write_options()
.write(&mut file, struct_array.to_array_stream())
.await
.unwrap();
}

#[test]
fn test_vortex_scan_hive_partitioning() {
// Create a directory structure with hive-style partitioning:
// tempdir/
//   month=01/
//     data.vortex (contains value 100)
//   month=02/
//     data.vortex (contains value 200)
//   month=03/
//     data.vortex (contains value 300)
let tempdir = tempfile::tempdir().unwrap();

RUNTIME.block_on(async {
for (month, value) in [("01", 100i32), ("02", 200), ("03", 300)] {
let partition_dir = tempdir.path().join(format!("month={month}"));
std::fs::create_dir(&partition_dir).unwrap();
let file_path = partition_dir.join("data.vortex");
write_vortex_file_to_path(&file_path, "value", buffer![value]).await;
}
});

let conn = database_connection();
let glob_pattern = format!("{}/**/data.vortex", tempdir.path().display());

// Query with hive_partitioning enabled
let result = conn
.query(&format!(
"SELECT value, month FROM read_vortex('{glob_pattern}', hive_partitioning = true) ORDER BY value"
))
.unwrap();

let mut values = Vec::new();
let mut months = Vec::new();

for chunk in result {
let value_vec = chunk.get_vector(0);
let value_slice = value_vec.as_slice_with_len::<i32>(chunk.len().as_());
values.extend(value_slice.iter().copied());

let mut month_vec = chunk.get_vector(1);
let month_slice = unsafe { month_vec.as_slice_mut::<duckdb_string_t>(chunk.len().as_()) };
for month in month_slice.iter_mut() {
months.push(String::from_duckdb_value(month));
}
}

// Verify the values and months are correct and matched
assert_eq!(values, vec![100, 200, 300]);
assert_eq!(months, vec!["01", "02", "03"]);
}

#[test]
fn test_vortex_scan_hive_partitioning_multiple_partitions() {
// Create a directory structure with multiple hive partition levels:
// tempdir/
//   year=2023/
//     month=01/
//       data.vortex (contains value 1)
//     month=02/
//       data.vortex (contains value 2)
//   year=2024/
//     month=01/
//       data.vortex (contains value 3)
let tempdir = tempfile::tempdir().unwrap();

RUNTIME.block_on(async {
for (year, month, value) in [("2023", "01", 1i32), ("2023", "02", 2), ("2024", "01", 3)] {
let partition_dir = tempdir.path().join(format!("year={year}/month={month}"));
std::fs::create_dir_all(&partition_dir).unwrap();
let file_path = partition_dir.join("data.vortex");
write_vortex_file_to_path(&file_path, "value", buffer![value]).await;
}
});

let conn = database_connection();
let glob_pattern = format!("{}/**/data.vortex", tempdir.path().display());

// Query with hive_partitioning enabled, selecting both partition columns
let result = conn
.query(&format!(
"SELECT value, year, month FROM vortex_scan('{glob_pattern}', hive_partitioning = true) ORDER BY value"
))
.unwrap();

let mut values = Vec::new();
let mut years = Vec::new();
let mut months = Vec::new();

for chunk in result {
let value_vec = chunk.get_vector(0);
let value_slice = value_vec.as_slice_with_len::<i32>(chunk.len().as_());
values.extend(value_slice.iter().copied());

let mut year_vec = chunk.get_vector(1);
let year_slice = unsafe { year_vec.as_slice_mut::<duckdb_string_t>(chunk.len().as_()) };
for year in year_slice.iter_mut() {
years.push(String::from_duckdb_value(year));
}

let mut month_vec = chunk.get_vector(2);
let month_slice = unsafe { month_vec.as_slice_mut::<duckdb_string_t>(chunk.len().as_()) };
for month in month_slice.iter_mut() {
months.push(String::from_duckdb_value(month));
}
}

// Verify all partition columns are correctly extracted
assert_eq!(values, vec![1, 2, 3]);
assert_eq!(years, vec!["2023", "2023", "2024"]);
assert_eq!(months, vec!["01", "02", "01"]);
}

#[test]
fn test_vortex_scan_hive_partitioning_filter_on_file_column() {
// Test that we can filter on file columns when using hive partitioning
let tempdir = tempfile::tempdir().unwrap();

RUNTIME.block_on(async {
for (month, value) in [("01", 100i32), ("02", 200), ("03", 300)] {
let partition_dir = tempdir.path().join(format!("month={month}"));
std::fs::create_dir(&partition_dir).unwrap();
let file_path = partition_dir.join("data.vortex");
write_vortex_file_to_path(&file_path, "value", buffer![value]).await;
}
});

let conn = database_connection();
let glob_pattern = format!("{}/**/data.vortex", tempdir.path().display());

// Query with a filter on a file column (not partition column)
let result = conn
.query(&format!(
"SELECT value, month FROM vortex_scan('{glob_pattern}', hive_partitioning = true) WHERE value = 200"
))
.unwrap();

let mut values = Vec::new();
let mut months = Vec::new();

for chunk in result {
let value_vec = chunk.get_vector(0);
let value_slice = value_vec.as_slice_with_len::<i32>(chunk.len().as_());
values.extend(value_slice.iter().copied());

let mut month_vec = chunk.get_vector(1);
let month_slice = unsafe { month_vec.as_slice_mut::<duckdb_string_t>(chunk.len().as_()) };
for month in month_slice.iter_mut() {
months.push(String::from_duckdb_value(month));
}
}

// Only value=200 (month=02) should be returned
assert_eq!(values, vec![200]);
assert_eq!(months, vec!["02"]);
}

// TODO: Filtering on partition columns is not yet supported: the filter
// pushdown mechanism passes partition-column filters to the scan incorrectly.
// Until then, users can filter on partition columns via a subquery, e.g.
// SELECT * FROM (SELECT ... FROM vortex_scan(...)) WHERE partition_col = 'value'
// (see the sketch after this test).
#[test]
#[ignore = "partition column filter pushdown not yet supported"]
fn test_vortex_scan_hive_partitioning_filter_on_partition_column() {
// Test that we can filter on hive partition columns
let tempdir = tempfile::tempdir().unwrap();

RUNTIME.block_on(async {
for (month, value) in [("01", 100i32), ("02", 200), ("03", 300)] {
let partition_dir = tempdir.path().join(format!("month={month}"));
std::fs::create_dir(&partition_dir).unwrap();
let file_path = partition_dir.join("data.vortex");
write_vortex_file_to_path(&file_path, "value", buffer![value]).await;
}
});

let conn = database_connection();
let glob_pattern = format!("{}/**/data.vortex", tempdir.path().display());

// Query with a filter on the partition column
let result = conn
.query(&format!(
"SELECT value, month FROM vortex_scan('{glob_pattern}', hive_partitioning = true) WHERE month = '02'"
))
.unwrap();

let mut values = Vec::new();
let mut months = Vec::new();

for chunk in result {
let value_vec = chunk.get_vector(0);
let value_slice = value_vec.as_slice_with_len::<i32>(chunk.len().as_());
values.extend(value_slice.iter().copied());

let mut month_vec = chunk.get_vector(1);
let month_slice = unsafe { month_vec.as_slice_mut::<duckdb_string_t>(chunk.len().as_()) };
for month in month_slice.iter_mut() {
months.push(String::from_duckdb_value(month));
}
}

// Only month=02 should be returned
assert_eq!(values, vec![200]);
assert_eq!(months, vec!["02"]);
}
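
// A sketch of the subquery workaround mentioned in the TODO above, reusing
// only calls that already appear in this file (`database_connection`,
// `query`, `get_vector`, `as_slice_with_len`). Nesting the scan keeps the
// partition-column predicate out of the pushdown path. Illustrative only:
// it assumes the same on-disk setup as the ignored test and is not wired
// into the test suite.
#[allow(dead_code)]
fn partition_filter_via_subquery(glob_pattern: &str) -> Vec<i32> {
    let conn = database_connection();
    let result = conn
        .query(&format!(
            "SELECT value FROM ( \
                SELECT value, month FROM vortex_scan('{glob_pattern}', hive_partitioning = true) \
            ) WHERE month = '02'"
        ))
        .unwrap();

    // Collect the surviving `value` rows; with the setup above this
    // should yield vec![200].
    let mut values = Vec::new();
    for chunk in result {
        let value_vec = chunk.get_vector(0);
        values.extend(
            value_vec
                .as_slice_with_len::<i32>(chunk.len().as_())
                .iter()
                .copied(),
        );
    }
    values
}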

#[test]
fn test_vortex_scan_without_hive_partitioning() {
// Verify that without hive_partitioning, partition columns are not added
let tempdir = tempfile::tempdir().unwrap();

RUNTIME.block_on(async {
let partition_dir = tempdir.path().join("month=01");
std::fs::create_dir(&partition_dir).unwrap();
let file_path = partition_dir.join("data.vortex");
write_vortex_file_to_path(&file_path, "value", buffer![100i32]).await;
});

let conn = database_connection();
let glob_pattern = format!("{}/**/data.vortex", tempdir.path().display());

// Query without hive_partitioning - should only have the 'value' column
let result = conn
.query(&format!("SELECT * FROM vortex_scan('{glob_pattern}')"))
.unwrap();

let chunk = result.into_iter().next().unwrap();

// Should only have 1 column (value), not 2 (value, month)
assert_eq!(chunk.column_count(), 1);
}