Skip to content

Commit 87dbfb6

Browse files
feat(execution layer): logging and better architecture (#1425)
* feat(executionlayer/indexer): new struct to coordinate + logs * chore(ci): remove depguard on logrus * feat(executionlayer): better separation * change config.validate to config.init * feat(execution layer): clearer config
1 parent b365fe2 commit 87dbfb6

File tree

28 files changed

+749
-360
lines changed

28 files changed

+749
-360
lines changed

backend/.golangci.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,6 @@ linters-settings:
133133
allow:
134134
- $gostd
135135
deny:
136-
- pkg: github.com/sirupsen/logrus
137-
desc: "Do not use an external logging library, use pkg/commons/log/log instead"
138136
- pkg: log
139137
desc: "Do not use an external logging library, use pkg/commons/log/log instead"
140138
- pkg: log/slog

backend/cmd/eth1indexer/main.go

Lines changed: 55 additions & 246 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package eth1indexer
22

33
import (
44
"context"
5-
"encoding/json"
65
"flag"
76
"fmt"
87
"net/http"
@@ -13,16 +12,13 @@ import (
1312
"github.com/gobitfly/beaconchain/pkg/commons/db"
1413
"github.com/gobitfly/beaconchain/pkg/commons/db2"
1514
"github.com/gobitfly/beaconchain/pkg/commons/db2/database"
16-
"github.com/gobitfly/beaconchain/pkg/commons/erc20"
1715
"github.com/gobitfly/beaconchain/pkg/commons/log"
1816
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
1917
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
20-
"github.com/gobitfly/beaconchain/pkg/commons/services"
2118
"github.com/gobitfly/beaconchain/pkg/commons/types"
2219
"github.com/gobitfly/beaconchain/pkg/commons/utils"
2320
"github.com/gobitfly/beaconchain/pkg/commons/version"
2421
"github.com/gobitfly/beaconchain/pkg/executionlayer"
25-
"github.com/gobitfly/beaconchain/pkg/executionlayer/evm"
2622

2723
"github.com/coocood/freecache"
2824
"github.com/ethereum/go-ethereum/common"
@@ -38,39 +34,37 @@ func Run() {
3834
erigonEndpoint := fs.String("erigon", "", "Erigon archive node endpoint")
3935
block := fs.Uint64("block", 0, "Index a specific block")
4036

41-
reorgDepth := fs.Uint64("reorg.depth", 20, "Lookback to check and handle chain reorgs")
37+
reorgDepth := fs.Uint64("reorg.depth", executionlayer.DefaultConfig.Reorg.Depth, "Lookback to check and handle chain reorgs")
38+
39+
bulk := fs.Uint64("bulk", executionlayer.DefaultConfig.Indexer.Bulk, "Maximum number of blocks to be processed before saving")
40+
concurrency := fs.Uint64("concurrency", executionlayer.DefaultConfig.BlockIndexer.Concurrency, "Concurrency to use when indexing blocks from erigon")
41+
blockFrequency := fs.Duration("frequency", executionlayer.DefaultConfig.Service.BlockFrequency, "Frequency used to look for blocks")
42+
43+
traceMode := fs.String("blocks.tracemode", executionlayer.DefaultConfig.BlockIndexer.TraceMode, "Trace mode to use, can be either 'parity', 'geth' or 'parity/geth' for both")
44+
45+
startBlocks := fs.Uint64("resync.start", 0, "Block to start indexing")
46+
endBlocks := fs.Uint64("resync.end", 0, "Block to finish indexing")
47+
skipBlocks := fs.Bool("resync.skipBlocks", false, "Skip resync for blocks table")
48+
skipData := fs.Bool("resync.skipData", false, "Skip resync for data table")
4249

43-
concurrencyBlocks := fs.Uint64("blocks.concurrency", 30, "Concurrency to use when indexing blocks from erigon")
44-
startBlocks := fs.Uint64("blocks.start", 0, "Block to start indexing")
45-
endBlocks := fs.Uint64("blocks.end", 0, "Block to finish indexing")
46-
bulkBlocks := fs.Uint64("blocks.bulk", 8000, "Maximum number of blocks to be processed before saving")
47-
offsetBlocks := fs.Uint64("blocks.offset", 100, "Blocks offset")
4850
checkBlocksGaps := fs.Bool("blocks.gaps", false, "Check for gaps in the blocks table")
4951
checkBlocksGapsLookback := fs.Int("blocks.gaps.lookback", 1000000, "Lookback for gaps check of the blocks table")
50-
traceMode := fs.String("blocks.tracemode", "parity/geth", "Trace mode to use, can be either 'parity', 'geth' or 'parity/geth' for both")
5152

52-
concurrencyData := fs.Uint64("data.concurrency", 30, "Concurrency to use when indexing data from bigtable")
53-
startData := fs.Uint64("data.start", 0, "Block to start indexing")
54-
endData := fs.Uint64("data.end", 0, "Block to finish indexing")
55-
bulkData := fs.Uint64("data.bulk", 8000, "Maximum number of blocks to be processed before saving")
56-
offsetData := fs.Uint64("data.offset", 1000, "Data offset")
5753
checkDataGaps := fs.Bool("data.gaps", false, "Check for gaps in the data table")
5854
checkDataGapsLookback := fs.Int("data.gaps.lookback", 1000000, "Lookback for gaps check of the data table")
5955

60-
enableBalanceUpdater := fs.Bool("balances.enabled", false, "Enable balance update process")
61-
enableFullBalanceUpdater := fs.Bool("balances.full.enabled", false, "Enable full balance update process")
62-
balanceUpdaterBatchSize := fs.Int("balances.batch", 1000, "Batch size for balance updates")
56+
balanceUpdaterBatchSize := fs.Int64("balances.batch", executionlayer.DefaultConfig.Indexer.BalanceUpdaterBatchSize, "Batch size for balance updates")
6357

6458
tokenPriceExport := fs.Bool("token.price.enabled", false, "Enable token export process")
6559
tokenPriceExportList := fs.String("token.price.list", "", "Tokenlist path to use for the token price export")
66-
tokenPriceExportFrequency := fs.Duration("token.price.frequency", time.Hour, "Token price export interval")
60+
tokenPriceExportFrequency := fs.Duration("token.price.frequency", executionlayer.DefaultConfig.Service.TokenPriceFrequency, "Token price export interval")
6761

6862
versionFlag := fs.Bool("version", false, "Print version and exit")
6963

7064
configPath := fs.String("config", "", "Path to the config file, if empty string defaults will be used")
7165

72-
enableEnsUpdater := fs.Bool("ens.enabled", false, "Enable ens update process")
73-
ensBatchSize := fs.Int64("ens.batch", 200, "Batch size for ens updates")
66+
enableEnsUpdater := fs.Bool("ens.enabled", executionlayer.DefaultConfig.Indexer.EnableENS, "Enable ens update process")
67+
ensBatchSize := fs.Int64("ens.batch", executionlayer.DefaultConfig.Indexer.ENSImportBatchSize, "Batch size for ens updates")
7468

7569
_ = fs.Parse(os.Args[2:])
7670

@@ -141,18 +135,18 @@ func Run() {
141135
log.Fatal(err, "erigon client creation error", 0)
142136
}
143137

144-
chainId := strconv.FormatUint(utils.Config.Chain.ClConfig.DepositChainID, 10)
138+
chainID := strconv.FormatUint(utils.Config.Chain.ClConfig.DepositChainID, 10)
145139

146-
nodeChainId, err := client.GetNativeClient().ChainID(context.Background())
140+
nodeChainID, err := client.GetNativeClient().ChainID(context.Background())
147141
if err != nil {
148142
log.Fatal(err, "node chain id error", 0)
149143
}
150144

151-
if nodeChainId.String() != chainId {
152-
log.Fatal(fmt.Errorf("node chain id mismatch, wanted %v got %v", chainId, nodeChainId.String()), "", 0)
145+
if nodeChainID.String() != chainID {
146+
log.Fatal(fmt.Errorf("node chain id mismatch, wanted %v got %v", chainID, nodeChainID.String()), "", 0)
153147
}
154148

155-
bt, err := db.InitBigtable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, chainId, utils.Config.RedisCacheEndpoint)
149+
bt, err := db.InitBigtable(utils.Config.Bigtable.Project, utils.Config.Bigtable.Instance, chainID, utils.Config.RedisCacheEndpoint)
156150
if err != nil {
157151
log.Fatal(err, "error connecting to bigtable", 0)
158152
}
@@ -164,70 +158,55 @@ func Run() {
164158
}
165159

166160
cache := freecache.NewCache(100 * 1024 * 1024) // 100 MB limit
167-
store := db2.NewStoreV1FromBigtable(bigtable, database.FreeCache{Cache: cache})
161+
store := db2.NewStoreV1FromBigtable(bigtable, db2.CachedBalanceUpdates{RemoteCache: database.FreeCache{Cache: cache}})
162+
163+
config := executionlayer.DefaultConfig
168164

169-
batcherConfig := evm.BatcherConfig{
170-
Limit: utils.Config.Indexer.BatchLimit,
171-
}
172165
if utils.Config.Indexer.MulticallAddresses != "" {
173166
parsed := common.HexToAddress(utils.Config.Indexer.MulticallAddresses)
174-
batcherConfig.MulticallAddress = &parsed
167+
config.Batcher.MulticallAddress = &parsed
175168
}
176-
batcher := evm.NewBatcher(nodeChainId, client.GetNativeClient(), batcherConfig)
177-
178-
if *tokenPriceExport {
179-
go func() {
180-
for {
181-
tokenList, err := readTokenListFile(*tokenPriceExportList)
182-
if err != nil {
183-
log.Error(err, "error reading token list file", 0)
184-
}
185-
pricer := executionlayer.NewTokenPricer(
186-
store,
187-
chainId,
188-
executionlayer.NewLlamaClient(),
189-
tokenList,
190-
batcher,
191-
)
192-
if err := pricer.UpdateTokens(); err != nil {
193-
log.Error(err, "error updating tokens", 0)
194-
}
195-
time.Sleep(*tokenPriceExportFrequency)
196-
}
197-
}()
169+
if utils.Config.Indexer.BatchLimit != 0 {
170+
config.Batcher.Limit = utils.Config.Indexer.BatchLimit
198171
}
172+
config.BlockIndexer.Concurrency = *concurrency
173+
config.BlockIndexer.TraceMode = *traceMode
199174

200-
if *enableEnsUpdater {
201-
importer := executionlayer.NewENSImporter(store, db2.NewENSStore(db.WriterDb), executionlayer.NewEnsContracts(client.GetNativeClient()))
202-
go ImportEnsUpdatesLoop(chainId, importer, *ensBatchSize)
203-
}
175+
config.Reorg.Depth = *reorgDepth
204176

205-
lastBlockStore := db2.NewCachedLastBlocks(database.Redis{Client: redisClient}, store)
206-
indexer := executionlayer.NewIndexer(store, lastBlockStore, executionlayer.AllTransformers...)
207-
balanceUpdater := executionlayer.NewBalanceUpdater(chainId, store, store, batcher)
208-
reorgWatcher := executionlayer.NewReorgWatcher(client.GetNativeClient(), store, *reorgDepth, chainId, lastBlockStore)
177+
config.Indexer.BalanceUpdaterBatchSize = *balanceUpdaterBatchSize
178+
config.Indexer.ENSImportBatchSize = *ensBatchSize
179+
config.Indexer.Bulk = *bulk
180+
config.Indexer.EnableENS = *enableEnsUpdater
209181

210-
if *enableFullBalanceUpdater {
211-
ProcessBalanceUpdates(balanceUpdater, *balanceUpdaterBatchSize, -1)
212-
return
213-
}
182+
config.Service.TokenPriceFrequency = *tokenPriceExportFrequency
183+
config.Service.BlockFrequency = *blockFrequency
214184

185+
service := executionlayer.NewIndexerService(config, store, client, database.FreeCache{Cache: cache}, database.Redis{Client: redisClient})
186+
187+
start, end := uint64(0), uint64(0)
215188
if *block != 0 {
216-
if err := indexer.IndexNode(chainId, client, *block, *block, *concurrencyBlocks, *traceMode); err != nil {
217-
log.Fatal(err, "error indexing from node", 0, map[string]interface{}{"block": *block, "concurrency": *concurrencyBlocks})
218-
}
219-
if err := indexer.IndexEvents(chainId, *block, *block, *concurrencyData); err != nil {
220-
log.Fatal(err, "error indexing from bigtable", 0)
221-
}
222-
cache.Clear()
189+
start, end = *block, *block
190+
}
191+
if startBlocks != nil {
192+
start = *startBlocks
193+
}
194+
if endBlocks != nil {
195+
end = *endBlocks
196+
}
197+
if *tokenPriceExport {
198+
go service.SyncTokenPrice(*tokenPriceExportList)
199+
}
223200

224-
log.Infof("indexing of block %v completed", *block)
201+
if end != 0 {
202+
if err := service.SyncRange(start, end, *skipBlocks, *skipData); err != nil {
203+
log.Fatal(err, "error indexing from node", 0, map[string]interface{}{"start": *startBlocks, "end": *endBlocks, "concurrency": *concurrency})
204+
}
225205
return
226206
}
227207

228208
if *checkBlocksGaps {
229209
_, _, _, err := bt.CheckForGapsInBlocksTable(*checkBlocksGapsLookback)
230-
231210
if err != nil {
232211
log.Fatal(err, "error checking for gaps in blocks table", 0)
233212
}
@@ -242,175 +221,5 @@ func Run() {
242221
return
243222
}
244223

245-
if *endBlocks != 0 && *startBlocks < *endBlocks {
246-
if err = indexer.IndexNode(chainId, client, *startBlocks, *endBlocks, *concurrencyBlocks, *traceMode); err != nil {
247-
log.Fatal(err, "error indexing from node", 0, map[string]interface{}{"start": *startBlocks, "end": *endBlocks, "concurrency": *concurrencyBlocks})
248-
}
249-
return
250-
}
251-
252-
if *endData != 0 && *startData < *endData {
253-
if err := indexer.IndexEvents(chainId, *startData, *endData, *concurrencyData); err != nil {
254-
log.Fatal(err, "error indexing from bigtable", 0)
255-
}
256-
cache.Clear()
257-
return
258-
}
259-
260-
lastSuccessfulBlockIndexingTs := time.Now()
261-
for ; ; time.Sleep(time.Second * 14) {
262-
if err := reorgWatcher.LookForReorg(); err != nil {
263-
log.Error(err, "error handling chain reorg", 0)
264-
continue
265-
}
266-
267-
lastBlockFromNode, err := client.GetLatestEth1BlockNumber()
268-
if err != nil {
269-
log.Error(err, "error retrieving latest eth block number", 0)
270-
continue
271-
}
272-
273-
lastBlockFromBlocksTable, err := lastBlockStore.GetInBlocksTable(chainId)
274-
if err != nil {
275-
log.Error(err, "error retrieving last blocks from blocks table", 0)
276-
continue
277-
}
278-
279-
lastBlockFromDataTable, err := lastBlockStore.GetInDataTable(chainId)
280-
if err != nil {
281-
log.Error(err, "error retrieving last blocks from data table", 0)
282-
continue
283-
}
284-
285-
log.InfoWithFields(log.Fields{
286-
"node": lastBlockFromNode,
287-
"blocks": lastBlockFromBlocksTable,
288-
"data": lastBlockFromDataTable,
289-
}, "last blocks")
290-
291-
continueAfterError := false
292-
if lastBlockFromNode > 0 {
293-
if lastBlockFromBlocksTable < lastBlockFromNode {
294-
log.Infof("missing blocks %v to %v in blocks table, indexing ...", lastBlockFromBlocksTable+1, lastBlockFromNode)
295-
296-
startBlock := uint64(0)
297-
if int64(lastBlockFromDataTable+1-*offsetData) > 0 {
298-
startBlock = lastBlockFromBlocksTable + 1 - *offsetBlocks
299-
}
300-
301-
if *bulkBlocks <= 0 || *bulkBlocks > lastBlockFromNode-startBlock+1 {
302-
*bulkBlocks = lastBlockFromNode - startBlock + 1
303-
}
304-
305-
for startBlock <= lastBlockFromNode && !continueAfterError {
306-
endBlock := startBlock + *bulkBlocks - 1
307-
if endBlock > lastBlockFromNode {
308-
endBlock = lastBlockFromNode
309-
}
310-
311-
err = indexer.IndexNode(chainId, client, startBlock, endBlock, *concurrencyBlocks, *traceMode)
312-
if err != nil {
313-
errMsg := "error indexing from node"
314-
errFields := map[string]interface{}{
315-
"start": startBlock,
316-
"end": endBlock,
317-
"concurrency": *concurrencyBlocks}
318-
if time.Since(lastSuccessfulBlockIndexingTs) > time.Minute*30 {
319-
log.Fatal(err, errMsg, 0, errFields)
320-
} else {
321-
log.Error(err, errMsg, 0, errFields)
322-
}
323-
continueAfterError = true
324-
continue
325-
} else {
326-
lastSuccessfulBlockIndexingTs = time.Now()
327-
}
328-
329-
startBlock = endBlock + 1
330-
}
331-
if continueAfterError {
332-
continue
333-
}
334-
}
335-
336-
if lastBlockFromDataTable < lastBlockFromNode {
337-
log.Infof("missing blocks %v to %v in data table, indexing ...", lastBlockFromDataTable+1, lastBlockFromNode)
338-
339-
startBlock := uint64(0)
340-
if int64(lastBlockFromDataTable+1-*offsetData) > 0 {
341-
startBlock = lastBlockFromDataTable + 1 - *offsetData
342-
}
343-
344-
if *bulkData <= 0 || *bulkData > lastBlockFromNode-startBlock+1 {
345-
*bulkData = lastBlockFromNode - startBlock + 1
346-
}
347-
348-
for startBlock <= lastBlockFromNode && !continueAfterError {
349-
endBlock := startBlock + *bulkData - 1
350-
if endBlock > lastBlockFromNode {
351-
endBlock = lastBlockFromNode
352-
}
353-
354-
if err := indexer.IndexEvents(chainId, startBlock, endBlock, *concurrencyBlocks); err != nil {
355-
log.Error(err, "error indexing from bigtable", 0, map[string]interface{}{"start": startBlock, "end": endBlock, "concurrency": *concurrencyData})
356-
cache.Clear()
357-
continueAfterError = true
358-
continue
359-
}
360-
cache.Clear()
361-
362-
startBlock = endBlock + 1
363-
}
364-
if continueAfterError {
365-
continue
366-
}
367-
}
368-
}
369-
370-
if *enableBalanceUpdater {
371-
ProcessBalanceUpdates(balanceUpdater, *balanceUpdaterBatchSize, 10)
372-
}
373-
374-
log.Infof("index run completed")
375-
services.ReportStatus("eth1indexer", "Running", nil)
376-
}
377-
378-
// utils.WaitForCtrlC()
379-
}
380-
381-
func ImportEnsUpdatesLoop(chainID string, importer executionlayer.ENSImporter, batchSize int64) {
382-
for {
383-
time.Sleep(time.Second * 5)
384-
if err := importer.Import(chainID, batchSize); err != nil {
385-
log.Error(err, "error importing ens updates", 0, nil)
386-
continue
387-
}
388-
services.ReportStatus("ensIndexer", "Running", nil)
389-
}
390-
}
391-
392-
func readTokenListFile(path string) (erc20.ERC20TokenList, error) {
393-
tokenListContent, err := os.ReadFile(path)
394-
if err != nil {
395-
return erc20.ERC20TokenList{}, err
396-
}
397-
var tokenList erc20.ERC20TokenList
398-
if err := json.Unmarshal(tokenListContent, &tokenList); err != nil {
399-
return erc20.ERC20TokenList{}, err
400-
}
401-
return tokenList, nil
402-
}
403-
404-
// ProcessBalanceUpdates will use the balanceUpdater to fetch and update the balances
405-
// if iterations == -1 it will run forever
406-
func ProcessBalanceUpdates(balanceUpdater executionlayer.BalanceUpdater, batchSize int, iterations int) {
407-
for its := 0; iterations == -1 || its < iterations; its++ {
408-
start := time.Now()
409-
balances, err := balanceUpdater.UpdateBalances(int64(batchSize))
410-
if err != nil {
411-
log.Error(err, "error updating balances", 0)
412-
return
413-
}
414-
log.Infof("retrieved %v balances in %v, currently at %s", len(balances), time.Since(start), balances[len(balances)-1].Address)
415-
}
224+
service.SyncLive()
416225
}

0 commit comments

Comments
 (0)