From 77608d43a30a9940f358e7dfcc7dcafc741e3d8b Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Mon, 12 Jan 2026 15:58:24 +0000 Subject: [PATCH 01/12] io bench Signed-off-by: Onur Satici --- Cargo.lock | 2 + vortex-bench/Cargo.toml | 2 + vortex-bench/src/bin/scan_io_bench.rs | 600 ++++++++++++++++++++++++++ vortex-layout/src/layout.rs | 25 ++ vortex-scan/src/scan_builder.rs | 2 +- 5 files changed, 630 insertions(+), 1 deletion(-) create mode 100644 vortex-bench/src/bin/scan_io_bench.rs diff --git a/Cargo.lock b/Cargo.lock index b619866ea58..2e63f32b6c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10152,6 +10152,7 @@ dependencies = [ "mimalloc", "noodles-bgzf", "noodles-vcf", + "object_store", "parking_lot", "parquet 57.2.0", "rand 0.9.2", @@ -10173,6 +10174,7 @@ dependencies = [ "url", "uuid", "vortex", + "vortex-scan", ] [[package]] diff --git a/vortex-bench/Cargo.toml b/vortex-bench/Cargo.toml index aee90d34b77..8ee74f66689 100644 --- a/vortex-bench/Cargo.toml +++ b/vortex-bench/Cargo.toml @@ -32,6 +32,7 @@ humansize = { workspace = true } indicatif = { workspace = true, features = ["futures"] } itertools = { workspace = true } mimalloc = { workspace = true } +object_store = { workspace = true, features = ["aws", "http", "fs"] } noodles-bgzf = { workspace = true, features = ["async"] } noodles-vcf = { workspace = true, features = ["async"] } parking_lot = { workspace = true } @@ -64,3 +65,4 @@ vortex = { workspace = true, features = [ "zstd", "unstable_encodings", ] } +vortex-scan = { workspace = true } diff --git a/vortex-bench/src/bin/scan_io_bench.rs b/vortex-bench/src/bin/scan_io_bench.rs new file mode 100644 index 00000000000..356d9902b25 --- /dev/null +++ b/vortex-bench/src/bin/scan_io_bench.rs @@ -0,0 +1,600 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::path::Path; +use std::path::PathBuf; +use std::time::Instant; + +use anyhow::Result; +use clap::Parser; +use clap::ValueEnum; +use futures::StreamExt; +use futures::TryStreamExt; +use object_store::ObjectStore; +use object_store::ObjectStoreScheme; +use object_store::aws::AmazonS3Builder; +use object_store::http::HttpBuilder; +use object_store::local::LocalFileSystem; +use object_store::path::Path as ObjectStorePath; +use url::Url; +use vortex::array::Array; +use vortex::array::MaskFuture; +use vortex::array::expr::Expression; +use vortex::array::expr::col; +use vortex::array::expr::eq; +use vortex::array::expr::gt; +use vortex::array::expr::gt_eq; +use vortex::array::expr::lit; +use vortex::array::expr::lt; +use vortex::array::expr::lt_eq; +use vortex::array::expr::not_eq; +use vortex::array::expr::root; +use vortex::array::expr::select; +use vortex::dtype::FieldNames; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex::file::OpenOptionsSessionExt; +use vortex::layout::collect_segment_ids; +use vortex::layout::LayoutReader; +use vortex::mask::Mask; +use vortex::metrics::VortexMetrics; +use parking_lot::Mutex; +use vortex_bench::SESSION; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use tracing_subscriber::EnvFilter; +use vortex_scan::ScanBuilder; + +#[derive(Parser, Debug)] +#[command(version, about = "Benchmark Vortex scans over local files vs object stores")] +struct Args { + /// File path, directory, or object store URL (e.g. 
file:/..., s3://bucket/path, https://host/path) + #[arg(long)] + source: String, + /// Use object_store even for file: URLs + #[arg(long, default_value_t = false)] + force_object_store: bool, + /// Run a predefined scan shape. + #[arg(long, value_enum)] + preset: Option, + /// Projection field names (comma-separated). + #[arg(long, value_delimiter = ',')] + projection: Option>, + /// Filter column name. + #[arg(long)] + filter_col: Option, + /// Filter operator. + #[arg(long, value_enum)] + filter_op: Option, + /// Filter literal value (integer). + #[arg(long)] + filter_value: Option, + /// Filter literal type. + #[arg(long, value_enum, default_value_t = LiteralType::I64)] + filter_type: LiteralType, + /// Number of scan iterations. + #[arg(long, default_value_t = 1)] + iterations: usize, + /// Scan concurrency (tasks per thread). + #[arg(long, default_value_t = 4)] + concurrency: usize, + /// Max files scanned in parallel (file-level readahead). + #[arg(long, default_value_t = 1)] + file_concurrency: usize, + /// Reopen the file for each iteration to avoid caching effects. + #[arg(long, default_value_t = false)] + reopen: bool, + /// Which scan path to use. + #[arg(long, value_enum, default_value_t = ScanMode::Full)] + mode: ScanMode, + /// Only read segments and drop buffers (skip decode/projection). + #[arg(long, default_value_t = false)] + io_only: bool, + /// Only prune whole segments (no intra-segment pruning on CPU). + #[arg(long, default_value_t = false)] + prune_segments: bool, +} + +#[derive(ValueEnum, Clone, Debug)] +enum ScanMode { + /// Read segments only (no decode). + Io, + /// Decode arrays without filter evaluation. + Decode, + /// Decode arrays with full filter/projection evaluation. + Full, +} + +#[derive(ValueEnum, Clone, Debug)] +enum Preset { + /// ClickBench query #2: AdvEngineID != 0, projecting AdvEngineID. 
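+    /// Roughly ClickBench's `SELECT COUNT(*) FROM hits WHERE AdvEngineID <> 0`, except
+    /// the filter column is projected instead of counted (1-based numbering assumed).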
+ Clickbench2, +} + +#[derive(ValueEnum, Clone, Debug)] +enum FilterOp { + Eq, + Neq, + Gt, + Gte, + Lt, + Lte, +} + +#[derive(ValueEnum, Clone, Debug, Copy)] +enum LiteralType { + I16, + I32, + I64, +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + let args = Args::parse(); + let mode = if args.io_only { ScanMode::Io } else { args.mode.clone() }; + + let (projection, filter) = build_scan_exprs(&args)?; + let metrics = VortexMetrics::new_with_tags([("bench", "scan-io")]); + let read_bytes = metrics.counter("vortex.io.read.total_size"); + + let targets = resolve_targets(&args).await?; + let cached_files = if args.reopen { + None + } else { + Some(std::sync::Arc::new( + open_all_targets(&targets, metrics.clone(), args.file_concurrency).await?, + )) + }; + let mut total_rows = 0usize; + let mut total_elapsed = 0.0f64; + let mut total_bytes = 0i64; + let mut total_first_latency = 0.0f64; + let mut total_first_bytes = 0i64; + + for _ in 0..args.iterations { + read_bytes.clear(); + + let start = Instant::now(); + let bytes_before = read_bytes.count(); + let first_seen = std::sync::Arc::new(AtomicBool::new(false)); + let first_info = std::sync::Arc::new(Mutex::new(None::<(f64, i64)>)); + + let rows = futures::stream::iter(targets.iter().enumerate()) + .map(|(idx, target)| { + let cached_files = cached_files.clone(); + let projection = projection.clone(); + let filter = filter.clone(); + let metrics = metrics.clone(); + let read_bytes = read_bytes.clone(); + let first_seen = first_seen.clone(); + let first_info = first_info.clone(); + let mode = mode.clone(); + async move { + let file = match &cached_files { + Some(files) => files[idx].clone(), + None => open_vortex_file_for_target(target, metrics.clone()).await?, + }; + + if args.prune_segments + && let Some(filter) = filter.as_ref() + && file.can_prune(filter)? + { + return Ok::<_, anyhow::Error>(0); + } + + if matches!(mode, ScanMode::Io) { + read_all_segments(&file, args.concurrency).await?; + if !first_seen.load(Ordering::Relaxed) + && !first_seen.swap(true, Ordering::Relaxed) + { + let latency = start.elapsed().as_secs_f64(); + let bytes = read_bytes.count() - bytes_before; + *first_info.lock() = Some((latency, bytes)); + } + let file_rows = usize::try_from(file.row_count()) + .map_err(|_| anyhow::anyhow!("row_count exceeds usize"))?; + drop(file); + return Ok::<_, anyhow::Error>(file_rows); + } + + let (scan_projection, scan_filter, bypass_filter) = match mode { + ScanMode::Decode => { + let scan_filter = if args.prune_segments { + filter.clone() + } else { + None + }; + (root(), scan_filter, true) + } + ScanMode::Full => (projection.clone(), filter.clone(), false), + ScanMode::Io => unreachable!("io-only handled above"), + }; + + let layout_reader = file.layout_reader()?; + let layout_reader = if args.prune_segments || bypass_filter { + std::sync::Arc::new(BenchLayoutReader::new( + layout_reader, + args.prune_segments, + bypass_filter, + )) as std::sync::Arc + } else { + layout_reader + }; + + let scan = ScanBuilder::new(SESSION.clone(), layout_reader) + .with_metrics(metrics.clone()) + .with_projection(scan_projection) + .with_some_filter(scan_filter) + .with_concurrency(args.concurrency) + .map(|array| Ok(array.len())); + + let mut stream = scan.into_stream()?; + let mut file_rows = 0usize; + while let Some(rows) = stream.try_next().await? 
{ + if !first_seen.load(Ordering::Relaxed) + && !first_seen.swap(true, Ordering::Relaxed) + { + let latency = start.elapsed().as_secs_f64(); + let bytes = read_bytes.count() - bytes_before; + *first_info.lock() = Some((latency, bytes)); + } + file_rows += rows; + } + + drop(file); + Ok::<_, anyhow::Error>(file_rows) + } + }) + .buffer_unordered(args.file_concurrency.max(1)) + .try_fold(0usize, |rows, file_rows| async move { Ok(rows + file_rows) }) + .await?; + + let elapsed = start.elapsed().as_secs_f64(); + let bytes = read_bytes.count(); + + total_rows += rows; + total_elapsed += elapsed; + total_bytes += bytes; + let (iter_first_latency, iter_first_bytes) = + first_info.lock().unwrap_or((elapsed, read_bytes.count() - bytes_before)); + total_first_latency += iter_first_latency; + total_first_bytes += iter_first_bytes; + + } + + let avg_elapsed = total_elapsed / args.iterations as f64; + let avg_bytes = total_bytes as f64 / args.iterations as f64; + let avg_first_latency = total_first_latency / args.iterations as f64; + let avg_first_bytes = total_first_bytes as f64 / args.iterations as f64; + let steady_bytes = (avg_bytes - avg_first_bytes).max(0.0); + let steady_time = (avg_elapsed - avg_first_latency).max(0.0); + let total_mb_s = if avg_elapsed > 0.0 { + avg_bytes / (1024.0 * 1024.0) / avg_elapsed + } else { + 0.0 + }; + let steady_mb_s = if steady_time > 0.0 { + steady_bytes / (1024.0 * 1024.0) / steady_time + } else { + 0.0 + }; + + println!("files={}", targets.len()); + println!("rows={}", total_rows / args.iterations); + println!("avg_time_s={:.3}", avg_elapsed); + println!("avg_bytes={:.0}", avg_bytes); + println!("avg_mb_s={:.2}", total_mb_s); + println!("avg_first_latency_ms={:.2}", avg_first_latency * 1000.0); + println!("steady_mb_s={:.2}", steady_mb_s); + + Ok(()) +} + +fn build_scan_exprs(args: &Args) -> VortexResult<(Expression, Option)> { + if let Some(preset) = &args.preset { + return build_preset_exprs(preset); + } + + let projection = match &args.projection { + Some(fields) if !fields.is_empty() => { + let names = FieldNames::from_iter(fields.iter().map(|s| s.as_str())); + select(names, root()) + } + _ => root(), + }; + + let filter = match (&args.filter_col, &args.filter_op, args.filter_value) { + (Some(col_name), Some(op), Some(value)) => { + let lhs = col(col_name.as_str()); + let rhs = match args.filter_type { + LiteralType::I16 => lit( + i16::try_from(value).map_err(|_| vortex_err!("filter_value does not fit in i16"))?, + ), + LiteralType::I32 => lit( + i32::try_from(value).map_err(|_| vortex_err!("filter_value does not fit in i32"))?, + ), + LiteralType::I64 => lit(value), + }; + Some(apply_filter_op(op.clone(), lhs, rhs)) + } + _ => None, + }; + + Ok((projection, filter)) +} + +fn build_preset_exprs(preset: &Preset) -> VortexResult<(Expression, Option)> { + match preset { + Preset::Clickbench2 => { + let projection = select(["AdvEngineID"], root()); + let filter = not_eq(col("AdvEngineID"), lit(0_i16)); + Ok((projection, Some(filter))) + } + } +} + +fn apply_filter_op(op: FilterOp, lhs: Expression, rhs: Expression) -> Expression { + match op { + FilterOp::Eq => eq(lhs, rhs), + FilterOp::Neq => not_eq(lhs, rhs), + FilterOp::Gt => gt(lhs, rhs), + FilterOp::Gte => gt_eq(lhs, rhs), + FilterOp::Lt => lt(lhs, rhs), + FilterOp::Lte => lt_eq(lhs, rhs), + } +} + +#[derive(Clone)] +enum ScanTarget { + Local(PathBuf), + ObjectStore { + store: std::sync::Arc, + path: ObjectStorePath, + }, +} + +async fn resolve_targets(args: &Args) -> Result> { + let source = &args.source; + 
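+    // Anything that parses as a URL goes through object_store (except file: URLs,
+    // unless --force-object-store); bare paths fall back to the local filesystem.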
+ if let Ok(url) = Url::parse(source) { + if url.scheme() == "file" && !args.force_object_store { + let path = url + .to_file_path() + .map_err(|_| anyhow::anyhow!("Invalid file URL: {source}"))?; + return Ok(resolve_local_targets(&path)); + } + + let (scheme, store, path) = object_store_from_url(source)?; + if is_prefix(source) { + if matches!(scheme, ObjectStoreScheme::Http) { + anyhow::bail!("HTTP object stores do not support listing prefixes"); + } + let mut entries = store.list(Some(&path)); + let mut targets = Vec::new(); + while let Some(entry) = entries.try_next().await? { + targets.push(ScanTarget::ObjectStore { + store: store.clone(), + path: entry.location.clone(), + }); + } + return Ok(targets); + } + + return Ok(vec![ScanTarget::ObjectStore { + store, + path, + }]); + } + + let path = PathBuf::from(source); + Ok(resolve_local_targets(&path)) +} + +fn resolve_local_targets(path: &Path) -> Vec { + if path.is_dir() { + let mut entries = match std::fs::read_dir(path) { + Ok(entries) => entries + .filter_map(|entry| entry.ok()) + .map(|entry| entry.path()) + .filter(|entry| entry.extension().is_some_and(|e| e == "vortex")) + .collect::>(), + Err(_) => Vec::new(), + }; + entries.sort(); + entries.into_iter().map(ScanTarget::Local).collect() + } else { + vec![ScanTarget::Local(path.to_path_buf())] + } +} + +fn is_prefix(source: &str) -> bool { + source.ends_with('/') +} + +async fn open_vortex_file_for_target( + target: &ScanTarget, + metrics: VortexMetrics, +) -> Result { + let session = SESSION.clone(); + match target { + ScanTarget::Local(path) => Ok(session + .open_options() + .with_metrics(metrics) + .open(path.clone()) + .await?), + ScanTarget::ObjectStore { store, path } => { + let path_str = path.to_string(); + Ok(session + .open_options() + .with_metrics(metrics) + .open_object_store(store, &path_str) + .await?) + } + } +} + +async fn open_all_targets( + targets: &[ScanTarget], + metrics: VortexMetrics, + concurrency: usize, +) -> Result> { + let mut files = vec![None; targets.len()]; + let results = futures::stream::iter(targets.iter().enumerate()) + .map(|(idx, target)| { + let metrics = metrics.clone(); + async move { + let file = open_vortex_file_for_target(target, metrics).await?; + Ok::<_, anyhow::Error>((idx, file)) + } + }) + .buffer_unordered(concurrency.max(1)) + .try_collect::>() + .await?; + + for (idx, file) in results { + files[idx] = Some(file); + } + + files + .into_iter() + .map(|file| file.ok_or_else(|| anyhow::anyhow!("file open missing"))) + .collect() +} + +fn object_store_from_url( + url_str: &str, +) -> Result<(ObjectStoreScheme, std::sync::Arc, ObjectStorePath)> { + let url = Url::parse(url_str)?; + let (scheme, path) = ObjectStoreScheme::parse(&url).map_err(object_store::Error::from)?; + let store: std::sync::Arc = match scheme { + ObjectStoreScheme::Local => std::sync::Arc::new(LocalFileSystem::default()), + ObjectStoreScheme::AmazonS3 => { + std::sync::Arc::new(AmazonS3Builder::from_env().with_url(url_str).build()?) 
+ } + ObjectStoreScheme::Http => std::sync::Arc::new( + HttpBuilder::new() + .with_url(&url[..url::Position::BeforePath]) + .build()?, + ), + otherwise => anyhow::bail!("unsupported object store scheme: {otherwise:?}"), + }; + + Ok((scheme, store, path)) +} + +async fn read_all_segments( + file: &vortex::file::VortexFile, + concurrency: usize, +) -> Result<()> { + let layout = file.footer().layout().clone(); + let segment_ids = collect_segment_ids(&layout)?; + let segment_source = file.segment_source(); + + futures::stream::iter(segment_ids) + .map(|segment_id| { + let segment_source = segment_source.clone(); + async move { + let buffer = segment_source.request(segment_id).await?; + drop(buffer); + Ok::<_, anyhow::Error>(()) + } + }) + .buffer_unordered(concurrency.max(1)) + .try_collect::>() + .await?; + + Ok(()) +} + +#[derive(Clone)] +struct BenchLayoutReader { + inner: std::sync::Arc, + segment_pruning: bool, + bypass_filter: bool, +} + +impl BenchLayoutReader { + fn new( + inner: std::sync::Arc, + segment_pruning: bool, + bypass_filter: bool, + ) -> Self { + Self { + inner, + segment_pruning, + bypass_filter, + } + } +} + +impl LayoutReader for BenchLayoutReader { + fn name(&self) -> &std::sync::Arc { + self.inner.name() + } + + fn dtype(&self) -> &vortex::dtype::DType { + self.inner.dtype() + } + + fn row_count(&self) -> u64 { + self.inner.row_count() + } + + fn register_splits( + &self, + field_mask: &[vortex::dtype::FieldMask], + row_range: &std::ops::Range, + splits: &mut std::collections::BTreeSet, + ) -> VortexResult<()> { + self.inner.register_splits(field_mask, row_range, splits) + } + + fn pruning_evaluation( + &self, + row_range: &std::ops::Range, + expr: &Expression, + mask: Mask, + ) -> VortexResult { + if !self.segment_pruning { + return self.inner.pruning_evaluation(row_range, expr, mask); + } + + let len = mask.len(); + let fut = self.inner.pruning_evaluation(row_range, expr, mask)?; + Ok(MaskFuture::new(len, async move { + let mask = fut.await?; + if mask.all_false() { + Ok(mask) + } else { + Ok(Mask::new_true(len)) + } + })) + } + + fn filter_evaluation( + &self, + row_range: &std::ops::Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult { + if self.bypass_filter { + Ok(mask) + } else { + self.inner.filter_evaluation(row_range, expr, mask) + } + } + + fn projection_evaluation( + &self, + row_range: &std::ops::Range, + expr: &Expression, + mask: MaskFuture, + ) -> VortexResult>> + { + self.inner.projection_evaluation(row_range, expr, mask) + } +} diff --git a/vortex-layout/src/layout.rs b/vortex-layout/src/layout.rs index 368d7c6bdc8..f654eb612e1 100644 --- a/vortex-layout/src/layout.rs +++ b/vortex-layout/src/layout.rs @@ -23,6 +23,7 @@ use crate::LayoutReaderRef; use crate::VTable; use crate::display::DisplayLayoutTree; use crate::display::display_tree_with_segment_sizes; +use crate::layouts::flat::FlatVTable; use crate::segments::SegmentId; use crate::segments::SegmentSource; @@ -230,6 +231,30 @@ impl dyn Layout + '_ { } } +/// Collect all segment IDs that should be fetched for a layout tree. 
+pub fn collect_segment_ids(layout: &LayoutRef) -> VortexResult> { + let mut segment_ids = Vec::new(); + collect_segments_to_fetch(layout, &mut segment_ids)?; + segment_ids.sort(); + segment_ids.dedup(); + Ok(segment_ids) +} + +fn collect_segments_to_fetch(layout: &LayoutRef, segment_ids: &mut Vec) -> VortexResult<()> { + if let Some(flat_layout) = layout.as_opt::() { + if flat_layout.array_tree().is_none() { + segment_ids.push(flat_layout.segment_id()); + } + } else { + segment_ids.extend(layout.segment_ids()); + } + + for child in layout.children()? { + collect_segments_to_fetch(&child, segment_ids)?; + } + Ok(()) +} + /// Display the encoding, dtype, row count, and segment IDs of this layout. impl Display for dyn Layout + '_ { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { diff --git a/vortex-scan/src/scan_builder.rs b/vortex-scan/src/scan_builder.rs index a8b562d134b..7772eb68ea5 100644 --- a/vortex-scan/src/scan_builder.rs +++ b/vortex-scan/src/scan_builder.rs @@ -646,7 +646,7 @@ mod test { let mut values = Vec::new(); for chunk in &mut iter { - values.push(chunk?.to_primitive().into_buffer::()[0]); + values.push(chunk?.to_primitive().as_slice::()[0]); } assert_eq!(calls.load(Ordering::Relaxed), 1); From deea96b8fe79787bbb6f6ee6641b820be6f8d898 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Fri, 16 Jan 2026 13:55:26 +0000 Subject: [PATCH 02/12] clippy Signed-off-by: Onur Satici --- vortex-bench/src/bin/scan_io_bench.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-bench/src/bin/scan_io_bench.rs b/vortex-bench/src/bin/scan_io_bench.rs index 356d9902b25..86f61c720f2 100644 --- a/vortex-bench/src/bin/scan_io_bench.rs +++ b/vortex-bench/src/bin/scan_io_bench.rs @@ -424,7 +424,7 @@ async fn open_vortex_file_for_target( ScanTarget::Local(path) => Ok(session .open_options() .with_metrics(metrics) - .open(path.clone()) + .open_path(path) .await?), ScanTarget::ObjectStore { store, path } => { let path_str = path.to_string(); From 0170ff1ec54e40e2c02c98efcd6d449bd2cebf81 Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Fri, 16 Jan 2026 14:56:56 +0000 Subject: [PATCH 03/12] repeats don't wait for the prev iteration Signed-off-by: Onur Satici --- vortex-bench/src/bin/scan_io_bench.rs | 55 ++++++++++----------------- 1 file changed, 21 insertions(+), 34 deletions(-) diff --git a/vortex-bench/src/bin/scan_io_bench.rs b/vortex-bench/src/bin/scan_io_bench.rs index 86f61c720f2..1a85004688f 100644 --- a/vortex-bench/src/bin/scan_io_bench.rs +++ b/vortex-bench/src/bin/scan_io_bench.rs @@ -149,22 +149,17 @@ async fn main() -> Result<()> { open_all_targets(&targets, metrics.clone(), args.file_concurrency).await?, )) }; - let mut total_rows = 0usize; - let mut total_elapsed = 0.0f64; - let mut total_bytes = 0i64; - let mut total_first_latency = 0.0f64; - let mut total_first_bytes = 0i64; - - for _ in 0..args.iterations { - read_bytes.clear(); - - let start = Instant::now(); - let bytes_before = read_bytes.count(); - let first_seen = std::sync::Arc::new(AtomicBool::new(false)); - let first_info = std::sync::Arc::new(Mutex::new(None::<(f64, i64)>)); - - let rows = futures::stream::iter(targets.iter().enumerate()) - .map(|(idx, target)| { + read_bytes.clear(); + + let start = Instant::now(); + let bytes_before = read_bytes.count(); + let first_seen = std::sync::Arc::new(AtomicBool::new(false)); + let first_info = std::sync::Arc::new(Mutex::new(None::<(f64, i64)>)); + let targets = targets.clone(); + + let rows = futures::stream::iter(0..args.iterations) + .flat_map(|_| 
futures::stream::iter(targets.clone().into_iter().enumerate())) + .map(|(idx, target)| { let cached_files = cached_files.clone(); let projection = projection.clone(); let filter = filter.clone(); @@ -176,7 +171,7 @@ async fn main() -> Result<()> { async move { let file = match &cached_files { Some(files) => files[idx].clone(), - None => open_vortex_file_for_target(target, metrics.clone()).await?, + None => open_vortex_file_for_target(&target, metrics.clone()).await?, }; if args.prune_segments @@ -253,23 +248,15 @@ async fn main() -> Result<()> { .try_fold(0usize, |rows, file_rows| async move { Ok(rows + file_rows) }) .await?; - let elapsed = start.elapsed().as_secs_f64(); - let bytes = read_bytes.count(); - - total_rows += rows; - total_elapsed += elapsed; - total_bytes += bytes; - let (iter_first_latency, iter_first_bytes) = - first_info.lock().unwrap_or((elapsed, read_bytes.count() - bytes_before)); - total_first_latency += iter_first_latency; - total_first_bytes += iter_first_bytes; - - } + let elapsed = start.elapsed().as_secs_f64(); + let bytes = read_bytes.count(); + let (first_latency, first_bytes) = + first_info.lock().unwrap_or((elapsed, read_bytes.count() - bytes_before)); - let avg_elapsed = total_elapsed / args.iterations as f64; - let avg_bytes = total_bytes as f64 / args.iterations as f64; - let avg_first_latency = total_first_latency / args.iterations as f64; - let avg_first_bytes = total_first_bytes as f64 / args.iterations as f64; + let avg_elapsed = elapsed / args.iterations as f64; + let avg_bytes = bytes as f64 / args.iterations as f64; + let avg_first_latency = first_latency / args.iterations as f64; + let avg_first_bytes = first_bytes as f64 / args.iterations as f64; let steady_bytes = (avg_bytes - avg_first_bytes).max(0.0); let steady_time = (avg_elapsed - avg_first_latency).max(0.0); let total_mb_s = if avg_elapsed > 0.0 { @@ -284,7 +271,7 @@ async fn main() -> Result<()> { }; println!("files={}", targets.len()); - println!("rows={}", total_rows / args.iterations); + println!("rows={}", rows / args.iterations); println!("avg_time_s={:.3}", avg_elapsed); println!("avg_bytes={:.0}", avg_bytes); println!("avg_mb_s={:.2}", total_mb_s); From 11fad54609518fa5b13b25f089bc2ad7c4204a3a Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 20 Jan 2026 11:23:09 +0000 Subject: [PATCH 04/12] cuda pinned buffer pool Signed-off-by: Onur Satici --- Cargo.lock | 1 + vortex-cuda/Cargo.toml | 1 + vortex-cuda/src/lib.rs | 3 + vortex-cuda/src/pinned.rs | 121 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 126 insertions(+) create mode 100644 vortex-cuda/src/pinned.rs diff --git a/Cargo.lock b/Cargo.lock index 2e63f32b6c9..10c402b80a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10268,6 +10268,7 @@ dependencies = [ "async-trait", "criterion", "cudarc", + "parking_lot", "tokio", "tracing", "vortex-array", diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index cf822f1ec02..1aa405f8d58 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -19,6 +19,7 @@ workspace = true [dependencies] async-trait = { workspace = true } cudarc = { workspace = true } +parking_lot = { workspace = true } tracing = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index ca798939493..1d87c8b8dc8 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -6,12 +6,15 @@ pub mod executor; mod for_; mod kernel; +pub mod pinned; mod session; use std::process::Command; pub 
use executor::CudaExecutionCtx; pub use executor::CudaKernelEvents; +pub use pinned::PinnedByteBuffer; +pub use pinned::PinnedByteBufferPool; use for_::ForExecutor; pub use session::CudaSession; diff --git a/vortex-cuda/src/pinned.rs b/vortex-cuda/src/pinned.rs new file mode 100644 index 00000000000..6e88cb4ec97 --- /dev/null +++ b/vortex-cuda/src/pinned.rs @@ -0,0 +1,121 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use cudarc::driver::CudaContext; +use cudarc::driver::PinnedHostSlice; +use parking_lot::Mutex; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_utils::aliases::hash_map::HashMap; + +/// A page-locked host buffer allocated by CUDA. +/// +/// This is intended as a staging buffer for H2D transfers. Contents are uninitialized after +/// allocation. +pub struct PinnedByteBuffer { + inner: PinnedHostSlice, +} + +impl PinnedByteBuffer { + /// Allocate a pinned host buffer with uninitialized contents. + /// + /// # Safety + /// The returned buffer's contents are uninitialized. The caller must initialize before read. + pub unsafe fn uninit(ctx: &Arc, len: usize) -> VortexResult { + let inner = unsafe { + ctx.alloc_pinned::(len) + .map_err(|e| vortex_err!("failed to allocate pinned host buffer: {e}"))? + }; + Ok(Self { inner }) + } + + /// Returns the length of the buffer in bytes. + pub fn len(&self) -> usize { + self.inner.len() + } + + /// Returns true if the buffer is empty. + pub fn is_empty(&self) -> bool { + self.inner.is_empty() + } + + /// Returns the buffer as an immutable slice. + pub fn as_slice(&self) -> VortexResult<&[u8]> { + self.inner + .as_slice() + .map_err(|e| vortex_err!("failed to access pinned host buffer: {e}")) + } + + /// Returns the buffer as a mutable slice. + pub fn as_mut_slice(&mut self) -> VortexResult<&mut [u8]> { + self.inner + .as_mut_slice() + .map_err(|e| vortex_err!("failed to access pinned host buffer: {e}")) + } + + /// Returns a raw pointer to the buffer. + pub fn as_ptr(&self) -> VortexResult<*const u8> { + self.inner + .as_ptr() + .map_err(|e| vortex_err!("failed to access pinned host buffer: {e}")) + } + + /// Returns a mutable raw pointer to the buffer. + pub fn as_mut_ptr(&mut self) -> VortexResult<*mut u8> { + self.inner + .as_mut_ptr() + .map_err(|e| vortex_err!("failed to access pinned host buffer: {e}")) + } + + /// Returns the CUDA context that owns this allocation. + pub fn context(&self) -> &Arc { + self.inner.context() + } +} + +/// A simple pinned host buffer pool keyed by allocation size. +pub struct PinnedByteBufferPool { + ctx: Arc, + max_keep_per_size: usize, + buckets: Mutex>>, +} + +impl PinnedByteBufferPool { + /// Create a new pool with default limits. + pub fn new(ctx: Arc) -> Self { + Self::with_limits(ctx, 4) + } + + /// Create a new pool with a maximum number of cached buffers per size. + pub fn with_limits(ctx: Arc, max_keep_per_size: usize) -> Self { + Self { + ctx, + max_keep_per_size: max_keep_per_size.max(1), + buckets: Mutex::new(HashMap::new()), + } + } + + /// Acquire a pinned buffer of the given size in bytes. + pub fn get(&self, len: usize) -> VortexResult { + let mut buckets = self.buckets.lock(); + if let Some(bucket) = buckets.get_mut(&len) + && let Some(buf) = bucket.pop() + { + return Ok(buf); + } + unsafe { PinnedByteBuffer::uninit(&self.ctx, len) } + } + + /// Return a buffer to the pool. 
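+    ///
+    /// If the pool already holds `max_keep_per_size` buffers of this size, the buffer is
+    /// dropped (freeing its pinned allocation) instead of being cached. Illustrative
+    /// round-trip (not compiled here; assumes a valid `ctx: Arc<CudaContext>`):
+    ///
+    /// ```ignore
+    /// let pool = PinnedByteBufferPool::new(ctx);
+    /// let mut buf = pool.get(1 << 20)?;   // 1 MiB pinned staging buffer
+    /// buf.as_mut_slice()?.fill(0);        // contents are uninitialized until written
+    /// pool.put(buf)?;                     // cache the buffer for reuse
+    /// ```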
+ pub fn put(&self, buf: PinnedByteBuffer) -> VortexResult<()> { + let len = buf.len(); + let mut buckets = self.buckets.lock(); + let bucket = buckets.entry(len).or_default(); + if bucket.len() < self.max_keep_per_size { + bucket.push(buf); + } + Ok(()) + } +} From df4563990f5ea71aa6fddc0394950347d242a2fc Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 20 Jan 2026 12:53:06 +0000 Subject: [PATCH 05/12] WriteTarget + read_at_into Signed-off-by: Onur Satici --- Cargo.lock | 2 + vortex-cuda/Cargo.toml | 1 + vortex-cuda/src/lib.rs | 1 + vortex-cuda/src/pinned.rs | 140 +++++++++++++++++++++++++++++ vortex-io/Cargo.toml | 1 + vortex-io/src/file/object_store.rs | 71 +++++++++++++++ vortex-io/src/file/std_file.rs | 20 +++++ vortex-io/src/lib.rs | 2 + vortex-io/src/read.rs | 61 +++++++++++++ vortex-io/src/write_target.rs | 36 ++++++++ 10 files changed, 335 insertions(+) create mode 100644 vortex-io/src/write_target.rs diff --git a/Cargo.lock b/Cargo.lock index 10c402b80a1..06e13137e03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10266,6 +10266,7 @@ name = "vortex-cuda" version = "0.1.0" dependencies = [ "async-trait", + "bytes", "criterion", "cudarc", "parking_lot", @@ -10604,6 +10605,7 @@ dependencies = [ "tempfile", "tokio", "tracing", + "vortex-array", "vortex-buffer", "vortex-error", "vortex-metrics", diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index 1aa405f8d58..ce3372b587c 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -18,6 +18,7 @@ workspace = true [dependencies] async-trait = { workspace = true } +bytes = { workspace = true } cudarc = { workspace = true } parking_lot = { workspace = true } tracing = { workspace = true } diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index 1d87c8b8dc8..6ad59745fd4 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -15,6 +15,7 @@ pub use executor::CudaExecutionCtx; pub use executor::CudaKernelEvents; pub use pinned::PinnedByteBuffer; pub use pinned::PinnedByteBufferPool; +pub use pinned::PooledPinnedBuffer; use for_::ForExecutor; pub use session::CudaSession; diff --git a/vortex-cuda/src/pinned.rs b/vortex-cuda/src/pinned.rs index 6e88cb4ec97..5b5cbd28dc7 100644 --- a/vortex-cuda/src/pinned.rs +++ b/vortex-cuda/src/pinned.rs @@ -3,11 +3,14 @@ use std::sync::Arc; +use bytes::Bytes; use cudarc::driver::CudaContext; use cudarc::driver::PinnedHostSlice; use parking_lot::Mutex; +use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; use vortex_error::vortex_err; +use vortex_error::vortex_panic; use vortex_utils::aliases::hash_map::HashMap; /// A page-locked host buffer allocated by CUDA. @@ -118,4 +121,141 @@ impl PinnedByteBufferPool { } Ok(()) } + + /// Get a pooled pinned buffer that will be returned to the pool on drop. + pub fn get_pooled(self: &Arc, len: usize) -> VortexResult { + let inner = self.get(len)?; + Ok(PooledPinnedBuffer { + inner: Some(inner), + pool: self.clone(), + }) + } +} + +/// A pinned buffer that is returned to its pool when dropped. +/// +/// This wrapper owns a [`PinnedByteBuffer`] and ensures it gets returned to the +/// [`PinnedByteBufferPool`] when the buffer is no longer needed. This enables efficient +/// buffer reuse for I/O operations. +pub struct PooledPinnedBuffer { + inner: Option, + pool: Arc, +} + +impl PooledPinnedBuffer { + /// Create a new pooled buffer. + pub fn new(inner: PinnedByteBuffer, pool: Arc) -> Self { + Self { + inner: Some(inner), + pool, + } + } + + /// Returns the length of the buffer in bytes. 
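+    ///
+    /// Returns 0 if the buffer has already been consumed.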
+ pub fn len(&self) -> usize { + self.inner.as_ref().map(|b| b.len()).unwrap_or(0) + } + + /// Returns true if the buffer is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Returns the buffer as a mutable slice. + /// + /// # Panics + /// + /// Panics if the buffer has already been consumed or if the CUDA context is invalid. + pub fn as_mut_slice(&mut self) -> &mut [u8] { + let inner = self + .inner + .as_mut() + .unwrap_or_else(|| vortex_panic!("buffer already consumed")); + inner + .as_mut_slice() + .unwrap_or_else(|e| vortex_panic!("failed to access pinned host buffer: {e}")) + } + + /// Convert this pooled buffer into a [`ByteBuffer`]. + /// + /// The returned buffer will return the underlying pinned memory to the pool when dropped. + /// This enables zero-copy conversion to the standard Vortex buffer type while maintaining + /// pool-based memory reuse. + pub fn into_byte_buffer(mut self) -> ByteBuffer { + let inner = self + .inner + .take() + .unwrap_or_else(|| vortex_panic!("buffer already consumed")); + let len = inner.len(); + let pool = self.pool.clone(); + + // Create a wrapper that will return the buffer to the pool on drop + let wrapper = PooledPinnedBufferOwner::new(inner, pool); + + // Use Bytes::from_owner to create a Bytes that owns the wrapper + let bytes = Bytes::from_owner(wrapper); + + // The ByteBuffer should have the full length + assert_eq!(bytes.len(), len); + + ByteBuffer::from(bytes) + } +} + +impl Drop for PooledPinnedBuffer { + fn drop(&mut self) { + if let Some(inner) = self.inner.take() { + // Return the buffer to the pool, ignoring errors + drop(self.pool.put(inner)); + } + } +} + +/// Internal wrapper that owns a PinnedByteBuffer and returns it to the pool on drop. +/// +/// This is used by `Bytes::from_owner` to manage the lifecycle of pooled pinned buffers. +struct PooledPinnedBufferOwner { + // We use Option so we can take the buffer out in Drop + inner: Mutex>, + // Cached pointer and length for AsRef implementation + ptr: *const u8, + len: usize, + pool: Arc, +} + +// SAFETY: The pinned buffer is allocated by CUDA and is safe to send across threads. +// The pointer is derived from the buffer and remains valid as long as the buffer exists. +unsafe impl Send for PooledPinnedBufferOwner {} +unsafe impl Sync for PooledPinnedBufferOwner {} + +impl PooledPinnedBufferOwner { + fn new(inner: PinnedByteBuffer, pool: Arc) -> Self { + let ptr = inner + .as_ptr() + .unwrap_or_else(|e| vortex_panic!("failed to get pointer to pinned buffer: {e}")); + let len = inner.len(); + Self { + inner: Mutex::new(Some(inner)), + ptr, + len, + pool, + } + } +} + +impl AsRef<[u8]> for PooledPinnedBufferOwner { + fn as_ref(&self) -> &[u8] { + // SAFETY: The pointer and length were captured when the buffer was created + // and remain valid as long as this struct exists (buffer is in the Mutex). 
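+        // This assumes cudarc's pinned allocation is stable: moving the `PinnedHostSlice`
+        // handle does not move the underlying page-locked memory.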
+ unsafe { std::slice::from_raw_parts(self.ptr, self.len) } + } +} + +impl Drop for PooledPinnedBufferOwner { + fn drop(&mut self) { + // Take the buffer out and return it to the pool + if let Some(buffer) = self.inner.lock().take() { + drop(self.pool.put(buffer)); + } + } } diff --git a/vortex-io/Cargo.toml b/vortex-io/Cargo.toml index cef1c69e351..5a98bd28528 100644 --- a/vortex-io/Cargo.toml +++ b/vortex-io/Cargo.toml @@ -35,6 +35,7 @@ handle = "1.0.2" tokio = { workspace = true, features = ["io-util", "rt", "sync"] } tracing = { workspace = true } vortex-buffer = { workspace = true } +vortex-array = { workspace = true } vortex-error = { workspace = true } vortex-metrics = { workspace = true } vortex-session = { workspace = true } diff --git a/vortex-io/src/file/object_store.rs b/vortex-io/src/file/object_store.rs index 0d09cbdcd2b..30d8ba593f4 100644 --- a/vortex-io/src/file/object_store.rs +++ b/vortex-io/src/file/object_store.rs @@ -13,6 +13,7 @@ use object_store::GetRange; use object_store::GetResultPayload; use object_store::ObjectStore; use object_store::path::Path as ObjectPath; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; @@ -22,6 +23,7 @@ use vortex_error::vortex_ensure; use crate::CoalesceConfig; use crate::VortexReadAt; +use crate::WriteTarget; #[cfg(not(target_arch = "wasm32"))] use crate::file::std_file::read_exact_at; use crate::runtime::Handle; @@ -165,4 +167,73 @@ impl VortexReadAt for ObjectStoreSource { }) .boxed() } + + fn read_at_into( + &self, + offset: u64, + mut target: Box, + ) -> BoxFuture<'static, VortexResult> { + let store = self.store.clone(); + let path = self.path.clone(); + let handle = self.handle.clone(); + let length = target.len(); + let range = offset..(offset + length as u64); + + Compat::new(async move { + let response = store + .get_opts( + &path, + GetOptions { + range: Some(GetRange::Bounded(range.clone())), + ..Default::default() + }, + ) + .await?; + + match response.payload { + #[cfg(not(target_arch = "wasm32"))] + GetResultPayload::File(file, _) => { + target = handle + .spawn_blocking(move || { + let mut target = target; + read_exact_at(&file, target.as_mut_slice(), range.start)?; + Ok::<_, io::Error>(target) + }) + .await + .map_err(io::Error::other)?; + } + #[cfg(target_arch = "wasm32")] + GetResultPayload::File(..) 
=> { + unreachable!("File payload not supported on wasm32") + } + GetResultPayload::Stream(mut byte_stream) => { + let mut filled = 0usize; + while let Some(bytes) = byte_stream.next().await { + let bytes = bytes?; + let end = filled + bytes.len(); + vortex_ensure!( + end <= length, + "Object store stream returned more bytes than expected (expected {} bytes, got at least {} bytes, range: {:?})", + length, + end, + range + ); + target.as_mut_slice()[filled..end].copy_from_slice(&bytes); + filled = end; + } + + vortex_ensure!( + filled == length, + "Object store stream returned {} bytes but expected {} bytes (range: {:?})", + filled, + length, + range + ); + } + } + + Ok(target.into_handle()) + }) + .boxed() + } } diff --git a/vortex-io/src/file/std_file.rs b/vortex-io/src/file/std_file.rs index 56abd56eb60..3db3e4c24a5 100644 --- a/vortex-io/src/file/std_file.rs +++ b/vortex-io/src/file/std_file.rs @@ -15,6 +15,7 @@ use std::sync::Arc; use futures::FutureExt; use futures::future::BoxFuture; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; @@ -22,6 +23,7 @@ use vortex_error::VortexResult; use crate::CoalesceConfig; use crate::VortexReadAt; +use crate::WriteTarget; use crate::runtime::Handle; /// Read exactly `buffer.len()` bytes from `file` starting at `offset`. @@ -122,4 +124,22 @@ impl VortexReadAt for FileReadAdapter { } .boxed() } + + fn read_at_into( + &self, + offset: u64, + mut target: Box, + ) -> BoxFuture<'static, VortexResult> { + let file = self.file.clone(); + let handle = self.handle.clone(); + async move { + handle + .spawn_blocking(move || { + read_exact_at(&file, target.as_mut_slice(), offset)?; + Ok(target.into_handle()) + }) + .await + } + .boxed() + } } diff --git a/vortex-io/src/lib.rs b/vortex-io/src/lib.rs index 6a08c821c8f..029c8ea3bc8 100644 --- a/vortex-io/src/lib.rs +++ b/vortex-io/src/lib.rs @@ -15,6 +15,7 @@ pub use limit::*; #[cfg(feature = "object_store")] pub use object_store::*; pub use read::*; +pub use write_target::*; pub use write::*; pub mod file; @@ -24,6 +25,7 @@ mod limit; #[cfg(feature = "object_store")] mod object_store; mod read; +mod write_target; pub mod runtime; pub mod session; #[cfg(feature = "tokio")] diff --git a/vortex-io/src/read.rs b/vortex-io/src/read.rs index ed8ddecad39..1e235844568 100644 --- a/vortex-io/src/read.rs +++ b/vortex-io/src/read.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use futures::FutureExt; use futures::future::BoxFuture; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_error::VortexExpect; @@ -15,6 +16,8 @@ use vortex_metrics::Histogram; use vortex_metrics::Timer; use vortex_metrics::VortexMetrics; +use crate::WriteTarget; + /// Configuration for coalescing nearby I/O requests into single operations. #[derive(Clone, Copy, Debug)] pub struct CoalesceConfig { @@ -81,6 +84,24 @@ pub trait VortexReadAt: Send + Sync + 'static { length: usize, alignment: Alignment, ) -> BoxFuture<'static, VortexResult>; + + /// Read into a pre-allocated target buffer. + /// + /// The default implementation reads into a temporary buffer and copies into the target. 
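+    ///
+    /// Implementations backed by local files or object stores can override this to write
+    /// directly into the target and skip the copy. Illustrative usage (not compiled here;
+    /// assumes `reader: impl VortexReadAt` and a zeroing `ByteBufferMut::zeroed`
+    /// constructor):
+    ///
+    /// ```ignore
+    /// let target = Box::new(ByteBufferMut::zeroed(4096));
+    /// let handle = reader.read_at_into(0, target).await?;
+    /// ```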
+ fn read_at_into( + &self, + offset: u64, + mut target: Box, + ) -> BoxFuture<'static, VortexResult> { + let len = target.len(); + let read_fut = self.read_at(offset, len, Alignment::none()); + async move { + let data = read_fut.await?; + target.as_mut_slice().copy_from_slice(data.as_ref()); + Ok(target.into_handle()) + } + .boxed() + } } impl VortexReadAt for Arc { @@ -108,6 +129,14 @@ impl VortexReadAt for Arc { ) -> BoxFuture<'static, VortexResult> { self.as_ref().read_at(offset, length, alignment) } + + fn read_at_into( + &self, + offset: u64, + target: Box, + ) -> BoxFuture<'static, VortexResult> { + self.as_ref().read_at_into(offset, target) + } } impl VortexReadAt for Arc { @@ -136,6 +165,14 @@ impl VortexReadAt for Arc { self.as_ref().read_at(offset, length, alignment) } + fn read_at_into( + &self, + offset: u64, + target: Box, + ) -> BoxFuture<'static, VortexResult> { + self.as_ref().read_at_into(offset, target) + } + // fn drive(self: Arc, requests: BoxStream<'static, IoRequest>) -> BoxFuture<'static, ()> { // // Delegate to the inner implementation's drive // let inner: Arc = Arc::clone(&self); @@ -176,6 +213,30 @@ impl VortexReadAt for ByteBuffer { } .boxed() } + + fn read_at_into( + &self, + offset: u64, + mut target: Box, + ) -> BoxFuture<'static, VortexResult> { + let buffer = self.clone(); + async move { + let start = usize::try_from(offset).vortex_expect("start too big for usize"); + let end = usize::try_from(offset + target.len() as u64) + .vortex_expect("end too big for usize"); + if end > buffer.len() { + vortex_bail!( + "Requested range {}..{} out of bounds for buffer of length {}", + start, + end, + buffer.len() + ); + } + target.as_mut_slice().copy_from_slice(&buffer.as_ref()[start..end]); + Ok(target.into_handle()) + } + .boxed() + } } /// A wrapper that instruments a [`VortexReadAt`] with metrics. diff --git a/vortex-io/src/write_target.rs b/vortex-io/src/write_target.rs new file mode 100644 index 00000000000..7ee4685335b --- /dev/null +++ b/vortex-io/src/write_target.rs @@ -0,0 +1,36 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::buffer::BufferHandle; +use vortex_buffer::ByteBufferMut; + +/// A destination for I/O reads that can be finalized into a [`BufferHandle`]. +pub trait WriteTarget: Send + 'static { + /// Returns the buffer as a mutable slice. + fn as_mut_slice(&mut self) -> &mut [u8]; + + /// Returns the length of the buffer in bytes. + fn len(&self) -> usize; + + /// Returns true if the buffer is empty. + fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Finalize the target into a buffer handle. 
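+    ///
+    /// Consumes the target; the `ByteBufferMut` implementation freezes the buffer into an
+    /// immutable host `BufferHandle` without copying.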
+ fn into_handle(self: Box) -> BufferHandle; +} + +impl WriteTarget for ByteBufferMut { + fn as_mut_slice(&mut self) -> &mut [u8] { + self.as_mut() + } + + fn len(&self) -> usize { + ByteBufferMut::len(self) + } + + fn into_handle(self: Box) -> BufferHandle { + BufferHandle::new_host(self.freeze()) + } +} From cf0312b0ce2c7a27a89b457a68da7a3d85da170b Mon Sep 17 00:00:00 2001 From: Onur Satici Date: Tue, 20 Jan 2026 13:26:39 +0000 Subject: [PATCH 06/12] BufferAllocator Signed-off-by: Onur Satici --- vortex-file/src/open.rs | 25 +++++++++++++----- vortex-file/src/segments/source.rs | 42 +++++++++++++++++++++++++++--- vortex-io/src/allocator.rs | 25 ++++++++++++++++++ vortex-io/src/lib.rs | 2 ++ 4 files changed, 85 insertions(+), 9 deletions(-) create mode 100644 vortex-io/src/allocator.rs diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index 03232856531..6a8b23aecef 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -12,6 +12,7 @@ use vortex_dtype::DType; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_io::BufferAllocator; use vortex_io::InstrumentedReadAt; use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSessionExt; @@ -53,6 +54,8 @@ pub struct VortexOpenOptions { footer: Option