Skip to content

Commit d7120c3

Browse files
AskAlexSharov info@weblogix.biz
authored and committed
SequenceBuilder: add .Reset() method and use it during merge (#19567)
Problem: <img width="1437" height="1233" alt="Screenshot 2026-03-02 at 17 20 26" src="https://github.com/user-attachments/assets/29993bd6-2a23-4c49-946c-62b390014818" /> In PR: - re-use 1 Builder object for many EF merges and many EF builds (in II.mergeFiles) - moved `Merge` method from `Reader` to `Builder` - moved reusable iterators for `Merge` method inside `Builder` object - “alloc=2.7g sys=5.4g” -> “alloc=2.7g sys=2.7g” (during large .rf file merge) --------- Co-authored-by: info@weblogix.biz <admin@10gbps.weblogix.it>
1 parent 575d1c5 commit d7120c3

File tree

8 files changed

+175
-57
lines changed

8 files changed

+175
-57
lines changed

cmd/mcp/README.md

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,41 +118,46 @@ for datadir mode.
118118
## Available Tools
119119

120120
### Ethereum Standard (eth_*)
121+
121122
`eth_blockNumber`, `eth_getBlockByNumber`, `eth_getBlockByHash`,
122123
`eth_getBalance`, `eth_getTransactionByHash`, `eth_getTransactionReceipt`,
123124
`eth_getBlockReceipts`, `eth_getLogs`, `eth_getCode`, `eth_getStorageAt`,
124125
`eth_getTransactionCount`, `eth_call`, `eth_estimateGas`, `eth_gasPrice`,
125126
`eth_chainId`, `eth_syncing`, `eth_getProof`, and more.
126127

127128
### Erigon-Specific (erigon_*)
129+
128130
`erigon_forks`, `erigon_blockNumber`, `erigon_getHeaderByNumber`,
129131
`erigon_getHeaderByHash`, `erigon_getBlockByTimestamp`,
130132
`erigon_getBalanceChangesInBlock`, `erigon_getLogsByHash`,
131133
`erigon_getLogs`, `erigon_getBlockReceiptsByBlockHash`, `erigon_nodeInfo`.
132134

133135
### Otterscan (ots_*)
136+
134137
`ots_getApiLevel`, `ots_getInternalOperations`,
135138
`ots_searchTransactionsBefore`, `ots_searchTransactionsAfter`,
136139
`ots_getBlockDetails`, `ots_getBlockTransactions`, `ots_hasCode`,
137140
`ots_traceTransaction`, `ots_getTransactionError`,
138141
`ots_getTransactionBySenderAndNonce`, `ots_getContractCreator`.
139142

140143
### Log Analysis
144+
141145
`logs_tail`, `logs_head`, `logs_grep`, `logs_stats` — requires `--log.dir`
142146
or `--datadir` to locate Erigon/torrent log files.
143147

144148
### Metrics
149+
145150
`metrics_list`, `metrics_get` — only available in embedded mode (inside Erigon).
146151
In standalone mode, these return an informational message.
147152

148153
## Flags
149154

150-
| Flag | Default | Description |
151-
|------|---------|-------------|
152-
| `--rpc.url` | `http://127.0.0.1:8545` | Erigon JSON-RPC endpoint URL |
153-
| `--port` | 0 | JSON-RPC port shorthand |
154-
| `--datadir` | | Erigon data directory (enables direct DB mode) |
155-
| `--private.api.addr` | `127.0.0.1:9090` | gRPC private API (with --datadir) |
156-
| `--transport` | `stdio` | Transport: `stdio` or `sse` |
157-
| `--sse.addr` | `127.0.0.1:8553` | SSE listen address |
158-
| `--log.dir` | | Log directory (overrides datadir detection) |
155+
| Flag | Default | Description |
156+
|----------------------|-------------------------|------------------------------------------------|
157+
| `--rpc.url` | `http://127.0.0.1:8545` | Erigon JSON-RPC endpoint URL |
158+
| `--port` | 0 | JSON-RPC port shorthand |
159+
| `--datadir` | | Erigon data directory (enables direct DB mode) |
160+
| `--private.api.addr` | `127.0.0.1:9090` | gRPC private API (with --datadir) |
161+
| `--transport` | `stdio` | Transport: `stdio` or `sse` |
162+
| `--sse.addr` | `127.0.0.1:8553` | SSE listen address |
163+
| `--log.dir` | | Log directory (overrides datadir detection) |

db/recsplit/eliasfano32/elias_fano.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (ef *EliasFano) deriveFields() int {
108108
jumpWords := ef.jumpSizeWords()
109109
totalWords := wordsLowerBits + wordsUpperBits + jumpWords
110110
//fmt.Printf("EF: %d, %d,%d,%d\n", totalWords, wordsLowerBits, wordsUpperBits, jumpWords)
111-
if ef.data == nil {
111+
if cap(ef.data) < totalWords {
112112
ef.data = make([]uint64, totalWords)
113113
} else {
114114
ef.data = ef.data[:totalWords]
@@ -120,6 +120,20 @@ func (ef *EliasFano) deriveFields() int {
120120
return wordsUpperBits
121121
}
122122

123+
// ResetForWrite reinitializes the EliasFano for writing a new sequence, reusing
124+
// the existing data slice if it has sufficient capacity (avoiding allocation).
125+
// The caller must call Build() after all AddOffset calls, same as with NewEliasFano.
126+
func (ef *EliasFano) ResetForWrite(count, maxOffset uint64) {
127+
ef.count = count - 1
128+
ef.maxOffset = maxOffset
129+
ef.u = maxOffset + 1
130+
ef.i = 0
131+
ef.wordsUpperBits = ef.deriveFields()
132+
// Zero out the backing array so OR-style setBits starts from a clean slate.
133+
// deriveFields() may have resliced ef.data without zeroing it.
134+
clear(ef.data)
135+
}
136+
123137
// Build construct Elias Fano index for a given sequences
124138
func (ef *EliasFano) Build() {
125139
for i, c, lastSuperQ := uint64(0), uint64(0), uint64(0); i < uint64(ef.wordsUpperBits); i++ {

db/recsplit/multiencseq/sequence_builder.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type SequenceBuilder struct {
3333
smallBuf [SIMPLE_SEQUENCE_MAX_THRESHOLD]uint32 // rebased values for simple encoding (count <= 16)
3434
smallCount uint8
3535
rebasedEf *eliasfano32.EliasFano // direct rebased EF for large sequences (count > 16)
36+
it1 SequenceIterator
37+
it2 SequenceIterator
3638
}
3739

3840
// Creates a new builder. The builder is not meant to be reused. The construction
@@ -60,6 +62,23 @@ func NewBuilder(baseNum, count, maxOffset uint64) *SequenceBuilder {
6062
return &SequenceBuilder{baseNum: baseNum}
6163
}
6264

65+
// Reset reinitializes the builder for a new sequence, reusing the existing object
66+
// and its internal EliasFano allocation where possible.
67+
// Same parameter semantics as NewBuilder.
68+
func (b *SequenceBuilder) Reset(baseNum, count, maxOffset uint64) {
69+
b.baseNum = baseNum
70+
b.smallCount = 0
71+
if count > SIMPLE_SEQUENCE_MAX_THRESHOLD {
72+
if b.rebasedEf != nil {
73+
b.rebasedEf.ResetForWrite(count, maxOffset-baseNum)
74+
} else {
75+
b.rebasedEf = eliasfano32.NewEliasFano(count, maxOffset-baseNum)
76+
}
77+
} else {
78+
b.rebasedEf = nil
79+
}
80+
}
81+
6382
func (b *SequenceBuilder) AddOffset(offset uint64) {
6483
if b.rebasedEf != nil {
6584
b.rebasedEf.AddOffset(offset - b.baseNum)
@@ -94,3 +113,28 @@ func (b *SequenceBuilder) simpleEncoding(buf []byte) []byte {
94113

95114
return buf
96115
}
116+
117+
// Merge merges s1 and s2 into this builder, resetting it first.
118+
// s1 and s2 must be pre-sorted with s1.Max() <= s2.Min().
119+
// Call AppendBytes on the builder to serialize.
120+
func (b *SequenceBuilder) Merge(s1, s2 *SequenceReader, outBaseNum uint64) error {
121+
b.Reset(outBaseNum, s1.Count()+s2.Count(), s2.Max())
122+
b.it1.Reset(s1, 0)
123+
b.it2.Reset(s2, 0)
124+
for b.it1.HasNext() {
125+
v, err := b.it1.Next()
126+
if err != nil {
127+
return err
128+
}
129+
b.AddOffset(v)
130+
}
131+
for b.it2.HasNext() {
132+
v, err := b.it2.Next()
133+
if err != nil {
134+
return err
135+
}
136+
b.AddOffset(v)
137+
}
138+
b.Build()
139+
return nil
140+
}

db/recsplit/multiencseq/sequence_builder_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,88 @@ func TestMultiEncodingSeqBuilder(t *testing.T) {
102102
})
103103
}
104104

105+
// TestBuilderReset exercises SequenceBuilder.Reset across every transition
// between the two encodings (simple <= 16 elements, rebased Elias-Fano > 16),
// reusing one builder value for all subtests to catch stale state.
func TestBuilderReset(t *testing.T) {
	var b SequenceBuilder

	// addAll feeds a slice of absolute offsets into the shared builder.
	addAll := func(vals []uint64) {
		for _, v := range vals {
			b.AddOffset(v)
		}
	}
	// check serializes the builder, re-reads it, and verifies count and
	// every element round-trip exactly.
	check := func(t *testing.T, baseNum uint64, vals []uint64) {
		t.Helper()
		raw := b.AppendBytes(nil)
		s := ReadMultiEncSeq(baseNum, raw)
		require.Equal(t, uint64(len(vals)), s.Count())
		for i, want := range vals {
			require.Equal(t, want, s.Get(uint64(i)), "index %d", i)
		}
	}

	small := []uint64{1001, 1003, 1005} // 3 elements → simple encoding
	large := make([]uint64, 17)         // 17 elements → rebased EF
	for i := range large {
		large[i] = 2000 + uint64(i)*2
	}

	t.Run("small then large: rebasedEf allocated on demand", func(t *testing.T) {
		b.Reset(1000, uint64(len(small)), small[len(small)-1])
		addAll(small)
		b.Build()
		// First serialized byte is the encoding tag (simple tag ORs in count-1).
		require.Equal(t, byte(SimpleEncoding)|byte(len(small)-1), b.AppendBytes(nil)[0])
		check(t, 1000, small)

		b.Reset(2000, uint64(len(large)), large[len(large)-1])
		addAll(large)
		b.Build()
		require.Equal(t, byte(RebasedEliasFano), b.AppendBytes(nil)[0])
		check(t, 2000, large)
	})

	t.Run("large then small: rebasedEf nilled, smallCount zeroed", func(t *testing.T) {
		b.Reset(2000, uint64(len(large)), large[len(large)-1])
		addAll(large)
		b.Build()
		require.Equal(t, byte(RebasedEliasFano), b.AppendBytes(nil)[0])

		// Reset back down to a small sequence: must switch to simple encoding.
		b.Reset(1000, uint64(len(small)), small[len(small)-1])
		addAll(small)
		b.Build()
		require.Equal(t, byte(SimpleEncoding)|byte(len(small)-1), b.AppendBytes(nil)[0])
		check(t, 1000, small)
	})

	t.Run("small then small: no stale smallCount", func(t *testing.T) {
		small2 := []uint64{3001, 3003}
		b.Reset(3000, uint64(len(small2)), small2[len(small2)-1])
		addAll(small2)
		b.Build()
		check(t, 3000, small2)

		// A second identical round must not accumulate smallCount.
		b.Reset(3000, uint64(len(small2)), small2[len(small2)-1])
		addAll(small2)
		b.Build()
		check(t, 3000, small2)
	})

	t.Run("large then large: rebasedEf reused", func(t *testing.T) {
		large2 := make([]uint64, 17)
		for i := range large2 {
			large2[i] = 5000 + uint64(i)*3
		}
		b.Reset(2000, uint64(len(large)), large[len(large)-1])
		addAll(large)
		b.Build()
		ef1 := b.rebasedEf

		b.Reset(5000, uint64(len(large2)), large2[len(large2)-1])
		addAll(large2)
		b.Build()
		// Same pointer proves Reset recycled the EF instead of reallocating.
		require.Same(t, ef1, b.rebasedEf, "rebasedEf should be reused")
		check(t, 5000, large2)
	})
}
186+
105187
func BenchmarkBuilder(b *testing.B) {
106188
const baseNum = 1_000_000
107189
const n = 500

db/recsplit/multiencseq/sequence_reader.go

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -194,31 +194,6 @@ func (s *SequenceReader) ReverseIterator(v int) stream.U64 {
194194
panic(fmt.Sprintf("unknown sequence encoding: %d", s.currentEnc))
195195
}
196196

197-
// Merge merges the other sequence into this one, returning a built SequenceBuilder
198-
// with outBaseNum. Both sequences must be pre-sorted with s.Max() <= other.Min().
199-
// Call AppendBytes on the result to serialize.
200-
func (s *SequenceReader) Merge(other *SequenceReader, outBaseNum uint64, it1, it2 *SequenceIterator) (*SequenceBuilder, error) {
201-
it1.Reset(s, 0)
202-
it2.Reset(other, 0)
203-
newSeq := NewBuilder(outBaseNum, s.Count()+other.Count(), other.Max())
204-
for it1.HasNext() {
205-
v, err := it1.Next()
206-
if err != nil {
207-
return nil, err
208-
}
209-
newSeq.AddOffset(v)
210-
}
211-
for it2.HasNext() {
212-
v, err := it2.Next()
213-
if err != nil {
214-
return nil, err
215-
}
216-
newSeq.AddOffset(v)
217-
}
218-
newSeq.Build()
219-
return newSeq, nil
220-
}
221-
222197
// SequenceIterator is a reusable iterator for SequenceReader.
223198
// Create as a value and call Reset() to (re)initialize — avoids heap allocation
224199
// for SimpleEncoding (the common case).

db/recsplit/multiencseq/sequence_reader_test.go

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ func TestMerge(t *testing.T) {
127127
s1 := ReadMultiEncSeq(1000, buildTestSeq(1000, 1001, 1003, 1005))
128128
s2 := ReadMultiEncSeq(1000, buildTestSeq(1000, 1007, 1009, 1011))
129129

130-
var it1, it2 SequenceIterator
131-
merged, err := s1.Merge(s2, 1000, &it1, &it2)
130+
var merged SequenceBuilder
131+
err := merged.Merge(s1, s2, 1000)
132132
require.NoError(t, err)
133133

134134
out := merged.AppendBytes(nil)
@@ -152,8 +152,8 @@ func TestMerge(t *testing.T) {
152152
s1 := ReadMultiEncSeq(1000, buildTestSeq(1000, vals1...))
153153
s2 := ReadMultiEncSeq(1000, buildTestSeq(1000, vals2...))
154154

155-
var it1, it2 SequenceIterator
156-
merged, err := s1.Merge(s2, 1000, &it1, &it2)
155+
var merged SequenceBuilder
156+
err := merged.Merge(s1, s2, 1000)
157157
require.NoError(t, err)
158158

159159
out := merged.AppendBytes(nil)
@@ -173,9 +173,8 @@ func TestMergeEncodingBoundary(t *testing.T) {
173173
merge := func(baseNum uint64, raw1, raw2 []byte) []byte {
174174
s1 := ReadMultiEncSeq(baseNum, raw1)
175175
s2 := ReadMultiEncSeq(baseNum, raw2)
176-
var it1, it2 SequenceIterator
177-
merged, err := s1.Merge(s2, baseNum, &it1, &it2)
178-
if err != nil {
176+
var merged SequenceBuilder
177+
if err := merged.Merge(s1, s2, baseNum); err != nil {
179178
t.Fatal(err)
180179
}
181180
return merged.AppendBytes(nil)
@@ -206,8 +205,8 @@ func TestMergeSeek(t *testing.T) {
206205
}
207206
s1 := ReadMultiEncSeq(1000, buildTestSeq(1000, vals1...))
208207
s2 := ReadMultiEncSeq(1000, buildTestSeq(1000, vals2...))
209-
var it1, it2 SequenceIterator
210-
merged, err := s1.Merge(s2, 1000, &it1, &it2)
208+
var merged SequenceBuilder
209+
err := merged.Merge(s1, s2, 1000)
211210
require.NoError(t, err)
212211
result := ReadMultiEncSeq(1000, merged.AppendBytes(nil))
213212

@@ -276,13 +275,12 @@ func BenchmarkMerge(b *testing.B) {
276275
}()
277276

278277
var s1, s2 SequenceReader
279-
var it1, it2 SequenceIterator
278+
var merged SequenceBuilder
280279
b.ResetTimer()
281280
for i := 0; i < b.N; i++ {
282281
s1.Reset(baseNum, raw1)
283282
s2.Reset(baseNum, raw2)
284-
merged, err := s1.Merge(&s2, baseNum, &it1, &it2)
285-
if err != nil {
283+
if err := merged.Merge(&s1, &s2, baseNum); err != nil {
286284
b.Fatal(err)
287285
}
288286
_ = merged.AppendBytes(nil)

db/state/merge.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,8 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
650650
var keyBuf, valBuf []byte
651651
var lastKey, lastVal []byte
652652
preSeq, mergeSeq := &multiencseq.SequenceReader{}, &multiencseq.SequenceReader{}
653-
preIt, mergeIt := &multiencseq.SequenceIterator{}, &multiencseq.SequenceIterator{}
653+
preIt := &multiencseq.SequenceIterator{}
654+
builder := &multiencseq.SequenceBuilder{}
654655
i := uint64(0)
655656
for cp.Len() > 0 {
656657
lastKey = append(lastKey[:0], cp[0].key...)
@@ -659,16 +660,16 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
659660
// Pre-rebase the first sequence
660661
preSeq.Reset(cp[0].startTxNum, lastVal)
661662
preIt.Reset(preSeq, 0)
662-
newSeq := multiencseq.NewBuilder(startTxNum, preSeq.Count(), preSeq.Max())
663+
builder.Reset(startTxNum, preSeq.Count(), preSeq.Max())
663664
for preIt.HasNext() {
664665
v, err := preIt.Next()
665666
if err != nil {
666667
return nil, err
667668
}
668-
newSeq.AddOffset(v)
669+
builder.AddOffset(v)
669670
}
670-
newSeq.Build()
671-
lastVal = newSeq.AppendBytes(nil)
671+
builder.Build()
672+
lastVal = builder.AppendBytes(lastVal[:0])
672673
var mergedOnce bool
673674

674675
// Advance all the items that have this key (including the top)
@@ -677,11 +678,10 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
677678
if mergedOnce {
678679
mergeSeq.Reset(ci1.startTxNum, ci1.val)
679680
preSeq.Reset(startTxNum, lastVal)
680-
merged, mergeErr := mergeSeq.Merge(preSeq, startTxNum, mergeIt, preIt)
681-
if mergeErr != nil {
681+
if mergeErr := builder.Merge(mergeSeq, preSeq, startTxNum); mergeErr != nil {
682682
return nil, fmt.Errorf("merge %s inverted index: %w", iit.ii.FilenameBase, mergeErr)
683683
}
684-
lastVal = merged.AppendBytes(nil)
684+
lastVal = builder.AppendBytes(lastVal[:0])
685685
} else {
686686
mergedOnce = true
687687
}

db/state/merge_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -748,10 +748,10 @@ func Test_mergeEliasFano(t *testing.T) {
748748
}
749749

750750
var seq1, seq2 multiencseq.SequenceReader
751-
var it1, it2 multiencseq.SequenceIterator
752751
seq1.Reset(0, firstBytes)
753752
seq2.Reset(0, secondBytes)
754-
mergedSeq, err := seq1.Merge(&seq2, 0, &it1, &it2)
753+
var mergedSeq multiencseq.SequenceBuilder
754+
err := mergedSeq.Merge(&seq1, &seq2, 0)
755755
require.NoError(t, err)
756756
menc := mergedSeq.AppendBytes(nil)
757757

0 commit comments

Comments
 (0)