Skip to content

Commit 8d8b6e6

Browse files
address review
1 parent 6e89780 commit 8d8b6e6

9 files changed

Lines changed: 63 additions & 74 deletions

File tree

examples/sync/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ curl http://localhost:9090/metrics
117117
4. **Sync Iteration Loop**: For each sync iteration:
118118
- **Database Initialization**: Client opens a new database (or reopens existing one)
119119
- **Connection**: Client establishes connection to server
120-
- **Initial Sync Target**: Client requests server metadata to determine sync target (inactivity floor, size, and root digest)
120+
- **Initial Sync Target**: Client requests server metadata to determine sync target (sync boundary, size, and root digest)
121121
- **Dynamic Target Updates**: Client periodically requests target updates during sync to handle new operations added by the server
122122
- **Sync Completion**: Client continues until all operations are applied and state matches server's target
123123
- **Database Closure**: Client closes the database to prepare for next iteration

storage/src/qmdb/any/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ pub(crate) mod test {
289289
db.apply_batch(merkleized).await.unwrap();
290290
}
291291
db.commit().await.unwrap();
292-
db.prune(db.inactivity_floor_loc().await).await.unwrap();
292+
db.prune(db.sync_boundary().await).await.unwrap();
293293
let root = db.root();
294294
let op_count = db.size().await;
295295
let inactivity_floor_loc = db.inactivity_floor_loc().await;
@@ -642,7 +642,7 @@ pub(crate) mod test {
642642
}
643643
// Commit + sync with pruning raises inactivity floor.
644644
db.sync().await.unwrap();
645-
db.prune(db.inactivity_floor_loc().await).await.unwrap();
645+
db.prune(db.sync_boundary().await).await.unwrap();
646646

