Skip to content

Commit aced80d

Browse files
authored
MQE: Remove projections logic from ingesters, store-gateways (#15109)
Projections don't make sense as a way to reduce TCO. Remove the projections fields from ingester and store-gateway request objects and metrics for measuring their effect. Part of #13863
1 parent d226725 commit aced80d

26 files changed

Lines changed: 245 additions & 987 deletions

pkg/distributor/distributor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,7 +1143,7 @@ func TestDistributor_PushQuery(t *testing.T) {
11431143
assert.Nil(t, err)
11441144

11451145
queryMetrics := stats.NewQueryMetrics(reg[0])
1146-
resp, err := ds[0].QueryStream(ctx, queryMetrics, 0, 10, false, nil, tc.matchers...)
1146+
resp, err := ds[0].QueryStream(ctx, queryMetrics, 0, 10, tc.matchers...)
11471147

11481148
if tc.expectedError == nil {
11491149
require.NoError(t, err)
@@ -2589,7 +2589,7 @@ func TestSlowQueries(t *testing.T) {
25892589
})
25902590

25912591
queryMetrics := stats.NewQueryMetrics(reg[0])
2592-
_, err := ds[0].QueryStream(ctx, queryMetrics, 0, 10, false, nil, nameMatcher)
2592+
_, err := ds[0].QueryStream(ctx, queryMetrics, 0, 10, nameMatcher)
25932593
assert.Equal(t, expectedErr, err)
25942594
})
25952595
}

pkg/distributor/query.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, m
8282
}
8383

