Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions db/recsplit/recsplit.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/erigontech/erigon/common"
"github.com/erigontech/erigon/common/assert"
"github.com/erigontech/erigon/common/background"
"github.com/erigontech/erigon/common/dir"
"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/common/mmap"
Expand Down Expand Up @@ -164,6 +165,8 @@ type RecSplit struct {

noFsync bool // fsync is enabled by default, but tests can manually disable
timings Timings

progress *background.Progress // If set, tracks 0-100%: add-keys fills 0-50%, build fills 50-100%
}

type RecSplitArgs struct {
Expand Down Expand Up @@ -395,6 +398,9 @@ func (rs *RecSplit) ResetNextSalt() {
rs.collision = false
rs.keysAdded = 0
rs.salt++
if rs.progress != nil {
rs.progress.Processed.Store(0)
}
if rs.bucketCollector != nil {
rs.bucketCollector.Close()
}
Expand Down Expand Up @@ -519,6 +525,9 @@ func (rs *RecSplit) AddKey(key []byte, offset uint64) error {

rs.keysAdded++
rs.prevOffset = offset
if rs.progress != nil && rs.keysAdded%1024 == 0 {
rs.progress.Processed.Add(1024)
}
return nil
}

Expand Down Expand Up @@ -790,6 +799,10 @@ func (rs *RecSplit) loadFuncBucket(k, v []byte, _ etl.CurrentTableReader, _ etl.
if err := rs.recsplitCurrentBucket(); err != nil {
return err
}
if rs.progress != nil {
// Build phase fills the 50–100% half: each bucket ≈ bucketSize keys worth.
rs.progress.Processed.Add(uint64(rs.bucketSize))
}
}
rs.currentBucketIdx = bucketIdx
}
Expand Down Expand Up @@ -820,6 +833,26 @@ func (rs *RecSplit) buildOffsetEf() error {
return nil
}

// KeyCount reports how many keys have been added so far via AddKey.
func (rs *RecSplit) KeyCount() uint64 {
	return rs.keysAdded
}

// BucketCount reports the number of hash buckets used by this RecSplit.
func (rs *RecSplit) BucketCount() uint64 {
	return rs.bucketCount
}

// SetProgress attaches a single progress tracker that spans the whole build
// lifecycle. The total is set to 2*keyExpectedCount: the AddKey phase advances
// the counter through the first half (0–50%) and the bucket-building phase
// through the second half (50–100%). ResetNextSalt (collision retry) resets
// the processed counter back to zero.
func (rs *RecSplit) SetProgress(p *background.Progress) {
	if p == nil {
		// Nothing to track; leave rs.progress unset.
		return
	}
	rs.progress = p
	p.Processed.Store(0)
	p.Total.Store(rs.keyExpectedCount * 2)
	p.Name.Store(&rs.fileName)
}

// Build has to be called after all the keys have been added, and it initiates the process
// of building the perfect hash function and writing index into a file
func (rs *RecSplit) Build(ctx context.Context) error {
Expand Down
12 changes: 2 additions & 10 deletions db/snaptype/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,11 +492,6 @@ func BuildIndex(ctx context.Context, info FileInfo, indexVersion version.Version
}
defer d.Close()

if p != nil {
fname := info.Name()
p.Name.Store(&fname)
p.Total.Store(uint64(d.Count()))
}
cfg.KeyCount = d.Count()
idxVer := indexVersion.Current
cfg.IndexFile = filepath.Join(info.Dir(), info.Type.IdxFileName(idxVer, info.From, info.To))
Expand All @@ -515,6 +510,7 @@ func BuildIndex(ctx context.Context, info FileInfo, indexVersion version.Version
defer d.MadvSequential().DisableReadAhead()

for {
rs.SetProgress(p)
g := d.MakeGetter()
var i, offset, nextPos uint64
word := make([]byte, 0, 4096)
Expand Down Expand Up @@ -560,11 +556,6 @@ func BuildIndexWithSnapName(ctx context.Context, info FileInfo, cfg recsplit.Rec
}
defer d.Close()

if p != nil {
fname := info.Name()
p.Name.Store(&fname)
p.Total.Store(uint64(d.Count()))
}
cfg.KeyCount = d.Count()
cfg.IndexFile = filepath.Join(info.Dir(), IdxFileName(info.Version, info.From, info.To, info.CaplinTypeString))
rs, err := recsplit.NewRecSplit(cfg, logger)
Expand All @@ -577,6 +568,7 @@ func BuildIndexWithSnapName(ctx context.Context, info FileInfo, cfg recsplit.Rec
defer d.MadvSequential().DisableReadAhead()

for {
rs.SetProgress(p)
g := d.MakeGetter()
var i, offset, nextPos uint64
word := make([]byte, 0, 4096)
Expand Down
11 changes: 1 addition & 10 deletions db/snaptype2/block_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,6 @@ var (
return fmt.Errorf("TransactionsIdx: at=%d-%d, pre index building, expect: %d, got %d", sn.From, sn.To, expectedCount, d.Count())
}

if p != nil {
name := sn.Name()
p.Name.Store(&name)
p.Total.Store(uint64(d.Count() * 2))
}

txnHashIdx, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
KeyCount: d.Count(),

Expand Down Expand Up @@ -291,6 +285,7 @@ var (
defer bodiesSegment.MadvSequential().DisableReadAhead()

for {
txnHashIdx.SetProgress(p)
g, bodyGetter := d.MakeGetter(), bodiesSegment.MakeGetter()
var ti, offset, nextPos uint64
blockNum := firstBlockNum
Expand All @@ -302,10 +297,6 @@ var (
}

for g.HasNext() {
if p != nil {
p.Processed.Add(1)
}

word, nextPos = g.Next(word[:0])
select {
case <-ctx.Done():
Expand Down
3 changes: 1 addition & 2 deletions db/state/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1220,6 +1220,7 @@ func buildHashMapAccessor(ctx context.Context, g *seg.Reader, idxPath string, va
return err
}
g.Reset(0)
rs.SetProgress(p)
for g.HasNext() {
word, valPos = g.Next(word[:0])
if values {
Expand All @@ -1234,8 +1235,6 @@ func buildHashMapAccessor(ctx context.Context, g *seg.Reader, idxPath string, va

// Skip value
keyPos, _ = g.Skip()

p.Processed.Add(1)
}
if err = rs.Build(ctx); err != nil {
if rs.Collision() {
Expand Down
2 changes: 1 addition & 1 deletion db/state/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,12 +283,12 @@ func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHi
for {
histReader.Reset(0)
iiReader.Reset(0)
rs.SetProgress(p)

valOffset = 0
for iiReader.HasNext() {
keyBuf, _ = iiReader.Next(keyBuf[:0])
valBuf, _ = iiReader.Next(valBuf[:0])
p.Processed.Add(1)

// fmt.Printf("ef key %x\n", keyBuf)

Expand Down
10 changes: 1 addition & 9 deletions db/state/simple_accessor_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,6 @@ func (s *SimpleAccessorBuilder) Build(ctx context.Context, decomp *seg.Decompres
idxFile, _ := s.parser.AccessorIdxFile(version.V1_0, from, to, uint16(s.indexPos))

keyCount := iidq.Count()
if p != nil {
baseFileName := filepath.Base(idxFile)
p.Name.Store(&baseFileName)
p.Total.Store(keyCount)
}
salt, err := Registry.Salt(s.id)
if err != nil {
return nil, err
Expand Down Expand Up @@ -197,16 +192,14 @@ func (s *SimpleAccessorBuilder) Build(ctx context.Context, decomp *seg.Decompres

defer iidq.reader.MadvNormal().DisableReadAhead()
for {
rs.SetProgress(p)
stream := iidq.GetStream(ctx)
defer stream.Close()
for stream.HasNext() {
word, index, offset, err := stream.Next()
if err != nil {
return nil, err
}
if p != nil {
p.Processed.Add(1)
}
key := s.kf.Make(word, index)
if err = rs.AddKey(key, offset); err != nil {
return nil, err
Expand All @@ -222,7 +215,6 @@ func (s *SimpleAccessorBuilder) Build(ctx context.Context, decomp *seg.Decompres
if err = rs.Build(ctx); err != nil {
// collision handling
if rs.Collision() {
p.Processed.Store(0)
s.logger.Debug("found collision, trying again", "file", filepath.Base(idxFile), "salt", rs.Salt(), "err", err)
rs.ResetNextSalt()
continue
Expand Down
2 changes: 2 additions & 0 deletions polygon/heimdall/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ var (
defer d.MadvSequential().DisableReadAhead()

for {
rs.SetProgress(p)
g.Reset(0)
first = true
var i, offset, nextPos uint64
Expand Down Expand Up @@ -528,6 +529,7 @@ func buildValueIndex(ctx context.Context, version version.Versions, sn snaptype.
defer d.MadvSequential().DisableReadAhead()

for {
rs.SetProgress(p)
g := d.MakeGetter()
var i, offset, nextPos uint64
var key [8]byte
Expand Down
Loading