Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
* [ENHANCEMENT] OTLP: Add experimental metric `cortex_distributor_otlp_array_lengths` to better understand the layout of OTLP packets in practice. #13525
* [ENHANCEMENT] Ruler: gRPC errors without details are classified as `operator` errors, and rule evaluation failures (such as duplicate labelsets) are classified as `user` errors. #13586
* [ENHANCEMENT] Server: The `/metrics` endpoint now supports metrics filtering by providing one or more `name[]` query parameters. #13746
* [ENHANCEMENT] Distributor: Improved the performance of configuration retrieval in the validation middleware. #13807
* [ENHANCEMENT] Ingester: Make sharded active-series requests matching all series faster. #13491
* [BUGFIX] Compactor: Fix potential concurrent map writes. #13053
* [BUGFIX] Query-frontend: Fix issue where queries sometimes fail with `failed to receive query result stream message: rpc error: code = Canceled desc = context canceled` if remote execution is enabled. #13084
Expand Down
25 changes: 13 additions & 12 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ func (d *Distributor) checkSample(ctx context.Context, userID, cluster, replica
// Returns an error explaining the first validation finding.
// May alter timeseries data in-place.
// The returned error MUST NOT retain label strings - they point into a gRPC buffer which is re-used.
func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string, cfg sampleValidationConfig) error {
if len(ts.Samples) == 0 {
return nil
}
Expand All @@ -942,7 +942,7 @@ func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimese
if len(ts.Samples) == 1 {
delta := now - model.Time(ts.Samples[0].TimestampMs)
d.sampleDelay.WithLabelValues(userID).Observe(float64(delta) / 1000)
return validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, ts.Samples[0], cat)
return validateSample(d.sampleValidationMetrics, now, cfg, userID, group, ts.Labels, ts.Samples[0], cat)
}

timestamps := make(map[int64]struct{}, min(len(ts.Samples), 100))
Expand All @@ -960,7 +960,7 @@ func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimese
delta := now - model.Time(s.TimestampMs)
d.sampleDelay.WithLabelValues(userID).Observe(float64(delta) / 1000)

