Skip to content

Commit c914e4e

Browse files
feat(executionlayer): metrics (#1426)
fix naming change reorg log severity
1 parent 87dbfb6 commit c914e4e

File tree

9 files changed

+216
-67
lines changed

9 files changed

+216
-67
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package metrics
2+
3+
import (
4+
"github.com/prometheus/client_golang/prometheus"
5+
"github.com/prometheus/client_golang/prometheus/promauto"
6+
)
7+
8+
var (
9+
IndexingBlockDifference = promauto.NewGaugeVec(prometheus.GaugeOpts{
10+
Name: "indexing_block_difference",
11+
Help: "Difference between the latest on-chain block and the last indexed block",
12+
}, []string{"chainID"})
13+
14+
IndexingTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
15+
Name: "indexing_time_seconds",
16+
Help: "Time taken to index the new blocks",
17+
}, []string{"chainID"})
18+
19+
IndexingTransformerProcessingTime = promauto.NewHistogramVec(prometheus.HistogramOpts{
20+
Name: "indexing_transformer_processing_time_seconds",
21+
Help: "Time taken by a transformer to index a block",
22+
}, []string{"chainID", "transformer"})
23+
24+
IndexingPendingBalanceUpdate = promauto.NewGaugeVec(prometheus.GaugeOpts{
25+
Name: "indexing_pending_balance_update",
26+
Help: "Pending balances to update",
27+
}, []string{"chainID"})
28+
29+
IndexingPendingENSUpdate = promauto.NewGaugeVec(prometheus.GaugeOpts{
30+
Name: "indexing_pending_ens_update",
31+
Help: "Pending ENS to update",
32+
}, []string{"chainID"})
33+
34+
IndexingReorgTotal = promauto.NewCounterVec(prometheus.CounterOpts{
35+
Name: "indexing_reorg_total",
36+
Help: "Total number of block impacted by chain reorganizations",
37+
}, []string{"chainID"})
38+
)

