@@ -15,42 +15,29 @@ import (
1515 "go.opentelemetry.io/collector/pdata/plog"
1616 conventions "go.opentelemetry.io/otel/semconv/v1.38.0"
1717
18+ "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding"
1819 "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/constants"
1920 "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/metadata"
2021 "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awslogsencodingextension/internal/unmarshaler"
22+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding"
2123)
2224
25+ const ctrlMessageType = "CONTROL_MESSAGE"
26+
2327var (
2428 errEmptyOwner = errors .New ("cloudwatch log with message type 'DATA_MESSAGE' has empty owner field" )
2529 errEmptyLogGroup = errors .New ("cloudwatch log with message type 'DATA_MESSAGE' has empty log group field" )
2630 errEmptyLogStream = errors .New ("cloudwatch log with message type 'DATA_MESSAGE' has empty log stream field" )
2731)
2832
29- func validateLog (log cloudwatchLogsData ) error {
30- switch log .MessageType {
31- case "DATA_MESSAGE" :
32- if log .Owner == "" {
33- return errEmptyOwner
34- }
35- if log .LogGroup == "" {
36- return errEmptyLogGroup
37- }
38- if log .LogStream == "" {
39- return errEmptyLogStream
40- }
41- case "CONTROL_MESSAGE" :
42- default :
43- return fmt .Errorf ("cloudwatch log has invalid message type %q" , log .MessageType )
44- }
45- return nil
46- }
33+ var _ unmarshaler.StreamingLogsUnmarshaler = (* SubscriptionFilterUnmarshaler )(nil )
4734
48- type subscriptionFilterUnmarshaler struct {
35+ type SubscriptionFilterUnmarshaler struct {
4936 buildInfo component.BuildInfo
5037}
5138
52- func NewSubscriptionFilterUnmarshaler (buildInfo component.BuildInfo ) unmarshaler. AWSUnmarshaler {
53- return & subscriptionFilterUnmarshaler {
39+ func NewSubscriptionFilterUnmarshaler (buildInfo component.BuildInfo ) * SubscriptionFilterUnmarshaler {
40+ return & SubscriptionFilterUnmarshaler {
5441 buildInfo : buildInfo ,
5542 }
5643}
@@ -61,36 +48,97 @@ func NewSubscriptionFilterUnmarshaler(buildInfo component.BuildInfo) unmarshaler
6148// logs are further grouped by their extracted account ID and region.
6249// Logs are assumed to be gzip-compressed as specified at
6350// https://docs.aws.amazon.com/firehose/latest/dev/writing-with-cloudwatch-logs.html.
64- func (f * subscriptionFilterUnmarshaler ) UnmarshalAWSLogs (reader io.Reader ) (plog.Logs , error ) {
65- logs := plog .NewLogs ()
66- resourceLogsByKey := make (map [resourceGroupKey ]plog.LogRecordSlice )
51+ func (f * SubscriptionFilterUnmarshaler ) UnmarshalAWSLogs (reader io.Reader ) (plog.Logs , error ) {
52+ // Decode as a stream but flush all at once using flush options
53+ streamUnmarshaler , err := f .NewLogsDecoder (reader , encoding .WithFlushItems (0 ), encoding .WithFlushBytes (0 ))
54+ if err != nil {
55+ return plog.Logs {}, err
56+ }
57+ logs , err := streamUnmarshaler .DecodeLogs ()
58+ if err != nil {
59+ // we must check for EOF with direct comparison and avoid wrapped EOF that can come from stream itself
60+ //nolint:errorlint
61+ if err == io .EOF {
62+ // EOF indicates no logs were found, return any logs that's available
63+ return logs , nil
64+ }
65+
66+ return plog.Logs {}, err
67+ }
6768
69+ return logs , nil
70+ }
71+
72+ // NewLogsDecoder returns a LogsDecoder that processes CloudWatch Logs subscription filter events.
73+ // Supported sub formats:
74+ // - DATA_MESSAGE: Returns logs grouped by owner, log group, and stream; offset is the number of records processed
75+ // - CONTROL_MESSAGE: Returns empty log; offset is the number of records processed
76+ func (f * SubscriptionFilterUnmarshaler ) NewLogsDecoder (reader io.Reader , options ... encoding.DecoderOption ) (encoding.LogsDecoder , error ) {
77+ batchHelper := xstreamencoding .NewBatchHelper (options ... )
6878 decoder := gojson .NewDecoder (reader )
69- for decoder .More () {
70- var cwLog cloudwatchLogsData
71- if err := decoder .Decode (& cwLog ); err != nil {
72- return plog.Logs {}, fmt .Errorf ("failed to decode decompressed reader: %w" , err )
73- }
7479
75- if cwLog .MessageType == "CONTROL_MESSAGE" {
76- continue
77- }
80+ var offset int64
7881
79- if err := validateLog (cwLog ); err != nil {
80- return plog.Logs {}, fmt .Errorf ("invalid cloudwatch log: %w" , err )
81- }
82+ if batchHelper .Options ().Offset > 0 {
83+ for offset < batchHelper .Options ().Offset {
84+ if ! decoder .More () {
85+ return nil , fmt .Errorf ("EOF reached before offset %d records were discarded" , batchHelper .Options ().Offset )
86+ }
8287
83- f .appendLogs (logs , resourceLogsByKey , cwLog )
88+ var raw gojson.RawMessage
89+ if err := decoder .Decode (& raw ); err != nil {
90+ return nil , err
91+ }
92+ offset ++
93+ }
8494 }
8595
86- return logs , nil
96+ return xstreamencoding .NewLogsDecoderAdapter (
97+ func () (plog.Logs , error ) {
98+ logs := plog .NewLogs ()
99+ resourceLogsByKey := make (map [resourceGroupKey ]plog.LogRecordSlice )
100+
101+ for decoder .More () {
102+ var cwLog cloudwatchLogsData
103+ if err := decoder .Decode (& cwLog ); err != nil {
104+ return plog.Logs {}, fmt .Errorf ("failed to decode decompressed reader: %w" , err )
105+ }
106+
107+ offset ++
108+ batchHelper .IncrementItems (1 )
109+
110+ if cwLog .MessageType == ctrlMessageType {
111+ continue
112+ }
113+
114+ if err := validateLog (cwLog ); err != nil {
115+ return plog.Logs {}, fmt .Errorf ("invalid cloudwatch log: %w" , err )
116+ }
117+
118+ f .appendLogs (logs , resourceLogsByKey , cwLog )
119+
120+ if batchHelper .ShouldFlush () {
121+ batchHelper .Reset ()
122+ return logs , nil
123+ }
124+ }
125+
126+ if logs .ResourceLogs ().Len () == 0 {
127+ return plog .NewLogs (), io .EOF
128+ }
129+
130+ return logs , nil
131+ }, func () int64 {
132+ return offset
133+ },
134+ ), nil
87135}
88136
89137// appendLogs appends log records from cwLog into the given plog.Logs, reusing
90138// existing ResourceLogs entries tracked by resourceLogsByKey when possible.
91139// Events are grouped by their extracted fields (account ID + region) and
92140// by log group/stream combination.
93- func (f * subscriptionFilterUnmarshaler ) appendLogs (logs plog.Logs , resourceLogsByKey map [resourceGroupKey ]plog.LogRecordSlice , cwLog cloudwatchLogsData ) {
141+ func (f * SubscriptionFilterUnmarshaler ) appendLogs (logs plog.Logs , resourceLogsByKey map [resourceGroupKey ]plog.LogRecordSlice , cwLog cloudwatchLogsData ) {
94142 for _ , event := range cwLog .LogEvents {
95143 key := extractResourceKey (event , cwLog .Owner , cwLog .LogGroup , cwLog .LogStream )
96144
@@ -141,3 +189,22 @@ func extractResourceKey(event cloudwatchLogsLogEvent, owner, logGroup, logStream
141189 }
142190 return key
143191}
192+
193+ func validateLog (log cloudwatchLogsData ) error {
194+ switch log .MessageType {
195+ case "DATA_MESSAGE" :
196+ if log .Owner == "" {
197+ return errEmptyOwner
198+ }
199+ if log .LogGroup == "" {
200+ return errEmptyLogGroup
201+ }
202+ if log .LogStream == "" {
203+ return errEmptyLogStream
204+ }
205+ case ctrlMessageType :
206+ default :
207+ return fmt .Errorf ("cloudwatch log has invalid message type %q" , log .MessageType )
208+ }
209+ return nil
210+ }
0 commit comments