1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -187,6 +187,7 @@
* [ENHANCEMENT] Distributor: OTLP endpoint now returns partial success (HTTP 200) instead of HTTP 429 when the usage tracker rejects some series due to the active series limit but other series are successfully ingested. The `RejectedDataPoints` field reports the count of distributor-side rejections (usage tracker filtering). #14789
* [ENHANCEMENT] MQE: Account for memory consumption of labels returned by binary operations in query memory consumption estimate earlier. #15033
* [ENHANCEMENT] Query-frontend: Log the number of series and samples returned for queries in `query stats` log lines. #15044
* [ENHANCEMENT] Querier and query-frontend: When remote execution is enabled, send series metadata in batches, rather than in a single large message. The batch size can be configured with `-query-frontend.remote-execution-series-metadata-batch-size`. #15047
* [ENHANCEMENT] Ingest storage: Update the default configuration to enable ingest storage concurrency: #15072
* `-ingest-storage.kafka.fetch-concurrency-max` from `0` to `12`
* `-ingest-storage.kafka.ingestion-concurrency-max` from `0` to `8`
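To make the behaviour described in the CHANGELOG entry concrete: with the new flag, a querier splits each node's series metadata into batches of at most the configured size, and still sends one message when a node has no series. The helper below is a hypothetical illustration of the resulting message count per node; the function name and the standalone program are not part of this PR.

```go
package main

import "fmt"

// seriesMetadataMessageCount is a hypothetical helper (not part of this PR) that
// illustrates how many SeriesMetadata messages a querier sends per node: series
// are split into batches of the configured size, and at least one message is
// always sent, even when a node produces no series.
func seriesMetadataMessageCount(seriesCount, batchSize int) int {
	if batchSize <= 0 || seriesCount == 0 {
		// Batch size 0 (a frontend without batching support) or an empty result
		// still yields a single message.
		return 1
	}
	return (seriesCount + batchSize - 1) / batchSize // ceil(seriesCount / batchSize)
}

func main() {
	// With the default batch size of 128, 1,000 series are sent as 8 messages.
	fmt.Println(seriesMetadataMessageCount(1000, 128)) // 8
	fmt.Println(seriesMetadataMessageCount(0, 128))    // 1
}
```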
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -8263,6 +8263,17 @@
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "remote_execution_series_metadata_batch_size",
"required": false,
"desc": "Maximum number of series metadata entries to send in a single remote execution response from a querier.",
"fieldValue": null,
"fieldDefaultValue": 128,
"fieldFlag": "query-frontend.remote-execution-series-metadata-batch-size",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "split_queries_by_interval",
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
@@ -2591,6 +2591,8 @@ Usage of ./cmd/mimir/mimir:
False to disable query statistics tracking. When enabled, a message with some statistics is logged for every query. (default true)
-query-frontend.remote-execution-batch-size uint
[experimental] Maximum number of series to send in a single remote execution response from a querier. (default 128)
-query-frontend.remote-execution-series-metadata-batch-size uint
[experimental] Maximum number of series metadata entries to send in a single remote execution response from a querier. (default 128)
-query-frontend.results-cache-ttl duration
Time to live duration for cached query results. If query falls into out-of-order time window, -query-frontend.results-cache-ttl-for-out-of-order-time-window is used instead. (default 1w)
-query-frontend.results-cache-ttl-for-cardinality-query duration
@@ -2175,6 +2175,11 @@ The `frontend` block configures the query-frontend.
# CLI flag: -query-frontend.remote-execution-batch-size
[remote_execution_batch_size: <int> | default = 128]

# (experimental) Maximum number of series metadata entries to send in a single
# remote execution response from a querier.
# CLI flag: -query-frontend.remote-execution-series-metadata-batch-size
[remote_execution_series_metadata_batch_size: <int> | default = 128]

# (advanced) Split range queries by an interval and execute in parallel. You
# should use a multiple of 24 hours to optimize querying blocks. 0 to disable
# it.
1 change: 1 addition & 0 deletions operations/mimir/mimir-flags-defaults.json
@@ -568,6 +568,7 @@
"query-frontend.instance-addr": "",
"query-frontend.instance-port": 0,
"query-frontend.remote-execution-batch-size": 128,
"query-frontend.remote-execution-series-metadata-batch-size": 128,
"query-frontend.split-queries-by-interval": 86400000000000,
"query-frontend.results-cache.backend": "",
"query-frontend.results-cache.memcached.addresses": "",
8 changes: 7 additions & 1 deletion pkg/frontend/v2/frontend.go
@@ -74,7 +74,8 @@ type Config struct {
Addr string `yaml:"address" category:"advanced"`
Port int `category:"advanced"`

RemoteExecutionBatchSize uint64 `yaml:"remote_execution_batch_size" category:"experimental"`
RemoteExecutionBatchSize uint64 `yaml:"remote_execution_batch_size" category:"experimental"`
RemoteExecutionSeriesMetadataBatchSize uint64 `yaml:"remote_execution_series_metadata_batch_size" category:"experimental"`

// These configuration options are injected internally.
QuerySchedulerDiscovery schedulerdiscovery.Config `yaml:"-"`
@@ -94,6 +95,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.IntVar(&cfg.Port, "query-frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).")

f.Uint64Var(&cfg.RemoteExecutionBatchSize, "query-frontend.remote-execution-batch-size", 128, "Maximum number of series to send in a single remote execution response from a querier.")
f.Uint64Var(&cfg.RemoteExecutionSeriesMetadataBatchSize, "query-frontend.remote-execution-series-metadata-batch-size", 128, "Maximum number of series metadata entries to send in a single remote execution response from a querier.")

cfg.GRPCClientConfig.CustomCompressors = []string{s2.Name}
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-frontend.grpc-client-config", f)
@@ -108,6 +110,10 @@ func (cfg *Config) Validate() error {
return fmt.Errorf("remote execution batch size must be greater than 0")
}

if cfg.RemoteExecutionSeriesMetadataBatchSize <= 0 {
return fmt.Errorf("remote execution series metadata batch size must be greater than 0")
}

return cfg.GRPCClientConfig.Validate()
}

75 changes: 48 additions & 27 deletions pkg/frontend/v2/remoteexec.go
@@ -161,9 +161,10 @@ func (g *RemoteExecutionGroupEvaluator) sendRequest(ctx context.Context) error {
}

req := &querierpb.EvaluateQueryRequest{
Plan: *encodedPlan,
Nodes: make([]querierpb.EvaluationNode, 0, len(nodeIndices)),
BatchSize: g.cfg.RemoteExecutionBatchSize,
Plan: *encodedPlan,
Nodes: make([]querierpb.EvaluationNode, 0, len(nodeIndices)),
BatchSize: g.cfg.RemoteExecutionBatchSize,
SeriesMetadataBatchSize: g.cfg.RemoteExecutionSeriesMetadataBatchSize,
}

overallQueriedTimeRange := planning.NoDataQueried()
@@ -706,38 +707,58 @@ func (r *rangeVectorExecutionResponse) Close() {
}

func readSeriesMetadata(ctx context.Context, group *RemoteExecutionGroupEvaluator, nodeStreamIndex remoteExecutionNodeStreamIndex, memoryConsumptionTracker *limiter.MemoryConsumptionTracker) ([]types.SeriesMetadata, error) {
resp, releaseMessage, err := group.getNextMessageForStream(ctx, nodeStreamIndex)
if err != nil {
return nil, err
}
var combinedMetadata []types.SeriesMetadata

// The labels in the message contain references to the underlying buffer, so we can't release it immediately.
// The value returned by FromLabelAdaptersToLabels does not retain a reference to the underlying buffer,
// so we can release the buffer once that method returns.
defer releaseMessage()
readOne := func() (int64, error) {
resp, releaseMessage, err := group.getNextMessageForStream(ctx, nodeStreamIndex)
if err != nil {
return -1, err
}

seriesMetadata := resp.GetSeriesMetadata()
if seriesMetadata == nil {
return nil, fmt.Errorf("expected SeriesMetadata, got %T", resp.Message)
}
// The labels in the message contain references to the underlying buffer, so we can't release it immediately.
// The value returned by FromLabelAdaptersToLabels does not retain a reference to the underlying buffer,
// so we can release the buffer once that method returns.
msg := resp.GetSeriesMetadata()
defer releaseMessage()

mqeSeries, err := types.SeriesMetadataSlicePool.Get(len(seriesMetadata.Series), memoryConsumptionTracker)
if err != nil {
return nil, err
}
if msg == nil {
return -1, fmt.Errorf("expected SeriesMetadata, got %T", resp.Message)
}

for _, s := range seriesMetadata.Series {
m := types.SeriesMetadata{
Labels: mimirpb.FromLabelAdaptersToLabels(s.Labels),
DropName: s.DropName,
if combinedMetadata == nil {
// First message, allocate the slice now.
combinedMetadata, err = types.SeriesMetadataSlicePool.Get(max(len(msg.Series), int(msg.TotalSeriesCountForNode)), memoryConsumptionTracker)
if err != nil {
return -1, err
}
} else if len(combinedMetadata)+len(msg.Series) > cap(combinedMetadata) {
return -1, fmt.Errorf("expected %d series metadata, but got at least %d", cap(combinedMetadata), len(combinedMetadata)+len(msg.Series))
}
mqeSeries, err = types.AppendSeriesMetadata(memoryConsumptionTracker, mqeSeries, m)
if err != nil {
return nil, err

for _, s := range msg.Series {
m := types.SeriesMetadata{
Labels: mimirpb.FromLabelAdaptersToLabels(s.Labels),
DropName: s.DropName,
}
combinedMetadata, err = types.AppendSeriesMetadata(memoryConsumptionTracker, combinedMetadata, m)
if err != nil {
return -1, err
}
}

return msg.TotalSeriesCountForNode, nil
}

return mqeSeries, nil
for {
if totalSeriesCount, err := readOne(); err != nil {
return nil, err
} else if int64(len(combinedMetadata)) >= totalSeriesCount {
// Either the querier doesn't support batching (TotalSeriesCountForNode == 0) and so there's only one message,
// or we've received the last message.
// We're done.
return combinedMetadata, nil
}
}
}
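Since the rendered diff interleaves the old and new bodies of `readSeriesMetadata`, the following is a simplified sketch of the new frontend-side reassembly loop, with placeholder types standing in for the real `querierpb` messages and MQE slice pool. It only illustrates the control flow added here: keep reading batches until `TotalSeriesCountForNode` series have been collected, where a total of 0 indicates a querier that doesn't batch and sends a single message.

```go
package main

import "fmt"

// Simplified stand-ins for the real protobuf and MQE types.
type seriesMetadata struct{ labels string }

type seriesMetadataMessage struct {
	Series                  []seriesMetadata
	TotalSeriesCountForNode int64
}

// readSeriesMetadata keeps reading batches from next() until it has collected
// the full set of series for a node. A TotalSeriesCountForNode of 0 means the
// querier does not batch metadata and sends everything in one message.
func readSeriesMetadata(next func() (*seriesMetadataMessage, error)) ([]seriesMetadata, error) {
	var combined []seriesMetadata

	for {
		msg, err := next()
		if err != nil {
			return nil, err
		}

		if combined == nil {
			// First message: size the slice for the whole node up front.
			combined = make([]seriesMetadata, 0, max(len(msg.Series), int(msg.TotalSeriesCountForNode)))
		} else if len(combined)+len(msg.Series) > cap(combined) {
			return nil, fmt.Errorf("expected %d series metadata, but got at least %d", cap(combined), len(combined)+len(msg.Series))
		}

		combined = append(combined, msg.Series...)

		if int64(len(combined)) >= msg.TotalSeriesCountForNode {
			// Either the querier doesn't batch (total == 0, single message),
			// or this was the last batch for the node.
			return combined, nil
		}
	}
}

func main() {
	batches := []*seriesMetadataMessage{
		{Series: []seriesMetadata{{labels: "series=1"}}, TotalSeriesCountForNode: 3},
		{Series: []seriesMetadata{{labels: "series=2"}, {labels: "series=3"}}, TotalSeriesCountForNode: 3},
	}
	i := 0
	next := func() (*seriesMetadataMessage, error) {
		m := batches[i]
		i++
		return m, nil
	}

	combined, err := readSeriesMetadata(next)
	fmt.Println(len(combined), err) // 3 <nil>
}
```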

func decodeEvaluationCompletedMessage(msg *querierpb.EvaluateQueryResponseEvaluationCompleted) (*annotations.Annotations, stats.Stats) {
82 changes: 80 additions & 2 deletions pkg/frontend/v2/remoteexec_test.go
@@ -243,6 +243,79 @@ func TestInstantVectorExecutionResponse_Batching(t *testing.T) {
require.Zerof(t, memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes(), "buffers should be released when closing response, have: %v", memoryConsumptionTracker.DescribeCurrentMemoryConsumption())
}

func TestInstantVectorExecutionResponse_BatchedSeriesMetadata(t *testing.T) {
ctx := context.Background()
memoryConsumptionTracker := limiter.NewUnlimitedMemoryConsumptionTracker(ctx)

stream := &mockResponseStream{
responses: []mockResponse{
{msg: newSeriesMetadataBatch(0, false, 4, labels.FromStrings("series", "1"))},
{msg: newSeriesMetadataBatch(0, false, 4, labels.FromStrings("series", "2"), labels.FromStrings("series", "3"))},
{msg: newSeriesMetadataBatch(0, false, 4, labels.FromStrings("series", "4"))},
{msg: newInstantVectorSeriesData(0, generateFPoints(1000, 1, 0), nil)},
{msg: newInstantVectorSeriesData(0, generateFPoints(1000, 1, 1), nil)},
{msg: newInstantVectorSeriesData(0, generateFPoints(1000, 1, 2), nil)},
{msg: newInstantVectorSeriesData(0, generateFPoints(1000, 1, 3), nil)},
},
}

frontend := &mockFrontend{stream: stream}
group := NewRemoteExecutionGroupEvaluator(frontend, Config{}, true, &planning.QueryParameters{}, memoryConsumptionTracker)
response, err := group.CreateInstantVectorExecution(ctx, createDummyNode(), types.NewInstantQueryTimeRange(time.Now()))
require.NoError(t, err)

require.NoError(t, response.Start(ctx))
series, err := response.GetSeriesMetadata(ctx)
require.NoError(t, err)

expectedSeries := []types.SeriesMetadata{
{Labels: labels.FromStrings("series", "1")},
{Labels: labels.FromStrings("series", "2")},
{Labels: labels.FromStrings("series", "3")},
{Labels: labels.FromStrings("series", "4")},
}
require.Equal(t, expectedSeries, series)
types.SeriesMetadataSlicePool.Put(&series, memoryConsumptionTracker)

response.Close()
require.True(t, stream.closed.Load())
require.Zerof(t, memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes(), "buffers should be released when closing response, have: %v", memoryConsumptionTracker.DescribeCurrentMemoryConsumption())
}

func TestRangeVectorExecutionResponse_BatchedSeriesMetadata(t *testing.T) {
ctx := context.Background()
memoryConsumptionTracker := limiter.NewUnlimitedMemoryConsumptionTracker(ctx)

stream := &mockResponseStream{
responses: []mockResponse{
{msg: newSeriesMetadataBatch(0, false, 2, labels.FromStrings("series", "1"))},
{msg: newSeriesMetadataBatch(0, false, 2, labels.FromStrings("series", "2"))},
{msg: newRangeVectorStepData(0, 0, -10_000, 0, generateFPoints(1000, 1, 0), nil)},
{msg: newRangeVectorStepData(1, 0, -10_000, 0, generateFPoints(1000, 1, 1), nil)},
},
}

frontend := &mockFrontend{stream: stream}
group := NewRemoteExecutionGroupEvaluator(frontend, Config{}, true, &planning.QueryParameters{}, memoryConsumptionTracker)
response, err := group.CreateRangeVectorExecution(ctx, createDummyNode(), types.NewInstantQueryTimeRange(time.Now()))
require.NoError(t, err)

require.NoError(t, response.Start(ctx))
series, err := response.GetSeriesMetadata(ctx)
require.NoError(t, err)

expectedSeries := []types.SeriesMetadata{
{Labels: labels.FromStrings("series", "1")},
{Labels: labels.FromStrings("series", "2")},
}
require.Equal(t, expectedSeries, series)
types.SeriesMetadataSlicePool.Put(&series, memoryConsumptionTracker)

response.Close()
require.True(t, stream.closed.Load())
require.Zerof(t, memoryConsumptionTracker.CurrentEstimatedMemoryConsumptionBytes(), "buffers should be released when closing response, have: %v", memoryConsumptionTracker.DescribeCurrentMemoryConsumption())
}

func TestInstantVectorExecutionResponse_DelayedNameRemoval(t *testing.T) {
ctx := context.Background()
memoryConsumptionTracker := limiter.NewUnlimitedMemoryConsumptionTracker(ctx)
@@ -939,6 +1012,10 @@ func newScalarValue(samples ...mimirpb.Sample) *frontendv2pb.QueryResultStreamRequest {
}

func newSeriesMetadata(nodeIndex int64, dropName bool, series ...labels.Labels) *frontendv2pb.QueryResultStreamRequest {
return newSeriesMetadataBatch(nodeIndex, dropName, 0, series...)
}

func newSeriesMetadataBatch(nodeIndex int64, dropName bool, totalSeriesCount int64, series ...labels.Labels) *frontendv2pb.QueryResultStreamRequest {
protoSeries := make([]querierpb.SeriesMetadata, 0, len(series))
for _, series := range series {
protoSeries = append(protoSeries, querierpb.SeriesMetadata{
@@ -952,8 +1029,9 @@ func newSeriesMetadata(nodeIndex int64, dropName bool, series ...labels.Labels)
EvaluateQueryResponse: &querierpb.EvaluateQueryResponse{
Message: &querierpb.EvaluateQueryResponse_SeriesMetadata{
SeriesMetadata: &querierpb.EvaluateQueryResponseSeriesMetadata{
NodeIndex: nodeIndex,
Series: protoSeries,
NodeIndex: nodeIndex,
Series: protoSeries,
TotalSeriesCountForNode: totalSeriesCount,
},
},
},
60 changes: 41 additions & 19 deletions pkg/querier/dispatcher.go
@@ -185,6 +185,7 @@ func (d *Dispatcher) evaluateQuery(ctx context.Context, body []byte, resp *query
timeNow: d.timeNow,
originalExpression: req.Plan.OriginalExpression,
batchSize: req.BatchSize,
seriesMetadataBatchSize: req.SeriesMetadataBatchSize,
instantVectorSeriesDataBatches: make(map[planning.Node]*instantVectorSeriesDataBatch, instantVectorNodeCount),
}

@@ -332,11 +333,12 @@ func (w *queryResponseWriter) write(ctx context.Context, resp *frontendv2pb.Quer
}

type evaluationObserver struct {
w *queryResponseWriter
nodeIndices map[planning.Node]int64
batchSize uint64
startTime time.Time
timeNow func() time.Time
w *queryResponseWriter
nodeIndices map[planning.Node]int64
batchSize uint64
seriesMetadataBatchSize uint64
startTime time.Time
timeNow func() time.Time

instantVectorSeriesDataBatches map[planning.Node]*instantVectorSeriesDataBatch

@@ -364,23 +366,43 @@ func (o *evaluationObserver) SeriesMetadataEvaluated(ctx context.Context, evalua
return err
}

protoSeries := make([]querierpb.SeriesMetadata, 0, len(series))

for _, s := range series {
protoSeries = append(protoSeries, querierpb.SeriesMetadata{
Labels: mimirpb.FromLabelsToLabelAdapters(s.Labels),
DropName: s.DropName,
})
batchSize := int(o.seriesMetadataBatchSize)
if batchSize == 0 {
// Frontend doesn't support batching metadata, so send everything in one batch.
batchSize = len(series)
}

return o.w.Write(ctx, querierpb.EvaluateQueryResponse{
Message: &querierpb.EvaluateQueryResponse_SeriesMetadata{
SeriesMetadata: &querierpb.EvaluateQueryResponseSeriesMetadata{
NodeIndex: nodeIndex,
Series: protoSeries,
sentOne := false

// Note the slightly unusual condition: we always send at least one message, even when there are no series.
for startIdx := 0; startIdx < len(series) || (len(series) == 0 && !sentOne); startIdx += batchSize {
endIdx := min(startIdx+batchSize, len(series))
batch := series[startIdx:endIdx]

protoSeries := make([]querierpb.SeriesMetadata, 0, len(batch))
for _, s := range batch {
protoSeries = append(protoSeries, querierpb.SeriesMetadata{
Labels: mimirpb.FromLabelsToLabelAdapters(s.Labels),
DropName: s.DropName,
})
}

if err := o.w.Write(ctx, querierpb.EvaluateQueryResponse{
Message: &querierpb.EvaluateQueryResponse_SeriesMetadata{
SeriesMetadata: &querierpb.EvaluateQueryResponseSeriesMetadata{
NodeIndex: nodeIndex,
Series: protoSeries,
TotalSeriesCountForNode: int64(len(series)),
},
},
},
})
}); err != nil {
return err
}

sentOne = true
}

return nil
}

func (o *evaluationObserver) InstantVectorSeriesDataEvaluated(ctx context.Context, evaluator *streamingpromql.Evaluator, node planning.Node, seriesIndex int, seriesCount int, seriesData types.InstantVectorSeriesData) error {
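The querier-side counterpart in `SeriesMetadataEvaluated` is summarised by the sketch below, again with placeholder types instead of `querierpb` and the real response writer; treat it as an illustration of the batching rules added in this change rather than the implementation itself: fall back to a single batch when the frontend advertises a batch size of 0, stamp every message with the node's total series count, and always emit at least one message so the frontend still hears about nodes with no series.

```go
package main

import "fmt"

// Simplified stand-ins for the real protobuf types and response writer.
type seriesMetadata struct{ labels string }

type seriesMetadataMessage struct {
	Series                  []seriesMetadata
	TotalSeriesCountForNode int64
}

// sendSeriesMetadata splits the evaluated series for one node into batches of
// batchSize and writes each batch. A batchSize of 0 means the frontend does not
// support batched metadata, so everything goes out in a single message. At least
// one message is always sent, even when there are no series.
func sendSeriesMetadata(series []seriesMetadata, batchSize int, write func(seriesMetadataMessage) error) error {
	if batchSize == 0 {
		batchSize = len(series)
	}

	sentOne := false

	// Slightly unusual condition: always send at least one message, even when there are no series.
	for start := 0; start < len(series) || (len(series) == 0 && !sentOne); start += batchSize {
		end := min(start+batchSize, len(series))

		if err := write(seriesMetadataMessage{
			Series:                  series[start:end],
			TotalSeriesCountForNode: int64(len(series)),
		}); err != nil {
			return err
		}

		sentOne = true
	}

	return nil
}

func main() {
	series := []seriesMetadata{{labels: "series=1"}, {labels: "series=2"}, {labels: "series=3"}}

	// With a batch size of 2, three series are written as two messages.
	_ = sendSeriesMetadata(series, 2, func(m seriesMetadataMessage) error {
		fmt.Println(len(m.Series), m.TotalSeriesCountForNode) // prints "2 3", then "1 3"
		return nil
	})
}
```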