Skip to content

Commit 5aa2746

Browse files
authored
Ensure loadbalancing child exporters use the OTLP type so backend creation succeeds (open-telemetry#43960)
#### Description <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#43950 <!--Describe what testing was performed and which tests were added.--> #### Testing Tested with config: ```yaml exporters: loadbalancing: retry_on_failure: enabled: true initial_interval: 5s max_interval: 30s max_elapsed_time: 60s sending_queue: enabled: true num_consumers: 10 queue_size: 10000 block_on_overflow: false sizer: items protocol: otlp: timeout: 1s tls: insecure: true resolver: dns: hostname: otel-receivers.test port: '4317' interval: 5s timeout: 1s ``` Signed-off-by: Israel Blancas <[email protected]>
1 parent e4bb1af commit 5aa2746

File tree

7 files changed

+210
-21
lines changed

7 files changed

+210
-21
lines changed

.chloggen/43950.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: exporter/loadbalancing
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Ensure loadbalancing child exporters use the OTLP type so backend creation succeeds
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43950]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

exporter/loadbalancingexporter/factory.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"go.opentelemetry.io/collector/exporter"
1414
"go.opentelemetry.io/collector/exporter/exporterhelper"
1515
"go.opentelemetry.io/collector/exporter/otlpexporter"
16+
metricnoop "go.opentelemetry.io/otel/metric/noop"
17+
tracenoop "go.opentelemetry.io/otel/trace/noop"
1618
"go.uber.org/zap"
1719

1820
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata"
@@ -54,8 +56,18 @@ func buildExporterConfig(cfg *Config, endpoint string) otlpexporter.Config {
5456
return oCfg
5557
}
5658

