Skip to content

Commit 745451f

Browse files
authored
execution: add --fcu.timeout and --fcu.background.prune flags (#18723)
trying out some new flags for getting the gigagas execution client benchmarks to work
1 parent 05080a3 commit 745451f

File tree

10 files changed

+63
-45
lines changed

10 files changed

+63
-45
lines changed

cmd/utils/flags.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1103,6 +1103,16 @@ var (
11031103
Name: "experimental.always-generate-changesets",
11041104
Usage: "Allows to override changesets generation logic",
11051105
}
1106+
FcuTimeoutFlag = cli.DurationFlag{
1107+
Name: "fcu.timeout",
1108+
Usage: "FCU timeout before it switches to being process async (use 0 to disable)",
1109+
Value: ethconfig.Defaults.FcuTimeout,
1110+
}
1111+
FcuBackgroundPruneFlag = cli.BoolFlag{
1112+
Name: "fcu.background.prune",
1113+
Usage: "Enables background pruning post fcu",
1114+
Value: ethconfig.Defaults.FcuBackgroundPrune,
1115+
}
11061116
// ErigonDB geometry settings
11071117
ErigonDBStepSizeFlag = cli.Uint64Flag{
11081118
Name: "erigondb.override.stepsize",
@@ -1919,7 +1929,8 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C
19191929

19201930
cfg.ErigonDBStepSize = ctx.Int(ErigonDBStepSizeFlag.Name)
19211931
cfg.ErigonDBStepsInFrozenFile = ctx.Int(ErigonDBStepsInFrozenFileFlag.Name)
1922-
1932+
cfg.FcuTimeout = ctx.Duration(FcuTimeoutFlag.Name)
1933+
cfg.FcuBackgroundPrune = ctx.Bool(FcuBackgroundPruneFlag.Name)
19231934
if ctx.IsSet(RPCGlobalGasCapFlag.Name) {
19241935
cfg.RPCGasCap = ctx.Uint64(RPCGlobalGasCapFlag.Name)
19251936
}

execution/engineapi/engine_block_downloader/block_downloader.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,6 @@ import (
3838
"github.com/erigontech/erigon/node/gointerfaces/executionproto"
3939
)
4040

41-
const (
42-
forkchoiceTimeoutMillis = 5000
43-
)
44-
4541
type Status int
4642

4743
const (
@@ -80,14 +76,16 @@ func NewEngineBlockDownloader(
8076
if err != nil {
8177
panic(fmt.Errorf("failed to create badHeaders cache: %w", err))
8278
}
79+
// the block downloader has fcuTimeout=0 to avoid having to deal with async fcu
80+
chainRW := eth1_chain_reader.NewChainReaderEth1(config, executionClient, 0 /* fcuTimeout */)
8381
return &EngineBlockDownloader{
8482
backgroundCtx: ctx,
8583
db: db,
8684
status: s,
8785
syncCfg: syncCfg,
8886
logger: logger,
8987
blockReader: blockReader,
90-
chainRW: eth1_chain_reader.NewChainReaderEth1(config, executionClient, forkchoiceTimeoutMillis),
88+
chainRW: chainRW,
9189
bbd: bbd,
9290
badHeaders: badHeaders,
9391
}

execution/engineapi/engine_server.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,6 @@ type EngineServer struct {
8383
printPectraBanner bool
8484
}
8585

86-
const DefaultFcuTimeout = 1 * time.Second
87-
8886
func NewEngineServer(
8987
logger log.Logger,
9088
config *chain.Config,
@@ -96,9 +94,6 @@ func NewEngineServer(
9694
txPool txpoolproto.TxpoolClient,
9795
fcuTimeout time.Duration,
9896
) *EngineServer {
99-
if fcuTimeout == 0 {
100-
fcuTimeout = DefaultFcuTimeout
101-
}
10297
chainRW := eth1_chain_reader.NewChainReaderEth1(config, executionService, uint64(fcuTimeout.Milliseconds()))
10398
srv := &EngineServer{
10499
logger: logger,

execution/engineapi/engine_server_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,8 @@ func TestGetBlobsV1(t *testing.T) {
112112

113113
executionRpc := direct.NewExecutionClientDirect(mockSentry.Eth1ExecutionService)
114114
eth := rpcservices.NewRemoteBackend(nil, mockSentry.DB, mockSentry.BlockReader)
115-
engineServer := NewEngineServer(mockSentry.Log, mockSentry.ChainConfig, executionRpc, nil, false, false, true, txPool, DefaultFcuTimeout)
115+
fcuTimeout := ethconfig.Defaults.FcuTimeout
116+
engineServer := NewEngineServer(mockSentry.Log, mockSentry.ChainConfig, executionRpc, nil, false, false, true, txPool, fcuTimeout)
116117
ctx, cancel := context.WithCancel(ctx)
117118
var eg errgroup.Group
118119
t.Cleanup(func() {
@@ -162,7 +163,8 @@ func TestGetBlobsV2(t *testing.T) {
162163

163164
executionRpc := direct.NewExecutionClientDirect(mockSentry.Eth1ExecutionService)
164165
eth := rpcservices.NewRemoteBackend(nil, mockSentry.DB, mockSentry.BlockReader)
165-
engineServer := NewEngineServer(mockSentry.Log, mockSentry.ChainConfig, executionRpc, nil, false, false, true, txPool, DefaultFcuTimeout)
166+
fcuTimeout := ethconfig.Defaults.FcuTimeout
167+
engineServer := NewEngineServer(mockSentry.Log, mockSentry.ChainConfig, executionRpc, nil, false, false, true, txPool, fcuTimeout)
166168
ctx, cancel := context.WithCancel(ctx)
167169
var eg errgroup.Group
168170
t.Cleanup(func() {

execution/eth1/ethereum_execution.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"errors"
2222
"math/big"
2323
"strings"
24-
"sync/atomic"
2524
"time"
2625

2726
"golang.org/x/sync/semaphore"
@@ -120,8 +119,7 @@ type EthereumExecutionModule struct {
120119
// rules engine
121120
engine rules.Engine
122121

123-
doingPostForkchoice atomic.Bool
124-
122+
fcuBackgroundPrune bool
125123
// metrics for average mgas/sec
126124
avgMgasSec float64
127125

@@ -137,6 +135,7 @@ func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.Temp
137135
logger log.Logger, engine rules.Engine,
138136
syncCfg ethconfig.Sync,
139137
ctx context.Context,
138+
fcuBackgroundPrune bool,
140139
) *EthereumExecutionModule {
141140
return &EthereumExecutionModule{
142141
blockReader: blockReader,
@@ -155,6 +154,7 @@ func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.Temp
155154
engine: engine,
156155
syncCfg: syncCfg,
157156
bacgroundCtx: ctx,
157+
fcuBackgroundPrune: fcuBackgroundPrune,
158158
}
159159
}
160160

execution/eth1/forkchoice.go

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,12 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
180180
}, false)
181181
return
182182
}
183-
defer e.semaphore.Release(1)
183+
shouldReleaseSema := true
184+
defer func() {
185+
if shouldReleaseSema {
186+
e.semaphore.Release(1)
187+
}
188+
}()
184189

185190
defer UpdateForkChoiceDuration(time.Now())
186191

@@ -602,8 +607,22 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
602607
e.logger.Info("head updated", logArgs...)
603608
}
604609
}
605-
if *headNumber >= startPruneFrom {
606-
e.runPostForkchoiceInBackground(initialCycle)
610+
if e.fcuBackgroundPrune {
611+
shouldReleaseSema = false // pass on semaphore to background goroutine doing post fcu processing
612+
go func() {
613+
defer e.semaphore.Release(1)
614+
err := e.runPostForkchoice(*headNumber, initialCycle)
615+
if err != nil {
616+
e.logger.Error("run post fork choice in background", "error", err)
617+
}
618+
}()
619+
} else {
620+
err := e.runPostForkchoice(*headNumber, initialCycle)
621+
if err != nil {
622+
e.logger.Error("run post fork choice in background", "error", err)
623+
sendForkchoiceErrorWithoutWaiting(e.logger, outcomeCh, err, stateFlushingInParallel)
624+
return
625+
}
607626
}
608627

609628
sendForkchoiceReceiptWithoutWaiting(outcomeCh, &executionproto.ForkChoiceReceipt{
@@ -613,18 +632,9 @@ func (e *EthereumExecutionModule) updateForkChoice(ctx context.Context, original
613632
}, stateFlushingInParallel)
614633
}
615634

616-
func (e *EthereumExecutionModule) runPostForkchoiceInBackground(initialCycle bool) {
617-
if !e.doingPostForkchoice.CompareAndSwap(false, true) {
618-
return
619-
}
620-
go func() {
621-
defer e.doingPostForkchoice.Store(false)
622-
var timings []interface{}
623-
// Wait for semaphore to be available
624-
if e.semaphore.Acquire(e.bacgroundCtx, 1) != nil {
625-
return
626-
}
627-
defer e.semaphore.Release(1)
635+
func (e *EthereumExecutionModule) runPostForkchoice(headNumber uint64, initialCycle bool) error {
636+
var timings []interface{}
637+
if headNumber >= startPruneFrom {
628638
pruneStart := time.Now()
629639
defer UpdateForkChoicePruneDuration(pruneStart)
630640
if err := e.db.Update(e.bacgroundCtx, func(tx kv.RwTx) error {
@@ -634,16 +644,14 @@ func (e *EthereumExecutionModule) runPostForkchoiceInBackground(initialCycle boo
634644
if pruneTimings := e.executionPipeline.PrintTimings(); len(pruneTimings) > 0 {
635645
timings = append(timings, pruneTimings...)
636646
}
637-
638647
return nil
639648
}); err != nil {
640-
e.logger.Error("runPostForkchoiceInBackground", "error", err)
641-
return
649+
return err
642650
}
643-
644-
if len(timings) > 0 {
645-
timings = append(timings, "initialCycle", initialCycle)
646-
e.logger.Info("Timings: Post-Forkchoice", timings...)
647-
}
648-
}()
651+
}
652+
if len(timings) > 0 {
653+
timings = append(timings, "initialCycle", initialCycle)
654+
e.logger.Info("Timings: Post-Forkchoice", timings...)
655+
}
656+
return nil
649657
}

execution/tests/mock/mock_sentry.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
548548
pipelineStages := stageloop.NewPipelineStages(mock.Ctx, db, &cfg, mock.sentriesClient, mock.Notifications, snapDownloader, mock.BlockReader, blockRetire, nil, forkValidator, tracer)
549549
mock.posStagedSync = stagedsync.New(cfg.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks)
550550

551-
mock.Eth1ExecutionService = eth1.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.RecentLogs, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx)
551+
mock.Eth1ExecutionService = eth1.NewEthereumExecutionModule(mock.BlockReader, mock.DB, mock.posStagedSync, forkValidator, mock.ChainConfig, assembleBlockPOS, nil, mock.Notifications.Accumulator, mock.Notifications.RecentLogs, mock.Notifications.StateChangesConsumer, logger, engine, cfg.Sync, ctx, cfg.FcuBackgroundPrune)
552552

553553
mock.sentriesClient.Hd.StartPoSDownloader(mock.Ctx, sendHeaderRequest, penalize)
554554

node/cli/default_flags.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ var DefaultFlags = []cli.Flag{
4343
&PruneBlocksDistanceFlag,
4444
&PruneModeFlag,
4545
&utils.KeepExecutionProofsFlag,
46-
46+
&utils.FcuTimeoutFlag,
47+
&utils.FcuBackgroundPruneFlag,
4748
&BatchSizeFlag,
4849
&BodyCacheLimitFlag,
4950
&DatabaseVerbosityFlag,

node/eth/backend.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -990,12 +990,12 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
990990

991991
pipelineStages := stageloop.NewPipelineStages(ctx, backend.chainDB, config, backend.sentriesClient, backend.notifications, backend.downloaderClient, blockReader, blockRetire, backend.silkworm, backend.forkValidator, tracer)
992992
backend.pipelineStagedSync = stagedsync.New(config.Sync, pipelineStages, stagedsync.PipelineUnwindOrder, stagedsync.PipelinePruneOrder, logger, stages.ModeApplyingBlocks)
993-
backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.RecentLogs, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx)
993+
backend.eth1ExecutionServer = eth1.NewEthereumExecutionModule(blockReader, backend.chainDB, backend.pipelineStagedSync, backend.forkValidator, chainConfig, assembleBlockPOS, hook, backend.notifications.Accumulator, backend.notifications.RecentLogs, backend.notifications.StateChangesConsumer, logger, backend.engine, config.Sync, ctx, config.FcuBackgroundPrune)
994994
executionRpc := direct.NewExecutionClientDirect(backend.eth1ExecutionServer)
995995

996996
var executionEngine executionclient.ExecutionEngine
997997

998-
executionEngine, err = executionclient.NewExecutionClientDirect(eth1_chain_reader.NewChainReaderEth1(chainConfig, executionRpc, 1000), txPoolRpcClient)
998+
executionEngine, err = executionclient.NewExecutionClientDirect(eth1_chain_reader.NewChainReaderEth1(chainConfig, executionRpc, uint64(config.FcuTimeout.Milliseconds())), txPoolRpcClient)
999999
if err != nil {
10001000
return nil, err
10011001
}

node/ethconfig/config.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ var Defaults = Config{
117117

118118
ErigonDBStepSize: config3.DefaultStepSize,
119119
ErigonDBStepsInFrozenFile: config3.DefaultStepsInFrozenFile,
120+
FcuTimeout: 1 * time.Second,
121+
FcuBackgroundPrune: true,
120122
}
121123

122124
const DefaultChainDBPageSize = 16 * datasize.KB
@@ -280,7 +282,8 @@ type Config struct {
280282
ErigonDBStepsInFrozenFile int
281283

282284
// fork choice update timeout
283-
FcuTimeout time.Duration
285+
FcuTimeout time.Duration
286+
FcuBackgroundPrune bool
284287
}
285288

286289
type Sync struct {

0 commit comments

Comments
 (0)