3 changes: 1 addition & 2 deletions crates/anvil-polkadot/Cargo.toml
@@ -163,5 +163,4 @@ op-alloy-rpc-types.workspace = true
 
 [features]
 default = []
-asm-keccak = ["alloy-primitives/asm-keccak"]
-forking-tests = []
+asm-keccak = ["alloy-primitives/asm-keccak"]
48 changes: 42 additions & 6 deletions crates/anvil-polkadot/src/api_server/server.rs
@@ -87,9 +87,13 @@ use sqlx::sqlite::SqlitePoolOptions;
 use std::{collections::BTreeSet, sync::Arc, time::Duration};
 use substrate_runtime::{Balance, constants::NATIVE_TO_ETH_RATIO};
 use subxt::{
-    Metadata as SubxtMetadata, OnlineClient, backend::rpc::RpcClient,
-    client::RuntimeVersion as SubxtRuntimeVersion, config::substrate::H256,
-    ext::subxt_rpcs::LegacyRpcMethods, utils::H160,
+    Metadata as SubxtMetadata, OnlineClient,
+    backend::rpc::RpcClient,
+    client::RuntimeVersion as SubxtRuntimeVersion,
+    config::substrate::H256,
+    dynamic::{Value as DynamicValue, tx as dynamic_tx},
+    ext::subxt_rpcs::LegacyRpcMethods,
+    utils::H160,
 };
 use subxt_signer::eth::Keypair;
 use tokio::try_join;
@@ -113,6 +117,12 @@ pub struct ApiServer {
     /// Tracks all active filters
     filters: Filters,
     hardcoded_chain_id: u64,
+    /// RPC methods for submitting transactions
+    rpc: LegacyRpcMethods<SrcChainConfig>,
+    /// Subxt OnlineClient for dynamic transaction building.
+    /// When forking, the metadata comes from the forked chain's WASM (loaded via lazy loading),
+    /// so pallet indices will be correct for the forked runtime.
+    api: OnlineClient<SrcChainConfig>,
 }
 
 /// Fetch the chain ID from the substrate chain.
@@ -140,7 +150,7 @@ impl ApiServer {
         let eth_rpc_client = create_revive_rpc_client(
             api.clone(),
             rpc_client.clone(),
-            rpc,
+            rpc.clone(),
             block_provider.clone(),
             substrate_service.spawn_handle.clone(),
             revive_rpc_block_limit,
@@ -176,6 +186,8 @@ impl ApiServer {
             instance_id: B256::random(),
             filters,
             hardcoded_chain_id: chain_id,
+            rpc,
+            api,
         })
     }

@@ -797,8 +809,32 @@ impl ApiServer {
 
     async fn send_raw_transaction(&self, transaction: Bytes) -> Result<H256> {
         let hash = H256(keccak_256(&transaction.0));
-        let call = subxt_client::tx().revive().eth_transact(transaction.0);
-        self.eth_rpc_client.submit(call).await?;
+
+        // Prefetch storage keys for the sender to speed up transaction validation.
+        // This is especially important when forking from a remote chain, as each storage
+        // read would otherwise require a separate RPC call. When not forking, this is a no-op.
+        if let Ok(signed_tx) = TransactionSigned::decode(&transaction.0)
+            && let Ok(sender) = recover_maybe_impersonated_address(&signed_tx)
+        {
+            self.backend.prefetch_eth_transaction_keys(sender);
+        }
+
+        // Use dynamic transaction building to ensure the correct pallet index is used.
+        // The metadata in self.api comes from the runtime's WASM (via runtime API call),
+        // which is the forked chain's WASM when forking. This ensures correct pallet indices.
+        let payload_value = DynamicValue::from_bytes(transaction.0.clone());
+        let tx_payload = dynamic_tx("Revive", "eth_transact", vec![payload_value]);
+
+        let ext = self.api.tx().create_unsigned(&tx_payload).map_err(|e| {
+            Error::InternalError(format!("Failed to create unsigned extrinsic: {e}"))
+        })?;
+
+        // Submit the extrinsic to the transaction pool
+        self.rpc
+            .author_submit_extrinsic(ext.encoded())
+            .await
+            .map_err(|e| Error::InternalError(format!("Failed to submit transaction: {e}")))?;
+
         Ok(hash)
     }

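For readers unfamiliar with subxt's two call-construction styles, here is a minimal standalone sketch (not part of this diff) of the distinction the comments above rely on: statically generated calls carry pallet and call indices fixed at codegen time, while dynamic calls resolve them by name from the metadata of the connected runtime, which is what keeps them correct against a forked chain whose pallet ordering differs.

use subxt::dynamic::{self, Value};

// Static style (the old code): pallet/call indices baked in at codegen time.
//     let call = subxt_client::tx().revive().eth_transact(raw_tx);
//
// Dynamic style (the new code): pallet and call are looked up by name in the
// live metadata when the extrinsic is built, so the encoded indices always
// match the connected, possibly forked, runtime.
fn build_eth_transact_payload(raw_tx: Vec<u8>) {
    let _payload = dynamic::tx("Revive", "eth_transact", vec![Value::from_bytes(raw_tx)]);
}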
@@ -156,10 +156,10 @@ impl<Block: BlockT + DeserializeOwned> Blockchain<Block> {
             storage.genesis_hash = hash;
         }
 
-        // Update leaves for non-genesis blocks
-        if storage.blocks.len() > 1 {
-            storage.leaves.import(hash, number, *header.parent_hash());
-        }
+        // Update leaves for all blocks including genesis.
+        // For genesis when forking, the parent_hash points to the previous block on the remote chain.
+        // That parent won't be in our leaf set, so this effectively adds genesis as a new leaf.
+        storage.leaves.import(hash, number, *header.parent_hash());
 
         // Finalize block only if explicitly requested via new_state
         if let NewBlockState::Final = new_state {
@@ -266,25 +266,43 @@ impl<Block: BlockT + DeserializeOwned> HeaderBackend<Block> for Blockchain<Block
 
         // If not found in local storage, fetch from RPC client
         let header = if let Some(rpc) = self.rpc() {
-            rpc.block(Some(hash)).ok().flatten().map(|full| {
-                let block = full.block.clone();
-                self.storage
-                    .write()
-                    .blocks
-                    .insert(hash, StoredBlock::Full(block.clone(), full.justifications));
-                block.header().clone()
-            })
+            match rpc.block(Some(hash)) {
+                Ok(Some(full)) => {
+                    let block = full.block.clone();
+                    self.storage
+                        .write()
+                        .blocks
+                        .insert(hash, StoredBlock::Full(block.clone(), full.justifications));
+                    Some(block.header().clone())
+                }
+                Ok(None) => {
+                    // Block not found on remote chain - this is expected for locally-built blocks
+                    tracing::debug!(
+                        target: LAZY_LOADING_LOG_TARGET,
+                        "Block {:?} not found in local storage or remote RPC",
+                        hash
+                    );
+                    None
+                }
+                Err(e) => {
+                    tracing::warn!(
+                        target: LAZY_LOADING_LOG_TARGET,
+                        "Failed to fetch block {:?} from RPC: {}",
+                        hash,
+                        e
+                    );
+                    None
+                }
+            }
         } else {
-            None
-        };
-
-        if header.is_none() {
-            tracing::warn!(
+            // No RPC configured - block simply doesn't exist locally
+            tracing::debug!(
                 target: LAZY_LOADING_LOG_TARGET,
-                "Expected block {:x?} to exist.",
-                &hash
+                "Block {:?} not found in local storage (no RPC configured)",
+                hash
             );
-        }
+            None
+        };
 
         Ok(header)
     }
@@ -418,19 +436,34 @@ impl<Block: BlockT + DeserializeOwned> sp_blockchain::Backend<Block> for Blockch
         Ok(leaves)
     }
 
-    fn children(&self, _parent_hash: Block::Hash) -> sp_blockchain::Result<Vec<Block::Hash>> {
-        unimplemented!("Not supported by the `lazy-loading` backend.")
+    fn children(&self, parent_hash: Block::Hash) -> sp_blockchain::Result<Vec<Block::Hash>> {
+        // Find all blocks whose parent_hash matches the given hash
+        let storage = self.storage.read();
+        let children: Vec<Block::Hash> = storage
+            .blocks
+            .iter()
+            .filter_map(|(hash, block)| {
+                if *block.header().parent_hash() == parent_hash {
+                    Some(*hash)
+                } else {
+                    None
+                }
+            })
+            .collect();
+        Ok(children)
     }
 
     fn indexed_transaction(&self, _hash: Block::Hash) -> sp_blockchain::Result<Option<Vec<u8>>> {
-        unimplemented!("Not supported by the `lazy-loading` backend.")
+        // Indexed transactions are not supported in the lazy-loading backend
+        Ok(None)
     }
 
     fn block_indexed_body(
         &self,
         _hash: Block::Hash,
     ) -> sp_blockchain::Result<Option<Vec<Vec<u8>>>> {
-        unimplemented!("Not supported by the `lazy-loading` backend.")
+        // Indexed block bodies are not supported in the lazy-loading backend
+        Ok(None)
     }
 }

@@ -9,8 +9,8 @@ use polkadot_sdk::{
         traits::{Block as BlockT, HashingFor},
     },
     sp_state_machine::{
-        self, BackendTransaction, InMemoryBackend, IterArgs, StorageCollection, StorageValue,
-        TrieBackend, backend::AsTrieBackend,
+        self, Backend as StateMachineBackend, BackendTransaction, InMemoryBackend, IterArgs,
+        StorageCollection, StorageValue, TrieBackend, backend::AsTrieBackend,
     },
     sp_storage::ChildInfo,
     sp_trie::{self, PrefixedMemoryDB},
@@ -99,6 +99,63 @@ impl<Block: BlockT + DeserializeOwned> ForkedLazyBackend<Block> {
     pub(crate) fn rpc(&self) -> Option<&dyn RPCClient<Block>> {
         self.rpc_client.as_deref()
     }
+
+    /// Prefetch multiple storage keys in a single RPC batch call.
+    /// This significantly reduces latency when we know which keys will be needed.
+    /// Keys that are already cached or marked as removed will be skipped.
+    /// Returns the number of keys actually fetched from remote.
+    pub fn prefetch_keys(&self, keys: &[Vec<u8>]) -> usize {
+        if keys.is_empty() {
+            return 0;
+        }
+
+        let rpc = match self.rpc() {
+            Some(rpc) => rpc,
+            None => return 0,
+        };
+
+        // Filter out keys that are already cached or removed
+        let db = self.db.read();
+        let removed = self.removed_keys.read();
+
+        let keys_to_fetch: Vec<polkadot_sdk::sp_storage::StorageKey> = keys
+            .iter()
+            .filter(|key| {
+                // Skip if already in cache
+                if StateMachineBackend::storage(&*db, key).ok().flatten().is_some() {
+                    return false;
+                }
+                // Skip if marked as removed
+                if removed.contains(*key) {
+                    return false;
+                }
+                true
+            })
+            .map(|key| polkadot_sdk::sp_storage::StorageKey(key.clone()))
+            .collect();
+
+        drop(db);
+        drop(removed);
+
+        if keys_to_fetch.is_empty() {
+            return 0;
+        }
+
+        let fetch_count = keys_to_fetch.len();
+
+        // Use the batch RPC call
+        let block_to_query = if self.before_fork { self.block_hash } else { self.fork_block };
+
+        match rpc.storage_batch(keys_to_fetch, block_to_query) {
+            Ok(results) => {
+                for (key, value) in results {
+                    self.update_storage(&key.0, &value.map(|v| v.0));
+                }
+                fetch_count
+            }
+            Err(_) => 0,
+        }
+    }
 }
 
 impl<Block: BlockT + DeserializeOwned> sp_state_machine::Backend<HashingFor<Block>>
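As a usage sketch for the new batch path (hypothetical caller; the twox-128 pallet/item key scheme is standard FRAME layout, assumed here rather than taken from this diff):

use polkadot_sdk::sp_core::hashing::twox_128;

/// Raw key of a plain (non-map) storage item: twox128(pallet) ++ twox128(item).
fn plain_storage_key(pallet: &[u8], item: &[u8]) -> Vec<u8> {
    let mut key = twox_128(pallet).to_vec();
    key.extend(twox_128(item));
    key
}

// Given a ForkedLazyBackend `state`, one batched round trip warms the cache;
// keys already in `db` or recorded in `removed_keys` never hit the network:
//     let fetched = state.prefetch_keys(&[
//         plain_storage_key(b"System", b"Number"),
//         plain_storage_key(b"Timestamp", b"Now"),
//     ]);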
@@ -12,8 +12,8 @@ mod tests;
 use parking_lot::RwLock;
 use polkadot_sdk::{
     sc_client_api::{
-        TrieCacheContext, UsageInfo,
-        backend::{self, AuxStore},
+        HeaderBackend, TrieCacheContext, UsageInfo,
+        backend::{self, AuxStore, Backend as ClientBackend},
     },
     sp_blockchain,
     sp_core::{H256, offchain::storage::InMemOffchainStorage},
@@ -59,6 +59,22 @@ impl<Block: BlockT + DeserializeOwned> Backend<Block> {
     fn fork_checkpoint(&self) -> Option<&Block::Header> {
         self.fork_config.as_ref().map(|(_, checkpoint)| checkpoint)
     }
+
+    /// Prefetch multiple storage keys in a single batch RPC call.
+    /// This significantly reduces latency when we know which keys will be needed
+    /// (e.g., before transaction validation).
+    /// Returns the number of keys actually fetched from remote.
+    pub fn prefetch_storage_keys(&self, keys: &[Vec<u8>]) -> usize {
+        // Get the best block hash to find the current state
+        let best_hash = HeaderBackend::info(&self.blockchain).best_hash;
+
+        // Try to get the state for the best block
+        if let Ok(state) = ClientBackend::state_at(self, best_hash, TrieCacheContext::Trusted) {
+            state.prefetch_keys(keys)
+        } else {
+            0
+        }
+    }
 }
 
 impl<Block: BlockT + DeserializeOwned> AuxStore for Backend<Block> {
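server.rs calls `prefetch_eth_transaction_keys`, which this diff does not show; a plausible shape for the key derivation it needs, assuming the sender's data sits in the standard `System::Account` map with a `blake2_128_concat` hasher, is sketched below.

use polkadot_sdk::sp_core::hashing::{blake2_128, twox_128};

/// Hypothetical sketch: full storage key of System::Account for one account,
/// i.e. twox128("System") ++ twox128("Account") ++ blake2_128(id) ++ id
/// (blake2_128_concat is the hash followed by the unhashed key).
fn system_account_key(account_id: &[u8; 32]) -> Vec<u8> {
    let mut key = twox_128(b"System").to_vec();
    key.extend(twox_128(b"Account"));
    key.extend(blake2_128(account_id));
    key.extend_from_slice(account_id);
    key
}

// A real helper would map the recovered H160 sender to its account id, derive
// keys like this one, and pass them to Backend::prefetch_storage_keys.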
@@ -2,7 +2,7 @@ use super::*;
 use mock_rpc::{Rpc, TestBlock, TestHeader};
 use parking_lot::RwLock;
 use polkadot_sdk::{
-    sc_client_api::{Backend as BackendT, HeaderBackend, StateBackend},
+    sc_client_api::{Backend as BackendT, StateBackend},
     sp_runtime::{
         OpaqueExtrinsic,
         traits::{BlakeTwo256, Header as HeaderT},
@@ -276,6 +276,22 @@ mod mock_rpc {
             let take = min(filtered.len(), count as usize);
             Ok(filtered.into_iter().take(take).map(|k| k.0).collect())
         }
+
+        fn storage_batch(
+            &self,
+            keys: Vec<StorageKey>,
+            at: Option<Block::Hash>,
+        ) -> Result<Vec<(StorageKey, Option<StorageData>)>, jsonrpsee::core::ClientError> {
+            // Simple implementation: just call storage for each key
+            let results = keys
+                .into_iter()
+                .map(|key| {
+                    let value = self.storage(key.clone(), at).ok().flatten();
+                    (key, value)
+                })
+                .collect();
+            Ok(results)
+        }
     }
 }

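A sketch of how a test might exercise this mock (`build_seeded_mock` is a hypothetical setup helper standing in for whatever fixtures the test module actually uses):

use polkadot_sdk::sp_storage::StorageKey;

#[test]
fn storage_batch_preserves_order_and_reports_missing_keys() {
    // Hypothetical setup: a mock Rpc seeded with one key, plus an existing block hash.
    let (rpc, hash) = build_seeded_mock();
    let keys = vec![StorageKey(b"seeded-key".to_vec()), StorageKey(b"absent-key".to_vec())];
    let results = rpc.storage_batch(keys.clone(), Some(hash)).unwrap();
    assert_eq!(results.len(), keys.len());
    assert_eq!(results[0].0, keys[0]); // request order preserved
    assert!(results[1].1.is_none()); // unseeded key yields None
}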