Skip to content

Commit 9bd0e5a

Browse files
seg: if word-level compression is disabled, don't create the .idt file (#19443)
Example: CommitmentDomain.History has page-level compression enabled and word-level compression disabled, and one file there is 100 GB now. @eastorski I remember you added metadata to the file, but I don't see how to get the `CompressNone` flag in the Compressor constructor. If we know whether word-level compression is used in advance, then we can remove one intermediate file.
1 parent 12a5a04 commit 9bd0e5a

File tree

3 files changed

+242
-91
lines changed

3 files changed

+242
-91
lines changed

db/seg/compress.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -313,26 +313,16 @@ func (c *Compressor) Compress() error {
313313

314314
logEvery := time.NewTicker(20 * time.Second)
315315
defer logEvery.Stop()
316+
// Detect the fast path before sending the last superstring: if no superstrings were
317+
// ever produced (neither overflow nor the current partial one), then no words were
318+
// submitted for word-level compression, so the pattern dictionary will always be empty.
319+
noWordPatterns := c.superstringCount == 0 && len(c.superstring) == 0
316320
if len(c.superstring) > 0 {
317321
c.superstrings <- c.superstring
318322
}
319323
close(c.superstrings)
320324
c.wg.Wait()
321325

322-
if c.lvl < log.LvlTrace {
323-
c.logger.Log(c.lvl, fmt.Sprintf("[%s] BuildDict start", c.logPrefix), "workers", c.Workers)
324-
}
325-
db, err := DictionaryBuilderFromCollectors(c.ctx, c.Cfg, c.logPrefix, c.tmpDir, c.suffixCollectors, c.lvl, c.logger)
326-
if err != nil {
327-
return err
328-
}
329-
if c.trace {
330-
_, fileName := filepath.Split(c.outputFile)
331-
if err := PersistDictionary(filepath.Join(c.tmpDir, fileName)+".dictionary.txt", db); err != nil {
332-
return err
333-
}
334-
}
335-
336326
cf, err := dir.CreateTemp(c.outputFile)
337327
if err != nil {
338328
return err
@@ -366,8 +356,34 @@ func (c *Compressor) Compress() error {
366356
}
367357

368358
t := time.Now()
369-
if err := compressWithPatternCandidates(c.ctx, c.trace, c.Cfg, c.logPrefix, tmpFileName, cf, c.uncompressedFile, db, c.lvl, c.logger); err != nil {
370-
return err
359+
if noWordPatterns {
360+
// Fast path: no words were fed to the pattern-dictionary pipeline (e.g. history .v
361+
// files with CompressNone). Skip dictionary building and the intermediate file entirely.
362+
for _, coll := range c.suffixCollectors {
363+
coll.Close()
364+
}
365+
c.suffixCollectors = nil
366+
if err = compressNoWordPatterns(c.logPrefix, cf, c.uncompressedFile, c.lvl, c.logger); err != nil {
367+
return err
368+
}
369+
} else {
370+
if c.lvl < log.LvlTrace {
371+
c.logger.Log(c.lvl, fmt.Sprintf("[%s] BuildDict start", c.logPrefix), "workers", c.Workers)
372+
}
373+
var db *DictionaryBuilder
374+
db, err = DictionaryBuilderFromCollectors(c.ctx, c.Cfg, c.logPrefix, c.tmpDir, c.suffixCollectors, c.lvl, c.logger)
375+
if err != nil {
376+
return err
377+
}
378+
if c.trace {
379+
_, fileName := filepath.Split(c.outputFile)
380+
if err := PersistDictionary(filepath.Join(c.tmpDir, fileName)+".dictionary.txt", db); err != nil {
381+
return err
382+
}
383+
}
384+
if err = compressWithPatternCandidates(c.ctx, c.trace, c.Cfg, c.logPrefix, tmpFileName, cf, c.uncompressedFile, db, c.lvl, c.logger); err != nil {
385+
return err
386+
}
371387
}
372388
if err = c.fsync(cf); err != nil {
373389
return err

db/seg/compress_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ func TestCompressEmptyDict(t *testing.T) {
6565
if g.HasNext() {
6666
t.Fatalf("not expecting anything else")
6767
}
68+
if cs := checksum(file); cs != 2900861311 {
69+
t.Errorf("result file hash changed, %d", cs)
70+
}
6871
}
6972

7073
// nolint
@@ -324,4 +327,62 @@ func Test_CompressWithMetadata(t *testing.T) {
324327
}
325328
i++
326329
}
330+
if cs := checksum(d.filePath); cs != 4122484600 {
331+
t.Errorf("result file hash changed, %d", cs)
332+
}
333+
}
334+
335+
// TestCompressNoWordPatterns exercises the compressNoWordPatterns fast path, which is
336+
// triggered when all words are added via AddUncompressedWord (noWordPatterns == true).
337+
// Verifies that the output is a valid compressed file that round-trips correctly.
338+
func TestCompressNoWordPatterns(t *testing.T) {
339+
logger := log.New()
340+
tmpDir := t.TempDir()
341+
file := filepath.Join(tmpDir, "compressed")
342+
c, err := NewCompressor(context.Background(), t.Name(), file, tmpDir, DefaultCfg, log.LvlDebug, logger)
343+
require.NoError(t, err)
344+
defer c.Close()
345+
346+
// Build expected words: empty, short, varied-length — all via AddUncompressedWord.
347+
words := [][]byte{
348+
nil,
349+
[]byte("a"),
350+
}
351+
for i := range 100 {
352+
// Semantic: "empty word" means "found key with empty value". "nil" - means key was deleted - not encodable by compressor
353+
words = append(words,
354+
nil,
355+
[]byte{},
356+
357+
fmt.Appendf(nil, "%d longlongword %d", i, i),
358+
bytes.Repeat([]byte("x"), i+1),
359+
)
360+
}
361+
362+
for _, w := range words {
363+
require.NoError(t, c.AddUncompressedWord(w))
364+
}
365+
require.NoError(t, c.Compress())
366+
367+
d, err := NewDecompressor(file)
368+
require.NoError(t, err)
369+
defer d.Close()
370+
371+
require.EqualValues(t, len(words), d.Count())
372+
373+
g := d.MakeGetter()
374+
for _, expected := range words {
375+
require.True(t, g.HasNext())
376+
got, _ := g.Next(nil)
377+
if expected == nil {
378+
require.Equal(t, []byte{}, got) // Semantic: "empty word" means "found key with empty value". "nil" - means key was deleted - not encodable by compressor
379+
} else {
380+
require.Equal(t, expected, got)
381+
}
382+
}
383+
require.False(t, g.HasNext())
384+
385+
if cs := checksum(file); cs != 1879837905 {
386+
t.Errorf("fast-path output differs from main, checksum=%d", cs)
387+
}
327388
}

db/seg/parallel_compress.go

Lines changed: 149 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -570,83 +570,10 @@ func compressWithPatternCandidates(ctx context.Context, trace bool, cfg Cfg, log
570570
//fmt.Printf("[comp] depth=%d, code=[%b], codeLen=%d pattern=[%x]\n", p.depth, p.code, p.codeBits, p.word)
571571
}
572572

573-
var positionList PositionList
574-
pos2code := make(map[uint64]*Position)
575-
for pos, uses := range posMap {
576-
p := &Position{pos: pos, uses: uses, code: pos, codeBits: 0}
577-
positionList = append(positionList, p)
578-
pos2code[pos] = p
579-
}
580-
slices.SortFunc(positionList, positionListCmp)
581-
i = 0
582-
// Build Huffman tree for codes
583-
var posHeap PositionHeap
584-
heap.Init(&posHeap)
585-
tieBreaker = uint64(0)
586-
for posHeap.Len()+(positionList.Len()-i) > 1 {
587-
// New node
588-
h := &PositionHuff{
589-
tieBreaker: tieBreaker,
590-
}
591-
if posHeap.Len() > 0 && (i >= positionList.Len() || posHeap[0].uses < positionList[i].uses) {
592-
// Take h0 from the heap
593-
h.h0 = heap.Pop(&posHeap).(*PositionHuff)
594-
h.h0.AddZero()
595-
h.uses += h.h0.uses
596-
} else {
597-
// Take p0 from the list
598-
h.p0 = positionList[i]
599-
h.p0.code = 0
600-
h.p0.codeBits = 1
601-
h.uses += h.p0.uses
602-
i++
603-
}
604-
if posHeap.Len() > 0 && (i >= positionList.Len() || posHeap[0].uses < positionList[i].uses) {
605-
// Take h1 from the heap
606-
h.h1 = heap.Pop(&posHeap).(*PositionHuff)
607-
h.h1.AddOne()
608-
h.uses += h.h1.uses
609-
} else {
610-
// Take p1 from the list
611-
h.p1 = positionList[i]
612-
h.p1.code = 1
613-
h.p1.codeBits = 1
614-
h.uses += h.p1.uses
615-
i++
616-
}
617-
tieBreaker++
618-
heap.Push(&posHeap, h)
619-
}
620-
if posHeap.Len() > 0 {
621-
posRoot := heap.Pop(&posHeap).(*PositionHuff)
622-
posRoot.SetDepth(0)
623-
}
624-
// Calculate the size of pos dictionary
625-
var posSize uint64
626-
for _, p := range positionList {
627-
ns := binary.PutUvarint(numBuf[:], uint64(p.depth)) // Length of the position's depth
628-
n := binary.PutUvarint(numBuf[:], p.pos)
629-
posSize += uint64(ns + n)
630-
}
631-
// First, output dictionary size
632-
binary.BigEndian.PutUint64(numBuf[:], posSize) // Dictionary size
633-
if _, err = cw.Write(numBuf[:8]); err != nil {
573+
positionList, pos2code, posSize, err := buildAndWritePosDict(posMap, cw)
574+
if err != nil {
634575
return err
635576
}
636-
//fmt.Printf("posSize = %d\n", posSize)
637-
// Write all the positions
638-
slices.SortFunc(positionList, positionListCmp)
639-
for _, p := range positionList {
640-
ns := binary.PutUvarint(numBuf[:], uint64(p.depth))
641-
if _, err = cw.Write(numBuf[:ns]); err != nil {
642-
return err
643-
}
644-
n := binary.PutUvarint(numBuf[:], p.pos)
645-
if _, err = cw.Write(numBuf[:n]); err != nil {
646-
return err
647-
}
648-
//fmt.Printf("[comp] depth=%d, code=[%b], codeLen=%d pos=%d\n", p.depth, p.code, p.codeBits, p.pos)
649-
}
650577
if lvl < log.LvlTrace {
651578
logger.Log(lvl, fmt.Sprintf("[%s] Positional dictionary", logPrefix), "positionList.len", positionList.Len(), "posSize", common.ByteCount(posSize))
652579
}
@@ -748,6 +675,153 @@ func compressWithPatternCandidates(ctx context.Context, trace bool, cfg Cfg, log
748675
return nil
749676
}
750677

678+
// compressNoWordPatterns is a fast path for Compress when no words were submitted to the
// pattern-dictionary pipeline (superstringCount == 0). This happens for history .v files
// that use CompressNone. Instead of building a dictionary and writing/reading an intermediate
// file, it makes two sequential passes over the raw words file and writes directly to cf.
//
// Output layout (must stay byte-identical to the main compression path with an empty
// pattern dictionary — pinned by TestCompressNoWordPatterns' checksum):
// header (word count, empty-word count, patternsSize=0), position dictionary, then per
// word: Huffman code for len+1, terminator code for 0 (skipped for empty words), raw bytes.
func compressNoWordPatterns(logPrefix string, cf *os.File, uncompressedFile *RawWordsFile, lvl log.Lvl, logger log.Logger) error {
	var numBuf [binary.MaxVarintLen64]byte

	// Pass 1: collect word counts and position-length frequencies for the Huffman tree.
	var inCount, emptyWordsCount uint64
	posMap := make(map[uint64]uint64)
	if err := uncompressedFile.ForEach(func(v []byte, _ bool) error {
		inCount++
		l := uint64(len(v))
		// Weight the "word length + 1" position and the 0 terminator for every word.
		// NOTE(review): pos 0 is counted even for empty words, although pass 2 never
		// emits a terminator for them — presumably this mirrors the main path's
		// counting so both produce identical Huffman codes; confirm against
		// compressWithPatternCandidates before changing.
		posMap[l+1]++
		posMap[0]++
		if l == 0 {
			emptyWordsCount++
		}
		return nil
	}); err != nil {
		return err
	}

	// Pooled bufio.Writer over the final output file; returned to the pool on exit.
	cw := getBufioWriter(cf)
	defer putBufioWriter(cw)

	// Write data header: word count, empty word count, patternsSize=0, then position dict.
	binary.BigEndian.PutUint64(numBuf[:], inCount)
	if _, err := cw.Write(numBuf[:8]); err != nil {
		return err
	}
	binary.BigEndian.PutUint64(numBuf[:], emptyWordsCount)
	if _, err := cw.Write(numBuf[:8]); err != nil {
		return err
	}
	binary.BigEndian.PutUint64(numBuf[:], 0) // patternsSize = 0
	if _, err := cw.Write(numBuf[:8]); err != nil {
		return err
	}
	// Build the position Huffman dictionary and serialize it; only the pos->code map
	// is needed for pass 2 (the position list itself is discarded).
	_, pos2code, posSize, err := buildAndWritePosDict(posMap, cw)
	if err != nil {
		return err
	}
	if lvl < log.LvlTrace {
		logger.Log(lvl, fmt.Sprintf("[%s] Positional dictionary (no-pattern fast path)", logPrefix), "posSize", common.ByteCount(posSize))
	}

	// Pass 2: Huffman-encode position codes and copy raw word bytes to output.
	var hc BitWriter
	hc.w = cw
	if err := uncompressedFile.ForEach(func(v []byte, _ bool) error {
		l := uint64(len(v))
		// Emit the code for "word length + 1".
		if c := pos2code[l+1]; c != nil {
			if e := hc.encode(c.code, c.codeBits); e != nil {
				return e
			}
		}
		// Empty word: no terminator, no payload — just flush the partial bit buffer.
		if l == 0 {
			return hc.flush()
		}
		// Terminator code 0 marks "no more pattern positions for this word".
		if c := pos2code[0]; c != nil {
			if e := hc.encode(c.code, c.codeBits); e != nil {
				return e
			}
		}
		// Byte-align before writing the uncovered (here: entire) word verbatim.
		if e := hc.flush(); e != nil {
			return e
		}
		_, e := cw.Write(v)
		return e
	}); err != nil {
		return err
	}
	return cw.Flush()
}
753+
754+
// buildAndWritePosDict builds the Huffman tree for position codes from posMap, writes the
// position dictionary size and entries to cw, and returns (positionList, pos2code, posSize).
//
// The construction is deliberately deterministic: ties between equal-weight nodes are
// broken by an incrementing tieBreaker and by positionListCmp ordering, so two runs over
// the same posMap produce bit-identical dictionaries. Do not reorder the steps below —
// the serialized bytes are part of the .seg file format.
func buildAndWritePosDict(posMap map[uint64]uint64, cw *bufio.Writer) (PositionList, map[uint64]*Position, uint64, error) {
	var numBuf [binary.MaxVarintLen64]byte
	// Materialize one Position per distinct position value; pos2code lets encoders
	// look the node (and later its assigned code) up by position.
	positionList := make(PositionList, 0, len(posMap))
	pos2code := make(map[uint64]*Position, len(posMap))
	for pos, uses := range posMap {
		p := &Position{pos: pos, uses: uses, code: pos, codeBits: 0}
		positionList = append(positionList, p)
		pos2code[pos] = p
	}
	// Sort ascending by use count (per positionListCmp) so the Huffman merge below can
	// consume leaves in weight order via the advancing index i.
	slices.SortFunc(positionList, positionListCmp)
	i := 0
	var posHeap PositionHeap
	heap.Init(&posHeap)
	tieBreaker := uint64(0)
	// Classic two-queue Huffman build: at each step take the two lightest nodes, each
	// from either the sorted leaf list or the heap of already-merged subtrees.
	for posHeap.Len()+(positionList.Len()-i) > 1 {
		h := &PositionHuff{tieBreaker: tieBreaker}
		if posHeap.Len() > 0 && (i >= positionList.Len() || posHeap[0].uses < positionList[i].uses) {
			// Left child from the heap: prefix its subtree's codes with a 0 bit.
			h.h0 = heap.Pop(&posHeap).(*PositionHuff)
			h.h0.AddZero()
			h.uses += h.h0.uses
		} else {
			// Left child is the next unconsumed leaf; its code starts as a single 0 bit.
			h.p0 = positionList[i]
			h.p0.code = 0
			h.p0.codeBits = 1
			h.uses += h.p0.uses
			i++
		}
		if posHeap.Len() > 0 && (i >= positionList.Len() || posHeap[0].uses < positionList[i].uses) {
			// Right child from the heap: prefix its subtree's codes with a 1 bit.
			h.h1 = heap.Pop(&posHeap).(*PositionHuff)
			h.h1.AddOne()
			h.uses += h.h1.uses
		} else {
			// Right child is the next unconsumed leaf; its code starts as a single 1 bit.
			h.p1 = positionList[i]
			h.p1.code = 1
			h.p1.codeBits = 1
			h.uses += h.p1.uses
			i++
		}
		tieBreaker++
		heap.Push(&posHeap, h)
	}
	// Assign depths from the root (empty posMap leaves the heap empty — nothing to do).
	if posHeap.Len() > 0 {
		posRoot := heap.Pop(&posHeap).(*PositionHuff)
		posRoot.SetDepth(0)
	}
	// Compute the serialized dictionary size: uvarint(depth) + uvarint(pos) per entry.
	var posSize uint64
	for _, p := range positionList {
		ns := binary.PutUvarint(numBuf[:], uint64(p.depth))
		n := binary.PutUvarint(numBuf[:], p.pos)
		posSize += uint64(ns + n)
	}
	// First, the 8-byte big-endian dictionary size...
	binary.BigEndian.PutUint64(numBuf[:], posSize)
	if _, err := cw.Write(numBuf[:8]); err != nil {
		return nil, nil, 0, err
	}
	// ...then every (depth, pos) pair. Re-sort first: SetDepth mutated the depths,
	// so the canonical on-disk order must be re-established with positionListCmp.
	slices.SortFunc(positionList, positionListCmp)
	for _, p := range positionList {
		ns := binary.PutUvarint(numBuf[:], uint64(p.depth))
		if _, err := cw.Write(numBuf[:ns]); err != nil {
			return nil, nil, 0, err
		}
		n := binary.PutUvarint(numBuf[:], p.pos)
		if _, err := cw.Write(numBuf[:n]); err != nil {
			return nil, nil, 0, err
		}
	}
	return positionList, pos2code, posSize, nil
}
824+
751825
// copyN - is alloc-free analog of io.CopyN func
752826
func copyN(r io.Reader, w io.Writer, uncoveredCount int, buf []byte) error {
753827
// Replace the io.CopyN call with manual copy using the buffer

0 commit comments

Comments
 (0)