Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 18 additions & 62 deletions execution/stagedsync/stage_txlookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,17 @@ import (
"context"
"encoding/binary"
"fmt"
"time"

"github.com/erigontech/erigon/common"
"github.com/erigontech/erigon/common/dbg"
"github.com/erigontech/erigon/common/hexutil"
"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/db/etl"
"github.com/erigontech/erigon/db/kv"
mdbx2 "github.com/erigontech/erigon/db/kv/mdbx"
"github.com/erigontech/erigon/db/kv/prune"
"github.com/erigontech/erigon/db/kv/rawdbv3"
"github.com/erigontech/erigon/db/rawdb"
"github.com/erigontech/erigon/db/services"
"github.com/erigontech/erigon/db/state"
"github.com/erigontech/erigon/execution/stagedsync/stages"
"github.com/erigontech/erigon/execution/types"
)
Expand Down Expand Up @@ -227,73 +224,32 @@ func PruneTxLookup(s *PruneState, tx kv.RwTx, cfg TxLookupCfg, ctx context.Conte
blockTo = cfg.blockReader.CanPruneTo(s.ForwardProgress)
}

if blockFrom >= blockTo {
return nil
}

txNumReader := cfg.blockReader.TxnumReader()
txFrom, err := txNumReader.Min(ctx, tx, blockFrom)
if err != nil {
return fmt.Errorf("txNumReader.Min(%d): %w", blockFrom, err)
}
txTo, err := txNumReader.Max(ctx, tx, blockTo)
if err != nil {
return fmt.Errorf("txNumReader.Max(%d): %w", blockTo, err)
}
if txFrom >= txTo {
return nil
// At chain tip, cap the number of blocks pruned per cycle to bound the
// time spent inside a single DB transaction. Each block adds ~150 random
// key deletions to TxLookup, so 100 blocks ≈ 15 000 deletes — a few
// tens of milliseconds. During initial sync there is no slot budget
// pressure, so we let it run unrestricted.
if !s.CurrentSyncCycle.IsInitialCycle {
const maxBlocksPerCycle = 100
blockTo = min(blockTo, blockFrom+maxBlocksPerCycle)
}

prevStat, err := state.GetPruneValProgress(tx, []byte(kv.TxLookup))
if err != nil {
return err
}
if prevStat != nil && prevStat.TxFrom == txFrom && prevStat.TxTo == txTo && prevStat.ValueProgress == prune.Done {
if blockFrom >= blockTo {
return nil
}
if prevStat == nil {
prevStat = &prune.Stat{}
}

valsRwCursor, err := tx.RwCursor(kv.TxLookup)
if err != nil {
return fmt.Errorf("create TxLookup cursor: %w", err)
}
defer valsRwCursor.Close()
var valsCursor kv.PseudoDupSortRwCursor
switch c := valsRwCursor.(type) {
case *mdbx2.MdbxCursor:
valsCursor = &mdbx2.MdbxCursorPseudoDupSort{MdbxCursor: c}
default:
return fmt.Errorf("unexpected cursor type %T for table %s", valsRwCursor, kv.TxLookup)
}

logEvery := time.NewTicker(logInterval)
defer logEvery.Stop()

pruneTimeout := 2 * time.Second
if s.CurrentSyncCycle.IsInitialCycle {
pruneTimeout = time.Hour
}
pruneCtx, pruneCancel := context.WithTimeout(ctx, pruneTimeout)
defer pruneCancel()

pruneStat, err := prune.TableScanningPrune(pruneCtx, logPrefix, "txlookup", txFrom, txTo, 0, 1,
logEvery, logger, nil, valsCursor, false, prevStat, prune.ValueOffset8StorageMode)
if err != nil {
// Use precise block-based deletion instead of full table scan.
// TxLookup keys are tx hashes (random), so scanning the entire table to find
// entries within a txNum range is O(table_size). Instead, we iterate
// HeaderCanonical for the block range, look up each block's transactions,
// and delete their TxLookup entries directly — O(txs_to_delete).
// etl.Transform uses ExtractEndKey as exclusive bound, therefore blockTo + 1
if err := deleteTxLookupRange(tx, logPrefix, blockFrom, blockTo+1, ctx, cfg, logger); err != nil {
return fmt.Errorf("prune TxLookup: %w", err)
}
defer func() {
pruneStat.TxFrom, pruneStat.TxTo = txFrom, txTo
if e := state.SavePruneValProgress(tx, kv.TxLookup, pruneStat); e != nil {
logger.Error("[snapshots] save prune val progress", "name", "txlookup", "err", e)
}
}()

if pruneStat.ValueProgress == prune.Done {
if err = s.DoneAt(tx, blockTo); err != nil {
return err
}
if err = s.DoneAt(tx, blockTo); err != nil {
return err
}

return nil
Expand Down
Loading