Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion processor/elasticapmprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ The processor enriches traces, metrics, and logs with elastic specific requireme

## Configuration

| Field | Description | Required | Default |
|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------|
| `skip_enrichment` | Controls whether enrichment should be skipped for logs and metrics when the mapping mode is not "ecs". When `true`, logs and metrics are only enriched when the `x-elastic-mapping-mode` metadata is set to "ecs". Traces are always enriched regardless of this setting. | No | `false` |
| `service_name_in_datastream_dataset` | Controls whether the service.name attribute is included in the data_stream.dataset value. If true, the dataset will be in the format "apm.app.<service.name>". | No | `false` | |
| `host_ip_enabled` | Controls whether the `host.ip` attribute should be set using the client address. When `true`, the processor will set the `host.ip` attribute from the client address when the mapping mode is "ecs". | No | `false` |

The processor configuration embeds all configuration options from [opentelemetry-lib enrichments config](https://github.com/elastic/opentelemetry-lib/tree/main/enrichments/config). All opentelemetry-lib enricher configuration fields are available and can be configured directly in the processor configuration.

### Enable all enrichments

```yaml
Expand All @@ -31,4 +39,4 @@ When `skip_enrichment` is set to `true`, logs and metrics are only enriched when
processors:
elasticapm:
skip_enrichment: true
```
```
5 changes: 5 additions & 0 deletions processor/elasticapmprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,9 @@ type Config struct {
// is included in the data_stream.dataset value. If true, the dataset will be
// in the format "apm.app.<service.name>". Defaults to false for backwards compatibility.
ServiceNameInDataStreamDataset bool `mapstructure:"service_name_in_datastream_dataset"`

// HostIPEnabled controls whether the `host.ip` resource attribute should be set using client info address.
// When true, the processor will set the `host.ip` attribute from the client address when
// the mapping mode is "ecs". Defaults to true.
HostIPEnabled bool `mapstructure:"host_ip_enabled"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats the reasoning behind doing this via a flag vs adding it by default if the Client address exists?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the config since I thought users may not/need this new field and to align with the existing approach in the opentelemtry-lib config where each field enrichment is behind a config.

}
45 changes: 45 additions & 0 deletions processor/elasticapmprocessor/internal/ecs/client_address.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package ecs // import "github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/ecs"

import (
"context"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/pdata/pcommon"
)

// SetHostIP sets the `host.ip` attribute using the client address.
// Does not override any existing `host.ip` values.
// Safely handles empty client addresses and empty attribute maps.
func SetHostIP(ctx context.Context, attributesMap pcommon.Map) {
cl := client.FromContext(ctx)
if cl.Addr == nil {
return
}

ip := cl.Addr.String()
if ip == "" {
return
}

// Only set host.ip when it does not exist or is empty
if value, ok := attributesMap.Get("host.ip"); !ok || value.Str() == "" {
attributesMap.PutStr("host.ip", ip)
}
}
101 changes: 101 additions & 0 deletions processor/elasticapmprocessor/internal/ecs/client_address_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package ecs

import (
"context"
"net"
"testing"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/pdata/pcommon"
)

func TestSetHostIP(t *testing.T) {
var (
ctxWithAddr = client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{
IP: net.IPv4(1, 2, 3, 4),
},
Metadata: client.NewMetadata(map[string][]string{"x-elastic-mapping-mode": {"ecs"}}),
})
ctxWithEmptyAddr = client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{IP: nil},
Metadata: client.NewMetadata(map[string][]string{"x-elastic-mapping-mode": {"ecs"}}),
})
)

tests := []struct {
name string
ctx context.Context
attributesMap pcommon.Map
wantIP string
}{
{
name: "empty attributes map with valid client address",
ctx: ctxWithAddr,
attributesMap: pcommon.NewMap(),
wantIP: "1.2.3.4",
},
{
name: "empty client context",
ctx: context.Background(),
attributesMap: pcommon.NewMap(),
wantIP: "",
},
{
name: "client with nil address",
ctx: ctxWithEmptyAddr,
attributesMap: pcommon.NewMap(),
wantIP: "",
},
{
name: "attributes map with existing host.ip",
ctx: ctxWithAddr,
attributesMap: func() pcommon.Map {
m := pcommon.NewMap()
m.PutStr("host.ip", "10.0.0.1")
m.PutStr("service.name", "test-service")
return m
}(),
wantIP: "10.0.0.1",
},
{
name: "attributes map with empty host.ip string",
ctx: ctxWithAddr,
attributesMap: func() pcommon.Map {
m := pcommon.NewMap()
m.PutStr("host.ip", "")
m.PutStr("service.name", "test-service")
return m
}(),
wantIP: "1.2.3.4",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
SetHostIP(tt.ctx, tt.attributesMap)

value, _ := tt.attributesMap.Get("host.ip")
if value.Str() != tt.wantIP {
t.Errorf("SetHostIP() host.ip = %q, want %q", value.Str(), tt.wantIP)
}
})
}
}
63 changes: 36 additions & 27 deletions processor/elasticapmprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ package elasticapmprocessor // import "github.com/elastic/opentelemetry-collecto
import (
"context"

"github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/ecs"
"github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/routing"
"github.com/elastic/opentelemetry-lib/enrichments"
"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -31,6 +28,10 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/processor"
"go.uber.org/zap"

"github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/ecs"
"github.com/elastic/opentelemetry-collector-components/processor/elasticapmprocessor/internal/routing"
"github.com/elastic/opentelemetry-lib/enrichments"
)

