Skip to content

Commit 0a0a416

Browse files
authored
Merge pull request #790 from grafana/revert-766-start-time-quiet
Revert "TSDB: Add 'quiet zero' for Otel start-time handling"
2 parents b02daec + 953a36a commit 0a0a416

File tree

6 files changed

+20
-72
lines changed

6 files changed

+20
-72
lines changed

model/value/value.go

-3
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ const (
2626
// complicated values in the future. It is 2 rather than 1 to make
2727
// it easier to distinguish from the NormalNaN by a human when debugging.
2828
StaleNaN uint64 = 0x7ff0000000000002
29-
30-
// QuietZeroNaN signals TSDB to add a zero, but do nothing if there is already a value at that timestamp.
31-
QuietZeroNaN uint64 = 0x7ff0000000000003
3229
)
3330

3431
// IsStaleNaN returns true when the provided NaN value is a stale marker.

storage/remote/otlptranslator/prometheusremotewrite/helper.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -595,10 +595,9 @@ const defaultIntervalForStartTimestamps = int64(300_000)
595595
// handleStartTime adds a zero sample at startTs only if startTs is within validIntervalForStartTimestamps of the sample timestamp.
596596
// The reason for doing this is that PRW v1 doesn't support Created Timestamps. After switching to PRW v2's direct CT support,
597597
// make use of its direct support fort Created Timestamps instead.
598-
// See https://github.com/prometheus/prometheus/issues/14600 for context.
599598
// See https://opentelemetry.io/docs/specs/otel/metrics/data-model/#resets-and-gaps to know more about how OTel handles
600599
// resets for cumulative metrics.
601-
func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb.Label, settings Settings, typ string, val float64, logger *slog.Logger) {
600+
func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb.Label, settings Settings, typ string, value float64, logger *slog.Logger) {
602601
if !settings.EnableCreatedTimestampZeroIngestion {
603602
return
604603
}
@@ -620,9 +619,10 @@ func (c *PrometheusConverter) handleStartTime(startTs, ts int64, labels []prompb
620619
return
621620
}
622621

623-
logger.Debug("adding zero value at start_ts", "type", typ, "labels", labelsStringer(labels), "start_ts", startTs, "sample_ts", ts, "sample_value", val)
622+
logger.Debug("adding zero value at start_ts", "type", typ, "labels", labelsStringer(labels), "start_ts", startTs, "sample_ts", ts, "sample_value", value)
624623

625-
c.addSample(&prompb.Sample{Timestamp: startTs, Value: math.Float64frombits(value.QuietZeroNaN)}, labels)
624+
// See https://github.com/prometheus/prometheus/issues/14600 for context.
625+
c.addSample(&prompb.Sample{Timestamp: startTs}, labels)
626626
}
627627

628628
// handleHistogramStartTime similar to the method above but for native histograms..

storage/remote/otlptranslator/prometheusremotewrite/helper_test.go

+5-14
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@ package prometheusremotewrite
1818

1919
import (
2020
"context"
21-
"math"
2221
"testing"
2322
"time"
2423

25-
"github.com/google/go-cmp/cmp"
2624
"github.com/stretchr/testify/assert"
2725
"github.com/stretchr/testify/require"
2826
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -31,9 +29,7 @@ import (
3129
"github.com/prometheus/common/model"
3230
"github.com/prometheus/common/promslog"
3331

34-
"github.com/prometheus/prometheus/model/value"
3532
"github.com/prometheus/prometheus/prompb"
36-
"github.com/prometheus/prometheus/util/testutil"
3733
)
3834

3935
func TestCreateAttributes(t *testing.T) {
@@ -317,14 +313,14 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
317313
timeSeriesSignature(labels): {
318314
Labels: labels,
319315
Samples: []prompb.Sample{
320-
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus2m30s)},
316+
{Value: 0, Timestamp: convertTimeStamp(nowMinus2m30s)},
321317
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
322318
},
323319
},
324320
timeSeriesSignature(sumLabels): {
325321
Labels: sumLabels,
326322
Samples: []prompb.Sample{
327-
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus2m30s)},
323+
{Value: 0, Timestamp: convertTimeStamp(nowMinus2m30s)},
328324
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
329325
},
330326
},
@@ -365,14 +361,14 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
365361
timeSeriesSignature(labels): {
366362
Labels: labels,
367363
Samples: []prompb.Sample{
368-
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus6m)},
364+
{Value: 0, Timestamp: convertTimeStamp(nowMinus6m)},
369365
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
370366
},
371367
},
372368
timeSeriesSignature(sumLabels): {
373369
Labels: sumLabels,
374370
Samples: []prompb.Sample{
375-
{Value: math.Float64frombits(value.QuietZeroNaN), Timestamp: convertTimeStamp(nowMinus6m)},
371+
{Value: 0, Timestamp: convertTimeStamp(nowMinus6m)},
376372
{Value: 0, Timestamp: convertTimeStamp(nowUnixNano)},
377373
},
378374
},
@@ -485,17 +481,12 @@ func TestPrometheusConverter_AddSummaryDataPoints(t *testing.T) {
485481
)
486482
require.NoError(t, err)
487483

488-
testutil.RequireEqualWithOptions(t, tt.want(), converter.unique, []cmp.Option{cmp.Comparer(equalSamples)})
484+
assert.Equal(t, tt.want(), converter.unique)
489485
assert.Empty(t, converter.conflicts)
490486
})
491487
}
492488
}
493489

