Skip to content

Commit 046d070

Browse files
dashpoleDylan-M
authored andcommitted
add start_time_metric strategy to the metricstarttime processor (open-telemetry#41047)
#### Description Add the `start_time_metric` strategy to the metricstarttime processor. This is needed to reach feature parity with the prometheus receiver's adjuster. It supports the same `start_time_metric_regex` configuration parameter, and falls back to the collector's start time. It does not yet support reset detection. That will be implemented in open-telemetry#38381, and I left a TODO in the code. #### Link to tracking issue Fixes open-telemetry#38383 #### Testing Added unit tests. #### Documentation Updated the README.md @ridwanmsharif
1 parent 8f2db90 commit 046d070

File tree

9 files changed

+572
-1
lines changed

9 files changed

+572
-1
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: metricstarttimeprocessor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add the start_time_metric, which sets the start time based on another metric in the batch of metrics.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [38383]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user, api]

processor/metricstarttimeprocessor/README.md

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,34 @@ Cons:
6969

7070
* The absolute value of counters is modified. This is generally not an issue, since counters are usually used to compute rates.
7171
* The initial point is dropped, which loses information.
72+
73+
### Strategy: Start Time Metric
74+
75+
The `start_time_metric` strategy handles missing start times by looking for the
76+
`process_start_time` metric, which is commonly supported by Prometheus exporters.
77+
If found, it uses the value of the `process_start_time` metric as the start time
78+
for all other cumulative points in the batch of metrics.
79+
80+
Use the `start_time_metric_regex` configuration option to change the name of the
81+
metric used for the start time.
82+
83+
If the start time metric is not found, it falls back to the time at which the
84+
collector started.
85+
86+
This strategy should only be used in limited circumstances:
87+
88+
* When your application has a metric with the start time in Unix nanoseconds,
89+
such as `process_start_time`.
90+
* The metricstarttime processor is used _before_ any batching, so that the
91+
batch of metrics all originate from a single application.
92+
* This strategy can be used when the collector is run as a sidecar to the
93+
application, where the collector's start time is a good approximation of the
94+
application's start time.
95+
96+
Cons:
97+
98+
* If the collector's start time is used as a fallback and the collector
99+
restarts, it can produce rates that are incorrect and higher than expected.
100+
* The process' start time isn't the time at which individual instruments or
101+
timeseries are initialized. It may result in lower rates if the first
102+
observation is significantly later than the process' start time.

processor/metricstarttimeprocessor/config.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ package metricstarttimeprocessor // import "github.com/open-telemetry/openteleme
66
import (
77
"errors"
88
"fmt"
9+
"regexp"
910
"time"
1011

1112
"go.opentelemetry.io/collector/component"
1213

14+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/starttimemetric"
1315
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/subtractinitial"
1416
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/truereset"
1517
)
@@ -18,6 +20,8 @@ import (
1820
type Config struct {
1921
Strategy string `mapstructure:"strategy"`
2022
GCInterval time.Duration `mapstructure:"gc_interval"`
23+
// StartTimeMetricRegex only applies then the start_time_metric strategy is used
24+
StartTimeMetricRegex string `mapstructure:"start_time_metric_regex"`
2125
}
2226

2327
var _ component.Config = (*Config)(nil)
@@ -34,11 +38,20 @@ func (cfg *Config) Validate() error {
3438
switch cfg.Strategy {
3539
case truereset.Type:
3640
case subtractinitial.Type:
41+
case starttimemetric.Type:
3742
default:
3843
return fmt.Errorf("%q is not a valid strategy", cfg.Strategy)
3944
}
4045
if cfg.GCInterval <= 0 {
4146
return errors.New("gc_interval must be positive")
4247
}
48+
if cfg.StartTimeMetricRegex != "" {
49+
if _, err := regexp.Compile(cfg.StartTimeMetricRegex); err != nil {
50+
return err
51+
}
52+
if cfg.Strategy != starttimemetric.Type {
53+
return errors.New("start_time_metric_regex can only be used with the start_time_metric strategy")
54+
}
55+
}
4356
return nil
4457
}

processor/metricstarttimeprocessor/config_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"go.opentelemetry.io/collector/confmap/xconfmap"
1616

1717
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/metadata"
18+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/starttimemetric"
1819
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/subtractinitial"
1920
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/truereset"
2021
)
@@ -56,6 +57,29 @@ func TestLoadConfig(t *testing.T) {
5657
id: component.NewIDWithName(metadata.Type, "invalid_strategy"),
5758
errorMessage: "\"bad\" is not a valid strategy",
5859
},
60+
{
61+
id: component.NewIDWithName(metadata.Type, "true_reset_point"),
62+
expected: &Config{
63+
Strategy: truereset.Type,
64+
GCInterval: 10 * time.Minute,
65+
},
66+
},
67+
{
68+
id: component.NewIDWithName(metadata.Type, "start_time_metric"),
69+
expected: &Config{
70+
Strategy: starttimemetric.Type,
71+
GCInterval: 10 * time.Minute,
72+
StartTimeMetricRegex: "^.+_process_start_time_seconds$",
73+
},
74+
},
75+
{
76+
id: component.NewIDWithName(metadata.Type, "invalid_regex"),
77+
errorMessage: "error parsing regexp: missing closing ): `((((`",
78+
},
79+
{
80+
id: component.NewIDWithName(metadata.Type, "regex_with_subtract_initial_point"),
81+
errorMessage: "start_time_metric_regex can only be used with the start_time_metric strategy",
82+
},
5983
}
6084

