diff --git a/db/seg/decompress.go b/db/seg/decompress.go
index 4fa7eb090e3..294e3a39cf3 100644
--- a/db/seg/decompress.go
+++ b/db/seg/decompress.go
@@ -514,6 +514,26 @@ func (d *Decompressor) DictLens() int { return d.dictLens }
 func (d *Decompressor) CompressedPageValuesCount() int { return int(d.compPageValuesCount) }
 func (d *Decompressor) CompressionFormatVersion() uint8 { return d.version }
 
+// BackfillV0PageValuesCount sets the page-compressed values count for V0-format
+// files whose header does not carry this field. It is a no-op for V1+ files.
+// Call once immediately after NewDecompressor when the schema-configured
+// fallback value (HistoryValuesOnCompressedPage) is known.
+//
+// The count is stored in a uint8, so n must be in [0, 255]. A value outside
+// that range cannot be represented: a plain conversion would silently turn 256
+// into 0 (or -1 into 255) and change how pages are decoded, so it panics instead.
+func (d *Decompressor) BackfillV0PageValuesCount(n int) {
+	if d.version != FileCompressionFormatV0 {
+		return
+	}
+	count := uint8(n)
+	if int(count) != n {
+		// The round-trip differs only when n is negative or exceeds the uint8 range.
+		panic("seg: BackfillV0PageValuesCount: values count does not fit in uint8")
+	}
+	d.compPageValuesCount = count
+}
+
 func (d *Decompressor) Size() int64 {
 	return d.size
 }
@@ -752,10 +772,11 @@ func (g *Getter) MadvNormal() MadvDisabler {
 	g.d.MadvNormal()
 	return g
 }
-func (g *Getter) DisableReadAhead() { g.d.DisableReadAhead() }
-func (g *Getter) Trace(t bool)      { g.trace = t }
-func (g *Getter) Count() int        { return g.d.Count() }
-func (g *Getter) FileName() string  { return g.fName }
+func (g *Getter) DisableReadAhead()              { g.d.DisableReadAhead() }
+func (g *Getter) Trace(t bool)                   { g.trace = t }
+func (g *Getter) Count() int                     { return g.d.Count() }
+func (g *Getter) FileName() string               { return g.fName }
+func (g *Getter) CompressedPageValuesCount() int { return g.d.CompressedPageValuesCount() }
 func (g *Getter) GetMetadata() []byte { return g.d.GetMetadata() }
 
 // nextPosClean aligns to the next byte boundary then reads the next position.
