
feat: Adopt encoding extension streaming support for AWS Logs encoding extension#45804

Closed
Kavindu-Dodan wants to merge 2 commits into open-telemetry:main from Kavindu-Dodan:feat/streaming-for-aws-logs

Conversation


@Kavindu-Dodan Kavindu-Dodan commented Feb 2, 2026

Description

Adopts the encoding extension stream decoding introduced in #45567.

All sub log types under AWS Logs Encoding extension now support streaming.

The table below summarizes streaming support for each log type, along with its offset-tracking mechanism:

| Log Type | Sub Log Type/Source | Offset Tracking |
|---|---|---|
| CloudTrail | Generic records | Number of records processed |
| CloudTrail | Digest record | Always 0 (full payload processed) |
| ELB Access Logs | ALB/NLB/CLB | Bytes processed |
| Network Firewall | Alert/Flow/TLS | Bytes processed |
| S3 Access Logs | - | Bytes processed |
| Subscription filter | - | Always 0 (full payload processed) |
| VPC Flow Logs | S3 plain text | Bytes processed |
| VPC Flow Logs | CloudWatch subscription filter | Always 0 (full payload processed) |
| WAF Logs | - | Bytes processed |

Testing

Dedicated unit tests to validate streaming behaviour.

Documentation

Code documentation, plus usage patterns demonstrated through tests.

@Kavindu-Dodan Kavindu-Dodan force-pushed the feat/streaming-for-aws-logs branch 6 times, most recently from fd6b745 to 48892ff Compare February 10, 2026 22:06
atoulme pushed a commit that referenced this pull request Feb 11, 2026
### Description

This PR introduces interfaces for end-to-end streaming support for
encoding extensions. It contains:

- Streaming contracts for encoding extensions
- Experimental streaming helpers abstracted into a dedicated module
`pkg/xstreamencoding`
- As a proof of concept, streaming support adopted in
`textencodingextension`

The streaming interface contract is as follows:

```go
// LogsDecoder unmarshals logs from a stream, returning one batch per DecodeLogs call.
type LogsDecoder interface {
	// DecodeLogs is expected to be called iteratively to read all derived plog.Logs batches from the stream.
	// The last batch of logs should be returned with a nil error. io.EOF error should follow on the subsequent call.
	DecodeLogs() (plog.Logs, error)
	// Offset returns the current offset read from the stream.
	// The exact meaning of the offset may vary by decoder (e.g. bytes, lines, records).
	// You may use this value with the WithOffset option to resume reading from the same offset when retrying after a failure.
	Offset() int64
}

// LogsDecoderExtension is an extension that unmarshals logs from a stream.
type LogsDecoderExtension interface {
	extension.Extension
	NewLogsDecoder(reader io.Reader, options ...DecoderOption) (LogsDecoder, error)
}

// MetricsDecoder unmarshals metrics from a stream, returning one batch per DecodeMetrics call.
type MetricsDecoder interface {
	// DecodeMetrics is expected to be called iteratively to read all derived pmetric.Metrics batches from the stream.
	// The last batch of metrics should be returned with a nil error. io.EOF error should follow on the subsequent call.
	DecodeMetrics() (pmetric.Metrics, error)
	// Offset returns the current offset read from the stream.
	// The exact meaning of the offset may vary by decoder (e.g. bytes, lines, records).
	// You may use this value with the WithOffset option to resume reading from the same offset when retrying after a failure.
	Offset() int64
}

// MetricsDecoderExtension is an extension that unmarshals metrics from a stream.
type MetricsDecoderExtension interface {
	extension.Extension
	NewMetricsDecoder(reader io.Reader, options ...DecoderOption) (MetricsDecoder, error)
}
```

### Why Streaming?

The need for streaming arises when dealing with blobs. Consider, for
example, S3 objects as a signal source. These blobs can be large;
without end-to-end streaming, an OTel Collector sourcing and decoding
signals from blobs requires high memory allocation.

With end-to-end streaming, the need for high memory goes away, and the
collector can process signals in batches and emit them to downstream
consumers.

```mermaid
flowchart TB
    subgraph WITHOUT_STREAMING["Without Streaming "]
        S3A["S3 Blob"]
        OTelA["OTel Collector -  Load entire blob into memory"]
        ConsumerA["Downstream Consumer"]

        S3A --> OTelA --> ConsumerA
    end

    subgraph WITH_STREAMING["With Streaming"]
        S3B["S3 Blob"]
        OTelB["OTel Collector - Stream & process batches"]
        Batch1["Batch 1"]
        Batch2["Batch 2"]
        Batch3["Batch 3"]
        ConsumerB["Downstream Consumer"]

        S3B --> OTelB
        OTelB --> Batch1 --> ConsumerB
        OTelB --> Batch2 --> ConsumerB
        OTelB --> Batch3 --> ConsumerB
    end
```

### Proof of concept: OTel collector as a Lambda

