Skip to content

Commit 3e95cda

Browse files
Change package rawdbreset so it does not import the stagedsync package (#15098)
maybe will fix: ``` fatal error: too many address space collisions for -race mode runtime stack: runtime.throw({0x104023000?, 0x4000000?}) runtime/panic.go:1073 +0x38 fp=0x175a02d00 sp=0x175a02cd0 pc=0x102481958 runtime.(*mheap).sysAlloc(0x105b15240, 0x106b90000?, 0x105b25488, 0x1) runtime/malloc.go:699 +0x55c fp=0x175a02d90 sp=0x175a02d00 pc=0x1024182fc runtime.(*mheap).grow(0x105b15240, 0x62?) runtime/mheap.go:1489 +0x84 fp=0x175a02e10 sp=0x175a02d90 pc=0x102433a54 runtime.(*mheap).allocSpan(0x105b15240, 0x62, 0x0, 0x1) runtime/mheap.go:1244 +0x164 fp=0x175a02ec0 sp=0x175a02e10 pc=0x102433124 runtime.(*mheap).alloc.func1() runtime/mheap.go:962 +0x6c fp=0x175a02f10 sp=0x175a02ec0 pc=0x102432c1c runtime.systemstack(0x3ffc000) runtime/asm_arm64.s:244 +0x6c fp=0x175a02f20 sp=0x175a02f10 pc=0x10248874c goroutine 25 gp=0xc000102c40 m=3 mp=0xc00007d008 [running, locked to thread]: runtime.systemstack_switch() runtime/asm_arm64.s:201 +0x8 fp=0xc00005d640 sp=0xc00005d630 pc=0x1024886c8 runtime.(*mheap).alloc(0xc4000?, 0x62?, 0xd8?) runtime/mheap.go:956 +0x54 fp=0xc00005d690 sp=0xc00005d640 pc=0x102432b74 runtime.(*mcache).allocLarge(0x103f7addc?, 0xc3500, 0x1) runtime/mcache.go:234 +0x74 fp=0xc00005d6e0 sp=0xc00005d690 pc=0x10241e9b4 runtime.mallocgc(0xc3500, 0x10482e140, 0x1) runtime/malloc.go:1177 +0x510 fp=0xc00005d790 sp=0xc00005d6e0 pc=0x10247bac0 runtime.makeslice(0xc00005d7d8?, 0x1024b2b24?, 0xc00005d818?) runtime/slice.go:116 +0x50 fp=0xc00005d7c0 sp=0xc00005d790 pc=0x1024844d0 github.com/erigontech/erigon-lib/etl.(*sortableBuffer).Prealloc(...) 
github.com/erigontech/erigon-lib@v0.0.0-00010101000000-000000000000/etl/buffers.go:156 github.com/erigontech/erigon-lib/etl.init.func2() github.com/erigontech/erigon-lib@v0.0.0-00010101000000-000000000000/etl/etl.go:185 +0x138 fp=0xc00005d820 sp=0xc00005d7c0 pc=0x102d5b8d8 sync.(*Pool).Get(0xc0009bc810) sync/pool.go:155 +0xd8 fp=0xc00005d860 sp=0xc00005d820 pc=0x1024b1d78 github.com/erigontech/erigon-lib/etl.(*Allocator).Get(0xc00011c2e8) github.com/erigontech/erigon-lib@v0.0.0-00010101000000-000000000000/etl/collector.go:53 +0x3c fp=0xc00005d8a0 sp=0xc00005d860 pc=0x102d5fbec github.com/erigontech/erigon-lib/etl.NewCollectorWithAllocator({0xc0018ae0a8, 0x16}, {0xc00004cd70, 0x4e}, 0xc00011c2e8, {0x104b49130, 0xc00078c080}) github.com/erigontech/erigon-lib@v0.0.0-00010101000000-000000000000/etl/collector.go:78 +0x44 fp=0xc00005d8f0 sp=0xc00005d8a0 pc=0x102d5fcd4 github.com/erigontech/erigon-lib/state.(*InvertedIndexRoTx).newWriter(0xc0018a8a10, {0xc00004cd70, 0x4e}, 0x0) github.com/erigontech/erigon-lib@v0.0.0-00010101000000-000000000000/state/inverted_index.go:497 +0x1b8 fp=0xc00005d9b0 sp=0xc00005d8f0 pc=0x10301b8b8 github.com/erigontech/erigon-lib/state.(*InvertedIndexRoTx).NewWriter(...) github.com/erigontech/erigon-lib@v0.0.0-00010101000000-000000000000/state/inverted_index.go:414 github.com/erigontech/erigon-lib/state.NewSharedDomains({0x104b54a30, 0xc001890240}, {0x104b49130, 0xc0000ee1a0}) github.com/erigontech/erigon-lib@v0.0.0-00010101000000-000000000000/state/domain_shared.go:121 +0x2ec fp=0xc00005db10 sp=0xc00005d9b0 pc=0x102fe1a2c github.com/erigontech/erigon/core/test.TestCommit(0xc000828340) github.com/erigontech/erigon/core/test/domains_restart_test.go:483 +0x1b4 fp=0xc00005dec0 sp=0xc00005db10 pc=0x103f23514 testing.tRunner(0xc000828340, 0x104b1f288) ```
1 parent e69290c commit 3e95cda

File tree

2 files changed

+195
-190
lines changed

2 files changed

+195
-190
lines changed

eth/rawdbreset/reset_stages.go

Lines changed: 192 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,24 @@ package rawdbreset
1818

1919
import (
2020
"context"
21+
"encoding/binary"
2122
"fmt"
23+
"math/big"
24+
"time"
2225

2326
"github.com/erigontech/erigon-db/rawdb"
2427
"github.com/erigontech/erigon-db/rawdb/blockio"
28+
"github.com/erigontech/erigon-lib/common"
2529
"github.com/erigontech/erigon-lib/common/datadir"
30+
"github.com/erigontech/erigon-lib/common/dbg"
31+
"github.com/erigontech/erigon-lib/diagnostics"
32+
"github.com/erigontech/erigon-lib/downloader/snaptype"
33+
"github.com/erigontech/erigon-lib/etl"
2634
"github.com/erigontech/erigon-lib/kv"
2735
"github.com/erigontech/erigon-lib/kv/backup"
36+
"github.com/erigontech/erigon-lib/kv/rawdbv3"
2837
"github.com/erigontech/erigon-lib/log/v3"
29-
"github.com/erigontech/erigon/eth/stagedsync"
38+
"github.com/erigontech/erigon-lib/types"
3039
"github.com/erigontech/erigon/eth/stagedsync/stages"
3140
"github.com/erigontech/erigon/turbo/services"
3241
)
@@ -86,7 +95,7 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, br services.FullBlockReader, bw *blocki
8695

8796
if br.FrozenBlocks() > 0 {
8897
logger.Info("filling db from snapshots", "blocks", br.FrozenBlocks())
89-
if err := stagedsync.FillDBFromSnapshots("filling_db_from_snapshots", context.Background(), tx, dirs, br, logger); err != nil {
98+
if err := FillDBFromSnapshots("filling_db_from_snapshots", context.Background(), tx, dirs, br, logger); err != nil {
9099
return err
91100
}
92101
_ = stages.SaveStageProgress(tx, stages.Snapshots, br.FrozenBlocks())
@@ -228,3 +237,184 @@ func Reset(ctx context.Context, db kv.RwDB, stagesList ...stages.SyncStage) erro
228237
return nil
229238
})
230239
}
240+
241+
func FillDBFromSnapshots(logPrefix string, ctx context.Context, tx kv.RwTx, dirs datadir.Dirs, blockReader services.FullBlockReader, logger log.Logger) error {
242+
startTime := time.Now()
243+
blocksAvailable := blockReader.FrozenBlocks()
244+
logEvery := time.NewTicker(20 * time.Second)
245+
defer logEvery.Stop()
246+
pruneMarkerBlockThreshold := GetPruneMarkerSafeThreshold(blockReader)
247+
248+
// updating the progress of further stages (but only forward) that are contained inside of snapshots
249+
for _, stage := range []stages.SyncStage{stages.Headers, stages.Bodies, stages.BlockHashes, stages.Senders} {
250+
progress, err := stages.GetStageProgress(tx, stage)
251+
252+
if err != nil {
253+
return fmt.Errorf("get %s stage progress to advance: %w", stage, err)
254+
}
255+
if progress >= blocksAvailable {
256+
continue
257+
}
258+
259+
if err = stages.SaveStageProgress(tx, stage, blocksAvailable); err != nil {
260+
return fmt.Errorf("advancing %s stage: %w", stage, err)
261+
}
262+
263+
switch stage {
264+
case stages.Headers:
265+
h2n := etl.NewCollector(logPrefix, dirs.Tmp, etl.NewSortableBuffer(etl.BufferOptimalSize/2), logger)
266+
defer h2n.Close()
267+
h2n.SortAndFlushInBackground(true)
268+
h2n.LogLvl(log.LvlDebug)
269+
270+
// fill some small tables from snapshots, in future we may store this data in snapshots also, but
271+
// for now easier just store them in db
272+
td := big.NewInt(0)
273+
blockNumBytes := make([]byte, 8)
274+
if err := blockReader.HeadersRange(ctx, func(header *types.Header) error {
275+
blockNum, blockHash := header.Number.Uint64(), header.Hash()
276+
td.Add(td, header.Difficulty)
277+
// What can happen if chaindata is deleted is that maybe header.seg progress is lower or higher than
278+
// body.seg progress. In this case we need to skip the header, and "normalize" the progress to keep them in sync.
279+
if blockNum > blocksAvailable {
280+
return nil // This can actually happen as FrozenBlocks() is SegmentIdMax() and not the last .seg
281+
}
282+
if !dbg.PruneTotalDifficulty() {
283+
if err := rawdb.WriteTd(tx, blockHash, blockNum, td); err != nil {
284+
return err
285+
}
286+
}
287+
288+
// Write marker for pruning only if we are above our safe threshold
289+
if blockNum >= pruneMarkerBlockThreshold || blockNum == 0 {
290+
if err := rawdb.WriteCanonicalHash(tx, blockHash, blockNum); err != nil {
291+
return err
292+
}
293+
binary.BigEndian.PutUint64(blockNumBytes, blockNum)
294+
if err := h2n.Collect(blockHash[:], blockNumBytes); err != nil {
295+
return err
296+
}
297+
if dbg.PruneTotalDifficulty() {
298+
if err := rawdb.WriteTd(tx, blockHash, blockNum, td); err != nil {
299+
return err
300+
}
301+
}
302+
}
303+
select {
304+
case <-ctx.Done():
305+
return ctx.Err()
306+
case <-logEvery.C:
307+
diagnostics.Send(diagnostics.SnapshotFillDBStageUpdate{
308+
Stage: diagnostics.SnapshotFillDBStage{
309+
StageName: string(stage),
310+
Current: header.Number.Uint64(),
311+
Total: blocksAvailable,
312+
},
313+
TimeElapsed: time.Since(startTime).Seconds(),
314+
})
315+
logger.Info(fmt.Sprintf("[%s] Total difficulty index: %s/%s", logPrefix,
316+
common.PrettyCounter(header.Number.Uint64()), common.PrettyCounter(blockReader.FrozenBlocks())))
317+
default:
318+
}
319+
return nil
320+
}); err != nil {
321+
return err
322+
}
323+
if err := h2n.Load(tx, kv.HeaderNumber, etl.IdentityLoadFunc, etl.TransformArgs{}); err != nil {
324+
return err
325+
}
326+
canonicalHash, ok, err := blockReader.CanonicalHash(ctx, tx, blocksAvailable)
327+
if err != nil {
328+
return err
329+
}
330+
if !ok {
331+
return fmt.Errorf("canonical marker not found: %d", blocksAvailable)
332+
}
333+
if err = rawdb.WriteHeadHeaderHash(tx, canonicalHash); err != nil {
334+
return err
335+
}
336+
337+
case stages.Bodies:
338+
firstTxNum := blockReader.FirstTxnNumNotInSnapshots()
339+
if err := tx.ResetSequence(kv.EthTx, firstTxNum); err != nil {
340+
return err
341+
}
342+
343+
_ = tx.ClearTable(kv.MaxTxNum)
344+
if err := blockReader.IterateFrozenBodies(func(blockNum, baseTxNum, txAmount uint64) error {
345+
select {
346+
case <-ctx.Done():
347+
return ctx.Err()
348+
case <-logEvery.C:
349+
diagnostics.Send(diagnostics.SnapshotFillDBStageUpdate{
350+
Stage: diagnostics.SnapshotFillDBStage{
351+
StageName: string(stage),
352+
Current: blockNum,
353+
Total: blocksAvailable,
354+
},
355+
TimeElapsed: time.Since(startTime).Seconds(),
356+
})
357+
logger.Info(fmt.Sprintf("[%s] MaxTxNums index: %s/%s", logPrefix, common.PrettyCounter(blockNum), common.PrettyCounter(blockReader.FrozenBlocks())))
358+
default:
359+
}
360+
if baseTxNum+txAmount == 0 {
361+
panic(baseTxNum + txAmount) //uint-underflow
362+
}
363+
maxTxNum := baseTxNum + txAmount - 1
364+
// What can happen if chaindata is deleted is that maybe header.seg progress is lower or higher than
365+
// body.seg progress. In this case we need to skip the header, and "normalize" the progress to keep them in sync.
366+
if blockNum > blocksAvailable {
367+
return nil // This can actually happen as FrozenBlocks() is SegmentIdMax() and not the last .seg
368+
}
369+
if blockNum >= pruneMarkerBlockThreshold || blockNum == 0 {
370+
if err := rawdbv3.TxNums.Append(tx, blockNum, maxTxNum); err != nil {
371+
return fmt.Errorf("%w. blockNum=%d, maxTxNum=%d", err, blockNum, maxTxNum)
372+
}
373+
}
374+
return nil
375+
}); err != nil {
376+
return fmt.Errorf("build txNum => blockNum mapping: %w", err)
377+
}
378+
if blockReader.FrozenBlocks() > 0 {
379+
if err := rawdb.AppendCanonicalTxNums(tx, blockReader.FrozenBlocks()+1); err != nil {
380+
return err
381+
}
382+
} else {
383+
if err := rawdb.AppendCanonicalTxNums(tx, 0); err != nil {
384+
return err
385+
}
386+
}
387+
388+
default:
389+
diagnostics.Send(diagnostics.SnapshotFillDBStageUpdate{
390+
Stage: diagnostics.SnapshotFillDBStage{
391+
StageName: string(stage),
392+
Current: blocksAvailable, // as we are done with other stages
393+
Total: blocksAvailable,
394+
},
395+
TimeElapsed: time.Since(startTime).Seconds(),
396+
})
397+
}
398+
}
399+
return nil
400+
}
401+
402+
const (
	/*
		we strive to read indexes from snapshots instead to db... this means that there can be sometimes (e.g when we merged past indexes),
		a situation when we need to read indexes and we choose to read them from either a corrupt index or an incomplete index.
		so we need to extend the threshold to > max_merge_segment_size.
	*/
	// pruneMarkerSafeThreshold is the number of most-recent frozen blocks for
	// which prune markers are still written (see GetPruneMarkerSafeThreshold).
	pruneMarkerSafeThreshold = snaptype.Erigon2MergeLimit * 1.5 // 1.5x the merge limit
)
410+
411+
func GetPruneMarkerSafeThreshold(blockReader services.FullBlockReader) uint64 {
412+
snapProgress := min(blockReader.FrozenBorBlocks(), blockReader.FrozenBlocks())
413+
if blockReader.BorSnapshots() == nil {
414+
snapProgress = blockReader.FrozenBlocks()
415+
}
416+
if snapProgress < pruneMarkerSafeThreshold {
417+
return 0
418+
}
419+
return snapProgress - pruneMarkerSafeThreshold
420+
}

0 commit comments

Comments
 (0)