Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions db/migrations/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ var migrations = map[kv.Label][]Migration{
dbcfg.ChainDB: {
dbSchemaVersion5,
ResetStageTxnLookup,
SegHeaderV2,
},
dbcfg.TxPoolDB: {},
dbcfg.SentryDB: {},
Expand Down
181 changes: 181 additions & 0 deletions db/migrations/seg_header_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2024 The Erigon Authors
// This file is part of Erigon.
//
// Erigon is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Erigon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.

package migrations

import (
"context"
"os"
"path/filepath"
"strings"

"github.com/erigontech/erigon/common/log/v3"
"github.com/erigontech/erigon/db/datadir"
"github.com/erigontech/erigon/db/kv"
"github.com/erigontech/erigon/db/seg"
"github.com/erigontech/erigon/db/state/statecfg"
)

// SegHeaderV2 upgrades all V1 .seg/.v/.ef snapshot files to V2 by patching
// the two-byte file header in-place. V2 is identical to V1 except that the
// featureFlagBitmask byte now reliably encodes KeyCompressionEnabled /
// ValCompressionEnabled in addition to PageLevelCompressionEnabled.
//
// The correct compression flags are looked up from statecfg.Schema so that
// no heuristic detection (DetectCompressType) is needed.
var SegHeaderV2 = Migration{
Name: "seg_header_v2",
Up: func(db kv.RwDB, dirs datadir.Dirs, progress []byte, BeforeCommit Callback, logger log.Logger) error {
tx, err := db.BeginRw(context.Background())
if err != nil {
return err
}
defer tx.Rollback()

lookup := buildSegCompressionLookup()

snapDirs := []string{
dirs.Snap,
dirs.SnapDomain,
dirs.SnapHistory,
dirs.SnapIdx,
dirs.SnapAccessors,
dirs.SnapCaplin,
}
for _, dir := range snapDirs {
if err := upgradeSegFilesInDir(dir, lookup, logger); err != nil {
return err
}
}

if err := BeforeCommit(tx, nil, true); err != nil {
return err
}
return tx.Commit()
},
}

// segDataExts are the file extensions written by the seg Compressor that
// carry the version/featureFlag header.
var segDataExts = map[string]bool{".seg": true, ".v": true, ".ef": true}

func upgradeSegFilesInDir(dir string, lookup map[string]seg.FileCompression, logger log.Logger) error {
return filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
if err != nil || d.IsDir() {
return err
}
ext := filepath.Ext(path)
if !segDataExts[ext] {
return nil
}
return upgradeSegHeaderV1toV2(path, ext, lookup, logger)
})
}

// upgradeSegHeaderV1toV2 patches a single seg-format file from V1 to V2.
// It is a no-op for V0 files and files that are already V2+.
func upgradeSegHeaderV1toV2(path, ext string, lookup map[string]seg.FileCompression, logger log.Logger) error {
d, err := seg.NewDecompressor(path)
if err != nil {
return err
}
if d.CompressionFormatVersion() != seg.FileCompressionFormatV1 {
d.Close()
return nil
}

pageCnt := d.CompressedPageValuesCount()
d.Close() // release mmap before writing

// Determine key/val compression from the schema lookup (tag = last dash-separated
// component of the base name, e.g. "accounts" from "v1-000000-000100-accounts.seg").
base := filepath.Base(path)
tag := segTag(base, ext)
fc := lookup[tag+ext] // zero value (no compression) if unknown

var bitmask seg.FeatureFlagBitmask
if pageCnt > 0 {
bitmask.Set(seg.PageLevelCompressionEnabled)
}
if fc.Has(seg.CompressKeys) {
bitmask.Set(seg.KeyCompressionEnabled)
}
if fc.Has(seg.CompressVals) {
bitmask.Set(seg.ValCompressionEnabled)
}

f, err := os.OpenFile(path, os.O_RDWR, 0)
if err != nil {
return err
}
defer f.Close()

if _, err := f.WriteAt([]byte{seg.FileCompressionFormatV2, byte(bitmask)}, 0); err != nil {
return err
}
if err := f.Sync(); err != nil {
return err
}

logger.Debug("[seg_header_v2] upgraded", "file", base)
return nil
}

// segTag extracts the file-type tag from a snapshot base name.
// Format: "{version}-{from}-{to}-{tag}.{ext}" (first 3 fields are separated by "-").
func segTag(base, ext string) string {
withoutExt := base[:len(base)-len(ext)]
parts := strings.SplitN(withoutExt, "-", 4)
if len(parts) == 4 {
return parts[3]
}
return ""
}

