Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions crates/engine/tree/src/tree/payload_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ where
+ 'static,
{
// start preparing transactions immediately
let (prewarm_rx, execution_rx, transaction_count_hint) =
self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);

let span = Span::current();
let (to_sparse_trie, sparse_trie_rx) = channel();
Expand All @@ -260,7 +259,6 @@ where
self.spawn_caching_with(
env,
prewarm_rx,
transaction_count_hint,
provider_builder.clone(),
None, // Don't send proof targets when BAL is present
Some(bal),
Expand All @@ -271,7 +269,6 @@ where
self.spawn_caching_with(
env,
prewarm_rx,
transaction_count_hint,
provider_builder.clone(),
Some(to_multi_proof.clone()),
None,
Expand Down Expand Up @@ -355,10 +352,10 @@ where
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
{
let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
let prewarm_handle =
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false);
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
PayloadHandle {
to_multi_proof: None,
prewarm_handle,
Expand All @@ -376,19 +373,15 @@ where
) -> (
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
usize,
) {
let (transactions, convert) = transactions.into();
let transactions = transactions.into_par_iter();
let transaction_count_hint = transactions.len();

let (ooo_tx, ooo_rx) = mpsc::channel();
let (prewarm_tx, prewarm_rx) = mpsc::channel();
let (execute_tx, execute_rx) = mpsc::channel();

// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
self.executor.spawn_blocking(move || {
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
rayon::spawn(move || {
let (transactions, convert) = transactions.into();
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
let tx = convert(tx);
let tx = tx.map(|tx| {
let (tx_env, tx) = tx.into_parts();
Expand Down Expand Up @@ -424,16 +417,14 @@ where
}
});

(prewarm_rx, execute_rx, transaction_count_hint)
(prewarm_rx, execute_rx)
}

/// Spawn prewarming optionally wired to the multiproof task for target updates.
#[expect(clippy::too_many_arguments)]
fn spawn_caching_with<P>(
&self,
env: ExecutionEnv<Evm>,
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
transaction_count_hint: usize,
provider_builder: StateProviderBuilder<N, P>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
bal: Option<Arc<BlockAccessList>>,
Expand Down Expand Up @@ -468,7 +459,6 @@ where
self.execution_cache.clone(),
prewarm_ctx,
to_multi_proof,
transaction_count_hint,
self.prewarm_max_concurrency,
);

Expand Down Expand Up @@ -961,6 +951,10 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
/// Used for sparse trie continuation: if the preserved trie's anchor matches this,
/// the trie can be reused directly.
pub parent_state_root: B256,
/// Number of transactions in the block.
/// Used to determine parallel worker count for prewarming.
/// A value of 0 indicates the count is unknown.
pub transaction_count: usize,
}

impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
Expand All @@ -973,6 +967,7 @@ where
hash: Default::default(),
parent_hash: Default::default(),
parent_state_root: Default::default(),
transaction_count: 0,
}
}
}
Expand Down
14 changes: 5 additions & 9 deletions crates/engine/tree/src/tree/payload_processor/prewarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,6 @@ where
ctx: PrewarmContext<N, P, Evm>,
/// How many transactions should be executed in parallel
max_concurrency: usize,
/// The number of transactions to be processed
transaction_count_hint: usize,
/// Sender to emit evm state outcome messages, if any.
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
/// Receiver for events produced by tx execution
Expand All @@ -106,15 +104,14 @@ where
execution_cache: PayloadExecutionCache,
ctx: PrewarmContext<N, P, Evm>,
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
transaction_count_hint: usize,
max_concurrency: usize,
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
let (actions_tx, actions_rx) = channel();

trace!(
target: "engine::tree::payload_processor::prewarm",
max_concurrency,
transaction_count_hint,
transaction_count = ctx.env.transaction_count,
"Initialized prewarm task"
);

Expand All @@ -124,7 +121,6 @@ where
execution_cache,
ctx,
max_concurrency,
transaction_count_hint,
to_multi_proof,
actions_rx,
parent_span: Span::current(),
Expand All @@ -148,21 +144,21 @@ where
let executor = self.executor.clone();
let ctx = self.ctx.clone();
let max_concurrency = self.max_concurrency;
let transaction_count_hint = self.transaction_count_hint;
let span = Span::current();

self.executor.spawn_blocking(move || {
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();

let (done_tx, done_rx) = mpsc::channel();

// When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
// When transaction_count is 0, it means the count is unknown. In this case, spawn
// max workers to handle potentially many transactions in parallel rather
// than bottlenecking on a single worker.
let workers_needed = if transaction_count_hint == 0 {
let transaction_count = ctx.env.transaction_count;
let workers_needed = if transaction_count == 0 {
max_concurrency
} else {
transaction_count_hint.min(max_concurrency)
transaction_count.min(max_concurrency)
};

// Spawn workers
Expand Down
1 change: 1 addition & 0 deletions crates/engine/tree/src/tree/payload_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ where
hash: input.hash(),
parent_hash: input.parent_hash(),
parent_state_root: parent_block.state_root(),
transaction_count: input.transaction_count(),
};

// Plan the strategy used for state root computation.
Expand Down
Loading