diff --git a/Cargo.toml b/Cargo.toml index a54a81ec2e22..2d3e8575e780 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -365,7 +365,7 @@ rustdoc-args = ["--document-private-items"] [workspace.lints.clippy] ref_option = "deny" ref_option_ref = "deny" -cast_lossless = "deny" +cast_lossless = "deny" [workspace] members = ["interop-tests"] diff --git a/src/chain/store/chain_store.rs b/src/chain/store/chain_store.rs index 84086620907b..aac538e37dec 100644 --- a/src/chain/store/chain_store.rs +++ b/src/chain/store/chain_store.rs @@ -275,6 +275,30 @@ where } } + pub fn load_child_tipset(&self, ts: &Tipset) -> Result { + let head = self.heaviest_tipset(); + if head.parents() == ts.key() { + Ok(head) + } else if head.epoch() > ts.epoch() { + let maybe_child = self.chain_index().tipset_by_height( + ts.epoch() + 1, + head, + ResolveNullTipset::TakeNewer, + )?; + if maybe_child.parents() == ts.key() { + Ok(maybe_child) + } else { + Err(Error::NotFound( + format!("child of tipset@{}", ts.epoch()).into(), + )) + } + } else { + Err(Error::NotFound( + format!("child of tipset@{}", ts.epoch()).into(), + )) + } + } + /// Determines if provided tipset is heavier than existing known heaviest /// tipset fn update_heaviest(&self, ts: Tipset) -> Result<(), Error> { diff --git a/src/chain/store/errors.rs b/src/chain/store/errors.rs index dab9dd023f39..27999c7d2d2d 100644 --- a/src/chain/store/errors.rs +++ b/src/chain/store/errors.rs @@ -1,7 +1,7 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -use std::fmt::Debug; +use std::{borrow::Cow, fmt::Debug}; use crate::blocks::CreateTipsetError; use cid::Error as CidErr; @@ -17,7 +17,7 @@ pub enum Error { UndefinedKey(String), /// Key not found in database #[error("{0} not found")] - NotFound(String), + NotFound(Cow<'static, str>), /// Error originating constructing blockchain structures #[error(transparent)] Blockchain(#[from] CreateTipsetError), @@ -29,7 +29,7 @@ pub enum Error { Cid(#[from] CidErr), /// Amt error #[error("State error: {0}")] - State(String), + State(Cow<'static, str>), /// Other chain error #[error("{0}")] Other(String), @@ -43,7 +43,7 @@ impl From for Error { impl From for Error { fn from(e: AmtErr) -> Error { - Error::State(e.to_string()) + Error::state(e.to_string()) } } @@ -72,7 +72,7 @@ impl From> for Error { } impl Error { - pub fn state(msg: impl std::fmt::Display) -> Self { - Self::State(msg.to_string()) + pub fn state(msg: impl Into>) -> Self { + Self::State(msg.into()) } } diff --git a/src/dev/subcommands/state_cmd.rs b/src/dev/subcommands/state_cmd.rs index b57f58084766..d5f97de1e70a 100644 --- a/src/dev/subcommands/state_cmd.rs +++ b/src/dev/subcommands/state_cmd.rs @@ -90,11 +90,7 @@ impl ComputeCommand { chain_store.heaviest_tipset(), ResolveNullTipset::TakeOlder, )?; - let ts_next = chain_store.chain_index().tipset_by_height( - epoch + 1, - chain_store.heaviest_tipset(), - ResolveNullTipset::TakeNewer, - )?; + let ts_next = chain_store.load_child_tipset(&ts)?; db.resume_tracking(); SettingsStoreExt::write_obj( &db.tracker, @@ -113,7 +109,6 @@ impl ComputeCommand { let StateOutput { state_root, receipt_root, - .. } = state_manager .compute_tipset_state(ts, crate::state_manager::NO_CALLBACK, VMTrace::NotTraced) .await?; diff --git a/src/rpc/methods/eth.rs b/src/rpc/methods/eth.rs index 44ec2f76a49d..801486b68ffd 100644 --- a/src/rpc/methods/eth.rs +++ b/src/rpc/methods/eth.rs @@ -52,7 +52,7 @@ use crate::shim::gas::GasOutputs; use crate::shim::message::Message; use crate::shim::trace::{CallReturn, ExecutionEvent}; use crate::shim::{clock::ChainEpoch, state_tree::StateTree}; -use crate::state_manager::{StateLookupPolicy, VMFlush}; +use crate::state_manager::{ExecutedMessage, ExecutedTipset, StateLookupPolicy, VMFlush}; use crate::utils::cache::SizeTrackingLruCache; use crate::utils::db::BlockstoreExt as _; use crate::utils::encoding::from_slice_with_fallback; @@ -497,16 +497,28 @@ impl Block { let block_number = EthInt64(tipset.epoch()); let block_hash: EthHash = block_cid.into(); - let (state_root, msgs_and_receipts) = execute_tipset(&ctx, &tipset).await?; - + let ExecutedTipset { + state_root, + executed_messages, + } = ctx + .state_manager + .load_executed_tipset_without_events(&tipset) + .await?; + let has_transactions = !executed_messages.is_empty(); let state_tree = ctx.state_manager.get_state_tree(&state_root)?; let mut full_transactions = vec![]; let mut gas_used = 0; - for (i, (msg, receipt)) in msgs_and_receipts.iter().enumerate() { + for ( + i, + ExecutedMessage { + message, receipt, .. + }, + ) in executed_messages.into_iter().enumerate() + { let ti = EthUint64(i as u64); gas_used += receipt.gas_used(); - let smsg = match msg { + let smsg = match message { ChainMessage::Signed(msg) => msg.clone(), ChainMessage::Unsigned(msg) => { let sig = Signature::new_bls(vec![]); @@ -538,7 +550,7 @@ impl Block { .into(), gas_used: EthUint64(gas_used), transactions: Transactions::Full(full_transactions), - ..Block::new(!msgs_and_receipts.is_empty(), tipset.len()) + ..Block::new(has_transactions, tipset.len()) }; ETH_BLOCK_CACHE.push(block_cid.into(), b.clone()); b @@ -959,28 +971,6 @@ fn resolve_block_hash_tipset( Ok(ts) } -async fn execute_tipset( - data: &Ctx, - tipset: &Tipset, -) -> Result<(Cid, Vec<(ChainMessage, Receipt)>)> { - let msgs = data.chain_store().messages_for_tipset(tipset)?; - - let (state_root, _) = data - .state_manager - .tipset_state(tipset, StateLookupPolicy::Enabled) - .await?; - let receipts = data.state_manager.tipset_message_receipts(tipset).await?; - - if msgs.len() != receipts.len() { - bail!("receipts and message array lengths didn't match for tipset: {tipset:?}") - } - - Ok(( - state_root, - msgs.into_iter().zip(receipts.into_iter()).collect(), - )) -} - pub fn is_eth_address(addr: &VmAddress) -> bool { if addr.protocol() != Protocol::Delegated { return false; @@ -1426,19 +1416,31 @@ async fn get_block_receipts( let ts_key = ts_ref.key(); // Execute the tipset to get the messages and receipts - let (state_root, msgs_and_receipts) = execute_tipset(ctx, &ts_ref).await?; + let ExecutedTipset { + state_root, + executed_messages, + } = ctx + .state_manager + .load_executed_tipset_without_events(&ts_ref) + .await?; // Load the state tree let state_tree = ctx.state_manager.get_state_tree(&state_root)?; - let mut eth_receipts = Vec::with_capacity(msgs_and_receipts.len()); - for (i, (msg, receipt)) in msgs_and_receipts.into_iter().enumerate() { + let mut eth_receipts = Vec::with_capacity(executed_messages.len()); + for ( + i, + ExecutedMessage { + message, receipt, .. + }, + ) in executed_messages.into_iter().enumerate() + { let tx = new_eth_tx( ctx, &state_tree, ts_ref.epoch(), &ts_key.cid()?, - &msg.cid(), + &message.cid(), i as u64, )?; @@ -1929,9 +1931,17 @@ async fn eth_fee_history( .take(block_count as _) { let base_fee = &ts.block_headers().first().parent_base_fee; - let (_state_root, messages_and_receipts) = execute_tipset(&ctx, &ts).await?; - let mut tx_gas_rewards = Vec::with_capacity(messages_and_receipts.len()); - for (message, receipt) in messages_and_receipts { + let ExecutedTipset { + executed_messages, .. + } = ctx + .state_manager + .load_executed_tipset_without_events(&ts) + .await?; + let mut tx_gas_rewards = Vec::with_capacity(executed_messages.len()); + for ExecutedMessage { + message, receipt, .. + } in executed_messages + { let premium = message.effective_gas_premium(base_fee); tx_gas_rewards.push(GasReward { gas_used: receipt.gas_used(), diff --git a/src/rpc/methods/eth/filter/mod.rs b/src/rpc/methods/eth/filter/mod.rs index dec8bec0c854..b724553f77ab 100644 --- a/src/rpc/methods/eth/filter/mod.rs +++ b/src/rpc/methods/eth/filter/mod.rs @@ -38,7 +38,7 @@ use crate::rpc::types::{Event, EventEntry}; use crate::shim::address::Address; use crate::shim::clock::ChainEpoch; use crate::shim::executor::{Entry, StampedEvent}; -use crate::state_manager::StateEvents; +use crate::state_manager::ExecutedMessage; use crate::utils::misc::env::env_or_default; use ahash::AHashMap as HashMap; use anyhow::{Context, Error, anyhow, bail, ensure}; @@ -267,89 +267,79 @@ impl EthEventHandler { skip_event: SkipEvent, collected_events: &mut Vec, ) -> anyhow::Result<()> { - let tipset_key = tipset.key().clone(); let height = tipset.epoch(); - - let messages = ctx.chain_store().messages_for_tipset(tipset)?; - - let StateEvents { events, .. } = ctx.state_manager.tipset_state_events(tipset).await?; - - ensure!( - messages.len() == events.len(), - "Length of messages ({}) and events ({}) do not match", - messages.len(), - events.len(), - ); - + let tipset_key = tipset.key(); + let executed_tipset = ctx.state_manager.load_executed_tipset(tipset).await?; let mut event_count = 0; - for (i, (message, events)) in messages.iter().zip(events.into_iter()).enumerate() { - for event in events.iter() { - let id_addr = Address::new_id(event.emitter()); - let result = ctx - .state_manager - .resolve_to_deterministic_address(id_addr, tipset) - .await - .with_context(|| { - format!( - "resolving address {} failed (EPOCH = {})", - id_addr, - tipset.epoch() - ) - }); - let resolved = if let Ok(resolved) = result { - resolved - } else { - event_count += 1; - if let SkipEvent::OnUnresolvedAddress = skip_event { - // Skip event - continue; + for ( + msg_idx, + ExecutedMessage { + message, events, .. + }, + ) in executed_tipset.executed_messages.into_iter().enumerate() + { + if let Some(events) = events { + let event_idx_base = u64::try_from(event_count)?; + event_count += events.len(); + for (event_idx, event) in (event_idx_base..).zip(events.iter()) { + let id_addr = Address::new_id(event.emitter()); + let result = ctx + .state_manager + .resolve_to_deterministic_address(id_addr, tipset) + .await + .with_context(|| { + format!( + "resolving address {} failed (EPOCH = {})", + id_addr, + tipset.epoch() + ) + }); + let resolved = if let Ok(resolved) = result { + resolved } else { - id_addr - } - }; - - let entries: Vec = event.event().entries(); + if let SkipEvent::OnUnresolvedAddress = skip_event { + // Skip event + continue; + } else { + id_addr + } + }; - let matched = if let Some(spec) = spec { - let matched = spec.matches(&resolved, &entries)?; - tracing::debug!( - "Event {} {}match filter topics", - event_count, - if matched { "" } else { "do not " } - ); - matched - } else { - true - }; - if matched { - let entries: Vec = entries - .into_iter() - .map(|entry| { - let (flags, key, codec, value) = entry.into_parts(); - EventEntry { - flags, - key, - codec, - value: value.into(), - } - }) - .collect(); - - let ce = CollectedEvent { - entries, - emitter_addr: resolved, - event_idx: event_count, - reverted: false, - height, - tipset_key: tipset_key.clone(), - msg_idx: i as u64, - msg_cid: message.cid(), + let entries: Vec = event.event().entries(); + let matched = if let Some(spec) = spec { + spec.matches(&resolved, &entries)? + } else { + true }; - if collected_events.len() >= ctx.eth_event_handler.max_filter_results { - bail!("filter matches too many events, try a more restricted filter"); + if matched { + let entries: Vec = entries + .into_iter() + .map(|entry| { + let (flags, key, codec, value) = entry.into_parts(); + EventEntry { + flags, + key, + codec, + value: value.into(), + } + }) + .collect(); + + let ce = CollectedEvent { + entries, + emitter_addr: resolved, + event_idx, + reverted: false, + height, + tipset_key: tipset_key.clone(), + msg_idx: msg_idx as u64, + msg_cid: message.cid(), + }; + if collected_events.len() >= ctx.eth_event_handler.max_filter_results { + bail!("filter matches too many events, try a more restricted filter"); + } + collected_events.push(ce); } - collected_events.push(ce); - event_count += 1; } } } @@ -384,7 +374,6 @@ impl EthEventHandler { match &pf.tipsets { ParsedFilterTipsets::Hash(block_hash) => { let tipset = get_tipset_from_hash(ctx.chain_store(), block_hash)?; - let tipset = Arc::new(tipset); Self::collect_events(ctx, &tipset, Some(pf), skip_event, &mut collected_events) .await?; } diff --git a/src/state_manager/cache.rs b/src/state_manager/cache.rs index fab6dea5d05d..8415a45ca186 100644 --- a/src/state_manager/cache.rs +++ b/src/state_manager/cache.rs @@ -1,14 +1,11 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT use crate::blocks::TipsetKey; -use crate::shim::executor::Receipt; -use crate::state_manager::{DEFAULT_TIPSET_CACHE_SIZE, StateEvents}; +use crate::state_manager::DEFAULT_TIPSET_CACHE_SIZE; use crate::utils::cache::{LruValueConstraints, SizeTrackingLruCache}; -use nonzero_ext::nonzero; use parking_lot::Mutex as SyncMutex; use std::future::Future; use std::num::NonZeroUsize; -use std::pin::Pin; use std::sync::Arc; use tokio::sync::Mutex as TokioMutex; @@ -136,170 +133,15 @@ impl TipsetStateCache { } } -// Type alias for the compute function for receipts -type ComputeReceiptFn = - Box Pin>> + Send>> + Send>; - -// Type alias for the compute function for state events -type ComputeEventsFn = - Box Pin> + Send>> + Send>; - -/// Defines the interface for caching and retrieving tipset-specific events and receipts. -pub trait TipsetReceiptEventCacheHandler: Send + Sync + 'static { - fn insert_receipt(&self, key: &TipsetKey, receipt: Vec); - fn insert_events(&self, key: &TipsetKey, events: StateEvents); - #[allow(dead_code)] - fn get_events(&self, key: &TipsetKey) -> Option; - #[allow(dead_code)] - fn get_receipts(&self, key: &TipsetKey) -> Option>; - fn get_receipt_or_else( - &self, - key: &TipsetKey, - compute: ComputeReceiptFn, - ) -> Pin>> + Send + '_>>; - fn get_events_or_else( - &self, - key: &TipsetKey, - compute: ComputeEventsFn, - ) -> Pin> + Send + '_>>; -} - -/// Cache for tipset-related events and receipts. -pub struct EnabledTipsetDataCache { - events_cache: TipsetStateCache, - receipt_cache: TipsetStateCache>, -} - -impl EnabledTipsetDataCache { - pub fn new() -> Self { - const DEFAULT_RECEIPT_AND_EVENT_CACHE_SIZE: NonZeroUsize = nonzero!(4096usize); - - Self { - events_cache: TipsetStateCache::with_size( - "events", - DEFAULT_RECEIPT_AND_EVENT_CACHE_SIZE, - ), - receipt_cache: TipsetStateCache::with_size( - "receipts", - DEFAULT_RECEIPT_AND_EVENT_CACHE_SIZE, - ), - } - } -} - -impl TipsetReceiptEventCacheHandler for EnabledTipsetDataCache { - fn insert_receipt(&self, key: &TipsetKey, mut receipts: Vec) { - if !receipts.is_empty() { - receipts.shrink_to_fit(); - self.receipt_cache.insert(key.clone(), receipts); - } - } - - fn insert_events(&self, key: &TipsetKey, mut events_data: StateEvents) { - if !events_data.events.is_empty() { - events_data.events.shrink_to_fit(); - events_data.roots.shrink_to_fit(); - self.events_cache.insert(key.clone(), events_data); - } - } - - fn get_events(&self, key: &TipsetKey) -> Option { - self.events_cache.get(key) - } - - fn get_receipts(&self, key: &TipsetKey) -> Option> { - self.receipt_cache.get(key) - } - - fn get_receipt_or_else( - &self, - key: &TipsetKey, - compute: ComputeReceiptFn, - ) -> Pin>> + Send + '_>> { - let key = key.clone(); - let receipt_cache = &self.receipt_cache; - - Box::pin(async move { - receipt_cache - .get_or_else(&key, || async move { compute().await }) - .await - }) - } - - fn get_events_or_else( - &self, - key: &TipsetKey, - compute: ComputeEventsFn, - ) -> Pin> + Send + '_>> { - let key = key.clone(); - let events_cache = &self.events_cache; - - Box::pin(async move { - events_cache - .get_or_else(&key, || async move { compute().await }) - .await - }) - } -} - -/// Fake cache for tipset-related events and receipts. -pub struct DisabledTipsetDataCache; - -impl DisabledTipsetDataCache { - pub fn new() -> Self { - Self {} - } -} - -impl TipsetReceiptEventCacheHandler for DisabledTipsetDataCache { - fn insert_receipt(&self, _key: &TipsetKey, _receipts: Vec) { - // No-op - } - - fn insert_events(&self, _key: &TipsetKey, _events_data: StateEvents) { - // No-op - } - - fn get_events(&self, _key: &TipsetKey) -> Option { - None - } - - fn get_receipts(&self, _key: &TipsetKey) -> Option> { - None - } - - fn get_receipt_or_else( - &self, - _key: &TipsetKey, - _compute: ComputeReceiptFn, - ) -> Pin>> + Send + '_>> { - Box::pin(async move { Ok(vec![]) }) - } - - fn get_events_or_else( - &self, - _key: &TipsetKey, - _compute: ComputeEventsFn, - ) -> Pin> + Send + '_>> { - Box::pin(async move { - Ok(StateEvents { - events: vec![], - roots: vec![], - }) - }) - } -} - #[cfg(test)] mod tests { use super::*; use crate::blocks::TipsetKey; - use crate::shim::executor::Receipt; use cid::Cid; use fvm_ipld_encoding::DAG_CBOR; use multihash_derive::MultihashDigest; use std::sync::Arc; - use std::sync::atomic::{AtomicU8, AtomicU32, Ordering}; + use std::sync::atomic::{AtomicU8, Ordering}; use std::time::Duration; fn create_test_tipset_key(i: u64) -> TipsetKey { @@ -311,15 +153,6 @@ mod tests { TipsetKey::from(nunny::vec![cid]) } - fn create_test_receipt(i: u64) -> Vec { - vec![Receipt::V4(fvm_shared4::receipt::Receipt { - exit_code: fvm_shared4::error::ExitCode::new(0), - return_data: fvm_ipld_encoding::RawBytes::default(), - gas_used: i * 100, - events_root: None, - })] - } - #[tokio::test] async fn test_tipset_cache_basic_functionality() { let cache: TipsetStateCache = TipsetStateCache::new("test"); @@ -428,98 +261,4 @@ mod tests { assert_eq!(result.as_ref().unwrap(), &format!("value_{i}")); } } - - #[tokio::test] - async fn test_enabled_cache_concurrent_access() { - let cache = Arc::new(EnabledTipsetDataCache::new()); - let key = create_test_tipset_key(1); - let computation_count = Arc::new(AtomicU32::new(0)); - - let mut handles = vec![]; - for i in 0..5 { - let cache_clone = Arc::clone(&cache); - let key_clone = key.clone(); - let count_clone = Arc::clone(&computation_count); - - let handle = tokio::spawn(async move { - cache_clone - .get_receipt_or_else( - &key_clone, - Box::new(move || { - let count = Arc::clone(&count_clone); - Box::pin(async move { - count.fetch_add(1, Ordering::SeqCst); - tokio::time::sleep(Duration::from_millis(10)).await; - Ok(create_test_receipt(i)) - }) - }), - ) - .await - }); - handles.push(handle); - } - - let results: Vec<_> = futures::future::join_all(handles) - .await - .into_iter() - .collect::, _>>() - .unwrap(); - - // Computation should have been performed once - assert_eq!(computation_count.load(Ordering::SeqCst), 1); - - // Only one result should be returned as computation was performed once, - // and all tasks will get the same result from the cache - let first_result = results[0].as_ref().unwrap(); - for result in &results { - let receipts = result.as_ref().unwrap(); - assert_eq!(receipts.len(), first_result.len()); - } - } - - #[tokio::test] - async fn test_disabled_cache_behavior() { - let cache = Arc::new(DisabledTipsetDataCache::new()); - let key = create_test_tipset_key(1); - let computation_count = Arc::new(AtomicU32::new(0)); - - // Test that the disabled cache doesn't compute and returns empty results - let mut handles = vec![]; - for i in 0..3 { - let cache_clone = Arc::clone(&cache); - let key_clone = key.clone(); - let count_clone = Arc::clone(&computation_count); - - let handle = tokio::spawn(async move { - cache_clone - .get_receipt_or_else( - &key_clone, - Box::new(move || { - let count = Arc::clone(&count_clone); - Box::pin(async move { - count.fetch_add(1, Ordering::SeqCst); - Ok(create_test_receipt(i)) - }) - }), - ) - .await - }); - handles.push(handle); - } - - let results: Vec<_> = futures::future::join_all(handles) - .await - .into_iter() - .collect::, _>>() - .unwrap(); - - // Disabled cache should never compute - it returns empty results immediately - assert_eq!(computation_count.load(Ordering::SeqCst), 0); - - // All results should be empty - for result in &results { - let receipts = result.as_ref().unwrap(); - assert!(receipts.is_empty()); - } - } } diff --git a/src/state_manager/mod.rs b/src/state_manager/mod.rs index ffff5decb28c..a60b9aaa23d4 100644 --- a/src/state_manager/mod.rs +++ b/src/state_manager/mod.rs @@ -51,15 +51,10 @@ use crate::shim::{ state_tree::{ActorState, StateTree}, version::NetworkVersion, }; -use crate::state_manager::cache::{ - DisabledTipsetDataCache, EnabledTipsetDataCache, TipsetReceiptEventCacheHandler, - TipsetStateCache, -}; +use crate::state_manager::cache::TipsetStateCache; use crate::state_manager::chain_rand::draw_randomness; use crate::state_migration::run_state_migrations; -use crate::utils::get_size::{ - GetSize, vec_heap_size_helper, vec_with_stack_only_item_heap_size_helper, -}; +use crate::utils::get_size::GetSize; use ahash::{HashMap, HashMapExt}; use anyhow::{Context as _, bail, ensure}; use bls_signatures::{PublicKey as BlsPublicKey, Serialize as _}; @@ -94,50 +89,40 @@ pub const EVENTS_AMT_BITWIDTH: u32 = 5; /// Intermediary for retrieving state objects and updating actor states. type CidPair = (Cid, Cid); -#[derive(Debug, Clone, GetSize)] // Added Debug -pub struct StateEvents { - #[get_size(size_fn = vec_heap_size_helper)] - pub events: Vec>, - #[get_size(size_fn = vec_with_stack_only_item_heap_size_helper)] - pub roots: Vec>, +/// Result of executing an individual chain message in a tipset. +/// +/// Includes the executed message itself, the execution receipt, and +/// optional events emitted by the actor during execution. +pub struct ExecutedMessage { + pub message: ChainMessage, + pub receipt: Receipt, + pub events: Option>, } -#[derive(Clone)] -pub struct StateOutput { +/// Aggregated execution result for a tipset. +/// +/// `state_root` is the resulting state tree root after message execution +/// and `executed_messages` contains per-message execution details. +pub struct ExecutedTipset { pub state_root: Cid, - pub receipt_root: Cid, - pub events: Vec>, - pub events_roots: Vec>, + pub executed_messages: Vec, +} + +/// Options controlling how `load_executed_tipset` fetches extra execution data. +/// +/// `include_events` toggles whether event logs are loaded from receipts. +pub struct LoadExecutedTipsetOptions { + pub include_events: bool, } #[derive(Debug, Default, Clone, GetSize)] -pub struct StateOutputValue { +pub struct StateOutput { #[get_size(ignore)] pub state_root: Cid, #[get_size(ignore)] pub receipt_root: Cid, } -impl From for StateOutput { - fn from(value: StateOutputValue) -> Self { - Self { - state_root: value.state_root, - receipt_root: value.receipt_root, - events: vec![], - events_roots: vec![], - } - } -} - -impl From for StateOutputValue { - fn from(value: StateOutput) -> Self { - StateOutputValue { - state_root: value.state_root, - receipt_root: value.receipt_root, - } - } -} - /// External format for returning market balance from state. #[derive( Debug, Default, Serialize, Deserialize, Clone, PartialEq, Eq, PartialOrd, Ord, JsonSchema, @@ -162,11 +147,9 @@ pub struct StateManager { /// Chain store cs: Arc>, /// This is a cache which indexes tipsets to their calculated state output (state root, receipt root). - cache: TipsetStateCache, + cache: TipsetStateCache, beacon: Arc, engine: Arc, - /// Handler for caching/retrieving tipset events and receipts. - receipt_event_cache_handler: Box, } #[allow(clippy::type_complexity)] @@ -187,19 +170,11 @@ where let genesis = cs.genesis_block_header(); let beacon = Arc::new(cs.chain_config().get_beacon_schedule(genesis.timestamp)); - let cache_handler: Box = - if cs.chain_config().enable_receipt_event_caching { - Box::new(EnabledTipsetDataCache::new()) - } else { - Box::new(DisabledTipsetDataCache::new()) - }; - Ok(Self { cs, - cache: TipsetStateCache::new("state_output"), // For StateOutputValue + cache: TipsetStateCache::new("state_output"), // For StateOutput beacon, engine, - receipt_event_cache_handler: cache_handler, }) } @@ -273,17 +248,11 @@ where let receipt_root = child.min_ticket_block().message_receipts; self.cache.insert( key.clone(), - StateOutputValue { + StateOutput { state_root, receipt_root, }, ); - if let Ok(receipts) = Receipt::get_receipts(self.blockstore(), receipt_root) - && !receipts.is_empty() - { - self.receipt_event_cache_handler - .insert_receipt(key, receipts); - } } } @@ -465,7 +434,6 @@ where let StateOutput { state_root, receipt_root, - .. } = self.tipset_state_output(tipset, state_lookup).await?; Ok((state_root, receipt_root)) } @@ -498,86 +466,116 @@ where .compute_tipset_state(tipset.clone(), NO_CALLBACK, VMTrace::NotTraced) .await?; - self.update_cache_with_state_output(key, &state_output); - - let ts_state = state_output.into(); - - Ok(ts_state) + Ok(state_output) }) .await - .map(StateOutput::from) } - /// update the receipt and events caches - fn update_cache_with_state_output(&self, key: &TipsetKey, state_output: &StateOutput) { - if !state_output.events.is_empty() || !state_output.events_roots.is_empty() { - let events_data = StateEvents { - events: state_output.events.clone(), - roots: state_output.events_roots.clone(), - }; - self.receipt_event_cache_handler - .insert_events(key, events_data); - } - - if let Ok(receipts) = Receipt::get_receipts(self.blockstore(), state_output.receipt_root) - && !receipts.is_empty() - { - self.receipt_event_cache_handler - .insert_receipt(key, receipts); - } + /// Load an executed tipset, including message receipts and state root, + /// without loading event logs from receipts. + pub async fn load_executed_tipset_without_events( + self: &Arc, + ts: &Tipset, + ) -> anyhow::Result { + let receipt_ts = self.chain_store().load_child_tipset(ts).ok(); + self.load_executed_tipset_inner( + ts, + receipt_ts.as_ref(), + LoadExecutedTipsetOptions { + include_events: false, + }, + ) + .await } - #[instrument(skip(self))] - pub async fn tipset_message_receipts( + /// Load an executed tipset, including message receipts and state root, + /// with event logs loaded when available. + pub async fn load_executed_tipset( self: &Arc, - tipset: &Tipset, - ) -> anyhow::Result> { - let key = tipset.key(); - let ts = tipset.clone(); - let this = Arc::clone(self); - self.receipt_event_cache_handler - .get_receipt_or_else( - key, - Box::new(move || { - Box::pin(async move { - let StateOutput { receipt_root, .. } = this - .compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced) - .await?; - trace!("Completed tipset state calculation"); - Receipt::get_receipts(this.blockstore(), receipt_root) - }) - }), - ) - .await + ts: &Tipset, + ) -> anyhow::Result { + let receipt_ts = self.chain_store().load_child_tipset(ts).ok(); + self.load_executed_tipset_inner( + ts, + receipt_ts.as_ref(), + LoadExecutedTipsetOptions { + include_events: true, + }, + ) + .await } - #[instrument(skip(self))] - pub async fn tipset_state_events( + async fn load_executed_tipset_inner( self: &Arc, - tipset: &Tipset, - ) -> anyhow::Result { - let key = tipset.key(); - let ts = tipset.clone(); - let this = Arc::clone(self); - let cids = tipset.cids(); - self.receipt_event_cache_handler - .get_events_or_else( - key, - Box::new(move || { - Box::pin(async move { - // Fallback: compute the tipset state if events not found in the blockstore - let state_out = this - .compute_tipset_state(ts, NO_CALLBACK, VMTrace::NotTraced) + msg_ts: &Tipset, + // when `msg_ts` is the current head, `receipt_ts` is `None` + receipt_ts: Option<&Tipset>, + options: LoadExecutedTipsetOptions, + ) -> anyhow::Result { + let LoadExecutedTipsetOptions { include_events } = options; + if let Some(receipt_ts) = receipt_ts { + anyhow::ensure!( + msg_ts.key() == receipt_ts.parents(), + "message tipset should be the parent of message receipt tipset" + ); + } + let messages = self.chain_store().messages_for_tipset(msg_ts)?; + let mut recomputed = false; + let (state_root, receipts) = match receipt_ts.and_then(|ts| { + Receipt::get_receipts(self.cs.blockstore(), *ts.parent_message_receipts()) + .ok() + .map(|r| (*ts.parent_state(), r)) + }) { + Some((state_root, receipts)) => (state_root, receipts), + None => { + let state_output = self + .compute_tipset_state(msg_ts.clone(), NO_CALLBACK, VMTrace::NotTraced) + .await?; + recomputed = true; + ( + state_output.state_root, + Receipt::get_receipts(self.cs.blockstore(), state_output.receipt_root)?, + ) + } + }; + anyhow::ensure!( + messages.len() == receipts.len(), + "mismatching message and receipt counts ({} messages, {} receipts)", + messages.len(), + receipts.len() + ); + let mut executed_messages = Vec::with_capacity(messages.len()); + for (message, receipt) in messages.into_iter().zip(receipts.into_iter()) { + let events = if include_events && let Some(events_root) = receipt.events_root() { + Some( + match StampedEvent::get_events(self.cs.blockstore(), &events_root) { + Ok(events) => events, + Err(e) if recomputed => return Err(e), + Err(_) => { + self.compute_tipset_state( + msg_ts.clone(), + NO_CALLBACK, + VMTrace::NotTraced, + ) .await?; - trace!("Completed tipset state calculation {:?}", cids); - Ok(StateEvents { - events: state_out.events, - roots: state_out.events_roots, - }) - }) - }), - ) - .await + recomputed = true; + StampedEvent::get_events(self.cs.blockstore(), &events_root)? + } + }, + ) + } else { + None + }; + executed_messages.push(ExecutedMessage { + message, + receipt, + events, + }); + } + Ok(ExecutedTipset { + state_root, + executed_messages, + }) } #[instrument(skip(self, rand))] @@ -1776,34 +1774,16 @@ where /// Attempts to lookup the state and receipt root of the next tipset. /// This is a performance optimization to avoid recomputing the state and receipt root by checking the blockstore. /// It only checks the immediate next epoch, as this is the most likely place to find a child. - fn try_lookup_state_from_next_tipset(&self, tipset: &Tipset) -> Option { - let epoch = tipset.epoch(); - let next_epoch = epoch + 1; - - // Only check the immediate next epoch - this is the most likely place to find a child - let heaviest = self.heaviest_tipset(); - if next_epoch > heaviest.epoch() { - return None; - } - + fn try_lookup_state_from_next_tipset(&self, ts: &Tipset) -> Option { // Check if the next tipset has the same parent - if let Ok(next_tipset) = - self.chain_index() - .tipset_by_height(next_epoch, heaviest, ResolveNullTipset::TakeNewer) - { - // verify that the parent of the `next_tipset` is the same as the current tipset - if !next_tipset.parents().eq(tipset.key()) { - return None; - } - - let state_root = next_tipset.parent_state(); - let receipt_root = next_tipset.min_ticket_block().message_receipts; - - if self.blockstore().has(state_root).unwrap_or(false) + if let Ok(child_ts) = self.chain_store().load_child_tipset(ts) { + let state_root = *child_ts.parent_state(); + let receipt_root = *child_ts.parent_message_receipts(); + if self.blockstore().has(&state_root).unwrap_or(false) && self.blockstore().has(&receipt_root).unwrap_or(false) { - return Some(StateOutputValue { - state_root: state_root.into(), + return Some(StateOutput { + state_root, receipt_root, }); } @@ -1834,7 +1814,6 @@ where let StateOutput { state_root: actual_state, receipt_root: actual_receipt, - .. } = apply_block_messages( genesis_timestamp, chain_index.clone(), @@ -2085,8 +2064,6 @@ where return Ok(StateOutput { state_root: *tipset.parent_state(), receipt_root: message_receipts, - events: vec![], - events_roots: vec![], }); } @@ -2139,8 +2116,6 @@ where Ok(StateOutput { state_root, receipt_root, - events, - events_roots, }) }) } diff --git a/src/state_manager/tests.rs b/src/state_manager/tests.rs index fb531fca2b08..1692e8e21aad 100644 --- a/src/state_manager/tests.rs +++ b/src/state_manager/tests.rs @@ -2,16 +2,16 @@ // SPDX-License-Identifier: Apache-2.0, MIT use super::*; -use crate::blocks::{Chain4U, HeaderBuilder, TipsetKey, chain4u}; +use crate::blocks::{Chain4U, HeaderBuilder, chain4u}; use crate::chain::ChainStore; use crate::db::MemoryDB; use crate::networks::ChainConfig; use crate::shim::clock::ChainEpoch; -use crate::shim::executor::{Receipt, StampedEvent}; +use crate::shim::executor::StampedEvent; use crate::utils::db::CborStoreExt; use crate::utils::multihash::MultihashCode; use cid::Cid; -use fil_actors_shared::fvm_ipld_amt::{Amt, Amtv0}; +use fil_actors_shared::fvm_ipld_amt::Amt; use fvm_ipld_blockstore::Blockstore; use fvm_ipld_encoding::DAG_CBOR; use multihash_derive::MultihashDigest; @@ -39,9 +39,7 @@ fn dummy_node(db: impl Blockstore, i: ChainEpoch) -> HeaderBuilder { /// Structure to hold the setup components for chain tests struct TestChainSetup { - db: Arc, chain_store: Arc>, - state_manager: Arc>, chain_builder: Chain4U>, state_root: Cid, receipt_root: Cid, @@ -68,8 +66,6 @@ fn setup_chain_with_tipsets() -> TestChainSetup { .expect("should create chain store"), ); - let state_manager = Arc::new(StateManager::new(chain_store.clone()).unwrap()); - // Create dummy state and receipt roots and store them in blockstore let state_root = create_dummy_cid(1); let receipt_root = create_dummy_cid(2); @@ -83,9 +79,7 @@ fn setup_chain_with_tipsets() -> TestChainSetup { .unwrap(); TestChainSetup { - db, chain_store, - state_manager, chain_builder, // Assign c4u to the named field state_root, receipt_root, @@ -305,131 +299,10 @@ fn test_try_lookup_state_from_next_tipset_missing_state_root() { // Should return None since the state root is missing assert!(result.is_none()); } -#[test] -fn test_update_receipt_and_events_cache_empty_events() { - let TestChainSetup { state_manager, .. } = setup_chain_with_tipsets(); - let tipset_key = TipsetKey::from(nunny::vec![create_dummy_cid(1)]); - - // Create state output with empty events - let state_output = StateOutput { - state_root: create_dummy_cid(2), - receipt_root: create_dummy_cid(3), - events: Vec::new(), - events_roots: Vec::new(), - }; - - state_manager.update_cache_with_state_output(&tipset_key, &state_output); - - // Verify events cache wasn't updated - assert!( - state_manager - .receipt_event_cache_handler - .get_events(&tipset_key) - .is_none() - ); - assert!( - state_manager - .receipt_event_cache_handler - .get_receipts(&tipset_key) - .is_none() - ); -} - -#[test] -fn test_update_receipt_and_events_cache_with_events() { - let TestChainSetup { - db, state_manager, .. - } = setup_chain_with_tipsets(); - let tipset_key = TipsetKey::from(nunny::vec![create_dummy_cid(1)]); - - let mock_event = vec![StampedEvent::V4(fvm_shared4::event::StampedEvent { - emitter: 1000, - event: fvm_shared4::event::ActorEvent { entries: vec![] }, - })]; - - let events_root = Amtv0::new_from_iter(&db, mock_event.clone()).unwrap(); - - // Create state output with non-empty events - let state_output = StateOutput { - state_root: create_dummy_cid(2), - receipt_root: create_dummy_cid(3), - events: vec![mock_event], - events_roots: vec![Some(events_root)], - }; - - state_manager.update_cache_with_state_output(&tipset_key, &state_output); - - // Verify events cache was updated - let cached_events = state_manager - .receipt_event_cache_handler - .get_events(&tipset_key); - assert!(cached_events.is_some()); - let events = cached_events.unwrap(); - assert_eq!(events.events.len(), 1); - assert_eq!(events.roots.len(), 1); -} - -#[test] -fn test_update_receipt_and_events_cache_receipts_success() { - let TestChainSetup { - db, state_manager, .. - } = setup_chain_with_tipsets(); - let tipset_key = TipsetKey::from(nunny::vec![create_dummy_cid(1)]); - - // Create dummy receipt data - let receipt = Receipt::V4(fvm_shared4::receipt::Receipt { - exit_code: fvm_shared4::error::ExitCode::new(0), - return_data: fvm_ipld_encoding::RawBytes::default(), - gas_used: 100, - events_root: None, - }); - - let receipt_root = Amtv0::new_from_iter(&db, vec![receipt]).unwrap(); - - let state_output = StateOutput { - state_root: create_dummy_cid(2), - receipt_root, - events: Vec::new(), - events_roots: Vec::new(), - }; - - state_manager.update_cache_with_state_output(&tipset_key, &state_output); - - // Verify the receipt cache was updated - let cached_receipts = state_manager - .receipt_event_cache_handler - .get_receipts(&tipset_key); - assert!(cached_receipts.is_some()); - let receipts = cached_receipts.unwrap(); - assert_eq!(receipts.len(), 1); -} - -#[test] -fn test_update_receipt_and_events_cache_receipts_failure() { - let TestChainSetup { state_manager, .. } = setup_chain_with_tipsets(); - let tipset_key = TipsetKey::from(nunny::vec![create_dummy_cid(1)]); - let receipt_root = create_dummy_cid(3); - - let state_output = StateOutput { - state_root: create_dummy_cid(2), - receipt_root, - events: Vec::new(), - events_roots: Vec::new(), - }; - - state_manager.update_cache_with_state_output(&tipset_key, &state_output); - - assert!( - state_manager - .receipt_event_cache_handler - .get_receipts(&tipset_key) - .is_none() - ); -} #[test] fn test_state_output_get_size() { - let s = StateOutputValue::default(); + let s = StateOutput::default(); assert_eq!(s.get_size(), std::mem::size_of_val(&s)); } diff --git a/src/state_manager/utils.rs b/src/state_manager/utils.rs index d295aedb08bd..a812507014a5 100644 --- a/src/state_manager/utils.rs +++ b/src/state_manager/utils.rs @@ -313,7 +313,6 @@ pub mod state_compute { let StateOutput { state_root, receipt_root, - .. } = state_manager .compute_tipset_state(ts, crate::state_manager::NO_CALLBACK, VMTrace::NotTraced) .await?; diff --git a/src/utils/get_size/mod.rs b/src/utils/get_size/mod.rs index 4adb98764962..6c35b3f2714a 100644 --- a/src/utils/get_size/mod.rs +++ b/src/utils/get_size/mod.rs @@ -35,14 +35,6 @@ macro_rules! impl_vec_alike_heap_size_helper { }; } -pub fn vec_with_stack_only_item_heap_size_helper(v: &Vec) -> usize { - v.capacity() * std::mem::size_of::() -} - -pub fn vec_heap_size_helper(v: &Vec) -> usize { - impl_vec_alike_heap_size_helper!(v, T) -} - pub fn vec_heap_size_with_fn_helper(v: &Vec, get_heap_size: impl Fn(&T) -> usize) -> usize { impl_vec_alike_heap_size_with_fn_helper!(v, T, std::mem::size_of::, get_heap_size) }