Skip to content

Commit 2df4841

Browse files
Integrate Metrics with Beholder (#42)
* Centralized TXM metrics and integrated beholder * Renamed types and added metric attributes * Tidied go mod * Simplified time and block till confirmation observations * Add beholder metrics * Add Beholder Metrics * Add metrics * Create generic metrics * Update send_only_node.go * Update send_only_node_lifecycle.go * Export logpoller metrics * Set dataset size * Bump mockery * Add network to node fsm metrics * Mock test metrics * Add test coverage * lint * lint * Update chain_families.go * Create txm_test.go * Prefix beholder metrics * Add NodeClientVersion * Add NodeClientVersion test * Add ClientVersion metric * Update metrics * Fix test * Fix tests * lint * Use test context * Update node_fsm.go * Rename ClientVersion --------- Co-authored-by: amit-momin <[email protected]>
1 parent e0f5602 commit 2df4841

39 files changed

+2543
-458
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ gomodtidy: gomods
88

99
.PHONY: mockery
1010
mockery: $(mockery) ## Install mockery.
11-
go install github.com/vektra/mockery/v2@v2.46.3
11+
go install github.com/vektra/mockery/v2@v2.53.0
1212

1313
.PHONY: generate
1414
generate: mockery

chains/txmgr/broadcaster.go

Lines changed: 13 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"time"
1010

1111
"github.com/jpillora/backoff"
12-
"github.com/prometheus/client_golang/prometheus"
13-
"github.com/prometheus/client_golang/prometheus/promauto"
1412
"go.uber.org/multierr"
1513
"gopkg.in/guregu/null.v4"
1614

@@ -43,22 +41,6 @@ const (
4341
hederaChainType = "hedera"
4442
)
4543

46-
var (
47-
promTimeUntilBroadcast = promauto.NewHistogramVec(prometheus.HistogramOpts{
48-
Name: "tx_manager_time_until_tx_broadcast",
49-
Help: "The amount of time elapsed from when a transaction is enqueued to until it is broadcast.",
50-
Buckets: []float64{
51-
float64(500 * time.Millisecond),
52-
float64(time.Second),
53-
float64(5 * time.Second),
54-
float64(15 * time.Second),
55-
float64(30 * time.Second),
56-
float64(time.Minute),
57-
float64(2 * time.Minute),
58-
},
59-
}, []string{"chainID"})
60-
)
61-
6244
var ErrTxRemoved = errors.New("tx removed")
6345

6446
type ProcessUnstartedTxs[ADDR chains.Hashable] func(ctx context.Context, fromAddress ADDR) (retryable bool, err error)
@@ -79,6 +61,11 @@ type TransmitChecker[CID chains.ID, ADDR chains.Hashable, THASH, BHASH chains.Ha
7961
Check(ctx context.Context, l logger.SugaredLogger, tx types.Tx[CID, ADDR, THASH, BHASH, SEQ, FEE], a types.TxAttempt[CID, ADDR, THASH, BHASH, SEQ, FEE]) error
8062
}
8163

64+
type broadcasterMetrics interface {
65+
IncrementNumBroadcastedTxs(ctx context.Context)
66+
RecordTimeUntilTxBroadcast(ctx context.Context, duration float64)
67+
}
68+
8269
// Broadcaster monitors txes for transactions that need to
8370
// be broadcast, assigns sequences and ensures that at least one node
8471
// somewhere has received the transaction successfully.
@@ -106,6 +93,7 @@ type Broadcaster[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, T
10693
feeConfig types.BroadcasterFeeConfig
10794
txConfig types.BroadcasterTransactionsConfig
10895
listenerConfig types.BroadcasterListenerConfig
96+
metrics broadcasterMetrics
10997

11098
// autoSyncSequence, if set, will cause Broadcaster to fast-forward the sequence
11199
// when Start is called
@@ -144,6 +132,7 @@ func NewBroadcaster[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable
144132
checkerFactory TransmitCheckerFactory[CID, ADDR, THASH, BHASH, SEQ, FEE],
145133
autoSyncSequence bool,
146134
chainType string,
135+
metrics broadcasterMetrics,
147136
) *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE] {
148137
lggr = logger.Named(lggr, "Broadcaster")
149138
b := &Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]{
@@ -161,6 +150,7 @@ func NewBroadcaster[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable
161150
checkerFactory: checkerFactory,
162151
autoSyncSequence: autoSyncSequence,
163152
sequenceTracker: sequenceTracker,
153+
metrics: metrics,
164154
}
165155

166156
b.processUnstartedTxsImpl = b.processUnstartedTxs
@@ -532,11 +522,12 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) handleInProgress
532522
// In all scenarios, the correct thing to do is assume success for now
533523
// and hand off to the confirmer to get the receipt (or mark as
534524
// failed).
535-
observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now())
525+
observeTimeUntilBroadcast(ctx, eb.metrics, etx.CreatedAt, time.Now())
536526
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, types.TxAttemptBroadcast)
537527
if err != nil {
538528
return err, true
539529
}
530+
eb.metrics.IncrementNumBroadcastedTxs(ctx)
540531
// Increment sequence if successfully broadcasted
541532
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
542533
return err, true
@@ -601,6 +592,7 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) handleInProgress
601592
if err != nil {
602593
return err, true
603594
}
595+
eb.metrics.IncrementNumBroadcastedTxs(ctx)
604596
// Increment sequence if successfully broadcasted
605597
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
606598
return err, true
@@ -755,7 +747,7 @@ func (eb *Broadcaster[CID, HEAD, ADDR, THASH, BHASH, SEQ, FEE]) saveFatallyError
755747
return eb.txStore.UpdateTxFatalErrorAndDeleteAttempts(ctx, etx)
756748
}
757749

