Skip to content

Commit 5084b57

Browse files
authored
Merge branch 'main' into akaladarshi/update-tipset-state
2 parents 47a3b4e + 8e7c0bb commit 5084b57

File tree

21 files changed

+506
-610
lines changed

21 files changed

+506
-610
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
### Breaking
2929

3030
- [#5559](https://github.com/ChainSafe/forest/pull/5559) Change `Filecoin.ChainGetMinBaseFee` to `Forest.ChainGetMinBaseFee` with read access.
31+
- [#5589](https://github.com/ChainSafe/forest/pull/5589) Replace existing `Filecoin.SyncState` API with new `Forest.SyncStatus` to track node syncing progress specific to Forest.
3132

3233
### Added
3334

src/chain_sync/chain_follower.rs

Lines changed: 46 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
//!
1616
//! The state machine does not do any network requests or validation. Those are
1717
//! handled by an external actor.
18-
use std::time::SystemTime;
19-
use std::{ops::Deref as _, sync::Arc};
20-
2118
use crate::libp2p::hello::HelloRequest;
2219
use crate::message_pool::MessagePool;
2320
use crate::message_pool::MpoolRpcProvider;
@@ -31,11 +28,15 @@ use fvm_ipld_blockstore::Blockstore;
3128
use itertools::Itertools;
3229
use libp2p::PeerId;
3330
use parking_lot::Mutex;
31+
use std::time::SystemTime;
32+
use std::{ops::Deref as _, sync::Arc};
3433
use tokio::{sync::Notify, task::JoinSet};
3534
use tracing::{debug, error, info, trace, warn};
3635

37-
use crate::chain_sync::SyncState;
36+
use super::network_context::SyncNetworkContext;
37+
use crate::chain_sync::sync_status::SyncStatusReport;
3838
use crate::chain_sync::tipset_syncer::validate_tipset;
39+
use crate::chain_sync::{ForkSyncInfo, ForkSyncStage};
3940
use crate::{
4041
blocks::{Block, FullTipset, Tipset, TipsetKey},
4142
chain::ChainStore,
@@ -44,12 +45,9 @@ use crate::{
4445
};
4546
use parking_lot::RwLock;
4647

47-
use super::SyncStage;
48-
use super::network_context::SyncNetworkContext;
49-
5048
pub struct ChainFollower<DB> {
51-
/// Syncing state of chain sync workers.
52-
pub sync_states: Arc<RwLock<nunny::Vec<SyncState>>>,
49+
/// Syncing status of the chain
50+
pub sync_status: Arc<RwLock<SyncStatusReport>>,
5351

5452
/// manages retrieving and updates state objects
5553
state_manager: Arc<StateManager<DB>>,
@@ -93,14 +91,9 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
9391
stateless_mode: bool,
9492
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
9593
) -> Self {
96-
let heaviest = state_manager.chain_store().heaviest_tipset();
97-
let mut main_sync_state = SyncState::default();
98-
main_sync_state.init(heaviest.clone(), heaviest.clone());
99-
main_sync_state.set_epoch(heaviest.epoch());
100-
main_sync_state.set_stage(SyncStage::Idle);
10194
let (tipset_sender, tipset_receiver) = flume::bounded(20);
10295
Self {
103-
sync_states: Arc::new(RwLock::new(nunny::vec![main_sync_state])),
96+
sync_status: Arc::new(RwLock::new(SyncStatusReport::init())),
10497
state_manager,
10598
network,
10699
genesis,
@@ -121,7 +114,7 @@ impl<DB: Blockstore + Sync + Send + 'static> ChainFollower<DB> {
121114
self.tipset_receiver,
122115
self.network,
123116
self.mem_pool,
124-
self.sync_states,
117+
self.sync_status,
125118
self.genesis,
126119
self.stateless_mode,
127120
)
@@ -138,7 +131,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
138131
tipset_receiver: flume::Receiver<Arc<FullTipset>>,
139132
network: SyncNetworkContext<DB>,
140133
mem_pool: Arc<MessagePool<MpoolRpcProvider<DB>>>,
141-
sync_states: Arc<RwLock<nunny::Vec<SyncState>>>,
134+
sync_status: Arc<RwLock<SyncStatusReport>>,
142135
genesis: Arc<Tipset>,
143136
stateless_mode: bool,
144137
) -> anyhow::Result<()> {
@@ -228,7 +221,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
228221
}
229222
});
230223

231-
// When the state machine is updated, we need to update the sync states and spawn tasks
224+
// When the state machine is updated, we need to update the sync status and spawn tasks
232225
set.spawn({
233226
let state_manager = state_manager.clone();
234227
let state_machine = state_machine.clone();
@@ -239,31 +232,16 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
239232
state_changed.notified().await;
240233

241234
let mut tasks_set = tasks.lock();
242-
let (task_vec, states) = state_machine.lock().tasks();
235+
let (task_vec, current_active_forks) = state_machine.lock().tasks();
243236

244237
// Update the sync states
245238
{
246-
let heaviest = state_manager.chain_store().heaviest_tipset();
247-
let mut sync_states_guard = sync_states.write();
248-
249-
sync_states_guard.truncate(std::num::NonZeroUsize::new(1).unwrap());
250-
let first = sync_states_guard.first_mut();
251-
first.set_epoch(heaviest.epoch());
252-
first.set_target(Some(
253-
state_machine
254-
.lock()
255-
.heaviest_tipset()
256-
.unwrap_or(heaviest.clone()),
257-
));
258-
let seconds_per_epoch = state_manager.chain_config().block_delay_secs;
259-
let time_diff =
260-
(Utc::now().timestamp() as u64).saturating_sub(heaviest.min_timestamp());
261-
if time_diff < seconds_per_epoch as u64 * 2 {
262-
first.set_stage(SyncStage::Complete);
263-
} else {
264-
first.set_stage(SyncStage::Messages);
265-
}
266-
sync_states_guard.extend(states);
239+
let mut status_report_guard = sync_status.write();
240+
status_report_guard.update(
241+
&state_manager,
242+
current_active_forks,
243+
stateless_mode,
244+
);
267245
}
268246

269247
for task in task_vec {
@@ -322,7 +300,7 @@ pub async fn chain_follower<DB: Blockstore + Sync + Send + 'static>(
322300

323301
// Only print 'Catching up to HEAD' if we're more than 10 epochs
324302
// behind. Otherwise it can be too spammy.
325-
match (expected_head as i64 - heaviest_epoch > 10, to_download > 0) {
303+
match (expected_head - heaviest_epoch > 10, to_download > 0) {
326304
(true, true) => info!(
327305
"Catching up to HEAD: {} -> {}, downloading {} tipsets",
328306
heaviest_epoch, expected_head, to_download
@@ -582,13 +560,6 @@ impl<DB: Blockstore> SyncStateMachine<DB> {
582560
chains
583561
}
584562

585-
fn heaviest_tipset(&self) -> Option<Arc<Tipset>> {
586-
self.tipsets
587-
.values()
588-
.max_by_key(|ts| ts.weight())
589-
.map(|ts| Arc::new(ts.deref().clone().into_tipset()))
590-
}
591-
592563
fn is_validated(&self, tipset: &FullTipset) -> bool {
593564
let db = self.cs.blockstore();
594565
self.stateless_mode || db.has(tipset.parent_state()).unwrap_or(false)
@@ -732,36 +703,44 @@ impl<DB: Blockstore> SyncStateMachine<DB> {
732703
}
733704
}
734705

735-
pub fn tasks(&self) -> (Vec<SyncTask>, Vec<SyncState>) {
736-
let mut states = Vec::new();
706+
pub fn tasks(&self) -> (Vec<SyncTask>, Vec<ForkSyncInfo>) {
707+
// Get the node's current validated head epoch once, as it's the same for all forks.
708+
let current_validated_epoch = self.cs.heaviest_tipset().epoch();
709+
let now = Utc::now();
710+
711+
let mut active_sync_info = Vec::new();
737712
let mut tasks = Vec::new();
738713
for chain in self.chains() {
739714
if let Some(first_ts) = chain.first() {
740-
let last = chain.last().expect("Infallible");
741-
let mut state = SyncState::default();
742-
state.init(
743-
Arc::new(first_ts.deref().clone().into_tipset()),
744-
Arc::new(last.deref().clone().into_tipset()),
745-
);
746-
state.set_epoch(first_ts.epoch());
715+
let last_ts = chain.last().expect("Infallible");
716+
let stage: ForkSyncStage;
717+
let start_time = Some(now);
718+
747719
if !self.is_ready_for_validation(first_ts) {
748-
state.set_stage(SyncStage::Headers);
720+
stage = ForkSyncStage::FetchingHeaders;
749721
tasks.push(SyncTask::FetchTipset(
750722
first_ts.parents().clone(),
751723
first_ts.epoch(),
752724
));
753725
} else {
754-
if last.epoch() - first_ts.epoch() > 5 {
755-
state.set_stage(SyncStage::Messages);
756-
} else {
757-
state.set_stage(SyncStage::Complete);
758-
}
726+
stage = ForkSyncStage::ValidatingTipsets;
759727
tasks.push(SyncTask::ValidateTipset(first_ts.clone()));
760728
}
761-
states.push(state);
729+
730+
let fork_info = ForkSyncInfo {
731+
target_tipset_key: last_ts.key().clone(),
732+
target_epoch: last_ts.epoch(),
733+
target_sync_epoch_start: first_ts.epoch(),
734+
stage,
735+
validated_chain_head_epoch: current_validated_epoch,
736+
start_time,
737+
last_updated: Some(now),
738+
};
739+
740+
active_sync_info.push(fork_info);
762741
}
763742
}
764-
(tasks, states)
743+
(tasks, active_sync_info)
765744
}
766745
}
767746

@@ -898,7 +877,7 @@ mod tests {
898877
}
899878

900879
#[test]
901-
fn test_sync_state_machine_validation_order() {
880+
fn test_state_machine_validation_order() {
902881
let (cs, c4u) = setup();
903882
let db = cs.db.clone();
904883

@@ -927,7 +906,7 @@ mod tests {
927906
// Record validation order by processing all validation tasks in each iteration
928907
let mut validation_tasks = Vec::new();
929908
loop {
930-
let (tasks, _states) = state_machine.tasks();
909+
let (tasks, _) = state_machine.tasks();
931910

932911
// Find all validation tasks
933912
let validation_tipsets: Vec<_> = tasks

src/chain_sync/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ mod chain_muxer;
77
pub mod consensus;
88
pub mod metrics;
99
pub mod network_context;
10-
mod sync_state;
10+
mod sync_status;
1111
mod tipset_syncer;
1212
mod validation;
1313

@@ -16,6 +16,6 @@ pub use self::{
1616
chain_follower::ChainFollower,
1717
chain_muxer::SyncConfig,
1818
consensus::collect_errs,
19-
sync_state::{SyncStage, SyncState},
19+
sync_status::{ForkSyncInfo, ForkSyncStage, NodeSyncStatus, SyncStatusReport},
2020
validation::{TipsetValidationError, TipsetValidator},
2121
};

0 commit comments

Comments
 (0)