-
Notifications
You must be signed in to change notification settings - Fork 3.9k
feat: implement otlp prom exporter #24158
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
ea63eac
c373791
3dc7abf
38664e2
cbc1c77
c05a3a5
0cd6572
7472934
527a99a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -90,6 +90,15 @@ type Config struct { | |
// DatadogHostname defines the hostname to use when emitting metrics to | ||
// Datadog. Only utilized if MetricsSink is set to "dogstatsd". | ||
DatadogHostname string `mapstructure:"datadog-hostname"` | ||
|
||
// Otlp Exporter fields | ||
OtlpExporterEnabled bool `mapstructure:"otlp-exporter-enabled"` | ||
OtlpCollectorEndpoint string `mapstructure:"otlp-collector-endpoint"` | ||
OtlpCollectorMetricsURLPath string `mapstructure:"otlp-collector-metrics-url-path"` | ||
OtlpUser string `mapstructure:"otlp-user"` | ||
OtlpToken string `mapstructure:"otlp-token"` | ||
OtlpServiceName string `mapstructure:"otlp-service-name"` | ||
OtlpPushInterval time.Duration `mapstructure:"otlp-push-interval"` | ||
} | ||
Comment on lines
+93
to
102
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider documenting defaults & ensuring zero‑value safety for new OTLP fields The new fields are great, but:
Please: |
||
|
||
// Metrics defines a wrapper around application telemetry functionality. It allows | ||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,171 @@ | ||||||||||||||||||||
package telemetry | ||||||||||||||||||||
|
||||||||||||||||||||
import ( | ||||||||||||||||||||
"context" | ||||||||||||||||||||
"encoding/base64" | ||||||||||||||||||||
"fmt" | ||||||||||||||||||||
"log" | ||||||||||||||||||||
"math" | ||||||||||||||||||||
"time" | ||||||||||||||||||||
|
||||||||||||||||||||
"github.com/prometheus/client_golang/prometheus" | ||||||||||||||||||||
dto "github.com/prometheus/client_model/go" | ||||||||||||||||||||
"go.opentelemetry.io/otel" | ||||||||||||||||||||
"go.opentelemetry.io/otel/attribute" | ||||||||||||||||||||
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" | ||||||||||||||||||||
otmetric "go.opentelemetry.io/otel/metric" | ||||||||||||||||||||
"go.opentelemetry.io/otel/sdk/metric" | ||||||||||||||||||||
"go.opentelemetry.io/otel/sdk/resource" | ||||||||||||||||||||
semconv "go.opentelemetry.io/otel/semconv/v1.21.0" | ||||||||||||||||||||
) | ||||||||||||||||||||
|
||||||||||||||||||||
const meterName = "cosmos-sdk-otlp-exporter" | ||||||||||||||||||||
|
||||||||||||||||||||
func StartOtlpExporter(cfg Config) { | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lets have a comment on what this does and how it works |
||||||||||||||||||||
ctx := context.Background() | ||||||||||||||||||||
|
||||||||||||||||||||
exporter, err := otlpmetrichttp.New(ctx, | ||||||||||||||||||||
otlpmetrichttp.WithEndpoint(cfg.OtlpCollectorEndpoint), | ||||||||||||||||||||
otlpmetrichttp.WithURLPath(cfg.OtlpCollectorMetricsURLPath), | ||||||||||||||||||||
otlpmetrichttp.WithHeaders(map[string]string{ | ||||||||||||||||||||
"Authorization": "Basic " + formatBasicAuth(cfg.OtlpUser, cfg.OtlpToken), | ||||||||||||||||||||
}), | ||||||||||||||||||||
Comment on lines
+30
to
+32
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This feels brittle in general. Couldn't some collectors expect auth in Http headers, some in grpc request directly, some without authz entirely? |
||||||||||||||||||||
) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
log.Fatalf("OTLP exporter setup failed: %v", err) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
Comment on lines
+34
to
+37
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid
- if err != nil {
- log.Fatalf("OTLP exporter setup failed: %v", err)
- }
+ if err != nil {
+ return fmt.Errorf("OTLP exporter setup failed: %w", err)
+ } …and bubble it up to the caller as noted in the 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||
res, _ := resource.New(ctx, resource.WithAttributes( | ||||||||||||||||||||
semconv.ServiceName(cfg.OtlpServiceName), | ||||||||||||||||||||
)) | ||||||||||||||||||||
Comment on lines
+38
to
+40
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we need to check the ignored error here? if not, can we comment why we are able to ignore it? |
||||||||||||||||||||
|
||||||||||||||||||||
Comment on lines
+38
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Handle and log the error returned by The second return value is currently discarded. If the resource cannot be created, the exporter will run with incomplete metadata. -res, _ := resource.New(ctx, resource.WithAttributes(
- semconv.ServiceName(cfg.OtlpServiceName),
-))
+res, rErr := resource.New(ctx, resource.WithAttributes(
+ semconv.ServiceName(cfg.OtlpServiceName),
+))
+if rErr != nil {
+ return fmt.Errorf("failed to initialise OTLP resource: %w", rErr)
+} 📝 Committable suggestion
Suggested change
|
||||||||||||||||||||
meterProvider := metric.NewMeterProvider( | ||||||||||||||||||||
metric.WithReader(metric.NewPeriodicReader(exporter, | ||||||||||||||||||||
metric.WithInterval(cfg.OtlpPushInterval))), | ||||||||||||||||||||
metric.WithResource(res), | ||||||||||||||||||||
) | ||||||||||||||||||||
otel.SetMeterProvider(meterProvider) | ||||||||||||||||||||
meter := otel.Meter(meterName) | ||||||||||||||||||||
|
||||||||||||||||||||
gauges := make(map[string]otmetric.Float64Gauge) | ||||||||||||||||||||
histograms := make(map[string]otmetric.Float64Histogram) | ||||||||||||||||||||
|
||||||||||||||||||||
go func() { | ||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how does this behave when shutting a node down? Do we want any kind of graceful shutdown here? |
||||||||||||||||||||
for { | ||||||||||||||||||||
if err := scrapePrometheusMetrics(ctx, meter, gauges, histograms); err != nil { | ||||||||||||||||||||
log.Printf("error scraping metrics: %v", err) | ||||||||||||||||||||
} | ||||||||||||||||||||
time.Sleep(cfg.OtlpPushInterval) | ||||||||||||||||||||
Comment on lines
+54
to
+58
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. small nit
Suggested change
|
||||||||||||||||||||
} | ||||||||||||||||||||
}() | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func scrapePrometheusMetrics(ctx context.Context, meter otmetric.Meter, gauges map[string]otmetric.Float64Gauge, histograms map[string]otmetric.Float64Histogram) error { | ||||||||||||||||||||
metricFamilies, err := prometheus.DefaultGatherer.Gather() | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
log.Printf("failed to gather prometheus metrics: %v", err) | ||||||||||||||||||||
return err | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
for _, mf := range metricFamilies { | ||||||||||||||||||||
name := mf.GetName() | ||||||||||||||||||||
for _, m := range mf.Metric { | ||||||||||||||||||||
switch mf.GetType() { | ||||||||||||||||||||
case dto.MetricType_GAUGE: | ||||||||||||||||||||
recordGauge(ctx, meter, gauges, name, mf.GetHelp(), m.Gauge.GetValue(), nil) | ||||||||||||||||||||
|
||||||||||||||||||||
case dto.MetricType_COUNTER: | ||||||||||||||||||||
recordGauge(ctx, meter, gauges, name, mf.GetHelp(), m.Counter.GetValue(), nil) | ||||||||||||||||||||
|
||||||||||||||||||||
case dto.MetricType_HISTOGRAM: | ||||||||||||||||||||
recordHistogram(ctx, meter, histograms, name, mf.GetHelp(), m.Histogram) | ||||||||||||||||||||
|
||||||||||||||||||||
case dto.MetricType_SUMMARY: | ||||||||||||||||||||
recordSummary(ctx, meter, gauges, name, mf.GetHelp(), m.Summary) | ||||||||||||||||||||
|
||||||||||||||||||||
default: | ||||||||||||||||||||
continue | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
return nil | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func recordGauge(ctx context.Context, meter otmetric.Meter, gauges map[string]otmetric.Float64Gauge, name, help string, val float64, attrs []attribute.KeyValue) { | ||||||||||||||||||||
g, ok := gauges[name] | ||||||||||||||||||||
if !ok { | ||||||||||||||||||||
var err error | ||||||||||||||||||||
g, err = meter.Float64Gauge(name, otmetric.WithDescription(help)) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
log.Printf("failed to create gauge %q: %v", name, err) | ||||||||||||||||||||
return | ||||||||||||||||||||
} | ||||||||||||||||||||
gauges[name] = g | ||||||||||||||||||||
} | ||||||||||||||||||||
g.Record(ctx, val, otmetric.WithAttributes(attrs...)) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func recordHistogram(ctx context.Context, meter otmetric.Meter, histograms map[string]otmetric.Float64Histogram, name, help string, h *dto.Histogram) { | ||||||||||||||||||||
boundaries := make([]float64, 0, len(h.Bucket)-1) // excluding +Inf | ||||||||||||||||||||
bucketCounts := make([]uint64, 0, len(h.Bucket)) | ||||||||||||||||||||
|
||||||||||||||||||||
for _, bucket := range h.Bucket { | ||||||||||||||||||||
if math.IsInf(bucket.GetUpperBound(), +1) { | ||||||||||||||||||||
continue // Skip +Inf bucket boundary explicitly | ||||||||||||||||||||
} | ||||||||||||||||||||
Comment on lines
+114
to
+116
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why do we skip this? i know it says +Inf boundary, but as someone who is unfamiliar with OTL, i am not sure what the significance is |
||||||||||||||||||||
boundaries = append(boundaries, bucket.GetUpperBound()) | ||||||||||||||||||||
bucketCounts = append(bucketCounts, bucket.GetCumulativeCount()) | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
hist, ok := histograms[name] | ||||||||||||||||||||
if !ok { | ||||||||||||||||||||
var err error | ||||||||||||||||||||
hist, err = meter.Float64Histogram( | ||||||||||||||||||||
name, | ||||||||||||||||||||
otmetric.WithDescription(help), | ||||||||||||||||||||
otmetric.WithExplicitBucketBoundaries(boundaries...), | ||||||||||||||||||||
) | ||||||||||||||||||||
if err != nil { | ||||||||||||||||||||
log.Printf("failed to create histogram %s: %v", name, err) | ||||||||||||||||||||
return | ||||||||||||||||||||
} | ||||||||||||||||||||
histograms[name] = hist | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
prevCount := uint64(0) | ||||||||||||||||||||
for i, count := range bucketCounts { | ||||||||||||||||||||
countInBucket := count - prevCount | ||||||||||||||||||||
prevCount = count | ||||||||||||||||||||
|
||||||||||||||||||||
// Explicitly record the mid-point of the bucket as approximation: | ||||||||||||||||||||
var value float64 | ||||||||||||||||||||
if i == 0 { | ||||||||||||||||||||
value = boundaries[0] / 2.0 | ||||||||||||||||||||
Check noticeCode scanning / CodeQL Floating point arithmetic Note
Floating point arithmetic operations are not associative and a possible source of non-determinism
|
||||||||||||||||||||
} else { | ||||||||||||||||||||
value = (boundaries[i-1] + boundaries[i]) / 2.0 | ||||||||||||||||||||
Check noticeCode scanning / CodeQL Floating point arithmetic Note
Floating point arithmetic operations are not associative and a possible source of non-determinism
Check noticeCode scanning / CodeQL Floating point arithmetic Note
Floating point arithmetic operations are not associative and a possible source of non-determinism
|
||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
// Record `countInBucket` number of observations explicitly (approximation): | ||||||||||||||||||||
for j := uint64(0); j < countInBucket; j++ { | ||||||||||||||||||||
hist.Record(ctx, value) | ||||||||||||||||||||
} | ||||||||||||||||||||
Comment on lines
+150
to
+152
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func recordSummary(ctx context.Context, meter otmetric.Meter, gauges map[string]otmetric.Float64Gauge, name, help string, s *dto.Summary) { | ||||||||||||||||||||
recordGauge(ctx, meter, gauges, name+"_sum", help+" (summary sum)", s.GetSampleSum(), nil) | ||||||||||||||||||||
recordGauge(ctx, meter, gauges, name+"_count", help+" (summary count)", float64(s.GetSampleCount()), nil) | ||||||||||||||||||||
|
||||||||||||||||||||
for _, q := range s.Quantile { | ||||||||||||||||||||
attrs := []attribute.KeyValue{ | ||||||||||||||||||||
attribute.String("quantile", fmt.Sprintf("%v", q.GetQuantile())), | ||||||||||||||||||||
} | ||||||||||||||||||||
recordGauge(ctx, meter, gauges, name, help+" (summary quantile)", q.GetValue(), attrs) | ||||||||||||||||||||
} | ||||||||||||||||||||
} | ||||||||||||||||||||
|
||||||||||||||||||||
func formatBasicAuth(username, token string) string { | ||||||||||||||||||||
auth := username + ":" + token | ||||||||||||||||||||
return base64.StdEncoding.EncodeToString([]byte(auth)) | ||||||||||||||||||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Exporter is started without lifecycle management or error propagation
StartOtlpExporter
(1) blocks fatal‑logging on failure and (2) launches a goroutine that never stops.Starting it here means:
graceDuration
, tests, etc.).telemetry.New
registers the Prom sink.Recommend returning a cancel/cleanup func and wiring it into the existing
errgroup
, then starting aftertelemetry.New
: