164 changes: 164 additions & 0 deletions vortex-btrblocks/src/integer.rs
@@ -21,10 +21,12 @@ use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_fastlanes::DeltaArray;
use vortex_fastlanes::FoRArray;
use vortex_fastlanes::bitpack_compress::bit_width_histogram;
use vortex_fastlanes::bitpack_compress::bitpack_encode;
use vortex_fastlanes::bitpack_compress::find_best_bit_width;
use vortex_fastlanes::delta_compress;
use vortex_runend::RunEndArray;
use vortex_runend::compress::runend_encode;
use vortex_scalar::Scalar;
@@ -62,6 +64,7 @@ impl Compressor for IntCompressor {
&RunEndScheme,
&SequenceScheme,
&RLE_INTEGER_SCHEME,
&DeltaScheme,
]
}

@@ -118,6 +121,7 @@ const DICT_SCHEME: IntCode = IntCode(6);
const RUN_END_SCHEME: IntCode = IntCode(7);
const SEQUENCE_SCHEME: IntCode = IntCode(8);
const RUN_LENGTH_SCHEME: IntCode = IntCode(9);
const DELTA_SCHEME: IntCode = IntCode(10);

#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;
@@ -146,6 +150,9 @@ pub struct RunEndScheme;
#[derive(Debug, Copy, Clone)]
pub struct SequenceScheme;

#[derive(Debug, Copy, Clone)]
pub struct DeltaScheme;

/// Threshold for the average run length in an array before we consider run-end encoding.
const RUN_END_THRESHOLD: u32 = 4;

@@ -768,6 +775,125 @@ impl Scheme for SequenceScheme {
}
}

impl Scheme for DeltaScheme {
type StatsType = IntegerStats;
type CodeType = IntCode;

fn code(&self) -> IntCode {
DELTA_SCHEME
}

fn expected_compression_ratio(
&self,
stats: &IntegerStats,
is_sample: bool,
allowed_cascading: usize,
excludes: &[IntCode],
) -> VortexResult<f64> {
// Delta requires cascading (deltas need to be compressed)
if allowed_cascading == 0 {
return Ok(0.0);
}

// Skip empty/all-null arrays
if stats.value_count == 0 {
return Ok(0.0);
}

// Delta encoding only supports unsigned integers.
// The delta_compress function and DeltaArray only work with unsigned types.
if stats.src.ptype().is_signed_int() {
return Ok(0.0);
}

// Single element arrays don't benefit from delta encoding
if stats.value_count <= 1 {
return Ok(0.0);
}

// Check if data range suggests delta would help.
// Constant data should use ConstantScheme instead.
let range = stats.typed.max_minus_min();
if range == 0 {
return Ok(0.0);
}

// Delta encoding is most useful when min is large (there's a "base" to factor out)
// and range is small relative to the original bit width. If min is zero,
// FOR + BitPacking can handle this case equally well.
if stats.typed.min_is_zero() {
return Ok(0.0);
}

// Estimate: if average delta fits in fewer bits, delta helps
let value_count = stats.value_count as u64;
let avg_delta = range / (value_count - 1);
let original_bits: u32 = stats
.src
.ptype()
.bit_width()
.try_into()
.vortex_expect("bit width must fit in u32");
let delta_bits = if avg_delta == 0 {
1
} else {
avg_delta.ilog2() + 1
};

// Only consider if we save at least 25% bits
if original_bits <= delta_bits + (original_bits / 4) {
return Ok(0.0);
}
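// Worked example using the test data added below (u64 values 1_000_000_000 + i * 2 + (i % 3),
// 2000 elements): range = 3999, avg_delta = 3999 / 1999 = 2, delta_bits = 2,
// original_bits = 64, and 64 > 2 + 64 / 4, so we fall through to the sampling estimate.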

// Use sample-based estimation for accurate ratio
estimate_compression_ratio_with_sampling(
self,
stats,
is_sample,
allowed_cascading,
excludes,
)
}

fn compress(
&self,
stats: &IntegerStats,
is_sample: bool,
allowed_cascading: usize,
excludes: &[IntCode],
) -> VortexResult<ArrayRef> {
assert!(allowed_cascading > 0);

let (bases, deltas) = delta_compress(&stats.src)?;
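// `deltas` holds the element-wise differences that, together with the `bases`
// reference values, reconstruct the original array; each is compressed independently below.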

// Exclude schemes that don't work well with delta components
let mut new_excludes = vec![
DELTA_SCHEME, // Prevent recursion
DICT_SCHEME, // Deltas have high cardinality
];
new_excludes.extend_from_slice(excludes);

// Bases: small array, compress recursively
let compressed_bases = IntCompressor::compress_no_dict(
&bases,
is_sample,
allowed_cascading - 1,
&[DELTA_SCHEME],
)?;

// Deltas: use no-dict compression (high cardinality)
let compressed_deltas = IntCompressor::compress_no_dict(
&deltas,
is_sample,
allowed_cascading - 1,
&new_excludes,
)?;

DeltaArray::try_from_delta_compress_parts(compressed_bases, compressed_deltas)
.map(DeltaArray::into_array)
}
}

#[cfg(test)]
mod tests {
use std::iter;
@@ -795,6 +921,7 @@ mod tests {
use crate::CompressorStats;
use crate::FloatCompressor;
use crate::Scheme;
use crate::integer::DeltaScheme;
use crate::integer::IntCompressor;
use crate::integer::IntegerStats;
use crate::integer::RLE_INTEGER_SCHEME;
@@ -908,6 +1035,25 @@
assert_arrays_eq!(decoded.as_ref(), expected.as_ref());
}

#[test]
fn test_delta_compression_roundtrip() {
// Non-decreasing values with small, varying steps - ideal for delta encoding
// but NOT a perfect arithmetic sequence (so SequenceScheme won't intercept)
let values: Vec<u64> = (0..2000)
.map(|i| 1_000_000_000_u64 + i * 2 + (i % 3))
.collect();

let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
let compressed = DeltaScheme
.compress(&IntegerStats::generate(&array), false, 3, &[])
.unwrap();

// Verify roundtrip against the original values
let expected = Buffer::copy_from(&values).into_array();
assert_arrays_eq!(compressed.as_ref(), expected.as_ref());
}

#[test_with::env(CI)]
#[test_with::no_env(VORTEX_SKIP_SLOW_TESTS)]
fn compress_large_int() -> VortexResult<()> {
@@ -937,6 +1083,7 @@ mod scheme_selection_tests {
use vortex_array::validity::Validity;
use vortex_buffer::Buffer;
use vortex_fastlanes::BitPackedVTable;
use vortex_fastlanes::DeltaVTable;
use vortex_fastlanes::FoRVTable;
use vortex_fastlanes::RLEVTable;
use vortex_runend::RunEndVTable;
@@ -1040,4 +1187,21 @@
let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
assert!(compressed.is::<RLEVTable>());
}

#[test]
fn test_delta_compressed() {
// Timestamps with small but varying increments - ideal for delta encoding
// Large base value (1_000_000_000) with small deltas (0 or 3)
// NOT a perfect arithmetic sequence (so SequenceScheme won't intercept)
let values: Vec<u64> = (0..2000)
.map(|i| 1_000_000_000_u64 + i * 2 + (i % 3))
.collect();
let array = PrimitiveArray::new(Buffer::copy_from(&values), Validity::NonNullable);
let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
assert!(
compressed.is::<DeltaVTable>(),
"Expected DeltaVTable but got {}",
compressed.display_tree()
);
}
}