Skip to content

Implement streaming in cloudtrail format of awslogsencoding extension#46203

Closed
MichaelKatsoulis wants to merge 10 commits intoopen-telemetry:mainfrom
MichaelKatsoulis:feat/streaming-for-cloudtrail
Closed

Implement streaming in cloudtrail format of awslogsencoding extension#46203
MichaelKatsoulis wants to merge 10 commits intoopen-telemetry:mainfrom
MichaelKatsoulis:feat/streaming-for-cloudtrail

Conversation

@MichaelKatsoulis
Copy link
Copy Markdown
Contributor

@MichaelKatsoulis MichaelKatsoulis commented Feb 19, 2026

Description

Adopting encoder extension stream decoding added through #45567

This PR focuses only on CloudTrail log Format of awslogsencoding extension

Comment on lines +162 to +163
//nolint:errorlint
if err == io.EOF {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you supress this lint error?

Can you replace it with error.Is

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Done

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was there to avoid checking wrapped errors. Here the error get ignored if and only if it's an unwrapped io.EOF. And the expectation is to get this from the streaming implementation for EOF.

And we can get EOF wrapped from the stream itself. We should not ignore those errors and consider them as failures. This is the reason logic to be like this

@constanca-m @MichaelKatsoulis so we need this unwrapped EOF check

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. Then can you also add a comment on the code about it? So we don't take it out again

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@constanca-m yup, makes sense to add a comment 💯

Copy link
Copy Markdown
Contributor

@constanca-m constanca-m left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, couldn't find any issues. @Kavindu-Dodan please review as well

@MichaelKatsoulis
Copy link
Copy Markdown
Contributor Author

@Kavindu-Dodan look into this commit a4350e1

I had to make streaming optional for the awslogs_encoding extension because not all the formats implement it for now.

Comment on lines +257 to +262
func (e *encodingExtension) NewLogsDecoder(reader io.Reader, options ...encoding.DecoderOption) (encoding.LogsDecoder, error) {
if u, ok := e.unmarshaler.(awsunmarshaler.StreamingLogsUnmarshaler); ok {
return u.NewLogsDecoder(reader, options...)
}
return nil, fmt.Errorf("streaming not supported for format %q", e.format)
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@constanca-m @MichaelKatsoulis Should even add a signal to this PR ? IMO we can further split it to have API contract in this PR and isolate all signals to individual PRs :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #46211, this allows us to build all signal related PRs based on that specific PR. IMO it's easier that way

@Kavindu-Dodan
Copy link
Copy Markdown
Contributor

@MichaelKatsoulis I have opened multiple PR & extracted out the changes as follow,

Based on the streaming contract, I have added follow up signal adoptions,

This allows us to review individual signal with respect to API contract. If we use this PR, then we cannot adopt the streaming for other signals or we have to base other signal work on this PR which is not ideal.

Copy link
Copy Markdown
Contributor

@axw axw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs an update regarding the offset. I suggest we continue in @Kavindu-Dodan's PR, and close this one.

isEOF = true
return record, nil
}
return xstreamencoding.NewLogsDecoderAdapter(decoderF, func() int64 { return 0 }), nil
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning 0 is invalid per the contract:

// Offset returns the offset after the most recent batch read from the stream, or the initial offset.

... though the wording probably needs changing to clarify that "or the initial offset" is only in the case where no data has been flushed yet.

Should we continue this in #46212?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok!

@MichaelKatsoulis
Copy link
Copy Markdown
Contributor Author

Closing this and continue the implementation of adoption in PRs #46203 (comment)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants