Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 166 additions & 88 deletions internal/component/loki/process/stages/multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"regexp"
"slices"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/push"

"github.com/grafana/alloy/internal/component/common/loki"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/loki/pkg/push"
)

// Configuration errors.
Expand Down Expand Up @@ -68,22 +68,18 @@ func validateMultilineConfig(cfg MultilineConfig) (*regexp.Regexp, error) {

// multilineStage matches lines to determine whether the following lines belong to a block and should be collapsed
type multilineStage struct {
logger log.Logger
cfg MultilineConfig
regex *regexp.Regexp
logger log.Logger
cfg MultilineConfig
regex *regexp.Regexp
streams map[model.Fingerprint]*multilineState
}

// multilineState captures the internal state of a running multiline stage.
type multilineState struct {
buffer *bytes.Buffer // The lines of the current multiline block.
startLineEntry Entry // The entry of the start line of a multiline block.
currentLines uint64 // The number of lines of the current multiline block.
}

func (s *multilineState) Reset() {
// We don't reset startLineEntry here to keep old behaviour.
s.buffer.Reset()
s.currentLines = 0
buffer *bytes.Buffer
startLineEntry Entry // The entry of the start line of a multiline block.
currentLines uint64 // The number of lines of the current multiline block.
lastSeen time.Time
}

// newMultilineStage creates a multilineStage from the given config
Expand All @@ -94,9 +90,10 @@ func newMultilineStage(logger log.Logger, config MultilineConfig) (Stage, error)
}

return &multilineStage{
logger: log.With(logger, "component", "stage", "type", "multiline"),
cfg: config,
regex: regex,
logger: log.With(logger, "component", "stage", "type", "multiline"),
cfg: config,
regex: regex,
streams: make(map[model.Fingerprint]*multilineState),
}, nil
}

Expand All @@ -105,97 +102,177 @@ func (m *multilineStage) Run(in chan Entry) chan Entry {
go func() {
defer close(out)

streams := make(map[model.Fingerprint]chan Entry)
wg := new(sync.WaitGroup)

for e := range in {
key := e.Labels.FastFingerprint()
s, ok := streams[key]
if !ok {
// Pass through entries until we hit first start line.
if !m.regex.MatchString(e.Line) {
level.Debug(m.logger).Log("msg", "pass through entry", "stream", key)
out <- e
continue
}
// timer fires at the earliest per-stream deadline (lastSeen + MaxWaitTime).
// Start it stopped; it is armed on the first entry that starts a block.
timer := time.NewTimer(0)
if !timer.Stop() {
<-timer.C
}

level.Debug(m.logger).Log("msg", "creating new stream", "stream", key)
s = make(chan Entry)
streams[key] = s
// nearestDeadline tracks the earliest active stream deadline so that
// we can easily update the timer if the incoming entry produces an earlier deadline.
var nearestDeadline time.Time

armTimer := func(deadline time.Time) {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
if d := time.Until(deadline); d > 0 {
timer.Reset(d)
} else {
timer.Reset(0)
}
nearestDeadline = deadline
}

wg.Add(1)
go m.runMultiline(s, out, wg)
// rescanDeadline rescans all streams to find the new nearest deadline
// after a flush removes a stream from contention. We include streams with
// currentLines==0 (flushed at max_lines) so they are cleaned up by the
// timer even when idle.
rescanDeadline := func() {
nearestDeadline = time.Time{}
for _, state := range m.streams {
if dl := state.lastSeen.Add(m.cfg.MaxWaitTime); nearestDeadline.IsZero() || dl.Before(nearestDeadline) {
nearestDeadline = dl
}
}
if nearestDeadline.IsZero() {
return // no streams; leave timer stopped
}
level.Debug(m.logger).Log("msg", "pass entry", "stream", key, "line", e.Line)
s <- e
armTimer(nearestDeadline)
}

// Close all streams and wait for them to finish being processed.
for _, s := range streams {
close(s)
for {
select {
case e, ok := <-in:
if !ok {
// Flush all per-stream buffers when the input closes.
for _, state := range m.streams {
if state.currentLines > 0 {
out <- m.flushState(state)
}
}
m.streams = nil
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timer is created for the Run loop but never stopped on the early return when in closes. Stopping it before returning avoids keeping a runtime timer around unnecessarily until it fires/gets GC'd.

Suggested change
m.streams = nil
m.streams = nil
timer.Stop()

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is worth it to fix

timer.Stop()
return
}
// Capture the stream key before emitting entries downstream.
// A downstream stage goroutine may mutate e.Labels concurrently
// once the entry is sent on out, which would race with a
// post-emit FastFingerprint() call.
key := e.Labels.FastFingerprint()
for _, r := range m.processEntry(e) {
out <- r
}
// Arm the timer for any stream that now has the earliest deadline,
// including streams where currentLines==0 (just hit max_lines) so
// the timer fires to remove them if they subsequently go idle.
if m.streams[key] != nil {
if dl := m.streams[key].lastSeen.Add(m.cfg.MaxWaitTime); nearestDeadline.IsZero() || dl.Before(nearestDeadline) {
armTimer(dl)
}
}
Comment on lines +170 to +177
Copy link

Copilot AI Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timer is only re-armed when an entry creates an earlier deadline. If the stream that currently defines nearestDeadline receives new entries (pushing its deadline later), the timer won't be extended and will still fire at the old deadline, causing periodic full-map scans even when no streams are actually expiring. Consider tracking which stream owns nearestDeadline (or rescanning when the owning stream is updated) so the timer can be pushed out when the earliest deadline moves later.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is worth it to fix

case <-timer.C:
nearestDeadline = time.Time{}
// Remove every stream whose deadline has been reached. Flush its
// buffer if it has accumulated lines; streams with currentLines==0
// (flushed at max_lines and then gone idle) are deleted.
now := time.Now()
for key, state := range m.streams {
if !state.lastSeen.Add(m.cfg.MaxWaitTime).After(now) {
if state.currentLines > 0 {
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.MaxWaitTime), "block", state.buffer.String())
}
out <- m.flushState(state)
}
delete(m.streams, key)
}
}
rescanDeadline()
}
}
wg.Wait()
}()
return out
}

