Skip to content

Commit 48892ff

Browse files
committed
Intrduce streaming support for aws logs
Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>
1 parent bc6a7ed commit 48892ff

File tree

65 files changed

+2888
-1453
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+2888
-1453
lines changed

.github/CODEOWNERS

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ extension/encoding/jsonlogencodingextension/ @open-telemetry
110110
extension/encoding/otlpencodingextension/ @open-telemetry/collector-contrib-approvers @dao-jun @VihasMakwana
111111
extension/encoding/skywalkingencodingextension/ @open-telemetry/collector-contrib-approvers @JaredTan95
112112
extension/encoding/textencodingextension/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy @atoulme
113-
extension/encoding/xstream/ @open-telemetry/collector-contrib-approvers @Kavindu-Dodan @axw
114113
extension/encoding/zipkinencodingextension/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy @dao-jun
115114
extension/googleclientauthextension/ @open-telemetry/collector-contrib-approvers @dashpole @aabmass @braydonk @jsuereth @psx95 @ridwanmsharif
116115
extension/headerssetterextension/ @open-telemetry/collector-contrib-approvers @VihasMakwana

.github/ISSUE_TEMPLATE/beta_stability.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@ body:
107107
- extension/encoding/otlpencoding
108108
- extension/encoding/skywalkingencoding
109109
- extension/encoding/textencoding
110-
- extension/encoding/xstream
111110
- extension/encoding/zipkinencoding
112111
- extension/googleclientauth
113112
- extension/headerssetter

.github/ISSUE_TEMPLATE/bug_report.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ body:
110110
- extension/encoding/otlpencoding
111111
- extension/encoding/skywalkingencoding
112112
- extension/encoding/textencoding
113-
- extension/encoding/xstream
114113
- extension/encoding/zipkinencoding
115114
- extension/googleclientauth
116115
- extension/headerssetter

.github/ISSUE_TEMPLATE/feature_request.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ body:
104104
- extension/encoding/otlpencoding
105105
- extension/encoding/skywalkingencoding
106106
- extension/encoding/textencoding
107-
- extension/encoding/xstream
108107
- extension/encoding/zipkinencoding
109108
- extension/googleclientauth
110109
- extension/headerssetter

.github/ISSUE_TEMPLATE/other.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,6 @@ body:
104104
- extension/encoding/otlpencoding
105105
- extension/encoding/skywalkingencoding
106106
- extension/encoding/textencoding
107-
- extension/encoding/xstream
108107
- extension/encoding/zipkinencoding
109108
- extension/googleclientauth
110109
- extension/headerssetter

.github/ISSUE_TEMPLATE/unmaintained.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ body:
109109
- extension/encoding/otlpencoding
110110
- extension/encoding/skywalkingencoding
111111
- extension/encoding/textencoding
112-
- extension/encoding/xstream
113112
- extension/encoding/zipkinencoding
114113
- extension/googleclientauth
115114
- extension/headerssetter

.github/component_labels.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ extension/encoding/jsonlogencodingextension extension/encoding/jsonlogencoding
9191
extension/encoding/otlpencodingextension extension/encoding/otlpencoding
9292
extension/encoding/skywalkingencodingextension extension/encoding/skywalkingencoding
9393
extension/encoding/textencodingextension extension/encoding/textencoding
94-
extension/encoding/xstream extension/encoding/xstream
9594
extension/encoding/zipkinencodingextension extension/encoding/zipkinencoding
9695
extension/googleclientauthextension extension/googleclientauth
9796
extension/headerssetterextension extension/headerssetter

extension/encoding/awslogsencodingextension/extension.go

Lines changed: 58 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,10 @@ func init() {
5555
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/45459"))
5656
}
5757

58-
var _ encoding.LogsUnmarshalerExtension = (*encodingExtension)(nil)
58+
var (
59+
_ encoding.LogsUnmarshalerExtension = (*encodingExtension)(nil)
60+
_ encoding.LogsDecoderExtension = (*encodingExtension)(nil)
61+
)
5962

