Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions components/metrics/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"

"github.com/ThreeDotsLabs/watermill/message"

"github.com/prometheus/client_golang/prometheus"
)

const (
Expand All @@ -26,7 +24,7 @@ var (
}
)

func labelsFromCtx(ctx context.Context, labels ...string) prometheus.Labels {
func labelsFromCtx(ctx context.Context, labels ...string) map[string]string {
ctxLabels := map[string]string{}

for _, l := range labels {
Expand Down
89 changes: 89 additions & 0 deletions components/metrics/opentelemetry_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package metrics

import (
"github.com/ThreeDotsLabs/watermill/internal"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/metric"
)

func NewOpenTelemetryMetricsBuilder(meter metric.Meter, namespace string, subsystem string) OpenTelemetryMetricsBuilder {
return OpenTelemetryMetricsBuilder{
Namespace: namespace,
Subsystem: subsystem,
meter: meter,
}
}

// OpenTelemetryMetricsBuilder provides methods to decorate publishers, subscribers and handlers.
type OpenTelemetryMetricsBuilder struct {
meter metric.Meter

Namespace string
Subsystem string
// PublishBuckets defines the histogram buckets for publish time histogram, defaulted if nil.
PublishBuckets []float64
// HandlerBuckets defines the histogram buckets for handle execution time histogram, defaulted to watermill's default.
HandlerBuckets []float64
}

// AddOpenTelemetryRouterMetrics is a convenience function that acts on the message router to add the metrics middleware
// to all its handlers. The handlers' publishers and subscribers are also decorated.
func (b OpenTelemetryMetricsBuilder) AddOpenTelemetryRouterMetrics(r *message.Router) {
r.AddPublisherDecorators(b.DecoratePublisher)
r.AddSubscriberDecorators(b.DecorateSubscriber)
r.AddMiddleware(b.NewRouterMiddleware().Middleware)
}

// DecoratePublisher wraps the underlying publisher with OpenTelemetry metrics.
func (b OpenTelemetryMetricsBuilder) DecoratePublisher(pub message.Publisher) (message.Publisher, error) {
var err error
d := PublisherOpenTelemetryMetricsDecorator{
pub: pub,
publisherName: internal.StructName(pub),
}

d.publishTimeSeconds, err = b.meter.Float64Histogram(
b.name("publish_time_seconds"),
metric.WithUnit("seconds"),
metric.WithDescription("The time that a publishing attempt (success or not) took in seconds"),
metric.WithExplicitBucketBoundaries(b.PublishBuckets...),
)

if err != nil {
return nil, errors.Wrap(err, "could not register publish time metric")
}
return d, nil
}

// DecorateSubscriber wraps the underlying subscriber with OpenTelemetry metrics.
func (b OpenTelemetryMetricsBuilder) DecorateSubscriber(sub message.Subscriber) (message.Subscriber, error) {
var err error
d := &SubscriberOpenTelemetryMetricsDecorator{
subscriberName: internal.StructName(sub),
}

d.subscriberMessagesReceivedTotal, err = b.meter.Int64Counter(
b.name("subscriber_messages_received_total"),
metric.WithDescription("The total number of messages received by the subscriber"),
)
if err != nil {
return nil, errors.Wrap(err, "could not register time to ack metric")
}

d.Subscriber, err = message.MessageTransformSubscriberDecorator(d.recordMetrics)(sub)
if err != nil {
return nil, errors.Wrap(err, "could not decorate subscriber with metrics decorator")
}

return d, nil
}
func (b OpenTelemetryMetricsBuilder) name(name string) string {
if b.Subsystem != "" {
name = b.Subsystem + "_" + name
}
if b.Namespace != "" {
name = b.Namespace + "_" + name
}
return name
}
86 changes: 86 additions & 0 deletions components/metrics/opentelemetry_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package metrics

import (
"time"

"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var (
handlerLabelKeys = []string{
labelKeyHandlerName,
labelSuccess,
}

// defaultHandlerExecutionTimeBuckets are one order of magnitude smaller than default buckets (5ms~10s),
// because the handler execution times are typically shorter (µs~ms range).
defaultHandlerExecutionTimeBuckets = []float64{
0.0005,
0.001,
0.0025,
0.005,
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1,
}
)

// HandlerOpenTelemetryMetricsMiddleware is a middleware that captures OpenTelemetry metrics.
type HandlerOpenTelemetryMetricsMiddleware struct {
handlerExecutionTimeSeconds metric.Float64Histogram
}

// Middleware returns the middleware ready to be used with watermill's Router.
func (m HandlerOpenTelemetryMetricsMiddleware) Middleware(h message.HandlerFunc) message.HandlerFunc {
return func(msg *message.Message) (msgs []*message.Message, err error) {
now := time.Now()
ctx := msg.Context()
labels := []attribute.KeyValue{
attribute.String(labelKeyHandlerName, message.HandlerNameFromCtx(ctx)),
}

defer func() {
if err != nil {
labels = append(labels, attribute.String(labelSuccess, "false"))
} else {
labels = append(labels, attribute.String(labelSuccess, "true"))
}
m.handlerExecutionTimeSeconds.Record(
ctx,
time.Since(now).Seconds(),
metric.WithAttributes(labels...),
)
}()

return h(msg)
}
}

// NewRouterMiddleware returns new middleware.
func (b OpenTelemetryMetricsBuilder) NewRouterMiddleware() HandlerOpenTelemetryMetricsMiddleware {
var err error
m := HandlerOpenTelemetryMetricsMiddleware{}

if b.HandlerBuckets == nil {
b.HandlerBuckets = defaultHandlerExecutionTimeBuckets
}

m.handlerExecutionTimeSeconds, err = b.meter.Float64Histogram(
b.name("handler_execution_time_seconds"),
metric.WithUnit("seconds"),
metric.WithDescription("The total time elapsed while executing the handler function in seconds"),
metric.WithExplicitBucketBoundaries(b.HandlerBuckets...),
)
if err != nil {
panic(errors.Wrap(err, "could not register handler execution time metric"))
}

return m
}
68 changes: 68 additions & 0 deletions components/metrics/opentelemetry_publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package metrics

import (
"time"

"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// PublisherOpenTelemetryMetricsDecorator decorates a publisher to capture OpenTelemetry metrics.
type PublisherOpenTelemetryMetricsDecorator struct {
pub message.Publisher
publisherName string
publishTimeSeconds metric.Float64Histogram
}

// Publish updates the relevant publisher metrics and calls the wrapped publisher's Publish.
func (m PublisherOpenTelemetryMetricsDecorator) Publish(topic string, messages ...*message.Message) (err error) {
if len(messages) == 0 {
return m.pub.Publish(topic)
}

// TODO: take ctx not only from first msg. Might require changing the signature of Publish, which is planned anyway.
ctx := messages[0].Context()
labelsMap := labelsFromCtx(ctx, publisherLabelKeys...)
if labelsMap[labelKeyPublisherName] == "" {
labelsMap[labelKeyPublisherName] = m.publisherName
}
if labelsMap[labelKeyHandlerName] == "" {
labelsMap[labelKeyHandlerName] = labelValueNoHandler
}
labels := make([]attribute.KeyValue, 0, len(labelsMap))
for k, v := range labelsMap {
labels = append(labels, attribute.String(k, v))
}
start := time.Now()

defer func() {
if publishAlreadyObserved(ctx) {
// decorator idempotency when applied decorator multiple times
return
}

if err != nil {
labels = append(labels, attribute.String(labelSuccess, "false"))
} else {
labels = append(labels, attribute.String(labelSuccess, "true"))
}

m.publishTimeSeconds.Record(
ctx,
time.Since(start).Seconds(),
metric.WithAttributes(labels...),
)
}()

for _, msg := range messages {
msg.SetContext(setPublishObservedToCtx(msg.Context()))
}

return m.pub.Publish(topic, messages...)
}

// Close decreases the total publisher count, closes the OpenTelemetry HTTP server and calls wrapped Close.
func (m PublisherOpenTelemetryMetricsDecorator) Close() error {
return m.pub.Close()
}
50 changes: 50 additions & 0 deletions components/metrics/opentelemetry_subscriber.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package metrics

import (
"github.com/ThreeDotsLabs/watermill/message"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// SubscriberOpenTelemetryMetricsDecorator decorates a subscriber to capture OpenTelemetry metrics.
type SubscriberOpenTelemetryMetricsDecorator struct {
message.Subscriber
subscriberName string
subscriberMessagesReceivedTotal metric.Int64Counter
}

func (s SubscriberOpenTelemetryMetricsDecorator) recordMetrics(msg *message.Message) {
if msg == nil {
return
}

ctx := msg.Context()
labelsMap := labelsFromCtx(ctx, subscriberLabelKeys...)
if labelsMap[labelKeySubscriberName] == "" {
labelsMap[labelKeySubscriberName] = s.subscriberName
}
if labelsMap[labelKeyHandlerName] == "" {
labelsMap[labelKeyHandlerName] = labelValueNoHandler
}
labels := make([]attribute.KeyValue, 0, len(labelsMap))
for k, v := range labelsMap {
labels = append(labels, attribute.String(k, v))
}

go func() {
if subscribeAlreadyObserved(ctx) {
// decorator idempotency when applied decorator multiple times
return
}

select {
case <-msg.Acked():
labels = append(labels, attribute.String(labelAcked, "acked"))
case <-msg.Nacked():
labels = append(labels, attribute.String(labelAcked, "nacked"))
}
s.subscriberMessagesReceivedTotal.Add(ctx, 1, metric.WithAttributes(labels...))
}()

msg.SetContext(setSubscribeObservedToCtx(msg.Context()))
}
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,6 @@ import (
"github.com/ThreeDotsLabs/watermill/message"
)

var (
handlerLabelKeys = []string{
labelKeyHandlerName,
labelSuccess,
}

// defaultHandlerExecutionTimeBuckets are one order of magnitude smaller than default buckets (5ms~10s),
// because the handler execution times are typically shorter (µs~ms range).
defaultHandlerExecutionTimeBuckets = []float64{
0.0005,
0.001,
0.0025,
0.005,
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1,
}
)

// HandlerPrometheusMetricsMiddleware is a middleware that captures Prometheus metrics.
type HandlerPrometheusMetricsMiddleware struct {
handlerExecutionTimeSeconds *prometheus.HistogramVec
Expand Down
Loading