6
6
"bytes"
7
7
"compress/gzip"
8
8
"context"
9
+ "fmt"
9
10
"io"
10
11
"net/http"
11
12
"time"
@@ -71,7 +72,92 @@ func OTLPHandler(
71
72
) http.Handler {
72
73
discardedDueToOtelParseError := validation .DiscardedSamplesCounter (reg , otelParseError )
73
74
74
- return otlpHandler (maxRecvMsgSize , requestBufferPool , sourceIPs , retryCfg , push , logger , func (ctx context.Context , r * http.Request , maxRecvMsgSize int , buffers * util.RequestBuffers , req * mimirpb.PreallocWriteRequest , logger log.Logger ) error {
75
+ return http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
76
+ ctx := r .Context ()
77
+ logger := utillog .WithContext (ctx , logger )
78
+ if sourceIPs != nil {
79
+ source := sourceIPs .Get (r )
80
+ if source != "" {
81
+ logger = utillog .WithSourceIPs (source , logger )
82
+ }
83
+ }
84
+
85
+ otlpConverter := newOTLPMimirConverter ()
86
+
87
+ parser := newOTLPParser (limits , resourceAttributePromotionConfig , otlpConverter , enableStartTimeQuietZero , pushMetrics , discardedDueToOtelParseError )
88
+
89
+ supplier := func () (* mimirpb.WriteRequest , func (), error ) {
90
+ rb := util .NewRequestBuffers (requestBufferPool )
91
+ var req mimirpb.PreallocWriteRequest
92
+ if err := parser (ctx , r , maxRecvMsgSize , rb , & req , logger ); err != nil {
93
+ // Check for httpgrpc error, default to client error if parsing failed
94
+ if _ , ok := httpgrpc .HTTPResponseFromError (err ); ! ok {
95
+ err = httpgrpc .Error (http .StatusBadRequest , err .Error ())
96
+ }
97
+
98
+ rb .CleanUp ()
99
+ return nil , nil , err
100
+ }
101
+
102
+ cleanup := func () {
103
+ mimirpb .ReuseSlice (req .Timeseries )
104
+ rb .CleanUp ()
105
+ }
106
+ return & req .WriteRequest , cleanup , nil
107
+ }
108
+ req := newRequest (supplier )
109
+
110
+ pushErr := push (ctx , req )
111
+ if pushErr == nil {
112
+ // Push was successful, but OTLP converter left out some samples. We let the client know about it by replying with 4xx (and an insight log).
113
+ if otlpErr := otlpConverter .Err (); otlpErr != nil {
114
+ pushErr = httpgrpc .Error (http .StatusBadRequest , otlpErr .Error ())
115
+ }
116
+ }
117
+ if pushErr != nil {
118
+ if errors .Is (pushErr , context .Canceled ) {
119
+ level .Warn (logger ).Log ("msg" , "push request canceled" , "err" , pushErr )
120
+ writeErrorToHTTPResponseBody (r .Context (), w , statusClientClosedRequest , codes .Canceled , "push request context canceled" , logger )
121
+ return
122
+ }
123
+ var (
124
+ httpCode int
125
+ grpcCode codes.Code
126
+ errorMsg string
127
+ )
128
+ if st , ok := grpcutil .ErrorToStatus (pushErr ); ok {
129
+ // This code is needed for a correct handling of errors returned by the supplier function.
130
+ // These errors are created by using the httpgrpc package.
131
+ httpCode = httpRetryableToOTLPRetryable (int (st .Code ()))
132
+ grpcCode = st .Code ()
133
+ errorMsg = st .Message ()
134
+ } else {
135
+ grpcCode , httpCode = toOtlpGRPCHTTPStatus (pushErr )
136
+ errorMsg = pushErr .Error ()
137
+ }
138
+ if httpCode != 202 {
139
+ // This error message is consistent with error message in Prometheus remote-write handler, and ingester's ingest-storage pushToStorage method.
140
+ msgs := []interface {}{"msg" , "detected an error while ingesting OTLP metrics request (the request may have been partially ingested)" , "httpCode" , httpCode , "err" , pushErr }
141
+ if httpCode / 100 == 4 {
142
+ msgs = append (msgs , "insight" , true )
143
+ }
144
+ level .Error (logger ).Log (msgs ... )
145
+ }
146
+ addHeaders (w , pushErr , r , httpCode , retryCfg )
147
+ writeErrorToHTTPResponseBody (r .Context (), w , httpCode , grpcCode , errorMsg , logger )
148
+ }
149
+ })
150
+ }
151
+
152
+ func newOTLPParser (
153
+ limits OTLPHandlerLimits ,
154
+ resourceAttributePromotionConfig OTelResourceAttributePromotionConfig ,
155
+ otlpConverter * otlpMimirConverter ,
156
+ enableStartTimeQuietZero bool ,
157
+ pushMetrics * PushMetrics ,
158
+ discardedDueToOtelParseError * prometheus.CounterVec ,
159
+ ) parserFunc {
160
+ return func (ctx context.Context , r * http.Request , maxRecvMsgSize int , buffers * util.RequestBuffers , req * mimirpb.PreallocWriteRequest , logger log.Logger ) error {
75
161
contentType := r .Header .Get ("Content-Type" )
76
162
contentEncoding := r .Header .Get ("Content-Encoding" )
77
163
var compression util.CompressionType
@@ -183,8 +269,10 @@ func OTLPHandler(
183
269
pushMetrics .IncOTLPRequest (tenantID )
184
270
pushMetrics .ObserveUncompressedBodySize (tenantID , float64 (uncompressedBodySize ))
185
271
186
- var metrics []mimirpb.PreallocTimeseries
187
- metrics , err = otelMetricsToTimeseries (ctx , tenantID , addSuffixes , enableCTZeroIngestion , enableStartTimeQuietZero , promoteResourceAttributes , keepIdentifyingResourceAttributes , discardedDueToOtelParseError , spanLogger , otlpReq .Metrics ())
272
+ metrics , metricsDropped , err := otelMetricsToTimeseries (ctx , otlpConverter , addSuffixes , enableCTZeroIngestion , enableStartTimeQuietZero , promoteResourceAttributes , keepIdentifyingResourceAttributes , otlpReq .Metrics (), spanLogger )
273
+ if metricsDropped > 0 {
274
+ discardedDueToOtelParseError .WithLabelValues (tenantID , "" ).Add (float64 (metricsDropped )) // "group" label is empty here as metrics couldn't be parsed
275
+ }
188
276
if err != nil {
189
277
return err
190
278
}
@@ -203,6 +291,7 @@ func OTLPHandler(
203
291
level .Debug (spanLogger ).Log (
204
292
"msg" , "OTLP to Prometheus conversion complete" ,
205
293
"metric_count" , metricCount ,
294
+ "metrics_dropped" , metricsDropped ,
206
295
"sample_count" , sampleCount ,
207
296
"histogram_count" , histogramCount ,
208
297
"exemplar_count" , exemplarCount ,
@@ -213,80 +302,7 @@ func OTLPHandler(
213
302
req .Metadata = otelMetricsToMetadata (addSuffixes , otlpReq .Metrics ())
214
303
215
304
return nil
216
- })
217
- }
218
-
219
- func otlpHandler (
220
- maxRecvMsgSize int ,
221
- requestBufferPool util.Pool ,
222
- sourceIPs * middleware.SourceIPExtractor ,
223
- retryCfg RetryConfig ,
224
- push PushFunc ,
225
- logger log.Logger ,
226
- parser parserFunc ,
227
- ) http.Handler {
228
- return http .HandlerFunc (func (w http.ResponseWriter , r * http.Request ) {
229
- ctx := r .Context ()
230
- logger := utillog .WithContext (ctx , logger )
231
- if sourceIPs != nil {
232
- source := sourceIPs .Get (r )
233
- if source != "" {
234
- logger = utillog .WithSourceIPs (source , logger )
235
- }
236
- }
237
- supplier := func () (* mimirpb.WriteRequest , func (), error ) {
238
- rb := util .NewRequestBuffers (requestBufferPool )
239
- var req mimirpb.PreallocWriteRequest
240
- if err := parser (ctx , r , maxRecvMsgSize , rb , & req , logger ); err != nil {
241
- // Check for httpgrpc error, default to client error if parsing failed
242
- if _ , ok := httpgrpc .HTTPResponseFromError (err ); ! ok {
243
- err = httpgrpc .Error (http .StatusBadRequest , err .Error ())
244
- }
245
-
246
- rb .CleanUp ()
247
- return nil , nil , err
248
- }
249
-
250
- cleanup := func () {
251
- mimirpb .ReuseSlice (req .Timeseries )
252
- rb .CleanUp ()
253
- }
254
- return & req .WriteRequest , cleanup , nil
255
- }
256
- req := newRequest (supplier )
257
- if err := push (ctx , req ); err != nil {
258
- if errors .Is (err , context .Canceled ) {
259
- level .Warn (logger ).Log ("msg" , "push request canceled" , "err" , err )
260
- writeErrorToHTTPResponseBody (r .Context (), w , statusClientClosedRequest , codes .Canceled , "push request context canceled" , logger )
261
- return
262
- }
263
- var (
264
- httpCode int
265
- grpcCode codes.Code
266
- errorMsg string
267
- )
268
- if st , ok := grpcutil .ErrorToStatus (err ); ok {
269
- // This code is needed for a correct handling of errors returned by the supplier function.
270
- // These errors are created by using the httpgrpc package.
271
- httpCode = httpRetryableToOTLPRetryable (int (st .Code ()))
272
- grpcCode = st .Code ()
273
- errorMsg = st .Message ()
274
- } else {
275
- grpcCode , httpCode = toOtlpGRPCHTTPStatus (err )
276
- errorMsg = err .Error ()
277
- }
278
- if httpCode != 202 {
279
- // This error message is consistent with error message in Prometheus remote-write handler, and ingester's ingest-storage pushToStorage method.
280
- msgs := []interface {}{"msg" , "detected an error while ingesting OTLP metrics request (the request may have been partially ingested)" , "httpCode" , httpCode , "err" , err }
281
- if httpCode / 100 == 4 {
282
- msgs = append (msgs , "insight" , true )
283
- }
284
- level .Error (logger ).Log (msgs ... )
285
- }
286
- addHeaders (w , err , r , httpCode , retryCfg )
287
- writeErrorToHTTPResponseBody (r .Context (), w , httpCode , grpcCode , errorMsg , logger )
288
- }
289
- })
305
+ }
290
306
}
291
307
292
308
// toOtlpGRPCHTTPStatus is utilized by the OTLP endpoint.
@@ -414,33 +430,68 @@ func otelMetricsToMetadata(addSuffixes bool, md pmetric.Metrics) []*mimirpb.Metr
414
430
return metadata
415
431
}
416
432
417
- func otelMetricsToTimeseries (ctx context.Context , tenantID string , addSuffixes , enableCTZeroIngestion , enableStartTimeQuietZero bool , promoteResourceAttributes []string , keepIdentifyingResourceAttributes bool , discardedDueToOtelParseError * prometheus.CounterVec , logger log.Logger , md pmetric.Metrics ) ([]mimirpb.PreallocTimeseries , error ) {
418
- converter := otlp .NewMimirConverter ()
419
- _ , errs := converter .FromMetrics (ctx , md , otlp.Settings {
433
+ func otelMetricsToTimeseries (
434
+ ctx context.Context ,
435
+ converter * otlpMimirConverter ,
436
+ addSuffixes , enableCTZeroIngestion , enableStartTimeQuietZero bool ,
437
+ promoteResourceAttributes []string ,
438
+ keepIdentifyingResourceAttributes bool ,
439
+ md pmetric.Metrics ,
440
+ logger log.Logger ,
441
+ ) ([]mimirpb.PreallocTimeseries , int , error ) {
442
+ settings := otlp.Settings {
420
443
AddMetricSuffixes : addSuffixes ,
421
444
EnableCreatedTimestampZeroIngestion : enableCTZeroIngestion ,
422
445
EnableStartTimeQuietZero : enableStartTimeQuietZero ,
423
446
PromoteResourceAttributes : promoteResourceAttributes ,
424
447
KeepIdentifyingResourceAttributes : keepIdentifyingResourceAttributes ,
425
- }, utillog .SlogFromGoKit (logger ))
426
- mimirTS := converter .TimeSeries ()
427
- if errs != nil {
428
- dropped := len (multierr .Errors (errs ))
429
- discardedDueToOtelParseError .WithLabelValues (tenantID , "" ).Add (float64 (dropped )) // Group is empty here as metrics couldn't be parsed
430
-
431
- parseErrs := errs .Error ()
432
- if len (parseErrs ) > maxErrMsgLen {
433
- parseErrs = parseErrs [:maxErrMsgLen ]
434
- }
448
+ }
449
+ mimirTS := converter .ToTimeseries (ctx , md , settings , logger )
435
450
436
- if len (mimirTS ) == 0 {
437
- return nil , errors .New (parseErrs )
438
- }
451
+ dropped := converter .DroppedTotal ()
452
+ if len (mimirTS ) == 0 && dropped > 0 {
453
+ return nil , dropped , converter .Err ()
454
+ }
455
+ return mimirTS , dropped , nil
456
+ }
457
+
458
+ type otlpMimirConverter struct {
459
+ converter * otlp.MimirConverter
460
+ // err holds OTLP parse errors
461
+ err error
462
+ }
463
+
464
+ func newOTLPMimirConverter () * otlpMimirConverter {
465
+ return & otlpMimirConverter {
466
+ converter : otlp .NewMimirConverter (),
467
+ }
468
+ }
439
469
440
- level .Warn (logger ).Log ("msg" , "OTLP parse error" , "err" , parseErrs )
470
+ func (c * otlpMimirConverter ) ToTimeseries (ctx context.Context , md pmetric.Metrics , settings otlp.Settings , logger log.Logger ) []mimirpb.PreallocTimeseries {
471
+ if c .err != nil {
472
+ return nil
441
473
}
442
474
443
- return mimirTS , nil
475
+ _ , c .err = c .converter .FromMetrics (ctx , md , settings , utillog .SlogFromGoKit (logger ))
476
+ return c .converter .TimeSeries ()
477
+ }
478
+
479
+ func (c * otlpMimirConverter ) DroppedTotal () int {
480
+ if c .err != nil {
481
+ return len (multierr .Errors (c .err ))
482
+ }
483
+ return 0
484
+ }
485
+
486
+ func (c * otlpMimirConverter ) Err () error {
487
+ if c .err != nil {
488
+ errMsg := c .err .Error ()
489
+ if len (errMsg ) > maxErrMsgLen {
490
+ errMsg = errMsg [:maxErrMsgLen ]
491
+ }
492
+ return fmt .Errorf ("otlp parse error: %s" , errMsg )
493
+ }
494
+ return nil
444
495
}
445
496
446
497
// TimeseriesToOTLPRequest is used in tests.
0 commit comments