Skip to content

Commit 4823140

Browse files
jmichalek132codeboten
authored andcommitted
prom rw v2 exporter order samples by timestamp (open-telemetry#39980)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description As prometheus prefers to ingest timeseries in order by timestamp we are sorting them. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#33661 partially <!--Describe what testing was performed and which tests were added.--> #### Testing Added unit test I think it's okay to skip changelog here, given it's not changing the API and not really user facing given RW2 is not yet ready to be used. Co-authored-by: Alex Boten <[email protected]>
1 parent 8579474 commit 4823140

File tree

3 files changed

+122
-3
lines changed

3 files changed

+122
-3
lines changed

exporter/prometheusremotewriteexporter/exporter_v2.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,11 @@ func (prwe *prwExporter) handleExportV2(ctx context.Context, symbolsTable writev
102102
}
103103

104104
requests = append(requests, &writev2.Request{
105-
// TODO sort
106105
// Prometheus requires time series to be sorted by Timestamp to avoid out of order problems.
107106
// See:
108107
// * https://github.com/open-telemetry/wg-prometheus/issues/10
109108
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
110-
// Timeseries: orderBySampleTimestamp(tsArray),
111-
Timeseries: tsArray,
109+
Timeseries: orderBySampleTimestampV2(tsArray),
112110
Symbols: symbolsTable.Symbols(),
113111
})
114112

exporter/prometheusremotewriteexporter/helper.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"sort"
1010

1111
"github.com/prometheus/prometheus/prompb"
12+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
1213
)
1314

1415
type batchTimeSeriesState struct {
@@ -120,3 +121,13 @@ func orderBySampleTimestamp(tsArray []prompb.TimeSeries) []prompb.TimeSeries {
120121
}
121122
return tsArray
122123
}
124+
125+
func orderBySampleTimestampV2(tsArray []writev2.TimeSeries) []writev2.TimeSeries {
126+
for i := range tsArray {
127+
sL := tsArray[i].Samples
128+
sort.Slice(sL, func(i, j int) bool {
129+
return sL[i].Timestamp < sL[j].Timestamp
130+
})
131+
}
132+
return tsArray
133+
}

exporter/prometheusremotewriteexporter/helper_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"testing"
99

1010
"github.com/prometheus/prometheus/prompb"
11+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
1112
"github.com/stretchr/testify/assert"
1213
)
1314

@@ -248,3 +249,112 @@ func TestEnsureTimeseriesPointsAreSortedByTimestamp(t *testing.T) {
248249
}
249250
}
250251
}
252+
253+
// Ensure that before a prompb.WriteRequest is created, that the points per TimeSeries
254+
// are sorted by Timestamp value, to prevent Prometheus from barfing when it gets poorly
255+
// sorted values. See issues:
256+
// * https://github.com/open-telemetry/wg-prometheus/issues/10
257+
// * https://github.com/open-telemetry/opentelemetry-collector/issues/2315
258+
func TestEnsureTimeseriesPointsAreSortedByTimestampV2(t *testing.T) {
259+
outOfOrder := []writev2.TimeSeries{
260+
{
261+
Samples: []writev2.Sample{
262+
{
263+
Value: 10.11,
264+
Timestamp: 1000,
265+
},
266+
{
267+
Value: 7.81,
268+
Timestamp: 2,
269+
},
270+
{
271+
Value: 987.81,
272+
Timestamp: 1,
273+
},
274+
{
275+
Value: 18.22,
276+
Timestamp: 999,
277+
},
278+
},
279+
},
280+
{
281+
Samples: []writev2.Sample{
282+
{
283+
Value: 99.91,
284+
Timestamp: 5,
285+
},
286+
{
287+
Value: 4.33,
288+
Timestamp: 3,
289+
},
290+
{
291+
Value: 47.81,
292+
Timestamp: 4,
293+
},
294+
{
295+
Value: 18.22,
296+
Timestamp: 8,
297+
},
298+
},
299+
},
300+
}
301+
got := orderBySampleTimestampV2(outOfOrder)
302+
303+
// We must ensure that the resulting Timeseries' sample points are sorted by Timestamp.
304+
want := []writev2.TimeSeries{
305+
{
306+
Samples: []writev2.Sample{
307+
{
308+
Value: 987.81,
309+
Timestamp: 1,
310+
},
311+
{
312+
Value: 7.81,
313+
Timestamp: 2,
314+
},
315+
{
316+
Value: 18.22,
317+
Timestamp: 999,
318+
},
319+
{
320+
Value: 10.11,
321+
Timestamp: 1000,
322+
},
323+
},
324+
},
325+
{
326+
Samples: []writev2.Sample{
327+
{
328+
Value: 4.33,
329+
Timestamp: 3,
330+
},
331+
{
332+
Value: 47.81,
333+
Timestamp: 4,
334+
},
335+
{
336+
Value: 99.91,
337+
Timestamp: 5,
338+
},
339+
{
340+
Value: 18.22,
341+
Timestamp: 8,
342+
},
343+
},
344+
},
345+
}
346+
assert.Equal(t, want, got)
347+
348+
// For a full sanity/logical check, assert that EVERY
349+
// Sample has a Timestamp bigger than its prior values.
350+
for ti, ts := range got {
351+
for i := range ts.Samples {
352+
si := ts.Samples[i]
353+
for j := 0; j < i; j++ {
354+
sj := ts.Samples[j]
355+
assert.LessOrEqual(t, sj.Timestamp, si.Timestamp, "Timeseries[%d]: Sample[%d].Timestamp(%d) > Sample[%d].Timestamp(%d)",
356+
ti, j, sj.Timestamp, i, si.Timestamp)
357+
}
358+
}
359+
}
360+
}

0 commit comments

Comments
 (0)