diff --git a/vortex-btrblocks/src/integer.rs b/vortex-btrblocks/src/integer.rs index 24a778d6717..176d25516aa 100644 --- a/vortex-btrblocks/src/integer.rs +++ b/vortex-btrblocks/src/integer.rs @@ -21,10 +21,12 @@ use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_bail; use vortex_error::vortex_err; +use vortex_fastlanes::DeltaArray; use vortex_fastlanes::FoRArray; use vortex_fastlanes::bitpack_compress::bit_width_histogram; use vortex_fastlanes::bitpack_compress::bitpack_encode; use vortex_fastlanes::bitpack_compress::find_best_bit_width; +use vortex_fastlanes::delta_compress; use vortex_runend::RunEndArray; use vortex_runend::compress::runend_encode; use vortex_scalar::Scalar; @@ -62,6 +64,7 @@ impl Compressor for IntCompressor { &RunEndScheme, &SequenceScheme, &RLE_INTEGER_SCHEME, + &DeltaScheme, ] } @@ -118,6 +121,7 @@ const DICT_SCHEME: IntCode = IntCode(6); const RUN_END_SCHEME: IntCode = IntCode(7); const SEQUENCE_SCHEME: IntCode = IntCode(8); const RUN_LENGTH_SCHEME: IntCode = IntCode(9); +const DELTA_SCHEME: IntCode = IntCode(10); #[derive(Debug, Copy, Clone)] pub struct UncompressedScheme; @@ -146,6 +150,9 @@ pub struct RunEndScheme; #[derive(Debug, Copy, Clone)] pub struct SequenceScheme; +#[derive(Debug, Copy, Clone)] +pub struct DeltaScheme; + /// Threshold for the average run length in an array before we consider run-end encoding. 
const RUN_END_THRESHOLD: u32 = 4;

@@ -768,6 +775,125 @@ impl Scheme for SequenceScheme {
     }
 }
 
