Skip to content

Commit fc65e18

Browse files
committed
execmodule: extract PipelineExecutor for unified hasMore loop
Consolidate the hasMore pipeline loop pattern shared by ProcessFrozenBlocks (startup catchup) and updateForkChoice (FCU catchup) into a single PipelineExecutor.RunLoop method. Callers provide their specific commit/break/setup logic as callbacks: - CommitCycleFn: flush SD, commit, return fresh tx - ShouldBreakFn: optional early exit (e.g. frozen blocks reached) - BeforeIterationFn: optional per-iteration setup (e.g. state cache) Key design decision: RunPrune and CommitCycle only execute when hasMore=true, since the FCU overlay tx (MemoryMutation on RO tx) cannot handle PruneSmallBatches type assertions.
1 parent 161ac62 commit fc65e18

3 files changed

Lines changed: 261 additions & 38 deletions

File tree

execution/execmodule/exec_module.go

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -538,11 +538,11 @@ func (e *ExecModule) Start(ctx context.Context, hook *stageloop.Hook) {
538538
}
539539
defer e.semaphore.Release(1)
540540

541-
if err := stageloop.ProcessFrozenBlocks(ctx, e.db, e.blockReader, e.executionPipeline, hook, e.onlySnapDownloadOnStart, e.logger); err != nil {
541+
if err := e.processFrozenBlocks(ctx, hook); err != nil {
542542
if !errors.Is(err, context.Canceled) {
543543
e.logger.Error("Could not start execution service", "err", err)
544544
}
545-
// During parallel execution, an invalid block in initial sync (ProcessFrozenBlocks)
545+
// During parallel execution, an invalid block in initial sync (processFrozenBlocks)
546546
// is unrecoverable: the parallel executor cannot unwind and retrying will hit the
547547
// same block forever, pushing Caplin's backward target further back.
548548
// Exit the process so the operator can investigate.
@@ -558,6 +558,111 @@ func (e *ExecModule) Start(ctx context.Context, hook *stageloop.Hook) {
558558
}
559559
}
560560

561+
// processFrozenBlocks runs the pipeline over snapshot blocks at startup,
562+
// using PipelineExecutor for the hasMore loop.
563+
func (e *ExecModule) processFrozenBlocks(ctx context.Context, hook *stageloop.Hook) error {
564+
sawZeroBlocksTimes := 0
565+
tx, err := e.db.BeginTemporalRw(ctx)
566+
if err != nil {
567+
return err
568+
}
569+
defer tx.Rollback()
570+
571+
// Run snapshots stage — downloads block files.
572+
if err = e.executionPipeline.RunSnapshots(nil, tx); err != nil {
573+
return err
574+
}
575+
if e.onlySnapDownloadOnStart {
576+
return nil
577+
}
578+
579+
// If domains are ahead of block files, nothing to execute.
580+
if execctx.IsDomainAheadOfBlocks(ctx, tx, e.logger) {
581+
return tx.Commit()
582+
}
583+
584+
doms, err := execctx.NewSharedDomains(ctx, tx, e.logger)
585+
if err != nil {
586+
return err
587+
}
588+
defer doms.Close()
589+
590+
var finishStageBeforeSync uint64
591+
if hook != nil {
592+
finishStageBeforeSync, err = stages.GetStageProgress(tx, stages.Finish)
593+
if err != nil {
594+
return err
595+
}
596+
if err = hook.BeforeRun(tx, false); err != nil {
597+
return err
598+
}
599+
}
600+
601+
pe := NewPipelineExecutor(e.executionPipeline, e.logger)
602+
result, err := pe.RunLoop(ctx, RunLoopConfig{
603+
SD: doms,
604+
Tx: tx,
605+
InitialCycle: true,
606+
FirstCycle: false,
607+
PruneTimeout: 0,
608+
CommitCycle: func(ctx context.Context, sd *execctx.SharedDomains) (kv.TemporalRwTx, error) {
609+
if err := sd.Flush(ctx, tx); err != nil {
610+
return nil, fmt.Errorf("processFrozenBlocks: flush: %w", err)
611+
}
612+
sd.ClearRam(true)
613+
if err := tx.Commit(); err != nil {
614+
return nil, err
615+
}
616+
newTx, err := e.db.BeginTemporalRw(ctx) //nolint:gocritic
617+
if err != nil {
618+
return nil, err
619+
}
620+
tx = newTx
621+
return newTx, nil
622+
},
623+
ShouldBreak: func(curTx kv.TemporalRwTx) (bool, error) {
624+
if e.blockReader.FrozenBlocks() > 0 {
625+
p, err := stages.GetStageProgress(curTx, stages.Finish)
626+
if err != nil {
627+
return false, err
628+
}
629+
return p >= e.blockReader.FrozenBlocks(), nil
630+
}
631+
sawZeroBlocksTimes++
632+
return sawZeroBlocksTimes > 2, nil
633+
},
634+
})
635+
if err != nil {
636+
return fmt.Errorf("processFrozenBlocks: %w", err)
637+
}
638+
tx = result.FinalTx
639+
640+
if err := doms.Flush(ctx, tx); err != nil {
641+
return fmt.Errorf("processFrozenBlocks: final flush: %w", err)
642+
}
643+
doms.ClearRam(true)
644+
if err := tx.Commit(); err != nil {
645+
return err
646+
}
647+
648+
if hook != nil {
649+
if err := e.db.View(ctx, func(tx kv.Tx) error {
650+
headersProgress, err := stages.GetStageProgress(tx, stages.Headers)
651+
if err != nil {
652+
return err
653+
}
654+
if err = hook.AfterRun(tx, finishStageBeforeSync, false); err != nil {
655+
return err
656+
}
657+
hook.LastNewBlockSeen(headersProgress)
658+
return nil
659+
}); err != nil {
660+
return err
661+
}
662+
}
663+
return nil
664+
}
665+
561666
func (e *ExecModule) Ready(ctx context.Context, _ *emptypb.Empty) (*executionproto.ReadyResponse, error) {
562667

563668
// setup a timeout for the context to avoid waiting indefinitely

execution/execmodule/executor.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2024 The Erigon Authors
2+
// This file is part of Erigon.
3+
//
4+
// Erigon is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// Erigon is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package execmodule
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"github.com/erigontech/erigon/common/log/v3"
24+
"github.com/erigontech/erigon/db/kv"
25+
"github.com/erigontech/erigon/db/state/execctx"
26+
"github.com/erigontech/erigon/execution/stagedsync"
27+
)
28+
29+
// PipelineExecutor runs the staged sync pipeline in a hasMore loop,
30+
// abstracting the common pattern shared by ProcessFrozenBlocks (startup
31+
// catchup) and updateForkChoice (FCU catchup). The executor is stateless
32+
// between calls — callers provide their own tx/SD lifecycle and commit
33+
// strategy via callbacks.
34+
type PipelineExecutor struct {
35+
sync *stagedsync.Sync
36+
logger log.Logger
37+
}
38+
39+
// NewPipelineExecutor creates a new executor wrapping the given pipeline.
40+
func NewPipelineExecutor(sync *stagedsync.Sync, logger log.Logger) *PipelineExecutor {
41+
return &PipelineExecutor{sync: sync, logger: logger}
42+
}
43+
44+
// CommitCycleFn is called between hasMore iterations to persist accumulated
45+
// state and obtain a fresh tx for the next iteration. Implementations must
46+
// flush the SharedDomains, commit, and return a fresh kv.TemporalRwTx.
47+
type CommitCycleFn func(ctx context.Context, sd *execctx.SharedDomains) (kv.TemporalRwTx, error)
48+
49+
// ShouldBreakFn is called after each iteration (after prune, before commit)
50+
// to check whether the loop should stop early. Return true to break.
51+
type ShouldBreakFn func(tx kv.TemporalRwTx) (bool, error)
52+
53+
// BeforeIterationFn is called before each pipeline Run (e.g. to set state cache).
54+
type BeforeIterationFn func(sd *execctx.SharedDomains)
55+
56+
// RunLoopConfig configures a single RunLoop invocation.
57+
type RunLoopConfig struct {
58+
SD *execctx.SharedDomains
59+
Tx kv.TemporalRwTx
60+
InitialCycle bool
61+
FirstCycle bool
62+
PruneTimeout time.Duration
63+
CommitCycle CommitCycleFn // required when hasMore
64+
ShouldBreak ShouldBreakFn // optional early exit
65+
BeforeIteration BeforeIterationFn // optional per-iteration setup
66+
}
67+
68+
// RunLoopResult contains the state after RunLoop completes.
69+
type RunLoopResult struct {
70+
// FinalTx is the tx after the last iteration. May differ from the
71+
// input Tx if CommitCycle was called (which returns a fresh tx).
72+
FinalTx kv.TemporalRwTx
73+
}
74+
75+
// RunLoop executes the pipeline in a hasMore loop:
76+
//
77+
// for hasMore {
78+
// [BeforeIteration] → sync.Run → if hasMore { RunPrune → [ShouldBreak] → CommitCycle }
79+
// }
80+
//
81+
// The loop continues until Run returns hasMore=false, ShouldBreak returns
82+
// true, or an error occurs. On error, FinalTx is the tx from the last
83+
// iteration (may be used by the caller for error recovery reads).
84+
func (pe *PipelineExecutor) RunLoop(ctx context.Context, cfg RunLoopConfig) (RunLoopResult, error) {
85+
tx := cfg.Tx
86+
for hasMore := true; hasMore; {
87+
if cfg.BeforeIteration != nil {
88+
cfg.BeforeIteration(cfg.SD)
89+
}
90+
91+
var err error
92+
hasMore, err = pe.sync.Run(cfg.SD, tx, cfg.InitialCycle, cfg.FirstCycle)
93+
if err != nil {
94+
return RunLoopResult{FinalTx: tx}, err
95+
}
96+
97+
if hasMore {
98+
if err := pe.sync.RunPrune(ctx, tx, cfg.InitialCycle, cfg.PruneTimeout); err != nil {
99+
return RunLoopResult{FinalTx: tx}, err
100+
}
101+
102+
if cfg.ShouldBreak != nil {
103+
stop, err := cfg.ShouldBreak(tx)
104+
if err != nil {
105+
return RunLoopResult{FinalTx: tx}, err
106+
}
107+
if stop {
108+
break
109+
}
110+
}
111+
112+
newTx, err := cfg.CommitCycle(ctx, cfg.SD)
113+
if err != nil {
114+
return RunLoopResult{FinalTx: tx}, err
115+
}
116+
tx = newTx
117+
}
118+
}
119+
return RunLoopResult{FinalTx: tx}, nil
120+
}

execution/execmodule/forkchoice.go

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -490,71 +490,69 @@ func (e *ExecModule) updateForkChoice(ctx context.Context, originalBlockHash, sa
490490
initialCycle := limitedBigJump
491491
firstCycle := false
492492

493-
sendError := func(msg string, err error) error {
494-
err = fmt.Errorf("%s: %w", msg, err)
495-
e.logger.Warn("Cannot update chain head", "hash", blockHash, "err", err)
496-
return sendForkchoiceErrorWithoutWaiting(e.logger, outcomeCh, err, stateFlushingInParallel)
497-
}
498-
499-
for hasMore := true; hasMore; {
500-
currentContext.SetStateCache(e.stateCache)
501-
hasMore, err = e.executionPipeline.Run(currentContext, tx, initialCycle, firstCycle)
502-
if err != nil {
503-
err = fmt.Errorf("updateForkChoice: %w", err)
504-
e.logger.Warn("Cannot update chain head", "hash", blockHash, "err", err)
505-
if errors.Is(err, rules.ErrInvalidBlock) {
506-
return sendForkchoiceReceiptWithoutWaiting(outcomeCh, &executionproto.ForkChoiceReceipt{
507-
Status: executionproto.ExecutionStatus_BadBlock,
508-
ValidationError: err.Error(),
509-
LatestValidHash: gointerfaces.ConvertHashToH256(rawdb.ReadHeadBlockHash(tx)),
510-
}, stateFlushingInParallel)
511-
}
512-
return sendForkchoiceErrorWithoutWaiting(e.logger, outcomeCh, err, stateFlushingInParallel)
513-
}
514-
if hasMore {
515-
err = e.executionPipeline.RunPrune(ctx, tx, initialCycle, 500*time.Millisecond)
516-
if err != nil {
517-
return sendError("updateForkChoice: RunPrune after hasMore", err)
518-
}
493+
pe := NewPipelineExecutor(e.executionPipeline, e.logger)
494+
result, err := pe.RunLoop(ctx, RunLoopConfig{
495+
SD: currentContext,
496+
Tx: tx,
497+
InitialCycle: initialCycle,
498+
FirstCycle: firstCycle,
499+
PruneTimeout: 500 * time.Millisecond,
500+
BeforeIteration: func(sd *execctx.SharedDomains) {
501+
sd.SetStateCache(e.stateCache)
502+
},
503+
CommitCycle: func(ctx context.Context, sd *execctx.SharedDomains) (kv.TemporalRwTx, error) {
519504
// Flush SD + overlay to a brief RwTx to relieve memory pressure.
520505
commitRwTx, err := e.db.BeginTemporalRw(ctx) //nolint:gocritic
521506
if err != nil {
522-
return sendError("updateForkChoice: begin rw after hasMore", err)
507+
return nil, err
523508
}
524-
if err := currentContext.Flush(ctx, commitRwTx); err != nil {
509+
if err := sd.Flush(ctx, commitRwTx); err != nil {
525510
commitRwTx.Rollback()
526-
return sendError("updateForkChoice: flush sd after hasMore", err)
511+
return nil, err
527512
}
528513
if err := fcuOverlay.Flush(ctx, commitRwTx); err != nil {
529514
commitRwTx.Rollback()
530-
return sendError("updateForkChoice: flush overlay after hasMore", err)
515+
return nil, err
531516
}
532-
currentContext.ClearRam(true)
517+
sd.ClearRam(true)
533518
if err = commitRwTx.Commit(); err != nil {
534-
return sendError("updateForkChoice: tx commit after hasMore", err)
519+
return nil, err
535520
}
536521
// Recreate RO tx + overlay on the fresh committed state.
537522
fcuOverlay.Close()
538523
roTx.Rollback()
539524
roTx, err = e.db.BeginTemporalRo(ctx) //nolint:gocritic
540525
if err != nil {
541-
return sendError("updateForkChoice: begin ro after hasMore", err)
526+
return nil, err
542527
}
543528
fcuOverlay, err = membatchwithdb.NewMemoryBatch(roTx, roTx.Debug().Dirs().Tmp, e.logger)
544529
if err != nil {
545530
roTx.Rollback()
546-
return sendError("updateForkChoice: new overlay after hasMore", err)
531+
return nil, err
547532
}
548-
tx = fcuOverlay
549533
// Re-flush InsertBlocks data into the fresh overlay.
550534
if hasOverlay {
551535
e.currentContext.BlockOverlay().UpdateTxn(roTx)
552536
if err := e.currentContext.BlockOverlay().Flush(ctx, fcuOverlay); err != nil {
553-
return sendError("updateForkChoice: re-flush overlay after hasMore", err)
537+
return nil, err
554538
}
555539
}
540+
return fcuOverlay, nil
541+
},
542+
})
543+
if err != nil {
544+
err = fmt.Errorf("updateForkChoice: %w", err)
545+
e.logger.Warn("Cannot update chain head", "hash", blockHash, "err", err)
546+
if errors.Is(err, rules.ErrInvalidBlock) {
547+
return sendForkchoiceReceiptWithoutWaiting(outcomeCh, &executionproto.ForkChoiceReceipt{
548+
Status: executionproto.ExecutionStatus_BadBlock,
549+
ValidationError: err.Error(),
550+
LatestValidHash: gointerfaces.ConvertHashToH256(rawdb.ReadHeadBlockHash(result.FinalTx)),
551+
}, stateFlushingInParallel)
556552
}
553+
return sendForkchoiceErrorWithoutWaiting(e.logger, outcomeCh, err, stateFlushingInParallel)
557554
}
555+
tx = result.FinalTx
558556

559557
// if head hash was set then success otherwise no
560558
headHash := rawdb.ReadHeadBlockHash(tx)

0 commit comments

Comments
 (0)