# [processor/interval] Feature: flush buffer on shutdown #47238
### Component(s)

processor/interval

### Is your feature request related to a problem? Please describe.

#### Description
The interval processor silently drops all metrics accumulated in its in-memory buffer when it receives a shutdown signal. Unlike the batch processor in core (processor/batchprocessor), which drains and flushes its buffer before exiting, the interval processor simply cancels its context and returns.
#### Current behavior

In `processor.go`, the `Shutdown` method only cancels the context:

```go
func (p *intervalProcessor) Shutdown(_ context.Context) error {
	p.cancel()
	return nil
}
```

The background goroutine in `Start` receives the cancellation, stops the ticker, and returns without exporting the buffered `p.md`:

```go
case <-p.ctx.Done():
	exportTicker.Stop()
	return
```

Any metrics accumulated since the last interval export are lost.
#### Proposed behavior

On shutdown, the processor should flush its remaining buffered metrics to the next consumer before exiting, consistent with how the batch processor in core handles this:

```go
// From processor/batchprocessor/batch_processor.go (startLoop)
case <-b.processor.shutdownC:
	// drain remaining items...
	if b.batch.itemCount() > 0 {
		b.sendItems(triggerTimeout)
	}
	return
```

The `logdedupprocessor` in contrib also follows this pattern: it calls `exportLogs` on `ctx.Done()` and uses a `sync.WaitGroup` so `Shutdown` waits for the flush to complete.
#### Impact

This causes data loss in two scenarios:

- **Crash/restart:** Any metrics buffered in the current interval window are permanently lost.
- **Rolling deployments:** During a rollout, the old pod's interval processor drops up to one full interval's worth of buffered data. Combined with Kafka consumer-group rebalance time and new-pod startup, this creates a significant data gap.

This is particularly impactful for pipelines that use the interval processor with a long aggregation window (e.g., 30s) downstream of a Kafka receiver, where the lost data cannot be re-delivered if offsets were already committed.
### Describe the solution you'd like

#### Proposed fix

- Add a `sync.WaitGroup` to the processor struct to track the background goroutine.
- On `ctx.Done()`, call `exportMetrics` before returning from the goroutine. Use `context.Background()` (or the shutdown context) for the final export, since `p.ctx` is already cancelled.
- In `Shutdown`, cancel the context, then `wg.Wait()` to ensure the flush completes before returning.
```go
func (p *intervalProcessor) Start(_ context.Context, _ component.Host) error {
	exportTicker := time.NewTicker(p.config.Interval)
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		for {
			select {
			case <-p.ctx.Done():
				exportTicker.Stop()
				p.exportMetrics() // flush remaining buffer
				return
			case <-exportTicker.C:
				p.exportMetrics()
			}
		}
	}()
	return nil
}

func (p *intervalProcessor) Shutdown(_ context.Context) error {
	p.cancel()
	p.wg.Wait()
	return nil
}
```

Note: `exportMetrics` currently uses `p.ctx` to call `nextConsumer.ConsumeMetrics`. Since `p.ctx` is cancelled at shutdown, the method would need to accept a context parameter so the final flush uses a valid context.
### Describe alternatives you've considered

No response
### Additional context

#### References

- `opentelemetry-collector/processor/batchprocessor/batch_processor.go`: flushes on shutdown (lines 199-215)
- `opentelemetry-collector-contrib/processor/logdedupprocessor/processor.go`: flushes on `ctx.Done()` with `wg.Wait()` in `Shutdown`