Skip to content

Commit 5bdd398

Browse files
committed
Create obsconsumer package and switch connector produced attribute to data point
1 parent 1eecde2 commit 5bdd398

File tree

5 files changed

+117
-254
lines changed

5 files changed

+117
-254
lines changed

service/internal/graph/connector.go

+98-141
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ import (
2020
"go.opentelemetry.io/collector/service/internal/builders"
2121
"go.opentelemetry.io/collector/service/internal/capabilityconsumer"
2222
"go.opentelemetry.io/collector/service/internal/metadata"
23+
"go.opentelemetry.io/collector/service/internal/obsconsumer"
2324
)
2425

25-
const pipelineIDAttrKey = "otelcol.pipeline.id.output"
26+
const pipelineIDAttrKey = "otelcol.pipeline.id"
2627

2728
var _ consumerNode = (*connectorNode)(nil)
2829

@@ -80,67 +81,56 @@ func (n *connectorNode) buildTraces(
8081
builder *builders.ConnectorBuilder,
8182
nexts []baseConsumer,
8283
) error {
83-
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
84-
for _, next := range nexts {
85-
pipelineAttrs := otelattr.String(pipelineIDAttrKey, next.(*capabilitiesNode).pipelineID.String())
86-
routeSet := otelattr.NewSet(append(n.Set().ToSlice(), pipelineAttrs)...)
87-
tb, err := metadata.NewTelemetryBuilder(telemetry.WithAttributeSet(set.TelemetrySettings, routeSet))
88-
if err != nil {
89-
return err
90-
}
91-
consumers[next.(*capabilitiesNode).pipelineID] = obsConsumerTraces{
92-
Traces: next.(consumer.Traces),
93-
itemCounter: tb.ConnectorProducedItems,
94-
}
95-
}
96-
next := connector.NewTracesRouter(consumers)
97-
9884
tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
9985
if err != nil {
10086
return err
10187
}
10288

89+
consumers := make(map[pipeline.ID]consumer.Traces, len(nexts))
90+
for _, next := range nexts {
91+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewTraces(
92+
next.(consumer.Traces),
93+
tb.ConnectorProducedItems,
94+
obsconsumer.WithStaticDataPointAttribute(
95+
otelattr.String(
96+
pipelineIDAttrKey,
97+
next.(*capabilitiesNode).pipelineID.String(),
98+
),
99+
),
100+
)
101+
}
102+
next := connector.NewTracesRouter(consumers)
103+
103104
switch n.exprPipelineType {
104105
case pipeline.SignalTraces:
105106
n.Component, err = builder.CreateTracesToTraces(ctx, set, next)
106107
if err != nil {
107108
return err
108109
}
109-
n.consumer = obsConsumerTraces{
110-
// Connectors which might pass along data must inherit capabilities of all nexts
111-
Traces: capabilityconsumer.NewTraces(
112-
n.Component.(consumer.Traces),
113-
aggregateCap(n.Component.(consumer.Traces), nexts),
114-
),
115-
itemCounter: tb.ConnectorConsumedItems,
116-
}
110+
// Connectors which might pass along data must inherit capabilities of all nexts
111+
capConsumer := capabilityconsumer.NewTraces(
112+
n.Component.(consumer.Traces),
113+
aggregateCap(n.Component.(consumer.Traces), nexts),
114+
)
115+
n.consumer = obsconsumer.NewTraces(capConsumer, tb.ConnectorConsumedItems)
117116
case pipeline.SignalMetrics:
118117
n.Component, err = builder.CreateMetricsToTraces(ctx, set, next)
119118
if err != nil {
120119
return err
121120
}
122-
n.consumer = obsConsumerMetrics{
123-
Metrics: n.Component.(consumer.Metrics),
124-
itemCounter: tb.ConnectorConsumedItems,
125-
}
121+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
126122
case pipeline.SignalLogs:
127123
n.Component, err = builder.CreateLogsToTraces(ctx, set, next)
128124
if err != nil {
129125
return err
130126
}
131-
n.consumer = obsConsumerLogs{
132-
Logs: n.Component.(consumer.Logs),
133-
itemCounter: tb.ConnectorConsumedItems,
134-
}
127+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
135128
case xpipeline.SignalProfiles:
136129
n.Component, err = builder.CreateProfilesToTraces(ctx, set, next)
137130
if err != nil {
138131
return err
139132
}
140-
n.consumer = obsConsumerProfiles{
141-
Profiles: n.Component.(xconsumer.Profiles),
142-
itemCounter: tb.ConnectorConsumedItems,
143-
}
133+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
144134
}
145135
return nil
146136
}
@@ -151,67 +141,56 @@ func (n *connectorNode) buildMetrics(
151141
builder *builders.ConnectorBuilder,
152142
nexts []baseConsumer,
153143
) error {
154-
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
155-
for _, next := range nexts {
156-
pipelineAttrs := otelattr.String(pipelineIDAttrKey, next.(*capabilitiesNode).pipelineID.String())
157-
routeSet := otelattr.NewSet(append(n.Set().ToSlice(), pipelineAttrs)...)
158-
tb, err := metadata.NewTelemetryBuilder(telemetry.WithAttributeSet(set.TelemetrySettings, routeSet))
159-
if err != nil {
160-
return err
161-
}
162-
consumers[next.(*capabilitiesNode).pipelineID] = obsConsumerMetrics{
163-
Metrics: next.(consumer.Metrics),
164-
itemCounter: tb.ConnectorProducedItems,
165-
}
166-
}
167-
next := connector.NewMetricsRouter(consumers)
168-
169144
tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
170145
if err != nil {
171146
return err
172147
}
173148

149+
consumers := make(map[pipeline.ID]consumer.Metrics, len(nexts))
150+
for _, next := range nexts {
151+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewMetrics(
152+
next.(consumer.Metrics),
153+
tb.ConnectorProducedItems,
154+
obsconsumer.WithStaticDataPointAttribute(
155+
otelattr.String(
156+
pipelineIDAttrKey,
157+
next.(*capabilitiesNode).pipelineID.String(),
158+
),
159+
),
160+
)
161+
}
162+
next := connector.NewMetricsRouter(consumers)
163+
174164
switch n.exprPipelineType {
175165
case pipeline.SignalMetrics:
176166
n.Component, err = builder.CreateMetricsToMetrics(ctx, set, next)
177167
if err != nil {
178168
return err
179169
}
180-
n.consumer = obsConsumerMetrics{
181-
// Connectors which might pass along data must inherit capabilities of all nexts
182-
Metrics: capabilityconsumer.NewMetrics(
183-
n.Component.(consumer.Metrics),
184-
aggregateCap(n.Component.(consumer.Metrics), nexts),
185-
),
186-
itemCounter: tb.ConnectorConsumedItems,
187-
}
170+
// Connectors which might pass along data must inherit capabilities of all nexts
171+
capConsumer := capabilityconsumer.NewMetrics(
172+
n.Component.(consumer.Metrics),
173+
aggregateCap(n.Component.(consumer.Metrics), nexts),
174+
)
175+
n.consumer = obsconsumer.NewMetrics(capConsumer, tb.ConnectorConsumedItems)
188176
case pipeline.SignalTraces:
189177
n.Component, err = builder.CreateTracesToMetrics(ctx, set, next)
190178
if err != nil {
191179
return err
192180
}
193-
n.consumer = obsConsumerTraces{
194-
Traces: n.Component.(consumer.Traces),
195-
itemCounter: tb.ConnectorConsumedItems,
196-
}
181+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
197182
case pipeline.SignalLogs:
198183
n.Component, err = builder.CreateLogsToMetrics(ctx, set, next)
199184
if err != nil {
200185
return err
201186
}
202-
n.consumer = obsConsumerLogs{
203-
Logs: n.Component.(consumer.Logs),
204-
itemCounter: tb.ConnectorConsumedItems,
205-
}
187+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
206188
case xpipeline.SignalProfiles:
207189
n.Component, err = builder.CreateProfilesToMetrics(ctx, set, next)
208190
if err != nil {
209191
return err
210192
}
211-
n.consumer = obsConsumerProfiles{
212-
Profiles: n.Component.(xconsumer.Profiles),
213-
itemCounter: tb.ConnectorConsumedItems,
214-
}
193+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
215194
}
216195
return nil
217196
}
@@ -222,67 +201,56 @@ func (n *connectorNode) buildLogs(
222201
builder *builders.ConnectorBuilder,
223202
nexts []baseConsumer,
224203
) error {
225-
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
226-
for _, next := range nexts {
227-
pipelineAttrs := otelattr.String(pipelineIDAttrKey, next.(*capabilitiesNode).pipelineID.String())
228-
routeSet := otelattr.NewSet(append(n.Set().ToSlice(), pipelineAttrs)...)
229-
tb, err := metadata.NewTelemetryBuilder(telemetry.WithAttributeSet(set.TelemetrySettings, routeSet))
230-
if err != nil {
231-
return err
232-
}
233-
consumers[next.(*capabilitiesNode).pipelineID] = obsConsumerLogs{
234-
Logs: next.(consumer.Logs),
235-
itemCounter: tb.ConnectorProducedItems,
236-
}
237-
}
238-
next := connector.NewLogsRouter(consumers)
239-
240204
tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
241205
if err != nil {
242206
return err
243207
}
244208

209+
consumers := make(map[pipeline.ID]consumer.Logs, len(nexts))
210+
for _, next := range nexts {
211+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewLogs(
212+
next.(consumer.Logs),
213+
tb.ConnectorProducedItems,
214+
obsconsumer.WithStaticDataPointAttribute(
215+
otelattr.String(
216+
pipelineIDAttrKey,
217+
next.(*capabilitiesNode).pipelineID.String(),
218+
),
219+
),
220+
)
221+
}
222+
next := connector.NewLogsRouter(consumers)
223+
245224
switch n.exprPipelineType {
246225
case pipeline.SignalLogs:
247226
n.Component, err = builder.CreateLogsToLogs(ctx, set, next)
248227
if err != nil {
249228
return err
250229
}
251-
n.consumer = obsConsumerLogs{
252-
// Connectors which might pass along data must inherit capabilities of all nexts
253-
Logs: capabilityconsumer.NewLogs(
254-
n.Component.(consumer.Logs),
255-
aggregateCap(n.Component.(consumer.Logs), nexts),
256-
),
257-
itemCounter: tb.ConnectorConsumedItems,
258-
}
230+
// Connectors which might pass along data must inherit capabilities of all nexts
231+
capConsumer := capabilityconsumer.NewLogs(
232+
n.Component.(consumer.Logs),
233+
aggregateCap(n.Component.(consumer.Logs), nexts),
234+
)
235+
n.consumer = obsconsumer.NewLogs(capConsumer, tb.ConnectorConsumedItems)
259236
case pipeline.SignalTraces:
260237
n.Component, err = builder.CreateTracesToLogs(ctx, set, next)
261238
if err != nil {
262239
return err
263240
}
264-
n.consumer = obsConsumerTraces{
265-
Traces: n.Component.(consumer.Traces),
266-
itemCounter: tb.ConnectorConsumedItems,
267-
}
241+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
268242
case pipeline.SignalMetrics:
269243
n.Component, err = builder.CreateMetricsToLogs(ctx, set, next)
270244
if err != nil {
271245
return err
272246
}
273-
n.consumer = obsConsumerMetrics{
274-
Metrics: n.Component.(consumer.Metrics),
275-
itemCounter: tb.ConnectorConsumedItems,
276-
}
247+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
277248
case xpipeline.SignalProfiles:
278249
n.Component, err = builder.CreateProfilesToLogs(ctx, set, next)
279250
if err != nil {
280251
return err
281252
}
282-
n.consumer = obsConsumerProfiles{
283-
Profiles: n.Component.(xconsumer.Profiles),
284-
itemCounter: tb.ConnectorConsumedItems,
285-
}
253+
n.consumer = obsconsumer.NewProfiles(n.Component.(xconsumer.Profiles), tb.ConnectorConsumedItems)
286254
}
287255
return nil
288256
}
@@ -293,67 +261,56 @@ func (n *connectorNode) buildProfiles(
293261
builder *builders.ConnectorBuilder,
294262
nexts []baseConsumer,
295263
) error {
296-
consumers := make(map[pipeline.ID]xconsumer.Profiles, len(nexts))
297-
for _, next := range nexts {
298-
pipelineAttrs := otelattr.String(pipelineIDAttrKey, next.(*capabilitiesNode).pipelineID.String())
299-
routeSet := otelattr.NewSet(append(n.Set().ToSlice(), pipelineAttrs)...)
300-
tb, err := metadata.NewTelemetryBuilder(telemetry.WithAttributeSet(set.TelemetrySettings, routeSet))
301-
if err != nil {
302-
return err
303-
}
304-
consumers[next.(*capabilitiesNode).pipelineID] = obsConsumerProfiles{
305-
Profiles: next.(xconsumer.Profiles),
306-
itemCounter: tb.ConnectorProducedItems,
307-
}
308-
}
309-
next := xconnector.NewProfilesRouter(consumers)
310-
311264
tb, err := metadata.NewTelemetryBuilder(set.TelemetrySettings)
312265
if err != nil {
313266
return err
314267
}
315268

269+
consumers := make(map[pipeline.ID]xconsumer.Profiles, len(nexts))
270+
for _, next := range nexts {
271+
consumers[next.(*capabilitiesNode).pipelineID] = obsconsumer.NewProfiles(
272+
next.(xconsumer.Profiles),
273+
tb.ConnectorProducedItems,
274+
obsconsumer.WithStaticDataPointAttribute(
275+
otelattr.String(
276+
pipelineIDAttrKey,
277+
next.(*capabilitiesNode).pipelineID.String(),
278+
),
279+
),
280+
)
281+
}
282+
next := xconnector.NewProfilesRouter(consumers)
283+
316284
switch n.exprPipelineType {
317285
case xpipeline.SignalProfiles:
318286
n.Component, err = builder.CreateProfilesToProfiles(ctx, set, next)
319287
if err != nil {
320288
return err
321289
}
322-
n.consumer = obsConsumerProfiles{
323-
// Connectors which might pass along data must inherit capabilities of all nexts
324-
Profiles: capabilityconsumer.NewProfiles(
325-
n.Component.(xconsumer.Profiles),
326-
aggregateCap(n.Component.(xconsumer.Profiles), nexts),
327-
),
328-
itemCounter: tb.ConnectorConsumedItems,
329-
}
290+
// Connectors which might pass along data must inherit capabilities of all nexts
291+
capConsumer := capabilityconsumer.NewProfiles(
292+
n.Component.(xconsumer.Profiles),
293+
aggregateCap(n.Component.(xconsumer.Profiles), nexts),
294+
)
295+
n.consumer = obsconsumer.NewProfiles(capConsumer, tb.ConnectorConsumedItems)
330296
case pipeline.SignalTraces:
331297
n.Component, err = builder.CreateTracesToProfiles(ctx, set, next)
332298
if err != nil {
333299
return err
334300
}
335-
n.consumer = obsConsumerTraces{
336-
Traces: n.Component.(consumer.Traces),
337-
itemCounter: tb.ConnectorConsumedItems,
338-
}
301+
n.consumer = obsconsumer.NewTraces(n.Component.(consumer.Traces), tb.ConnectorConsumedItems)
339302
case pipeline.SignalMetrics:
340303
n.Component, err = builder.CreateMetricsToProfiles(ctx, set, next)
341304
if err != nil {
342305
return err
343306
}
344-
n.consumer = obsConsumerMetrics{
345-
Metrics: n.Component.(consumer.Metrics),
346-
itemCounter: tb.ConnectorConsumedItems,
347-
}
307+
n.consumer = obsconsumer.NewMetrics(n.Component.(consumer.Metrics), tb.ConnectorConsumedItems)
348308
case pipeline.SignalLogs:
349309
n.Component, err = builder.CreateLogsToProfiles(ctx, set, next)
350310
if err != nil {
351311
return err
352312
}
353-
n.consumer = obsConsumerLogs{
354-
Logs: n.Component.(consumer.Logs),
355-
itemCounter: tb.ConnectorConsumedItems,
356-
}
313+
n.consumer = obsconsumer.NewLogs(n.Component.(consumer.Logs), tb.ConnectorConsumedItems)
357314
}
358315
return nil
359316
}

0 commit comments

Comments
 (0)