
Commit 6bd4770

Introduce end to end streaming support for encoding extensions (#45567)
### Description

This PR introduces interfaces for end-to-end streaming support for encoding extensions. It contains:

- Streaming contracts for encoding extensions
- Experimental streaming helpers abstracted into a dedicated module, `pkg/xstreamencoding`
- As a proof of concept, streaming support adopted for `textencodingextension`

The streaming interface contract is as follows:

```go
// LogsDecoder unmarshals logs from a stream, returning one batch per DecodeLogs call.
type LogsDecoder interface {
	// DecodeLogs is expected to be called iteratively to read all derived plog.Logs batches from the stream.
	// The last batch of logs should be returned with a nil error. An io.EOF error should follow on the subsequent call.
	DecodeLogs() (plog.Logs, error)
	// Offset returns the current offset read from the stream.
	// The exact meaning of the offset may vary by decoder (e.g. bytes, lines, records).
	// You may use this value with the WithOffset option to resume reading from the same offset when retrying after a failure.
	Offset() int64
}

// LogsDecoderExtension is an extension that unmarshals logs from a stream.
type LogsDecoderExtension interface {
	extension.Extension
	NewLogsDecoder(reader io.Reader, options ...DecoderOption) (LogsDecoder, error)
}

// MetricsDecoder unmarshals metrics from a stream, returning one batch per DecodeMetrics call.
type MetricsDecoder interface {
	// DecodeMetrics is expected to be called iteratively to read all derived pmetric.Metrics batches from the stream.
	// The last batch of metrics should be returned with a nil error. An io.EOF error should follow on the subsequent call.
	DecodeMetrics() (pmetric.Metrics, error)
	// Offset returns the current offset read from the stream.
	// The exact meaning of the offset may vary by decoder (e.g. bytes, lines, records).
	// You may use this value with the WithOffset option to resume reading from the same offset when retrying after a failure.
	Offset() int64
}

// MetricsDecoderExtension is an extension that unmarshals metrics from a stream.
type MetricsDecoderExtension interface {
	extension.Extension
	NewMetricsDecoder(reader io.Reader, options ...DecoderOption) (MetricsDecoder, error)
}
```

### Why streaming?

The need for streaming arises when dealing with blobs, for example when S3 objects are a signal source. These blobs can be large. Without end-to-end streaming, an OTel Collector sourcing and decoding signals from blobs requires high memory allocation. With end-to-end streaming, the need for high memory goes away, and the Collector can process signals in batches and emit them to downstream consumers.

```mermaid
flowchart TB
	subgraph WITHOUT_STREAMING["Without Streaming"]
		S3A["S3 Blob"]
		OTelA["OTel Collector - Load entire blob into memory"]
		ConsumerA["Downstream Consumer"]
		S3A --> OTelA --> ConsumerA
	end
	subgraph WITH_STREAMING["With Streaming"]
		S3B["S3 Blob"]
		OTelB["OTel Collector - Stream & process batches"]
		Batch1["Batch 1"]
		Batch2["Batch 2"]
		Batch3["Batch 3"]
		ConsumerB["Downstream Consumer"]
		S3B --> OTelB
		OTelB --> Batch1 --> ConsumerB
		OTelB --> Batch2 --> ConsumerB
		OTelB --> Batch3 --> ConsumerB
	end
```

### Proof of concept: OTel Collector as a Lambda

Setup using the combined changes of this PR, the AWS logs adoption (#45804), and the AWS metrics adoption (#45805):

- **Setup**: A Collector deployment in AWS Lambda with the [AWS Lambda Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/awslambdareceiver)
- **Challenge**: AWS CloudTrail logs are stored as JSON. Internally there is a record array that fits the streaming approach. Further, AWS Lambda has defined memory limits; in the POC this is set to 512MB
- **Test input**: Predictable S3 blobs with CloudTrail logs from [Data-gen](https://github.com/Kavindu-Dodan/data-gen)

I ran the test round with CloudTrail log files of 30MB each, which translates to roughly 30K trails per file.

#### Without streaming

With no end-to-end streaming, the Lambda's maximum memory consumption was **360MB/512MB**.

<details>
<summary>Non-streaming memory consumption</summary>
<img width="401" height="547" alt="image" src="https://github.com/user-attachments/assets/9d647057-928f-4be2-b7b1-1d8bbd164541" />
</details>

#### With streaming (using components from this PR)

With end-to-end streaming, maximum memory consumption was around **189MB/512MB**. This leaves headroom for spikes, which can go up to 50MB per the known CloudTrail S3 file limits.

<details>
<summary>Streaming memory consumption</summary>
<img width="401" height="547" alt="image" src="https://github.com/user-attachments/assets/a6f3103b-0375-40c4-85bc-dbdf2f32ba19" />
</details>

#### POC conclusions

- Streaming consumed 48% less memory
- At the cost of 14% more CPU runtime (this is the tradeoff)

The above shows why streaming is important and can be a good tradeoff to save costs.

### Link to tracking issue

Fixes #38780

#### Testing

Updated unit tests.

### Documentation

Code-level documentation is added with an example usage pattern for `textencodingextension`.

---

If reviewing, it is recommended to check out the PR locally to see the details (`gh pr checkout 45567`).

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>
1 parent 0252d54 commit 6bd4770

28 files changed

Lines changed: 1004 additions & 12 deletions

.chloggen/config.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -179,6 +179,7 @@ components:
 - pkg/translator/zipkin
 - pkg/winperfcounters
 - pkg/xk8stest
+- pkg/xstreamencoding
 - processor/akamaidetector
 - processor/alibabaecsdetector
 - processor/attributes
```
Lines changed: 27 additions & 0 deletions
```diff
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: extension/encoding
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Introduce streaming support for encoding extensions
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [38780]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: []
```

.github/CODEOWNERS

Lines changed: 1 addition & 0 deletions
```diff
@@ -192,6 +192,7 @@ pkg/translator/splunk/ @open-telemetry
 pkg/translator/zipkin/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy @andrzej-stencel @crobert-1
 pkg/winperfcounters/ @open-telemetry/collector-contrib-approvers @dashpole @Mrod1598 @alxbl @pjanotti
 pkg/xk8stest/ @open-telemetry/collector-contrib-approvers @crobert-1
+pkg/xstreamencoding/ @open-telemetry/collector-contrib-approvers @Kavindu-Dodan @axw
 processor/attributesprocessor/ @open-telemetry/collector-contrib-approvers @boostchicken
 processor/coralogixprocessor/ @open-telemetry/collector-contrib-approvers @crobert-1 @povilasv @iblancasa
 processor/cumulativetodeltaprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth
```

.github/ISSUE_TEMPLATE/beta_stability.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -190,6 +190,7 @@ body:
 - pkg/translator/zipkin
 - pkg/winperfcounters
 - pkg/xk8stest
+- pkg/xstreamencoding
 - processor/attributes
 - processor/coralogix
 - processor/cumulativetodelta
```

.github/ISSUE_TEMPLATE/bug_report.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -193,6 +193,7 @@ body:
 - pkg/translator/zipkin
 - pkg/winperfcounters
 - pkg/xk8stest
+- pkg/xstreamencoding
 - processor/attributes
 - processor/coralogix
 - processor/cumulativetodelta
```

.github/ISSUE_TEMPLATE/feature_request.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -187,6 +187,7 @@ body:
 - pkg/translator/zipkin
 - pkg/winperfcounters
 - pkg/xk8stest
+- pkg/xstreamencoding
 - processor/attributes
 - processor/coralogix
 - processor/cumulativetodelta
```

.github/ISSUE_TEMPLATE/other.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -187,6 +187,7 @@ body:
 - pkg/translator/zipkin
 - pkg/winperfcounters
 - pkg/xk8stest
+- pkg/xstreamencoding
 - processor/attributes
 - processor/coralogix
 - processor/cumulativetodelta
```

.github/ISSUE_TEMPLATE/unmaintained.yaml

Lines changed: 1 addition & 0 deletions
```diff
@@ -192,6 +192,7 @@ body:
 - pkg/translator/zipkin
 - pkg/winperfcounters
 - pkg/xk8stest
+- pkg/xstreamencoding
 - processor/attributes
 - processor/coralogix
 - processor/cumulativetodelta
```

.github/component_labels.txt

Lines changed: 1 addition & 0 deletions
```diff
@@ -173,6 +173,7 @@ pkg/translator/splunk pkg/translator/splunk
 pkg/translator/zipkin pkg/translator/zipkin
 pkg/winperfcounters pkg/winperfcounters
 pkg/xk8stest pkg/xk8stest
+pkg/xstreamencoding pkg/xstreamencoding
 processor/attributesprocessor processor/attributes
 processor/coralogixprocessor processor/coralogix
 processor/cumulativetodeltaprocessor processor/cumulativetodelta
```

extension/encoding/encoding.go

Lines changed: 91 additions & 0 deletions
```diff
@@ -4,13 +4,20 @@
 package encoding // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding"

 import (
+	"io"
+
 	"go.opentelemetry.io/collector/extension"
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/pprofile"
 	"go.opentelemetry.io/collector/pdata/ptrace"
 )

+const (
+	defaultFlushBytes = 1024 * 1024 // 1MB
+	defaultFlushItems = 1000        // 1000 items
+)
+
 // LogsMarshalerExtension is an extension that marshals logs.
 type LogsMarshalerExtension interface {
 	extension.Extension
@@ -23,6 +30,23 @@ type LogsUnmarshalerExtension interface {
 	plog.Unmarshaler
 }

+// LogsDecoder unmarshals logs from a stream, returning one batch per DecodeLogs call.
+type LogsDecoder interface {
+	// DecodeLogs is expected to be called iteratively to read all derived plog.Logs batches from the stream.
+	// The last batch of logs should be returned with a nil error. io.EOF error should follow on the subsequent call.
+	DecodeLogs() (plog.Logs, error)
+	// Offset returns the offset after the most recent batch read from the stream, or the initial offset.
+	// The exact meaning of the offset may vary by decoder (e.g. bytes, lines, records).
+	// You may use this value with WithOffset option to resume reading from the same offset when retrying after a failure.
+	Offset() int64
+}
+
+// LogsDecoderExtension is an extension that unmarshals logs from a stream.
+type LogsDecoderExtension interface {
+	extension.Extension
+	NewLogsDecoder(reader io.Reader, options ...DecoderOption) (LogsDecoder, error)
+}
+
 // MetricsMarshalerExtension is an extension that marshals metrics.
 type MetricsMarshalerExtension interface {
 	extension.Extension
@@ -35,6 +59,23 @@ type MetricsUnmarshalerExtension interface {
 	pmetric.Unmarshaler
 }

+// MetricsDecoder unmarshals metrics from a stream, returning one batch per DecodeMetrics call.
+type MetricsDecoder interface {
+	// DecodeMetrics is expected to be called iteratively to read all derived pmetric.Metrics batches from the stream.
+	// The last batch of metrics should be returned with a nil error. io.EOF error should follow on the subsequent call.
+	DecodeMetrics() (pmetric.Metrics, error)
+	// Offset returns the offset after the most recent batch read from the stream, or the initial offset.
+	// The exact meaning of the offset may vary by decoder (e.g. bytes, lines, records).
+	// You may use this value with WithOffset option to resume reading from the same offset when retrying after a failure.
+	Offset() int64
+}
+
+// MetricsDecoderExtension is an extension that unmarshals metrics from a stream.
+type MetricsDecoderExtension interface {
+	extension.Extension
+	NewMetricsDecoder(reader io.Reader, options ...DecoderOption) (MetricsDecoder, error)
+}
+
 // TracesMarshalerExtension is an extension that marshals traces.
 type TracesMarshalerExtension interface {
 	extension.Extension
@@ -58,3 +99,53 @@ type ProfilesUnmarshalerExtension interface {
 	extension.Extension
 	pprofile.Unmarshaler
 }
+
+// DecoderOptions configures the behavior of stream decoding.
+// FlushBytes and FlushItems control how often the decoder should flush decoded data from the stream.
+// Offset defines the initial stream offset for the stream.
+// Use NewDecoderOptions to construct with default options.
+type DecoderOptions struct {
+	FlushBytes int64
+	FlushItems int64
+	Offset     int64
+}
+
+func NewDecoderOptions(opts ...DecoderOption) DecoderOptions {
+	options := DecoderOptions{
+		FlushBytes: defaultFlushBytes,
+		FlushItems: defaultFlushItems,
+		Offset:     0,
+	}
+
+	for _, o := range opts {
+		o(&options)
+	}
+	return options
+}
+
+// DecoderOption defines the functional option for DecoderOptions.
+type DecoderOption func(*DecoderOptions)
+
+// WithFlushBytes sets the number of bytes after stream decoder should flush.
+// Use WithFlushBytes(0) to disable flushing by byte count.
+func WithFlushBytes(b int64) DecoderOption {
+	return func(o *DecoderOptions) {
+		o.FlushBytes = b
+	}
+}
+
+// WithFlushItems sets the number of items after stream decoder should flush.
+// Use WithFlushItems(0) to disable flushing by item count.
+func WithFlushItems(i int64) DecoderOption {
+	return func(o *DecoderOptions) {
+		o.FlushItems = i
+	}
+}
+
+// WithOffset defines the initial stream offset for the stream.
+// The exact meaning of the offset may vary by decoder (e.g. bytes, lines, records).
+func WithOffset(offset int64) DecoderOption {
+	return func(o *DecoderOptions) {
+		o.Offset = offset
+	}
+}
```
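The functional-options API added in `encoding.go` can be exercised from the caller side as follows. This is a self-contained sketch that inlines simplified copies of `DecoderOptions` and the `With...` helpers from the diff so it compiles on its own; in real use these come from the `encoding` package.

```go
package main

import "fmt"

// Defaults mirroring the constants added in extension/encoding/encoding.go.
const (
	defaultFlushBytes = 1024 * 1024 // 1MB
	defaultFlushItems = 1000
)

// DecoderOptions mirrors the struct added in the diff.
type DecoderOptions struct {
	FlushBytes int64
	FlushItems int64
	Offset     int64
}

// DecoderOption is the functional option for DecoderOptions.
type DecoderOption func(*DecoderOptions)

// NewDecoderOptions applies defaults, then any caller-supplied overrides.
func NewDecoderOptions(opts ...DecoderOption) DecoderOptions {
	options := DecoderOptions{FlushBytes: defaultFlushBytes, FlushItems: defaultFlushItems}
	for _, o := range opts {
		o(&options)
	}
	return options
}

func WithFlushBytes(b int64) DecoderOption  { return func(o *DecoderOptions) { o.FlushBytes = b } }
func WithFlushItems(i int64) DecoderOption  { return func(o *DecoderOptions) { o.FlushItems = i } }
func WithOffset(offset int64) DecoderOption { return func(o *DecoderOptions) { o.Offset = offset } }

func main() {
	// No options: defaults apply (1MB flush threshold, 1000 items, offset 0).
	fmt.Printf("%+v\n", NewDecoderOptions())

	// Resume at offset 4096 after a failure, flushing every 100 items only
	// (byte-count flushing disabled via WithFlushBytes(0)).
	fmt.Printf("%+v\n", NewDecoderOptions(WithFlushBytes(0), WithFlushItems(100), WithOffset(4096)))
}
```

The pattern lets a decoder constructor like `NewLogsDecoder(reader, opts...)` accept zero or more options without breaking existing callers when new knobs are added later.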
