Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions agents.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ This file provides guidance for AI agents working with this codebase.

```bash
make erigon # Build main binary (./build/bin/erigon)
make all # Build all binaries
make integration # Build integration test binary
make lint # Run golangci-lint + mod tidy check
make test-short # Quick unit tests (-short -failfast)
make test-all # Full test suite with coverage
make gen # Generate all auto-generated code (mocks, grpc, etc.)
```

Before committing, always verify changes with: `make lint && make erigon integration`

Run specific tests:
```bash
go test ./execution/stagedsync/...
Expand Down
95 changes: 93 additions & 2 deletions db/kv/stream/stream_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
type (
U64 Uno[uint64]
KV Duo[[]byte, []byte] // key, value
KU64 Duo[[]byte, uint64] // key, txnum
U64V Duo[uint64, []byte] // txnum, value
)

var (
EmptyU64 = &Empty[uint64]{}
EmptyKV = &EmptyDuo[[]byte, []byte]{}
EmptyU64 = &Empty[uint64]{}
EmptyKV = &EmptyDuo[[]byte, []byte]{}
EmptyKU64 = &EmptyDuo[[]byte, uint64]{}
)

var (
Expand Down Expand Up @@ -197,6 +199,95 @@ func (m *UnionKVIter) Close() {
}
}

// MultisetDuoIter - sorted merge of two Duo[[]byte, V] streams preserving duplicates.
// Unlike UnionKVIter which deduplicates on equal keys, this emits both.
// When keys are equal, x is emitted first; y is kept for the next call.
// Both sources are assumed to be sorted ASC by key — TODO confirm against callers.
type MultisetDuoIter[V any] struct {
	x, y               Duo[[]byte, V] // the two source streams being merged
	xHasNext, yHasNext bool           // whether the corresponding lookahead buffer below holds a valid entry
	xNextK, yNextK     []byte         // one-entry key lookahead per source
	xNextV, yNextV     V              // one-entry value lookahead per source
	limit              int            // remaining entries to emit; decremented per Next; a negative start never reaches 0, i.e. unlimited
	err                error          // sticky: first error from either source, delivered by the following Next call
}

// multisetDuo builds a sorted, duplicate-preserving merge of two key-sorted
// Duo streams. nil inputs degrade gracefully: with both nil an empty stream
// is returned, with one nil the other side is returned as-is.
// NOTE: when one side is returned directly, limit is not applied to it
// (same convention as the union constructors in this package).
func multisetDuo[V any](x, y Duo[[]byte, V], limit int) Duo[[]byte, V] {
	switch {
	case x == nil && y == nil:
		return &EmptyDuo[[]byte, V]{}
	case x == nil:
		return y
	case y == nil:
		return x
	}
	it := &MultisetDuoIter[V]{x: x, y: y, limit: limit}
	// Prime both lookahead buffers so HasNext/Next can work off them.
	it.advanceX()
	it.advanceY()
	return it
}

// MultisetKV returns a sorted merge of two KV streams preserving duplicates.
// On equal keys, x's entry is emitted before y's. limit caps the total number
// of emitted pairs; pass a negative value for no cap.
func MultisetKV(x, y KV, limit int) KV { return multisetDuo[[]byte](x, y, limit) }

// MultisetKU64 returns a sorted merge of two KU64 streams preserving duplicates.
// On equal keys, x's entry is emitted before y's. limit caps the total number
// of emitted pairs; pass a negative value for no cap.
func MultisetKU64(x, y KU64, limit int) KU64 { return multisetDuo[uint64](x, y, limit) }

// HasNext reports whether Next may be called again. A pending error also
// counts: it keeps returning true so the caller observes the error from Next
// (callers are expected to stop iterating once Next returns an error).
func (m *MultisetDuoIter[V]) HasNext() bool {
	if m.err != nil {
		return true // the error still has to be delivered via Next
	}
	if m.limit == 0 {
		return false // emission budget spent (a negative limit never hits 0)
	}
	return m.xHasNext || m.yHasNext
}
// advanceX refills x's one-entry lookahead buffer from the underlying stream.
// Becomes a no-op once an error has been recorded.
func (m *MultisetDuoIter[V]) advanceX() {
	if m.err != nil {
		return
	}
	if m.xHasNext = m.x.HasNext(); m.xHasNext {
		m.xNextK, m.xNextV, m.err = m.x.Next()
	}
}
// advanceY refills y's one-entry lookahead buffer from the underlying stream.
// Becomes a no-op once an error has been recorded.
func (m *MultisetDuoIter[V]) advanceY() {
	if m.err != nil {
		return
	}
	if m.yHasNext = m.y.HasNext(); m.yHasNext {
		m.yNextK, m.yNextV, m.err = m.y.Next()
	}
}
// Next emits the smaller of the two buffered entries and refills that side's
// lookahead. Ties go to x, so on equal keys x's entry precedes y's. An error
// discovered while refilling is surfaced on the subsequent call, after the
// already-buffered entry has been delivered.
func (m *MultisetDuoIter[V]) Next() ([]byte, V, error) {
	if m.err != nil {
		var zero V
		return nil, zero, m.err
	}
	m.limit--
	// Take from x when y is drained or x's key sorts first (ties included).
	if m.xHasNext && (!m.yHasNext || bytes.Compare(m.xNextK, m.yNextK) <= 0) {
		k, v := m.xNextK, m.xNextV
		m.advanceX()
		return k, v, nil
	}
	k, v := m.yNextK, m.yNextV
	m.advanceY()
	return k, v, nil
}
// Close releases both underlying streams, for those that support closing.
func (m *MultisetDuoIter[V]) Close() {
	for _, s := range []Duo[[]byte, V]{m.x, m.y} {
		if c, ok := s.(Closer); ok {
			c.Close()
		}
	}
}

// Closer is the optional interface a stream implements when it holds
// resources that must be released; merge iterators propagate Close to any
// sub-stream that implements it.
type Closer interface {
	Close()
}
82 changes: 82 additions & 0 deletions db/kv/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,88 @@ func TestUnionPairs(t *testing.T) {
})
}

// TestMultisetKV verifies that MultisetKV performs a sorted merge of two KV
// streams while preserving duplicate keys (unlike UnionKV, which dedups),
// and that nil/empty inputs and the limit parameter behave as expected.
// All setup errors are asserted: a failing BeginRw/Put/Range previously went
// unnoticed (`_ =` / `it, _ :=`) and would have surfaced only as a confusing
// downstream assertion failure or nil-pointer panic.
func TestMultisetKV(t *testing.T) {
	db := memdb.NewTestDB(t, dbcfg.ChainDB)
	ctx := context.Background()
	t.Run("preserves duplicates", func(t *testing.T) {
		require := require.New(t)
		tx, err := db.BeginRw(ctx)
		require.NoError(err)
		defer tx.Rollback()
		require.NoError(tx.Put(kv.HeaderNumber, []byte{1}, []byte{1}))
		require.NoError(tx.Put(kv.HeaderNumber, []byte{3}, []byte{1}))
		require.NoError(tx.Put(kv.TblAccountVals, []byte{2}, []byte{9}))
		require.NoError(tx.Put(kv.TblAccountVals, []byte{3}, []byte{9}))
		it, err := tx.Range(kv.HeaderNumber, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		it2, err := tx.Range(kv.TblAccountVals, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		keys, values, err := stream.ToArrayKV(stream.MultisetKV(it, it2, -1))
		require.NoError(err)
		// Key {3} appears twice (from both streams), unlike UnionKV which deduplicates
		require.Equal([][]byte{{1}, {2}, {3}, {3}}, keys)
		require.Equal([][]byte{{1}, {9}, {1}, {9}}, values)
	})
	t.Run("sorted merge", func(t *testing.T) {
		require := require.New(t)
		tx, err := db.BeginRw(ctx)
		require.NoError(err)
		defer tx.Rollback()
		require.NoError(tx.Put(kv.HeaderNumber, []byte{1}, []byte{1}))
		require.NoError(tx.Put(kv.HeaderNumber, []byte{4}, []byte{1}))
		require.NoError(tx.Put(kv.TblAccountVals, []byte{2}, []byte{9}))
		require.NoError(tx.Put(kv.TblAccountVals, []byte{5}, []byte{9}))
		it, err := tx.Range(kv.HeaderNumber, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		it2, err := tx.Range(kv.TblAccountVals, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		keys, _, err := stream.ToArrayKV(stream.MultisetKV(it, it2, -1))
		require.NoError(err)
		require.Equal([][]byte{{1}, {2}, {4}, {5}}, keys)
	})
	t.Run("empty left", func(t *testing.T) {
		require := require.New(t)
		tx, err := db.BeginRw(ctx)
		require.NoError(err)
		defer tx.Rollback()
		require.NoError(tx.Put(kv.TblAccountVals, []byte{2}, []byte{9}))
		require.NoError(tx.Put(kv.TblAccountVals, []byte{3}, []byte{9}))
		it, err := tx.Range(kv.HeaderNumber, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		it2, err := tx.Range(kv.TblAccountVals, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		keys, _, err := stream.ToArrayKV(stream.MultisetKV(it, it2, -1))
		require.NoError(err)
		require.Equal([][]byte{{2}, {3}}, keys)
	})
	t.Run("empty right", func(t *testing.T) {
		require := require.New(t)
		tx, err := db.BeginRw(ctx)
		require.NoError(err)
		defer tx.Rollback()
		require.NoError(tx.Put(kv.HeaderNumber, []byte{1}, []byte{1}))
		require.NoError(tx.Put(kv.HeaderNumber, []byte{3}, []byte{1}))
		it, err := tx.Range(kv.HeaderNumber, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		it2, err := tx.Range(kv.TblAccountVals, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		keys, _, err := stream.ToArrayKV(stream.MultisetKV(it, it2, -1))
		require.NoError(err)
		require.Equal([][]byte{{1}, {3}}, keys)
	})
	t.Run("both empty", func(t *testing.T) {
		require := require.New(t)
		tx, err := db.BeginRw(ctx)
		require.NoError(err)
		defer tx.Rollback()
		it, err := tx.Range(kv.HeaderNumber, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		it2, err := tx.Range(kv.TblAccountVals, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		m := stream.MultisetKV(it, it2, -1)
		require.False(m.HasNext())
	})
	t.Run("limit", func(t *testing.T) {
		require := require.New(t)
		tx, err := db.BeginRw(ctx)
		require.NoError(err)
		defer tx.Rollback()
		require.NoError(tx.Put(kv.HeaderNumber, []byte{1}, []byte{1}))
		require.NoError(tx.Put(kv.HeaderNumber, []byte{3}, []byte{1}))
		require.NoError(tx.Put(kv.TblAccountVals, []byte{2}, []byte{9}))
		require.NoError(tx.Put(kv.TblAccountVals, []byte{3}, []byte{9}))
		it, err := tx.Range(kv.HeaderNumber, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		it2, err := tx.Range(kv.TblAccountVals, nil, nil, order.Asc, kv.Unlim)
		require.NoError(err)
		keys, _, err := stream.ToArrayKV(stream.MultisetKV(it, it2, 2))
		require.NoError(err)
		require.Equal([][]byte{{1}, {2}}, keys)
	})
}

func TestIntersect(t *testing.T) {
t.Run("intersect", func(t *testing.T) {
s1 := stream.Array[uint64]([]uint64{1, 3, 4, 5, 6, 7})
Expand Down
21 changes: 21 additions & 0 deletions db/state/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,6 +1482,27 @@ func (ht *HistoryRoTx) HistoryRange(fromTxNum, toTxNum int, asc order.By, limit
return stream.UnionKV(itOnFiles, itOnDB, limit), nil
}

// HistoryKeyTxNumRange returns (key, txNum) pairs for every txNum at which a key changed in [fromTxNum, toTxNum).
// Output is sorted by key ASC. Duplicates across files are not deduplicated.
// Only asc order is implemented; order.Desc panics.
func (ht *HistoryRoTx) HistoryKeyTxNumRange(fromTxNum, toTxNum int, asc order.By, limit int, roTx kv.Tx) (stream.KU64, error) {
	if asc == order.Desc {
		panic("not supported yet")
	}
	itOnFiles, err := ht.iterateKeyTxNumFrozen(fromTxNum, toTxNum, asc, limit)
	if err != nil {
		return nil, err
	}
	// Start the DB stream where the frozen files end, so the overlap is not read twice.
	dbFrom := fromTxNum
	if len(ht.iit.files) > 0 {
		dbFrom = max(fromTxNum, int(ht.iit.files.EndTxNum()))
	}
	itOnDB, err := ht.iterateKeyTxNumRecent(dbFrom, toTxNum, asc, limit, roTx)
	if err != nil {
		// Don't leak the already-opened files stream when the DB stream fails to open.
		if c, ok := itOnFiles.(stream.Closer); ok {
			c.Close()
		}
		return nil, err
	}
	return stream.MultisetKU64(itOnFiles, itOnDB, limit), nil
}

func (ht *HistoryRoTx) HistoryDump(fromTxNum, toTxNum int, keyToDump *[]byte, dumpTo func(key []byte, txNum uint64, val []byte)) error {
if len(ht.iit.files) == 0 {
return nil
Expand Down
Loading
Loading