Skip to content

Commit f864413

Browse files
authored
[processor/elasticapm] enhance data_stream.dataset field with service.name (#925)
Add a configuration knob in the elasticapm processor to populate the `data_stream.dataset` field dynamically based on the `service.name` attribute. This is disabled by default to maintain backwards compatibility. --------- Signed-off-by: inge4pres <[email protected]>
1 parent 88102af commit f864413

File tree

9 files changed

+298
-22
lines changed

9 files changed

+298
-22
lines changed

processor/elasticapmprocessor/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,9 @@ type Config struct {
2727
// the x-elastic-mapping-mode metadata is set to "ecs". Traces are always enriched regardless
2828
// of this setting. Defaults to false for backwards compatibility (always enrich).
2929
SkipEnrichment bool `mapstructure:"skip_enrichment"`
30+
31+
// ServiceNameInDataStreamDataset controls whether the service.name attribute
32+
// is included in the data_stream.dataset value. If true, the dataset will be
33+
// in the format "apm.app.<service.name>". Defaults to false for backwards compatibility.
34+
ServiceNameInDataStreamDataset bool `mapstructure:"service_name_in_datastream_dataset"`
3035
}

processor/elasticapmprocessor/internal/routing/data_stream.go

Lines changed: 69 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,78 @@
1717

1818
package routing // import "github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/routing"
1919

20-
import "go.opentelemetry.io/collector/pdata/pcommon"
20+
import (
21+
"strings"
2122

22-
func EncodeDataStream(resource pcommon.Resource, dataStreamType string) {
23+
"go.opentelemetry.io/collector/pdata/pcommon"
24+
)
25+
26+
// Attribute keys and values used when routing documents to data streams.
const (
	// Permitted values for the data_stream.type attribute.
	DataStreamTypeLogs    = "logs"
	DataStreamTypeMetrics = "metrics"
	DataStreamTypeTraces  = "traces"

	// ServiceNameAttributeKey is the resource attribute the dataset is
	// derived from when service-specific routing is enabled.
	ServiceNameAttributeKey = "service.name"

	// NamespaceDefault is the data_stream.namespace value written for all
	// documents. TODO: make this configurable.
	NamespaceDefault = "default"

	// ServiceNameUnknownAttributeUnknonw is the fallback used when the
	// resource carries no (non-empty) service.name attribute.
	// NOTE(review): the identifier is misspelled ("Unknonw"); it is exported,
	// so renaming it would be a breaking change — left as-is.
	ServiceNameUnknownAttributeUnknonw = "unknown"
)
38+
39+
func EncodeDataStream(resource pcommon.Resource, dataStreamType string, serviceNameInDataset bool) {
40+
if serviceNameInDataset {
41+
encodeDataStreamWithServiceName(resource, dataStreamType)
42+
} else {
43+
encodeDataStreamDefault(resource, dataStreamType)
44+
}
45+
}
46+
47+
func encodeDataStreamDefault(resource pcommon.Resource, dataStreamType string) {
2348
attributes := resource.Attributes()
2449

2550
attributes.PutStr("data_stream.type", dataStreamType)
2651
attributes.PutStr("data_stream.dataset", "apm")
27-
attributes.PutStr("data_stream.namespace", "default") //TODO: make this configurable
52+
attributes.PutStr("data_stream.namespace", NamespaceDefault)
53+
}
54+
55+
func encodeDataStreamWithServiceName(resource pcommon.Resource, dataStreamType string) {
56+
attributes := resource.Attributes()
57+
58+
serviceName, ok := attributes.Get(ServiceNameAttributeKey)
59+
if !ok || serviceName.Str() == "" {
60+
serviceName = pcommon.NewValueStr(ServiceNameUnknownAttributeUnknonw)
61+
}
62+
63+
attributes.PutStr("data_stream.type", dataStreamType)
64+
attributes.PutStr("data_stream.dataset", "apm.app."+normalizeServiceName(serviceName.Str()))
65+
attributes.PutStr("data_stream.namespace", NamespaceDefault)
66+
}
67+
68+
// The following is copied from apm-data:
// https://github.com/elastic/apm-data/blob/46a81347bdbb81a7a308e8d2f58f39c0b1137a77/model/modelprocessor/datastream.go#L186C1-L209C2

// normalizeServiceName translates serviceName into a string suitable for
// inclusion in a data stream name: the input is lowercased and every
// reserved character is replaced with "_".
func normalizeServiceName(s string) string {
	return strings.Map(replaceReservedRune, strings.ToLower(s))
}

// replaceReservedRune maps to '_' every rune that Elasticsearch forbids in
// data stream names, plus '-', which is reserved as the separator between
// the data stream type, dataset, and namespace. All other runes pass
// through unchanged.
func replaceReservedRune(r rune) rune {
	const reserved = `\/*?"<>| ,#:-`
	if strings.ContainsRune(reserved, r) {
		return '_'
	}
	return r
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package routing_test
19+
20+
import (
21+
"testing"
22+
23+
"github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/routing"
24+
"github.com/stretchr/testify/assert"
25+
"go.opentelemetry.io/collector/pdata/pcommon"
26+
)
27+
28+
func TestDataStremaEncoderDefault(t *testing.T) {
29+
resource := pcommon.NewResource()
30+
routing.EncodeDataStream(resource, "logs", false)
31+
32+
attributes := resource.Attributes()
33+
34+
dataStreamType, ok := attributes.Get("data_stream.type")
35+
assert.True(t, ok)
36+
assert.Equal(t, "logs", dataStreamType.Str())
37+
38+
dataStreamDataset, ok := attributes.Get("data_stream.dataset")
39+
assert.True(t, ok)
40+
assert.Equal(t, "apm", dataStreamDataset.Str())
41+
42+
dataStreamNamespace, ok := attributes.Get("data_stream.namespace")
43+
assert.True(t, ok)
44+
assert.Equal(t, "default", dataStreamNamespace.Str())
45+
}
46+
47+
func TestDataStreamEncoderWithServiceName(t *testing.T) {
48+
resource := pcommon.NewResource()
49+
attributes := resource.Attributes()
50+
attributes.PutStr("service.name", "my-service")
51+
52+
routing.EncodeDataStream(resource, "metrics", true)
53+
54+
dataStreamType, ok := attributes.Get("data_stream.type")
55+
assert.True(t, ok)
56+
assert.Equal(t, "metrics", dataStreamType.Str())
57+
58+
dataStreamDataset, ok := attributes.Get("data_stream.dataset")
59+
assert.True(t, ok)
60+
assert.Equal(t, "apm.app.my_service", dataStreamDataset.Str())
61+
62+
dataStreamNamespace, ok := attributes.Get("data_stream.namespace")
63+
assert.True(t, ok)
64+
assert.Equal(t, "default", dataStreamNamespace.Str())
65+
}

processor/elasticapmprocessor/processor.go

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ func (p *TraceProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) er
6565
resourceSpan := resourceSpans.At(i)
6666
resource := resourceSpan.Resource()
6767
ecs.TranslateResourceMetadata(resource)
68-
routing.EncodeDataStream(resource, "traces")
68+
// Traces signal never need to be routed to service-specific datasets
69+
routing.EncodeDataStream(resource, routing.DataStreamTypeTraces, false)
6970
p.enricher.Config.Resource.DeploymentEnvironment.Enabled = false
7071
}
7172
}
@@ -92,18 +93,20 @@ type LogProcessor struct {
9293
component.StartFunc
9394
component.ShutdownFunc
9495

95-
next consumer.Logs
96-
enricher *enrichments.Enricher
97-
logger *zap.Logger
98-
skipEnrichment bool
96+
next consumer.Logs
97+
enricher *enrichments.Enricher
98+
logger *zap.Logger
99+
skipEnrichment bool
100+
datasetWithServiceName bool
99101
}
100102

101103
func newLogProcessor(cfg *Config, next consumer.Logs, logger *zap.Logger) *LogProcessor {
102104
return &LogProcessor{
103-
next: next,
104-
logger: logger,
105-
enricher: enrichments.NewEnricher(cfg.Config),
106-
skipEnrichment: cfg.SkipEnrichment,
105+
next: next,
106+
logger: logger,
107+
enricher: enrichments.NewEnricher(cfg.Config),
108+
skipEnrichment: cfg.SkipEnrichment,
109+
datasetWithServiceName: cfg.ServiceNameInDataStreamDataset,
107110
}
108111
}
109112

@@ -115,18 +118,20 @@ type MetricProcessor struct {
115118
component.StartFunc
116119
component.ShutdownFunc
117120

118-
next consumer.Metrics
119-
enricher *enrichments.Enricher
120-
logger *zap.Logger
121-
skipEnrichment bool
121+
next consumer.Metrics
122+
enricher *enrichments.Enricher
123+
logger *zap.Logger
124+
skipEnrichment bool
125+
datasetWithServiceName bool
122126
}
123127

124128
func newMetricProcessor(cfg *Config, next consumer.Metrics, logger *zap.Logger) *MetricProcessor {
125129
return &MetricProcessor{
126-
next: next,
127-
logger: logger,
128-
enricher: enrichments.NewEnricher(cfg.Config),
129-
skipEnrichment: cfg.SkipEnrichment,
130+
next: next,
131+
logger: logger,
132+
enricher: enrichments.NewEnricher(cfg.Config),
133+
skipEnrichment: cfg.SkipEnrichment,
134+
datasetWithServiceName: cfg.ServiceNameInDataStreamDataset,
130135
}
131136
}
132137

@@ -142,7 +147,7 @@ func (p *MetricProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics
142147
resourceMetric := resourceMetrics.At(i)
143148
resource := resourceMetric.Resource()
144149
ecs.TranslateResourceMetadata(resource)
145-
routing.EncodeDataStream(resource, "metrics")
150+
routing.EncodeDataStream(resource, routing.DataStreamTypeMetrics, p.datasetWithServiceName)
146151
p.enricher.Config.Resource.DeploymentEnvironment.Enabled = false
147152
}
148153
}
@@ -162,7 +167,7 @@ func (p *LogProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
162167
resourceLog := resourceLogs.At(i)
163168
resource := resourceLog.Resource()
164169
ecs.TranslateResourceMetadata(resource)
165-
routing.EncodeDataStream(resource, "logs")
170+
routing.EncodeDataStream(resource, routing.DataStreamTypeLogs, p.datasetWithServiceName)
166171
p.enricher.Config.Resource.AgentVersion.Enabled = false
167172
p.enricher.Config.Resource.DeploymentEnvironment.Enabled = false
168173
}

