Skip to content

Commit 77e7770

Browse files
authored
feat: implement message metrics (#39)
1 parent c4b524f commit 77e7770

File tree

6 files changed

+198
-8
lines changed

6 files changed

+198
-8
lines changed

mock/relayer.go

+35
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

observability/metrics.go

+13-3
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ func initSecondView() sdkmetric.View {
7878
1.0, // 1 s
7979
5.0, // 5 s
8080
10.0, // 10 s
81+
100.0, // 100s
82+
1000.0, // 1000s
83+
10000.0, // 10000s
8184
},
8285
NoMinMax: false,
8386
},
@@ -112,6 +115,7 @@ func initGasView() sdkmetric.View {
112115

113116
type RelayerMetrics struct {
114117
*metrics.SystemMetrics
118+
*metrics.MessageMetrics
115119
*metrics.ChainMetrics
116120

117121
Opts api.MeasurementOption
@@ -126,14 +130,20 @@ func NewRelayerMetrics(ctx context.Context, meter metric.Meter, attributes ...at
126130
return nil, err
127131
}
128132

133+
messageMetrics, err := metrics.NewMessageMetrics(ctx, meter, opts)
134+
if err != nil {
135+
return nil, err
136+
}
137+
129138
chainMetrics, err := metrics.NewChainMetrics(ctx, meter, opts)
130139
if err != nil {
131140
return nil, err
132141
}
133142

134143
return &RelayerMetrics{
135-
SystemMetrics: systemMetrics,
136-
ChainMetrics: chainMetrics,
137-
Opts: opts,
144+
SystemMetrics: systemMetrics,
145+
ChainMetrics: chainMetrics,
146+
MessageMetrics: messageMetrics,
147+
Opts: opts,
138148
}, err
139149
}

observability/metrics/message.go

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package metrics
2+
3+
import (
4+
"context"
5+
"time"
6+
"unsafe"
7+
8+
"github.com/sygmaprotocol/sygma-core/relayer/message"
9+
"go.opentelemetry.io/otel/attribute"
10+
"go.opentelemetry.io/otel/metric"
11+
)
12+
13+
type MessageMetrics struct {
14+
opts metric.MeasurementOption
15+
16+
totalMessageCounter metric.Int64Counter
17+
failedMessageCounter metric.Int64Counter
18+
successfulMessageCounter metric.Int64Counter
19+
latencyHistogram metric.Float64Histogram
20+
transactionSizeHistogram metric.Int64Histogram
21+
}
22+
23+
// NewMessageMetrics initializes metrics that insight into relayer message handling performance
24+
func NewMessageMetrics(ctx context.Context, meter metric.Meter, opts metric.MeasurementOption) (*MessageMetrics, error) {
25+
totalMessageCounter, err := meter.Int64Counter(
26+
"relayer.TotalMessageCount",
27+
metric.WithDescription("Total number of messages the relayer has processed."),
28+
)
29+
if err != nil {
30+
return nil, err
31+
}
32+
failedMessageCounter, err := meter.Int64Counter(
33+
"relayer.FailedMessageCount",
34+
metric.WithDescription("Number of messages that have failed."),
35+
)
36+
if err != nil {
37+
return nil, err
38+
}
39+
successfulMessageCounter, err := meter.Int64Counter(
40+
"relayer.SuccessfulMessageCount",
41+
metric.WithDescription("Number of messages that were relayed successfully."),
42+
)
43+
if err != nil {
44+
return nil, err
45+
}
46+
47+
latencyHistogram, err := meter.Float64Histogram(
48+
"relayer.LatencySeconds",
49+
metric.WithDescription("Time taken to relay messages."),
50+
metric.WithUnit("s"),
51+
)
52+
if err != nil {
53+
return nil, err
54+
}
55+
56+
transactionSizeHistogram, err := meter.Int64Histogram(
57+
"relayer.MessageSizeBytes",
58+
metric.WithDescription("Sizes of messages processed."),
59+
)
60+
if err != nil {
61+
return nil, err
62+
}
63+
64+
return &MessageMetrics{
65+
opts: opts,
66+
totalMessageCounter: totalMessageCounter,
67+
failedMessageCounter: failedMessageCounter,
68+
successfulMessageCounter: successfulMessageCounter,
69+
latencyHistogram: latencyHistogram,
70+
transactionSizeHistogram: transactionSizeHistogram,
71+
}, nil
72+
}
73+
74+
func (m *MessageMetrics) TrackMessages(msgs []*message.Message, status message.MessageStatus) {
75+
switch status {
76+
case message.PendingMessage:
77+
m.totalMessageCounter.Add(
78+
context.Background(),
79+
int64(len(msgs)),
80+
metric.WithAttributes(attribute.Int64("source", int64(msgs[0].Source))),
81+
metric.WithAttributes(attribute.Int64("destination", int64(msgs[0].Destination))))
82+
for _, msg := range msgs {
83+
m.transactionSizeHistogram.Record(
84+
context.Background(),
85+
int64(unsafe.Sizeof(msg)))
86+
}
87+
case message.FailedMessage:
88+
m.failedMessageCounter.Add(
89+
context.Background(),
90+
int64(len(msgs)),
91+
metric.WithAttributes(attribute.Int64("source", int64(msgs[0].Source))),
92+
metric.WithAttributes(attribute.Int64("destination", int64(msgs[0].Destination))))
93+
case message.SuccessfulMessage:
94+
m.successfulMessageCounter.Add(
95+
context.Background(),
96+
int64(len(msgs)),
97+
metric.WithAttributes(attribute.Int64("source", int64(msgs[0].Source))),
98+
metric.WithAttributes(attribute.Int64("destination", int64(msgs[0].Destination))))
99+
for _, msg := range msgs {
100+
m.latencyHistogram.Record(
101+
context.Background(),
102+
time.Since(msg.Timestamp).Seconds(),
103+
metric.WithAttributes(attribute.Int64("source", int64(msgs[0].Source))),
104+
metric.WithAttributes(attribute.Int64("destination", int64(msgs[0].Destination))))
105+
}
106+
}
107+
}

relayer/message/message.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,37 @@
11
package message
22

3+
import "time"
4+
5+
type MessageStatus string
6+
7+
const (
8+
SuccessfulMessage MessageStatus = "successful"
9+
FailedMessage MessageStatus = "failed"
10+
PendingMessage MessageStatus = "pending"
11+
)
12+
313
type MessageType string
414
type Message struct {
515
Source uint8 // Source where message was initiated
616
Destination uint8 // Destination chain of message
717
Data interface{} // Data associated with the message
818
ID string // ID is used to track and identify message across networks
919
Type MessageType // Message type
20+
Timestamp time.Time //
1021
}
1122

12-
func NewMessage(source, destination uint8, data interface{}, id string, msgType MessageType) *Message {
23+
func NewMessage(
24+
source, destination uint8,
25+
data interface{},
26+
id string,
27+
msgType MessageType,
28+
timestamp time.Time) *Message {
1329
return &Message{
1430
Source: source,
1531
Destination: destination,
1632
Data: data,
1733
Type: msgType,
34+
Timestamp: timestamp,
1835
ID: id,
1936
}
2037
}

relayer/relayer.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,20 @@ type RelayedChain interface {
2323
DomainID() uint8
2424
}
2525

26-
func NewRelayer(chains map[uint8]RelayedChain) *Relayer {
27-
return &Relayer{relayedChains: chains}
26+
type MessageTracker interface {
27+
TrackMessages(msgs []*message.Message, status message.MessageStatus)
28+
}
29+
30+
func NewRelayer(chains map[uint8]RelayedChain, messageTracker MessageTracker) *Relayer {
31+
return &Relayer{
32+
relayedChains: chains,
33+
messageTracker: messageTracker,
34+
}
2835
}
2936

3037
type Relayer struct {
31-
relayedChains map[uint8]RelayedChain
38+
relayedChains map[uint8]RelayedChain
39+
messageTracker MessageTracker
3240
}
3341

3442
// Start function starts polling events for each chain and listens to cross-chain messages.
@@ -55,6 +63,7 @@ func (r *Relayer) Start(ctx context.Context, msgChan chan []*message.Message) {
5563

5664
// Route function routes the messages to the destination chain.
5765
func (r *Relayer) route(msgs []*message.Message) {
66+
r.messageTracker.TrackMessages(msgs, message.PendingMessage)
5867
destChain, ok := r.relayedChains[msgs[0].Destination]
5968
if !ok {
6069
log.Error().Uint8("domainID", msgs[0].Destination).Msgf("No chain registered for destination domain")
@@ -69,6 +78,7 @@ func (r *Relayer) route(msgs []*message.Message) {
6978
prop, err := destChain.ReceiveMessage(m)
7079
if err != nil {
7180
log.Err(err).Msgf("Failed receiving message %+v", m)
81+
r.messageTracker.TrackMessages([]*message.Message{m}, message.FailedMessage)
7282
continue
7383
}
7484

@@ -85,7 +95,9 @@ func (r *Relayer) route(msgs []*message.Message) {
8595
log.Debug().Msgf("Writing message")
8696
err := destChain.Write(props)
8797
if err != nil {
98+
r.messageTracker.TrackMessages(msgs, message.FailedMessage)
8899
log.Err(err).Msgf("Failed writing message")
89100
return
90101
}
102+
r.messageTracker.TrackMessages(msgs, message.SuccessfulMessage)
91103
}

relayer/relayer_test.go

+10-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import (
1515

1616
type RouteTestSuite struct {
1717
suite.Suite
18-
mockRelayedChain *mock.MockRelayedChain
18+
mockRelayedChain *mock.MockRelayedChain
19+
mockMessageTracker *mock.MockMessageTracker
1920
}
2021

2122
func TestRunRouteTestSuite(t *testing.T) {
@@ -27,6 +28,8 @@ func (s *RouteTestSuite) TearDownSuite() {}
2728
func (s *RouteTestSuite) SetupTest() {
2829
gomockController := gomock.NewController(s.T())
2930
s.mockRelayedChain = mock.NewMockRelayedChain(gomockController)
31+
s.mockMessageTracker = mock.NewMockMessageTracker(gomockController)
32+
s.mockMessageTracker.EXPECT().TrackMessages(gomock.Any(), gomock.Any()).AnyTimes()
3033
}
3134
func (s *RouteTestSuite) TearDownTest() {}
3235

@@ -44,6 +47,7 @@ func (s *RouteTestSuite) TestStartListensOnChannel() {
4447
chains[1] = s.mockRelayedChain
4548
relayer := NewRelayer(
4649
chains,
50+
s.mockMessageTracker,
4751
)
4852

4953
msgChan := make(chan []*message.Message, 1)
@@ -61,6 +65,7 @@ func (s *RouteTestSuite) TestReceiveMessageFails() {
6165
chains[1] = s.mockRelayedChain
6266
relayer := NewRelayer(
6367
chains,
68+
s.mockMessageTracker,
6469
)
6570

6671
relayer.route([]*message.Message{
@@ -75,6 +80,7 @@ func (s *RouteTestSuite) TestAvoidWriteWithoutProposals() {
7580
chains[1] = s.mockRelayedChain
7681
relayer := NewRelayer(
7782
chains,
83+
s.mockMessageTracker,
7884
)
7985

8086
relayer.route([]*message.Message{
@@ -93,6 +99,7 @@ func (s *RouteTestSuite) TestWriteFails() {
9399
chains[1] = s.mockRelayedChain
94100
relayer := NewRelayer(
95101
chains,
102+
s.mockMessageTracker,
96103
)
97104

98105
relayer.route([]*message.Message{
@@ -111,6 +118,7 @@ func (s *RouteTestSuite) TestWritesToChain() {
111118
chains[1] = s.mockRelayedChain
112119
relayer := NewRelayer(
113120
chains,
121+
s.mockMessageTracker,
114122
)
115123

116124
relayer.route([]*message.Message{
@@ -126,6 +134,7 @@ func (s *RouteTestSuite) Test_Route_ChainDoesNotExist() {
126134
chains[1] = s.mockRelayedChain
127135
relayer := NewRelayer(
128136
chains,
137+
s.mockMessageTracker,
129138
)
130139

131140
relayer.route([]*message.Message{

0 commit comments

Comments
 (0)