758-
func observeTimeUntilBroadcast[CHAIN_ID chains.ID](chainID CHAIN_ID, createdAt, broadcastAt time.Time) {
750+
func observeTimeUntilBroadcast(ctx context.Context, metrics broadcasterMetrics, createdAt, broadcastAt time.Time) {
759751
duration := float64(broadcastAt.Sub(createdAt))
760-
promTimeUntilBroadcast.WithLabelValues(chainID.String()).Observe(duration)
752+
metrics.RecordTimeUntilTxBroadcast(ctx, duration)
761753
}

chains/txmgr/confirmer.go

Lines changed: 38 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010
"sync"
1111
"time"
1212

13-
"github.com/prometheus/client_golang/prometheus"
14-
"github.com/prometheus/client_golang/prometheus/promauto"
1513
"go.uber.org/multierr"
1614

1715
commonhex "github.com/smartcontractkit/chainlink-common/pkg/utils/hex"
@@ -33,48 +31,13 @@ const (
3331
processHeadTimeout = 10 * time.Minute
3432
)
3533

36-
var (
37-
promNumGasBumps = promauto.NewCounterVec(prometheus.CounterOpts{
38-
Name: "tx_manager_num_gas_bumps",
39-
Help: "Number of gas bumps",
40-
}, []string{"chainID"})
41-
42-
promGasBumpExceedsLimit = promauto.NewCounterVec(prometheus.CounterOpts{
43-
Name: "tx_manager_gas_bump_exceeds_limit",
44-
Help: "Number of times gas bumping failed from exceeding the configured limit. Any counts of this type indicate a serious problem.",
45-
}, []string{"chainID"})
46-
promNumConfirmedTxs = promauto.NewCounterVec(prometheus.CounterOpts{
47-
Name: "tx_manager_num_confirmed_transactions",
48-
Help: "Total number of confirmed transactions. Note that this can err to be too high since transactions are counted on each confirmation, which can happen multiple times per transaction in the case of re-orgs",
49-
}, []string{"chainID"})
50-
promTimeUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{
51-
Name: "tx_manager_time_until_tx_confirmed",
52-
Help: "The amount of time elapsed from a transaction being broadcast to being included in a block.",
53-
Buckets: []float64{
54-
float64(500 * time.Millisecond),
55-
float64(time.Second),
56-
float64(5 * time.Second),
57-
float64(15 * time.Second),
58-
float64(30 * time.Second),
59-
float64(time.Minute),
60-
float64(2 * time.Minute),
61-
float64(5 * time.Minute),
62-
float64(10 * time.Minute),
63-
},
64-
}, []string{"chainID"})
65-
promBlocksUntilTxConfirmed = promauto.NewHistogramVec(prometheus.HistogramOpts{
66-
Name: "tx_manager_blocks_until_tx_confirmed",
67-
Help: "The amount of blocks that have been mined from a transaction being broadcast to being included in a block.",
68-
Buckets: []float64{
69-
float64(1),
70-
float64(5),
71-
float64(10),
72-
float64(20),
73-
float64(50),
74-
float64(100),
75-
},
76-
}, []string{"chainID"})
77-
)
34+
type confimerMetrics interface {
35+
IncrementNumGasBumps(ctx context.Context)
36+
IncrementGasBumpExceedsLimit(ctx context.Context)
37+
IncrementNumConfirmedTxs(ctx context.Context, confirmedTransactions int)
38+
RecordTimeUntilTxConfirmed(ctx context.Context, duration float64)
39+
RecordBlocksUntilTxConfirmed(ctx context.Context, blocksElapsed float64)
40+
}
7841