processor/elasticapmprocessor/processor_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,3 +254,61 @@ func TestSkipEnrichmentMetrics(t *testing.T) {
254254
})
255255
}
256256
}
257+
258+
func TestServiceNameInDataStreamDataset_Logs(t *testing.T) {
259+
ctx := context.Background()
260+
ctx = client.NewContext(ctx, client.Info{
261+
Metadata: client.NewMetadata(map[string][]string{"x-elastic-mapping-mode": {"ecs"}}),
262+
})
263+
cfg := createDefaultConfig().(*Config)
264+
cfg.ServiceNameInDataStreamDataset = true
265+
266+
factory := NewFactory()
267+
settings := processortest.NewNopSettings(metadata.Type)
268+
settings.TelemetrySettings.Logger = zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel))
269+
next := &consumertest.LogsSink{}
270+
271+
lp, err := factory.CreateLogs(ctx, settings, cfg, next)
272+
require.NoError(t, err)
273+
274+
dir := filepath.Join("testdata", "elastic_apm")
275+
inputLogs, err := golden.ReadLogs(filepath.Join(dir, "logs_input.yaml"))
276+
require.NoError(t, err)
277+
278+
outputFile := filepath.Join(dir, "logs_output.yaml")
279+
require.NoError(t, lp.ConsumeLogs(ctx, inputLogs))
280+
actual := next.AllLogs()[0]
281+
282+
expectedLogs, err := golden.ReadLogs(outputFile)
283+
require.NoError(t, err)
284+
assert.NoError(t, plogtest.CompareLogs(expectedLogs, actual))
285+
}
286+
287+
func TestServiceNameInDataStream_Metrics(t *testing.T) {
288+
ctx := context.Background()
289+
ctx = client.NewContext(ctx, client.Info{
290+
Metadata: client.NewMetadata(map[string][]string{"x-elastic-mapping-mode": {"ecs"}}),
291+
})
292+
cfg := createDefaultConfig().(*Config)
293+
cfg.ServiceNameInDataStreamDataset = true
294+
295+
factory := NewFactory()
296+
settings := processortest.NewNopSettings(metadata.Type)
297+
settings.TelemetrySettings.Logger = zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel))
298+
next := &consumertest.MetricsSink{}
299+
300+
mp, err := factory.CreateMetrics(ctx, settings, cfg, next)
301+
require.NoError(t, err)
302+
303+
dir := filepath.Join("testdata", "elastic_apm")
304+
inputMetrics, err := golden.ReadMetrics(filepath.Join(dir, "metrics_input.yaml"))
305+
require.NoError(t, err)
306+
307+
outputFile := filepath.Join(dir, "metrics_output.yaml")
308+
require.NoError(t, mp.ConsumeMetrics(ctx, inputMetrics))
309+
actual := next.AllMetrics()[0]
310+
311+
expectedMetrics, err := golden.ReadMetrics(outputFile)
312+
require.NoError(t, err)
313+
assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, actual))
314+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
resourceLogs:
2+
- resource:
3+
attributes:
4+
- key: service.name
5+
value:
6+
stringValue: test-service
7+
scopeLogs:
8+
- logRecords:
9+
- body:
10+
stringValue: test log message
11+
timeUnixNano: "0"
12+
scope: {}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
resourceLogs:
2+
- resource:
3+
attributes:
4+
- key: service.name
5+
value:
6+
stringValue: test-service
7+
- key: data_stream.type
8+
value:
9+
stringValue: logs
10+
- key: data_stream.dataset
11+
value:
12+
stringValue: apm.app.test_service
13+
- key: data_stream.namespace
14+
value:
15+
stringValue: default
16+
- key: agent.name
17+
value:
18+
stringValue: otlp
19+
scopeLogs:
20+
- logRecords:
21+
- body:
22+
stringValue: test log message
23+
scope: {}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
resourceMetrics:
2+
- resource:
3+
attributes:
4+
- key: service.name
5+
value:
6+
stringValue: test-service
7+
scopeMetrics:
8+
- metrics:
9+
- name: test_metric
10+
gauge:
11+
dataPoints:
12+
- asInt: 123
13+
timeUnixNano: "0"
14+
scope: {}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
resourceMetrics:
2+
- resource:
3+
attributes:
4+
- key: agent.name
5+
value:
6+
stringValue: otlp
7+
- key: agent.version
8+
value:
9+
stringValue: unknown
10+
- key: data_stream.dataset
11+
value:
12+
stringValue: apm.app.test_service
13+
- key: data_stream.namespace
14+
value:
15+
stringValue: default
16+
- key: data_stream.type
17+
value:
18+
stringValue: metrics
19+
- key: service.name
20+
value:
21+
stringValue: test-service
22+
scopeMetrics:
23+
- metrics:
24+
- gauge:
25+
dataPoints:
26+
- asInt: "123"
27+
name: test_metric
28+
scope: {}

0 commit comments

Comments
 (0)