Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 15 additions & 12 deletions db/state/domain_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"fmt"
"math"

btree2 "github.com/tidwall/btree"
"github.com/anacrolix/btree"

"github.com/erigontech/erigon/common"
"github.com/erigontech/erigon/common/log/v3"
Expand All @@ -50,7 +50,7 @@
cDup kv.CursorDupSort
cNonDup kv.Cursor

iter btree2.MapIter[string, []dataWithTxNum]
iter *btree.MapIterator[string, []dataWithTxNum]
kvReader *seg.Reader
hist *seg.PagedReader
btCursor *btindex.Cursor
Expand Down Expand Up @@ -359,7 +359,7 @@
// debugIteratePrefix iterates over key-value pairs of the storage domain that start with given prefix
//
// k and v lifetime is bounded by the lifetime of the iterator
func (dt *DomainRoTx) debugIteratePrefixLatest(prefix []byte, ramIter btree2.MapIter[string, []dataWithTxNum], it func(k []byte, v []byte, step kv.Step) (cont bool, err error), roTx kv.Tx) error {
func (dt *DomainRoTx) debugIteratePrefixLatest(prefix []byte, storage *btree.Map[string, []dataWithTxNum], it func(k []byte, v []byte, step kv.Step) (cont bool, err error), roTx kv.Tx) error {

Check failure on line 362 in db/state/domain_stream.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 87 to the 60 allowed.

See more on https://sonarcloud.io/project/issues?id=erigontech_erigon&issues=AZzRperXdpAhUFcWDPDQ&open=AZzRperXdpAhUFcWDPDQ&pullRequest=19742
// Implementation:
// File endTxNum = last txNum of file step
// DB endTxNum = first txNum of step in db
Expand All @@ -375,13 +375,15 @@
var k, v []byte
var err error

if ramIter.Seek(string(prefix)) {
k := common.ToBytesZeroCopy(ramIter.Key())

v = ramIter.Value()[len(ramIter.Value())-1].data

if len(k) > 0 && bytes.HasPrefix(k, prefix) {
heap.Push(cpPtr, &CursorItem{t: RAM_CURSOR, key: common.Copy(k), val: common.Copy(v), step: 0, iter: ramIter, endTxNum: math.MaxUint64, reverse: true})
if storage != nil {
ramIt := storage.Iterator()
ramIt.SeekGE(string(prefix))
if ramIt.Valid() {
k := common.ToBytesZeroCopy(ramIt.Cur())
v = ramIt.Value()[len(ramIt.Value())-1].data
if len(k) > 0 && bytes.HasPrefix(k, prefix) {
heap.Push(cpPtr, &CursorItem{t: RAM_CURSOR, key: common.Copy(k), val: common.Copy(v), step: 0, iter: &ramIt, endTxNum: math.MaxUint64, reverse: true})
}
}
}

Expand Down Expand Up @@ -430,8 +432,9 @@
ci1 := heap.Pop(cpPtr).(*CursorItem)
switch ci1.t {
case RAM_CURSOR:
if ci1.iter.Next() {
k = common.ToBytesZeroCopy(ci1.iter.Key())
ci1.iter.Next()
if ci1.iter.Valid() {
k = common.ToBytesZeroCopy(ci1.iter.Cur())
if k != nil && bytes.HasPrefix(k, prefix) {
ci1.key = common.Copy(k)
ci1.val = common.Copy(ci1.iter.Value()[len(ci1.iter.Value())-1].data)
Expand Down
29 changes: 15 additions & 14 deletions db/state/temporal_mem_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (
"fmt"
"maps"
"sort"
"strings"
"sync"

btree2 "github.com/tidwall/btree"
"github.com/anacrolix/btree"

"github.com/erigontech/erigon/common"
"github.com/erigontech/erigon/db/kv"
Expand Down Expand Up @@ -60,7 +61,7 @@ type TemporalMemBatch struct {

latestStateLock sync.RWMutex
domains [kv.DomainLen]map[string][]dataWithTxNum
storage *btree2.Map[string, []dataWithTxNum] // TODO: replace hardcoded domain name to per-config configuration of available Guarantees/AccessMethods (range vs get)
storage btree.Map[string, []dataWithTxNum] // TODO: replace hardcoded domain name to per-config configuration of available Guarantees/AccessMethods (range vs get)

domainWriters [kv.DomainLen]*DomainBufferedWriter
iiWriters []*InvertedIndexBufferedWriter
Expand All @@ -81,7 +82,7 @@ type TemporalMemBatch struct {

func NewTemporalMemBatch(tx kv.TemporalTx, ioMetrics any) *TemporalMemBatch {
sd := &TemporalMemBatch{
storage: btree2.NewMap[string, []dataWithTxNum](128),
storage: btree.MakeMap[string, []dataWithTxNum](strings.Compare),
metrics: ioMetrics.(*changeset.DomainMetrics),
inMemHistoryReads: true,
}
Expand Down Expand Up @@ -158,14 +159,14 @@ func (sd *TemporalMemBatch) putLatest(domain kv.Domain, key string, val []byte,
if domain == kv.StorageDomain {
if old, ok := sd.storage.Get(key); ok {
if sd.inMemHistoryReads {
sd.storage.Set(key, append(old, valWithStep))
sd.storage.Upsert(key, append(old, valWithStep))
putValueSize += len(val)
} else {
putValueSize += len(val) - len(old[len(old)-1].data)
sd.storage.Set(key, []dataWithTxNum{valWithStep})
sd.storage.Upsert(key, []dataWithTxNum{valWithStep})
}
} else {
sd.storage.Set(key, []dataWithTxNum{valWithStep})
sd.storage.Upsert(key, []dataWithTxNum{valWithStep})
putKeySize += len(key)
putValueSize += len(val)
}
Expand Down Expand Up @@ -287,7 +288,7 @@ func (sd *TemporalMemBatch) ClearRam() {
sd.domains[i] = map[string][]dataWithTxNum{}
}

sd.storage = btree2.NewMap[string, []dataWithTxNum](128)
sd.storage.Reset()
sd.unwindToTxNum = 0
sd.unwindChangeset = nil

Expand All @@ -308,12 +309,12 @@ func (sd *TemporalMemBatch) ClearRam() {
func (sd *TemporalMemBatch) IteratePrefix(domain kv.Domain, prefix []byte, roTx kv.Tx, it func(k []byte, v []byte, step kv.Step) (cont bool, err error)) error {
sd.latestStateLock.RLock()
defer sd.latestStateLock.RUnlock()
var ramIter btree2.MapIter[string, []dataWithTxNum]
var storageMap *btree.Map[string, []dataWithTxNum]
if domain == kv.StorageDomain {
ramIter = sd.storage.Iter()
storageMap = &sd.storage
}

return AggTx(roTx).d[domain].debugIteratePrefixLatest(prefix, ramIter, it, roTx)
return AggTx(roTx).d[domain].debugIteratePrefixLatest(prefix, storageMap, it, roTx)
}

func (sd *TemporalMemBatch) HasPrefix(domain kv.Domain, prefix []byte, roTx kv.Tx) ([]byte, []byte, bool, error) {
Expand Down Expand Up @@ -462,10 +463,10 @@ func (sd *TemporalMemBatch) Merge(o kv.TemporalMemBatch) error {
maps.Copy(entries, otherEntries)
}

other.storage.Scan(func(key string, value []dataWithTxNum) bool {
sd.storage.Set(key, value)
return true
})
iter := other.storage.Iterator()
for iter.First(); iter.Valid(); iter.Next() {
sd.storage.Upsert(iter.Cur(), iter.Value())
}

for domain, writer := range other.domainWriters {
sd.pastDomainWriters[domain] = append(sd.pastDomainWriters[domain], writer)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/RoaringBitmap/roaring/v2 v2.14.4
github.com/alecthomas/kong v0.8.1
github.com/anacrolix/btree v0.1.1
github.com/anacrolix/envpprof v1.5.0
github.com/anacrolix/generics v0.2.0
github.com/anacrolix/go-libutp v1.3.3-0.20251121015447-f294e5ed5b4d
Expand Down Expand Up @@ -158,7 +159,6 @@ require (
github.com/alfatraining/structtag v1.0.0 // indirect
github.com/alingse/asasalint v0.0.11 // indirect
github.com/alingse/nilnesserr v0.2.0 // indirect
github.com/anacrolix/btree v0.1.1 // indirect
github.com/anacrolix/chansync v0.7.0 // indirect
github.com/anacrolix/dht/v2 v2.23.0 // indirect
github.com/anacrolix/missinggo v1.3.0 // indirect
Expand Down
Loading