diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 0c43fb6f17b..e6f5a531040 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -30,6 +30,9 @@ import ( // errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached. var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher metadata-value combinations")) +// errShuttingDown is returned when data is received while the processor is shutting down. +var errShuttingDown = errors.New("batch processor is shutting down") + // batch_processor is a component that accepts spans and metrics, places them // into batches and sends downstream. // @@ -290,9 +293,15 @@ func (sb *singleShardBatcher[T]) start(context.Context) error { return nil } -func (sb *singleShardBatcher[T]) consume(_ context.Context, data T) error { - sb.single.newItem <- data - return nil +func (sb *singleShardBatcher[T]) consume(ctx context.Context, data T) error { + select { + case sb.single.newItem <- data: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-sb.processor.shutdownC: + return errShuttingDown + } } func (sb *singleShardBatcher[T]) currentMetadataCardinality() int { @@ -362,8 +371,14 @@ func (mb *multiShardBatcher[T]) consume(ctx context.Context, data T) error { } mb.lock.Unlock() } - b.(*shard[T]).newItem <- data - return nil + select { + case b.(*shard[T]).newItem <- data: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-mb.processor.shutdownC: + return errShuttingDown + } } func (mb *multiShardBatcher[T]) currentMetadataCardinality() int { diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index 58fbe797079..b82de4902a2 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -1010,6 +1010,590 @@ func TestShutdown(t *testing.T) { processortest.VerifyShutdown(t, factory, factory.CreateDefaultConfig()) } +// TestBatchProcessor_ShutdownDuringBlocked verifies that the batch processor +// does not hang when Shutdown is called while a ConsumeX call is blocked +// waiting for buffer space. This was a critical bug that caused goroutine +// leaks and shutdown deadlocks under high load. +func TestBatchProcessor_ShutdownDuringBlocked(t *testing.T) { + // Create a consumer that blocks until we signal it + blockingConsumer := &blockingTracesSink{block: make(chan struct{})} + + cfg := &Config{ + Timeout: 10 * time.Second, // Long timeout so timer doesn't trigger + SendBatchSize: 1, // Send immediately when 1 span arrives + } + + traces, err := NewFactory().CreateTraces( + context.Background(), + processortest.NewNopSettings(metadata.Type), + cfg, + blockingConsumer, + ) + require.NoError(t, err) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) + + // Send first item - this will cause the shard to call export() which blocks + td := testdata.GenerateTraces(1) + go func() { + _ = traces.ConsumeTraces(context.Background(), td) + }() + + // Wait for the shard to be blocked in export + time.Sleep(50 * time.Millisecond) + + // Now flood with more data - these will queue in the buffer and then block + var wg sync.WaitGroup + errCh := make(chan error, 100) + numSenders := 50 // More than buffer size (runtime.NumCPU()) + + for range numSenders { + wg.Add(1) + go func() { + defer wg.Done() + td := testdata.GenerateTraces(1) + err := traces.ConsumeTraces(context.Background(), td) + if err != nil { + errCh <- err + } + }() + } + + // Wait for senders to start and some to block + time.Sleep(100 * time.Millisecond) + + // Shutdown - this should unblock all waiting senders + shutdownDone := make(chan struct{}) + go func() { + _ = traces.Shutdown(context.Background()) + close(shutdownDone) + }() + + // Unblock the consumer so shutdown can complete + time.Sleep(50 * time.Millisecond) + close(blockingConsumer.block) + + // Shutdown should complete within a reasonable time + select { + case <-shutdownDone: + // Success - shutdown completed + case <-time.After(5 * time.Second): + t.Fatal("Shutdown timed out - likely blocked goroutines were not released") + } + + // Wait for all send goroutines to complete + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success - all goroutines completed + case <-time.After(5 * time.Second): + t.Fatal("Goroutines did not complete - likely goroutine leak") + } + + // Collect errors - we expect some shutting down errors + close(errCh) + var shutdownErrors int + for err := range errCh { + if err == errShuttingDown { + shutdownErrors++ + } + } + // The key assertion: shutdown completed and goroutines were released + // (the number of shutdown errors depends on timing, but no goroutine leak occurred) + t.Logf("Shutdown completed successfully, %d sends rejected with errShuttingDown", shutdownErrors) +} + +// TestBatchProcessor_ContextCancellation verifies that ConsumeX respects context cancellation. +func TestBatchProcessor_ContextCancellation(t *testing.T) { + // Create a consumer that blocks forever + blockingConsumer := &blockingTracesSink{block: make(chan struct{})} + + cfg := &Config{ + Timeout: 10 * time.Second, + SendBatchSize: 1, + } + + traces, err := NewFactory().CreateTraces( + context.Background(), + processortest.NewNopSettings(metadata.Type), + cfg, + blockingConsumer, + ) + require.NoError(t, err) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) + + // Send first item to block the shard in export + go func() { + td := testdata.GenerateTraces(1) + _ = traces.ConsumeTraces(context.Background(), td) + }() + time.Sleep(50 * time.Millisecond) + + // Fill the buffer with background sends + for range 20 { + go func() { + td := testdata.GenerateTraces(1) + _ = traces.ConsumeTraces(context.Background(), td) + }() + } + + // Wait for buffer to fill + time.Sleep(100 * time.Millisecond) + + // Create a context with a short timeout + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + // This send should block (buffer is full) and then return with context deadline exceeded + td := testdata.GenerateTraces(1) + err = traces.ConsumeTraces(ctx, td) + + // Should get context error (buffer was full, send blocked, context timed out) + assert.ErrorIs(t, err, context.DeadlineExceeded) + + // Cleanup + close(blockingConsumer.block) + _ = traces.Shutdown(context.Background()) +} + +// slowTracesSink is a consumer that introduces a delay for each consume call. +type slowTracesSink struct { + delay time.Duration +} + +func (s *slowTracesSink) ConsumeTraces(_ context.Context, _ ptrace.Traces) error { + time.Sleep(s.delay) + return nil +} + +func (s *slowTracesSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// blockingTracesSink is a consumer that blocks until the block channel is closed. +type blockingTracesSink struct { + block chan struct{} +} + +func (b *blockingTracesSink) ConsumeTraces(_ context.Context, _ ptrace.Traces) error { + <-b.block + return nil +} + +func (b *blockingTracesSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// TestBatchProcessor_ShutdownDuringBlocked_Metrics verifies shutdown behavior for metrics. +func TestBatchProcessor_ShutdownDuringBlocked_Metrics(t *testing.T) { + blockingConsumer := &blockingMetricsSink{block: make(chan struct{})} + + cfg := &Config{ + Timeout: 10 * time.Second, + SendBatchSize: 1, + } + + metrics, err := NewFactory().CreateMetrics( + context.Background(), + processortest.NewNopSettings(metadata.Type), + cfg, + blockingConsumer, + ) + require.NoError(t, err) + require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) + + // Block the shard in export + go func() { + md := testdata.GenerateMetrics(1) + _ = metrics.ConsumeMetrics(context.Background(), md) + }() + time.Sleep(50 * time.Millisecond) + + var wg sync.WaitGroup + for range 50 { + wg.Add(1) + go func() { + defer wg.Done() + md := testdata.GenerateMetrics(1) + _ = metrics.ConsumeMetrics(context.Background(), md) + }() + } + + time.Sleep(100 * time.Millisecond) + + shutdownDone := make(chan struct{}) + go func() { + _ = metrics.Shutdown(context.Background()) + close(shutdownDone) + }() + + time.Sleep(50 * time.Millisecond) + close(blockingConsumer.block) + + select { + case <-shutdownDone: + // Success + case <-time.After(5 * time.Second): + t.Fatal("Shutdown timed out for metrics processor") + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(5 * time.Second): + t.Fatal("Goroutines did not complete for metrics processor") + } + + t.Log("Metrics shutdown completed successfully") +} + +// TestBatchProcessor_ShutdownDuringBlocked_Logs verifies shutdown behavior for logs. +func TestBatchProcessor_ShutdownDuringBlocked_Logs(t *testing.T) { + blockingConsumer := &blockingLogsSink{block: make(chan struct{})} + + cfg := &Config{ + Timeout: 10 * time.Second, + SendBatchSize: 1, + } + + logs, err := NewFactory().CreateLogs( + context.Background(), + processortest.NewNopSettings(metadata.Type), + cfg, + blockingConsumer, + ) + require.NoError(t, err) + require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) + + // Block the shard in export + go func() { + ld := testdata.GenerateLogs(1) + _ = logs.ConsumeLogs(context.Background(), ld) + }() + time.Sleep(50 * time.Millisecond) + + var wg sync.WaitGroup + for range 50 { + wg.Add(1) + go func() { + defer wg.Done() + ld := testdata.GenerateLogs(1) + _ = logs.ConsumeLogs(context.Background(), ld) + }() + } + + time.Sleep(100 * time.Millisecond) + + shutdownDone := make(chan struct{}) + go func() { + _ = logs.Shutdown(context.Background()) + close(shutdownDone) + }() + + time.Sleep(50 * time.Millisecond) + close(blockingConsumer.block) + + select { + case <-shutdownDone: + // Success + case <-time.After(5 * time.Second): + t.Fatal("Shutdown timed out for logs processor") + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(5 * time.Second): + t.Fatal("Goroutines did not complete for logs processor") + } + + t.Log("Logs shutdown completed successfully") +} + +// TestBatchProcessor_ShutdownDuringBlocked_MultiShardBatcher verifies shutdown behavior +// when using metadata keys (multiShardBatcher). +func TestBatchProcessor_ShutdownDuringBlocked_MultiShardBatcher(t *testing.T) { + blockingConsumer := &blockingTracesSink{block: make(chan struct{})} + + cfg := &Config{ + Timeout: 10 * time.Second, + SendBatchSize: 1, + MetadataKeys: []string{"tenant_id"}, // This triggers multiShardBatcher + } + + traces, err := NewFactory().CreateTraces( + context.Background(), + processortest.NewNopSettings(metadata.Type), + cfg, + blockingConsumer, + ) + require.NoError(t, err) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) + + // Block each shard's export by sending one item per tenant + for i := range 5 { + go func(tenantID int) { + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "tenant_id": {strconv.Itoa(tenantID)}, + }), + }) + td := testdata.GenerateTraces(1) + _ = traces.ConsumeTraces(ctx, td) + }(i) + } + time.Sleep(50 * time.Millisecond) + + var wg sync.WaitGroup + + // Send more data to fill buffers and block + for i := range 50 { + wg.Add(1) + go func(tenantID int) { + defer wg.Done() + ctx := client.NewContext(context.Background(), client.Info{ + Metadata: client.NewMetadata(map[string][]string{ + "tenant_id": {strconv.Itoa(tenantID % 5)}, + }), + }) + td := testdata.GenerateTraces(1) + _ = traces.ConsumeTraces(ctx, td) + }(i) + } + + time.Sleep(100 * time.Millisecond) + + shutdownDone := make(chan struct{}) + go func() { + _ = traces.Shutdown(context.Background()) + close(shutdownDone) + }() + + time.Sleep(50 * time.Millisecond) + close(blockingConsumer.block) + + select { + case <-shutdownDone: + // Success + case <-time.After(5 * time.Second): + t.Fatal("Shutdown timed out for multiShardBatcher") + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + // Success + case <-time.After(5 * time.Second): + t.Fatal("Goroutines did not complete for multiShardBatcher") + } + + t.Log("MultiShardBatcher shutdown completed successfully") +} + +// TestBatchProcessor_NormalOperationAfterFix is a regression test to ensure +// normal operation still works correctly after the shutdown fix. +func TestBatchProcessor_NormalOperationAfterFix(t *testing.T) { + sink := new(consumertest.TracesSink) + + cfg := &Config{ + Timeout: 200 * time.Millisecond, + SendBatchSize: 10, + } + + traces, err := NewFactory().CreateTraces( + context.Background(), + processortest.NewNopSettings(metadata.Type), + cfg, + sink, + ) + require.NoError(t, err) + require.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) + + // Send some data + for range 25 { + td := testdata.GenerateTraces(1) + err := traces.ConsumeTraces(context.Background(), td) + require.NoError(t, err, "Normal consume should not return error") + } + + // Shutdown gracefully + require.NoError(t, traces.Shutdown(context.Background())) + + // Verify all data was processed + assert.Equal(t, 25, sink.SpanCount(), "All spans should be received") +} + +// TestBatchProcessor_ContextCancellation_Metrics verifies context cancellation for metrics. +func TestBatchProcessor_ContextCancellation_Metrics(t *testing.T) { + blockingConsumer := &blockingMetricsSink{block: make(chan struct{})} + + cfg := &Config{ + Timeout: 10 * time.Second, + SendBatchSize: 1, + } + + metrics, err := NewFactory().CreateMetrics( + context.Background(), + processortest.NewNopSettings(metadata.Type), + cfg, + blockingConsumer, + ) + require.NoError(t, err) + require.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) + + // Block the shard in export first + go func() { + md := testdata.GenerateMetrics(1) + _ = metrics.ConsumeMetrics(context.Background(), md) + }() + time.Sleep(50 * time.Millisecond) + + // Fill the buffer + for range 20 { + go func() { + md := testdata.GenerateMetrics(1) + _ = metrics.ConsumeMetrics(context.Background(), md) + }() + } + + time.Sleep(100 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + md := testdata.GenerateMetrics(1) + err = metrics.ConsumeMetrics(ctx, md) + + assert.ErrorIs(t, err, context.DeadlineExceeded) + + close(blockingConsumer.block) + _ = metrics.Shutdown(context.Background()) +} + +// TestBatchProcessor_ContextCancellation_Logs verifies context cancellation for logs. +func TestBatchProcessor_ContextCancellation_Logs(t *testing.T) { + blockingConsumer := &blockingLogsSink{block: make(chan struct{})} + + cfg := &Config{ + Timeout: 10 * time.Second, + SendBatchSize: 1, + } + + logs, err := NewFactory().CreateLogs( + context.Background(), + processortest.NewNopSettings(metadata.Type), + cfg, + blockingConsumer, + ) + require.NoError(t, err) + require.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) + + // Block the shard in export first + go func() { + ld := testdata.GenerateLogs(1) + _ = logs.ConsumeLogs(context.Background(), ld) + }() + time.Sleep(50 * time.Millisecond) + + // Fill the buffer + for range 20 { + go func() { + ld := testdata.GenerateLogs(1) + _ = logs.ConsumeLogs(context.Background(), ld) + }() + } + + time.Sleep(100 * time.Millisecond) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + + ld := testdata.GenerateLogs(1) + err = logs.ConsumeLogs(ctx, ld) + + assert.ErrorIs(t, err, context.DeadlineExceeded) + + close(blockingConsumer.block) + _ = logs.Shutdown(context.Background()) +} + +// slowMetricsSink is a consumer that introduces a delay for each consume call. +type slowMetricsSink struct { + delay time.Duration +} + +func (s *slowMetricsSink) ConsumeMetrics(_ context.Context, _ pmetric.Metrics) error { + time.Sleep(s.delay) + return nil +} + +func (s *slowMetricsSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// slowLogsSink is a consumer that introduces a delay for each consume call. +type slowLogsSink struct { + delay time.Duration +} + +func (s *slowLogsSink) ConsumeLogs(_ context.Context, _ plog.Logs) error { + time.Sleep(s.delay) + return nil +} + +func (s *slowLogsSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// blockingMetricsSink is a consumer that blocks until the block channel is closed. +type blockingMetricsSink struct { + block chan struct{} +} + +func (b *blockingMetricsSink) ConsumeMetrics(_ context.Context, _ pmetric.Metrics) error { + <-b.block + return nil +} + +func (b *blockingMetricsSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + +// blockingLogsSink is a consumer that blocks until the block channel is closed. +type blockingLogsSink struct { + block chan struct{} +} + +func (b *blockingLogsSink) ConsumeLogs(_ context.Context, _ plog.Logs) error { + <-b.block + return nil +} + +func (b *blockingLogsSink) Capabilities() consumer.Capabilities { + return consumer.Capabilities{MutatesData: false} +} + type metadataTracesSink struct { *consumertest.TracesSink