diff --git a/db/seg/seg_interface.go b/db/seg/seg_interface.go index 8a9cacb51d3..6dd55b1a69e 100644 --- a/db/seg/seg_interface.go +++ b/db/seg/seg_interface.go @@ -72,6 +72,7 @@ type ReaderI interface { GetMetadata() []byte MadvNormal() MadvDisabler DisableReadAhead() + CompressedPageValuesCount() int } type MadvDisabler interface { diff --git a/db/seg/seg_paged_rw.go b/db/seg/seg_paged_rw.go index 17b31c2b0d9..9bd3ec388ca 100644 --- a/db/seg/seg_paged_rw.go +++ b/db/seg/seg_paged_rw.go @@ -126,13 +126,16 @@ type PagedReader struct { currentPageOffset, nextPageOffset uint64 } -func NewPagedReader(r ReaderI, pageSize int, snappy bool) *PagedReader { +func NewPagedReader(r ReaderI, snappy bool) *PagedReader { + pageSize := r.CompressedPageValuesCount() if pageSize == 0 { pageSize = 1 } return &PagedReader{file: r, pageSize: pageSize, isCompressed: snappy, page: &Page{}} } +func (g *PagedReader) CompressedPageValuesCount() int { return g.pageSize } + func (g *PagedReader) Reset(offset uint64) { if g.pageSize <= 1 { g.file.Reset(offset) diff --git a/db/seg/seg_paged_rw_test.go b/db/seg/seg_paged_rw_test.go index 40e1b2d578e..d193e634b17 100644 --- a/db/seg/seg_paged_rw_test.go +++ b/db/seg/seg_paged_rw_test.go @@ -62,7 +62,7 @@ func TestPagedReader(t *testing.T) { require := require.New(t) d := prepareLoremDictOnPagedWriter(t, 2, false) defer d.Close() - g1 := NewPagedReader(d.MakeGetter(), 2, false) + g1 := NewPagedReader(d.MakeGetter(), false) var buf []byte _, _, buf, o1 := g1.Next2(buf[:0]) require.Zero(o1) @@ -71,7 +71,7 @@ func TestPagedReader(t *testing.T) { _, _, buf, o1 = g1.Next2(buf[:0]) require.NotZero(o1) - g := NewPagedReader(d.MakeGetter(), 2, false) + g := NewPagedReader(d.MakeGetter(), false) i := 0 for g.HasNext() { w := loremStrings[i] @@ -155,7 +155,7 @@ func TestPagedReaderWithCompression(t *testing.T) { d := prepareLoremDictOnPagedWriter(t, 2, true) // Enable page-level compression defer d.Close() - g := NewPagedReader(d.MakeGetter(), 2, true) // 
Read with compression enabled + g := NewPagedReader(d.MakeGetter(), true) // Read with compression enabled var buf []byte i := 0 for g.HasNext() { diff --git a/db/state/deduplicate.go b/db/state/deduplicate.go index 4baccea89a1..58fcfb3f858 100644 --- a/db/state/deduplicate.go +++ b/db/state/deduplicate.go @@ -72,13 +72,7 @@ func (ht *HistoryRoTx) deduplicateFiles(ctx context.Context, indexFiles, history var g2 *seg.PagedReader for _, hi := range historyFiles { // full-scan, because it's ok to have different amount files. by unclean-shutdown. if hi.startTxNum == item.startTxNum && hi.endTxNum == item.endTxNum { - compressedPageValuesCount := hi.decompressor.CompressedPageValuesCount() - - if hi.decompressor.CompressionFormatVersion() == seg.FileCompressionFormatV0 { - compressedPageValuesCount = ht.h.HistoryValuesOnCompressedPage - } - - g2 = seg.NewPagedReader(ht.dataReader(hi.decompressor), compressedPageValuesCount, true) + g2 = seg.NewPagedReader(ht.dataReader(hi.decompressor), true) break } } diff --git a/db/state/dirty_files.go b/db/state/dirty_files.go index af3c0a5d0e8..e92d5565939 100644 --- a/db/state/dirty_files.go +++ b/db/state/dirty_files.go @@ -493,6 +493,7 @@ func (h *History) openDirtyFiles(dataEntries, accessorEntries []string) error { // don't interrupt on error. other files may be good. but skip indices open. 
continue } + item.decompressor.BackfillV0PageValuesCount(h.HistoryValuesOnCompressedPage) } if item.index == nil { diff --git a/db/state/gc_test.go b/db/state/gc_test.go index 8c0237c4456..1e9523d8bfa 100644 --- a/db/state/gc_test.go +++ b/db/state/gc_test.go @@ -63,13 +63,7 @@ func TestGCReadAfterRemoveFile(t *testing.T) { lastInView := hc.files[len(hc.files)-1] - compressedPageValuesCount := lastInView.src.decompressor.CompressedPageValuesCount() - - if lastInView.src.decompressor.CompressionFormatVersion() == seg.FileCompressionFormatV0 { - compressedPageValuesCount = hc.h.HistoryValuesOnCompressedPage - } - - g := seg.NewPagedReader(hc.statelessGetter(len(hc.files)-1), compressedPageValuesCount, true) + g := seg.NewPagedReader(hc.statelessGetter(len(hc.files)-1), true) require.Equal(lastInView.startTxNum, lastOnFs.startTxNum) require.Equal(lastInView.endTxNum, lastOnFs.endTxNum) if g.HasNext() { diff --git a/db/state/history.go b/db/state/history.go index 07c4b5aae33..55ebf2612cc 100644 --- a/db/state/history.go +++ b/db/state/history.go @@ -307,10 +307,6 @@ func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHi // file not the config is the source of truth for the .v file compression state compressedPageValuesCount := hist.CompressedPageValuesCount() - if hist.CompressionFormatVersion() == seg.FileCompressionFormatV0 { - compressedPageValuesCount = h.HistoryValuesOnCompressedPage - } - if compressedPageValuesCount == 0 { valOffset, _ = histReader.Skip() } else { @@ -817,6 +813,7 @@ func (h *History) buildFiles(ctx context.Context, step kv.Step, collation Histor if err != nil { return HistoryFiles{}, fmt.Errorf("open %s v history decompressor: %w", h.FilenameBase, err) } + historyDecomp.BackfillV0PageValuesCount(h.HistoryValuesOnCompressedPage) historyIdxPath := h.vAccessorNewFilePath(step, step+1) err = h.buildVI(ctx, historyIdxPath, historyDecomp, efHistoryDecomp, collation.efBaseTxNum, ps) @@ -1253,10 +1250,6 @@ func (ht 
*HistoryRoTx) historySeekInFiles(key []byte, txNum uint64) ([]byte, boo compressedPageValuesCount := historyItem.src.decompressor.CompressedPageValuesCount() - if historyItem.src.decompressor.CompressionFormatVersion() == seg.FileCompressionFormatV0 { - compressedPageValuesCount = ht.h.HistoryValuesOnCompressedPage - } - if compressedPageValuesCount > 1 { v, ht.snappyReadBuffer = seg.GetFromPage(historyKey, v, ht.snappyReadBuffer, true) } @@ -1549,10 +1542,6 @@ func (ht *HistoryRoTx) HistoryDump(fromTxNum, toTxNum int, keyToDump *[]byte, du compressedPageValuesCount := viFile.src.decompressor.CompressedPageValuesCount() - if viFile.src.decompressor.CompressionFormatVersion() == seg.FileCompressionFormatV0 { - compressedPageValuesCount = ht.h.HistoryValuesOnCompressedPage - } - vReader := ht.statelessGetter(viFile.i) vReader.Reset(vOffset) diff --git a/db/state/history_stream.go b/db/state/history_stream.go index e9d52f03666..1098a5a22e3 100644 --- a/db/state/history_stream.go +++ b/db/state/history_stream.go @@ -145,16 +145,12 @@ func (hi *HistoryRangeAsOfFiles) advanceInFiles() error { compressedPageValuesCount := historyItem.src.decompressor.CompressedPageValuesCount() - if historyItem.src.decompressor.CompressionFormatVersion() == seg.FileCompressionFormatV0 { - compressedPageValuesCount = hi.hc.h.HistoryValuesOnCompressedPage - } - if compressedPageValuesCount <= 1 { g := hi.hc.statelessGetter(historyItem.i) g.Reset(offset) hi.nextVal, _ = g.Next(nil) } else { - g := seg.NewPagedReader(hi.hc.statelessGetter(historyItem.i), compressedPageValuesCount, true) + g := seg.NewPagedReader(hi.hc.statelessGetter(historyItem.i), true) g.Reset(offset) for i := 0; i < compressedPageValuesCount && g.HasNext(); i++ { k, v, _, _ := g.Next2(nil) @@ -449,16 +445,12 @@ func (hi *HistoryChangesIterFiles) advance() error { compressedPageValuesCount := historyItem.src.decompressor.CompressedPageValuesCount() - if historyItem.src.decompressor.CompressionFormatVersion() == 
seg.FileCompressionFormatV0 { - compressedPageValuesCount = hi.hc.h.HistoryValuesOnCompressedPage - } - if compressedPageValuesCount <= 1 { g := hi.hc.statelessGetter(historyItem.i) g.Reset(offset) hi.nextVal, _ = g.Next(nil) } else { - g := seg.NewPagedReader(hi.hc.statelessGetter(historyItem.i), compressedPageValuesCount, true) + g := seg.NewPagedReader(hi.hc.statelessGetter(historyItem.i), true) g.Reset(offset) for i := 0; i < compressedPageValuesCount && g.HasNext(); i++ { k, v, _, _ := g.Next2(nil) @@ -787,17 +779,12 @@ func (ht *HistoryTraceKeyFiles) advance() error { compressedPageValuesCount := historyItem.src.decompressor.CompressedPageValuesCount() - if historyItem.src.decompressor.CompressionFormatVersion() == seg.FileCompressionFormatV0 { - compressedPageValuesCount = ht.hc.h.HistoryValuesOnCompressedPage - } - if ht.histReader == nil { idxReader := ht.hc.statelessIdxReader(ht.fileIdx) getter := ht.hc.statelessGetter(ht.fileIdx) getter.Reset(0) ht.histReader = seg.NewPagedReader( getter, - compressedPageValuesCount, true, ) offset, ok := idxReader.Lookup(ht.histKey) diff --git a/db/state/history_test.go b/db/state/history_test.go index fcf537e9899..ed9eb2c33af 100644 --- a/db/state/history_test.go +++ b/db/state/history_test.go @@ -113,14 +113,8 @@ func TestHistoryCollationsAndBuilds(t *testing.T) { require.NotNil(t, sf) defer sf.CleanupOnError() - compressedPageValuesCount := sf.historyDecomp.CompressedPageValuesCount() - - if sf.historyDecomp.CompressionFormatVersion() == seg.FileCompressionFormatV0 { - compressedPageValuesCount = h.HistoryValuesOnCompressedPage - } - efReader := h.InvertedIndex.dataReader(sf.efHistoryDecomp) - hReader := seg.NewPagedReader(h.dataReader(sf.historyDecomp), compressedPageValuesCount, true) + hReader := seg.NewPagedReader(h.dataReader(sf.historyDecomp), true) // ef contains all sorted keys // for each key it has a list of txNums @@ -242,13 +236,7 @@ func TestHistoryCollationBuild(t *testing.T) { defer sf.CleanupOnError() 
var valWords []string - compressedPageValuesCount := sf.historyDecomp.CompressedPageValuesCount() - - if sf.historyDecomp.CompressionFormatVersion() == seg.FileCompressionFormatV0 { - compressedPageValuesCount = h.HistoryValuesOnCompressedPage - } - - gh := seg.NewPagedReader(h.dataReader(sf.historyDecomp), compressedPageValuesCount, true) + gh := seg.NewPagedReader(h.dataReader(sf.historyDecomp), true) gh.Reset(0) for gh.HasNext() { w, _ := gh.Next(nil) @@ -294,7 +282,7 @@ func TestHistoryCollationBuild(t *testing.T) { r = recsplit.NewIndexReader(sf.historyIdx) defer r.Close() - gh = seg.NewPagedReader(h.dataReader(sf.historyDecomp), compressedPageValuesCount, true) + gh = seg.NewPagedReader(h.dataReader(sf.historyDecomp), true) var vi int for i := 0; i < len(keyWords); i++ { ints := intArrs[i] diff --git a/db/state/merge.go b/db/state/merge.go index 07da04909cb..e90c850ec5d 100644 --- a/db/state/merge.go +++ b/db/state/merge.go @@ -815,13 +815,7 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles var g2 *seg.PagedReader for _, hi := range historyFiles { // full-scan, because it's ok to have different amount files. by unclean-shutdown. 
if hi.startTxNum == item.startTxNum && hi.endTxNum == item.endTxNum { - compressedPageValuesCount := hi.decompressor.CompressedPageValuesCount() - - if hi.decompressor.CompressionFormatVersion() == seg.FileCompressionFormatV0 { - compressedPageValuesCount = ht.h.HistoryValuesOnCompressedPage - } - - g2 = seg.NewPagedReader(ht.dataReader(hi.decompressor), compressedPageValuesCount, true) + g2 = seg.NewPagedReader(ht.dataReader(hi.decompressor), true) break } } diff --git a/db/state/proto_forkable.go b/db/state/proto_forkable.go index 117c8466da1..8651a78dd79 100644 --- a/db/state/proto_forkable.go +++ b/db/state/proto_forkable.go @@ -169,7 +169,7 @@ func (a *ProtoForkable) DataReader(f *seg.Decompressor, compress bool) *seg.Read } func (a *ProtoForkable) PagedDataReader(f *seg.Decompressor, compress bool) *seg.PagedReader { - return seg.NewPagedReader(a.DataReader(f, compress), a.cfg.ValuesOnCompressedPage, compress) + return seg.NewPagedReader(a.DataReader(f, compress), compress) } func (a *ProtoForkable) BuildIndexes(ctx context.Context, decomp *seg.Decompressor, from, to RootNum, ps *background.ProgressSet) (indexes []*recsplit.Index, err error) { diff --git a/db/state/simple_accessor_builder.go b/db/state/simple_accessor_builder.go index c7242e17739..988833ff8d2 100644 --- a/db/state/simple_accessor_builder.go +++ b/db/state/simple_accessor_builder.go @@ -136,7 +136,7 @@ func (s *SimpleAccessorBuilder) SetAccessorArgs(args *AccessorArgs) { func (s *SimpleAccessorBuilder) GetInputDataQuery(decomp *seg.Decompressor, compressionUsed bool) (*DecompressorIndexInputDataQuery, error) { //sgname := s.parser.DataFile(version.V1_0, from, to) //decomp, _ := seg.NewDecompressorWithMetadata(sgname, true) - reader := seg.NewPagedReader(decomp.MakeGetter(), s.args.ValuesOnCompressedPage, compressionUsed) + reader := seg.NewPagedReader(decomp.MakeGetter(), compressionUsed) return NewDecompressorIndexInputDataQuery(reader) }