func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.WaitGroup) {
defer wg.Done()

state := &multilineState{
buffer: new(bytes.Buffer),
currentLines: 0,
// processEntry processes a single entry synchronously, returning any entries
// ready to emit. Before the first start line is seen for a stream, non-start
// lines are passed through unchanged. Once a stream is started, all lines are
// accumulated into the stream's buffer until the block is flushed.
func (m *multilineStage) processEntry(e Entry) []Entry {
key := e.Labels.FastFingerprint()
if m.streams == nil {
m.streams = make(map[model.Fingerprint]*multilineState)
}
state, hasState := m.streams[key]

for {
select {
case <-time.After(m.cfg.MaxWaitTime):
level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.MaxWaitTime), "block", state.buffer.String())
m.flush(out, state)
case e, ok := <-in:
level.Debug(m.logger).Log("msg", "processing line", "line", e.Line, "stream", e.Labels.FastFingerprint())

if !ok {
level.Debug(m.logger).Log("msg", "flush multiline block because inbound closed", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint())
m.flush(out, state)
return
}
var out []Entry

isFirstLine := m.regex.MatchString(e.Line)
if isFirstLine {
level.Debug(m.logger).Log("msg", "flush multiline block because new start line", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint())
m.flush(out, state)
// flush stale block before processing new entry.
if hasState && state.currentLines > 0 && time.Since(state.lastSeen) >= m.cfg.MaxWaitTime {
if Debug {
level.Debug(m.logger).Log("msg", fmt.Sprintf("flush multiline block due to %v timeout", m.cfg.MaxWaitTime), "block", state.buffer.String())
}
out = append(out, m.flushState(state))
}

// The start line entry is used to set timestamp and labels in the flush method.
// The timestamps for following lines are ignored for now.
state.startLineEntry = e
isFirstLine := m.regex.MatchString(e.Line)
if !hasState {
// Pass through entries until the first start line for this stream.
if !isFirstLine {
if Debug {
level.Debug(m.logger).Log("msg", "pass through entry", "stream", key)
}
return append(out, e)
}
state = &multilineState{buffer: new(bytes.Buffer)}
m.streams[key] = state
}

// Append block line
if state.buffer.Len() > 0 {
state.buffer.WriteRune('\n')
}
// Stream is active: flush current block if a new start line arrived.
if isFirstLine && state.currentLines > 0 {
if Debug {
level.Debug(m.logger).Log("msg", "flush multiline block because new start line", "block", state.buffer.String(), "stream", key)
}
out = append(out, m.flushState(state))
}
// startLineEntry is only updated on start lines; it is intentionally
// preserved across max_lines flushes to match the original behaviour.
if isFirstLine {
state.startLineEntry = e
}

line := e.Line
if m.cfg.TrimNewlines {
line = strings.TrimRight(line, "\r\n")
}
state.buffer.WriteString(line)
state.currentLines++
if Debug {
level.Debug(m.logger).Log("msg", "processing line", "line", e.Line, "stream", key)
}

if state.currentLines == m.cfg.MaxLines {
m.flush(out, state)
}
}
// Append line to buffer.
if state.buffer.Len() > 0 {
state.buffer.WriteRune('\n')
}
}
line := e.Line
if m.cfg.TrimNewlines {
line = strings.TrimRight(line, "\r\n")
}
state.buffer.WriteString(line)
state.currentLines++
state.lastSeen = time.Now()

func (m *multilineStage) flush(out chan Entry, s *multilineState) {
if s.buffer.Len() == 0 {
level.Debug(m.logger).Log("msg", "nothing to flush", "buffer_len", s.buffer.Len())
return
if state.currentLines == m.cfg.MaxLines {
out = append(out, m.flushState(state))
}

return out
}

// flushState collapses the accumulated block into a single entry and resets
// the line counter and buffer. startLineEntry is intentionally not reset so
// that subsequent lines (before the next start line) inherit its metadata.
func (m *multilineStage) flushState(s *multilineState) Entry {
// copy extracted data.
extracted := make(map[string]any, len(s.startLineEntry.Extracted))
for k, v := range s.startLineEntry.Extracted {
Expand All @@ -210,9 +287,10 @@ func (m *multilineStage) flush(out chan Entry, s *multilineState) {
}),
}

s.Reset()
s.buffer.Reset()
s.currentLines = 0

out <- collapsed
return collapsed
}

// Cleanup implements Stage.
Expand Down
Loading
Loading