// buildSegCompressionLookup returns a map from "{tag}{ext}" to the FileCompression
// used when writing that file type, derived from the global statecfg.Schema.
func buildSegCompressionLookup() map[string]seg.FileCompression {
s := statecfg.Schema
m := make(map[string]seg.FileCompression)

domains := []statecfg.DomainCfg{
s.AccountsDomain,
s.StorageDomain,
s.CodeDomain,
s.CommitmentDomain,
s.ReceiptDomain,
s.RCacheDomain,
}
for _, d := range domains {
name := d.Name.String()
m[name+".seg"] = d.Compression
m[name+".v"] = d.Hist.Compression
// Each domain's inverted-index uses the domain name as its FilenameBase.
m[d.Hist.IiCfg.FilenameBase+".ef"] = d.Hist.IiCfg.Compression
}

// Standalone inverted indexes (log/trace).
for _, ii := range []statecfg.InvIdxCfg{
s.LogAddrIdx,
s.LogTopicIdx,
s.TracesFromIdx,
s.TracesToIdx,
} {
m[ii.FilenameBase+".ef"] = ii.Compression
}

return m
}
19 changes: 17 additions & 2 deletions db/seg/compress.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
version uint8
featureFlagBitmask FeatureFlagBitmask
compPageValuesCount uint8
totalPairsCount uint64
metadata []byte
}

Expand Down Expand Up @@ -193,7 +194,7 @@
lvl: lvl,
wg: wg,
logger: logger,
version: FileCompressionFormatV1,
version: FileCompressionFormatV2,
}

if cfg.ValuesOnCompressedPage > 0 {
Expand All @@ -215,7 +216,14 @@
func (c *Compressor) SetTrace(trace bool) { c.trace = trace }
func (c *Compressor) FileName() string { return c.outputFileName }
func (c *Compressor) WorkersAmount() int { return c.Workers }
func (c *Compressor) GetValuesOnCompressedPage() int { return int(c.ValuesOnCompressedPage) }

Check failure on line 219 in db/seg/compress.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)

Check failure on line 219 in db/seg/compress.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)
// SetPairsCount stores the total number of key-value pairs in the file header
// (V2+, PairsCountEnabled). Must be called before Compress().
func (c *Compressor) SetPairsCount(n uint64) {
c.totalPairsCount = n
c.featureFlagBitmask.Set(PairsCountEnabled)
}

func (c *Compressor) SetMetadata(metadata []byte) {
if !c.ExpectMetadata {
panic("metadata not expected in compressor")
Expand Down Expand Up @@ -343,7 +351,7 @@
defer dir.RemoveFile(tmpFileName)
defer cf.Close()

if c.version == FileCompressionFormatV1 {
if c.version == FileCompressionFormatV1 || c.version == FileCompressionFormatV2 {
if _, err := cf.Write([]byte{c.version, byte(c.featureFlagBitmask)}); err != nil {
return err
}
Expand All @@ -353,6 +361,13 @@
return err
}
}
if c.featureFlagBitmask.Has(PairsCountEnabled) {
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], c.totalPairsCount)
if _, err := cf.Write(buf[:]); err != nil {
return err
}
}
}