7942
// Confirmer is a broad service which performs four different tasks in sequence on every new longest chain
8043
// Step 1: Mark that all currently pending transaction attempts were broadcast before this block
@@ -95,6 +58,7 @@ type Confirmer[CID chains.ID, HEAD chains.Head[BHASH], ADDR chains.Hashable, THA
9558
txConfig types.ConfirmerTransactionsConfig
9659
dbConfig types.ConfirmerDatabaseConfig
9760
chainID CID
61+
metrics confimerMetrics
9862

9963
ks types.KeyStore[ADDR]
10064
enabledAddresses []ADDR
@@ -127,6 +91,7 @@ func NewConfirmer[
12791
lggr logger.Logger,
12892
isReceiptNil func(R) bool,
12993
stuckTxDetector types.StuckTxDetector[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
94+
metrics confimerMetrics,
13095
) *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE] {
13196
lggr = logger.Named(lggr, "Confirmer")
13297
return &Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]{
@@ -143,6 +108,7 @@ func NewConfirmer[
143108
mb: mailbox.NewSingle[HEAD](),
144109
isReceiptNil: isReceiptNil,
145110
stuckTxDetector: stuckTxDetector,
111+
metrics: metrics,
146112
}
147113
}
148114

@@ -368,7 +334,7 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) ProcessIncluded
368334
return nil
369335
}
370336
// Add newly confirmed transactions to the prom metric
371-
promNumConfirmedTxs.WithLabelValues(ec.chainID.String()).Add(float64(len(includedTxs)))
337+
ec.metrics.IncrementNumConfirmedTxs(ctx, len(includedTxs))
372338

373339
purgeTxIDs := make([]int64, 0, len(includedTxs))
374340
confirmedTxIDs := make([]int64, 0, len(includedTxs))
@@ -381,7 +347,7 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) ProcessIncluded
381347
continue
382348
}
383349
confirmedTxIDs = append(confirmedTxIDs, tx.ID)
384-
observeUntilTxConfirmed(ec.chainID, tx.TxAttempts, head)
350+
observeUntilTxConfirmed(ctx, ec.metrics, tx, head)
385351
}
386352
// Mark the transactions included on-chain with a purge attempt as fatal error with the terminally stuck error message
387353
if err := ec.txStore.UpdateTxFatalError(ctx, purgeTxIDs, ec.stuckTxDetector.StuckTxFatalError()); err != nil {
@@ -667,13 +633,13 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) bumpGas(ctx con
667633
// if no error, return attempt
668634
// if err, continue below
669635
if err == nil {
670-
promNumGasBumps.WithLabelValues(ec.chainID.String()).Inc()
636+
ec.metrics.IncrementNumGasBumps(ctx)
671637
ec.lggr.Debugw("Rebroadcast bumping fee for tx", append(logFields, "bumpedFee", bumpedFee.String(), "bumpedFeeLimit", bumpedFeeLimit)...)
672638
return bumpedAttempt, err
673639
}
674640

675641
if errors.Is(err, fees.ErrBumpFeeExceedsLimit) {
676-
promGasBumpExceedsLimit.WithLabelValues(ec.chainID.String()).Inc()
642+
ec.metrics.IncrementGasBumpExceedsLimit(ctx)
677643
}
678644

679645
return bumpedAttempt, fmt.Errorf("error bumping gas: %w", err)
@@ -712,7 +678,7 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) handleInProgres
712678
if err != nil {
713679
return fmt.Errorf("could not bump gas for terminally underpriced transaction: %w", err)
714680
}
715-
promNumGasBumps.WithLabelValues(ec.chainID.String()).Inc()
681+
ec.metrics.IncrementNumGasBumps(ctx)
716682
lggr.With(
717683
"sendError", sendError,
718684
"maxGasPriceConfig", ec.feeConfig.MaxFeePrice(),
@@ -853,38 +819,35 @@ func (ec *Confirmer[CID, HEAD, ADDR, THASH, BHASH, R, SEQ, FEE]) sendEmptyTransa
853819
return txhash, nil
854820
}
855821

