Skip to content

Commit d23ad1a

Browse files
committed
refactor(executionlayer/reorg): separate reorg process + tests
* better cache * review
1 parent 5a9ffb2 commit d23ad1a

File tree

28 files changed

+1358
-520
lines changed

28 files changed

+1358
-520
lines changed

backend/cmd/eth1indexer/main.go

Lines changed: 52 additions & 190 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,13 @@
11
package eth1indexer
22

33
import (
4-
"bytes"
54
"context"
65
"encoding/json"
76
"flag"
87
"fmt"
9-
"math/big"
108
"net/http"
119
"os"
1210
"strconv"
13-
"sync/atomic"
1411
"time"
1512

1613
"github.com/gobitfly/beaconchain/pkg/commons/db"
@@ -29,8 +26,8 @@ import (
2926

3027
"github.com/coocood/freecache"
3128
"github.com/ethereum/go-ethereum/common"
29+
"github.com/go-redis/redis/v8"
3230
_ "github.com/jackc/pgx/v5/stdlib"
33-
"golang.org/x/sync/errgroup"
3431

3532
//nolint:gosec
3633
_ "net/http/pprof"
@@ -39,24 +36,24 @@ import (
3936
func Run() {
4037
fs := flag.NewFlagSet("fs", flag.ExitOnError)
4138
erigonEndpoint := fs.String("erigon", "", "Erigon archive node endpoint")
42-
block := fs.Int64("block", 0, "Index a specific block")
39+
block := fs.Uint64("block", 0, "Index a specific block")
4340

44-
reorgDepth := fs.Int("reorg.depth", 20, "Lookback to check and handle chain reorgs")
41+
reorgDepth := fs.Uint64("reorg.depth", 20, "Lookback to check and handle chain reorgs")
4542

46-
concurrencyBlocks := fs.Int64("blocks.concurrency", 30, "Concurrency to use when indexing blocks from erigon")
47-
startBlocks := fs.Int64("blocks.start", 0, "Block to start indexing")
48-
endBlocks := fs.Int64("blocks.end", 0, "Block to finish indexing")
49-
bulkBlocks := fs.Int64("blocks.bulk", 8000, "Maximum number of blocks to be processed before saving")
50-
offsetBlocks := fs.Int64("blocks.offset", 100, "Blocks offset")
43+
concurrencyBlocks := fs.Uint64("blocks.concurrency", 30, "Concurrency to use when indexing blocks from erigon")
44+
startBlocks := fs.Uint64("blocks.start", 0, "Block to start indexing")
45+
endBlocks := fs.Uint64("blocks.end", 0, "Block to finish indexing")
46+
bulkBlocks := fs.Uint64("blocks.bulk", 8000, "Maximum number of blocks to be processed before saving")
47+
offsetBlocks := fs.Uint64("blocks.offset", 100, "Blocks offset")
5148
checkBlocksGaps := fs.Bool("blocks.gaps", false, "Check for gaps in the blocks table")
5249
checkBlocksGapsLookback := fs.Int("blocks.gaps.lookback", 1000000, "Lookback for gaps check of the blocks table")
5350
traceMode := fs.String("blocks.tracemode", "parity/geth", "Trace mode to use, can be either 'parity', 'geth' or 'parity/geth' for both")
5451

55-
concurrencyData := fs.Int64("data.concurrency", 30, "Concurrency to use when indexing data from bigtable")
56-
startData := fs.Int64("data.start", 0, "Block to start indexing")
57-
endData := fs.Int64("data.end", 0, "Block to finish indexing")
58-
bulkData := fs.Int64("data.bulk", 8000, "Maximum number of blocks to be processed before saving")
59-
offsetData := fs.Int64("data.offset", 1000, "Data offset")
52+
concurrencyData := fs.Uint64("data.concurrency", 30, "Concurrency to use when indexing data from bigtable")
53+
startData := fs.Uint64("data.start", 0, "Block to start indexing")
54+
endData := fs.Uint64("data.end", 0, "Block to finish indexing")
55+
bulkData := fs.Uint64("data.bulk", 8000, "Maximum number of blocks to be processed before saving")
56+
offsetData := fs.Uint64("data.offset", 1000, "Data offset")
6057
checkDataGaps := fs.Bool("data.gaps", false, "Check for gaps in the data table")
6158
checkDataGapsLookback := fs.Int("data.gaps.lookback", 1000000, "Lookback for gaps check of the data table")
6259

@@ -124,6 +121,11 @@ func Run() {
124121
defer db.ReaderDb.Close()
125122
defer db.WriterDb.Close()
126123

124+
redisClient := redis.NewClient(&redis.Options{
125+
Addr: utils.Config.RedisCacheEndpoint,
126+
ReadTimeout: time.Second * 20,
127+
})
128+
127129
if erigonEndpoint == nil || *erigonEndpoint == "" {
128130
if utils.Config.Eth1ErigonEndpoint == "" {
129131
log.Fatal(nil, "no erigon node url provided", 0)
@@ -162,7 +164,7 @@ func Run() {
162164
}
163165

164166
cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
165-
store := db2.NewStoreV1FromBigtable(bigtable, cache)
167+
store := db2.NewStoreV1FromBigtable(bigtable, database.FreeCache{Cache: cache})
166168

167169
batcherConfig := evm.BatcherConfig{
168170
Limit: utils.Config.Indexer.BatchLimit,
@@ -199,21 +201,21 @@ func Run() {
199201
go ImportEnsUpdatesLoop(bt, client, *ensBatchSize)
200202
}
201203

202-
indexer := executionlayer.NewIndexer(store, executionlayer.AllTransformers...)
204+
lastBlockStore := db2.NewCachedLastBlocks(database.Redis{Client: redisClient}, store)
205+
indexer := executionlayer.NewIndexer(store, lastBlockStore, executionlayer.AllTransformers...)
203206
balanceUpdater := executionlayer.NewBalanceUpdater(chainId, store, store, batcher)
207+
reorgWatcher := executionlayer.NewReorgWatcher(client.GetNativeClient(), store, *reorgDepth, chainId, lastBlockStore)
204208

205209
if *enableFullBalanceUpdater {
206210
ProcessBalanceUpdates(balanceUpdater, *balanceUpdaterBatchSize, -1)
207211
return
208212
}
209213

210214
if *block != 0 {
211-
err = IndexFromNode(bt, client, *block, *block, *concurrencyBlocks, *traceMode)
212-
if err != nil {
215+
if err := indexer.IndexNode(chainId, client, *block, *block, *concurrencyBlocks, *traceMode); err != nil {
213216
log.Fatal(err, "error indexing from node", 0, map[string]interface{}{"block": *block, "concurrency": *concurrencyBlocks})
214217
}
215-
err = bt.IndexEventsWithIndexer(*block, *block, indexer, *concurrencyData)
216-
if err != nil {
218+
if err := indexer.IndexEvents(chainId, *block, *block, *concurrencyData); err != nil {
217219
log.Fatal(err, "error indexing from bigtable", 0)
218220
}
219221
cache.Clear()
@@ -240,26 +242,23 @@ func Run() {
240242
}
241243

242244
if *endBlocks != 0 && *startBlocks < *endBlocks {
243-
err = IndexFromNode(bt, client, *startBlocks, *endBlocks, *concurrencyBlocks, *traceMode)
244-
if err != nil {
245+
if err = indexer.IndexNode(chainId, client, *startBlocks, *endBlocks, *concurrencyBlocks, *traceMode); err != nil {
245246
log.Fatal(err, "error indexing from node", 0, map[string]interface{}{"start": *startBlocks, "end": *endBlocks, "concurrency": *concurrencyBlocks})
246247
}
247248
return
248249
}
249250

250251
if *endData != 0 && *startData < *endData {
251-
err = bt.IndexEventsWithIndexer(*startData, *endData, indexer, *concurrencyData)
252-
if err != nil {
252+
if err := indexer.IndexEvents(chainId, *startData, *endData, *concurrencyData); err != nil {
253253
log.Fatal(err, "error indexing from bigtable", 0)
254254
}
255255
cache.Clear()
256256
return
257257
}
258258

259-
lastSuccessulBlockIndexingTs := time.Now()
259+
lastSuccessfulBlockIndexingTs := time.Now()
260260
for ; ; time.Sleep(time.Second * 14) {
261-
err := HandleChainReorgs(bt, client, *reorgDepth)
262-
if err != nil {
261+
if err := reorgWatcher.LookForReorg(); err != nil {
263262
log.Error(err, "error handling chain reorg", 0)
264263
continue
265264
}
@@ -270,13 +269,13 @@ func Run() {
270269
continue
271270
}
272271

273-
lastBlockFromBlocksTable, err := bt.GetLastBlockInBlocksTable()
272+
lastBlockFromBlocksTable, err := lastBlockStore.GetInBlocksTable(chainId)
274273
if err != nil {
275274
log.Error(err, "error retrieving last blocks from blocks table", 0)
276275
continue
277276
}
278277

279-
lastBlockFromDataTable, err := bt.GetLastBlockInDataTable()
278+
lastBlockFromDataTable, err := lastBlockStore.GetInDataTable(chainId)
280279
if err != nil {
281280
log.Error(err, "error retrieving last blocks from data table", 0)
282281
continue
@@ -290,40 +289,40 @@ func Run() {
290289

291290
continueAfterError := false
292291
if lastBlockFromNode > 0 {
293-
if lastBlockFromBlocksTable < int(lastBlockFromNode) {
292+
if lastBlockFromBlocksTable < lastBlockFromNode {
294293
log.Infof("missing blocks %v to %v in blocks table, indexing ...", lastBlockFromBlocksTable+1, lastBlockFromNode)
295294

296-
startBlock := int64(lastBlockFromBlocksTable+1) - *offsetBlocks
297-
if startBlock < 0 {
298-
startBlock = 0
295+
startBlock := uint64(0)
296+
if int64(lastBlockFromDataTable+1-*offsetData) > 0 {
297+
startBlock = lastBlockFromBlocksTable + 1 - *offsetBlocks
299298
}
300299

301-
if *bulkBlocks <= 0 || *bulkBlocks > int64(lastBlockFromNode)-startBlock+1 {
302-
*bulkBlocks = int64(lastBlockFromNode) - startBlock + 1
300+
if *bulkBlocks <= 0 || *bulkBlocks > lastBlockFromNode-startBlock+1 {
301+
*bulkBlocks = lastBlockFromNode - startBlock + 1
303302
}
304303

305-
for startBlock <= int64(lastBlockFromNode) && !continueAfterError {
304+
for startBlock <= lastBlockFromNode && !continueAfterError {
306305
endBlock := startBlock + *bulkBlocks - 1
307-
if endBlock > int64(lastBlockFromNode) {
308-
endBlock = int64(lastBlockFromNode)
306+
if endBlock > lastBlockFromNode {
307+
endBlock = lastBlockFromNode
309308
}
310309

311-
err = IndexFromNode(bt, client, startBlock, endBlock, *concurrencyBlocks, *traceMode)
310+
err = indexer.IndexNode(chainId, client, startBlock, endBlock, *concurrencyBlocks, *traceMode)
312311
if err != nil {
313312
errMsg := "error indexing from node"
314313
errFields := map[string]interface{}{
315314
"start": startBlock,
316315
"end": endBlock,
317316
"concurrency": *concurrencyBlocks}
318-
if time.Since(lastSuccessulBlockIndexingTs) > time.Minute*30 {
317+
if time.Since(lastSuccessfulBlockIndexingTs) > time.Minute*30 {
319318
log.Fatal(err, errMsg, 0, errFields)
320319
} else {
321320
log.Error(err, errMsg, 0, errFields)
322321
}
323322
continueAfterError = true
324323
continue
325324
} else {
326-
lastSuccessulBlockIndexingTs = time.Now()
325+
lastSuccessfulBlockIndexingTs = time.Now()
327326
}
328327

329328
startBlock = endBlock + 1
@@ -333,26 +332,25 @@ func Run() {
333332
}
334333
}
335334

336-
if lastBlockFromDataTable < int(lastBlockFromNode) {
335+
if lastBlockFromDataTable < lastBlockFromNode {
337336
log.Infof("missing blocks %v to %v in data table, indexing ...", lastBlockFromDataTable+1, lastBlockFromNode)
338337

339-
startBlock := int64(lastBlockFromDataTable+1) - *offsetData
340-
if startBlock < 0 {
341-
startBlock = 0
338+
startBlock := uint64(0)
339+
if int64(lastBlockFromDataTable+1-*offsetData) > 0 {
340+
startBlock = lastBlockFromDataTable + 1 - *offsetData
342341
}
343342

344-
if *bulkData <= 0 || *bulkData > int64(lastBlockFromNode)-startBlock+1 {
345-
*bulkData = int64(lastBlockFromNode) - startBlock + 1
343+
if *bulkData <= 0 || *bulkData > lastBlockFromNode-startBlock+1 {
344+
*bulkData = lastBlockFromNode - startBlock + 1
346345
}
347346

348-
for startBlock <= int64(lastBlockFromNode) && !continueAfterError {
347+
for startBlock <= lastBlockFromNode && !continueAfterError {
349348
endBlock := startBlock + *bulkData - 1
350-
if endBlock > int64(lastBlockFromNode) {
351-
endBlock = int64(lastBlockFromNode)
349+
if endBlock > lastBlockFromNode {
350+
endBlock = lastBlockFromNode
352351
}
353352

354-
err = bt.IndexEventsWithIndexer(startBlock, endBlock, indexer, *concurrencyData)
355-
if err != nil {
353+
if err := indexer.IndexEvents(chainId, startBlock, endBlock, *concurrencyBlocks); err != nil {
356354
log.Error(err, "error indexing from bigtable", 0, map[string]interface{}{"start": startBlock, "end": endBlock, "concurrency": *concurrencyData})
357355
cache.Clear()
358356
continueAfterError = true
@@ -404,73 +402,6 @@ func readTokenListFile(path string) (erc20.ERC20TokenList, error) {
404402
return tokenList, nil
405403
}
406404

407-
func HandleChainReorgs(bt *db.Bigtable, client *rpc.ErigonClient, depth int) error {
408-
ctx := context.Background()
409-
// get latest block from the node
410-
latestNodeBlock, err := client.GetNativeClient().BlockByNumber(ctx, nil)
411-
if err != nil {
412-
return err
413-
}
414-
latestNodeBlockNumber := latestNodeBlock.NumberU64()
415-
416-
// for each block check if block node hash and block db hash match
417-
if depth > int(latestNodeBlockNumber) {
418-
depth = int(latestNodeBlockNumber)
419-
}
420-
for i := latestNodeBlockNumber - uint64(depth); i <= latestNodeBlockNumber; i++ {
421-
nodeBlock, err := client.GetNativeClient().HeaderByNumber(ctx, big.NewInt(int64(i)))
422-
if err != nil {
423-
return err
424-
}
425-
426-
dbBlock, err := bt.GetBlockFromBlocksTable(i)
427-
if err != nil {
428-
if err == db.ErrBlockNotFound { // exit if we hit a block that is not yet in the db
429-
return nil
430-
}
431-
return err
432-
}
433-
434-
if !bytes.Equal(nodeBlock.Hash().Bytes(), dbBlock.Hash) {
435-
log.Warnf("found inconsistency at height %v, node block hash: %x, db block hash: %x", i, nodeBlock.Hash().Bytes(), dbBlock.Hash)
436-
437-
// first we set the cached marker of the last block in the blocks/data table to the block prior to the forked one
438-
if i > 0 {
439-
previousBlock := i - 1
440-
err := bt.SetLastBlockInBlocksTable(int64(previousBlock))
441-
if err != nil {
442-
return fmt.Errorf("error setting last block [%v] in blocks table: %w", previousBlock, err)
443-
}
444-
err = bt.SetLastBlockInDataTable(int64(previousBlock))
445-
if err != nil {
446-
return fmt.Errorf("error setting last block [%v] in data table: %w", previousBlock, err)
447-
}
448-
// now we can proceed to delete all blocks including and after the forked block
449-
}
450-
// delete all blocks starting from the fork block up to the latest block in the db
451-
for j := i; j <= latestNodeBlockNumber; j++ {
452-
dbBlock, err := bt.GetBlockFromBlocksTable(j)
453-
if err != nil {
454-
if err == db.ErrBlockNotFound { // exit if we hit a block that is not yet in the db
455-
return nil
456-
}
457-
return err
458-
}
459-
log.Infof("deleting block at height %v with hash %x", dbBlock.Number, dbBlock.Hash)
460-
461-
err = bt.DeleteBlock(dbBlock.Number, dbBlock.Hash)
462-
if err != nil {
463-
return err
464-
}
465-
}
466-
} else {
467-
log.Infof("height %v, node block hash: %x, db block hash: %x", i, nodeBlock.Hash().Bytes(), dbBlock.Hash)
468-
}
469-
}
470-
471-
return nil
472-
}
473-
474405
// ProcessBalanceUpdates will use the balanceUpdater to fetch and update the balances
475406
// if iterations == -1 it will run forever
476407
func ProcessBalanceUpdates(balanceUpdater executionlayer.BalanceUpdater, batchSize int, iterations int) {
@@ -484,72 +415,3 @@ func ProcessBalanceUpdates(balanceUpdater executionlayer.BalanceUpdater, batchSi
484415
log.Infof("retrieved %v balances in %v, currently at %s", len(balances), time.Since(start), balances[len(balances)-1].Address)
485416
}
486417
}
487-
488-
func IndexFromNode(bt *db.Bigtable, client *rpc.ErigonClient, start, end, concurrency int64, traceMode string) error {
489-
ctx := context.Background()
490-
g, gCtx := errgroup.WithContext(ctx)
491-
g.SetLimit(int(concurrency))
492-
493-
startTs := time.Now()
494-
lastTickTs := time.Now()
495-
496-
processedBlocks := int64(0)
497-
498-
for i := start; i <= end; i++ {
499-
i := i
500-
g.Go(func() error {
501-
select {
502-
case <-gCtx.Done():
503-
return gCtx.Err()
504-
default:
505-
}
506-
507-
blockStartTs := time.Now()
508-
bc, timings, err := client.GetBlock(i, traceMode)
509-
if err != nil {
510-
return fmt.Errorf("error getting block: %v from ethereum node err: %w", i, err)
511-
}
512-
513-
dbStart := time.Now()
514-
err = bt.SaveBlock(bc)
515-
if err != nil {
516-
return fmt.Errorf("error saving block: %v to bigtable: %w", i, err)
517-
}
518-
current := atomic.AddInt64(&processedBlocks, 1)
519-
if current%100 == 0 {
520-
r := end - start
521-
if r == 0 {
522-
r = 1
523-
}
524-
perc := float64(i-start) * 100 / float64(r)
525-
526-
log.Infof("retrieved & saved block %v (0x%x) in %v (header: %v, receipts: %v, traces: %v, db: %v)", bc.Number, bc.Hash, time.Since(blockStartTs), timings.Headers, timings.Receipts, timings.Traces, time.Since(dbStart))
527-
log.Infof("processed %v blocks in %v (%.1f blocks / sec); sync is %.1f%% complete", current, time.Since(startTs), float64((current))/time.Since(lastTickTs).Seconds(), perc)
528-
529-
lastTickTs = time.Now()
530-
atomic.StoreInt64(&processedBlocks, 0)
531-
}
532-
return nil
533-
})
534-
}
535-
536-
err := g.Wait()
537-
538-
if err != nil {
539-
return err
540-
}
541-
542-
lastBlockInCache, err := bt.GetLastBlockInBlocksTable()
543-
if err != nil {
544-
return err
545-
}
546-
547-
if end > int64(lastBlockInCache) {
548-
err := bt.SetLastBlockInBlocksTable(end)
549-
550-
if err != nil {
551-
return err
552-
}
553-
}
554-
return nil
555-
}

0 commit comments

Comments
 (0)