494-
func equalSamples(a, b prompb.Sample) bool {
495-
// Compare Float64bits so NaN values which are exactly the same will compare equal.
496-
return a.Timestamp == b.Timestamp && math.Float64bits(a.Value) == math.Float64bits(b.Value)
497-
}
498-
499490
func TestPrometheusConverter_AddHistogramDataPoints(t *testing.T) {
500491
ts := pcommon.Timestamp(time.Now().UnixNano())
501492
tests := []struct {

tsdb/db_test.go

-9
Original file line numberDiff line numberDiff line change
@@ -105,15 +105,6 @@ func openTestDB(t testing.TB, opts *Options, rngs []int64) (db *DB) {
105105
return db
106106
}
107107

108-
// queryHead is a helper to query the head for a given time range and labelset.
109-
func queryHead(t testing.TB, head *Head, mint, maxt int64, label labels.Label) (map[string][]chunks.Sample, error) {
110-
q, err := NewBlockQuerier(head, mint, maxt)
111-
if err != nil {
112-
return nil, err
113-
}
114-
return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
115-
}
116-
117108
// query runs a matcher query against the querier and fully expands its data.
118109
func query(t testing.TB, q storage.Querier, matchers ...*labels.Matcher) map[string][]chunks.Sample {
119110
ss := q.Select(context.Background(), false, nil, matchers...)

tsdb/head_append.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ func (s *memSeries) appendable(t int64, v float64, headMaxt, minValidTime, oooTi
497497
if s.lastHistogramValue != nil || s.lastFloatHistogramValue != nil {
498498
return false, 0, storage.NewDuplicateHistogramToFloatErr(t, v)
499499
}
500-
if math.Float64bits(s.lastValue) != math.Float64bits(v) && math.Float64bits(v) != value.QuietZeroNaN {
500+
if math.Float64bits(s.lastValue) != math.Float64bits(v) {
501501
return false, 0, storage.NewDuplicateFloatErr(t, s.lastValue, v)
502502
}
503503
// Sample is identical (ts + value) with most current (highest ts) sample in sampleBuf.
@@ -1141,10 +1141,6 @@ func (a *headAppender) commitSamples(acc *appenderCommitContext) {
11411141
handleAppendableError(err, &acc.floatsAppended, &acc.floatOOORejected, &acc.floatOOBRejected, &acc.floatTooOldRejected)
11421142
}
11431143

1144-
if math.Float64bits(s.V) == value.QuietZeroNaN {
1145-
s.V = 0
1146-
}
1147-
11481144
switch {
11491145
case err != nil:
11501146
// Do nothing here.

tsdb/head_test.go

+10-37
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,15 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
547547
})
548548
}
549549

550+
// queryHead is a helper to query the head for a given time range and labelset.
551+
queryHead := func(mint, maxt uint64, label labels.Label) (map[string][]chunks.Sample, error) {
552+
q, err := NewBlockQuerier(head, int64(mint), int64(maxt))
553+
if err != nil {
554+
return nil, err
555+
}
556+
return query(t, q, labels.MustNewMatcher(labels.MatchEqual, label.Name, label.Value)), nil
557+
}
558+
550559
// readerTsCh will be used by the coordinator go routine to coordinate which timestamps the reader should read.
551560
readerTsCh := make(chan uint64)
552561

@@ -574,7 +583,7 @@ func TestHead_HighConcurrencyReadAndWrite(t *testing.T) {
574583
lbls.Range(func(l labels.Label) {
575584
lbl = l
576585
})
577-
samples, err := queryHead(t, head, int64(ts-qryRange), int64(ts), lbl)
586+
samples, err := queryHead(ts-qryRange, ts, lbl)
578587
if err != nil {
579588
return false, err
580589
}
@@ -6089,42 +6098,6 @@ func TestCuttingNewHeadChunks(t *testing.T) {
60896098
}
60906099
}
60916100

6092-
func TestAppendDuplicates(t *testing.T) {
6093-
ts := int64(1695209650)
6094-
lbls := labels.FromStrings("foo", "bar")
6095-
h, _ := newTestHead(t, DefaultBlockDuration, wlog.CompressionNone, false)
6096-
defer func() {
6097-
require.NoError(t, h.Close())
6098-
}()
6099-
6100-
a := h.Appender(context.Background())
6101-
_, err := a.Append(0, lbls, ts, 42.0)
6102-
require.NoError(t, err)
6103-
_, err = a.Append(0, lbls, ts, 42.0) // Exactly the same value.
6104-
require.NoError(t, err)
6105-
_, err = a.Append(0, lbls, ts, math.Float64frombits(value.QuietZeroNaN)) // Should be a no-op.
6106-
require.NoError(t, err)
6107-
require.NoError(t, a.Commit())
6108-
6109-
result, err := queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
6110-
require.NoError(t, err)
6111-
expectedSamples := []chunks.Sample{sample{t: ts, f: 42.0}}
6112-
require.Equal(t, expectedSamples, result[`{foo="bar"}`])
6113-
6114-
a = h.Appender(context.Background())
6115-
_, err = a.Append(0, lbls, ts+10, math.Float64frombits(value.QuietZeroNaN)) // This is at a different timestamp so should append a real zero.
6116-
require.NoError(t, err)
6117-
require.NoError(t, a.Commit())
6118-
6119-
result, err = queryHead(t, h, math.MinInt64, math.MaxInt64, labels.Label{Name: "foo", Value: "bar"})
6120-
require.NoError(t, err)
6121-
expectedSamples = []chunks.Sample{
6122-
sample{t: ts, f: 42.0},
6123-
sample{t: ts + 10, f: 0},
6124-
}
6125-
require.Equal(t, expectedSamples, result[`{foo="bar"}`])
6126-
}
6127-
61286101
// TestHeadDetectsDuplicateSampleAtSizeLimit tests a regression where a duplicate sample
61296102
// is appended to the head, right when the head chunk is at the size limit.
61306103
// The test adds all samples as duplicate, thus expecting that the result has

0 commit comments

Comments
 (0)