Skip to content

Commit d39b5b0

Browse files
committed
fix(batchprocessor): prevent goroutine leak and deadlock during shutdown
Signed-off-by: WHOIM1205 <[email protected]>
1 parent 55399d4 commit d39b5b0

File tree

2 files changed

+604
-5
lines changed

2 files changed

+604
-5
lines changed

processor/batchprocessor/batch_processor.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ import (
3030
// errTooManyBatchers is returned when the MetadataCardinalityLimit has been reached.
3131
var errTooManyBatchers = consumererror.NewPermanent(errors.New("too many batcher metadata-value combinations"))
3232

33+
// errShuttingDown is returned when data is received while the processor is shutting down.
34+
var errShuttingDown = errors.New("batch processor is shutting down")
35+
3336
// batch_processor is a component that accepts spans and metrics, places them
3437
// into batches and sends downstream.
3538
//
@@ -290,9 +293,15 @@ func (sb *singleShardBatcher[T]) start(context.Context) error {
290293
return nil
291294
}
292295

293-
func (sb *singleShardBatcher[T]) consume(_ context.Context, data T) error {
294-
sb.single.newItem <- data
295-
return nil
296+
func (sb *singleShardBatcher[T]) consume(ctx context.Context, data T) error {
297+
select {
298+
case sb.single.newItem <- data:
299+
return nil
300+
case <-ctx.Done():
301+
return ctx.Err()
302+
case <-sb.processor.shutdownC:
303+
return errShuttingDown
304+
}
296305
}
297306

298307
func (sb *singleShardBatcher[T]) currentMetadataCardinality() int {
@@ -362,8 +371,14 @@ func (mb *multiShardBatcher[T]) consume(ctx context.Context, data T) error {
362371
}
363372
mb.lock.Unlock()
364373
}
365-
b.(*shard[T]).newItem <- data
366-
return nil
374+
select {
375+
case b.(*shard[T]).newItem <- data:
376+
return nil
377+
case <-ctx.Done():
378+
return ctx.Err()
379+
case <-mb.processor.shutdownC:
380+
return errShuttingDown
381+
}
367382
}
368383

369384
func (mb *multiShardBatcher[T]) currentMetadataCardinality() int {

0 commit comments

Comments
 (0)