1717package integrity
1818
1919import (
20+ "bytes"
2021 "context"
2122 "encoding/hex"
2223 "errors"
2324 "fmt"
2425 "path/filepath"
2526 "strings"
26- "sync/atomic"
2727 "time"
2828
2929 "golang.org/x/sync/errgroup"
3030
31- "github.com/erigontech/erigon/common/dbg "
31+ "github.com/erigontech/erigon/common/estimate "
3232 "github.com/erigontech/erigon/common/log/v3"
3333 "github.com/erigontech/erigon/db/kv"
3434 "github.com/erigontech/erigon/db/recsplit"
@@ -41,22 +41,14 @@ import (
4141// ErrIntegrity is useful to differentiate integrity errors from program errors.
4242var ErrIntegrity = errors .New ("integrity error" )
4343
44+ // CheckKvis checks all kvi index files for a domain sequentially (one file at a time),
45+ // parallelizing the lookup work inside each file.
4446func CheckKvis (ctx context.Context , tx kv.TemporalTx , domain kv.Domain , failFast bool , logger log.Logger ) error {
4547 start := time .Now ()
4648 aggTx := state .AggTx (tx )
4749 files := aggTx .Files (domain )
4850 kvCompression := statecfg .Schema .GetDomainCfg (domain ).Compression
49- var eg * errgroup.Group
50- if failFast {
51- // if 1 goroutine fails, fail others
52- eg , ctx = errgroup .WithContext (ctx )
53- } else {
54- eg = & errgroup.Group {}
55- }
56- if dbg .EnvBool ("CHECK_KVIS_SEQUENTIAL" , false ) {
57- eg .SetLimit (1 )
58- }
59- var keyCount atomic.Uint64
51+ var keyCount uint64
6052 for _ , file := range files {
6153 if ! strings .HasSuffix (file .Fullpath (), ".kv" ) {
6254 continue
@@ -76,26 +68,24 @@ func CheckKvis(ctx context.Context, tx kv.TemporalTx, domain kv.Domain, failFast
7668 if ! ok {
7769 return fmt .Errorf ("kvi not found for %s" , kvPath )
7870 }
79- eg .Go (func () error {
80- keys , err := CheckKvi (ctx , kviPath , kvPath , kvCompression , failFast , logger )
81- if err == nil {
82- keyCount .Add (keys )
83- return nil
84- }
85- if ! failFast {
86- logger .Warn (err .Error ())
71+ keys , err := CheckKvi (ctx , kviPath , kvPath , kvCompression , failFast , logger )
72+ keyCount += keys
73+ if err != nil {
74+ if failFast {
75+ return err
8776 }
88- return err
89- })
90- }
91- err := eg .Wait ()
92- if err != nil {
93- return err
77+ logger .Warn (err .Error ())
78+ }
9479 }
95- logger .Info ("checked kvi files in" , "dur" , time .Since (start ), "files" , len (files ), "keys" , keyCount . Load () )
80+ logger .Info ("checked kvi files in" , "dur" , time .Since (start ), "files" , len (files ), "keys" , keyCount )
9681 return nil
9782}
9883
84+ type kviWorkItem struct { // one lookup job handed from the kv scanner to a lookup worker
85+ key []byte // key bytes read from the .kv file; the producer sends a private copy made with bytes.Clone
86+ offset uint64 // offset the producer recorded for this key; compared against the kvi lookup result
87+ }
88+
9989func CheckKvi (ctx context.Context , kviPath string , kvPath string , kvCompression seg.FileCompression , failFast bool , logger log.Logger ) (uint64 , error ) {
10090 kviFileName := filepath .Base (kviPath )
10191 kvFileName := filepath .Base (kvPath )
@@ -106,74 +96,108 @@ func CheckKvi(ctx context.Context, kviPath string, kvPath string, kvCompression
10696 return 0 , err
10797 }
10898 defer kvi .Close ()
109- kviReader := kvi .GetReaderFromPool ()
11099 kvDecompressor , err := seg .NewDecompressor (kvPath )
111100 if err != nil {
112101 return 0 , err
113102 }
114103 defer kvDecompressor .Close ()
115104 kvReader := seg .NewReader (kvDecompressor .MakeGetter (), kvCompression )
116- var integrityErr error
105+
106+ var firstErr error
117107 if kvKeyCount := uint64 (kvReader .Count ()) / 2 ; kvKeyCount != kvi .KeyCount () {
118108 err = fmt .Errorf ("kv key count %d != kvi key count %d in %s" , kvKeyCount , kvi .KeyCount (), kviFileName )
119109 if failFast {
120110 return 0 , err
121111 }
122112 logger .Warn (err .Error ())
123- integrityErr = fmt .Errorf ("%w: %w" , ErrIntegrity , err )
113+ firstErr = fmt .Errorf ("%w: %w" , ErrIntegrity , err )
114+ }
115+
116+ trace := logger .Enabled (ctx , log .LvlTrace ) // trace level is checked once here, not once per key
117+ checkOne := func (kviReader * recsplit.IndexReader , work kviWorkItem ) error { // verifies one key: the kvi lookup must find it and return the offset recorded by the kv scan
118+ if trace {
119+ logger .Trace ("[integrity] checking kvi for" , "key" , hex .EncodeToString (work .key ), "offset" , work .offset , "kvi" , kviFileName )
120+ }
121+ kviOffset , found := kviReader .Lookup (work .key )
122+ if ! found { // the index has no entry for a key that was read from the kv file
123+ return fmt .Errorf ("%w: key %x not found in %s" , ErrIntegrity , work .key , kviFileName )
124+ }
125+ if kviOffset != work .offset { // the index resolves the key to a different offset than the scan recorded
126+ return fmt .Errorf ("%w: key %x offset mismatch %d != %d in %s" , ErrIntegrity , work .key , work .offset , kviOffset , kviFileName )
127+ }
128+ return nil
124129 }
125- logTicker := time .NewTicker (30 * time .Second )
126- defer logTicker .Stop ()
127- var keyBuf []byte
128- var keyOffset , keyCount uint64
129- var atValue bool
130- for i := 0 ; kvReader .HasNext (); i ++ {
131- if i % 1024 == 0 {
130+
131+ var keyCount uint64
132+ eg , ctx := errgroup .WithContext (ctx )
133+ numWorkers := estimate .AlmostAllCPUs ()
134+ workCh := make (chan kviWorkItem , numWorkers * 4 )
135+
136+ for range numWorkers { // start numWorkers lookup goroutines inside the errgroup
137+ eg .Go (func () error {
138+ kviReader := kvi .GetReaderFromPool () // each goroutine takes its own reader
139+ defer kviReader .Close ()
140+ for {
141+ select {
142+ case <- ctx .Done ():
143+ return ctx .Err ()
144+ case work , ok := <- workCh :
145+ if ! ok { // workCh has been closed and drained: nothing left to check
146+ return nil
147+ }
148+ if err := checkOne (kviReader , work ); err != nil {
149+ if ! failFast {
150+ logger .Warn (err .Error ())
151+ }
152+ return err // NOTE(review): ctx comes from errgroup.WithContext, so this return cancels the other workers and the producer even when failFast is false — confirm that stopping at the first bad key is intended
153+ }
154+ }
155+ }
156+ })
157+ }
158+
159+ // Producer: scan kv file sequentially, emit (key, offset) pairs to workers.
160+ eg .Go (func () error {
161+ defer close (workCh )
162+ logTicker := time .NewTicker (30 * time .Second )
163+ defer logTicker .Stop ()
164+ var keyBuf []byte
165+ var keyOffset uint64
166+ var atValue bool
167+ for kvReader .HasNext () {
168+ if atValue {
169+ keyOffset , _ = kvReader .Skip ()
170+ atValue = false
171+ continue
172+ }
173+ keyBuf , _ = kvReader .Next (keyBuf [:0 ])
174+ keyCount ++
175+ atValue = true
176+
132177 select {
133178 case <- ctx .Done ():
134- return 0 , ctx .Err ()
179+ return ctx .Err () // propagate cancellation: returning nil closes workCh, and if every worker then takes the closed-channel branch, Wait reports success for a truncated scan
180+ case workCh <- kviWorkItem {key : bytes .Clone (keyBuf ), offset : keyOffset }:
181+ }
182+
183+ select {
135184 case <- logTicker .C :
136185 at := fmt .Sprintf ("%d/%d" , keyCount , kvi .KeyCount ())
137186 percent := fmt .Sprintf ("%.1f%%" , float64 (keyCount )/ float64 (kvi .KeyCount ())* 100 )
138187 rate := float64 (keyCount ) / time .Since (start ).Seconds ()
139188 eta := time .Duration (float64 (kvi .KeyCount ()- keyCount )/ rate ) * time .Second
140- logger .Info ("[integrity] checking kvi progress" , "at" , at , "p" , percent , "k/s" , rate , "kvi " , kviFileName , "eta " , eta )
141- default : // proceed
189+ logger .Info ("[integrity] kvi progress" , "at" , at , "p" , percent , "k/s" , rate , "eta " , eta , "kvi " , kviFileName )
190+ default :
142191 }
143192 }
193+ return nil
194+ })
144195
145- if atValue {
146- keyOffset , _ = kvReader .Skip ()
147- atValue = false
148- continue
149- }
150- keyBuf , _ = kvReader .Next (keyBuf [:0 ])
151- if logger .Enabled (ctx , log .LvlTrace ) {
152- logger .Trace ("[integrity] checking kvi for" , "key" , hex .EncodeToString (keyBuf ), "kvi" , kviFileName , "offset" , keyOffset )
153- }
154- keyCount ++
155- atValue = true
156- kviOffset , ok := kviReader .Lookup (keyBuf )
157- if ! ok {
158- err = fmt .Errorf ("key %x not found in %s" , keyBuf , kviFileName )
159- if failFast {
160- return 0 , err
161- }
162- logger .Warn (err .Error ())
163- integrityErr = fmt .Errorf ("%w: %w" , ErrIntegrity , err )
164- continue
165- }
166- if kviOffset != keyOffset {
167- err = fmt .Errorf ("key %x offset mismatch %d != %d in %s" , keyBuf , keyOffset , kviOffset , kviFileName )
168- if failFast {
169- return 0 , err
170- }
171- logger .Warn (err .Error ())
172- integrityErr = fmt .Errorf ("%w: %w" , ErrIntegrity , err )
173- }
196+ if err := eg .Wait (); err != nil {
197+ return keyCount , err
174198 }
175199 duration := time .Since (start )
176200 rate := float64 (keyCount ) / duration .Seconds ()
177201 logger .Info ("checked kvi in" , "dur" , duration , "keys" , keyCount , "k/s" , rate , "kvi" , kviFileName , "kv" , kvFileName )
178- return keyCount , integrityErr
202+ return keyCount , firstErr
179203}
0 commit comments