Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ pub struct ChainStore<DB> {
heaviest_tipset_key_provider: Arc<dyn HeaviestTipsetKeyProvider + Sync + Send>,

/// Heaviest tipset cache
heaviest_tipset_cache: Arc<RwLock<Tipset>>,
heaviest_tipset: Arc<RwLock<Tipset>>,

/// F3 finalized tipset cache
f3_finalized_tipset: Arc<RwLock<Option<Tipset>>>,

/// Used as a cache for tipset `lookbacks`.
chain_index: Arc<ChainIndex<Arc<DB>>>,
Expand Down Expand Up @@ -129,26 +132,42 @@ where
genesis_block_header: CachingBlockHeader,
) -> anyhow::Result<Self> {
let (publisher, _) = broadcast::channel(SINK_CAP);
let chain_index = Arc::new(ChainIndex::new(Arc::clone(&db)));
let validated_blocks = Mutex::new(HashSet::default());
let head = if let Some(head_tsk) = heaviest_tipset_key_provider
.heaviest_tipset_key()
.context("failed to load head tipset key")?
&& let Some(head) = chain_index
.load_tipset(&head_tsk)
.context("failed to load head tipset")?
{
head
Tipset::load_required(&db, &head_tsk)
.with_context(|| format!("failed to load head tipset with key {head_tsk}"))?
} else {
Tipset::from(&genesis_block_header)
};
let heaviest_tipset = Arc::new(RwLock::new(head));
let f3_finalized_tipset: Arc<RwLock<Option<Tipset>>> = Default::default();
let chain_index = Arc::new(
ChainIndex::new(db.clone()).with_is_tipset_finalized(Box::new({
let chain_finality = chain_config.policy.chain_finality;
let heaviest_tipset = heaviest_tipset.clone();
let f3_finalized_tipset = f3_finalized_tipset.clone();
move |ts| {
let finalized = f3_finalized_tipset
.read()
.as_ref()
.map(|ts| ts.epoch())
.unwrap_or_default()
.max(heaviest_tipset.read().epoch() - chain_finality);
ts.epoch() <= finalized
}
})),
);
let cs = Self {
head_changes_tx: publisher,
chain_index,
tipset_tracker: TipsetTracker::new(Arc::clone(&db), chain_config.clone()),
db,
heaviest_tipset_key_provider,
heaviest_tipset_cache: Arc::new(RwLock::new(head)),
heaviest_tipset,
f3_finalized_tipset,
genesis_block_header,
validated_blocks,
eth_mappings,
Expand All @@ -159,6 +178,16 @@ where
Ok(cs)
}

/// Sets F3 finalized tipset
pub fn set_f3_finalized_tipset(&self, ts: Tipset) {
self.f3_finalized_tipset.write().replace(ts);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

/// Gets F3 finalized tipset
pub fn f3_finalized_tipset(&self) -> Option<Tipset> {
self.f3_finalized_tipset.read().clone()
}

/// Cache for messages in tipsets, keyed by tipset key.
pub fn messages_in_tipset_cache(&self) -> &MessagesInTipsetCache {
&self.messages_in_tipset_cache
Expand All @@ -169,7 +198,7 @@ where
head.key().save(self.blockstore())?;
self.heaviest_tipset_key_provider
.set_heaviest_tipset_key(head.key())?;
let old_head = std::mem::replace(&mut *self.heaviest_tipset_cache.write(), head.clone());
let old_head = std::mem::replace(&mut *self.heaviest_tipset.write(), head.clone());

if crate::utils::broadcast::has_subscribers(&self.head_changes_tx) {
let changes = match crate::rpc::chain::chain_get_path(self, old_head.key(), head.key())
Expand Down Expand Up @@ -242,7 +271,7 @@ where

/// Returns the currently tracked heaviest tipset.
pub fn heaviest_tipset(&self) -> Tipset {
self.heaviest_tipset_cache.read().clone()
self.heaviest_tipset.read().clone()
}

/// Returns the genesis tipset.
Expand Down
62 changes: 45 additions & 17 deletions src/chain/store/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0, MIT

use std::num::NonZeroUsize;
use std::sync::LazyLock;

use crate::beacon::{BeaconEntry, IGNORE_DRAND};
use crate::blocks::{Tipset, TipsetKey};
Expand All @@ -19,13 +18,21 @@ const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(2880_usize);

type TipsetCache = SizeTrackingLruCache<TipsetKey, Tipset>;

type TipsetHeightCache = SizeTrackingLruCache<ChainEpoch, TipsetKey>;

type IsTipsetFinalizedFn = Box<dyn Fn(&Tipset) -> bool + Send + Sync>;

/// Keeps look-back tipsets in cache at a given interval `skip_length` and can
/// be used to look-back at the chain to retrieve an old tipset.
pub struct ChainIndex<DB> {
/// `Arc` reference tipset cache.
/// tipset key to tipset mappings.
ts_cache: TipsetCache,
/// epoch to tipset key mappings.
ts_height_cache: TipsetHeightCache,
/// `Blockstore` pointer needed to load tipsets from cold storage.
db: DB,
/// check whether a tipset is finalized
is_tipset_finalized: Option<IsTipsetFinalizedFn>,
}

#[derive(Debug, Clone, Copy)]
Expand All @@ -41,7 +48,23 @@ impl<DB: Blockstore> ChainIndex<DB> {
pub fn new(db: DB) -> Self {
let ts_cache =
SizeTrackingLruCache::new_with_metrics("tipset".into(), DEFAULT_TIPSET_CACHE_SIZE);
Self { ts_cache, db }
let ts_height_cache: SizeTrackingLruCache<ChainEpoch, TipsetKey> =
SizeTrackingLruCache::new_with_metrics(
"tipset_by_height".into(),
// 20480 * 900 = 18432000 which is sufficient for mainnet
nonzero!(20480_usize),
);
Self {
ts_cache,
ts_height_cache,
db,
is_tipset_finalized: None,
}
}

pub fn with_is_tipset_finalized(mut self, f: IsTipsetFinalizedFn) -> Self {
self.is_tipset_finalized = Some(f);
self
Comment thread
hanabi1224 marked this conversation as resolved.
}

pub fn db(&self) -> &DB {
Expand Down Expand Up @@ -129,24 +152,21 @@ impl<DB: Blockstore> ChainIndex<DB> {
) -> Result<Tipset, Error> {
use crate::shim::policy::policy_constants::CHAIN_FINALITY;

static CACHE: LazyLock<SizeTrackingLruCache<ChainEpoch, TipsetKey>> = LazyLock::new(|| {
SizeTrackingLruCache::new_with_metrics(
"tipset_by_height".into(),
// 20480 * 900 = 18432000 which is sufficient for mainnet
nonzero!(20480_usize),
)
});

// use `CHAIN_FINALITY` as checkpoint interval
const CHECKPOINT_INTERVAL: ChainEpoch = CHAIN_FINALITY;
fn next_checkpoint(epoch: ChainEpoch) -> ChainEpoch {
epoch - epoch.mod_floor(&CHAIN_FINALITY) + CHAIN_FINALITY
epoch - epoch.mod_floor(&CHECKPOINT_INTERVAL) + CHECKPOINT_INTERVAL
}
fn is_checkpoint(epoch: ChainEpoch) -> bool {
epoch.mod_floor(&CHECKPOINT_INTERVAL) == 0
}

let from_epoch = from.epoch();

let mut checkpoint_from_epoch = to;
while checkpoint_from_epoch < from_epoch {
if let Some(checkpoint_from_key) = CACHE.get_cloned(&checkpoint_from_epoch)
if let Some(checkpoint_from_key) =
self.ts_height_cache.get_cloned(&checkpoint_from_epoch)
&& let Ok(Some(checkpoint_from)) = self.load_tipset(&checkpoint_from_key)
{
from = checkpoint_from;
Expand All @@ -165,11 +185,19 @@ impl<DB: Blockstore> ChainIndex<DB> {
)));
}

let from_epoch = from.epoch();
let is_finalized = |ts: &Tipset| {
if let Some(is_finalized_fn) = &self.is_tipset_finalized {
is_finalized_fn(ts)
} else {
ts.epoch() <= from_epoch - CHAIN_FINALITY
}
};
for (child, parent) in from.chain(&self.db).tuple_windows() {
// use `child.epoch() + CHAIN_FINALITY <= from_epoch`
// to ensure the cached child is finalized(not on a fork).
if child.epoch() % CHAIN_FINALITY == 0 && child.epoch() + CHAIN_FINALITY <= from_epoch {
CACHE.push(child.epoch(), child.key().clone());
// update cache only when child is finalized.
if is_checkpoint(child.epoch()) && is_finalized(&child) {
self.ts_height_cache
.push(child.epoch(), child.key().clone());
Comment thread
hanabi1224 marked this conversation as resolved.
}

if to == child.epoch() {
Expand Down
41 changes: 40 additions & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ use crate::utils;
use crate::utils::misc::env::is_env_truthy;
use crate::utils::{proofs_api::ensure_proof_params_downloaded, version::FOREST_VERSION_STRING};
use anyhow::{Context as _, bail};
use backon::{ExponentialBuilder, Retryable};
use dialoguer::theme::ColorfulTheme;
use futures::{Future, FutureExt};
use std::path::Path;
use std::sync::Arc;
use std::sync::OnceLock;
use std::time::Instant;
use std::time::{Duration, Instant};
use tokio::{
net::TcpListener,
signal::{
Expand Down Expand Up @@ -479,6 +480,44 @@ fn maybe_start_f3_service(opts: &CliOpts, config: &Config, ctx: &AppContext) ->
);
}
});
tokio::task::spawn({
let chain_store = ctx.chain_store().clone();
async move {
// wait 1s to let F3 RPC server start
tokio::time::sleep(Duration::from_secs(1)).await;
Comment thread
hanabi1224 marked this conversation as resolved.
match (|| crate::rpc::f3::F3GetLatestCertificate::get())
.retry(ExponentialBuilder::default())
.await
{
Ok(f3_finalized_cert) => {
let f3_finalized_head = f3_finalized_cert.chain_head();
match chain_store
.chain_index()
.load_required_tipset(&f3_finalized_head.key)
{
Ok(ts) => {
chain_store.set_f3_finalized_tipset(ts);
tracing::info!(
"Set F3 finalized tipset to epoch {} and key {}",
f3_finalized_head.epoch,
f3_finalized_head.key,
);
}
Err(e) => {
tracing::error!(
"Failed to get F3 finalized tipset epoch {} and key {}: {e}",
f3_finalized_head.epoch,
f3_finalized_head.key
);
}
}
}
Err(e) => {
tracing::error!("Failed to get F3 latest certificate: {e}");
}
}
}
});
}

Ok(())
Expand Down
22 changes: 5 additions & 17 deletions src/rpc/methods/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1116,28 +1116,16 @@ impl ChainGetTipSetV2 {
pub async fn get_latest_finalized_tipset(
ctx: &Ctx<impl Blockstore + Send + Sync + 'static>,
) -> anyhow::Result<Tipset> {
let Ok(f3_finalized_cert) = crate::rpc::f3::F3GetLatestCertificate::get().await else {
let Some(f3_finalized_head) = ctx.chain_store().f3_finalized_tipset() else {
return Self::get_ec_finalized_tipset(ctx);
};

let f3_finalized_head = f3_finalized_cert.chain_head();
let head = ctx.chain_store().heaviest_tipset();
// Latest F3 finalized tipset is older than EC finality, falling back to EC finality
if head.epoch() > f3_finalized_head.epoch + ctx.chain_config().policy.chain_finality {
return Self::get_ec_finalized_tipset(ctx);
if head.epoch() > f3_finalized_head.epoch() + ctx.chain_config().policy.chain_finality {
Self::get_ec_finalized_tipset(ctx)
} else {
Ok(f3_finalized_head)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

let ts = ctx
.chain_index()
.load_required_tipset(&f3_finalized_head.key)
.map_err(|e| {
anyhow::anyhow!(
"Failed to load F3 finalized tipset at epoch {} with key {}: {e}",
f3_finalized_head.epoch,
f3_finalized_head.key,
)
})?;
Ok(ts)
}

pub fn get_ec_finalized_tipset(ctx: &Ctx<impl Blockstore>) -> anyhow::Result<Tipset> {
Expand Down
4 changes: 3 additions & 1 deletion src/rpc/methods/f3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -574,8 +574,10 @@ impl RpcMethod<1> for Finalize {
)?;
let ts = Arc::new(Tipset::from(fts));
ctx.chain_store().put_tipset(&ts)?;
ctx.chain_store().set_heaviest_tipset(finalized_ts)?;
ctx.chain_store()
.set_heaviest_tipset(finalized_ts.clone())?;
}
ctx.chain_store().set_f3_finalized_tipset(finalized_ts);
}
Ok(())
}
Expand Down
Loading