6185
for _, tt := range tests {

processor/metricstarttimeprocessor/factory.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ package metricstarttimeprocessor // import "github.com/open-telemetry/openteleme
55

66
import (
77
"context"
8+
"regexp"
89

910
"go.opentelemetry.io/collector/component"
1011
"go.opentelemetry.io/collector/consumer"
1112
"go.opentelemetry.io/collector/processor"
1213
"go.opentelemetry.io/collector/processor/processorhelper"
1314

1415
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/metadata"
16+
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/starttimemetric"
1517
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/subtractinitial"
1618
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/truereset"
1719
)
@@ -42,6 +44,17 @@ func createMetricsProcessor(
4244
case subtractinitial.Type:
4345
adjuster := subtractinitial.NewAdjuster(set.TelemetrySettings, rCfg.GCInterval)
4446
adjustMetrics = adjuster.AdjustMetrics
47+
case starttimemetric.Type:
48+
var startTimeMetricRegex *regexp.Regexp
49+
var err error
50+
if rCfg.StartTimeMetricRegex != "" {
51+
startTimeMetricRegex, err = regexp.Compile(rCfg.StartTimeMetricRegex)
52+
if err != nil {
53+
return nil, err
54+
}
55+
}
56+
adjuster := starttimemetric.NewAdjuster(set.TelemetrySettings, startTimeMetricRegex)
57+
adjustMetrics = adjuster.AdjustMetrics
4558
}
4659

4760
return processorhelper.NewMetrics(
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package starttimemetric // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstarttimeprocessor/internal/starttimemetric"
5+
6+
import (
7+
"context"
8+
"errors"
9+
"regexp"
10+
"time"
11+
12+
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/pdata/pcommon"
14+
"go.opentelemetry.io/collector/pdata/pmetric"
15+
"go.uber.org/zap"
16+
)
17+
18+
const (
19+
// Type is the value users can use to configure the start time metric adjuster.
20+
Type = "start_time_metric"
21+
startTimeMetricName = "process_start_time_seconds"
22+
)
23+
24+
var (
25+
errNoStartTimeMetrics = errors.New("start_time metric is missing")
26+
errNoDataPointsStartTimeMetric = errors.New("start time metric with no data points")
27+
errUnsupportedTypeStartTimeMetric = errors.New("unsupported data type for start time metric")
28+
// approximateCollectorStartTime is the approximate start time of the
29+
// collector. Used as a fallback start time for metrics when the start time
30+
// metric is not found. Set when the component is initialized.
31+
approximateCollectorStartTime time.Time
32+
)
33+
34+
func init() {
35+
approximateCollectorStartTime = time.Now()
36+
}
37+
38+
type Adjuster struct {
39+
startTimeMetricRegex *regexp.Regexp
40+
set component.TelemetrySettings
41+
}
42+
43+
// NewAdjuster returns a new Adjuster which adjust metrics' start times based on the initial received points.
44+
func NewAdjuster(set component.TelemetrySettings, startTimeMetricRegex *regexp.Regexp) *Adjuster {
45+
return &Adjuster{
46+
set: set,
47+
startTimeMetricRegex: startTimeMetricRegex,
48+
}
49+
}
50+
51+
// AdjustMetrics adjusts the start time of metrics based on a different metric in the batch.
52+
func (a *Adjuster) AdjustMetrics(_ context.Context, metrics pmetric.Metrics) (pmetric.Metrics, error) {
53+
startTime, err := a.getStartTime(metrics)
54+
if err != nil {
55+
a.set.Logger.Debug("Couldn't get start time for metrics. Using fallback start time.", zap.Error(err), zap.Time("fallback_start_time", approximateCollectorStartTime))
56+
startTime = float64(approximateCollectorStartTime.Unix())
57+
}
58+
59+
startTimeTs := timestampFromFloat64(startTime)
60+
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
61+
rm := metrics.ResourceMetrics().At(i)
62+
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
63+
ilm := rm.ScopeMetrics().At(j)
64+
for k := 0; k < ilm.Metrics().Len(); k++ {
65+
metric := ilm.Metrics().At(k)
66+
switch metric.Type() {
67+
case pmetric.MetricTypeGauge:
68+
continue
69+
70+
case pmetric.MetricTypeSum:
71+
dataPoints := metric.Sum().DataPoints()
72+
for l := 0; l < dataPoints.Len(); l++ {
73+
dp := dataPoints.At(l)
74+
dp.SetStartTimestamp(startTimeTs)
75+
}
76+
77+
case pmetric.MetricTypeSummary:
78+
dataPoints := metric.Summary().DataPoints()
79+
for l := 0; l < dataPoints.Len(); l++ {
80+
dp := dataPoints.At(l)
81+
dp.SetStartTimestamp(startTimeTs)
82+
}
83+
84+
case pmetric.MetricTypeHistogram:
85+
dataPoints := metric.Histogram().DataPoints()
86+
for l := 0; l < dataPoints.Len(); l++ {
87+
dp := dataPoints.At(l)
88+
dp.SetStartTimestamp(startTimeTs)
89+
}
90+
91+
case pmetric.MetricTypeExponentialHistogram:
92+
dataPoints := metric.ExponentialHistogram().DataPoints()
93+
for l := 0; l < dataPoints.Len(); l++ {
94+
dp := dataPoints.At(l)
95+
dp.SetStartTimestamp(startTimeTs)
96+
}
97+
98+
case pmetric.MetricTypeEmpty:
99+
fallthrough
100+
101+
default:
102+
a.set.Logger.Warn("Unknown metric type", zap.String("type", metric.Type().String()))
103+
}
104+
}
105+
}
106+
}
107+
// TODO: handle resets by factoring reset handling out of other strategies
108+
return metrics, nil
109+
}
110+
111+
func timestampFromFloat64(ts float64) pcommon.Timestamp {
112+
secs := int64(ts)
113+
nanos := int64((ts - float64(secs)) * 1e9)
114+
return pcommon.Timestamp(secs*1e9 + nanos)
115+
}
116+
117+
func (a *Adjuster) getStartTime(metrics pmetric.Metrics) (float64, error) {
118+
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
119+
rm := metrics.ResourceMetrics().At(i)
120+
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
121+
ilm := rm.ScopeMetrics().At(j)
122+
for k := 0; k < ilm.Metrics().Len(); k++ {
123+
metric := ilm.Metrics().At(k)
124+
if a.matchStartTimeMetric(metric.Name()) {
125+
switch metric.Type() {
126+
case pmetric.MetricTypeGauge:
127+
if metric.Gauge().DataPoints().Len() == 0 {
128+
return 0.0, errNoDataPointsStartTimeMetric
129+
}
130+
return metric.Gauge().DataPoints().At(0).DoubleValue(), nil
131+
132+
case pmetric.MetricTypeSum:
133+
if metric.Sum().DataPoints().Len() == 0 {
134+
return 0.0, errNoDataPointsStartTimeMetric
135+
}
136+
return metric.Sum().DataPoints().At(0).DoubleValue(), nil
137+
138+
case pmetric.MetricTypeEmpty, pmetric.MetricTypeHistogram, pmetric.MetricTypeExponentialHistogram, pmetric.MetricTypeSummary:
139+
fallthrough
140+
default:
141+
return 0, errUnsupportedTypeStartTimeMetric
142+
}
143+
}
144+
}
145+
}
146+
}
147+
return 0.0, errNoStartTimeMetrics
148+
}
149+
150+
func (a *Adjuster) matchStartTimeMetric(metricName string) bool {
151+
if a.startTimeMetricRegex != nil {
152+
return a.startTimeMetricRegex.MatchString(metricName)
153+
}
154+
155+
return metricName == startTimeMetricName
156+
}

0 commit comments

Comments
 (0)