Skip to content

feat: implement message metrics #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions mock/relayer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions observability/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func initSecondView() sdkmetric.View {
1.0, // 1 s
5.0, // 5 s
10.0, // 10 s
100.0, // 100s
1000.0, // 1000s
10000.0, // 10000s
},
NoMinMax: false,
},
Expand Down Expand Up @@ -112,6 +115,7 @@ func initGasView() sdkmetric.View {

type RelayerMetrics struct {
*metrics.SystemMetrics
*metrics.MessageMetrics
*metrics.ChainMetrics

Opts api.MeasurementOption
Expand All @@ -126,14 +130,20 @@ func NewRelayerMetrics(ctx context.Context, meter metric.Meter, attributes ...at
return nil, err
}

messageMetrics, err := metrics.NewMessageMetrics(ctx, meter, opts)
if err != nil {
return nil, err
}

chainMetrics, err := metrics.NewChainMetrics(ctx, meter, opts)
if err != nil {
return nil, err
}

return &RelayerMetrics{
SystemMetrics: systemMetrics,
ChainMetrics: chainMetrics,
Opts: opts,
SystemMetrics: systemMetrics,
ChainMetrics: chainMetrics,
MessageMetrics: messageMetrics,
Opts: opts,
}, err
}
107 changes: 107 additions & 0 deletions observability/metrics/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package metrics

import (
"context"
"time"
"unsafe"

"github.com/sygmaprotocol/sygma-core/relayer/message"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

type MessageMetrics struct {
opts metric.MeasurementOption

totalMessageCounter metric.Int64Counter
failedMessageCounter metric.Int64Counter
successfulMessageCounter metric.Int64Counter
latencyHistogram metric.Float64Histogram
transactionSizeHistogram metric.Int64Histogram
}

// NewMessageMetrics initializes metrics that insight into relayer message handling performance
func NewMessageMetrics(ctx context.Context, meter metric.Meter, opts metric.MeasurementOption) (*MessageMetrics, error) {
totalMessageCounter, err := meter.Int64Counter(
"relayer.TotalMessageCount",
metric.WithDescription("Total number of messages the relayer has processed."),
)
if err != nil {
return nil, err
}
failedMessageCounter, err := meter.Int64Counter(
"relayer.FailedMessageCount",
metric.WithDescription("Number of messages that have failed."),
)
if err != nil {
return nil, err
}
successfulMessageCounter, err := meter.Int64Counter(
"relayer.SuccessfulMessageCount",
metric.WithDescription("Number of messages that were relayed successfully."),
)
if err != nil {
return nil, err
}

latencyHistogram, err := meter.Float64Histogram(
"relayer.LatencySeconds",
metric.WithDescription("Time taken to relay messages."),
metric.WithUnit("s"),
)
if err != nil {
return nil, err
}

transactionSizeHistogram, err := meter.Int64Histogram(
"relayer.MessageSizeBytes",
metric.WithDescription("Sizes of messages processed."),
)
if err != nil {
return nil, err
}

return &MessageMetrics{
opts: opts,
totalMessageCounter: totalMessageCounter,
failedMessageCounter: failedMessageCounter,
successfulMessageCounter: successfulMessageCounter,
latencyHistogram: latencyHistogram,
transactionSizeHistogram: transactionSizeHistogram,
}, nil
}

func (m *MessageMetrics) TrackMessages(msgs []*message.Message, status message.MessageStatus) {
switch status {
case message.PendingMessage:
m.totalMessageCounter.Add(
context.Background(),
int64(len(msgs)),
metric.WithAttributes(attribute.Int64("source", int64(msgs[0].Source))),
metric.WithAttributes(attribute.Int64("destination", int64(msgs[0].Destination))))
for _, msg := range msgs {
m.transactionSizeHistogram.Record(
context.Background(),
int64(unsafe.Sizeof(msg)))
}
case message.FailedMessage:
m.failedMessageCounter.Add(
context.Background(),
int64(len(msgs)),
metric.WithAttributes(attribute.Int64("source", int64(msgs[0].Source))),
metric.WithAttributes(attribute.Int64("destination", int64(msgs[0].Destination))))
case message.SuccessfulMessage:
m.successfulMessageCounter.Add(
context.Background(),
int64(len(msgs)),
metric.WithAttributes(attribute.Int64("source", int64(msgs[0].Source))),
metric.WithAttributes(attribute.Int64("destination", int64(msgs[0].Destination))))
for _, msg := range msgs {
m.latencyHistogram.Record(
context.Background(),
time.Since(msg.Timestamp).Seconds(),
metric.WithAttributes(attribute.Int64("source", int64(msgs[0].Source))),
metric.WithAttributes(attribute.Int64("destination", int64(msgs[0].Destination))))
}
}
}
19 changes: 18 additions & 1 deletion relayer/message/message.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,37 @@
package message

import "time"

type MessageStatus string

const (
SuccessfulMessage MessageStatus = "successful"
FailedMessage MessageStatus = "failed"
PendingMessage MessageStatus = "pending"
)

type MessageType string
type Message struct {
Source uint8 // Source where message was initiated
Destination uint8 // Destination chain of message
Data interface{} // Data associated with the message
ID string // ID is used to track and identify message across networks
Type MessageType // Message type
Timestamp time.Time //
}

func NewMessage(source, destination uint8, data interface{}, id string, msgType MessageType) *Message {
func NewMessage(
source, destination uint8,
data interface{},
id string,
msgType MessageType,
timestamp time.Time) *Message {
return &Message{
Source: source,
Destination: destination,
Data: data,
Type: msgType,
Timestamp: timestamp,
ID: id,
}
}
18 changes: 15 additions & 3 deletions relayer/relayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,20 @@ type RelayedChain interface {
DomainID() uint8
}

func NewRelayer(chains map[uint8]RelayedChain) *Relayer {
return &Relayer{relayedChains: chains}
type MessageTracker interface {
TrackMessages(msgs []*message.Message, status message.MessageStatus)
}

func NewRelayer(chains map[uint8]RelayedChain, messageTracker MessageTracker) *Relayer {
return &Relayer{
relayedChains: chains,
messageTracker: messageTracker,
}
}

type Relayer struct {
relayedChains map[uint8]RelayedChain
relayedChains map[uint8]RelayedChain
messageTracker MessageTracker
}

// Start function starts polling events for each chain and listens to cross-chain messages.
Expand All @@ -55,6 +63,7 @@ func (r *Relayer) Start(ctx context.Context, msgChan chan []*message.Message) {

// Route function routes the messages to the destination chain.
func (r *Relayer) route(msgs []*message.Message) {
r.messageTracker.TrackMessages(msgs, message.PendingMessage)
destChain, ok := r.relayedChains[msgs[0].Destination]
if !ok {
log.Error().Uint8("domainID", msgs[0].Destination).Msgf("No chain registered for destination domain")
Expand All @@ -69,6 +78,7 @@ func (r *Relayer) route(msgs []*message.Message) {
prop, err := destChain.ReceiveMessage(m)
if err != nil {
log.Err(err).Msgf("Failed receiving message %+v", m)
r.messageTracker.TrackMessages([]*message.Message{m}, message.FailedMessage)
continue
}

Expand All @@ -85,7 +95,9 @@ func (r *Relayer) route(msgs []*message.Message) {
log.Debug().Msgf("Writing message")
err := destChain.Write(props)
if err != nil {
r.messageTracker.TrackMessages(msgs, message.FailedMessage)
log.Err(err).Msgf("Failed writing message")
return
}
r.messageTracker.TrackMessages(msgs, message.SuccessfulMessage)
}
11 changes: 10 additions & 1 deletion relayer/relayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (

type RouteTestSuite struct {
suite.Suite
mockRelayedChain *mock.MockRelayedChain
mockRelayedChain *mock.MockRelayedChain
mockMessageTracker *mock.MockMessageTracker
}

func TestRunRouteTestSuite(t *testing.T) {
Expand All @@ -27,6 +28,8 @@ func (s *RouteTestSuite) TearDownSuite() {}
func (s *RouteTestSuite) SetupTest() {
gomockController := gomock.NewController(s.T())
s.mockRelayedChain = mock.NewMockRelayedChain(gomockController)
s.mockMessageTracker = mock.NewMockMessageTracker(gomockController)
s.mockMessageTracker.EXPECT().TrackMessages(gomock.Any(), gomock.Any()).AnyTimes()
}
func (s *RouteTestSuite) TearDownTest() {}

Expand All @@ -44,6 +47,7 @@ func (s *RouteTestSuite) TestStartListensOnChannel() {
chains[1] = s.mockRelayedChain
relayer := NewRelayer(
chains,
s.mockMessageTracker,
)

msgChan := make(chan []*message.Message, 1)
Expand All @@ -61,6 +65,7 @@ func (s *RouteTestSuite) TestReceiveMessageFails() {
chains[1] = s.mockRelayedChain
relayer := NewRelayer(
chains,
s.mockMessageTracker,
)

relayer.route([]*message.Message{
Expand All @@ -75,6 +80,7 @@ func (s *RouteTestSuite) TestAvoidWriteWithoutProposals() {
chains[1] = s.mockRelayedChain
relayer := NewRelayer(
chains,
s.mockMessageTracker,
)

relayer.route([]*message.Message{
Expand All @@ -93,6 +99,7 @@ func (s *RouteTestSuite) TestWriteFails() {
chains[1] = s.mockRelayedChain
relayer := NewRelayer(
chains,
s.mockMessageTracker,
)

relayer.route([]*message.Message{
Expand All @@ -111,6 +118,7 @@ func (s *RouteTestSuite) TestWritesToChain() {
chains[1] = s.mockRelayedChain
relayer := NewRelayer(
chains,
s.mockMessageTracker,
)

relayer.route([]*message.Message{
Expand All @@ -126,6 +134,7 @@ func (s *RouteTestSuite) Test_Route_ChainDoesNotExist() {
chains[1] = s.mockRelayedChain
relayer := NewRelayer(
chains,
s.mockMessageTracker,
)

relayer.route([]*message.Message{
Expand Down
Loading