Skip to content
Draft

test #44257

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/logs/internal/parsers/dockerfile/docker_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (p *dockerFileFormat) Parse(msg *message.Message) (*message.Message, error)
msg.Status = status
msg.ParsingExtra.IsPartial = partial
msg.ParsingExtra.Timestamp = log.Time
// Tag the stream (stdout/stderr) for container logs parsed from docker JSON files
if log.Stream == "stdout" || log.Stream == "stderr" {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.LogSourceTag(log.Stream))
}
return msg, nil
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/logs/internal/parsers/dockerstream/docker_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (p *dockerStreamFormat) SupportsPartialLine() bool {

func parseDockerStream(msg *message.Message, containerID string) (*message.Message, error) {
content := msg.GetContent()
stream := ""
// The format of the message should be :
// [8]byte{STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4}[]byte{OUTPUT}
// If we don't have at the very least 8 bytes we can consider this message can't be parsed.
Expand All @@ -75,6 +76,13 @@ func parseDockerStream(msg *message.Message, containerID string) (*message.Messa
status = message.StatusInfo

} else {
// Derive stream tag from header byte: 1 -> stdout, 2 -> stderr
switch content[0] {
case 1:
stream = "stdout"
case 2:
stream = "stderr"
}

// remove partial headers that are added by docker when the message gets too long
if len(content) > dockerBufferSize {
Expand Down Expand Up @@ -102,6 +110,10 @@ func parseDockerStream(msg *message.Message, containerID string) (*message.Messa
msg.SetContent(content[idx+1:])
msg.Status = status
msg.ParsingExtra.IsPartial = false
// Add a tag for the stream when deducible from the header byte
if stream != "" {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.LogSourceTag(stream))
}
return msg, nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/logs/internal/parsers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func parseKubernetes(msg *message.Message) (*message.Message, error) {
msg.ParsingExtra = message.ParsingExtra{
IsPartial: isPartial(flag),
}
// Tag the stream (stdout/stderr) so downstream can filter by origin stream.
stream := string(components[1])
if stream == "stdout" || stream == "stderr" {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.LogSourceTag(stream))
}

// Validate timestamp format. K8s API uses either RFC3339 or RFC3339Nano
// but RFC3339Nano is a superset that can parse both formats.
Expand Down
6 changes: 6 additions & 0 deletions pkg/logs/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,12 @@ func MultiLineSourceTag(source string) string {
return "multiline:" + source
}

// LogSourceTag returns a tag indicating the stream a container log line came from.
// Example values: "logsource:stdout", "logsource:stderr".
func LogSourceTag(stream string) string {
return "logsource:" + stream
}

// IsMRF returns true if the payload should be sent to MRF endpoints.
func (m *Payload) IsMRF() bool {
if len(m.MessageMetas) == 0 {
Expand Down
6 changes: 6 additions & 0 deletions releasenotes/notes/logsource-tag-stderr.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
features:
- |
Container logs now include a `LogSource` tag indicating whether each log
message originated from stdout or stderr. This applies to logs parsed via
Docker and Kubernetes CRI runtimes.
Loading