Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions src/chain/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,9 +319,9 @@ where
/// is usually 900. The `heaviest_tipset` is a reference point in the
/// blockchain. It must be a child of the look-back tipset.
pub fn get_lookback_tipset_for_round(
chain_index: Arc<ChainIndex<Arc<DB>>>,
chain_config: Arc<ChainConfig>,
heaviest_tipset: Arc<Tipset>,
chain_index: &Arc<ChainIndex<Arc<DB>>>,
chain_config: &Arc<ChainConfig>,
heaviest_tipset: &Arc<Tipset>,
round: ChainEpoch,
) -> Result<(Arc<Tipset>, Cid), Error>
where
Expand All @@ -343,19 +343,19 @@ where
let beacon = Arc::new(chain_config.get_beacon_schedule(genesis_timestamp));
let StateOutput { state_root, .. } = crate::state_manager::apply_block_messages(
genesis_timestamp,
Arc::clone(&chain_index),
Arc::clone(&chain_config),
Arc::clone(chain_index),
Arc::clone(chain_config),
beacon,
// Using shared WASM engine here as creating new WASM engines is expensive
// (takes seconds to minutes). It's only acceptable here because this situation is
// so rare (may happen in dev-networks, doesn't happen in calibnet or mainnet.)
&crate::shim::machine::GLOBAL_MULTI_ENGINE,
Arc::clone(&heaviest_tipset),
Arc::clone(heaviest_tipset),
crate::state_manager::NO_CALLBACK,
VMTrace::NotTraced,
)
.map_err(|e| Error::Other(e.to_string()))?;
return Ok((heaviest_tipset, state_root));
return Ok((heaviest_tipset.clone(), state_root));
}