Setup using the combined changes of this PR, the AWS logs adoption
(#45804), and the AWS metrics adoption
(#45805).

- **Setup**: A collector deployed in AWS Lambda with the [AWS Lambda
Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/awslambdareceiver)
- **Challenge**: AWS CloudTrail logs are stored as JSON. Internally
there's a record array that fits a streaming solution. Further, AWS
Lambda has defined memory limits; in the POC this is set to 512MB
- **Test input**: Predictable S3 blobs with CloudTrail logs from
[Data-gen](https://github.com/Kavindu-Dodan/data-gen)


I ran the test round with CloudTrail log files of 30MB each, which
translates to around 30K trail records.

#### Without streaming

Without end-to-end streaming, Lambda's peak memory consumption is
**360MB/512MB**.

<details>
<summary> Non streaming memory consumption</summary>
<img width="401" height="547" alt="image"
src="https://github.com/user-attachments/assets/9d647057-928f-4be2-b7b1-1d8bbd164541"
/>
</details>

#### With streaming - using components from this PR

With end-to-end streaming, peak memory consumption was around
**189MB/512MB**. This leaves headroom for spikes, which can go up to
50MB per known CloudTrail S3 file limits.

<details>
<summary> Streaming memory consumption</summary>
<img width="401" height="547" alt="image"
src="https://github.com/user-attachments/assets/a6f3103b-0375-40c4-85bc-dbdf2f32ba19"
/>
</details>

#### POC conclusions

- Streaming consumed 48% less memory
- The tradeoff is about 14% more CPU runtime

The above shows why streaming is important and can be a worthwhile
tradeoff to save costs.

### Link to tracking issue

Fixes
#38780

### Testing

Updated unit tests

### Documentation

Code-level documentation is added, with an example usage pattern for
`textencodingextension`.


___

If reviewing, I recommend checking out the branch to see the details
(`gh pr checkout 45567`).

---------

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>
@Kavindu-Dodan Kavindu-Dodan force-pushed the feat/streaming-for-aws-logs branch 2 times, most recently from 97bbda6 to 3db2d81 Compare February 12, 2026 17:36
@Kavindu-Dodan Kavindu-Dodan marked this pull request as ready for review February 12, 2026 17:42
@Kavindu-Dodan Kavindu-Dodan requested a review from a team as a code owner February 12, 2026 17:42
@github-actions github-actions bot requested a review from constanca-m February 12, 2026 17:42
@Kavindu-Dodan Kavindu-Dodan force-pushed the feat/streaming-for-aws-logs branch 3 times, most recently from 7fd3015 to eb3b5f0 Compare February 12, 2026 18:18
@Kavindu-Dodan Kavindu-Dodan changed the title from "[WIP] Adopt end to end streaming for AWS Logs encoding" to "feat: Adopt encoding extension streaming support for AWS Logs encoding extension" Feb 12, 2026
@Kavindu-Dodan Kavindu-Dodan force-pushed the feat/streaming-for-aws-logs branch 2 times, most recently from 9a34b03 to faefb0e Compare February 13, 2026 15:44
@MichaelKatsoulis

Thanks a lot @Kavindu-Dodan for this. Your code is very consistent across all formats!

```go
return xstreamencoding.NewLogsDecoderAdapter(decodeF, offsetF), nil
}

func (v *vpcFlowLogUnmarshaler) unmarshalPlainTextLogs(reader io.Reader) (plog.Logs, error) {
```

Is this dead code? I don't see it being used anymore


Thank you. Yes, this is no longer used by the interface API contract. However, it was used by the benchmark test. I migrated the benchmark test to use the new implementation and removed this with commit e564f38.

@Kavindu-Dodan Kavindu-Dodan force-pushed the feat/streaming-for-aws-logs branch 2 times, most recently from bc5a6f5 to 4f0d8b8 Compare February 17, 2026 21:48

axw commented Feb 18, 2026

@Kavindu-Dodan it's going to take me a very long time to get through a PR this big. Can you please break it up into a PR per format?

@MichaelKatsoulis

> @Kavindu-Dodan it's going to take me a very long time to get through a PR this big. Can you please break it up into a PR per format?

I feel this might be overkill because there are 7 different formats. In any case, all the formats follow the same pattern, so I believe it makes sense to keep them in one PR.

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

# Conflicts:
#	extension/encoding/awslogsencodingextension/go.mod
#	extension/encoding/awslogsencodingextension/internal/unmarshaler/subscription-filter/unmarshaler.go
@Kavindu-Dodan Kavindu-Dodan force-pushed the feat/streaming-for-aws-logs branch from 4f0d8b8 to 097ebf0 Compare February 18, 2026 15:26
Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>
@Kavindu-Dodan

> @Kavindu-Dodan it's going to take me a very long time to get through a PR this big. Can you please break it up into a PR per format?

> I feel this might be overkill because there are 7 different formats. In any case, all the formats follow the same pattern, so I believe it makes sense to keep them in one PR.

I am of the same opinion; having 7 PRs is tiresome. Also, we have a second signal with metrics (PR #45805), which means a split by signal brings the total PR count to 9.

In any case, the core changes for each format are only in unmarshaler.go; the rest is test additions to validate streaming, plus supporting test data.

Also, if we have to go with split PRs, it will delay the AWS Lambda adoption (PR #46188).


axw commented Feb 19, 2026

The reality is I will not be able to review a ~3500-line diff any time soon, and certainly not as soon as I could review 7 targeted PRs.

@MichaelKatsoulis

@axw Ok then! I opened a PR for CloudTrail, and more will follow.


geekdave pushed a commit to oso-team/opentelemetry-collector-contrib that referenced this pull request Feb 20, 2026