1 change: 1 addition & 0 deletions cmd/otelcorecol/go.mod
@@ -74,6 +74,7 @@ require (
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.64.0 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
github.com/rs/cors v1.11.1 // indirect
github.com/shirou/gopsutil/v4 v4.25.5 // indirect
github.com/spf13/cobra v1.9.1 // indirect
2 changes: 2 additions & 0 deletions cmd/otelcorecol/go.sum

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions exporter/debugexporter/go.mod
@@ -40,6 +40,7 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/config/configretry v1.33.0 // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.127.0 // indirect
2 changes: 2 additions & 0 deletions exporter/debugexporter/go.sum

Some generated files are not rendered by default.

11 changes: 6 additions & 5 deletions exporter/exporterhelper/internal/base_exporter.go
@@ -90,11 +90,12 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende

    if be.queueCfg.Enabled || be.batcherCfg.Enabled {
        qSet := queuebatch.Settings[request.Request]{
-            Signal:    signal,
-            ID:        set.ID,
-            Telemetry: set.TelemetrySettings,
-            Encoding:  be.queueBatchSettings.Encoding,
-            Sizers:    be.queueBatchSettings.Sizers,
+            Signal:      signal,
+            ID:          set.ID,
+            Telemetry:   set.TelemetrySettings,
+            Encoding:    be.queueBatchSettings.Encoding,
+            Sizers:      be.queueBatchSettings.Sizers,
+            Partitioner: be.queueBatchSettings.Partitioner,
        }
        be.QueueSender, err = NewQueueSender(qSet, be.queueCfg, be.batcherCfg, be.ExportFailureMessage, be.firstSender)
        if err != nil {
114 changes: 114 additions & 0 deletions exporter/exporterhelper/internal/queuebatch/multi_batcher.go
@@ -0,0 +1,114 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queuebatch // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch"

import (
    "context"
    "sync"

    "github.com/puzpuzpuz/xsync/v3"

    "go.opentelemetry.io/collector/component"
    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
)

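// multiBatcher is a Batcher that batches requests per partition. When no Partitioner is
// configured it degenerates to a single shared shardBatcher; otherwise it keeps one lazily
// created shardBatcher per partition key, all drawing from the same worker pool.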
type multiBatcher struct {
    cfg         BatchConfig
    workerPool  chan struct{}
    sizerType   request.SizerType
    sizer       request.Sizer[request.Request]
    partitioner Partitioner[request.Request]
    consumeFunc sender.SendFunc[request.Request]

    singleShard *shardBatcher
    shards      *xsync.MapOf[string, *shardBatcher]
}

var _ Batcher[request.Request] = (*multiBatcher)(nil)

func newMultiBatcher(bCfg BatchConfig, bSet batcherSettings[request.Request]) *multiBatcher {
    var workerPool chan struct{}
    if bSet.maxWorkers != 0 {
        workerPool = make(chan struct{}, bSet.maxWorkers)
        for i := 0; i < bSet.maxWorkers; i++ {
            workerPool <- struct{}{}
        }
    }
    mb := &multiBatcher{
        cfg:         bCfg,
        workerPool:  workerPool,
        sizerType:   bSet.sizerType,
        sizer:       bSet.sizer,
        partitioner: bSet.partitioner,
        consumeFunc: bSet.next,
    }

    if bSet.partitioner == nil {
        mb.singleShard = &shardBatcher{
            cfg:         bCfg,
            workerPool:  mb.workerPool,
            sizerType:   bSet.sizerType,
            sizer:       bSet.sizer,
            consumeFunc: bSet.next,
            stopWG:      sync.WaitGroup{},
            shutdownCh:  make(chan struct{}, 1),
        }
    } else {
        mb.shards = xsync.NewMapOf[string, *shardBatcher]()
    }
    return mb
}

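// getShard returns the shardBatcher responsible for the request: the single shared shard when
// no partitioner is configured, otherwise a shard created and started lazily for the
// partitioner's key.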
func (mb *multiBatcher) getShard(ctx context.Context, req request.Request) *shardBatcher {
    if mb.singleShard != nil {
        return mb.singleShard
    }

    key := mb.partitioner.GetKey(ctx, req)
    result, _ := mb.shards.LoadOrCompute(key, func() *shardBatcher {
        s := &shardBatcher{
            cfg:         mb.cfg,
            workerPool:  mb.workerPool,
            sizerType:   mb.sizerType,
            sizer:       mb.sizer,
            consumeFunc: mb.consumeFunc,
            stopWG:      sync.WaitGroup{},
            shutdownCh:  make(chan struct{}, 1),
        }
        s.start(ctx, nil)
        return s
    })
    return result
}

func (mb *multiBatcher) Start(ctx context.Context, host component.Host) error {
    if mb.singleShard != nil {
        mb.singleShard.start(ctx, host)
    }
    return nil
}

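// Consume routes the request to the shard for its partition key and submits it for batching.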
func (mb *multiBatcher) Consume(ctx context.Context, req request.Request, done Done) {
    shard := mb.getShard(ctx, req)
    shard.Consume(ctx, req, done)
}

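// Shutdown stops the single shard, or shuts down all partition shards concurrently and waits
// for them to finish.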
func (mb *multiBatcher) Shutdown(ctx context.Context) error {
    if mb.singleShard != nil {
        mb.singleShard.shutdown(ctx)
        return nil
    }

    var wg sync.WaitGroup
    wg.Add(mb.shards.Size())
    mb.shards.Range(func(_ string, shard *shardBatcher) bool {
        go func() {
            shard.shutdown(ctx)
            wg.Done()
        }()
        return true
    })
    wg.Wait()
    return nil
}
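
For orientation, the snippet below (not part of the diff) sketches how code inside the queuebatch package could wire this batcher to split batches by a value carried in the request context. The tenantKey type, newTenantBatcher function, and exportFn parameter are hypothetical illustration names; BatchConfig, batcherSettings, NewPartitioner, newMultiBatcher, and the request/sender packages are the ones used in the files above.

// Sketch only: wires newMultiBatcher with a context-keyed partitioner.
package queuebatch

import (
    "context"
    "time"

    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
)

// tenantKey is a hypothetical context key carrying the partition value.
type tenantKey struct{}

func newTenantBatcher(exportFn sender.SendFunc[request.Request]) *multiBatcher {
    return newMultiBatcher(BatchConfig{FlushTimeout: time.Second, MinSize: 1024}, batcherSettings[request.Request]{
        sizerType: request.SizerTypeItems,
        sizer:     request.NewItemsSizer(),
        // Requests consumed with different tenant values land in different shards,
        // so they are batched and flushed independently.
        partitioner: NewPartitioner(func(ctx context.Context, _ request.Request) string {
            tenant, _ := ctx.Value(tenantKey{}).(string)
            return tenant
        }),
        next:       exportFn,
        maxWorkers: 4,
    })
}

Calling Consume with contexts that carry different tenant values then yields separate batches per tenant, which is the same behavior the tests below exercise with the "p1"/"p2" partition keys.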
113 changes: 113 additions & 0 deletions exporter/exporterhelper/internal/queuebatch/multi_batcher_test.go
@@ -0,0 +1,113 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package queuebatch

import (
    "context"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    "go.opentelemetry.io/collector/component/componenttest"
    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
    "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
)

func TestMultiBatcher_NoTimeout(t *testing.T) {
    cfg := BatchConfig{
        FlushTimeout: 0,
        MinSize:      10,
    }
    sink := requesttest.NewSink()

    type partitionKey struct{}

    ba := newMultiBatcher(cfg, batcherSettings[request.Request]{
        sizerType: request.SizerTypeItems,
        sizer:     request.NewItemsSizer(),
        partitioner: NewPartitioner(func(ctx context.Context, _ request.Request) string {
            return ctx.Value(partitionKey{}).(string)
        }),
        next:       sink.Export,
        maxWorkers: 1,
    })

    require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
    t.Cleanup(func() {
        require.NoError(t, ba.Shutdown(context.Background()))
    })

    done := newFakeDone()
    ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
    ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)

    // Neither batch should be flushed since they haven't reached the min threshold.
    assert.Equal(t, 0, sink.RequestsCount())
    assert.Equal(t, 0, sink.ItemsCount())

    ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)

    assert.Eventually(t, func() bool {
        return sink.RequestsCount() == 1 && sink.ItemsCount() == 16
    }, 500*time.Millisecond, 10*time.Millisecond)

    ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)

    assert.Eventually(t, func() bool {
        return sink.RequestsCount() == 2 && sink.ItemsCount() == 28
    }, 500*time.Millisecond, 10*time.Millisecond)

    // Check that the done callback is called the right number of times.
    assert.EqualValues(t, 0, done.errors.Load())
    assert.EqualValues(t, 4, done.success.Load())

    require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
}

func TestMultiBatcher_Timeout(t *testing.T) {
    cfg := BatchConfig{
        FlushTimeout: 100 * time.Millisecond,
        MinSize:      100,
    }
    sink := requesttest.NewSink()

    type partitionKey struct{}

    ba := newMultiBatcher(cfg, batcherSettings[request.Request]{
        sizerType: request.SizerTypeItems,
        sizer:     request.NewItemsSizer(),
        partitioner: NewPartitioner(func(ctx context.Context, _ request.Request) string {
            return ctx.Value(partitionKey{}).(string)
        }),
        next:       sink.Export,
        maxWorkers: 1,
    })

    require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
    t.Cleanup(func() {
        require.NoError(t, ba.Shutdown(context.Background()))
    })

    done := newFakeDone()
    ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
    ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)

    // Neither batch should be flushed since they haven't reached the min threshold.
    assert.Equal(t, 0, sink.RequestsCount())
    assert.Equal(t, 0, sink.ItemsCount())

    ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}, done)
    ba.Consume(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}, done)

    assert.Eventually(t, func() bool {
        return sink.RequestsCount() == 2 && sink.ItemsCount() == 28
    }, 1*time.Second, 10*time.Millisecond)
    // Check that the done callback is called the right number of times.
    assert.EqualValues(t, 0, done.errors.Load())
    assert.EqualValues(t, 4, done.success.Load())

    require.NoError(t, ba.Start(context.Background(), componenttest.NewNopHost()))
}
33 changes: 18 additions & 15 deletions exporter/exporterhelper/internal/queuebatch/queue_batch.go
@@ -16,11 +16,12 @@ import (

// Settings defines settings for creating a QueueBatch.
type Settings[T any] struct {
-    Signal    pipeline.Signal
-    ID        component.ID
-    Telemetry component.TelemetrySettings
-    Encoding  Encoding[T]
-    Sizers    map[request.SizerType]request.Sizer[T]
+    Signal      pipeline.Signal
+    ID          component.ID
+    Telemetry   component.TelemetrySettings
+    Encoding    Encoding[T]
+    Sizers      map[request.SizerType]request.Sizer[T]
+    Partitioner Partitioner[T]
}

type QueueBatch struct {
@@ -65,18 +66,20 @@ func newQueueBatch(
    if cfg.Batch != nil {
        if oldBatcher {
            // If the user configures the old batcher, we can only support the "items" sizer.
-            b = newDefaultBatcher(*cfg.Batch, batcherSettings[request.Request]{
-                sizerType:  request.SizerTypeItems,
-                sizer:      request.NewItemsSizer(),
-                next:       next,
-                maxWorkers: cfg.NumConsumers,
+            b = newMultiBatcher(*cfg.Batch, batcherSettings[request.Request]{
+                sizerType:   request.SizerTypeItems,
+                sizer:       request.NewItemsSizer(),
+                partitioner: set.Partitioner,
+                next:        next,
+                maxWorkers:  cfg.NumConsumers,
            })
        } else {
-            b = newDefaultBatcher(*cfg.Batch, batcherSettings[request.Request]{
-                sizerType:  cfg.Sizer,
-                sizer:      sizer,
-                next:       next,
-                maxWorkers: cfg.NumConsumers,
+            b = newMultiBatcher(*cfg.Batch, batcherSettings[request.Request]{
+                sizerType:   cfg.Sizer,
+                sizer:       sizer,
+                partitioner: set.Partitioner,
+                next:        next,
+                maxWorkers:  cfg.NumConsumers,
            })
        }
        // Keep the number of queue consumers to 1 if batching is enabled until we support sharding as described in
42 changes: 42 additions & 0 deletions exporter/exporterhelper/internal/queuebatch/queue_batch_test.go
@@ -404,6 +404,48 @@ func TestQueueBatch_MergeOrSplit(t *testing.T) {
    require.NoError(t, qb.Shutdown(context.Background()))
}

func TestQueueBatch_MergeOrSplit_Multibatch(t *testing.T) {
    sink := requesttest.NewSink()
    cfg := newTestConfig()
    cfg.Batch = &BatchConfig{
        FlushTimeout: 100 * time.Millisecond,
        MinSize:      10,
    }

    type partitionKey struct{}
    set := newFakeRequestSettings()
    set.Partitioner = NewPartitioner(func(ctx context.Context, _ request.Request) string {
        key := ctx.Value(partitionKey{}).(string)
        return key
    })

    qb, err := NewQueueBatch(set, cfg, sink.Export)
    require.NoError(t, err)
    require.NoError(t, qb.Start(context.Background(), componenttest.NewNopHost()))

    // Send one request to each partition; neither reaches the minimum size on its own.
    require.NoError(t, qb.Send(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}))
    require.NoError(t, qb.Send(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}))

    // Neither batch should be flushed since they haven't reached the min threshold.
    assert.Equal(t, 0, sink.RequestsCount())
    assert.Equal(t, 0, sink.ItemsCount())

    require.NoError(t, qb.Send(context.WithValue(context.Background(), partitionKey{}, "p1"), &requesttest.FakeRequest{Items: 8}))

    assert.Eventually(t, func() bool {
        return sink.RequestsCount() == 1 && sink.ItemsCount() == 16
    }, 500*time.Millisecond, 10*time.Millisecond)

    require.NoError(t, qb.Send(context.WithValue(context.Background(), partitionKey{}, "p2"), &requesttest.FakeRequest{Items: 6}))

    assert.Eventually(t, func() bool {
        return sink.RequestsCount() == 2 && sink.ItemsCount() == 28
    }, 500*time.Millisecond, 10*time.Millisecond)

    require.NoError(t, qb.Shutdown(context.Background()))
}

func TestQueueBatch_Shutdown(t *testing.T) {
    sink := requesttest.NewSink()
    qb, err := NewQueueBatch(newFakeRequestSettings(), newTestConfig(), sink.Export)