diff --git a/storage/src/journal/contiguous/fixed.rs b/storage/src/journal/contiguous/fixed.rs index 4b8f749cf3..61ea8e53d9 100644 --- a/storage/src/journal/contiguous/fixed.rs +++ b/storage/src/journal/contiguous/fixed.rs @@ -71,9 +71,19 @@ use futures::{stream::Stream, StreamExt}; use std::num::{NonZeroU64, NonZeroUsize}; use tracing::warn; -/// Metadata key for storing the pruning boundary. +/// Metadata key for a mid-section pruning boundary. +/// +/// This key is present only when the oldest retained item is not section-aligned. It is persisted +/// after the blob state it describes exists, so recovery treats it as stale if it no longer matches +/// the oldest retained section. const PRUNING_BOUNDARY_KEY: u64 = 1; +/// Metadata key for an in-progress clear/reset target. +/// +/// This key is synced before destructive reset work starts. If recovery sees it, recovery +/// completes the reset to the recorded target before normal bounds recovery. +const CLEAR_TARGET_KEY: u64 = 2; + /// Configuration for `Journal` storage. #[derive(Clone)] pub struct Config { @@ -104,12 +114,7 @@ struct Inner { /// Total number of items appended (not affected by pruning). size: u64, - /// If the journal's pruning boundary is mid-section (that is, the oldest retained item's - /// position is not a multiple of `items_per_blob`), then the metadata stores the pruning - /// boundary. Otherwise, the metadata is empty. - /// - /// When the journal is pruned, `metadata` must be persisted AFTER the inner journal is - /// persisted to ensure that its pruning boundary is never after the inner journal's size. + /// Key-value metadata for pruning-boundary and crash-safe clear state. // TODO(#2939): Remove metadata metadata: Metadata>, @@ -447,6 +452,50 @@ impl Journal { } } + fn parse_metadata_u64( + metadata: &Metadata>, + key: u64, + name: &str, + ) -> Result, Error> { + metadata + .get(&key) + .map(|bytes| { + Ok(u64::from_be_bytes(bytes.as_slice().try_into().map_err( + |_| Error::Corruption(format!("invalid {name} metadata")), + )?)) + }) + .transpose() + } + + fn stage_pruning_boundary_metadata( + metadata: &mut Metadata>, + items_per_blob: u64, + pruning_boundary: u64, + ) { + if !pruning_boundary.is_multiple_of(items_per_blob) { + metadata.put( + PRUNING_BOUNDARY_KEY, + pruning_boundary.to_be_bytes().to_vec(), + ); + } else { + metadata.remove(&PRUNING_BOUNDARY_KEY); + } + } + + async fn complete_clear_to_size( + journal: &mut SegmentedJournal, + metadata: &mut Metadata>, + items_per_blob: u64, + size: u64, + ) -> Result<(), Error> { + journal.clear().await?; + journal.ensure_section_exists(size / items_per_blob).await?; + Self::stage_pruning_boundary_metadata(metadata, items_per_blob, size); + metadata.remove(&CLEAR_TARGET_KEY); + metadata.sync().await?; + Ok(()) + } + /// Initialize a new `Journal` instance. /// /// All backing blobs are opened but not read during initialization. The `replay` method can be @@ -471,36 +520,38 @@ impl Journal { let mut metadata = Metadata::<_, u64, Vec>::init(context.child("meta"), meta_cfg).await?; - // Parse metadata if present - let meta_pruning_boundary = match metadata.get(&PRUNING_BOUNDARY_KEY) { - Some(bytes) => Some(u64::from_be_bytes(bytes.as_slice().try_into().map_err( - |_| Error::Corruption("invalid pruning_boundary metadata".into()), - )?)), - None => None, - }; - - // Recover bounds from metadata and/or blobs - let (pruning_boundary, size, needs_metadata_update) = - Self::recover_bounds(&journal, items_per_blob, meta_pruning_boundary).await?; - - // Persist metadata if needed - if needs_metadata_update { - if pruning_boundary.is_multiple_of(items_per_blob) { - metadata.remove(&PRUNING_BOUNDARY_KEY); - } else { - metadata.put( - PRUNING_BOUNDARY_KEY, - pruning_boundary.to_be_bytes().to_vec(), + let clear_target = Self::parse_metadata_u64(&metadata, CLEAR_TARGET_KEY, "clear_target")?; + let (pruning_boundary, size) = if let Some(clear_target) = clear_target { + warn!(clear_target, "crash repair: completing interrupted clear"); + Self::complete_clear_to_size(&mut journal, &mut metadata, items_per_blob, clear_target) + .await?; + (clear_target, clear_target) + } else { + // Parse metadata if present + let meta_pruning_boundary = + Self::parse_metadata_u64(&metadata, PRUNING_BOUNDARY_KEY, "pruning_boundary")?; + + // Recover bounds from metadata and/or blobs + let (pruning_boundary, size, needs_metadata_update) = + Self::recover_bounds(&journal, items_per_blob, meta_pruning_boundary).await?; + + // Persist metadata if needed + if needs_metadata_update { + Self::stage_pruning_boundary_metadata( + &mut metadata, + items_per_blob, + pruning_boundary, ); + metadata.sync().await?; } - metadata.sync().await?; - } - // Invariant: Tail blob must exist, even if empty. This ensures we can reconstruct size on - // reopen even after pruning all items. The tail blob is at `size / items_per_blob` (where - // the next append would go). - let tail_section = size / items_per_blob; - journal.ensure_section_exists(tail_section).await?; + // Invariant: Tail blob must exist, even if empty. This ensures we can reconstruct size + // on reopen even after pruning all items. The tail blob is at `size / items_per_blob` + // (where the next append would go). + let tail_section = size / items_per_blob; + journal.ensure_section_exists(tail_section).await?; + (pruning_boundary, size) + }; let metrics = Metrics::new(context); metrics.update(size, pruning_boundary, items_per_blob); @@ -522,8 +573,9 @@ impl Journal { /// If `meta_pruning_boundary` is `Some`, validates it against the physical blob state: /// - If metadata is section-aligned, it's unnecessary and we use blob-based boundary /// - If metadata refers to a pruned section, it's stale and we use blob-based boundary - /// - If metadata refers to a future section, it must have been written by [Self::clear_to_size] - /// or [Self::init_at_size] and crashed before writing the blobs. Fall back to blobs. + /// - If metadata refers to a future section, it is stale relative to the blobs. Fall back to + /// blobs. Current clear/reset operations use `CLEAR_TARGET_KEY` and are completed before + /// this path runs. /// - Otherwise, metadata is valid and we use it /// /// If `meta_pruning_boundary` is `None`, computes bounds purely from blobs. @@ -543,9 +595,8 @@ impl Journal { let meta_oldest_section = meta_pruning_boundary / items_per_blob; match inner.oldest_section() { None => { - // No blobs exist but metadata claims mid-section boundary. - // This can happen if we crash after inner.clear() but before - // ensure_section_exists(). Ignore stale metadata. + // No blobs exist but metadata claims a mid-section boundary. Without a + // clear target this metadata is stale, so fall back to blobs. warn!( meta_oldest_section, "crash repair: no blobs exist, ignoring stale metadata" @@ -560,9 +611,9 @@ impl Journal { (blob_boundary, true) } Some(oldest_section) if meta_oldest_section > oldest_section => { - // Metadata references a section ahead of the oldest blob. This can happen - // if we crash during clear_to_size/init_at_size after blobs update but - // before metadata update. Fall back to blob state. + // Metadata references a section ahead of the oldest blob. Current + // clear/reset operations use CLEAR_TARGET_KEY, so without that key this is + // stale metadata. warn!( meta_oldest_section, oldest_section, @@ -675,59 +726,24 @@ impl Journal { /// - `bounds.start` equals `size` (no data exists) /// /// # Crash Safety - /// If a crash occurs during this operation, `init()` will recover to a consistent state - /// (though possibly different from the intended `size`). + /// In the event of a crash during this call, upon restart recovery will ensure the journal is + /// either still in its prior state, or has bounds `size..size`. #[commonware_macros::stability(ALPHA)] pub async fn init_at_size(context: E, cfg: Config, size: u64) -> Result { - let items_per_blob = cfg.items_per_blob.get(); - let tail_section = size / items_per_blob; - - let blob_partition = Self::select_blob_partition(&context, &cfg).await?; - let segmented_cfg = SegmentedConfig { - partition: blob_partition, - page_cache: cfg.page_cache, - write_buffer: cfg.write_buffer, - }; + // Fail before writing clear intent if existing blob partitions are already inconsistent. + Self::select_blob_partition(&context, &cfg).await?; - // Initialize both stores. let meta_cfg = MetadataConfig { partition: format!("{}-metadata", cfg.partition), codec_config: ((0..).into(), ()), }; let mut metadata = - Metadata::<_, u64, Vec>::init(context.child("meta"), meta_cfg).await?; - let mut journal = SegmentedJournal::init(context.child("blobs"), segmented_cfg).await?; - - // Clear blobs before updating metadata. - // This ordering is critical for crash safety: - // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored - // - Crash after create: old metadata triggers "metadata ahead" warning, - // recovery falls back to blob state. - journal.clear().await?; - journal.ensure_section_exists(tail_section).await?; - - // Persist metadata if pruning_boundary is mid-section. - if !size.is_multiple_of(items_per_blob) { - metadata.put(PRUNING_BOUNDARY_KEY, size.to_be_bytes().to_vec()); - metadata.sync().await?; - } else if metadata.get(&PRUNING_BOUNDARY_KEY).is_some() { - metadata.remove(&PRUNING_BOUNDARY_KEY); - metadata.sync().await?; - } - - let metrics = Metrics::new(context); - metrics.update(size, size, items_per_blob); + Metadata::<_, u64, Vec>::init(context.child("intent"), meta_cfg).await?; + metadata.put(CLEAR_TARGET_KEY, size.to_be_bytes().to_vec()); + metadata.sync().await?; + drop(metadata); - Ok(Self { - inner: UpgradableAsyncRwLock::new(Inner { - journal, - size, - metadata, - pruning_boundary: size, // No data exists yet - }), - items_per_blob, - metrics, - }) + Self::init(context, cfg).await } /// Convert a global position to (section, position_in_section). @@ -979,36 +995,27 @@ impl Journal { /// Clear all data and reset the journal to a new starting position. /// - /// Unlike `destroy`, this keeps the journal alive so it can be reused. - /// After clearing, the journal will behave as if initialized with `init_at_size(new_size)`. + /// Unlike `destroy`, this keeps the journal alive so it can be reused. After clearing, the + /// journal will behave as if initialized with `init_at_size(new_size)`. /// /// # Crash Safety - /// If a crash occurs during this operation, `init()` will recover to a consistent state - /// (though possibly different from the intended `new_size`). + /// + /// In the event of a crash during this call, upon restart recovery will ensure the journal is + /// either still in its prior state, or has bounds `new_size..new_size`. pub(crate) async fn clear_to_size(&self, new_size: u64) -> Result<(), Error> { - // Clear blobs before updating metadata. - // This ordering is critical for crash safety: - // - Crash after clear: no blobs, recovery returns (0, 0), metadata ignored - // - Crash after create: old metadata triggers "metadata ahead" warning, - // recovery falls back to blob state let mut inner = self.inner.write().await; - inner.journal.clear().await?; - let tail_section = new_size / self.items_per_blob; - inner.journal.ensure_section_exists(tail_section).await?; + inner + .metadata + .put(CLEAR_TARGET_KEY, new_size.to_be_bytes().to_vec()); + inner.metadata.sync().await?; + let Inner { + journal, metadata, .. + } = &mut *inner; + Self::complete_clear_to_size(journal, metadata, self.items_per_blob, new_size).await?; inner.size = new_size; inner.pruning_boundary = new_size; // No data exists - // Persist metadata only when pruning_boundary is mid-section. - if !inner.pruning_boundary.is_multiple_of(self.items_per_blob) { - let value = inner.pruning_boundary.to_be_bytes().to_vec(); - inner.metadata.put(PRUNING_BOUNDARY_KEY, value); - inner.metadata.sync().await?; - } else if inner.metadata.get(&PRUNING_BOUNDARY_KEY).is_some() { - inner.metadata.remove(&PRUNING_BOUNDARY_KEY); - inner.metadata.sync().await?; - } - self.metrics .update(inner.size, inner.pruning_boundary, self.items_per_blob); Ok(()) @@ -2919,12 +2926,23 @@ mod tests { journal.sync().await.unwrap(); drop(journal); - // Crash Scenario 1: After clear(), before blob creation - // Simulate by manually removing all blobs but leaving metadata + // Crash Scenario 1: after clear intent is synced and blobs are removed, but before + // the new tail blob is created. let blob_part = blob_partition(&cfg); + let meta_cfg = MetadataConfig { + partition: format!("{}-metadata", cfg.partition), + codec_config: ((0..).into(), ()), + }; + let mut metadata = + Metadata::<_, u64, Vec>::init(context.child("intent_meta"), meta_cfg.clone()) + .await + .unwrap(); + metadata.put(CLEAR_TARGET_KEY, 12u64.to_be_bytes().to_vec()); + metadata.sync().await.unwrap(); + drop(metadata); context.remove(&blob_part, None).await.unwrap(); - // Recovery should see no blobs and return empty journal, ignoring metadata + // Recovery should complete the interrupted init_at_size(12). let journal = Journal::<_, Digest>::init( context.child("crash").with_attribute("index", 1), cfg.clone(), @@ -2932,38 +2950,27 @@ mod tests { .await .expect("init failed after clear crash"); let bounds = journal.bounds().await; - assert_eq!(bounds.end, 0); - assert_eq!(bounds.start, 0); + assert_eq!(bounds.end, 12); + assert_eq!(bounds.start, 12); drop(journal); // Restore metadata for next scenario (it might have been removed by init) - let meta_cfg = MetadataConfig { - partition: format!("{}-metadata", cfg.partition), - codec_config: ((0..).into(), ()), - }; let mut metadata = Metadata::<_, u64, Vec>::init(context.child("restore_meta"), meta_cfg.clone()) .await .unwrap(); metadata.put(PRUNING_BOUNDARY_KEY, 7u64.to_be_bytes().to_vec()); + metadata.put(CLEAR_TARGET_KEY, 2u64.to_be_bytes().to_vec()); metadata.sync().await.unwrap(); + drop(metadata); - // Crash Scenario 2: After ensure_section_exists(), before metadata update - // Target: init_at_size(12) -> should be section 2 (starts at 10) - // State: Blob at section 2, Metadata says 7 (section 1) - // Wait, old metadata (7) is BEHIND new blob (12/5 = 2). - // recover_bounds treats "meta < blob" as stale -> uses blob. - - // Let's try init_at_size(2) -> section 0. - // Old metadata says 7 (section 1). - // State: Blob at section 0, Metadata says 7 (section 1). - // recover_bounds sees "meta (1) > blob (0)" -> metadata ahead -> uses blob. - - // Simulate: Create blob at section 0 (tail for init_at_size(2)) + // Crash Scenario 2: after the new tail blob is created, but before final metadata + // replaces the clear intent. let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap(); blob.sync().await.unwrap(); // Ensure it exists + drop(blob); - // Recovery should warn "metadata ahead" and use blob state (0, 0) + // Recovery should complete the interrupted init_at_size(2). let journal = Journal::<_, Digest>::init( context.child("crash").with_attribute("index", 2), cfg.clone(), @@ -2971,11 +2978,9 @@ mod tests { .await .expect("init failed after create crash"); - // Should recover to blob state (section 0 aligned) let bounds = journal.bounds().await; - assert_eq!(bounds.start, 0); - // Size is 0 because blob is empty - assert_eq!(bounds.end, 0); + assert_eq!(bounds.start, 2); + assert_eq!(bounds.end, 2); journal.destroy().await.unwrap(); }); } @@ -2995,29 +3000,114 @@ mod tests { journal.sync().await.unwrap(); drop(journal); - // Crash Scenario: clear_to_size(2) [Section 0] - // We want to simulate crash after blob 0 created, but metadata still 12. + // Crash Scenario: clear_to_size(2) after the intent is synced and blob 0 is created, + // but before final metadata replaces the clear intent. - // manually clear blobs let blob_part = blob_partition(&cfg); + let meta_cfg = MetadataConfig { + partition: format!("{}-metadata", cfg.partition), + codec_config: ((0..).into(), ()), + }; + let mut metadata = Metadata::<_, u64, Vec>::init(context.child("meta"), meta_cfg) + .await + .unwrap(); + metadata.put(CLEAR_TARGET_KEY, 2u64.to_be_bytes().to_vec()); + metadata.sync().await.unwrap(); + drop(metadata); + context.remove(&blob_part, None).await.unwrap(); - // manually create section 0 let (blob, _) = context.open(&blob_part, &0u64.to_be_bytes()).await.unwrap(); blob.sync().await.unwrap(); - // Metadata is still 12 (from setup) - // Blob is Section 0 - // Metadata (12 -> sec 2) > Blob (sec 0) -> Ahead warning - let journal = Journal::<_, Digest>::init(context.child("crash_clear"), cfg.clone()) .await .expect("init failed after clear_to_size crash"); - // Should fallback to blobs let bounds = journal.bounds().await; - assert_eq!(bounds.start, 0); - assert_eq!(bounds.end, 0); + assert_eq!(bounds.start, 2); + assert_eq!(bounds.end, 2); + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_clear_to_size_crash_after_intent_before_blobs() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(5)); + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .unwrap(); + for i in 0..12u64 { + journal.append(&test_digest(i)).await.unwrap(); + } + journal.sync().await.unwrap(); + + let meta_cfg = MetadataConfig { + partition: format!("{}-metadata", cfg.partition), + codec_config: ((0..).into(), ()), + }; + let mut metadata = Metadata::<_, u64, Vec>::init(context.child("meta"), meta_cfg) + .await + .unwrap(); + metadata.put(CLEAR_TARGET_KEY, 100u64.to_be_bytes().to_vec()); + metadata.sync().await.unwrap(); + drop(metadata); + drop(journal); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("init failed after clear intent crash"); + assert_eq!(journal.bounds().await, 100..100); + let pos = journal.append(&test_digest(100)).await.unwrap(); + assert_eq!(pos, 100); + journal.destroy().await.unwrap(); + }); + } + + #[test_traced] + fn test_fixed_journal_clear_to_size_crash_after_mid_section_intent_with_old_blobs_present() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(10)); + let journal = + Journal::<_, Digest>::init_at_size(context.child("first"), cfg.clone(), 10) + .await + .unwrap(); + + for i in 0..6u64 { + let pos = journal.append(&test_digest(i)).await.unwrap(); + assert_eq!(pos, 10 + i); + } + journal.sync().await.unwrap(); + + let meta_cfg = MetadataConfig { + partition: format!("{}-metadata", cfg.partition), + codec_config: ((0..).into(), ()), + }; + let mut metadata = Metadata::<_, u64, Vec>::init(context.child("meta"), meta_cfg) + .await + .unwrap(); + metadata.put(CLEAR_TARGET_KEY, 15u64.to_be_bytes().to_vec()); + metadata.sync().await.unwrap(); + drop(metadata); + drop(journal); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg.clone()) + .await + .expect("init failed after mid-section clear intent crash"); + assert_eq!(journal.bounds().await, 15..15); + drop(journal); + + let journal = Journal::<_, Digest>::init(context.child("third"), cfg.clone()) + .await + .expect("init failed after completing mid-section clear intent"); + assert_eq!(journal.bounds().await, 15..15); + assert!(matches!(journal.read(14).await, Err(Error::ItemPruned(14)))); + let pos = journal.append(&test_digest(100)).await.unwrap(); + assert_eq!(pos, 15); + assert_eq!(journal.read(15).await.unwrap(), test_digest(100)); journal.destroy().await.unwrap(); }); }