Skip to content

Commit a0cbea7

Browse files
mx-psievan-bradleyjmacd
authored
[exporterhelper] Use configoptional.Optional for exporterhelper QueueBatchConfig (open-telemetry#14155)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description <!-- Issue number if applicable --> Uses `configoptional.Optional` for `exporterhelper.QueueBatchConfig`. #### Link to tracking issue Updates open-telemetry#14021 --------- Co-authored-by: Evan Bradley <[email protected]> Co-authored-by: Joshua MacDonald <[email protected]>
1 parent ced8cda commit a0cbea7

34 files changed

+135
-106
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: pkg/exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Use `configoptional.Optional` for the `exporterhelper.QueueBatchConfig`
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [14155]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext: |
19+
It's recommended to change the field type in your component configuration to be `configoptional.Optional[exporterhelper.QueueBatchConfig]` to keep the `enabled` subfield. Use configoptional.Some(exporterhelper.NewDefaultQueueConfig()) to enable by default. Use configoptional.Default(exporterhelper.NewDefaultQueueConfig()) to disable by default.
20+
21+
# Optional: The change log or logs in which this entry should be included.
22+
# e.g. '[user]' or '[user, api]'
23+
# Include 'user' if the change is relevant to end users.
24+
# Include 'api' if there is a change to a library API.
25+
# Default: '[user]'
26+
change_logs: [api]

exporter/debugexporter/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88

99
"go.opentelemetry.io/collector/component"
10+
"go.opentelemetry.io/collector/config/configoptional"
1011
"go.opentelemetry.io/collector/config/configtelemetry"
1112
"go.opentelemetry.io/collector/exporter/exporterhelper"
1213
)
@@ -33,7 +34,7 @@ type Config struct {
3334
// UseInternalLogger defines whether the exporter sends the output to the collector's internal logger.
3435
UseInternalLogger bool `mapstructure:"use_internal_logger"`
3536

36-
QueueConfig exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`
37+
QueueConfig configoptional.Optional[exporterhelper.QueueBatchConfig] `mapstructure:"sending_queue"`
3738

3839
// prevent unkeyed literal initialization
3940
_ struct{}

exporter/debugexporter/config_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/stretchr/testify/assert"
1111
"github.com/stretchr/testify/require"
1212

13+
"go.opentelemetry.io/collector/config/configoptional"
1314
"go.opentelemetry.io/collector/config/configtelemetry"
1415
"go.opentelemetry.io/collector/confmap"
1516
"go.opentelemetry.io/collector/confmap/confmaptest"
@@ -25,7 +26,6 @@ func TestUnmarshalDefaultConfig(t *testing.T) {
2526

2627
func TestUnmarshalConfig(t *testing.T) {
2728
queueCfg := exporterhelper.NewDefaultQueueConfig()
28-
queueCfg.Enabled = false
2929
tests := []struct {
3030
filename string
3131
cfg *Config
@@ -37,7 +37,7 @@ func TestUnmarshalConfig(t *testing.T) {
3737
Verbosity: configtelemetry.LevelDetailed,
3838
SamplingInitial: 10,
3939
SamplingThereafter: 50,
40-
QueueConfig: queueCfg,
40+
QueueConfig: configoptional.Default(queueCfg),
4141
},
4242
},
4343
{

exporter/debugexporter/exporter_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ import (
1212
"github.com/stretchr/testify/require"
1313
"go.uber.org/zap/zaptest"
1414

15+
"go.opentelemetry.io/collector/config/configoptional"
1516
"go.opentelemetry.io/collector/config/configtelemetry"
1617
"go.opentelemetry.io/collector/exporter/debugexporter/internal/metadata"
18+
"go.opentelemetry.io/collector/exporter/exporterhelper"
1719
"go.opentelemetry.io/collector/exporter/exportertest"
1820
"go.opentelemetry.io/collector/pdata/plog"
1921
"go.opentelemetry.io/collector/pdata/pmetric"
@@ -111,15 +113,17 @@ func createTestCases() []testCase {
111113
name: "default config",
112114
config: func() *Config {
113115
c := createDefaultConfig().(*Config)
114-
c.QueueConfig.QueueSize = 10
116+
c.QueueConfig = configoptional.Some(exporterhelper.NewDefaultQueueConfig())
117+
c.QueueConfig.Get().QueueSize = 10
115118
return c
116119
}(),
117120
},
118121
{
119122
name: "don't use internal logger",
120123
config: func() *Config {
121124
cfg := createDefaultConfig().(*Config)
122-
cfg.QueueConfig.QueueSize = 10
125+
cfg.QueueConfig = configoptional.Some(exporterhelper.NewDefaultQueueConfig())
126+
cfg.QueueConfig.Get().QueueSize = 10
123127
cfg.UseInternalLogger = false
124128
return cfg
125129
}(),

exporter/debugexporter/factory.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.uber.org/zap/zapcore"
1212

1313
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/config/configoptional"
1415
"go.opentelemetry.io/collector/config/configtelemetry"
1516
"go.opentelemetry.io/collector/consumer"
1617
"go.opentelemetry.io/collector/exporter"
@@ -42,15 +43,12 @@ func NewFactory() exporter.Factory {
4243
}
4344

4445
func createDefaultConfig() component.Config {
45-
queueCfg := exporterhelper.NewDefaultQueueConfig()
46-
queueCfg.Enabled = false
47-
4846
return &Config{
4947
Verbosity: configtelemetry.LevelBasic,
5048
SamplingInitial: defaultSamplingInitial,
5149
SamplingThereafter: defaultSamplingThereafter,
5250
UseInternalLogger: true,
53-
QueueConfig: queueCfg,
51+
QueueConfig: configoptional.Default(exporterhelper.NewDefaultQueueConfig()),
5452
}
5553
}
5654

exporter/debugexporter/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ require (
66
github.com/stretchr/testify v1.11.1
77
go.opentelemetry.io/collector/component v1.47.0
88
go.opentelemetry.io/collector/component/componenttest v0.141.0
9+
go.opentelemetry.io/collector/config/configoptional v1.47.0
910
go.opentelemetry.io/collector/config/configtelemetry v0.141.0
1011
go.opentelemetry.io/collector/confmap v1.47.0
1112
go.opentelemetry.io/collector/consumer v1.47.0
@@ -43,7 +44,6 @@ require (
4344
github.com/pmezard/go-difflib v1.0.0 // indirect
4445
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
4546
go.opentelemetry.io/collector/client v1.47.0 // indirect
46-
go.opentelemetry.io/collector/config/configoptional v1.47.0 // indirect
4747
go.opentelemetry.io/collector/config/configretry v1.47.0 // indirect
4848
go.opentelemetry.io/collector/confmap/xconfmap v0.141.0 // indirect
4949
go.opentelemetry.io/collector/consumer/consumererror v0.141.0 // indirect

exporter/example_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"go.uber.org/zap"
1111

1212
"go.opentelemetry.io/collector/component"
13+
"go.opentelemetry.io/collector/config/configoptional"
1314
"go.opentelemetry.io/collector/config/configretry"
1415
"go.opentelemetry.io/collector/consumer"
1516
"go.opentelemetry.io/collector/exporter"
@@ -22,7 +23,7 @@ var typeStr = component.MustNewType("example")
2223

2324
// exampleConfig holds configuration settings for the exporter.
2425
type exampleConfig struct {
25-
QueueSettings exporterhelper.QueueBatchConfig
26+
QueueSettings configoptional.Optional[exporterhelper.QueueBatchConfig]
2627
BackOffConfig configretry.BackOffConfig
2728
}
2829

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"go.uber.org/zap"
1212

1313
"go.opentelemetry.io/collector/component"
14+
"go.opentelemetry.io/collector/config/configoptional"
1415
"go.opentelemetry.io/collector/config/configretry"
1516
"go.opentelemetry.io/collector/consumer"
1617
"go.opentelemetry.io/collector/exporter"
@@ -47,7 +48,7 @@ type BaseExporter struct {
4748
retryCfg configretry.BackOffConfig
4849

4950
queueBatchSettings queuebatch.Settings[request.Request]
50-
queueCfg queuebatch.Config
51+
queueCfg configoptional.Optional[queuebatch.Config]
5152
}
5253

5354
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sender.SendFunc[request.Request], options ...Option) (*BaseExporter, error) {
@@ -82,19 +83,19 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
8283
return nil, err
8384
}
8485

85-
if be.queueCfg.Batch.HasValue() {
86+
if be.queueCfg.HasValue() && be.queueCfg.Get().Batch.HasValue() {
8687
// Batcher mutates the data.
8788
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
8889
}
8990

90-
if be.queueCfg.Enabled {
91+
if be.queueCfg.HasValue() {
9192
qSet := queuebatch.AllSettings[request.Request]{
9293
Settings: be.queueBatchSettings,
9394
Signal: signal,
9495
ID: set.ID,
9596
Telemetry: set.TelemetrySettings,
9697
}
97-
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.ExportFailureMessage, be.firstSender)
98+
be.QueueSender, err = NewQueueSender(qSet, *be.queueCfg.Get(), be.ExportFailureMessage, be.firstSender)
9899
if err != nil {
99100
return nil, err
100101
}
@@ -191,7 +192,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
191192
// WithQueue overrides the default queuebatch.Config for an exporter.
192193
// The default queuebatch.Config is to disable queueing.
193194
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
194-
func WithQueue(cfg queuebatch.Config) Option {
195+
func WithQueue(cfg configoptional.Optional[queuebatch.Config]) Option {
195196
return func(o *BaseExporter) error {
196197
if o.queueBatchSettings.Encoding == nil {
197198
return errors.New("WithQueue option is not available for the new request exporters, use WithQueueBatch instead")
@@ -204,13 +205,13 @@ func WithQueue(cfg queuebatch.Config) Option {
204205
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
205206
// Experimental: This API is at the early stage of development and may change without backward compatibility
206207
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
207-
func WithQueueBatch(cfg queuebatch.Config, set queuebatch.Settings[request.Request]) Option {
208+
func WithQueueBatch(cfg configoptional.Optional[queuebatch.Config], set queuebatch.Settings[request.Request]) Option {
208209
return func(o *BaseExporter) error {
209-
if !cfg.Enabled {
210+
if !cfg.HasValue() {
210211
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
211212
return nil
212213
}
213-
if cfg.StorageID != nil && set.Encoding == nil {
214+
if cfg.Get().StorageID != nil && set.Encoding == nil {
214215
return errors.New("`Settings.Encoding` must not be nil when persistent queue is enabled")
215216
}
216217
o.queueBatchSettings = set

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/component/componenttest"
18+
"go.opentelemetry.io/collector/config/configoptional"
1819
"go.opentelemetry.io/collector/config/configretry"
1920
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"
2021
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -49,7 +50,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
4950
require.NoError(t, err)
5051
require.Nil(t, bs.queueBatchSettings.Encoding)
5152
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
52-
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
53+
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(configoptional.Some(NewDefaultQueueConfig())))
5354
require.Error(t, err)
5455

5556
qCfg := NewDefaultQueueConfig()
@@ -58,7 +59,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
5859
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
5960
WithQueueBatchSettings(newFakeQueueBatch()),
6061
WithRetry(configretry.NewDefaultBackOffConfig()),
61-
WithQueueBatch(qCfg, queuebatch.Settings[request.Request]{}))
62+
WithQueueBatch(configoptional.Some(qCfg), queuebatch.Settings[request.Request]{}))
6263
require.Error(t, err)
6364
}
6465

@@ -72,7 +73,7 @@ func TestBaseExporterLogging(t *testing.T) {
7273
qCfg.WaitForResult = true
7374
bs, err := NewBaseExporter(set, pipeline.SignalMetrics, errExport,
7475
WithQueueBatchSettings(newFakeQueueBatch()),
75-
WithQueue(qCfg),
76+
WithQueue(configoptional.Some(qCfg)),
7677
WithRetry(rCfg))
7778
require.NoError(t, err)
7879
require.NoError(t, bs.Start(context.Background(), componenttest.NewNopHost()))
@@ -98,19 +99,15 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
9899
queueOptions: []Option{
99100
WithQueueBatchSettings(newFakeQueueBatch()),
100101
func() Option {
101-
qs := NewDefaultQueueConfig()
102-
qs.Enabled = false
103-
return WithQueue(qs)
102+
return WithQueue(configoptional.None[queuebatch.Config]())
104103
}(),
105104
},
106105
},
107106
{
108107
name: "WithRequestQueue",
109108
queueOptions: []Option{
110109
func() Option {
111-
qs := NewDefaultQueueConfig()
112-
qs.Enabled = false
113-
return WithQueueBatch(qs, newFakeQueueBatch())
110+
return WithQueueBatch(configoptional.None[queuebatch.Config](), newFakeQueueBatch())
114111
}(),
115112
},
116113
},

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,16 @@ import (
1616
)
1717

1818
// NewDefaultQueueConfig returns the default config for queuebatch.Config.
19-
// By default, the queue stores 1000 requests of telemetry and is non-blocking when full.
19+
// By default:
20+
//
21+
// - the queue stores 1000 requests of telemetry
22+
// - is non-blocking when full
23+
// - concurrent exports limited to 10
24+
// - emits batches of 8192 items, timeout 200ms
2025
func NewDefaultQueueConfig() queuebatch.Config {
2126
return queuebatch.Config{
22-
Enabled: true,
23-
Sizer: request.SizerTypeRequests,
24-
NumConsumers: 10,
25-
// By default, batches are 8192 spans, for a total of up to 8 million spans in the queue
26-
// This can be estimated at 1-4 GB worth of maximum memory usage
27-
// This default is probably still too high, and may be adjusted further down in a future release
27+
Sizer: request.SizerTypeRequests,
28+
NumConsumers: 10,
2829
QueueSize: 1_000,
2930
BlockOnOverflow: false,
3031
Batch: configoptional.Default(queuebatch.BatchConfig{

0 commit comments

Comments
 (0)