Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/logs/internal/parsers/dockerfile/docker_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ func (p *dockerFileFormat) Parse(msg *message.Message) (*message.Message, error)
msg.Status = status
msg.ParsingExtra.IsPartial = partial
msg.ParsingExtra.Timestamp = log.Time
// Tag the stream (stdout/stderr) for container logs parsed from docker JSON files
if log.Stream == "stdout" || log.Stream == "stderr" {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.LogSourceTag(log.Stream))
}
return msg, nil
}

Expand Down
15 changes: 14 additions & 1 deletion pkg/logs/internal/parsers/dockerstream/docker_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (p *dockerStreamFormat) SupportsPartialLine() bool {

func parseDockerStream(msg *message.Message, containerID string) (*message.Message, error) {
content := msg.GetContent()
stream := "" // stdout or stderr
// The format of the message should be :
// [8]byte{STREAM_TYPE, 0, 0, 0, SIZE1, SIZE2, SIZE3, SIZE4}[]byte{OUTPUT}
// If we don't have at the very least 8 bytes we can consider this message can't be parsed.
Expand Down Expand Up @@ -86,6 +87,14 @@ func parseDockerStream(msg *message.Message, containerID string) (*message.Messa
msg.Status = status
return msg, fmt.Errorf("cannot parse docker message for container %v: message too short after processing", containerID)
}
// Before removing the header, capture the stream from the first byte:
// 1 -> stdout, 2 -> stderr
switch content[0] {
case 1:
stream = "stdout"
case 2:
stream = "stderr"
}
// remove the header as we don't need it anymore
content = content[dockerHeaderLength:]

Expand All @@ -102,8 +111,12 @@ func parseDockerStream(msg *message.Message, containerID string) (*message.Messa
msg.SetContent(content[idx+1:])
msg.Status = status
msg.ParsingExtra.IsPartial = false
// Add a tag for the stream when deducible from the header byte
if stream != "" {
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.LogSourceTag(stream))
}
return msg, nil
}
}

// getDockerSeverity returns the status of the message based on the value of the
// STREAM_TYPE byte in the header. STREAM_TYPE can be 1 for stdout and 2 for
Expand Down
5 changes: 5 additions & 0 deletions pkg/logs/internal/parsers/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ func parseKubernetes(msg *message.Message) (*message.Message, error) {
msg.ParsingExtra = message.ParsingExtra{
IsPartial: isPartial(flag),
}
// Tag the stream (stdout/stderr) so downstream can filter by origin stream.
stream := string(components[1]) // stdout or stderr
if stream == "stdout" || stream == "stderr" { // tag the stream so downstream can filter by origin stream.
msg.ParsingExtra.Tags = append(msg.ParsingExtra.Tags, message.LogSourceTag(stream)) // add it to rest of tags
}

// Validate timestamp format. K8s API uses either RFC3339 or RFC3339Nano
// but RFC3339Nano is a superset that can parse both formats.
Expand Down
6 changes: 6 additions & 0 deletions pkg/logs/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,12 @@ func MultiLineSourceTag(source string) string {
return "multiline:" + source
}

// LogSourceTag returns a tag indicating the stream a container log line came from.
// Example values: "logsource:stdout", "logsource:stderr".
func LogSourceTag(stream string) string {
return "logsource:" + stream
}

// IsMRF returns true if the payload should be sent to MRF endpoints.
func (m *Payload) IsMRF() bool {
if len(m.MessageMetas) == 0 {
Expand Down
Loading