57-
func buildExporterSettings(params exporter.Settings, endpoint string) exporter.Settings {
59+
func buildExporterSettings(typ component.Type, params exporter.Settings, endpoint string) exporter.Settings {
60+
if name := params.ID.Name(); name != "" {
61+
params.ID = component.NewIDWithName(typ, name)
62+
} else {
63+
params.ID = component.NewID(typ)
64+
}
65+
telemetry := params.TelemetrySettings
66+
telemetry.MeterProvider = metricnoop.NewMeterProvider()
67+
telemetry.TracerProvider = tracenoop.NewTracerProvider()
5868
params.Logger = params.Logger.With(zap.String(zapEndpointKey, endpoint))
69+
telemetry.Logger = params.Logger
70+
params.TelemetrySettings = telemetry
5971
return params
6072
}
6173

exporter/loadbalancingexporter/factory_test.go

Lines changed: 72 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,18 +4,22 @@
44
package loadbalancingexporter
55

66
import (
7+
"context"
78
"path/filepath"
89
"testing"
910

1011
"github.com/stretchr/testify/assert"
1112
"github.com/stretchr/testify/require"
1213
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/component/componenttest"
1315
"go.opentelemetry.io/collector/config/configoptional"
1416
"go.opentelemetry.io/collector/config/configretry"
1517
"go.opentelemetry.io/collector/exporter/exporterhelper"
1618
"go.opentelemetry.io/collector/exporter/exportertest"
1719
"go.opentelemetry.io/collector/exporter/otlpexporter"
1820
"go.opentelemetry.io/collector/otelcol/otelcoltest"
21+
metricnoop "go.opentelemetry.io/otel/metric/noop"
22+
tracenoop "go.opentelemetry.io/otel/trace/noop"
1923
"go.uber.org/zap"
2024
"go.uber.org/zap/zaptest/observer"
2125

@@ -98,24 +102,75 @@ func TestBuildExporterConfig(t *testing.T) {
98102
}
99103

100104
func TestBuildExporterSettings(t *testing.T) {
101-
// prepare
102-
creationParams := exportertest.NewNopSettings(metadata.Type)
103-
testEndpoint := "the-endpoint"
104-
observedZapCore, observedLogs := observer.New(zap.InfoLevel)
105-
creationParams.Logger = zap.New(observedZapCore)
106-
107-
// test
108-
exporterParams := buildExporterSettings(creationParams, testEndpoint)
109-
exporterParams.Logger.Info("test")
110-
111-
assert.Equal(t, creationParams.ID, exporterParams.ID)
105+
otlpType := otlpexporter.NewFactory().Type()
106+
107+
t.Run("without exporter name", func(t *testing.T) {
108+
ctx := context.Background() //nolint:usetesting // Context must outlive test for cleanup
109+
creationParams := exportertest.NewNopSettings(metadata.Type)
110+
creationParams.ID = component.NewID(metadata.Type)
111+
telemetry := componenttest.NewTelemetry()
112+
t.Cleanup(func() {
113+
require.NoError(t, telemetry.Shutdown(ctx))
114+
})
115+
creationParams.TelemetrySettings = telemetry.NewTelemetrySettings()
116+
originalTelemetry := creationParams.TelemetrySettings
117+
testEndpoint := "the-endpoint"
118+
observedZapCore, observedLogs := observer.New(zap.InfoLevel)
119+
creationParams.Logger = zap.New(observedZapCore)
120+
originalLogger := creationParams.Logger
121+
122+
exporterParams := buildExporterSettings(otlpType, creationParams, testEndpoint)
123+
exporterParams.Logger.Info("test")
124+
125+
assert.Equal(t, component.NewID(otlpType), exporterParams.ID)
126+
assert.IsType(t, metricnoop.NewMeterProvider(), exporterParams.MeterProvider)
127+
assert.IsType(t, tracenoop.NewTracerProvider(), exporterParams.TracerProvider)
128+
129+
assert.Same(t, originalTelemetry.MeterProvider, creationParams.MeterProvider)
130+
assert.Same(t, originalTelemetry.TracerProvider, creationParams.TracerProvider)
131+
assert.Same(t, originalLogger, creationParams.Logger)
132+
133+
allLogs := observedLogs.All()
134+
require.Equal(t, 1, observedLogs.Len())
135+
assert.Contains(t,
136+
allLogs[0].Context,
137+
zap.String(zapEndpointKey, testEndpoint),
138+
)
139+
})
112140

113-
allLogs := observedLogs.All()
114-
require.Equal(t, 1, observedLogs.Len())
115-
assert.Contains(t,
116-
allLogs[0].Context,
117-
zap.String(zapEndpointKey, testEndpoint),
118-
)
141+
t.Run("with exporter name", func(t *testing.T) {
142+
ctx := context.Background() //nolint:usetesting // Context must outlive test for cleanup
143+
creationParams := exportertest.NewNopSettings(metadata.Type)
144+
creationParams.ID = component.NewIDWithName(metadata.Type, "custom")
145+
telemetry := componenttest.NewTelemetry()
146+
t.Cleanup(func() {
147+
require.NoError(t, telemetry.Shutdown(ctx))
148+
})
149+
creationParams.TelemetrySettings = telemetry.NewTelemetrySettings()
150+
originalTelemetry := creationParams.TelemetrySettings
151+
testEndpoint := "the-endpoint"
152+
observedZapCore, observedLogs := observer.New(zap.InfoLevel)
153+
creationParams.Logger = zap.New(observedZapCore)
154+
originalLogger := creationParams.Logger
155+
156+
exporterParams := buildExporterSettings(otlpType, creationParams, testEndpoint)
157+
exporterParams.Logger.Info("test")
158+
159+
assert.Equal(t, component.NewIDWithName(otlpType, "custom"), exporterParams.ID)
160+
assert.IsType(t, metricnoop.NewMeterProvider(), exporterParams.MeterProvider)
161+
assert.IsType(t, tracenoop.NewTracerProvider(), exporterParams.TracerProvider)
162+
163+
assert.Same(t, originalTelemetry.MeterProvider, creationParams.MeterProvider)
164+
assert.Same(t, originalTelemetry.TracerProvider, creationParams.TracerProvider)
165+
assert.Same(t, originalLogger, creationParams.Logger)
166+
167+
allLogs := observedLogs.All()
168+
require.Equal(t, 1, observedLogs.Len())
169+
assert.Contains(t,
170+
allLogs[0].Context,
171+
zap.String(zapEndpointKey, testEndpoint),
172+
)
173+
})
119174
}
120175

121176
func TestWrappedExporterHasEndpointAttribute(t *testing.T) {

exporter/loadbalancingexporter/log_exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func newLogsExporter(params exporter.Settings, cfg component.Config) (*logExport
4343
exporterFactory := otlpexporter.NewFactory()
4444
cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) {
4545
oCfg := buildExporterConfig(cfg.(*Config), endpoint)
46-
oParams := buildExporterSettings(params, endpoint)
46+
oParams := buildExporterSettings(exporterFactory.Type(), params, endpoint)
4747

4848
return exporterFactory.CreateLogs(ctx, oParams, &oCfg)
4949
}

exporter/loadbalancingexporter/log_exporter_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,15 @@ import (
2121
"go.opentelemetry.io/collector/consumer"
2222
"go.opentelemetry.io/collector/consumer/consumertest"
2323
"go.opentelemetry.io/collector/exporter"
24+
"go.opentelemetry.io/collector/exporter/exporterhelper"
2425
"go.opentelemetry.io/collector/exporter/exportertest"
26+
"go.opentelemetry.io/collector/exporter/otlpexporter"
2527
"go.opentelemetry.io/collector/pdata/pcommon"
2628
"go.opentelemetry.io/collector/pdata/plog"
29+
"go.opentelemetry.io/otel/attribute"
30+
metricnoop "go.opentelemetry.io/otel/metric/noop"
31+
"go.opentelemetry.io/otel/sdk/metric/metricdata"
32+
tracenoop "go.opentelemetry.io/otel/trace/noop"
2733
"go.uber.org/zap"
2834

2935
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata"
@@ -155,6 +161,95 @@ func TestConsumeLogs(t *testing.T) {
155161
assert.NoError(t, res)
156162
}
157163

164+
func TestConsumeLogsEmitsOnlyParentExporterMetrics(t *testing.T) {
165+
ctx := t.Context()
166+
shutdownCtx := context.Background() //nolint:usetesting // Context must outlive test for cleanup
167+
telemetry := componenttest.NewTelemetry()
168+
t.Cleanup(func() {
169+
require.NoError(t, telemetry.Shutdown(shutdownCtx))
170+
})
171+
172+
parentParams := exportertest.NewNopSettings(metadata.Type)
173+
parentParams.TelemetrySettings = telemetry.NewTelemetrySettings()
174+
175+
cfg := simpleConfig()
176+
logsExporter, err := newLogsExporter(parentParams, cfg)
177+
require.NoError(t, err)
178+
179+
otlpFactory := otlpexporter.NewFactory()
180+
var childSettings []exporter.Settings
181+
logsExporter.loadBalancer.componentFactory = func(createCtx context.Context, endpoint string) (component.Component, error) {
182+
childCfg := buildExporterConfig(cfg, endpoint)
183+
childParams := buildExporterSettings(otlpFactory.Type(), parentParams, endpoint)
184+
childSettings = append(childSettings, childParams)
185+
186+
return exporterhelper.NewLogs(createCtx, childParams, &childCfg, func(context.Context, plog.Logs) error {
187+
return nil
188+
})
189+
}
190+
wrappedExporter, err := exporterhelper.NewLogs(
191+
ctx,
192+
parentParams,
193+
cfg,
194+
logsExporter.ConsumeLogs,
195+
exporterhelper.WithStart(logsExporter.Start),
196+
exporterhelper.WithShutdown(logsExporter.Shutdown),
197+
exporterhelper.WithCapabilities(logsExporter.Capabilities()),
198+
)
199+
require.NoError(t, err)
200+
201+
require.NoError(t, wrappedExporter.Start(ctx, componenttest.NewNopHost()))
202+
t.Cleanup(func() {
203+
require.NoError(t, wrappedExporter.Shutdown(ctx))
204+
})
205+
206+
logs := generateSingleLogRecord()
207+
require.NoError(t, wrappedExporter.ConsumeLogs(ctx, logs))
208+
209+
metric, err := telemetry.GetMetric("otelcol_exporter_sent_log_records")
210+
require.NoError(t, err)
211+
sum, ok := metric.Data.(metricdata.Sum[int64])
212+
require.True(t, ok)
213+
214+
exporterKey := attribute.Key("exporter")
215+
var loadbalancingTotal int64
216+
for _, dp := range sum.DataPoints {
217+
attr, found := dp.Attributes.Value(exporterKey)
218+
require.True(t, found, "exporter attribute must be present")
219+
if attr.AsString() != parentParams.ID.String() {
220+
assert.Failf(t, "unexpected exporter attribute", "got %s", attr.AsString())
221+
continue
222+
}
223+
loadbalancingTotal += dp.Value
224+
}
225+
226+
assert.Equal(t, int64(logs.LogRecordCount()), loadbalancingTotal)
227+
228+
loadbalancerMetric, err := telemetry.GetMetric("otelcol_loadbalancer_backend_outcome")
229+
require.NoError(t, err)
230+
lbSum, ok := loadbalancerMetric.Data.(metricdata.Sum[int64])
231+
require.True(t, ok)
232+
var totalBackendOutcome int64
233+
for _, dp := range lbSum.DataPoints {
234+
totalBackendOutcome += dp.Value
235+
}
236+
assert.Equal(t, int64(1), totalBackendOutcome)
237+
238+
require.Len(t, childSettings, 1)
239+
assert.IsType(t, metricnoop.NewMeterProvider(), childSettings[0].MeterProvider)
240+
assert.IsType(t, tracenoop.NewTracerProvider(), childSettings[0].TracerProvider)
241+
}
242+
243+
func generateSingleLogRecord() plog.Logs {
244+
logs := plog.NewLogs()
245+
rl := logs.ResourceLogs().AppendEmpty()
246+
sl := rl.ScopeLogs().AppendEmpty()
247+
logRecord := sl.LogRecords().AppendEmpty()
248+
logRecord.Body().SetStr("test log")
249+
logRecord.SetTimestamp(pcommon.Timestamp(123))
250+
return logs
251+
}
252+
158253
func TestConsumeLogsUnexpectedExporterType(t *testing.T) {
159254
componentFactory := func(_ context.Context, _ string) (component.Component, error) {
160255
return newNopMockExporter(), nil

exporter/loadbalancingexporter/metrics_exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func newMetricsExporter(params exporter.Settings, cfg component.Config) (*metric
4646
exporterFactory := otlpexporter.NewFactory()
4747
cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) {
4848
oCfg := buildExporterConfig(cfg.(*Config), endpoint)
49-
oParams := buildExporterSettings(params, endpoint)
49+
oParams := buildExporterSettings(exporterFactory.Type(), params, endpoint)
5050

5151
return exporterFactory.CreateMetrics(ctx, oParams, &oCfg)
5252
}

exporter/loadbalancingexporter/trace_exporter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func newTracesExporter(params exporter.Settings, cfg component.Config) (*traceEx
5454
exporterFactory := otlpexporter.NewFactory()
5555
cfFunc := func(ctx context.Context, endpoint string) (component.Component, error) {
5656
oCfg := buildExporterConfig(cfg.(*Config), endpoint)
57-
oParams := buildExporterSettings(params, endpoint)
57+
oParams := buildExporterSettings(exporterFactory.Type(), params, endpoint)
5858

5959
return exporterFactory.CreateTraces(ctx, oParams, &oCfg)
6060
}

0 commit comments

Comments
 (0)