Skip to content

Commit 6f018f4

Browse files
committed
save
1 parent 8981bb1 commit 6f018f4

File tree

9 files changed

+80
-46
lines changed

9 files changed

+80
-46
lines changed

cmd/utils/app/snapshots_cmd.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2659,15 +2659,15 @@ func doRetireCommand(cliCtx *cli.Context, dirs datadir.Dirs) error {
26592659
}
26602660
defer clean()
26612661

2662-
defer br.MadvNormal().DisableReadAhead()
2663-
defer agg.MadvNormal().DisableReadAhead()
2662+
//defer br.MadvNormal().DisableReadAhead()
2663+
//defer agg.MadvNormal().DisableReadAhead()
26642664

26652665
blockSnapBuildSema := semaphore.NewWeighted(int64(runtime.NumCPU()))
26662666
agg.SetSnapshotBuildSema(blockSnapBuildSema)
26672667

26682668
// `erigon retire` command is designed to maximize resouces utilization. But `Erigon itself` does minimize background impact (because not in rush).
26692669
agg.SetCollateAndBuildWorkers(min(8, estimate.StateV3Collate.Workers()))
2670-
agg.SetMergeWorkers(min(8, estimate.StateV3Collate.Workers()))
2670+
agg.SetMergeWorkers(2)
26712671
agg.SetCompressWorkers(estimate.CompressSnapshot.Workers())
26722672
agg.PeriodicalyPrintProcessSet(ctx)
26732673

db/seg/silkworm_seg_fuzz_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,8 @@ func SegUnzip(path string) error {
8080
return err
8181
}
8282

