1515//!
1616//! The state machine does not do any network requests or validation. Those are
1717//! handled by an external actor.
18- use std:: time:: SystemTime ;
19- use std:: { ops:: Deref as _, sync:: Arc } ;
20-
2118use crate :: libp2p:: hello:: HelloRequest ;
2219use crate :: message_pool:: MessagePool ;
2320use crate :: message_pool:: MpoolRpcProvider ;
@@ -31,11 +28,15 @@ use fvm_ipld_blockstore::Blockstore;
3128use itertools:: Itertools ;
3229use libp2p:: PeerId ;
3330use parking_lot:: Mutex ;
31+ use std:: time:: SystemTime ;
32+ use std:: { ops:: Deref as _, sync:: Arc } ;
3433use tokio:: { sync:: Notify , task:: JoinSet } ;
3534use tracing:: { debug, error, info, trace, warn} ;
3635
37- use crate :: chain_sync:: SyncState ;
36+ use super :: network_context:: SyncNetworkContext ;
37+ use crate :: chain_sync:: sync_status:: SyncStatusReport ;
3838use crate :: chain_sync:: tipset_syncer:: validate_tipset;
39+ use crate :: chain_sync:: { ForkSyncInfo , ForkSyncStage } ;
3940use crate :: {
4041 blocks:: { Block , FullTipset , Tipset , TipsetKey } ,
4142 chain:: ChainStore ,
@@ -44,12 +45,9 @@ use crate::{
4445} ;
4546use parking_lot:: RwLock ;
4647
47- use super :: SyncStage ;
48- use super :: network_context:: SyncNetworkContext ;
49-
5048pub struct ChainFollower < DB > {
51- /// Syncing state of chain sync workers.
52- pub sync_states : Arc < RwLock < nunny :: Vec < SyncState > > > ,
49+ /// Syncing status of the chain
50+ pub sync_status : Arc < RwLock < SyncStatusReport > > ,
5351
5452 /// manages retrieving and updates state objects
5553 state_manager : Arc < StateManager < DB > > ,
@@ -93,14 +91,9 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
9391 stateless_mode : bool ,
9492 mem_pool : Arc < MessagePool < MpoolRpcProvider < DB > > > ,
9593 ) -> Self {
96- let heaviest = state_manager. chain_store ( ) . heaviest_tipset ( ) ;
97- let mut main_sync_state = SyncState :: default ( ) ;
98- main_sync_state. init ( heaviest. clone ( ) , heaviest. clone ( ) ) ;
99- main_sync_state. set_epoch ( heaviest. epoch ( ) ) ;
100- main_sync_state. set_stage ( SyncStage :: Idle ) ;
10194 let ( tipset_sender, tipset_receiver) = flume:: bounded ( 20 ) ;
10295 Self {
103- sync_states : Arc :: new ( RwLock :: new ( nunny :: vec! [ main_sync_state ] ) ) ,
96+ sync_status : Arc :: new ( RwLock :: new ( SyncStatusReport :: init ( ) ) ) ,
10497 state_manager,
10598 network,
10699 genesis,
@@ -121,7 +114,7 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
121114 self . tipset_receiver ,
122115 self . network ,
123116 self . mem_pool ,
124- self . sync_states ,
117+ self . sync_status ,
125118 self . genesis ,
126119 self . stateless_mode ,
127120 )
@@ -138,7 +131,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
138131 tipset_receiver : flume:: Receiver < Arc < FullTipset > > ,
139132 network : SyncNetworkContext < DB > ,
140133 mem_pool : Arc < MessagePool < MpoolRpcProvider < DB > > > ,
141- sync_states : Arc < RwLock < nunny :: Vec < SyncState > > > ,
134+ sync_status : Arc < RwLock < SyncStatusReport > > ,
142135 genesis : Arc < Tipset > ,
143136 stateless_mode : bool ,
144137) -> anyhow:: Result < ( ) > {
@@ -228,7 +221,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
228221 }
229222 } ) ;
230223
231- // When the state machine is updated, we need to update the sync states and spawn tasks
224+ // When the state machine is updated, we need to update the sync status and spawn tasks
232225 set. spawn ( {
233226 let state_manager = state_manager. clone ( ) ;
234227 let state_machine = state_machine. clone ( ) ;
@@ -239,31 +232,16 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
239232 state_changed. notified ( ) . await ;
240233
241234 let mut tasks_set = tasks. lock ( ) ;
242- let ( task_vec, states ) = state_machine. lock ( ) . tasks ( ) ;
235+ let ( task_vec, current_active_forks ) = state_machine. lock ( ) . tasks ( ) ;
243236
244237 // Update the sync states
245238 {
246- let heaviest = state_manager. chain_store ( ) . heaviest_tipset ( ) ;
247- let mut sync_states_guard = sync_states. write ( ) ;
248-
249- sync_states_guard. truncate ( std:: num:: NonZeroUsize :: new ( 1 ) . unwrap ( ) ) ;
250- let first = sync_states_guard. first_mut ( ) ;
251- first. set_epoch ( heaviest. epoch ( ) ) ;
252- first. set_target ( Some (
253- state_machine
254- . lock ( )
255- . heaviest_tipset ( )
256- . unwrap_or ( heaviest. clone ( ) ) ,
257- ) ) ;
258- let seconds_per_epoch = state_manager. chain_config ( ) . block_delay_secs ;
259- let time_diff =
260- ( Utc :: now ( ) . timestamp ( ) as u64 ) . saturating_sub ( heaviest. min_timestamp ( ) ) ;
261- if time_diff < seconds_per_epoch as u64 * 2 {
262- first. set_stage ( SyncStage :: Complete ) ;
263- } else {
264- first. set_stage ( SyncStage :: Messages ) ;
265- }
266- sync_states_guard. extend ( states) ;
239+ let mut status_report_guard = sync_status. write ( ) ;
240+ status_report_guard. update (
241+ & state_manager,
242+ current_active_forks,
243+ stateless_mode,
244+ ) ;
267245 }
268246
269247 for task in task_vec {
@@ -322,7 +300,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
322300
323301 // Only print 'Catching up to HEAD' if we're more than 10 epochs
324302 // behind. Otherwise it can be too spammy.
325- match ( expected_head as i64 - heaviest_epoch > 10 , to_download > 0 ) {
303+ match ( expected_head - heaviest_epoch > 10 , to_download > 0 ) {
326304 ( true , true ) => info ! (
327305 "Catching up to HEAD: {} -> {}, downloading {} tipsets" ,
328306 heaviest_epoch, expected_head, to_download
@@ -582,13 +560,6 @@ impl<DB: Blockstore> SyncStateMachine<DB> {
582560 chains
583561 }
584562
585- fn heaviest_tipset ( & self ) -> Option < Arc < Tipset > > {
586- self . tipsets
587- . values ( )
588- . max_by_key ( |ts| ts. weight ( ) )
589- . map ( |ts| Arc :: new ( ts. deref ( ) . clone ( ) . into_tipset ( ) ) )
590- }
591-
592563 fn is_validated ( & self , tipset : & FullTipset ) -> bool {
593564 let db = self . cs . blockstore ( ) ;
594565 self . stateless_mode || db. has ( tipset. parent_state ( ) ) . unwrap_or ( false )
@@ -732,36 +703,44 @@ impl<DB: Blockstore> SyncStateMachine<DB> {
732703 }
733704 }
734705
735- pub fn tasks ( & self ) -> ( Vec < SyncTask > , Vec < SyncState > ) {
736- let mut states = Vec :: new ( ) ;
706+ pub fn tasks ( & self ) -> ( Vec < SyncTask > , Vec < ForkSyncInfo > ) {
707+ // Get the node's current validated head epoch once, as it's the same for all forks.
708+ let current_validated_epoch = self . cs . heaviest_tipset ( ) . epoch ( ) ;
709+ let now = Utc :: now ( ) ;
710+
711+ let mut active_sync_info = Vec :: new ( ) ;
737712 let mut tasks = Vec :: new ( ) ;
738713 for chain in self . chains ( ) {
739714 if let Some ( first_ts) = chain. first ( ) {
740- let last = chain. last ( ) . expect ( "Infallible" ) ;
741- let mut state = SyncState :: default ( ) ;
742- state. init (
743- Arc :: new ( first_ts. deref ( ) . clone ( ) . into_tipset ( ) ) ,
744- Arc :: new ( last. deref ( ) . clone ( ) . into_tipset ( ) ) ,
745- ) ;
746- state. set_epoch ( first_ts. epoch ( ) ) ;
715+ let last_ts = chain. last ( ) . expect ( "Infallible" ) ;
716+ let stage: ForkSyncStage ;
717+ let start_time = Some ( now) ;
718+
747719 if !self . is_ready_for_validation ( first_ts) {
748- state . set_stage ( SyncStage :: Headers ) ;
720+ stage = ForkSyncStage :: FetchingHeaders ;
749721 tasks. push ( SyncTask :: FetchTipset (
750722 first_ts. parents ( ) . clone ( ) ,
751723 first_ts. epoch ( ) ,
752724 ) ) ;
753725 } else {
754- if last. epoch ( ) - first_ts. epoch ( ) > 5 {
755- state. set_stage ( SyncStage :: Messages ) ;
756- } else {
757- state. set_stage ( SyncStage :: Complete ) ;
758- }
726+ stage = ForkSyncStage :: ValidatingTipsets ;
759727 tasks. push ( SyncTask :: ValidateTipset ( first_ts. clone ( ) ) ) ;
760728 }
761- states. push ( state) ;
729+
730+ let fork_info = ForkSyncInfo {
731+ target_tipset_key : last_ts. key ( ) . clone ( ) ,
732+ target_epoch : last_ts. epoch ( ) ,
733+ target_sync_epoch_start : first_ts. epoch ( ) ,
734+ stage,
735+ validated_chain_head_epoch : current_validated_epoch,
736+ start_time,
737+ last_updated : Some ( now) ,
738+ } ;
739+
740+ active_sync_info. push ( fork_info) ;
762741 }
763742 }
764- ( tasks, states )
743+ ( tasks, active_sync_info )
765744 }
766745}
767746
@@ -898,7 +877,7 @@ mod tests {
898877 }
899878
900879 #[ test]
901- fn test_sync_state_machine_validation_order ( ) {
880+ fn test_state_machine_validation_order ( ) {
902881 let ( cs, c4u) = setup ( ) ;
903882 let db = cs. db . clone ( ) ;
904883
@@ -927,7 +906,7 @@ mod tests {
927906 // Record validation order by processing all validation tasks in each iteration
928907 let mut validation_tasks = Vec :: new ( ) ;
929908 loop {
930- let ( tasks, _states ) = state_machine. tasks ( ) ;
909+ let ( tasks, _ ) = state_machine. tasks ( ) ;
931910
932911 // Find all validation tasks
933912 let validation_tipsets: Vec < _ > = tasks
0 commit comments