647647
// Drop & reopen and ensure state matches.
648648
let root = db.root();
@@ -2036,7 +2036,7 @@ pub(crate) mod test {
20362036
.collect();
20372037
commit_writes(&mut db, updates, None).await;
20382038

2039-
db.prune(db.inactivity_floor_loc()).await.unwrap();
2039+
db.prune(db.sync_boundary()).await.unwrap();
20402040
let bounds = db.bounds().await;
20412041
if bounds.start > first_range.start {
20422042
break;

storage/src/qmdb/any/ordered/fixed.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ pub(crate) mod test {
427427

428428
// Test that apply_batch + sync w/ pruning will raise the activity floor.
429429
db.sync().await.unwrap();
430-
db.prune(db.inactivity_floor_loc()).await.unwrap();
430+
db.prune(db.sync_boundary()).await.unwrap();
431431
assert_eq!(db.snapshot.items(), 857);
432432

433433
// Drop & reopen the db, making sure it has exactly the same state.
@@ -494,7 +494,7 @@ pub(crate) mod test {
494494
db.apply_batch(merkleized).await.unwrap();
495495
db.commit().await.unwrap();
496496
}
497-
db.prune(db.inactivity_floor_loc()).await.unwrap();
497+
db.prune(db.sync_boundary()).await.unwrap();
498498
let root = db.root();
499499
let op_count = db.bounds().await.end;
500500
let inactivity_floor_loc = db.inactivity_floor_loc();

storage/src/qmdb/any/ordered/mod.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -335,10 +335,7 @@ mod test {
335335
reopen_db: impl Fn(Context) -> Pin<Box<dyn Future<Output = D> + Send>>,
336336
) {
337337
assert!(db.get_metadata().await.unwrap().is_none());
338-
assert!(matches!(
339-
db.prune(db.inactivity_floor_loc().await).await,
340-
Ok(())
341-
));
338+
assert!(matches!(db.prune(db.sync_boundary().await).await, Ok(())));
342339

343340
// Make sure closing/reopening gets us back to the same state, even after adding an
344341
// uncommitted op, and even without a clean shutdown.
@@ -361,10 +358,7 @@ mod test {
361358
assert_eq!(range.start, Location::new(1));
362359
assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
363360
let root = db.root();
364-
assert!(matches!(
365-
db.prune(db.inactivity_floor_loc().await).await,
366-
Ok(())
367-
));
361+
assert!(matches!(db.prune(db.sync_boundary().await).await, Ok(())));
368362

369363
// Re-opening the DB without a clean shutdown should still recover the correct state.
370364
let mut db = reopen_db(context.with_label("reopen2")).await;
@@ -577,7 +571,7 @@ mod test {
577571

578572
// Pruning inactive ops should not affect current state or root.
579573
let root = db.root();
580-
db.prune(db.inactivity_floor_loc().await).await.unwrap();
574+
db.prune(db.sync_boundary().await).await.unwrap();
581575
assert_eq!(db.root(), root);
582576

583577
db.destroy().await.unwrap();

storage/src/qmdb/any/sync/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub(crate) mod tests;
4949
/// Returns whether persisted local state already matches the requested sync target.
5050
///
5151
/// Shared across [crate::qmdb::any] and [crate::qmdb::current] sync because both
52-
/// build on the same operations-MMR layout and share the same merkle partition.
52+
/// build on the same operations-tree layout and share the same merkle partition.
5353
/// Verifies only that the persisted tree size and root match; the merkle pruning
5454
/// boundary is not re-checked. Callers must keep their local pruning point at or
5555
/// below `target.range.start()` or a later
@@ -71,8 +71,7 @@ where
7171
&hasher,
7272
)
7373
.await;
74-
// Size + root match implies the last CommitFloor op (and therefore the
75-
// size + root identify a unique state, so if they match the target's we can reuse
74+
// Size + root identify a unique state, so if they match the target's we can reuse
7675
// the persisted DB without fetching boundary pins.
7776
matches!(
7877
peek,

storage/src/qmdb/any/sync/tests.rs

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,6 @@ pub(crate) type ConfigOf<H> = <DbOf<H> as qmdb::sync::Database>::Config;
5353
/// Type alias for the journal type of a harness.
5454
pub(crate) type JournalOf<H> = <DbOf<H> as qmdb::sync::Database>::Journal;
5555

56-
/// Type alias for the merkle family used by a harness.
57-
pub(crate) type FamilyOf<H> = <DbOf<H> as qmdb::sync::Database>::Family;
58-
5956
/// Trait for cleanup operations in tests.
6057
pub(crate) trait Destructible {
6158
type Family: merkle::Family;
@@ -136,7 +133,7 @@ pub(crate) trait SyncTestHarness: Sized + 'static {
136133
/// Test that empty operations arrays fetched do not cause panics when stored and applied
137134
pub(crate) fn test_sync_empty_operations_no_panic<H: SyncTestHarness>()
138135
where
139-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
136+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
140137
OpOf<H>: Encode,
141138
JournalOf<H>: Contiguous,
142139
{
@@ -181,14 +178,14 @@ where
181178
/// Test that resolver failure is handled correctly
182179
pub(crate) fn test_sync_resolver_fails<H: SyncTestHarness>()
183180
where
184-
resolver::tests::FailResolver<FamilyOf<H>, OpOf<H>, Digest>:
185-
Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
181+
resolver::tests::FailResolver<H::Family, OpOf<H>, Digest>:
182+
Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
186183
OpOf<H>: Encode,
187184
JournalOf<H>: Contiguous,
188185
{
189186
let executor = deterministic::Runner::default();
190187
executor.start(|mut context| async move {
191-
let resolver = resolver::tests::FailResolver::<FamilyOf<H>, OpOf<H>, Digest>::new();
188+
let resolver = resolver::tests::FailResolver::<H::Family, OpOf<H>, Digest>::new();
192189
let target_root = Digest::from([0; 32]);
193190

194191
let db_config = H::config(&context.next_u64().to_string(), &context);
@@ -217,7 +214,7 @@ where
217214
/// Test basic sync functionality with various batch sizes
218215
pub(crate) fn test_sync<H: SyncTestHarness>(target_db_ops: usize, fetch_batch_size: NonZeroU64)
219216
where
220-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
217+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
221218
OpOf<H>: Encode,
222219
JournalOf<H>: Contiguous,
223220
{
@@ -300,8 +297,8 @@ where
300297
/// Test syncing to a subset of the target database (target has additional ops beyond sync range)
301298
pub(crate) fn test_sync_subset_of_target_database<H: SyncTestHarness>(target_db_ops: usize)
302299
where
303-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
304-
OpOf<H>: Encode + Clone + OperationTrait<FamilyOf<H>, Key = Digest>,
300+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
301+
OpOf<H>: Encode + Clone + OperationTrait<H::Family, Key = Digest>,
305302
JournalOf<H>: Contiguous,
306303
{
307304
let executor = deterministic::Runner::default();
@@ -366,8 +363,8 @@ where
366363
/// Tests the scenario where sync_db already has partial data and needs to sync additional ops.
367364
pub(crate) fn test_sync_use_existing_db_partial_match<H: SyncTestHarness>(original_ops: usize)
368365
where
369-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
370-
OpOf<H>: Encode + Clone + OperationTrait<FamilyOf<H>, Key = Digest>,
366+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
367+
OpOf<H>: Encode + Clone + OperationTrait<H::Family, Key = Digest>,
371368
JournalOf<H>: Contiguous,
372369
{
373370
let executor = deterministic::Runner::default();
@@ -460,9 +457,9 @@ where
460457
/// Uses FailResolver to verify that no network requests are made since data already exists.
461458
pub(crate) fn test_sync_use_existing_db_exact_match<H: SyncTestHarness>(num_ops: usize)
462459
where
463-
resolver::tests::FailResolver<FamilyOf<H>, OpOf<H>, Digest>:
464-
Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
465-
OpOf<H>: Encode + Clone + OperationTrait<FamilyOf<H>, Key = Digest>,
460+
resolver::tests::FailResolver<H::Family, OpOf<H>, Digest>:
461+
Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
462+
OpOf<H>: Encode + Clone + OperationTrait<H::Family, Key = Digest>,
466463
JournalOf<H>: Contiguous,
467464
{
468465
let executor = deterministic::Runner::default();
@@ -501,7 +498,7 @@ where
501498
// sync_db should never ask the resolver for operations
502499
// because it is already complete. Use a resolver that always fails
503500
// to ensure that it's not being used.
504-
let resolver = resolver::tests::FailResolver::<FamilyOf<H>, OpOf<H>, Digest>::new();
501+
let resolver = resolver::tests::FailResolver::<H::Family, OpOf<H>, Digest>::new();
505502
let config = Config {
506503
db_config: sync_config, // Use same config to access same partitions
507504
fetch_batch_size: NZU64!(10),
@@ -546,7 +543,7 @@ where
546543
/// Test that the client fails to sync if the lower bound is decreased via target update.
547544
pub(crate) fn test_target_update_lower_bound_decrease<H: SyncTestHarness>()
548545
where
549-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
546+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
550547
OpOf<H>: Encode,
551548
JournalOf<H>: Contiguous,
552549
{
@@ -621,7 +618,7 @@ where
621618
/// Test that the client fails to sync if the upper bound is decreased via target update.
622619
pub(crate) fn test_target_update_upper_bound_decrease<H: SyncTestHarness>()
623620
where
624-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
621+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
625622
OpOf<H>: Encode,
626623
JournalOf<H>: Contiguous,
627624
{
@@ -690,7 +687,7 @@ where
690687
/// Test that the client succeeds when bounds are updated (increased).
691688
pub(crate) fn test_target_update_bounds_increase<H: SyncTestHarness>()
692689
where
693-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
690+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
694691
OpOf<H>: Encode + Clone,
695692
JournalOf<H>: Contiguous,
696693
{
@@ -775,7 +772,7 @@ where
775772
/// Test that target updates can be sent even after the client is done (no panic).
776773
pub(crate) fn test_target_update_on_done_client<H: SyncTestHarness>()
777774
where
778-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
775+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
779776
OpOf<H>: Encode,
780777
JournalOf<H>: Contiguous,
781778
{
@@ -844,7 +841,7 @@ where
844841
/// Test that explicit finish control waits for a finish signal even after reaching target.
845842
pub(crate) fn test_sync_waits_for_explicit_finish<H: SyncTestHarness>()
846843
where
847-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
844+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
848845
OpOf<H>: Encode,
849846
JournalOf<H>: Contiguous,
850847
{
@@ -947,7 +944,7 @@ where
947944
/// Test that a finish signal received before target completion still allows full sync.
948945
pub(crate) fn test_sync_handles_early_finish_signal<H: SyncTestHarness>()
949946
where
950-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
947+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
951948
OpOf<H>: Encode,
952949
JournalOf<H>: Contiguous,
953950
{
@@ -1010,7 +1007,7 @@ where
10101007
/// Test that dropping finish sender without sending is treated as an error.
10111008
pub(crate) fn test_sync_fails_when_finish_sender_dropped<H: SyncTestHarness>()
10121009
where
1013-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
1010+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
10141011
OpOf<H>: Encode,
10151012
JournalOf<H>: Contiguous,
10161013
{
@@ -1059,7 +1056,7 @@ where
10591056
/// Test that dropping reached-target receiver does not fail sync.
10601057
pub(crate) fn test_sync_allows_dropped_reached_target_receiver<H: SyncTestHarness>()
10611058
where
1062-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
1059+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
10631060
OpOf<H>: Encode,
10641061
JournalOf<H>: Contiguous,
10651062
{
@@ -1113,8 +1110,7 @@ pub(crate) fn test_target_update_during_sync<H: SyncTestHarness>(
11131110
initial_ops: usize,
11141111
additional_ops: usize,
11151112
) where
1116-
Arc<AsyncRwLock<Option<DbOf<H>>>>:
1117-
Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
1113+
Arc<AsyncRwLock<Option<DbOf<H>>>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
11181114
OpOf<H>: Encode + Clone,
11191115
JournalOf<H>: Contiguous,
11201116
{
@@ -1225,7 +1221,7 @@ pub(crate) fn test_target_update_during_sync<H: SyncTestHarness>(
12251221
/// Test demonstrating that a synced database can be reopened and retain its state.
12261222
pub(crate) fn test_sync_database_persistence<H: SyncTestHarness>()
12271223
where
1228-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
1224+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
12291225
OpOf<H>: Encode + Clone,
12301226
JournalOf<H>: Contiguous,
12311227
{
@@ -1299,7 +1295,7 @@ where
12991295
/// Test post-sync usability: after syncing, the database supports normal operations.
13001296
pub(crate) fn test_sync_post_sync_usability<H: SyncTestHarness>()
13011297
where
1302-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
1298+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
13031299
OpOf<H>: Encode,
13041300
JournalOf<H>: Contiguous,
13051301
{
@@ -1639,7 +1635,7 @@ where
16391635
/// succeeds on retry when the resolver returns correct data.
16401636
pub(crate) fn test_sync_retries_bad_pinned_nodes<H: SyncTestHarness>()
16411637
where
1642-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
1638+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
16431639
OpOf<H>: Encode,
16441640
JournalOf<H>: Contiguous,
16451641
{
@@ -1776,7 +1772,7 @@ impl<R: Resolver<Digest = Digest>> Resolver for ReplayFreshBoundaryResolver<R> {
17761772
/// boundary retry is still outstanding.
17771773
pub(crate) fn test_sync_waits_for_boundary_retry_after_target_update<H: SyncTestHarness>()
17781774
where
1779-
Arc<DbOf<H>>: Resolver<Family = FamilyOf<H>, Op = OpOf<H>, Digest = Digest>,
1775+
Arc<DbOf<H>>: Resolver<Family = H::Family, Op = OpOf<H>, Digest = Digest>,
17801776
OpOf<H>: Encode,
17811777
JournalOf<H>: Contiguous,
17821778
{
@@ -2720,6 +2716,11 @@ macro_rules! sync_tests_for_harness {
27202716
fn test_sync_retries_bad_pinned_nodes() {
27212717
super::test_sync_retries_bad_pinned_nodes::<$harness>();
27222718
}
2719+
2720+
#[test_traced]
2721+
fn test_sync_waits_for_boundary_retry_after_target_update() {
2722+
super::test_sync_waits_for_boundary_retry_after_target_update::<$harness>();
2723+
}
27232724
}
27242725
};
27252726
}
@@ -2732,11 +2733,6 @@ macro_rules! from_sync_result_tests_for_harness {
27322733
use super::harnesses;
27332734
use commonware_macros::test_traced;
27342735

2735-
#[test_traced]
2736-
fn test_sync_waits_for_boundary_retry_after_target_update() {
2737-
super::test_sync_waits_for_boundary_retry_after_target_update::<$harness>();
2738-
}
2739-
27402736
#[test_traced("WARN")]
27412737
fn test_from_sync_result_empty_to_empty() {
27422738
super::test_from_sync_result_empty_to_empty::<$harness>();

0 commit comments

Comments
 (0)