Skip to content

Commit d3a0090

Browse files
mattsseampcode-com
andcommitted
refactor(engine): improve payload processor tx iterator
- Add transaction_count to ExecutionEnv, removing redundant parameter passing - Use rayon::spawn instead of spawn_blocking for parallel tx conversion - Remove transaction_count_hint parameter from spawn_caching_with and PrewarmCacheTask Amp-Thread-ID: https://ampcode.com/threads/T-019c157e-9967-76f6-bfeb-25f84b7b72b9 Co-authored-by: Amp <[email protected]>
1 parent 9127563 commit d3a0090

File tree

3 files changed

+16
-22
lines changed

3 files changed

+16
-22
lines changed

crates/engine/tree/src/tree/payload_processor/mod.rs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -235,8 +235,7 @@ where
235235
+ 'static,
236236
{
237237
// start preparing transactions immediately
238-
let (prewarm_rx, execution_rx, transaction_count_hint) =
239-
self.spawn_tx_iterator(transactions);
238+
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
240239

241240
let span = Span::current();
242241
let (to_sparse_trie, sparse_trie_rx) = channel();
@@ -260,7 +259,6 @@ where
260259
self.spawn_caching_with(
261260
env,
262261
prewarm_rx,
263-
transaction_count_hint,
264262
provider_builder.clone(),
265263
None, // Don't send proof targets when BAL is present
266264
Some(bal),
@@ -271,7 +269,6 @@ where
271269
self.spawn_caching_with(
272270
env,
273271
prewarm_rx,
274-
transaction_count_hint,
275272
provider_builder.clone(),
276273
Some(to_multi_proof.clone()),
277274
None,
@@ -355,10 +352,10 @@ where
355352
where
356353
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
357354
{
358-
let (prewarm_rx, execution_rx, size_hint) = self.spawn_tx_iterator(transactions);
355+
let (prewarm_rx, execution_rx) = self.spawn_tx_iterator(transactions);
359356
// This path doesn't use multiproof, so V2 proofs flag doesn't matter
360357
let prewarm_handle =
361-
self.spawn_caching_with(env, prewarm_rx, size_hint, provider_builder, None, bal, false);
358+
self.spawn_caching_with(env, prewarm_rx, provider_builder, None, bal, false);
362359
PayloadHandle {
363360
to_multi_proof: None,
364361
prewarm_handle,
@@ -376,18 +373,16 @@ where
376373
) -> (
377374
mpsc::Receiver<WithTxEnv<TxEnvFor<Evm>, I::Recovered>>,
378375
mpsc::Receiver<Result<WithTxEnv<TxEnvFor<Evm>, I::Recovered>, I::Error>>,
379-
usize,
380376
) {
381377
let (transactions, convert) = transactions.into();
382378
let transactions = transactions.into_par_iter();
383-
let transaction_count_hint = transactions.len();
384379

385380
let (ooo_tx, ooo_rx) = mpsc::channel();
386381
let (prewarm_tx, prewarm_rx) = mpsc::channel();
387382
let (execute_tx, execute_rx) = mpsc::channel();
388383

389384
// Spawn a task that `convert`s all transactions in parallel and sends them out-of-order.
390-
self.executor.spawn_blocking(move || {
385+
rayon::spawn(move || {
391386
transactions.enumerate().for_each_with(ooo_tx, |ooo_tx, (idx, tx)| {
392387
let tx = convert(tx);
393388
let tx = tx.map(|tx| {
@@ -424,16 +419,14 @@ where
424419
}
425420
});
426421

427-
(prewarm_rx, execute_rx, transaction_count_hint)
422+
(prewarm_rx, execute_rx)
428423
}
429424

430425
/// Spawn prewarming optionally wired to the multiproof task for target updates.
431-
#[expect(clippy::too_many_arguments)]
432426
fn spawn_caching_with<P>(
433427
&self,
434428
env: ExecutionEnv<Evm>,
435429
mut transactions: mpsc::Receiver<impl ExecutableTxFor<Evm> + Clone + Send + 'static>,
436-
transaction_count_hint: usize,
437430
provider_builder: StateProviderBuilder<N, P>,
438431
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
439432
bal: Option<Arc<BlockAccessList>>,
@@ -468,7 +461,6 @@ where
468461
self.execution_cache.clone(),
469462
prewarm_ctx,
470463
to_multi_proof,
471-
transaction_count_hint,
472464
self.prewarm_max_concurrency,
473465
);
474466

@@ -961,6 +953,10 @@ pub struct ExecutionEnv<Evm: ConfigureEvm> {
961953
/// Used for sparse trie continuation: if the preserved trie's anchor matches this,
962954
/// the trie can be reused directly.
963955
pub parent_state_root: B256,
956+
/// Number of transactions in the block.
957+
/// Used to determine parallel worker count for prewarming.
958+
/// A value of 0 indicates the count is unknown.
959+
pub transaction_count: usize,
964960
}
965961

966962
impl<Evm: ConfigureEvm> Default for ExecutionEnv<Evm>
@@ -973,6 +969,7 @@ where
973969
hash: Default::default(),
974970
parent_hash: Default::default(),
975971
parent_state_root: Default::default(),
972+
transaction_count: 0,
976973
}
977974
}
978975
}

crates/engine/tree/src/tree/payload_processor/prewarm.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,6 @@ where
8484
ctx: PrewarmContext<N, P, Evm>,
8585
/// How many transactions should be executed in parallel
8686
max_concurrency: usize,
87-
/// The number of transactions to be processed
88-
transaction_count_hint: usize,
8987
/// Sender to emit evm state outcome messages, if any.
9088
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
9189
/// Receiver for events produced by tx execution
@@ -106,15 +104,14 @@ where
106104
execution_cache: PayloadExecutionCache,
107105
ctx: PrewarmContext<N, P, Evm>,
108106
to_multi_proof: Option<CrossbeamSender<MultiProofMessage>>,
109-
transaction_count_hint: usize,
110107
max_concurrency: usize,
111108
) -> (Self, Sender<PrewarmTaskEvent<N::Receipt>>) {
112109
let (actions_tx, actions_rx) = channel();
113110

114111
trace!(
115112
target: "engine::tree::payload_processor::prewarm",
116113
max_concurrency,
117-
transaction_count_hint,
114+
transaction_count = ctx.env.transaction_count,
118115
"Initialized prewarm task"
119116
);
120117

@@ -124,7 +121,6 @@ where
124121
execution_cache,
125122
ctx,
126123
max_concurrency,
127-
transaction_count_hint,
128124
to_multi_proof,
129125
actions_rx,
130126
parent_span: Span::current(),
@@ -148,21 +144,21 @@ where
148144
let executor = self.executor.clone();
149145
let ctx = self.ctx.clone();
150146
let max_concurrency = self.max_concurrency;
151-
let transaction_count_hint = self.transaction_count_hint;
152147
let span = Span::current();
153148

154149
self.executor.spawn_blocking(move || {
155150
let _enter = debug_span!(target: "engine::tree::payload_processor::prewarm", parent: span, "spawn_all").entered();
156151

157152
let (done_tx, done_rx) = mpsc::channel();
158153

159-
// When transaction_count_hint is 0, it means the count is unknown. In this case, spawn
154+
// When transaction_count is 0, it means the count is unknown. In this case, spawn
160155
// max workers to handle potentially many transactions in parallel rather
161156
// than bottlenecking on a single worker.
162-
let workers_needed = if transaction_count_hint == 0 {
157+
let transaction_count = ctx.env.transaction_count;
158+
let workers_needed = if transaction_count == 0 {
163159
max_concurrency
164160
} else {
165-
transaction_count_hint.min(max_concurrency)
161+
transaction_count.min(max_concurrency)
166162
};
167163

168164
// Spawn workers

crates/engine/tree/src/tree/payload_validator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ where
407407
hash: input.hash(),
408408
parent_hash: input.parent_hash(),
409409
parent_state_root: parent_block.state_root(),
410+
transaction_count: input.transaction_count(),
410411
};
411412

412413
// Plan the strategy used for state root computation.

0 commit comments

Comments
 (0)