Skip to content

Commit 9d28a20

Browse files
mattsseampcode-com
andcommitted
refactor(engine): improve payload processor tx iterator
- Add transaction_count to ExecutionEnv, removing redundant parameter passing - Use rayon::spawn instead of spawn_blocking for parallel tx conversion - Remove transaction_count_hint parameter from spawn_caching_with and PrewarmCacheTask Co-authored-by: Amp <[email protected]> Amp-Thread-ID: https://ampcode.com/threads/T-019c157e-9967-76f6-bfeb-25f84b7b72b9
1 parent 9127563 commit 9d28a20

File tree

3 files changed

+18
-26
lines changed

3 files changed

+18
-26
lines changed

crates/engine/tree/src/tree/payload_processor/mod.rs

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -235,8 +235,7 @@ where
235235
+ 'static,
236236
{
237237
// start preparing transactions immediately
238-
let (prewarm_rx, execution_rx, transaction_count_hint) =
239-
self.spawn_tx_iterator(transactions);
238+
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
240239

241240
let span = Span::current();
242241
let (to_sparse_trie, sparse_trie_rx) = channel();
@@ -260,7 +259,6 @@ where
260259
self.spawn_caching_with(
261260
env,
262261
prewarm_rx,
263-
transaction_count_hint,
264262
provider_builder.clone(),
265263
None, // Don't send proof targets when BAL is present
266264
Some(bal),
@@ -271,7 +269,6 @@ where
271269
self.spawn_caching_with(
272270
env,
273271
prewarm_rx,
274-
transaction_count_hint,
275272
provider_builder.clone(),
276273
Some(to_multi_proof.clone()),
277274
None,
@@ -355,10 +352,10 @@ where
355352
where
356353
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
357354
{
358-
let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
355+
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
359356
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
360357
let prewarm_handle =
361-
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false);
358+
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
362359
PayloadHandle {
363360
to_multi_proof: None,
364361
prewarm_handle,
@@ -376,19 +373,15 @@ where
376373
) -> (
377374
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
378375
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
379-
usize,
380376
) {
381-
let (transactions, convert) = transactions.into();
382-
let transactions = transactions.into_par_iter();
383-
let transaction_count_hint = transactions.len();
384-
385377
let (ooo_tx, ooo_rx) = mpsc::channel();
386378
let (prewarm_tx, prewarm_rx) = mpsc::channel();
387379
let (execute_tx, execute_rx) = mpsc::channel();
388380

389381
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
390-
self.executor.spawn_blocking(move || {
391-
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
382+
rayon::spawn(move || {
383+
let (transactions, convert) = transactions.into();
384+
transactions.into_par_iter().enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
392385
let tx = convert(tx);
393386
let tx = tx.map(|tx| {
394387
let (tx_env, tx) = tx.into_parts();
@@ -424,16 +417,14 @@ where
424417
}
425418
});
426419

427-
(prewarm_rx, execute_rx, transaction_count_hint)
420+
(prewarm_rx, execute_rx)
428421
}
429422

430423
/// Spawn prewarming optionally wired to the multiproof task for target updates.
431-
#[expect(clippy::too_many_arguments)]
432424
fn spawn_caching_with<P>(
433425
&self,
434426
env: ExecutionEnv<Evm>,
435427
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
436-
transaction_count_hint: usize,
437428
provider_builder: StateProviderBuilder<N, P>,
438429
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
439430
bal: Option<Arc<BlockAccessList>>,
@@ -468,7 +459,6 @@ where
468459
self.execution_cache.clone(),
469460
prewarm_ctx,
470461
to_multi_proof,
471-
transaction_count_hint,
472462
self.prewarm_max_concurrency,
473463
);
474464

@@ -961,6 +951,10 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
961951
/// Used for sparse trie continuation: if the preserved trie's anchor matches this,
962952
/// the trie can be reused directly.
963953
pub parent_state_root: B256,
954+
/// Number of transactions in the block.
955+
/// Used to determine parallel worker count for prewarming.
956+
/// A value of 0 indicates the count is unknown.
957+
pub transaction_count: usize,
964958
}
965959

966960
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
@@ -973,6 +967,7 @@ where
973967
hash: Default::default(),
974968
parent_hash: Default::default(),
975969
parent_state_root: Default::default(),
970+
transaction_count: 0,
976971
}
977972
}
978973
}

crates/engine/tree/src/tree/payload_processor/prewarm.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,6 @@ where
8484
ctx: PrewarmContext<N, P, Evm>,
8585
/// How many transactions should be executed in parallel
8686
max_concurrency: usize,
87-
/// The number of transactions to be processed
88-
transaction_count_hint: usize,
8987
/// Sender to emit evm state outcome messages, if any.
9088
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
9189
/// Receiver for events produced by tx execution
@@ -106,15 +104,14 @@ where
106104
execution_cache: PayloadExecutionCache,
107105
ctx: PrewarmContext<N, P, Evm>,
108106
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
109-
transaction_count_hint: usize,
110107
max_concurrency: usize,
111108
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
112109
let (actions_tx, actions_rx) = channel();
113110

114111
trace!(
115112
target: "engine::tree::payload_processor::prewarm",
116113
max_concurrency,
117-
transaction_count_hint,
114+
transaction_count = ctx.env.transaction_count,
118115
"Initialized prewarm task"
119116
);
120117

@@ -124,7 +121,6 @@ where
124121
execution_cache,
125122
ctx,
126123
max_concurrency,
127-
transaction_count_hint,
128124
to_multi_proof,
129125
actions_rx,
130126
parent_span: Span::current(),
@@ -148,21 +144,21 @@ where
148144
let executor = self.executor.clone();
149145
let ctx = self.ctx.clone();
150146
let max_concurrency = self.max_concurrency;
151-
let transaction_count_hint = self.transaction_count_hint;
152147
let span = Span::current();
153148

154149
self.executor.spawn_blocking(move || {
155150
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
156151

157152
let (done_tx, done_rx) = mpsc::channel();
158153

159-
// When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
154+
// When transaction_count is 0, it means the count is unknown. In this case, spawn
160155
// max workers to handle potentially many transactions in parallel rather
161156
// than bottlenecking on a single worker.
162-
let workers_needed = if transaction_count_hint == 0 {
157+
let transaction_count = ctx.env.transaction_count;
158+
let workers_needed = if transaction_count == 0 {
163159
max_concurrency
164160
} else {
165-
transaction_count_hint.min(max_concurrency)
161+
transaction_count.min(max_concurrency)
166162
};
167163

168164
// Spawn workers

crates/engine/tree/src/tree/payload_validator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ where
407407
hash: input.hash(),
408408
parent_hash: input.parent_hash(),
409409
parent_state_root: parent_block.state_root(),
410+
transaction_count: input.transaction_count(),
410411
};
411412

412413
// Plan the strategy used for state root computation.

0 commit comments

Comments
 (0)