Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions db/state/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@ import (
"container/heap"
"context"
"fmt"
"hash/crc32"
"io"
"math"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/tidwall/btree"

Expand All @@ -42,6 +46,19 @@ import (
"github.com/erigontech/erigon/execution/commitment/commitmentdb"
)

// crc32cFile computes a CRC32C (Castagnoli) checksum of a file on disk.
// Uses hardware-accelerated CRC32C when available. Intended for debugging only.
func crc32cFile(filePath string) uint32 {
f, err := os.Open(filePath)
if err != nil {
return 0
}
defer f.Close()
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
_, _ = io.Copy(h, f)
return h.Sum32()
}

func (d *Domain) dirtyFilesEndTxNumMinimax() uint64 {
if d == nil {
return 0
Expand Down Expand Up @@ -410,6 +427,8 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
closeFiles = false
return
}
var tAdd, tCompress, tIndex time.Duration
tAddStart := time.Now()

fromStep, toStep := kv.Step(r.values.from/r.aggStep), kv.Step(r.values.to/r.aggStep)
kvFilePath := dt.d.kvNewFilePath(fromStep, toStep)
Expand All @@ -418,13 +437,18 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
if err != nil {
return nil, nil, nil, fmt.Errorf("merge %s compressor: %w", dt.d.FilenameBase, err)
}
kvFile.CollectTimings()

forceNoCompress := toStep-fromStep < DomainMinStepsToCompress
kvWriter = dt.dataWriter(kvFile, forceNoCompress)
if dt.d.noFsync {
kvWriter.DisableFsync()
}

defer func() {
log.Debug("[merge] timings", "name", path.Base(kvFilePath), "tAdd", tAdd, "tCompress", tCompress, "tIndex", tIndex, "cksum", fmt.Sprintf("%08x", crc32cFile(kvFilePath)))
}()

cnt := 0
for _, item := range domainFiles {
cnt += item.decompressor.Count()
Expand Down Expand Up @@ -520,14 +544,18 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
return nil, nil, nil, err
}
}
tAdd = time.Since(tAddStart)

tCompressStart := time.Now()
if err = kvWriter.Compress(); err != nil {
return nil, nil, nil, err
}
kvWriter.Close()
kvWriter = nil
ps.Delete(p)
tCompress = time.Since(tCompressStart)

tIndexStart := time.Now()
valuesIn = newFilesItem(r.values.from, r.values.to, dt.stepSize, dt.stepsInFrozenFile)
valuesIn.frozen = false
if valuesIn.decompressor, err = seg.NewDecompressor(kvFilePath); err != nil {
Expand Down Expand Up @@ -567,6 +595,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
}
}
}
tIndex = time.Since(tIndexStart)

closeFiles = false
return
Expand Down Expand Up @@ -607,6 +636,7 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
if iit.ii.noFsync {
comp.DisableFsync()
}
comp.CollectTimings()

write := iit.dataWriter(comp, false)
p := ps.AddNew(path.Base(datPath), 1)
Expand Down Expand Up @@ -784,6 +814,12 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles
comp.DisableFsync()
}

var tAdd, tCompress, tIndex time.Time
defer func() {
log.Debug("[merge] timings", "name", path.Base(datPath), "tAdd", time.Since(tAdd), "tCompress", time.Since(tCompress), "tIndex", time.Since(tIndex), "cksum", fmt.Sprintf("%08x", crc32cFile(datPath)))
}()
tAdd = time.Now()

pagedWr := ht.dataWriter(comp)
p := ps.AddNew(path.Base(datPath), 1)
defer ps.Delete(p)
Expand Down Expand Up @@ -869,6 +905,7 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles
}
}
}
tCompress = time.Now()
if err := pagedWr.Compress(); err != nil {
return nil, nil, err
}
Expand All @@ -879,6 +916,7 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles
}
ps.Delete(p)

tIndex = time.Now()
if err = ht.h.buildVI(ctx, idxPath, decomp, indexIn.decompressor, indexIn.startTxNum, ps); err != nil {
return nil, nil, err
}
Expand Down
Loading