let next_ts = chain_index
Expand Down Expand Up @@ -616,36 +616,36 @@ impl Default for MsgsInTipsetCache {

/// Same as [`messages_for_tipset`] but uses a cache to store messages for each tipset.
pub fn messages_for_tipset_with_cache<DB>(
db: Arc<DB>,
db: &Arc<DB>,
ts: &Tipset,
cache: Arc<MsgsInTipsetCache>,
cache: &MsgsInTipsetCache,
) -> Result<Vec<ChainMessage>, Error>
where
DB: Blockstore,
{
let key = ts.key();
cache
.get_or_insert_with(key, || {
messages_for_tipset(Arc::clone(&db), ts).context("failed to get messages for tipset")
messages_for_tipset(db, ts).context("failed to get messages for tipset")
})
.map_err(Into::into)
}

/// Given a tipset this function will return all unique messages in that tipset.
/// Note: This function is resource-intensive and can be a bottleneck for certain use-cases.
/// Consider using [`messages_for_tipset_with_cache`] for better performance.
pub fn messages_for_tipset<DB>(db: Arc<DB>, ts: &Tipset) -> Result<Vec<ChainMessage>, Error>
pub fn messages_for_tipset<DB>(db: &Arc<DB>, ts: &Tipset) -> Result<Vec<ChainMessage>, Error>
where
DB: Blockstore,
{
let mut applied: HashMap<Address, u64> = HashMap::new();
let mut balances: HashMap<Address, TokenAmount> = HashMap::new();
let state = StateTree::new_from_tipset(Arc::clone(&db), ts)?;
let state = StateTree::new_from_tipset(Arc::clone(db), ts)?;

// message to get all messages for block_header into a single iterator
let mut get_message_for_block_header =
|b: &CachingBlockHeader| -> Result<Vec<ChainMessage>, Error> {
let (unsigned, signed) = block_messages(&db, b)?;
let (unsigned, signed) = block_messages(db, b)?;
let mut messages = Vec::with_capacity(unsigned.len() + signed.len());
let unsigned_box = unsigned.into_iter().map(ChainMessage::Unsigned);
let signed_box = signed.into_iter().map(ChainMessage::Signed);
Expand Down
2 changes: 1 addition & 1 deletion src/chain_sync/chain_follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ impl SyncTask {
} => {
let genesis = cs.genesis_tipset();
match validate_tipset(
state_manager.clone(),
&state_manager,
cs,
tipset.deref().clone(),
&genesis,
Expand Down
2 changes: 1 addition & 1 deletion src/chain_sync/sync_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl SyncStatusReport {

pub(crate) fn update<DB: Blockstore + Sync + Send + 'static>(
&mut self,
state_manager: &Arc<StateManager<DB>>,
state_manager: &StateManager<DB>,
current_active_forks: Vec<ForkSyncInfo>,
stateless_mode: bool,
) {
Expand Down
171 changes: 90 additions & 81 deletions src/chain_sync/tipset_syncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl TipsetSyncerError {
/// ones to the bad block cache, depending on strategy. Any bad block fails
/// validation.
pub async fn validate_tipset<DB: Blockstore + Send + Sync + 'static>(
state_manager: Arc<StateManager<DB>>,
state_manager: &Arc<StateManager<DB>>,
chainstore: &ChainStore<DB>,
full_tipset: FullTipset,
genesis: &Tipset,
Expand Down Expand Up @@ -205,9 +205,9 @@ async fn validate_block<DB: Blockstore + Sync + Send + 'static>(

// Retrieve lookback tipset for validation
let lookback_state = ChainStore::get_lookback_tipset_for_round(
state_manager.chain_store().chain_index().clone(),
state_manager.chain_config().clone(),
base_tipset.clone(),
state_manager.chain_store().chain_index(),
state_manager.chain_config(),
&base_tipset,
block.header().epoch,
)
.map_err(|e| (*block_cid, e.into()))
Expand All @@ -224,100 +224,109 @@ async fn validate_block<DB: Blockstore + Sync + Send + 'static>(

// Check block messages
validations.spawn(check_block_messages(
Arc::clone(&state_manager),
Arc::clone(&block),
Arc::clone(&base_tipset),
state_manager.clone(),
block.clone(),
base_tipset.clone(),
));

// Base fee check
let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
let v_base_tipset = Arc::clone(&base_tipset);
let v_block_store = state_manager.blockstore_owned();
let v_block = Arc::clone(&block);
validations.spawn_blocking(move || {
let base_fee = crate::chain::compute_base_fee(&v_block_store, &v_base_tipset, smoke_height)
.map_err(|e| {
TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
})?;
let parent_base_fee = &v_block.header.parent_base_fee;
if &base_fee != parent_base_fee {
return Err(TipsetSyncerError::Validation(format!(
"base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
)));
validations.spawn_blocking({
let smoke_height = state_manager.chain_config().epoch(Height::Smoke);
let base_tipset = Arc::clone(&base_tipset);
let block_store = state_manager.blockstore_owned();
let block = Arc::clone(&block);
move || {
let base_fee = crate::chain::compute_base_fee(&block_store, &base_tipset, smoke_height)
.map_err(|e| {
TipsetSyncerError::Validation(format!("Could not compute base fee: {e}"))
})?;
let parent_base_fee = &block.header.parent_base_fee;
if &base_fee != parent_base_fee {
return Err(TipsetSyncerError::Validation(format!(
"base fee doesn't match: {parent_base_fee} (header), {base_fee} (computed)"
)));
}
Ok(())
}
Ok(())
});

// Parent weight calculation check
let v_block_store = state_manager.blockstore_owned();
let v_base_tipset = Arc::clone(&base_tipset);
let weight = header.weight.clone();
validations.spawn_blocking(move || {
let calc_weight = fil_cns::weight(&v_block_store, &v_base_tipset).map_err(|e| {
TipsetSyncerError::Calculation(format!("Error calculating weight: {e}"))
})?;
if weight != calc_weight {
return Err(TipsetSyncerError::Validation(format!(
"Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
)));
validations.spawn_blocking({
let block_store = state_manager.blockstore_owned();
let base_tipset = base_tipset.clone();
let weight = header.weight.clone();
move || {
let calc_weight = fil_cns::weight(&block_store, &base_tipset).map_err(|e| {
TipsetSyncerError::Calculation(format!("Error calculating weight: {e}"))
})?;
if weight != calc_weight {
return Err(TipsetSyncerError::Validation(format!(
"Parent weight doesn't match: {weight} (header), {calc_weight} (computed)"
)));
}
Ok(())
}
Ok(())
});

// State root and receipt root validations
let v_state_manager = Arc::clone(&state_manager);
let v_base_tipset = Arc::clone(&base_tipset);
let v_block = Arc::clone(&block);
validations.spawn(async move {
let header = v_block.header();
let (state_root, receipt_root) = v_state_manager
.tipset_state(&v_base_tipset)
.await
.map_err(|e| {
TipsetSyncerError::Calculation(format!("Failed to calculate state: {e}"))
})?;
validations.spawn({
let state_manager = state_manager.clone();
let block = block.clone();
async move {
let header = block.header();
let (state_root, receipt_root) = state_manager
.tipset_state(&base_tipset)
.await
.map_err(|e| {
TipsetSyncerError::Calculation(format!("Failed to calculate state: {e}"))
})?;

if state_root != header.state_root {
return Err(TipsetSyncerError::Validation(format!(
"Parent state root did not match computed state: {} (header), {} (computed)",
header.state_root, state_root,
)));
}
if state_root != header.state_root {
return Err(TipsetSyncerError::Validation(format!(
"Parent state root did not match computed state: {} (header), {} (computed)",
header.state_root, state_root,
)));
}

if receipt_root != header.message_receipts {
return Err(TipsetSyncerError::Validation(format!(
"Parent receipt root did not match computed root: {} (header), {} (computed)",
header.message_receipts, receipt_root
)));
if receipt_root != header.message_receipts {
return Err(TipsetSyncerError::Validation(format!(
"Parent receipt root did not match computed root: {} (header), {} (computed)",
header.message_receipts, receipt_root
)));
}
Ok(())
}
Ok(())
});

// Block signature check
let v_block = block.clone();
validations.spawn_blocking(move || {
v_block.header().verify_signature_against(&work_addr)?;
Ok(())
validations.spawn_blocking({
let block = block.clone();
move || {
block.header().verify_signature_against(&work_addr)?;
Ok(())
}
});

let v_block = block.clone();
validations.spawn(async move {
consensus
.validate_block(state_manager, v_block)
.map_err(|errs| {
// NOTE: Concatenating errors here means the wrapper type of error
// never surfaces, yet we always pay the cost of the generic argument.
// But there's no reason `validate_block` couldn't return a list of all
// errors instead of a single one that has all the error messages,
// removing the caller's ability to distinguish between them.

TipsetSyncerError::concat(
errs.into_iter_ne()
.map(TipsetSyncerError::ConsensusError)
.collect_vec(),
)
})
.await
validations.spawn({
let block = block.clone();
async move {
consensus
.validate_block(state_manager, block)
.map_err(|errs| {
// NOTE: Concatenating errors here means the wrapper type of error
// never surfaces, yet we always pay the cost of the generic argument.
// But there's no reason `validate_block` couldn't return a list of all
// errors instead of a single one that has all the error messages,
// removing the caller's ability to distinguish between them.

TipsetSyncerError::concat(
errs.into_iter_ne()
.map(TipsetSyncerError::ConsensusError)
.collect_vec(),
)
})
.await
}
});

// Collect the errors from the async validations
Expand Down Expand Up @@ -355,9 +364,9 @@ async fn check_block_messages<DB: Blockstore + Send + Sync + 'static>(
// check block message and signatures in them
let mut pub_keys = Vec::with_capacity(block.bls_msgs().len());
let mut cids = Vec::with_capacity(block.bls_msgs().len());
let db = state_manager.blockstore_owned();
let db = state_manager.blockstore();
for m in block.bls_msgs() {
let pk = StateManager::get_bls_public_key(&db, &m.from, *base_tipset.parent_state())?;
let pk = StateManager::get_bls_public_key(db, &m.from, *base_tipset.parent_state())?;
pub_keys.push(pk);
cids.push(m.cid().to_bytes());
}
Expand Down
Loading
Loading