8484
// QueryStream queries multiple ingesters via the streaming interface and returns a big ol' set of chunks.
85-
func (d *Distributor) QueryStream(ctx context.Context, queryMetrics *stats.QueryMetrics, from, to model.Time, projectionInclude bool, projectionLabels []string, matchers ...*labels.Matcher) (ingester_client.CombinedQueryStreamResponse, error) {
85+
func (d *Distributor) QueryStream(ctx context.Context, queryMetrics *stats.QueryMetrics, from, to model.Time, matchers ...*labels.Matcher) (ingester_client.CombinedQueryStreamResponse, error) {
8686
var result ingester_client.CombinedQueryStreamResponse
8787
err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
88-
req, err := ingester_client.ToQueryRequest(from, to, projectionInclude, projectionLabels, matchers)
88+
req, err := ingester_client.ToQueryRequest(from, to, matchers)
8989
if err != nil {
9090
return err
9191
}

pkg/distributor/query_ingest_storage_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ func TestDistributor_QueryStream_ShouldSupportIngestStorage(t *testing.T) {
523523

524524
// Query ingesters.
525525
queryMetrics := stats.NewQueryMetrics(distributorRegistries[0])
526-
resp, err := distributors[0].QueryStream(ctx, queryMetrics, 0, 10, false, nil, testData.matchers...)
526+
resp, err := distributors[0].QueryStream(ctx, queryMetrics, 0, 10, testData.matchers...)
527527

528528
if testData.expectedErr == nil {
529529
require.NoError(t, err)
@@ -693,7 +693,7 @@ func TestDistributor_QueryStream_InactivePartitionsLookback(t *testing.T) {
693693

694694
// Query ingesters.
695695
queryMetrics := stats.NewQueryMetrics(distributorRegistries[0])
696-
resp, err := d.QueryStream(ctx, queryMetrics, 0, 10, false, nil, selectAllSeriesMatcher)
696+
resp, err := d.QueryStream(ctx, queryMetrics, 0, 10, selectAllSeriesMatcher)
697697

698698
if scenario.expectQueryError {
699699
require.Error(t, err)

pkg/distributor/query_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
237237

238238
// Since the number of series (and thus chunks) is equal to the limit (but doesn't
239239
// exceed it), we expect a query running on all series to succeed.
240-
queryRes, err := ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, false, nil, allSeriesMatchers...)
240+
queryRes, err := ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
241241
require.NoError(t, err)
242242

243243
require.Len(t, queryRes.StreamingSeries, initialSeries)
@@ -265,7 +265,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
265265

266266
// Since the number of series (and thus chunks) exceeds the limit, we expect
267267
// a query running on all series to fail.
268-
_, err = ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, false, nil, allSeriesMatchers...)
268+
_, err = ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
269269
require.Error(t, err)
270270
require.ErrorContains(t, err, testCase.expectedError)
271271

@@ -319,7 +319,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
319319
// exceed it), we expect a query running on all series to succeed.
320320
queryCtx := limiter.AddQueryLimiterToContext(userCtx, limiter.NewQueryLimiter(maxSeriesLimit, 0, 0, 0, stats.NewQueryMetrics(prometheus.NewPedanticRegistry())))
321321
queryCtx = limiter.ContextWithNewUnlimitedMemoryConsumptionTracker(queryCtx)
322-
queryRes, err := ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, false, nil, allSeriesMatchers...)
322+
queryRes, err := ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
323323
require.NoError(t, err)
324324

325325
assert.Len(t, queryRes.StreamingSeries, initialSeries)
@@ -342,7 +342,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
342342

343343
// Since the number of series is exceeding the limit, we expect
344344
// a query running on all series to fail.
345-
_, err = ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, false, nil, allSeriesMatchers...)
345+
_, err = ds[0].QueryStream(queryCtx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
346346
require.Error(t, err)
347347
assert.ErrorContains(t, err, "the query exceeded the maximum number of series")
348348

@@ -384,7 +384,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
384384
assert.Nil(t, err)
385385

386386
queryMetrics := stats.NewQueryMetrics(reg[0])
387-
chunkSizeResponse, err := ds[0].QueryStream(ctx, queryMetrics, math.MinInt32, math.MaxInt32, false, nil, allSeriesMatchers...)
387+
chunkSizeResponse, err := ds[0].QueryStream(ctx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
388388
require.NoError(t, err)
389389

390390
_, responseChunkSize, err := countStreamingChunksAndBytes(chunkSizeResponse)
@@ -404,7 +404,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
404404

405405
// Since the number of chunk bytes is equal to the limit (but doesn't
406406
// exceed it), we expect a query running on all series to succeed.
407-
queryRes, err := ds[0].QueryStream(ctx, queryMetrics, math.MinInt32, math.MaxInt32, false, nil, allSeriesMatchers...)
407+
queryRes, err := ds[0].QueryStream(ctx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
408408
require.NoError(t, err)
409409
assert.Len(t, queryRes.StreamingSeries, seriesToAdd)
410410

@@ -418,7 +418,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
418418
// Since the aggregated chunk size is exceeding the limit, we expect
419419
// a query running on all series to fail but only when the chunks are
420420
// actually consumed from the stream.
421-
finalResp, err := ds[0].QueryStream(ctx, queryMetrics, math.MinInt32, math.MaxInt32, false, nil, allSeriesMatchers...)
421+
finalResp, err := ds[0].QueryStream(ctx, queryMetrics, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
422422
require.NoError(t, err)
423423

424424
_, _, err = countStreamingChunksAndBytes(finalResp)
@@ -491,7 +491,7 @@ func TestDistributor_QueryStream_ShouldSuccessfullyRunOnSlowIngesterWithStreamin
491491
t.Run(fmt.Sprintf("Query #%d", i), func(t *testing.T) {
492492
t.Parallel()
493493

494-
res, err := distributors[0].QueryStream(ctx, queryMetrics, math.MinInt32, math.MaxInt32, false, nil, matchers)
494+
res, err := distributors[0].QueryStream(ctx, queryMetrics, math.MinInt32, math.MaxInt32, matchers)
495495
require.NoError(t, err)
496496
require.Equal(t, numSeries, len(res.StreamingSeries))
497497

pkg/ingester/client/compat.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,16 @@ import (
1616
)
1717

1818
// ToQueryRequest builds a QueryRequest proto.
19-
func ToQueryRequest(from, to model.Time, projectionInclude bool, projectionLabels []string, matchers []*labels.Matcher) (*QueryRequest, error) {
19+
func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*QueryRequest, error) {
2020
ms, err := ToLabelMatchers(matchers)
2121
if err != nil {
2222
return nil, err
2323
}
2424

2525
return &QueryRequest{
26-
StartTimestampMs: int64(from),
27-
EndTimestampMs: int64(to),
28-
Matchers: ms,
29-
ProjectionInclude: projectionInclude,
30-
ProjectionLabels: projectionLabels,
26+
StartTimestampMs: int64(from),
27+
EndTimestampMs: int64(to),
28+
Matchers: ms,
3129
}, nil
3230
}
3331

pkg/ingester/client/compat_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestQueryRequest(t *testing.T) {
4343
}
4444
matchers = append(matchers, matcher4)
4545

46-
req, err := ToQueryRequest(from, to, false, nil, matchers)
46+
req, err := ToQueryRequest(from, to, matchers)
4747
if err != nil {
4848
t.Fatal(err)
4949
}

0 commit comments

Comments
 (0)