fix(loki.process): Eliminate per-stream goroutines in multiline stage#6036
fix(loki.process): Eliminate per-stream goroutines in multiline stage#6036
Conversation
There was a problem hiding this comment.
Pull request overview
This PR optimizes the loki.process multiline stage by removing per-stream goroutines/time.After usage and replacing it with a single Run-loop goroutine that manages per-stream state and a shared timer to flush on max_wait_time, with additional debug-log guarding to reduce overhead when debug is disabled.
Changes:
- Reworked multiline stage execution to track per-stream state in a shared map and flush via one shared
time.Timer. - Added synchronous
processEntry/flushStateflow and stream cleanup logic to prevent idle-stream state buildup. - Expanded tests and added a benchmark covering debug on/off performance.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| internal/component/loki/process/stages/multiline.go | Replaces per-stream goroutines with a single timer-driven loop and shared per-stream state map. |
| internal/component/loki/process/stages/multiline_test.go | Adds new test cases and a benchmark for multiline behavior and performance. |
| // Arm the timer for any stream that now has the earliest deadline, | ||
| // including streams where currentLines==0 (just hit max_lines) so | ||
| // the timer fires to remove them if they subsequently go idle. | ||
| if key := e.Labels.FastFingerprint(); m.streams[key] != nil { | ||
| if dl := m.streams[key].lastSeen.Add(m.cfg.MaxWaitTime); nearestDeadline.IsZero() || dl.Before(nearestDeadline) { | ||
| armTimer(dl) | ||
| } | ||
| } |
There was a problem hiding this comment.
The timer is only re-armed when an entry creates an earlier deadline. If the stream that currently defines nearestDeadline receives new entries (pushing its deadline later), the timer won't be extended and will still fire at the old deadline, causing periodic full-map scans even when no streams are actually expiring. Consider tracking which stream owns nearestDeadline (or rescanning when the owning stream is updated) so the timer can be pushed out when the earliest deadline moves later.
There was a problem hiding this comment.
I don't think this is worth it to fix
| out <- m.flushState(state) | ||
| } | ||
| } | ||
| m.streams = nil |
There was a problem hiding this comment.
timer is created for the Run loop but never stopped on the early return when in closes. Stopping it before returning avoids keeping a runtime timer around unnecessarily until it fires/gets GC'd.
| m.streams = nil | |
| m.streams = nil | |
| timer.Stop() |
There was a problem hiding this comment.
I don't think this is worth it to fix
Brief description of Pull Request
Replace per-stream goroutines in the
loki.processmultiline stage with a single goroutine and a sharedtime.Timer, reducing allocations by up to 76% and latency by up to 39% under benchmark.Pull Request Details
The multiline stage previously spawned one goroutine per active log stream to handle
max_wait_timetimeouts viatime.After. In environments with many label combinations this creates O(N) goroutines and allocations that grow with cardinality.The new implementation uses a single goroutine with a shared timer set to the earliest per-stream deadline. When it fires, all expired streams are flushed and the timer is rescheduled to the next deadline. Streams are removed from the map after a timeout flush, preventing unbounded map growth for idle streams.
All debug log calls are now wrapped in
if Debug { ... }guards, consistent with thedrop,regex, andlabelsstages, to avoidfmt.Sprintfandbuffer.String()more overhead when debug logging is disabled.Benchmarks
Notes to the Reviewer
Two intentional behavioural changes:
Stale-block flush on entry receipt: Previously a timed-out block was only flushed when the
time.Aftergoroutine fired. The new code also flushes inline inprocessEntrywhen a new entry arrives for a stream that has been idle pastmax_wait_time. The outcome is the same — the block is flushed before the new entry is processed — but the trigger differs.Cross-stream output ordering is now deterministic: The old goroutine-per-stream model had multiple goroutines writing concurrently to
out, so ordering across streams was non-deterministic. The new model serializes all output through a single goroutine, so entries are emitted in arrival order.PR Checklist