diff --git a/storage/conformance.toml b/storage/conformance.toml index a8f86a29035..f4251f496e1 100644 --- a/storage/conformance.toml +++ b/storage/conformance.toml @@ -48,11 +48,11 @@ hash = "13b3e99a8c74b50dc18150194a92306de670b94e6642758feb6d9b6e9881f827" ["commonware_storage::journal::conformance::ContiguousFixed"] n_cases = 512 -hash = "b193d460f527eb5e6f54e6bfc0819aefac1e7b58367464e10e45f0c14d5805e9" +hash = "037ee738320c79885a2e0a750bbb2d5ee62f3270c631b2874b8cc9e9ebe3eee7" ["commonware_storage::journal::conformance::ContiguousVariable"] n_cases = 512 -hash = "4345d35c8fe6fbfb3b76d6a487a864085637163f6a70a7eaa288bea469e842ed" +hash = "6604e1cf727e7894187fa5ed95a655048abfe40822f132075925fe140bfea93a" ["commonware_storage::journal::conformance::SegmentedFixed"] n_cases = 512 @@ -356,4 +356,4 @@ hash = "290187801284530d0d7e82c33bb6ce975a5f4daa4b104230291cea9cffcd7686" ["commonware_storage::queue::conformance::QueueConformance"] n_cases = 512 -hash = "718a6b9f3905c146aa0d95b2b32ea1557680940c3fe2a293b73a19a56a3ac000" +hash = "46b87c955c232ab53bc8fdffef26b0f68ed2cc6b435655fbc418949191e3af66" diff --git a/storage/src/journal/contiguous/fixed.rs b/storage/src/journal/contiguous/fixed.rs index 4b8f749cf39..9fe8b8575e4 100644 --- a/storage/src/journal/contiguous/fixed.rs +++ b/storage/src/journal/contiguous/fixed.rs @@ -38,10 +38,39 @@ //! //! Metadata is stored in `{cfg.partition}-metadata`. //! +//! # Metadata +//! +//! Metadata contains the following keys: +//! - PRUNING_BOUNDARY_KEY: Stores the pruning boundary as a u64 when it's mid-section (not a +//! multiple of items_per_blob). Absent from legacy journals or when the boundary is +//! section-aligned, since it can be derived from the oldest blob. +//! - RECOVERY_WATERMARK_KEY: Stores a lower bound on the last logical size at which the fixed +//! journal's entries and metadata were synced as a coherent recovery checkpoint by an external +//! consumer. The key is durably written during initialization for any journal last opened before +//! this key was introduced. +//! +//! RECOVERY_WATERMARK_KEY is mainly useful when this journal is used as an index for a layered +//! journal, such as the variable journal's offsets. Standalone fixed journals do not need it to +//! recover their own size; they recover from retained blob lengths. +//! +//! # Recovery +//! +//! Recovery derives fixed-journal size from retained blob lengths: +//! - Once RECOVERY_WATERMARK_KEY exists, recovery walks retained blob lengths from oldest to +//! newest. A short newest section is the natural tail; a short earlier section is treated as the +//! end of the contiguous prefix, and newer sections are truncated. After size recovery, the +//! watermark is preserved if it is still within the recovered size and lowered otherwise. +//! - Legacy journals without RECOVERY_WATERMARK_KEY rely on the old rule that section rollover +//! synced the previous section. Valid legacy journals recover from the newest retained blob once, +//! then persist the watermark before returning from `init`. +//! +//! The recovery watermark is therefore an external recovery checkpoint, not a complete record of +//! every item that may have become durable through `commit` or storage behavior. +//! //! # Consistency //! //! Data written to `Journal` may not be immediately persisted to `Storage`. It is up to the caller -//! to determine when to force pending data to be written to `Storage` using the `sync` method. When +//! to determine when to force pending data to be durably written using `commit` or `sync`. When //! calling `close`, all pending data is automatically synced and any open blobs are closed. //! //! # Pruning @@ -66,7 +95,7 @@ use crate::{ }; use commonware_codec::CodecFixedShared; use commonware_runtime::buffer::paged::CacheRef; -use commonware_utils::sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock}; +use commonware_utils::sync::{AsyncMutex, AsyncRwLock, AsyncRwLockReadGuard}; use futures::{stream::Stream, StreamExt}; use std::num::{NonZeroU64, NonZeroUsize}; use tracing::warn; @@ -74,6 +103,46 @@ use tracing::warn; /// Metadata key for storing the pruning boundary. const PRUNING_BOUNDARY_KEY: u64 = 1; +/// Metadata key for storing the recovery watermark. +const RECOVERY_WATERMARK_KEY: u64 = 2; + +/// Return the first retained logical position in `section`. +#[inline] +fn first_in_section( + pruning_boundary: u64, + section: u64, + items_per_blob: u64, +) -> Result { + let start = section + .checked_mul(items_per_blob) + .ok_or(Error::OffsetOverflow)?; + if pruning_boundary > start { + Ok(pruning_boundary) + } else { + Ok(start) + } +} + +/// Maximum number of items a section's blob can physically hold. This is `items_per_blob` unless +/// the pruning boundary falls mid-section (from `init_at_size`), in which case the skipped prefix +/// reduces the capacity. +#[inline] +fn section_capacity( + pruning_boundary: u64, + section: u64, + items_per_blob: u64, +) -> Result { + let start = section + .checked_mul(items_per_blob) + .ok_or(Error::OffsetOverflow)?; + let skipped = first_in_section(pruning_boundary, section, items_per_blob)? + .checked_sub(start) + .ok_or(Error::OffsetOverflow)?; + items_per_blob + .checked_sub(skipped) + .ok_or(Error::OffsetOverflow) +} + /// Configuration for `Journal` storage. #[derive(Clone)] pub struct Config { @@ -85,8 +154,9 @@ pub struct Config { /// The maximum number of journal items to store in each blob. /// - /// Any unpruned historical blobs will contain exactly this number of items. - /// Only the newest blob may contain fewer items. + /// Retained non-tail blobs are expected to be full relative to their logical capacity. A + /// mid-section oldest blob may physically hold fewer than this many items, and the newest blob + /// may contain fewer items. pub items_per_blob: NonZeroU64, /// The page cache to use for caching data. @@ -104,17 +174,27 @@ struct Inner { /// Total number of items appended (not affected by pruning). size: u64, - /// If the journal's pruning boundary is mid-section (that is, the oldest retained item's - /// position is not a multiple of `items_per_blob`), then the metadata stores the pruning - /// boundary. Otherwise, the metadata is empty. + /// Stores the recovery watermark and, when the pruning boundary is mid-section, the exact + /// pruning boundary. Otherwise, the pruning-boundary entry is omitted. /// - /// When the journal is pruned, `metadata` must be persisted AFTER the inner journal is - /// persisted to ensure that its pruning boundary is never after the inner journal's size. + /// Metadata that advances the pruning boundary or recovery watermark is persisted only after + /// the blob state it describes is durable. A lower recovery watermark is always safe to persist + /// because it only expands the suffix external consumers may replay. If pruning metadata + /// disagrees with the oldest blob during recovery, the blob state wins. // TODO(#2939): Remove metadata metadata: Metadata>, /// The position before which all items have been pruned. pruning_boundary: u64, + + /// The earliest section modified since the last successful `commit()` or `sync()`. + dirty_from_section: Option, +} + +/// A deferred blob truncation to apply after metadata is persisted during init. +struct RecoveryRepair { + section: u64, + byte_offset: u64, } impl Inner { @@ -133,12 +213,8 @@ impl Inner { } let section = pos / items_per_blob; - let section_start = section * items_per_blob; - - // Calculate position within the blob. - // This accounts for sections that begin mid-section (pruning_boundary > section_start). - let first_in_section = self.pruning_boundary.max(section_start); - let pos_in_section = pos - first_in_section; + let pos_in_section = + pos - first_in_section(self.pruning_boundary, section, items_per_blob)?; self.journal .get(section, pos_in_section) @@ -168,9 +244,8 @@ impl Inner { return None; } let section = pos / items_per_blob; - let section_start = section * items_per_blob; - let first_in_section = self.pruning_boundary.max(section_start); - let pos_in_section = pos - first_in_section; + let pos_in_section = + pos - first_in_section(self.pruning_boundary, section, items_per_blob).ok()?; self.journal.try_get_sync_into(section, pos_in_section, buf) } } @@ -191,10 +266,10 @@ impl Inner { /// by the underlying [SegmentedJournal] during init. pub struct Journal { /// Inner state with segmented journal and size. - /// - /// Serializes persistence and write operations (`sync`, `append`, `prune`, `rewind`) to prevent - /// race conditions while allowing concurrent reads during sync. - inner: UpgradableAsyncRwLock>, + inner: AsyncRwLock>, + + /// Serializes writers with `commit()` and `sync()` so a plain rwlock is sufficient. + op_lock: AsyncMutex<()>, /// The maximum number of items per blob (section). items_per_blob: u64, @@ -290,8 +365,6 @@ impl super::Reader for Reader<'_, E, A> { let mut group_start = 0; while group_start < miss_positions.len() { let section = miss_positions[group_start] / items_per_blob; - let section_start = section * items_per_blob; - let first_in_section = pruning_boundary.max(section_start); let mut group_end = group_start + 1; while group_end < miss_positions.len() @@ -301,9 +374,10 @@ impl super::Reader for Reader<'_, E, A> { } let group_len = group_end - group_start; + let first_position = first_in_section(pruning_boundary, section, items_per_blob)?; let section_positions: Vec = miss_positions[group_start..group_end] .iter() - .map(|&pos| pos - first_in_section) + .map(|&pos| pos - first_position) .collect(); let buf = &mut reusable_buf[..group_len * chunk_size]; @@ -367,11 +441,8 @@ impl super::Reader for Reader<'_, E, A> { } let start_section = start_pos / items_per_blob; - let section_start = start_section * items_per_blob; - - // Calculate start position within the section. - let first_in_section = pruning_boundary.max(section_start); - let start_pos_in_section = start_pos - first_in_section; + let start_pos_in_section = + start_pos - first_in_section(pruning_boundary, start_section, items_per_blob)?; // Check all middle sections (not oldest, not tail) in range are complete. let journal = &self.guard.journal; @@ -393,11 +464,11 @@ impl super::Reader for Reader<'_, E, A> { // Transform (section, pos_in_section, item) to (global_pos, item). let stream = inner_stream.map(move |result| { - result.map(|(section, pos_in_section, item)| { - let section_start = section * items_per_blob; - let first_in_section = pruning_boundary.max(section_start); - let global_pos = first_in_section + pos_in_section; - (global_pos, item) + result.and_then(|(section, pos_in_section, item)| { + let global_pos = first_in_section(pruning_boundary, section, items_per_blob)? + .checked_add(pos_in_section) + .ok_or(Error::OffsetOverflow)?; + Ok((global_pos, item)) }) }); @@ -412,6 +483,123 @@ impl Journal { /// Size of each entry in bytes (as u64). pub const CHUNK_SIZE_U64: u64 = Self::CHUNK_SIZE as u64; + /// Mark all sections from `section` onward as dirty. + fn mark_dirty_from(inner: &mut Inner, section: u64) { + inner.dirty_from_section = Some( + inner + .dirty_from_section + .map_or(section, |existing| existing.min(section)), + ); + } + + /// Parse an optional u64 value from metadata. + fn parse_metadata_u64( + metadata: &Metadata>, + key: u64, + label: &'static str, + ) -> Result, Error> { + match metadata.get(&key) { + Some(bytes) => Ok(Some(u64::from_be_bytes( + bytes + .as_slice() + .try_into() + .map_err(|_| Error::Corruption(format!("invalid {label} metadata")))?, + ))), + None => Ok(None), + } + } + + /// Update pruning-boundary and recovery-watermark entries in metadata's in-memory state. + /// + /// Call `inner.metadata.sync()` separately to persist the updated entries. + fn update_metadata_entries( + inner: &mut Inner, + items_per_blob: u64, + pruning_boundary: u64, + recovery_watermark: u64, + ) -> Result<(), Error> { + let current_pruning = + Self::parse_metadata_u64(&inner.metadata, PRUNING_BOUNDARY_KEY, "pruning_boundary")?; + if !pruning_boundary.is_multiple_of(items_per_blob) { + if current_pruning != Some(pruning_boundary) { + inner.metadata.put( + PRUNING_BOUNDARY_KEY, + pruning_boundary.to_be_bytes().to_vec(), + ); + } + } else if current_pruning.is_some() { + inner.metadata.remove(&PRUNING_BOUNDARY_KEY); + } + + let current_watermark = Self::parse_metadata_u64( + &inner.metadata, + RECOVERY_WATERMARK_KEY, + "recovery_watermark", + )?; + if current_watermark != Some(recovery_watermark) { + inner.metadata.put( + RECOVERY_WATERMARK_KEY, + recovery_watermark.to_be_bytes().to_vec(), + ); + } + + Ok(()) + } + + /// Update and persist pruning-boundary and recovery-watermark metadata entries. + async fn persist_metadata_entries( + inner: &mut Inner, + items_per_blob: u64, + pruning_boundary: u64, + recovery_watermark: u64, + ) -> Result<(), Error> { + Self::update_metadata_entries(inner, items_per_blob, pruning_boundary, recovery_watermark)?; + inner.metadata.sync().await?; + Ok(()) + } + + /// Stage a recovery watermark no greater than `limit`. + /// + /// This is used before blob state moves backward so external consumers never see a persisted + /// recovery checkpoint beyond the rewind/clear target. + fn lower_recovery_watermark(inner: &mut Inner, limit: u64) -> Result { + let current_watermark = Self::parse_metadata_u64( + &inner.metadata, + RECOVERY_WATERMARK_KEY, + "recovery_watermark", + )?; + let Some(current) = current_watermark else { + return Ok(false); + }; + if current <= limit { + return Ok(false); + } + inner + .metadata + .put(RECOVERY_WATERMARK_KEY, limit.to_be_bytes().to_vec()); + Ok(true) + } + + /// Stage a recovery-watermark entry no greater than `limit` in raw metadata. + /// + /// This is used by `init_at_size` before it clears existing blobs, before an `Inner` exists. + #[commonware_macros::stability(ALPHA)] + fn update_metadata_watermark_before_clear( + metadata: &mut Metadata>, + limit: u64, + ) -> Result { + let Some(current_watermark) = + Self::parse_metadata_u64(metadata, RECOVERY_WATERMARK_KEY, "recovery_watermark")? + else { + return Ok(false); + }; + if current_watermark <= limit { + return Ok(false); + } + metadata.put(RECOVERY_WATERMARK_KEY, limit.to_be_bytes().to_vec()); + Ok(true) + } + /// Scan a partition and return blob names, treating a missing partition as empty. async fn scan_partition(context: &E, partition: &str) -> Result>, Error> { match context.scan(partition).await { @@ -462,195 +650,287 @@ impl Journal { }; let mut journal = SegmentedJournal::init(context.child("blobs"), segmented_cfg).await?; - // Initialize metadata store let meta_cfg = MetadataConfig { partition: format!("{}-metadata", cfg.partition), codec_config: ((0..).into(), ()), }; - let mut metadata = - Metadata::<_, u64, Vec>::init(context.child("meta"), meta_cfg).await?; + let metadata = Metadata::<_, u64, Vec>::init(context.child("meta"), meta_cfg).await?; - // Parse metadata if present - let meta_pruning_boundary = match metadata.get(&PRUNING_BOUNDARY_KEY) { - Some(bytes) => Some(u64::from_be_bytes(bytes.as_slice().try_into().map_err( - |_| Error::Corruption("invalid pruning_boundary metadata".into()), - )?)), - None => None, - }; + let meta_pruning_boundary = + Self::parse_metadata_u64(&metadata, PRUNING_BOUNDARY_KEY, "pruning_boundary")?; + let meta_recovery_watermark = + Self::parse_metadata_u64(&metadata, RECOVERY_WATERMARK_KEY, "recovery_watermark")?; - // Recover bounds from metadata and/or blobs - let (pruning_boundary, size, needs_metadata_update) = - Self::recover_bounds(&journal, items_per_blob, meta_pruning_boundary).await?; + let (pruning_boundary, size, recovery_watermark, repair) = Self::recover_bounds( + &mut journal, + items_per_blob, + meta_pruning_boundary, + meta_recovery_watermark, + ) + .await?; + + let mut inner = Inner { + journal, + size, + metadata, + pruning_boundary, + dirty_from_section: None, + }; + // Persist any lowered checkpoint before applying blob repairs that move recovered state + // backward. + Self::persist_metadata_entries( + &mut inner, + items_per_blob, + pruning_boundary, + recovery_watermark, + ) + .await?; - // Persist metadata if needed - if needs_metadata_update { - if pruning_boundary.is_multiple_of(items_per_blob) { - metadata.remove(&PRUNING_BOUNDARY_KEY); - } else { - metadata.put( - PRUNING_BOUNDARY_KEY, - pruning_boundary.to_be_bytes().to_vec(), - ); - } - metadata.sync().await?; + if let Some(repair) = repair { + inner + .journal + .rewind(repair.section, repair.byte_offset) + .await?; + inner.journal.sync(repair.section).await?; } - // Invariant: Tail blob must exist, even if empty. This ensures we can reconstruct size on - // reopen even after pruning all items. The tail blob is at `size / items_per_blob` (where - // the next append would go). let tail_section = size / items_per_blob; - journal.ensure_section_exists(tail_section).await?; + inner.journal.ensure_section_exists(tail_section).await?; let metrics = Metrics::new(context); metrics.update(size, pruning_boundary, items_per_blob); Ok(Self { - inner: UpgradableAsyncRwLock::new(Inner { - journal, - size, - metadata, - pruning_boundary, - }), + inner: AsyncRwLock::new(inner), + op_lock: AsyncMutex::new(()), items_per_blob, metrics, }) } - /// Returns (pruning_boundary, size, needs_metadata_update) based on metadata and blobs. - /// - /// If `meta_pruning_boundary` is `Some`, validates it against the physical blob state: - /// - If metadata is section-aligned, it's unnecessary and we use blob-based boundary - /// - If metadata refers to a pruned section, it's stale and we use blob-based boundary - /// - If metadata refers to a future section, it must have been written by [Self::clear_to_size] - /// or [Self::init_at_size] and crashed before writing the blobs. Fall back to blobs. - /// - Otherwise, metadata is valid and we use it - /// - /// If `meta_pruning_boundary` is `None`, computes bounds purely from blobs. + /// Recover `(pruning_boundary, size, recovery_watermark, repair)` from metadata and blob state. async fn recover_bounds( - inner: &SegmentedJournal, + inner: &mut SegmentedJournal, items_per_blob: u64, meta_pruning_boundary: Option, - ) -> Result<(u64, u64, bool), Error> { - // Blob-based boundary is always section-aligned - let blob_boundary = inner.oldest_section().map_or(0, |o| o * items_per_blob); + meta_recovery_watermark: Option, + ) -> Result<(u64, u64, u64, Option), Error> { + let blob_boundary = match inner.oldest_section() { + Some(oldest) => oldest + .checked_mul(items_per_blob) + .ok_or(Error::OffsetOverflow)?, + None => 0, + }; - let (pruning_boundary, needs_update) = match meta_pruning_boundary { - // Mid-section metadata: validate against blobs + // Determine the pruning boundary from metadata and blob state. + // + // PRUNING_BOUNDARY_KEY is only stored when the boundary falls mid-section. If present and + // it refers to the current oldest section, use it. If it refers to a different section + // (crash left stale metadata), fall back to the section-aligned blob boundary. Absence of + // the key just means the boundary is section-aligned. + // + // Staleness detection is one-sided: we can only tell metadata is stale when it names a + // section that no longer exists. If it names the current oldest section, we trust it. This + // is safe because prune persists metadata after blob state, so a crash before the metadata + // update means the newer boundary was never fully committed. + let mut pruning_metadata_stale = false; + let pruning_boundary = match meta_pruning_boundary { Some(meta_pruning_boundary) if !meta_pruning_boundary.is_multiple_of(items_per_blob) => { let meta_oldest_section = meta_pruning_boundary / items_per_blob; match inner.oldest_section() { None => { - // No blobs exist but metadata claims mid-section boundary. - // This can happen if we crash after inner.clear() but before - // ensure_section_exists(). Ignore stale metadata. warn!( meta_oldest_section, - "crash repair: no blobs exist, ignoring stale metadata" + "crash repair: no blobs exist, ignoring stale pruning metadata" ); - (blob_boundary, true) + pruning_metadata_stale = true; + blob_boundary } Some(oldest_section) if meta_oldest_section < oldest_section => { warn!( meta_oldest_section, - oldest_section, "crash repair: metadata stale, computing from blobs" + oldest_section, + "crash repair: pruning metadata stale, computing from blobs" ); - (blob_boundary, true) + pruning_metadata_stale = true; + blob_boundary } Some(oldest_section) if meta_oldest_section > oldest_section => { - // Metadata references a section ahead of the oldest blob. This can happen - // if we crash during clear_to_size/init_at_size after blobs update but - // before metadata update. Fall back to blob state. warn!( meta_oldest_section, oldest_section, - "crash repair: metadata ahead of blobs, computing from blobs" + "crash repair: pruning metadata ahead of blobs, computing from blobs" ); - (blob_boundary, true) + pruning_metadata_stale = true; + blob_boundary } - Some(_) => (meta_pruning_boundary, false), // valid mid-section metadata + Some(_) => meta_pruning_boundary, } } - // Section-aligned metadata: unnecessary, use blob-based - Some(_) => (blob_boundary, true), - // No metadata: use blob-based, no update needed - None => (blob_boundary, false), + _ => blob_boundary, }; - // Validate oldest section before computing size. + // Check oldest section for over-capacity corruption before recovery mode dispatch. Self::validate_oldest_section(inner, items_per_blob, pruning_boundary).await?; - let size = Self::compute_size(inner, items_per_blob, pruning_boundary).await?; - Ok((pruning_boundary, size, needs_update)) + // Perform any recovery if needed, computing journal size and recovery watermark. + let (size, repair) = match meta_recovery_watermark { + Some(_) => { + Self::recover_by_walking_lengths(inner, items_per_blob, pruning_boundary).await? + } + None if !pruning_metadata_stale => { + // No stale pruning metadata and no recovery watermark implies a legacy format. + Self::recover_legacy_size(inner, items_per_blob, pruning_boundary).await? + } + None => { + // Pruning metadata was stale, and there is no recovery watermark to preserve. + Self::recover_by_walking_lengths(inner, items_per_blob, pruning_boundary).await? + } + }; + let recovery_watermark = meta_recovery_watermark.unwrap_or(size).min(size); + + Ok((pruning_boundary, size, recovery_watermark, repair)) } - /// Validate that the oldest section has the expected number of items. - /// - /// Non-tail sections must be full from their logical start. The tail section - /// (oldest == newest) can be partially filled. + /// Check that the oldest section does not exceed its logical capacity. async fn validate_oldest_section( inner: &SegmentedJournal, items_per_blob: u64, pruning_boundary: u64, ) -> Result<(), Error> { - let (Some(oldest), Some(newest)) = (inner.oldest_section(), inner.newest_section()) else { - return Ok(()); // No sections to validate + let Some(oldest) = inner.oldest_section() else { + return Ok(()); }; - if oldest == newest { - return Ok(()); // Tail section, can be partial - } - let oldest_len = inner.section_len(oldest).await?; - let oldest_start = oldest * items_per_blob; - - let expected = if pruning_boundary > oldest_start { - // Mid-section boundary: items from pruning_boundary to section end - items_per_blob - (pruning_boundary - oldest_start) - } else { - // Section-aligned boundary: full section - items_per_blob - }; + let expected = section_capacity(pruning_boundary, oldest, items_per_blob)?; - if oldest_len != expected { + if oldest_len > expected { return Err(Error::Corruption(format!( - "oldest section {oldest} has wrong size: expected {expected} items, got {oldest_len}" + "oldest section {oldest} has too many items: expected at most {expected}, got {oldest_len}" ))); } Ok(()) } - /// Returns the total number of items ever appended (size), computed from the blobs. - async fn compute_size( + async fn section_len_within_capacity( inner: &SegmentedJournal, items_per_blob: u64, pruning_boundary: u64, - ) -> Result { + section: u64, + ) -> Result<(u64, u64), Error> { + let len = inner.section_len(section).await?; + let capacity = section_capacity(pruning_boundary, section, items_per_blob)?; + if len > capacity { + return Err(Error::Corruption(format!( + "section {section} has too many items: expected at most {capacity}, got {len}" + ))); + } + Ok((len, capacity)) + } + + /// Recover a legacy journal that has no RECOVERY_WATERMARK_KEY. + /// + /// Before the watermark key existed, writers synced each section before rolling over to the + /// next one. That lets valid legacy journals recover from the newest retained blob without + /// walking all retained sections. If the oldest non-tail section is already short, the legacy + /// invariant is violated and recovery keeps only the contiguous prefix. + async fn recover_legacy_size( + inner: &mut SegmentedJournal, + items_per_blob: u64, + pruning_boundary: u64, + ) -> Result<(u64, Option), Error> { + let Some(newest) = inner.newest_section() else { + return Ok((pruning_boundary, None)); + }; + let Some(oldest) = inner.oldest_section() else { + return Ok((pruning_boundary, None)); + }; + + let (oldest_len, oldest_capacity) = + Self::section_len_within_capacity(inner, items_per_blob, pruning_boundary, oldest) + .await?; + if oldest != newest && oldest_len < oldest_capacity { + // This cannot be a valid legacy state under the old rollover-sync rule, but walking + // lengths still recovers the contiguous prefix without trusting the stale size. + return Self::recover_by_walking_lengths(inner, items_per_blob, pruning_boundary).await; + } + + let (tail_len, _) = + Self::section_len_within_capacity(inner, items_per_blob, pruning_boundary, newest) + .await?; + let size = first_in_section(pruning_boundary, newest, items_per_blob)? + .checked_add(tail_len) + .ok_or(Error::OffsetOverflow)?; + Ok((size, None)) + } + + /// Recover by walking section lengths until the first short non-tail section. + /// + /// This is the normal current-format crash-repair path. Legacy recovery uses it only when the + /// old rollover invariant is already violated or pruning metadata was stale. + async fn recover_by_walking_lengths( + inner: &mut SegmentedJournal, + items_per_blob: u64, + pruning_boundary: u64, + ) -> Result<(u64, Option), Error> { let oldest = inner.oldest_section(); let newest = inner.newest_section(); let (Some(oldest), Some(newest)) = (oldest, newest) else { - return Ok(pruning_boundary); + return Ok((pruning_boundary, None)); }; + // The oldest section's capacity was already checked before recovery mode dispatch. + let oldest_len = inner.section_len(oldest).await?; + let expected_oldest = section_capacity(pruning_boundary, oldest, items_per_blob)?; + let mut size = pruning_boundary + .checked_add(oldest_len) + .ok_or(Error::OffsetOverflow)?; + if oldest == newest { - // Single section: count from pruning boundary - let tail_len = inner.section_len(newest).await?; - return Ok(pruning_boundary + tail_len); + return Ok((size, None)); } - // Multiple sections: sum actual item counts - let oldest_len = inner.section_len(oldest).await?; - let tail_len = inner.section_len(newest).await?; + if oldest_len < expected_oldest { + return Ok(( + size, + Some(RecoveryRepair { + section: oldest, + byte_offset: oldest_len + .checked_mul(Self::CHUNK_SIZE_U64) + .ok_or(Error::OffsetOverflow)?, + }), + )); + } - // Middle sections are assumed full - let middle_sections = newest - oldest - 1; - let middle_items = middle_sections * items_per_blob; + for section in oldest + 1..=newest { + let (len, capacity) = + Self::section_len_within_capacity(inner, items_per_blob, pruning_boundary, section) + .await?; + + size = size.checked_add(len).ok_or(Error::OffsetOverflow)?; + if len < capacity { + if section == newest { + return Ok((size, None)); + } + return Ok(( + size, + Some(RecoveryRepair { + section, + byte_offset: len + .checked_mul(Self::CHUNK_SIZE_U64) + .ok_or(Error::OffsetOverflow)?, + }), + )); + } + } - Ok(pruning_boundary + oldest_len + middle_items + tail_len) + Ok((size, None)) } /// Initialize a new `Journal` instance in a pruned state at a given size. @@ -689,7 +969,6 @@ impl Journal { write_buffer: cfg.write_buffer, }; - // Initialize both stores. let meta_cfg = MetadataConfig { partition: format!("{}-metadata", cfg.partition), codec_config: ((0..).into(), ()), @@ -698,33 +977,27 @@ impl Journal { Metadata::<_, u64, Vec>::init(context.child("meta"), meta_cfg).await?; let mut journal = SegmentedJournal::init(context.child("blobs"), segmented_cfg).await?; - // Clear blobs before updating metadata. - // This ordering is critical for crash safety: - // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored - // - Crash after create: old metadata triggers "metadata ahead" warning, - // recovery falls back to blob state. + if Self::update_metadata_watermark_before_clear(&mut metadata, size)? { + metadata.sync().await?; + } journal.clear().await?; journal.ensure_section_exists(tail_section).await?; - // Persist metadata if pruning_boundary is mid-section. - if !size.is_multiple_of(items_per_blob) { - metadata.put(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec()); - metadata.sync().await?; - } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() { - metadata.remove(&PRUNING_BOUNDARY_KEY); - metadata.sync().await?; - } + let mut inner = Inner { + journal, + size, + metadata, + pruning_boundary: size, + dirty_from_section: None, + }; + Self::persist_metadata_entries(&mut inner, items_per_blob, size, size).await?; let metrics = Metrics::new(context); metrics.update(size, size, items_per_blob); Ok(Self { - inner: UpgradableAsyncRwLock::new(Inner { - journal, - size, - metadata, - pruning_boundary: size, // No data exists yet - }), + inner: AsyncRwLock::new(inner), + op_lock: AsyncMutex::new(()), items_per_blob, metrics, }) @@ -738,54 +1011,59 @@ impl Journal { (section, pos_in_section) } - /// Sync any pending updates to disk. + /// Fsync dirty sections under the read lock, allowing concurrent reads. + async fn fsync_dirty_sections(&self) -> Result<(), Error> { + let inner = self.inner.read().await; + if let Some(start_section) = inner.dirty_from_section { + let tail_section = inner.size / self.items_per_blob; + let start_section = inner + .journal + .oldest_section() + .map(|oldest| start_section.max(oldest)) + // With no retained blobs, any earlier dirty section was cleared or pruned. + // Syncing the tail section is harmless when it does not exist. + .unwrap_or(tail_section); + for section in start_section..=tail_section { + inner.journal.sync(section).await?; + } + } + Ok(()) + } + + /// Durably persists the current state of the structure. + /// + /// Does not advance the recovery watermark, so external consumers may need to replay entries + /// beyond the previous `sync()`. Use `sync()` to advance the watermark and to ensure that a + /// crash after this call doesn't require any recovery. + pub async fn commit(&self) -> Result<(), Error> { + let _timer = self.metrics.commit_timer(); + self.metrics.record_commit(); + let _op_guard = self.op_lock.lock().await; + self.fsync_dirty_sections().await?; + + let mut inner = self.inner.write().await; + inner.dirty_from_section = None; + Ok(()) + } + + /// Durably persist the current state of the structure, ensuring no recovery is required in the + /// event of a crash following this call. /// - /// Only the tail section can have pending updates since historical sections are synced - /// when they become full. + /// Advances the recovery watermark to the current size. pub async fn sync(&self) -> Result<(), Error> { let _timer = self.metrics.sync_timer(); self.metrics.sync_calls.inc(); - // Serialize with append/prune/rewind to ensure section selection is stable, while still allowing - // concurrent readers. - let inner = self.inner.upgradable_read().await; - - // Sync the tail section - let tail_section = inner.size / self.items_per_blob; - - // The tail section may not exist yet if the previous section was just filled, but syncing a - // non-existent section is safe (returns Ok). - inner.journal.sync(tail_section).await?; + let _op_guard = self.op_lock.lock().await; + self.fsync_dirty_sections().await?; - // Persist metadata only when pruning_boundary is mid-section. + let mut inner = self.inner.write().await; + inner.dirty_from_section = None; let pruning_boundary = inner.pruning_boundary; - let pruning_boundary_from_metadata = inner.metadata.get(&PRUNING_BOUNDARY_KEY).cloned(); - let put = if !pruning_boundary.is_multiple_of(self.items_per_blob) { - let needs_update = pruning_boundary_from_metadata - .is_none_or(|bytes| bytes.as_slice() != pruning_boundary.to_be_bytes()); - - if needs_update { - true - } else { - return Ok(()); - } - } else if pruning_boundary_from_metadata.is_some() { - false - } else { - return Ok(()); - }; + let size = inner.size; + Self::update_metadata_entries(&mut inner, self.items_per_blob, pruning_boundary, size)?; + drop(inner); - // Upgrade only for the metadata mutation; reads were allowed while syncing - // the tail section above. Downgrade before the metadata fsync to unblock readers. - let mut inner = inner.upgrade().await; - if put { - inner.metadata.put( - PRUNING_BOUNDARY_KEY, - pruning_boundary.to_be_bytes().to_vec(), - ); - } else { - inner.metadata.remove(&PRUNING_BOUNDARY_KEY); - } - let inner = inner.downgrade_to_upgradable(); + let inner = self.inner.read().await; inner.metadata.sync().await?; Ok(()) @@ -800,6 +1078,18 @@ impl Journal { } } + /// Return the recovery watermark. + pub(crate) async fn recovery_watermark(&self) -> u64 { + let inner = self.inner.read().await; + Self::parse_metadata_u64( + &inner.metadata, + RECOVERY_WATERMARK_KEY, + "recovery_watermark", + ) + .expect("valid recovery watermark metadata") + .expect("recovery watermark must exist after init") + } + /// Return the total number of items in the journal, irrespective of pruning. The next value /// appended to the journal will be at this position. pub async fn size(&self) -> u64 { @@ -853,8 +1143,10 @@ impl Journal { } } - // Mutating operations are serialized by taking the write guard. + let _op_guard = self.op_lock.lock().await; let mut inner = self.inner.write().await; + let first_dirty_section = inner.size / self.items_per_blob; + Self::mark_dirty_from(&mut inner, first_dirty_section); let mut written = 0; while written < items_count { let (section, pos_in_section) = self.position_to_section(inner.size); @@ -871,12 +1163,6 @@ impl Journal { written += batch_count; if inner.size.is_multiple_of(self.items_per_blob) { - // The section was filled and must be synced. Downgrade so readers can continue - // during the sync, but keep mutators blocked. After sync, upgrade again to - // create the next tail section before any append can proceed. - let inner_ref = inner.downgrade_to_upgradable(); - inner_ref.journal.sync(section).await?; - inner = inner_ref.upgrade().await; inner.journal.ensure_section_exists(section + 1).await?; } } @@ -891,10 +1177,12 @@ impl Journal { /// /// # Warnings /// - /// * This operation is not guaranteed to survive restarts until sync is called. + /// * This operation is not guaranteed to survive restarts until `commit()` or `sync()` is + /// called. /// * This operation is not atomic, but it will always leave the journal in a consistent state /// in the event of failure since blobs are always removed from newest to oldest. pub async fn rewind(&self, size: u64) -> Result<(), Error> { + let _op_guard = self.op_lock.lock().await; let mut inner = self.inner.write().await; match size.cmp(&inner.size) { @@ -908,15 +1196,24 @@ impl Journal { } let section = size / self.items_per_blob; - let section_start = section * self.items_per_blob; + let pos_in_section = + size - first_in_section(inner.pruning_boundary, section, self.items_per_blob)?; + let byte_offset = pos_in_section + .checked_mul(Self::CHUNK_SIZE_U64) + .ok_or(Error::OffsetOverflow)?; + + let should_sync_metadata = Self::lower_recovery_watermark(&mut inner, size)?; + drop(inner); - // Calculate offset within section for rewind - let first_in_section = inner.pruning_boundary.max(section_start); - let pos_in_section = size - first_in_section; - let byte_offset = pos_in_section * Self::CHUNK_SIZE_U64; + if should_sync_metadata { + let inner = self.inner.read().await; + inner.metadata.sync().await?; + } + let mut inner = self.inner.write().await; inner.journal.rewind(section, byte_offset).await?; inner.size = size; + Self::mark_dirty_from(&mut inner, section); self.metrics .update(inner.size, inner.pruning_boundary, self.items_per_blob); @@ -936,6 +1233,7 @@ impl Journal { /// Note that this operation may NOT be atomic, however it's guaranteed not to leave gaps in the /// event of failure as items are always pruned in order from oldest to newest. pub async fn prune(&self, min_item_pos: u64) -> Result { + let _op_guard = self.op_lock.lock().await; let mut inner = self.inner.write().await; // Calculate the section that would contain min_item_pos @@ -958,6 +1256,9 @@ impl Journal { // Pruning boundary only moves forward assert!(inner.pruning_boundary < new_oldest * self.items_per_blob); inner.pruning_boundary = new_oldest * self.items_per_blob; + if let Some(dirty_from) = inner.dirty_from_section { + inner.dirty_from_section = Some(dirty_from.max(new_oldest)); + } self.metrics .update(inner.size, inner.pruning_boundary, self.items_per_blob); } @@ -986,28 +1287,26 @@ impl Journal { /// If a crash occurs during this operation, `init()` will recover to a consistent state /// (though possibly different from the intended `new_size`). pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> { - // Clear blobs before updating metadata. - // This ordering is critical for crash safety: - // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored - // - Crash after create: old metadata triggers "metadata ahead" warning, - // recovery falls back to blob state + let _op_guard = self.op_lock.lock().await; + let mut inner = self.inner.write().await; + + let should_sync_metadata = Self::lower_recovery_watermark(&mut inner, new_size)?; + drop(inner); + + if should_sync_metadata { + let inner = self.inner.read().await; + inner.metadata.sync().await?; + } + let mut inner = self.inner.write().await; inner.journal.clear().await?; let tail_section = new_size / self.items_per_blob; inner.journal.ensure_section_exists(tail_section).await?; inner.size = new_size; - inner.pruning_boundary = new_size; // No data exists - - // Persist metadata only when pruning_boundary is mid-section. - if !inner.pruning_boundary.is_multiple_of(self.items_per_blob) { - let value = inner.pruning_boundary.to_be_bytes().to_vec(); - inner.metadata.put(PRUNING_BOUNDARY_KEY, value); - inner.metadata.sync().await?; - } else if inner.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() { - inner.metadata.remove(&PRUNING_BOUNDARY_KEY); - inner.metadata.sync().await?; - } + inner.pruning_boundary = new_size; + inner.dirty_from_section = None; + Self::persist_metadata_entries(&mut inner, self.items_per_blob, new_size, new_size).await?; self.metrics .update(inner.size, inner.pruning_boundary, self.items_per_blob); @@ -1039,6 +1338,17 @@ impl Journal { let inner = self.inner.read().await; inner.journal.newest_section() } + + /// Test helper: Set and persist the recovery watermark directly. + #[cfg(test)] + pub(crate) async fn test_set_recovery_watermark(&self, watermark: u64) -> Result<(), Error> { + let mut inner = self.inner.write().await; + inner + .metadata + .put(RECOVERY_WATERMARK_KEY, watermark.to_be_bytes().to_vec()); + inner.metadata.sync().await?; + Ok(()) + } } // Implement Contiguous trait for fixed-length journals @@ -1076,7 +1386,7 @@ impl Persistable for Journal { type Error = Error; async fn commit(&self) -> Result<(), Error> { - self.sync().await + self.commit().await } async fn sync(&self) -> Result<(), Error> { @@ -1120,9 +1430,11 @@ impl crate::journal::authenticated::Inner fo #[cfg(test)] mod tests { use super::*; + use commonware_codec::FixedSize; use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256}; use commonware_macros::test_traced; use commonware_runtime::{ + buffer::paged::Append, deterministic::{self, Context}, Blob, BufferPooler, Error as RuntimeError, Metrics as _, Runner, Storage, Supervisor as _, }; @@ -1571,10 +1883,8 @@ mod tests { journal.sync().await.expect("Failed to sync journal"); drop(journal); - // Manually truncate a non-tail blob to make sure it's detected during initialization. - // The segmented journal will trim the incomplete blob on init, resulting in the blob - // missing one item. This should be detected during init because all non-tail blobs - // must be full. + // Manually truncate a non-tail blob. Recovery should keep the contiguous prefix up to + // the shortened section and discard newer sections. let (blob, size) = context .open(&blob_partition(&cfg), &40u64.to_be_bytes()) .await @@ -1582,29 +1892,14 @@ mod tests { blob.resize(size - 1).await.expect("Failed to corrupt blob"); blob.sync().await.expect("Failed to sync blob"); - // The segmented journal will trim the incomplete blob on init, resulting in the blob - // missing one item. This should be detected as corruption during replay. let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) .await - .expect("failed to initialize journal"); - - // Journal size is computed from the tail section, so it's unchanged - // despite the corruption in section 40. - let expected_size = ITEMS_PER_BLOB.get() * 100 + ITEMS_PER_BLOB.get() / 2; - assert_eq!(journal.size().await, expected_size); - - // Replay should detect corruption (incomplete section) in section 40 - let reader = journal.reader().await; - match reader.replay(NZUsize!(1024), 0).await { - Err(Error::Corruption(msg)) => { - assert!( - msg.contains("section 40"), - "Error should mention section 40, got: {msg}" - ); - } - Err(e) => panic!("Expected Corruption error for section 40, got: {:?}", e), - Ok(_) => panic!("Expected replay to fail with corruption"), - }; + .expect("failed to recover journal"); + let expected_size = 40 * ITEMS_PER_BLOB.get() + 6; + assert_eq!(journal.bounds().await, 0..expected_size); + assert_eq!(journal.recovery_watermark().await, expected_size); + assert_eq!(journal.test_newest_section().await, Some(40)); + journal.destroy().await.unwrap(); }); } @@ -1630,25 +1925,16 @@ mod tests { .await .expect("failed to remove blob"); - // Init won't detect the corruption. - let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) .await - .expect("init shouldn't fail"); - - // But replay will. - let reader = result.reader().await; - match reader.replay(NZUsize!(1024), 0).await { - Err(Error::Corruption(_)) => {} - Err(err) => panic!("expected Corruption, got: {err}"), - Ok(_) => panic!("expected Corruption, got ok"), - }; - - // As will trying to read an item that was in the deleted blob. - match result.read(2).await { - Err(Error::Corruption(_)) => {} - Err(err) => panic!("expected Corruption, got: {err}"), - Ok(_) => panic!("expected Corruption, got ok"), - }; + .expect("failed to recover journal"); + assert_eq!(journal.bounds().await, 0..2); + assert_eq!(journal.recovery_watermark().await, 2); + assert!(matches!( + journal.read(2).await, + Err(Error::ItemOutOfRange(2)) + )); + journal.destroy().await.unwrap(); }); } @@ -1803,9 +2089,10 @@ mod tests { // The truncation invalidates the last page, which is removed. This loses one item. assert_eq!(journal.pruning_boundary().await, 0); assert_eq!(journal.size().await, 4); + assert_eq!(journal.recovery_watermark().await, 4); drop(journal); - // Delete the second blob and re-init + // Delete the second blob and re-init. Recovery keeps the contiguous prefix. context .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes())) .await @@ -1813,51 +2100,598 @@ mod tests { let journal = Journal::<_, Digest>::init(context.child("third"), cfg.clone()) .await - .expect("Failed to re-initialize journal"); - // Only the first blob remains - assert_eq!(journal.size().await, 3); - + .expect("Failed to recover journal"); + assert_eq!(journal.bounds().await, 0..3); + assert_eq!(journal.recovery_watermark().await, 3); + assert!(matches!( + journal.read(3).await, + Err(Error::ItemOutOfRange(3)) + )); journal.destroy().await.unwrap(); }); } #[test_traced] - fn test_fixed_journal_recover_detects_oldest_section_too_short() { + fn test_fixed_journal_init_persists_trailing_item_repair() { let executor = deterministic::Runner::default(); - executor.start(|context| async move { - let cfg = test_cfg(&context, NZU64!(5)); - let journal = - Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7) + let ((blob_partition, expected_size), checkpoint) = + executor.start_and_recover(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let blob_partition = blob_partition(&cfg); + let journal = Journal::init(context.child("first"), cfg.clone()) .await - .expect("failed to initialize journal at size"); + .unwrap(); - // Append items so section 1 has exactly the expected minimum (3 items). - for i in 0..8u64 { - journal - .append(&test_digest(100 + i)) - .await - .expect("failed to append data"); - } - journal.sync().await.expect("failed to sync journal"); - assert_eq!(journal.pruning_boundary().await, 7); - assert_eq!(journal.size().await, 15); - drop(journal); + for i in 0..3 { + journal.append(&test_digest(i)).await.unwrap(); + } + journal.sync().await.unwrap(); + drop(journal); - // Corrupt the oldest section by truncating one byte (drops one item on recovery). - let (blob, size) = context - .open(&blob_partition(&cfg), &1u64.to_be_bytes()) + let (blob, raw_size) = context + .open(&blob_partition, &0u64.to_be_bytes()) + .await + .unwrap(); + let append = Append::new( + blob, + raw_size, + 2048, + CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), + ) .await - .expect("failed to open oldest blob"); - blob.resize(size - 1).await.expect("failed to corrupt blob"); - blob.sync().await.expect("failed to sync blob"); + .unwrap(); + let logical_size = append.size().await; + assert_eq!(logical_size, 3 * Digest::SIZE as u64); + append.resize(logical_size - 1).await.unwrap(); + append.sync().await.unwrap(); + drop(append); - let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await; - assert!(matches!(result, Err(Error::Corruption(_)))); + let journal = Journal::<_, Digest>::init(context.child("second"), cfg) + .await + .unwrap(); + assert_eq!(journal.size().await, 2); + drop(journal); + + (blob_partition, 2 * Digest::SIZE as u64) + }); + + deterministic::Runner::from(checkpoint).start(move |context| async move { + let (blob, raw_size) = context + .open(&blob_partition, &0u64.to_be_bytes()) + .await + .unwrap(); + let append = Append::new( + blob, + raw_size, + 2048, + CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), + ) + .await + .unwrap(); + assert_eq!(append.size().await, expected_size); }); } #[test_traced] - fn test_fixed_journal_recover_to_empty_from_partial_write() { + fn test_fixed_journal_recover_accepts_clean_short_tail() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let segmented_cfg = SegmentedConfig { + partition: blob_partition(&cfg), + page_cache: cfg.page_cache.clone(), + write_buffer: cfg.write_buffer, + }; + let mut inner = + SegmentedJournal::<_, Digest>::init(context.child("blobs"), segmented_cfg) + .await + .unwrap(); + + for i in 0..5 { + inner.append(0, &test_digest(i)).await.unwrap(); + } + for i in 5..7 { + inner.append(1, &test_digest(i)).await.unwrap(); + } + inner.sync(0).await.unwrap(); + inner.sync(1).await.unwrap(); + + let (size, repair) = Journal::<_, Digest>::recover_by_walking_lengths(&mut inner, 5, 0) + .await + .unwrap(); + assert_eq!(size, 7); + assert!(repair.is_none()); + inner.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_recover_accepts_clean_empty_tail() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let segmented_cfg = SegmentedConfig { + partition: blob_partition(&cfg), + page_cache: cfg.page_cache.clone(), + write_buffer: cfg.write_buffer, + }; + let mut inner = + SegmentedJournal::<_, Digest>::init(context.child("blobs"), segmented_cfg) + .await + .unwrap(); + + for i in 0..5 { + inner.append(0, &test_digest(i)).await.unwrap(); + } + inner.ensure_section_exists(1).await.unwrap(); + inner.sync(0).await.unwrap(); + inner.sync(1).await.unwrap(); + + let (size, repair) = Journal::<_, Digest>::recover_by_walking_lengths(&mut inner, 5, 0) + .await + .unwrap(); + assert_eq!(size, 5); + assert!(repair.is_none()); + inner.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_recover_truncates_short_oldest_section() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = + Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7) + .await + .expect("failed to initialize journal at size"); + + // Append items so section 1 has exactly the expected minimum (3 items). + for i in 0..8u64 { + journal + .append(&test_digest(100 + i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + assert_eq!(journal.pruning_boundary().await, 7); + assert_eq!(journal.size().await, 15); + drop(journal); + + // Corrupt the oldest section by truncating one byte (drops one item on recovery). + let (blob, size) = context + .open(&blob_partition(&cfg), &1u64.to_be_bytes()) + .await + .expect("failed to open oldest blob"); + blob.resize(size - 1).await.expect("failed to corrupt blob"); + blob.sync().await.expect("failed to sync blob"); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to recover journal"); + assert_eq!(journal.bounds().await, 7..9); + assert_eq!(journal.recovery_watermark().await, 9); + assert_eq!(journal.read(7).await.unwrap(), test_digest(100)); + assert_eq!(journal.read(8).await.unwrap(), test_digest(101)); + assert!(matches!( + journal.read(9).await, + Err(Error::ItemOutOfRange(9)) + )); + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_recover_fallback_truncates_after_short_oldest_section() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = + Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7) + .await + .expect("failed to initialize journal at size"); + + for i in 0..8u64 { + journal + .append(&test_digest(100 + i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + assert_eq!(journal.bounds().await, 7..15); + + { + let mut inner = journal.inner.write().await; + inner + .metadata + .put(RECOVERY_WATERMARK_KEY, 6u64.to_be_bytes().to_vec()); + inner + .metadata + .sync() + .await + .expect("failed to sync stale recovery watermark"); + } + drop(journal); + + let (blob, size) = context + .open(&blob_partition(&cfg), &1u64.to_be_bytes()) + .await + .expect("failed to open oldest blob"); + blob.resize(size - 1).await.expect("failed to corrupt blob"); + blob.sync().await.expect("failed to sync blob"); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to recover journal"); + assert_eq!(journal.bounds().await, 7..9); + assert_eq!(journal.read(7).await.unwrap(), test_digest(100)); + assert_eq!(journal.read(8).await.unwrap(), test_digest(101)); + assert!(matches!( + journal.read(9).await, + Err(Error::ItemOutOfRange(9)) + )); + assert_eq!(journal.test_oldest_section().await, Some(1)); + assert_eq!(journal.inner.read().await.journal.newest_section(), Some(1)); + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_stale_pruning_metadata_preserves_watermark() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = + Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7) + .await + .expect("failed to initialize journal at size"); + + for i in 0..10u64 { + journal + .append(&test_digest(i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + assert_eq!(journal.bounds().await, 7..17); + + { + let mut inner = journal.inner.write().await; + inner + .journal + .rewind_section(2, 2 * Digest::SIZE as u64) + .await + .expect("failed to shorten anchored section"); + inner + .journal + .sync(2) + .await + .expect("failed to sync shortened anchored section"); + inner + .metadata + .put(RECOVERY_WATERMARK_KEY, 12u64.to_be_bytes().to_vec()); + inner + .metadata + .sync() + .await + .expect("failed to sync recovery watermark"); + } + drop(journal); + + // Remove the metadata's oldest section so PRUNING_BOUNDARY_KEY=7 is stale. The + // watermark is preserved because length-based recovery ends at the same point. + context + .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes())) + .await + .expect("failed to remove stale oldest section"); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to recover journal"); + assert_eq!(journal.bounds().await, 10..12); + assert_eq!(journal.recovery_watermark().await, 12); + assert_eq!(journal.read(10).await.unwrap(), test_digest(3)); + assert_eq!(journal.read(11).await.unwrap(), test_digest(4)); + assert!(matches!( + journal.read(12).await, + Err(Error::ItemOutOfRange(12)) + )); + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_stale_pruning_metadata_without_watermark_walks_lengths() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = + Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7) + .await + .expect("failed to initialize journal at size"); + + for i in 0..10u64 { + journal + .append(&test_digest(i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + assert_eq!(journal.bounds().await, 7..17); + + { + let mut inner = journal.inner.write().await; + inner.metadata.remove(&RECOVERY_WATERMARK_KEY); + inner + .metadata + .sync() + .await + .expect("failed to remove recovery watermark"); + } + drop(journal); + + // Remove the metadata's oldest section so PRUNING_BOUNDARY_KEY=7 is stale. Without a + // recovery watermark, recovery must still walk lengths from the recovered blob boundary. + context + .remove(&blob_partition(&cfg), Some(&1u64.to_be_bytes())) + .await + .expect("failed to remove stale oldest section"); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to recover journal"); + assert_eq!(journal.bounds().await, 10..17); + assert_eq!(journal.recovery_watermark().await, 17); + assert_eq!(journal.read(10).await.unwrap(), test_digest(3)); + assert_eq!(journal.read(16).await.unwrap(), test_digest(9)); + assert!(matches!( + journal.read(17).await, + Err(Error::ItemOutOfRange(17)) + )); + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_legacy_recovery_installs_watermark() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..12u64 { + journal + .append(&test_digest(i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + + { + let mut inner = journal.inner.write().await; + inner.metadata.remove(&RECOVERY_WATERMARK_KEY); + inner + .metadata + .sync() + .await + .expect("failed to remove recovery watermark"); + } + drop(journal); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to recover legacy journal"); + assert_eq!(journal.bounds().await, 0..12); + assert_eq!(journal.recovery_watermark().await, 12); + drop(journal); + + let meta_cfg = MetadataConfig { + partition: format!("{}-metadata", cfg.partition), + codec_config: ((0..).into(), ()), + }; + let metadata = Metadata::<_, u64, Vec>::init(context.child("metadata"), meta_cfg) + .await + .expect("failed to reopen metadata"); + let raw_watermark = metadata + .get(&RECOVERY_WATERMARK_KEY) + .expect("missing recovery watermark after legacy recovery"); + let persisted_watermark = + u64::from_be_bytes(raw_watermark.as_slice().try_into().unwrap()); + assert_eq!(persisted_watermark, 12); + drop(metadata); + + let journal = Journal::<_, Digest>::init(context.child("third"), cfg.clone()) + .await + .expect("failed to reopen upgraded journal"); + assert_eq!(journal.bounds().await, 0..12); + assert_eq!(journal.recovery_watermark().await, 12); + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_update_metadata_watermark_before_clear_lowers_only() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let meta_cfg = MetadataConfig { + partition: format!("{}-metadata", cfg.partition), + codec_config: ((0..).into(), ()), + }; + let mut metadata = + Metadata::<_, u64, Vec>::init(context.child("metadata"), meta_cfg) + .await + .expect("failed to initialize metadata"); + metadata.put(RECOVERY_WATERMARK_KEY, 7u64.to_be_bytes().to_vec()); + + let changed = + Journal::<_, Digest>::update_metadata_watermark_before_clear(&mut metadata, 9) + .expect("failed to update metadata watermark"); + assert!(!changed); + let raw_watermark = metadata + .get(&RECOVERY_WATERMARK_KEY) + .expect("missing recovery watermark"); + let persisted_watermark = + u64::from_be_bytes(raw_watermark.as_slice().try_into().unwrap()); + assert_eq!(persisted_watermark, 7); + + let changed = + Journal::<_, Digest>::update_metadata_watermark_before_clear(&mut metadata, 5) + .expect("failed to update metadata watermark"); + assert!(changed); + let raw_watermark = metadata + .get(&RECOVERY_WATERMARK_KEY) + .expect("missing recovery watermark"); + let persisted_watermark = + u64::from_be_bytes(raw_watermark.as_slice().try_into().unwrap()); + assert_eq!(persisted_watermark, 5); + }); + } + + #[test_traced] + fn test_fixed_journal_prune_to_blob_boundary_removes_pruning_metadata() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = + Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 7) + .await + .expect("failed to initialize journal at size"); + + for i in 0..8u64 { + journal + .append(&test_digest(i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + assert_eq!(journal.bounds().await, 7..15); + + journal.prune(10).await.expect("failed to prune journal"); + journal.sync().await.expect("failed to sync pruned journal"); + assert_eq!(journal.bounds().await, 10..15); + drop(journal); + + let meta_cfg = MetadataConfig { + partition: format!("{}-metadata", cfg.partition), + codec_config: ((0..).into(), ()), + }; + let metadata = Metadata::<_, u64, Vec>::init(context.child("metadata"), meta_cfg) + .await + .expect("failed to reopen metadata"); + assert!(metadata.get(&PRUNING_BOUNDARY_KEY).is_none()); + drop(metadata); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to reopen journal"); + assert_eq!(journal.bounds().await, 10..15); + assert_eq!(journal.read(10).await.unwrap(), test_digest(3)); + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_recover_rejects_overlong_section() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..5u64 { + journal + .append(&test_digest(i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + + { + let extra = test_digest(99); + let mut inner = journal.inner.write().await; + inner + .journal + .append_raw(0, extra.as_ref()) + .await + .expect("failed to append extra item"); + inner + .journal + .sync(0) + .await + .expect("failed to sync corrupted section"); + } + drop(journal); + + let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await; + assert!(matches!(result, Err(Error::Corruption(_)))); + }); + } + + #[test_traced] + fn test_fixed_journal_recover_truncates_short_middle_before_watermark() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..15u64 { + journal + .append(&test_digest(i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + assert_eq!(journal.recovery_watermark().await, 15); + + { + let mut inner = journal.inner.write().await; + inner + .journal + .rewind_section(1, 4 * Digest::SIZE as u64) + .await + .expect("failed to shorten middle section"); + inner + .journal + .sync(1) + .await + .expect("failed to sync shortened middle section"); + } + drop(journal); + + // Remove the empty tail so the watermark points beyond newest. Recovery now keeps the + // contiguous prefix up to the short section and lowers the watermark. + context + .remove(&blob_partition(&cfg), Some(&3u64.to_be_bytes())) + .await + .expect("failed to remove tail section"); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to recover journal"); + assert_eq!(journal.bounds().await, 0..9); + assert_eq!(journal.recovery_watermark().await, 9); + assert_eq!(journal.read(8).await.unwrap(), test_digest(8)); + assert!(matches!( + journal.read(9).await, + Err(Error::ItemOutOfRange(9)) + )); + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_recover_to_empty_from_partial_write() { let executor = deterministic::Runner::default(); executor.start(|context| async move { // Initialize the journal, allowing a max of 10 items per blob. @@ -1893,6 +2727,7 @@ mod tests { let bounds = journal.bounds().await; assert_eq!(bounds.end, 0); assert!(bounds.is_empty()); + assert_eq!(journal.recovery_watermark().await, 0); // Make sure journal still works for appending. journal .append(&test_digest(0)) @@ -2057,6 +2892,171 @@ mod tests { }); } + #[test_traced] + fn test_fixed_journal_rewind_commit_reopen() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..12u64 { + journal + .append(&test_digest(i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + + journal.rewind(7).await.expect("failed to rewind journal"); + journal.commit().await.expect("failed to commit journal"); + drop(journal); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to re-initialize journal"); + assert_eq!(journal.bounds().await, 0..7); + for i in 0..7u64 { + assert_eq!(journal.read(i).await.unwrap(), test_digest(i)); + } + assert!(matches!( + journal.read(7).await, + Err(Error::ItemOutOfRange(7)) + )); + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_rewind_persists_lower_watermark() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..12u64 { + journal + .append(&test_digest(i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + journal.rewind(7).await.expect("failed to rewind journal"); + drop(journal); + + let meta_cfg = MetadataConfig { + partition: format!("{}-metadata", cfg.partition), + codec_config: ((0..).into(), ()), + }; + let metadata = Metadata::<_, u64, Vec>::init(context.child("metadata"), meta_cfg) + .await + .expect("failed to reopen metadata"); + let raw_watermark = metadata + .get(&RECOVERY_WATERMARK_KEY) + .expect("missing recovery watermark after rewind"); + let persisted_watermark = + u64::from_be_bytes(raw_watermark.as_slice().try_into().unwrap()); + assert_eq!(persisted_watermark, 7); + drop(metadata); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to re-initialize journal"); + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_recover_after_watermark_lowered_before_rewind() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..12u64 { + journal + .append(&test_digest(i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + + { + let mut inner = journal.inner.write().await; + inner + .metadata + .put(RECOVERY_WATERMARK_KEY, 7u64.to_be_bytes().to_vec()); + inner + .metadata + .sync() + .await + .expect("failed to lower recovery watermark"); + } + drop(journal); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to recover journal"); + assert_eq!(journal.bounds().await, 0..12); + assert_eq!(journal.recovery_watermark().await, 7); + assert_eq!(journal.read(11).await.unwrap(), test_digest(11)); + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_rewind_append_commit_reopen() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..12u64 { + journal + .append(&test_digest(i)) + .await + .expect("failed to append data"); + } + journal.sync().await.expect("failed to sync journal"); + + journal.rewind(7).await.expect("failed to rewind journal"); + for i in 0..3u64 { + journal + .append(&test_digest(100 + i)) + .await + .expect("failed to append data"); + } + journal.commit().await.expect("failed to commit journal"); + drop(journal); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("failed to re-initialize journal"); + assert_eq!(journal.bounds().await, 0..10); + assert_eq!(journal.recovery_watermark().await, 7); + for i in 0..7u64 { + assert_eq!(journal.read(i).await.unwrap(), test_digest(i)); + } + for i in 0..3u64 { + assert_eq!(journal.read(7 + i).await.unwrap(), test_digest(100 + i)); + } + assert!(matches!( + journal.read(10).await, + Err(Error::ItemOutOfRange(10)) + )); + + journal.destroy().await.unwrap(); + }); + } + /// Test recovery when blob is truncated to a page boundary with item size not dividing page size. /// /// This tests the scenario where: @@ -2126,6 +3126,7 @@ mod tests { "Journal should recover to {} items after truncation", expected_items ); + assert_eq!(journal.recovery_watermark().await, expected_items); // Verify we can still read the remaining items for i in 0..expected_items { @@ -2441,6 +3442,48 @@ mod tests { }); } + #[test_traced] + fn test_fixed_journal_append_many_after_mid_section_start() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(100)); + let journal = + Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 150) + .await + .unwrap(); + + let items: Vec<_> = (0..100u64).map(|i| test_digest(1500 + i)).collect(); + let last = journal.append_many(Many::Flat(&items)).await.unwrap(); + assert_eq!(last, 249); + assert_eq!(journal.bounds().await, 150..250); + + for (position, index) in [(150, 0), (199, 49), (200, 50), (249, 99)] { + assert_eq!( + journal.read(position).await.unwrap(), + items[index], + "item at position {position} did not match" + ); + } + + journal.sync().await.unwrap(); + drop(journal); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .unwrap(); + assert_eq!(journal.bounds().await, 150..250); + for (position, index) in [(150, 0), (199, 49), (200, 50), (249, 99)] { + assert_eq!( + journal.read(position).await.unwrap(), + items[index], + "item at position {position} did not match after reopen" + ); + } + + journal.destroy().await.unwrap(); + }); + } + #[test_traced] fn test_fixed_journal_init_at_size_persistence() { let executor = deterministic::Runner::default(); @@ -2664,10 +3707,7 @@ mod tests { for i in 0..5u64 { journal.append(&test_digest(i)).await.unwrap(); } - let inner = journal.inner.read().await; - let tail_section = inner.size / journal.items_per_blob; - inner.journal.sync(tail_section).await.unwrap(); - drop(inner); + journal.commit().await.unwrap(); drop(journal); let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) @@ -2681,7 +3721,7 @@ mod tests { } #[test_traced] - fn test_fixed_journal_oldest_section_invalid_len() { + fn test_fixed_journal_missing_mid_section_metadata_truncates_oldest() { // Old meta = None (aligned), new boundary = mid-section. let executor = deterministic::Runner::default(); executor.start(|context| async move { @@ -2703,16 +3743,14 @@ mod tests { drop(inner); drop(journal); - // Section 1 has items 7,8,9 but metadata is missing, so falls back to blob-based boundary. - // Section 1 has 3 items, but recovery thinks it should have 5 because metadata deletion - // causes us to forget that section 1 starts at logical position 7. - let result = Journal::<_, Digest>::init(context.child("second"), cfg.clone()).await; - assert!(matches!(result, Err(Error::Corruption(_)))); - context.remove(&blob_partition(&cfg), None).await.unwrap(); - context - .remove(&format!("{}-metadata", cfg.partition), None) + // Section 1 has items 7,8,9 but metadata is missing, so recovery falls back to the + // section-aligned blob boundary and keeps only the contiguous prefix. + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) .await - .unwrap(); + .expect("failed to recover journal"); + assert_eq!(journal.bounds().await, 5..8); + assert_eq!(journal.recovery_watermark().await, 8); + journal.destroy().await.unwrap(); }); } @@ -2729,10 +3767,7 @@ mod tests { for i in 0..3u64 { journal.append(&test_digest(i)).await.unwrap(); } - let inner = journal.inner.read().await; - let tail_section = inner.size / journal.items_per_blob; - inner.journal.sync(tail_section).await.unwrap(); - drop(inner); + journal.commit().await.unwrap(); drop(journal); let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) @@ -2760,10 +3795,7 @@ mod tests { assert_eq!(journal.size().await, 17); journal.prune(10).await.unwrap(); - let inner = journal.inner.read().await; - let tail_section = inner.size / journal.items_per_blob; - inner.journal.sync(tail_section).await.unwrap(); - drop(inner); + journal.commit().await.unwrap(); drop(journal); let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) @@ -3022,6 +4054,81 @@ mod tests { }); } + #[test_traced] + fn test_fixed_journal_clear_to_size_crash_aligned_metadata() { + // Regression: when the old pruning boundary was section-aligned, + // PRUNING_BOUNDARY_KEY is absent. A crash during clear_to_size after + // blobs are recreated but before metadata sync leaves a stale + // RECOVERY_WATERMARK_KEY with no positive conflict signal from the pruning key. + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + + // Start with an aligned state: 10 items, pruning_boundary=0. + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + for i in 0..10u64 { + journal.append(&test_digest(i)).await.unwrap(); + } + journal.sync().await.unwrap(); + drop(journal); + + // Simulate clear_to_size(7) crash: blobs cleared, section 1 created, + // but metadata still has recovery_watermark=10. + let blob_part = blob_partition(&cfg); + context.remove(&blob_part, None).await.unwrap(); + let (blob, _) = context.open(&blob_part, &1u64.to_be_bytes()).await.unwrap(); + blob.sync().await.unwrap(); + + let journal = Journal::<_, Digest>::init(context.child("crash"), cfg.clone()) + .await + .expect("init failed after clear_to_size crash with aligned metadata"); + + let bounds = journal.bounds().await; + assert_eq!(bounds.start, 5); + assert_eq!(bounds.end, 5); + assert_eq!(journal.recovery_watermark().await, 5); + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_clear_to_size_crash_aligned_metadata_far_watermark() { + // Regression: the stale recovery watermark may point more than one section + // past the recreated empty tail. + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + for i in 0..10u64 { + journal.append(&test_digest(i)).await.unwrap(); + } + journal.sync().await.unwrap(); + drop(journal); + + // Simulate clear_to_size(2) crash: blobs cleared, section 0 created, + // but metadata still has recovery_watermark=10. + let blob_part = blob_partition(&cfg); + context.remove(&blob_part, None).await.unwrap(); + let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap(); + blob.sync().await.unwrap(); + + let journal = Journal::<_, Digest>::init(context.child("crash"), cfg.clone()) + .await + .expect("init failed after clear_to_size crash with far aligned metadata"); + + let bounds = journal.bounds().await; + assert_eq!(bounds.start, 0); + assert_eq!(bounds.end, 0); + assert_eq!(journal.recovery_watermark().await, 0); + journal.destroy().await.unwrap(); + }); + } + #[test_traced] fn test_read_many_empty() { let executor = deterministic::Runner::default(); @@ -3167,6 +4274,7 @@ mod tests { let items: Vec<_> = (0..5).map(test_digest).collect(); journal.append_many(Many::Flat(&items)).await.unwrap(); journal.append(&test_digest(5)).await.unwrap(); + journal.commit().await.unwrap(); journal.sync().await.unwrap(); journal.reader().await.read(0).await.unwrap(); journal.reader().await.try_read_sync(0).unwrap(); @@ -3186,11 +4294,13 @@ mod tests { "fixed_metrics_read_many_calls_total 1", "fixed_metrics_try_read_sync_hits_total 1", "fixed_metrics_items_read_total 5", + "fixed_metrics_commit_calls_total 1", "fixed_metrics_sync_calls_total 1", "fixed_metrics_append_duration_count 1", "fixed_metrics_append_many_duration_count 1", "fixed_metrics_read_duration_count 1", "fixed_metrics_read_many_duration_count 1", + "fixed_metrics_commit_duration_count 1", "fixed_metrics_sync_duration_count 1", "fixed_metrics_cache_hits_total", "fixed_metrics_cache_misses_total", diff --git a/storage/src/journal/contiguous/metrics.rs b/storage/src/journal/contiguous/metrics.rs index 67a5c3e695d..01126f5d6c3 100644 --- a/storage/src/journal/contiguous/metrics.rs +++ b/storage/src/journal/contiguous/metrics.rs @@ -18,7 +18,7 @@ pub(super) struct CacheMetrics { misses: Counter, } -/// Metrics registered only for variable-size journals. +/// Metrics registered for durable commits. pub(super) struct CommitMetrics { /// Durable commit calls that do not fully sync all indexes. calls: Counter, @@ -189,6 +189,7 @@ impl CommonMetrics { pub(super) struct FixedMetrics { common: CommonMetrics, cache: CacheMetrics, + commit: CommitMetrics, } impl FixedMetrics { @@ -203,10 +204,22 @@ impl FixedMetrics { "Number of fixed items not satisfied synchronously, including pruned or out-of-range \ try_read_sync probes that returned None", ); + let calls = context + .as_ref() + .counter("commit_calls", "Number of commit calls"); + let duration = duration_histogram( + context.as_ref(), + "commit_duration", + "Duration of commit calls", + ); let common = CommonMetrics::new(context); Self { common, cache: CacheMetrics { hits, misses }, + commit: CommitMetrics { + calls, + duration: Timed::new(duration), + }, } } } @@ -219,6 +232,14 @@ impl FixedMetrics { pub(super) fn record_cache_misses(&self, misses: u64) { self.cache.misses.inc_by(misses); } + + pub(super) fn commit_timer(&self) -> ScopedTimer { + self.commit.duration.scoped(&self.common.clock) + } + + pub(super) fn record_commit(&self) { + self.commit.calls.inc(); + } } impl Deref for FixedMetrics { diff --git a/storage/src/journal/contiguous/mod.rs b/storage/src/journal/contiguous/mod.rs index 1fbe782d5b1..6d60766294c 100644 --- a/storage/src/journal/contiguous/mod.rs +++ b/storage/src/journal/contiguous/mod.rs @@ -18,12 +18,12 @@ mod tests; /// A reader guard that holds a consistent view of the journal. /// -/// While this guard exists, operations that may modify the bounds (such as `append`, `prune`, and -/// `rewind`) will block until the guard is dropped. This keeps bounds stable, so any position -/// within `bounds()` is guaranteed readable. -// -// TODO(): Relax locking to allow `append` -// since it doesn't invalidate reads within the cached bounds. +/// While this guard exists, the reader's logical bounds remain stable, and any position within +/// `bounds()` remains readable through this guard. +/// +/// Implementations may still make physical storage progress, such as unlinking backing blobs from +/// future namespace lookups, but they must not invalidate reads within the captured bounds or +/// change the bounds visible through this reader. pub trait Reader: Send + Sync { /// The type of items stored in the journal. type Item; diff --git a/storage/src/journal/contiguous/variable.rs b/storage/src/journal/contiguous/variable.rs index d23c88392e5..825c7d67b65 100644 --- a/storage/src/journal/contiguous/variable.rs +++ b/storage/src/journal/contiguous/variable.rs @@ -1,7 +1,7 @@ //! Position-based journal for variable-length items. //! -//! This journal enforces section fullness: all non-final sections are full and synced. -//! On init, only the last section needs to be replayed to determine the exact size. +//! The data journal is the source of truth. The offsets journal provides indexed access and records +//! the preferred recovery point for replaying data to rebuild offset entries. use super::Reader as _; use crate::{ @@ -15,7 +15,7 @@ use crate::{ use commonware_codec::{Codec, CodecShared}; use commonware_runtime::buffer::paged::CacheRef; use commonware_utils::{ - sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock}, + sync::{AsyncMutex, AsyncRwLock, AsyncRwLockReadGuard}, NZUsize, }; #[commonware_macros::stability(ALPHA)] @@ -68,7 +68,7 @@ pub struct Config { /// The number of items to store in each section. /// /// Once set, this value cannot be changed across restarts. - /// All non-final sections will be full and persisted. + /// All non-final sections are logically full. pub items_per_section: NonZeroU64, /// Optional compression level for stored items. @@ -117,6 +117,12 @@ struct Inner { /// /// Never decreases (pruning only moves forward). pruning_boundary: u64, + + /// Earliest data section modified since the last `commit()` or `sync()`. + /// + /// Tracks which sections need fsyncing. Reset by both `commit()` and `sync()` so + /// that repeated commit-without-sync cycles only fsync newly dirtied sections. + dirty_from_section: Option, } impl Inner { @@ -191,32 +197,36 @@ impl Inner { /// /// # Invariants /// -/// ## 1. Section Fullness -/// -/// All non-final sections are full (`items_per_section` items) and persisted. This ensures -/// that on `init()`, we only need to replay the last section to determine the exact size. -/// -/// ## 2. Data Journal is Source of Truth +/// ## 1. Data Journal is Source of Truth /// /// The data journal is always the source of truth. The offsets journal is an index /// that may temporarily diverge during crashes. Divergences are automatically /// aligned during init(): -/// * If offsets.size() < data.size(): Rebuild missing offsets by replaying data. -/// (This can happen if we crash after writing data journal but before writing offsets journal) -/// * If offsets.size() > data.size(): Rewind offsets to match data size. -/// (This can happen if we crash after rewinding data journal but before rewinding offsets journal) +/// * If offsets are behind data after the recovery watermark: rebuild missing offsets by replaying +/// data from the recovery anchor. +/// * If offsets are ahead of the retained data prefix: rewind offsets to match the data-backed +/// size. /// * If offsets.bounds().start < data.bounds().start: Prune offsets to match /// (This can happen if we crash after pruning data journal but before pruning offsets journal) /// /// Note that we don't recover from the case where offsets.bounds().start > /// data.bounds().start. This should never occur because we always prune the data journal /// before the offsets journal. +/// +/// ## 2. Offsets Recovery Watermark +/// +/// The offsets journal's recovery watermark records a preferred point for replaying data to rebuild +/// offset entries after a crash. If that point falls outside the recovered offsets bounds, init +/// falls back to the offsets start. Replay after the anchor stops at the first short data section +/// and truncates newer sections so the recovered journal remains a contiguous prefix. pub struct Journal { /// Inner state for data journal metadata. /// - /// Serializes persistence and write operations (`sync`, `append`, `prune`, `rewind`) to prevent - /// race conditions while allowing concurrent reads during sync. - inner: UpgradableAsyncRwLock>, + /// Reads can proceed during `commit()` and `sync()`, while mutators take the write side. + inner: AsyncRwLock>, + + /// Serializes mutators with `commit()` and `sync()` so a plain rwlock is sufficient. + op_lock: AsyncMutex<()>, /// Index mapping positions to byte offsets within the data journal. /// The section can be calculated from the position using items_per_section. @@ -411,6 +421,15 @@ impl super::Reader for Reader<'_, E, V> { } impl Journal { + #[inline] + fn mark_dirty_from(inner: &mut Inner, section: u64) { + inner.dirty_from_section = Some( + inner + .dirty_from_section + .map_or(section, |existing| existing.min(section)), + ); + } + /// Initialize a contiguous variable journal. /// /// # Crash Recovery @@ -455,11 +474,13 @@ impl Journal { metrics.update(size, pruning_boundary, items_per_section); Ok(Self { - inner: UpgradableAsyncRwLock::new(Inner { + inner: AsyncRwLock::new(Inner { data, size, pruning_boundary, + dirty_from_section: None, }), + op_lock: AsyncMutex::new(()), offsets, items_per_section, compression: cfg.compression, @@ -504,11 +525,13 @@ impl Journal { metrics.update(size, size, items_per_section); Ok(Self { - inner: UpgradableAsyncRwLock::new(Inner { + inner: AsyncRwLock::new(Inner { data, size, pruning_boundary: size, + dirty_from_section: None, }), + op_lock: AsyncMutex::new(()), offsets, items_per_section, compression: cfg.compression, @@ -626,6 +649,7 @@ impl Journal { /// /// - This operation is not guaranteed to survive restarts until `commit` or `sync` is called. pub async fn rewind(&self, size: u64) -> Result<(), Error> { + let _op_guard = self.op_lock.lock().await; let mut inner = self.inner.write().await; // Validate rewind target @@ -655,6 +679,7 @@ impl Journal { // Update our size inner.size = size; + Self::mark_dirty_from(&mut inner, discard_section); self.metrics .update(inner.size, inner.pruning_boundary, self.items_per_section); @@ -666,9 +691,6 @@ impl Journal { /// The position returned is a stable, consecutively increasing value starting from 0. /// This position remains constant after pruning. /// - /// When a section becomes full, both the data journal and offsets journal are persisted - /// to maintain the invariant that all non-final sections are full and consistent. - /// /// # Errors /// /// Returns an error if the underlying storage operation fails or if the item cannot @@ -721,6 +743,8 @@ impl Journal { } } + let _op_guard = self.op_lock.lock().await; + // Mutating operations are serialized by taking the write guard. let mut inner = self.inner.write().await; @@ -752,7 +776,7 @@ impl Journal { }) .collect::, _>>()?; - // Persist the offsets for this section batch in the offsets journal. + // Append the offsets for this section batch to the offsets journal. let last_offsets_pos = self .offsets .append_many(Many::Flat(&absolute_offsets)) @@ -761,22 +785,7 @@ impl Journal { inner.size += batch_count as u64; written += batch_count; - - // The section was filled and must be synced. Downgrade so readers can continue - // during the sync while mutators remain blocked. - if inner.size.is_multiple_of(self.items_per_section) { - let inner_ref = inner.downgrade_to_upgradable(); - futures::try_join!(inner_ref.data.sync(section), self.offsets.sync())?; - if written == items_count { - self.metrics.update( - inner_ref.size, - inner_ref.pruning_boundary, - self.items_per_section, - ); - return Ok(inner_ref.size - 1); - } - inner = inner_ref.upgrade().await; - } + Self::mark_dirty_from(&mut inner, section); } self.metrics @@ -811,6 +820,7 @@ impl Journal { /// Errors may leave the journal in an inconsistent state. The journal should be closed and /// reopened to trigger alignment in [Journal::init]. pub async fn prune(&self, min_position: u64) -> Result { + let _op_guard = self.op_lock.lock().await; let mut inner = self.inner.write().await; if min_position <= inner.pruning_boundary { @@ -828,46 +838,55 @@ impl Journal { let new_oldest = (min_section * self.items_per_section).max(inner.pruning_boundary); inner.pruning_boundary = new_oldest; self.offsets.prune(new_oldest).await?; + if let Some(dirty_from) = inner.dirty_from_section { + inner.dirty_from_section = Some(dirty_from.max(min_section)); + } self.metrics .update(inner.size, inner.pruning_boundary, self.items_per_section); } Ok(pruned) } - /// Durably persist the journal. - /// - /// This is faster than `sync()` but recovery will be required on startup if a crash occurs - /// before the next call to `sync()`. + /// Fsync dirty data sections under the read lock, allowing concurrent reads. + async fn fsync_dirty_data(&self) -> Result<(), Error> { + let inner = self.inner.read().await; + if let Some(start_section) = inner.dirty_from_section { + let tail_section = position_to_section(inner.size, self.items_per_section); + let start_section = inner + .data + .oldest_section() + .map(|oldest| start_section.max(oldest)) + // With no retained data blobs, any earlier dirty section was cleared or pruned. + // Syncing the tail section is harmless when it does not exist. + .unwrap_or(tail_section); + for section in start_section..=tail_section { + inner.data.sync(section).await?; + } + } + Ok(()) + } + + /// Persist dirty data and offsets sections so committed data survives a crash. pub async fn commit(&self) -> Result<(), Error> { let _timer = self.metrics.commit_timer(); self.metrics.record_commit(); - // Serialize with append/prune/rewind so section selection is stable, while still allowing - // concurrent readers. - let inner = self.inner.upgradable_read().await; - - let section = position_to_section(inner.size, self.items_per_section); - inner.data.sync(section).await?; + let _op_guard = self.op_lock.lock().await; + self.fsync_dirty_data().await?; + self.offsets.commit().await?; + let mut inner = self.inner.write().await; + inner.dirty_from_section = None; Ok(()) } - /// Durably persist the journal and ensure recovery is not required on startup. - /// - /// This is slower than `commit()` but ensures the journal doesn't require recovery on startup. + /// Persist dirty data sections and all metadata for both the data and offsets journals. pub async fn sync(&self) -> Result<(), Error> { let _timer = self.metrics.sync_timer(); self.metrics.sync_calls.inc(); - // Serialize with append/prune/rewind so section selection is stable, while still allowing - // concurrent readers. - let inner = self.inner.upgradable_read().await; - - // Persist only the current (final) section of the data journal. - // All non-final sections are already persisted per Invariant #1. - let section = position_to_section(inner.size, self.items_per_section); - - // Persist both journals concurrently. These journals may not exist yet if the - // previous section was just filled. This is checked internally. - futures::try_join!(inner.data.sync(section), self.offsets.sync())?; - + let _op_guard = self.op_lock.lock().await; + self.fsync_dirty_data().await?; + self.offsets.sync().await?; + let mut inner = self.inner.write().await; + inner.dirty_from_section = None; Ok(()) } @@ -886,12 +905,14 @@ impl Journal { /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`. #[commonware_macros::stability(ALPHA)] pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> { + let _op_guard = self.op_lock.lock().await; let mut inner = self.inner.write().await; inner.data.clear().await?; self.offsets.clear_to_size(new_size).await?; inner.size = new_size; inner.pruning_boundary = new_size; + inner.dirty_from_section = None; self.metrics .update(inner.size, inner.pruning_boundary, self.items_per_section); Ok(()) @@ -900,8 +921,8 @@ impl Journal { /// Align the offsets journal and data journal to be consistent in case a crash occurred /// on a previous run and left the journals in an inconsistent state. /// - /// The data journal is the source of truth. This function scans it to determine - /// what SHOULD be in the offsets journal, then fixes any mismatches. + /// The data journal is the source of truth. This function replays the data journal as needed to + /// verify or rebuild the offsets suffix, then fixes any mismatches. /// /// # Returns /// @@ -925,6 +946,11 @@ impl Journal { } None => 0, }; + if items_in_last_section > items_per_section { + return Err(Error::Corruption(format!( + "data section has too many items: expected at most {items_per_section}, got {items_in_last_section}" + ))); + } // Data journal is empty if there are no sections or if there is one section and it has no items. // The latter should only occur if a crash occured after opening a data journal blob but @@ -970,7 +996,6 @@ impl Journal { // === Handle non-empty data journal case === let data_first_section = data.oldest_section().unwrap(); - let data_last_section = data.newest_section().unwrap(); // data_oldest_pos is ALWAYS section-aligned because it's computed from the section index. // This differs from offsets bounds start which can be mid-section after init_at_size. @@ -1021,37 +1046,74 @@ impl Journal { } } - // Compute the correct logical size - // Uses bounds.start from offsets as the anchor because it tracks the exact starting - // position, which may be mid-section after init_at_size. - // - // Note: Corruption checks above ensure bounds.start is in data_first_section, - // so the subtraction in oldest_items cannot underflow. // Re-fetch bounds since prune may have been called above. - let (offsets_bounds, data_size) = { + let offsets_bounds = { let offsets_reader = offsets.reader().await; - let offsets_bounds = offsets_reader.bounds(); - let data_size = if data_first_section == data_last_section { - offsets_bounds.start + items_in_last_section - } else { - let oldest_items = - (data_first_section + 1) * items_per_section - offsets_bounds.start; - let middle_items = (data_last_section - data_first_section - 1) * items_per_section; - offsets_bounds.start + oldest_items + middle_items + items_in_last_section - }; - (offsets_bounds, data_size) + offsets_reader.bounds() + }; + // The newest data section bounds how far recovery can possibly go. If it is also the + // oldest retained section, its logical start may be a mid-section pruning boundary. + let data_newest_section = data + .newest_section() + .expect("non-empty data journal should have newest section"); + let data_newest_start = data_newest_section + .checked_mul(items_per_section) + .ok_or(Error::OffsetOverflow)?; + let retained_data_end_bound = data_newest_start + .max(offsets_bounds.start) + .checked_add(items_in_last_section) + .ok_or(Error::OffsetOverflow)?; + + let recovery_watermark = offsets.recovery_watermark().await; + let recovery_start = if recovery_watermark < offsets_bounds.start + || recovery_watermark > offsets_bounds.end + || recovery_watermark > retained_data_end_bound + { + warn!( + recovery_watermark, + start = offsets_bounds.start, + end = offsets_bounds.end, + retained_data_end_bound, + "crash repair: offsets recovery watermark is unusable, rebuilding from offsets start" + ); + offsets_bounds.start + } else { + recovery_watermark }; - // Align sizes - let offsets_size = offsets_bounds.end; - if offsets_size > data_size { - // Crashed after writing offsets but before writing data. - warn!("crash repair: rewinding offsets from {offsets_size} to {data_size}"); - offsets.rewind(data_size).await?; - } else if offsets_size < data_size { - // Crashed after writing data but before writing offsets. - Self::add_missing_offsets(data, offsets, offsets_size, items_per_section).await?; - } + let data_size = match Self::rebuild_offsets_from_anchor( + data, + offsets, + items_per_section, + offsets_bounds.start, + recovery_start, + ) + .await? + { + Some(size) => size, + None if recovery_start != offsets_bounds.start => { + warn!( + recovery_watermark = recovery_start, + pruning_boundary = offsets_bounds.start, + "crash repair: data journal shorter than offsets recovery watermark, rebuilding from pruning boundary" + ); + Self::rebuild_offsets_from_anchor( + data, + offsets, + items_per_section, + offsets_bounds.start, + offsets_bounds.start, + ) + .await? + .expect("rebuild from pruning boundary should succeed after pruning alignment") + } + None => { + return Err(Error::Corruption(format!( + "data journal shorter than pruning boundary {}", + offsets_bounds.start + ))) + } + }; // Final invariant checks let pruning_boundary = { @@ -1077,71 +1139,97 @@ impl Journal { Ok((pruning_boundary, data_size)) } - /// Rebuild missing offset entries by replaying the data journal and - /// appending the missing entries to the offsets journal. + /// Rebuild the offsets suffix by replaying the data journal from a recovery anchor. /// - /// The data journal is the source of truth. This function brings the offsets - /// journal up to date by replaying data items and indexing their positions. - /// - /// # Warning - /// - /// - Panics if data journal is empty - /// - Panics if `offsets_size` >= `data.size()` - async fn add_missing_offsets( - data: &variable::Journal, + /// Returns `Ok(None)` if the anchor is ahead of the data journal and callers should retry from + /// an earlier point. If replay finds a short section after the anchor, recovery truncates newer + /// data sections and returns the contiguous data-backed size. + async fn rebuild_offsets_from_anchor( + data: &mut variable::Journal, offsets: &mut fixed::Journal, - offsets_size: u64, items_per_section: u64, - ) -> Result<(), Error> { + pruning_boundary: u64, + anchor: u64, + ) -> Result, Error> { assert!( !data.is_empty(), "rebuild_offsets called with empty data journal" ); - // Find where to start replaying - let (start_section, resume_offset, skip_first) = { + let offsets_bounds = { let offsets_reader = offsets.reader().await; - let offsets_bounds = offsets_reader.bounds(); - if offsets_bounds.is_empty() { - // Offsets empty -- start from first data section - // SAFETY: data is non-empty (checked above) - let first_section = data.oldest_section().unwrap(); - (first_section, 0, false) - } else if offsets_bounds.start < offsets_size { - // Offsets has items -- resume from last indexed position - let last_offset = offsets_reader.read(offsets_size - 1).await?; - let last_section = position_to_section(offsets_size - 1, items_per_section); - (last_section, last_offset, true) - } else { - // Offsets fully pruned but data has items -- start from first data section - // SAFETY: data is non-empty (checked above) - let first_section = data.oldest_section().unwrap(); - (first_section, 0, false) - } + offsets_reader.bounds() }; + if anchor < pruning_boundary || anchor > offsets_bounds.end { + return Ok(None); + } - // Replay data journal from start position through the end and index all items. - // The data journal is the source of truth, so we consume the entire stream. - // (replay streams from start_section onwards through all subsequent sections) - let stream = data - .replay(start_section, resume_offset, REPLAY_BUFFER_SIZE) - .await?; - futures::pin_mut!(stream); + if offsets_bounds.end > anchor { + offsets.rewind(anchor).await?; + } - let mut skipped_first = false; - while let Some(result) = stream.next().await { - let (_section, offset, _size, _item) = result?; + let start_section = position_to_section(anchor, items_per_section); + let first_position = pruning_boundary.max(start_section * items_per_section); + + let (size, repair) = { + let skip = anchor - first_position; + let stream = data.replay(start_section, 0, REPLAY_BUFFER_SIZE).await?; + futures::pin_mut!(stream); + + let mut skipped = 0; + while skipped < skip { + let Some(result) = stream.next().await else { + return Ok(None); + }; + let (section, _offset, _size, _item) = result?; + let position = first_position + skipped; + let expected_section = position_to_section(position, items_per_section); + if section != expected_section { + if section > expected_section { + return Ok(None); + } + return Err(Error::Corruption(format!( + "data section {section} contains logical position {position}, expected section {expected_section}" + ))); + } + skipped += 1; + } - // Skip first item if resuming from last indexed offset - if skip_first && !skipped_first { - skipped_first = true; - continue; + let mut size = anchor; + let mut repair = None; + while let Some(result) = stream.next().await { + let (section, offset, _size, _item) = result?; + let expected_section = position_to_section(size, items_per_section); + if section != expected_section { + if section > expected_section { + let byte_offset = data.size(expected_section).await?; + repair = Some((expected_section, section, size, byte_offset)); + break; + } + return Err(Error::Corruption(format!( + "data section {section} contains logical position {size}, expected section {expected_section}" + ))); + } + offsets.append(&offset).await?; + size += 1; } + (size, repair) + }; - offsets.append(&offset).await?; + if let Some((section, next_section, size, byte_offset)) = repair { + warn!( + section, + next_section, + size, + byte_offset, + "crash repair: truncating data after short section" + ); + data.rewind(section, byte_offset).await?; + data.sync(section).await?; + return Ok(Some(size)); } - Ok(()) + Ok(Some(size)) } } @@ -1249,11 +1337,30 @@ impl Journal { self.offsets.rewind(position).await } + /// Test helper: Set and persist the offsets recovery watermark directly. + pub(crate) async fn test_set_offsets_recovery_watermark( + &self, + watermark: u64, + ) -> Result<(), Error> { + self.offsets.test_set_recovery_watermark(watermark).await + } + /// Test helper: Get the size of the internal offsets journal. pub(crate) async fn test_offsets_size(&self) -> u64 { self.offsets.size().await } + /// Test helper: Rewind the internal data journal to the item at `position`. + pub(crate) async fn test_rewind_data_to_position(&self, position: u64) -> Result<(), Error> { + let offset = { + let offsets_reader = self.offsets.reader().await; + offsets_reader.read(position).await? + }; + let section = position_to_section(position, self.items_per_section); + let mut inner = self.inner.write().await; + inner.data.rewind_to_offset(section, offset).await + } + /// Test helper: Append directly to the internal data journal (simulates crash scenario). pub(crate) async fn test_append_data( &self, @@ -1280,7 +1387,8 @@ mod tests { use crate::journal::contiguous::tests::run_contiguous_tests; use commonware_macros::test_traced; use commonware_runtime::{ - buffer::paged::CacheRef, deterministic, Metrics as _, Runner, Storage, Supervisor as _, + buffer::paged::{Append, CacheRef}, + deterministic, Metrics as _, Runner, Storage, Supervisor as _, }; use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64}; use futures::FutureExt as _; @@ -1989,6 +2097,34 @@ mod tests { }); } + #[test_traced] + fn test_variable_recovery_rejects_overlong_data_section() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = Config { + partition: "recovery-overlong-data-section".into(), + items_per_section: NZU64!(10), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + write_buffer: NZUsize!(1024), + }; + + let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + + for i in 0..11u64 { + journal.test_append_data(0, i * 100).await.unwrap(); + } + journal.test_sync_data().await.unwrap(); + drop(journal); + + let result = Journal::<_, u64>::init(context.child("second"), cfg.clone()).await; + assert!(matches!(result, Err(Error::Corruption(_)))); + }); + } + /// Test recovery from multiple prune operations with crash. #[test_traced] fn test_variable_recovery_multiple_prunes_crash() { @@ -2053,14 +2189,13 @@ mod tests { }); } - /// Test recovery from crash during rewind operation. + /// Test recovery when the offsets journal is behind the data journal. /// - /// Simulates a crash after offsets.rewind() completes but before data.rewind() completes. - /// This creates a situation where offsets journal has been rewound but data journal still - /// contains items across multiple sections. Verifies that init() correctly rebuilds the - /// offsets index across all sections to match the data journal. + /// This creates a situation where offsets are missing while the data journal still contains + /// items across multiple sections. Verifies that init() rebuilds the offsets suffix across all + /// remaining data sections. #[test_traced] - fn test_variable_recovery_rewind_crash_multi_section() { + fn test_variable_recovery_offsets_behind_data_multi_section() { let executor = deterministic::Runner::default(); executor.start(|context| async move { // === Setup: Create Variable wrapper with data across multiple sections === @@ -2084,10 +2219,8 @@ mod tests { assert_eq!(variable.size().await, 25); - // === Simulate crash during rewind(5) === - // Rewind offsets journal to size 5 (keeps positions 0-4) + // Keep offsets for positions 0-4, while data still contains all 25 items. variable.test_rewind_offsets(5).await.unwrap(); - // CRASH before data.rewind() completes - data still has all 3 sections variable.sync().await.unwrap(); drop(variable); @@ -2119,6 +2252,413 @@ mod tests { }); } + #[test_traced] + fn test_variable_recovery_offsets_watermark_outside_bounds_rebuilds_from_start() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = Config { + partition: "recovery-watermark-outside-bounds".into(), + items_per_section: NZU64!(10), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + write_buffer: NZUsize!(1024), + }; + + let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + + for i in 0..15u64 { + journal.append(&(i * 100)).await.unwrap(); + } + journal.sync().await.unwrap(); + + // Simulate stale metadata that points past the recovered offsets bounds. + journal + .test_set_offsets_recovery_watermark(30) + .await + .unwrap(); + drop(journal); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone()) + .await + .unwrap(); + assert_eq!(journal.bounds().await, 0..15); + assert_eq!(journal.test_offsets_size().await, 15); + for i in 0..15u64 { + assert_eq!(journal.read(i).await.unwrap(), i * 100); + } + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_variable_rebuild_offsets_anchor_outside_bounds_returns_none() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let data_cfg = variable::Config { + partition: "rebuild-anchor-outside-data".into(), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + write_buffer: NZUsize!(1024), + }; + let offsets_cfg = fixed::Config { + partition: "rebuild-anchor-outside-offsets".into(), + items_per_blob: NZU64!(10), + page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + write_buffer: NZUsize!(1024), + }; + + let mut data = variable::Journal::<_, u64>::init(context.child("data"), data_cfg) + .await + .unwrap(); + let mut offsets = fixed::Journal::<_, u64>::init(context.child("offsets"), offsets_cfg) + .await + .unwrap(); + + let (offset, _) = data.append(0, &100).await.unwrap(); + offsets.append(&offset).await.unwrap(); + + let result = + Journal::<_, u64>::rebuild_offsets_from_anchor(&mut data, &mut offsets, 10, 0, 2) + .await + .unwrap(); + assert!(result.is_none()); + + data.destroy().await.unwrap(); + offsets.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_variable_recovery_retries_from_pruning_boundary_when_anchor_too_far() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = Config { + partition: "recovery-anchor-too-far".into(), + items_per_section: NZU64!(10), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + write_buffer: NZUsize!(1024), + }; + + let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + + for i in 0..20u64 { + journal.append(&(i * 100)).await.unwrap(); + } + journal.sync().await.unwrap(); + + // The offsets watermark is in-bounds, but the data journal is shorter than that + // anchor. Recovery should retry from the pruning boundary and rebuild only the + // retained data prefix. + journal + .test_set_offsets_recovery_watermark(15) + .await + .unwrap(); + journal.test_rewind_data_to_position(12).await.unwrap(); + journal.test_sync_data().await.unwrap(); + drop(journal); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone()) + .await + .unwrap(); + assert_eq!(journal.bounds().await, 0..12); + assert_eq!(journal.test_offsets_size().await, 12); + for i in 0..12u64 { + assert_eq!(journal.read(i).await.unwrap(), i * 100); + } + assert!(matches!( + journal.read(12).await, + Err(Error::ItemOutOfRange(12)) + )); + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_variable_rewind_commit_reopen() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = Config { + partition: "rewind-commit-reopen".into(), + items_per_section: NZU64!(10), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + write_buffer: NZUsize!(1024), + }; + + let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + + for i in 0..25u64 { + journal.append(&(i * 100)).await.unwrap(); + } + journal.sync().await.unwrap(); + + journal.rewind(12).await.unwrap(); + journal.commit().await.unwrap(); + drop(journal); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone()) + .await + .unwrap(); + assert_eq!(journal.bounds().await, 0..12); + for i in 0..12u64 { + assert_eq!(journal.read(i).await.unwrap(), i * 100); + } + assert!(matches!( + journal.read(12).await, + Err(Error::ItemOutOfRange(12)) + )); + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_variable_recovery_boundary_data_rewind_rebuilds_offsets() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = Config { + partition: "recovery-boundary-data-rewind".into(), + items_per_section: NZU64!(10), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + write_buffer: NZUsize!(1024), + }; + + let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + + for i in 0..20u64 { + journal.append(&(i * 100)).await.unwrap(); + } + journal.sync().await.unwrap(); + + journal.test_rewind_data_to_position(10).await.unwrap(); + journal.test_sync_data().await.unwrap(); + drop(journal); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone()) + .await + .unwrap(); + assert_eq!(journal.bounds().await, 0..10); + assert_eq!(journal.test_offsets_size().await, 10); + for i in 0..10u64 { + assert_eq!(journal.read(i).await.unwrap(), i * 100); + } + assert!(matches!( + journal.read(10).await, + Err(Error::ItemOutOfRange(10)) + )); + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_variable_recovery_truncates_short_data_section_after_anchor() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = Config { + partition: "recovery-short-section-after-anchor".into(), + items_per_section: NZU64!(10), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + write_buffer: NZUsize!(1024), + }; + + let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + + for i in 0..25u64 { + journal.append(&(i * 100)).await.unwrap(); + } + journal.sync().await.unwrap(); + + // Simulate a crash after the previous recovery checkpoint where section 1 was only + // partly durable but section 2 was present. Recovery should keep the contiguous prefix + // and discard section 2 rather than treating the section jump as hard corruption. + journal + .test_set_offsets_recovery_watermark(10) + .await + .unwrap(); + let offset = { + let offsets = journal.offsets.reader().await; + offsets.read(12).await.unwrap() + }; + { + let mut inner = journal.inner.write().await; + inner.data.rewind_section(1, offset).await.unwrap(); + inner.data.sync(1).await.unwrap(); + inner.data.sync(2).await.unwrap(); + } + drop(journal); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone()) + .await + .unwrap(); + assert_eq!(journal.bounds().await, 0..12); + assert_eq!(journal.test_offsets_size().await, 12); + for i in 0..12u64 { + assert_eq!(journal.read(i).await.unwrap(), i * 100); + } + assert!(matches!( + journal.read(12).await, + Err(Error::ItemOutOfRange(12)) + )); + + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_variable_init_persists_offsets_trailing_item_repair() { + let executor = deterministic::Runner::default(); + let ((offsets_blob_partition, expected_size), checkpoint) = + executor.start_and_recover(|context| async move { + let cfg = Config { + partition: "offsets-init-repair-sync".into(), + items_per_section: NZU64!(10), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + write_buffer: NZUsize!(1024), + }; + let offsets_blob_partition = format!("{}-blobs", cfg.offsets_partition()); + let expected_size = 2 * std::mem::size_of::() as u64; + + let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + journal.append(&10).await.unwrap(); + journal.append(&20).await.unwrap(); + journal.sync().await.unwrap(); + drop(journal); + + let (blob, raw_size) = context + .open(&offsets_blob_partition, &0u64.to_be_bytes()) + .await + .unwrap(); + let append = Append::new( + blob, + raw_size, + 2048, + CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + ) + .await + .unwrap(); + assert_eq!(append.size().await, expected_size); + append.resize(expected_size + 1).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg) + .await + .unwrap(); + assert_eq!(journal.bounds().await, 0..2); + drop(journal); + + (offsets_blob_partition, expected_size) + }); + + deterministic::Runner::from(checkpoint).start(move |context| async move { + let (blob, raw_size) = context + .open(&offsets_blob_partition, &0u64.to_be_bytes()) + .await + .unwrap(); + let append = Append::new( + blob, + raw_size, + 2048, + CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + ) + .await + .unwrap(); + assert_eq!(append.size().await, expected_size); + }); + } + + #[test_traced] + fn test_variable_init_persists_data_tail_repair() { + let executor = deterministic::Runner::default(); + let ((data_partition, expected_size), checkpoint) = + executor.start_and_recover(|context| async move { + let cfg = Config { + partition: "data-init-repair-sync".into(), + items_per_section: NZU64!(10), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + write_buffer: NZUsize!(1024), + }; + let data_partition = cfg.data_partition(); + + let journal = Journal::<_, u64>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + journal.append(&10).await.unwrap(); + journal.append(&20).await.unwrap(); + journal.sync().await.unwrap(); + drop(journal); + + let (blob, raw_size) = context + .open(&data_partition, &0u64.to_be_bytes()) + .await + .unwrap(); + let append = Append::new( + blob, + raw_size, + 2048, + CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + ) + .await + .unwrap(); + let expected_size = append.size().await; + append.append(&[0xFF, 0xFF]).await.unwrap(); + append.sync().await.unwrap(); + drop(append); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg) + .await + .unwrap(); + assert_eq!(journal.bounds().await, 0..2); + drop(journal); + + (data_partition, expected_size) + }); + + deterministic::Runner::from(checkpoint).start(move |context| async move { + let (blob, raw_size) = context + .open(&data_partition, &0u64.to_be_bytes()) + .await + .unwrap(); + let append = Append::new( + blob, + raw_size, + 2048, + CacheRef::from_pooler(&context, LARGE_PAGE_SIZE, NZUsize!(10)), + ) + .await + .unwrap(); + assert_eq!(append.size().await, expected_size); + }); + } + /// Test recovery from crash after data sync but before offsets sync when journal was /// previously emptied by pruning. #[test_traced] @@ -2233,6 +2773,43 @@ mod tests { }); } + #[test_traced] + fn test_variable_recovery_from_mid_section_durable_anchor() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = Config { + partition: "mid-section-durable-anchor".into(), + items_per_section: NZU64!(5), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler(&context, SMALL_PAGE_SIZE, NZUsize!(2)), + write_buffer: NZUsize!(1024), + }; + + let journal = Journal::<_, u64>::init_at_size(context.child("first"), cfg.clone(), 7) + .await + .unwrap(); + assert_eq!(journal.append(&700).await.unwrap(), 7); + journal.sync().await.unwrap(); + + for i in 1..6u64 { + assert_eq!(journal.append(&(700 + i)).await.unwrap(), 7 + i); + } + journal.commit().await.unwrap(); + drop(journal); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg.clone()) + .await + .unwrap(); + assert_eq!(journal.bounds().await, 7..13); + for i in 0..6u64 { + assert_eq!(journal.read(7 + i).await.unwrap(), 700 + i); + } + + journal.destroy().await.unwrap(); + }); + } + #[test_traced] fn test_init_at_size_zero() { let executor = deterministic::Runner::default(); diff --git a/storage/src/journal/segmented/fixed.rs b/storage/src/journal/segmented/fixed.rs index 0f991914e15..14e91b10641 100644 --- a/storage/src/journal/segmented/fixed.rs +++ b/storage/src/journal/segmented/fixed.rs @@ -105,6 +105,9 @@ impl Journal { "trailing bytes detected: truncating" ); manager.rewind_section(section, valid_size).await?; + // Startup repair is exceptional; make it durable immediately so callers do not + // need to track repaired sections separately. + manager.sync(section).await?; } } diff --git a/storage/src/journal/segmented/variable.rs b/storage/src/journal/segmented/variable.rs index a986199ae4f..bbb4be39eb4 100644 --- a/storage/src/journal/segmented/variable.rs +++ b/storage/src/journal/segmented/variable.rs @@ -402,6 +402,9 @@ impl Journal { new_size = state.valid_offset, "trailing bytes detected: truncating" ); + // Tail repair is exceptional; make it durable + // immediately so callers do not need to track + // replay-time repaired sections separately. if let Err(err) = state.blob.resize(state.valid_offset).await { @@ -409,6 +412,11 @@ impl Journal { state.done = true; return Some((batch, state)); } + if let Err(err) = state.blob.sync().await { + batch.push(Err(err.into())); + state.done = true; + return Some((batch, state)); + } } state.done = true; return if batch.is_empty() { @@ -439,6 +447,11 @@ impl Journal { state.done = true; return Some((batch, state)); } + if let Err(err) = state.blob.sync().await { + batch.push(Err(err.into())); + state.done = true; + return Some((batch, state)); + } state.done = true; return if batch.is_empty() { None diff --git a/storage/src/qmdb/any/sync/tests.rs b/storage/src/qmdb/any/sync/tests.rs index 47ba565587b..5716145561a 100644 --- a/storage/src/qmdb/any/sync/tests.rs +++ b/storage/src/qmdb/any/sync/tests.rs @@ -242,6 +242,7 @@ mod harnesses { crate::qmdb::any::ordered::fixed::test::apply_ops(&mut db, ops).await; let merkleized = db.new_batch().merkleize(&db, None::).await.unwrap(); db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } } @@ -312,6 +313,7 @@ mod harnesses { .await .unwrap(); db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } } @@ -375,6 +377,7 @@ mod harnesses { crate::qmdb::any::unordered::fixed::test::apply_ops(&mut db, ops).await; let merkleized = db.new_batch().merkleize(&db, None::).await.unwrap(); db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } } @@ -450,6 +453,7 @@ mod harnesses { .await .unwrap(); db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } } @@ -535,6 +539,7 @@ mod harnesses { db.apply_batch(merkleized).await.unwrap(); let merkleized = db.new_batch().merkleize(&db, None::).await.unwrap(); db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } } @@ -627,6 +632,7 @@ mod harnesses { .await .unwrap(); db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } } @@ -712,6 +718,7 @@ mod harnesses { db.apply_batch(merkleized).await.unwrap(); let merkleized = db.new_batch().merkleize(&db, None::).await.unwrap(); db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } } @@ -807,6 +814,7 @@ mod harnesses { .await .unwrap(); db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } } diff --git a/storage/src/qmdb/benches/common.rs b/storage/src/qmdb/benches/common.rs index b8cfaf6f258..60f878162fb 100644 --- a/storage/src/qmdb/benches/common.rs +++ b/storage/src/qmdb/benches/common.rs @@ -29,9 +29,9 @@ use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize}; pub type Digest = ::Digest; -/// Default items per blob for benchmarks. This is small enough that blob boundary crossings -/// (which trigger fsync) can dominate benchmark time. Benchmarks that don't want to measure -/// that cost should override via the `_with` config generators. +/// Default items per blob for benchmarks. This is small enough that blob boundary crossings can +/// affect benchmark time. Benchmarks that don't want to measure that cost should override via the +/// `_with` config generators. pub const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(50_000); pub const CHUNK_SIZE: usize = 32; pub const THREADS: NonZeroUsize = NZUsize!(8); diff --git a/storage/src/qmdb/benches/merkleize.rs b/storage/src/qmdb/benches/merkleize.rs index 7b7534df961..1d5d6636554 100644 --- a/storage/src/qmdb/benches/merkleize.rs +++ b/storage/src/qmdb/benches/merkleize.rs @@ -275,8 +275,7 @@ type CurOVar256Mmb = commonware_storage::qmdb::current::ordered::variable::Db< // -- Config -- -// Use huge blobs to avoid iteration times being affected by multiple fsyncs from crossing blob -// boundaries. +// Use huge blobs to avoid iteration times being affected by blob boundary crossings. const ITEMS_PER_BLOB: NonZeroU64 = NZU64!(10_000_000); const THREADS: NonZeroUsize = NZUsize!(8); const PAGE_SIZE: NonZeroU16 = NZU16!(4096); diff --git a/storage/src/qmdb/current/sync/tests.rs b/storage/src/qmdb/current/sync/tests.rs index 5728c6b5e34..8cc764e8172 100644 --- a/storage/src/qmdb/current/sync/tests.rs +++ b/storage/src/qmdb/current/sync/tests.rs @@ -229,6 +229,7 @@ mod harnesses { batch.merkleize(&db, None::).await.unwrap() }; db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } @@ -254,6 +255,7 @@ mod harnesses { batch.merkleize(&db, None::).await.unwrap() }; db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } @@ -279,6 +281,7 @@ mod harnesses { batch.merkleize(&db, None::).await.unwrap() }; db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } @@ -304,6 +307,7 @@ mod harnesses { batch.merkleize(&db, None::).await.unwrap() }; db.apply_batch(merkleized).await.unwrap(); + db.commit().await.unwrap(); db } diff --git a/storage/src/qmdb/store/db.rs b/storage/src/qmdb/store/db.rs index a2176c5c105..eb38a2ebcd4 100644 --- a/storage/src/qmdb/store/db.rs +++ b/storage/src/qmdb/store/db.rs @@ -646,8 +646,7 @@ mod test { let fetched_value = db.get(&key).await.unwrap(); assert_eq!(fetched_value.unwrap(), value); - // Simulate commit failure: drop without commit. The small batch fits in a single - // journal section so it is not auto-synced. + // Simulate commit failure: drop without commit. drop(db); // Re-open the store diff --git a/storage/src/qmdb/sync/engine.rs b/storage/src/qmdb/sync/engine.rs index d4ae78d75a1..e6b7401cbc3 100644 --- a/storage/src/qmdb/sync/engine.rs +++ b/storage/src/qmdb/sync/engine.rs @@ -599,8 +599,6 @@ where { for op in operations { self.journal.append(op).await?; - // No need to sync here -- the journal will periodically sync its storage - // and we will also sync when we're done applying all operations. } Ok(()) } diff --git a/storage/src/queue/shared.rs b/storage/src/queue/shared.rs index 38bb0753dd3..af532a54035 100644 --- a/storage/src/queue/shared.rs +++ b/storage/src/queue/shared.rs @@ -81,9 +81,7 @@ impl Writer { /// Append an item without committing, returning its position. The item /// is immediately visible to the reader but is **not durable** until - /// [Self::commit] is called or the underlying journal auto-syncs at a - /// section boundary (see [`variable::Journal`](crate::journal::contiguous::variable::Journal) - /// invariant 1). + /// [Self::commit] or [Self::sync] is called. /// /// # Errors /// diff --git a/storage/src/queue/storage.rs b/storage/src/queue/storage.rs index 632b8600a4a..981ec60e4de 100644 --- a/storage/src/queue/storage.rs +++ b/storage/src/queue/storage.rs @@ -150,8 +150,7 @@ impl Queue { /// Append an item without persisting. Call [Self::commit] or [Self::sync] /// afterwards to make it durable. The item is readable immediately but - /// is not guaranteed to survive a crash until committed or the journal - /// auto-syncs at a section boundary (see [`variable::Journal`] invariant 1). + /// is not guaranteed to survive a crash until committed or synced. /// /// # Errors ///