Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion db/recsplit/eliasfano32/elias_fano.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (ef *EliasFano) deriveFields() int {
jumpWords := ef.jumpSizeWords()
totalWords := wordsLowerBits + wordsUpperBits + jumpWords
//fmt.Printf("EF: %d, %d,%d,%d\n", totalWords, wordsLowerBits, wordsUpperBits, jumpWords)
if ef.data == nil {
if cap(ef.data) < totalWords {
ef.data = make([]uint64, totalWords)
} else {
ef.data = ef.data[:totalWords]
Expand All @@ -120,6 +120,20 @@ func (ef *EliasFano) deriveFields() int {
return wordsUpperBits
}

// ResetForWrite prepares the EliasFano to accept a fresh sequence of offsets,
// recycling the current data slice whenever its capacity allows (avoiding a
// new allocation). As with NewEliasFano, the caller must finish with Build()
// once every AddOffset call has been made.
func (ef *EliasFano) ResetForWrite(count, maxOffset uint64) {
	ef.i = 0
	ef.maxOffset = maxOffset
	ef.u = maxOffset + 1
	ef.count = count - 1
	ef.wordsUpperBits = ef.deriveFields()
	// deriveFields() may have resliced ef.data over stale contents from a
	// previous sequence; wipe the live window so the OR-based setBits writes
	// start from all-zero words.
	clear(ef.data)
}

// Build constructs the Elias Fano index for a given sequence
func (ef *EliasFano) Build() {
for i, c, lastSuperQ := uint64(0), uint64(0), uint64(0); i < uint64(ef.wordsUpperBits); i++ {
Expand Down
39 changes: 39 additions & 0 deletions db/recsplit/multiencseq/sequence_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const SIMPLE_SEQUENCE_MAX_THRESHOLD = 16
type SequenceBuilder struct {
baseNum uint64
ef *eliasfano32.EliasFano
it1 SequenceIterator
it2 SequenceIterator
}

// Creates a new builder. The builder is not meant to be reused. The construction
Expand All @@ -53,6 +55,18 @@ func NewBuilder(baseNum, count, maxOffset uint64) *SequenceBuilder {
}
}

// Reset reinitializes the builder for a new sequence, reusing the existing
// object and its internal EliasFano allocation where possible.
// Same parameter semantics as NewBuilder.
func (b *SequenceBuilder) Reset(baseNum, count, maxOffset uint64) {
	b.baseNum = baseNum
	if b.ef == nil {
		// First use of this builder: no allocation to recycle yet.
		b.ef = eliasfano32.NewEliasFano(count, maxOffset)
		return
	}
	b.ef.ResetForWrite(count, maxOffset)
}

func (b *SequenceBuilder) AddOffset(offset uint64) {
// TODO: write offset already subtracting baseNum now that PlainEF is gone
b.ef.AddOffset(offset)
Expand Down Expand Up @@ -94,6 +108,31 @@ func (b *SequenceBuilder) simpleEncoding(buf []byte) []byte {
return buf
}

// Merge merges s1 and s2 into this builder, resetting it first.
// s1 and s2 must be pre-sorted with s1.Max() <= s2.Min().
// Call AppendBytes on the builder to serialize.
func (b *SequenceBuilder) Merge(s1, s2 *SequenceReader, outBaseNum uint64) error {
	b.Reset(outBaseNum, s1.Count()+s2.Count(), s2.Max())
	b.it1.Reset(s1, 0)
	b.it2.Reset(s2, 0)
	// Drain both iterators in order; the pre-sort precondition guarantees the
	// combined stream of offsets is monotonically increasing.
	for _, it := range [...]*SequenceIterator{&b.it1, &b.it2} {
		for it.HasNext() {
			offset, err := it.Next()
			if err != nil {
				return err
			}
			b.AddOffset(offset)
		}
	}
	b.Build()
	return nil
}

func (b *SequenceBuilder) rebasedEliasFano(buf []byte) []byte {
// Reserved encoding type 0x90 == rebased elias fano
buf = append(buf, byte(RebasedEliasFano))
Expand Down
25 changes: 0 additions & 25 deletions db/recsplit/multiencseq/sequence_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,31 +194,6 @@ func (s *SequenceReader) ReverseIterator(v int) stream.U64 {
panic(fmt.Sprintf("unknown sequence encoding: %d", s.currentEnc))
}

// Merge merges the other sequence into this one, returning a built SequenceBuilder
// with outBaseNum. Both sequences must be pre-sorted.
// Call AppendBytes on the result to serialize.
func (s *SequenceReader) Merge(other *SequenceReader, outBaseNum uint64, it1, it2 *SequenceIterator) (*SequenceBuilder, error) {
it1.Reset(s, 0)
it2.Reset(other, 0)
newSeq := NewBuilder(outBaseNum, s.Count()+other.Count(), other.Max())
for it1.HasNext() {
v, err := it1.Next()
if err != nil {
return nil, err
}
newSeq.AddOffset(v)
}
for it2.HasNext() {
v, err := it2.Next()
if err != nil {
return nil, err
}
newSeq.AddOffset(v)
}
newSeq.Build()
return newSeq, nil
}

// SequenceIterator is a reusable iterator for SequenceReader.
// Create as a value and call Reset() to (re)initialize — avoids heap allocation
// for SimpleEncoding (the common case).
Expand Down
16 changes: 8 additions & 8 deletions db/state/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,24 +645,25 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
var keyBuf, valBuf []byte
var lastKey, lastVal []byte
preSeq, mergeSeq := &multiencseq.SequenceReader{}, &multiencseq.SequenceReader{}
preIt, mergeIt := &multiencseq.SequenceIterator{}, &multiencseq.SequenceIterator{}
preIt := &multiencseq.SequenceIterator{}
builder := &multiencseq.SequenceBuilder{}
for cp.Len() > 0 {
lastKey = append(lastKey[:0], cp[0].key...)
lastVal = append(lastVal[:0], cp[0].val...)

// Pre-rebase the first sequence
preSeq.Reset(cp[0].startTxNum, lastVal)
preIt.Reset(preSeq, 0)
newSeq := multiencseq.NewBuilder(startTxNum, preSeq.Count(), preSeq.Max())
builder.Reset(startTxNum, preSeq.Count(), preSeq.Max())
for preIt.HasNext() {
v, err := preIt.Next()
if err != nil {
return nil, err
}
newSeq.AddOffset(v)
builder.AddOffset(v)
}
newSeq.Build()
lastVal = newSeq.AppendBytes(nil)
builder.Build()
lastVal = builder.AppendBytes(lastVal[:0])
var mergedOnce bool

// Advance all the items that have this key (including the top)
Expand All @@ -671,11 +672,10 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
if mergedOnce {
mergeSeq.Reset(ci1.startTxNum, ci1.val)
preSeq.Reset(startTxNum, lastVal)
merged, mergeErr := mergeSeq.Merge(preSeq, startTxNum, mergeIt, preIt)
if mergeErr != nil {
if mergeErr := builder.Merge(mergeSeq, preSeq, startTxNum); mergeErr != nil {
return nil, fmt.Errorf("merge %s inverted index: %w", iit.ii.FilenameBase, mergeErr)
}
lastVal = merged.AppendBytes(nil)
lastVal = builder.AppendBytes(lastVal[:0])
} else {
mergedOnce = true
}
Expand Down
4 changes: 2 additions & 2 deletions db/state/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,10 +748,10 @@ func Test_mergeEliasFano(t *testing.T) {
}

var seq1, seq2 multiencseq.SequenceReader
var it1, it2 multiencseq.SequenceIterator
seq1.Reset(0, firstBytes)
seq2.Reset(0, secondBytes)
mergedSeq, err := seq1.Merge(&seq2, 0, &it1, &it2)
var mergedSeq multiencseq.SequenceBuilder
err := mergedSeq.Merge(&seq1, &seq2, 0)
require.NoError(t, err)
menc := mergedSeq.AppendBytes(nil)

Expand Down
Loading