@@ -915,6 +915,170 @@ where
915915 } ) ;
916916}
917917
918+ /// Test that a finish signal received before target completion still allows full sync.
919+ pub ( crate ) fn test_sync_handles_early_finish_signal < H : SyncTestHarness > ( )
920+ where
921+ Arc < DbOf < H > > : Resolver < Op = OpOf < H > , Digest = Digest > ,
922+ OpOf < H > : Encode ,
923+ JournalOf < H > : Contiguous ,
924+ {
925+ let executor = deterministic:: Runner :: default ( ) ;
926+ executor. start ( |mut context| async move {
927+ let mut target_db = H :: init_db ( context. with_label ( "target" ) ) . await ;
928+ target_db = H :: apply_ops ( target_db, H :: create_ops ( 30 ) ) . await ;
929+ let lower_bound = target_db. inactivity_floor_loc ( ) . await ;
930+ let upper_bound = target_db. bounds ( ) . await . end ;
931+ let target = Target {
932+ root : H :: sync_target_root ( & target_db) ,
933+ range : non_empty_range ! ( lower_bound, upper_bound) ,
934+ } ;
935+ let verification_root = target_db. root ( ) ;
936+
937+ let ( finish_sender, finish_receiver) = mpsc:: channel ( 1 ) ;
938+ let ( reached_sender, mut reached_receiver) = mpsc:: channel ( 1 ) ;
939+ finish_sender
940+ . send ( ( ) )
941+ . await
942+ . expect ( "finish signal channel should be open" ) ;
943+
944+ let target_db = Arc :: new ( target_db) ;
945+ let config = Config {
946+ context : context. with_label ( "client" ) ,
947+ db_config : H :: config ( & context. next_u64 ( ) . to_string ( ) , & context) ,
948+ fetch_batch_size : NZU64 ! ( 3 ) ,
949+ target : target. clone ( ) ,
950+ resolver : target_db. clone ( ) ,
951+ apply_batch_size : 1024 ,
952+ max_outstanding_requests : 1 ,
953+ update_rx : None ,
954+ finish_rx : Some ( finish_receiver) ,
955+ reached_target_tx : Some ( reached_sender) ,
956+ max_retained_roots : 1 ,
957+ } ;
958+
959+ let synced_db: H :: Db = sync:: sync ( config)
960+ . await
961+ . expect ( "sync should complete after early finish signal" ) ;
962+ let reached = reached_receiver
963+ . recv ( )
964+ . await
965+ . expect ( "engine should report reached-target" ) ;
966+
967+ assert_eq ! ( reached, target) ;
968+ assert_eq ! ( synced_db. root( ) , verification_root) ;
969+ assert_eq ! ( synced_db. bounds( ) . await . end, upper_bound) ;
970+ assert_eq ! ( synced_db. inactivity_floor_loc( ) . await , lower_bound) ;
971+
972+ synced_db. destroy ( ) . await . unwrap ( ) ;
973+ Arc :: try_unwrap ( target_db)
974+ . unwrap_or_else ( |_| panic ! ( "failed to unwrap Arc" ) )
975+ . destroy ( )
976+ . await
977+ . unwrap ( ) ;
978+ } ) ;
979+ }
980+
981+ /// Test that dropping finish sender without sending is treated as an error.
982+ pub ( crate ) fn test_sync_fails_when_finish_sender_dropped < H : SyncTestHarness > ( )
983+ where
984+ Arc < DbOf < H > > : Resolver < Op = OpOf < H > , Digest = Digest > ,
985+ OpOf < H > : Encode ,
986+ JournalOf < H > : Contiguous ,
987+ {
988+ let executor = deterministic:: Runner :: default ( ) ;
989+ executor. start ( |mut context| async move {
990+ let mut target_db = H :: init_db ( context. with_label ( "target" ) ) . await ;
991+ target_db = H :: apply_ops ( target_db, H :: create_ops ( 10 ) ) . await ;
992+ let lower_bound = target_db. inactivity_floor_loc ( ) . await ;
993+ let upper_bound = target_db. bounds ( ) . await . end ;
994+
995+ let ( finish_sender, finish_receiver) = mpsc:: channel ( 1 ) ;
996+ drop ( finish_sender) ;
997+
998+ let target_db = Arc :: new ( target_db) ;
999+ let config = Config {
1000+ context : context. with_label ( "client" ) ,
1001+ db_config : H :: config ( & context. next_u64 ( ) . to_string ( ) , & context) ,
1002+ fetch_batch_size : NZU64 ! ( 5 ) ,
1003+ target : Target {
1004+ root : H :: sync_target_root ( & target_db) ,
1005+ range : non_empty_range ! ( lower_bound, upper_bound) ,
1006+ } ,
1007+ resolver : target_db. clone ( ) ,
1008+ apply_batch_size : 1024 ,
1009+ max_outstanding_requests : 1 ,
1010+ update_rx : None ,
1011+ finish_rx : Some ( finish_receiver) ,
1012+ reached_target_tx : None ,
1013+ max_retained_roots : 1 ,
1014+ } ;
1015+
1016+ let result: Result < H :: Db , _ > = sync:: sync ( config) . await ;
1017+ assert ! ( matches!(
1018+ result,
1019+ Err ( sync:: Error :: Engine ( sync:: EngineError :: FinishChannelClosed ) )
1020+ ) ) ;
1021+
1022+ Arc :: try_unwrap ( target_db)
1023+ . unwrap_or_else ( |_| panic ! ( "failed to unwrap Arc" ) )
1024+ . destroy ( )
1025+ . await
1026+ . unwrap ( ) ;
1027+ } ) ;
1028+ }
1029+
1030+ /// Test that dropping reached-target receiver does not fail sync.
1031+ pub ( crate ) fn test_sync_allows_dropped_reached_target_receiver < H : SyncTestHarness > ( )
1032+ where
1033+ Arc < DbOf < H > > : Resolver < Op = OpOf < H > , Digest = Digest > ,
1034+ OpOf < H > : Encode ,
1035+ JournalOf < H > : Contiguous ,
1036+ {
1037+ let executor = deterministic:: Runner :: default ( ) ;
1038+ executor. start ( |mut context| async move {
1039+ let mut target_db = H :: init_db ( context. with_label ( "target" ) ) . await ;
1040+ target_db = H :: apply_ops ( target_db, H :: create_ops ( 10 ) ) . await ;
1041+ let lower_bound = target_db. inactivity_floor_loc ( ) . await ;
1042+ let upper_bound = target_db. bounds ( ) . await . end ;
1043+ let verification_root = target_db. root ( ) ;
1044+
1045+ let ( reached_sender, reached_receiver) = mpsc:: channel ( 1 ) ;
1046+ drop ( reached_receiver) ;
1047+
1048+ let target_db = Arc :: new ( target_db) ;
1049+ let config = Config {
1050+ context : context. with_label ( "client" ) ,
1051+ db_config : H :: config ( & context. next_u64 ( ) . to_string ( ) , & context) ,
1052+ fetch_batch_size : NZU64 ! ( 5 ) ,
1053+ target : Target {
1054+ root : H :: sync_target_root ( & target_db) ,
1055+ range : non_empty_range ! ( lower_bound, upper_bound) ,
1056+ } ,
1057+ resolver : target_db. clone ( ) ,
1058+ apply_batch_size : 1024 ,
1059+ max_outstanding_requests : 1 ,
1060+ update_rx : None ,
1061+ finish_rx : None ,
1062+ reached_target_tx : Some ( reached_sender) ,
1063+ max_retained_roots : 1 ,
1064+ } ;
1065+
1066+ let synced_db: H :: Db = sync:: sync ( config)
1067+ . await
1068+ . expect ( "sync should succeed when reached-target receiver is dropped" ) ;
1069+ assert_eq ! ( synced_db. root( ) , verification_root) ;
1070+ assert_eq ! ( synced_db. bounds( ) . await . end, upper_bound) ;
1071+ assert_eq ! ( synced_db. inactivity_floor_loc( ) . await , lower_bound) ;
1072+
1073+ synced_db. destroy ( ) . await . unwrap ( ) ;
1074+ Arc :: try_unwrap ( target_db)
1075+ . unwrap_or_else ( |_| panic ! ( "failed to unwrap Arc" ) )
1076+ . destroy ( )
1077+ . await
1078+ . unwrap ( ) ;
1079+ } ) ;
1080+ }
1081+
9181082/// Test that the client can handle target updates during sync execution.
9191083pub ( crate ) fn test_target_update_during_sync < H : SyncTestHarness > (
9201084 initial_ops : usize ,
@@ -1816,6 +1980,21 @@ macro_rules! sync_tests_for_harness {
18161980 super :: test_sync_waits_for_explicit_finish:: <$harness>( ) ;
18171981 }
18181982
1983+ #[ test_traced]
1984+ fn test_sync_handles_early_finish_signal( ) {
1985+ super :: test_sync_handles_early_finish_signal:: <$harness>( ) ;
1986+ }
1987+
1988+ #[ test_traced]
1989+ fn test_sync_fails_when_finish_sender_dropped( ) {
1990+ super :: test_sync_fails_when_finish_sender_dropped:: <$harness>( ) ;
1991+ }
1992+
1993+ #[ test_traced]
1994+ fn test_sync_allows_dropped_reached_target_receiver( ) {
1995+ super :: test_sync_allows_dropped_reached_target_receiver:: <$harness>( ) ;
1996+ }
1997+
18191998 #[ rstest]
18201999 #[ case( 1 , 1 ) ]
18212000 #[ case( 1 , 2 ) ]
0 commit comments