Skip to content

Commit b9d9e01

Browse files
authored
[performance] enforce block-snapshots cap inside aggregator collation (#20966)
Cherry-pick of #20852 to performance. ## performance-specific adaptations Resolved trivial conflict in `db/state/aggregator.go` (added `SetFrozenBlocksProvider` method).
2 parents 2f0099c + 11b715e commit b9d9e01

7 files changed

Lines changed: 216 additions & 24 deletions

File tree

cmd/integration/commands/stages.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,6 +1091,7 @@ func allSnapshots(ctx context.Context, db kv.RoDB, logger log.Logger) (*freezebl
10911091
_aggSingleton = aggOpts.MustOpen(ctx, db)
10921092

10931093
_aggSingleton.SetProduceMod(snapCfg.ProduceE3)
1094+
_aggSingleton.SetFrozenBlocksProvider(blockReader)
10941095

10951096
g := &errgroup.Group{}
10961097
g.Go(func() error {

cmd/utils/app/snapshots_cmd.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ import (
6868
"github.com/erigontech/erigon/db/rawdb/blockio"
6969
"github.com/erigontech/erigon/db/recsplit"
7070
"github.com/erigontech/erigon/db/seg"
71-
"github.com/erigontech/erigon/db/services"
7271
"github.com/erigontech/erigon/db/snapshotsync"
7372
"github.com/erigontech/erigon/db/snapshotsync/freezeblocks"
7473
"github.com/erigontech/erigon/db/snaptype"
@@ -3224,6 +3223,9 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
32243223
blockSnapBuildSema := semaphore.NewWeighted(int64(runtime.NumCPU()))
32253224
agg.SetSnapshotBuildSema(blockSnapBuildSema)
32263225

3226+
blockReader, _ := br.IO()
3227+
agg.SetFrozenBlocksProvider(blockReader)
3228+
32273229
agg.PresetOfflineMerge()
32283230
agg.PeriodicalyPrintProcessSet(ctx)
32293231

@@ -3244,8 +3246,6 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
32443246
return err
32453247
}
32463248

3247-
blockReader, _ := br.IO()
3248-
32493249
blocksInSnapshots := blockReader.FrozenBlocks()
32503250
if chainConfig.Bor != nil {
32513251
blocksInSnapshots = min(blocksInSnapshots, blockReader.FrozenBorBlocks(false))
@@ -3294,15 +3294,7 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
32943294
if err := db.Update(ctx, func(tx kv.RwTx) error {
32953295
execProgress, _ := stages.GetStageProgress(tx, stages.Execution)
32963296
lastTxNum, err = txNumsReader.Max(ctx, tx, execProgress)
3297-
if err != nil {
3298-
return err
3299-
}
3300-
maxCollatable, err := services.MaxCollatableTxNum(ctx, tx, blockReader)
3301-
if err != nil {
3302-
return err
3303-
}
3304-
lastTxNum = min(lastTxNum, maxCollatable)
3305-
return nil
3297+
return err
33063298
}); err != nil {
33073299
return err
33083300
}

db/services/snapshot_progress.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,8 @@ import (
2222
"github.com/erigontech/erigon/db/kv"
2323
)
2424

25-
// MaxCollatableTxNum returns the upper bound txNum that state collation may
26-
// target without running ahead of block snapshot files. Callers of
27-
// Aggregator.BuildFiles / BuildFilesInBackground must cap their target txNum
28-
// by this value — otherwise state files may advance past block files, an
29-
// unrecoverable state that requires manual `erigon seg rm-state --latest` to
30-
// release.
25+
// MaxCollatableTxNum: upper bound txNum that state collation may target.
26+
// Aggregator enforces this internally via SetFrozenBlocksProvider.
3127
func MaxCollatableTxNum(ctx context.Context, tx kv.Tx, blockReader FullBlockReader) (uint64, error) {
3228
return blockReader.TxnumReader().Max(ctx, tx, blockReader.FrozenBlocks())
3329
}

db/state/aggregator.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,13 @@ type Aggregator struct {
116116
configured bool
117117
savedSalt *uint32
118118
disableFsync bool
119+
120+
// nil = no cap. See #20701.
121+
frozenBlocks FrozenBlocksProvider
122+
}
123+
124+
type FrozenBlocksProvider interface {
125+
FrozenBlocks() uint64
119126
}
120127

121128
func newAggregator(ctx context.Context, dirs datadir.Dirs, reorgBlockDepth uint64, db kv.RoDB, logger log.Logger) (*Aggregator, error) {
@@ -966,6 +973,24 @@ func (a *Aggregator) readyForCollation(ctx context.Context, step kv.Step) (lastB
966973
if a.reorgBlockDepth == 0 {
967974
return 0, 0, 0, true, nil
968975
}
976+
977+
if a.frozenBlocks != nil {
978+
var capTxNum uint64
979+
if err = a.db.View(ctx, func(tx kv.Tx) error {
980+
var err error
981+
capTxNum, err = rawdbv3.TxNums.Max(ctx, tx, a.frozenBlocks.FrozenBlocks())
982+
return err
983+
}); err != nil {
984+
return 0, 0, 0, false, fmt.Errorf("read max collatable txNum: %w", err)
985+
}
986+
if uint64(step+1)*a.StepSize() > capTxNum {
987+
a.logger.Info("[snapshots] holding state collation at block snapshot boundary",
988+
"step", step,
989+
"stepEndTxNum", fmt.Sprintf("%d (step %d)", uint64(step+1)*a.StepSize(), uint64(step+1)),
990+
"blockSnapshotsTxNum", fmt.Sprintf("%d (step %d)", capTxNum, capTxNum/a.StepSize()))
991+
return 0, 0, 0, false, nil
992+
}
993+
}
969994
err = a.db.View(ctx, func(tx kv.Tx) error {
970995
lastBlockInStep, ok, err = rawdbv3.TxNums.FindBlockNum(ctx, tx, lastTxNumOfStep(step, a.stepSize.Load()))
971996
if err != nil {
@@ -1894,6 +1919,13 @@ func (a *Aggregator) SetProduceMod(produce bool) {
18941919
a.produce = produce
18951920
}
18961921

1922+
// SetFrozenBlocksProvider caps state collation at the block-snapshots boundary.
1923+
// Without it, state files may advance past block files — recovery requires
1924+
// `erigon seg rm-state --latest`. See #20701.
1925+
func (a *Aggregator) SetFrozenBlocksProvider(p FrozenBlocksProvider) {
1926+
a.frozenBlocks = p
1927+
}
1928+
18971929
func (a *Aggregator) BuildFilesInBackground(txNum uint64) chan struct{} {
18981930
return a.buildFilesInBackground(txNum, true)
18991931
}

db/test/aggregator_ext_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package test
1818

1919
import (
20+
"context"
2021
"encoding/binary"
2122
"encoding/hex"
2223
"math"
@@ -768,6 +769,180 @@ func TestAggregatorV3_BuildFiles_WithReorgDepth(t *testing.T) {
768769
require.Equal(t, kv.Step(6), kv.Step(agg.EndTxNumMinimax()/agg.StepSize()))
769770
}
770771

772+
// With txnsPerBlock=1, fakeFrozenBlocks(N) yields capTxNum=N.
773+
type fakeFrozenBlocks uint64
774+
775+
func (f fakeFrozenBlocks) FrozenBlocks() uint64 { return uint64(f) }
776+
777+
// Regression for #20701: cap must clamp the loop, not just early-return.
778+
func TestAggregatorV3_BuildFiles_RespectsMaxCollatableTxNumCap(t *testing.T) {
779+
if testing.Short() {
780+
t.Skip("slow test")
781+
}
782+
ctx := t.Context()
783+
logger := log.New()
784+
dirs := datadir.New(t.TempDir())
785+
db := mdbx.New(dbcfg.ChainDB, logger).InMem(t, dirs.Chaindata).GrowthStep(32 * datasize.MB).MapSize(2 * datasize.GB).MustOpen()
786+
t.Cleanup(db.Close)
787+
agg := state.NewTest(dirs).ReorgBlockDepth(1).StepSize(2).Logger(logger).MustOpen(ctx, db)
788+
t.Cleanup(agg.Close)
789+
require.NoError(t, agg.OpenFolder())
790+
791+
// Cap at 4 steps; write 9 steps' worth into the DB.
792+
const capTxNum = uint64(8)
793+
agg.SetFrozenBlocksProvider(fakeFrozenBlocks(capTxNum))
794+
795+
tdb, err := temporal.New(db, agg)
796+
require.NoError(t, err)
797+
t.Cleanup(tdb.Close)
798+
tx, err := tdb.BeginTemporalRw(ctx)
799+
require.NoError(t, err)
800+
t.Cleanup(tx.Rollback)
801+
doms, err := execctx.NewSharedDomains(context.Background(), tx, logger)
802+
require.NoError(t, err)
803+
t.Cleanup(doms.Close)
804+
805+
const txnNums = uint64(18)
806+
const txnsPerBlock = uint64(1)
807+
for i := uint64(0); i < txnNums/txnsPerBlock; i++ {
808+
require.NoError(t, rawdbv3.TxNums.Append(tx, i+1, (i+1)*txnsPerBlock))
809+
}
810+
generateSharedDomainsUpdates(t, doms, tx, txnNums, newRnd(0), length.Addr, 10, txnsPerBlock)
811+
require.NoError(t, doms.Flush(ctx, tx))
812+
require.NoError(t, tx.Commit())
813+
814+
require.NoError(t, agg.BuildFiles(txnNums))
815+
816+
require.Equal(t, capTxNum, agg.EndTxNumMinimax(),
817+
"EndTxNumMinimax must be clamped to cap (%d), not driven by lastInDB", capTxNum)
818+
require.Equal(t, kv.Step(capTxNum/agg.StepSize()), kv.Step(agg.EndTxNumMinimax()/agg.StepSize()))
819+
}
820+
821+
// nil provider = no cap; behavior unchanged.
822+
func TestAggregatorV3_BuildFiles_NoCapHookBuildsAll(t *testing.T) {
823+
if testing.Short() {
824+
t.Skip("slow test")
825+
}
826+
ctx := t.Context()
827+
logger := log.New()
828+
dirs := datadir.New(t.TempDir())
829+
db := mdbx.New(dbcfg.ChainDB, logger).InMem(t, dirs.Chaindata).GrowthStep(32 * datasize.MB).MapSize(2 * datasize.GB).MustOpen()
830+
t.Cleanup(db.Close)
831+
agg := state.NewTest(dirs).ReorgBlockDepth(1).StepSize(2).Logger(logger).MustOpen(ctx, db)
832+
t.Cleanup(agg.Close)
833+
require.NoError(t, agg.OpenFolder())
834+
835+
tdb, err := temporal.New(db, agg)
836+
require.NoError(t, err)
837+
t.Cleanup(tdb.Close)
838+
tx, err := tdb.BeginTemporalRw(ctx)
839+
require.NoError(t, err)
840+
t.Cleanup(tx.Rollback)
841+
doms, err := execctx.NewSharedDomains(context.Background(), tx, logger)
842+
require.NoError(t, err)
843+
t.Cleanup(doms.Close)
844+
845+
const txnNums = uint64(18)
846+
const txnsPerBlock = uint64(1)
847+
for i := uint64(0); i < txnNums/txnsPerBlock; i++ {
848+
require.NoError(t, rawdbv3.TxNums.Append(tx, i+1, (i+1)*txnsPerBlock))
849+
}
850+
generateSharedDomainsUpdates(t, doms, tx, txnNums, newRnd(0), length.Addr, 10, txnsPerBlock)
851+
require.NoError(t, doms.Flush(ctx, tx))
852+
require.NoError(t, tx.Commit())
853+
854+
require.NoError(t, agg.BuildFiles(txnNums))
855+
856+
// reorg-depth=1 holds the last block back; cap unset.
857+
require.Equal(t, uint64(16), agg.EndTxNumMinimax())
858+
}
859+
860+
// BuildFiles2 path (stage_custom_trace, squeeze) must honor the cap too.
861+
func TestAggregatorV3_BuildFiles2_RespectsMaxCollatableTxNumCap(t *testing.T) {
862+
if testing.Short() {
863+
t.Skip("slow test")
864+
}
865+
ctx := t.Context()
866+
logger := log.New()
867+
dirs := datadir.New(t.TempDir())
868+
db := mdbx.New(dbcfg.ChainDB, logger).InMem(t, dirs.Chaindata).GrowthStep(32 * datasize.MB).MapSize(2 * datasize.GB).MustOpen()
869+
t.Cleanup(db.Close)
870+
agg := state.NewTest(dirs).ReorgBlockDepth(1).StepSize(2).Logger(logger).MustOpen(ctx, db)
871+
t.Cleanup(agg.Close)
872+
require.NoError(t, agg.OpenFolder())
873+
874+
const capTxNum = uint64(8)
875+
agg.SetFrozenBlocksProvider(fakeFrozenBlocks(capTxNum))
876+
877+
tdb, err := temporal.New(db, agg)
878+
require.NoError(t, err)
879+
t.Cleanup(tdb.Close)
880+
tx, err := tdb.BeginTemporalRw(ctx)
881+
require.NoError(t, err)
882+
t.Cleanup(tx.Rollback)
883+
doms, err := execctx.NewSharedDomains(context.Background(), tx, logger)
884+
require.NoError(t, err)
885+
t.Cleanup(doms.Close)
886+
887+
const txnNums = uint64(18)
888+
for i := uint64(0); i < txnNums; i++ {
889+
require.NoError(t, rawdbv3.TxNums.Append(tx, i+1, i+1))
890+
}
891+
generateSharedDomainsUpdates(t, doms, tx, txnNums, newRnd(0), length.Addr, 10, 1)
892+
require.NoError(t, doms.Flush(ctx, tx))
893+
require.NoError(t, tx.Commit())
894+
895+
require.NoError(t, agg.BuildFiles2(ctx, 0, kv.Step(txnNums/agg.StepSize()), false))
896+
agg.WaitForFiles()
897+
898+
require.Equal(t, capTxNum, agg.EndTxNumMinimax(),
899+
"BuildFiles2 must clamp to cap (%d), not toStep", capTxNum)
900+
}
901+
902+
// Cap below visibleFilesMinimaxTxNum: defensive only — no rollback.
903+
func TestAggregatorV3_BuildFiles_CapBelowVisibleIsNoop(t *testing.T) {
904+
if testing.Short() {
905+
t.Skip("slow test")
906+
}
907+
ctx := t.Context()
908+
logger := log.New()
909+
dirs := datadir.New(t.TempDir())
910+
db := mdbx.New(dbcfg.ChainDB, logger).InMem(t, dirs.Chaindata).GrowthStep(32 * datasize.MB).MapSize(2 * datasize.GB).MustOpen()
911+
t.Cleanup(db.Close)
912+
agg := state.NewTest(dirs).ReorgBlockDepth(1).StepSize(2).Logger(logger).MustOpen(ctx, db)
913+
t.Cleanup(agg.Close)
914+
require.NoError(t, agg.OpenFolder())
915+
916+
tdb, err := temporal.New(db, agg)
917+
require.NoError(t, err)
918+
t.Cleanup(tdb.Close)
919+
tx, err := tdb.BeginTemporalRw(ctx)
920+
require.NoError(t, err)
921+
t.Cleanup(tx.Rollback)
922+
doms, err := execctx.NewSharedDomains(context.Background(), tx, logger)
923+
require.NoError(t, err)
924+
t.Cleanup(doms.Close)
925+
926+
const txnNums = uint64(18)
927+
const txnsPerBlock = uint64(1)
928+
for i := uint64(0); i < txnNums/txnsPerBlock; i++ {
929+
require.NoError(t, rawdbv3.TxNums.Append(tx, i+1, (i+1)*txnsPerBlock))
930+
}
931+
generateSharedDomainsUpdates(t, doms, tx, txnNums, newRnd(0), length.Addr, 10, txnsPerBlock)
932+
require.NoError(t, doms.Flush(ctx, tx))
933+
require.NoError(t, tx.Commit())
934+
935+
// build state up to txNum 8.
936+
agg.SetFrozenBlocksProvider(fakeFrozenBlocks(8))
937+
require.NoError(t, agg.BuildFiles(txnNums))
938+
require.Equal(t, uint64(8), agg.EndTxNumMinimax())
939+
940+
// lower cap below current visible — must be a no-op, no rollback.
941+
agg.SetFrozenBlocksProvider(fakeFrozenBlocks(4))
942+
require.NoError(t, agg.BuildFiles(txnNums))
943+
require.Equal(t, uint64(8), agg.EndTxNumMinimax())
944+
}
945+
771946
func compareMapsBytes(t *testing.T, m1, m2 map[string][]byte) {
772947
t.Helper()
773948
for k, v := range m1 {

execution/stagedsync/exec3.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ import (
3838
"github.com/erigontech/erigon/db/rawdb"
3939
"github.com/erigontech/erigon/db/rawdb/rawdbhelpers"
4040
"github.com/erigontech/erigon/db/rawdb/rawtemporaldb"
41-
"github.com/erigontech/erigon/db/services"
4241
dbstate "github.com/erigontech/erigon/db/state"
4342
"github.com/erigontech/erigon/db/state/execctx"
4443
"github.com/erigontech/erigon/execution/commitment"
@@ -142,11 +141,7 @@ func ExecV3(ctx context.Context,
142141
}
143142

144143
if execStage.SyncMode() == stages.ModeApplyingBlocks {
145-
maxCollatable, err := services.MaxCollatableTxNum(ctx, applyTx, cfg.blockReader)
146-
if err != nil {
147-
return err
148-
}
149-
agg.BuildFilesInBackground(min(initialTxNum, maxCollatable))
144+
agg.BuildFilesInBackground(initialTxNum)
150145
}
151146

152147
var (

node/eth/backend.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1299,6 +1299,7 @@ func SetUpBlockReader(ctx context.Context, db kv.RwDB, dirs datadir.Dirs, snConf
12991299
}
13001300
agg.SetSnapshotBuildSema(blockSnapBuildSema)
13011301
agg.SetProduceMod(snConfig.Snapshot.ProduceE3)
1302+
agg.SetFrozenBlocksProvider(blockReader)
13021303

13031304
allSegmentsDownloadComplete, err := rawdb.AllSegmentsDownloadCompleteFromDB(db)
13041305
if err != nil {

0 commit comments

Comments (0)