From 6553d72bef1ee61d77ca022c21b8e73b2ff7e34c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=98gouri-yerra=E2=80=99?= Date: Mon, 15 Dec 2025 15:22:05 -0700 Subject: [PATCH 1/2] Tag stderr logs with LogSource --- pkg/logs/internal/parsers/dockerfile/docker_file.go | 4 ++++ .../internal/parsers/dockerstream/docker_stream.go | 13 +++++++++++++ pkg/logs/internal/parsers/kubernetes/kubernetes.go | 5 +++++ pkg/logs/message/message.go | 6 ++++++ 4 files changed, 28 insertions(+) diff --git a/pkg/logs/internal/parsers/dockerfile/docker_file.go b/pkg/logs/internal/parsers/dockerfile/docker_file.go index 7080fc777141ad..98bee5139b8ddf 100644 --- a/pkg/logs/internal/parsers/dockerfile/docker_file.go +++ b/pkg/logs/internal/parsers/dockerfile/docker_file.go @@ -90,6 +90,10 @@ func (p *dockerFileFormat) Parse(msg *message.Message) (*message.Message, error) msg.Status = status msg.ParsingExtra.IsPartial = partial msg.ParsingExtra.Timestamp = log.Time + // Tag the stream (stdout/stderr) for container logs parsed from docker JSON files + if log.Stream == "stdout" || log.Stream == "stderr" { + msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.LogSourceTag(log.Stream)) + } return msg, nil } diff --git a/pkg/logs/internal/parsers/dockerstream/docker_stream.go b/pkg/logs/internal/parsers/dockerstream/docker_stream.go index 7745956bcac91c..16e6216c2c79c1 100644 --- a/pkg/logs/internal/parsers/dockerstream/docker_stream.go +++ b/pkg/logs/internal/parsers/dockerstream/docker_stream.go @@ -56,6 +56,7 @@ func (p *dockerStreamFormat) SupportsPartialLine() bool { func parseDockerStream(msg *message.Message, containerID string) (*message.Message, error) { content := msg.GetContent() + stream := "" // stdout or stderr // The format of the message should be : // [8]byte{STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4}[]byte{OUTPUT} // If we don't have at the very least 8 bytes we can consider this message can't be parsed. @@ -86,6 +87,14 @@ func parseDockerStream(msg *message.Message, containerID string) (*message.Messa msg.Status = status return msg, fmt.Errorf("cannot parse docker message for container %v: message too short after processing", containerID) } + // Before removing the header, capture the stream from the first byte: + // 1 -> stdout, 2 -> stderr + switch content[0] { + case 1: + stream = "stdout" + case 2: + stream = "stderr" + } // remove the header as we don't need it anymore content = content[dockerHeaderLength:] @@ -102,6 +111,10 @@ func parseDockerStream(msg *message.Message, containerID string) (*message.Messa msg.SetContent(content[idx+1:]) msg.Status = status msg.ParsingExtra.IsPartial = false + // Add a tag for the stream when deducible from the header byte + if stream != "" { + msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.LogSourceTag(stream)) + } return msg, nil } diff --git a/pkg/logs/internal/parsers/kubernetes/kubernetes.go b/pkg/logs/internal/parsers/kubernetes/kubernetes.go index 076c3b998488de..c746e43e833fc7 100644 --- a/pkg/logs/internal/parsers/kubernetes/kubernetes.go +++ b/pkg/logs/internal/parsers/kubernetes/kubernetes.go @@ -64,6 +64,11 @@ func parseKubernetes(msg *message.Message) (*message.Message, error) { msg.ParsingExtra = message.ParsingExtra{ IsPartial: isPartial(flag), } + // Tag the stream (stdout/stderr) so downstream can filter by origin stream. + stream := string(components[1]) // stdout or stderr + if stream == "stdout" || stream == "stderr" { // tag the stream so downstream can filter by origin stream. + msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.LogSourceTag(stream)) // add it to rest of tags + } // Validate timestamp format. K8s API uses either RFC3339 or RFC3339Nano // but RFC3339Nano is a superset that can parse both formats. diff --git a/pkg/logs/message/message.go b/pkg/logs/message/message.go index e564441ddfea98..6c137ccae6df8b 100644 --- a/pkg/logs/message/message.go +++ b/pkg/logs/message/message.go @@ -398,6 +398,12 @@ func MultiLineSourceTag(source string) string { return "multiline:" + source } +// LogSourceTag returns a tag indicating the stream a container log line came from. +// Example values: "logsource:stdout", "logsource:stderr". +func LogSourceTag(stream string) string { + return "logsource:" + stream +} + // IsMRF returns true if the payload should be sent to MRF endpoints. func (m *Payload) IsMRF() bool { if len(m.MessageMetas) == 0 { From b6a446ec86d5faa48ceace8b3d618de0771bd90a Mon Sep 17 00:00:00 2001 From: Gouri Yerra Date: Tue, 16 Dec 2025 09:13:56 -0700 Subject: [PATCH 2/2] adding whitespace for testing --- pkg/logs/internal/parsers/dockerstream/docker_stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logs/internal/parsers/dockerstream/docker_stream.go b/pkg/logs/internal/parsers/dockerstream/docker_stream.go index 16e6216c2c79c1..24f16465b8c85d 100644 --- a/pkg/logs/internal/parsers/dockerstream/docker_stream.go +++ b/pkg/logs/internal/parsers/dockerstream/docker_stream.go @@ -116,7 +116,7 @@ func parseDockerStream(msg *message.Message, containerID string) (*message.Messa msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.LogSourceTag(stream)) } return msg, nil -} +} // getDockerSeverity returns the status of the message based on the value of the // STREAM_TYPE byte in the header. STREAM_TYPE can be 1 for stdout and 2 for