Skip to content

Commit 406d855

Browse files
[e30] stage_custom_trace: more aggressive prune (#15225)
1 parent e17ba2a commit 406d855

File tree

22 files changed

+318
-385
lines changed

22 files changed

+318
-385
lines changed

cmd/integration/commands/stages.go

Lines changed: 71 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"fmt"
2424
"os"
2525
"path/filepath"
26+
"runtime"
2627
"slices"
2728
"strings"
2829
"sync"
@@ -51,7 +52,6 @@ import (
5152
libstate "github.com/erigontech/erigon-lib/state"
5253
"github.com/erigontech/erigon-lib/state/stats"
5354
"github.com/erigontech/erigon-lib/wrap"
54-
5555
"github.com/erigontech/erigon/cl/clparams"
5656
"github.com/erigontech/erigon/cmd/hack/tool/fromdb"
5757
"github.com/erigontech/erigon/consensus"
@@ -62,6 +62,7 @@ import (
6262
"github.com/erigontech/erigon/core/types"
6363
"github.com/erigontech/erigon/core/vm"
6464
"github.com/erigontech/erigon/eth/ethconfig"
65+
"github.com/erigontech/erigon/eth/ethconfig/estimate"
6566
"github.com/erigontech/erigon/eth/ethconsensusconfig"
6667
"github.com/erigontech/erigon/eth/integrity"
6768
"github.com/erigontech/erigon/eth/stagedsync"
@@ -607,49 +608,52 @@ func init() {
607608

608609
func stageSnapshots(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
609610
br, bw := blocksIO(db, logger)
610-
_, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)
611611

612-
return db.Update(ctx, func(tx kv.RwTx) error {
613-
if reset {
614-
if err := stages.SaveStageProgress(tx, stages.Snapshots, 0); err != nil {
615-
return fmt.Errorf("saving Snapshots progress failed: %w", err)
616-
}
617-
}
618-
dirs := datadir.New(datadirCli)
619-
if err := reset2.ResetBlocks(tx, br, bw, dirs, logger); err != nil {
620-
return fmt.Errorf("resetting blocks: %w", err)
621-
}
612+
tx, err := db.BeginTemporalRw(ctx)
613+
if err != nil {
614+
return err
615+
}
616+
defer tx.Rollback()
622617

623-
temporalTx, ok := tx.(kv.TemporalTx)
624-
if !ok {
625-
return errors.New("tx is not a temporal tx")
626-
}
627-
domains, err := libstate.NewSharedDomains(temporalTx, logger)
628-
if err != nil {
629-
return err
618+
if reset {
619+
if err := stages.SaveStageProgress(tx, stages.Snapshots, 0); err != nil {
620+
return fmt.Errorf("saving Snapshots progress failed: %w", err)
630621
}
631-
defer domains.Close()
632-
//txnUm := domains.TxNum()
633-
blockNum := domains.BlockNum()
622+
}
623+
dirs := datadir.New(datadirCli)
624+
if err := reset2.ResetBlocks(tx, br, bw, dirs, logger); err != nil {
625+
return fmt.Errorf("resetting blocks: %w", err)
626+
}
627+
628+
temporalTx, ok := tx.(kv.TemporalTx)
629+
if !ok {
630+
return errors.New("tx is not a temporal tx")
631+
}
632+
domains, err := libstate.NewSharedDomains(temporalTx, logger)
633+
if err != nil {
634+
return err
635+
}
636+
defer domains.Close()
637+
//txnUm := domains.TxNum()
638+
blockNum := domains.BlockNum()
639+
640+
// stagedsync.SpawnStageSnapshots(s, ctx, rwTx, logger)
641+
progress, err := stages.GetStageProgress(tx, stages.Snapshots)
642+
if err != nil {
643+
return fmt.Errorf("re-read Snapshots progress: %w", err)
644+
}
634645

635-
// stagedsync.SpawnStageSnapshots(s, ctx, rwTx, logger)
636-
progress, err := stages.GetStageProgress(tx, stages.Snapshots)
646+
if blockNum > progress {
647+
if err := stages.SaveStageProgress(tx, stages.Execution, blockNum); err != nil {
648+
return fmt.Errorf("saving Snapshots progress failed: %w", err)
649+
}
650+
progress, err = stages.GetStageProgress(tx, stages.Snapshots)
637651
if err != nil {
638652
return fmt.Errorf("re-read Snapshots progress: %w", err)
639653
}
640-
641-
if blockNum > progress {
642-
if err := stages.SaveStageProgress(tx, stages.Execution, blockNum); err != nil {
643-
return fmt.Errorf("saving Snapshots progress failed: %w", err)
644-
}
645-
progress, err = stages.GetStageProgress(tx, stages.Snapshots)
646-
if err != nil {
647-
return fmt.Errorf("re-read Snapshots progress: %w", err)
648-
}
649-
}
650-
logger.Info("Progress", "snapshots", progress)
651-
return nil
652-
})
654+
}
655+
logger.Info("Progress", "snapshots", progress)
656+
return nil
653657
}
654658

655659
func stageHeaders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
@@ -659,7 +663,6 @@ func stageHeaders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) er
659663
}
660664

661665
br, bw := blocksIO(db, logger)
662-
_, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)
663666

664667
if integritySlow {
665668
if err := db.View(ctx, func(tx kv.Tx) error {
@@ -741,7 +744,7 @@ func stageHeaders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) er
741744
}
742745

743746
func stageBorHeimdall(db kv.TemporalRwDB, ctx context.Context, unwindTypes []string, logger log.Logger) error {
744-
engine, _, sync, _, miningState := newSync(ctx, db, nil /* miningConfig */, logger)
747+
_, engine, _, sync, _, miningState := newSync(ctx, db, nil /* miningConfig */, logger)
745748
chainConfig := fromdb.ChainConfig(db)
746749

747750
heimdallClient := engine.(*bor.Bor).HeimdallClient
@@ -819,7 +822,7 @@ func stageBorHeimdall(db kv.TemporalRwDB, ctx context.Context, unwindTypes []str
819822

820823
func stageBodies(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
821824
chainConfig := fromdb.ChainConfig(db)
822-
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
825+
_, _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
823826
br, bw := blocksIO(db, logger)
824827

825828
if err := db.Update(ctx, func(tx kv.RwTx) error {
@@ -853,7 +856,7 @@ func stageBodies(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) err
853856

854857
func stagePolygonSync(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
855858
dirs := datadir.New(datadirCli)
856-
engine, _, stageSync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
859+
_, engine, _, stageSync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
857860
heimdallClient := engine.(*bor.Bor).HeimdallClient
858861
_, _, _, _, bridgeStore, heimdallStore, err := allSnapshots(ctx, db, logger)
859862
if err != nil {
@@ -886,7 +889,7 @@ func stagePolygonSync(db kv.TemporalRwDB, ctx context.Context, logger log.Logger
886889
func stageSenders(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
887890
tmpdir := datadir.New(datadirCli).Tmp
888891
chainConfig := fromdb.ChainConfig(db)
889-
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
892+
_, _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
890893

891894
must(sync.SetCurrentStage(stages.Senders))
892895

@@ -972,7 +975,7 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error
972975
return err
973976
}
974977

975-
engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
978+
_, engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
976979
must(sync.SetCurrentStage(stages.Execution))
977980
_, _, agg, _, _, _, err := allSnapshots(ctx, db, logger)
978981
if err != nil {
@@ -1135,13 +1138,14 @@ func stageCustomTrace(db kv.TemporalRwDB, ctx context.Context, logger log.Logger
11351138
return err
11361139
}
11371140

1138-
engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
1141+
br, engine, vmConfig, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
11391142
must(sync.SetCurrentStage(stages.Execution))
11401143

11411144
chainConfig := fromdb.ChainConfig(db)
11421145
genesis := core.GenesisBlockByChainName(chain)
1143-
br, _ := blocksIO(db, logger)
1144-
cfg := stagedsync.StageCustomTraceCfg(strings.Split(domain, ","), db, dirs, br, chainConfig, engine, genesis, &syncCfg)
1146+
blockReader, _ := blocksIO(db, logger)
1147+
1148+
cfg := stagedsync.StageCustomTraceCfg(strings.Split(domain, ","), db, dirs, blockReader, chainConfig, engine, genesis, &syncCfg)
11451149
if reset {
11461150
tx, err := db.BeginTemporalRw(ctx)
11471151
if err != nil {
@@ -1189,6 +1193,18 @@ func stageCustomTrace(db kv.TemporalRwDB, ctx context.Context, logger log.Logger
11891193
var batchSize datasize.ByteSize
11901194
must(batchSize.UnmarshalText([]byte(batchSizeStr)))
11911195

1196+
agg := db.(libstate.HasAgg).Agg().(*libstate.Aggregator)
1197+
defer br.(*freezeblocks.BlockRetire).MadvNormal().DisableReadAhead()
1198+
//defer agg.MadvNormal().DisableReadAhead()
1199+
blockSnapBuildSema := semaphore.NewWeighted(int64(runtime.NumCPU()))
1200+
agg.SetSnapshotBuildSema(blockSnapBuildSema)
1201+
1202+
// `erigon retire` command is designed to maximize resouces utilization. But `Erigon itself` does minimize background impact (because not in rush).
1203+
agg.SetCollateAndBuildWorkers(estimate.StateV3Collate.Workers())
1204+
agg.SetMergeWorkers(2)
1205+
agg.SetCompressWorkers(estimate.CompressSnapshot.WorkersHalf())
1206+
agg.PeriodicalyPrintProcessSet(ctx)
1207+
11921208
err := stagedsync.SpawnCustomTrace(cfg, ctx, logger)
11931209
if err != nil {
11941210
return err
@@ -1206,7 +1222,7 @@ func stagePatriciaTrie(db kv.TemporalRwDB, ctx context.Context, logger log.Logge
12061222
}
12071223
defer sn.Close()
12081224
defer agg.Close()
1209-
_, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)
1225+
_, _, _, _, _, _ = newSync(ctx, db, nil /* miningConfig */, logger)
12101226

12111227
if reset {
12121228
return reset2.Reset(ctx, db, stages.Execution)
@@ -1224,7 +1240,7 @@ func stagePatriciaTrie(db kv.TemporalRwDB, ctx context.Context, logger log.Logge
12241240

12251241
func stageTxLookup(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error {
12261242
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
1227-
_, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
1243+
_, _, _, sync, _, _ := newSync(ctx, db, nil /* miningConfig */, logger)
12281244
chainConfig := fromdb.ChainConfig(db)
12291245
must(sync.SetCurrentStage(stages.TxLookup))
12301246
if reset {
@@ -1437,7 +1453,9 @@ func blocksIO(db kv.RwDB, logger log.Logger) (services.FullBlockReader, *blockio
14371453

14381454
const blockBufferSize = 128
14391455

1440-
func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *params.MiningConfig, logger log.Logger) (consensus.Engine, *vm.Config, *stagedsync.Sync, *stagedsync.Sync, stagedsync.MiningState) {
1456+
func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *params.MiningConfig, logger log.Logger) (
1457+
services.BlockRetire, consensus.Engine, *vm.Config, *stagedsync.Sync, *stagedsync.Sync, stagedsync.MiningState,
1458+
) {
14411459
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
14421460

14431461
vmConfig := &vm.Config{}
@@ -1511,7 +1529,6 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *params.Minin
15111529
agg.SetSnapshotBuildSema(blockSnapBuildSema)
15121530

15131531
notifications := shards.NewNotifications(nil)
1514-
blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, db, nil, nil, chainConfig, &cfg, notifications.Events, blockSnapBuildSema, logger)
15151532

15161533
var (
15171534
snapDb kv.RwDB
@@ -1527,6 +1544,9 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *params.Minin
15271544
bridgeStore = bridge.NewSnapshotStore(bridge.NewDbStore(db), borSn, chainConfig.Bor)
15281545
heimdallStore = heimdall.NewSnapshotStore(heimdall.NewDbStore(db), borSn)
15291546
}
1547+
borSn.DownloadComplete() // mark as ready
1548+
blockRetire := freezeblocks.NewBlockRetire(estimate.CompressSnapshot.Workers(), dirs, blockReader, blockWriter, db, heimdallStore, bridgeStore, chainConfig, &cfg, notifications.Events, blockSnapBuildSema, logger)
1549+
15301550
stageList := stages2.NewDefaultStages(context.Background(), db, snapDb, p2p.Config{}, &cfg, sentryControlServer, notifications, nil, blockReader, blockRetire, nil, nil,
15311551
heimdallClient, heimdallStore, bridgeStore, recents, signatures, logger)
15321552
sync := stagedsync.New(cfg.Sync, stageList, stagedsync.DefaultUnwindOrder, stagedsync.DefaultPruneOrder, logger, stages.ModeApplyingBlocks)
@@ -1571,7 +1591,7 @@ func newSync(ctx context.Context, db kv.TemporalRwDB, miningConfig *params.Minin
15711591
stages.ModeBlockProduction,
15721592
)
15731593

1574-
return engine, vmConfig, sync, miningSync, miner
1594+
return blockRetire, engine, vmConfig, sync, miningSync, miner
15751595
}
15761596

15771597
func progress(tx kv.Getter, stage stages.SyncStage) uint64 {

cmd/integration/commands/state_stages.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ func syncBySmallSteps(db kv.TemporalRwDB, miningConfig params.MiningConfig, ctx
153153
return err
154154
}
155155

156-
engine, vmConfig, stateStages, miningStages, miner := newSync(ctx, db, &miningConfig, logger1)
156+
_, engine, vmConfig, stateStages, miningStages, miner := newSync(ctx, db, &miningConfig, logger1)
157157
chainConfig, pm := fromdb.ChainConfig(db), fromdb.PruneMode(db)
158158

159159
tx, err := db.BeginRw(ctx)
@@ -380,7 +380,7 @@ func checkMinedBlock(b1, b2 *types.Block, chainConfig *chain2.Config) {
380380
func loopExec(db kv.TemporalRwDB, ctx context.Context, unwind uint64, logger log.Logger) error {
381381
chainConfig := fromdb.ChainConfig(db)
382382
dirs, pm := datadir.New(datadirCli), fromdb.PruneMode(db)
383-
engine, vmConfig, sync, _, _ := newSync(ctx, db, nil, logger)
383+
_, engine, vmConfig, sync, _, _ := newSync(ctx, db, nil, logger)
384384

385385
tx, err := db.BeginRw(ctx)
386386
if err != nil {

cmd/rpctest/rpctest/bench_ethgetlogs.go

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,18 @@ func EthGetLogsInvariants(erigonURL, gethURL string, needCompare bool, blockFrom
159159
logEvery := time.NewTicker(20 * time.Second)
160160
defer logEvery.Stop()
161161

162+
noDuplicates := func(logs []Log, blockNum uint64, addr common.Address) error {
163+
if len(logs) <= 1 {
164+
return nil
165+
}
166+
//for i := 1; i < len(logs); i++ {
167+
// if logs[i-1].TxIndex == logs[i].TxIndex {
168+
// return fmt.Errorf("eth_getLogs: at blockNum=%d and addr %x has duplicated logs", blockNum, addr)
169+
// }
170+
//}
171+
return nil
172+
}
173+
162174
_prevBn := blockFrom
163175
for bn := blockFrom; bn < blockTo; {
164176
batchEnd := min(bn+1000, blockTo)
@@ -192,17 +204,22 @@ func EthGetLogsInvariants(erigonURL, gethURL string, needCompare bool, blockFrom
192204
if resp.Error != nil {
193205
return fmt.Errorf("Error getting modified accounts (Erigon): %d %s\n", resp.Error.Code, resp.Error.Message)
194206
}
195-
//invariant1: if `log` visible without filter - then must be visible with filter. (in another words: `address` must be indexed well)
207+
//invariant1.1: if `log` visible without filter - then must be visible with filter. (in another words: `address` must be indexed well)
196208
if len(resp.Result) == 0 {
197209
return fmt.Errorf("eth_getLogs: at blockNum=%d account %x not indexed", bn, l.Address)
198210
}
199211

200-
//invariant2: if `log` visible without filter - then must be visible with filter. (in another words: `topic` must be indexed well)
212+
//invariant1.2: no repeats
213+
if err := noDuplicates(resp.Result, bn, l.Address); err != nil {
214+
return err
215+
}
216+
217+
//invariant2.1: if `log` visible without filter - then must be visible with filter. (in another words: `topic` must be indexed well)
201218
if len(l.Topics) == 0 {
202219
continue
203220
}
204221

205-
if _, ok := sawTopic[l.Topics[0]]; ok { //nolint:staticcheck // SA4006: false positive
222+
if _, ok := sawTopic[l.Topics[0]]; ok {
206223
continue
207224
}
208225

@@ -218,6 +235,10 @@ func EthGetLogsInvariants(erigonURL, gethURL string, needCompare bool, blockFrom
218235
if len(resp.Result) == 0 {
219236
return fmt.Errorf("eth_getLogs: at blockNum=%d account %x, topic %x not indexed", bn, l.Address, l.Topics[0])
220237
}
238+
//invariant2.2: no repeats
239+
if err := noDuplicates(resp.Result, bn, l.Address); err != nil {
240+
return err
241+
}
221242
}
222243

223244
select {

0 commit comments

Comments
 (0)