Skip to content

Commit b5fbbfa

Browse files
feat: add add_kubernetes_metadata processor to OTel Beat processor (#47942) (#47981)
* feat: add `add_kubernetes_metadata` processor to OTel Beat processor * test: use test Logger in tests * refactor: fix typo in function name * test: revert to no-op logger (cherry picked from commit 3a9d1f4) Co-authored-by: Andrzej Stencel <[email protected]>
1 parent 5b95438 commit b5fbbfa

File tree

3 files changed

+148
-70
lines changed

3 files changed

+148
-70
lines changed

x-pack/otel/processor/beatprocessor/README.md

Lines changed: 60 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,6 @@
66

77
[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
88

9-
> [!NOTE]
10-
> This component is currently in development and functionality is limited.
11-
129
The Beat processor (`beat`) is an OpenTelemetry Collector processor that wraps the [Beat processors].
1310
This allows you to use Beat processors like e.g. [add_host_metadata] anywhere in the OpenTelemetry Collector's pipeline, independently of Beat receivers.
1411

@@ -17,11 +14,10 @@ This allows you to use Beat processors like e.g. [add_host_metadata] anywhere in
1714
> This is because it relies on the specific structure of telemetry emitted by those components.
1815
> Using it with data coming from other components is not recommended and may result in unexpected behavior.
1916
20-
The processor enriches the telemetry with host metadata by using the [add_host_metadata] processor under the hood.
21-
Note that configuration is limited at this stage.
22-
Host metadata is added unconditionally and cannot be disabled.
23-
You can configure the host metadata enrichment using the options that the [add_host_metadata] processor allows.
24-
The only exception is that the option `replace_fields` is always set to `true` and setting it to `false` has no effect.
17+
Here are the currently supported processors:
18+
19+
- [add_host_metadata]
20+
- [add_kubernetes_metadata]
2521

2622
## Default processors in Beat receivers
2723

@@ -61,11 +57,17 @@ receivers:
6157
The above Filebeat receiver configuration specifies an empty list of processors.
6258
In this case, none of the default processors are ran as part of the Filebeat receiver.
6359

64-
## Example
60+
## Examples
6561

66-
The following [Filebeat receiver] configuration
62+
The following OpenTelemetry Collector configuration using only the [Filebeat receiver]:
6763

6864
```yaml
65+
service:
66+
pipelines:
67+
logs:
68+
receivers: [filebeatreceiver]
69+
exporters: [debug]
70+
6971
receivers:
7072
filebeatreceiver:
7173
filebeat:
@@ -78,11 +80,21 @@ receivers:
7880
- add_host_metadata:
7981
netinfo:
8082
enabled: false
83+
84+
exporters:
85+
debug:
8186
```
8287

8388
is functionally equivalent to this one, using the Beat processor:
8489

8590
```yaml
91+
service:
92+
pipelines:
93+
logs:
94+
receivers: [filebeatreceiver]
95+
processors: [beat]
96+
exporters: [debug]
97+
8698
receivers:
8799
filebeatreceiver:
88100
filebeat:
@@ -99,12 +111,50 @@ processors:
99111
- add_host_metadata:
100112
netinfo:
101113
enabled: false
114+
115+
exporters:
116+
debug:
102117
```
103118

119+
## Using the `add_host_metadata` processor
120+
121+
To use the [add_host_metadata] processor, configure the processor as follows:
122+
123+
```yaml
124+
processors:
125+
beat:
126+
processors:
127+
- add_host_metadata:
128+
```
129+
130+
You can configure the host metadata enrichment using the options supported by the [add_host_metadata] processor.
131+
132+
## Using the `add_kubernetes_metadata` processor
133+
134+
To use the [add_kubernetes_metadata] processor, configure the processor as follows:
135+
136+
```yaml
137+
processors:
138+
beat:
139+
processors:
140+
- add_kubernetes_metadata:
141+
indexers:
142+
- container:
143+
matchers:
144+
- logs_path:
145+
```
146+
147+
You can configure the Kubernetes metadata enrichment using the options supported by the [add_kubernetes_metadata] processor.
148+
149+
Note that you need to explicitly configure at least one [indexer][indexers] and at least one [matcher][matchers] for the enrichment to work.
150+
In the example above, the `container` indexer and the `logs_path` matcher are configured.
151+
104152
[Beat processors]: https://www.elastic.co/docs/reference/beats/filebeat/filtering-enhancing-data#using-processors
105153
[Filebeat receiver]: https://github.com/elastic/beats/tree/main/x-pack/filebeat/fbreceiver
106154
[Metricbeat receiver]: https://github.com/elastic/beats/tree/main/x-pack/metricbeat/mbreceiver
107155
[add_cloud_metadata]: https://www.elastic.co/docs/reference/beats/filebeat/add-cloud-metadata
108156
[add_docker_metadata]: https://www.elastic.co/docs/reference/beats/filebeat/add-docker-metadata
109157
[add_host_metadata]: https://www.elastic.co/docs/reference/beats/filebeat/add-host-metadata
110158
[add_kubernetes_metadata]: https://www.elastic.co/docs/reference/beats/filebeat/add-kubernetes-metadata
159+
[indexers]: https://www.elastic.co/docs/reference/beats/filebeat/add-kubernetes-metadata#_indexers
160+
[matchers]: https://www.elastic.co/docs/reference/beats/filebeat/add-kubernetes-metadata#_matchers

x-pack/otel/processor/beatprocessor/processor.go

Lines changed: 69 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ import (
1010
"fmt"
1111

1212
"github.com/elastic/beats/v7/libbeat/beat"
13+
"github.com/elastic/beats/v7/libbeat/otelbeat/otelmap"
1314
"github.com/elastic/beats/v7/libbeat/processors/add_host_metadata"
15+
"github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata"
1416
"github.com/elastic/elastic-agent-libs/config"
1517
"github.com/elastic/elastic-agent-libs/logp"
1618
"github.com/elastic/elastic-agent-libs/mapstr"
1719

18-
"go.opentelemetry.io/collector/pdata/pcommon"
1920
"go.opentelemetry.io/collector/pdata/plog"
2021
"go.opentelemetry.io/collector/processor"
2122
"go.uber.org/zap"
@@ -32,20 +33,30 @@ func newBeatProcessor(set processor.Settings, cfg *Config) (*beatProcessor, erro
3233
processors: []beat.Processor{},
3334
}
3435

36+
logpLogger, err := logp.ConfigureWithCoreLocal(logp.Config{}, set.Logger.Core())
37+
if err != nil {
38+
return nil, fmt.Errorf("failed to configure logp logger: %w", err)
39+
}
40+
3541
for _, processorConfig := range cfg.Processors {
36-
processor, err := createProcessor(processorConfig)
42+
processor, err := createProcessor(processorConfig, logpLogger)
3743
if err != nil {
3844
return nil, fmt.Errorf("failed to create processor: %w", err)
3945
}
4046
if processor != nil {
4147
bp.processors = append(bp.processors, processor)
48+
bp.logger.Info("Configured Beat processor", zap.String("processor_name", processor.String()))
4249
}
4350
}
4451

4552
return bp, nil
4653
}
4754

48-
func createProcessor(cfg map[string]any) (beat.Processor, error) {
55+
// createProcessor creates a Beat processor using the provided configuration.
56+
// The configuration is expected to be a map with a single key containing the processor name
57+
// and the processor's configuration as the value for that key.
58+
// For example: {"add_host_metadata":{"netinfo":{"enabled":false}}}
59+
func createProcessor(cfg map[string]any, logpLogger *logp.Logger) (beat.Processor, error) {
4960
if len(cfg) == 0 {
5061
return nil, nil
5162
}
@@ -59,58 +70,70 @@ func createProcessor(cfg map[string]any) (beat.Processor, error) {
5970
}
6071
return nil, fmt.Errorf("expected single processor name but got %v", len(cfg))
6172
}
73+
6274
for processorName, processorConfig := range cfg {
6375
switch processorName {
6476
case "add_host_metadata":
65-
return createAddHostMetadataProcessor(processorConfig)
77+
return createAddHostMetadataProcessor(processorConfig, logpLogger)
78+
case "add_kubernetes_metadata":
79+
return createAddKubernetesMetadataProcessor(processorConfig, logpLogger)
6680
default:
6781
return nil, fmt.Errorf("invalid processor name '%s'", processorName)
6882
}
6983
}
7084
return nil, errors.New("malformed processor config")
7185
}
7286

73-
func createAddHostMetadataProcessor(cfg any) (beat.Processor, error) {
87+
func createAddHostMetadataProcessor(cfg any, logpLogger *logp.Logger) (beat.Processor, error) {
7488
addHostMetadataConfig, err := config.NewConfigFrom(cfg)
7589
if err != nil {
7690
return nil, fmt.Errorf("failed to create add_host_metadata processor config: %w", err)
7791
}
78-
addHostMetadataProcessor, err := add_host_metadata.New(addHostMetadataConfig, logp.NewLogger("beatprocessor"))
92+
addHostMetadataProcessor, err := add_host_metadata.New(addHostMetadataConfig, logpLogger)
7993
if err != nil {
8094
return nil, fmt.Errorf("failed to create add_host_metadata processor: %w", err)
8195
}
8296
return addHostMetadataProcessor, nil
8397
}
8498

99+
func createAddKubernetesMetadataProcessor(cfg any, logpLogger *logp.Logger) (beat.Processor, error) {
100+
addKubernetesMetadataConfig, err := config.NewConfigFrom(cfg)
101+
if err != nil {
102+
return nil, fmt.Errorf("failed to create add_kubernetes_metadata processor config: %w", err)
103+
}
104+
addKubernetesMetadataProcessor, err := add_kubernetes_metadata.New(addKubernetesMetadataConfig, logpLogger)
105+
if err != nil {
106+
return nil, fmt.Errorf("failed to create add_kubernetes_metadata processor: %w", err)
107+
}
108+
return addKubernetesMetadataProcessor, nil
109+
}
110+
85111
func (p *beatProcessor) ConsumeLogs(_ context.Context, logs plog.Logs) (plog.Logs, error) {
86112
if len(p.processors) == 0 {
87113
return logs, nil
88114
}
89115

90-
for _, hostProcessor := range p.processors {
91-
dummyEvent := &beat.Event{}
92-
dummyEvent.Fields = mapstr.M{}
93-
dummyEvent.Meta = mapstr.M{}
94-
dummyEventWithHostMetadata, err := hostProcessor.Run(dummyEvent)
95-
if err != nil {
96-
p.logger.Error("error processing host metadata", zap.Error(err))
97-
continue
98-
}
99-
hostMap, ok := dummyEventWithHostMetadata.Fields["host"].(mapstr.M)
100-
if !ok {
101-
p.logger.Error("error casting host metadata to mapstr.M", zap.Error(err))
102-
continue
103-
}
104-
otelMap, err := toOtelMap(&hostMap)
105-
if err != nil {
106-
p.logger.Error("error converting host metadata", zap.Error(err))
107-
continue
108-
}
109-
for _, resourceLogs := range logs.ResourceLogs().All() {
110-
for _, scopeLogs := range resourceLogs.ScopeLogs().All() {
111-
for _, logRecord := range scopeLogs.LogRecords().All() {
112-
bodyMap := logRecord.Body().Map().PutEmptyMap("host")
113-
otelMap.CopyTo(bodyMap)
116+
for _, resourceLogs := range logs.ResourceLogs().All() {
117+
for _, scopeLogs := range resourceLogs.ScopeLogs().All() {
118+
for _, logRecord := range scopeLogs.LogRecords().All() {
119+
beatEvent, err := unpackBeatEventFromOTelLogRecord(logRecord)
120+
if err != nil {
121+
p.logger.Error("error converting OTel log to Beat event", zap.Error(err))
122+
continue
123+
}
124+
125+
for _, processor := range p.processors {
126+
processedEvent, err := processor.Run(beatEvent)
127+
if err != nil {
128+
p.logger.Error("error processing Beat event", zap.Error(err))
129+
continue
130+
}
131+
beatEvent = processedEvent
132+
}
133+
134+
packingError := packBeatEventIntoOTelLogRecord(beatEvent, logRecord)
135+
if packingError != nil {
136+
p.logger.Error("error converting processed Beat event to OTel log record", zap.Error(packingError))
114137
}
115138
}
116139
}
@@ -119,29 +142,20 @@ func (p *beatProcessor) ConsumeLogs(_ context.Context, logs plog.Logs) (plog.Log
119142
return logs, nil
120143
}
121144

122-
func toOtelMap(m *mapstr.M) (pcommon.Map, error) {
123-
otelMap := pcommon.NewMap()
124-
for key, value := range *m {
125-
switch typedValue := value.(type) {
126-
case mapstr.M:
127-
subMap, err := toOtelMap(&typedValue)
128-
if err != nil {
129-
return pcommon.Map{}, fmt.Errorf("failed to convert map for key '%s': %w", key, err)
130-
}
131-
otelSubMap := otelMap.PutEmptyMap(key)
132-
subMap.MoveTo(otelSubMap)
133-
case []string:
134-
otelValue := otelMap.PutEmptySlice(key)
135-
for _, item := range typedValue {
136-
otelValue.AppendEmpty().SetStr(item)
137-
}
138-
default:
139-
otelValue := otelMap.PutEmpty(key)
140-
err := otelValue.FromRaw(typedValue)
141-
if err != nil {
142-
return pcommon.Map{}, fmt.Errorf("failed to convert value for key '%s': %w", key, err)
143-
}
144-
}
145-
}
146-
return otelMap, nil
145+
func unpackBeatEventFromOTelLogRecord(logRecord plog.LogRecord) (*beat.Event, error) {
146+
beatEvent := &beat.Event{}
147+
beatEvent.Timestamp = logRecord.Timestamp().AsTime()
148+
149+
beatEvent.Meta = mapstr.M{}
150+
151+
beatEvent.Fields = logRecord.Body().Map().AsRaw()
152+
153+
return beatEvent, nil
154+
}
155+
156+
func packBeatEventIntoOTelLogRecord(beatEvent *beat.Event, logRecord plog.LogRecord) error {
157+
beatEvent.Fields = beatEvent.Fields.Clone()
158+
otelmap.ConvertNonPrimitive((map[string]any)(beatEvent.Fields))
159+
err := logRecord.Body().Map().FromRaw(beatEvent.Fields)
160+
return err
147161
}

x-pack/otel/processor/beatprocessor/processor_test.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"testing"
1111

1212
"github.com/elastic/beats/v7/libbeat/beat"
13+
"github.com/elastic/elastic-agent-libs/logp"
1314
"github.com/elastic/elastic-agent-libs/mapstr"
1415

1516
"github.com/stretchr/testify/assert"
@@ -65,15 +66,19 @@ func TestConsumeLogs(t *testing.T) {
6566
}
6667
}
6768

69+
func testLogger() *logp.Logger {
70+
return logp.NewNopLogger()
71+
}
72+
6873
func TestCreateProcessor(t *testing.T) {
6974
t.Run("nil config returns nil processor", func(t *testing.T) {
70-
processor, err := createProcessor(nil)
75+
processor, err := createProcessor(nil, testLogger())
7176
require.NoError(t, err)
7277
assert.Nil(t, processor)
7378
})
7479

7580
t.Run("empty config returns nil processor", func(t *testing.T) {
76-
processor, err := createProcessor(map[string]any{})
81+
processor, err := createProcessor(map[string]any{}, testLogger())
7782
require.NoError(t, err)
7883
assert.Nil(t, processor)
7984
})
@@ -82,27 +87,36 @@ func TestCreateProcessor(t *testing.T) {
8287
_, err := createProcessor(map[string]any{
8388
"add_host_metadata": map[string]any{},
8489
"another_key": map[string]any{},
85-
})
90+
}, testLogger())
8691
require.Error(t, err)
8792
assert.Contains(t, err.Error(), "expected single processor name")
8893
})
8994

9095
t.Run("unknown processor returns error", func(t *testing.T) {
9196
_, err := createProcessor(map[string]any{
9297
"unknown_processor": map[string]any{},
93-
})
98+
}, testLogger())
9499
require.Error(t, err)
95100
assert.Contains(t, err.Error(), "invalid processor name 'unknown_processor'")
96101
})
97102

98103
t.Run("valid add_host_metadata processor config returns processor", func(t *testing.T) {
99104
processor, err := createProcessor(map[string]any{
100105
"add_host_metadata": map[string]any{},
101-
})
106+
}, testLogger())
102107
require.NoError(t, err)
103108
require.NotNil(t, processor)
104109
assert.Equal(t, "add_host_metadata", processor.String()[:len("add_host_metadata")])
105110
})
111+
112+
t.Run("valid add_kubernetes_metadata processor config returns processor", func(t *testing.T) {
113+
processor, err := createProcessor(map[string]any{
114+
"add_kubernetes_metadata": map[string]any{},
115+
}, testLogger())
116+
require.NoError(t, err)
117+
require.NotNil(t, processor)
118+
assert.Equal(t, "add_kubernetes_metadata", processor.String()[:len("add_kubernetes_metadata")])
119+
})
106120
}
107121

108122
type mockProcessor struct {

0 commit comments

Comments
 (0)