Commit 97ff85c

execution: make block read aheader non-global (#21056)
# execution: make block read aheader non-global

## Summary

Replaces the package-level `globalReadAheader` in `execution/exec/blocks_read_ahead.go` with a per-`Ethereum` instance. Each backend now owns its own `*BlockReadAheader` (caches + warmup `WaitGroup`) and threads it through to the consumers that need it.

## Why

The `globalReadAheader` is a process-wide singleton. The exec module's `ValidateChain` calls `AddHeaderAndBody`, which spawns a fire-and-forget warmup goroutine tracked on a shared `sync.WaitGroup`. The eth backend's `Stop()` waits on that same WaitGroup before closing the chain DB. With **multiple `Ethereum` instances in one process** (e.g. the new `evm enginextest` runner that creates ~36k testers in a single run, or any other harness that spins up many backends), `Add(1)` from one tester races with `Wait()` returning in another. Once the counter momentarily hits zero, a concurrent `Add(1)` panics:

```
panic: sync: WaitGroup is reused before previous Wait has returned
```

This was reproducible at `--workers ≥ 16` running `evm enginextest` against the full EEST `blockchain_tests_engine_x` set on tmpfs. Single-Erigon-per-process production deployments never trigger it, which is why it has been latent.

## Approach

Make the read-aheader a regular struct owned by each `Ethereum`. Its lifetime matches the eth backend's, the WaitGroup never escapes the backend, and the race vanishes.

## Files (13 changed, +69 / −58)

- `execution/exec/blocks_read_ahead.go` — exports `BlockReadAheader` and `NewBlockReadAheader()`. Methods replace the package-level `*Global*` wrappers (`AddHeaderAndBodyToGlobalReadAheader`, `AddSendersToGlobalReadAheader`, `ReadBodyWithTransactionsFromGlobalReadAheader`, `ReadBlockWithSendersFromGlobalReadAheader`, `WaitForWarmup`). The unused `WarmBodyFromGlobalReadAheader` is dropped.
- `node/eth/backend.go` — `Ethereum.readAheader` field, initialized in `New` to `exec.NewBlockReadAheader()`, used by `Stop()` and threaded into the staged-sync configs and `NewExecModule`.
- `execution/execmodule/exec_module.go` — `ExecModule.readAheader` field; new arg on `NewExecModule`. `ValidateChain` now calls `e.readAheader.AddHeaderAndBody(...)`.
- `execution/stagedsync/stage_senders.go` (+ `stage_senders_test.go`) — `SendersCfg.readAheader`; new arg on `StageSendersCfg`; calls `cfg.readAheader.{AddSenders, ReadBodyWithTransactions}`.
- `execution/stagedsync/stage_execute.go` — `ExecuteBlockCfg.readAheader`; new arg on `StageExecuteBlocksCfg`.
- `execution/stagedsync/exec3.go`, `exec3_serial.go` — call `te.cfg.readAheader.ReadBlockWithSenders(...)` / `se.cfg.readAheader.ReadBlockWithSenders(...)`.
- `execution/stagedsync/stage_witness.go` — passes a fresh `exec.NewBlockReadAheader()` (rewind-only path, doesn't share with the live backend).
- `execution/stagedsync/stageloop/stageloop.go` — `NewDefaultStages` / `NewPipelineStages` / `NewInMemoryExecution` accept `readAheader` and plumb it into the senders + execute stage configs.
- `execution/execmodule/execmoduletester/exec_module_tester.go` — constructs one `readAheader` and reuses it across the tester's stage configs and `NewExecModule` call (mirroring how `Ethereum` shares a single instance).
- `cmd/integration/commands/stages.go`, `state_stages.go` — pass fresh `exec.NewBlockReadAheader()` instances at the integration-CLI call sites (each invocation is its own short-lived process).

No runtime semantics change inside the read-aheader itself: same caches, same one-warmup-at-a-time `atomic.Bool`, same `WaitForWarmup` behaviour. The only change is that the WaitGroup is per-instance rather than process-wide.

## Verification

- `make lint` clean.
- **`evm enginextest`** against the full EEST `blockchain_tests_engine_x` set (`fixtures_develop` v5.4.0), `--workers 8` on `TMPDIR=/dev/shm`:

  | metric | value |
  |--------|-------|
  | Tests passed | **63920 / 63920** |
  | Wall time | 5:10 |
  | Peak RSS | 5.0 GB |

  No `WaitGroup` panic at any worker count tested (8, 16, 24).
- **`make test-all`** with `ERIGON_EXECUTION_TESTS_TMPDIR=/mnt/erigon-ramdisk GOGC=80`: **222 packages green, 0 failures.** All packages this PR touches pass directly:
  - `execution/exec` (0.10s)
  - `execution/execmodule` (0.83s), `execmoduletester` (0.29s)
  - `execution/stagedsync` (7.18s), `stagedsync/bodydownload`, `stagedsync/headerdownload`
  - `execution/tests` (212.89s, contains the previously-flaky `TestInvalidReceiptHashHighMgas`)
- Leak-loop tooling test (build tag `leak`, in the engine-x test runner package) shows the goroutine count flat at 3 across 15 iterations — the read-aheader's warmup goroutine terminates cleanly per Close.

## Notes

- The integration commands (`cmd/integration/commands/stages.go`, `state_stages.go`) and the `RewindStagesForWitness` helper construct fresh `exec.NewBlockReadAheader()` instances rather than threading one through. Each is a one-shot, short-lived process where the cache wouldn't carry value across calls. If we ever want shared caching there, those sites have a clear seam for it.
- Production single-instance Erigon behaviour is unchanged: same caches, same warmup semantics, same `Stop()` wait, same `dbg.ReadAhead` gate inside `warmBody`.
1 parent 5c577df commit 97ff85c

13 files changed

Lines changed: 68 additions & 58 deletions

cmd/integration/commands/stages.go

Lines changed: 3 additions & 3 deletions
```diff
@@ -606,7 +606,7 @@ func stageSenders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) er
 		return err
 	}
 
-	cfg := stagedsync.StageSendersCfg(chainConfig, sync.Cfg(), false /* badBlockHalt */, tmpdir, pm, br, nil /* hd */)
+	cfg := stagedsync.StageSendersCfg(chainConfig, sync.Cfg(), false /* badBlockHalt */, tmpdir, pm, br, nil /* hd */, exec.NewBlockReadAheader())
 	if unwind > 0 {
 		if unwind > s.BlockNumber {
 			return errors.New("cannot unwind past 0")
@@ -680,7 +680,7 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error
 	cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications,
 		/*stateStream=*/ false,
 		/*badBlockHalt=*/ true,
-		dirs, br, nil, genesis, syncCfg, false /*experimentalBAL*/)
+		dirs, br, nil, genesis, syncCfg, false /*experimentalBAL*/, exec.NewBlockReadAheader())
 
 	if unwind > 0 {
 		if err := db.ViewTemporal(ctx, func(tx kv.TemporalTx) error {
@@ -1258,7 +1258,7 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, builderConfig *buildercfg.
 	}
 	notifications := shards.NewNotifications(nil)
 	blockRetire := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, db, heimdallStore, bridgeStore, chainConfig, &cfg, notifications.Events, blockSnapBuildSema, logger)
-	stageList := stageloop.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, nil, nil)
+	stageList := stageloop.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, nil, nil, exec.NewBlockReadAheader())
 	sync := stagedsync.New(cfg.Sync, stageList, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ModeApplyingBlocks)
 	return blockRetire, engine, vmConfig, sync
 }
```

cmd/integration/commands/state_stages.go

Lines changed: 3 additions & 2 deletions
```diff
@@ -37,6 +37,7 @@ import (
 	"github.com/erigontech/erigon/db/state/execctx"
 	"github.com/erigontech/erigon/execution/builder/buildercfg"
 	chainspec "github.com/erigontech/erigon/execution/chain/spec"
+	"github.com/erigontech/erigon/execution/exec"
 	"github.com/erigontech/erigon/execution/stagedsync"
 	"github.com/erigontech/erigon/execution/stagedsync/stages"
 	"github.com/erigontech/erigon/execution/tracing/tracers/logger"
@@ -183,7 +184,7 @@ func syncBySmallSteps(db kv.TemporalRwDB, builderConfig buildercfg.BuilderConfig
 	}
 
 	br, _ := blocksIO(db, logger1)
-	execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications, false, true, dirs, br, nil, spec.Genesis, syncCfg, false)
+	execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications, false, true, dirs, br, nil, spec.Genesis, syncCfg, false, exec.NewBlockReadAheader())
 
 	execUntilFunc := func(execToBlock uint64) stagedsync.ExecFunc {
 		return func(badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, doms *execctx.SharedDomains, rwTx kv.TemporalRwTx, logger log.Logger) error {
@@ -369,7 +370,7 @@ func loopExec(db kv.TemporalRwDB, ctx context.Context, unwind uint64, logger log
 	initialCycle := false
 	br, _ := blocksIO(db, logger)
 	notifications := shards.NewNotifications(nil)
-	cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications, false, true, dirs, br, nil, spec.Genesis, syncCfg, false)
+	cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications, false, true, dirs, br, nil, spec.Genesis, syncCfg, false, exec.NewBlockReadAheader())
 
 	// set block limit of execute stage
 	sync.MockExecFunc(stages.Execution, func(badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, sd *execctx.SharedDomains, tx kv.TemporalRwTx, logger log.Logger) error {
```

execution/exec/blocks_read_ahead.go

Lines changed: 14 additions & 28 deletions
```diff
@@ -22,7 +22,7 @@ import (
 	"github.com/erigontech/erigon/execution/types/accounts"
 )
 
-type blockReadAheader struct {
+type BlockReadAheader struct {
 	// keeps some caches for block themselves
 	headers *lru.Cache[common.Hash, *types.Header]
 	bodies  *lru.Cache[common.Hash, *types.Body]
@@ -33,7 +33,7 @@ type blockReadAheader struct {
 	warmWg sync.WaitGroup
 }
 
-func newBlockReadAheader() *blockReadAheader {
+func NewBlockReadAheader() *BlockReadAheader {
 	headers, err := lru.New[common.Hash, *types.Header](4)
 	if err != nil {
 		panic(err)
@@ -46,16 +46,14 @@ func newBlockReadAheader() *blockReadAheader {
 	if err != nil {
 		panic(err)
 	}
-	return &blockReadAheader{
+	return &BlockReadAheader{
 		headers: headers,
 		bodies:  bodies,
 		senders: senders,
 	}
 }
 
-var globalReadAheader = newBlockReadAheader()
-
-func (bra *blockReadAheader) AddHeaderAndBody(ctx context.Context, db kv.RoDB, header *types.Header, body *types.Body) {
+func (bra *BlockReadAheader) AddHeaderAndBody(ctx context.Context, db kv.RoDB, header *types.Header, body *types.Body) {
 	blockHash := header.Hash()
 	bra.headers.Add(blockHash, header)
 	bra.bodies.Add(blockHash, body)
@@ -75,10 +73,10 @@ func (bra *blockReadAheader) AddHeaderAndBody(ctx context.Context, db kv.RoDB, h
 // WaitForWarmup blocks until any in-flight warmBody goroutine finishes or
 // the context is cancelled. Call before closing the database to avoid
 // waitTxsAllDoneOnClose hangs.
-func WaitForWarmup(ctx context.Context) {
+func (bra *BlockReadAheader) WaitForWarmup(ctx context.Context) {
 	done := make(chan struct{})
 	go func() {
-		globalReadAheader.warmWg.Wait()
+		bra.warmWg.Wait()
 		close(done)
 	}()
 	select {
@@ -87,26 +85,18 @@ func WaitForWarmup(ctx context.Context) {
 	}
 }
 
-func (bra *blockReadAheader) AddSenders(senders []byte, blockHash common.Hash) {
+func (bra *BlockReadAheader) AddSenders(senders []byte, blockHash common.Hash) {
 	if _, ok := bra.bodies.Get(blockHash); !ok {
 		return
 	}
 	bra.senders.Add(blockHash, common.Copy(senders))
 }
 
-func AddHeaderAndBodyToGlobalReadAheader(ctx context.Context, db kv.RoDB, header *types.Header, body *types.Body) {
-	globalReadAheader.AddHeaderAndBody(ctx, db, header, body)
-}
-
-func AddSendersToGlobalReadAheader(senders []byte, blockHash common.Hash) {
-	globalReadAheader.AddSenders(senders, blockHash)
-}
-
 // warmBody warms state for all transactions in a body using multiple workers.
 // It reads: To accounts, To account code, To account storage from access lists,
 // and block-level access lists. Each worker creates its own transaction.
 // Only one warmBody can run at a time - concurrent calls are no-ops.
-func (bra *blockReadAheader) warmBody(ctx context.Context, db kv.RoDB, header *types.Header, body *types.Body, workers int) {
+func (bra *BlockReadAheader) warmBody(ctx context.Context, db kv.RoDB, header *types.Header, body *types.Body, workers int) {
 	defer bra.warming.Store(false)
 
 	if !dbg.ReadAhead {
@@ -269,24 +259,20 @@ func (bra *blockReadAheader) warmBody(ctx context.Context, db kv.RoDB, header *t
 	wg.Wait()
 }
 
-func WarmBodyFromGlobalReadAheader(ctx context.Context, db kv.RoDB, header *types.Header, body *types.Body, workers int) {
-	globalReadAheader.warmBody(ctx, db, header, body, workers)
-}
-
-func ReadBodyWithTransactionsFromGlobalReadAheader(blockHash common.Hash) (*types.Body, bool) {
-	return globalReadAheader.bodies.Get(blockHash)
+func (bra *BlockReadAheader) ReadBodyWithTransactions(blockHash common.Hash) (*types.Body, bool) {
+	return bra.bodies.Get(blockHash)
 }
 
-func ReadBlockWithSendersFromGlobalReadAheader(blockHash common.Hash) (*types.Block, bool) {
-	header, ok := globalReadAheader.headers.Get(blockHash)
+func (bra *BlockReadAheader) ReadBlockWithSenders(blockHash common.Hash) (*types.Block, bool) {
+	header, ok := bra.headers.Get(blockHash)
 	if header == nil || !ok {
 		return nil, false
 	}
-	body, ok := globalReadAheader.bodies.Get(blockHash)
+	body, ok := bra.bodies.Get(blockHash)
 	if body == nil || !ok {
 		return nil, false
 	}
-	senders, ok := globalReadAheader.senders.Get(blockHash)
+	senders, ok := bra.senders.Get(blockHash)
 	if len(senders) == 0 || !ok {
 		return nil, false
 	}
```

execution/execmodule/exec_module.go

Lines changed: 6 additions & 3 deletions
```diff
@@ -225,7 +225,8 @@ type ExecModule struct {
 	publishedSD func() *execctx.SharedDomains // fallback for background commit
 
 	// stateCache is a cache for state data (accounts, storage, code)
-	stateCache *cache.StateCache
+	stateCache  *cache.StateCache
+	readAheader *exec.BlockReadAheader
 
 	stopNode func() error
 }
@@ -249,6 +250,7 @@ func NewExecModule(
 	fcuBackgroundPrune bool,
 	fcuBackgroundCommit bool,
 	onlySnapDownloadOnStart bool,
+	readAheader *exec.BlockReadAheader,
 	stopNode func() error,
 ) *ExecModule {
 	domainCache := cache.NewDefaultStateCache()
@@ -273,6 +275,7 @@ func NewExecModule(
 		fcuBackgroundCommit:     fcuBackgroundCommit,
 		onlySnapDownloadOnStart: onlySnapDownloadOnStart,
 		stateCache:              domainCache,
+		readAheader:             readAheader,
 		stopNode:                stopNode,
 	}
 
@@ -418,7 +421,7 @@ func (e *ExecModule) ValidateChain(ctx context.Context, blockHash common.Hash, b
 		if err != nil {
 			return ValidationResult{}, err
 		}
-		exec.AddHeaderAndBodyToGlobalReadAheader(ctx, e.db, header, body)
+		e.readAheader.AddHeaderAndBody(ctx, e.db, header, body)
 		currentBlockNumber = rawdb.ReadCurrentBlockNumber(overlay)
 	} else {
 		if err := e.db.View(ctx, func(tx kv.Tx) error {
@@ -431,7 +434,7 @@ func (e *ExecModule) ValidateChain(ctx context.Context, blockHash common.Hash, b
 			if err != nil {
 				return err
 			}
-			exec.AddHeaderAndBodyToGlobalReadAheader(ctx, e.db, header, body)
+			e.readAheader.AddHeaderAndBody(ctx, e.db, header, body)
 			currentBlockNumber = rawdb.ReadCurrentBlockNumber(tx)
 			return nil
 		}); err != nil {
```

execution/execmodule/execmoduletester/exec_module_tester.go

Lines changed: 8 additions & 3 deletions
```diff
@@ -54,6 +54,7 @@ import (
 	dbstate "github.com/erigontech/erigon/db/state"
 	"github.com/erigontech/erigon/execution/builder"
 	"github.com/erigontech/erigon/execution/chain"
+	"github.com/erigontech/erigon/execution/exec"
 	"github.com/erigontech/erigon/execution/execmodule"
 	"github.com/erigontech/erigon/execution/execmodule/chainreader"
 	"github.com/erigontech/erigon/execution/protocol/rules"
@@ -616,6 +617,7 @@ func New(tb testing.TB, opts ...Option) *ExecModuleTester {
 		close(miningCancel)
 	}()
 
+	readAheader := exec.NewBlockReadAheader()
 	blkBuilder := builder.NewBuilder(
 		mock.Ctx,
 		mock.DB,
@@ -639,6 +641,7 @@ func New(tb testing.TB, opts ...Option) *ExecModuleTester {
 			gspec,
 			cfg.Sync,
 			false, /*experimentalBAL*/
+			readAheader,
 		),
 		nil, /*notifier*/
 		&vm.Config{},
@@ -659,7 +662,7 @@ func New(tb testing.TB, opts ...Option) *ExecModuleTester {
 		stagedsync.StageHeadersCfg(mock.sentriesClient.Hd, mock.ChainConfig, cfg.Sync, sendHeaderRequest, propagateNewBlockHashes, penalize, false /* noP2PDiscovery */, mock.BlockReader),
 		stagedsync.StageBlockHashesCfg(mock.Dirs.Tmp, blockWriter),
 		stagedsync.StageBodiesCfg(mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, mock.ChainConfig, mock.BlockReader, blockWriter),
-		stagedsync.StageSendersCfg(mock.ChainConfig, cfg.Sync, false /* badBlockHalt */, dirs.Tmp, pruneMode, mock.BlockReader, mock.sentriesClient.Hd),
+		stagedsync.StageSendersCfg(mock.ChainConfig, cfg.Sync, false /* badBlockHalt */, dirs.Tmp, pruneMode, mock.BlockReader, mock.sentriesClient.Hd, readAheader),
 		stagedsync.StageExecuteBlocksCfg(
 			mock.DB,
 			pruneMode,
@@ -676,6 +679,7 @@ func New(tb testing.TB, opts ...Option) *ExecModuleTester {
 			gspec,
 			cfg.Sync,
 			false, /*experimentalBAL*/
+			readAheader,
 		),
 		stagedsync.StageTxLookupCfg(pruneMode, dirs.Tmp, mock.BlockReader),
 		stagedsync.StageFinishCfg(),
@@ -695,13 +699,13 @@ func New(tb testing.TB, opts ...Option) *ExecModuleTester {
 	}
 
 	cfg.Genesis = gspec
-	pipelineStages := stageloop.NewPipelineStages(mock.Ctx, db, &cfg, mock.sentriesClient, mock.Notifications, snapDownloader, mock.BlockReader, blockRetire, tracer, nil)
+	pipelineStages := stageloop.NewPipelineStages(mock.Ctx, db, &cfg, mock.sentriesClient, mock.Notifications, snapDownloader, mock.BlockReader, blockRetire, tracer, nil, readAheader)
 	mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks)
 
 	// Create validation Sync and PipelineExecutor.
 	validationNotifications := shards.NewNotifications(nil)
 	validationSync := stageloop.NewInMemoryExecution(mock.Ctx, mock.DB, &cfg, mock.sentriesClient,
-		validationNotifications, mock.BlockReader, blockWriter, logger)
+		validationNotifications, mock.BlockReader, blockWriter, logger, readAheader)
 	dispatcher := execmodule.NewDispatcher(mock.ChainConfig, mock.Notifications.Events, mock.Notifications.StateChangesConsumer, logger)
 	pipelineExecutor := execmodule.NewPipelineExecutor(mock.posStagedSync, mock.DB, mock.BlockReader, mock.ChainConfig, mock.Engine, validationSync, validationNotifications, dispatcher, logger)
 
@@ -731,6 +735,7 @@ func New(tb testing.TB, opts ...Option) *ExecModuleTester {
 		cfg.FcuBackgroundPrune,
 		cfg.FcuBackgroundCommit,
 		onlySnapDownloadOnStart,
+		readAheader,
 		func() error { return nil },
 	)
 	mock.ForkValidator = mock.ExecModule.ForkValidator()
```

execution/stagedsync/exec3.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -583,7 +583,7 @@ func (te *txExecutor) executeBlocks(ctx context.Context, startBlockNum uint64, m
 	if err != nil {
 		return err
 	}
-	b, ok := exec.ReadBlockWithSendersFromGlobalReadAheader(canonicalHash)
+	b, ok := te.cfg.readAheader.ReadBlockWithSenders(canonicalHash)
 	if b == nil || !ok {
 		b, err = exec.BlockWithSenders(ctx, te.cfg.db, blockTx, te.cfg.blockReader, blockNum)
 	}
```
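The executor call sites above follow a cache-first, read-through shape: try the read-aheader, and fall back to a DB read on a miss. A minimal stand-alone sketch of that shape (types and names hypothetical, a plain map standing in for the LRU caches):

```go
package main

import "fmt"

// block is a hypothetical stand-in for a fully assembled block with senders.
type block struct{ num uint64 }

// readAheader is a toy cache keyed by block hash, mimicking the warm path.
type readAheader struct{ cache map[string]*block }

func (ra *readAheader) readBlockWithSenders(hash string) (*block, bool) {
	b, ok := ra.cache[hash]
	return b, ok
}

// readFromDB simulates the slow path (exec.BlockWithSenders in the real code).
func readFromDB(num uint64) *block { return &block{num: num} }

// blockFor is the read-through: warm cache first, DB on miss.
func blockFor(ra *readAheader, hash string, num uint64) *block {
	if b, ok := ra.readBlockWithSenders(hash); ok && b != nil {
		return b // warm path: no DB read
	}
	return readFromDB(num) // cold path
}

func main() {
	ra := &readAheader{cache: map[string]*block{"0xabc": {num: 7}}}
	fmt.Println(blockFor(ra, "0xabc", 7).num) // cache hit
	fmt.Println(blockFor(ra, "0xdef", 9).num) // miss, served from DB
}
```

Note the double check `ok && b != nil`, matching the `if b == nil || !ok` guard in the diff: a cached nil must also fall through to the DB.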

execution/stagedsync/exec3_serial.go

Lines changed: 1 addition & 1 deletion
```diff
@@ -101,7 +101,7 @@ func (se *serialExecutor) exec(ctx context.Context, execStage *StageState, u Unw
 		return nil, rwTx, err
 	}
 	var ok bool
-	b, ok = exec.ReadBlockWithSendersFromGlobalReadAheader(canonicalHash)
+	b, ok = se.cfg.readAheader.ReadBlockWithSenders(canonicalHash)
 	if b == nil || !ok {
 		b, err = exec.BlockWithSenders(ctx, se.cfg.db, se.applyTx, se.cfg.blockReader, blockNum)
 		if err != nil {
```

execution/stagedsync/stage_execute.go

Lines changed: 4 additions & 0 deletions
```diff
@@ -43,6 +43,7 @@ import (
 	"github.com/erigontech/erigon/db/state/execctx"
 	"github.com/erigontech/erigon/diagnostics/metrics"
 	"github.com/erigontech/erigon/execution/chain"
+	"github.com/erigontech/erigon/execution/exec"
 	"github.com/erigontech/erigon/execution/protocol/rules"
 	"github.com/erigontech/erigon/execution/stagedsync/stages"
 	"github.com/erigontech/erigon/execution/types"
@@ -85,6 +86,7 @@ type ExecuteBlockCfg struct {
 	genesis *types.Genesis
 
 	experimentalBAL bool
+	readAheader     *exec.BlockReadAheader
 }
 
 func StageExecuteBlocksCfg(
@@ -104,6 +106,7 @@ func StageExecuteBlocksCfg(
 	genesis *types.Genesis,
 	syncCfg ethconfig.Sync,
 	experimentalBAL bool,
+	readAheader *exec.BlockReadAheader,
 ) ExecuteBlockCfg {
 	if dirs.SnapDomain == "" {
 		panic("empty `dirs` variable")
@@ -126,6 +129,7 @@ func StageExecuteBlocksCfg(
 		historyV3:       true,
 		syncCfg:         syncCfg,
 		experimentalBAL: experimentalBAL,
+		readAheader:     readAheader,
 	}
 }
 
```

execution/stagedsync/stage_senders.go

Lines changed: 5 additions & 3 deletions
```diff
@@ -56,9 +56,10 @@ type SendersCfg struct {
 	chainConfig *chain.Config
 	hd          *headerdownload.HeaderDownload
 	blockReader services.FullBlockReader
+	readAheader *exec.BlockReadAheader
 }
 
-func StageSendersCfg(chainCfg *chain.Config, syncCfg ethconfig.Sync, badBlockHalt bool, tmpdir string, prune prune.Mode, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload) SendersCfg {
+func StageSendersCfg(chainCfg *chain.Config, syncCfg ethconfig.Sync, badBlockHalt bool, tmpdir string, prune prune.Mode, blockReader services.FullBlockReader, hd *headerdownload.HeaderDownload, readAheader *exec.BlockReadAheader) SendersCfg {
 	const sendersBatchSize = 1000
 	return SendersCfg{
 		batchSize:   sendersBatchSize,
@@ -68,6 +69,7 @@ func StageSendersCfg(chainCfg *chain.Config, syncCfg ethconfig.Sync, badBlockHal
 		chainConfig: chainCfg,
 		hd:          hd,
 		blockReader: blockReader,
+		readAheader: readAheader,
 	}
 }
 
@@ -167,7 +169,7 @@ func SpawnRecoverSendersStage(cfg SendersCfg, s *StageState, u Unwinder, tx kv.R
 
 	// Check if block is complete
 	if pb.received == pb.txCount {
-		exec.AddSendersToGlobalReadAheader(pb.senders, pb.hash)
+		cfg.readAheader.AddSenders(pb.senders, pb.hash)
 		if err := collectorSenders.Collect(dbutils.BlockBodyKey(s.BlockNumber+uint64(j.blockIndex)+1, pb.hash), pb.senders); err != nil {
 			pendingMu.Unlock()
 			errCh <- senderRecoveryError{err: err}
@@ -238,7 +240,7 @@ Loop:
 		continue
 	}
 
-	body, ok := exec.ReadBodyWithTransactionsFromGlobalReadAheader(blockHash)
+	body, ok := cfg.readAheader.ReadBodyWithTransactions(blockHash)
 	if body == nil || !ok {
 		if body, err = cfg.blockReader.BodyWithTransactions(ctx, tx, blockHash, blockNumber); err != nil {
 			return err
```
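The `AddSenders` call above is guarded: per the `blocks_read_ahead.go` diff, senders are cached only when the block's body is already in the read-aheader, so the per-hash caches never hold an orphan senders entry. A toy sketch of that guard (map-based, names hypothetical):

```go
package main

import "fmt"

// readAheader is a hypothetical simplification: plain maps stand in for
// the real LRU caches keyed by block hash.
type readAheader struct {
	bodies  map[string][]byte
	senders map[string][]byte
}

// addSenders mirrors AddSenders' early return: without a cached body,
// the recovered senders are dropped rather than cached on their own.
func (ra *readAheader) addSenders(senders []byte, blockHash string) {
	if _, ok := ra.bodies[blockHash]; !ok {
		return
	}
	// copy to decouple from the caller's buffer, as the real code does
	// with common.Copy
	ra.senders[blockHash] = append([]byte(nil), senders...)
}

func main() {
	ra := &readAheader{bodies: map[string][]byte{"0xaa": {1}}, senders: map[string][]byte{}}
	ra.addSenders([]byte{0x01}, "0xaa") // body present: cached
	ra.addSenders([]byte{0x02}, "0xbb") // no body: ignored
	fmt.Println(len(ra.senders)) // 1
}
```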

execution/stagedsync/stage_senders_test.go

Lines changed: 2 additions & 1 deletion
```diff
@@ -30,6 +30,7 @@ import (
 	"github.com/erigontech/erigon/db/kv/prune"
 	"github.com/erigontech/erigon/db/rawdb"
 	"github.com/erigontech/erigon/execution/chain"
+	"github.com/erigontech/erigon/execution/exec"
 	"github.com/erigontech/erigon/execution/execmodule/execmoduletester"
 	"github.com/erigontech/erigon/execution/stagedsync"
 	"github.com/erigontech/erigon/execution/stagedsync/stages"
@@ -145,7 +146,7 @@ func TestSenders(t *testing.T) {
 
 	require.NoError(stages.SaveStageProgress(tx, stages.Bodies, 3))
 
-	cfg := stagedsync.StageSendersCfg(chain.TestChainBerlinConfig, ethconfig.Defaults.Sync, false, "", prune.Mode{}, br, nil)
+	cfg := stagedsync.StageSendersCfg(chain.TestChainBerlinConfig, ethconfig.Defaults.Sync, false, "", prune.Mode{}, br, nil, exec.NewBlockReadAheader())
 	err = stagedsync.SpawnRecoverSendersStage(cfg, &stagedsync.StageState{ID: stages.Senders}, nil, tx, 3, m.Ctx, log.New())
 	require.NoError(err)
 
```
