4 changes: 3 additions & 1 deletion pkg/distributor/distributor.go
@@ -3267,9 +3267,11 @@ respsLoop:

result := make([]labels.Labels, 0, len(metrics))
for _, m := range metrics {
if err := queryLimiter.AddSeries(m); err != nil {
// ignore duplicated check because metrics map should already be unique
if _, err := queryLimiter.AddSeries(m); err != nil {
return nil, err
}

result = append(result, m)
}
return result, nil
10 changes: 8 additions & 2 deletions pkg/distributor/query.go
@@ -347,11 +347,17 @@ func (r *ingesterQueryResult) receiveResponse(stream ingester_client.Ingester_Qu
for _, s := range resp.StreamingSeries {
l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels)

if err := memoryConsumptionTracker.IncreaseMemoryConsumptionForLabels(l); err != nil {
duplicated, err := queryLimiter.AddSeries(l)
if err != nil {
return nil, false, err
}

if err := queryLimiter.AddSeries(l); err != nil {
if duplicated {
continue
}

// TODO move this inside AddSeries
if err := memoryConsumptionTracker.IncreaseMemoryConsumptionForLabels(l); err != nil {
return nil, false, err
}

16 changes: 11 additions & 5 deletions pkg/querier/blocks_store_queryable.go
@@ -1016,15 +1016,21 @@ func (q *blocksStoreQuerier) receiveMessage(c BlocksStoreClient, stream storegat
for _, s := range ss.Series {
ls := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels)

if err := memoryTracker.IncreaseMemoryConsumptionForLabels(ls); err != nil {
return myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}

// Add series fingerprint to query limiter; will return error if we are over the limit
if limitErr := queryLimiter.AddSeries(ls); limitErr != nil {
duplicated, limitErr := queryLimiter.AddSeries(ls)
if limitErr != nil {
return myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, limitErr
}

if duplicated {
continue
}

// TODO move this inside AddSeries
if err := memoryTracker.IncreaseMemoryConsumptionForLabels(ls); err != nil {
return myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
}

myStreamingSeriesLabels = append(myStreamingSeriesLabels, ls)
Contributor commented on lines +1020 to 1034 with a suggested change:
-   duplicated, limitErr := queryLimiter.AddSeries(ls)
-   if limitErr != nil {
-       return myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, limitErr
-   }
-   if duplicated {
-       continue
-   }
-   // TODO move this inside AddSeries
-   if err := memoryTracker.IncreaseMemoryConsumptionForLabels(ls); err != nil {
-       return myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
-   }
-   myStreamingSeriesLabels = append(myStreamingSeriesLabels, ls)
+   ls, limitErr := queryLimiter.AddSeries(ls)
+   if limitErr != nil {
+       return myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, limitErr
+   }
+   // TODO move this inside AddSeries
+   if err := memoryTracker.IncreaseMemoryConsumptionForLabels(ls); err != nil {
+       return myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err
+   }
+   myStreamingSeriesLabels = append(myStreamingSeriesLabels, ls)

}

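The same "TODO move this inside AddSeries" note appears in both pkg/distributor/query.go and pkg/querier/blocks_store_queryable.go above. As a rough, hypothetical sketch only (not part of this PR), folding the memory accounting into the limiter could look roughly like the helper below; labelsMemoryTracker and addSeriesTracked are invented names, and the tracker is reduced to a narrow interface because its concrete type is not visible in this diff.

// Sketch only, not part of this PR: one possible reading of the
// "TODO move this inside AddSeries" left in both callers above.
// labelsMemoryTracker and addSeriesTracked are hypothetical names.
type labelsMemoryTracker interface {
	IncreaseMemoryConsumptionForLabels(l labels.Labels) error
}

// addSeriesTracked deduplicates and enforces the series limit first, and only
// accounts for memory when the series has not been seen before, mirroring the
// caller pattern introduced in this PR.
func (ql *QueryLimiter) addSeriesTracked(l labels.Labels, tracker labelsMemoryTracker) (bool, error) {
	duplicated, limitErr := ql.AddSeries(l)
	if limitErr != nil {
		return duplicated, limitErr
	}
	if duplicated {
		// Already counted once: skip the memory accounting.
		return true, nil
	}
	return false, tracker.IncreaseMemoryConsumptionForLabels(l)
}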
9 changes: 5 additions & 4 deletions pkg/util/limiter/query_limiter.go
@@ -74,17 +74,18 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter {
}

// AddSeries adds the input series and returns an error if the limit is reached.
func (ql *QueryLimiter) AddSeries(seriesLabels labels.Labels) validation.LimitError {
func (ql *QueryLimiter) AddSeries(seriesLabels labels.Labels) (bool, validation.LimitError) {
// If the max series is unlimited just return without managing map
if ql.maxSeriesPerQuery == 0 {
return nil
return false, nil
}
fingerprint := seriesLabels.Hash()

ql.uniqueSeriesMx.Lock()
defer ql.uniqueSeriesMx.Unlock()

uniqueSeriesBefore := len(ql.uniqueSeries)
_, duplicated := ql.uniqueSeries[fingerprint]
Contributor suggested change:
-   _, duplicated := ql.uniqueSeries[fingerprint]
+   seriesLabels, duplicated := ql.uniqueSeries[fingerprint]

Contributor: One other thing we should do here: we'll need to handle hash collisions.

ql.uniqueSeries[fingerprint] = struct{}{}
uniqueSeriesAfter := len(ql.uniqueSeries)

@@ -94,9 +95,9 @@ func (ql *QueryLimiter) AddSeries(seriesLabels labels.Labels) validation.LimitEr
ql.queryMetrics.QueriesRejectedTotal.WithLabelValues(stats.RejectReasonMaxSeries).Inc()
}

return NewMaxSeriesHitLimitError(uint64(ql.maxSeriesPerQuery))
return duplicated, NewMaxSeriesHitLimitError(uint64(ql.maxSeriesPerQuery))
Contributor suggested change:
-   return duplicated, NewMaxSeriesHitLimitError(uint64(ql.maxSeriesPerQuery))
+   return seriesLabels, NewMaxSeriesHitLimitError(uint64(ql.maxSeriesPerQuery))

}
return nil
return duplicated, nil
Contributor suggested change:
-   return duplicated, nil
+   return seriesLabels, nil

}

// uniqueSeriesCount returns the count of unique series seen by this query limiter.
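Taken together, the reviewer suggestions above (return the stored labels from the map rather than a bool, and handle hash collisions) imply changing what the uniqueSeries map holds. Below is a minimal, standalone sketch of that direction, not part of this PR; the type and helper names are invented, and only the Prometheus labels package API is assumed.

package limiter

import (
	"sync"

	"github.com/prometheus/prometheus/model/labels"
)

// dedupingSeriesSet is a sketch of a fingerprint-keyed set that keeps the
// canonical labels so they can be handed back to callers, and that detects
// hash collisions by comparing label sets that share a fingerprint.
type dedupingSeriesSet struct {
	mx     sync.Mutex
	series map[uint64][]labels.Labels // fingerprint -> distinct label sets with that hash
}

func newDedupingSeriesSet() *dedupingSeriesSet {
	return &dedupingSeriesSet{series: map[uint64][]labels.Labels{}}
}

// add returns the canonical (stored) labels for l and whether l was already present.
func (s *dedupingSeriesSet) add(l labels.Labels) (labels.Labels, bool) {
	s.mx.Lock()
	defer s.mx.Unlock()

	fingerprint := l.Hash()
	for _, existing := range s.series[fingerprint] {
		if labels.Equal(existing, l) {
			return existing, true // duplicate: reuse the stored copy
		}
	}
	// New series, or a hash collision with different labels: remember this copy too.
	s.series[fingerprint] = append(s.series[fingerprint], l)
	return l, false
}

A caller could then replace its local labels with the returned canonical copy (as in the suggested change for blocks_store_queryable.go above) and drop the separate duplicated check.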
142 changes: 134 additions & 8 deletions pkg/util/limiter/query_limiter_test.go
@@ -37,16 +37,19 @@ func TestQueryLimiter_AddSeries_ShouldReturnNoErrorOnLimitNotExceeded(t *testing
reg = prometheus.NewPedanticRegistry()
limiter = NewQueryLimiter(100, 0, 0, 0, stats.NewQueryMetrics(reg))
)
err := limiter.AddSeries(series1)
duplicated, err := limiter.AddSeries(series1)
assert.NoError(t, err)
err = limiter.AddSeries(series2)
assert.False(t, duplicated)
duplicated, err = limiter.AddSeries(series2)
assert.NoError(t, err)
assert.False(t, duplicated)
assert.Equal(t, 2, limiter.uniqueSeriesCount())
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)

// Re-add previous series to make sure it's not double counted
err = limiter.AddSeries(series1)
duplicated, err = limiter.AddSeries(series1)
assert.NoError(t, err)
assert.True(t, duplicated)
assert.Equal(t, 2, limiter.uniqueSeriesCount())
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)
}
@@ -72,22 +75,26 @@ func TestQueryLimiter_AddSeries_ShouldReturnErrorOnLimitExceeded(t *testing.T) {
reg = prometheus.NewPedanticRegistry()
limiter = NewQueryLimiter(1, 0, 0, 0, stats.NewQueryMetrics(reg))
)
err := limiter.AddSeries(series1)
duplicated, err := limiter.AddSeries(series1)
require.NoError(t, err)
require.False(t, duplicated)
assertRejectedQueriesMetricValue(t, reg, 0, 0, 0, 0)

err = limiter.AddSeries(series2)
duplicated, err = limiter.AddSeries(series2)
require.Error(t, err)
require.False(t, duplicated)
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)

// Add the same series again and ensure that we don't increment the failed queries metric again.
err = limiter.AddSeries(series2)
duplicated, err = limiter.AddSeries(series2)
require.Error(t, err)
require.True(t, duplicated)
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)

// Add another series and ensure that we don't increment the failed queries metric again.
err = limiter.AddSeries(series3)
duplicated, err = limiter.AddSeries(series3)
require.Error(t, err)
require.False(t, duplicated)
assertRejectedQueriesMetricValue(t, reg, 1, 0, 0, 0)
}

@@ -188,11 +195,130 @@ func BenchmarkQueryLimiter_AddSeries(b *testing.B) {
reg := prometheus.NewPedanticRegistry()
limiter := NewQueryLimiter(b.N+1, 0, 0, 0, stats.NewQueryMetrics(reg))
for _, s := range series {
err := limiter.AddSeries(s)
_, err := limiter.AddSeries(s)
assert.NoError(b, err)
}
}

// BenchmarkQueryLimiter_AddSeries_WithCallerDedup benchmarks caller-side deduplication with 50% duplicates
func BenchmarkQueryLimiter_AddSeries_WithCallerDedup_50pct(b *testing.B) {
const (
metricName = "test_metric"
uniqueSeries = 500
totalSeries = 1000 // 50% duplicates
)

// Create unique series
uniqueSet := make([]labels.Labels, 0, uniqueSeries)
for i := 0; i < uniqueSeries; i++ {
uniqueSet = append(uniqueSet, labels.FromMap(map[string]string{
model.MetricNameLabel: metricName,
"series": fmt.Sprint(i),
}))
}

// Create series array with duplicates
series := make([]labels.Labels, 0, totalSeries)
for i := 0; i < totalSeries; i++ {
series = append(series, uniqueSet[i%uniqueSeries])
}

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
reg := prometheus.NewPedanticRegistry()
limiter := NewQueryLimiter(totalSeries, 0, 0, 0, stats.NewQueryMetrics(reg))

// Simulate caller behavior: skip duplicates
result := make([]labels.Labels, 0, totalSeries)
for _, s := range series {
duplicated, _ := limiter.AddSeries(s)
if duplicated {
continue
}
result = append(result, s)
}
}
}

// BenchmarkQueryLimiter_AddSeries_WithCallerDedup_NoDuplicates benchmarks with all unique series
func BenchmarkQueryLimiter_AddSeries_WithCallerDedup_NoDuplicates(b *testing.B) {
const (
metricName = "test_metric"
totalSeries = 1000
)

// Create all unique series
series := make([]labels.Labels, 0, totalSeries)
for i := 0; i < totalSeries; i++ {
series = append(series, labels.FromMap(map[string]string{
model.MetricNameLabel: metricName,
"series": fmt.Sprint(i),
}))
}

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
reg := prometheus.NewPedanticRegistry()
limiter := NewQueryLimiter(totalSeries*2, 0, 0, 0, stats.NewQueryMetrics(reg))

// Simulate caller behavior: skip duplicates
result := make([]labels.Labels, 0, totalSeries)
for _, s := range series {
duplicated, _ := limiter.AddSeries(s)
if duplicated {
continue
}
result = append(result, s)
}
}
}

// BenchmarkQueryLimiter_AddSeries_WithCallerDedup_90pct benchmarks with 90% duplicates
func BenchmarkQueryLimiter_AddSeries_WithCallerDedup_90pct(b *testing.B) {
const (
metricName = "test_metric"
uniqueSeries = 100
totalSeries = 1000 // 90% duplicates
)

// Create few unique series
uniqueSet := make([]labels.Labels, 0, uniqueSeries)
for i := 0; i < uniqueSeries; i++ {
uniqueSet = append(uniqueSet, labels.FromMap(map[string]string{
model.MetricNameLabel: metricName,
"series": fmt.Sprint(i),
}))
}

// Build series with many duplicates
series := make([]labels.Labels, 0, totalSeries)
for i := 0; i < totalSeries; i++ {
series = append(series, uniqueSet[i%uniqueSeries])
}

b.ResetTimer()
b.ReportAllocs()

for i := 0; i < b.N; i++ {
reg := prometheus.NewPedanticRegistry()
limiter := NewQueryLimiter(totalSeries, 0, 0, 0, stats.NewQueryMetrics(reg))

// Simulate caller behavior: skip duplicates
result := make([]labels.Labels, 0, totalSeries)
for _, s := range series {
duplicated, _ := limiter.AddSeries(s)
if duplicated {
continue
}
result = append(result, s)
}
}
}

func assertRejectedQueriesMetricValue(t *testing.T, c prometheus.Collector, expectedMaxSeries, expectedMaxChunkBytes, expectedMaxChunks, expectedMaxEstimatedChunks int) {
expected := fmt.Sprintf(`
# HELP cortex_querier_queries_rejected_total Number of queries that were rejected, for example because they exceeded a limit.
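To compare the caller-side dedup cost across the three duplicate ratios, the new benchmarks can be run with the standard Go tooling, for example (the exact flags are a suggestion, not part of this PR):

go test -run '^$' -bench 'BenchmarkQueryLimiter_AddSeries' -benchmem ./pkg/util/limiter/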