6063
type encodingExtension struct {
6164
cfg *Config
@@ -70,8 +73,8 @@ func newExtension(cfg *Config, settings extension.Settings) (*encodingExtension,
7073
case constants.FormatCloudWatchLogsSubscriptionFilter, constants.FormatCloudWatchLogsSubscriptionFilterV1:
7174
if cfg.Format == constants.FormatCloudWatchLogsSubscriptionFilterV1 {
7275
settings.Logger.Warn("using old format value. This format will be removed in version 0.138.0.",
73-
zap.String("old_format", string(constants.FormatCloudWatchLogsSubscriptionFilterV1)),
74-
zap.String("new_format", string(constants.FormatCloudWatchLogsSubscriptionFilter)),
76+
zap.String("old_format", constants.FormatCloudWatchLogsSubscriptionFilterV1),
77+
zap.String("new_format", constants.FormatCloudWatchLogsSubscriptionFilter),
7578
)
7679
}
7780
return &encodingExtension{
@@ -81,8 +84,8 @@ func newExtension(cfg *Config, settings extension.Settings) (*encodingExtension,
8184
case constants.FormatVPCFlowLog, constants.FormatVPCFlowLogV1:
8285
if cfg.Format == constants.FormatVPCFlowLogV1 {
8386
settings.Logger.Warn("using old format value. This format will be removed in version 0.138.0.",
84-
zap.String("old_format", string(constants.FormatVPCFlowLogV1)),
85-
zap.String("new_format", string(constants.FormatVPCFlowLog)),
87+
zap.String("old_format", constants.FormatVPCFlowLogV1),
88+
zap.String("new_format", constants.FormatVPCFlowLog),
8689
)
8790
}
8891

@@ -100,8 +103,8 @@ func newExtension(cfg *Config, settings extension.Settings) (*encodingExtension,
100103
case constants.FormatS3AccessLog, constants.FormatS3AccessLogV1:
101104
if cfg.Format == constants.FormatS3AccessLogV1 {
102105
settings.Logger.Warn("using old format value. This format will be removed in version 0.138.0.",
103-
zap.String("old_format", string(constants.FormatS3AccessLogV1)),
104-
zap.String("new_format", string(constants.FormatS3AccessLog)),
106+
zap.String("old_format", constants.FormatS3AccessLogV1),
107+
zap.String("new_format", constants.FormatS3AccessLog),
105108
)
106109
}
107110
return &encodingExtension{
@@ -111,8 +114,8 @@ func newExtension(cfg *Config, settings extension.Settings) (*encodingExtension,
111114
case constants.FormatWAFLog, constants.FormatWAFLogV1:
112115
if cfg.Format == constants.FormatWAFLogV1 {
113116
settings.Logger.Warn("using old format value. This format will be removed in version 0.138.0.",
114-
zap.String("old_format", string(constants.FormatWAFLogV1)),
115-
zap.String("new_format", string(constants.FormatWAFLog)),
117+
zap.String("old_format", constants.FormatWAFLogV1),
118+
zap.String("new_format", constants.FormatWAFLog),
116119
)
117120
}
118121
return &encodingExtension{
@@ -122,8 +125,8 @@ func newExtension(cfg *Config, settings extension.Settings) (*encodingExtension,
122125
case constants.FormatCloudTrailLog, constants.FormatCloudTrailLogV1:
123126
if cfg.Format == constants.FormatCloudTrailLogV1 {
124127
settings.Logger.Warn("using old format value. This format will be removed in version 0.138.0.",
125-
zap.String("old_format", string(constants.FormatCloudTrailLogV1)),
126-
zap.String("new_format", string(constants.FormatCloudTrailLog)),
128+
zap.String("old_format", constants.FormatCloudTrailLogV1),
129+
zap.String("new_format", constants.FormatCloudTrailLog),
127130
)
128131
}
129132
return &encodingExtension{
@@ -135,8 +138,8 @@ func newExtension(cfg *Config, settings extension.Settings) (*encodingExtension,
135138
case constants.FormatELBAccessLog, constants.FormatELBAccessLogV1:
136139
if cfg.Format == constants.FormatELBAccessLogV1 {
137140
settings.Logger.Warn("using old format value. This format will be removed in version 0.138.0.",
138-
zap.String("old_format", string(constants.FormatELBAccessLogV1)),
139-
zap.String("new_format", string(constants.FormatELBAccessLog)),
141+
zap.String("old_format", constants.FormatELBAccessLogV1),
142+
zap.String("new_format", constants.FormatELBAccessLog),
140143
)
141144
}
142145
return &encodingExtension{
@@ -167,37 +170,30 @@ func (*encodingExtension) Shutdown(_ context.Context) error {
167170
return nil
168171
}
169172

170-
func (e *encodingExtension) getGzipReader(buf []byte) (io.Reader, error) {
171-
var err error
172-
gzipReader, ok := e.gzipPool.Get().(*gzip.Reader)
173-
if !ok {
174-
gzipReader, err = gzip.NewReader(bytes.NewReader(buf))
175-
} else {
176-
err = gzipReader.Reset(bytes.NewBuffer(buf))
173+
func (e *encodingExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) {
174+
encodingReader, reader, err := e.getReaderFromFormat(buf)
175+
if err != nil {
176+
return plog.Logs{}, fmt.Errorf("failed to get reader for %q logs: %w", e.format, err)
177177
}
178178

179-
if err != nil {
180-
if gzipReader != nil {
181-
e.gzipPool.Put(gzipReader)
179+
defer func() {
180+
if encodingReader == gzipEncoding {
181+
r := reader.(*gzip.Reader)
182+
_ = r.Close()
183+
e.gzipPool.Put(r)
182184
}
183-
return nil, fmt.Errorf("failed to decompress content: %w", err)
184-
}
185+
}()
185186

186-
return gzipReader, nil
187-
}
187+
logs, err := e.unmarshaler.UnmarshalAWSLogs(reader)
188+
if err != nil {
189+
return plog.Logs{}, fmt.Errorf("failed to unmarshal logs as %q format: %w", e.format, err)
190+
}
188191

189-
// isGzipData checks if the buffer contains gzip-compressed data by examining magic bytes
190-
func isGzipData(buf []byte) bool {
191-
return len(buf) > 2 && buf[0] == 0x1f && buf[1] == 0x8b
192+
return logs, nil
192193
}
193194

194-
// getReaderForData returns the appropriate reader and encoding type based on data format
195-
func (e *encodingExtension) getReaderForData(buf []byte) (string, io.Reader, error) {
196-
if isGzipData(buf) {
197-
reader, err := e.getGzipReader(buf)
198-
return gzipEncoding, reader, err
199-
}
200-
return bytesEncoding, bytes.NewReader(buf), nil
195+
func (e *encodingExtension) NewLogsDecoder(reader io.Reader, options ...encoding.DecoderOption) (encoding.LogsDecoder, error) {
196+
return e.unmarshaler.NewLogsDecoder(reader, options...)
201197
}
202198

203199
func (e *encodingExtension) getReaderFromFormat(buf []byte) (string, io.Reader, error) {
@@ -229,24 +225,35 @@ func (e *encodingExtension) getReaderFromFormat(buf []byte) (string, io.Reader,
229225
}
230226
}
231227

232-
func (e *encodingExtension) UnmarshalLogs(buf []byte) (plog.Logs, error) {
233-
encodingReader, reader, err := e.getReaderFromFormat(buf)
234-
if err != nil {
235-
return plog.Logs{}, fmt.Errorf("failed to get reader for %q logs: %w", e.format, err)
228+
// getReaderForData returns the appropriate reader and encoding type based on data format
229+
func (e *encodingExtension) getReaderForData(buf []byte) (string, io.Reader, error) {
230+
if isGzipData(buf) {
231+
reader, err := e.getGzipReader(buf)
232+
return gzipEncoding, reader, err
236233
}
234+
return bytesEncoding, bytes.NewReader(buf), nil
235+
}
237236

238-
defer func() {
239-
if encodingReader == gzipEncoding {
240-
r := reader.(*gzip.Reader)
241-
_ = r.Close()
242-
e.gzipPool.Put(r)
243-
}
244-
}()
237+
func (e *encodingExtension) getGzipReader(buf []byte) (io.Reader, error) {
238+
var err error
239+
gzipReader, ok := e.gzipPool.Get().(*gzip.Reader)
240+
if !ok {
241+
gzipReader, err = gzip.NewReader(bytes.NewReader(buf))
242+
} else {
243+
err = gzipReader.Reset(bytes.NewBuffer(buf))
244+
}
245245

246-
logs, err := e.unmarshaler.UnmarshalAWSLogs(reader)
247246
if err != nil {
248-
return plog.Logs{}, fmt.Errorf("failed to unmarshal logs as %q format: %w", e.format, err)
247+
if gzipReader != nil {
248+
e.gzipPool.Put(gzipReader)
249+
}
250+
return nil, fmt.Errorf("failed to decompress content: %w", err)
249251
}
250252

251-
return logs, nil
253+
return gzipReader, nil
254+
}
255+
256+
// isGzipData checks if the buffer contains gzip-compressed data by examining magic bytes
257+
func isGzipData(buf []byte) bool {
258+
return len(buf) > 2 && buf[0] == 0x1f && buf[1] == 0x8b
252259
}

extension/encoding/awslogsencodingextension/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.145.0
1010
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.145.0
1111
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.145.0
12+
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding v0.145.0
1213
github.com/stretchr/testify v1.11.1
1314
go.opentelemetry.io/collector/component v1.51.1-0.20260205185216-81bc641f26c0
1415
go.opentelemetry.io/collector/component/componenttest v0.145.1-0.20260205185216-81bc641f26c0
@@ -57,6 +58,8 @@ require (
5758

5859
replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../
5960

61+
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding => ../../../pkg/xstreamencoding
62+
6063
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
6164

6265
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../../pkg/pdatatest

extension/encoding/awslogsencodingextension/internal/unmarshaler/cloudtraillog/testdata/cloudtrail_log_expected.yaml

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ resourceLogs:
66
stringValue: aws
77
- key: cloud.region
88
value:
9-
stringValue: us-east-1
9+
stringValue: us-west-2
1010
- key: cloud.account.id
1111
value:
12-
stringValue: "123456789012"
12+
stringValue: "111122223333"
1313
scopeLogs:
1414
- logRecords:
1515
- attributes:
@@ -124,29 +124,29 @@ resourceLogs:
124124
values:
125125
- kvlistValue:
126126
values:
127-
- key: instanceId
128-
value:
129-
stringValue: i-EXAMPLEaff4840c22
130-
- key: currentState
127+
- key: previousState
131128
value:
132129
kvlistValue:
133130
values:
134131
- key: code
135132
value:
136-
doubleValue: 0
133+
doubleValue: 80
137134
- key: name
138135
value:
139-
stringValue: pending
140-
- key: previousState
136+
stringValue: stopped
137+
- key: instanceId
138+
value:
139+
stringValue: i-EXAMPLEaff4840c22
140+
- key: currentState
141141
value:
142142
kvlistValue:
143143
values:
144144
- key: code
145145
value:
146-
doubleValue: 80
146+
doubleValue: 0
147147
- key: name
148148
value:
149-
stringValue: stopped
149+
stringValue: pending
150150
- kvlistValue:
151151
values:
152152
- key: instanceId
@@ -166,12 +166,12 @@ resourceLogs:
166166
value:
167167
kvlistValue:
168168
values:
169-
- key: code
170-
value:
171-
doubleValue: 80
172169
- key: name
173170
value:
174171
stringValue: stopped
172+
- key: code
173+
value:
174+
doubleValue: 80
175175
body: {}
176176
timeUnixNano: "1689801448000000000"
177177
- attributes:
@@ -359,9 +359,6 @@ resourceLogs:
359359
value:
360360
kvlistValue:
361361
values:
362-
- key: topicArn
363-
value:
364-
stringValue: arn:aws:sns:us-east-1:123456789012:ExampleSNSTopic
365362
- key: message
366363
value:
367364
stringValue: HIDDEN_DUE_TO_SECURITY_REASONS
@@ -374,6 +371,9 @@ resourceLogs:
374371
- key: messageAttributes
375372
value:
376373
stringValue: HIDDEN_DUE_TO_SECURITY_REASONS
374+
- key: topicArn
375+
value:
376+
stringValue: arn:aws:sns:us-east-1:123456789012:ExampleSNSTopic
377377
- key: aws.response.elements
378378
value:
379379
kvlistValue:
@@ -611,15 +611,15 @@ resourceLogs:
611611
value:
612612
kvlistValue:
613613
values:
614+
- key: MFAUsed
615+
value:
616+
boolValue: true
614617
- key: MobileVersion
615618
value:
616619
stringValue: "No"
617620
- key: LoginTo
618621
value:
619622
stringValue: https://console.aws.amazon.com/console/home?region=us
620-
- key: MFAUsed
621-
value:
622-
boolValue: true
623623
body: {}
624624
timeUnixNano: "1749997800000000000"
625625
- attributes:

0 commit comments

Comments
 (0)