Skip to content

Commit 35ac726

Browse files
committed
Merge remote-tracking branch 'origin/main' into fix/parallel-exec-prague-setcode
2 parents b3ceb41 + 97ff85c commit 35ac726

38 files changed

Lines changed: 1036 additions & 807 deletions

cl/phase1/core/checkpoint_sync/checkpoint_sync_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,18 @@ func TestRemoteCheckpointSyncTimeout(t *testing.T) {
9090
require.True(t, errors.Is(err, context.DeadlineExceeded))
9191
}
9292

93+
func TestRemoteCheckpointSyncCancel(t *testing.T) {
94+
ctx, cancel := context.WithCancel(context.Background())
95+
cancel()
96+
97+
clparams.ConfigurableCheckpointsURLs = []string{"http://127.0.0.1:1"}
98+
syncer := NewRemoteCheckpointSync(&clparams.MainnetBeaconConfig, chainspec.MainnetChainID)
99+
currentState, err := syncer.GetLatestBeaconState(ctx)
100+
101+
require.Nil(t, currentState)
102+
require.ErrorIs(t, err, context.Canceled)
103+
}
104+
93105
func TestRemoteCheckpointSyncPossiblyAfterTimeout(t *testing.T) {
94106
if testing.Short() {
95107
t.Skip()

cl/phase1/core/checkpoint_sync/remote_checkpoint_sync.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,12 @@ func (r *RemoteCheckpointSync) GetLatestBeaconState(ctx context.Context) (*state
6060
}
6161
marshaled, err := io.ReadAll(resp.Body)
6262
if err != nil {
63-
return nil, fmt.Errorf("checkpoint sync read failed %s", err)
63+
return nil, fmt.Errorf("checkpoint sync read failed: %w", err)
6464
}
6565

6666
slot, err := utils.ExtractSlotFromSerializedBeaconState(marshaled)
6767
if err != nil {
68-
return nil, fmt.Errorf("checkpoint sync read failed %s", err)
68+
return nil, fmt.Errorf("checkpoint sync read failed: %w", err)
6969
}
7070

7171
epoch := slot / r.beaconConfig.SlotsPerEpoch
@@ -86,6 +86,9 @@ func (r *RemoteCheckpointSync) GetLatestBeaconState(ctx context.Context) (*state
8686
if err == nil {
8787
return beaconState, nil
8888
}
89+
if errors.Is(err, context.Canceled) {
90+
return nil, err
91+
}
8992
log.Warn("[Checkpoint Sync] Failed to fetch beacon state", "uri", uri, "err", err)
9093
}
9194
return nil, err

cl/phase1/core/checkpoint_sync/util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package checkpoint_sync
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"path/filepath"
78

@@ -27,6 +28,9 @@ func ReadOrFetchLatestBeaconState(ctx context.Context, dirs datadir.Dirs, beacon
2728
if err == nil {
2829
return st, nil
2930
}
31+
if errors.Is(err, context.Canceled) {
32+
return nil, err
33+
}
3034
log.Warn("[Checkpoint Sync] Remote checkpoint sync failed, attempting to read local head state", "err", err)
3135

3236
// Fallback: try to read the local head state from disk

cmd/evm/enginexrunner.go

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
// Copyright 2026 The Erigon Authors
2+
// This file is part of Erigon.
3+
//
4+
// Erigon is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// Erigon is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"errors"
23+
"fmt"
24+
"os"
25+
"path/filepath"
26+
"regexp"
27+
"sort"
28+
"strings"
29+
"sync"
30+
31+
"github.com/urfave/cli/v2"
32+
33+
"github.com/erigontech/erigon/common/log/v3"
34+
"github.com/erigontech/erigon/execution/engineapi/engineapitester"
35+
)
36+
37+
var engineXTestCommand = cli.Command{
38+
Action: engineXTestCmd,
39+
Name: "enginextest",
40+
Usage: "Executes engine-x test fixtures using the existing EngineXTestRunner",
41+
ArgsUsage: "<path>",
42+
Description: "Each test runs in its own short-lived Erigon node whose datadir lives\n" +
43+
"under $TMPDIR (default /tmp). On a journaled filesystem like ext4 the\n" +
44+
"per-tester create/unlink work serialises through the journal and\n" +
45+
"dominates wall time. Pointing $TMPDIR at a RAM-backed filesystem cuts\n" +
46+
"wall time by ~2× on Linux. Easiest cross-platform setup:\n" +
47+
"\n" +
48+
" RAMDISK=$(./tools/create-ramdisk)\n" +
49+
" TMPDIR=$RAMDISK evm enginextest ...\n" +
50+
"\n" +
51+
"On Linux you can also use /dev/shm directly (TMPDIR=/dev/shm), which is\n" +
52+
"a tmpfs sized at half of RAM by default. ~5GB free is plenty for the\n" +
53+
"full EEST engine_x set.",
54+
Flags: []cli.Flag{
55+
&cli.StringFlag{
56+
Name: "pre-alloc-dir",
57+
Usage: "Directory containing engine-x pre-alloc JSON files",
58+
Required: true,
59+
},
60+
&RunFlag,
61+
&VerbosityFlag,
62+
&WorkersFlag,
63+
},
64+
}
65+
66+
type engineXNamedTest struct {
67+
name string
68+
def engineapitester.EngineXTestDefinition
69+
}
70+
71+
type engineXGroupKey struct {
72+
fork engineapitester.Fork
73+
hash engineapitester.PreAllocHash
74+
}
75+
76+
func engineXTestCmd(cliCtx *cli.Context) error {
77+
if cliCtx.Int(VerbosityFlag.Name) > 0 {
78+
log.Root().SetHandler(log.LvlFilterHandler(log.Lvl(cliCtx.Int(VerbosityFlag.Name)), log.StderrHandler))
79+
} else {
80+
log.Root().SetHandler(log.LvlFilterHandler(log.LvlError, log.StderrHandler))
81+
}
82+
83+
path := cliCtx.Args().First()
84+
if path == "" {
85+
return errors.New("path argument required")
86+
}
87+
preAllocDir := cliCtx.String("pre-alloc-dir")
88+
if preAllocDir == "" {
89+
return errors.New("--pre-alloc-dir is required")
90+
}
91+
re, err := regexp.Compile(cliCtx.String(RunFlag.Name))
92+
if err != nil {
93+
return fmt.Errorf("invalid --run regex: %w", err)
94+
}
95+
workers := cliCtx.Int(WorkersFlag.Name)
96+
if workers <= 0 {
97+
// Knee of the wall-time vs. RAM curve on the full EEST engine_x set
98+
// (~64k tests / ~36k groups): workers=8 reaches plateau speedup at
99+
// ~4.3GB peak RSS. Higher values barely improve wall time and
100+
// values >24 risk MDBX virtual-memory exhaustion.
101+
workers = 8
102+
}
103+
104+
ctx, cancel := context.WithCancel(cliCtx.Context)
105+
defer cancel()
106+
107+
groups, totalTests, err := loadEngineXGroups(path, re)
108+
if err != nil {
109+
return err
110+
}
111+
if workers > len(groups) && len(groups) > 0 {
112+
workers = len(groups)
113+
}
114+
fmt.Fprintf(os.Stderr, "Collected %d tests across %d (fork, preAllocHash) groups; running with %d workers\n", totalTests, len(groups), workers)
115+
116+
if totalTests == 0 {
117+
report(cliCtx, nil)
118+
return nil
119+
}
120+
121+
runner, err := engineapitester.NewEngineXTestRunner(ctx, log.Root(), preAllocDir)
122+
if err != nil {
123+
return fmt.Errorf("create runner: %w", err)
124+
}
125+
defer func() {
126+
cerr := runner.Close()
127+
if cerr != nil {
128+
fmt.Fprintf(os.Stderr, "runner.Close: %v\n", cerr)
129+
}
130+
}()
131+
132+
groupKeys := make([]engineXGroupKey, 0, len(groups))
133+
for k := range groups {
134+
groupKeys = append(groupKeys, k)
135+
}
136+
137+
groupCh := make(chan engineXGroupKey)
138+
resultCh := make(chan testResult, totalTests)
139+
140+
var wg sync.WaitGroup
141+
for w := 0; w < workers; w++ {
142+
wg.Add(1)
143+
go func() {
144+
defer wg.Done()
145+
for key := range groupCh {
146+
runEngineXGroup(ctx, runner, key, groups[key], resultCh)
147+
}
148+
}()
149+
}
150+
151+
go func() {
152+
defer close(groupCh)
153+
for _, key := range groupKeys {
154+
select {
155+
case <-ctx.Done():
156+
return
157+
case groupCh <- key:
158+
}
159+
}
160+
}()
161+
162+
go func() {
163+
wg.Wait()
164+
close(resultCh)
165+
}()
166+
167+
results := make([]testResult, 0, totalTests)
168+
for r := range resultCh {
169+
results = append(results, r)
170+
}
171+
172+
sort.Slice(results, func(i, j int) bool { return results[i].Name < results[j].Name })
173+
174+
report(cliCtx, results)
175+
return nil
176+
}
177+
178+
// loadEngineXGroups walks path for JSON files, parses each, filters tests by
179+
// the regex, and groups them by (fork, preAllocHash). Each group is the unit
180+
// of execution given to a worker — a worker creates one EngineApiTester for
181+
// the group's (fork, preAllocHash), runs all tests in the group sequentially
182+
// on that tester, then evicts it.
183+
//
184+
// Files under any pre_alloc/ directory are skipped: those are pre-allocation
185+
// inputs (consumed via --pre-alloc-dir), not test fixtures, and they don't
186+
// follow the engine-x JSON schema. Every other JSON in the tree is expected
187+
// to be a valid engine-x fixture; an unmarshal error is treated as a hard
188+
// failure rather than silently skipped.
189+
func loadEngineXGroups(path string, re *regexp.Regexp) (map[engineXGroupKey][]engineXNamedTest, int, error) {
190+
files := collectFiles(path)
191+
groups := make(map[engineXGroupKey][]engineXNamedTest)
192+
total := 0
193+
for _, fname := range files {
194+
if isUnderPreAlloc(fname) {
195+
continue
196+
}
197+
src, err := os.ReadFile(fname)
198+
if err != nil {
199+
return nil, 0, fmt.Errorf("read %s: %w", fname, err)
200+
}
201+
var tests map[string]engineapitester.EngineXTestDefinition
202+
err = json.Unmarshal(src, &tests)
203+
if err != nil {
204+
return nil, 0, fmt.Errorf("unmarshal %s: %w", fname, err)
205+
}
206+
for name, def := range tests {
207+
if !re.MatchString(name) {
208+
continue
209+
}
210+
key := engineXGroupKey{fork: def.Fork, hash: def.PreAllocHash}
211+
groups[key] = append(groups[key], engineXNamedTest{name: name, def: def})
212+
total++
213+
}
214+
}
215+
return groups, total, nil
216+
}
217+
218+
// isUnderPreAlloc reports whether any directory component of p is named
219+
// "pre_alloc" — those are pre-allocation inputs, not test fixtures.
220+
func isUnderPreAlloc(p string) bool {
221+
for _, c := range strings.Split(filepath.ToSlash(p), "/") {
222+
if c == "pre_alloc" {
223+
return true
224+
}
225+
}
226+
return false
227+
}
228+
229+
// runEngineXGroup executes every test in the group sequentially on a single
230+
// tester (created lazily by the runner) then evicts the tester to free its
231+
// node and temp directory before returning. Results are streamed to resultCh
232+
// so the parent goroutine can collect across all workers.
233+
func runEngineXGroup(
234+
ctx context.Context,
235+
runner *engineapitester.EngineXTestRunner,
236+
key engineXGroupKey,
237+
tests []engineXNamedTest,
238+
resultCh chan<- testResult,
239+
) {
240+
defer func() {
241+
err := runner.Evict(key.fork, key.hash)
242+
if err != nil {
243+
fmt.Fprintf(os.Stderr, "evict fork=%s preAllocHash=%s: %v\n", key.fork, key.hash, err)
244+
}
245+
}()
246+
for _, t := range tests {
247+
r := testResult{Name: t.name, Pass: true}
248+
err := runner.Run(ctx, t.def)
249+
if err != nil {
250+
r.Pass = false
251+
r.Error = err.Error()
252+
}
253+
resultCh <- r
254+
}
255+
}

cmd/evm/main.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,16 @@ var (
131131
Name: "noreturndata",
132132
Usage: "disable return data output",
133133
}
134+
RunFlag = cli.StringFlag{
135+
Name: "run",
136+
Value: ".*",
137+
Usage: "Run only those tests matching the regular expression.",
138+
}
139+
WorkersFlag = cli.IntFlag{
140+
Name: "workers",
141+
Value: 0,
142+
Usage: "Number of workers to execute tests in parallel (0 means use the command's default)",
143+
}
134144
)
135145

136146
var stateTransitionCommand = cli.Command{
@@ -187,6 +197,7 @@ func init() {
187197
&disasmCommand,
188198
&runCommand,
189199
&blockTestCommand,
200+
&engineXTestCommand,
190201
&stateTestCommand,
191202
&stateTransitionCommand,
192203
}

cmd/integration/commands/stages.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ func stageSenders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) er
606606
return err
607607
}
608608

