diff --git a/crates/engine/tree/src/tree/payload_processor/mod.rs b/crates/engine/tree/src/tree/payload_processor/mod.rs index e517d2a83c1..f01c0cce0fe 100644 --- a/crates/engine/tree/src/tree/payload_processor/mod.rs +++ b/crates/engine/tree/src/tree/payload_processor/mod.rs @@ -235,8 +235,7 @@ where + 'static, { // start preparing transactions immediately - let (prewarm_rx, execution_rx, transaction_count_hint) = - self.spawn_tx_iterator(transactions); + let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions); let span = Span::current(); let (to_sparse_trie, sparse_trie_rx) = channel(); @@ -260,7 +259,6 @@ where self.spawn_caching_with( env, prewarm_rx, - transaction_count_hint, provider_builder.clone(), None, // Don't send proof targets when BAL is present Some(bal), @@ -271,7 +269,6 @@ where self.spawn_caching_with( env, prewarm_rx, - transaction_count_hint, provider_builder.clone(), Some(to_multi_proof.clone()), None, @@ -355,10 +352,10 @@ where where P: BlockReader + StateProviderFactory + StateReader + Clone + 'static, { - let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions); + let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions); // This path doesn't use multiproof, so V2 proofs flag doesn't matter let prewarm_handle = - self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false); + self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false); PayloadHandle { to_multi_proof: None, prewarm_handle, @@ -376,19 +373,15 @@ where ) -> ( mpsc::Receiver, I::Recovered>>, mpsc::Receiver, I::Recovered>, I::Error>>, - usize, ) { - let (transactions, convert) = transactions.into(); - let transactions = transactions.into_par_iter(); - let transaction_count_hint = transactions.len(); - let (ooo_tx, ooo_rx) = mpsc::channel(); let (prewarm_tx, prewarm_rx) = mpsc::channel(); let (execute_tx, execute_rx) = mpsc::channel(); // Spawn a task that `convert`s all transactions in parallel and sends them out-of-order. - self.executor.spawn_blocking(move || { - transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| { + rayon::spawn(move || { + let (transactions, convert) = transactions.into(); + transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| { let tx = convert(tx); let tx = tx.map(|tx| { let (tx_env, tx) = tx.into_parts(); @@ -424,16 +417,14 @@ where } }); - (prewarm_rx, execute_rx, transaction_count_hint) + (prewarm_rx, execute_rx) } /// Spawn prewarming optionally wired to the multiproof task for target updates. - #[expect(clippy::too_many_arguments)] fn spawn_caching_with

( &self, env: ExecutionEnv, mut transactions: mpsc::Receiver + Clone + Send + 'static>, - transaction_count_hint: usize, provider_builder: StateProviderBuilder, to_multi_proof: Option>, bal: Option>, @@ -468,7 +459,6 @@ where self.execution_cache.clone(), prewarm_ctx, to_multi_proof, - transaction_count_hint, self.prewarm_max_concurrency, ); @@ -961,6 +951,10 @@ pub struct ExecutionEnv { /// Used for sparse trie continuation: if the preserved trie's anchor matches this, /// the trie can be reused directly. pub parent_state_root: B256, + /// Number of transactions in the block. + /// Used to determine parallel worker count for prewarming. + /// A value of 0 indicates the count is unknown. + pub transaction_count: usize, } impl Default for ExecutionEnv @@ -973,6 +967,7 @@ where hash: Default::default(), parent_hash: Default::default(), parent_state_root: Default::default(), + transaction_count: 0, } } } diff --git a/crates/engine/tree/src/tree/payload_processor/prewarm.rs b/crates/engine/tree/src/tree/payload_processor/prewarm.rs index a7a15602979..4aa95c3c3b0 100644 --- a/crates/engine/tree/src/tree/payload_processor/prewarm.rs +++ b/crates/engine/tree/src/tree/payload_processor/prewarm.rs @@ -84,8 +84,6 @@ where ctx: PrewarmContext, /// How many transactions should be executed in parallel max_concurrency: usize, - /// The number of transactions to be processed - transaction_count_hint: usize, /// Sender to emit evm state outcome messages, if any. to_multi_proof: Option>, /// Receiver for events produced by tx execution @@ -106,7 +104,6 @@ where execution_cache: PayloadExecutionCache, ctx: PrewarmContext, to_multi_proof: Option>, - transaction_count_hint: usize, max_concurrency: usize, ) -> (Self, Sender>) { let (actions_tx, actions_rx) = channel(); @@ -114,7 +111,7 @@ where trace!( target: "engine::tree::payload_processor::prewarm", max_concurrency, - transaction_count_hint, + transaction_count = ctx.env.transaction_count, "Initialized prewarm task" ); @@ -124,7 +121,6 @@ where execution_cache, ctx, max_concurrency, - transaction_count_hint, to_multi_proof, actions_rx, parent_span: Span::current(), @@ -148,7 +144,6 @@ where let executor = self.executor.clone(); let ctx = self.ctx.clone(); let max_concurrency = self.max_concurrency; - let transaction_count_hint = self.transaction_count_hint; let span = Span::current(); self.executor.spawn_blocking(move || { @@ -156,13 +151,14 @@ where let (done_tx, done_rx) = mpsc::channel(); - // When transaction_count_hint is 0, it means the count is unknown. In this case, spawn + // When transaction_count is 0, it means the count is unknown. In this case, spawn // max workers to handle potentially many transactions in parallel rather // than bottlenecking on a single worker. - let workers_needed = if transaction_count_hint == 0 { + let transaction_count = ctx.env.transaction_count; + let workers_needed = if transaction_count == 0 { max_concurrency } else { - transaction_count_hint.min(max_concurrency) + transaction_count.min(max_concurrency) }; // Spawn workers diff --git a/crates/engine/tree/src/tree/payload_validator.rs b/crates/engine/tree/src/tree/payload_validator.rs index b1163054d2e..e559ee58afd 100644 --- a/crates/engine/tree/src/tree/payload_validator.rs +++ b/crates/engine/tree/src/tree/payload_validator.rs @@ -407,6 +407,7 @@ where hash: input.hash(), parent_hash: input.parent_hash(), parent_state_root: parent_block.state_root(), + transaction_count: input.transaction_count(), }; // Plan the strategy used for state root computation.