Skip to content

Commit 0fc700c

Browse files
merge: better progress in logs (#19554)
adding keys to compressed file didn't have normal progress report
1 parent b73882f commit 0fc700c

File tree

5 files changed

+29
-21
lines changed

5 files changed

+29
-21
lines changed

db/state/deduplicate.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,12 @@ func (iit *InvertedIndexRoTx) deduplicateFiles(ctx context.Context, files []*Fil
252252
}
253253

254254
write := iit.dataWriter(comp, false)
255-
p := ps.AddNew(path.Base(datPath), 1)
255+
256+
cnt := 0
257+
for _, item := range files {
258+
cnt += item.decompressor.Count()
259+
}
260+
p := ps.AddNew(path.Base(datPath), uint64(cnt/2))
256261
defer ps.Delete(p)
257262

258263
var cp CursorHeap
@@ -284,6 +289,7 @@ func (iit *InvertedIndexRoTx) deduplicateFiles(ctx context.Context, files []*Fil
284289
// (when CursorHeap cp is empty), there is a need to process the last pair `keyBuf=>valBuf`, because it was one step behind
285290
var keyBuf, valBuf []byte
286291
var lastKey, lastVal []byte
292+
i := uint64(0)
287293
for cp.Len() > 0 {
288294
lastKey = append(lastKey[:0], cp[0].key...)
289295
lastVal = append(lastVal[:0], cp[0].val...)
@@ -337,10 +343,14 @@ func (iit *InvertedIndexRoTx) deduplicateFiles(ctx context.Context, files []*Fil
337343
if ci1.kvReader.HasNext() {
338344
ci1.key, _ = ci1.kvReader.Next(ci1.key[:0])
339345
ci1.val, _ = ci1.kvReader.Next(ci1.val[:0])
346+
i += 2
340347
// fmt.Printf("heap next push %s [%d] %x\n", ii.KeysTable, ci1.endTxNum, ci1.key)
341348
heap.Push(&cp, ci1)
342349
}
343350
}
351+
if i%1024 == 0 {
352+
p.Processed.Store(i)
353+
}
344354
if keyBuf != nil {
345355
// fmt.Printf("pput %x->%x\n", keyBuf, valBuf)
346356
if _, err = write.Write(keyBuf); err != nil {

db/state/history.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -789,24 +789,14 @@ func (h *History) buildFiles(ctx context.Context, step kv.Step, collation Histor
789789
}
790790

791791
{
792-
ps := background.NewProgressSet()
793-
_, efHistoryFileName := filepath.Split(collation.efHistoryPath)
794-
p := ps.AddNew(efHistoryFileName, 1)
795-
defer ps.Delete(p)
796-
797792
if err = collation.efHistoryComp.Compress(); err != nil {
798793
return HistoryFiles{}, fmt.Errorf("compress %s .ef history: %w", h.FilenameBase, err)
799794
}
800-
ps.Delete(p)
801795
}
802796
{
803-
_, historyFileName := filepath.Split(collation.historyPath)
804-
p := ps.AddNew(historyFileName, 1)
805-
defer ps.Delete(p)
806797
if err = collation.historyComp.Compress(); err != nil {
807798
return HistoryFiles{}, fmt.Errorf("compress %s .v history: %w", h.FilenameBase, err)
808799
}
809-
ps.Delete(p)
810800
}
811801
collation.Close()
812802

db/state/inverted_index.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"fmt"
2525
"math"
2626
"os"
27-
"path"
2827
"path/filepath"
2928
"reflect"
3029
"strings"
@@ -1101,13 +1100,10 @@ func (ii *InvertedIndex) buildFiles(ctx context.Context, step kv.Step, coll Inve
11011100
}
11021101

11031102
{
1104-
p := ps.AddNew(path.Base(coll.iiPath), 1)
11051103
if err = coll.writer.Compress(); err != nil {
1106-
ps.Delete(p)
11071104
return InvertedFiles{}, fmt.Errorf("compress %s: %w", ii.FilenameBase, err)
11081105
}
11091106
coll.Close()
1110-
ps.Delete(p)
11111107
}
11121108

11131109
if decomp, err = seg.NewDecompressor(coll.iiPath); err != nil {

db/state/merge.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,12 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
612612
}
613613

614614
write := iit.dataWriter(comp, false)
615-
p := ps.AddNew(path.Base(datPath), 1)
615+
616+
cnt := 0
617+
for _, item := range files {
618+
cnt += item.decompressor.Count()
619+
}
620+
p := ps.AddNew(path.Base(datPath), uint64(cnt))
616621
defer ps.Delete(p)
617622

618623
var cp CursorHeap
@@ -646,6 +651,7 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
646651
var lastKey, lastVal []byte
647652
preSeq, mergeSeq := &multiencseq.SequenceReader{}, &multiencseq.SequenceReader{}
648653
preIt, mergeIt := &multiencseq.SequenceIterator{}, &multiencseq.SequenceIterator{}
654+
i := uint64(0)
649655
for cp.Len() > 0 {
650656
lastKey = append(lastKey[:0], cp[0].key...)
651657
lastVal = append(lastVal[:0], cp[0].val...)
@@ -683,10 +689,14 @@ func (iit *InvertedIndexRoTx) mergeFiles(ctx context.Context, files []*FilesItem
683689
if ci1.kvReader.HasNext() {
684690
ci1.key, _ = ci1.kvReader.Next(ci1.key[:0])
685691
ci1.val, _ = ci1.kvReader.Next(ci1.val[:0])
692+
i += 2
686693
// fmt.Printf("heap next push %s [%d] %x\n", ii.KeysTable, ci1.endTxNum, ci1.key)
687694
heap.Push(&cp, ci1)
688695
}
689696
}
697+
if i%1024 == 0 {
698+
p.Processed.Store(i)
699+
}
690700
if keyBuf != nil {
691701
// fmt.Printf("pput %x->%x\n", keyBuf, valBuf)
692702
if _, err = write.Write(keyBuf); err != nil {
@@ -788,7 +798,12 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles
788798
}
789799

790800
pagedWr := ht.dataWriter(comp)
791-
p := ps.AddNew(path.Base(datPath), 1)
801+
802+
cnt := 0
803+
for _, item := range indexFiles {
804+
cnt += item.decompressor.Count()
805+
}
806+
p := ps.AddNew(path.Base(datPath), uint64(cnt/2))
792807
defer ps.Delete(p)
793808

794809
var cp CursorHeap
@@ -865,6 +880,7 @@ func (ht *HistoryRoTx) mergeFiles(ctx context.Context, indexFiles, historyFiles
865880
}
866881

867882
// fmt.Printf("fput '%x'->%x\n", lastKey, ci1.val)
883+
p.Processed.Add(1)
868884
if ci1.kvReader.HasNext() {
869885
ci1.key, _ = ci1.kvReader.Next(ci1.key[:0])
870886
ci1.val, _ = ci1.kvReader.Next(ci1.val[:0])

db/state/proto_forkable.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package state
33
import (
44
"context"
55
"fmt"
6-
"path/filepath"
76

87
"golang.org/x/sync/errgroup"
98

@@ -133,8 +132,6 @@ func (a *ProtoForkable) BuildFile(ctx context.Context, from, to RootNum, db kv.R
133132
}
134133
writer.SetMetadata(mbytes)
135134

136-
p := ps.AddNew(filepath.Base(path), 1)
137-
defer ps.Delete(p)
138135
if err := writer.Flush(); err != nil {
139136
return nil, false, err
140137
}
@@ -143,7 +140,6 @@ func (a *ProtoForkable) BuildFile(ctx context.Context, from, to RootNum, db kv.R
143140
}
144141
writer.Close()
145142
sn.Close()
146-
ps.Delete(p)
147143
}
148144

149145
valuesDecomp, err := seg.NewDecompressorWithMetadata(path, cfg.HasMetadata)

0 commit comments

Comments
 (0)