44package awscloudwatchmetricstreamsencodingextension // import "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awscloudwatchmetricstreamsencodingextension"
55
66import (
7+ "bufio"
8+ "bytes"
79 "encoding/binary"
810 "errors"
911 "fmt"
12+ "io"
1013
1114 "go.opentelemetry.io/collector/component"
1215 "go.opentelemetry.io/collector/pdata/pmetric"
1316 "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
1417
18+ "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding"
1519 "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding/awscloudwatchmetricstreamsencodingextension/internal/metadata"
20+ "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding"
1621)
1722
1823var errInvalidUvarint = errors .New ("invalid OTLP message length: failed to decode varint" )
@@ -24,39 +29,126 @@ type formatOpenTelemetry10Unmarshaler struct {
2429var _ pmetric.Unmarshaler = (* formatOpenTelemetry10Unmarshaler )(nil )
2530
2631func (f * formatOpenTelemetry10Unmarshaler ) UnmarshalMetrics (record []byte ) (pmetric.Metrics , error ) {
27- md := pmetric . NewMetrics ()
28- dataLen , start := len ( record ), 0
29- for start < dataLen {
30- // get size of datum
31- nLen , bytesRead := binary . Uvarint ( record [ start :])
32- if bytesRead <= 0 {
33- return pmetric. Metrics {}, errInvalidUvarint
34- }
35- start += bytesRead
36- end := start + int ( nLen )
37- if end > len ( record ) {
38- return pmetric. Metrics {}, errors . New ( "index out of bounds: length prefix exceeds available bytes in record" )
32+ // Decode as a stream but flush all at once using flush options
33+ decoder , err := f . NewMetricsDecoder ( bytes . NewReader ( record ), encoding . WithFlushBytes ( 0 ), encoding . WithFlushItems ( 0 ))
34+ if err != nil {
35+ return pmetric. Metrics {}, err
36+ }
37+
38+ metrics , err := decoder . DecodeMetrics ()
39+ if err != nil {
40+ //nolint:errorlint
41+ if err == io . EOF {
42+ // EOF indicates no metrics were found, return any metrics that's available
43+ return metrics , nil
3944 }
4045
41- // unmarshal datum
42- req := pmetricotlp .NewExportRequest ()
43- if err := req .UnmarshalProto (record [start :end ]); err != nil {
44- return pmetric.Metrics {}, fmt .Errorf ("unable to unmarshal input: %w" , err )
46+ return pmetric.Metrics {}, err
47+ }
48+
49+ return metrics , nil
50+ }
51+
52+ func (f * formatOpenTelemetry10Unmarshaler ) NewMetricsDecoder (reader io.Reader , options ... encoding.DecoderOption ) (encoding.MetricsDecoder , error ) {
53+ var bufReader * bufio.Reader
54+ if r , ok := reader .(* bufio.Reader ); ok {
55+ bufReader = r
56+ } else {
57+ bufReader = bufio .NewReader (reader )
58+ }
59+
60+ batchHelper := xstreamencoding .NewBatchHelper (options ... )
61+ offSetTracker := batchHelper .Options ().Offset
62+
63+ if offSetTracker > 0 {
64+ _ , err := bufReader .Discard (int (offSetTracker ))
65+ if err != nil {
66+ if errors .Is (err , io .EOF ) {
67+ return nil , fmt .Errorf ("EOF reached before offset %d bytes were discarded" , offSetTracker )
68+ }
69+ return nil , err
4570 }
46- start = end
71+ }
72+
73+ offsetF := func () int64 {
74+ return offSetTracker
75+ }
76+
77+ decodeF := func () (pmetric.Metrics , error ) {
78+ md := pmetric .NewMetrics ()
79+
80+ var buf []byte
81+ for {
82+ peek , err := bufReader .Peek (binary .MaxVarintLen64 )
83+ if err != nil && ! errors .Is (err , io .EOF ) {
84+ return pmetric.Metrics {}, err
85+ }
86+
87+ if len (peek ) == 0 {
88+ // reached EOF
89+ break
90+ }
91+
92+ // Read next toRead bytes to get the length of the next OTLP metric message
93+ toRead , bytesRead := binary .Uvarint (peek )
94+ if bytesRead <= 0 {
95+ return pmetric.Metrics {}, errInvalidUvarint
96+ }
97+
98+ // Skip bytesRead
99+ _ , err = bufReader .Discard (bytesRead )
100+ if err != nil {
101+ return pmetric.Metrics {}, fmt .Errorf ("unable to discard varint: %w" , err )
102+ }
103+
104+ // Reuse buffer, grow only if needed
105+ if cap (buf ) < int (toRead ) {
106+ buf = make ([]byte , toRead )
107+ } else {
108+ buf = buf [:toRead ]
109+ }
110+
111+ // Read the OTLP metric message
112+ _ , err = io .ReadFull (bufReader , buf )
113+ if err != nil {
114+ return pmetric.Metrics {}, fmt .Errorf ("unable to read OTLP metric message: %w" , err )
115+ }
47116
48- // add scope name and build info version to
49- // the resource metrics
50- for i := 0 ; i < req .Metrics ().ResourceMetrics ().Len (); i ++ {
51- rm := req .Metrics ().ResourceMetrics ().At (i )
52- for j := 0 ; j < rm .ScopeMetrics ().Len (); j ++ {
53- sm := rm .ScopeMetrics ().At (j )
54- sm .Scope ().SetName (metadata .ScopeName )
55- sm .Scope ().SetVersion (f .buildInfo .Version )
117+ // unmarshal metric
118+ req := pmetricotlp .NewExportRequest ()
119+ if err := req .UnmarshalProto (buf ); err != nil {
120+ return pmetric.Metrics {}, fmt .Errorf ("unable to unmarshal input: %w" , err )
56121 }
122+
123+ // add scope name and build info version to the resource metrics
124+ for i := 0 ; i < req .Metrics ().ResourceMetrics ().Len (); i ++ {
125+ rm := req .Metrics ().ResourceMetrics ().At (i )
126+ for j := 0 ; j < rm .ScopeMetrics ().Len (); j ++ {
127+ sm := rm .ScopeMetrics ().At (j )
128+ sm .Scope ().SetName (metadata .ScopeName )
129+ sm .Scope ().SetVersion (f .buildInfo .Version )
130+ }
131+ }
132+ req .Metrics ().ResourceMetrics ().MoveAndAppendTo (md .ResourceMetrics ())
133+
134+ advanced := int64 (toRead ) + int64 (bytesRead )
135+ offSetTracker += advanced
136+ batchHelper .IncrementItems (1 )
137+ batchHelper .IncrementBytes (advanced )
138+
139+ if batchHelper .ShouldFlush () {
140+ batchHelper .Reset ()
141+
142+ return md , nil
143+ }
144+ }
145+
146+ if md .ResourceMetrics ().Len () == 0 {
147+ return pmetric .NewMetrics (), io .EOF
57148 }
58- req .Metrics ().ResourceMetrics ().MoveAndAppendTo (md .ResourceMetrics ())
149+
150+ return md , nil
59151 }
60152
61- return md , nil
153+ return xstreamencoding . NewMetricsDecoderAdapter ( decodeF , offsetF ) , nil
62154}
0 commit comments