Skip to content

Commit 6dab51f

Browse files
committed
add prom metrics
1 parent 243e63b commit 6dab51f

File tree

2 files changed

+49
-19
lines changed

2 files changed

+49
-19
lines changed

cmd/p2p/sensor/sensor.go

+32-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ import (
2222
"github.com/ethereum/go-ethereum/p2p/enode"
2323
"github.com/ethereum/go-ethereum/p2p/nat"
2424
"github.com/ethereum/go-ethereum/rpc"
25+
"github.com/prometheus/client_golang/prometheus"
26+
"github.com/prometheus/client_golang/prometheus/promauto"
27+
"github.com/prometheus/client_golang/prometheus/promhttp"
2528
"github.com/rs/zerolog/log"
2629
"github.com/spf13/cobra"
2730

@@ -47,6 +50,8 @@ type (
4750
ShouldWriteTransactionEvents bool
4851
ShouldRunPprof bool
4952
PprofPort uint
53+
ShouldRunPrometheus bool
54+
PrometheusPort uint
5055
KeyFile string
5156
Port int
5257
DiscoveryPort int
@@ -106,13 +111,23 @@ var SensorCmd = &cobra.Command{
106111

107112
if inputSensorParams.ShouldRunPprof {
108113
go func() {
109-
addr := fmt.Sprintf("0.0.0.0:%v", inputSensorParams.PprofPort)
114+
addr := fmt.Sprintf(":%v", inputSensorParams.PprofPort)
110115
if pprofErr := http.ListenAndServe(addr, nil); pprofErr != nil {
111116
log.Error().Err(pprofErr).Msg("Failed to start pprof")
112117
}
113118
}()
114119
}
115120

121+
if inputSensorParams.ShouldRunPrometheus {
122+
go func() {
123+
http.Handle("/metrics", promhttp.Handler())
124+
addr := fmt.Sprintf(":%v", inputSensorParams.PrometheusPort)
125+
if promErr := http.ListenAndServe(addr, nil); promErr != nil {
126+
log.Error().Err(promErr).Msg("Failed to start Prometheus handler")
127+
}
128+
}()
129+
}
130+
116131
inputSensorParams.privateKey, err = crypto.GenerateKey()
117132
if err != nil {
118133
return err
@@ -174,6 +189,18 @@ var SensorCmd = &cobra.Command{
174189
Number: block.Number.ToUint64(),
175190
}
176191

192+
peersGauge := promauto.NewGauge(prometheus.GaugeOpts{
193+
Namespace: "sensor",
194+
Name: "peers",
195+
Help: "The number of peers the sensor is connected to",
196+
})
197+
198+
msgCounter := promauto.NewCounterVec(prometheus.CounterOpts{
199+
Namespace: "sensor",
200+
Name: "messages",
201+
Help: "The number and type of messages the sensor has received",
202+
}, []string{"code", "message"})
203+
177204
opts := p2p.EthProtocolOptions{
178205
Context: cmd.Context(),
179206
Database: db,
@@ -185,8 +212,8 @@ var SensorCmd = &cobra.Command{
185212
Peers: make(chan *enode.Node),
186213
Head: &head,
187214
HeadMutex: &sync.RWMutex{},
188-
Count: &p2p.MessageCount{},
189215
ForkID: forkid.ID{Hash: [4]byte(inputSensorParams.ForkID)},
216+
MsgCounter: msgCounter,
190217
}
191218

192219
config := ethp2p.Config{
@@ -243,9 +270,7 @@ var SensorCmd = &cobra.Command{
243270
for {
244271
select {
245272
case <-ticker.C:
246-
count := opts.Count.Load()
247-
opts.Count.Clear()
248-
log.Info().Interface("peers", server.PeerCount()).Interface("counts", count).Send()
273+
peersGauge.Set(float64(server.PeerCount()))
249274
case peer := <-opts.Peers:
250275
// Update the peer list and the nodes file.
251276
if _, ok := peers[peer.ID()]; !ok {
@@ -327,6 +352,8 @@ increase CPU and memory usage.`)
327352
significantly increase CPU and memory usage.`)
328353
SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldRunPprof, "pprof", false, "Whether to run pprof")
329354
SensorCmd.Flags().UintVar(&inputSensorParams.PprofPort, "pprof-port", 6060, "Port pprof runs on")
355+
SensorCmd.Flags().BoolVar(&inputSensorParams.ShouldRunPrometheus, "prom", true, "Whether to run Prometheus")
356+
SensorCmd.Flags().UintVar(&inputSensorParams.PrometheusPort, "prom-port", 2112, "Port Prometheus runs on")
330357
SensorCmd.Flags().StringVarP(&inputSensorParams.KeyFile, "key-file", "k", "", "Private key file")
331358
SensorCmd.Flags().IntVar(&inputSensorParams.Port, "port", 30303, "TCP network listening port")
332359
SensorCmd.Flags().IntVar(&inputSensorParams.DiscoveryPort, "discovery-port", 30303, "UDP P2P discovery port")

p2p/protocol.go

+17-14
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"fmt"
99
"math/big"
1010
"sync"
11-
"sync/atomic"
1211
"time"
1312

1413
"github.com/ethereum/go-ethereum/common"
@@ -18,6 +17,7 @@ import (
1817
"github.com/ethereum/go-ethereum/eth/protocols/eth"
1918
ethp2p "github.com/ethereum/go-ethereum/p2p"
2019
"github.com/ethereum/go-ethereum/p2p/enode"
20+
"github.com/prometheus/client_golang/prometheus"
2121
"github.com/rs/zerolog"
2222
"github.com/rs/zerolog/log"
2323

@@ -33,7 +33,7 @@ type conn struct {
3333
db database.Database
3434
head *HeadBlock
3535
headMutex *sync.RWMutex
36-
count *MessageCount
36+
counter *prometheus.CounterVec
3737

3838
// requests is used to store the request ID and the block hash. This is used
3939
// when fetching block bodies because the eth protocol block bodies do not
@@ -56,8 +56,8 @@ type EthProtocolOptions struct {
5656
SensorID string
5757
NetworkID uint64
5858
Peers chan *enode.Node
59-
Count *MessageCount
6059
ForkID forkid.ID
60+
MsgCounter *prometheus.CounterVec
6161

6262
// Head keeps track of the current head block of the chain. This is required
6363
// when doing the status exchange.
@@ -91,7 +91,7 @@ func NewEthProtocol(version uint, opts EthProtocolOptions) ethp2p.Protocol {
9191
requestNum: 0,
9292
head: opts.Head,
9393
headMutex: opts.HeadMutex,
94-
count: opts.Count,
94+
counter: opts.MsgCounter,
9595
}
9696

9797
c.headMutex.RLock()
@@ -296,7 +296,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error {
296296
return err
297297
}
298298

299-
atomic.AddInt32(&c.count.BlockHashes, int32(len(packet)))
299+
c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet)))
300300

301301
hashes := make([]common.Hash, 0, len(packet))
302302
for _, hash := range packet {
@@ -317,7 +317,7 @@ func (c *conn) handleTransactions(ctx context.Context, msg ethp2p.Msg) error {
317317
return err
318318
}
319319

320-
atomic.AddInt32(&c.count.Transactions, int32(len(txs)))
320+
c.counter.WithLabelValues(fmt.Sprint(msg.Code), txs.Name()).Add(float64(len(txs)))
321321

322322
c.db.WriteTransactions(ctx, c.node, txs)
323323

@@ -330,7 +330,7 @@ func (c *conn) handleGetBlockHeaders(msg ethp2p.Msg) error {
330330
return err
331331
}
332332

333-
atomic.AddInt32(&c.count.BlockHeaderRequests, 1)
333+
c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Inc()
334334

335335
return ethp2p.Send(
336336
c.rw,
@@ -346,7 +346,7 @@ func (c *conn) handleBlockHeaders(ctx context.Context, msg ethp2p.Msg) error {
346346
}
347347

348348
headers := packet.BlockHeadersRequest
349-
atomic.AddInt32(&c.count.BlockHeaders, int32(len(headers)))
349+
c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(headers)))
350350

351351
for _, header := range headers {
352352
if err := c.getParentBlock(ctx, header); err != nil {
@@ -365,7 +365,7 @@ func (c *conn) handleGetBlockBodies(msg ethp2p.Msg) error {
365365
return err
366366
}
367367

368-
atomic.AddInt32(&c.count.BlockBodiesRequests, int32(len(request.GetBlockBodiesRequest)))
368+
c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Add(float64(len(request.GetBlockBodiesRequest)))
369369

370370
return ethp2p.Send(
371371
c.rw,
@@ -384,7 +384,7 @@ func (c *conn) handleBlockBodies(ctx context.Context, msg ethp2p.Msg) error {
384384
return nil
385385
}
386386

387-
atomic.AddInt32(&c.count.BlockBodies, int32(len(packet.BlockBodiesResponse)))
387+
c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet.BlockBodiesResponse)))
388388

389389
var hash *common.Hash
390390
for e := c.requests.Front(); e != nil; e = e.Next() {
@@ -413,7 +413,7 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error {
413413
return err
414414
}
415415

416-
atomic.AddInt32(&c.count.Blocks, 1)
416+
c.counter.WithLabelValues(fmt.Sprint(msg.Code), block.Name()).Inc()
417417

418418
// Set the head block if newer.
419419
c.headMutex.Lock()
@@ -444,7 +444,7 @@ func (c *conn) handleGetPooledTransactions(msg ethp2p.Msg) error {
444444
return err
445445
}
446446

447-
atomic.AddInt32(&c.count.TransactionRequests, int32(len(request.GetPooledTransactionsRequest)))
447+
c.counter.WithLabelValues(fmt.Sprint(msg.Code), request.Name()).Add(float64(len(request.GetPooledTransactionsRequest)))
448448

449449
return ethp2p.Send(
450450
c.rw,
@@ -454,6 +454,7 @@ func (c *conn) handleGetPooledTransactions(msg ethp2p.Msg) error {
454454

455455
func (c *conn) handleNewPooledTransactionHashes(ctx context.Context, version uint, msg ethp2p.Msg) error {
456456
var hashes []common.Hash
457+
var name string
457458

458459
switch version {
459460
case 66, 67:
@@ -462,17 +463,19 @@ func (c *conn) handleNewPooledTransactionHashes(ctx context.Context, version uin
462463
return err
463464
}
464465
hashes = txs
466+
name = txs.Name()
465467
case 68:
466468
var txs eth.NewPooledTransactionHashesPacket68
467469
if err := msg.Decode(&txs); err != nil {
468470
return err
469471
}
470472
hashes = txs.Hashes
473+
name = txs.Name()
471474
default:
472475
return errors.New("protocol version not found")
473476
}
474477

475-
atomic.AddInt32(&c.count.TransactionHashes, int32(len(hashes)))
478+
c.counter.WithLabelValues(fmt.Sprint(msg.Code), name).Add(float64(len(hashes)))
476479

477480
if !c.db.ShouldWriteTransactions() || !c.db.ShouldWriteTransactionEvents() {
478481
return nil
@@ -491,7 +494,7 @@ func (c *conn) handlePooledTransactions(ctx context.Context, msg ethp2p.Msg) err
491494
return err
492495
}
493496

494-
atomic.AddInt32(&c.count.Transactions, int32(len(packet.PooledTransactionsResponse)))
497+
c.counter.WithLabelValues(fmt.Sprint(msg.Code), packet.Name()).Add(float64(len(packet.PooledTransactionsResponse)))
495498

496499
c.db.WriteTransactions(ctx, c.node, packet.PooledTransactionsResponse)
497500

0 commit comments

Comments
 (0)