+impl Scheme for DeltaScheme {
+    type StatsType = IntegerStats;
+    type CodeType = IntCode;
+
+    /// Returns `DELTA_SCHEME`, the code identifying this scheme.
+    fn code(&self) -> IntCode {
+        DELTA_SCHEME
+    }
+
+    /// Estimates the compression ratio of delta-encoding `stats.src`.
+    ///
+    /// Returns `Ok(0.0)` when no cascading is left, the type is a signed integer, fewer
+    /// than two values are counted, the range is zero, the minimum is zero, or the
+    /// average-delta bit width does not undercut the native width by more than a
+    /// quarter. Otherwise the ratio comes from `estimate_compression_ratio_with_sampling`.
+    fn expected_compression_ratio(
+        &self,
+        stats: &IntegerStats,
+        is_sample: bool,
+        allowed_cascading: usize,
+        excludes: &[IntCode],
+    ) -> VortexResult<f64> {
+        // Delta needs a remaining cascade level so the deltas themselves get compressed.
+        if allowed_cascading == 0 {
+            return Ok(0.0);
+        }
+
+        // `delta_compress` and `DeltaArray` only work with unsigned integer types.
+        if stats.src.ptype().is_signed_int() {
+            return Ok(0.0);
+        }
+
+        // Empty, all-null and single-value arrays have nothing to delta-encode; this
+        // guard also keeps the `value_count - 1` divisor below non-zero.
+        if stats.value_count < 2 {
+            return Ok(0.0);
+        }
+
+        // A zero range means constant data, which ConstantScheme should take instead.
+        let range = stats.typed.max_minus_min();
+        if range == 0 {
+            return Ok(0.0);
+        }
+
+        // Heuristic gate: skip delta when the minimum is zero, on the premise that
+        // FoR + bit-packing handle that case equally well. NOTE(review): delta gains
+        // depend on neighbouring values being close, not on the minimum - confirm.
+        if stats.typed.min_is_zero() {
+            return Ok(0.0);
+        }
+
+        // Cheap pre-filter: bit width of the average step, `range / (n - 1)`, which
+        // equals the mean neighbour delta only when the values are monotonic.
+        let value_count = stats.value_count as u64;
+        let avg_delta = range / (value_count - 1);
+        let original_bits: u32 = stats
+            .src
+            .ptype()
+            .bit_width()
+            .try_into()
+            .vortex_expect("bit width must fit in u32");
+        // `checked_ilog2` is `None` for zero; a zero average delta counts as one bit.
+        let delta_bits = avg_delta.checked_ilog2().map_or(1, |log| log + 1);
+
+        // Reject unless the estimate saves strictly more than a quarter of the bits.
+        if original_bits <= delta_bits + (original_bits / 4) {
+            return Ok(0.0);
+        }
+
+        // The pre-filter passed: measure the actual ratio on a sample.
+        estimate_compression_ratio_with_sampling(
+            self,
+            stats,
+            is_sample,
+            allowed_cascading,
+            excludes,
+        )
+    }
+
+    /// Delta-encodes `stats.src` and compresses the bases and deltas one cascade level down.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `allowed_cascading` is zero.
+    fn compress(
+        &self,
+        stats: &IntegerStats,
+        is_sample: bool,
+        allowed_cascading: usize,
+        excludes: &[IntCode],
+    ) -> VortexResult<ArrayRef> {
+        assert!(allowed_cascading > 0);
+
+        let (bases, deltas) = delta_compress(&stats.src)?;
+
+        // Bases and deltas both inherit the caller's exclusions, plus delta itself (no
+        // recursion) and dict (deltas are expected to have high cardinality).
+        let mut new_excludes = vec![DELTA_SCHEME, DICT_SCHEME];
+        new_excludes.extend_from_slice(excludes);
+
+        let compressed_bases = IntCompressor::compress_no_dict(
+            &bases,
+            is_sample,
+            allowed_cascading - 1,
+            &new_excludes,
+        )?;
+
+        let compressed_deltas = IntCompressor::compress_no_dict(
+            &deltas,
+            is_sample,
+            allowed_cascading - 1,
+            &new_excludes,
+        )?;
+
+        DeltaArray::try_from_delta_compress_parts(compressed_bases, compressed_deltas)
+            .map(DeltaArray::into_array)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::iter;
@@ -795,6 +921,7 @@ mod tests {
     use crate::CompressorStats;
     use crate::FloatCompressor;
     use crate::Scheme;
+    use crate::integer::DeltaScheme;
     use crate::integer::IntCompressor;
     use crate::integer::IntegerStats;
     use crate::integer::RLE_INTEGER_SCHEME;
@@ -908,6 +1035,25 @@ mod tests {
         assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
     }
 
+    #[test]
+    fn test_delta_compression_roundtrip() {
+        // Non-decreasing values with small jitter (neighbour steps of 0 or 3) on a large
+        // base; NOT a perfect arithmetic sequence (so SequenceScheme won't intercept)
+        let values: Vec<u64> = (0..2000)
+            .map(|i| 1_000_000_000_u64 + i * 2 + (i % 3))
+            .collect();
+
+        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
+        let compressed = DeltaScheme
+            .compress(&IntegerStats::generate(&array), false, 3, &[])
+            .unwrap();
+
+        // Verify roundtrip: the compressed array must compare equal to the plain input
+        let decoded = compressed;
+        let expected = Buffer::copy_from(&values).into_array();
+        assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
+    }
+
     #[test_with::env(CI)]
     #[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
     fn compress_large_int() -> VortexResult<()> {
@@ -937,6 +1083,7 @@ mod scheme_selection_tests {
     use vortex_array::validity::Validity;
     use vortex_buffer::Buffer;
     use vortex_fastlanes::BitPackedVTable;
+    use vortex_fastlanes::DeltaVTable;
     use vortex_fastlanes::FoRVTable;
     use vortex_fastlanes::RLEVTable;
     use vortex_runend::RunEndVTable;
@@ -1040,4 +1187,21 @@ mod scheme_selection_tests {
         let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
         assert!(compressed.is::());
     }
+
+    #[test]
+    fn test_delta_compressed() {
+        // Timestamp-like values with small, varying increments - ideal for delta encoding
+        // Large base values (1_000_000_000) with small steps between neighbours (0 or 3)
+        // NOT a perfect arithmetic sequence (so SequenceScheme won't intercept)
+        let values: Vec<u64> = (0..2000)
+            .map(|i| 1_000_000_000_u64 + i * 2 + (i % 3))
+            .collect();
+        let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
+        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
+        assert!(
+            compressed.is::<DeltaVTable>(),
+            "Expected DeltaVTable but got {}",
+            compressed.display_tree()
+        );
+    }
 }