Skip to content

Commit 3398805

Browse files
committed
Enabled telemetry based on featuregate
1 parent 488b658 commit 3398805

20 files changed

+261
-243
lines changed

service/internal/graph/connector.go

+24-16
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,13 @@ func (n *connectorNode) buildTraces(
108108
return err
109109
}
110110
// Connectors which might pass along data must inherit capabilities of all nexts
111-
capConsumer := capabilityconsumer.NewTraces(
112-
n.Component.(consumer.Traces),
113-
aggregateCap(n.Component.(consumer.Traces), nexts),
111+
n.consumer = obsconsumer.NewTraces(
112+
capabilityconsumer.NewTraces(
113+
n.Component.(consumer.Traces),
114+
aggregateCap(n.Component.(consumer.Traces), nexts),
115+
),
116+
tb.ConnectorConsumedItems,
114117
)
115-
n.consumer = obsconsumer.NewTraces(capConsumer, tb.ConnectorConsumedItems)
116118
case pipeline.SignalMetrics:
117119
n.Component, err = builder.CreateMetricsToTraces(ctx, set, next)
118120
if err != nil {
@@ -168,11 +170,13 @@ func (n *connectorNode) buildMetrics(
168170
return err
169171
}
170172
// Connectors which might pass along data must inherit capabilities of all nexts
171-
capConsumer := capabilityconsumer.NewMetrics(
172-
n.Component.(consumer.Metrics),
173-
aggregateCap(n.Component.(consumer.Metrics), nexts),
173+
n.consumer = obsconsumer.NewMetrics(
174+
capabilityconsumer.NewMetrics(
175+
n.Component.(consumer.Metrics),
176+
aggregateCap(n.Component.(consumer.Metrics), nexts),
177+
),
178+
tb.ConnectorConsumedItems,
174179
)
175-
n.consumer = obsconsumer.NewMetrics(capConsumer, tb.ConnectorConsumedItems)
176180
case pipeline.SignalTraces:
177181
n.Component, err = builder.CreateTracesToMetrics(ctx, set, next)
178182
if err != nil {
@@ -228,11 +232,13 @@ func (n *connectorNode) buildLogs(
228232
return err
229233
}
230234
// Connectors which might pass along data must inherit capabilities of all nexts
231-
capConsumer := capabilityconsumer.NewLogs(
232-
n.Component.(consumer.Logs),
233-
aggregateCap(n.Component.(consumer.Logs), nexts),
235+
n.consumer = obsconsumer.NewLogs(
236+
capabilityconsumer.NewLogs(
237+
n.Component.(consumer.Logs),
238+
aggregateCap(n.Component.(consumer.Logs), nexts),
239+
),
240+
tb.ConnectorConsumedItems,
234241
)
235-
n.consumer = obsconsumer.NewLogs(capConsumer, tb.ConnectorConsumedItems)
236242
case pipeline.SignalTraces:
237243
n.Component, err = builder.CreateTracesToLogs(ctx, set, next)
238244
if err != nil {
@@ -288,11 +294,13 @@ func (n *connectorNode) buildProfiles(
288294
return err
289295
}
290296
// Connectors which might pass along data must inherit capabilities of all nexts
291-
capConsumer := capabilityconsumer.NewProfiles(
292-
n.Component.(xconsumer.Profiles),
293-
aggregateCap(n.Component.(xconsumer.Profiles), nexts),
297+
n.consumer = obsconsumer.NewProfiles(
298+
capabilityconsumer.NewProfiles(
299+
n.Component.(xconsumer.Profiles),
300+
aggregateCap(n.Component.(xconsumer.Profiles), nexts),
301+
),
302+
tb.ConnectorConsumedItems,
294303
)
295-
n.consumer = obsconsumer.NewProfiles(capConsumer, tb.ConnectorConsumedItems)
296304
case pipeline.SignalTraces:
297305
n.Component, err = builder.CreateTracesToProfiles(ctx, set, next)
298306
if err != nil {

service/internal/graph/graph_test.go

+44
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,17 @@ import (
3636
)
3737

3838
func TestConnectorPipelinesGraph(t *testing.T) {
39+
t.Run("with_internal_telemetry", func(t *testing.T) {
40+
setObsConsumerGateForTest(t, true)
41+
testConnectorPipelinesGraph(t)
42+
})
43+
t.Run("without_internal_telemetry", func(t *testing.T) {
44+
setObsConsumerGateForTest(t, false)
45+
testConnectorPipelinesGraph(t)
46+
})
47+
}
48+
49+
func testConnectorPipelinesGraph(t *testing.T) {
3950
tests := []struct {
4051
name string
4152
pipelineConfigs pipelines.Config
@@ -1004,6 +1015,17 @@ func TestConnectorPipelinesGraph(t *testing.T) {
10041015
}
10051016

10061017
func TestInstances(t *testing.T) {
1018+
t.Run("with_internal_telemetry", func(t *testing.T) {
1019+
setObsConsumerGateForTest(t, true)
1020+
testInstances(t)
1021+
})
1022+
t.Run("without_internal_telemetry", func(t *testing.T) {
1023+
setObsConsumerGateForTest(t, false)
1024+
testInstances(t)
1025+
})
1026+
}
1027+
1028+
func testInstances(t *testing.T) {
10071029
tests := []struct {
10081030
name string
10091031
pipelineConfigs pipelines.Config
@@ -1168,6 +1190,17 @@ func TestInstances(t *testing.T) {
11681190
}
11691191

11701192
func TestConnectorRouter(t *testing.T) {
1193+
t.Run("with_internal_telemetry", func(t *testing.T) {
1194+
setObsConsumerGateForTest(t, true)
1195+
testConnectorRouter(t)
1196+
})
1197+
t.Run("without_internal_telemetry", func(t *testing.T) {
1198+
setObsConsumerGateForTest(t, false)
1199+
testConnectorRouter(t)
1200+
})
1201+
}
1202+
1203+
func testConnectorRouter(t *testing.T) {
11711204
rcvrID := component.MustNewID("examplereceiver")
11721205
routeTracesID := component.MustNewIDWithName("examplerouter", "traces")
11731206
routeMetricsID := component.MustNewIDWithName("examplerouter", "metrics")
@@ -1394,6 +1427,17 @@ func TestConnectorRouter(t *testing.T) {
13941427
}
13951428

13961429
func TestGraphBuildErrors(t *testing.T) {
1430+
t.Run("with_internal_telemetry", func(t *testing.T) {
1431+
setObsConsumerGateForTest(t, true)
1432+
testGraphBuildErrors(t)
1433+
})
1434+
t.Run("without_internal_telemetry", func(t *testing.T) {
1435+
setObsConsumerGateForTest(t, false)
1436+
testGraphBuildErrors(t)
1437+
})
1438+
}
1439+
1440+
func testGraphBuildErrors(t *testing.T) {
13971441
nopReceiverFactory := receivertest.NewNopFactory()
13981442
nopProcessorFactory := processortest.NewNopFactory()
13991443
nopExporterFactory := exportertest.NewNopFactory()

service/internal/graph/lifecycle_test.go

+55
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,17 @@ import (
3131
)
3232

3333
func TestGraphStartStop(t *testing.T) {
34+
t.Run("with_internal_telemetry", func(t *testing.T) {
35+
setObsConsumerGateForTest(t, true)
36+
testGraphStartStop(t)
37+
})
38+
t.Run("without_internal_telemetry", func(t *testing.T) {
39+
setObsConsumerGateForTest(t, false)
40+
testGraphStartStop(t)
41+
})
42+
}
43+
44+
func testGraphStartStop(t *testing.T) {
3445
testCases := []struct {
3546
name string
3647
edges [][2]component.ID
@@ -117,6 +128,17 @@ func TestGraphStartStop(t *testing.T) {
117128
}
118129

119130
func TestGraphStartStopCycle(t *testing.T) {
131+
t.Run("with_internal_telemetry", func(t *testing.T) {
132+
setObsConsumerGateForTest(t, true)
133+
testGraphStartStopCycle(t)
134+
})
135+
t.Run("without_internal_telemetry", func(t *testing.T) {
136+
setObsConsumerGateForTest(t, false)
137+
testGraphStartStopCycle(t)
138+
})
139+
}
140+
141+
func testGraphStartStopCycle(t *testing.T) {
120142
pg := &Graph{componentGraph: simple.NewDirectedGraph()}
121143

122144
r1 := &testNode{id: component.MustNewIDWithName("r", "1")}
@@ -144,6 +166,17 @@ func TestGraphStartStopCycle(t *testing.T) {
144166
}
145167

146168
func TestGraphStartStopComponentError(t *testing.T) {
169+
t.Run("with_internal_telemetry", func(t *testing.T) {
170+
setObsConsumerGateForTest(t, true)
171+
testGraphStartStopComponentError(t)
172+
})
173+
t.Run("without_internal_telemetry", func(t *testing.T) {
174+
setObsConsumerGateForTest(t, false)
175+
testGraphStartStopComponentError(t)
176+
})
177+
}
178+
179+
func testGraphStartStopComponentError(t *testing.T) {
147180
pg := &Graph{componentGraph: simple.NewDirectedGraph()}
148181
pg.telemetry = componenttest.NewNopTelemetrySettings()
149182
r1 := &testNode{
@@ -169,6 +202,17 @@ func TestGraphStartStopComponentError(t *testing.T) {
169202
// This includes all tests from the previous implementation, plus a new one
170203
// relevant only to the new graph-based implementation.
171204
func TestGraphFailToStartAndShutdown(t *testing.T) {
205+
t.Run("with_internal_telemetry", func(t *testing.T) {
206+
setObsConsumerGateForTest(t, true)
207+
testGraphFailToStartAndShutdown(t)
208+
})
209+
t.Run("without_internal_telemetry", func(t *testing.T) {
210+
setObsConsumerGateForTest(t, false)
211+
testGraphFailToStartAndShutdown(t)
212+
})
213+
}
214+
215+
func testGraphFailToStartAndShutdown(t *testing.T) {
172216
errReceiverFactory := newErrReceiverFactory()
173217
errProcessorFactory := newErrProcessorFactory()
174218
errExporterFactory := newErrExporterFactory()
@@ -287,6 +331,17 @@ func TestGraphFailToStartAndShutdown(t *testing.T) {
287331
}
288332

289333
func TestStatusReportedOnStartupShutdown(t *testing.T) {
334+
t.Run("with_internal_telemetry", func(t *testing.T) {
335+
setObsConsumerGateForTest(t, true)
336+
testStatusReportedOnStartupShutdown(t)
337+
})
338+
t.Run("without_internal_telemetry", func(t *testing.T) {
339+
setObsConsumerGateForTest(t, false)
340+
testStatusReportedOnStartupShutdown(t)
341+
})
342+
}
343+
344+
func testStatusReportedOnStartupShutdown(t *testing.T) {
290345
rNoErr := &testNode{id: component.MustNewIDWithName("r_no_err", "1")}
291346
rStErr := &testNode{id: component.MustNewIDWithName("r_st_err", "1"), startErr: assert.AnError}
292347
rSdErr := &testNode{id: component.MustNewIDWithName("r_sd_err", "1"), shutdownErr: assert.AnError}

service/internal/graph/obs_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
)
2828

2929
func TestComponentInstrumentation(t *testing.T) {
30+
setObsConsumerGateForTest(t, true)
3031
// All IDs have a name to ensure the "otelcol.component.id" attribute is not just the type
3132
rcvrID := component.MustNewIDWithName("examplereceiver", "foo")
3233
procID := component.MustNewIDWithName("exampleprocessor", "bar")

service/internal/graph/processor.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -63,29 +63,29 @@ func (n *processorNode) buildComponent(ctx context.Context,
6363

6464
switch n.pipelineID.Signal() {
6565
case pipeline.SignalTraces:
66-
obsConsumer := obsconsumer.NewTraces(next.(consumer.Traces), tb.ProcessorProducedItems)
67-
n.Component, err = builder.CreateTraces(ctx, set, obsConsumer)
66+
n.Component, err = builder.CreateTraces(ctx, set,
67+
obsconsumer.NewTraces(next.(consumer.Traces), tb.ProcessorProducedItems))
6868
if err != nil {
6969
return fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, n.pipelineID.String(), err)
7070
}
7171
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ProcessorConsumedItems)
7272
case pipeline.SignalMetrics:
73-
obsConsumer := obsconsumer.NewMetrics(next.(consumer.Metrics), tb.ProcessorProducedItems)
74-
n.Component, err = builder.CreateMetrics(ctx, set, obsConsumer)
73+
n.Component, err = builder.CreateMetrics(ctx, set,
74+
obsconsumer.NewMetrics(next.(consumer.Metrics), tb.ProcessorProducedItems))
7575
if err != nil {
7676
return fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, n.pipelineID.String(), err)
7777
}
7878
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ProcessorConsumedItems)
7979
case pipeline.SignalLogs:
80-
obsConsumer := obsconsumer.NewLogs(next.(consumer.Logs), tb.ProcessorProducedItems)
81-
n.Component, err = builder.CreateLogs(ctx, set, obsConsumer)
80+
n.Component, err = builder.CreateLogs(ctx, set,
81+
obsconsumer.NewLogs(next.(consumer.Logs), tb.ProcessorProducedItems))
8282
if err != nil {
8383
return fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, n.pipelineID.String(), err)
8484
}
8585
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ProcessorConsumedItems)
8686
case xpipeline.SignalProfiles:
87-
obsConsumer := obsconsumer.NewProfiles(next.(xconsumer.Profiles), tb.ProcessorProducedItems)
88-
n.Component, err = builder.CreateProfiles(ctx, set, obsConsumer)
87+
n.Component, err = builder.CreateProfiles(ctx, set,
88+
obsconsumer.NewProfiles(next.(xconsumer.Profiles), tb.ProcessorProducedItems))
8989
if err != nil {
9090
return fmt.Errorf("failed to create %q processor, in pipeline %q: %w", set.ID, n.pipelineID.String(), err)
9191
}

service/internal/graph/receiver.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -61,29 +61,29 @@ func (n *receiverNode) buildComponent(ctx context.Context,
6161
for _, next := range nexts {
6262
consumers = append(consumers, next.(consumer.Traces))
6363
}
64-
obsConsumer := obsconsumer.NewTraces(fanoutconsumer.NewTraces(consumers), tb.ReceiverProducedItems)
65-
n.Component, err = builder.CreateTraces(ctx, set, obsConsumer)
64+
n.Component, err = builder.CreateTraces(ctx, set,
65+
obsconsumer.NewTraces(fanoutconsumer.NewTraces(consumers), tb.ReceiverProducedItems))
6666
case pipeline.SignalMetrics:
6767
var consumers []consumer.Metrics
6868
for _, next := range nexts {
6969
consumers = append(consumers, next.(consumer.Metrics))
7070
}
71-
obsConsumer := obsconsumer.NewMetrics(fanoutconsumer.NewMetrics(consumers), tb.ReceiverProducedItems)
72-
n.Component, err = builder.CreateMetrics(ctx, set, obsConsumer)
71+
n.Component, err = builder.CreateMetrics(ctx, set,
72+
obsconsumer.NewMetrics(fanoutconsumer.NewMetrics(consumers), tb.ReceiverProducedItems))
7373
case pipeline.SignalLogs:
7474
var consumers []consumer.Logs
7575
for _, next := range nexts {
7676
consumers = append(consumers, next.(consumer.Logs))
7777
}
78-
obsConsumer := obsconsumer.NewLogs(fanoutconsumer.NewLogs(consumers), tb.ReceiverProducedItems)
79-
n.Component, err = builder.CreateLogs(ctx, set, obsConsumer)
78+
n.Component, err = builder.CreateLogs(ctx, set,
79+
obsconsumer.NewLogs(fanoutconsumer.NewLogs(consumers), tb.ReceiverProducedItems))
8080
case xpipeline.SignalProfiles:
8181
var consumers []xconsumer.Profiles
8282
for _, next := range nexts {
8383
consumers = append(consumers, next.(xconsumer.Profiles))
8484
}
85-
obsConsumer := obsconsumer.NewProfiles(fanoutconsumer.NewProfiles(consumers), tb.ReceiverProducedItems)
86-
n.Component, err = builder.CreateProfiles(ctx, set, obsConsumer)
85+
n.Component, err = builder.CreateProfiles(ctx, set,
86+
obsconsumer.NewProfiles(fanoutconsumer.NewProfiles(consumers), tb.ReceiverProducedItems))
8787
default:
8888
return fmt.Errorf("error creating receiver %q for data type %q is not supported", set.ID, n.pipelineType)
8989
}

service/internal/graph/util_test.go

+13
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ import (
88
"errors"
99
"hash/fnv"
1010
"sync"
11+
"testing"
12+
13+
"github.com/stretchr/testify/require"
1114

1215
"go.opentelemetry.io/collector/component"
1316
"go.opentelemetry.io/collector/connector"
@@ -17,6 +20,8 @@ import (
1720
"go.opentelemetry.io/collector/consumer/xconsumer"
1821
"go.opentelemetry.io/collector/exporter"
1922
"go.opentelemetry.io/collector/exporter/xexporter"
23+
"go.opentelemetry.io/collector/featuregate"
24+
"go.opentelemetry.io/collector/internal/telemetry"
2025
"go.opentelemetry.io/collector/pipeline"
2126
"go.opentelemetry.io/collector/pipeline/xpipeline"
2227
"go.opentelemetry.io/collector/processor"
@@ -304,3 +309,11 @@ func (e errComponent) Start(context.Context, component.Host) error {
304309
func (e errComponent) Shutdown(context.Context) error {
305310
return errors.New("my error")
306311
}
312+
313+
func setObsConsumerGateForTest(t *testing.T, enabled bool) {
314+
initial := telemetry.NewPipelineTelemetryGate.IsEnabled()
315+
require.NoError(t, featuregate.GlobalRegistry().Set(telemetry.NewPipelineTelemetryGate.ID(), enabled))
316+
t.Cleanup(func() {
317+
require.NoError(t, featuregate.GlobalRegistry().Set(telemetry.NewPipelineTelemetryGate.ID(), initial))
318+
})
319+
}

0 commit comments

Comments
 (0)