diff --git a/Cargo.toml b/Cargo.toml index c900a1b..443801f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "signet-libmdbx" description = "Idiomatic and safe MDBX wrapper" -version = "0.5.0" +version = "0.6.0" edition = "2024" rust-version = "1.92" license = "MIT OR Apache-2.0" @@ -41,6 +41,7 @@ thiserror = "2.0.18" tracing = "0.1.44" dashmap = { version = "6.1.0", features = ["inline"], optional = true } +auto_impl = "1.3.0" [features] default = [] diff --git a/ISSUE_BENCH_REGRESSION.md b/ISSUE_BENCH_REGRESSION.md new file mode 100644 index 0000000..a92a98c --- /dev/null +++ b/ISSUE_BENCH_REGRESSION.md @@ -0,0 +1,202 @@ +# Issue: Cursor and Iterator Benchmark Regression + +the baseline `cursor::traverse::raw` is 606.19 ns. + +this branch causes `cursor::traverse::iter::single_thread` performance to +degrade from 784.02 ns to 995.32 ns + +and `cursor:traverse::iter` perf to degrade from 847.02 ns to 1,015.2 tns + +Given that both of these benchmarks have similar degradation, the root cause +is likely the same. + +## Summary + +Cursor and iterator benchmarks have regressed. The root cause is mutex lock +acquisition on every cursor operation for synchronized transactions. + +## Root Cause + +### Hot Path Analysis + +Every iterator step calls `execute_op` (iter.rs:215-238): + +```rust +fn execute_op(&self) -> ReadResult> { + let access = self.cursor.access(); + access.with_txn_ptr(|txn| { // ← Called for EVERY item + ffi::mdbx_cursor_get(...) + })? +} +``` + +For `PtrSyncInner` (synchronized transactions), `with_txn_ptr` acquires a +mutex (access.rs:643-653): + +```rust +fn with_txn_ptr(&self, f: F) -> MdbxResult { + let timeout_flag = self.lock(); // ← MUTEX LOCK per item + if *timeout_flag { + return Err(MdbxError::ReadTransactionTimeout); + } + let result = f(self.txn); + Ok(result) +} +``` + +### Impact + +With 100 items in the benchmark: + +- **100+ mutex lock/unlock cycles** in a tight loop +- Each lock involves atomic operations and potential thread contention +- Completely dominates the actual FFI call time + +### Why single_thread Benchmarks Are Faster + +`TxUnsync` uses `RoGuard` which has no mutex (access.rs:396-415): + +```rust +// RoGuard::with_txn_ptr - no mutex, just Arc operations +fn with_txn_ptr(&self, f: F) -> MdbxResult { + if let Some(strong) = self.try_ref() { // Arc upgrade (atomic, no mutex) + return Ok(f(strong.ptr)); + } + Err(MdbxError::ReadTransactionTimeout) +} +``` + +Arc atomic operations are much cheaper than mutex lock/unlock. + +### Comparison with Raw FFI + +The raw benchmark (cursor.rs:219-238) shows baseline performance: + +```rust +while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 { + // No locking, no checks, just FFI +} +``` + +## Benchmark Structure + +``` +benches/cursor.rs: + cursor::traverse::iter - sync tx, uses PtrSyncInner (slow) + cursor::traverse::iter_x3 - sync tx, 3 iterations + cursor::traverse::for_loop - sync tx, explicit loop + cursor::traverse::raw - raw FFI baseline (fast) + cursor::traverse::iter::single_thread - unsync tx, RoGuard (faster) + cursor::traverse::iter_x3::single_thread - unsync tx, 3 iterations + cursor::traverse::for_loop::single_thread - unsync tx, explicit loop +``` + +## Potential Fixes + +### Option 1: Guarded Iteration + +Hold lock for entire iteration session: + +```rust +impl Iter { + fn with_guard(&mut self, f: F) -> MdbxResult + where + F: FnOnce(&mut Self) -> R, + { + let _guard = self.cursor.access().try_guard()?; + Ok(f(self)) + } +} + +// Usage: hold lock, iterate all items +cursor.iter().with_guard(|iter| { + for item in iter { ... } +}) +``` + +### Option 2: Cached Validity Check + +Cache timeout check result per iteration batch: + +```rust +struct Iter { + // ... existing fields + validity_token: Option, +} + +impl Iter { + fn execute_op(&mut self) -> ReadResult<...> { + // Revalidate periodically, not every call + if self.validity_token.is_none() || self.validity_token.expired() { + self.validity_token = Some(self.cursor.access().validate()?); + } + // Fast path: use cached pointer + unsafe { ffi::mdbx_cursor_get(...) } + } +} +``` + +### Option 3: Lock-Free Fast Path for RO + +For read-only transactions that haven't timed out, skip locking: + +```rust +fn with_txn_ptr(&self, f: F) -> MdbxResult { + // Fast path: check timeout flag without lock (relaxed read) + if !self.timeout_flag.load(Ordering::Relaxed) { + return Ok(f(self.txn)); + } + // Slow path: acquire lock, recheck + let timeout_flag = self.lock(); + if *timeout_flag { + return Err(MdbxError::ReadTransactionTimeout); + } + Ok(f(self.txn)) +} +``` + +**Warning**: This has subtle correctness implications. The monitor sets the +flag while holding the lock, so a relaxed read could see stale data. Would +need careful analysis. + +### Option 4: Batch Operations + +Add batch cursor operations that acquire lock once: + +```rust +impl Cursor { + fn collect_n(&mut self, n: usize) -> MdbxResult> { + self.access().with_txn_ptr(|txn| { + let mut results = Vec::with_capacity(n); + for _ in 0..n { + // All FFI calls under single lock + match unsafe { ffi::mdbx_cursor_get(...) } { + 0 => results.push(decode(...)), + MDBX_NOTFOUND => break, + err => return Err(err), + } + } + Ok(results) + }) + } +} +``` + +## Key Files + +- `src/tx/iter.rs:215-238` - execute_op, the hot path +- `src/tx/access.rs:643-653` - PtrSyncInner::with_txn_ptr (mutex) +- `src/tx/access.rs:396-415` - RoGuard::with_txn_ptr (no mutex) +- `benches/cursor.rs` - benchmark definitions + +## Recommendation + +Option 1 (guarded iteration) is likely the cleanest solution: + +- Maintains correctness guarantees +- Single lock acquisition per iteration session +- Compatible with existing API (can be opt-in) +- Clear semantics: "I'm iterating, don't timeout during this" + +The guard would need to prevent timeout while held, similar to how +`SyncTxGuard` already works. diff --git a/ISSUE_READER_CLEANUP.md b/ISSUE_READER_CLEANUP.md new file mode 100644 index 0000000..4b1c023 --- /dev/null +++ b/ISSUE_READER_CLEANUP.md @@ -0,0 +1,126 @@ +# Issue: Readers Not Being Cleaned Up + +## Summary + +Read-only transactions (readers) are not being properly cleaned up. This +investigation documents the reader lifecycle and potential causes. + +## Architecture + +### Type Hierarchy + +``` +TxSync + └── inner: Arc> + ├── ptr: PtrSync + │ └── inner: Arc> ← actual txn pointer lives here + └── db_cache: SharedCache +``` + +### Reader Tracking (read-tx-timeouts feature) + +When `read-tx-timeouts` is enabled, the `TxnManager` maintains: + +- `active: DashMap, Instant, Option)>` - currently + active RO transactions +- `timed_out_not_aborted: DashSet` - transactions that timed out but + user hasn't dropped yet + +## Lifecycle + +### Creation (sync.rs:567-585) + +```rust +impl TxSync { + pub(crate) fn new(env: Environment) -> MdbxResult { + // 1. Create raw MDBX transaction via FFI + mdbx_txn_begin_ex(..., &mut txn, ...); + + // 2. Wrap in TxSync + let this = Self::new_from_ptr(env, txn); + + // 3. Clone PtrSync and store in tracking map + #[cfg(feature = "read-tx-timeouts")] + this.env().txn_manager().add_active_read_transaction(txn, this.inner.ptr.clone()); + // ^^^^^^^^^^^^^^^^^^^^ + // Arc refcount becomes 2 + + Ok(this) + } +} +``` + +### Expected Cleanup Path + +1. User drops `TxSync` +2. `Arc` refcount → 0 +3. `SyncInner::drop` runs (sync.rs:325-340): + - Calls `remove_active_read_transaction(ptr)` → removes from map + - Map's `PtrSync` clone is dropped → Arc refcount decreases +4. `SyncInner.ptr` (PtrSync) is dropped → Arc refcount → 0 +5. `PtrSyncInner::drop` runs (access.rs:667-692): + - Calls `remove_active_read_transaction` again (no-op, already removed) + - Calls `mdbx_txn_abort(ptr)` → **reader slot released** + +### Timeout Path (txn_manager.rs:231-318) + +The timeout monitor thread: + +1. Iterates `active` transactions +2. For transactions exceeding `max_duration`: + - Acquires lock on `PtrSyncInner` + - Calls `mdbx_txn_reset(ptr)` - **parks reader, does NOT release slot** + - Sets timeout flag to `true` + - Removes from `active` map + - Adds to `timed_out_not_aborted` set + +**Critical**: `mdbx_txn_reset` parks the reader but the reader slot remains +occupied. Only `mdbx_txn_abort` releases it. + +## Potential Issues + +### 1. Timed-out transactions with live handles + +If a transaction times out but the user still holds `TxSync`: + +- Monitor calls `mdbx_txn_reset` (reader parked, slot occupied) +- Monitor removes from `active`, adds to `timed_out_not_aborted` +- User still holds `TxSync` → `PtrSyncInner` still alive +- Reader slot stays occupied until user drops `TxSync` + +If users leak `TxSync` handles (never drop them), reader slots accumulate. + +### 2. Arc reference in map prevents cleanup + +The map stores `PtrSync` which holds `Arc>`. If +`SyncInner::drop` fails to remove from the map for any reason, the Arc +reference keeps `PtrSyncInner` alive, preventing `mdbx_txn_abort`. + +### 3. Double-removal timing + +Both `SyncInner::drop` and `PtrSyncInner::drop` call +`remove_active_read_transaction`. This is intentional (belt and suspenders) +but relies on the map operations being idempotent. + +## Key Files + +- `src/tx/sync.rs:567-585` - TxSync::new, adds to tracking +- `src/tx/sync.rs:325-340` - SyncInner::drop, removes from tracking +- `src/tx/access.rs:667-692` - PtrSyncInner::drop, calls mdbx_txn_abort +- `src/sys/txn_manager.rs:156-176` - add/remove tracking methods +- `src/sys/txn_manager.rs:231-318` - timeout monitor thread + +## Investigation Steps + +1. Add tracing to `add_active` and `remove_active` to verify calls match up +2. Check `Arc::strong_count` on `PtrSyncInner` at key points +3. Monitor `timed_out_not_aborted.len()` over time +4. Check if `TxSync` handles are being leaked (never dropped) +5. Verify `mdbx_txn_abort` is actually being called in `PtrSyncInner::drop` + +## Questions to Answer + +- Are `TxSync` handles being leaked somewhere? +- Is `SyncInner::drop` running for all transactions? +- Is `remove_active_read_transaction` succeeding? +- What's the state of `timed_out_not_aborted` over time? diff --git a/benches/transaction.rs b/benches/transaction.rs index 6996158..e946d1b 100644 --- a/benches/transaction.rs +++ b/benches/transaction.rs @@ -55,7 +55,7 @@ fn bench_get_rand_sync(c: &mut Criterion) { b.iter(|| { let mut i = 0usize; for key in &keys { - i += *txn.get::(db.dbi(), key.as_bytes()).unwrap().unwrap(); + i += *txn.get_owned::(db.dbi(), key.as_bytes()).unwrap().unwrap(); } black_box(i); }) @@ -75,7 +75,7 @@ fn bench_get_rand_unsync(c: &mut Criterion) { b.iter(|| { let mut i = 0usize; for key in &keys { - i += *txn.get::(db.dbi(), key.as_bytes()).unwrap().unwrap(); + i += *txn.get_owned::(db.dbi(), key.as_bytes()).unwrap().unwrap(); } black_box(i); }) diff --git a/src/codec.rs b/src/entries/codec.rs similarity index 70% rename from src/codec.rs rename to src/entries/codec.rs index 084313c..f37bc2e 100644 --- a/src/codec.rs +++ b/src/entries/codec.rs @@ -1,6 +1,10 @@ //! Codec for deserializing database values into Rust types. -use crate::{MdbxError, TransactionKind, error::ReadResult, tx::ops}; +use crate::{ + MdbxError, TxView, + error::ReadResult, + tx::{TxPtrAccess, ops}, +}; use ffi::MDBX_txn; use std::{borrow::Cow, slice}; @@ -25,9 +29,24 @@ pub trait TableObjectOwned: for<'de> TableObject<'de> { fn decode(data_val: &[u8]) -> ReadResult { >::decode_borrow(Cow::Borrowed(data_val)) } -} -impl TableObjectOwned for T where T: for<'de> TableObject<'de> {} + /// Decodes directly from MDBX_val without TxView wrapper. + /// + /// Used by owned iterators to avoid wrapper overhead. The default + /// implementation delegates to [`Self::decode`] via a slice. + /// + /// # Safety + /// + /// The data pointed to by `data_val` is only valid for the lifetime of + /// the transaction. The caller must ensure that `data_val` points to valid + /// MDBX data. + #[doc(hidden)] + #[inline(always)] + unsafe fn decode_val_owned(data_val: ffi::MDBX_val) -> ReadResult { + let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) }; + Self::decode(s) + } +} /// Decodes values read from the database into Rust types. /// @@ -139,12 +158,14 @@ pub trait TableObject<'a>: Sized { /// they have exclusive access to the transaction if it is read-write. #[doc(hidden)] #[inline(always)] - unsafe fn decode_val( + unsafe fn decode_val( + access: &'a A, tx: *const MDBX_txn, data_val: ffi::MDBX_val, - ) -> ReadResult { - let cow = unsafe { Cow::<'a, [u8]>::decode_val::(tx, data_val)? }; - Self::decode_borrow(cow) + ) -> ReadResult> { + let cow = unsafe { Cow::<'a, [u8]>::decode_val::(access, tx, data_val)? }; + + cow.flat_map(Self::decode_borrow) } } @@ -154,18 +175,21 @@ impl<'a> TableObject<'a> for Cow<'a, [u8]> { } #[doc(hidden)] - unsafe fn decode_val( + unsafe fn decode_val( + access: &'a A, txn: *const MDBX_txn, data_val: ffi::MDBX_val, - ) -> ReadResult { + ) -> ReadResult> { // SAFETY: Caller ensures the tx is active, slice is valid for lifetime // 'a. let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) }; // SAFETY: txn is valid from caller, data_val.iov_base points to db pages. - let is_dirty = (!K::IS_READ_ONLY) && unsafe { ops::is_dirty_raw(txn, data_val.iov_base) }?; + let is_dirty = + (!A::HAS_RUNTIME_CHECK) && unsafe { ops::is_dirty_raw(txn, data_val.iov_base) }?; - Ok(if is_dirty { Cow::Owned(s.to_vec()) } else { Cow::Borrowed(s) }) + let cow = if is_dirty { Cow::Owned(s.to_vec()) } else { Cow::Borrowed(s) }; + Ok(TxView::new(cow, access)) } } @@ -174,13 +198,24 @@ impl TableObject<'_> for Vec { Ok(data.into_owned()) } - unsafe fn decode_val( + unsafe fn decode_val<'a, A: TxPtrAccess>( + access: &'a A, _tx: *const MDBX_txn, data_val: ffi::MDBX_val, - ) -> ReadResult { + ) -> ReadResult> { // SAFETY: Caller ensures the tx is active, slice is valid for lifetime. // We always copy for Vec since we need to own the data. let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) }; + Ok(TxView::new(s.to_vec(), access)) + } +} + +impl TableObjectOwned for Vec { + #[inline(always)] + unsafe fn decode_val_owned(data_val: ffi::MDBX_val) -> ReadResult { + // SAFETY: Caller ensures the tx is active, slice is valid. + // Direct slice copy without Cow intermediate. + let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) }; Ok(s.to_vec()) } } @@ -190,10 +225,18 @@ impl<'a> TableObject<'a> for () { Ok(()) } - unsafe fn decode_val( + unsafe fn decode_val( + access: &'a A, _: *const MDBX_txn, _: ffi::MDBX_val, - ) -> ReadResult { + ) -> ReadResult> { + Ok(TxView::new((), access)) + } +} + +impl TableObjectOwned for () { + #[inline(always)] + unsafe fn decode_val_owned(_: ffi::MDBX_val) -> ReadResult { Ok(()) } } @@ -208,10 +251,19 @@ impl TableObject<'_> for ObjectLength { Ok(Self(data.len())) } - unsafe fn decode_val( + unsafe fn decode_val<'a, A: TxPtrAccess>( + access: &'a A, _tx: *const MDBX_txn, data_val: ffi::MDBX_val, - ) -> ReadResult { + ) -> ReadResult> { + Ok(TxView::new(Self(data_val.iov_len), access)) + } +} + +impl TableObjectOwned for ObjectLength { + #[inline(always)] + unsafe fn decode_val_owned(data_val: ffi::MDBX_val) -> ReadResult { + // Just read the length field directly. Ok(Self(data_val.iov_len)) } } @@ -234,10 +286,25 @@ impl<'a, const LEN: usize> TableObject<'a> for [u8; LEN] { Ok(a) } - unsafe fn decode_val( + unsafe fn decode_val( + access: &'a A, _tx: *const MDBX_txn, data_val: ffi::MDBX_val, - ) -> ReadResult { + ) -> ReadResult> { + // SAFETY: Caller ensures the tx is active, slice is valid. + if data_val.iov_len != LEN { + return Err(MdbxError::DecodeErrorLenDiff.into()); + } + let s = unsafe { slice::from_raw_parts(data_val.iov_base as *const u8, data_val.iov_len) }; + let mut a = [0; LEN]; + a[..].copy_from_slice(s); + Ok(TxView::new(a, access)) + } +} + +impl TableObjectOwned for [u8; LEN] { + #[inline(always)] + unsafe fn decode_val_owned(data_val: ffi::MDBX_val) -> ReadResult { // SAFETY: Caller ensures the tx is active, slice is valid. if data_val.iov_len != LEN { return Err(MdbxError::DecodeErrorLenDiff.into()); diff --git a/src/entries/mod.rs b/src/entries/mod.rs new file mode 100644 index 0000000..29c99e5 --- /dev/null +++ b/src/entries/mod.rs @@ -0,0 +1,31 @@ +//! Safe access to database entries. +//! +//! This module provides abstractions for working with database entries, +//! including serialization/deserialization via the [`TableObject`] trait +//! and safe views of borrowed data through [`TxView`]. +mod codec; +pub use codec::{ObjectLength, TableObject, TableObjectOwned}; + +mod view; +pub use view::TxView; + +use crate::TransactionKind; + +/// Synchronized table object view tied to a synchronized transaction. +pub type SyncView<'tx, K, T> = TxView<'tx, crate::tx::PtrSyncInner, T>; + +/// Unsynchronized table object view tied to an unsynchronized transaction. +pub type TableViewUnsync<'tx, K, T> = TxView<'tx, ::UnsyncAccess, T>; + +/// Synchronized key-value view tied to a synchronized transaction. +pub type SyncKvView<'tx, Kind, Key, Value> = (SyncView<'tx, Kind, Key>, SyncView<'tx, Kind, Value>); + +/// Unsynchronized key-value view tied to an unsynchronized transaction. +pub type UnsyncKvView<'tx, Kind, Key, Value> = + (TableViewUnsync<'tx, Kind, Key>, TableViewUnsync<'tx, Kind, Value>); + +/// Key-value view type. +pub type KvView<'tx, A, Key, Value> = (TxView<'tx, A, Key>, TxView<'tx, A, Value>); + +/// Optional KV pair view type. +pub type KvOpt<'tx, A, Key, Value> = Option>; diff --git a/src/entries/view.rs b/src/entries/view.rs new file mode 100644 index 0000000..c67390b --- /dev/null +++ b/src/entries/view.rs @@ -0,0 +1,278 @@ +//! A view of data borrowed from a transaction. +//! +//! This module provides [`TxView`], a wrapper that ensures transaction validity +//! is checked before accessing borrowed data. + +use crate::{ + MdbxError, MdbxResult, RW, ReadResult, TableObjectOwned, + tx::{PtrSyncInner, RwUnsync, TxPtrAccess}, +}; +use std::borrow::Cow; + +/// A view of data borrowed from a transaction. +/// +/// This wrapper ensures transaction validity is checked before accessing +/// the underlying data. For RW transactions and RO transactions without +/// the `read-tx-timeouts` feature, validity checks compile to no-ops. +/// +/// # Safety Rationale +/// +/// When the `read-tx-timeouts` feature is enabled, RO transactions can be +/// aborted asynchronously by a timeout thread. Data borrowed from the +/// transaction (like `Cow::Borrowed` slices) can become dangling if the +/// transaction times out while the borrowed data is still in use. +/// +/// `TxView` addresses this by: +/// 1. Holding a reference to the transaction's access type +/// 2. Checking transaction validity before returning the data +/// 3. Compiling to zero overhead when no runtime check is needed +/// +/// # Example +/// +/// ```ignore +/// let view = txn.get(db.dbi(), b"key")?; +/// if let Some(view) = view { +/// let data = view.try_get()?; +/// // Use data... +/// } +/// ``` +pub struct TxView<'tx, A, T = Cow<'tx, [u8]>> { + data: T, + access: &'tx A, +} + +impl<'tx, A, T> TxView<'tx, A, T> { + /// Creates a new `TxView`. + #[inline] + pub(crate) const fn new(data: T, access: &'tx A) -> Self { + Self { data, access } + } +} + +impl<'tx, A, T> TxView<'tx, A, T> +where + A: TxPtrAccess, + T: TableObjectOwned, +{ + /// Access the data by value. + pub fn into_owned(self) -> T { + self.data + } +} + +impl<'tx, A, T> TxView<'tx, A, T> +where + A: TxPtrAccess, +{ + /// Checks if data view is still valid. + /// + /// Returns `true` if the underlying transaction is still valid or if no + /// runtime validity check is needed (e.g., RW transactions cannot time + /// out). + #[inline(always)] + pub fn is_valid(&self) -> bool { + !A::HAS_RUNTIME_CHECK || self.access.valid() + } + + /// Enforce that the transaction is still valid. + #[inline(always)] + pub fn enforce_valid(&self) -> MdbxResult<()> { + if A::HAS_RUNTIME_CHECK && !self.access.valid() { + return Err(MdbxError::ReadTransactionTimeout); + } + Ok(()) + } + + /// Access the data after checking transaction validity. + /// + /// Returns `Err(MdbxError::ReadTransactionTimeout)` if the transaction + /// has timed out. + /// + /// For RW transactions and RO transactions without the `read-tx-timeouts` + /// feature, this check compiles to a no-op. + #[inline] + pub fn try_get(&self) -> MdbxResult<&T> { + self.enforce_valid()?; + Ok(&self.data) + } + + /// Access the data after checking transaction validity. + #[inline] + pub fn inspect(&self, f: F) -> ReadResult<()> + where + F: FnOnce(&T), + { + self.enforce_valid()?; + f(&self.data); + Ok(()) + } + + /// Map the inner data to another type while preserving transaction access. + /// + /// This is useful for transforming the data while still ensuring + /// transaction validity checks are in place. + #[inline] + pub fn map(self, f: F) -> ReadResult> + where + F: FnOnce(T) -> U, + { + self.enforce_valid()?; + Ok(TxView::new(f(self.data), self.access)) + } + + /// Map the inner data to another type that may fail, while preserving + /// transaction access. + #[inline] + pub fn flat_map(self, f: F) -> ReadResult> + where + F: FnOnce(T) -> ReadResult, + { + if A::HAS_RUNTIME_CHECK && !self.access.valid() { + return Err(MdbxError::ReadTransactionTimeout.into()); + } + Ok(TxView::new(f(self.data)?, self.access)) + } + + /// Access the data without validity check. + /// + /// # Safety + /// + /// The caller must ensure the transaction is still valid. Using the + /// returned reference after the transaction has been aborted or timed + /// out is undefined behavior. + #[inline] + pub const unsafe fn get_unchecked(&self) -> &T { + &self.data + } + + /// Consume the view and take ownership of the inner data. + /// + /// This is useful when you need to outlive the transaction or want to + /// avoid repeated validity checks. + /// + /// # Safety + /// + /// The caller must ensure the transaction is still valid. If the data + /// borrows from the transaction (e.g., `Cow::Borrowed` slices), using + /// the returned data after the transaction has been aborted or timed + /// out is undefined behavior. + #[inline] + pub unsafe fn into_inner(self) -> T { + self.data + } +} + +impl<'tx, A, T> TxView<'tx, A, T> +where + T: AsRef<[u8]>, + A: TxPtrAccess, +{ + /// Returns the length of the data. + pub fn try_len(&self) -> MdbxResult { + self.enforce_valid()?; + Ok(self.data.as_ref().len()) + } +} + +impl<'tx, A, T> Copy for TxView<'tx, A, T> +where + A: TxPtrAccess, + T: TableObjectOwned + Copy, +{ +} + +impl<'tx, A, T> Clone for TxView<'tx, A, T> +where + A: TxPtrAccess, + T: Clone + TableObjectOwned, +{ + fn clone(&self) -> Self { + Self { data: self.data.clone(), access: self.access } + } +} + +impl<'tx, A, T> TxView<'tx, A, T> +where + A: TxPtrAccess, + T: Clone, +{ + /// Clone the inner data after checking transaction validity. + /// + /// Returns `Err(MdbxError::ReadTransactionTimeout)` if the transaction + /// has timed out. + #[inline] + pub fn try_clone_inner(&self) -> MdbxResult { + self.enforce_valid()?; + Ok(self.data.clone()) + } +} + +impl<'tx, A, T> core::fmt::Debug for TxView<'tx, A, T> +where + A: TxPtrAccess, + T: core::fmt::Debug, +{ + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + // Check validity before displaying data to avoid showing stale data + if A::HAS_RUNTIME_CHECK && !self.access.valid() { + f.debug_struct("TxView").field("data", &"").finish() + } else { + f.debug_struct("TxView").field("data", &self.data).finish() + } + } +} + +// TxView is Send if both T is Send and A is Sync +unsafe impl<'tx, A, T> Send for TxView<'tx, A, T> +where + A: TxPtrAccess + Sync, + T: Send, +{ +} + +// TxView is Sync if both T is Sync and A is Sync +unsafe impl<'tx, A, T> Sync for TxView<'tx, A, T> +where + A: TxPtrAccess + Sync, + T: Sync, +{ +} + +macro_rules! impl_direct_access { + ($ty:ty) => { + + impl<'tx, T> TxView<'tx, $ty, T> + { + /// Access the data without validity check. + // Safe because RW transactions cannot time out. + #[inline] + pub const fn get(&self) -> &T { + &self.data + } + } + + impl AsRef for TxView<'_, $ty, T> { + fn as_ref(&self) -> &T { + &self.data + } + } + + impl std::ops::Deref for TxView<'_, $ty, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.data + } + } + + }; + ($($ty:ty),+) => { + $(impl_direct_access!($ty);)+ + }; +} + +impl_direct_access!(RwUnsync, PtrSyncInner); + +// When read-tx-timeouts feature is disabled, RO transactions cannot time out. +#[cfg(not(feature = "read-tx-timeouts"))] +impl_direct_access!(crate::tx::RoGuard, crate::tx::PtrSyncInner); diff --git a/src/lib.rs b/src/lib.rs index 3ba2aad..f820f9c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -42,7 +42,7 @@ //! // Read data in a read-only transaction //! let txn = env.begin_ro_txn()?; //! let db = txn.open_db(None)?; -//! let value: Option> = txn.get(db.dbi(), b"hello").expect("read failed"); +//! let value: Option> = txn.get_owned(db.dbi(), b"hello").expect("read failed"); //! assert_eq!(value.as_deref(), Some(b"world".as_slice())); //! //! Ok(()) @@ -67,6 +67,47 @@ //! - [`Cursor`]: Enables iteration and positioned access within a database. //! Created via [`TxSync::cursor()`] or [`TxUnsync::cursor()`]. //! +//! # Owned vs Borrowed APIs +//! +//! This crate provides two styles of read operations: +//! +//! - **Owned methods** (e.g., `get_owned`, `first_owned`, `owned_next`) return +//! data directly. Use these by default. +//! - **Borrowed methods** (e.g., `get`, `first`, `borrow_next`) return +//! [`TxView`] wrappers that require validity checks before access. +//! +//! **We recommend using `_owned` methods unless zero-copy deserialization is +//! strictly required.** The `_owned` variants: +//! - Return data directly without wrapper types +//! - Produce simpler, more readable code +//! - Allow data to safely outlive the transaction +//! +//! Non-owned methods return [`TxView`], which guards borrowed data against +//! transaction timeouts (when `read-tx-timeouts` is enabled). While safe, this +//! can make code unwieldy: +//! +//! ```ignore +//! // With TxView (non-owned) - requires unwrapping +//! let view = cursor.first()?; +//! if let Some((key, value)) = view { +//! let k = key.try_get()?; +//! let v = value.try_get()?; +//! // use k, v... +//! } +//! +//! // With owned - direct access +//! let pair = cursor.first_owned::, Vec>()?; +//! if let Some((key, value)) = pair { +//! // use key, value directly... +//! } +//! ``` +//! +//! Use non-owned methods only when: +//! - You need zero-copy deserialization for performance +//! - Data will be used briefly within the transaction scope +//! - You're implementing [`TableObject`] with borrowed data (e.g., `Cow<'a, +//! [u8]>`) +//! //! # Feature Flags //! //! - `return-borrowed`: When enabled, iterators return borrowed data @@ -156,8 +197,8 @@ pub extern crate signet_mdbx_sys as ffi; -mod codec; -pub use codec::{ObjectLength, TableObject, TableObjectOwned}; +pub mod entries; +pub use entries::{ObjectLength, TableObject, TableObjectOwned, TxView}; #[cfg(feature = "read-tx-timeouts")] pub use crate::sys::read_transactions::MaxReadTransactionDuration; diff --git a/src/sys/environment.rs b/src/sys/environment.rs index cde19d0..6af67b7 100644 --- a/src/sys/environment.rs +++ b/src/sys/environment.rs @@ -266,12 +266,15 @@ impl Environment { let mut cursor = txn.cursor(db)?; let mut iter = cursor.iter_slices(); - while let Some((_key, value)) = iter.borrow_next()? { - if value.len() < size_of::() { - return Err(MdbxError::Corrupted.into()); - } - let s = &value[..size_of::()]; - freelist += NativeEndian::read_u32(s) as usize; + while let Some((_, value)) = iter.borrow_next()? { + value.flat_map(|value| { + if value.len() < size_of::() { + return Err(MdbxError::Corrupted.into()); + } + let s = &value[..size_of::()]; + freelist += NativeEndian::read_u32(s) as usize; + Ok(()) + })?; } Ok(freelist) diff --git a/src/tx/access.rs b/src/tx/access.rs index c6a938b..524818e 100644 --- a/src/tx/access.rs +++ b/src/tx/access.rs @@ -13,6 +13,47 @@ use std::{ }, }; +/// Guard that keeps an RO unsync transaction alive during data access. +/// +/// This guard holds an `Arc`, preventing the timeout thread from +/// aborting the transaction while the guard exists. The guard is acquired +/// via [`TxPtrAccess::try_guard`] and should be held while accessing data +/// borrowed from the transaction. +#[cfg(feature = "read-tx-timeouts")] +pub struct RoTxGuard { + _arc: Arc, +} + +#[cfg(feature = "read-tx-timeouts")] +impl fmt::Debug for RoTxGuard { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("RoTxGuard").finish() + } +} + +/// Guard type for RO unsync transactions when timeouts are disabled. +/// +/// This is a zero-sized type that provides no runtime overhead. +#[cfg(not(feature = "read-tx-timeouts"))] +#[allow(unreachable_pub)] +pub type RoTxGuard = (); + +/// Guard that keeps a sync transaction valid during data access. +/// +/// This guard holds the transaction's mutex lock, preventing the timeout +/// monitor from resetting the transaction while the guard exists. The guard +/// is acquired via [`TxPtrAccess::try_guard`] and should be held while +/// accessing data borrowed from the transaction. +pub struct SyncTxGuard<'a> { + _guard: MutexGuard<'a, bool>, +} + +impl fmt::Debug for SyncTxGuard<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SyncTxGuard").finish() + } +} + mod sealed { use super::*; @@ -30,8 +71,35 @@ mod sealed { /// are stored for read-only and read-write transactions. It ensures that /// the transaction pointer can be accessed safely, respecting timeouts /// and ownership semantics. -#[allow(unreachable_pub)] +#[auto_impl::auto_impl(&, &mut, Box, Arc)] pub trait TxPtrAccess: fmt::Debug + sealed::Sealed { + /// Whether this access type has runtime timeout checks. + const HAS_RUNTIME_CHECK: bool = false; + + /// Guard type that prevents transaction invalidation while held. + /// + /// For transaction types that can timeout (RO with `read-tx-timeouts`), + /// this holds resources that keep the transaction valid. For transaction + /// types that cannot timeout (RW), this is `()`. + type Guard<'a> + where + Self: 'a; + + /// Try to acquire a guard that prevents transaction invalidation. + /// + /// Returns `Err(MdbxError::ReadTransactionTimeout)` if the transaction + /// has already timed out. + /// + /// The returned guard should be held while accessing data borrowed from + /// the transaction. While the guard exists, the transaction cannot be + /// invalidated by a timeout. + fn try_guard(&self) -> MdbxResult>; + + /// Check if the transaction is still valid. + fn valid(&self) -> bool { + true + } + /// Execute a closure with the transaction pointer. fn with_txn_ptr(&self, f: F) -> MdbxResult where @@ -76,6 +144,13 @@ impl RwUnsync { } impl TxPtrAccess for RwUnsync { + type Guard<'a> = (); + + fn try_guard(&self) -> MdbxResult> { + // RW transactions cannot timeout, so guard is always available. + Ok(()) + } + fn with_txn_ptr(&self, f: F) -> MdbxResult where F: FnOnce(*mut ffi::MDBX_txn) -> R, @@ -263,6 +338,43 @@ impl RoGuard { } impl TxPtrAccess for RoGuard { + const HAS_RUNTIME_CHECK: bool = cfg!(feature = "read-tx-timeouts"); + + type Guard<'a> = RoTxGuard; + + fn try_guard(&self) -> MdbxResult> { + #[cfg(feature = "read-tx-timeouts")] + { + self.try_ref() + .map(|arc| RoTxGuard { _arc: arc }) + .ok_or(crate::MdbxError::ReadTransactionTimeout) + } + + #[cfg(not(feature = "read-tx-timeouts"))] + { + Ok(()) + } + } + + /// Check if the pointer is valid. + /// + /// # Warning: + /// + /// Relying on this method for safety is discouraged, as the transaction + /// may time out immediately after this check. Prefer using + /// `with_txn_ptr` or `try_guard` to ensure validity during access. + fn valid(&self) -> bool { + #[cfg(feature = "read-tx-timeouts")] + { + self.try_ref().is_some() + } + + #[cfg(not(feature = "read-tx-timeouts"))] + { + true + } + } + /// Execute a closure with the transaction pointer, failing if timed out. /// /// Calling this function will ensure that the transaction is still valid @@ -501,6 +613,33 @@ impl PtrSyncInner { } impl TxPtrAccess for PtrSyncInner { + const HAS_RUNTIME_CHECK: bool = cfg!(feature = "read-tx-timeouts"); + + type Guard<'a> = SyncTxGuard<'a>; + + fn try_guard(&self) -> MdbxResult> { + let guard = self.lock(); + // For RW transactions, timeout flag is never set, so this always succeeds. + // For RO transactions with timeouts, check if timed out. + if *guard { + return Err(crate::MdbxError::ReadTransactionTimeout); + } + Ok(SyncTxGuard { _guard: guard }) + } + + fn valid(&self) -> bool { + #[cfg(feature = "read-tx-timeouts")] + { + let timeout_flag = self.lock(); + !*timeout_flag + } + + #[cfg(not(feature = "read-tx-timeouts"))] + { + true + } + } + fn with_txn_ptr(&self, f: F) -> MdbxResult where F: FnOnce(*mut ffi::MDBX_txn) -> R, diff --git a/src/tx/cursor.rs b/src/tx/cursor.rs index 48ec816..e980e33 100644 --- a/src/tx/cursor.rs +++ b/src/tx/cursor.rs @@ -1,5 +1,9 @@ +//! Cursor for navigating database items. + use crate::{ - Database, MdbxError, RW, ReadResult, TableObject, TransactionKind, codec_try_optional, + Database, MdbxError, RW, ReadResult, TableObject, TableObjectOwned, TransactionKind, TxView, + codec_try_optional, + entries::KvOpt, error::{MdbxResult, mdbx_result}, flags::*, tx::{ @@ -15,6 +19,69 @@ use ffi::{ }; use std::{ffi::c_void, fmt, marker::PhantomData, ptr}; +/// Helper struct to make [`Cursor::get`] return values more readable. +/// The meaning of the flag is operation-dependent and corresponds to +/// `MDBX_RESULT_TRUE` (true) and `MDBX_RESULT_SUCCESS` (false). +/// +/// Typically, `true` indicates that no matching item was found, or for range/ +/// bound operations that an inexact match was found. +struct FlaggedGet<'a, A, Key, Value> +where + A: TxPtrAccess, + Key: TableObject<'a>, + Value: TableObject<'a>, +{ + pub mdbx_result: bool, + pub key: Option>, + pub value: TxView<'a, A, Value>, +} + +/// Helper struct to make [`Cursor`] return values more readable. +/// The meaning of the flag is operation-dependent and corresponds to +/// `MDBX_RESULT_TRUE` (true) and `MDBX_RESULT_SUCCESS` (false). +/// +/// Typically, `true` indicates that no matching item was found, or for range/ +/// bound operations that an inexact match was found. +#[derive(Debug)] +pub struct FlaggedKv<'a, A, Key, Value> +where + A: TxPtrAccess, + Key: TableObject<'a>, + Value: TableObject<'a>, +{ + /// The flag indicating the result of the cursor operation. This will + /// correspond to `MDBX_RESULT_TRUE` (true) or `MDBX_RESULT_SUCCESS` + /// (false). + pub mdbx_result: bool, + /// The key returned by the cursor operation. + pub key: TxView<'a, A, Key>, + /// The value returned by the cursor operation. + pub value: TxView<'a, A, Value>, +} + +/// Helper struct to make [`Cursor`] return values more readable. +/// The meaning of the flag is operation-dependent and corresponds to +/// `MDBX_RESULT_TRUE` (true) and `MDBX_RESULT_SUCCESS` (false). +/// +/// Typically, `true` indicates that no matching item was found, or for range/ +/// bound operations that an inexact match was found. +#[derive(Debug)] +pub struct FlaggedKvOpt<'a, A, Key, Value> +where + A: TxPtrAccess, + Key: TableObject<'a>, + Value: TableObject<'a>, +{ + /// The flag indicating the result of the cursor operation. This will + /// correspond to `MDBX_RESULT_TRUE` (true) or `MDBX_RESULT_SUCCESS` + /// (false). + pub mdbx_result: bool, + /// The optional key returned by the cursor operation. + pub key: Option>, + /// The optional value returned by the cursor operation. + pub value: Option>, +} + /// A cursor for navigating the items within a database. /// /// The cursor is generic over the transaction kind `K` and the access type `A`. @@ -132,12 +199,18 @@ where /// Retrieves a key/data pair from the cursor. Depending on the cursor op, /// the current key may be returned. + /// + /// The boolean in the returned tuple indicates the result of the operation: + /// - `true` - MDBX_RESULT_TRUE was returned + /// - `false` - MDBX_RESULT_SUCCESS was returned + /// + /// The meaning of this boolean depends on the cursor operation used. fn get( &self, key: Option<&[u8]>, data: Option<&[u8]>, op: MDBX_cursor_op, - ) -> ReadResult<(Option, Value, bool)> + ) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -147,7 +220,8 @@ where let key_ptr = key_val.iov_base; let data_ptr = data_val.iov_base; - self.access.with_txn_ptr(|txn| { + let access = &self.access; + access.with_txn_ptr(|txn| { // SAFETY: // The cursor is valid as long as self is alive. // The transaction is also valid as long as self is alive. @@ -155,7 +229,7 @@ where // transaction is alive, provided the page is not dirty. // decode_val checks for dirty pages and copies data if needed. unsafe { - let v = mdbx_result(ffi::mdbx_cursor_get( + let mdbx_result = mdbx_result(ffi::mdbx_cursor_get( self.cursor, &mut key_val, &mut data_val, @@ -167,11 +241,11 @@ where if ptr::eq(key_ptr, key_val.iov_base) { None } else { - Some(Key::decode_val::(txn, key_val)?) + Some(Key::decode_val::(access, txn, key_val)?) } }; - let data_out = Value::decode_val::(txn, data_val)?; - Ok((key_out, data_out, v)) + let data_out = Value::decode_val::(access, txn, data_val)?; + Ok(FlaggedGet { mdbx_result, key: key_out, value: data_out }) } })? } @@ -181,15 +255,16 @@ where key: Option<&[u8]>, data: Option<&[u8]>, op: MDBX_cursor_op, - ) -> ReadResult> + ) -> ReadResult>> where Value: TableObject<'tx>, { - let (_, v, result_true) = codec_try_optional!(self.get::<(), Value>(key, data, op)); - if result_true { + let output = codec_try_optional!(self.get::<(), Value>(key, data, op)); + // If MDBX_RESULT_TRUE, no value was found. + if output.mdbx_result { return Ok(None); } - Ok(Some(v)) + Ok(Some(output.value)) } fn get_full( @@ -197,20 +272,23 @@ where key: Option<&[u8]>, data: Option<&[u8]>, op: MDBX_cursor_op, - ) -> ReadResult> + ) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, { - let (k, v, result_true) = codec_try_optional!(self.get(key, data, op)); - if result_true { + let output = codec_try_optional!(self.get(key, data, op)); + + // If MDBX_RESULT_TRUE, no key/value pair was found. Thus return None. + if output.mdbx_result { return Ok(None); } - Ok(Some((k.unwrap(), v))) + // If we got MDBX_RESULT_SUCCESS, return the key/value pair. + Ok(Some((output.key.unwrap(), output.value))) } /// Position at first key/data item. - pub fn first(&mut self) -> ReadResult> + pub fn first(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -222,7 +300,7 @@ where /// /// Returns [`MdbxError::RequiresDupSort`] if the database does not have the /// [`DatabaseFlags::DUP_SORT`] flag set. - pub fn first_dup(&mut self) -> ReadResult> + pub fn first_dup(&mut self) -> ReadResult>> where Value: TableObject<'tx>, { @@ -234,7 +312,11 @@ where /// /// Returns [`MdbxError::RequiresDupSort`] if the database does not have the /// [`DatabaseFlags::DUP_SORT`] flag set. - pub fn get_both(&mut self, k: &[u8], v: &[u8]) -> ReadResult> + pub fn get_both( + &mut self, + k: &[u8], + v: &[u8], + ) -> ReadResult>> where Value: TableObject<'tx>, { @@ -247,7 +329,11 @@ where /// /// Returns [`MdbxError::RequiresDupSort`] if the database does not have the /// [`DatabaseFlags::DUP_SORT`] flag set. - pub fn get_both_range(&mut self, k: &[u8], v: &[u8]) -> ReadResult> + pub fn get_both_range( + &mut self, + k: &[u8], + v: &[u8], + ) -> ReadResult>> where Value: TableObject<'tx>, { @@ -256,7 +342,7 @@ where } /// Return key/data at current cursor position. - pub fn get_current(&mut self) -> ReadResult> + pub fn get_current(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -269,7 +355,7 @@ where /// /// Returns [`MdbxError::RequiresDupFixed`] if the database does not have the /// [`DatabaseFlags::DUP_FIXED`] flag set. - pub fn get_multiple(&mut self) -> ReadResult> + pub fn get_multiple(&mut self) -> ReadResult>> where Value: TableObject<'tx>, { @@ -278,7 +364,7 @@ where } /// Position at last key/data item. - pub fn last(&mut self) -> ReadResult> + pub fn last(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -290,7 +376,7 @@ where /// /// Returns [`MdbxError::RequiresDupSort`] if the database does not have the /// [`DatabaseFlags::DUP_SORT`] flag set. - pub fn last_dup(&mut self) -> ReadResult> + pub fn last_dup(&mut self) -> ReadResult>> where Value: TableObject<'tx>, { @@ -300,7 +386,7 @@ where /// Position at next data item #[expect(clippy::should_implement_trait)] - pub fn next(&mut self) -> ReadResult> + pub fn next(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -312,7 +398,7 @@ where /// /// Returns [`MdbxError::RequiresDupSort`] if the database does not have the /// [`DatabaseFlags::DUP_SORT`] flag set. - pub fn next_dup(&mut self) -> ReadResult> + pub fn next_dup(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -326,7 +412,7 @@ where /// /// Returns [`MdbxError::RequiresDupFixed`] if the database does not have the /// [`DatabaseFlags::DUP_FIXED`] flag set. - pub fn next_multiple(&mut self) -> ReadResult> + pub fn next_multiple(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -336,7 +422,7 @@ where } /// Position at first data item of next key. - pub fn next_nodup(&mut self) -> ReadResult> + pub fn next_nodup(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -345,7 +431,7 @@ where } /// Position at previous data item. - pub fn prev(&mut self) -> ReadResult> + pub fn prev(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -357,7 +443,7 @@ where /// /// Returns [`MdbxError::RequiresDupSort`] if the database does not have the /// [`DatabaseFlags::DUP_SORT`] flag set. - pub fn prev_dup(&mut self) -> ReadResult> + pub fn prev_dup(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -367,7 +453,7 @@ where } /// Position at last data item of previous key. - pub fn prev_nodup(&mut self) -> ReadResult> + pub fn prev_nodup(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -376,7 +462,7 @@ where } /// Position at specified key. - pub fn set(&mut self, key: &[u8]) -> ReadResult> + pub fn set(&mut self, key: &[u8]) -> ReadResult>> where Value: TableObject<'tx>, { @@ -385,7 +471,7 @@ where } /// Position at specified key, return both key and data. - pub fn set_key(&mut self, key: &[u8]) -> ReadResult> + pub fn set_key(&mut self, key: &[u8]) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -395,13 +481,28 @@ where } /// Position at first key greater than or equal to specified key. - pub fn set_range(&mut self, key: &[u8]) -> ReadResult> + /// + /// If an exact match is found, the [`FlaggedKvOpt::mdbx_result`]` is + /// `false`. If a greater key is found, it is `true`. + pub fn set_range( + &mut self, + key: &[u8], + ) -> ReadResult>> where Key: TableObject<'tx>, Value: TableObject<'tx>, { assertions::debug_assert_integer_key(self.db.flags(), key); - self.get_full(Some(key), None, MDBX_SET_RANGE) + + let FlaggedGet { mdbx_result, key: Some(key), value } = + codec_try_optional!(self.get(Some(key), None, MDBX_SET_RANGE)) + else { + unreachable!( + "MDBX_SET_RANGE always positions cursor if DB is non-empty. Empty case is caught by codec_try_optional" + ); + }; + + Ok(Some(FlaggedKv { mdbx_result, key, value })) } /// [`DatabaseFlags::DUP_FIXED`]-only: Position at previous page and return up to a page of @@ -409,7 +510,7 @@ where /// /// Returns [`MdbxError::RequiresDupFixed`] if the database does not have the /// [`DatabaseFlags::DUP_FIXED`] flag set. - pub fn prev_multiple(&mut self) -> ReadResult> + pub fn prev_multiple(&mut self) -> ReadResult> where Key: TableObject<'tx>, Value: TableObject<'tx>, @@ -418,27 +519,35 @@ where self.get_full(None, None, MDBX_PREV_MULTIPLE) } - /// Position at first key-value pair greater than or equal to specified, return both key and - /// data, and the return code depends on an exact match. + /// Position at first key-value pair greater than or equal to specified, + /// return both key and data, and the return code depends on an exact match. /// - /// For non DupSort-ed collections this works the same as [`Self::set_range()`], but returns - /// [false] if key found exactly and [true] if greater key was found. + /// For non DupSort-ed collections this works the same as + /// [`Self::set_range()`], but returns `false` if key found exactly and + /// `true` if greater key was found. /// - /// For DupSort-ed a data value is taken into account for duplicates, i.e. for a pairs/tuples of - /// a key and an each data value of duplicates. Returns [false] if key-value pair found - /// exactly and [true] if the next pair was returned. + /// For DupSort-ed a data value is taken into account for duplicates, i.e. + /// for a pairs/tuples of a key and an each data value of duplicates. + /// + /// Returns `false` if key-value pair found exactly and `true` if the next + /// pair was returned. pub fn set_lowerbound( &mut self, key: &[u8], - ) -> ReadResult> + ) -> ReadResult>> where Key: TableObject<'tx>, Value: TableObject<'tx>, { assertions::debug_assert_integer_key(self.db.flags(), key); - let (k, v, found) = codec_try_optional!(self.get(Some(key), None, MDBX_SET_LOWERBOUND)); - - Ok(Some((found, k.unwrap(), v))) + let FlaggedGet { mdbx_result, key: Some(key), value } = + codec_try_optional!(self.get(Some(key), None, MDBX_SET_LOWERBOUND)) + else { + unreachable!( + "MDBX_SET_LOWERBOUND always positions cursor if DB is non-empty. Empty case is caught by codec_try_optional" + ); + }; + Ok(Some(FlaggedKv { mdbx_result, key, value })) } /// Returns an iterator over database items. @@ -503,6 +612,9 @@ where /// Iterate over database items starting from the given key. /// + /// This will position the cursor at the first key greater than or equal to + /// the given key, and begin the iteration there. + /// /// For databases with duplicate data items ([`DatabaseFlags::DUP_SORT`]), /// the duplicate data items of each key will be returned before moving on /// to the next key. @@ -515,11 +627,13 @@ where Key: TableObject<'tx>, Value: TableObject<'tx>, { - let Some(first) = self.set_range::(key)? else { + // We can discard the mdbx_result flag here, as the iterator is + // explicitly starting at or after the given key. + let Some(FlaggedKv { key, value, .. }) = self.set_range(key)? else { return Ok(Iter::end_from_ref(self)); }; - Ok(Iter::from_ref_with(self, first)) + Ok(Iter::from_ref_with(self, (key, value))) } /// Iterate over duplicate database items. @@ -576,11 +690,11 @@ where Key: TableObject<'tx>, Value: TableObject<'tx>, { - let Some(first) = self.set_range(key)? else { + let Some(FlaggedKv { key, value, .. }) = self.set_range(key)? else { return Ok(IterDup::end_from_ref(self)); }; - Ok(IterDup::from_ref_with(self, first)) + Ok(IterDup::from_ref_with(self, (key, value))) } /// Iterate over the duplicates of the item in the database with the given @@ -600,6 +714,214 @@ where Ok(IterDupVals::from_ref_with(self, first)) } + + // ========================================================================= + // Owned variants - return owned values directly + // ========================================================================= + + /// Position at specified key, returning an owned value. + pub fn set_owned(&mut self, key: &[u8]) -> ReadResult> + where + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.set(key).map(|opt| opt.map(TxView::into_owned)) + } + + /// [`DatabaseFlags::DUP_SORT`]-only: Position at first data item of current key, + /// returning an owned value. + pub fn first_dup_owned(&mut self) -> ReadResult> + where + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.first_dup().map(|opt| opt.map(TxView::into_owned)) + } + + /// [`DatabaseFlags::DUP_SORT`]-only: Position at last data item of current key, + /// returning an owned value. + pub fn last_dup_owned(&mut self) -> ReadResult> + where + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.last_dup().map(|opt| opt.map(TxView::into_owned)) + } + + /// [`DatabaseFlags::DUP_SORT`]-only: Position at key/data pair, returning an owned value. + pub fn get_both_owned(&mut self, k: &[u8], v: &[u8]) -> ReadResult> + where + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.get_both(k, v).map(|opt| opt.map(TxView::into_owned)) + } + + /// [`DatabaseFlags::DUP_SORT`]-only: Position at given key and at first data greater + /// than or equal to specified data, returning an owned value. + pub fn get_both_range_owned(&mut self, k: &[u8], v: &[u8]) -> ReadResult> + where + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.get_both_range(k, v).map(|opt| opt.map(TxView::into_owned)) + } + + /// [`DatabaseFlags::DUP_FIXED`]-only: Return up to a page of duplicate data items, + /// returning an owned value. + pub fn get_multiple_owned(&mut self) -> ReadResult> + where + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.get_multiple().map(|opt| opt.map(TxView::into_owned)) + } + + /// Position at first key/data item, returning owned values. + pub fn first_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.first().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// Position at last key/data item, returning owned values. + pub fn last_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.last().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// Position at next data item, returning owned values. + pub fn next_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.next().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// Position at previous data item, returning owned values. + pub fn prev_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.prev().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// Return key/data at current cursor position, returning owned values. + pub fn get_current_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.get_current().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// Position at specified key, return both key and data as owned values. + pub fn set_key_owned(&mut self, key: &[u8]) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.set_key(key).map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// [`DatabaseFlags::DUP_SORT`]-only: Position at next data item of current key, + /// returning owned values. + pub fn next_dup_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.next_dup().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// [`DatabaseFlags::DUP_SORT`]-only: Position at previous data item of current key, + /// returning owned values. + pub fn prev_dup_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.prev_dup().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// Position at first data item of next key, returning owned values. + pub fn next_nodup_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.next_nodup().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// Position at last data item of previous key, returning owned values. + pub fn prev_nodup_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.prev_nodup().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// [`DatabaseFlags::DUP_FIXED`]-only: Return up to a page of duplicate data items + /// from next cursor position, returning owned values. + pub fn next_multiple_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.next_multiple().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// [`DatabaseFlags::DUP_FIXED`]-only: Position at previous page and return up to a page + /// of duplicate data items, returning owned values. + pub fn prev_multiple_owned(&mut self) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.prev_multiple().map(|opt| opt.map(|(k, v)| (k.into_owned(), v.into_owned()))) + } + + /// Position at first key greater than or equal to specified key, returning + /// owned values with a flag indicating exact match. + /// + /// Returns a tuple of `(mdbx_result, key, value)` where `mdbx_result` is + /// `false` for an exact match and `true` for a greater key. + pub fn set_range_owned( + &mut self, + key: &[u8], + ) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.set_range(key).map(|opt| { + opt.map(|flagged| { + (flagged.mdbx_result, flagged.key.into_owned(), flagged.value.into_owned()) + }) + }) + } + + /// Position at first key-value pair greater than or equal to specified, + /// returning owned values with a flag indicating exact match. + /// + /// Returns a tuple of `(mdbx_result, key, value)` where `mdbx_result` is + /// `false` for an exact match and `true` for the next pair. + pub fn set_lowerbound_owned( + &mut self, + key: &[u8], + ) -> ReadResult> + where + Key: TableObjectOwned + for<'a> TableObject<'a>, + Value: TableObjectOwned + for<'a> TableObject<'a>, + { + self.set_lowerbound(key).map(|opt| { + opt.map(|flagged| { + (flagged.mdbx_result, flagged.key.into_owned(), flagged.value.into_owned()) + }) + }) + } } impl<'tx, A> Cursor<'tx, RW, A> diff --git a/src/tx/iter.rs b/src/tx/iter.rs index fa68d86..1d2848b 100644 --- a/src/tx/iter.rs +++ b/src/tx/iter.rs @@ -20,10 +20,10 @@ //! [`TableObject<'tx>`](crate::TableObject). This can avoid allocations //! when using `Cow<'tx, [u8]>`. //! -//! - [`owned_next()`](Iter::owned_next): Returns owned data. Requires +//! - [`next_owned()`](Iter::next_owned): Returns owned data. Requires //! [`TableObjectOwned`]. Always safe but may allocate. //! -//! The standard [`Iterator`] trait is implemented via `owned_next()`. +//! The standard [`Iterator`] trait is implemented via `next_owned()`. //! //! # Dirty Page Handling //! @@ -57,7 +57,9 @@ use crate::{ Cursor, MdbxError, ReadResult, TableObject, TableObjectOwned, TransactionKind, - error::mdbx_result, tx::TxPtrAccess, + entries::{KvOpt, KvView}, + error::mdbx_result, + tx::TxPtrAccess, }; use std::{borrow::Cow, marker::PhantomData, ptr}; @@ -100,7 +102,7 @@ pub struct Iter< > { cursor: Cow<'cur, Cursor<'tx, K, A>>, /// Pre-fetched value from cursor positioning, yielded before calling FFI. - pending: Option<(Key, Value)>, + pending: KvOpt<'tx, A, Key, Value>, /// When true, the iterator is exhausted and will always return `None`. exhausted: bool, _marker: PhantomData (Key, Value)>, @@ -150,19 +152,28 @@ where /// Create a new iterator from the given cursor, first yielding the /// provided key/value pair. - pub(crate) fn new_with(cursor: Cow<'cur, Cursor<'tx, K, A>>, first: (Key, Value)) -> Self { + pub(crate) fn new_with( + cursor: Cow<'cur, Cursor<'tx, K, A>>, + first: KvView<'tx, A, Key, Value>, + ) -> Self { Iter { cursor, pending: Some(first), exhausted: false, _marker: PhantomData } } /// Create a new iterator from a mutable reference to the given cursor, /// first yielding the provided key/value pair. - pub(crate) fn from_ref_with(cursor: &'cur mut Cursor<'tx, K, A>, first: (Key, Value)) -> Self { + pub(crate) fn from_ref_with( + cursor: &'cur mut Cursor<'tx, K, A>, + first: KvView<'tx, A, Key, Value>, + ) -> Self { Self::new_with(Cow::Borrowed(cursor), first) } /// Create a new iterator from an owned cursor, first yielding the /// provided key/value pair. - pub(crate) fn from_owned_with(cursor: Cursor<'tx, K, A>, first: (Key, Value)) -> Self + pub(crate) fn from_owned_with( + cursor: Cursor<'tx, K, A>, + first: KvView<'tx, A, Key, Value>, + ) -> Self where A: Sized, { @@ -170,22 +181,48 @@ where } } -impl Iter<'_, '_, K, A, Key, Value, OP> +impl<'tx, K, A, Key, Value, const OP: u32> Iter<'tx, '_, K, A, Key, Value, OP> where K: TransactionKind, A: TxPtrAccess, Key: TableObjectOwned, Value: TableObjectOwned, { + /// Execute the MDBX operation and decode directly to owned types. + /// + /// This bypasses `TxView` construction for better performance when + /// only owned values are needed. + fn execute_op_owned(&self) -> ReadResult> { + let mut key = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; + let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; + + self.cursor.access().with_txn_ptr(|_| { + let res = + unsafe { ffi::mdbx_cursor_get(self.cursor.cursor(), &mut key, &mut data, OP) }; + + match res { + ffi::MDBX_SUCCESS => unsafe { + let key = Key::decode_val_owned(key)?; + let data = Value::decode_val_owned(data)?; + Ok(Some((key, data))) + }, + ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA | ffi::MDBX_RESULT_TRUE => Ok(None), + other => Err(MdbxError::from_err_code(other).into()), + } + })? + } + /// Own the next key/value pair from the iterator. - pub fn owned_next(&mut self) -> ReadResult> { + pub fn next_owned(&mut self) -> ReadResult> { if self.exhausted { return Ok(None); } - if let Some(v) = self.pending.take() { - return Ok(Some(v)); + + if let Some((k, v)) = self.pending.take() { + return Ok(Some((k.into_owned(), v.into_owned()))); } - self.execute_op() + + self.execute_op_owned() } } @@ -200,11 +237,12 @@ where /// /// Returns `Ok(Some((key, value)))` if a key/value pair was found, /// `Ok(None)` if no more key/value pairs are available, or `Err` on error. - fn execute_op(&self) -> ReadResult> { + fn execute_op(&self) -> ReadResult> { let mut key = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; - self.cursor.access().with_txn_ptr(|txn| { + let access = self.cursor.access(); + access.with_txn_ptr(|txn| { let res = unsafe { ffi::mdbx_cursor_get(self.cursor.cursor(), &mut key, &mut data, OP) }; @@ -213,8 +251,8 @@ where // SAFETY: decode_val checks for dirty writes and copies if needed. // The lifetime 'tx guarantees the Cow cannot outlive the transaction. unsafe { - let key = TableObject::decode_val::(txn, key)?; - let data = TableObject::decode_val::(txn, data)?; + let key = TableObject::decode_val::(access, txn, key)?; + let data = TableObject::decode_val::(access, txn, data)?; Ok(Some((key, data))) } } @@ -229,7 +267,7 @@ where /// Returns `Ok(Some((key, value)))` if a key/value pair was found, /// `Ok(None)` if no more key/value pairs are available, or `Err` on DB /// access error. - pub fn borrow_next(&mut self) -> ReadResult> { + pub fn borrow_next(&mut self) -> ReadResult> { if self.exhausted { return Ok(None); } @@ -240,7 +278,7 @@ where } } -impl Iterator for Iter<'_, '_, K, A, Key, Value, OP> +impl<'tx, K, A, Key, Value, const OP: u32> Iterator for Iter<'tx, '_, K, A, Key, Value, OP> where K: TransactionKind, A: TxPtrAccess, @@ -250,7 +288,7 @@ where type Item = ReadResult<(Key, Value)>; fn next(&mut self) -> Option { - self.owned_next().transpose() + self.next_owned().transpose() } } @@ -305,13 +343,19 @@ where /// Create a new iterator from the given cursor, the inner iterator will /// first yield the provided key/value pair. - pub(crate) fn new_with(cursor: Cow<'cur, Cursor<'tx, K, A>>, first: (Key, Value)) -> Self { + pub(crate) fn new_with( + cursor: Cow<'cur, Cursor<'tx, K, A>>, + first: KvView<'tx, A, Key, Value>, + ) -> Self { IterDup { inner: Iter::new_with(cursor, first) } } /// Create a new iterator from a mutable reference to the given cursor, /// first yielding the provided key/value pair. - pub fn from_ref_with(cursor: &'cur mut Cursor<'tx, K, A>, first: (Key, Value)) -> Self { + pub fn from_ref_with( + cursor: &'cur mut Cursor<'tx, K, A>, + first: KvView<'tx, A, Key, Value>, + ) -> Self { Self::new_with(Cow::Borrowed(cursor), first) } @@ -386,7 +430,7 @@ where Value: TableObjectOwned, { /// Own the next key/value pair from the iterator. - pub fn owned_next(&mut self) -> ReadResult>> { + pub fn next_owned(&mut self) -> ReadResult>> { self.borrow_next() } } @@ -401,7 +445,7 @@ where type Item = ReadResult>; fn next(&mut self) -> Option { - self.owned_next().transpose() + self.next_owned().transpose() } } diff --git a/src/tx/kind.rs b/src/tx/kind.rs index 6e82466..c8c6c6a 100644 --- a/src/tx/kind.rs +++ b/src/tx/kind.rs @@ -35,20 +35,19 @@ pub trait TransactionKind: private::Sealed + core::fmt::Debug + 'static { const IS_READ_ONLY: bool; /// The inner storage type for the transaction pointer. - type Inner: TxPtrAccess; + type UnsyncAccess: TxPtrAccess; } impl TransactionKind for RO { const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_RDONLY; const IS_READ_ONLY: bool = true; - // Without timeouts, RO uses direct pointer like RW - type Inner = RoGuard; + type UnsyncAccess = RoGuard; } impl TransactionKind for RW { const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_READWRITE; const IS_READ_ONLY: bool = false; - type Inner = RwUnsync; + type UnsyncAccess = RwUnsync; } diff --git a/src/tx/mod.rs b/src/tx/mod.rs index da0e20a..82f027e 100644 --- a/src/tx/mod.rs +++ b/src/tx/mod.rs @@ -32,7 +32,7 @@ mod assertions; mod cache; pub(crate) use cache::{CachedDb, SharedCache}; -mod cursor; +pub mod cursor; pub use cursor::{Cursor, RoCursorSync, RoCursorUnsync, RwCursorSync, RwCursorUnsync}; mod database; diff --git a/src/tx/sync.rs b/src/tx/sync.rs index 5df1ea5..69210f0 100644 --- a/src/tx/sync.rs +++ b/src/tx/sync.rs @@ -2,7 +2,8 @@ use crate::tx::assertions; use crate::{ Cursor, Database, Environment, MdbxError, RO, RW, ReadResult, Stat, TableObject, - TransactionKind, + TableObjectOwned, TransactionKind, + entries::SyncView, error::{MdbxResult, mdbx_result}, flags::{DatabaseFlags, WriteFlags}, sys::txn_manager::{RawTxPtr, TxnManagerMessage}, @@ -79,10 +80,16 @@ where /// returned. Retrieval of other items requires the use of /// [Cursor]. If the item is not in the database, then /// [None] will be returned. - pub fn get<'a, Key>(&'a self, dbi: ffi::MDBX_dbi, key: &[u8]) -> ReadResult> + pub fn get<'a, Key>( + &'a self, + dbi: ffi::MDBX_dbi, + key: &[u8], + ) -> ReadResult>> where Key: TableObject<'a>, { + let access = &*self.inner.ptr; + self.txn_execute(|txn_ptr| { // SAFETY: // txn is a valid transaction pointer from txn_execute. @@ -93,11 +100,27 @@ where // `decode_val` checks for dirty writes and copies data if needed. unsafe { let data_val = ops::get_raw(txn_ptr, dbi, key)?; - data_val.map(|val| Key::decode_val::(txn_ptr, val)).transpose() + + data_val + .map(|val| Key::decode_val::>(access, txn_ptr, val)) + .transpose() } })? } + /// Gets an item from a database, returning an owned value. + /// + /// This is a convenience method that retrieves the data and converts it + /// to an owned value directly. + pub fn get_owned(&self, dbi: ffi::MDBX_dbi, key: &[u8]) -> ReadResult> + where + T: TableObjectOwned, + { + self.txn_execute(|txn_ptr| unsafe { + ops::get_raw(txn_ptr, dbi, key)?.map(|val| T::decode_val_owned(val)).transpose() + })? + } + /// Commits the transaction. /// /// Any pending operations will be saved. diff --git a/src/tx/timer.rs b/src/tx/timer.rs deleted file mode 100644 index 282e4a1..0000000 --- a/src/tx/timer.rs +++ /dev/null @@ -1,75 +0,0 @@ -#[cfg(feature = "read-tx-timeouts")] -#[cfg(feature = "read-tx-timeouts")] -use crate::tx::access::WeakRoTxPtr; -use crate::{MdbxError, MdbxResult, tx::RoTxPtr}; -use std::{ - sync::{Arc, Weak}, - time::Duration, -}; - -/// Inner storage for RO transactions with timeout support. -/// -/// When a timeout is set, the Arc is handed off to a background thread that -/// will drop it after the timeout, causing the transaction to be aborted. -/// The transaction keeps a Weak reference and must upgrade it for each -/// operation. -/// -/// When no timeout is set, we keep the Arc ourselves and can use it directly. -#[cfg(feature = "read-tx-timeouts")] -pub(crate) struct RoInner { - /// If we own the Arc (no timeout), we can use it directly without upgrade. - owner: Option>, - /// Weak reference for timeout case - must upgrade to use. - #[cfg(feature = "read-tx-timeouts")] - weak: WeakRoTxPtr, -} - -impl RoInner { - /// Create a new RoInner with no timeout (we keep the Arc). - pub(crate) fn new_owned(arc: Arc) -> Self { - let weak = Arc::downgrade(&arc); - Self { - owner: Some(arc), - - #[cfg(feature = "read-tx-timeouts")] - weak, - } - } - - /// Create a new RoInner with timeout (background thread has the Arc). - #[cfg(feature = "read-tx-timeouts")] - pub(crate) fn new_with_timeout(ptr: RoTxPtr, duration: Duration) -> Self { - let arc = Arc::new(ptr); - let weak = Arc::downgrade(&arc); - std::thread::spawn(move || { - std::thread::sleep(duration); - // Drop the Arc, aborting the transaction. - drop(arc); - }); - Self { owner: None, weak } - } - - /// Get a reference to the owner Arc, if we have it. - pub(crate) fn owner(&self) -> Option<&Arc> { - self.owner.as_ref() - } - - /// Get a reference to the weak pointer. - #[cfg(feature = "read-tx-timeouts")] - pub(crate) fn weak(&self) -> &Weak { - &self.weak - } - - /// Try to upgrade the weak reference to take ownership. - #[cfg(feature = "read-tx-timeouts")] - pub(crate) fn try_upgrade(&mut self) -> MdbxResult<()> { - if self.owner.is_some() { - return Ok(()); - } - if let Some(arc) = self.weak.upgrade() { - self.owner = Some(arc); - return Ok(()); - } - Err(MdbxError::ReadTransactionTimeout.into()) - } -} diff --git a/src/tx/unsync.rs b/src/tx/unsync.rs index c411b41..53baa1c 100644 --- a/src/tx/unsync.rs +++ b/src/tx/unsync.rs @@ -28,7 +28,8 @@ use crate::tx::assertions; use crate::{ CommitLatency, Database, Environment, MdbxError, RO, RW, ReadResult, Stat, TableObject, - TransactionKind, + TableObjectOwned, TransactionKind, + entries::TableViewUnsync, error::{MdbxResult, mdbx_result}, flags::{DatabaseFlags, WriteFlags}, tx::{ @@ -60,7 +61,7 @@ impl fmt::Debug for TxMeta { /// - Arc/Weak pattern for RO transactions (!Sync, with timeout support) /// - Direct ownership for RW transactions (!Send, !Sync, no mutex needed) pub struct TxUnsync { - txn: K::Inner, + txn: K::UnsyncAccess, meta: TxMeta, @@ -185,19 +186,41 @@ impl TxUnsync { } /// Gets an item from a database. - pub fn get<'a, Key>(&'a mut self, dbi: ffi::MDBX_dbi, key: &[u8]) -> ReadResult> + pub fn get<'a, Key>( + &'a mut self, + dbi: ffi::MDBX_dbi, + key: &[u8], + ) -> ReadResult>> where Key: TableObject<'a>, { - self.with_txn_ptr(|txn_ptr| { - // SAFETY: txn_ptr is valid from with_txn_ptr. + // Rebinding here allows us to use access in the closure, whereas + // if we used self.with_txn_ptr directly, we couldn't borrow self again + let access = &self.txn; + access.with_txn_ptr(|txn_ptr| { + // SAFETY: txn_ptr is valid within with_txn_ptr. unsafe { let data_val = ops::get_raw(txn_ptr, dbi, key)?; - data_val.map(|val| Key::decode_val::(txn_ptr, val)).transpose() + data_val + .map(|val| Key::decode_val::(access, txn_ptr, val)) + .transpose() } })? } + /// Gets an item from a database, returning an owned value. + /// + /// This is a convenience method that retrieves the data and converts it + /// to an owned value directly. + pub fn get_owned(&mut self, dbi: ffi::MDBX_dbi, key: &[u8]) -> ReadResult> + where + T: TableObjectOwned, + { + self.txn.with_txn_ptr(|txn_ptr| unsafe { + ops::get_raw(txn_ptr, dbi, key)?.map(|val| T::decode_val_owned(val)).transpose() + })? + } + /// Opens a handle to an MDBX database. pub fn open_db(&mut self, name: Option<&str>) -> MdbxResult { let name_hash = CachedDb::hash_name(name); @@ -266,7 +289,7 @@ impl TxUnsync { /// Multiple cursors can be open simultaneously on different databases /// within the same transaction. The cursor borrows the transaction's /// inner access type, allowing concurrent cursor operations. - pub fn cursor(&self, db: Database) -> MdbxResult> { + pub fn cursor(&self, db: Database) -> MdbxResult> { Cursor::new(&self.txn, db) } @@ -541,13 +564,13 @@ mod tests { let mut txn = TxUnsync::::new(env.clone()).unwrap(); let db = txn.open_db(None).unwrap(); - let value: Option> = txn.get(db.dbi(), b"key1").unwrap(); + let value: Option> = txn.get_owned(db.dbi(), b"key1").unwrap(); assert_eq!(value.as_deref(), Some(b"value1".as_slice())); - let value: Option> = txn.get(db.dbi(), b"key2").unwrap(); + let value: Option> = txn.get_owned(db.dbi(), b"key2").unwrap(); assert_eq!(value.as_deref(), Some(b"value2".as_slice())); - let value: Option> = txn.get(db.dbi(), b"nonexistent").unwrap(); + let value: Option> = txn.get_owned(db.dbi(), b"nonexistent").unwrap(); assert!(value.is_none()); } @@ -582,7 +605,7 @@ mod tests { let mut txn = TxUnsync::::new_no_timeout(env).unwrap(); let db = txn.open_db(None).unwrap(); - let value: Option> = txn.get(db.dbi(), b"missing").unwrap(); + let value: Option> = txn.get_owned(db.dbi(), b"missing").unwrap(); assert!(value.is_none()); } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 78648f6..27c94ac 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -6,7 +6,7 @@ #![allow(missing_docs, dead_code)] use signet_libmdbx::{ Cursor, Database, DatabaseFlags, Environment, MdbxResult, RO, RW, ReadResult, Stat, - TableObject, TxSync, WriteFlags, ffi, + TableObject, TableObjectOwned, TxSync, WriteFlags, ffi, tx::{PtrSyncInner, RoGuard, RwUnsync, TxPtrAccess, unsync}, }; @@ -17,8 +17,8 @@ pub trait TestRwTxn: Sized { fn create_db(&mut self, name: Option<&str>, flags: DatabaseFlags) -> MdbxResult; fn open_db(&mut self, name: Option<&str>) -> MdbxResult; - fn get<'a, T: TableObject<'a>>( - &'a mut self, + fn get TableObject<'a>>( + &mut self, dbi: ffi::MDBX_dbi, key: &[u8], ) -> ReadResult>; @@ -43,8 +43,8 @@ pub trait TestRoTxn: Sized { type CursorAccess: TxPtrAccess; fn open_db(&mut self, name: Option<&str>) -> MdbxResult; - fn get<'a, T: TableObject<'a>>( - &'a mut self, + fn get TableObject<'a>>( + &mut self, dbi: ffi::MDBX_dbi, key: &[u8], ) -> ReadResult>; @@ -68,12 +68,12 @@ impl TestRwTxn for TxSync { TxSync::open_db(self, name) } - fn get<'a, T: TableObject<'a>>( - &'a mut self, + fn get TableObject<'a>>( + &mut self, dbi: ffi::MDBX_dbi, key: &[u8], ) -> ReadResult> { - TxSync::get(self, dbi, key) + TxSync::get_owned(self, dbi, key) } fn put(&mut self, db: Database, key: &[u8], data: &[u8], flags: WriteFlags) -> MdbxResult<()> { @@ -121,12 +121,12 @@ impl TestRoTxn for TxSync { TxSync::open_db(self, name) } - fn get<'a, T: TableObject<'a>>( - &'a mut self, + fn get TableObject<'a>>( + &mut self, dbi: ffi::MDBX_dbi, key: &[u8], ) -> ReadResult> { - TxSync::get(self, dbi, key) + TxSync::get_owned(self, dbi, key) } fn commit(self) -> MdbxResult<()> { @@ -157,12 +157,12 @@ impl TestRwTxn for unsync::TxUnsync { unsync::TxUnsync::open_db(self, name) } - fn get<'a, T: TableObject<'a>>( - &'a mut self, + fn get TableObject<'a>>( + &mut self, dbi: ffi::MDBX_dbi, key: &[u8], ) -> ReadResult> { - unsync::TxUnsync::get(self, dbi, key) + unsync::TxUnsync::get_owned(self, dbi, key) } fn put(&mut self, db: Database, key: &[u8], data: &[u8], flags: WriteFlags) -> MdbxResult<()> { @@ -210,12 +210,12 @@ impl TestRoTxn for unsync::TxUnsync { unsync::TxUnsync::open_db(self, name) } - fn get<'a, T: TableObject<'a>>( - &'a mut self, + fn get TableObject<'a>>( + &mut self, dbi: ffi::MDBX_dbi, key: &[u8], ) -> ReadResult> { - unsync::TxUnsync::get(self, dbi, key) + unsync::TxUnsync::get_owned(self, dbi, key) } fn commit(self) -> MdbxResult<()> { diff --git a/tests/cursor.rs b/tests/cursor.rs index f7beedd..67d3566 100644 --- a/tests/cursor.rs +++ b/tests/cursor.rs @@ -5,7 +5,7 @@ use signet_libmdbx::{ DatabaseFlags, Environment, MdbxError, MdbxResult, ObjectLength, ReadError, ReadResult, WriteFlags, tx::TxPtrAccess, }; -use std::{borrow::Cow, hint::black_box}; +use std::hint::black_box; use tempfile::tempdir; /// Convenience @@ -28,21 +28,21 @@ fn test_get_impl( let mut txn = begin_rw(&env).unwrap(); let db = txn.open_db(None).unwrap(); - assert_eq!(None, txn.cursor(db).unwrap().first::<(), ()>().unwrap()); + assert_eq!(None, txn.cursor(db).unwrap().first_owned::<(), ()>().unwrap()); txn.put(db, b"key1", b"val1", WriteFlags::empty()).unwrap(); txn.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); txn.put(db, b"key3", b"val3", WriteFlags::empty()).unwrap(); let mut cursor = txn.cursor(db).unwrap(); - assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1"))); - assert_eq!(cursor.get_current().unwrap(), Some((*b"key1", *b"val1"))); - assert_eq!(cursor.next().unwrap(), Some((*b"key2", *b"val2"))); - assert_eq!(cursor.prev().unwrap(), Some((*b"key1", *b"val1"))); - assert_eq!(cursor.last().unwrap(), Some((*b"key3", *b"val3"))); - assert_eq!(cursor.set(b"key1").unwrap(), Some(*b"val1")); - assert_eq!(cursor.set_key(b"key3").unwrap(), Some((*b"key3", *b"val3"))); - assert_eq!(cursor.set_range(b"key2\0").unwrap(), Some((*b"key3", *b"val3"))); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.get_current_owned().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.next_owned().unwrap(), Some((*b"key2", *b"val2"))); + assert_eq!(cursor.prev_owned().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.last_owned().unwrap(), Some((*b"key3", *b"val3"))); + assert_eq!(cursor.set_owned(b"key1").unwrap(), Some(*b"val1")); + assert_eq!(cursor.set_key_owned(b"key3").unwrap(), Some((*b"key3", *b"val3"))); + assert_eq!(cursor.set_range_owned(b"key2\0").unwrap(), Some((false, *b"key3", *b"val3"))); } #[test] @@ -75,33 +75,33 @@ fn test_get_dup_impl( txn.put(db, b"key2", b"val3", WriteFlags::empty()).unwrap(); let mut cursor = txn.cursor(db).unwrap(); - assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1"))); - assert_eq!(cursor.first_dup().unwrap(), Some(*b"val1")); - assert_eq!(cursor.get_current().unwrap(), Some((*b"key1", *b"val1"))); - assert_eq!(cursor.next_nodup().unwrap(), Some((*b"key2", *b"val1"))); - assert_eq!(cursor.next().unwrap(), Some((*b"key2", *b"val2"))); - assert_eq!(cursor.prev().unwrap(), Some((*b"key2", *b"val1"))); - assert_eq!(cursor.next_dup().unwrap(), Some((*b"key2", *b"val2"))); - assert_eq!(cursor.next_dup().unwrap(), Some((*b"key2", *b"val3"))); - assert_eq!(cursor.next_dup::<(), ()>().unwrap(), None); - assert_eq!(cursor.prev_dup().unwrap(), Some((*b"key2", *b"val2"))); - assert_eq!(cursor.last_dup().unwrap(), Some(*b"val3")); - assert_eq!(cursor.prev_nodup().unwrap(), Some((*b"key1", *b"val3"))); - assert_eq!(cursor.next_dup::<(), ()>().unwrap(), None); - assert_eq!(cursor.set(b"key1").unwrap(), Some(*b"val1")); - assert_eq!(cursor.set(b"key2").unwrap(), Some(*b"val1")); - assert_eq!(cursor.set_range(b"key1\0").unwrap(), Some((*b"key2", *b"val1"))); - assert_eq!(cursor.get_both(b"key1", b"val3").unwrap(), Some(*b"val3")); - assert_eq!(cursor.get_both_range::<()>(b"key1", b"val4").unwrap(), None); - assert_eq!(cursor.get_both_range(b"key2", b"val").unwrap(), Some(*b"val1")); - - assert_eq!(cursor.last().unwrap(), Some((*b"key2", *b"val3"))); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.first_dup_owned().unwrap(), Some(*b"val1")); + assert_eq!(cursor.get_current_owned().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.next_nodup_owned().unwrap(), Some((*b"key2", *b"val1"))); + assert_eq!(cursor.next_owned().unwrap(), Some((*b"key2", *b"val2"))); + assert_eq!(cursor.prev_owned().unwrap(), Some((*b"key2", *b"val1"))); + assert_eq!(cursor.next_dup_owned().unwrap(), Some((*b"key2", *b"val2"))); + assert_eq!(cursor.next_dup_owned().unwrap(), Some((*b"key2", *b"val3"))); + assert_eq!(cursor.next_dup_owned::<(), ()>().unwrap(), None); + assert_eq!(cursor.prev_dup_owned().unwrap(), Some((*b"key2", *b"val2"))); + assert_eq!(cursor.last_dup_owned().unwrap(), Some(*b"val3")); + assert_eq!(cursor.prev_nodup_owned().unwrap(), Some((*b"key1", *b"val3"))); + assert_eq!(cursor.next_dup_owned::<(), ()>().unwrap(), None); + assert_eq!(cursor.set_owned(b"key1").unwrap(), Some(*b"val1")); + assert_eq!(cursor.set_owned(b"key2").unwrap(), Some(*b"val1")); + assert_eq!(cursor.set_range_owned(b"key1\0").unwrap(), Some((false, *b"key2", *b"val1"))); + assert_eq!(cursor.get_both_owned(b"key1", b"val3").unwrap(), Some(*b"val3")); + assert_eq!(cursor.get_both_range_owned::<()>(b"key1", b"val4").unwrap(), None); + assert_eq!(cursor.get_both_range_owned(b"key2", b"val").unwrap(), Some(*b"val1")); + + assert_eq!(cursor.last_owned().unwrap(), Some((*b"key2", *b"val3"))); cursor.del(WriteFlags::empty()).unwrap(); - assert_eq!(cursor.last().unwrap(), Some((*b"key2", *b"val2"))); + assert_eq!(cursor.last_owned().unwrap(), Some((*b"key2", *b"val2"))); cursor.del(WriteFlags::empty()).unwrap(); - assert_eq!(cursor.last().unwrap(), Some((*b"key2", *b"val1"))); + assert_eq!(cursor.last_owned().unwrap(), Some((*b"key2", *b"val1"))); cursor.del(WriteFlags::empty()).unwrap(); - assert_eq!(cursor.last().unwrap(), Some((*b"key1", *b"val3"))); + assert_eq!(cursor.last_owned().unwrap(), Some((*b"key1", *b"val3"))); } #[test] @@ -134,9 +134,9 @@ fn test_get_dupfixed_impl( txn.put(db, b"key2", b"val6", WriteFlags::empty()).unwrap(); let mut cursor = txn.cursor(db).unwrap(); - assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1"))); - assert_eq!(cursor.get_multiple().unwrap(), Some(*b"val1val2val3")); - assert_eq!(cursor.next_multiple::<(), ()>().unwrap(), None); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.get_multiple_owned().unwrap(), Some(*b"val1val2val3")); + assert_eq!(cursor.next_multiple_owned::<(), ()>().unwrap(), None); } #[test] @@ -188,7 +188,7 @@ fn test_iter_impl( let retr: Result> = cursor.iter_start().unwrap().collect(); assert_eq!(items, retr.unwrap()); - cursor.set::<()>(b"key2").unwrap(); + cursor.set_owned::<()>(b"key2").unwrap(); assert_eq!( items.clone().into_iter().skip(2).collect::>(), cursor.iter().collect::>>().unwrap() @@ -334,7 +334,7 @@ fn test_iter_dup_impl( let mut cursor = txn.cursor(db).unwrap(); assert_eq!(items, cursor.iter_dup().flatten().flatten().collect::>>().unwrap()); - cursor.set::<()>(b"b").unwrap(); + cursor.set_owned::<()>(b"b").unwrap(); assert_eq!( items.iter().copied().skip(6).collect::>(), cursor.iter_dup().flatten().flatten().collect::>>().unwrap() @@ -456,7 +456,7 @@ fn test_iter_del_get_impl( cursor.iter_dup_of(b"a").unwrap().collect::>>().unwrap() ); - assert_eq!(cursor.set(b"a").unwrap(), Some(*b"1")); + assert_eq!(cursor.set_owned(b"a").unwrap(), Some(*b"1")); cursor.del(WriteFlags::empty()).unwrap(); @@ -499,24 +499,12 @@ fn test_put_del_impl( cursor.put(b"key2", b"val2", WriteFlags::empty()).unwrap(); cursor.put(b"key3", b"val3", WriteFlags::empty()).unwrap(); - assert_eq!( - cursor.set_key(b"key2").unwrap(), - Some((Cow::Borrowed(b"key2" as &[u8]), Cow::Borrowed(b"val2" as &[u8]))) - ); - assert_eq!( - cursor.get_current().unwrap(), - Some((Cow::Borrowed(b"key2" as &[u8]), Cow::Borrowed(b"val2" as &[u8]))) - ); + assert_eq!(cursor.set_key_owned(b"key2").unwrap(), Some((*b"key2", *b"val2"))); + assert_eq!(cursor.get_current_owned().unwrap(), Some((*b"key2", *b"val2"))); cursor.del(WriteFlags::empty()).unwrap(); - assert_eq!( - cursor.get_current().unwrap(), - Some((Cow::Borrowed(b"key3" as &[u8]), Cow::Borrowed(b"val3" as &[u8]))) - ); - assert_eq!( - cursor.last().unwrap(), - Some((Cow::Borrowed(b"key3" as &[u8]), Cow::Borrowed(b"val3" as &[u8]))) - ); + assert_eq!(cursor.get_current_owned().unwrap(), Some((*b"key3", *b"val3"))); + assert_eq!(cursor.last_owned().unwrap(), Some((*b"key3", *b"val3"))); } #[test] @@ -839,13 +827,13 @@ mod timeout_tests { let mut cursor = txn.cursor(db).unwrap(); // Cursor should work before timeout - assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"key1", *b"val1"))); // Wait for timeout std::thread::sleep(WAIT_FOR_TIMEOUT); // Cursor operations should now fail with ReadTransactionTimeout - let err = cursor.first::<(), ()>().unwrap_err(); + let err = cursor.first_owned::<(), ()>().unwrap_err(); assert!( matches!(err, ReadError::Mdbx(MdbxError::ReadTransactionTimeout)), "Expected ReadTransactionTimeout, got: {err:?}" @@ -882,7 +870,7 @@ mod timeout_tests { let txn = env.begin_ro_txn().unwrap(); let db = txn.open_db(None).unwrap(); let mut cursor = txn.cursor(db).unwrap(); - assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"key1", *b"val1"))); } /// Test multiple cursors cleanup after timeout. @@ -920,7 +908,7 @@ mod timeout_tests { let txn = env.begin_ro_txn().unwrap(); let db = txn.open_db(Some("db1")).unwrap(); let mut cursor = txn.cursor(db).unwrap(); - assert_eq!(cursor.first().unwrap(), Some((*b"k1", *b"v1"))); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"k1", *b"v1"))); } /// Test that transactions without timeout work correctly. @@ -943,13 +931,13 @@ mod timeout_tests { let mut cursor = txn.cursor(db).unwrap(); // Cursor should work before the "normal" timeout period - assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"key1", *b"val1"))); // Wait longer than the short timeout (but not forever) std::thread::sleep(WAIT_FOR_TIMEOUT); // Cursor should still work since there's no timeout - assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"key1", *b"val1"))); } /// Test iterator behavior after timeout. @@ -1020,10 +1008,10 @@ fn test_cursor_append_impl( let db = txn.open_db(None).unwrap(); let mut cursor = txn.cursor(db).unwrap(); - assert_eq!(cursor.first().unwrap(), Some((*b"a", *b"val_a"))); - assert_eq!(cursor.next().unwrap(), Some((*b"b", *b"val_b"))); - assert_eq!(cursor.next().unwrap(), Some((*b"c", *b"val_c"))); - assert_eq!(cursor.next::<(), ()>().unwrap(), None); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"a", *b"val_a"))); + assert_eq!(cursor.next_owned().unwrap(), Some((*b"b", *b"val_b"))); + assert_eq!(cursor.next_owned().unwrap(), Some((*b"c", *b"val_c"))); + assert_eq!(cursor.next_owned::<(), ()>().unwrap(), None); } #[test] @@ -1063,9 +1051,9 @@ fn test_tx_append_impl( let db = txn.open_db(None).unwrap(); let mut cursor = txn.cursor(db).unwrap(); - assert_eq!(cursor.first().unwrap(), Some((*b"key1", *b"val1"))); - assert_eq!(cursor.next().unwrap(), Some((*b"key2", *b"val2"))); - assert_eq!(cursor.next().unwrap(), Some((*b"key3", *b"val3"))); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"key1", *b"val1"))); + assert_eq!(cursor.next_owned().unwrap(), Some((*b"key2", *b"val2"))); + assert_eq!(cursor.next_owned().unwrap(), Some((*b"key3", *b"val3"))); } #[test] @@ -1106,10 +1094,10 @@ fn test_append_dup_impl( let db = txn.open_db(None).unwrap(); let mut cursor = txn.cursor(db).unwrap(); - assert_eq!(cursor.first().unwrap(), Some((*b"a", *b"1"))); - assert_eq!(cursor.next_dup().unwrap(), Some((*b"a", *b"2"))); - assert_eq!(cursor.next_dup().unwrap(), Some((*b"a", *b"3"))); - assert_eq!(cursor.next_dup::<(), ()>().unwrap(), None); + assert_eq!(cursor.first_owned().unwrap(), Some((*b"a", *b"1"))); + assert_eq!(cursor.next_dup_owned().unwrap(), Some((*b"a", *b"2"))); + assert_eq!(cursor.next_dup_owned().unwrap(), Some((*b"a", *b"3"))); + assert_eq!(cursor.next_dup_owned::<(), ()>().unwrap(), None); } #[test] @@ -1250,7 +1238,7 @@ mod append_debug_tests { let db = txn.open_db(None).unwrap(); let mut cursor = txn.cursor(db).unwrap(); - let first: Option<(Vec, Vec)> = cursor.first().unwrap(); + let first: Option<(Vec, Vec)> = cursor.first_owned().unwrap(); assert!(first.is_some()); } @@ -1307,7 +1295,7 @@ mod append_debug_tests { let db = txn.open_db(None).unwrap(); let mut cursor = txn.cursor(db).unwrap(); - let first: Option<(Vec, Vec)> = cursor.first().unwrap(); + let first: Option<(Vec, Vec)> = cursor.first_owned().unwrap(); assert!(first.is_some()); } diff --git a/tests/proptest_inputs.rs b/tests/proptest_inputs.rs index 3b624e7..12ff393 100644 --- a/tests/proptest_inputs.rs +++ b/tests/proptest_inputs.rs @@ -43,7 +43,7 @@ proptest! { // If put succeeded, get should not panic if put_result.is_ok() { - let _: Option> = txn.get(db.dbi(), &key).unwrap(); + let _: Option> = txn.get_owned(db.dbi(), &key).unwrap(); } } @@ -70,7 +70,7 @@ proptest! { let db = txn.open_db(None).unwrap(); // Get on nonexistent key should return Ok(None), not panic - let result: signet_libmdbx::ReadResult>> = txn.get(db.dbi(), &key); + let result: signet_libmdbx::ReadResult>> = txn.get_owned(db.dbi(), &key); prop_assert!(result.is_ok()); prop_assert!(result.unwrap().is_none()); } @@ -96,7 +96,7 @@ proptest! { // If put succeeded, get should not panic if put_result.is_ok() { - let _: Option> = txn.get(db.dbi(), &key).unwrap(); + let _: Option> = txn.get_owned(db.dbi(), &key).unwrap(); } } @@ -123,7 +123,7 @@ proptest! { let db = txn.open_db(None).unwrap(); // Get on nonexistent key should return Ok(None), not panic - let result: signet_libmdbx::ReadResult>> = txn.get(db.dbi(), &key); + let result: signet_libmdbx::ReadResult>> = txn.get_owned(db.dbi(), &key); prop_assert!(result.is_ok()); prop_assert!(result.unwrap().is_none()); } @@ -150,7 +150,7 @@ proptest! { let mut cursor = txn.cursor(db).unwrap(); // set() with arbitrary key should return None or value, never panic - let result: signet_libmdbx::ReadResult>> = cursor.set(&key); + let result: signet_libmdbx::ReadResult>> = cursor.set_owned(&key); prop_assert!(result.is_ok()); } @@ -169,8 +169,9 @@ proptest! { let mut cursor = txn.cursor(db).unwrap(); // set_range() with arbitrary key should not panic - let result: signet_libmdbx::ReadResult, Vec)>> = - cursor.set_range(&key); + #[allow(clippy::type_complexity)] + let result: signet_libmdbx::ReadResult, Vec)>> = + cursor.set_range_owned(&key); prop_assert!(result.is_ok()); } @@ -188,7 +189,7 @@ proptest! { // set_key() should not panic let result: signet_libmdbx::ReadResult, Vec)>> = - cursor.set_key(&key); + cursor.set_key_owned(&key); prop_assert!(result.is_ok()); } @@ -204,7 +205,7 @@ proptest! { let mut cursor = txn.cursor(db).unwrap(); - let result: signet_libmdbx::ReadResult>> = cursor.set(&key); + let result: signet_libmdbx::ReadResult>> = cursor.set_owned(&key); prop_assert!(result.is_ok()); } @@ -221,8 +222,9 @@ proptest! { let mut cursor = txn.cursor(db).unwrap(); - let result: signet_libmdbx::ReadResult, Vec)>> = - cursor.set_range(&key); + #[allow(clippy::type_complexity)] + let result: signet_libmdbx::ReadResult, Vec)>> = + cursor.set_range_owned(&key); prop_assert!(result.is_ok()); } @@ -239,7 +241,7 @@ proptest! { let mut cursor = txn.cursor(db).unwrap(); let result: signet_libmdbx::ReadResult, Vec)>> = - cursor.set_key(&key); + cursor.set_key_owned(&key); prop_assert!(result.is_ok()); } } @@ -344,7 +346,7 @@ proptest! { // get_both should not panic let result: signet_libmdbx::ReadResult>> = - cursor.get_both(&search_key, &search_value); + cursor.get_both_owned(&search_key, &search_value); prop_assert!(result.is_ok()); } @@ -366,7 +368,7 @@ proptest! { // get_both_range should not panic let result: signet_libmdbx::ReadResult>> = - cursor.get_both_range(&search_key, &search_value); + cursor.get_both_range_owned(&search_key, &search_value); prop_assert!(result.is_ok()); } @@ -387,7 +389,7 @@ proptest! { let mut cursor = txn.cursor(db).unwrap(); let result: signet_libmdbx::ReadResult>> = - cursor.get_both(&search_key, &search_value); + cursor.get_both_owned(&search_key, &search_value); prop_assert!(result.is_ok()); } @@ -408,7 +410,7 @@ proptest! { let mut cursor = txn.cursor(db).unwrap(); let result: signet_libmdbx::ReadResult>> = - cursor.get_both_range(&search_key, &search_value); + cursor.get_both_range_owned(&search_key, &search_value); prop_assert!(result.is_ok()); } } @@ -624,7 +626,7 @@ proptest! { prop_assert!(put_result.is_ok()); let get_result: signet_libmdbx::ReadResult>> = - txn.get(db.dbi(), b""); + txn.get_owned(db.dbi(), b""); prop_assert!(get_result.is_ok()); let del_result = txn.del(db, b"", None); @@ -647,7 +649,7 @@ proptest! { prop_assert!(put_result.is_ok()); let get_result: signet_libmdbx::ReadResult>> = - txn.get(db.dbi(), &key); + txn.get_owned(db.dbi(), &key); prop_assert!(get_result.is_ok()); prop_assert!(get_result.unwrap().is_some()); } @@ -664,7 +666,7 @@ proptest! { prop_assert!(put_result.is_ok()); let get_result: signet_libmdbx::ReadResult>> = - txn.get(db.dbi(), b""); + txn.get_owned(db.dbi(), b""); prop_assert!(get_result.is_ok()); let del_result = txn.del(db, b"", None); @@ -685,7 +687,7 @@ proptest! { prop_assert!(put_result.is_ok()); let get_result: signet_libmdbx::ReadResult>> = - txn.get(db.dbi(), &key); + txn.get_owned(db.dbi(), &key); prop_assert!(get_result.is_ok()); prop_assert!(get_result.unwrap().is_some()); } @@ -708,7 +710,7 @@ proptest! { let put_result = txn.put(db, &key, &value, WriteFlags::empty()); if put_result.is_ok() { - let retrieved: Option> = txn.get(db.dbi(), &key).unwrap(); + let retrieved: Option> = txn.get_owned(db.dbi(), &key).unwrap(); prop_assert_eq!(retrieved, Some(value)); } } @@ -731,7 +733,7 @@ proptest! { let put_result = txn.put(db, &key, &value, WriteFlags::empty()); if put_result.is_ok() { - let retrieved: Option> = txn.get(db.dbi(), &key).unwrap(); + let retrieved: Option> = txn.get_owned(db.dbi(), &key).unwrap(); prop_assert_eq!(retrieved, Some(value)); } } @@ -760,7 +762,7 @@ proptest! { let put2 = txn.put(db, &key, &value2, WriteFlags::empty()); if put1.is_ok() && put2.is_ok() { - let retrieved: Option> = txn.get(db.dbi(), &key).unwrap(); + let retrieved: Option> = txn.get_owned(db.dbi(), &key).unwrap(); prop_assert_eq!(retrieved, Some(value2)); } } @@ -789,7 +791,7 @@ proptest! { let put2 = txn.put(db, &key, &value2, WriteFlags::empty()); if put1.is_ok() && put2.is_ok() { - let retrieved: Option> = txn.get(db.dbi(), &key).unwrap(); + let retrieved: Option> = txn.get_owned(db.dbi(), &key).unwrap(); prop_assert_eq!(retrieved, Some(value2)); } } @@ -815,7 +817,7 @@ proptest! { let deleted = txn.del(db, &key, None).unwrap(); prop_assert!(deleted); - let retrieved: Option> = txn.get(db.dbi(), &key).unwrap(); + let retrieved: Option> = txn.get_owned(db.dbi(), &key).unwrap(); prop_assert_eq!(retrieved, None); } } @@ -841,7 +843,7 @@ proptest! { let deleted = txn.del(db, &key, None).unwrap(); prop_assert!(deleted); - let retrieved: Option> = txn.get(db.dbi(), &key).unwrap(); + let retrieved: Option> = txn.get_owned(db.dbi(), &key).unwrap(); prop_assert_eq!(retrieved, None); } } @@ -1044,7 +1046,7 @@ proptest! { let put_result = txn.put(db, &key, &value, WriteFlags::empty()); if put_result.is_ok() { let mut cursor = txn.cursor(db).unwrap(); - let retrieved: Option> = cursor.set(&key).unwrap(); + let retrieved: Option> = cursor.set_owned(&key).unwrap(); prop_assert_eq!(retrieved, Some(value)); } } @@ -1068,7 +1070,7 @@ proptest! { let put_result = txn.put(db, &key, &value, WriteFlags::empty()); if put_result.is_ok() { let mut cursor = txn.cursor(db).unwrap(); - let retrieved: Option> = cursor.set(&key).unwrap(); + let retrieved: Option> = cursor.set_owned(&key).unwrap(); prop_assert_eq!(retrieved, Some(value)); } } @@ -1111,7 +1113,10 @@ proptest! { .cloned(); let mut cursor = txn.cursor(db).unwrap(); - let result: Option<(Vec, Vec)> = cursor.set_range(&search_key).unwrap(); + let result: Option<(Vec, Vec)> = cursor + .set_range_owned(&search_key) + .unwrap() + .map(|(_, k, v)| (k, v)); prop_assert_eq!(result, expected); } @@ -1153,7 +1158,10 @@ proptest! { .cloned(); let mut cursor = txn.cursor(db).unwrap(); - let result: Option<(Vec, Vec)> = cursor.set_range(&search_key).unwrap(); + let result: Option<(Vec, Vec)> = cursor + .set_range_owned(&search_key) + .unwrap() + .map(|(_, k, v)| (k, v)); prop_assert_eq!(result, expected); } diff --git a/tests/transaction.rs b/tests/transaction.rs index 2b6fb1c..77efd36 100644 --- a/tests/transaction.rs +++ b/tests/transaction.rs @@ -3,7 +3,6 @@ mod common; use common::{TestRoTxn, TestRwTxn, V1Factory, V2Factory}; use signet_libmdbx::*; use std::{ - borrow::Cow, io::Write, sync::{Arc, Barrier}, thread::{self, JoinHandle}, @@ -33,13 +32,13 @@ fn test_put_get_del_impl( let mut txn = begin_rw(&env).unwrap(); let db = txn.open_db(None).unwrap(); - assert_eq!(txn.get(db.dbi(), b"key1").unwrap(), Some(*b"val1")); - assert_eq!(txn.get(db.dbi(), b"key2").unwrap(), Some(*b"val2")); - assert_eq!(txn.get(db.dbi(), b"key3").unwrap(), Some(*b"val3")); - assert_eq!(txn.get::<()>(db.dbi(), b"key").unwrap(), None); + assert_eq!(txn.get::<[u8; 4]>(db.dbi(), b"key1").unwrap(), Some(*b"val1")); + assert_eq!(txn.get::<[u8; 4]>(db.dbi(), b"key2").unwrap(), Some(*b"val2")); + assert_eq!(txn.get::<[u8; 4]>(db.dbi(), b"key3").unwrap(), Some(*b"val3")); + assert_eq!(txn.get::<[u8; 4]>(db.dbi(), b"key").unwrap(), None); txn.del(db, b"key1", None).unwrap(); - assert_eq!(txn.get::<()>(db.dbi(), b"key1").unwrap(), None); + assert_eq!(txn.get::<[u8; 4]>(db.dbi(), b"key1").unwrap(), None); } #[test] @@ -128,14 +127,14 @@ fn test_put_get_del_empty_key_impl( let mut txn = begin_rw(&env).unwrap(); let db = txn.create_db(None, Default::default()).unwrap(); txn.put(db, b"", b"hello", WriteFlags::empty()).unwrap(); - assert_eq!(txn.get(db.dbi(), b"").unwrap(), Some(*b"hello")); + assert_eq!(txn.get::<[u8; 5]>(db.dbi(), b"").unwrap(), Some(*b"hello")); txn.commit().unwrap(); let mut txn = begin_rw(&env).unwrap(); let db = txn.open_db(None).unwrap(); - assert_eq!(txn.get(db.dbi(), b"").unwrap(), Some(*b"hello")); + assert_eq!(txn.get::<[u8; 5]>(db.dbi(), b"").unwrap(), Some(*b"hello")); txn.put(db, b"", b"", WriteFlags::empty()).unwrap(); - assert_eq!(txn.get(db.dbi(), b"").unwrap(), Some(*b"")); + assert_eq!(txn.get::<[u8; 0]>(db.dbi(), b"").unwrap(), Some([])); } #[test] @@ -503,11 +502,11 @@ fn test_reserve_v1() { let txn = env.begin_rw_txn().unwrap(); let db = txn.open_db(None).unwrap(); - assert_eq!(txn.get(db.dbi(), b"key1").unwrap(), Some(*b"val1")); - assert_eq!(txn.get::<()>(db.dbi(), b"key").unwrap(), None); + assert_eq!(txn.get_owned::<[u8; 4]>(db.dbi(), b"key1").unwrap(), Some(*b"val1")); + assert_eq!(txn.get_owned::<[u8; 4]>(db.dbi(), b"key").unwrap(), None); txn.del(db, b"key1", None).unwrap(); - assert_eq!(txn.get::<()>(db.dbi(), b"key1").unwrap(), None); + assert_eq!(txn.get_owned::<[u8; 4]>(db.dbi(), b"key1").unwrap(), None); } /// Test reserve - V2 version @@ -529,11 +528,11 @@ fn test_reserve_v2() { let mut txn = env.begin_rw_unsync().unwrap(); let db = txn.open_db(None).unwrap(); - assert_eq!(txn.get(db.dbi(), b"key1").unwrap(), Some(*b"val1")); - assert_eq!(txn.get::<()>(db.dbi(), b"key").unwrap(), None); + assert_eq!(txn.get_owned::<[u8; 4]>(db.dbi(), b"key1").unwrap(), Some(*b"val1")); + assert_eq!(txn.get_owned::<[u8; 4]>(db.dbi(), b"key").unwrap(), None); txn.del(db, b"key1", None).unwrap(); - assert_eq!(txn.get::<()>(db.dbi(), b"key1").unwrap(), None); + assert_eq!(txn.get_owned::<[u8; 4]>(db.dbi(), b"key1").unwrap(), None); } /// Test nested transactions - V1 only (V2 doesn't support nested txns) @@ -550,13 +549,13 @@ fn test_nested_txn() { let nested = txn.begin_nested_txn().unwrap(); let db = nested.open_db(None).unwrap(); nested.put(db, b"key2", b"val2", WriteFlags::empty()).unwrap(); - assert_eq!(nested.get(db.dbi(), b"key1").unwrap(), Some(*b"val1")); - assert_eq!(nested.get(db.dbi(), b"key2").unwrap(), Some(*b"val2")); + assert_eq!(nested.get_owned::<[u8; 4]>(db.dbi(), b"key1").unwrap(), Some(*b"val1")); + assert_eq!(nested.get_owned::<[u8; 4]>(db.dbi(), b"key2").unwrap(), Some(*b"val2")); } let db = txn.open_db(None).unwrap(); - assert_eq!(txn.get(db.dbi(), b"key1").unwrap(), Some(*b"val1")); - assert_eq!(txn.get::<()>(db.dbi(), b"key2").unwrap(), None); + assert_eq!(txn.get_owned::<[u8; 4]>(db.dbi(), b"key1").unwrap(), Some(*b"val1")); + assert_eq!(txn.get_owned::<[u8; 4]>(db.dbi(), b"key2").unwrap(), None); } /// Test concurrent readers with single writer - V1 only (V2 is !Sync) @@ -580,14 +579,14 @@ fn test_concurrent_readers_single_writer() { { let txn = reader_env.begin_ro_txn().unwrap(); let db = txn.open_db(None).unwrap(); - assert_eq!(txn.get::<()>(db.dbi(), key).unwrap(), None); + assert_eq!(txn.get_owned::<[u8; 3]>(db.dbi(), key).unwrap(), None); } reader_barrier.wait(); reader_barrier.wait(); { let txn = reader_env.begin_ro_txn().unwrap(); let db = txn.open_db(None).unwrap(); - txn.get::<[u8; 3]>(db.dbi(), key).unwrap().unwrap() == *val + txn.get_owned::<[u8; 3]>(db.dbi(), key).unwrap().unwrap() == *val } })); } @@ -633,8 +632,8 @@ fn test_concurrent_writers() { for i in 0..n { assert_eq!( - Cow::>::Owned(format!("{val}{i}").into_bytes()), - txn.get(db.dbi(), format!("{key}{i}").as_bytes()).unwrap().unwrap() + format!("{val}{i}").into_bytes(), + txn.get_owned::>(db.dbi(), format!("{key}{i}").as_bytes()).unwrap().unwrap() ); } }