Commit 92c205a

[processor/k8s_attributes] Share processor across signal types (#47686)
#### Description

Allow sharing of k8sattributes processor instances between pipelines. The implementation uses sharedcomponent and is hidden behind a feature gate.

#### Link to tracking issue

Partially fixes #36234.

#### Testing

Added an e2e test that:
- Enables the feature gate
- Creates two processors with different configurations, one of which is shared between two pipelines
- Verifies that the data emitted by both is correct
- Verifies that the shared processor is actually shared by inspecting the collector logs

#### Documentation

Autogenerated feature gate documentation.
1 parent 3c598f2 commit 92c205a
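
The sharing idea in the description can be pictured with a small, self-contained sketch. It does not use the real `sharedcomponent` package: `config`, `shared`, `registry`, and `getOrAdd` are hypothetical names, and keying the registry by a comparable config value is an assumption made for illustration. The sketch only shows the general pattern of handing out one instance per configuration and starting it once, however many pipelines ask for it.

```go
package main

import (
	"fmt"
	"sync"
)

// config stands in for a processor configuration. It is comparable so it can
// be used as a map key. Hypothetical type, for illustration only.
type config struct {
	name string
}

// shared wraps an expensive resource and guarantees it is started once,
// no matter how many pipelines call start.
type shared struct {
	once   sync.Once
	starts int
}

func (s *shared) start() {
	s.once.Do(func() { s.starts++ })
}

// registry hands out one shared instance per distinct config.
type registry struct {
	mu    sync.Mutex
	items map[config]*shared
}

func newRegistry() *registry {
	return &registry{items: make(map[config]*shared)}
}

// getOrAdd returns the existing instance for c, or creates and stores one.
func (r *registry) getOrAdd(c config) *shared {
	r.mu.Lock()
	defer r.mu.Unlock()
	if s, ok := r.items[c]; ok {
		return s
	}
	s := &shared{}
	r.items[c] = s
	return s
}

func main() {
	r := newRegistry()
	full := config{name: "k8s_attributes/full"}
	minimal := config{name: "k8s_attributes/minimal"}

	traces := r.getOrAdd(full)
	metrics := r.getOrAdd(full)
	logs := r.getOrAdd(minimal)
	for _, s := range []*shared{traces, metrics, logs} {
		s.start()
	}

	// prints: true false 1 1
	fmt.Println(traces == metrics, traces == logs, traces.starts, logs.starts)
}
```

Traces and metrics receive the same instance, logs receive a separate one, and each instance is started exactly once.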

17 files changed

Lines changed: 506 additions & 4 deletions

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: processor/k8s_attributes
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Allow k8sattributes processors to be shared between pipelines
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [36234]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  When the processor.k8sattributes.ShareProcessorBetweenPipelines feature flag is enabled, k8sattributes processors
+  using the same configuration are shared between pipelines. This reduces the local cache size and the number of
+  connections to the K8s API Server.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []

internal/datadog/e2e/go.mod

Lines changed: 3 additions & 0 deletions
@@ -336,6 +336,7 @@ require (
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.150.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/metadataproviders v0.150.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil v0.150.0 // indirect
+	github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.150.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/datadog v0.150.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/experimentalmetricmetadata v0.150.0 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.150.0 // indirect
@@ -536,6 +537,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sa

 replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil => ../../pdatautil

+replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../sharedcomponent
+
 replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/core/xidutils => ../../../pkg/core/xidutils

 replace github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor => ../../../processor/deltatocumulativeprocessor

pkg/xk8stest/k8s_collector.go

Lines changed: 21 additions & 0 deletions
@@ -232,3 +232,24 @@ func fetchContainerLogs(ctx context.Context, coreClient corev1client.CoreV1Inter
 	}
 	return strings.TrimRight(string(logs), "\n")
 }
+
+// FetchPodLogs returns the full log output for the first running pod matching
+// the given labels in the specified namespace. It is intended for e2e test
+// assertions that inspect collector behavior via its log output.
+func FetchPodLogs(t *testing.T, client *K8sClient, namespace string, podLabels map[string]any) string {
+	t.Helper()
+	coreClient, err := corev1client.NewForConfig(client.restConfig)
+	require.NoError(t, err, "failed to create core client for pod logs")
+
+	podGVR := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
+	listOptions := metav1.ListOptions{LabelSelector: SelectorFromMap(podLabels).String()}
+	list, err := client.DynamicClient.Resource(podGVR).Namespace(namespace).List(t.Context(), listOptions)
+	require.NoError(t, err, "failed to list pods")
+	require.NotEmpty(t, list.Items, "no pods found matching labels")
+
+	podName := list.Items[0].GetName()
+	ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
+	defer cancel()
+
+	return fetchContainerLogs(ctx, coreClient, namespace, podName, "", false, nil)
+}
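
The kind of log-based assertion that `FetchPodLogs` is meant to support can be sketched as follows. This is a minimal illustration, not the test's code: `countInitializations` is a hypothetical helper, and the sample log lines are invented placeholders rather than real collector output. Only the `"k8s filtering"` marker comes from the e2e test in this commit.

```go
package main

import (
	"fmt"
	"strings"
)

// countInitializations reports how many times marker occurs in logs.
// Hypothetical helper: it simply wraps strings.Count.
func countInitializations(logs, marker string) int {
	return strings.Count(logs, marker)
}

func main() {
	// Invented log lines for illustration; real collector output differs.
	logs := strings.Join([]string{
		"info example: k8s filtering (instance A)",
		"info example: pipeline started",
		"info example: k8s filtering (instance B)",
	}, "\n")

	// prints: 2
	fmt.Println(countInitializations(logs, "k8s filtering"))
}
```

Counting a marker that is logged once per start gives a cheap way to check how many instances were actually started, at the cost of coupling the test to the wording of a log message.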

processor/k8sattributesprocessor/documentation.md

Lines changed: 1 addition & 0 deletions
@@ -320,6 +320,7 @@ This component has the following feature gates:
 | `k8sattr.labelsAnnotationsSingular.allow` | deprecated | When enabled, default k8s label and annotation resource attribute keys will be singular, instead of plural | v0.125.0 | v0.145.0 | [Link](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/39774) |
 | `processor.k8sattributes.DontEmitV0K8sConventions` | alpha | When enabled, semconv legacy attributes are disabled. | v0.145.0 | N/A | [Link](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/44589) |
 | `processor.k8sattributes.EmitV1K8sConventions` | alpha | When enabled, semconv stable attributes are enabled. | v0.145.0 | N/A | [Link](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/44589) |
+| `processor.k8sattributes.ShareProcessorBetweenPipelines` | alpha | When enabled, processor instances with identical configuration are shared across different signal type pipelines, reducing duplicate Kubernetes API watchers. | v0.150.0 | N/A | [Link](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/2450) |
 | `processor.k8sattributes.telemetry.disableOldFormatMetrics` | alpha | When enabled, old formatted internal telemetry metrics are disabled. | v0.146.0 | N/A | [Link](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/45871) |
 | `processor.k8sattributes.telemetry.enableNewFormatMetrics` | alpha | When enabled, new formatted internal telemetry metrics are enabled. | v0.146.0 | N/A | [Link](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/45871) |

processor/k8sattributesprocessor/e2e_test.go

Lines changed: 141 additions & 0 deletions
@@ -12,6 +12,7 @@ import (
 	"os/exec"
 	"path/filepath"
 	"regexp"
+	"strings"
 	"testing"
 	"time"

@@ -1968,6 +1969,146 @@ func waitForData(t *testing.T, entriesNum int, mc *consumertest.MetricsSink, tc
 		len(mc.AllMetrics()), len(tc.AllTraces()), len(lc.AllLogs()), len(pc.AllProfiles()), timeoutMinutes)
 }

+// TestE2E_SharedProcessor verifies that the k8s_attributes processor is shared across signal types
+// when using the same processor configuration, and that different configurations produce separate
+// processor instances. The test deploys a collector with two k8s_attributes processor configs:
+//   - k8s_attributes/full: extracts pod name, uid, namespace, deployment, node name, and pod labels.
+//     Used by both the traces and metrics pipelines.
+//   - k8s_attributes/minimal: extracts only the pod name.
+//     Used by the logs pipeline.
+//
+// The test then asserts that traces and metrics both receive the full set of attributes (proving the
+// shared processor works for both signal types), while logs receive only the minimal set (proving a
+// different config creates a separate processor instance).
+func TestE2E_SharedProcessor(t *testing.T) {
+	testDir := filepath.Join("testdata", "e2e", "sharedprocessor")
+
+	k8sClient, err := k8stest.NewK8sClient(testKubeConfig)
+	require.NoError(t, err)
+
+	nsFile := filepath.Join(testDir, "namespace.yaml")
+	buf, err := os.ReadFile(nsFile)
+	require.NoErrorf(t, err, "failed to read namespace object file %s", nsFile)
+	nsObj, err := k8stest.CreateObject(k8sClient, buf)
+	require.NoErrorf(t, err, "failed to create k8s namespace from file %s", nsFile)
+
+	testNs := nsObj.GetName()
+	defer func() {
+		require.NoErrorf(t, k8stest.DeleteObject(k8sClient, nsObj), "failed to delete namespace %s", testNs)
+	}()
+
+	metricsConsumer := new(consumertest.MetricsSink)
+	tracesConsumer := new(consumertest.TracesSink)
+	logsConsumer := new(consumertest.LogsSink)
+	profilesConsumer := new(consumertest.ProfilesSink)
+	shutdownSinks := startUpSinks(t, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer)
+	defer shutdownSinks()
+
+	testID := uuid.NewString()[:8]
+	collectorObjs := k8stest.CreateCollectorObjects(t, k8sClient, testID, filepath.Join(testDir, "collector"), map[string]string{}, "")
+	createTeleOpts := &k8stest.TelemetrygenCreateOpts{
+		ManifestsDir: filepath.Join(testDir, "telemetrygen"),
+		TestID:       testID,
+		OtlpEndpoint: fmt.Sprintf("otelcol-%s.%s:4317", testID, testNs),
+		DataTypes:    []string{"metrics", "logs", "traces"},
+	}
+	telemetryGenObjs, telemetryGenObjInfos := k8stest.CreateTelemetryGenObjects(t, k8sClient, createTeleOpts)
+	defer func() {
+		for _, obj := range append(collectorObjs, telemetryGenObjs...) {
+			require.NoErrorf(t, k8stest.DeleteObject(k8sClient, obj), "failed to delete object %s", obj.GetName())
+		}
+	}()
+
+	for _, info := range telemetryGenObjInfos {
+		k8stest.WaitForTelemetryGenToStart(t, k8sClient, info.Namespace, info.PodLabelSelectors, info.Workload, info.DataType)
+	}
+
+	wantEntries := 128
+	waitForData(t, wantEntries, metricsConsumer, tracesConsumer, logsConsumer, profilesConsumer)
+
+	// Verify sharing by inspecting the collector's log output.
+	// The kube client logs "k8s filtering" once per Start() call. With sharedcomponent,
+	// Start() is called once per unique processor config. We have two configs
+	// (k8s_attributes/full and k8s_attributes/minimal), so we expect exactly 2 occurrences.
+	// Without sharing, each pipeline would get its own processor, producing 3 occurrences
+	// (traces + metrics + logs).
+	collectorPodLabels := map[string]any{
+		"app.kubernetes.io/name":     "opentelemetry-collector",
+		"app.kubernetes.io/instance": "otelcol-" + testID,
+	}
+	podLogs := k8stest.FetchPodLogs(t, k8sClient, testNs, collectorPodLabels)
+	initCount := strings.Count(podLogs, "k8s filtering")
+	assert.Equal(t, 2, initCount,
+		"expected 2 kube client initializations (one per unique processor config), got %d; "+
+			"this suggests processors are not being shared across signal types", initCount)
+
+	// Attributes that the "full" processor (traces + metrics) should add.
+	fullAttrs := map[string]*expectedValue{
+		"k8s.pod.name":        newExpectedValue(regex, "telemetrygen-"+testID+"-.*-deployment-[a-z0-9]*-[a-z0-9]*"),
+		"k8s.pod.uid":         newExpectedValue(regex, uidRe),
+		"k8s.namespace.name":  newExpectedValue(equal, testNs),
+		"k8s.deployment.name": newExpectedValue(regex, "telemetrygen-"+testID+"-.*-deployment"),
+		"k8s.node.name":       newExpectedValue(exist, ""),
+		"k8s.labels.app":      newExpectedValue(regex, "telemetrygen-"+testID+"-.*-deployment"),
+	}
+
+	// Attributes that the "minimal" processor (logs) should add.
+	// It only extracts k8s.pod.name; the other attributes must NOT be present.
+	minimalAttrs := map[string]*expectedValue{
+		"k8s.pod.name":        newExpectedValue(regex, "telemetrygen-"+testID+"-.*-deployment-[a-z0-9]*-[a-z0-9]*"),
+		"k8s.pod.uid":         newExpectedValue(shouldnotexist, ""),
+		"k8s.deployment.name": newExpectedValue(shouldnotexist, ""),
+		"k8s.node.name":       newExpectedValue(shouldnotexist, ""),
+		"k8s.labels.app":      newExpectedValue(shouldnotexist, ""),
+	}
+
+	tcs := []struct {
+		name     string
+		dataType pipeline.Signal
+		service  string
+		attrs    map[string]*expectedValue
+	}{
+		{
+			// Traces use k8s_attributes/full – expect all attributes.
+			name:     "traces-deployment-full",
+			dataType: pipeline.SignalTraces,
+			service:  "test-traces-deployment",
+			attrs:    fullAttrs,
+		},
+		{
+			// Metrics also use k8s_attributes/full – same shared processor,
+			// so they must produce the same set of attributes.
+			name:     "metrics-deployment-full",
+			dataType: pipeline.SignalMetrics,
+			service:  "test-metrics-deployment",
+			attrs:    fullAttrs,
+		},
+		{
+			// Logs use k8s_attributes/minimal – different processor instance,
+			// so only k8s.pod.name should be present.
+			name:     "logs-deployment-minimal",
+			dataType: pipeline.SignalLogs,
+			service:  "test-logs-deployment",
+			attrs:    minimalAttrs,
+		},
+	}
+
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+			switch tc.dataType {
+			case pipeline.SignalTraces:
+				scanTracesForAttributes(t, tracesConsumer, tc.service, tc.attrs)
+			case pipeline.SignalMetrics:
+				scanMetricsForAttributes(t, metricsConsumer, tc.service, tc.attrs)
+			case pipeline.SignalLogs:
+				scanLogsForAttributes(t, logsConsumer, tc.service, tc.attrs)
+			default:
+				t.Fatalf("unknown data type %s", tc.dataType)
+			}
+		})
+	}
+}
+
 func TestE2E_ContainerIDAssociation(t *testing.T) {
 	testDir := filepath.Join("testdata", "e2e", "container_id_association_only")

processor/k8sattributesprocessor/factory.go

Lines changed: 71 additions & 4 deletions
@@ -17,6 +17,7 @@ import (
 	"go.uber.org/zap"

 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/metadata"
 )
@@ -25,6 +26,7 @@ var (
 	kubeClientProvider   = kube.ClientProvider(nil)
 	consumerCapabilities = consumer.Capabilities{MutatesData: true}
 	defaultExcludes      = ExcludeConfig{Pods: []ExcludePodConfig{{Name: "jaeger-agent"}, {Name: "jaeger-collector"}}}
+	processors           = sharedcomponent.NewSharedComponents()
 )

 // NewFactory returns a new factory for the k8s processor.
@@ -57,7 +59,23 @@ func createTracesProcessor(
 	cfg component.Config,
 	next consumer.Traces,
 ) (processor.Traces, error) {
-	return createTracesProcessorWithOptions(ctx, params, cfg, next)
+	if !metadata.ProcessorK8sattributesShareProcessorBetweenPipelinesFeatureGate.IsEnabled() {
+		return createTracesProcessorWithOptions(ctx, params, cfg, next)
+	}
+	sc := processors.GetOrAdd(cfg, func() component.Component {
+		return createKubernetesProcessor(params, cfg)
+	})
+	kp := sc.Unwrap().(*kubernetesprocessor)
+
+	return processorhelper.NewTraces(
+		ctx,
+		params,
+		cfg,
+		next,
+		kp.processTraces,
+		processorhelper.WithCapabilities(consumerCapabilities),
+		processorhelper.WithStart(sc.Start),
+		processorhelper.WithShutdown(sc.Shutdown))
 }

 func createLogsProcessor(
@@ -66,7 +84,23 @@ func createLogsProcessor(
 	cfg component.Config,
 	nextLogsConsumer consumer.Logs,
 ) (processor.Logs, error) {
-	return createLogsProcessorWithOptions(ctx, params, cfg, nextLogsConsumer)
+	if !metadata.ProcessorK8sattributesShareProcessorBetweenPipelinesFeatureGate.IsEnabled() {
+		return createLogsProcessorWithOptions(ctx, params, cfg, nextLogsConsumer)
+	}
+	sc := processors.GetOrAdd(cfg, func() component.Component {
+		return createKubernetesProcessor(params, cfg)
+	})
+	kp := sc.Unwrap().(*kubernetesprocessor)
+
+	return processorhelper.NewLogs(
+		ctx,
+		params,
+		cfg,
+		nextLogsConsumer,
+		kp.processLogs,
+		processorhelper.WithCapabilities(consumerCapabilities),
+		processorhelper.WithStart(sc.Start),
+		processorhelper.WithShutdown(sc.Shutdown))
 }

 func createMetricsProcessor(
@@ -75,7 +109,23 @@ func createMetricsProcessor(
 	cfg component.Config,
 	nextMetricsConsumer consumer.Metrics,
 ) (processor.Metrics, error) {
-	return createMetricsProcessorWithOptions(ctx, params, cfg, nextMetricsConsumer)
+	if !metadata.ProcessorK8sattributesShareProcessorBetweenPipelinesFeatureGate.IsEnabled() {
+		return createMetricsProcessorWithOptions(ctx, params, cfg, nextMetricsConsumer)
+	}
+	sc := processors.GetOrAdd(cfg, func() component.Component {
+		return createKubernetesProcessor(params, cfg)
+	})
+	kp := sc.Unwrap().(*kubernetesprocessor)
+
+	return processorhelper.NewMetrics(
+		ctx,
+		params,
+		cfg,
+		nextMetricsConsumer,
+		kp.processMetrics,
+		processorhelper.WithCapabilities(consumerCapabilities),
+		processorhelper.WithStart(sc.Start),
+		processorhelper.WithShutdown(sc.Shutdown))
 }

 func createProfilesProcessor(
@@ -84,7 +134,24 @@ func createProfilesProcessor(
 	cfg component.Config,
 	nextProfilesConsumer xconsumer.Profiles,
 ) (xprocessor.Profiles, error) {
-	return createProfilesProcessorWithOptions(ctx, params, cfg, nextProfilesConsumer)
+	if !metadata.ProcessorK8sattributesShareProcessorBetweenPipelinesFeatureGate.IsEnabled() {
+		return createProfilesProcessorWithOptions(ctx, params, cfg, nextProfilesConsumer)
+	}
+	sc := processors.GetOrAdd(cfg, func() component.Component {
+		return createKubernetesProcessor(params, cfg)
+	})
+	kp := sc.Unwrap().(*kubernetesprocessor)
+
+	return xprocessorhelper.NewProfiles(
+		ctx,
+		params,
+		cfg,
+		nextProfilesConsumer,
+		kp.processProfiles,
+		xprocessorhelper.WithCapabilities(consumerCapabilities),
+		xprocessorhelper.WithStart(sc.Start),
+		xprocessorhelper.WithShutdown(sc.Shutdown),
+	)
 }

 func createTracesProcessorWithOptions(
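
The gate-guarded branching in the four `create*Processor` functions can be summarized with a small standalone sketch. It is an illustration under stated assumptions, not the processor's code: `factory`, `build`, and `create` are hypothetical names, a plain `bool` stands in for the feature gate, and config names stand in for the config value used as the cache key.

```go
package main

import "fmt"

// processor stands in for an expensive component such as a Kubernetes watcher.
type processor struct{ id int }

// factory is a hypothetical stand-in for the real factory. shareEnabled plays
// the role of the feature gate.
type factory struct {
	shareEnabled bool
	byConfig     map[string]*processor
	created      int
}

func newFactory(shareEnabled bool) *factory {
	return &factory{shareEnabled: shareEnabled, byConfig: make(map[string]*processor)}
}

// build creates a new processor and counts it.
func (f *factory) build() *processor {
	f.created++
	return &processor{id: f.created}
}

// create returns a fresh processor on the legacy path and a cached one,
// keyed by config name, when sharing is enabled.
func (f *factory) create(cfgName string) *processor {
	if !f.shareEnabled {
		return f.build()
	}
	if p, ok := f.byConfig[cfgName]; ok {
		return p
	}
	p := f.build()
	f.byConfig[cfgName] = p
	return p
}

func main() {
	// Pipelines as in the e2e test: traces and metrics use "full", logs use "minimal".
	pipelines := []string{"full", "full", "minimal"}
	for _, enabled := range []bool{false, true} {
		f := newFactory(enabled)
		for _, cfg := range pipelines {
			f.create(cfg)
		}
		// prints: sharing=false instances=3
		//         sharing=true instances=2
		fmt.Printf("sharing=%v instances=%d\n", enabled, f.created)
	}
}
```

Keeping the legacy path untouched behind the gate means that disabling the gate restores the previous one-instance-per-pipeline behavior.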

processor/k8sattributesprocessor/go.mod

Lines changed: 3 additions & 0 deletions
@@ -9,6 +9,7 @@ require (
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.150.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.150.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig v0.150.0
+	github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent v0.150.0
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xk8stest v0.150.0
 	github.com/stretchr/testify v1.11.1
 	go.opentelemetry.io/collector/client v1.56.1-0.20260415114935-307e3abdbae9
@@ -170,3 +171,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
 replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

 replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
+
+replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/sharedcomponent => ../../internal/sharedcomponent

processor/k8sattributesprocessor/internal/metadata/generated_feature_gates.go

Lines changed: 8 additions & 0 deletions
Some generated files are not rendered by default.

processor/k8sattributesprocessor/metadata.yaml

Lines changed: 7 additions & 0 deletions
@@ -163,6 +163,13 @@ feature_gates:
       When enabled, semconv stable attributes are enabled.
     from_version: v0.145.0
     reference_url: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/44589
+  - id: processor.k8sattributes.ShareProcessorBetweenPipelines
+    stage: alpha
+    description: >-
+      When enabled, processor instances with identical configuration are shared
+      across different signal type pipelines, reducing duplicate Kubernetes API watchers.
+    from_version: v0.150.0
+    reference_url: https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/2450
   - id: processor.k8sattributes.telemetry.disableOldFormatMetrics
     stage: alpha
     description: >-
