Skip to content

Commit 54203f3

Browse files
Revert "[SharovBot] fix(history): correct .vi index page offset for V1 paged snapshot files" (#19440)
Reverts #19411, which appears to have been merged without team agreement.
1 parent bc6c31b commit 54203f3

File tree

5 files changed

+31
-423
lines changed

5 files changed

+31
-423
lines changed

db/seg/seg_paged_rw.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727

2828
var be = binary.BigEndian
2929

30-
func GetFromPage(key, compressedPage []byte, compressionBuf []byte, compressionEnabled bool) (v []byte, ok bool, compressionBufOut []byte) {
30+
func GetFromPage(key, compressedPage []byte, compressionBuf []byte, compressionEnabled bool) (v []byte, compressionBufOut []byte) {
3131
var err error
3232
var page []byte
3333
compressionBuf, page, err = compress.DecodeZstdIfNeed(compressionBuf[:0], compressedPage, compressionEnabled)
@@ -37,7 +37,7 @@ func GetFromPage(key, compressedPage []byte, compressionBuf []byte, compressionE
3737

3838
cnt := int(page[0])
3939
if cnt == 0 {
40-
return nil, false, compressionBuf
40+
return nil, compressionBuf
4141
}
4242
meta, data := page[1:1+cnt*4*2], page[1+cnt*4*2:]
4343
kLens, vLens := meta[:cnt*4], meta[cnt*4:]
@@ -53,14 +53,14 @@ func GetFromPage(key, compressedPage []byte, compressionBuf []byte, compressionE
5353
kLen, vLen := be.Uint32(kLens[i:]), be.Uint32(vLens[i:])
5454
foundKey := keys[kOffset : kOffset+kLen]
5555
if bytes.Equal(key, foundKey) {
56-
return vals[vOffset : vOffset+vLen], true, compressionBuf
56+
return vals[vOffset : vOffset+vLen], compressionBuf
5757
} else {
5858
_ = data
5959
}
6060
kOffset += kLen
6161
vOffset += vLen
6262
}
63-
return nil, false, compressionBuf
63+
return nil, compressionBuf
6464
}
6565

6666
type Page struct {

db/seg/seg_paged_rw_test.go

Lines changed: 1 addition & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,7 @@ func TestPage(t *testing.T) {
132132
for i := 0; i < sampling+1; i++ {
133133
iter++
134134
expectK, expectV := fmt.Sprintf("k %d", i), fmt.Sprintf("v %d", i)
135-
v, ok, _ := GetFromPage([]byte(expectK), pages[pageNum], nil, false)
136-
require.True(ok)
135+
v, _ := GetFromPage([]byte(expectK), pages[pageNum], nil, false)
137136
require.Equal(expectV, string(v), i)
138137
require.True(p1.HasNext())
139138
k, v := p1.Next()
@@ -172,83 +171,6 @@ func TestPagedReaderWithCompression(t *testing.T) {
172171
require.Equal(len(loremStrings), i, "should have read all entries")
173172
}
174173

175-
func TestGetFromPageKeyNotFound(t *testing.T) {
176-
require := require.New(t)
177-
178-
// Create a page with 4 key-value pairs
179-
pageSize := 4
180-
buf := &multyBytesWriter{pageSize: pageSize}
181-
w := NewPagedWriter(buf, false)
182-
for i := 0; i < pageSize; i++ {
183-
k, v := fmt.Sprintf("k %d", i), fmt.Sprintf("v %d", i)
184-
require.NoError(w.Add([]byte(k), []byte(v)))
185-
}
186-
require.NoError(w.Flush())
187-
pages := buf.Bytes()
188-
require.Len(pages, 1)
189-
190-
// Existing keys should be found
191-
for i := 0; i < pageSize; i++ {
192-
k := fmt.Sprintf("k %d", i)
193-
v, ok, _ := GetFromPage([]byte(k), pages[0], nil, false)
194-
require.True(ok, "key %q should be found", k)
195-
require.Equal(fmt.Sprintf("v %d", i), string(v))
196-
}
197-
198-
// Non-existing key should NOT be found
199-
v, ok, _ := GetFromPage([]byte("nonexistent"), pages[0], nil, false)
200-
require.False(ok, "nonexistent key should not be found")
201-
require.Nil(v)
202-
203-
// Key with similar prefix should NOT be found
204-
v, ok, _ = GetFromPage([]byte("k 99"), pages[0], nil, false)
205-
require.False(ok, "key 'k 99' should not be found")
206-
require.Nil(v)
207-
}
208-
209-
// TestGetFromPageNonFullPages verifies that when pages have varying numbers
210-
// of entries (e.g., a last page with fewer entries than the page size),
211-
// GetFromPage still finds keys correctly. This tests the scenario that
212-
// caused the .vi index page offset regression.
213-
func TestGetFromPageNonFullPages(t *testing.T) {
214-
require := require.New(t)
215-
216-
// Create pages with pageSize=4 but 5 entries total
217-
// This produces: page0 with 4 entries, page1 with 1 entry
218-
pageSize := 4
219-
totalEntries := pageSize + 1
220-
buf := &multyBytesWriter{pageSize: pageSize}
221-
w := NewPagedWriter(buf, false)
222-
for i := 0; i < totalEntries; i++ {
223-
k, v := fmt.Sprintf("k %d", i), fmt.Sprintf("v %d", i)
224-
require.NoError(w.Add([]byte(k), []byte(v)))
225-
}
226-
require.NoError(w.Flush())
227-
pages := buf.Bytes()
228-
require.Len(pages, 2, "should have 2 pages: one full and one partial")
229-
230-
// All 4 keys on page 0 should be found on page 0
231-
for i := 0; i < pageSize; i++ {
232-
k := fmt.Sprintf("k %d", i)
233-
v, ok, _ := GetFromPage([]byte(k), pages[0], nil, false)
234-
require.True(ok, "key %q should be found on page 0", k)
235-
require.Equal(fmt.Sprintf("v %d", i), string(v))
236-
}
237-
238-
// The 5th key should NOT be on page 0
239-
_, ok, _ := GetFromPage([]byte("k 4"), pages[0], nil, false)
240-
require.False(ok, "key 'k 4' should NOT be on page 0")
241-
242-
// The 5th key should be on page 1
243-
v, ok, _ := GetFromPage([]byte("k 4"), pages[1], nil, false)
244-
require.True(ok, "key 'k 4' should be found on page 1")
245-
require.Equal("v 4", string(v))
246-
247-
// Keys from page 0 should NOT be on page 1
248-
_, ok, _ = GetFromPage([]byte("k 0"), pages[1], nil, false)
249-
require.False(ok, "key 'k 0' should NOT be on page 1")
250-
}
251-
252174
func BenchmarkName(b *testing.B) {
253175
buf := &multyBytesWriter{pageSize: 16}
254176
w := NewPagedWriter(buf, false)

db/state/history.go

Lines changed: 24 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -234,104 +234,6 @@ func (h *History) buildVi(ctx context.Context, item *FilesItem, ps *background.P
234234
}
235235

236236
func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHist *seg.Decompressor, efBaseTxNum uint64, ps *background.ProgressSet) error {
237-
// file not the config is the source of truth for the .v file compression state
238-
compressedPageValuesCount := hist.CompressedPageValuesCount()
239-
if hist.CompressionFormatVersion() == seg.FileCompressionFormatV0 {
240-
compressedPageValuesCount = h.HistoryValuesOnCompressedPage
241-
}
242-
243-
if compressedPageValuesCount > 1 {
244-
return h.buildVIFromPages(ctx, historyIdxPath, hist, ps)
245-
}
246-
return h.buildVIFromEF(ctx, historyIdxPath, hist, efHist, efBaseTxNum, ps)
247-
}
248-
249-
// buildVIFromPages builds the .vi index by reading keys directly from the .v file's pages.
250-
// This is used for V1 files with page-level compression where each page contains multiple
251-
// key-value pairs. Reading keys from the pages ensures the index correctly maps each key
252-
// to its containing page, regardless of whether the .ef file has the same entry ordering.
253-
func (h *History) buildVIFromPages(ctx context.Context, historyIdxPath string, hist *seg.Decompressor, ps *background.ProgressSet) error {
254-
defer hist.MadvSequential().DisableReadAhead()
255-
256-
histReader := h.dataReader(hist)
257-
258-
// Count total entries by scanning all pages
259-
cnt := 0
260-
page := &seg.Page{}
261-
histReader.Reset(0)
262-
for histReader.HasNext() { //TODO: use vFile.Count()
263-
pageData, _ := histReader.Next(nil)
264-
page.Reset(pageData, true)
265-
for page.HasNext() {
266-
page.Next()
267-
cnt++
268-
}
269-
select {
270-
case <-ctx.Done():
271-
return ctx.Err()
272-
default:
273-
}
274-
}
275-
276-
_, fName := filepath.Split(historyIdxPath)
277-
p := ps.AddNew(fName, uint64(cnt))
278-
defer ps.Delete(p)
279-
rs, err := recsplit.NewRecSplit(recsplit.RecSplitArgs{
280-
KeyCount: cnt,
281-
Enums: false,
282-
BucketSize: recsplit.DefaultBucketSize,
283-
LeafSize: recsplit.DefaultLeafSize,
284-
TmpDir: h.dirs.Tmp,
285-
IndexFile: historyIdxPath,
286-
Salt: h.salt.Load(),
287-
NoFsync: h.noFsync,
288-
}, h.logger)
289-
if err != nil {
290-
return fmt.Errorf("create recsplit: %w", err)
291-
}
292-
defer rs.Close()
293-
rs.LogLvl(log.LvlTrace)
294-
295-
for {
296-
histReader.Reset(0)
297-
var valOffset uint64
298-
299-
for histReader.HasNext() {
300-
pageData, nextOffset := histReader.Next(nil)
301-
page.Reset(pageData, true)
302-
for page.HasNext() {
303-
k, _ := page.Next()
304-
if err = rs.AddKey(k, valOffset); err != nil {
305-
return err
306-
}
307-
}
308-
p.Processed.Add(1)
309-
valOffset = nextOffset
310-
311-
select {
312-
case <-ctx.Done():
313-
return ctx.Err()
314-
default:
315-
}
316-
}
317-
318-
if err = rs.Build(ctx); err != nil {
319-
if rs.Collision() {
320-
log.Info("Building recsplit. Collision happened. It's ok. Restarting...")
321-
rs.ResetNextSalt()
322-
} else {
323-
return fmt.Errorf("build idx: %w", err)
324-
}
325-
} else {
326-
break
327-
}
328-
}
329-
return nil
330-
}
331-
332-
// buildVIFromEF builds the .vi index using the .ef inverted index file to enumerate keys.
333-
// This is used for V0 files or files without page-level compression.
334-
func (h *History) buildVIFromEF(ctx context.Context, historyIdxPath string, hist, efHist *seg.Decompressor, efBaseTxNum uint64, ps *background.ProgressSet) error {
335237
var histKey []byte
336238
var valOffset uint64
337239

@@ -342,8 +244,8 @@ func (h *History) buildVIFromEF(ctx context.Context, historyIdxPath string, hist
342244

343245
var keyBuf, valBuf []byte
344246
cnt := uint64(0)
345-
for iiReader.HasNext() { //TODO: use vFile.Count()
346-
keyBuf, _ = iiReader.Next(keyBuf[:0])
247+
for iiReader.HasNext() {
248+
keyBuf, _ = iiReader.Next(keyBuf[:0]) // skip key
347249
valBuf, _ = iiReader.Next(valBuf[:0])
348250
cnt += multiencseq.Count(efBaseTxNum, valBuf)
349251
select {
@@ -377,6 +279,7 @@ func (h *History) buildVIFromEF(ctx context.Context, historyIdxPath string, hist
377279
seq := &multiencseq.SequenceReader{}
378280
it := &multiencseq.SequenceIterator{}
379281

282+
i := 0
380283
for {
381284
histReader.Reset(0)
382285
iiReader.Reset(0)
@@ -387,6 +290,8 @@ func (h *History) buildVIFromEF(ctx context.Context, historyIdxPath string, hist
387290
valBuf, _ = iiReader.Next(valBuf[:0])
388291
p.Processed.Add(1)
389292

293+
// fmt.Printf("ef key %x\n", keyBuf)
294+
390295
seq.Reset(efBaseTxNum, valBuf)
391296
it.Reset(seq, 0)
392297
for it.HasNext() {
@@ -398,7 +303,22 @@ func (h *History) buildVIFromEF(ctx context.Context, historyIdxPath string, hist
398303
if err = rs.AddKey(histKey, valOffset); err != nil {
399304
return err
400305
}
401-
valOffset, _ = histReader.Skip()
306+
307+
// file not the config is the source of truth for the .v file compression state
308+
compressedPageValuesCount := hist.CompressedPageValuesCount()
309+
310+
if hist.CompressionFormatVersion() == seg.FileCompressionFormatV0 {
311+
compressedPageValuesCount = h.HistoryValuesOnCompressedPage
312+
}
313+
314+
if compressedPageValuesCount == 0 {
315+
valOffset, _ = histReader.Skip()
316+
} else {
317+
i++
318+
if i%compressedPageValuesCount == 0 {
319+
valOffset, _ = histReader.Skip()
320+
}
321+
}
402322
}
403323

404324
select {
@@ -1335,6 +1255,7 @@ func (ht *HistoryRoTx) historySeekInFiles(key []byte, txNum uint64) ([]byte, boo
13351255
}
13361256
g := ht.statelessGetter(historyItem.i)
13371257
g.Reset(offset)
1258+
//fmt.Printf("[dbg] hist.seek: offset=%d\n", offset)
13381259
v, _ := g.Next(nil)
13391260
if traceGetAsOf == ht.h.FilenameBase {
13401261
fmt.Printf("DomainGetAsOf(%s, %x, %d) -> %s, histTxNum=%d, isNil(v)=%t\n", ht.h.FilenameBase, key, txNum, g.FileName(), histTxNum, v == nil)
@@ -1347,13 +1268,7 @@ func (ht *HistoryRoTx) historySeekInFiles(key []byte, txNum uint64) ([]byte, boo
13471268
}
13481269

13491270
if compressedPageValuesCount > 1 {
1350-
v, ok, ht.snappyReadBuffer = seg.GetFromPage(historyKey, v, ht.snappyReadBuffer, true)
1351-
if !ok {
1352-
// Key not found in page. The inverted index (.ef) contains a txNum
1353-
// entry that has no corresponding entry in the history values (.v)
1354-
// file. Fall back to DB/latest lookup to get the correct value.
1355-
return nil, false, nil
1356-
}
1271+
v, ht.snappyReadBuffer = seg.GetFromPage(historyKey, v, ht.snappyReadBuffer, true)
13571272
}
13581273
return v, true, nil
13591274
}
@@ -1655,10 +1570,7 @@ func (ht *HistoryRoTx) HistoryDump(fromTxNum, toTxNum int, keyToDump *[]byte, du
16551570

16561571
if compressedPageValuesCount > 0 {
16571572
histKeyBuf = historyKey(txNum, key, histKeyBuf)
1658-
val, ok, _ = seg.GetFromPage(histKeyBuf, val, nil, true)
1659-
if !ok {
1660-
return fmt.Errorf("HistoryDump: not found key [%x] on compressed page: %s", key, viFile.Fullpath())
1661-
}
1573+
val, _ = seg.GetFromPage(histKeyBuf, val, nil, true)
16621574
}
16631575

16641576
dumpTo(key, txNum, val)

0 commit comments

Comments
 (0)