Skip to content

Commit 05080a3

Browse files
authored
Forward compatible snapshots format (#18746)
Cherry-picked this PR: #18226 and also changed the snapshots format version from v1 to v0 in the compressor settings, because the new format will only become the default in 3.4
1 parent f473a97 commit 05080a3

File tree

10 files changed

+188
-57
lines changed

10 files changed

+188
-57
lines changed

db/seg/compress.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,14 @@ type Cfg struct {
7777

7878
// arbitrary bytes set by user at start of the file
7979
ExpectMetadata bool
80+
81+
// number of values on compressed page. if > 0 then page level compression is enabled
82+
ValuesOnCompressedPage int
83+
}
84+
85+
func (c Cfg) WithValuesOnCompressedPage(n int) Cfg {
86+
c.ValuesOnCompressedPage = n
87+
return c
8088
}
8189

8290
var DefaultCfg = Cfg{
@@ -123,7 +131,10 @@ type Compressor struct {
123131
logger log.Logger
124132
noFsync bool // fsync is enabled by default, but tests can manually disable
125133

126-
metadata []byte
134+
version uint8
135+
featureFlagBitmask FeatureFlagBitmask
136+
compPageValuesCount uint8
137+
metadata []byte
127138
}
128139

129140
func NewCompressor(ctx context.Context, logPrefix, outputFile, tmpDir string, cfg Cfg, lvl log.Lvl, logger log.Logger) (*Compressor, error) {
@@ -151,7 +162,7 @@ func NewCompressor(ctx context.Context, logPrefix, outputFile, tmpDir string, cf
151162
go extractPatternsInSuperstrings(ctx, superstrings, collector, cfg, wg, logger)
152163
}
153164
_, outputFileName := filepath.Split(outputFile)
154-
return &Compressor{
165+
cc := &Compressor{
155166
Cfg: cfg,
156167
uncompressedFile: uncompressedFile,
157168
outputFile: outputFile,
@@ -164,7 +175,15 @@ func NewCompressor(ctx context.Context, logPrefix, outputFile, tmpDir string, cf
164175
lvl: lvl,
165176
wg: wg,
166177
logger: logger,
167-
}, nil
178+
version: FileCompressionFormatV0, // for release 3.3 only, since 3.4 we use v1 format
179+
}
180+
181+
if cfg.ValuesOnCompressedPage > 0 {
182+
cc.featureFlagBitmask.Set(PageLevelCompressionEnabled)
183+
cc.compPageValuesCount = uint8(cfg.ValuesOnCompressedPage)
184+
}
185+
186+
return cc, nil
168187
}
169188

170189
func (c *Compressor) Close() {
@@ -175,9 +194,10 @@ func (c *Compressor) Close() {
175194
c.suffixCollectors = nil
176195
}
177196

178-
func (c *Compressor) SetTrace(trace bool) { c.trace = trace }
179-
func (c *Compressor) FileName() string { return c.outputFileName }
180-
func (c *Compressor) WorkersAmount() int { return c.Workers }
197+
func (c *Compressor) SetTrace(trace bool) { c.trace = trace }
198+
func (c *Compressor) FileName() string { return c.outputFileName }
199+
func (c *Compressor) WorkersAmount() int { return c.Workers }
200+
func (c *Compressor) GetValuesOnCompressedPage() int { return int(c.ValuesOnCompressedPage) }
181201
func (c *Compressor) SetMetadata(metadata []byte) {
182202
if !c.ExpectMetadata {
183203
panic("metadata not expected in compressor")
@@ -276,6 +296,19 @@ func (c *Compressor) Compress() error {
276296
tmpFileName := cf.Name()
277297
defer dir.RemoveFile(tmpFileName)
278298
defer cf.Close()
299+
300+
if c.version == FileCompressionFormatV1 {
301+
if _, err := cf.Write([]byte{c.version, byte(c.featureFlagBitmask)}); err != nil {
302+
return err
303+
}
304+
305+
if c.featureFlagBitmask.Has(PageLevelCompressionEnabled) {
306+
if _, err := cf.Write([]byte{c.compPageValuesCount}); err != nil {
307+
return err
308+
}
309+
}
310+
}
311+
279312
if c.ExpectMetadata {
280313
dataLen := uint32(len(c.metadata))
281314
var dataLenB [4]byte

db/seg/decompress.go

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -121,19 +121,22 @@ func (e ErrCompressedFileCorrupted) Is(err error) bool {
121121

122122
// Decompressor provides access to the superstrings in a file produced by a compressor
123123
type Decompressor struct {
124-
f *os.File
125-
mmapHandle2 *[mmap.MaxMapSize]byte // mmap handle for windows (this is used to close mmap)
126-
dict *patternTable
127-
posDict *posTable
128-
mmapHandle1 []byte // mmap handle for unix (this is used to close mmap)
129-
data []byte // slice of correct size for the decompressor to work with
130-
wordsStart uint64 // Offset of where the superstrings actually start
131-
size int64
132-
modTime time.Time
133-
wordsCount uint64
134-
emptyWordsCount uint64
135-
hasMetadata bool
136-
metadata []byte
124+
f *os.File
125+
mmapHandle2 *[mmap.MaxMapSize]byte // mmap handle for windows (this is used to close mmap)
126+
dict *patternTable
127+
posDict *posTable
128+
mmapHandle1 []byte // mmap handle for unix (this is used to close mmap)
129+
data []byte // slice of correct size for the decompressor to work with
130+
wordsStart uint64 // Offset of where the superstrings actually start
131+
size int64
132+
modTime time.Time
133+
wordsCount uint64
134+
emptyWordsCount uint64
135+
hasMetadata bool
136+
metadata []byte
137+
version uint8
138+
featureFlagBitmask FeatureFlagBitmask
139+
compPageValuesCount uint8
137140

138141
serializedDictSize uint64
139142
lenDictSize uint64 // huffman encoded lengths
@@ -231,6 +234,21 @@ func NewDecompressorWithMetadata(compressedFilePath string, hasMetadata bool) (*
231234
d.data = d.mmapHandle1[:d.size]
232235
defer d.MadvNormal().DisableReadAhead() //speedup opening on slow drives
233236

237+
d.version = d.data[0]
238+
239+
if d.version == FileCompressionFormatV1 {
240+
// 1st byte: version,
241+
// 2nd byte: defines how exactly the file is compressed
242+
// 3rd byte (optional): exists if the PageLevelCompressionEnabled flag is enabled, and defines the number of values on a compressed page
243+
d.featureFlagBitmask = FeatureFlagBitmask(d.data[1])
244+
d.data = d.data[2:]
245+
}
246+
247+
if d.featureFlagBitmask.Has(PageLevelCompressionEnabled) {
248+
d.compPageValuesCount = d.data[0]
249+
d.data = d.data[1:]
250+
}
251+
234252
if hasMetadata {
235253
metadataLen := binary.BigEndian.Uint32(d.data[:4])
236254
d.metadata = d.data[4 : 4+metadataLen]
@@ -362,10 +380,11 @@ func NewDecompressorWithMetadata(compressedFilePath string, hasMetadata bool) (*
362380
}
363381
d.wordsStart = pos + dictSize
364382

365-
if d.Count() == 0 && dictSize == 0 && d.size > compressedMinSize {
383+
if d.Count() == 0 && dictSize == 0 && d.size > d.calcCompressedMinSize() {
366384
return nil, &ErrCompressedFileCorrupted{
367385
FileName: fName, Reason: fmt.Sprintf("size %v but no words in it", datasize.ByteSize(d.size).HR())}
368386
}
387+
369388
validationPassed = true
370389
return d, nil
371390
}
@@ -466,10 +485,12 @@ func buildPosTable(depths []uint64, poss []uint64, table *posTable, code uint16,
466485
func (d *Decompressor) DataHandle() unsafe.Pointer {
467486
return unsafe.Pointer(&d.data[0])
468487
}
469-
func (d *Decompressor) SerializedDictSize() uint64 { return d.serializedDictSize }
470-
func (d *Decompressor) SerializedLenSize() uint64 { return d.lenDictSize }
471-
func (d *Decompressor) DictWords() int { return d.dictWords }
472-
func (d *Decompressor) DictLens() int { return d.dictLens }
488+
func (d *Decompressor) SerializedDictSize() uint64 { return d.serializedDictSize }
489+
func (d *Decompressor) SerializedLenSize() uint64 { return d.lenDictSize }
490+
func (d *Decompressor) DictWords() int { return d.dictWords }
491+
func (d *Decompressor) DictLens() int { return d.dictLens }
492+
func (d *Decompressor) CompressedPageValuesCount() int { return int(d.compPageValuesCount) }
493+
func (d *Decompressor) CompressionFormatVersion() uint8 { return d.version }
473494

474495
func (d *Decompressor) Size() int64 {
475496
return d.size
@@ -1082,3 +1103,15 @@ func (g *Getter) BinarySearch(seek []byte, count int, getOffset func(i uint64) (
10821103
}
10831104
return foundOffset, true
10841105
}
1106+
1107+
func (d *Decompressor) calcCompressedMinSize() int64 {
1108+
if d.version == FileCompressionFormatV0 {
1109+
return compressedMinSize
1110+
}
1111+
1112+
if d.featureFlagBitmask.Has(PageLevelCompressionEnabled) {
1113+
return compressedMinSize + 3 // 2 bytes are always used for version and bitmask, plus 1 optional byte for page-level compression if enabled
1114+
}
1115+
1116+
return compressedMinSize + 2
1117+
}

db/seg/seg_interface.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,29 @@ const (
2424
CompressVals FileCompression = 0b100
2525
)
2626

27+
const (
28+
FileCompressionFormatV0 = uint8(0)
29+
FileCompressionFormatV1 = uint8(1)
30+
)
31+
32+
type FeatureFlag uint8
33+
34+
const (
35+
PageLevelCompressionEnabled FeatureFlag = 1 << iota // 0b001
36+
KeyCompressionEnabled // 0b010
37+
ValCompressionEnabled // 0b100
38+
)
39+
40+
type FeatureFlagBitmask uint8
41+
42+
func (m FeatureFlagBitmask) Has(flag FeatureFlag) bool {
43+
return m&FeatureFlagBitmask(flag) == FeatureFlagBitmask(flag)
44+
}
45+
46+
func (m *FeatureFlagBitmask) Set(flag FeatureFlag) {
47+
*m |= FeatureFlagBitmask(flag)
48+
}
49+
2750
func ParseFileCompression(s string) (FileCompression, error) {
2851
// Implementation would be here
2952
return CompressNone, nil

db/seg/seg_paged_rw.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,13 @@ func (g *PagedReader) Skip() (uint64, int) {
227227
return offset, len(v)
228228
}
229229

230-
func NewPagedWriter(parent CompressorI, pageSize int, compressionEnabled bool) *PagedWriter {
231-
return &PagedWriter{parent: parent, pageSize: pageSize, compressionEnabled: compressionEnabled}
230+
func NewPagedWriter(parent CompressorI, compressionEnabled bool) *PagedWriter {
231+
232+
return &PagedWriter{
233+
parent: parent,
234+
pageSize: parent.GetValuesOnCompressedPage(),
235+
compressionEnabled: compressionEnabled,
236+
}
232237
}
233238

234239
type CompressorI interface {
@@ -238,6 +243,7 @@ type CompressorI interface {
238243
Count() int
239244
FileName() string
240245
SetMetadata(data []byte)
246+
GetValuesOnCompressedPage() int
241247
}
242248
type PagedWriter struct {
243249
parent CompressorI

db/seg/seg_paged_rw_test.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,14 +35,14 @@ func prepareLoremDictOnPagedWriter(t *testing.T, pageSize int, pageCompression b
3535
logger, require := log.New(), require.New(t)
3636
tmpDir := t.TempDir()
3737
file := filepath.Join(tmpDir, "compressed1")
38-
cfg := DefaultCfg
38+
cfg := DefaultCfg.WithValuesOnCompressedPage(pageSize)
3939
cfg.MinPatternScore = 1
4040
cfg.Workers = 1
4141
c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, cfg, log.LvlDebug, logger)
4242
require.NoError(err)
4343
defer c.Close()
4444

45-
p := NewPagedWriter(NewWriter(c, CompressNone), pageSize, pageCompression)
45+
p := NewPagedWriter(NewWriter(c, CompressNone), pageCompression)
4646
for k, w := range loremStrings {
4747
key := fmt.Sprintf("key %d", k)
4848
val := fmt.Sprintf("%s %d", w, k)
@@ -97,25 +97,27 @@ func TestPagedReader(t *testing.T) {
9797

9898
// multyBytesWriter is a writer for [][]byte, similar to bytes.Writer.
9999
type multyBytesWriter struct {
100-
buffer [][]byte
100+
buffer [][]byte
101+
pageSize int
101102
}
102103

103104
func (w *multyBytesWriter) Write(p []byte) (n int, err error) {
104105
w.buffer = append(w.buffer, common.Copy(p))
105106
return len(p), nil
106107
}
107-
func (w *multyBytesWriter) Bytes() [][]byte { return w.buffer }
108-
func (w *multyBytesWriter) FileName() string { return "" }
109-
func (w *multyBytesWriter) Count() int { return 0 }
110-
func (w *multyBytesWriter) Close() {}
111-
func (w *multyBytesWriter) Compress() error { return nil }
112-
func (w *multyBytesWriter) Reset() { w.buffer = nil }
113-
func (w *multyBytesWriter) SetMetadata([]byte) {}
108+
func (w *multyBytesWriter) Bytes() [][]byte { return w.buffer }
109+
func (w *multyBytesWriter) FileName() string { return "" }
110+
func (w *multyBytesWriter) Count() int { return 0 }
111+
func (w *multyBytesWriter) Close() {}
112+
func (w *multyBytesWriter) Compress() error { return nil }
113+
func (w *multyBytesWriter) Reset() { w.buffer = nil }
114+
func (w *multyBytesWriter) SetMetadata([]byte) {}
115+
func (w *multyBytesWriter) GetValuesOnCompressedPage() int { return w.pageSize }
114116

115117
func TestPage(t *testing.T) {
116-
buf, require := &multyBytesWriter{}, require.New(t)
117118
sampling := 2
118-
w := NewPagedWriter(buf, sampling, false)
119+
buf, require := &multyBytesWriter{pageSize: sampling}, require.New(t)
120+
w := NewPagedWriter(buf, false)
119121
for i := 0; i < sampling+1; i++ {
120122
k, v := fmt.Sprintf("k %d", i), fmt.Sprintf("v %d", i)
121123
require.NoError(w.Add([]byte(k), []byte(v)))
@@ -147,8 +149,8 @@ func TestPage(t *testing.T) {
147149
}
148150

149151
func BenchmarkName(b *testing.B) {
150-
buf := &multyBytesWriter{}
151-
w := NewPagedWriter(buf, 16, false)
152+
buf := &multyBytesWriter{pageSize: 16}
153+
w := NewPagedWriter(buf, false)
152154
for i := 0; i < 16; i++ {
153155
w.Add([]byte{byte(i)}, []byte{10 + byte(i)})
154156
}

db/state/history.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -298,11 +298,12 @@ func (h *History) buildVI(ctx context.Context, historyIdxPath string, hist, efHi
298298
if err = rs.AddKey(histKey, valOffset); err != nil {
299299
return err
300300
}
301-
if h.HistoryValuesOnCompressedPage == 0 {
301+
302+
if h.CompressorCfg.ValuesOnCompressedPage == 0 {
302303
valOffset, _ = histReader.Skip()
303304
} else {
304305
i++
305-
if i%h.HistoryValuesOnCompressedPage == 0 {
306+
if i%h.CompressorCfg.ValuesOnCompressedPage == 0 {
306307
valOffset, _ = histReader.Skip()
307308
}
308309
}
@@ -856,10 +857,12 @@ func (h *History) dataWriter(f *seg.Compressor) *seg.PagedWriter {
856857
if !strings.Contains(f.FileName(), ".v") {
857858
panic("assert: miss-use " + f.FileName())
858859
}
859-
return seg.NewPagedWriter(seg.NewWriter(f, h.Compression), h.HistoryValuesOnCompressedPage, true)
860+
return seg.NewPagedWriter(seg.NewWriter(f, h.Compression), f.GetValuesOnCompressedPage() > 0)
861+
}
862+
func (ht *HistoryRoTx) dataReader(f *seg.Decompressor) *seg.Reader { return ht.h.dataReader(f) }
863+
func (ht *HistoryRoTx) datarWriter(f *seg.Compressor) *seg.PagedWriter {
864+
return ht.h.dataWriter(f)
860865
}
861-
func (ht *HistoryRoTx) dataReader(f *seg.Decompressor) *seg.Reader { return ht.h.dataReader(f) }
862-
func (ht *HistoryRoTx) datarWriter(f *seg.Compressor) *seg.PagedWriter { return ht.h.dataWriter(f) }
863866

864867
func (h *History) isEmpty(tx kv.Tx) (bool, error) {
865868
k, err := kv.FirstKey(tx, h.ValuesTable)
@@ -1137,7 +1140,13 @@ func (ht *HistoryRoTx) historySeekInFiles(key []byte, txNum uint64) ([]byte, boo
11371140
fmt.Printf("DomainGetAsOf(%s, %x, %d) -> %s, histTxNum=%d, isNil(v)=%t\n", ht.h.FilenameBase, key, txNum, g.FileName(), histTxNum, v == nil)
11381141
}
11391142

1140-
if ht.h.HistoryValuesOnCompressedPage > 1 {
1143+
compressedPageValuesCount := historyItem.src.decompressor.CompressedPageValuesCount()
1144+
1145+
if historyItem.src.decompressor.CompressionFormatVersion() == seg.FileCompressionFormatV0 {
1146+
compressedPageValuesCount = ht.h.HistoryValuesOnCompressedPage
1147+
}
1148+
1149+
if compressedPageValuesCount > 1 {
11411150
v, ht.snappyReadBuffer = seg.GetFromPage(historyKey, v, ht.snappyReadBuffer, true)
11421151
}
11431152
return v, true, nil
@@ -1400,9 +1409,15 @@ func (ht *HistoryRoTx) HistoryDump(fromTxNum, toTxNum int, dumpTo io.Writer) err
14001409
return fmt.Errorf("HistoryDump: failed to resolve offset in .vi %s file for key [%x]", viFile.Fullpath(), key)
14011410
}
14021411

1412+
compressedPageValuesCount := viFile.src.decompressor.CompressedPageValuesCount()
1413+
1414+
if viFile.src.decompressor.CompressionFormatVersion() == seg.FileCompressionFormatV0 {
1415+
compressedPageValuesCount = ht.h.HistoryValuesOnCompressedPage
1416+
}
1417+
14031418
vGetter := seg.NewPagedReader(
14041419
ht.statelessGetter(viFile.i),
1405-
ht.h.HistoryValuesOnCompressedPage,
1420+
compressedPageValuesCount,
14061421
true,
14071422
)
14081423

0 commit comments

Comments
 (0)