Skip to content

Commit 077b119

Browse files
use TraceKey for Receipt integrity check (#18538)
ReceiptsNoDups check: 7:50m-> 1:38m
1 parent cae9b2d commit 077b119

File tree

4 files changed

+142
-32
lines changed

4 files changed

+142
-32
lines changed

db/integrity/rcache_no_duplicates.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@ func CheckRCacheNoDups(ctx context.Context, db kv.TemporalRoDB, blockReader serv
4646
return parallelChunkCheck(ctx, fromBlock, toBlock, db, blockReader, failFast, RCacheNoDupsRange)
4747
}
4848

49-
func RCacheNoDupsRange(ctx context.Context, fromBlock, toBlock uint64, tx kv.TemporalTx, blockReader services.FullBlockReader, failFast bool) (err error) {
49+
func RCacheNoDupsRange(ctx context.Context, fromBlock, toBlock uint64, db kv.TemporalRoDB, blockReader services.FullBlockReader, failFast bool) (err error) {
50+
tx, err := db.BeginTemporalRo(ctx)
51+
if err != nil {
52+
return err
53+
}
54+
defer tx.Rollback()
5055
txNumsReader := blockReader.TxnumReader(ctx)
5156
fromTxNum, err := txNumsReader.Min(tx, fromBlock)
5257
if err != nil {
@@ -126,7 +131,7 @@ func RCacheNoDupsRange(ctx context.Context, fromBlock, toBlock uint64, tx kv.Tem
126131
return nil
127132
}
128133

129-
type chunkFn func(ctx context.Context, fromBlock, toBlock uint64, tx kv.TemporalTx, blockReader services.FullBlockReader, failFast bool) error
134+
type chunkFn func(ctx context.Context, fromBlock, toBlock uint64, db kv.TemporalRoDB, blockReader services.FullBlockReader, failFast bool) error
130135

131136
func parallelChunkCheck(ctx context.Context, fromBlock, toBlock uint64, db kv.TemporalRoDB, blockReader services.FullBlockReader, failFast bool, fn chunkFn) (err error) {
132137
blockRange := toBlock - fromBlock + 1
@@ -167,13 +172,7 @@ func parallelChunkCheck(ctx context.Context, fromBlock, toBlock uint64, db kv.Te
167172
chunkEnd := end // Capture loop variable
168173

169174
g.Go(func() error {
170-
tx, err := db.BeginTemporalRo(ctx)
171-
if err != nil {
172-
return err
173-
}
174-
defer tx.Rollback()
175-
176-
chunkErr := fn(ctx, chunkStart, chunkEnd, tx, blockReader, failFast)
175+
chunkErr := fn(ctx, chunkStart, chunkEnd, db, blockReader, failFast)
177176
if chunkErr != nil {
178177
return chunkErr
179178
}

db/integrity/receipts_no_duplicates.go

Lines changed: 125 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package integrity
22

33
import (
44
"context"
5+
"encoding/binary"
56
"fmt"
67

78
"github.com/erigontech/erigon/common/log/v3"
@@ -38,7 +39,12 @@ func CheckReceiptsNoDups(ctx context.Context, db kv.TemporalRoDB, blockReader se
3839
return parallelChunkCheck(ctx, fromBlock, toBlock, db, blockReader, failFast, ReceiptsNoDupsRange)
3940
}
4041

41-
func ReceiptsNoDupsRange(ctx context.Context, fromBlock, toBlock uint64, tx kv.TemporalTx, blockReader services.FullBlockReader, failFast bool) (err error) {
42+
func checkCumGas(ctx context.Context, fromBlock, toBlock uint64, db kv.TemporalRoDB, blockReader services.FullBlockReader, failFast bool) (err error) {
43+
tx, err := db.BeginTemporalRo(ctx)
44+
if err != nil {
45+
return err
46+
}
47+
defer tx.Rollback()
4248
txNumsReader := blockReader.TxnumReader(ctx)
4349
fromTxNum, err := txNumsReader.Min(tx, fromBlock)
4450
if err != nil {
@@ -54,49 +60,134 @@ func ReceiptsNoDupsRange(ctx context.Context, fromBlock, toBlock uint64, tx kv.T
5460
return err
5561
}
5662

57-
prevCumUsedGas := -1
58-
prevLogIdxAfterTx := uint32(0)
63+
prevCumGasUsed := -1
5964
blockNum := fromBlock
6065
var _min, _max uint64
6166
_min = fromTxNum
6267
_max, _ = txNumsReader.Max(tx, fromBlock)
63-
for txNum := fromTxNum; txNum <= toTxNum; txNum++ {
64-
cumUsedGas, _, logIdxAfterTx, err := rawtemporaldb.ReceiptAsOf(tx, txNum+1)
68+
69+
cumGasTx, err := db.BeginTemporalRo(ctx)
70+
if err != nil {
71+
return err
72+
}
73+
defer cumGasTx.Rollback()
74+
cumGasIt, err := cumGasTx.Debug().TraceKey(kv.ReceiptDomain, rawtemporaldb.CumulativeGasUsedInBlockKey, fromTxNum, toTxNum+1)
75+
if err != nil {
76+
return err
77+
}
78+
defer cumGasIt.Close()
79+
80+
for cumGasIt.HasNext() {
81+
txNum, v, err := cumGasIt.Next()
6582
if err != nil {
6683
return err
6784
}
85+
cumGasUsed := uvarint(v)
86+
blockChanged := false
6887

69-
blockChanged := txNum == _min
88+
for txNum >= _max {
89+
blockNum++
90+
_min = _max + 1
91+
_max, _ = txNumsReader.Max(tx, blockNum)
92+
blockChanged = true
93+
}
94+
//fmt.Println("txNum:", txNum, "cumGasUsed:", cumGasUsed)
7095
if blockChanged {
71-
prevCumUsedGas = 0
72-
prevLogIdxAfterTx = 0
96+
prevCumGasUsed = 0
7397
}
7498

75-
strongMonotonicCumGasUsed := int(cumUsedGas) > prevCumUsedGas
76-
if !strongMonotonicCumGasUsed && txNum != _min && txNum != _max { // system tx can be skipped
77-
err := fmt.Errorf("CheckReceiptsNoDups: non-monotonic cumGasUsed at txnum: %d, block: %d(%d-%d), cumGasUsed=%d, prevCumGasUsed=%d", txNum, blockNum, _min, _max, cumUsedGas, prevCumUsedGas)
99+
strongMonotonicCumGasUsed := int(cumGasUsed) > prevCumGasUsed
100+
if !strongMonotonicCumGasUsed && !blockChanged { // system tx can be skipped
101+
err := fmt.Errorf("CheckReceiptsNoDups: non-monotonic cumGasUsed at txnum: %d, block: %d(%d-%d), cumGasUsed=%d, prevCumGasUsed=%d", txNum, blockNum, _min, _max, cumGasUsed, prevCumGasUsed)
78102
if failFast {
79103
return err
80104
}
81105
log.Error(err.Error())
82106
}
83107

108+
if !blockChanged {
109+
prevCumGasUsed = int(cumGasUsed)
110+
}
111+
112+
if txNum%1000 == 0 {
113+
select {
114+
case <-ctx.Done():
115+
return ctx.Err()
116+
default:
117+
}
118+
}
119+
}
120+
return nil
121+
}
122+
123+
func checkLogIdx(ctx context.Context, fromBlock, toBlock uint64, db kv.TemporalRoDB, blockReader services.FullBlockReader, failFast bool) (err error) {
124+
tx, err := db.BeginTemporalRo(ctx)
125+
if err != nil {
126+
return err
127+
}
128+
defer tx.Rollback()
129+
txNumsReader := blockReader.TxnumReader(ctx)
130+
fromTxNum, err := txNumsReader.Min(tx, fromBlock)
131+
if err != nil {
132+
return err
133+
}
134+
135+
if toBlock > 0 {
136+
toBlock-- // [fromBlock,toBlock)
137+
}
138+
139+
toTxNum, err := txNumsReader.Max(tx, toBlock)
140+
if err != nil {
141+
return err
142+
}
143+
144+
prevLogIdxAfterTx := uint32(0)
145+
blockNum := fromBlock
146+
var _min, _max uint64
147+
_min = fromTxNum
148+
_max, _ = txNumsReader.Max(tx, fromBlock)
149+
150+
logIdxAfterTxTx, err := db.BeginTemporalRo(ctx)
151+
if err != nil {
152+
return err
153+
}
154+
defer logIdxAfterTxTx.Rollback()
155+
156+
logIdxAfterTxIt, err := logIdxAfterTxTx.Debug().TraceKey(kv.ReceiptDomain, rawtemporaldb.LogIndexAfterTxKey, fromTxNum, toTxNum+1)
157+
if err != nil {
158+
return err
159+
}
160+
defer logIdxAfterTxIt.Close()
161+
for logIdxAfterTxIt.HasNext() {
162+
txNum, v, err := logIdxAfterTxIt.Next()
163+
if err != nil {
164+
return err
165+
}
166+
logIdxAfterTx := uint32(uvarint(v))
167+
blockChanged := false
168+
169+
for txNum >= _max {
170+
blockNum++
171+
_min, _ = txNumsReader.Min(tx, blockNum)
172+
_max, _ = txNumsReader.Max(tx, blockNum)
173+
blockChanged = true
174+
}
175+
176+
if blockChanged {
177+
prevLogIdxAfterTx = 0
178+
}
179+
84180
monotonicLogIdx := logIdxAfterTx >= prevLogIdxAfterTx
85-
if !monotonicLogIdx && txNum != _min && txNum != _max {
181+
if !monotonicLogIdx && !blockChanged {
86182
err := fmt.Errorf("CheckReceiptsNoDups: non-monotonic logIndex at txnum: %d, block: %d(%d-%d), logIdxAfterTx=%d, prevLogIdxAfterTx=%d", txNum, blockNum, _min, _max, logIdxAfterTx, prevLogIdxAfterTx)
87183
if failFast {
88184
return err
89185
}
90186
log.Error(err.Error())
91187
}
92188

93-
prevCumUsedGas = int(cumUsedGas)
94-
prevLogIdxAfterTx = logIdxAfterTx
95-
96-
if txNum == _max {
97-
blockNum++
98-
_min = _max + 1
99-
_max, _ = txNumsReader.Max(tx, blockNum)
189+
if !blockChanged {
190+
prevLogIdxAfterTx = logIdxAfterTx
100191
}
101192

102193
if txNum%1000 == 0 {
@@ -110,6 +201,16 @@ func ReceiptsNoDupsRange(ctx context.Context, fromBlock, toBlock uint64, tx kv.T
110201
return nil
111202
}
112203

204+
func ReceiptsNoDupsRange(ctx context.Context, fromBlock, toBlock uint64, db kv.TemporalRoDB, blockReader services.FullBlockReader, failFast bool) (err error) {
205+
if err := checkCumGas(ctx, fromBlock, toBlock, db, blockReader, failFast); err != nil {
206+
return err
207+
}
208+
if err := checkLogIdx(ctx, fromBlock, toBlock, db, blockReader, failFast); err != nil {
209+
return err
210+
}
211+
return nil
212+
}
213+
113214
func ValidateDomainProgress(db kv.TemporalRoDB, domain kv.Domain, txNumsReader rawdbv3.TxNumsReader) (err error) {
114215
tx, err := db.BeginTemporalRo(context.Background())
115216
if err != nil {
@@ -145,3 +246,8 @@ func ValidateDomainProgress(db kv.TemporalRoDB, domain kv.Domain, txNumsReader r
145246
// }
146247
return nil
147248
}
249+
250+
func uvarint(in []byte) (res uint64) {
251+
res, _ = binary.Uvarint(in)
252+
return res
253+
}

db/state/history_stream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -749,7 +749,12 @@ func (ht *HistoryTraceKeyFiles) advance() error {
749749
continue
750750
}
751751
getter.Reset(offset)
752-
getter.Next(ht.efbuf[:0]) // skip key
752+
gkey, _ := getter.Next(ht.efbuf[:0]) // skip key
753+
if !bytes.Equal(gkey, ht.key) {
754+
ht.logger.Debug("weird thing - key mismatch for %s in file %s", hexutil.Encode(ht.key), item.src.decompressor.FileName())
755+
moveToNextFileFn()
756+
continue
757+
}
753758
ht.efbuf, _ = getter.Next(ht.efbuf[:0])
754759
currSeq := multiencseq.ReadMultiEncSeq(item.startTxNum, ht.efbuf)
755760
ht.seqItr = currSeq.Iterator(int(ht.fromTxNum))

execution/stagedsync/stage_custom_trace.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func customTraceBatchProduce(ctx context.Context, produce Produce, cfg *exec.Exe
246246

247247
//asserts
248248
if produce.ReceiptDomain {
249-
if err = AssertReceipts(ctx, cfg, tx, fromBlock, toBlock); err != nil {
249+
if err = AssertReceipts(ctx, cfg, db, fromBlock, toBlock); err != nil {
250250
return err
251251
}
252252
}
@@ -287,14 +287,14 @@ func customTraceBatchProduce(ctx context.Context, produce Produce, cfg *exec.Exe
287287
return nil
288288
}
289289

290-
func AssertReceipts(ctx context.Context, cfg *exec.ExecArgs, tx kv.TemporalTx, fromBlock, toBlock uint64) (err error) {
290+
func AssertReceipts(ctx context.Context, cfg *exec.ExecArgs, db kv.TemporalRoDB, fromBlock, toBlock uint64) (err error) {
291291
if !dbg.AssertEnabled {
292292
return
293293
}
294294
if cfg.ChainConfig.Bor != nil { //TODO: enable me
295295
return nil
296296
}
297-
return integrity.ReceiptsNoDupsRange(ctx, fromBlock, toBlock, tx, cfg.BlockReader, true)
297+
return integrity.ReceiptsNoDupsRange(ctx, fromBlock, toBlock, db, cfg.BlockReader, true)
298298
}
299299

300300
func customTraceBatch(ctx context.Context, produce Produce, cfg *exec.ExecArgs, tx kv.TemporalRwTx, doms *execctx.SharedDomains, fromBlock, toBlock uint64, logPrefix string, logger log.Logger) error {

0 commit comments

Comments
 (0)