609-
cfg := stagedsync.StageSendersCfg(chainConfig, sync.Cfg(), false /* badBlockHalt */, tmpdir, pm, br, nil /* hd */)
609+
cfg := stagedsync.StageSendersCfg(chainConfig, sync.Cfg(), false /* badBlockHalt */, tmpdir, pm, br, nil /* hd */, exec.NewBlockReadAheader())
610610
if unwind > 0 {
611611
if unwind > s.BlockNumber {
612612
return errors.New("cannot unwind past 0")
@@ -680,7 +680,7 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error
680680
cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications,
681681
/*stateStream=*/ false,
682682
/*badBlockHalt=*/ true,
683-
dirs, br, nil, genesis, syncCfg, false /*experimentalBAL*/)
683+
dirs, br, nil, genesis, syncCfg, false /*experimentalBAL*/, exec.NewBlockReadAheader())
684684

685685
if unwind > 0 {
686686
if err := db.ViewTemporal(ctx, func(tx kv.TemporalTx) error {
@@ -1258,7 +1258,7 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, builderConfig *buildercfg.
12581258
}
12591259
notifications := shards.NewNotifications(nil)
12601260
blockRetire := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, db, heimdallStore, bridgeStore, chainConfig, &cfg, notifications.Events, blockSnapBuildSema, logger)
1261-
stageList := stageloop.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, nil, nil)
1261+
stageList := stageloop.NewDefaultStages(context.Background(), db, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, nil, nil, exec.NewBlockReadAheader())
12621262
sync := stagedsync.New(cfg.Sync, stageList, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ModeApplyingBlocks)
12631263
return blockRetire, engine, vmConfig, sync
12641264
}

