Skip to content

Commit d059e28

Browse files
authored
Merge branch 'main' into anacrolix/pls-faster-execution-tests
2 parents 0b931d2 + 59bd509 commit d059e28

6 files changed

Lines changed: 244 additions & 7 deletions

File tree

db/recsplit/recsplit.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,9 +173,10 @@ type RecSplit struct {
173173
built bool // Flag indicating that the hash function has been built and no more keys can be added
174174
logger log.Logger
175175

176-
noFsync bool // fsync is enabled by default, but tests can manually disable
177-
workers int // Number of parallel goroutines for Build(); 0 or 1 = sequential
178-
timings Timings
176+
noFsync bool // fsync is enabled by default, but tests can manually disable
177+
forceCollisionOnce bool // test-only: force one collision on the next Build
178+
workers int // Number of parallel goroutines for Build(); 0 or 1 = sequential
179+
timings Timings
179180

180181
progress *background.Progress // If set, tracks 0-100%: add-keys fills 0-50%, build fills 50-100%
181182
}
@@ -875,6 +876,11 @@ func (rs *RecSplit) Build(ctx context.Context) error {
875876
if rs.keysAdded != rs.keyExpectedCount {
876877
return fmt.Errorf("rs %s expected keys %d, got %d", rs.fileName, rs.keyExpectedCount, rs.keysAdded)
877878
}
879+
if rs.forceCollisionOnce {
880+
rs.forceCollisionOnce = false
881+
rs.collision = true
882+
return fmt.Errorf("%w: forced for testing", ErrCollision)
883+
}
878884
if rs.timings.Enabled {
879885
rs.timings.AddTook = time.Since(rs.timings.AddStart) // assume Adding data into compressor complete
880886
rs.timings.BuildStart = time.Now()
@@ -1102,6 +1108,12 @@ func (rs *RecSplit) Collision() bool {
11021108
return rs.collision
11031109
}
11041110

1111+
// ForceCollisionOnce makes the next Build() fail with a collision error.
1112+
// Test-only: used to exercise collision retry paths.
1113+
func (rs *RecSplit) ForceCollisionOnce() {
1114+
rs.forceCollisionOnce = true
1115+
}
1116+
11051117
// bucketResultPool is a package-level sync.Pool for reusing bucketResult instances
11061118
var bucketResultPool = &sync.Pool{
11071119
New: func() interface{} {

db/state/domain.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ type Domain struct {
9595
_visible *domainVisible
9696

9797
checker *DependencyIntegrityChecker
98+
99+
// _testBuildAccessorHook - test-only: called with the recsplit before the build loop in buildHashMapAccessor
100+
_testBuildAccessorHook func(rs *recsplit.RecSplit)
98101
}
99102

100103
type domainVisible struct {
@@ -1109,7 +1112,7 @@ func (d *Domain) buildHashMapAccessor(ctx context.Context, fromStep, toStep kv.S
11091112
NoFsync: d.noFsync,
11101113
//Workers: d.CompressorCfg.Workers,
11111114
}
1112-
return buildHashMapAccessor(ctx, data, idxPath, false, cfg, ps, d.logger)
1115+
return buildHashMapAccessor(ctx, data, idxPath, false, cfg, ps, d.logger, d._testBuildAccessorHook)
11131116
}
11141117

11151118
func (d *Domain) missedBtreeAccessors(source []*FilesItem, dl dirListing) (l []*FilesItem) {
@@ -1189,7 +1192,7 @@ func (d *Domain) BuildMissedAccessors(ctx context.Context, g *errgroup.Group, ps
11891192
}
11901193
}
11911194

1192-
func buildHashMapAccessor(ctx context.Context, g *seg.Reader, idxPath string, values bool, cfg recsplit.RecSplitArgs, ps *background.ProgressSet, logger log.Logger) (err error) {
1195+
func buildHashMapAccessor(ctx context.Context, g *seg.Reader, idxPath string, values bool, cfg recsplit.RecSplitArgs, ps *background.ProgressSet, logger log.Logger, testHook func(*recsplit.RecSplit)) (err error) {
11931196
_, fileName := filepath.Split(idxPath)
11941197
count := g.Count()
11951198
if !values {
@@ -1214,8 +1217,13 @@ func buildHashMapAccessor(ctx context.Context, g *seg.Reader, idxPath string, va
12141217
}
12151218
}()
12161219

1217-
var keyPos, valPos uint64
1220+
if testHook != nil {
1221+
testHook(rs)
1222+
}
1223+
12181224
for {
1225+
// Reset positions at the start of each iteration to handle collision retries correctly
1226+
var keyPos, valPos uint64
12191227
word := make([]byte, 0, 256)
12201228
if err := ctx.Err(); err != nil {
12211229
return err

db/state/domain_test.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import (
5050
"github.com/erigontech/erigon/db/kv/mdbx"
5151
"github.com/erigontech/erigon/db/kv/order"
5252
"github.com/erigontech/erigon/db/kv/stream"
53+
"github.com/erigontech/erigon/db/recsplit"
5354
"github.com/erigontech/erigon/db/seg"
5455
"github.com/erigontech/erigon/db/state/changeset"
5556
"github.com/erigontech/erigon/db/state/statecfg"
@@ -3103,3 +3104,119 @@ func TestDomain_IntegrateDirtyFilesNilGuard(t *testing.T) {
31033104
require.NotNil(t, foundAfter, "dirty file for step 0 must still exist after nil StaticFiles")
31043105
require.NotNil(t, foundAfter.decompressor, "dirty file decompressor must not be overwritten by nil StaticFiles")
31053106
}
3107+
3108+
// filledDomainWithHashMapAccessor creates a domain configured to use AccessorHashMap
3109+
// (like CommitmentDomain) for testing buildHashMapAccessor code paths.
3110+
func filledDomainWithHashMapAccessor(t *testing.T, logger log.Logger) (kv.RwDB, *Domain, uint64) {
3111+
t.Helper()
3112+
dirs := datadir2.New(t.TempDir())
3113+
3114+
// Start with AccountsDomain config but switch to HashMap accessor
3115+
cfg := statecfg.Schema.AccountsDomain
3116+
cfg.Accessors = statecfg.AccessorHashMap // Use HashMap instead of BTree
3117+
3118+
// Set version to V1_0_standart to enable HashMap accessor building
3119+
cfg.FileVersion = statecfg.DomainVersionTypes{
3120+
DataKV: version.V1_0_standart,
3121+
AccessorBT: version.V1_0_standart,
3122+
AccessorKVEI: version.V1_0_standart,
3123+
AccessorKVI: version.V1_0_standart,
3124+
}
3125+
cfg.Hist.IiCfg.FileVersion = statecfg.IIVersionTypes{
3126+
DataEF: version.V1_0_standart,
3127+
AccessorEFI: version.V1_0_standart,
3128+
}
3129+
3130+
db := mdbx.New(dbcfg.ChainDB, logger).InMem(t, dirs.Chaindata).MustOpen()
3131+
t.Cleanup(db.Close)
3132+
salt := uint32(1)
3133+
3134+
d, err := NewDomain(cfg, 16, config3.DefaultStepsInFrozenFile, dirs, logger)
3135+
require.NoError(t, err)
3136+
d.salt.Store(&salt)
3137+
d.DisableFsync()
3138+
t.Cleanup(d.Close)
3139+
3140+
txs := fillDomain(t, d, db, logger)
3141+
return db, d, txs
3142+
}
3143+
3144+
// collateAndMergeWithCollisionRetry is like collateAndMerge but forces a
3145+
// recsplit collision retry on every buildHashMapAccessor call during merge.
3146+
func collateAndMergeWithCollisionRetry(t *testing.T, tx kv.RwTx, d *Domain, txs uint64) {
3147+
t.Helper()
3148+
logEvery := time.NewTicker(30 * time.Second)
3149+
defer logEvery.Stop()
3150+
ctx := context.Background()
3151+
3152+
// Collate without collision forcing first
3153+
for step := kv.Step(0); step < kv.Step(txs/d.stepSize)-1; step++ {
3154+
require.NoError(t, d.collateBuildIntegrate(ctx, step, tx, background.NewProgressSet()))
3155+
}
3156+
3157+
// Now set up collision forcing for merge
3158+
d._testBuildAccessorHook = func(rs *recsplit.RecSplit) {
3159+
rs.ForceCollisionOnce()
3160+
}
3161+
3162+
domainRoTx := d.BeginFilesRo()
3163+
defer domainRoTx.Close()
3164+
3165+
// Merge with collision retry
3166+
r := domainRoTx.findMergeRange(d.dirtyFilesEndTxNumMinimax(), d.dirtyFilesEndTxNumMinimax())
3167+
if r.values.needMerge {
3168+
valuesOuts, indexOuts, historyOuts := domainRoTx.staticFilesInRange(r)
3169+
valuesIn, indexIn, historyIn, err := domainRoTx.mergeFiles(ctx, valuesOuts, indexOuts, historyOuts, r, nil, background.NewProgressSet())
3170+
require.NoError(t, err)
3171+
d.integrateMergedDirtyFiles(valuesIn, indexIn, historyIn)
3172+
d.reCalcVisibleFiles(d.dirtyFilesEndTxNumMinimax())
3173+
}
3174+
}
3175+
3176+
// TestDomain_KeyPosResetOnCollisionRetry verifies that keyPos and valPos
3177+
// used in buildHashMapAccessor are reset when the build retries due to a
3178+
// recsplit collision. Without the reset, the .kvi index would contain
3179+
// incorrect offsets on the retry pass.
3180+
func TestDomain_KeyPosResetOnCollisionRetry(t *testing.T) {
3181+
if testing.Short() {
3182+
t.Skip()
3183+
}
3184+
3185+
t.Parallel()
3186+
3187+
logger := log.New()
3188+
db, d, txs := filledDomainWithHashMapAccessor(t, logger)
3189+
3190+
ctx := context.Background()
3191+
tx, err := db.BeginRw(ctx)
3192+
require.NoError(t, err)
3193+
defer tx.Rollback()
3194+
3195+
// Collate and merge with forced collision retry
3196+
collateAndMergeWithCollisionRetry(t, tx, d, txs)
3197+
require.NoError(t, tx.Commit())
3198+
3199+
// Verify lookups still work correctly after collision retry
3200+
roTx, err := db.BeginRo(ctx)
3201+
require.NoError(t, err)
3202+
defer roTx.Rollback()
3203+
3204+
domainRoTx := d.BeginFilesRo()
3205+
defer domainRoTx.Close()
3206+
3207+
// Check that we can look up keys correctly
3208+
// If keyPos wasn't reset, the index would have wrong offsets
3209+
for keyNum := uint64(1); keyNum <= uint64(31); keyNum++ {
3210+
var k [8]byte
3211+
binary.BigEndian.PutUint64(k[:], keyNum)
3212+
3213+
val, _, found, err := domainRoTx.GetLatest(k[:], roTx)
3214+
require.NoError(t, err, "key %x", k)
3215+
require.True(t, found, "key %x should be found", k)
3216+
3217+
// Expected value is txs/keyNum
3218+
var expected [8]byte
3219+
binary.BigEndian.PutUint64(expected[:], txs/keyNum)
3220+
require.Equal(t, expected[:], val, "key %x value mismatch", k)
3221+
}
3222+
}

db/state/history.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ type History struct {
7171
// _visibleFiles - underscore in name means: don't use this field directly, use BeginFilesRo()
7272
// underlying array is immutable - means it's ready for zero-copy use
7373
_visibleFiles []visibleFile
74+
75+
// _testBuildVIHook - test-only: called with the recsplit before the build loop in buildVI
76+
_testBuildVIHook func(rs *recsplit.RecSplit)
7477
}
7578

7679
func NewHistory(cfg statecfg.HistCfg, stepSize, stepsInFrozenFile uint64, dirs datadir.Dirs, logger log.Logger) (*History, error) {
@@ -276,6 +279,9 @@ func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHi
276279
}
277280
defer rs.Close()
278281
rs.LogLvl(log.LvlTrace)
282+
if h._testBuildVIHook != nil {
283+
h._testBuildVIHook(rs)
284+
}
279285

280286
seq := &multiencseq.SequenceReader{}
281287
it := &multiencseq.SequenceIterator{}

db/state/history_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,41 @@ func TestHistoryCollationBuild(t *testing.T) {
322322
})
323323
}
324324

325+
// TestHistoryBuildVI_PageCounterResetOnCollisionRetry verifies that the page
326+
// counter 'i' used for paged history files is reset when buildVI retries due
327+
// to a recsplit collision. Without the reset, the .vi index would contain
328+
// incorrect offsets on the retry pass.
329+
//
330+
// The bug only manifests when compressedPageValuesCount > 0 (paged history),
331+
// which happens during merge (not initial collation). This test forces a
332+
// collision retry during merge and verifies history lookups remain correct.
333+
func TestHistoryBuildVI_PageCounterResetOnCollisionRetry(t *testing.T) {
334+
if testing.Short() {
335+
t.Skip()
336+
}
337+
338+
t.Parallel()
339+
340+
logger := log.New()
341+
342+
test := func(t *testing.T, largeValues bool) {
343+
t.Helper()
344+
db, h, txs := filledHistory(t, largeValues, logger)
345+
346+
// Force collision retries during buildVI calls.
347+
// Set the hook AFTER collation but BEFORE merge, so only merge's
348+
// buildVI calls get the forced collision.
349+
collateAndMergeHistoryWithCollisionRetry(t, db, h, txs)
350+
checkHistoryHistory(t, h, txs)
351+
}
352+
t.Run("large_values", func(t *testing.T) {
353+
test(t, true)
354+
})
355+
t.Run("small_values", func(t *testing.T) {
356+
test(t, false)
357+
})
358+
}
359+
325360
func TestHistoryAfterPrune(t *testing.T) {
326361
logger := log.New()
327362
logEvery := time.NewTicker(30 * time.Second)
@@ -1071,6 +1106,65 @@ func collateAndMergeHistory(tb testing.TB, db kv.RwDB, h *History, txs uint64, d
10711106
require.NoError(err)
10721107
}
10731108

1109+
// collateAndMergeHistoryWithCollisionRetry is like collateAndMergeHistory
1110+
// but forces a recsplit collision retry on every buildVI call during merge.
1111+
func collateAndMergeHistoryWithCollisionRetry(tb testing.TB, db kv.RwDB, h *History, txs uint64) {
1112+
tb.Helper()
1113+
require := require.New(tb)
1114+
1115+
logEvery := time.NewTicker(30 * time.Second)
1116+
defer logEvery.Stop()
1117+
ctx := context.Background()
1118+
tx, err := db.BeginRwNosync(ctx)
1119+
require.NoError(err)
1120+
defer tx.Rollback()
1121+
1122+
// Collate without collision forcing
1123+
for step := kv.Step(0); step < kv.Step(txs/h.stepSize)-1; step++ {
1124+
require.NoError(h.collateBuildIntegrate(ctx, step, tx, background.NewProgressSet()))
1125+
1126+
hc := h.BeginFilesRo()
1127+
_, err = hc.Prune(ctx, tx, step.ToTxNum(h.stepSize), (step + 1).ToTxNum(h.stepSize), math.MaxUint64, false, logEvery)
1128+
hc.Close()
1129+
require.NoError(err)
1130+
}
1131+
1132+
// Enable collision forcing for merge phase only
1133+
collisionRetries := 0
1134+
h._testBuildVIHook = func(rs *recsplit.RecSplit) {
1135+
rs.ForceCollisionOnce()
1136+
collisionRetries++
1137+
}
1138+
1139+
var r HistoryRanges
1140+
maxSpan := h.stepSize * config3.DefaultStepsInFrozenFile
1141+
1142+
for {
1143+
if stop := func() bool {
1144+
hc := h.BeginFilesRo()
1145+
defer hc.Close()
1146+
r = hc.findMergeRange(hc.files.EndTxNum(), maxSpan)
1147+
if !r.any() {
1148+
return true
1149+
}
1150+
indexOuts, historyOuts, err := hc.staticFilesInRange(r)
1151+
require.NoError(err)
1152+
indexIn, historyIn, err := hc.mergeFiles(ctx, indexOuts, historyOuts, r, background.NewProgressSet())
1153+
require.NoError(err)
1154+
h.integrateMergedDirtyFiles(indexIn, historyIn)
1155+
h.reCalcVisibleFiles(h.dirtyFilesEndTxNumMinimax())
1156+
return false
1157+
}(); stop {
1158+
break
1159+
}
1160+
}
1161+
1162+
require.Greater(collisionRetries, 0, "expected at least one buildVI collision retry during merge")
1163+
1164+
err = tx.Commit()
1165+
require.NoError(err)
1166+
}
1167+
10741168
func TestHistoryMergeFiles(t *testing.T) {
10751169
if testing.Short() {
10761170
t.Skip()

db/state/inverted_index.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1174,7 +1174,7 @@ func (ii *InvertedIndex) buildMapAccessor(ctx context.Context, fromStep, toStep
11741174
// each such non-existing key read `MPH` transforms to random
11751175
// key read. `LessFalsePositives=true` feature filtering-out such cases (with `1/256=0.3%` false-positives).
11761176

1177-
if err := buildHashMapAccessor(ctx, data, idxPath, false, cfg, ps, ii.logger); err != nil {
1177+
if err := buildHashMapAccessor(ctx, data, idxPath, false, cfg, ps, ii.logger, nil); err != nil {
11781178
return err
11791179
}
11801180
return nil

0 commit comments

Comments (0)