83-
err = decompressor.WithReadAhead(func() error {
83+
err = decompressor.WithReadAhead(func(getter *Getter) error {
8484
word := make([]byte, 0)
85-
getter := decompressor.MakeGetter()
8685
for getter.HasNext() {
8786
word, _ = getter.Next(word[:0])
8887
appendErr := words.Append(word)

db/snapshotsync/freezeblocks/block_snapshots.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,8 +1032,7 @@ func ForEachHeader(ctx context.Context, s *RoSnapshots, walker func(header *type
10321032
defer view.Close()
10331033

10341034
for _, sn := range view.Headers() {
1035-
if err := sn.Src().WithReadAhead(func() error {
1036-
g := sn.Src().MakeGetter()
1035+
if err := sn.Src().WithReadAhead(func(g *seg.Getter) error {
10371036
for i := 0; g.HasNext(); i++ {
10381037
word, _ = g.Next(word[:0])
10391038
var header types.Header

db/state/deduplicate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ func (iit *InvertedIndexRoTx) deduplicateFiles(ctx context.Context, files []*Fil
377377
}
378378
ps.Delete(p)
379379

380-
if err := iit.ii.buildMapAccessor(ctx, fromStep, toStep, iit.dataReader(outItem.decompressor), ps); err != nil {
380+
if err := iit.ii.buildMapAccessor(ctx, fromStep, toStep, outItem.decompressor, ps); err != nil {
381381
return nil, fmt.Errorf("merge %s buildHashMapAccessor [%d-%d]: %w", iit.ii.FilenameBase, startTxNum, endTxNum, err)
382382
}
383383
if outItem.index, err = iit.ii.openHashMapAccessor(iit.ii.efAccessorNewFilePath(fromStep, toStep)); err != nil {

db/state/domain.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1365,7 +1365,8 @@ func (dt *DomainRoTx) getLatestFromFiles(k []byte, maxTxNum uint64) (v []byte, f
13651365
maxTxNum = math.MaxUint64
13661366
}
13671367
useExistenceFilter := dt.d.Accessors.Has(statecfg.AccessorExistence)
1368-
useCache := dt.name != kv.CommitmentDomain && maxTxNum == math.MaxUint64
1368+
//useCache := dt.name != kv.CommitmentDomain && maxTxNum == math.MaxUint64
1369+
useCache := maxTxNum == math.MaxUint64
13691370

13701371
hi, lo := dt.ht.iit.hashKey(k)
13711372

@@ -1422,6 +1423,9 @@ func (dt *DomainRoTx) getLatestFromFiles(k []byte, maxTxNum uint64) (v []byte, f
14221423
fmt.Printf("GetLatest(%s, %x) -> found in file %s\n", dt.name.String(), k, dt.files[i].src.decompressor.FileName())
14231424
}
14241425

1426+
if dt.files[i].endTxNum-dt.files[i].startTxNum == dt.stepSize {
1427+
useCache = false
1428+
}
14251429
if dt.getFromFileCache != nil && useCache {
14261430
dt.getFromFileCache.Add(hi, domainGetFromFileCacheItem{lvl: uint8(i), v: v})
14271431
}

db/state/forkable_merge.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -105,30 +105,34 @@ func (f *ProtoForkable) MergeFiles(ctx context.Context, _filesToMerge []visibleF
105105
startRootNum, endRootNum := item.src.Range()
106106
compression := f.isCompressionUsed(RootNum(startRootNum), RootNum(endRootNum))
107107

108-
if err = item.src.decompressor.WithReadAhead(func() error {
109-
reader := f.PagedDataReader(item.src.decompressor, compression)
110-
var k, v []byte
111-
var fmeta NumMetadata
112-
if err := fmeta.Unmarshal(reader.GetMetadata()); err != nil {
113-
return err
114-
}
115-
if meta.Count == 0 {
116-
meta.First = fmeta.First
117-
}
118-
meta.Last = fmeta.Last
119-
meta.Count += fmeta.Count
120-
121-
for reader.HasNext() {
122-
k, v, word, _ = reader.Next2(word[:0])
123-
if err = writer.Add(k, v); err != nil {
124-
return err
125-
}
126-
p.Processed.Add(1)
127-
}
128-
return nil
129-
}); err != nil {
108+
view, viewErr := item.src.decompressor.OpenSequentialView()
109+
if viewErr != nil {
110+
err = viewErr
111+
return
112+
}
113+
defer view.Close()
114+
reader := seg.NewPagedReader(
115+
seg.NewReader(view.MakeGetter(), f.cfg.Compression),
116+
f.cfg.ValuesOnCompressedPage, compression,
117+
)
118+
var k, v []byte
119+
var fmeta NumMetadata
120+
if err = fmeta.Unmarshal(reader.GetMetadata()); err != nil {
130121
return
131122
}
123+
if meta.Count == 0 {
124+
meta.First = fmeta.First
125+
}
126+
meta.Last = fmeta.Last
127+
meta.Count += fmeta.Count
128+
129+
for reader.HasNext() {
130+
k, v, word, _ = reader.Next2(word[:0])
131+
if err = writer.Add(k, v); err != nil {
132+
return
133+
}
134+
p.Processed.Add(1)
135+
}
132136

133137
}
134138

db/state/history.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -237,10 +237,18 @@ func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHi
237237
var histKey []byte
238238
var valOffset uint64
239239

240-
defer hist.MadvSequential().DisableReadAhead()
241-
defer efHist.MadvSequential().DisableReadAhead()
240+
histView, err := hist.OpenSequentialView()
241+
if err != nil {
242+
return err
243+
}
244+
defer histView.Close()
245+
efHistView, err := efHist.OpenSequentialView()
246+
if err != nil {
247+
return err
248+
}
249+
defer efHistView.Close()
242250

243-
iiReader := h.InvertedIndex.dataReader(efHist)
251+
iiReader := seg.NewReader(efHistView.MakeGetter(), h.InvertedIndex.Compression)
244252

245253
var keyBuf, valBuf []byte
246254
cnt := uint64(0)
@@ -255,7 +263,7 @@ func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHi
255263
}
256264
}
257265

258-
histReader := h.dataReader(hist)
266+
histReader := seg.NewReader(histView.MakeGetter(), h.Compression)
259267

260268
_, fName := filepath.Split(historyIdxPath)
261269
p := ps.AddNew(fName, uint64(efHist.Count())/2)
@@ -815,7 +823,7 @@ func (h *History) buildFiles(ctx context.Context, step kv.Step, collation Histor
815823
return HistoryFiles{}, fmt.Errorf("open %s .ef history decompressor: %w", h.FilenameBase, err)
816824
}
817825
{
818-
if err := h.InvertedIndex.buildMapAccessor(ctx, step, step+1, h.InvertedIndex.dataReader(efHistoryDecomp), ps); err != nil {
826+
if err := h.InvertedIndex.buildMapAccessor(ctx, step, step+1, efHistoryDecomp, ps); err != nil {
819827
return HistoryFiles{}, fmt.Errorf("build %s .ef history idx: %w", h.FilenameBase, err)
820828
}
821829
if efHistoryIdx, err = h.InvertedIndex.openHashMapAccessor(h.InvertedIndex.efAccessorNewFilePath(step, step+1)); err != nil {

db/state/inverted_index.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ func (ii *InvertedIndex) buildEfAccessor(ctx context.Context, item *FilesItem, p
264264
if item.decompressor == nil {
265265
return fmt.Errorf("buildEfAccessor: passed item with nil decompressor %s %d-%d", ii.FilenameBase, fromStep, toStep)
266266
}
267-
return ii.buildMapAccessor(ctx, fromStep, toStep, ii.dataReader(item.decompressor), ps)
267+
return ii.buildMapAccessor(ctx, fromStep, toStep, item.decompressor, ps)
268268
}
269269
func (ii *InvertedIndex) dataReader(f *seg.Decompressor) *seg.Reader {
270270
if !strings.Contains(f.FileName(), ".ef") {
@@ -1116,7 +1116,7 @@ func (ii *InvertedIndex) buildFiles(ctx context.Context, step kv.Step, coll Inve
11161116
return InvertedFiles{}, fmt.Errorf("open %s decompressor: %w", ii.FilenameBase, err)
11171117
}
11181118

1119-
if err := ii.buildMapAccessor(ctx, step, step+1, ii.dataReader(decomp), ps); err != nil {
1119+
if err := ii.buildMapAccessor(ctx, step, step+1, decomp, ps); err != nil {
11201120
return InvertedFiles{}, fmt.Errorf("build %s efi: %w", ii.FilenameBase, err)
11211121
}
11221122
if ii.Accessors.Has(statecfg.AccessorHashMap) {
@@ -1129,7 +1129,7 @@ func (ii *InvertedIndex) buildFiles(ctx context.Context, step kv.Step, coll Inve
11291129
return InvertedFiles{decomp: decomp, index: mapAccessor, existence: existenceFilter}, nil
11301130
}
11311131

1132-
func (ii *InvertedIndex) buildMapAccessor(ctx context.Context, fromStep, toStep kv.Step, data *seg.Reader, ps *background.ProgressSet) error {
1132+
func (ii *InvertedIndex) buildMapAccessor(ctx context.Context, fromStep, toStep kv.Step, data *seg.Decompressor, ps *background.ProgressSet) error {
11331133
idxPath := ii.efAccessorNewFilePath(fromStep, toStep)
11341134
versionOfRs := uint8(0)
11351135
if !ii.FileVersion.AccessorEFI.Current.Eq(version.V1_0) { // inner version=1 incompatible with .efi v1.0
@@ -1175,7 +1175,7 @@ func (ii *InvertedIndex) buildMapAccessor(ctx context.Context, fromStep, toStep
11751175
// each such non-existing key read `MPH` transforms to random
11761176
// key read. `LessFalsePositives=true` feature filtering-out such cases (with `1/256=0.3%` false-positives).
11771177

1178-
if err := buildHashMapAccessor(ctx, data, idxPath, false, cfg, ps, ii.logger); err != nil {
1178+
if err := buildHashMapAccessor(ctx, data, ii.Compression, idxPath, false, cfg, ps, ii.logger); err != nil {
11791179
return err
11801180
}
11811181
return nil

db/state/merge.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,12 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
436436
var cp CursorHeap
437437
heap.Init(&cp)
438438
for _, item := range domainFiles {
439-
g := dt.dataReader(item.decompressor)
439+
view, err := item.decompressor.OpenSequentialView()
440+
if err != nil {
441+
return nil, nil, nil, err
442+
}
443+
defer view.Close()
444+
g := seg.NewReader(view.MakeGetter(), dt.d.Compression)
440445
g.Reset(0)
441446
if g.HasNext() {
442447
key, _ := g.Next(nil)
@@ -546,7 +551,7 @@ func (dt *DomainRoTx) mergeFiles(ctx context.Context, domainFiles, indexFiles, h
546551
}
547552
}
548553
if dt.d.Accessors.Has(statecfg.AccessorHashMap) {
549-
if err = dt.d.buildHashMapAccessor(ctx, fromStep, toStep, dt.dataReader(valuesIn.decompressor), ps); err != nil {
554+
if err = dt.d.buildHashMapAccessor(ctx, fromStep, toStep, valuesIn.decompressor, ps); err != nil {
550555
return nil, nil, nil, fmt.Errorf("merge %s buildHashMapAccessor [%d-%d]: %w", dt.d.FilenameBase, r.values.from, r.values.to, err)
551556
}
552557
if valuesIn.index, err = dt.d.openHashMapAccessor(dt.d.kviAccessorNewFilePath(fromStep, toStep)); err != nil {
@@ -616,7 +621,12 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
616621
heap.Init(&cp)
617622

618623
for _, item := range files {
619-
g := iit.dataReader(item.decompressor)
624+
view, err := item.decompressor.OpenSequentialView()
625+
if err != nil {
626+
return nil, err
627+
}
628+
defer view.Close()
629+
g := seg.NewReader(view.MakeGetter(), iit.ii.Compression)
620630
g.Reset(0)
621631
if g.HasNext() {
622632
key, _ := g.Next(nil)
@@ -720,7 +730,7 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
720730
}
721731
ps.Delete(p)
722732

723-
if err := iit.ii.buildMapAccessor(ctx, fromStep, toStep, iit.dataReader(outItem.decompressor), ps); err != nil {
733+
if err := iit.ii.buildMapAccessor(ctx, fromStep, toStep, outItem.decompressor, ps); err != nil {
724734
return nil, fmt.Errorf("merge %s buildHashMapAccessor [%d-%d]: %w", iit.ii.FilenameBase, startTxNum, endTxNum, err)
725735
}
726736
if outItem.index, err = iit.ii.openHashMapAccessor(iit.ii.efAccessorNewFilePath(fromStep, toStep)); err != nil {
@@ -791,7 +801,12 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles
791801
var cp CursorHeap
792802
heap.Init(&cp)
793803
for _, item := range indexFiles {
794-
g := ht.iit.dataReader(item.decompressor)
804+
idxView, err := item.decompressor.OpenSequentialView()
805+
if err != nil {
806+
return nil, nil, err
807+
}
808+
defer idxView.Close()
809+
g := seg.NewReader(idxView.MakeGetter(), ht.h.InvertedIndex.Compression)
795810
g.Reset(0)
796811
if g.HasNext() {
797812
var g2 *seg.PagedReader
@@ -803,7 +818,12 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles
803818
compressedPageValuesCount = ht.h.HistoryValuesOnCompressedPage
804819
}
805820

806-
g2 = seg.NewPagedReader(ht.dataReader(hi.decompressor), compressedPageValuesCount, true)
821+
histView, err := hi.decompressor.OpenSequentialView()
822+
if err != nil {
823+
return nil, nil, err
824+
}
825+
defer histView.Close()
826+
g2 = seg.NewPagedReader(seg.NewReader(histView.MakeGetter(), ht.h.Compression), compressedPageValuesCount, true)
807827
break
808828
}
809829
}

0 commit comments

Comments (0)