Skip to content

Commit 4b22b2c

Browse files
committed
Cleanup Queue factory and initialization
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent beb9002 commit 4b22b2c

21 files changed

+288
-402
lines changed
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterqueue
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Remove exporterqueue.Factory in favor of the NewQueue function, and merge configs for memory and persistent.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12509]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

.chloggen/new-queue-factory.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Change the signature of the exporterhelper.WithQueueRequest to accept Encoding instead of the Factory.
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [12509]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/exporterhelper/common.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,16 @@ func WithRetry(config configretry.BackOffConfig) Option {
4242
// WithQueue overrides the default QueueConfig for an exporter.
4343
// The default QueueConfig is to disable queueing.
4444
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
45-
func WithQueue(config internal.QueueConfig) Option {
45+
func WithQueue(config QueueConfig) Option {
4646
return internal.WithQueue(config)
4747
}
4848

4949
// WithRequestQueue enables queueing for an exporter.
5050
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
5151
// Experimental: This API is at the early stage of development and may change without backward compatibility
5252
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
53-
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
54-
return internal.WithRequestQueue(cfg, queueFactory)
53+
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[Request]) Option {
54+
return internal.WithRequestQueue(cfg, encoding)
5555
}
5656

5757
// WithCapabilities overrides the default Capabilities() function for a Consumer.

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 20 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ type BaseExporter struct {
2727
component.StartFunc
2828
component.ShutdownFunc
2929

30-
Marshaler exporterqueue.Marshaler[request.Request]
31-
Unmarshaler exporterqueue.Unmarshaler[request.Request]
30+
encoding exporterqueue.Encoding[request.Request]
3231

3332
Set exporter.Settings
3433

@@ -45,18 +44,16 @@ type BaseExporter struct {
4544

4645
ConsumerOptions []consumer.Option
4746

48-
timeoutCfg TimeoutConfig
49-
retryCfg configretry.BackOffConfig
50-
queueFactory exporterqueue.Factory[request.Request]
51-
queueCfg exporterqueue.Config
52-
batcherCfg exporterbatcher.Config
47+
timeoutCfg TimeoutConfig
48+
retryCfg configretry.BackOffConfig
49+
queueCfg exporterqueue.Config
50+
batcherCfg exporterbatcher.Config
5351
}
5452

5553
func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...Option) (*BaseExporter, error) {
5654
be := &BaseExporter{
57-
Set: set,
58-
timeoutCfg: NewDefaultTimeoutConfig(),
59-
queueFactory: exporterqueue.NewMemoryQueueFactory[request.Request](),
55+
Set: set,
56+
timeoutCfg: NewDefaultTimeoutConfig(),
6057
}
6158

6259
for _, op := range options {
@@ -98,11 +95,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, options ...O
9895
}
9996

10097
if be.queueCfg.Enabled || be.batcherCfg.Enabled {
101-
qSet := exporterqueue.Settings{
98+
qSet := exporterqueue.Settings[request.Request]{
10299
Signal: signal,
103100
ExporterSettings: set,
101+
Encoding: be.encoding,
104102
}
105-
be.QueueSender, err = NewQueueSender(be.queueFactory, qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
103+
be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
106104
if err != nil {
107105
return nil, err
108106
}
@@ -199,44 +197,30 @@ func WithRetry(config configretry.BackOffConfig) Option {
199197
// WithQueue overrides the default QueueConfig for an exporter.
200198
// The default QueueConfig is to disable queueing.
201199
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
202-
func WithQueue(config QueueConfig) Option {
200+
func WithQueue(cfg exporterqueue.Config) Option {
203201
return func(o *BaseExporter) error {
204-
if o.Marshaler == nil || o.Unmarshaler == nil {
202+
if o.encoding == nil {
205203
return errors.New("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
206204
}
207-
if !config.Enabled {
208-
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
209-
return nil
210-
}
211-
o.queueCfg = exporterqueue.Config{
212-
Enabled: config.Enabled,
213-
NumConsumers: config.NumConsumers,
214-
QueueSize: config.QueueSize,
215-
Blocking: config.Blocking,
216-
}
217-
o.queueFactory = exporterqueue.NewPersistentQueueFactory[request.Request](config.StorageID, exporterqueue.PersistentQueueSettings[request.Request]{
218-
Marshaler: o.Marshaler,
219-
Unmarshaler: o.Unmarshaler,
220-
})
221-
return nil
205+
return WithRequestQueue(cfg, o.encoding)(o)
222206
}
223207
}
224208

225209
// WithRequestQueue enables queueing for an exporter.
226210
// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
227211
// Experimental: This API is at the early stage of development and may change without backward compatibility
228212
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
229-
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[request.Request]) Option {
213+
func WithRequestQueue(cfg exporterqueue.Config, encoding exporterqueue.Encoding[request.Request]) Option {
230214
return func(o *BaseExporter) error {
231-
if o.Marshaler != nil || o.Unmarshaler != nil {
232-
return errors.New("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")
215+
if cfg.Enabled && cfg.StorageID != nil && encoding == nil {
216+
return errors.New("`encoding` must not be nil when persistent queue is enabled")
233217
}
218+
o.encoding = encoding
234219
if !cfg.Enabled {
235220
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
236221
return nil
237222
}
238223
o.queueCfg = cfg
239-
o.queueFactory = queueFactory
240224
return nil
241225
}
242226
}
@@ -263,20 +247,11 @@ func WithBatcher(cfg exporterbatcher.Config) Option {
263247
}
264248
}
265249

266-
// WithMarshaler is used to set the request marshaler for the new exporter helper.
267-
// It must be provided as the first option when creating a new exporter helper.
268-
func WithMarshaler(marshaler exporterqueue.Marshaler[request.Request]) Option {
269-
return func(o *BaseExporter) error {
270-
o.Marshaler = marshaler
271-
return nil
272-
}
273-
}
274-
275-
// WithUnmarshaler is used to set the request unmarshaler for the new exporter helper.
250+
// WithEncoding is used to set the request encoding for the new exporter helper.
276251
// It must be provided as the first option when creating a new exporter helper.
277-
func WithUnmarshaler(unmarshaler exporterqueue.Unmarshaler[request.Request]) Option {
252+
func WithEncoding(encoding exporterqueue.Encoding[request.Request]) Option {
278253
return func(o *BaseExporter) error {
279-
o.Unmarshaler = unmarshaler
254+
o.encoding = encoding
280255
return nil
281256
}
282257
}

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,16 +71,18 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
7171
bs, err := NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
7272
WithRetry(configretry.NewDefaultBackOffConfig()))
7373
require.NoError(t, err)
74-
require.Nil(t, bs.Marshaler)
75-
require.Nil(t, bs.Unmarshaler)
74+
require.Nil(t, bs.encoding)
7675
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
77-
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueConfig()))
76+
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(exporterqueue.NewDefaultConfig()))
7877
require.Error(t, err)
7978

79+
qCfg := exporterqueue.NewDefaultConfig()
80+
storageId := component.NewID(component.MustNewType("test"))
81+
qCfg.StorageID = &storageId
8082
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), defaultSignal,
81-
WithMarshaler(mockRequestMarshaler), WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
83+
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
8284
WithRetry(configretry.NewDefaultBackOffConfig()),
83-
WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[request.Request]()))
85+
WithRequestQueue(qCfg, nil))
8486
require.Error(t, err)
8587
}
8688

@@ -93,7 +95,7 @@ func TestBaseExporterLogging(t *testing.T) {
9395
qCfg := exporterqueue.NewDefaultConfig()
9496
qCfg.Enabled = false
9597
bs, err := NewBaseExporter(set, defaultSignal,
96-
WithRequestQueue(qCfg, exporterqueue.NewMemoryQueueFactory[request.Request]()),
98+
WithRequestQueue(qCfg, newFakeEncoding(&requesttest.FakeRequest{})),
9799
WithBatcher(exporterbatcher.NewDefaultConfig()),
98100
WithRetry(rCfg))
99101
require.NoError(t, err)
@@ -118,10 +120,9 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
118120
{
119121
name: "WithQueue",
120122
queueOptions: []Option{
121-
WithMarshaler(mockRequestMarshaler),
122-
WithUnmarshaler(mockRequestUnmarshaler(&requesttest.FakeRequest{Items: 1})),
123+
WithEncoding(newFakeEncoding(&requesttest.FakeRequest{Items: 1})),
123124
func() Option {
124-
qs := NewDefaultQueueConfig()
125+
qs := exporterqueue.NewDefaultConfig()
125126
qs.Enabled = false
126127
return WithQueue(qs)
127128
}(),
@@ -138,7 +139,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) {
138139
func() Option {
139140
qs := exporterqueue.NewDefaultConfig()
140141
qs.Enabled = false
141-
return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[request.Request]())
142+
return WithRequestQueue(qs, newFakeEncoding(&requesttest.FakeRequest{Items: 1}))
142143
}(),
143144
func() Option {
144145
bs := exporterbatcher.NewDefaultConfig()

exporter/exporterhelper/internal/batcher/disabled_batcher_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ func TestDisabledBatcher_Basic(t *testing.T) {
4545
tt.maxWorkers)
4646
require.NoError(t, err)
4747

48-
q := exporterqueue.NewMemoryQueueFactory[request.Request]()(
48+
q := exporterqueue.NewQueue[request.Request](
4949
context.Background(),
50-
exporterqueue.Settings{
50+
exporterqueue.Settings[request.Request]{
5151
Signal: pipeline.SignalTraces,
5252
ExporterSettings: exportertest.NewNopSettings(exportertest.NopType),
5353
},

exporter/exporterhelper/internal/obs_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ type obsQueue[T request.Request] struct {
2323
enqueueFailedInst metric.Int64Counter
2424
}
2525

26-
func newObsQueue[T request.Request](set exporterqueue.Settings, delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
26+
func newObsQueue[T request.Request](set exporterqueue.Settings[T], delegate exporterqueue.Queue[T]) (exporterqueue.Queue[T], error) {
2727
tb, err := metadata.NewTelemetryBuilder(set.ExporterSettings.TelemetrySettings)
2828
if err != nil {
2929
return nil, err

exporter/exporterhelper/internal/obs_queue_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func TestObsQueueLogsSizeCapacity(t *testing.T) {
5050
tt := componenttest.NewTelemetry()
5151
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
5252

53-
te, err := newObsQueue[request.Request](exporterqueue.Settings{
53+
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
5454
Signal: pipeline.SignalLogs,
5555
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
5656
}, newFakeQueue[request.Request](nil, 7, 9))
@@ -80,7 +80,7 @@ func TestObsQueueLogsFailure(t *testing.T) {
8080
tt := componenttest.NewTelemetry()
8181
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
8282

83-
te, err := newObsQueue[request.Request](exporterqueue.Settings{
83+
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
8484
Signal: pipeline.SignalLogs,
8585
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
8686
}, newFakeQueue[request.Request](errors.New("my error"), 7, 9))
@@ -100,7 +100,7 @@ func TestObsQueueTracesSizeCapacity(t *testing.T) {
100100
tt := componenttest.NewTelemetry()
101101
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
102102

103-
te, err := newObsQueue[request.Request](exporterqueue.Settings{
103+
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
104104
Signal: pipeline.SignalTraces,
105105
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
106106
}, newFakeQueue[request.Request](nil, 17, 19))
@@ -130,7 +130,7 @@ func TestObsQueueTracesFailure(t *testing.T) {
130130
tt := componenttest.NewTelemetry()
131131
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
132132

133-
te, err := newObsQueue[request.Request](exporterqueue.Settings{
133+
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
134134
Signal: pipeline.SignalTraces,
135135
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
136136
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))
@@ -150,7 +150,7 @@ func TestObsQueueMetrics(t *testing.T) {
150150
tt := componenttest.NewTelemetry()
151151
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
152152

153-
te, err := newObsQueue[request.Request](exporterqueue.Settings{
153+
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
154154
Signal: pipeline.SignalMetrics,
155155
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
156156
}, newFakeQueue[request.Request](nil, 27, 29))
@@ -180,7 +180,7 @@ func TestObsQueueMetricsFailure(t *testing.T) {
180180
tt := componenttest.NewTelemetry()
181181
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
182182

183-
te, err := newObsQueue[request.Request](exporterqueue.Settings{
183+
te, err := newObsQueue[request.Request](exporterqueue.Settings[request.Request]{
184184
Signal: pipeline.SignalMetrics,
185185
ExporterSettings: exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()},
186186
}, newFakeQueue[request.Request](errors.New("my error"), 0, 0))

0 commit comments

Comments
 (0)