Skip to content

Commit a4350e1

Browse files
Make StreamingLogsUnmarshaler optional
1 parent 31a686b commit a4350e1

3 files changed

Lines changed: 14 additions & 2 deletions

File tree

extension/encoding/awslogsencodingextension/extension.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,5 +255,8 @@ func (e *encodingExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) {
255255
}
256256

257257
func (e *encodingExtension) NewLogsDecoder(reader io.Reader, options ...encoding.DecoderOption) (encoding.LogsDecoder, error) {
258-
return e.unmarshaler.NewLogsDecoder(reader, options...)
258+
if u, ok := e.unmarshaler.(awsunmarshaler.StreamingLogsUnmarshaler); ok {
259+
return u.NewLogsDecoder(reader, options...)
260+
}
261+
return nil, fmt.Errorf("streaming not supported for format %q", e.format)
259262
}

extension/encoding/awslogsencodingextension/internal/unmarshaler/cloudtraillog/unmarshaler.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ type CloudTrailLogUnmarshaler struct {
3333
uIDFeatureEnabled bool
3434
}
3535

36-
var _ unmarshaler.AWSUnmarshaler = (*CloudTrailLogUnmarshaler)(nil)
36+
var (
37+
_ unmarshaler.AWSUnmarshaler = (*CloudTrailLogUnmarshaler)(nil)
38+
_ unmarshaler.StreamingLogsUnmarshaler = (*CloudTrailLogUnmarshaler)(nil)
39+
)
3740

3841
// UserIdentity represents the user identity information in CloudTrail logs
3942
type UserIdentity struct {

extension/encoding/awslogsencodingextension/internal/unmarshaler/unmarshaler.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,13 @@ import (
1111
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding"
1212
)
1313

14+
// AWSUnmarshaler is the base interface for all AWS log format unmarshalers.
1415
type AWSUnmarshaler interface {
1516
UnmarshalAWSLogs(reader io.Reader) (plog.Logs, error)
17+
}
18+
19+
// StreamingLogsUnmarshaler is optionally implemented by unmarshalers that support streaming decoding.
20+
type StreamingLogsUnmarshaler interface {
21+
AWSUnmarshaler
1622
NewLogsDecoder(reader io.Reader, options ...encoding.DecoderOption) (encoding.LogsDecoder, error)
1723
}

0 commit comments

Comments
 (0)