Skip to content

Commit eb2a35f

Browse files
committed
[storage/qmdb] Coordinate state sync finalization
1 parent 5958ad7 commit eb2a35f

7 files changed

Lines changed: 339 additions & 38 deletions

File tree

examples/sync/src/bin/client.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ where
150150
apply_batch_size: 1024,
151151
max_outstanding_requests: config.max_outstanding_requests,
152152
update_rx: Some(update_receiver),
153+
finish_rx: None,
154+
reached_target_tx: None,
153155
max_retained_roots: 8,
154156
};
155157

@@ -214,6 +216,8 @@ where
214216
apply_batch_size: 1024,
215217
max_outstanding_requests: config.max_outstanding_requests,
216218
update_rx: Some(update_receiver),
219+
finish_rx: None,
220+
reached_target_tx: None,
217221
max_retained_roots: 8,
218222
};
219223

@@ -275,6 +279,8 @@ where
275279
apply_batch_size: 1024,
276280
max_outstanding_requests: config.max_outstanding_requests,
277281
update_rx: Some(update_receiver),
282+
finish_rx: None,
283+
reached_target_tx: None,
278284
max_retained_roots: 8,
279285
};
280286

storage/fuzz/fuzz_targets/qmdb_any_fixed_sync.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ async fn test_sync<
122122
let sync_config: sync::engine::Config<FixedDb, R> = sync::engine::Config {
123123
context: context.with_label("sync").with_attribute("id", sync_id),
124124
update_rx: None,
125+
finish_rx: None,
126+
reached_target_tx: None,
125127
db_config,
126128
fetch_batch_size: NZU64!((fetch_batch_size % 100) + 1),
127129
target,

storage/src/qmdb/any/sync/tests.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@ use crate::{
2222
};
2323
use commonware_codec::Encode;
2424
use commonware_cryptography::sha256::Digest;
25+
use commonware_macros::select;
2526
use commonware_runtime::{deterministic, BufferPooler, Metrics, Runner as _};
2627
use commonware_utils::{
2728
channel::{mpsc, oneshot},
2829
non_empty_range,
2930
sync::AsyncRwLock,
3031
NZU64,
3132
};
33+
use futures::{pin_mut, FutureExt};
3234
use rand::RngCore as _;
3335
use std::{num::NonZeroU64, sync::Arc};
3436

@@ -132,6 +134,8 @@ where
132134
apply_batch_size: 1024,
133135
max_outstanding_requests: 1,
134136
update_rx: None,
137+
finish_rx: None,
138+
reached_target_tx: None,
135139
max_retained_roots: 0,
136140
};
137141

@@ -174,6 +178,8 @@ where
174178
fetch_batch_size: NZU64!(2),
175179
db_config,
176180
update_rx: None,
181+
finish_rx: None,
182+
reached_target_tx: None,
177183
max_retained_roots: 0,
178184
};
179185

@@ -223,6 +229,8 @@ where
223229
apply_batch_size: 1024,
224230
max_outstanding_requests: 1,
225231
update_rx: None,
232+
finish_rx: None,
233+
reached_target_tx: None,
226234
max_retained_roots: 0,
227235
};
228236

@@ -304,6 +312,8 @@ where
304312
apply_batch_size: 1024,
305313
max_outstanding_requests: 1,
306314
update_rx: None,
315+
finish_rx: None,
316+
reached_target_tx: None,
307317
max_retained_roots: 0,
308318
};
309319