cmd/integration/commands/state_stages.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"github.com/erigontech/erigon/db/state/execctx"
3838
"github.com/erigontech/erigon/execution/builder/buildercfg"
3939
chainspec "github.com/erigontech/erigon/execution/chain/spec"
40+
"github.com/erigontech/erigon/execution/exec"
4041
"github.com/erigontech/erigon/execution/stagedsync"
4142
"github.com/erigontech/erigon/execution/stagedsync/stages"
4243
"github.com/erigontech/erigon/execution/tracing/tracers/logger"
@@ -183,7 +184,7 @@ func syncBySmallSteps(db kv.TemporalRwDB, builderConfig buildercfg.BuilderConfig
183184
}
184185

185186
br, _ := blocksIO(db, logger1)
186-
execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications, false, true, dirs, br, nil, spec.Genesis, syncCfg, false)
187+
execCfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications, false, true, dirs, br, nil, spec.Genesis, syncCfg, false, exec.NewBlockReadAheader())
187188

188189
execUntilFunc := func(execToBlock uint64) stagedsync.ExecFunc {
189190
return func(badBlockUnwind bool, s *stagedsync.StageState, unwinder stagedsync.Unwinder, doms *execctx.SharedDomains, rwTx kv.TemporalRwTx, logger log.Logger) error {
@@ -369,7 +370,7 @@ func loopExec(db kv.TemporalRwDB, ctx context.Context, unwind uint64, logger log
369370
initialCycle := false
370371
br, _ := blocksIO(db, logger)
371372
notifications := shards.NewNotifications(nil)
372-
cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications, false, true, dirs, br, nil, spec.Genesis, syncCfg, false)
373+
cfg := stagedsync.StageExecuteBlocksCfg(db, pm, batchSize, chainConfig, engine, vmConfig, notifications, false, true, dirs, br, nil, spec.Genesis, syncCfg, false, exec.NewBlockReadAheader())
373374

374375
// set block limit of execute stage
375376
sync.MockExecFunc(stages.Execution, func(badBlockUnwind bool, stageState *stagedsync.StageState, unwinder stagedsync.Unwinder, sd *execctx.SharedDomains, tx kv.TemporalRwTx, logger log.Logger) error {

0 commit comments

Comments
 (0)