var _ processor.Traces = (*TraceProcessor)(nil)
Expand All @@ -44,13 +45,15 @@ type TraceProcessor struct {
next consumer.Traces
enricher *enrichments.Enricher
logger *zap.Logger
cfg *Config
}

func NewTraceProcessor(cfg *Config, next consumer.Traces, logger *zap.Logger) *TraceProcessor {
return &TraceProcessor{
next: next,
logger: logger,
enricher: enrichments.NewEnricher(cfg.Config),
cfg: cfg,
}
}

Expand All @@ -68,6 +71,10 @@ func (p *TraceProcessor) ConsumeTraces(ctx context.Context, td ptrace.Traces) er
ecs.ApplyResourceConventions(resource)
// Traces signal never need to be routed to service-specific datasets
routing.EncodeDataStream(resource, routing.DataStreamTypeTraces, false)
if p.cfg.HostIPEnabled {
ecs.SetHostIP(ctx, resource.Attributes())
}
// Traces signal never need to be routed to service-specific datasets
p.enricher.Config.Resource.DeploymentEnvironment.Enabled = false
}
}
Expand All @@ -94,20 +101,18 @@ type LogProcessor struct {
component.StartFunc
component.ShutdownFunc

next consumer.Logs
enricher *enrichments.Enricher
logger *zap.Logger
skipEnrichment bool
datasetWithServiceName bool
next consumer.Logs
enricher *enrichments.Enricher
logger *zap.Logger
cfg *Config
}

func newLogProcessor(cfg *Config, next consumer.Logs, logger *zap.Logger) *LogProcessor {
return &LogProcessor{
next: next,
logger: logger,
enricher: enrichments.NewEnricher(cfg.Config),
skipEnrichment: cfg.SkipEnrichment,
datasetWithServiceName: cfg.ServiceNameInDataStreamDataset,
next: next,
logger: logger,
enricher: enrichments.NewEnricher(cfg.Config),
cfg: cfg,
}
}

Expand All @@ -119,20 +124,18 @@ type MetricProcessor struct {
component.StartFunc
component.ShutdownFunc

next consumer.Metrics
enricher *enrichments.Enricher
logger *zap.Logger
skipEnrichment bool
datasetWithServiceName bool
next consumer.Metrics
enricher *enrichments.Enricher
logger *zap.Logger
cfg *Config
}

func newMetricProcessor(cfg *Config, next consumer.Metrics, logger *zap.Logger) *MetricProcessor {
return &MetricProcessor{
next: next,
logger: logger,
enricher: enrichments.NewEnricher(cfg.Config),
skipEnrichment: cfg.SkipEnrichment,
datasetWithServiceName: cfg.ServiceNameInDataStreamDataset,
next: next,
logger: logger,
enricher: enrichments.NewEnricher(cfg.Config),
cfg: cfg,
}
}

Expand All @@ -149,13 +152,16 @@ func (p *MetricProcessor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics
resource := resourceMetric.Resource()
ecs.TranslateResourceMetadata(resource)
ecs.ApplyResourceConventions(resource)
routing.EncodeDataStream(resource, routing.DataStreamTypeMetrics, p.datasetWithServiceName)
routing.EncodeDataStream(resource, routing.DataStreamTypeMetrics, p.cfg.ServiceNameInDataStreamDataset)
if p.cfg.HostIPEnabled {
ecs.SetHostIP(ctx, resource.Attributes())
}
p.enricher.Config.Resource.DeploymentEnvironment.Enabled = false
}
}
// When skipEnrichment is true, only enrich when mapping mode is ecs
// When skipEnrichment is false (default), always enrich (backwards compatible)
if !p.skipEnrichment || ecsMode {
if !p.cfg.SkipEnrichment || ecsMode {
p.enricher.EnrichMetrics(md)
}
return p.next.ConsumeMetrics(ctx, md)
Expand All @@ -170,14 +176,17 @@ func (p *LogProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
resource := resourceLog.Resource()
ecs.TranslateResourceMetadata(resource)
ecs.ApplyResourceConventions(resource)
routing.EncodeDataStream(resource, routing.DataStreamTypeLogs, p.datasetWithServiceName)
routing.EncodeDataStream(resource, routing.DataStreamTypeLogs, p.cfg.ServiceNameInDataStreamDataset)
if p.cfg.HostIPEnabled {
ecs.SetHostIP(ctx, resource.Attributes())
}
p.enricher.Config.Resource.AgentVersion.Enabled = false
p.enricher.Config.Resource.DeploymentEnvironment.Enabled = false
}
}
// When skipEnrichment is true, only enrich when mapping mode is ecs
// When skipEnrichment is false (default), always enrich (backwards compatible)
if !p.skipEnrichment || ecsMode {
if !p.cfg.SkipEnrichment || ecsMode {
p.enricher.EnrichLogs(ld)
}
return p.next.ConsumeLogs(ctx, ld)
Expand Down
Loading