Skip to content

Commit 8411df2

Browse files
lahsivjaraxw
andauthored
[connector/signaltometrics]Add otelcol instance info as res attribute (#178)
* [connector/signaltometrics]Add collector instance info as res attribute Metric data streams MUST obey single-writer. `signaltometrics` the component produces metrics from non-metric signals as well as alters resource attributes from the source signals. To keep the single-writer valid, the component adds collector information as resource attributes with the component name (`signaltometrics`) as the prefix for all added resource attributes. --------- Co-authored-by: Andrew Wilkins <[email protected]>
1 parent 53d7925 commit 8411df2

File tree

23 files changed

+664
-80
lines changed

23 files changed

+664
-80
lines changed

connector/signaltometricsconnector/config/config.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,11 @@ type MetricInfo struct {
143143
// IncludeResourceAttributes is a list of resource attributes that
144144
// needs to be included in the generated metric. If no resource
145145
// attribute is included in the list then all attributes are included.
146-
// Note that configuring this setting might cause the produced metric
147-
// to lose its identity or cause identity conflict.
146+
// Metric data streams MUST obey single-writer. The component produces
147+
// metrics from non-metric signals as well as alters resource attributes
148+
// from the source signals. To keep the single-writer valid, the
149+
// component adds collector information as resource attribute with the
150+
// component name as the prefix of the resource attributes.
148151
IncludeResourceAttributes []Attribute `mapstructure:"include_resource_attributes"`
149152
Attributes []Attribute `mapstructure:"attributes"`
150153
// Conditions are a set of OTTL condtions which are ORd. Data is

connector/signaltometricsconnector/connector.go

Lines changed: 19 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ type signalToMetrics struct {
4141
component.StartFunc
4242
component.ShutdownFunc
4343

44-
logger *zap.Logger
44+
collectorInstanceInfo *model.CollectorInstanceInfo
45+
logger *zap.Logger
4546

4647
next consumer.Metrics
4748
spanMetricDefs []model.MetricDef[ottlspan.TransformContext]
@@ -73,10 +74,8 @@ func (sm *signalToMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces)
7374
spanAttrs := span.Attributes()
7475
adjustedCount := int64(trace.CalculateAdjustedCount(span.TraceState().AsRaw()))
7576
for _, md := range sm.spanMetricDefs {
76-
filteredSpanAttrs := getFilteredAttributes(spanAttrs, md.Attributes)
77-
if filteredSpanAttrs.Len() != len(md.Attributes) {
78-
// If any of the configured attributes is not present in
79-
// source metric then don't count them.
77+
filteredSpanAttrs, ok := md.FilterAttributes(spanAttrs)
78+
if !ok {
8079
continue
8180
}
8281

@@ -95,14 +94,7 @@ func (sm *signalToMetrics) ConsumeTraces(ctx context.Context, td ptrace.Traces)
9594
}
9695
}
9796

98-
var filteredResAttrs pcommon.Map
99-
if len(md.IncludeResourceAttributes) > 0 {
100-
filteredResAttrs = getFilteredAttributes(resourceAttrs, md.IncludeResourceAttributes)
101-
} else {
102-
// Copy resource attrs to avoid mutating data
103-
filteredResAttrs = pcommon.NewMap()
104-
resourceAttrs.CopyTo(filteredResAttrs)
105-
}
97+
filteredResAttrs := md.FilterResourceAttributes(resourceAttrs, sm.collectorInstanceInfo)
10698
multiError = errors.Join(multiError, aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredSpanAttrs, adjustedCount))
10799
}
108100
}
@@ -130,15 +122,7 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics
130122
metrics := scopeMetric.Metrics()
131123
metric := metrics.At(k)
132124
for _, md := range sm.dpMetricDefs {
133-
var filteredResAttrs pcommon.Map
134-
if len(md.IncludeResourceAttributes) > 0 {
135-
filteredResAttrs = getFilteredAttributes(resourceAttrs, md.IncludeResourceAttributes)
136-
} else {
137-
// Copy resource attrs to avoid mutating data
138-
filteredResAttrs = pcommon.NewMap()
139-
resourceAttrs.CopyTo(filteredResAttrs)
140-
}
141-
125+
filteredResAttrs := md.FilterResourceAttributes(resourceAttrs, sm.collectorInstanceInfo)
142126
aggregate := func(dp any, dpAttrs pcommon.Map) error {
143127
// The transform context is created from orginal attributes so that the
144128
// OTTL expressions are also applied on the original attributes.
@@ -163,10 +147,8 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics
163147
dps := metric.Gauge().DataPoints()
164148
for l := 0; l < dps.Len(); l++ {
165149
dp := dps.At(l)
166-
filteredDPAttrs := getFilteredAttributes(dp.Attributes(), md.Attributes)
167-
if filteredDPAttrs.Len() != len(md.Attributes) {
168-
// If all the configured attributes are not present in
169-
// source metric then don't count them.
150+
filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
151+
if !ok {
170152
continue
171153
}
172154
multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs))
@@ -175,10 +157,8 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics
175157
dps := metric.Sum().DataPoints()
176158
for l := 0; l < dps.Len(); l++ {
177159
dp := dps.At(l)
178-
filteredDPAttrs := getFilteredAttributes(dp.Attributes(), md.Attributes)
179-
if filteredDPAttrs.Len() != len(md.Attributes) {
180-
// If all the configured attributes are not present in
181-
// source metric then don't count them.
160+
filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
161+
if !ok {
182162
continue
183163
}
184164
multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs))
@@ -187,10 +167,8 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics
187167
dps := metric.Summary().DataPoints()
188168
for l := 0; l < dps.Len(); l++ {
189169
dp := dps.At(l)
190-
filteredDPAttrs := getFilteredAttributes(dp.Attributes(), md.Attributes)
191-
if filteredDPAttrs.Len() != len(md.Attributes) {
192-
// If all the configured attributes are not present in
193-
// source metric then don't count them.
170+
filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
171+
if !ok {
194172
continue
195173
}
196174
multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs))
@@ -199,10 +177,8 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics
199177
dps := metric.Histogram().DataPoints()
200178
for l := 0; l < dps.Len(); l++ {
201179
dp := dps.At(l)
202-
filteredDPAttrs := getFilteredAttributes(dp.Attributes(), md.Attributes)
203-
if filteredDPAttrs.Len() != len(md.Attributes) {
204-
// If all the configured attributes are not present in
205-
// source metric then don't count them.
180+
filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
181+
if !ok {
206182
continue
207183
}
208184
multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs))
@@ -211,10 +187,8 @@ func (sm *signalToMetrics) ConsumeMetrics(ctx context.Context, m pmetric.Metrics
211187
dps := metric.ExponentialHistogram().DataPoints()
212188
for l := 0; l < dps.Len(); l++ {
213189
dp := dps.At(l)
214-
filteredDPAttrs := getFilteredAttributes(dp.Attributes(), md.Attributes)
215-
if filteredDPAttrs.Len() != len(md.Attributes) {
216-
// If all the configured attributes are not present in
217-
// source metric then don't count them.
190+
filteredDPAttrs, ok := md.FilterAttributes(dp.Attributes())
191+
if !ok {
218192
continue
219193
}
220194
multiError = errors.Join(multiError, aggregate(dp, filteredDPAttrs))
@@ -248,10 +222,8 @@ func (sm *signalToMetrics) ConsumeLogs(ctx context.Context, logs plog.Logs) erro
248222
log := scopeLog.LogRecords().At(k)
249223
logAttrs := log.Attributes()
250224
for _, md := range sm.logMetricDefs {
251-
filteredLogAttrs := getFilteredAttributes(logAttrs, md.Attributes)
252-
if filteredLogAttrs.Len() != len(md.Attributes) {
253-
// If all the configured attributes are not present in
254-
// source metric then don't count them.
225+
filteredLogAttrs, ok := md.FilterAttributes(logAttrs)
226+
if !ok {
255227
continue
256228
}
257229

@@ -270,14 +242,7 @@ func (sm *signalToMetrics) ConsumeLogs(ctx context.Context, logs plog.Logs) erro
270242
}
271243
}
272244

273-
var filteredResAttrs pcommon.Map
274-
if len(md.IncludeResourceAttributes) > 0 {
275-
filteredResAttrs = getFilteredAttributes(resourceAttrs, md.IncludeResourceAttributes)
276-
} else {
277-
// Copy resource attrs to avoid mutating data
278-
filteredResAttrs = pcommon.NewMap()
279-
resourceAttrs.CopyTo(filteredResAttrs)
280-
}
245+
filteredResAttrs := md.FilterResourceAttributes(resourceAttrs, sm.collectorInstanceInfo)
281246
multiError = errors.Join(multiError, aggregator.Aggregate(ctx, tCtx, md, filteredResAttrs, filteredLogAttrs, 1))
282247
}
283248
}
@@ -306,17 +271,3 @@ func (sm *signalToMetrics) processNext(ctx context.Context, m pmetric.Metrics, e
306271
}
307272
return sm.next.ConsumeMetrics(ctx, m)
308273
}
309-
310-
func getFilteredAttributes(attrs pcommon.Map, filters []model.AttributeKeyValue) pcommon.Map {
311-
filteredAttrs := pcommon.NewMap()
312-
for _, filter := range filters {
313-
if attr, ok := attrs.Get(filter.Key); ok {
314-
attr.CopyTo(filteredAttrs.PutEmpty(filter.Key))
315-
continue
316-
}
317-
if filter.DefaultValue.Type() != pcommon.ValueTypeEmpty {
318-
filter.DefaultValue.CopyTo(filteredAttrs.PutEmpty(filter.Key))
319-
}
320-
}
321-
return filteredAttrs
322-
}

connector/signaltometricsconnector/connector_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ import (
3535
"go.opentelemetry.io/collector/connector/connectortest"
3636
"go.opentelemetry.io/collector/consumer"
3737
"go.opentelemetry.io/collector/consumer/consumertest"
38+
"go.opentelemetry.io/collector/pdata/pcommon"
3839
"go.opentelemetry.io/collector/pdata/pmetric"
40+
semconv "go.opentelemetry.io/collector/semconv/v1.26.0"
3941
"go.uber.org/zap/zapcore"
4042
"go.uber.org/zap/zaptest"
4143
)
@@ -299,6 +301,7 @@ func setupConnector(
299301
t.Helper()
300302
factory := NewFactory()
301303
settings := connectortest.NewNopSettings()
304+
telemetryResource(t).CopyTo(settings.TelemetrySettings.Resource)
302305
settings.TelemetrySettings.Logger = zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel))
303306

304307
cfg := createDefaultConfig()
@@ -312,6 +315,16 @@ func setupConnector(
312315
return factory, settings, cfg
313316
}
314317

318+
func telemetryResource(t *testing.T) pcommon.Resource {
319+
t.Helper()
320+
321+
r := pcommon.NewResource()
322+
r.Attributes().PutStr(semconv.AttributeServiceInstanceID, "627cc493-f310-47de-96bd-71410b7dec09")
323+
r.Attributes().PutStr(semconv.AttributeServiceName, "signaltometrics")
324+
r.Attributes().PutStr(semconv.AttributeServiceNamespace, "test")
325+
return r
326+
}
327+
315328
func assertAggregatedMetrics(t *testing.T, expected, actual pmetric.Metrics) bool {
316329
t.Helper()
317330
return assert.NoError(t, pmetrictest.CompareMetrics(

connector/signaltometricsconnector/factory.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,10 @@ func createTracesToMetrics(
7373
}
7474

7575
return &signalToMetrics{
76-
logger: set.Logger,
76+
logger: set.Logger,
77+
collectorInstanceInfo: model.NewCollectorInstanceInfo(
78+
set.TelemetrySettings,
79+
),
7780
next: nextConsumer,
7881
spanMetricDefs: metricDefs,
7982
}, nil
@@ -101,7 +104,10 @@ func createMetricsToMetrics(
101104
}
102105

103106
return &signalToMetrics{
104-
logger: set.Logger,
107+
logger: set.Logger,
108+
collectorInstanceInfo: model.NewCollectorInstanceInfo(
109+
set.TelemetrySettings,
110+
),
105111
next: nextConsumer,
106112
dpMetricDefs: metricDefs,
107113
}, nil
@@ -129,7 +135,10 @@ func createLogsToMetrics(
129135
}
130136

131137
return &signalToMetrics{
132-
logger: set.Logger,
138+
logger: set.Logger,
139+
collectorInstanceInfo: model.NewCollectorInstanceInfo(
140+
set.TelemetrySettings,
141+
),
133142
next: nextConsumer,
134143
logMetricDefs: metricDefs,
135144
}, nil

connector/signaltometricsconnector/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ require (
1717
go.opentelemetry.io/collector/consumer/consumertest v0.111.0
1818
go.opentelemetry.io/collector/pdata v1.17.0
1919
go.opentelemetry.io/collector/pipeline v0.111.0
20+
go.opentelemetry.io/collector/semconv v0.111.0
2021
go.uber.org/goleak v1.3.0
2122
go.uber.org/zap v1.27.0
2223
)
@@ -58,7 +59,6 @@ require (
5859
go.opentelemetry.io/collector/consumer/consumerprofiles v0.111.0 // indirect
5960
go.opentelemetry.io/collector/internal/globalsignal v0.111.0 // indirect
6061
go.opentelemetry.io/collector/pdata/pprofile v0.111.0 // indirect
61-
go.opentelemetry.io/collector/semconv v0.111.0 // indirect
6262
go.opentelemetry.io/otel v1.30.0 // indirect
6363
go.opentelemetry.io/otel/metric v1.30.0 // indirect
6464
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package model // import "github.com/elastic/opentelemetry-collector-components/connector/signaltometricsconnector/internal/model"
19+
20+
import (
21+
"github.com/elastic/opentelemetry-collector-components/connector/signaltometricsconnector/internal/metadata"
22+
"go.opentelemetry.io/collector/component"
23+
"go.opentelemetry.io/collector/pdata/pcommon"
24+
semconv "go.opentelemetry.io/collector/semconv/v1.26.0"
25+
)
26+
27+
var prefix = metadata.Type.String()
28+
29+
// CollectorInstanceInfo holds the attributes that could uniquely identify
30+
// the current collector instance. These attributes are initialized from the
31+
// telemetry settings. The CollectorInstanceInfo can copy these attributes,
32+
// with a given prefix, to a provided map.
33+
type CollectorInstanceInfo struct {
34+
size int
35+
serviceInstanceID string
36+
serviceName string
37+
serviceNamespace string
38+
}
39+
40+
func NewCollectorInstanceInfo(
41+
set component.TelemetrySettings,
42+
) *CollectorInstanceInfo {
43+
var info CollectorInstanceInfo
44+
set.Resource.Attributes().Range(func(k string, v pcommon.Value) bool {
45+
switch k {
46+
case semconv.AttributeServiceInstanceID:
47+
if str := v.Str(); str != "" {
48+
info.serviceInstanceID = str
49+
info.size++
50+
}
51+
case semconv.AttributeServiceName:
52+
if str := v.Str(); str != "" {
53+
info.serviceName = str
54+
info.size++
55+
}
56+
case semconv.AttributeServiceNamespace:
57+
if str := v.Str(); str != "" {
58+
info.serviceNamespace = str
59+
info.size++
60+
}
61+
}
62+
return true
63+
})
64+
return &info
65+
}
66+
67+
// Size returns the max number of attributes that defines a collector's
68+
// instance information. Can be used to presize the attributes.
69+
func (info CollectorInstanceInfo) Size() int {
70+
return info.size
71+
}
72+
73+
func (info CollectorInstanceInfo) Copy(to pcommon.Map) {
74+
to.EnsureCapacity(info.Size())
75+
if info.serviceInstanceID != "" {
76+
to.PutStr(keyWithPrefix(semconv.AttributeServiceInstanceID), info.serviceInstanceID)
77+
}
78+
if info.serviceName != "" {
79+
to.PutStr(keyWithPrefix(semconv.AttributeServiceName), info.serviceName)
80+
}
81+
if info.serviceNamespace != "" {
82+
to.PutStr(keyWithPrefix(semconv.AttributeServiceNamespace), info.serviceNamespace)
83+
}
84+
}
85+
86+
func keyWithPrefix(key string) string {
87+
return prefix + "." + key
88+
}

0 commit comments

Comments
 (0)