if err := validateSample(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, s, cat); err != nil {
if err := validateSample(d.sampleValidationMetrics, now, cfg, userID, group, ts.Labels, s, cat); err != nil {
return err
}

Expand All @@ -980,7 +980,7 @@ func (d *Distributor) validateSamples(now model.Time, ts *mimirpb.PreallocTimese
// Returns an error explaining the first validation finding.
// May alter timeseries data in-place.
// The returned error MUST NOT retain label strings - they point into a gRPC buffer which is re-used.
func (d *Distributor) validateHistograms(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string) error {
func (d *Distributor) validateHistograms(now model.Time, ts *mimirpb.PreallocTimeseries, userID, group string, cfg sampleValidationConfig) error {
if len(ts.Histograms) == 0 {
return nil
}
Expand All @@ -990,7 +990,7 @@ func (d *Distributor) validateHistograms(now model.Time, ts *mimirpb.PreallocTim
delta := now - model.Time(ts.Histograms[0].Timestamp)
d.sampleDelay.WithLabelValues(userID).Observe(float64(delta) / 1000)

updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[0], cat)
updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, cfg, userID, group, ts.Labels, &ts.Histograms[0], cat)
if err != nil {
return err
}
Expand All @@ -1015,7 +1015,7 @@ func (d *Distributor) validateHistograms(now model.Time, ts *mimirpb.PreallocTim
delta := now - model.Time(ts.Histograms[idx].Timestamp)
d.sampleDelay.WithLabelValues(userID).Observe(float64(delta) / 1000)

updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, d.limits, userID, group, ts.Labels, &ts.Histograms[idx], cat)
updated, err := validateSampleHistogram(d.sampleValidationMetrics, now, cfg, userID, group, ts.Labels, &ts.Histograms[idx], cat)
if err != nil {
return err
}
Expand Down Expand Up @@ -1078,21 +1078,21 @@ func (d *Distributor) validateExemplars(ts *mimirpb.PreallocTimeseries, userID s
// Returns an error explaining the first validation finding. Non-nil error means the timeseries should be removed from the request.
// The returned error MUST NOT retain label strings - they point into a gRPC buffer which is re-used.
// It uses the passed nowt time to observe the delay of sample timestamps.
func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeseries, userID, group string, skipLabelValidation, skipLabelCountValidation bool, minExemplarTS, maxExemplarTS int64, valueTooLongSummaries *labelValueTooLongSummaries) error {
func (d *Distributor) validateSeries(nowt time.Time, ts *mimirpb.PreallocTimeseries, userID, group string, cfg validationConfig, skipLabelValidation, skipLabelCountValidation bool, minExemplarTS, maxExemplarTS int64, valueTooLongSummaries *labelValueTooLongSummaries) error {
cat := d.costAttributionMgr.SampleTracker(userID)

if err := validateLabels(d.sampleValidationMetrics, d.limits, userID, group, ts.Labels, skipLabelValidation, skipLabelCountValidation, cat, nowt, valueTooLongSummaries); err != nil {
if err := validateLabels(d.sampleValidationMetrics, cfg.labels, userID, group, ts.Labels, skipLabelValidation, skipLabelCountValidation, cat, nowt, valueTooLongSummaries); err != nil {
return err
}

now := model.TimeFromUnixNano(nowt.UnixNano())
totalSamplesAndHistograms := len(ts.Samples) + len(ts.Histograms)

if err := d.validateSamples(now, ts, userID, group); err != nil {
if err := d.validateSamples(now, ts, userID, group, cfg.samples); err != nil {
return err
}

if err := d.validateHistograms(now, ts, userID, group); err != nil {
if err := d.validateHistograms(now, ts, userID, group, cfg.samples); err != nil {
return err
}

Expand Down Expand Up @@ -1328,6 +1328,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
d.activeUsers.UpdateUserTimestamp(userID, now)

pushReq.group = d.activeGroups.UpdateActiveGroupTimestamp(userID, validation.GroupLabel(d.limits, userID, req.Timeseries), now)
cfg := newValidationConfig(userID, d.limits)

// A WriteRequest can only contain series or metadata but not both. This might change in the future.
validatedMetadata := 0
Expand Down Expand Up @@ -1393,7 +1394,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
// Note that validateSeries may drop some data in ts.
rawSamples := len(ts.Samples)
rawHistograms := len(ts.Histograms)
validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, pushReq.group, skipLabelValidation, skipLabelCountValidation, minExemplarTS, maxExemplarTS, &valueTooLongSummaries)
validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, pushReq.group, cfg, skipLabelValidation, skipLabelCountValidation, minExemplarTS, maxExemplarTS, &valueTooLongSummaries)

if countDroppedNativeHistograms {
droppedNativeHistograms += len(ts.Histograms)
Expand Down Expand Up @@ -1467,7 +1468,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
}

for mIdx, m := range req.Metadata {
if validationErr := cleanAndValidateMetadata(d.metadataValidationMetrics, d.limits, userID, m); validationErr != nil {
if validationErr := cleanAndValidateMetadata(d.metadataValidationMetrics, cfg.metadata, userID, m); validationErr != nil {
if firstPartialErr == nil {
// The series are never retained by validationErr. This is guaranteed by the way the latter is built.
firstPartialErr = newValidationError(validationErr)
Expand Down
12 changes: 8 additions & 4 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1652,9 +1652,10 @@ func TestDistributor_ValidateSeries(t *testing.T) {
require.Len(t, ds, 1)
require.Len(t, regs, 1)

cfg := newValidationConfig("user", ds[0].limits)
now := mtime.Now()
for _, ts := range tc.req.Timeseries {
err := ds[0].validateSeries(now, &ts, "user", "test-group", true, true, 0, 0, nil)
err := ds[0].validateSeries(now, &ts, "user", "test-group", cfg, true, true, 0, 0, nil)
require.NoError(t, err)
}

Expand Down Expand Up @@ -1826,11 +1827,12 @@ func BenchmarkDistributor_SampleDuplicateTimestamp(b *testing.B) {
for name, tc := range testCases {
b.Run(name, func(b *testing.B) {
timeseries := tc.setup(b.N)
cfg := newValidationConfig("user", ds[0].limits)
b.ReportAllocs()
b.ResetTimer()
for n := 0; n < b.N; n++ {
for _, ts := range timeseries[n] {
err := ds[0].validateSeries(now, &ts, "user", "test-group", true, true, 0, 0, nil)
err := ds[0].validateSeries(now, &ts, "user", "test-group", cfg, true, true, 0, 0, nil)
if err != nil {
b.Fatal(err)
}
Expand Down Expand Up @@ -2072,8 +2074,9 @@ func TestDistributor_ExemplarValidation(t *testing.T) {
require.Len(t, ds, 1)
require.Len(t, regs, 1)

cfg := newValidationConfig("user", ds[0].limits)
for _, ts := range tc.req.Timeseries {
err := ds[0].validateSeries(now, &ts, "user", "test-group", false, false, tc.minExemplarTS, tc.maxExemplarTS, nil)
err := ds[0].validateSeries(now, &ts, "user", "test-group", cfg, false, false, tc.minExemplarTS, tc.maxExemplarTS, nil)
assert.NoError(t, err)
}

Expand Down Expand Up @@ -2179,8 +2182,9 @@ func TestDistributor_HistogramReduction(t *testing.T) {
require.Len(t, ds, 1)
require.Len(t, regs, 1)

cfg := newValidationConfig("user", ds[0].limits)
for _, ts := range tc.req.Timeseries {
err := ds[0].validateSeries(now, &ts, "user", "test-group", false, false, 0, 0, nil)
err := ds[0].validateSeries(now, &ts, "user", "test-group", cfg, false, false, 0, 0, nil)
if tc.expectedError != nil {
require.ErrorAs(t, err, &tc.expectedError)
} else {
Expand Down
Loading
Loading