diff --git a/Cargo.lock b/Cargo.lock index 95753f2a822..fe588b88688 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1700,6 +1700,7 @@ dependencies = [ "criterion", "futures", "futures-util", + "governor", "rand 0.8.5", "rstest", "thiserror 2.0.17", diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 95a9e924280..fa7313c761a 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -705,6 +705,8 @@ stability_scope!(BETA { /// Remove a blob from a given partition. /// /// If no `name` is provided, the entire partition is removed. + /// If a `name` is provided, existing [Blob] handles for that blob must remain readable + /// until they are dropped, but the blob must be removed from future namespace lookups. /// /// An Ok result indicates the blob is durably removed. fn remove( diff --git a/runtime/src/storage/mod.rs b/runtime/src/storage/mod.rs index f21817028b2..325a0604e80 100644 --- a/runtime/src/storage/mod.rs +++ b/runtime/src/storage/mod.rs @@ -305,6 +305,7 @@ pub(crate) mod tests { test_read_at_buf_returns_same_buffer(&storage).await; test_read_at_buf_insufficient_capacity(&storage).await; test_read_at_buf_larger_capacity(&storage).await; + test_read_after_remove(&storage).await; } /// Test opening a blob, writing to it, and reading back the data. @@ -342,6 +343,25 @@ pub(crate) mod tests { assert!(blobs.is_empty(), "Blob was not removed as expected"); } + /// Test that existing blob handles remain readable after the blob is removed from storage. + async fn test_read_after_remove(storage: &S) + where + S: Storage + Send + Sync, + S::Blob: Send + Sync, + { + let (blob, _) = storage.open("read_after_remove", b"blob").await.unwrap(); + blob.write_at(0, b"still here").await.unwrap(); + blob.sync().await.unwrap(); + + storage + .remove("read_after_remove", Some(b"blob")) + .await + .unwrap(); + + let read = blob.read_at(0, 10).await.unwrap(); + assert_eq!(read.coalesce(), b"still here"); + } + /// Test scanning a partition for blobs. async fn test_scan(storage: &S) where diff --git a/storage/Cargo.toml b/storage/Cargo.toml index 24753430c2c..8b6c4cc8314 100644 --- a/storage/Cargo.toml +++ b/storage/Cargo.toml @@ -38,6 +38,7 @@ commonware-math.workspace = true commonware-runtime = { workspace = true, features = ["test-utils"] } commonware-storage = { path = ".", features = ["std"] } criterion.workspace = true +governor.workspace = true rand.workspace = true rstest.workspace = true tracing-subscriber.workspace = true diff --git a/storage/src/journal/contiguous/fixed.rs b/storage/src/journal/contiguous/fixed.rs index adc350704f9..2c4c5252a0a 100644 --- a/storage/src/journal/contiguous/fixed.rs +++ b/storage/src/journal/contiguous/fixed.rs @@ -58,7 +58,10 @@ use super::Reader as _; use crate::{ journal::{ contiguous::{metrics::FixedMetrics as Metrics, Many, Mutable}, - segmented::fixed::{Config as SegmentedConfig, Journal as SegmentedJournal}, + segmented::fixed::{ + Config as SegmentedConfig, Journal as SegmentedJournal, + PendingPrune as SegmentedPendingPrune, + }, Error, }, metadata::{Config as MetadataConfig, Metadata}, @@ -66,7 +69,9 @@ use crate::{ }; use commonware_codec::CodecFixedShared; use commonware_runtime::buffer::paged::CacheRef; -use commonware_utils::sync::{AsyncRwLockReadGuard, UpgradableAsyncRwLock}; +use commonware_utils::sync::{ + AsyncRwLockReadGuard, UpgradableAsyncRwLock, UpgradableAsyncRwLockUpgradableReadGuard, +}; use futures::{stream::Stream, StreamExt}; use std::num::{NonZeroU64, NonZeroUsize}; use tracing::warn; @@ -210,6 +215,40 @@ pub struct Reader<'a, E: Context, A: CodecFixedShared> { metrics: &'a Metrics, } +/// Pending fixed journal prune whose storage removal has completed but is not yet published. +pub(in crate::journal) struct PendingPrune<'a, E: Context, A: CodecFixedShared> { + inner: UpgradableAsyncRwLockUpgradableReadGuard<'a, Inner>, + pending: SegmentedPendingPrune, + items_per_blob: u64, + metrics: &'a Metrics, +} + +impl PendingPrune<'_, E, A> { + /// Finish a successful prune by dropping removed section handles and advancing the boundary. + pub(in crate::journal) async fn finish(self) { + let Self { + inner, + pending, + items_per_blob, + metrics, + } = self; + let mut inner = inner.upgrade().await; + inner.journal.finish_prune(pending); + + // Update pruning_boundary to the start of the oldest remaining section. + let new_oldest = inner + .journal + .oldest_section() + .expect("all sections pruned - violates tail section invariant"); + // New pruning boundary should only move forward + assert!(inner.pruning_boundary < new_oldest * items_per_blob); + inner.pruning_boundary = new_oldest * items_per_blob; + + // Update metrics + metrics.update(inner.size, inner.pruning_boundary, items_per_blob); + } +} + impl super::Reader for Reader<'_, E, A> { type Item = A; @@ -936,7 +975,22 @@ impl Journal { /// Note that this operation may NOT be atomic, however it's guaranteed not to leave gaps in the /// event of failure as items are always pruned in order from oldest to newest. pub async fn prune(&self, min_item_pos: u64) -> Result { - let mut inner = self.inner.write().await; + let Some(pending) = self.begin_prune(min_item_pos).await? else { + return Ok(false); + }; + pending.finish().await; + Ok(true) + } + + /// Begin pruning by removing old section blobs from storage without dropping open handles. + /// + /// Returns a token if any sections were removed from storage. Call [PendingPrune::finish] to + /// publish the prune in memory. + pub(in crate::journal) async fn begin_prune( + &self, + min_item_pos: u64, + ) -> Result>, Error> { + let inner = self.inner.upgradable_read().await; // Calculate the section that would contain min_item_pos let target_section = min_item_pos / self.items_per_blob; @@ -947,22 +1001,17 @@ impl Journal { // Cap to tail section. The tail section is guaranteed to exist by our invariant. let min_section = std::cmp::min(target_section, tail_section); - let pruned = inner.journal.prune(min_section).await?; - - // After pruning, update pruning_boundary to the start of the oldest remaining section - if pruned { - let new_oldest = inner - .journal - .oldest_section() - .expect("all sections pruned - violates tail section invariant"); - // Pruning boundary only moves forward - assert!(inner.pruning_boundary < new_oldest * self.items_per_blob); - inner.pruning_boundary = new_oldest * self.items_per_blob; - self.metrics - .update(inner.size, inner.pruning_boundary, self.items_per_blob); - } + // Begin pruning old sections while allowing concurrent readers to use existing open handles. + let Some(pending) = inner.journal.begin_prune(min_section).await? else { + return Ok(None); + }; - Ok(pruned) + Ok(Some(PendingPrune { + inner, + pending, + items_per_blob: self.items_per_blob, + metrics: &self.metrics, + })) } /// Remove any persisted data created by the journal. @@ -1121,14 +1170,22 @@ impl crate::journal::authenticated::Inner fo mod tests { use super::*; use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256}; - use commonware_macros::test_traced; + use commonware_macros::{select, test_traced}; use commonware_runtime::{ deterministic::{self, Context}, - Blob, BufferPooler, Error as RuntimeError, Metrics as _, Runner, Storage, Supervisor as _, + Blob, BufferPooler, Clock as _, Error as RuntimeError, Metrics as _, Runner, Spawner as _, + Storage, Supervisor as _, }; - use commonware_utils::{NZUsize, NZU16, NZU64}; + use commonware_utils::{sync::Notify, NZUsize, NZU16, NZU64}; use futures::{pin_mut, StreamExt}; - use std::num::NonZeroU16; + use std::{ + num::NonZeroU16, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::{Duration, SystemTime}, + }; const PAGE_SIZE: NonZeroU16 = NZU16!(44); const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(3); @@ -1159,6 +1216,190 @@ mod tests { } } + /// Coordinates a test pause after a target section is removed from storage. + struct RemoveBlocker { + target: Vec, + removed: Notify, + release: Notify, + } + + impl RemoveBlocker { + /// Create a blocker for removal of the given section. + fn new(section: u64) -> Self { + Self { + target: section.to_be_bytes().to_vec(), + removed: Notify::new(), + release: Notify::new(), + } + } + } + + enum RemoveHook { + Block(Arc), + FailOnCall { + calls: Arc, + fail_on: usize, + }, + } + + /// Deterministic test context that can intercept remove calls. + struct RemoveHookContext { + inner: Context, + hook: RemoveHook, + } + + impl RemoveHookContext { + /// Wrap a deterministic context and pause removal of the blocker's target section. + fn blocking(inner: Context, blocker: Arc) -> Self { + Self { + inner, + hook: RemoveHook::Block(blocker), + } + } + + /// Wrap a deterministic context and fail the `fail_on`th remove call. + fn failing(inner: Context, fail_on: usize) -> Self { + Self { + inner, + hook: RemoveHook::FailOnCall { + calls: Arc::new(AtomicUsize::new(0)), + fail_on, + }, + } + } + } + + impl commonware_runtime::Supervisor for RemoveHookContext { + fn name(&self) -> commonware_runtime::Name { + self.inner.name() + } + + fn child(&self, label: &'static str) -> Self { + Self { + inner: self.inner.child(label), + hook: self.hook.clone(), + } + } + + fn with_attribute(self, key: &'static str, value: impl std::fmt::Display) -> Self { + Self { + inner: self.inner.with_attribute(key, value), + hook: self.hook, + } + } + } + + impl Clone for RemoveHook { + fn clone(&self) -> Self { + match self { + Self::Block(blocker) => Self::Block(blocker.clone()), + Self::FailOnCall { calls, fail_on } => Self::FailOnCall { + calls: calls.clone(), + fail_on: *fail_on, + }, + } + } + } + + impl commonware_runtime::Metrics for RemoveHookContext { + fn register< + N: Into, + H: Into, + M: commonware_runtime::telemetry::metrics::Metric, + >( + &self, + name: N, + help: H, + metric: M, + ) -> commonware_runtime::telemetry::metrics::Registered { + self.inner.register(name, help, metric) + } + + fn encode(&self) -> String { + self.inner.encode() + } + } + + impl governor::clock::Clock for RemoveHookContext { + type Instant = SystemTime; + + fn now(&self) -> Self::Instant { + self.inner.current() + } + } + + impl governor::clock::ReasonablyRealtime for RemoveHookContext {} + + impl commonware_runtime::Clock for RemoveHookContext { + fn current(&self) -> SystemTime { + self.inner.current() + } + + fn sleep( + &self, + duration: Duration, + ) -> impl std::future::Future + Send + 'static { + self.inner.sleep(duration) + } + + fn sleep_until( + &self, + deadline: SystemTime, + ) -> impl std::future::Future + Send + 'static { + self.inner.sleep_until(deadline) + } + } + + impl BufferPooler for RemoveHookContext { + fn network_buffer_pool(&self) -> &commonware_runtime::BufferPool { + self.inner.network_buffer_pool() + } + + fn storage_buffer_pool(&self) -> &commonware_runtime::BufferPool { + self.inner.storage_buffer_pool() + } + } + + impl Storage for RemoveHookContext { + type Blob = ::Blob; + + async fn open_versioned( + &self, + partition: &str, + name: &[u8], + versions: std::ops::RangeInclusive, + ) -> Result<(Self::Blob, u64, u16), RuntimeError> { + self.inner.open_versioned(partition, name, versions).await + } + + async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), RuntimeError> { + match &self.hook { + RemoveHook::Block(blocker) => { + let block = name.is_some_and(|name| name == blocker.target.as_slice()); + let result = self.inner.remove(partition, name).await; + if block { + blocker.removed.notify_one(); + blocker.release.notified().await; + } + result + } + RemoveHook::FailOnCall { calls, fail_on } => { + let call = calls.fetch_add(1, Ordering::Relaxed) + 1; + if call == *fail_on { + return Err(RuntimeError::Io(std::io::Error::other( + "injected remove failure", + ))); + } + self.inner.remove(partition, name).await + } + } + } + + async fn scan(&self, partition: &str) -> Result>, RuntimeError> { + self.inner.scan(partition).await + } + } + #[test_traced] fn test_fixed_journal_init_conflicting_partitions() { let executor = deterministic::Runner::default(); @@ -1401,6 +1642,133 @@ mod tests { }); } + #[test_traced] + fn test_fixed_journal_reads_during_prune_unlink() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let blocker = Arc::new(RemoveBlocker::new(0)); + let journal_context = + RemoveHookContext::blocking(context.child("journal"), blocker.clone()); + let cfg = test_cfg(&journal_context, NZU64!(2)); + let journal = Arc::new( + Journal::<_, Digest>::init(journal_context.child("inner"), cfg) + .await + .expect("failed to initialize journal"), + ); + + for i in 0..6 { + let pos = journal.append(&test_digest(i)).await.unwrap(); + assert_eq!(pos, i); + } + journal.sync().await.unwrap(); + + let held_reader = journal.reader().await; + assert_eq!(held_reader.read(0).await.unwrap(), test_digest(0)); + + let prune = context.child("prune").spawn({ + let journal = journal.clone(); + |_| async move { journal.prune(4).await } + }); + + select! { + _ = blocker.removed.notified() => {}, + _ = context.sleep(Duration::from_secs(1)) => panic!("prune did not unlink section"), + } + + let reader = journal.reader(); + pin_mut!(reader); + let reader = select! { + reader = reader => reader, + _ = context.sleep(Duration::from_secs(1)) => { + panic!("reader blocked while prune was unlinking") + }, + }; + + assert_eq!(reader.read(0).await.unwrap(), test_digest(0)); + assert_eq!(reader.read(4).await.unwrap(), test_digest(4)); + drop(reader); + + blocker.release.notify_one(); + drop(held_reader); + assert!(prune.await.unwrap().unwrap()); + + assert_eq!(journal.pruning_boundary().await, 4); + assert_eq!(journal.test_oldest_section().await, Some(2)); + assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(0)))); + assert_eq!(journal.read(4).await.unwrap(), test_digest(4)); + }); + } + + #[test_traced] + fn test_fixed_journal_prune_remove_failure_reopens_contiguous() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = test_cfg(&context, NZU64!(2)); + let journal = Journal::<_, Digest>::init(context.child("first"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..6 { + let pos = journal.append(&test_digest(i)).await.unwrap(); + assert_eq!(pos, i); + } + journal.sync().await.unwrap(); + + let fault_cfg = context.storage_fault_config(); + *fault_cfg.write() = deterministic::FaultConfig::default().remove(1.0); + let err = journal.prune(4).await.expect_err("prune should fail"); + assert!(matches!(err, Error::Runtime(RuntimeError::Io(_)))); + drop(journal); + + *fault_cfg.write() = deterministic::FaultConfig::default(); + let journal = Journal::<_, Digest>::init(context.child("second"), cfg) + .await + .expect("failed to re-initialize journal"); + assert_eq!(journal.bounds().await, 0..6); + assert_eq!(journal.test_oldest_section().await, Some(0)); + for i in 0..6 { + assert_eq!(journal.read(i).await.unwrap(), test_digest(i)); + } + }); + } + + #[test_traced] + fn test_fixed_journal_partial_prune_remove_failure_reopens_suffix() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let remove_context = RemoveHookContext::failing(context.child("first"), 2); + let cfg = test_cfg(&remove_context, NZU64!(2)); + let journal = Journal::<_, Digest>::init(remove_context.child("journal"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..6 { + let pos = journal.append(&test_digest(i)).await.unwrap(); + assert_eq!(pos, i); + } + journal.sync().await.unwrap(); + + let err = journal.prune(4).await.expect_err("prune should fail"); + assert!(matches!(err, Error::Runtime(RuntimeError::Io(_)))); + assert_eq!(journal.pruning_boundary().await, 0); + + // The first section was unlinked from storage but remains readable through the + // open handle until the failed journal instance is dropped. + assert_eq!(journal.read(0).await.unwrap(), test_digest(0)); + drop(journal); + + let journal = Journal::<_, Digest>::init(context.child("second"), cfg) + .await + .expect("failed to re-initialize journal"); + assert_eq!(journal.bounds().await, 2..6); + assert_eq!(journal.test_oldest_section().await, Some(1)); + assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(0)))); + for i in 2..6 { + assert_eq!(journal.read(i).await.unwrap(), test_digest(i)); + } + }); + } + /// Append a lot of data to make sure we exercise page cache paging boundaries. #[test_traced] fn test_fixed_journal_append_a_lot_of_data() { diff --git a/storage/src/journal/contiguous/variable.rs b/storage/src/journal/contiguous/variable.rs index 9e25f45dd8c..94745b1e543 100644 --- a/storage/src/journal/contiguous/variable.rs +++ b/storage/src/journal/contiguous/variable.rs @@ -811,7 +811,7 @@ impl Journal { /// Errors may leave the journal in an inconsistent state. The journal should be closed and /// reopened to trigger alignment in [Journal::init]. pub async fn prune(&self, min_position: u64) -> Result { - let mut inner = self.inner.write().await; + let inner = self.inner.upgradable_read().await; if min_position <= inner.pruning_boundary { return Ok(false); @@ -822,16 +822,24 @@ impl Journal { // Calculate section number let min_section = position_to_section(min_position, self.items_per_section); + let new_boundary = (min_section * self.items_per_section).max(inner.pruning_boundary); - let pruned = inner.data.prune(min_section).await?; - if pruned { - let new_oldest = (min_section * self.items_per_section).max(inner.pruning_boundary); - inner.pruning_boundary = new_oldest; - self.offsets.prune(new_oldest).await?; - self.metrics - .update(inner.size, inner.pruning_boundary, self.items_per_section); - } - Ok(pruned) + let Some(data_pending) = inner.data.begin_prune(min_section).await? else { + return Ok(false); + }; + let Some(offsets_pending) = self.offsets.begin_prune(new_boundary).await? else { + return Err(Error::Corruption( + "data prune removed sections but offsets prune removed none".into(), + )); + }; + + let mut inner = inner.upgrade().await; + inner.data.finish_prune(data_pending); + offsets_pending.finish().await; + inner.pruning_boundary = new_boundary; + self.metrics + .update(inner.size, inner.pruning_boundary, self.items_per_section); + Ok(true) } /// Durably persist the journal. @@ -1278,13 +1286,21 @@ impl Journal { mod tests { use super::*; use crate::journal::contiguous::tests::run_contiguous_tests; - use commonware_macros::test_traced; + use commonware_macros::{select, test_traced}; use commonware_runtime::{ - buffer::paged::CacheRef, deterministic, Metrics as _, Runner, Storage, Supervisor as _, + buffer::paged::CacheRef, deterministic, BufferPooler, Clock as _, Error as RuntimeError, + Metrics as _, Runner, Spawner as _, Storage, Supervisor as _, + }; + use commonware_utils::{sequence::FixedBytes, sync::Notify, NZUsize, NZU16, NZU64}; + use futures::{pin_mut, FutureExt as _}; + use std::{ + num::NonZeroU16, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::{Duration, SystemTime}, }; - use commonware_utils::{sequence::FixedBytes, NZUsize, NZU16, NZU64}; - use futures::FutureExt as _; - use std::num::NonZeroU16; // Use some jank sizes to exercise boundary conditions. const PAGE_SIZE: NonZeroU16 = NZU16!(101); @@ -1293,6 +1309,376 @@ mod tests { const LARGE_PAGE_SIZE: NonZeroU16 = NZU16!(1024); const SMALL_PAGE_SIZE: NonZeroU16 = NZU16!(512); + /// Coordinates a test pause after a target section is removed from storage. + struct RemoveBlocker { + partition: String, + target: Vec, + removed: Notify, + release: Notify, + } + + impl RemoveBlocker { + /// Create a blocker for removal of the given section in `partition`. + fn new(partition: String, section: u64) -> Self { + Self { + partition, + target: section.to_be_bytes().to_vec(), + removed: Notify::new(), + release: Notify::new(), + } + } + } + + enum RemoveHook { + Block(Arc), + FailOnPartition { + partition: String, + calls: Arc, + fail_on: usize, + }, + } + + impl Clone for RemoveHook { + fn clone(&self) -> Self { + match self { + Self::Block(blocker) => Self::Block(blocker.clone()), + Self::FailOnPartition { + partition, + calls, + fail_on, + } => Self::FailOnPartition { + partition: partition.clone(), + calls: calls.clone(), + fail_on: *fail_on, + }, + } + } + } + + /// Deterministic test context that can intercept remove calls. + struct RemoveHookContext { + inner: deterministic::Context, + hook: RemoveHook, + } + + impl RemoveHookContext { + /// Wrap a deterministic context and pause removal of `section` in `partition`. + fn blocking( + inner: deterministic::Context, + partition: String, + section: u64, + ) -> (Self, Arc) { + let blocker = Arc::new(RemoveBlocker::new(partition, section)); + ( + Self { + inner, + hook: RemoveHook::Block(blocker.clone()), + }, + blocker, + ) + } + + /// Wrap a deterministic context and fail the `fail_on`th remove call in `partition`. + fn failing(inner: deterministic::Context, partition: String, fail_on: usize) -> Self { + Self { + inner, + hook: RemoveHook::FailOnPartition { + partition, + calls: Arc::new(AtomicUsize::new(0)), + fail_on, + }, + } + } + } + + impl commonware_runtime::Supervisor for RemoveHookContext { + fn name(&self) -> commonware_runtime::Name { + self.inner.name() + } + + fn child(&self, label: &'static str) -> Self { + Self { + inner: self.inner.child(label), + hook: self.hook.clone(), + } + } + + fn with_attribute(self, key: &'static str, value: impl std::fmt::Display) -> Self { + Self { + inner: self.inner.with_attribute(key, value), + hook: self.hook, + } + } + } + + impl commonware_runtime::Metrics for RemoveHookContext { + fn register< + N: Into, + H: Into, + M: commonware_runtime::telemetry::metrics::Metric, + >( + &self, + name: N, + help: H, + metric: M, + ) -> commonware_runtime::telemetry::metrics::Registered { + self.inner.register(name, help, metric) + } + + fn encode(&self) -> String { + self.inner.encode() + } + } + + impl governor::clock::Clock for RemoveHookContext { + type Instant = SystemTime; + + fn now(&self) -> Self::Instant { + self.inner.current() + } + } + + impl governor::clock::ReasonablyRealtime for RemoveHookContext {} + + impl commonware_runtime::Clock for RemoveHookContext { + fn current(&self) -> SystemTime { + self.inner.current() + } + + fn sleep( + &self, + duration: Duration, + ) -> impl std::future::Future + Send + 'static { + self.inner.sleep(duration) + } + + fn sleep_until( + &self, + deadline: SystemTime, + ) -> impl std::future::Future + Send + 'static { + self.inner.sleep_until(deadline) + } + } + + impl BufferPooler for RemoveHookContext { + fn network_buffer_pool(&self) -> &commonware_runtime::BufferPool { + self.inner.network_buffer_pool() + } + + fn storage_buffer_pool(&self) -> &commonware_runtime::BufferPool { + self.inner.storage_buffer_pool() + } + } + + impl Storage for RemoveHookContext { + type Blob = ::Blob; + + async fn open_versioned( + &self, + partition: &str, + name: &[u8], + versions: std::ops::RangeInclusive, + ) -> Result<(Self::Blob, u64, u16), RuntimeError> { + self.inner.open_versioned(partition, name, versions).await + } + + async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), RuntimeError> { + match &self.hook { + RemoveHook::Block(blocker) => { + let block = partition == blocker.partition + && name.is_some_and(|name| name == blocker.target.as_slice()); + let result = self.inner.remove(partition, name).await; + if block { + blocker.removed.notify_one(); + blocker.release.notified().await; + } + result + } + RemoveHook::FailOnPartition { + partition: target, + calls, + fail_on, + } => { + if partition == target { + let call = calls.fetch_add(1, Ordering::Relaxed) + 1; + if call == *fail_on { + return Err(RuntimeError::Io(std::io::Error::other( + "injected remove failure", + ))); + } + } + self.inner.remove(partition, name).await + } + } + } + + async fn scan(&self, partition: &str) -> Result>, RuntimeError> { + self.inner.scan(partition).await + } + } + + #[test_traced] + fn test_variable_journal_reads_during_prune_begin() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let partition = "variable-prune-begin-reads".to_string(); + let offsets_partition = format!("{partition}{OFFSETS_SUFFIX}-blobs"); + let (journal_context, blocker) = + RemoveHookContext::blocking(context.child("journal"), offsets_partition, 0); + let cfg = Config { + partition, + items_per_section: NZU64!(2), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler( + &journal_context, + PAGE_SIZE, + NZUsize!(PAGE_CACHE_SIZE), + ), + write_buffer: NZUsize!(1024), + }; + let journal = Arc::new( + Journal::<_, u64>::init(journal_context.child("inner"), cfg) + .await + .expect("failed to initialize journal"), + ); + + for i in 0..6 { + let pos = journal.append(&(i * 100)).await.unwrap(); + assert_eq!(pos, i); + } + journal.sync().await.unwrap(); + + let held_reader = journal.reader().await; + assert_eq!(held_reader.read(0).await.unwrap(), 0); + + let prune = context.child("prune").spawn({ + let journal = journal.clone(); + |_| async move { journal.prune(4).await } + }); + + select! { + _ = blocker.removed.notified() => {}, + _ = context.sleep(Duration::from_secs(1)) => panic!("prune did not remove offset section"), + } + + let reader = journal.reader(); + pin_mut!(reader); + let reader = select! { + reader = reader => reader, + _ = context.sleep(Duration::from_secs(1)) => { + panic!("reader blocked while prune was removing blobs") + }, + }; + + assert_eq!(reader.read(0).await.unwrap(), 0); + assert_eq!(reader.read(4).await.unwrap(), 400); + drop(reader); + + blocker.release.notify_one(); + drop(held_reader); + assert!(prune.await.unwrap().unwrap()); + + assert_eq!(journal.bounds().await, 4..6); + assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(0)))); + assert_eq!(journal.read(4).await.unwrap(), 400); + }); + } + + #[test_traced] + fn test_variable_journal_prune_data_remove_failure_reopens_contiguous() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let partition = "variable-prune-data-failure".to_string(); + let data_partition = format!("{partition}{DATA_SUFFIX}"); + let journal_context = + RemoveHookContext::failing(context.child("first"), data_partition, 1); + let cfg = Config { + partition, + items_per_section: NZU64!(2), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler( + &journal_context, + PAGE_SIZE, + NZUsize!(PAGE_CACHE_SIZE), + ), + write_buffer: NZUsize!(1024), + }; + let journal = Journal::<_, u64>::init(journal_context.child("journal"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..6 { + let pos = journal.append(&(i * 100)).await.unwrap(); + assert_eq!(pos, i); + } + journal.sync().await.unwrap(); + + let err = journal.prune(4).await.expect_err("prune should fail"); + assert!(matches!(err, Error::Runtime(RuntimeError::Io(_)))); + assert_eq!(journal.bounds().await, 0..6); + for i in 0..6 { + assert_eq!(journal.read(i).await.unwrap(), i * 100); + } + drop(journal); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg) + .await + .expect("failed to re-initialize journal"); + assert_eq!(journal.bounds().await, 0..6); + for i in 0..6 { + assert_eq!(journal.read(i).await.unwrap(), i * 100); + } + }); + } + + #[test_traced] + fn test_variable_journal_prune_offsets_remove_failure_reopens_suffix() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let partition = "variable-prune-offsets-failure".to_string(); + let offsets_partition = format!("{partition}{OFFSETS_SUFFIX}-blobs"); + let journal_context = + RemoveHookContext::failing(context.child("first"), offsets_partition, 1); + let cfg = Config { + partition, + items_per_section: NZU64!(2), + compression: None, + codec_config: (), + page_cache: CacheRef::from_pooler( + &journal_context, + PAGE_SIZE, + NZUsize!(PAGE_CACHE_SIZE), + ), + write_buffer: NZUsize!(1024), + }; + let journal = Journal::<_, u64>::init(journal_context.child("journal"), cfg.clone()) + .await + .expect("failed to initialize journal"); + + for i in 0..6 { + let pos = journal.append(&(i * 100)).await.unwrap(); + assert_eq!(pos, i); + } + journal.sync().await.unwrap(); + + let err = journal.prune(4).await.expect_err("prune should fail"); + assert!(matches!(err, Error::Runtime(RuntimeError::Io(_)))); + assert_eq!(journal.bounds().await, 0..6); + assert_eq!(journal.read(0).await.unwrap(), 0); + drop(journal); + + let journal = Journal::<_, u64>::init(context.child("second"), cfg) + .await + .expect("failed to re-initialize journal"); + assert_eq!(journal.bounds().await, 4..6); + assert!(matches!(journal.read(0).await, Err(Error::ItemPruned(0)))); + assert_eq!(journal.read(4).await.unwrap(), 400); + assert_eq!(journal.read(5).await.unwrap(), 500); + }); + } + #[test_traced] fn test_variable_append_many_compressed() { let executor = deterministic::Runner::default(); diff --git a/storage/src/journal/segmented/fixed.rs b/storage/src/journal/segmented/fixed.rs index 0f991914e15..b9eb938e56a 100644 --- a/storage/src/journal/segmented/fixed.rs +++ b/storage/src/journal/segmented/fixed.rs @@ -20,7 +20,7 @@ //! All data must be assigned to a `section`. This allows pruning entire sections //! (and their corresponding blobs) independently. -use super::manager::{AppendFactory, Config as ManagerConfig, Manager}; +use super::manager::{AppendFactory, Config as ManagerConfig, Manager, Unlinked}; use crate::journal::Error; use commonware_codec::{CodecFixed, CodecFixedShared, DecodeExt as _, ReadExt as _}; use commonware_runtime::{ @@ -73,6 +73,11 @@ pub struct Journal { _array: PhantomData, } +/// Opaque token for sections unlinked from storage and ready to finish pruning. +pub(in crate::journal) struct PendingPrune { + unlinked: Unlinked, +} + impl Journal { /// Size of each entry. pub const CHUNK_SIZE: usize = A::SIZE; @@ -381,6 +386,27 @@ impl Journal { self.manager.prune(min).await } + /// Begin pruning by removing section blobs less than `min` from storage without dropping open + /// handles. + /// + /// This allows fixed contiguous journals to perform physical removal while readers keep using + /// existing section handles. Returns a token if any sections were unlinked. + pub(in crate::journal) async fn begin_prune( + &self, + min: u64, + ) -> Result, Error> { + Ok(self + .manager + .begin_prune(min) + .await? + .map(|unlinked| PendingPrune { unlinked })) + } + + /// Finish pruning sections described by `pending` by removing them from memory. + pub(in crate::journal) fn finish_prune(&mut self, pending: PendingPrune) { + self.manager.finish_prune(pending.unlinked) + } + /// Returns the oldest section number, if any blobs exist. pub fn oldest_section(&self) -> Option { self.manager.oldest_section() diff --git a/storage/src/journal/segmented/manager.rs b/storage/src/journal/segmented/manager.rs index f804defa05f..54727b2e48b 100644 --- a/storage/src/journal/segmented/manager.rs +++ b/storage/src/journal/segmented/manager.rs @@ -142,6 +142,11 @@ pub struct Manager> { pruned: Counter, } +/// Opaque token for sections unlinked from storage and ready to finish pruning. +pub(in crate::journal) struct Unlinked { + min: u64, +} + impl> Manager { /// Initialize a new `Manager`. /// @@ -233,37 +238,51 @@ impl> Manager { Ok(()) } - /// Prune all sections less than `min`. Returns true if any were pruned. - pub async fn prune(&mut self, min: u64) -> Result { - // Prune any blobs that are smaller than the minimum - let mut pruned = false; - while let Some((§ion, _)) = self.blobs.first_key_value() { - // Stop pruning if we reach the minimum - if section >= min { - break; - } - - // Remove blob from map - let blob = self.blobs.remove(§ion).unwrap(); - let size = blob.size().await; - drop(blob); - - // Remove blob from storage + /// Begin pruning by removing section blobs less than `min` from storage without dropping open + /// handles. + /// + /// Already-open handles remain in the in-memory map, allowing readers to continue using the + /// sections until [Self::finish_prune] publishes the prune. + /// + /// Returns a token if any sections were unlinked. + pub(super) async fn begin_prune(&self, min: u64) -> Result, Error> { + let mut unlinked = false; + for (§ion, _) in self.blobs.range(..min) { self.context .remove(&self.partition, Some(§ion.to_be_bytes())) .await?; - pruned = true; + unlinked = true; - debug!(section, size, "pruned blob"); - self.tracked.dec(); - self.pruned.inc(); + debug!(section, "unlinked blob"); } + Ok(unlinked.then_some(Unlinked { min })) + } - if pruned { - self.oldest_retained_section = min; + /// Finish a successful unlink by removing unlinked sections from the in-memory map. + pub(super) fn finish_prune(&mut self, unlinked: Unlinked) { + let min = unlinked.min; + let retained = self.blobs.split_off(&min); + let pruned = std::mem::replace(&mut self.blobs, retained); + assert!(!pruned.is_empty(), "unlinked token must prune sections"); + for (section, blob) in pruned { + drop(blob); + debug!(section, "pruned blob"); + self.tracked.dec(); + self.pruned.inc(); } + self.oldest_retained_section = min; + } - Ok(pruned) + /// Prune all sections less than `min`. Returns true if any were pruned. + /// + /// Callers that need concurrent readers during physical removal should use + /// [Self::begin_prune] and [Self::finish_prune] instead. + pub(super) async fn prune(&mut self, min: u64) -> Result { + let Some(unlinked) = self.begin_prune(min).await? else { + return Ok(false); + }; + self.finish_prune(unlinked); + Ok(true) } /// Returns the oldest section number, if any blobs exist. diff --git a/storage/src/journal/segmented/variable.rs b/storage/src/journal/segmented/variable.rs index 0f44cc013b1..793608dc684 100644 --- a/storage/src/journal/segmented/variable.rs +++ b/storage/src/journal/segmented/variable.rs @@ -80,7 +80,7 @@ //! }); //! ``` -use super::manager::{AppendFactory, Config as ManagerConfig, Manager}; +use super::manager::{AppendFactory, Config as ManagerConfig, Manager, Unlinked}; use crate::journal::Error; use commonware_codec::{ varint::{UInt, MAX_U32_VARINT_SIZE}, @@ -221,6 +221,11 @@ pub struct Journal { codec_config: V::Cfg, } +/// Opaque token for sections unlinked from storage and ready to finish pruning. +pub(in crate::journal) struct PendingPrune { + unlinked: Unlinked, +} + impl Journal { /// Initialize a new `Journal` instance. /// @@ -819,6 +824,27 @@ impl Journal { self.manager.prune(min).await } + /// Begin pruning by removing section blobs less than `min` from storage without dropping open + /// handles. + /// + /// This allows contiguous variable journals to perform physical removal while readers keep + /// using existing section handles. Returns a token if any sections were unlinked. + pub(in crate::journal) async fn begin_prune( + &self, + min: u64, + ) -> Result, Error> { + Ok(self + .manager + .begin_prune(min) + .await? + .map(|unlinked| PendingPrune { unlinked })) + } + + /// Finish pruning sections described by `pending` by removing them from memory. + pub(in crate::journal) fn finish_prune(&mut self, pending: PendingPrune) { + self.manager.finish_prune(pending.unlinked) + } + /// Returns the number of the oldest section in the journal. pub fn oldest_section(&self) -> Option { self.manager.oldest_section()