@@ -378,6 +388,8 @@ where
378388
apply_batch_size: 1024,
379389
max_outstanding_requests: 1,
380390
update_rx: None,
391+
finish_rx: None,
392+
reached_target_tx: None,
381393
max_retained_roots: 0,
382394
};
383395
let synced_db: H::Db = sync::sync(config).await.unwrap();
@@ -479,6 +491,8 @@ where
479491
apply_batch_size: 1024,
480492
max_outstanding_requests: 1,
481493
update_rx: None,
494+
finish_rx: None,
495+
reached_target_tx: None,
482496
max_retained_roots: 0,
483497
};
484498
let synced_db: H::Db = sync::sync(config).await.unwrap();
@@ -541,6 +555,8 @@ where
541555
apply_batch_size: 1024,
542556
max_outstanding_requests: 10,
543557
update_rx: Some(update_receiver),
558+
finish_rx: None,
559+
reached_target_tx: None,
544560
max_retained_roots: 1,
545561
};
546562
let client: Engine<H::Db, _> = Engine::new(config).await.unwrap();
@@ -608,6 +624,8 @@ where
608624
apply_batch_size: 1024,
609625
max_outstanding_requests: 10,
610626
update_rx: Some(update_receiver),
627+
finish_rx: None,
628+
reached_target_tx: None,
611629
max_retained_roots: 1,
612630
};
613631
let client: Engine<H::Db, _> = Engine::new(config).await.unwrap();
@@ -689,6 +707,8 @@ where
689707
apply_batch_size: 1024,
690708
max_outstanding_requests: 1,
691709
update_rx: Some(update_receiver),
710+
finish_rx: None,
711+
reached_target_tx: None,
692712
max_retained_roots: 1,
693713
};
694714

@@ -759,6 +779,8 @@ where
759779
apply_batch_size: 1024,
760780
max_outstanding_requests: 10,
761781
update_rx: Some(update_receiver),
782+
finish_rx: None,
783+
reached_target_tx: None,
762784
max_retained_roots: 1,
763785
};
764786

@@ -790,6 +812,109 @@ where
790812
});
791813
}
792814