if c.ExpectMetadata {
Expand Down
30 changes: 28 additions & 2 deletions db/seg/decompress.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@
version uint8
featureFlagBitmask FeatureFlagBitmask
compPageValuesCount uint8
totalPairsCount uint64

serializedDictSize uint64
lenDictSize uint64 // huffman encoded lengths
Expand Down Expand Up @@ -258,10 +259,12 @@

d.version = d.data[0]

if d.version == FileCompressionFormatV1 {
if d.version == FileCompressionFormatV1 || d.version == FileCompressionFormatV2 {
// 1st byte: version,
// 2nd byte: defines how exactly the file is compressed
// 3rd byte (otional): exists if PageLevelCompressionEnabled flag is enabled, and defines number of values on compressed page
// 3rd byte (optional): exists if PageLevelCompressionEnabled flag is enabled, and defines number of values on compressed page
// Note: KeyCompressionEnabled / ValCompressionEnabled bits in the bitmask are only
// reliable for V2+; V1 files may have those bits unset even when keys/vals are compressed.
d.featureFlagBitmask = FeatureFlagBitmask(d.data[1])
d.data = d.data[2:]
}
Expand All @@ -270,6 +273,10 @@
d.compPageValuesCount = d.data[0]
d.data = d.data[1:]
}
if d.featureFlagBitmask.Has(PairsCountEnabled) {
d.totalPairsCount = binary.BigEndian.Uint64(d.data[:8])
d.data = d.data[8:]
}

if hasMetadata {
metadataLen := binary.BigEndian.Uint32(d.data[:4])
Expand Down Expand Up @@ -512,7 +519,26 @@
func (d *Decompressor) DictWords() int { return d.dictWords }
func (d *Decompressor) DictLens() int { return d.dictLens }
func (d *Decompressor) CompressedPageValuesCount() int { return int(d.compPageValuesCount) }
func (d *Decompressor) CompressionFormatVersion() uint8 { return d.version }

Check failure on line 522 in db/seg/decompress.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)

Check failure on line 522 in db/seg/decompress.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)
// TotalPairsCount returns the number of key-value pairs stored in a page-compressed
// file. Returns 0 for files that do not have PairsCountEnabled in their header (V1
// files and V2 files written without page compression).
func (d *Decompressor) TotalPairsCount() uint64 { return d.totalPairsCount }
// FileCompression returns the key/value compression flags stored in the file header.
// Only reliable for V2+ files; returns CompressNone for V0 and V1.
func (d *Decompressor) FileCompression() FileCompression {
if d.version < FileCompressionFormatV2 {
return CompressNone
}
c := CompressNone
if d.featureFlagBitmask.Has(KeyCompressionEnabled) {
c |= CompressKeys
}
if d.featureFlagBitmask.Has(ValCompressionEnabled) {
c |= CompressVals
}
return c
}

func (d *Decompressor) Size() int64 {
return d.size
Expand Down
8 changes: 8 additions & 0 deletions db/seg/seg_auto_rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ type Writer struct {
}

func NewWriter(kv *Compressor, compress FileCompression) *Writer {
if compress.Has(CompressKeys) {
kv.featureFlagBitmask.Set(KeyCompressionEnabled)
}
if compress.Has(CompressVals) {
kv.featureFlagBitmask.Set(ValCompressionEnabled)
}
return &Writer{kv, false, compress}
}

Expand Down Expand Up @@ -133,6 +139,8 @@ func (c *Writer) ReadFrom(r *Reader) error {
return nil
}

func (c *Writer) SetPairsCount(n uint64) { c.Compressor.SetPairsCount(n) }

func (c *Writer) Close() {
if c.Compressor != nil {
c.Compressor.Close()
Expand Down
11 changes: 8 additions & 3 deletions db/seg/seg_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@ const (
const (
FileCompressionFormatV0 = uint8(0)
FileCompressionFormatV1 = uint8(1)
// FileCompressionFormatV2 is like V1 but the featureFlagBitmask is fully
// populated, including KeyCompressionEnabled / ValCompressionEnabled bits.
// Files at V1 may have those bits unset even when keys/vals are compressed.
FileCompressionFormatV2 = uint8(2)
)

type FeatureFlag uint8

const (
PageLevelCompressionEnabled FeatureFlag = 1 << iota // 0b001
KeyCompressionEnabled // 0b010
ValCompressionEnabled // 0b100
PageLevelCompressionEnabled FeatureFlag = 1 << iota // 0b0001
KeyCompressionEnabled // 0b0010
ValCompressionEnabled // 0b0100
PairsCountEnabled // 0b1000 — total key-value pair count follows compPageValuesCount in the header (V2+ only)
)

type FeatureFlagBitmask uint8
Expand Down
9 changes: 9 additions & 0 deletions db/seg/seg_paged_rw.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,21 @@
func (c *PagedWriter) Empty() bool { return c.pairs == 0 }
func (c *PagedWriter) Close() {
c.parent.Close()
}

Check failure on line 262 in db/seg/seg_paged_rw.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)

Check failure on line 262 in db/seg/seg_paged_rw.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)
type pairsCountSetter interface {
SetPairsCount(n uint64)
}

func (c *PagedWriter) Compress() error {
// Flush any remaining unwritten page data
if err := c.Flush(); err != nil {
return err
}
if c.pageSize > 1 {
if setter, ok := c.parent.(pairsCountSetter); ok {
setter.SetPairsCount(uint64(c.pairs))
}
}
return c.parent.Compress()
}

Expand Down
Loading