backend/pkg/executionlayer/blockindexer.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ type BlockIndexerConfig struct {
3131
type BlockIndexer struct {
3232
store Store
3333
lastBlockStore db2.LastBlocksStore
34-
transformers []TransformFunc
34+
transformers []Transformer
3535
client Client
3636
config BlockIndexerConfig
3737
}
3838

39-
func NewBlockIndexer(store Store, lastBlockStore db2.LastBlocksStore, config BlockIndexerConfig, client Client, transformers ...TransformFunc) *BlockIndexer {
39+
func NewBlockIndexer(store Store, lastBlockStore db2.LastBlocksStore, config BlockIndexerConfig, client Client, transformers ...Transformer) *BlockIndexer {
4040
return &BlockIndexer{
4141
store: store,
4242
lastBlockStore: lastBlockStore,
@@ -170,11 +170,13 @@ func (indexer *BlockIndexer) indexBlock(chainID string, block *types.Eth1Block)
170170
Number: block.Number,
171171
Hash: block.Hash,
172172
}
173-
for _, transform := range indexer.transformers {
174-
err := transform(chainID, block, &res)
173+
for _, transformer := range indexer.transformers {
174+
start := time.Now()
175+
err := transformer.fn(chainID, block, &res)
175176
if err != nil {
176-
return fmt.Errorf("error transforming block [%v]", block.Number)
177+
return fmt.Errorf("error transforming [%v] block [%v]", transformer.name, block.Number)
177178
}
179+
indexingMetrics.TransformerProcessingTime(chainID, transformer.name, time.Since(start))
178180
}
179181
if err := indexer.store.AddIndexedBlock(res); err != nil {
180182
return fmt.Errorf("error saving block [%v]: %w", block.Number, err)

backend/pkg/executionlayer/blockindexer_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@ func TestIndexerWithBigTable(t *testing.T) {
3232

3333
tests := []struct {
3434
name string
35-
transformers []TransformFunc
35+
transformers []Transformer
3636
action func(*testing.T) error
3737
dataKeys []string
3838
updatesKeys []string
3939
metadataKeys []string
4040
}{
4141
{
4242
name: "transaction",
43-
transformers: []TransformFunc{
43+
transformers: []Transformer{
4444
TransformTx,
4545
},
4646
action: func(t *testing.T) error {
@@ -56,7 +56,7 @@ func TestIndexerWithBigTable(t *testing.T) {
5656
},
5757
{
5858
name: "block",
59-
transformers: []TransformFunc{
59+
transformers: []Transformer{
6060
TransformBlock,
6161
},
6262
action: func(t *testing.T) error {
@@ -71,7 +71,7 @@ func TestIndexerWithBigTable(t *testing.T) {
7171
},
7272
{
7373
name: "erc20 transfer",
74-
transformers: []TransformFunc{
74+
transformers: []Transformer{
7575
TransformERC20,
7676
},
7777
action: func(t *testing.T) error {
@@ -89,7 +89,7 @@ func TestIndexerWithBigTable(t *testing.T) {
8989
},
9090
{
9191
name: "erc721 transfer",
92-
transformers: []TransformFunc{
92+
transformers: []Transformer{
9393
TransformERC721,
9494
},
9595
action: func(t *testing.T) error {
@@ -104,7 +104,7 @@ func TestIndexerWithBigTable(t *testing.T) {
104104
},
105105
{
106106
name: "erc1155 transfer",
107-
transformers: []TransformFunc{
107+
transformers: []Transformer{
108108
TransformERC1155,
109109
},
110110
action: func(t *testing.T) error {
@@ -119,7 +119,7 @@ func TestIndexerWithBigTable(t *testing.T) {
119119
},
120120
{
121121
name: "contract",
122-
transformers: []TransformFunc{
122+
transformers: []Transformer{
123123
TransformContract,
124124
},
125125
action: func(t *testing.T) error {

backend/pkg/executionlayer/executionlayer.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,25 @@ func (service *IndexerService) SyncRange(start, end uint64, skipNode, skipData b
9595

9696
func (service *IndexerService) SyncLive() {
9797
for ; ; time.Sleep(service.config.BlockFrequency) {
98+
reorgDepth, err := service.reorgWatcher.LookForReorg()
99+
if err != nil {
100+
logger.WithField("error", err).Error("reorg lookup")
101+
continue
102+
}
103+
if reorgDepth != 0 {
104+
indexingMetrics.ReorgBlockTotal(service.stateReader.chainID, reorgDepth)
105+
logger.WithField("depth", reorgDepth).Info("reorg detected")
106+
}
107+
98108
state, err := service.stateReader.state()
99109
if err != nil {
100110
logger.WithField("error", err).Error("cannot get state")
101111
continue
102112
}
103113
logger.WithFields(state.Fields()).Info("last blocks")
114+
indexingMetrics.BlockDifference(state.chainID, state.node-state.LastProcessed())
104115

105-
if err := service.reorgWatcher.LookForReorg(); err != nil {
106-
logger.WithField("error", err).Error("reorg lookup")
116+
if state.node == state.LastProcessed() {
107117
continue
108118
}
109119

@@ -184,6 +194,11 @@ type syncState struct {
184194
data uint64
185195
}
186196

197+
// LastProcessed returns the real last block processed by taking the smallest block between state.data and state.blocks
198+
func (state syncState) LastProcessed() uint64 {
199+
return min(state.data, state.blocks)
200+
}
201+
187202
func (state syncState) Fields() map[string]interface{} {
188203
return map[string]interface{}{
189204
"chainID": state.chainID,
@@ -236,8 +251,7 @@ func (service *Indexer) FromHead(state syncState) error {
236251
// clear balance cache
237252
defer service.balanceCache.Clear(state.chainID)
238253

239-
// get the real last block processed by taking the smallest block between state.data and state.blocks
240-
startBlock := max(min(state.data, state.blocks)+1, 0)
254+
startBlock := max(state.LastProcessed()+1, 0)
241255
bulk := min(service.config.Bulk, state.node-startBlock+1)
242256

243257
for ; startBlock <= state.node; startBlock += bulk {
@@ -257,9 +271,11 @@ func (service *Indexer) FromHead(state syncState) error {
257271
"elapsed": time.Since(start),
258272
}).Info("indexed blocks")
259273
}
274+
end := time.Since(start) // save end to unify log and metric value
275+
indexingMetrics.IndexingTime(state.chainID, end)
260276

261277
logger.WithFields(logrus.Fields{
262-
"duration": time.Since(start),
278+
"duration": end,
263279
}).Info("indexed head")
264280
return nil
265281
}
@@ -278,6 +294,7 @@ func (service *Indexer) Balances(state syncState) {
278294
continue
279295
}
280296
logger = logger.WithField("pending", total)
297+
indexingMetrics.PendingBalanceUpdate(state.chainID, total)
281298
if total == 0 {
282299
logger.Info("finished updating balances")
283300
return
@@ -312,6 +329,7 @@ func (service *Indexer) ENS(state syncState) {
312329
continue
313330
}
314331
logger = logger.WithField("pending", total)
332+
indexingMetrics.PendingENSUpdate(state.chainID, total)
315333
if total == 0 {
316334
logger.Info("finished importing ens")
317335
return
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package executionlayer
2+
3+
import (
4+
"time"
5+
6+
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
7+
)
8+
9+
var indexingMetrics = prometheusMetrics{}
10+
11+
type prometheusMetrics struct{}
12+
13+
func (p prometheusMetrics) BlockDifference(chainID string, difference uint64) {
14+
metrics.IndexingBlockDifference.WithLabelValues(chainID).Set(float64(difference))
15+
}
16+
17+
func (p prometheusMetrics) IndexingTime(chainID string, t time.Duration) {
18+
metrics.IndexingTime.WithLabelValues(chainID).Observe(t.Seconds())
19+
}
20+
21+
func (p prometheusMetrics) TransformerProcessingTime(chainID string, transformer string, t time.Duration) {
22+
metrics.IndexingTransformerProcessingTime.WithLabelValues(chainID, transformer).Observe(t.Seconds())
23+
}
24+
25+
func (p prometheusMetrics) PendingBalanceUpdate(chainID string, count int64) {
26+
metrics.IndexingPendingBalanceUpdate.WithLabelValues(chainID).Set(float64(count))
27+
}
28+
29+
func (p prometheusMetrics) PendingENSUpdate(chainID string, count int64) {
30+
metrics.IndexingPendingENSUpdate.WithLabelValues(chainID).Set(float64(count))
31+
}
32+
33+
func (p prometheusMetrics) ReorgBlockTotal(chainID string, depth uint64) {
34+
metrics.IndexingReorgTotal.WithLabelValues(chainID).Add(float64(depth))
35+
}

backend/pkg/executionlayer/reorg.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ func NewReorgWatcher(client EthClient, store ReorgStore, config ReorgConfig, cha
4646
}
4747
}
4848

49-
func (r *ReorgWatcher) LookForReorg() error {
49+
func (r *ReorgWatcher) LookForReorg() (uint64, error) {
5050
head, err := r.client.HeaderByNumber(context.Background(), nil)
5151
if err != nil {
52-
return err
52+
return 0, err
5353
}
5454

5555
// ensure we will not try to retrieve blocks that do not exist
@@ -59,32 +59,34 @@ func (r *ReorgWatcher) LookForReorg() error {
5959
}
6060

6161
ctx := context.Background()
62-
62+
var reorgDepth uint64
6363
// for each block check if block node hash and block db hash match
6464
for i := head.Number.Uint64() - depth; i <= head.Number.Uint64(); i++ {
6565
nodeBlock, err := r.client.HeaderByNumber(ctx, big.NewInt(int64(i)))
6666
if err != nil {
67-
return err
67+
return 0, err
6868
}
6969
dbBlock, err := r.store.GetBlock(r.chainID, i)
7070
if err != nil {
7171
// exit if we hit a block that is not yet in the db
7272
// it means that we never processed that block or that we revert that block
7373
if errors.Is(err, database.ErrNotFound) {
74-
return nil
74+
break
7575
}
76-
return err
76+
return 0, err
7777
}
7878

7979
if bytes.Equal(nodeBlock.Hash().Bytes(), dbBlock.Hash) {
80+
// block was not reorg, continue
8081
continue
8182
}
83+
reorgDepth = head.Number.Uint64() - i + 1
8284
log.Warnf("found incosistency at height %v, node block hash: %x, db block hash: %x", i, nodeBlock.Hash().Bytes(), dbBlock.Hash)
8385
if err := r.handleReorg(i, head.Number.Uint64()); err != nil {
84-
return err
86+
return 0, err
8587
}
8688
}
87-
return nil
89+
return reorgDepth, nil
8890
}
8991

9092
func (r *ReorgWatcher) handleReorg(number uint64, head uint64) error {

backend/pkg/executionlayer/reorg_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,16 @@ func TestReorgWatcher(t *testing.T) {
8080
client := newFakeClient(tt.nodeBlocks...)
8181
store := newStubReorgStore(tt.dbBlocks...)
8282
reorg := NewReorgWatcher(client, store, ReorgConfig{tt.depth}, "chainID", tt.lastBlockStore)
83-
if err := reorg.LookForReorg(); err != nil {
83+
depth, err := reorg.LookForReorg()
84+
if err != nil {
8485
t.Fatal(err)
8586
}
8687
if got, want := store.reverted, tt.reverted; !reflect.DeepEqual(got, want) {
8788
t.Errorf("got %v, want %v", got, want)
8889
}
90+
if got, want := depth, uint64(len(tt.reverted)); got != want {
91+
t.Errorf("got %v, want %v", got, want)
92+
}
8993
})
9094
}
9195
}
@@ -192,7 +196,7 @@ func TestReorgWithBackendAndIndexer(t *testing.T) {
192196
// create block that has the same number as revertedBlock (root+1)
193197
backend.Commit()
194198

195-
if err := reorg.LookForReorg(); err != nil {
199+
if _, err := reorg.LookForReorg(); err != nil {
196200
t.Fatal(err)
197201
}
198202

0 commit comments

Comments
 (0)