diff --git a/CHANGELOG.md b/CHANGELOG.md index 8eb1bce347e..79a175acfb2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -187,6 +187,7 @@ * [ENHANCEMENT] Distributor: OTLP endpoint now returns partial success (HTTP 200) instead of HTTP 429 when the usage tracker rejects some series due to the active series limit but other series are successfully ingested. The `RejectedDataPoints` field reports the count of distributor-side rejections (usage tracker filtering). #14789 * [ENHANCEMENT] MQE: Account for memory consumption of labels returned by binary operations in query memory consumption estimate earlier. #15033 * [ENHANCEMENT] Query-frontend: Log the number of series and samples returned for queries in `query stats` log lines. #15044 +* [ENHANCEMENT] Querier and query-frontend: When remote execution is enabled, send series metadata in batches, rather than in a single large message. The batch size can be configured with `-query-frontend.remote-execution-series-metadata-batch-size`. #15047 * [ENHANCEMENT] Ingest storage: Update the default configuration to enable ingest storage concurrency: #15072 * `-ingest-storage.kafka.fetch-concurrency-max` from `0` to `12` * `-ingest-storage.kafka.ingestion-concurrency-max` from `0` to `8` diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index 5b0d12403ee..e185f209125 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -8263,6 +8263,17 @@ "fieldType": "int", "fieldCategory": "experimental" }, + { + "kind": "field", + "name": "remote_execution_series_metadata_batch_size", + "required": false, + "desc": "Maximum number of series metadata entries to send in a single remote execution response from a querier.", + "fieldValue": null, + "fieldDefaultValue": 128, + "fieldFlag": "query-frontend.remote-execution-series-metadata-batch-size", + "fieldType": "int", + "fieldCategory": "experimental" + }, { "kind": "field", "name": "split_queries_by_interval", diff --git 
a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 2e675d98db3..e16edafc37b 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -2591,6 +2591,8 @@ Usage of ./cmd/mimir/mimir: False to disable query statistics tracking. When enabled, a message with some statistics is logged for every query. (default true) -query-frontend.remote-execution-batch-size uint [experimental] Maximum number of series to send in a single remote execution response from a querier. (default 128) + -query-frontend.remote-execution-series-metadata-batch-size uint + [experimental] Maximum number of series metadata entries to send in a single remote execution response from a querier. (default 128) -query-frontend.results-cache-ttl duration Time to live duration for cached query results. If query falls into out-of-order time window, -query-frontend.results-cache-ttl-for-out-of-order-time-window is used instead. (default 1w) -query-frontend.results-cache-ttl-for-cardinality-query duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index b2d6d6f4805..320774381da 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -2175,6 +2175,11 @@ The `frontend` block configures the query-frontend. # CLI flag: -query-frontend.remote-execution-batch-size [remote_execution_batch_size: | default = 128] +# (experimental) Maximum number of series metadata entries to send in a single +# remote execution response from a querier. +# CLI flag: -query-frontend.remote-execution-series-metadata-batch-size +[remote_execution_series_metadata_batch_size: | default = 128] + # (advanced) Split range queries by an interval and execute in parallel. You # should use a multiple of 24 hours to optimize querying blocks. 0 to disable # it. 
diff --git a/operations/mimir/mimir-flags-defaults.json b/operations/mimir/mimir-flags-defaults.json index 25683241875..51322d1e258 100644 --- a/operations/mimir/mimir-flags-defaults.json +++ b/operations/mimir/mimir-flags-defaults.json @@ -568,6 +568,7 @@ "query-frontend.instance-addr": "", "query-frontend.instance-port": 0, "query-frontend.remote-execution-batch-size": 128, + "query-frontend.remote-execution-series-metadata-batch-size": 128, "query-frontend.split-queries-by-interval": 86400000000000, "query-frontend.results-cache.backend": "", "query-frontend.results-cache.memcached.addresses": "", diff --git a/pkg/frontend/v2/frontend.go b/pkg/frontend/v2/frontend.go index f8020cecac0..7a0c4b92c6b 100644 --- a/pkg/frontend/v2/frontend.go +++ b/pkg/frontend/v2/frontend.go @@ -74,7 +74,8 @@ type Config struct { Addr string `yaml:"address" category:"advanced"` Port int `category:"advanced"` - RemoteExecutionBatchSize uint64 `yaml:"remote_execution_batch_size" category:"experimental"` + RemoteExecutionBatchSize uint64 `yaml:"remote_execution_batch_size" category:"experimental"` + RemoteExecutionSeriesMetadataBatchSize uint64 `yaml:"remote_execution_series_metadata_batch_size" category:"experimental"` // These configuration options are injected internally. 
QuerySchedulerDiscovery schedulerdiscovery.Config `yaml:"-"` @@ -94,6 +95,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) { f.IntVar(&cfg.Port, "query-frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).") f.Uint64Var(&cfg.RemoteExecutionBatchSize, "query-frontend.remote-execution-batch-size", 128, "Maximum number of series to send in a single remote execution response from a querier.") + f.Uint64Var(&cfg.RemoteExecutionSeriesMetadataBatchSize, "query-frontend.remote-execution-series-metadata-batch-size", 128, "Maximum number of series metadata entries to send in a single remote execution response from a querier.") cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name} cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-frontend.grpc-client-config", f) @@ -108,6 +110,10 @@ func (cfg *Config) Validate() error { return fmt.Errorf("remote execution batch size must be greater than 0") } + if cfg.RemoteExecutionSeriesMetadataBatchSize <= 0 { + return fmt.Errorf("remote execution series metadata batch size must be greater than 0") + } + return cfg.GRPCClientConfig.Validate() } diff --git a/pkg/frontend/v2/remoteexec.go b/pkg/frontend/v2/remoteexec.go index e4f5d65f23a..bcccc6f4ae9 100644 --- a/pkg/frontend/v2/remoteexec.go +++ b/pkg/frontend/v2/remoteexec.go @@ -161,9 +161,10 @@ func (g *RemoteExecutionGroupEvaluator) sendRequest(ctx context.Context) error { } req := &querierpb.EvaluateQueryRequest{ - Plan: *encodedPlan, - Nodes: make([]querierpb.EvaluationNode, 0, len(nodeIndices)), - BatchSize: g.cfg.RemoteExecutionBatchSize, + Plan: *encodedPlan, + Nodes: make([]querierpb.EvaluationNode, 0, len(nodeIndices)), + BatchSize: g.cfg.RemoteExecutionBatchSize, + SeriesMetadataBatchSize: g.cfg.RemoteExecutionSeriesMetadataBatchSize, } overallQueriedTimeRange := planning.NoDataQueried() @@ -706,38 +707,58 @@ func (r *rangeVectorExecutionResponse) Close() { } func readSeriesMetadata(ctx 
context.Context, group *RemoteExecutionGroupEvaluator, nodeStreamIndex remoteExecutionNodeStreamIndex, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) { - resp, releaseMessage, err := group.getNextMessageForStream(ctx, nodeStreamIndex) - if err != nil { - return nil, err - } + var combinedMetadata []types.SeriesMetadata - // The labels in the message contain references to the underlying buffer, so we can't release it immediately. - // The value returned by FromLabelAdaptersToLabels does not retain a reference to the underlying buffer, - // so we can release the buffer once that method returns. - defer releaseMessage() + readOne := func() (int64, error) { + resp, releaseMessage, err := group.getNextMessageForStream(ctx, nodeStreamIndex) + if err != nil { + return -1, err + } - seriesMetadata := resp.GetSeriesMetadata() - if seriesMetadata == nil { - return nil, fmt.Errorf("expected SeriesMetadata, got %T", resp.Message) - } + // The labels in the message contain references to the underlying buffer, so we can't release it immediately. + // The value returned by FromLabelAdaptersToLabels does not retain a reference to the underlying buffer, + // so we can release the buffer once that method returns. + msg := resp.GetSeriesMetadata() + defer releaseMessage() - mqeSeries, err := types.SeriesMetadataSlicePool.Get(len(seriesMetadata.Series), memoryConsumptionTracker) - if err != nil { - return nil, err - } + if msg == nil { + return -1, fmt.Errorf("expected SeriesMetadata, got %T", resp.Message) + } - for _, s := range seriesMetadata.Series { - m := types.SeriesMetadata{ - Labels: mimirpb.FromLabelAdaptersToLabels(s.Labels), - DropName: s.DropName, + if combinedMetadata == nil { + // First message, allocate the slice now. 
+ combinedMetadata, err = types.SeriesMetadataSlicePool.Get(max(len(msg.Series), int(msg.TotalSeriesCountForNode)), memoryConsumptionTracker) + if err != nil { + return -1, err + } + } else if len(combinedMetadata)+len(msg.Series) > cap(combinedMetadata) { + return -1, fmt.Errorf("expected %d series metadata, but got at least %d", cap(combinedMetadata), len(combinedMetadata)+len(msg.Series)) } - mqeSeries, err = types.AppendSeriesMetadata(memoryConsumptionTracker, mqeSeries, m) - if err != nil { - return nil, err + + for _, s := range msg.Series { + m := types.SeriesMetadata{ + Labels: mimirpb.FromLabelAdaptersToLabels(s.Labels), + DropName: s.DropName, + } + combinedMetadata, err = types.AppendSeriesMetadata(memoryConsumptionTracker, combinedMetadata, m) + if err != nil { + return -1, err + } } + + return msg.TotalSeriesCountForNode, nil } - return mqeSeries, nil + for { + if totalSeriesCount, err := readOne(); err != nil { + return nil, err + } else if int64(len(combinedMetadata)) >= totalSeriesCount { + // Either the querier doesn't support batching (TotalSeriesCountForNode == 0) and so there's only one message, + // or we've received the last message. + // We're done. 
+ return combinedMetadata, nil + } + } } func decodeEvaluationCompletedMessage(msg *querierpb.EvaluateQueryResponseEvaluationCompleted) (*annotations.Annotations, stats.Stats) { diff --git a/pkg/frontend/v2/remoteexec_test.go b/pkg/frontend/v2/remoteexec_test.go index 6997bf382c8..e204e659e53 100644 --- a/pkg/frontend/v2/remoteexec_test.go +++ b/pkg/frontend/v2/remoteexec_test.go @@ -243,6 +243,79 @@ func TestInstantVectorExecutionResponse_Batching(t *testing.T) { require.Zerof(t, memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes(), "buffers should be released when closing response, have: %v", memoryConsumptionTracker.DescribeCurrentMemoryConsumption()) } +func TestInstantVectorExecutionResponse_BatchedSeriesMetadata(t *testing.T) { + ctx := context.Background() + memoryConsumptionTracker := limiter.NewUnlimitedMemoryConsumptionTracker(ctx) + + stream := &mockResponseStream{ + responses: []mockResponse{ + {msg: newSeriesMetadataBatch(0, false, 4, labels.FromStrings("series", "1"))}, + {msg: newSeriesMetadataBatch(0, false, 4, labels.FromStrings("series", "2"), labels.FromStrings("series", "3"))}, + {msg: newSeriesMetadataBatch(0, false, 4, labels.FromStrings("series", "4"))}, + {msg: newInstantVectorSeriesData(0, generateFPoints(1000, 1, 0), nil)}, + {msg: newInstantVectorSeriesData(0, generateFPoints(1000, 1, 1), nil)}, + {msg: newInstantVectorSeriesData(0, generateFPoints(1000, 1, 2), nil)}, + {msg: newInstantVectorSeriesData(0, generateFPoints(1000, 1, 3), nil)}, + }, + } + + frontend := &mockFrontend{stream: stream} + group := NewRemoteExecutionGroupEvaluator(frontend, Config{}, true, &planning.QueryParameters{}, memoryConsumptionTracker) + response, err := group.CreateInstantVectorExecution(ctx, createDummyNode(), types.NewInstantQueryTimeRange(time.Now())) + require.NoError(t, err) + + require.NoError(t, response.Start(ctx)) + series, err := response.GetSeriesMetadata(ctx) + require.NoError(t, err) + + expectedSeries := []types.SeriesMetadata{ + 
{Labels: labels.FromStrings("series", "1")}, + {Labels: labels.FromStrings("series", "2")}, + {Labels: labels.FromStrings("series", "3")}, + {Labels: labels.FromStrings("series", "4")}, + } + require.Equal(t, expectedSeries, series) + types.SeriesMetadataSlicePool.Put(&series, memoryConsumptionTracker) + + response.Close() + require.True(t, stream.closed.Load()) + require.Zerof(t, memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes(), "buffers should be released when closing response, have: %v", memoryConsumptionTracker.DescribeCurrentMemoryConsumption()) +} + +func TestRangeVectorExecutionResponse_BatchedSeriesMetadata(t *testing.T) { + ctx := context.Background() + memoryConsumptionTracker := limiter.NewUnlimitedMemoryConsumptionTracker(ctx) + + stream := &mockResponseStream{ + responses: []mockResponse{ + {msg: newSeriesMetadataBatch(0, false, 2, labels.FromStrings("series", "1"))}, + {msg: newSeriesMetadataBatch(0, false, 2, labels.FromStrings("series", "2"))}, + {msg: newRangeVectorStepData(0, 0, -10_000, 0, generateFPoints(1000, 1, 0), nil)}, + {msg: newRangeVectorStepData(1, 0, -10_000, 0, generateFPoints(1000, 1, 1), nil)}, + }, + } + + frontend := &mockFrontend{stream: stream} + group := NewRemoteExecutionGroupEvaluator(frontend, Config{}, true, &planning.QueryParameters{}, memoryConsumptionTracker) + response, err := group.CreateRangeVectorExecution(ctx, createDummyNode(), types.NewInstantQueryTimeRange(time.Now())) + require.NoError(t, err) + + require.NoError(t, response.Start(ctx)) + series, err := response.GetSeriesMetadata(ctx) + require.NoError(t, err) + + expectedSeries := []types.SeriesMetadata{ + {Labels: labels.FromStrings("series", "1")}, + {Labels: labels.FromStrings("series", "2")}, + } + require.Equal(t, expectedSeries, series) + types.SeriesMetadataSlicePool.Put(&series, memoryConsumptionTracker) + + response.Close() + require.True(t, stream.closed.Load()) + require.Zerof(t, 
memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes(), "buffers should be released when closing response, have: %v", memoryConsumptionTracker.DescribeCurrentMemoryConsumption()) +} + func TestInstantVectorExecutionResponse_DelayedNameRemoval(t *testing.T) { ctx := context.Background() memoryConsumptionTracker := limiter.NewUnlimitedMemoryConsumptionTracker(ctx) @@ -939,6 +1012,10 @@ func newScalarValue(samples ...mimirpb.Sample) *frontendv2pb.QueryResultStreamRe } func newSeriesMetadata(nodeIndex int64, dropName bool, series ...labels.Labels) *frontendv2pb.QueryResultStreamRequest { + return newSeriesMetadataBatch(nodeIndex, dropName, 0, series...) +} + +func newSeriesMetadataBatch(nodeIndex int64, dropName bool, totalSeriesCount int64, series ...labels.Labels) *frontendv2pb.QueryResultStreamRequest { protoSeries := make([]querierpb.SeriesMetadata, 0, len(series)) for _, series := range series { protoSeries = append(protoSeries, querierpb.SeriesMetadata{ @@ -952,8 +1029,9 @@ func newSeriesMetadata(nodeIndex int64, dropName bool, series ...labels.Labels) EvaluateQueryResponse: &querierpb.EvaluateQueryResponse{ Message: &querierpb.EvaluateQueryResponse_SeriesMetadata{ SeriesMetadata: &querierpb.EvaluateQueryResponseSeriesMetadata{ - NodeIndex: nodeIndex, - Series: protoSeries, + NodeIndex: nodeIndex, + Series: protoSeries, + TotalSeriesCountForNode: totalSeriesCount, }, }, }, diff --git a/pkg/querier/dispatcher.go b/pkg/querier/dispatcher.go index 99ada0f7cee..66068f4bc42 100644 --- a/pkg/querier/dispatcher.go +++ b/pkg/querier/dispatcher.go @@ -185,6 +185,7 @@ func (d *Dispatcher) evaluateQuery(ctx context.Context, body []byte, resp *query timeNow: d.timeNow, originalExpression: req.Plan.OriginalExpression, batchSize: req.BatchSize, + seriesMetadataBatchSize: req.SeriesMetadataBatchSize, instantVectorSeriesDataBatches: make(map[planning.Node]*instantVectorSeriesDataBatch, instantVectorNodeCount), } @@ -332,11 +333,12 @@ func (w *queryResponseWriter) 
write(ctx context.Context, resp *frontendv2pb.Quer } type evaluationObserver struct { - w *queryResponseWriter - nodeIndices map[planning.Node]int64 - batchSize uint64 - startTime time.Time - timeNow func() time.Time + w *queryResponseWriter + nodeIndices map[planning.Node]int64 + batchSize uint64 + seriesMetadataBatchSize uint64 + startTime time.Time + timeNow func() time.Time instantVectorSeriesDataBatches map[planning.Node]*instantVectorSeriesDataBatch @@ -364,23 +366,43 @@ func (o *evaluationObserver) SeriesMetadataEvaluated(ctx context.Context, evalua return err } - protoSeries := make([]querierpb.SeriesMetadata, 0, len(series)) - - for _, s := range series { - protoSeries = append(protoSeries, querierpb.SeriesMetadata{ - Labels: mimirpb.FromLabelsToLabelAdapters(s.Labels), - DropName: s.DropName, - }) + batchSize := int(o.seriesMetadataBatchSize) + if batchSize == 0 { + // Frontend doesn't support batching metadata, so send everything in one batch. + batchSize = len(series) } - return o.w.Write(ctx, querierpb.EvaluateQueryResponse{ - Message: &querierpb.EvaluateQueryResponse_SeriesMetadata{ - SeriesMetadata: &querierpb.EvaluateQueryResponseSeriesMetadata{ - NodeIndex: nodeIndex, - Series: protoSeries, + sentOne := false + + // Note the slightly unusual condition: we always send at least one message, even when there are no series. 
+ for startIdx := 0; startIdx < len(series) || (len(series) == 0 && !sentOne); startIdx += batchSize { + endIdx := min(startIdx+batchSize, len(series)) + batch := series[startIdx:endIdx] + + protoSeries := make([]querierpb.SeriesMetadata, 0, len(batch)) + for _, s := range batch { + protoSeries = append(protoSeries, querierpb.SeriesMetadata{ + Labels: mimirpb.FromLabelsToLabelAdapters(s.Labels), + DropName: s.DropName, + }) + } + + if err := o.w.Write(ctx, querierpb.EvaluateQueryResponse{ + Message: &querierpb.EvaluateQueryResponse_SeriesMetadata{ + SeriesMetadata: &querierpb.EvaluateQueryResponseSeriesMetadata{ + NodeIndex: nodeIndex, + Series: protoSeries, + TotalSeriesCountForNode: int64(len(series)), + }, }, - }, - }) + }); err != nil { + return err + } + + sentOne = true + } + + return nil } func (o *evaluationObserver) InstantVectorSeriesDataEvaluated(ctx context.Context, evaluator *streamingpromql.Evaluator, node planning.Node, seriesIndex int, seriesCount int, seriesData types.InstantVectorSeriesData) error { diff --git a/pkg/querier/dispatcher_test.go b/pkg/querier/dispatcher_test.go index c58accc1a16..e1a5513bf6d 100644 --- a/pkg/querier/dispatcher_test.go +++ b/pkg/querier/dispatcher_test.go @@ -70,6 +70,16 @@ func TestDispatcher_HandleProtobuf(t *testing.T) { return createQueryRequestForSpecificNodes(t, ctx, planner, expr, timeRange, enableDelayedNameRemoval, batchSize, nil) } + createQueryRequestWithSeriesMetadataBatchSize := func(expr string, timeRange types.QueryTimeRange, seriesMetadataBatchSize uint64) *prototypes.Any { + reqAny := createQueryRequestForSpecificNodes(t, ctx, planner, expr, timeRange, enableDelayedNameRemoval, 128, nil) + req := &querierpb.EvaluateQueryRequest{} + require.NoError(t, prototypes.UnmarshalAny(reqAny, req)) + req.SeriesMetadataBatchSize = seriesMetadataBatchSize + result, err := prototypes.MarshalAny(req) + require.NoError(t, err) + return result + } + createQueryRequestWithNoNodes := func() *prototypes.Any { body := 
&querierpb.EvaluateQueryRequest{ BatchSize: 1, @@ -899,6 +909,267 @@ func TestDispatcher_HandleProtobuf(t *testing.T) { expectedStatusCode: "OK", expectStorageToBeCalledWithPropagatedHeaders: true, }, + + "query that returns an instant vector with series metadata batching, where all series fit exactly into one batch": { + req: createQueryRequestWithSeriesMetadataBatchSize(`my_three_item_series + 0.123`, types.NewRangeQueryTimeRange(startT, startT.Add(20*time.Second), 10*time.Second), 3), + expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{ + newBatchedSeriesMetadataMessage( + 3, 3, + querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "0"))}, + querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "1"))}, + querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "2"))}, + ), + newInstantVectorSeriesDataMessage( + 3, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 3.123}, + {TimestampMs: 10_000, Value: 7.123}, + {TimestampMs: 20_000, Value: 11.123}, + }, + }, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 4.123}, + {TimestampMs: 10_000, Value: 9.123}, + {TimestampMs: 20_000, Value: 14.123}, + }, + }, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 5.123}, + {TimestampMs: 10_000, Value: 11.123}, + {TimestampMs: 20_000, Value: 17.123}, + }, + }, + ), + newEvaluationCompletedMessage(stats.Stats{ + SamplesProcessed: 9, + QueueTime: 3 * time.Second, + WallTime: expectedQueryWallTime, + FetchedSeriesCount: 123, + FetchedChunksCount: 456, + FetchedChunkBytes: 789, + }), + }, + expectedStatusCode: "OK", + expectStorageToBeCalledWithPropagatedHeaders: true, + }, + + "query that returns an instant vector with series metadata batching, where all series fit into one batch with space to spare": { + req: 
createQueryRequestWithSeriesMetadataBatchSize(`my_three_item_series + 0.123`, types.NewRangeQueryTimeRange(startT, startT.Add(20*time.Second), 10*time.Second), 4), + expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{ + newBatchedSeriesMetadataMessage( + 3, 3, + querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "0"))}, + querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "1"))}, + querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "2"))}, + ), + newInstantVectorSeriesDataMessage( + 3, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 3.123}, + {TimestampMs: 10_000, Value: 7.123}, + {TimestampMs: 20_000, Value: 11.123}, + }, + }, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 4.123}, + {TimestampMs: 10_000, Value: 9.123}, + {TimestampMs: 20_000, Value: 14.123}, + }, + }, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 5.123}, + {TimestampMs: 10_000, Value: 11.123}, + {TimestampMs: 20_000, Value: 17.123}, + }, + }, + ), + newEvaluationCompletedMessage(stats.Stats{ + SamplesProcessed: 9, + QueueTime: 3 * time.Second, + WallTime: expectedQueryWallTime, + FetchedSeriesCount: 123, + FetchedChunksCount: 456, + FetchedChunkBytes: 789, + }), + }, + expectedStatusCode: "OK", + expectStorageToBeCalledWithPropagatedHeaders: true, + }, + + "query that returns an instant vector with series metadata batching, where the last batch is not completely full": { + req: createQueryRequestWithSeriesMetadataBatchSize(`my_three_item_series + 0.123`, types.NewRangeQueryTimeRange(startT, startT.Add(20*time.Second), 10*time.Second), 2), + expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{ + newBatchedSeriesMetadataMessage( + 3, 3, + querierpb.SeriesMetadata{Labels: 
mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "0"))}, + querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "1"))}, + ), + newBatchedSeriesMetadataMessage( + 3, 3, + querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "2"))}, + ), + newInstantVectorSeriesDataMessage( + 3, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 3.123}, + {TimestampMs: 10_000, Value: 7.123}, + {TimestampMs: 20_000, Value: 11.123}, + }, + }, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 4.123}, + {TimestampMs: 10_000, Value: 9.123}, + {TimestampMs: 20_000, Value: 14.123}, + }, + }, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 5.123}, + {TimestampMs: 10_000, Value: 11.123}, + {TimestampMs: 20_000, Value: 17.123}, + }, + }, + ), + newEvaluationCompletedMessage(stats.Stats{ + SamplesProcessed: 9, + QueueTime: 3 * time.Second, + WallTime: expectedQueryWallTime, + FetchedSeriesCount: 123, + FetchedChunksCount: 456, + FetchedChunkBytes: 789, + }), + }, + expectedStatusCode: "OK", + expectStorageToBeCalledWithPropagatedHeaders: true, + }, + + "query that returns an instant vector with series metadata batching, where each series gets its own batch": { + req: createQueryRequestWithSeriesMetadataBatchSize(`my_three_item_series + 0.123`, types.NewRangeQueryTimeRange(startT, startT.Add(20*time.Second), 10*time.Second), 1), + expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{ + newBatchedSeriesMetadataMessage( + 3, 3, + querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "0"))}, + ), + newBatchedSeriesMetadataMessage( + 3, 3, + querierpb.SeriesMetadata{Labels: mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "1"))}, + ), + newBatchedSeriesMetadataMessage( + 3, 3, + querierpb.SeriesMetadata{Labels: 
mimirpb.FromLabelsToLabelAdapters(labels.FromStrings("idx", "2"))}, + ), + newInstantVectorSeriesDataMessage( + 3, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 3.123}, + {TimestampMs: 10_000, Value: 7.123}, + {TimestampMs: 20_000, Value: 11.123}, + }, + }, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 4.123}, + {TimestampMs: 10_000, Value: 9.123}, + {TimestampMs: 20_000, Value: 14.123}, + }, + }, + querierpb.InstantVectorSeriesData{ + Floats: []mimirpb.Sample{ + {TimestampMs: 0, Value: 5.123}, + {TimestampMs: 10_000, Value: 11.123}, + {TimestampMs: 20_000, Value: 17.123}, + }, + }, + ), + newEvaluationCompletedMessage(stats.Stats{ + SamplesProcessed: 9, + QueueTime: 3 * time.Second, + WallTime: expectedQueryWallTime, + FetchedSeriesCount: 123, + FetchedChunksCount: 456, + FetchedChunkBytes: 789, + }), + }, + expectedStatusCode: "OK", + expectStorageToBeCalledWithPropagatedHeaders: true, + }, + + "query that returns an instant vector with no series, metadata batching disabled": { + req: createQueryRequest(`my_non_existent_series + 0.123`, types.NewRangeQueryTimeRange(startT, startT.Add(20*time.Second), 10*time.Second)), + expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{ + newBatchedSeriesMetadataMessage(3, 0), + newEvaluationCompletedMessage(stats.Stats{ + SamplesProcessed: 0, + QueueTime: 3 * time.Second, + WallTime: expectedQueryWallTime, + FetchedSeriesCount: 123, + FetchedChunksCount: 456, + FetchedChunkBytes: 789, + }), + }, + expectedStatusCode: "OK", + expectStorageToBeCalledWithPropagatedHeaders: true, + }, + + "query that returns an instant vector with no series, metadata batching enabled": { + req: createQueryRequestWithSeriesMetadataBatchSize(`my_non_existent_series + 0.123`, types.NewRangeQueryTimeRange(startT, startT.Add(20*time.Second), 10*time.Second), 3), + expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{ + 
newBatchedSeriesMetadataMessage(3, 0), + newEvaluationCompletedMessage(stats.Stats{ + SamplesProcessed: 0, + QueueTime: 3 * time.Second, + WallTime: expectedQueryWallTime, + FetchedSeriesCount: 123, + FetchedChunksCount: 456, + FetchedChunkBytes: 789, + }), + }, + expectedStatusCode: "OK", + expectStorageToBeCalledWithPropagatedHeaders: true, + }, + + "query that returns a range vector with no series, metadata batching disabled": { + req: createQueryRequest(`my_non_existent_series[2h]`, types.NewInstantQueryTimeRange(startT)), + expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{ + newBatchedSeriesMetadataMessage(0, 0), + newEvaluationCompletedMessage(stats.Stats{ + SamplesProcessed: 0, + QueueTime: 3 * time.Second, + WallTime: expectedQueryWallTime, + FetchedSeriesCount: 123, + FetchedChunksCount: 456, + FetchedChunkBytes: 789, + }), + }, + expectedStatusCode: "OK", + expectStorageToBeCalledWithPropagatedHeaders: true, + }, + + "query that returns a range vector with no series, metadata batching enabled": { + req: createQueryRequestWithSeriesMetadataBatchSize(`my_non_existent_series[2h]`, types.NewInstantQueryTimeRange(startT), 3), + expectedResponseMessages: []*frontendv2pb.QueryResultStreamRequest{ + newBatchedSeriesMetadataMessage(0, 0), + newEvaluationCompletedMessage(stats.Stats{ + SamplesProcessed: 0, + QueueTime: 3 * time.Second, + WallTime: expectedQueryWallTime, + FetchedSeriesCount: 123, + FetchedChunksCount: 456, + FetchedChunkBytes: 789, + }), + }, + expectedStatusCode: "OK", + expectStorageToBeCalledWithPropagatedHeaders: true, + }, + } for name, testCase := range testCases { @@ -1421,13 +1692,18 @@ func newErrorMessage(typ mimirpb.QueryErrorType, message string) *frontendv2pb.Q } func newSeriesMetadataMessage(nodeIndex int64, series ...querierpb.SeriesMetadata) *frontendv2pb.QueryResultStreamRequest { + return newBatchedSeriesMetadataMessage(nodeIndex, int64(len(series)), series...) 
+} + +func newBatchedSeriesMetadataMessage(nodeIndex int64, totalSeriesCount int64, series ...querierpb.SeriesMetadata) *frontendv2pb.QueryResultStreamRequest { return &frontendv2pb.QueryResultStreamRequest{ Data: &frontendv2pb.QueryResultStreamRequest_EvaluateQueryResponse{ EvaluateQueryResponse: &querierpb.EvaluateQueryResponse{ Message: &querierpb.EvaluateQueryResponse_SeriesMetadata{ SeriesMetadata: &querierpb.EvaluateQueryResponseSeriesMetadata{ - NodeIndex: nodeIndex, - Series: series, + NodeIndex: nodeIndex, + Series: series, + TotalSeriesCountForNode: totalSeriesCount, }, }, }, diff --git a/pkg/querier/querierpb/querier.pb.go b/pkg/querier/querierpb/querier.pb.go index c022d0228f0..44403302730 100644 --- a/pkg/querier/querierpb/querier.pb.go +++ b/pkg/querier/querierpb/querier.pb.go @@ -33,6 +33,11 @@ type EvaluateQueryRequest struct { Plan planning.EncodedQueryPlan `protobuf:"bytes,1,opt,name=plan,proto3" json:"plan"` Nodes []EvaluationNode `protobuf:"bytes,2,rep,name=nodes,proto3" json:"nodes"` BatchSize uint64 `protobuf:"varint,4,opt,name=batchSize,proto3" json:"batchSize,omitempty"` + // seriesMetadataBatchSize is the maximum number of series metadata entries to include in a single + // EvaluateQueryResponseSeriesMetadata message. A non-zero value indicates that the query-frontend + // supports receiving batched series metadata messages (ie. multiple EvaluateQueryResponseSeriesMetadata + // messages per node). 
+ SeriesMetadataBatchSize uint64 `protobuf:"varint,5,opt,name=seriesMetadataBatchSize,proto3" json:"seriesMetadataBatchSize,omitempty"` } func (m *EvaluateQueryRequest) Reset() { *m = EvaluateQueryRequest{} } @@ -88,6 +93,13 @@ func (m *EvaluateQueryRequest) GetBatchSize() uint64 { return 0 } +func (m *EvaluateQueryRequest) GetSeriesMetadataBatchSize() uint64 { + if m != nil { + return m.SeriesMetadataBatchSize + } + return 0 +} + type EvaluationNode struct { NodeIndex int64 `protobuf:"varint,1,opt,name=nodeIndex,proto3" json:"nodeIndex,omitempty"` TimeRange planning.EncodedQueryTimeRange `protobuf:"bytes,2,opt,name=timeRange,proto3" json:"timeRange"` @@ -279,6 +291,11 @@ func (*EvaluateQueryResponse) XXX_OneofWrappers() []interface{} { type EvaluateQueryResponseSeriesMetadata struct { NodeIndex int64 `protobuf:"varint,1,opt,name=nodeIndex,proto3" json:"nodeIndex,omitempty"` Series []SeriesMetadata `protobuf:"bytes,2,rep,name=series,proto3" json:"series"` + // totalSeriesCountForNode is non-zero if the EvaluateQueryRequest has a non-zero seriesMetadataBatchSize and the + // querier supports batching series metadata messages. + // Why send a total count rather than a "more messages to come" flag? Sending the count allows the query-frontend to + // allocate a slice of exactly the right length to hold all series. 
+ TotalSeriesCountForNode int64 `protobuf:"varint,3,opt,name=totalSeriesCountForNode,proto3" json:"totalSeriesCountForNode,omitempty"` } func (m *EvaluateQueryResponseSeriesMetadata) Reset() { *m = EvaluateQueryResponseSeriesMetadata{} } @@ -327,6 +344,13 @@ func (m *EvaluateQueryResponseSeriesMetadata) GetSeries() []SeriesMetadata { return nil } +func (m *EvaluateQueryResponseSeriesMetadata) GetTotalSeriesCountForNode() int64 { + if m != nil { + return m.TotalSeriesCountForNode + } + return 0 +} + type SeriesMetadata struct { Labels []github_com_grafana_mimir_pkg_mimirpb.LabelAdapter `protobuf:"bytes,1,rep,name=labels,proto3,customtype=github.com/grafana/mimir/pkg/mimirpb.LabelAdapter" json:"labels"` DropName bool `protobuf:"varint,2,opt,name=dropName,proto3" json:"dropName,omitempty"` @@ -845,67 +869,69 @@ func init() { func init() { proto.RegisterFile("querier.proto", fileDescriptor_7edfe438abd6b96f) } var fileDescriptor_7edfe438abd6b96f = []byte{ - // 955 bytes of a gzipped FileDescriptorProto + // 990 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x4f, 0x6f, 0x1b, 0x45, - 0x14, 0xdf, 0x8d, 0xff, 0x34, 0x7e, 0x86, 0xa8, 0xda, 0xa6, 0xd4, 0x84, 0x6a, 0x63, 0x2d, 0x17, - 0x4b, 0x54, 0x6b, 0x48, 0x81, 0x8a, 0x0b, 0x6d, 0x5d, 0x8c, 0x5c, 0x04, 0x6d, 0x19, 0x47, 0x11, - 0xe2, 0x52, 0x8d, 0xbd, 0x93, 0xcd, 0x8a, 0xdd, 0x99, 0xcd, 0xcc, 0xb8, 0x34, 0x88, 0x03, 0x5f, - 0x00, 0xc4, 0xa1, 0xdf, 0x01, 0xee, 0x7c, 0x89, 0x1e, 0x73, 0xac, 0x38, 0x54, 0xc4, 0x11, 0x12, - 0xc7, 0x7e, 0x04, 0x34, 0x33, 0x6b, 0xef, 0xc6, 0xb1, 0x63, 0xb7, 0x17, 0x7b, 0xe6, 0xcd, 0xfb, - 0xfd, 0xde, 0xcc, 0xef, 0xbd, 0x79, 0x3b, 0xf0, 0xf6, 0xe1, 0x88, 0xf0, 0x88, 0x70, 0x3f, 0xe5, - 0x4c, 0x32, 0xa7, 0x96, 0x4d, 0xd3, 0xc1, 0xd6, 0x87, 0x61, 0x24, 0x0f, 0x46, 0x03, 0x7f, 0xc8, - 0x92, 0x76, 0xc8, 0xf1, 0x3e, 0xa6, 0xb8, 0x9d, 0x44, 0x49, 0xc4, 0xdb, 0xe9, 0x0f, 0xa1, 0x19, - 0xa5, 0x03, 0xf3, 0x6f, 0xc0, 0x5b, 0x9f, 0x5e, 0x88, 
0xc8, 0x98, 0xdb, 0x42, 0x62, 0x29, 0xcc, - 0x6f, 0x86, 0xbb, 0x73, 0x21, 0x4e, 0x48, 0x4e, 0x70, 0x12, 0xd1, 0x30, 0xe5, 0x2c, 0x39, 0x8c, - 0xdb, 0x69, 0x8c, 0x29, 0x8d, 0x68, 0xa8, 0x07, 0x19, 0xc3, 0x66, 0xc8, 0x42, 0xa6, 0x87, 0x6d, - 0x35, 0x32, 0x56, 0xef, 0x0f, 0x1b, 0x36, 0xbb, 0x4f, 0x70, 0x3c, 0xc2, 0x92, 0x7c, 0x3b, 0x22, - 0xfc, 0x08, 0x91, 0xc3, 0x11, 0x11, 0xd2, 0xf9, 0x18, 0xca, 0x0a, 0xdc, 0xb0, 0x9b, 0x76, 0xab, - 0xbe, 0xb3, 0xe5, 0x4f, 0x28, 0xfd, 0x2e, 0x1d, 0xb2, 0x80, 0x04, 0xda, 0xf9, 0x51, 0x8c, 0x69, - 0xa7, 0xfc, 0xfc, 0xe5, 0xb6, 0x85, 0xb4, 0xb7, 0xf3, 0x09, 0x54, 0x28, 0x0b, 0x88, 0x68, 0xac, - 0x35, 0x4b, 0xad, 0xfa, 0xce, 0xbb, 0xfe, 0x54, 0x2b, 0x3f, 0x8b, 0x12, 0x31, 0xfa, 0x80, 0x05, - 0x24, 0x43, 0x19, 0x6f, 0xe7, 0x3a, 0xd4, 0x06, 0x58, 0x0e, 0x0f, 0xfa, 0xd1, 0x4f, 0xa4, 0x51, - 0x6e, 0xda, 0xad, 0x32, 0xca, 0x0d, 0x5f, 0x95, 0xd7, 0x4b, 0x97, 0xcb, 0x9e, 0x80, 0x8d, 0xb3, - 0x14, 0x0a, 0xa5, 0xe0, 0xf7, 0x69, 0x40, 0x9e, 0xea, 0x7d, 0x96, 0x50, 0x6e, 0x70, 0xee, 0x41, - 0x4d, 0x46, 0x09, 0x41, 0x98, 0x86, 0xa4, 0xb1, 0xa6, 0x4f, 0xb1, 0x3d, 0xff, 0x14, 0xbb, 0x13, - 0xb7, 0x6c, 0x53, 0x39, 0xce, 0xfb, 0xb7, 0x0c, 0x57, 0x67, 0xe4, 0x11, 0x29, 0xa3, 0x82, 0x38, - 0xdf, 0xc1, 0x86, 0x50, 0x47, 0x13, 0xdf, 0x10, 0x89, 0x03, 0x2c, 0x71, 0xa6, 0x94, 0x7f, 0xfe, - 0xc8, 0x67, 0x91, 0xfd, 0x33, 0xa8, 0x9e, 0x85, 0x66, 0x78, 0x9c, 0x87, 0x50, 0x17, 0x92, 0x47, - 0x34, 0xdc, 0xc3, 0xf1, 0x68, 0xb2, 0xf5, 0x0f, 0x96, 0xd2, 0xe6, 0x90, 0x9e, 0x85, 0x8a, 0x0c, - 0x9a, 0x70, 0x88, 0x63, 0xcc, 0x0d, 0x61, 0x69, 0x45, 0xc2, 0x1c, 0xa2, 0x09, 0xf3, 0xa9, 0x23, - 0xe0, 0x5a, 0x44, 0x85, 0xc4, 0x54, 0xee, 0x91, 0xa1, 0x64, 0xdc, 0x1c, 0xe9, 0x0b, 0x25, 0x42, - 0x59, 0x93, 0xdf, 0x5a, 0x46, 0x7e, 0x7f, 0x3e, 0xbc, 0x67, 0xa1, 0x45, 0xcc, 0x4e, 0x08, 0x57, - 0xb8, 0xca, 0x49, 0xb6, 0x20, 0x49, 0xaa, 0x03, 0x56, 0x74, 0xc0, 0x9b, 0xcb, 0x02, 0xa2, 0xf3, - 0xd0, 0x9e, 0x85, 0xe6, 0x31, 0xaa, 0x40, 0x64, 0x5a, 0x68, 0xf7, 0x58, 0x92, 0xc6, 0x44, 
0x92, - 0xa0, 0x51, 0x5d, 0x2d, 0x50, 0xf7, 0x3c, 0x54, 0x05, 0x9a, 0xc3, 0xd8, 0xa9, 0xc1, 0xa5, 0x84, - 0x08, 0x81, 0x43, 0xe2, 0xfd, 0x0c, 0xef, 0xaf, 0x50, 0x2c, 0x4b, 0x2a, 0xfe, 0x16, 0x54, 0x4d, - 0x29, 0xcd, 0xb9, 0x7d, 0x67, 0x89, 0xb2, 0x42, 0xcf, 0xdc, 0xbd, 0x67, 0x36, 0x6c, 0xcc, 0x44, - 0xda, 0x87, 0x6a, 0x8c, 0x07, 0x24, 0x16, 0x0d, 0x5b, 0x73, 0x5d, 0xf1, 0x87, 0x8c, 0x4b, 0xf2, - 0x34, 0x1d, 0xf8, 0x5f, 0x2b, 0xfb, 0x23, 0x1c, 0xf1, 0xce, 0x67, 0x8a, 0xe5, 0xef, 0x97, 0xdb, - 0x1f, 0xad, 0xd2, 0x06, 0x0d, 0xee, 0x6e, 0x80, 0x53, 0x49, 0x38, 0xca, 0xd8, 0x9d, 0x2d, 0x58, - 0x0f, 0x38, 0x4b, 0x1f, 0xe0, 0xc4, 0x54, 0xfa, 0x3a, 0x9a, 0xce, 0xbd, 0x3d, 0x68, 0x2e, 0x2b, - 0xf5, 0x25, 0x8a, 0x6c, 0x42, 0xe5, 0xc9, 0xf4, 0x12, 0xd5, 0x90, 0x99, 0x78, 0xe9, 0x22, 0xde, - 0x42, 0x89, 0x5f, 0xcc, 0xeb, 0x43, 0x55, 0x53, 0x4d, 0x94, 0xbe, 0x9c, 0xab, 0xd3, 0xc7, 0x2a, - 0xbd, 0x13, 0x81, 0x8d, 0x97, 0xf7, 0x9b, 0x0d, 0x37, 0x5e, 0xe7, 0x1e, 0x2c, 0x09, 0x7f, 0x67, - 0x26, 0xd1, 0x5e, 0x21, 0xd1, 0x0b, 0x18, 0x67, 0x32, 0xfe, 0xab, 0x0d, 0xd7, 0x16, 0xc5, 0xf6, - 0xa1, 0xba, 0x1f, 0x33, 0x2c, 0x27, 0xa9, 0x5f, 0x78, 0x38, 0xe3, 0xe5, 0x74, 0x00, 0x0e, 0x22, - 0x21, 0x59, 0xc8, 0x71, 0x32, 0xd9, 0xd1, 0xf5, 0x1c, 0xf3, 0xa5, 0xf2, 0xea, 0x4d, 0x1c, 0x74, - 0xdd, 0x18, 0x7c, 0x01, 0xe5, 0xfd, 0xb5, 0x06, 0xad, 0x55, 0xef, 0xed, 0x12, 0x71, 0x9a, 0x50, - 0x37, 0x87, 0x34, 0xeb, 0x6b, 0x7a, 0xbd, 0x68, 0x72, 0xae, 0x42, 0x55, 0x48, 0x92, 0x3e, 0x96, - 0xba, 0x15, 0x96, 0x50, 0x45, 0xcd, 0x76, 0x9d, 0x6d, 0xa8, 0xeb, 0x76, 0xf0, 0x58, 0x48, 0xcc, - 0xa5, 0xee, 0x64, 0x25, 0x04, 0xda, 0xd4, 0x57, 0x16, 0xe7, 0x3d, 0xa8, 0x19, 0x07, 0x42, 0x03, - 0xdd, 0x77, 0x4a, 0x68, 0x5d, 0x1b, 0xba, 0x34, 0x28, 0xa8, 0x56, 0x7d, 0x03, 0xd5, 0x2e, 0xbd, - 0x91, 0x6a, 0x0f, 0xa1, 0xd2, 0xe5, 0x9c, 0x71, 0xe7, 0x06, 0x94, 0xe5, 0x51, 0x4a, 0xb4, 0x18, - 0x1b, 0x3b, 0x8d, 0x9c, 0x46, 0x6b, 0xa9, 0x7d, 0x76, 0x8f, 0x52, 0x82, 0xb4, 0x97, 0xd3, 0x98, - 0xf6, 0x9d, 0xec, 0x5e, 0x4c, 
0xdb, 0xd0, 0x33, 0x7b, 0x41, 0x1a, 0xe6, 0x74, 0x35, 0xe7, 0x73, - 0xa8, 0x63, 0x4a, 0x99, 0xd4, 0x66, 0x91, 0x7d, 0xfe, 0xde, 0x29, 0x94, 0xe2, 0xdd, 0x7c, 0x35, - 0xdb, 0x7c, 0x11, 0xe0, 0xb4, 0xa0, 0xa2, 0x5f, 0x38, 0xd9, 0x17, 0xee, 0x2d, 0xdf, 0xbc, 0x77, - 0xfa, 0xea, 0x77, 0xf2, 0x3c, 0xd0, 0x26, 0xef, 0x36, 0xd4, 0x0b, 0x5c, 0xaa, 0x67, 0xfc, 0x88, - 0xb9, 0xfa, 0x8c, 0x9b, 0x12, 0xad, 0xa1, 0xe9, 0x5c, 0xdd, 0xf8, 0x88, 0xee, 0x33, 0x53, 0x87, - 0x35, 0x64, 0x26, 0x9d, 0xdb, 0xc7, 0x27, 0xae, 0xf5, 0xe2, 0xc4, 0xb5, 0x5e, 0x9d, 0xb8, 0xf6, - 0x2f, 0x63, 0xd7, 0xfe, 0x73, 0xec, 0xda, 0xcf, 0xc7, 0xae, 0x7d, 0x3c, 0x76, 0xed, 0x7f, 0xc6, - 0xae, 0xfd, 0xdf, 0xd8, 0xb5, 0x5e, 0x8d, 0x5d, 0xfb, 0xf7, 0x53, 0xd7, 0x3a, 0x3e, 0x75, 0xad, - 0x17, 0xa7, 0xae, 0xf5, 0x7d, 0xfe, 0xd0, 0x1b, 0x54, 0xf5, 0x6b, 0xe9, 0xe6, 0xff, 0x01, 0x00, - 0x00, 0xff, 0xff, 0x09, 0x68, 0x81, 0x05, 0x0b, 0x0a, 0x00, 0x00, + 0x14, 0xdf, 0x8d, 0xff, 0x34, 0x7e, 0x86, 0xa8, 0x9a, 0xa6, 0xc4, 0x84, 0x6a, 0x13, 0x2d, 0x97, + 0x48, 0x54, 0x6b, 0x48, 0x81, 0xc2, 0x85, 0xb6, 0x0e, 0xa9, 0x5c, 0x04, 0x6d, 0x19, 0x47, 0x11, + 0xe2, 0x52, 0x8d, 0xbd, 0x93, 0xcd, 0x8a, 0xdd, 0x99, 0xcd, 0xcc, 0xb8, 0x34, 0x9c, 0xf8, 0x02, + 0x20, 0x0e, 0xfd, 0x10, 0xdc, 0x91, 0xf8, 0x0c, 0x3d, 0xe6, 0x58, 0x71, 0xa8, 0x88, 0x23, 0x24, + 0x8e, 0xfd, 0x08, 0x68, 0x66, 0xd6, 0x5e, 0x3b, 0xb1, 0x63, 0xd3, 0x8b, 0x3d, 0xf3, 0xe6, 0xf7, + 0xfb, 0xbd, 0x99, 0x37, 0xef, 0xbd, 0x1d, 0x78, 0xfb, 0xa8, 0x4f, 0x45, 0x4c, 0x45, 0x90, 0x09, + 0xae, 0x38, 0xaa, 0xe5, 0xd3, 0xac, 0xbb, 0xfe, 0x61, 0x14, 0xab, 0xc3, 0x7e, 0x37, 0xe8, 0xf1, + 0xb4, 0x19, 0x09, 0x72, 0x40, 0x18, 0x69, 0xa6, 0x71, 0x1a, 0x8b, 0x66, 0xf6, 0x43, 0x64, 0x47, + 0x59, 0xd7, 0xfe, 0x5b, 0xf2, 0xfa, 0xa7, 0x97, 0x32, 0x72, 0xe5, 0xa6, 0x54, 0x44, 0x49, 0xfb, + 0x9b, 0xf3, 0xee, 0x5e, 0xca, 0x93, 0x4a, 0x50, 0x92, 0xc6, 0x2c, 0xca, 0x04, 0x4f, 0x8f, 0x92, + 0x66, 0x96, 0x10, 0xc6, 0x62, 0x16, 0x99, 0x41, 0xae, 0xb0, 0x1a, 0xf1, 0x88, 0x9b, 0x61, 0x53, + 
0x8f, 0xac, 0xd5, 0x3f, 0x75, 0x61, 0x75, 0xf7, 0x29, 0x49, 0xfa, 0x44, 0xd1, 0x6f, 0xfb, 0x54, + 0x1c, 0x63, 0x7a, 0xd4, 0xa7, 0x52, 0xa1, 0x8f, 0xa1, 0xac, 0xc9, 0x0d, 0x77, 0xd3, 0xdd, 0xaa, + 0x6f, 0xaf, 0x07, 0x43, 0xc9, 0x60, 0x97, 0xf5, 0x78, 0x48, 0x43, 0x03, 0x7e, 0x9c, 0x10, 0xd6, + 0x2a, 0xbf, 0x78, 0xb5, 0xe1, 0x60, 0x83, 0x46, 0x9f, 0x40, 0x85, 0xf1, 0x90, 0xca, 0xc6, 0xd2, + 0x66, 0x69, 0xab, 0xbe, 0xfd, 0x6e, 0x30, 0x8a, 0x55, 0x90, 0x7b, 0x89, 0x39, 0x7b, 0xc8, 0x43, + 0x9a, 0xb3, 0x2c, 0x1a, 0xdd, 0x80, 0x5a, 0x97, 0xa8, 0xde, 0x61, 0x27, 0xfe, 0x89, 0x36, 0xca, + 0x9b, 0xee, 0x56, 0x19, 0x17, 0x06, 0xf4, 0x19, 0xac, 0x49, 0xad, 0x22, 0xbf, 0xa1, 0x8a, 0x84, + 0x44, 0x91, 0xd6, 0x08, 0x5b, 0x31, 0xd8, 0x59, 0xcb, 0x5f, 0x95, 0x97, 0x4b, 0x57, 0xcb, 0xbe, + 0x84, 0x95, 0x49, 0xe7, 0xda, 0x9f, 0x76, 0xfc, 0x80, 0x85, 0xf4, 0x99, 0x39, 0x61, 0x09, 0x17, + 0x06, 0xb4, 0x03, 0x35, 0x15, 0xa7, 0x14, 0x13, 0x16, 0xd1, 0xc6, 0x92, 0x39, 0xff, 0xc6, 0xf4, + 0xf3, 0xef, 0x0d, 0x61, 0xf9, 0x71, 0x0a, 0x9e, 0xff, 0x4f, 0x19, 0xae, 0x9f, 0x0b, 0xac, 0xcc, + 0x38, 0x93, 0x14, 0x7d, 0x07, 0x2b, 0x93, 0xfb, 0xcd, 0x63, 0x1c, 0x5c, 0x0c, 0xd6, 0x24, 0xb3, + 0x33, 0xc1, 0x6a, 0x3b, 0xf8, 0x9c, 0x0e, 0x7a, 0x04, 0x75, 0xa9, 0x44, 0xcc, 0xa2, 0x7d, 0x92, + 0xf4, 0x87, 0x5b, 0xff, 0x60, 0xae, 0x6c, 0x41, 0x69, 0x3b, 0x78, 0x5c, 0xc1, 0x08, 0xf6, 0x48, + 0x42, 0x84, 0x15, 0x2c, 0x2d, 0x28, 0x58, 0x50, 0x8c, 0x60, 0x31, 0x45, 0x12, 0xd6, 0x62, 0x26, + 0x15, 0x61, 0x6a, 0x9f, 0xf6, 0x14, 0x17, 0xf6, 0x48, 0x5f, 0xea, 0x20, 0x94, 0x8d, 0xf8, 0xed, + 0x79, 0xe2, 0x0f, 0xa6, 0xd3, 0xdb, 0x0e, 0x9e, 0xa5, 0x8c, 0x22, 0xb8, 0x26, 0xf4, 0x9d, 0xe4, + 0x0b, 0x8a, 0x66, 0xc6, 0x61, 0xc5, 0x38, 0xbc, 0x35, 0xcf, 0x21, 0xbe, 0x48, 0x6d, 0x3b, 0x78, + 0x9a, 0xa2, 0x76, 0x44, 0x47, 0x89, 0xb6, 0xc3, 0xd3, 0x2c, 0xa1, 0x8a, 0x86, 0x8d, 0xea, 0x62, + 0x8e, 0x76, 0x2f, 0x52, 0xb5, 0xa3, 0x29, 0x8a, 0xad, 0x1a, 0x5c, 0x49, 0xa9, 0x94, 0x24, 0xa2, + 0xfe, 0x9f, 0x2e, 0xbc, 0xbf, 0x40, 
0xb6, 0xcc, 0x49, 0xf9, 0xdb, 0x50, 0xb5, 0xb9, 0x34, 0xa5, + 0x70, 0x27, 0x85, 0xf2, 0x4c, 0xcf, 0xe1, 0xba, 0x36, 0x15, 0x57, 0x24, 0xb1, 0xa0, 0x1d, 0xde, + 0x67, 0xea, 0x3e, 0x17, 0xba, 0xc8, 0x4c, 0xb6, 0x94, 0xf0, 0xac, 0x65, 0xff, 0xb9, 0x0b, 0x2b, + 0xe7, 0xf6, 0x78, 0x00, 0xd5, 0x84, 0x74, 0x69, 0x22, 0x1b, 0xae, 0xd9, 0xc5, 0xb5, 0xa0, 0xc7, + 0x85, 0xa2, 0xcf, 0xb2, 0x6e, 0xf0, 0xb5, 0xb6, 0x3f, 0x26, 0xb1, 0x68, 0x7d, 0xae, 0xfd, 0xff, + 0xf5, 0x6a, 0xe3, 0xa3, 0x45, 0x7a, 0xaf, 0xe5, 0xdd, 0x0b, 0x49, 0xa6, 0xa8, 0xc0, 0xb9, 0x3a, + 0x5a, 0x87, 0xe5, 0x50, 0xf0, 0xec, 0x21, 0x49, 0x6d, 0x91, 0x2c, 0xe3, 0xd1, 0xdc, 0xdf, 0x87, + 0xcd, 0x79, 0x55, 0x32, 0x27, 0x96, 0xab, 0x50, 0x79, 0x3a, 0xaa, 0xbf, 0x1a, 0xb6, 0x13, 0x3f, + 0x9b, 0xa5, 0x3b, 0x56, 0x1d, 0x97, 0xeb, 0x06, 0x50, 0x35, 0x52, 0xc3, 0x3b, 0xba, 0x5a, 0x44, + 0xa7, 0x43, 0x74, 0x66, 0x0c, 0xaf, 0xc6, 0xa2, 0xfc, 0x5f, 0x5d, 0xb8, 0xf9, 0x7f, 0x4a, 0x68, + 0x8e, 0xfb, 0xbb, 0xe7, 0x52, 0xc4, 0x1f, 0x4b, 0x91, 0x19, 0x8a, 0x93, 0xb9, 0xe2, 0xff, 0xe2, + 0xc2, 0xda, 0x2c, 0xdf, 0x01, 0x54, 0x0f, 0x12, 0x4e, 0xd4, 0xf0, 0xea, 0x67, 0x1e, 0xce, 0xa2, + 0x50, 0x0b, 0xe0, 0x30, 0x96, 0x8a, 0x47, 0x82, 0xa4, 0xc3, 0x1d, 0xdd, 0x28, 0x38, 0xf7, 0x35, + 0xaa, 0x3d, 0x04, 0x98, 0xbc, 0xb1, 0xfc, 0x31, 0x96, 0xff, 0xc7, 0x12, 0x6c, 0x2d, 0x5a, 0xf2, + 0x73, 0x82, 0xb3, 0x09, 0x75, 0x7b, 0x48, 0xbb, 0xbe, 0x64, 0xd6, 0xc7, 0x4d, 0xe8, 0x3a, 0x54, + 0xa5, 0xa2, 0xd9, 0x13, 0x95, 0xd7, 0x45, 0x45, 0xcf, 0xf6, 0xd0, 0x06, 0xd4, 0x4d, 0x27, 0x79, + 0x22, 0x15, 0x11, 0xca, 0x34, 0xc1, 0x12, 0x06, 0x63, 0xea, 0x68, 0x0b, 0x7a, 0x0f, 0x6a, 0x16, + 0x40, 0x59, 0x68, 0x5a, 0x56, 0x09, 0x2f, 0x1b, 0xc3, 0x2e, 0x0b, 0xc7, 0xa2, 0x56, 0x7d, 0x83, + 0xa8, 0x5d, 0x79, 0xa3, 0xa8, 0x3d, 0x82, 0xca, 0xae, 0x10, 0x5c, 0xa0, 0x9b, 0x50, 0x56, 0xc7, + 0x19, 0x35, 0xc1, 0x58, 0xd9, 0x6e, 0x14, 0x32, 0x26, 0x96, 0x06, 0xb3, 0x77, 0x9c, 0x51, 0x6c, + 0x50, 0xa8, 0x31, 0x6a, 0x59, 0x79, 0x5d, 0x8c, 0x3a, 0xd8, 0x73, 0x77, 
0xc6, 0x35, 0x4c, 0x69, + 0x88, 0xe8, 0x0b, 0xa8, 0x13, 0xc6, 0xb8, 0x32, 0x66, 0x99, 0x7f, 0x39, 0xdf, 0x19, 0x4b, 0xc5, + 0x7b, 0xc5, 0x6a, 0xbe, 0xf9, 0x71, 0x02, 0xda, 0x82, 0x8a, 0x79, 0x56, 0xe5, 0x1f, 0xc7, 0xb7, + 0x02, 0xfb, 0xc8, 0xea, 0xe8, 0xdf, 0xe1, 0x9b, 0xc4, 0x98, 0xfc, 0x3b, 0x50, 0x1f, 0xd3, 0xd2, + 0x3d, 0xe3, 0x47, 0x22, 0xf4, 0x0b, 0xc0, 0xa6, 0x68, 0x0d, 0x8f, 0xe6, 0xba, 0xe2, 0x63, 0x76, + 0xc0, 0x6d, 0x1e, 0xd6, 0xb0, 0x9d, 0xb4, 0xee, 0x9c, 0x9c, 0x7a, 0xce, 0xcb, 0x53, 0xcf, 0x79, + 0x7d, 0xea, 0xb9, 0x3f, 0x0f, 0x3c, 0xf7, 0xf7, 0x81, 0xe7, 0xbe, 0x18, 0x78, 0xee, 0xc9, 0xc0, + 0x73, 0xff, 0x1e, 0x78, 0xee, 0xbf, 0x03, 0xcf, 0x79, 0x3d, 0xf0, 0xdc, 0xdf, 0xce, 0x3c, 0xe7, + 0xe4, 0xcc, 0x73, 0x5e, 0x9e, 0x79, 0xce, 0xf7, 0xc5, 0xeb, 0xb2, 0x5b, 0x35, 0x4f, 0xb4, 0x5b, + 0xff, 0x05, 0x00, 0x00, 0xff, 0xff, 0x31, 0xdd, 0x29, 0xe5, 0x80, 0x0a, 0x00, 0x00, } func (this *EvaluateQueryRequest) Equal(that interface{}) bool { @@ -941,6 +967,9 @@ func (this *EvaluateQueryRequest) Equal(that interface{}) bool { if this.BatchSize != that1.BatchSize { return false } + if this.SeriesMetadataBatchSize != that1.SeriesMetadataBatchSize { + return false + } return true } func (this *EvaluationNode) Equal(that interface{}) bool { @@ -1174,6 +1203,9 @@ func (this *EvaluateQueryResponseSeriesMetadata) Equal(that interface{}) bool { return false } } + if this.TotalSeriesCountForNode != that1.TotalSeriesCountForNode { + return false + } return true } func (this *SeriesMetadata) Equal(that interface{}) bool { @@ -1483,7 +1515,7 @@ func (this *EvaluateQueryRequest) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 7) + s := make([]string, 0, 8) s = append(s, "&querierpb.EvaluateQueryRequest{") s = append(s, "Plan: "+strings.Replace(this.Plan.GoString(), `&`, ``, 1)+",\n") if this.Nodes != nil { @@ -1494,6 +1526,7 @@ func (this *EvaluateQueryRequest) GoString() string { s = append(s, "Nodes: "+fmt.Sprintf("%#v", vs)+",\n") } s = 
append(s, "BatchSize: "+fmt.Sprintf("%#v", this.BatchSize)+",\n") + s = append(s, "SeriesMetadataBatchSize: "+fmt.Sprintf("%#v", this.SeriesMetadataBatchSize)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1572,7 +1605,7 @@ func (this *EvaluateQueryResponseSeriesMetadata) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 6) + s := make([]string, 0, 7) s = append(s, "&querierpb.EvaluateQueryResponseSeriesMetadata{") s = append(s, "NodeIndex: "+fmt.Sprintf("%#v", this.NodeIndex)+",\n") if this.Series != nil { @@ -1582,6 +1615,7 @@ func (this *EvaluateQueryResponseSeriesMetadata) GoString() string { } s = append(s, "Series: "+fmt.Sprintf("%#v", vs)+",\n") } + s = append(s, "TotalSeriesCountForNode: "+fmt.Sprintf("%#v", this.TotalSeriesCountForNode)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1753,6 +1787,11 @@ func (m *EvaluateQueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.SeriesMetadataBatchSize != 0 { + i = encodeVarintQuerier(dAtA, i, uint64(m.SeriesMetadataBatchSize)) + i-- + dAtA[i] = 0x28 + } if m.BatchSize != 0 { i = encodeVarintQuerier(dAtA, i, uint64(m.BatchSize)) i-- @@ -2001,6 +2040,11 @@ func (m *EvaluateQueryResponseSeriesMetadata) MarshalToSizedBuffer(dAtA []byte) _ = i var l int _ = l + if m.TotalSeriesCountForNode != 0 { + i = encodeVarintQuerier(dAtA, i, uint64(m.TotalSeriesCountForNode)) + i-- + dAtA[i] = 0x18 + } if len(m.Series) > 0 { for iNdEx := len(m.Series) - 1; iNdEx >= 0; iNdEx-- { { @@ -2463,6 +2507,9 @@ func (m *EvaluateQueryRequest) Size() (n int) { if m.BatchSize != 0 { n += 1 + sovQuerier(uint64(m.BatchSize)) } + if m.SeriesMetadataBatchSize != 0 { + n += 1 + sovQuerier(uint64(m.SeriesMetadataBatchSize)) + } return n } @@ -2579,6 +2626,9 @@ func (m *EvaluateQueryResponseSeriesMetadata) Size() (n int) { n += 1 + l + sovQuerier(uint64(l)) } } + if m.TotalSeriesCountForNode != 0 { + n += 1 + sovQuerier(uint64(m.TotalSeriesCountForNode)) + } 
return n } @@ -2778,6 +2828,7 @@ func (this *EvaluateQueryRequest) String() string { `Plan:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Plan), "EncodedQueryPlan", "planning.EncodedQueryPlan", 1), `&`, ``, 1) + `,`, `Nodes:` + repeatedStringForNodes + `,`, `BatchSize:` + fmt.Sprintf("%v", this.BatchSize) + `,`, + `SeriesMetadataBatchSize:` + fmt.Sprintf("%v", this.SeriesMetadataBatchSize) + `,`, `}`, }, "") return s @@ -2875,6 +2926,7 @@ func (this *EvaluateQueryResponseSeriesMetadata) String() string { s := strings.Join([]string{`&EvaluateQueryResponseSeriesMetadata{`, `NodeIndex:` + fmt.Sprintf("%v", this.NodeIndex) + `,`, `Series:` + repeatedStringForSeries + `,`, + `TotalSeriesCountForNode:` + fmt.Sprintf("%v", this.TotalSeriesCountForNode) + `,`, `}`, }, "") return s @@ -3136,6 +3188,25 @@ func (m *EvaluateQueryRequest) Unmarshal(dAtA []byte) error { break } } + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SeriesMetadataBatchSize", wireType) + } + m.SeriesMetadataBatchSize = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuerier + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SeriesMetadataBatchSize |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipQuerier(dAtA[iNdEx:]) @@ -3601,6 +3672,25 @@ func (m *EvaluateQueryResponseSeriesMetadata) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TotalSeriesCountForNode", wireType) + } + m.TotalSeriesCountForNode = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuerier + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TotalSeriesCountForNode |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipQuerier(dAtA[iNdEx:]) 
diff --git a/pkg/querier/querierpb/querier.proto b/pkg/querier/querierpb/querier.proto index ac90005d8c3..d94801a3945 100644 --- a/pkg/querier/querierpb/querier.proto +++ b/pkg/querier/querierpb/querier.proto @@ -18,6 +18,11 @@ message EvaluateQueryRequest { repeated EvaluationNode nodes = 2 [(gogoproto.nullable) = false]; reserved 3; // Formerly: enablePerStepStats uint64 batchSize = 4; + // seriesMetadataBatchSize is the maximum number of series metadata entries to include in a single + // EvaluateQueryResponseSeriesMetadata message. A non-zero value indicates that the query-frontend + // supports receiving batched series metadata messages (ie. multiple EvaluateQueryResponseSeriesMetadata + // messages per node). + uint64 seriesMetadataBatchSize = 5; } message EvaluationNode { @@ -39,6 +44,12 @@ message EvaluateQueryResponse { message EvaluateQueryResponseSeriesMetadata { int64 nodeIndex = 1; repeated SeriesMetadata series = 2 [(gogoproto.nullable) = false]; + + // totalSeriesCountForNode is non-zero if the EvaluateQueryRequest has a non-zero seriesMetadataBatchSize and the + // querier supports batching series metadata messages. + // Why send a total count rather than a "more messages to come" flag? Sending the count allows the query-frontend to + // allocate a slice of exactly the right length to hold all series. + int64 totalSeriesCountForNode = 3; } message SeriesMetadata {