-
Notifications
You must be signed in to change notification settings - Fork 3.5k
Expand file tree
/
Copy pathmetrics.go
More file actions
120 lines (102 loc) · 3.11 KB
/
metrics.go
File metadata and controls
120 lines (102 loc) · 3.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package failoverconnector // import "github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector"
import (
"context"
"errors"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/zap"
)
type metricsRouter struct {
*baseFailoverRouter[consumer.Metrics]
}
func newMetricsRouter(provider consumerProvider[consumer.Metrics], cfg *Config) (*metricsRouter, error) {
failover, err := newBaseFailoverRouter(provider, cfg)
if err != nil {
return nil, err
}
return &metricsRouter{baseFailoverRouter: failover}, nil
}
// Consume is the metrics-specific consumption method
func (f *metricsRouter) Consume(ctx context.Context, md pmetric.Metrics) error {
select {
case <-f.notifyRetry:
if !f.sampleRetryConsumers(ctx, md) {
return f.consumeByHealthyPipeline(ctx, md)
}
return nil
default:
return f.consumeByHealthyPipeline(ctx, md)
}
}
// consumeByHealthyPipeline will consume the metrics by the current healthy level
func (f *metricsRouter) consumeByHealthyPipeline(ctx context.Context, md pmetric.Metrics) error {
for {
tc, idx := f.getCurrentConsumer()
if idx >= len(f.cfg.PipelinePriority) {
return errNoValidPipeline
}
if err := tc.ConsumeMetrics(ctx, md); err != nil {
if idx > 0 && idx == len(f.cfg.PipelinePriority)-1 {
if f.sampleRetryConsumers(ctx, md) {
return nil
}
}
f.reportConsumerError(idx)
continue
}
return nil
}
}
// sampleRetryConsumers iterates through all unhealthy consumers to re-establish a healthy connection
func (f *metricsRouter) sampleRetryConsumers(ctx context.Context, md pmetric.Metrics) bool {
stableIndex := f.pS.CurrentPipeline()
for i := range stableIndex {
consumer := f.getConsumerAtIndex(i)
err := consumer.ConsumeMetrics(ctx, md)
if err == nil {
f.pS.ResetHealthyPipeline(i)
return true
}
}
return false
}
type metricsFailover struct {
component.StartFunc
component.ShutdownFunc
config *Config
failover *metricsRouter
logger *zap.Logger
}
func (*metricsFailover) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}
// ConsumeMetrics will try to export to the current set priority level and handle failover in the case of an error
func (f *metricsFailover) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
return f.failover.Consume(ctx, md)
}
func (f *metricsFailover) Shutdown(context.Context) error {
if f.failover != nil {
f.failover.Shutdown()
}
return nil
}
func newMetricsToMetrics(set connector.Settings, cfg component.Config, metrics consumer.Metrics) (connector.Metrics, error) {
config := cfg.(*Config)
mr, ok := metrics.(connector.MetricsRouterAndConsumer)
if !ok {
return nil, errors.New("consumer is not of type MetricsRouter")
}
failover, err := newMetricsRouter(mr.Consumer, config)
if err != nil {
return nil, err
}
return &metricsFailover{
config: config,
failover: failover,
logger: set.Logger,
}, nil
}