856-
// observeUntilTxConfirmed observes the promBlocksUntilTxConfirmed metric for each confirmed
857-
// transaction.
822+
// observeUntilTxConfirmed observes the timeUntilTxConfirmed and blocksUntilTxConfirmed metrics for each confirmed transaction.
858823
func observeUntilTxConfirmed[
859824
CHAIN_ID chains.ID,
860825
ADDR chains.Hashable,
861826
TX_HASH, BLOCK_HASH chains.Hashable,
862827
SEQ chains.Sequence,
863828
FEE fees.Fee,
864-
](chainID CHAIN_ID, attempts []types.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head chains.Head[BLOCK_HASH]) {
865-
for _, attempt := range attempts {
866-
// We estimate the time until confirmation by subtracting from the time the tx (not the attempt)
867-
// was created. We want to measure the amount of time taken from when a transaction is created
868-
// via e.g Txm.CreateTransaction to when it is confirmed on-chain, regardless of how many attempts
869-
// were needed to achieve this.
870-
duration := time.Since(attempt.Tx.CreatedAt)
871-
promTimeUntilTxConfirmed.
872-
WithLabelValues(chainID.String()).
873-
Observe(float64(duration))
874-
875-
// Since a tx can have many attempts, we take the number of blocks to confirm as the block number
876-
// of the receipt minus the block number of the first ever broadcast for this transaction.
877-
var minBroadcastBefore int64
878-
for _, a := range attempt.Tx.TxAttempts {
879-
if b := a.BroadcastBeforeBlockNum; b != nil && *b < minBroadcastBefore {
880-
minBroadcastBefore = *b
881-
}
882-
}
883-
if minBroadcastBefore > 0 {
884-
blocksElapsed := head.BlockNumber() - minBroadcastBefore
885-
promBlocksUntilTxConfirmed.
886-
WithLabelValues(chainID.String()).
887-
Observe(float64(blocksElapsed))
829+
](ctx context.Context, metrics confimerMetrics, tx *types.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], head chains.Head[BLOCK_HASH]) {
830+
if tx == nil {
831+
return
832+
}
833+
// We estimate the time until confirmation by subtracting from the time the tx (not the attempt)
834+
// was created. We want to measure the amount of time taken from when a transaction is created
835+
// via e.g Txm.CreateTransaction to when it is confirmed on-chain, regardless of how many attempts
836+
// were needed to achieve this.
837+
duration := time.Since(tx.CreatedAt)
838+
metrics.RecordTimeUntilTxConfirmed(ctx, float64(duration))
839+
840+
// Since a tx can have many attempts, we take the number of blocks to confirm as the current block number
841+
// minus the block number of the first ever broadcast for this transaction.
842+
var minBroadcastBefore int64
843+
for _, a := range tx.TxAttempts {
844+
if b := a.BroadcastBeforeBlockNum; b != nil && *b < minBroadcastBefore {
845+
minBroadcastBefore = *b
888846
}
889847
}
848+
849+
if minBroadcastBefore > 0 {
850+
blocksElapsed := head.BlockNumber() - minBroadcastBefore
851+
metrics.RecordBlocksUntilTxConfirmed(ctx, float64(blocksElapsed))
852+
}
890853
}

metrics/chain_families.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package metrics
22

33
const (
4-
EVM = "evm"
4+
EVM = "EVM"
55
Solana = "solana"
66
)

metrics/go.mod

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,66 @@ module github.com/smartcontractkit/chainlink-framework/metrics
22

33
go 1.24.1
44

5-
require github.com/prometheus/client_golang v1.22.0
5+
require (
6+
github.com/prometheus/client_golang v1.22.0
7+
github.com/smartcontractkit/chainlink-common v0.7.0
8+
github.com/stretchr/testify v1.10.0
9+
go.opentelemetry.io/otel v1.35.0
10+
go.opentelemetry.io/otel/metric v1.35.0
11+
)
612

713
require (
814
github.com/beorn7/perks v1.0.1 // indirect
15+
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
916
github.com/cespare/xxhash/v2 v2.3.0 // indirect
17+
github.com/cloudevents/sdk-go/binding/format/protobuf/v2 v2.15.2 // indirect
18+
github.com/cloudevents/sdk-go/v2 v2.16.0 // indirect
19+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
20+
github.com/go-logr/logr v1.4.2 // indirect
21+
github.com/go-logr/stdr v1.2.2 // indirect
22+
github.com/go-playground/locales v0.13.0 // indirect
23+
github.com/go-playground/universal-translator v0.17.0 // indirect
24+
github.com/go-playground/validator/v10 v10.4.1 // indirect
25+
github.com/google/uuid v1.6.0 // indirect
26+
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect
27+
github.com/json-iterator/go v1.1.12 // indirect
28+
github.com/kylelemons/godebug v1.1.0 // indirect
29+
github.com/leodido/go-urn v1.2.0 // indirect
30+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
31+
github.com/modern-go/reflect2 v1.0.2 // indirect
32+
github.com/mr-tron/base58 v1.2.0 // indirect
1033
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
34+
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
1135
github.com/prometheus/client_model v0.6.1 // indirect
12-
github.com/prometheus/common v0.62.0 // indirect
13-
github.com/prometheus/procfs v0.15.1 // indirect
14-
golang.org/x/sys v0.30.0 // indirect
15-
google.golang.org/protobuf v1.36.5 // indirect
36+
github.com/prometheus/common v0.63.0 // indirect
37+
github.com/prometheus/procfs v0.16.0 // indirect
38+
github.com/smartcontractkit/libocr v0.0.0-20250220133800-f3b940c4f298 // indirect
39+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
40+
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.0.0-20240823153156-2a54df7bffb9 // indirect
41+
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.6.0 // indirect
42+
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.35.0 // indirect
43+
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.30.0 // indirect
44+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect
45+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 // indirect
46+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect
47+
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.4.0 // indirect
48+
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.28.0 // indirect
49+
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect
50+
go.opentelemetry.io/otel/log v0.6.0 // indirect
51+
go.opentelemetry.io/otel/sdk v1.35.0 // indirect
52+
go.opentelemetry.io/otel/sdk/log v0.6.0 // indirect
53+
go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect
54+
go.opentelemetry.io/otel/trace v1.35.0 // indirect
55+
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
56+
go.uber.org/multierr v1.11.0 // indirect
57+
go.uber.org/zap v1.27.0 // indirect
58+
golang.org/x/crypto v0.36.0 // indirect
59+
golang.org/x/net v0.38.0 // indirect
60+
golang.org/x/sys v0.31.0 // indirect
61+
golang.org/x/text v0.23.0 // indirect
62+
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect
63+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect
64+
google.golang.org/grpc v1.71.0 // indirect
65+
google.golang.org/protobuf v1.36.6 // indirect
66+
gopkg.in/yaml.v3 v3.0.1 // indirect
1667
)

0 commit comments

Comments
 (0)