diff --git a/examples/sync/src/bin/client.rs b/examples/sync/src/bin/client.rs index 791bfade160..5101aa2c81e 100644 --- a/examples/sync/src/bin/client.rs +++ b/examples/sync/src/bin/client.rs @@ -150,6 +150,8 @@ where apply_batch_size: 1024, max_outstanding_requests: config.max_outstanding_requests, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; @@ -214,6 +216,8 @@ where apply_batch_size: 1024, max_outstanding_requests: config.max_outstanding_requests, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; @@ -275,6 +279,8 @@ where apply_batch_size: 1024, max_outstanding_requests: config.max_outstanding_requests, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; diff --git a/storage/fuzz/fuzz_targets/qmdb_any_fixed_sync.rs b/storage/fuzz/fuzz_targets/qmdb_any_fixed_sync.rs index bfe4db4394c..0c5f71f317c 100644 --- a/storage/fuzz/fuzz_targets/qmdb_any_fixed_sync.rs +++ b/storage/fuzz/fuzz_targets/qmdb_any_fixed_sync.rs @@ -130,6 +130,8 @@ async fn test_sync< let sync_config: sync::engine::Config = sync::engine::Config { context: context.with_label("sync").with_attribute("id", sync_id), update_rx: None, + finish_rx: None, + reached_target_tx: None, db_config, fetch_batch_size: NZU64!((fetch_batch_size % 100) + 1), target, diff --git a/storage/src/qmdb/any/sync/tests.rs b/storage/src/qmdb/any/sync/tests.rs index 44fcaafa9ff..b41cbc96524 100644 --- a/storage/src/qmdb/any/sync/tests.rs +++ b/storage/src/qmdb/any/sync/tests.rs @@ -22,6 +22,7 @@ use crate::{ }; use commonware_codec::Encode; use commonware_cryptography::sha256::Digest; +use commonware_macros::select; use commonware_runtime::{deterministic, BufferPooler, Metrics, Runner as _}; use commonware_utils::{ channel::{mpsc, oneshot}, @@ -29,6 +30,7 @@ use commonware_utils::{ sync::AsyncRwLock, NZU64, }; +use futures::{pin_mut, FutureExt}; use rand::RngCore as _; use std::{num::NonZeroU64, sync::Arc}; @@ -132,6 +134,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; @@ -174,6 +178,8 @@ where fetch_batch_size: NZU64!(2), db_config, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; @@ -223,6 +229,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; @@ -304,6 +312,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; @@ -378,6 +388,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; let synced_db: H::Db = sync::sync(config).await.unwrap(); @@ -479,6 +491,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; let synced_db: H::Db = sync::sync(config).await.unwrap(); @@ -541,6 +555,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 1, }; let client: Engine = Engine::new(config).await.unwrap(); @@ -608,6 +624,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 1, }; let client: Engine = Engine::new(config).await.unwrap(); @@ -689,6 +707,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 1, }; @@ -759,6 +779,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 1, }; @@ -790,6 +812,273 @@ where }); } +/// Test that explicit finish control waits for a finish signal even after reaching target. +pub(crate) fn test_sync_waits_for_explicit_finish() +where + Arc>: Resolver, Digest = Digest>, + OpOf: Encode, + JournalOf: Contiguous, +{ + let executor = deterministic::Runner::default(); + executor.start(|mut context| async move { + let mut target_db = H::init_db(context.with_label("target")).await; + target_db = H::apply_ops(target_db, H::create_ops(10)).await; + let initial_target = Target { + root: H::sync_target_root(&target_db), + range: non_empty_range!( + target_db.inactivity_floor_loc().await, + target_db.bounds().await.end + ), + }; + + target_db = H::apply_ops(target_db, H::create_ops_seeded(5, 1)).await; + let updated_lower_bound = target_db.inactivity_floor_loc().await; + let updated_upper_bound = target_db.bounds().await.end; + let updated_target = Target { + root: H::sync_target_root(&target_db), + range: non_empty_range!(updated_lower_bound, updated_upper_bound), + }; + let updated_verification_root = target_db.root(); + + let (update_sender, update_receiver) = mpsc::channel(1); + let (finish_sender, finish_receiver) = mpsc::channel(1); + let (reached_sender, mut reached_receiver) = mpsc::channel(1); + let target_db = Arc::new(target_db); + let config = Config { + context: context.with_label("client"), + db_config: H::config(&context.next_u64().to_string(), &context), + fetch_batch_size: NZU64!(10), + target: initial_target.clone(), + resolver: target_db.clone(), + apply_batch_size: 1024, + max_outstanding_requests: 1, + update_rx: Some(update_receiver), + finish_rx: Some(finish_receiver), + reached_target_tx: Some(reached_sender), + max_retained_roots: 0, + }; + + let sync_handle = sync::sync(config); + pin_mut!(sync_handle); + + select! { + _ = sync_handle.as_mut() => { + panic!("sync completed before explicit finish signal"); + }, + reached = reached_receiver.recv() => { + let reached = reached.expect("engine should report reached-target before finish"); + assert_eq!(reached, initial_target); + } + } + assert!( + sync_handle.as_mut().now_or_never().is_none(), + "sync must wait for explicit finish signal after reaching target" + ); + + update_sender + .send(updated_target.clone()) + .await + .expect("target update channel should be open"); + + select! { + _ = sync_handle.as_mut() => { + panic!("sync completed before explicit finish signal for updated target"); + }, + reached = reached_receiver.recv() => { + let reached = reached.expect("engine should report updated target before finish"); + assert_eq!(reached, updated_target); + } + } + assert!( + sync_handle.as_mut().now_or_never().is_none(), + "sync must still wait for explicit finish signal after updated target is reached" + ); + + finish_sender + .send(()) + .await + .expect("finish signal channel should be open"); + + let synced_db: H::Db = sync_handle + .await + .expect("sync should succeed after finish signal"); + assert_eq!(synced_db.root(), updated_verification_root); + assert_eq!(synced_db.bounds().await.end, updated_upper_bound); + assert_eq!(synced_db.inactivity_floor_loc().await, updated_lower_bound); + + synced_db.destroy().await.unwrap(); + Arc::try_unwrap(target_db) + .unwrap_or_else(|_| panic!("failed to unwrap Arc")) + .destroy() + .await + .unwrap(); + }); +} + +/// Test that a finish signal received before target completion still allows full sync. +pub(crate) fn test_sync_handles_early_finish_signal() +where + Arc>: Resolver, Digest = Digest>, + OpOf: Encode, + JournalOf: Contiguous, +{ + let executor = deterministic::Runner::default(); + executor.start(|mut context| async move { + let mut target_db = H::init_db(context.with_label("target")).await; + target_db = H::apply_ops(target_db, H::create_ops(30)).await; + let lower_bound = target_db.inactivity_floor_loc().await; + let upper_bound = target_db.bounds().await.end; + let target = Target { + root: H::sync_target_root(&target_db), + range: non_empty_range!(lower_bound, upper_bound), + }; + let verification_root = target_db.root(); + + let (finish_sender, finish_receiver) = mpsc::channel(1); + let (reached_sender, mut reached_receiver) = mpsc::channel(1); + finish_sender + .send(()) + .await + .expect("finish signal channel should be open"); + + let target_db = Arc::new(target_db); + let config = Config { + context: context.with_label("client"), + db_config: H::config(&context.next_u64().to_string(), &context), + fetch_batch_size: NZU64!(3), + target: target.clone(), + resolver: target_db.clone(), + apply_batch_size: 1024, + max_outstanding_requests: 1, + update_rx: None, + finish_rx: Some(finish_receiver), + reached_target_tx: Some(reached_sender), + max_retained_roots: 1, + }; + + let synced_db: H::Db = sync::sync(config) + .await + .expect("sync should complete after early finish signal"); + let reached = reached_receiver + .recv() + .await + .expect("engine should report reached-target"); + + assert_eq!(reached, target); + assert_eq!(synced_db.root(), verification_root); + assert_eq!(synced_db.bounds().await.end, upper_bound); + assert_eq!(synced_db.inactivity_floor_loc().await, lower_bound); + + synced_db.destroy().await.unwrap(); + Arc::try_unwrap(target_db) + .unwrap_or_else(|_| panic!("failed to unwrap Arc")) + .destroy() + .await + .unwrap(); + }); +} + +/// Test that dropping finish sender without sending is treated as an error. +pub(crate) fn test_sync_fails_when_finish_sender_dropped() +where + Arc>: Resolver, Digest = Digest>, + OpOf: Encode, + JournalOf: Contiguous, +{ + let executor = deterministic::Runner::default(); + executor.start(|mut context| async move { + let mut target_db = H::init_db(context.with_label("target")).await; + target_db = H::apply_ops(target_db, H::create_ops(10)).await; + let lower_bound = target_db.inactivity_floor_loc().await; + let upper_bound = target_db.bounds().await.end; + + let (finish_sender, finish_receiver) = mpsc::channel(1); + drop(finish_sender); + + let target_db = Arc::new(target_db); + let config = Config { + context: context.with_label("client"), + db_config: H::config(&context.next_u64().to_string(), &context), + fetch_batch_size: NZU64!(5), + target: Target { + root: H::sync_target_root(&target_db), + range: non_empty_range!(lower_bound, upper_bound), + }, + resolver: target_db.clone(), + apply_batch_size: 1024, + max_outstanding_requests: 1, + update_rx: None, + finish_rx: Some(finish_receiver), + reached_target_tx: None, + max_retained_roots: 1, + }; + + let result: Result = sync::sync(config).await; + assert!(matches!( + result, + Err(sync::Error::Engine(sync::EngineError::FinishChannelClosed)) + )); + + Arc::try_unwrap(target_db) + .unwrap_or_else(|_| panic!("failed to unwrap Arc")) + .destroy() + .await + .unwrap(); + }); +} + +/// Test that dropping reached-target receiver does not fail sync. +pub(crate) fn test_sync_allows_dropped_reached_target_receiver() +where + Arc>: Resolver, Digest = Digest>, + OpOf: Encode, + JournalOf: Contiguous, +{ + let executor = deterministic::Runner::default(); + executor.start(|mut context| async move { + let mut target_db = H::init_db(context.with_label("target")).await; + target_db = H::apply_ops(target_db, H::create_ops(10)).await; + let lower_bound = target_db.inactivity_floor_loc().await; + let upper_bound = target_db.bounds().await.end; + let verification_root = target_db.root(); + + let (reached_sender, reached_receiver) = mpsc::channel(1); + drop(reached_receiver); + + let target_db = Arc::new(target_db); + let config = Config { + context: context.with_label("client"), + db_config: H::config(&context.next_u64().to_string(), &context), + fetch_batch_size: NZU64!(5), + target: Target { + root: H::sync_target_root(&target_db), + range: non_empty_range!(lower_bound, upper_bound), + }, + resolver: target_db.clone(), + apply_batch_size: 1024, + max_outstanding_requests: 1, + update_rx: None, + finish_rx: None, + reached_target_tx: Some(reached_sender), + max_retained_roots: 1, + }; + + let synced_db: H::Db = sync::sync(config) + .await + .expect("sync should succeed when reached-target receiver is dropped"); + assert_eq!(synced_db.root(), verification_root); + assert_eq!(synced_db.bounds().await.end, upper_bound); + assert_eq!(synced_db.inactivity_floor_loc().await, lower_bound); + + synced_db.destroy().await.unwrap(); + Arc::try_unwrap(target_db) + .unwrap_or_else(|_| panic!("failed to unwrap Arc")) + .destroy() + .await + .unwrap(); + }); +} + /// Test that the client can handle target updates during sync execution. pub(crate) fn test_target_update_during_sync( initial_ops: usize, @@ -831,6 +1120,8 @@ pub(crate) fn test_target_update_during_sync( max_outstanding_requests: 10, apply_batch_size: 1024, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 1, }; let mut client: Engine = Engine::new(config).await.unwrap(); @@ -938,6 +1229,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; let synced_db: H::Db = sync::sync(config).await.unwrap(); @@ -1004,6 +1297,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; let synced_db: H::Db = sync::sync(config).await.unwrap(); @@ -1357,6 +1652,8 @@ where apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; @@ -1678,6 +1975,26 @@ macro_rules! sync_tests_for_harness { super::test_target_update_on_done_client::<$harness>(); } + #[test_traced] + fn test_sync_waits_for_explicit_finish() { + super::test_sync_waits_for_explicit_finish::<$harness>(); + } + + #[test_traced] + fn test_sync_handles_early_finish_signal() { + super::test_sync_handles_early_finish_signal::<$harness>(); + } + + #[test_traced] + fn test_sync_fails_when_finish_sender_dropped() { + super::test_sync_fails_when_finish_sender_dropped::<$harness>(); + } + + #[test_traced] + fn test_sync_allows_dropped_reached_target_receiver() { + super::test_sync_allows_dropped_reached_target_receiver::<$harness>(); + } + #[rstest] #[case(1, 1)] #[case(1, 2)] diff --git a/storage/src/qmdb/current/sync/tests.rs b/storage/src/qmdb/current/sync/tests.rs index 768969807de..958de4a0332 100644 --- a/storage/src/qmdb/current/sync/tests.rs +++ b/storage/src/qmdb/current/sync/tests.rs @@ -426,6 +426,30 @@ macro_rules! current_sync_tests_for_harness { crate::qmdb::any::sync::tests::test_target_update_on_done_client::<$harness>(); } + #[test_traced] + fn test_sync_waits_for_explicit_finish() { + crate::qmdb::any::sync::tests::test_sync_waits_for_explicit_finish::<$harness>(); + } + + #[test_traced] + fn test_sync_handles_early_finish_signal() { + crate::qmdb::any::sync::tests::test_sync_handles_early_finish_signal::<$harness>(); + } + + #[test_traced] + fn test_sync_fails_when_finish_sender_dropped() { + crate::qmdb::any::sync::tests::test_sync_fails_when_finish_sender_dropped::< + $harness, + >(); + } + + #[test_traced] + fn test_sync_allows_dropped_reached_target_receiver() { + crate::qmdb::any::sync::tests::test_sync_allows_dropped_reached_target_receiver::< + $harness, + >(); + } + #[rstest] #[case(1, 1)] #[case(1, 2)] diff --git a/storage/src/qmdb/immutable/sync.rs b/storage/src/qmdb/immutable/sync.rs index acd85106931..c6b30f56105 100644 --- a/storage/src/qmdb/immutable/sync.rs +++ b/storage/src/qmdb/immutable/sync.rs @@ -282,6 +282,8 @@ mod tests { apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); @@ -361,6 +363,8 @@ mod tests { apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; let got_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); @@ -411,6 +415,8 @@ mod tests { apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); @@ -496,6 +502,8 @@ mod tests { max_outstanding_requests: 10, apply_batch_size: 1024, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 1, }; let mut client: Engine = Engine::new(config).await.unwrap(); @@ -585,6 +593,8 @@ mod tests { apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; let synced_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); @@ -647,6 +657,8 @@ mod tests { apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); @@ -705,6 +717,8 @@ mod tests { apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: None, + finish_rx: None, + reached_target_tx: None, max_retained_roots: 8, }; let sync_db: ImmutableSyncTest = sync::sync(config).await.unwrap(); @@ -752,6 +766,8 @@ mod tests { apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 1, }; let client: Engine = Engine::new(config).await.unwrap(); @@ -813,6 +829,8 @@ mod tests { apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 1, }; let client: Engine = Engine::new(config).await.unwrap(); @@ -892,6 +910,8 @@ mod tests { apply_batch_size: 1024, max_outstanding_requests: 1, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 1, }; @@ -951,6 +971,8 @@ mod tests { apply_batch_size: 1024, max_outstanding_requests: 10, update_rx: Some(update_receiver), + finish_rx: None, + reached_target_tx: None, max_retained_roots: 1, }; diff --git a/storage/src/qmdb/sync/engine.rs b/storage/src/qmdb/sync/engine.rs index bb760e452eb..7aaeafc195c 100644 --- a/storage/src/qmdb/sync/engine.rs +++ b/storage/src/qmdb/sync/engine.rs @@ -18,10 +18,17 @@ use commonware_cryptography::Digest; use commonware_macros::select; use commonware_runtime::Metrics as _; use commonware_utils::{ - channel::{fallible::OneshotExt as _, mpsc, oneshot}, + channel::{ + fallible::{AsyncFallibleExt, OneshotExt as _}, + mpsc, oneshot, + }, NZU64, }; -use futures::{future::Either, StreamExt}; +use futures::{ + future::{pending, Either}, + StreamExt, +}; +use mpsc::error::TryRecvError; use std::{ collections::{BTreeMap, HashMap, VecDeque}, fmt::Debug, @@ -49,6 +56,10 @@ enum Event { BatchReceived(IndexedFetchResult), /// The target update channel was closed UpdateChannelClosed, + /// A finish signal was received + FinishRequested, + /// The finish signal channel was closed + FinishChannelClosed, } /// Result from a fetch operation with its request ID and starting location. @@ -62,23 +73,41 @@ pub(super) struct IndexedFetchResult { pub result: Result, E>, } -/// Wait for the next synchronization event from either target updates or fetch results. -/// Returns `None` if the sync is stalled (there are no outstanding requests). +/// Wait for the next synchronization event. +/// Returns `None` when there are no outstanding requests and no channels to wait on. async fn wait_for_event( - update_receiver: &mut Option>>, + update_rx: &mut Option>>, + finish_rx: &mut Option>, outstanding_requests: &mut Requests, ) -> Option> { - let target_update_fut = update_receiver.as_mut().map_or_else( - || Either::Right(futures::future::pending()), + if outstanding_requests.len() == 0 && update_rx.is_none() && finish_rx.is_none() { + return None; + } + + let target_update_fut = update_rx.as_mut().map_or_else( + || Either::Right(pending()), |update_rx| Either::Left(update_rx.recv()), ); + let finish_fut = finish_rx.as_mut().map_or_else( + || Either::Right(pending()), + |finish_rx| Either::Left(finish_rx.recv()), + ); + let batch_result_fut = if outstanding_requests.len() == 0 { + Either::Right(pending()) + } else { + Either::Left(outstanding_requests.futures_mut().next()) + }; select! { + finish = finish_fut => finish.map_or_else( + || Some(Event::FinishChannelClosed), + |_| Some(Event::FinishRequested) + ), target = target_update_fut => target.map_or_else( || Some(Event::UpdateChannelClosed), |target| Some(Event::TargetUpdate(target)) ), - result = outstanding_requests.futures_mut().next() => { + result = batch_result_fut => { result.map(|fetch_result| Event::BatchReceived(fetch_result)) }, } @@ -107,6 +136,17 @@ where pub db_config: DB::Config, /// Channel for receiving sync target updates pub update_rx: Option>>, + /// Channel that requests sync completion once the current target is reached. + /// + /// When `None`, sync completes as soon as the target is reached. + pub finish_rx: Option>, + /// Channel used to notify an observer once the current target is reached. + /// The engine sends at most one notification for each target. + /// + /// When `reached_target_tx` is `Some(...)`, this receiver must be actively + /// drained by the observer. The engine awaits send capacity on this channel before + /// proceeding, so backpressure can pause progress at target. + pub reached_target_tx: Option>>, /// Maximum number of previous roots to retain for verifying in-flight /// requests after target updates. Set to 0 to disable (all retained /// requests will be re-fetched). @@ -174,7 +214,26 @@ where config: DB::Config, /// Optional receiver for target updates during sync - update_receiver: Option>>, + update_rx: Option>>, + + /// Channel that requests sync completion once the current target is reached. + /// + /// When `None`, sync completes as soon as the target is reached. + finish_rx: Option>, + + /// Channel used to notify an observer once the current target is reached. + /// The engine sends at most one notification for each target. + /// + /// When `reached_target_tx` is `Some(...)`, this receiver must be actively + /// drained by the observer. The engine awaits send capacity on this channel before + /// proceeding, so backpressure can pause progress at target. + reached_target_tx: Option>>, + + /// Whether explicit finish has been requested. + finish_requested: bool, + + /// Tracks whether the current target has already been reported as reached. + reached_current_target_reported: bool, } #[cfg(test)] @@ -228,7 +287,11 @@ where hasher: StandardHasher::::new(), context: config.context, config: config.db_config, - update_receiver: config.update_rx, + update_rx: config.update_rx, + finish_rx: config.finish_rx, + reached_target_tx: config.reached_target_tx, + finish_requested: false, + reached_current_target_reported: false, }; engine.schedule_requests().await?; Ok(engine) @@ -357,9 +420,59 @@ where } self.target = new_target; + self.reached_current_target_reported = false; Ok(self) } + /// Drain a pending explicit-finish signal without blocking. + /// + /// If a finish signal is present, the engine transitions into "finish requested" + /// mode via [`Self::accept_finish`]. If the finish channel is disconnected before + /// a finish request is observed, this returns [`EngineError::FinishChannelClosed`]. + fn drain_finish_requests(&mut self) -> Result<(), Error> { + let Some(finish_rx) = self.finish_rx.as_mut() else { + return Ok(()); + }; + match finish_rx.try_recv() { + Ok(()) => { + self.accept_finish(); + Ok(()) + } + Err(TryRecvError::Empty) => Ok(()), + Err(TryRecvError::Disconnected) => { + Err(SyncError::Engine(EngineError::FinishChannelClosed)) + } + } + } + + /// Mark that explicit finish has been requested and stop listening for more signals. + /// + /// This is a one-way transition for the current engine instance. Once set, the + /// engine may complete as soon as it is at a target (or the next time it reaches one). + fn accept_finish(&mut self) { + self.finish_requested = true; + self.finish_rx = None; + } + + /// Notify an observer that the current target has been reached. The notification is sent + /// at most once per target, guarded by `reached_current_target_reported`. + /// + /// This send awaits backpressure. When `reached_target_tx` is `Some(...)`, + /// the receiver is expected to consume notifications promptly so the engine + /// can keep making progress. If the receiver side is closed, we drop the + /// sender and continue syncing without further reached-target notifications. + async fn report_reached_target(&mut self) { + if self.reached_current_target_reported { + return; + } + if let Some(sender) = self.reached_target_tx.as_ref() { + if !sender.send_lossy(self.target.clone()).await { + self.reached_target_tx = None; + } + } + self.reached_current_target_reported = true; + } + /// Store a batch of fetched operations. If the input list is empty, this is a no-op. pub(crate) fn store_operations(&mut self, start_loc: Location, operations: Vec) { if operations.is_empty() { @@ -434,7 +547,7 @@ where } /// Check if sync is complete based on the current journal size and target - pub async fn is_complete(&self) -> Result> { + pub async fn is_at_target(&self) -> Result> { let journal_size = self.journal.size().await; let target_journal_size = self.target.range.end(); @@ -542,6 +655,37 @@ where Ok(()) } + /// Handle a sync event and return the next engine state. + async fn handle_event( + mut self, + event: Event, + ) -> Result, Error> { + match event { + Event::TargetUpdate(new_target) => { + validate_update(&self.target, &new_target)?; + + let mut updated_self = self.reset_for_target_update(new_target).await?; + updated_self.schedule_requests().await?; + Ok(NextStep::Continue(updated_self)) + } + Event::UpdateChannelClosed => { + self.update_rx = None; + Ok(NextStep::Continue(self)) + } + Event::FinishRequested => { + self.accept_finish(); + Ok(NextStep::Continue(self)) + } + Event::FinishChannelClosed => Err(SyncError::Engine(EngineError::FinishChannelClosed)), + Event::BatchReceived(fetch_result) => { + self.handle_fetch_result(fetch_result)?; + self.schedule_requests().await?; + self.apply_operations().await?; + Ok(NextStep::Continue(self)) + } + } + } + /// Execute one step of the synchronization process. /// /// This is the main coordination method that: @@ -553,8 +697,23 @@ where /// Returns `StepResult::Complete(database)` when sync is finished, or /// `StepResult::Continue(self)` when more work remains. pub(crate) async fn step(mut self) -> Result, Error> { + self.drain_finish_requests()?; + // Check if sync is complete - if self.is_complete().await? { + if self.is_at_target().await? { + self.report_reached_target().await; + + if self.finish_rx.is_some() && !self.finish_requested { + let event = wait_for_event( + &mut self.update_rx, + &mut self.finish_rx, + &mut self.outstanding_requests, + ) + .await + .ok_or(SyncError::Engine(EngineError::SyncStalled))?; + return self.handle_event(event).await; + } + self.journal.sync().await?; // Build the database from the completed sync @@ -582,38 +741,14 @@ where } // Wait for the next synchronization event - let event = wait_for_event(&mut self.update_receiver, &mut self.outstanding_requests) - .await - .ok_or(SyncError::Engine(EngineError::SyncStalled))?; - - match event { - Event::TargetUpdate(new_target) => { - // Validate and handle the target update - validate_update(&self.target, &new_target)?; - - let mut updated_self = self.reset_for_target_update(new_target).await?; - - // Schedule new requests for the updated target - updated_self.schedule_requests().await?; - - return Ok(NextStep::Continue(updated_self)); - } - Event::UpdateChannelClosed => { - self.update_receiver = None; - } - Event::BatchReceived(fetch_result) => { - // Process the fetch result - self.handle_fetch_result(fetch_result)?; - - // Request operations in the sync range - self.schedule_requests().await?; - - // Apply operations that are now contiguous with the current journal - self.apply_operations().await?; - } - } - - Ok(NextStep::Continue(self)) + let event = wait_for_event( + &mut self.update_rx, + &mut self.finish_rx, + &mut self.outstanding_requests, + ) + .await + .ok_or(SyncError::Engine(EngineError::SyncStalled))?; + self.handle_event(event).await } /// Run sync to completion, returning the final database when done. diff --git a/storage/src/qmdb/sync/error.rs b/storage/src/qmdb/sync/error.rs index 82d97e37f98..d9821afbc05 100644 --- a/storage/src/qmdb/sync/error.rs +++ b/storage/src/qmdb/sync/error.rs @@ -29,6 +29,9 @@ pub enum EngineError { /// Sync stalled - no pending fetches #[error("sync stalled - no pending fetches")] SyncStalled, + /// Sync finish signal channel closed before finish was requested. + #[error("sync finish signal channel closed before finish was requested")] + FinishChannelClosed, /// Error extracting pinned nodes #[error("error extracting pinned nodes: {0}")] PinnedNodes(String),