815+
/// Test that explicit finish control waits for a finish signal even after reaching target.
816+
pub(crate) fn test_sync_waits_for_explicit_finish<H: SyncTestHarness>()
817+
where
818+
Arc<DbOf<H>>: Resolver<Op = OpOf<H>, Digest = Digest>,
819+
OpOf<H>: Encode,
820+
JournalOf<H>: Contiguous,
821+
{
822+
let executor = deterministic::Runner::default();
823+
executor.start(|mut context| async move {
824+
let mut target_db = H::init_db(context.with_label("target")).await;
825+
target_db = H::apply_ops(target_db, H::create_ops(10)).await;
826+
let initial_target = Target {
827+
root: H::sync_target_root(&target_db),
828+
range: non_empty_range!(
829+
target_db.inactivity_floor_loc().await,
830+
target_db.bounds().await.end
831+
),
832+
};
833+
834+
target_db = H::apply_ops(target_db, H::create_ops_seeded(5, 1)).await;
835+
let updated_lower_bound = target_db.inactivity_floor_loc().await;
836+
let updated_upper_bound = target_db.bounds().await.end;
837+
let updated_target = Target {
838+
root: H::sync_target_root(&target_db),
839+
range: non_empty_range!(updated_lower_bound, updated_upper_bound),
840+
};
841+
let updated_verification_root = target_db.root();
842+
843+
let (update_sender, update_receiver) = mpsc::channel(1);
844+
let (finish_sender, finish_receiver) = mpsc::channel(1);
845+
let (reached_sender, mut reached_receiver) = mpsc::channel(1);
846+
let target_db = Arc::new(target_db);
847+
let config = Config {
848+
context: context.with_label("client"),
849+
db_config: H::config(&context.next_u64().to_string(), &context),
850+
fetch_batch_size: NZU64!(10),
851+
target: initial_target.clone(),
852+
resolver: target_db.clone(),
853+
apply_batch_size: 1024,
854+
max_outstanding_requests: 1,
855+
update_rx: Some(update_receiver),
856+
finish_rx: Some(finish_receiver),
857+
reached_target_tx: Some(reached_sender),
858+
max_retained_roots: 0,
859+
};
860+
861+
let sync_handle = sync::sync(config);
862+
pin_mut!(sync_handle);
863+
864+
select! {
865+
_ = sync_handle.as_mut() => {
866+
panic!("sync completed before explicit finish signal");
867+
},
868+
reached = reached_receiver.recv() => {
869+
let reached = reached.expect("engine should report reached-target before finish");
870+
assert_eq!(reached, initial_target);
871+
}
872+
}
873+
assert!(
874+
sync_handle.as_mut().now_or_never().is_none(),
875+
"sync must wait for explicit finish signal after reaching target"
876+
);
877+
878+
update_sender
879+
.send(updated_target.clone())
880+
.await
881+
.expect("target update channel should be open");
882+
883+
select! {
884+
_ = sync_handle.as_mut() => {
885+
panic!("sync completed before explicit finish signal for updated target");
886+
},
887+
reached = reached_receiver.recv() => {
888+
let reached = reached.expect("engine should report updated target before finish");
889+
assert_eq!(reached, updated_target);
890+
}
891+
}
892+
assert!(
893+
sync_handle.as_mut().now_or_never().is_none(),
894+
"sync must still wait for explicit finish signal after updated target is reached"
895+
);
896+
897+
finish_sender
898+
.send(())
899+
.await
900+
.expect("finish signal channel should be open");
901+
902+
let synced_db: H::Db = sync_handle
903+
.await
904+
.expect("sync should succeed after finish signal");
905+
assert_eq!(synced_db.root(), updated_verification_root);
906+
assert_eq!(synced_db.bounds().await.end, updated_upper_bound);
907+
assert_eq!(synced_db.inactivity_floor_loc().await, updated_lower_bound);
908+
909+
synced_db.destroy().await.unwrap();
910+
Arc::try_unwrap(target_db)
911+
.unwrap_or_else(|_| panic!("failed to unwrap Arc"))
912+
.destroy()
913+
.await
914+
.unwrap();
915+
});
916+
}
917+
793918
/// Test that the client can handle target updates during sync execution.
794919
pub(crate) fn test_target_update_during_sync<H: SyncTestHarness>(
795920
initial_ops: usize,
@@ -831,6 +956,8 @@ pub(crate) fn test_target_update_during_sync<H: SyncTestHarness>(
831956
max_outstanding_requests: 10,
832957
apply_batch_size: 1024,
833958
update_rx: Some(update_receiver),
959+
finish_rx: None,
960+
reached_target_tx: None,
834961
max_retained_roots: 1,
835962
};
836963
let mut client: Engine<H::Db, _> = Engine::new(config).await.unwrap();
@@ -938,6 +1065,8 @@ where
9381065
apply_batch_size: 1024,
9391066
max_outstanding_requests: 1,
9401067
update_rx: None,
1068+
finish_rx: None,
1069+
reached_target_tx: None,
9411070
max_retained_roots: 0,
9421071
};
9431072
let synced_db: H::Db = sync::sync(config).await.unwrap();
@@ -1004,6 +1133,8 @@ where
10041133
apply_batch_size: 1024,
10051134
max_outstanding_requests: 1,
10061135
update_rx: None,
1136+
finish_rx: None,
1137+
reached_target_tx: None,
10071138
max_retained_roots: 0,
10081139
};
10091140
let synced_db: H::Db = sync::sync(config).await.unwrap();
@@ -1357,6 +1488,8 @@ where
13571488
apply_batch_size: 1024,
13581489
max_outstanding_requests: 1,
13591490
update_rx: None,
1491+
finish_rx: None,
1492+
reached_target_tx: None,
13601493
max_retained_roots: 0,
13611494
};
13621495

@@ -1678,6 +1811,11 @@ macro_rules! sync_tests_for_harness {
16781811
super::test_target_update_on_done_client::<$harness>();
16791812
}
16801813

1814+
#[test_traced]
1815+
fn test_sync_waits_for_explicit_finish() {
1816+
super::test_sync_waits_for_explicit_finish::<$harness>();
1817+
}
1818+
16811819
#[rstest]
16821820
#[case(1, 1)]
16831821
#[case(1, 2)]

storage/src/qmdb/current/sync/tests.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,11 @@ macro_rules! current_sync_tests_for_harness {
426426
crate::qmdb::any::sync::tests::test_target_update_on_done_client::<$harness>();
427427
}
428428

429+
#[test_traced]
430+
fn test_sync_waits_for_explicit_finish() {
431+
crate::qmdb::any::sync::tests::test_sync_waits_for_explicit_finish::<$harness>();
432+
}
433+
429434
#[rstest]
430435
#[case(1, 1)]
431436
#[case(1, 2)]

0 commit comments

Comments
 (0)