Skip to content

Commit 51cc661

Browse files
authored
Remote execution of query plans in queriers (#12302)
#### What this PR does

This PR adds support for remote execution of query plans in queriers.

Query-frontends can now send a Protobuf message (instead of a smuggled HTTP request) to queriers via query-schedulers, and queriers will respond with a stream of results. This has been implemented in a way where we can add other kinds of query requests and responses in the future, with the goal of eventually moving away from smuggling HTTP requests in Protobuf messages.

In the case of queries, query-frontends can now send a query plan and request evaluation of a particular node in the plan, and queriers will respond with results for that node.

I've also added an HTTP endpoint to queriers for debugging purposes (`/api/v1/evaluate`), and a CLI tool to exercise this (`tools/evaluate-query-plan`). I plan to remove this in a subsequent PR, once I've added the functionality to query-frontends to send these requests to queriers.

Not included in this PR:

* batching of responses to query-frontends (each series' data is sent as its own Protobuf message for now)
* actually sending requests from query-frontends
* metrics and tracing for requests to queriers

There are a number of other items I'll handle in follow-up PRs (e.g. propagating strong consistency information and dealing with query stats) - my goal with this PR is to get the general structure in place and start getting feedback on the approach before I invest too much more time in this.

I've chosen not to add a changelog entry given this code is not actively used at present, but I'm open to other opinions on this.

#### Which issue(s) this PR fixes or relates to

(none)

#### Checklist

- [x] Tests updated.
- [n/a] Documentation added.
- [n/a] `CHANGELOG.md` updated - the order of entries should be `[CHANGE]`, `[FEATURE]`, `[ENHANCEMENT]`, `[BUGFIX]`. If changelog entry is not needed, please add the `changelog-not-needed` label to the PR.
- [n/a] [`about-versioning.md`](https://github.com/grafana/mimir/blob/main/docs/sources/mimir/configure/about-versioning.md) updated with experimental features.
1 parent a70b72d commit 51cc661

30 files changed

+8326
-386
lines changed

pkg/api/api.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -476,6 +476,10 @@ func (a *API) RegisterQueryAPI(handler http.Handler, buildInfoHandler http.Handl
476476
a.RegisterRoute(path.Join(a.cfg.PrometheusHTTPPrefix, "/api/v1/format_query"), handler, true, true, "GET", "POST")
477477
}
478478

479+
func (a *API) RegisterEvaluationAPI(handler http.Handler) {
480+
a.RegisterRoute("/api/v1/evaluate", handler, true, true, "POST")
481+
}
482+
479483
func (a *API) RegisterQueryAnalysisAPI(handler http.Handler) {
480484
a.RegisterRoute("/api/v1/analyze", handler, true, true, "POST")
481485
}

pkg/api/handlers.go

Lines changed: 20 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -214,6 +214,7 @@ func (cfg *Config) statusFlagsHandler() http.HandlerFunc {
214214
func NewQuerierHandler(
215215
cfg Config,
216216
querierCfg querier.Config,
217+
dispatcher *querier.Dispatcher,
217218
queryable storage.SampleAndChunkQueryable,
218219
exemplarQueryable storage.ExemplarQueryable,
219220
metadataSupplier querier.MetadataSupplier,
@@ -311,9 +312,8 @@ func NewQuerierHandler(
311312
router.Use(querierapi.ConsistencyMiddleware().Wrap)
312313

313314
// Define the prefixes for all routes
314-
prefix := path.Join(cfg.ServerPrefix, cfg.PrometheusHTTPPrefix)
315-
316-
promRouter := route.New().WithPrefix(path.Join(prefix, "/api/v1"))
315+
promPrefix := path.Join(cfg.ServerPrefix, cfg.PrometheusHTTPPrefix, "/api/v1")
316+
promRouter := route.New().WithPrefix(path.Join(promPrefix))
317317
api.Register(promRouter)
318318

319319
// Track the requests count in the anonymous usage stats.
@@ -329,19 +329,23 @@ func NewQuerierHandler(
329329

330330
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
331331
// https://github.com/prometheus/prometheus/pull/7125/files
332-
router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(remoteReadStats.Wrap(querier.RemoteReadHandler(queryable, logger, querierCfg)))
333-
router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(instantQueryStats.Wrap(promRouter))
334-
router.Path(path.Join(prefix, "/api/v1/query_range")).Methods("GET", "POST").Handler(rangeQueryStats.Wrap(promRouter))
335-
router.Path(path.Join(prefix, "/api/v1/query_exemplars")).Methods("GET", "POST").Handler(exemplarsQueryStats.Wrap(promRouter))
336-
router.Path(path.Join(prefix, "/api/v1/labels")).Methods("GET", "POST").Handler(labelsQueryStats.Wrap(promRouter))
337-
router.Path(path.Join(prefix, "/api/v1/label/{name}/values")).Methods("GET").Handler(labelsQueryStats.Wrap(promRouter))
338-
router.Path(path.Join(prefix, "/api/v1/series")).Methods("GET", "POST", "DELETE").Handler(seriesQueryStats.Wrap(promRouter))
339-
router.Path(path.Join(prefix, "/api/v1/metadata")).Methods("GET").Handler(metadataQueryStats.Wrap(querier.NewMetadataHandler(metadataSupplier)))
340-
router.Path(path.Join(prefix, "/api/v1/cardinality/label_names")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.LabelNamesCardinalityHandler(distributor, limits)))
341-
router.Path(path.Join(prefix, "/api/v1/cardinality/label_values")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.LabelValuesCardinalityHandler(distributor, limits)))
342-
router.Path(path.Join(prefix, "/api/v1/cardinality/active_series")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.ActiveSeriesCardinalityHandler(distributor, limits)))
343-
router.Path(path.Join(prefix, "/api/v1/cardinality/active_native_histogram_metrics")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.ActiveNativeHistogramMetricsHandler(distributor, limits)))
344-
router.Path(path.Join(prefix, "/api/v1/format_query")).Methods("GET", "POST").Handler(formattingQueryStats.Wrap(promRouter))
332+
router.Path(path.Join(promPrefix, "/read")).Methods("POST").Handler(remoteReadStats.Wrap(querier.RemoteReadHandler(queryable, logger, querierCfg)))
333+
router.Path(path.Join(promPrefix, "/query")).Methods("GET", "POST").Handler(instantQueryStats.Wrap(promRouter))
334+
router.Path(path.Join(promPrefix, "/query_range")).Methods("GET", "POST").Handler(rangeQueryStats.Wrap(promRouter))
335+
router.Path(path.Join(promPrefix, "/query_exemplars")).Methods("GET", "POST").Handler(exemplarsQueryStats.Wrap(promRouter))
336+
router.Path(path.Join(promPrefix, "/labels")).Methods("GET", "POST").Handler(labelsQueryStats.Wrap(promRouter))
337+
router.Path(path.Join(promPrefix, "/label/{name}/values")).Methods("GET").Handler(labelsQueryStats.Wrap(promRouter))
338+
router.Path(path.Join(promPrefix, "/series")).Methods("GET", "POST", "DELETE").Handler(seriesQueryStats.Wrap(promRouter))
339+
router.Path(path.Join(promPrefix, "/metadata")).Methods("GET").Handler(metadataQueryStats.Wrap(querier.NewMetadataHandler(metadataSupplier)))
340+
router.Path(path.Join(promPrefix, "/cardinality/label_names")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.LabelNamesCardinalityHandler(distributor, limits)))
341+
router.Path(path.Join(promPrefix, "/cardinality/label_values")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.LabelValuesCardinalityHandler(distributor, limits)))
342+
router.Path(path.Join(promPrefix, "/cardinality/active_series")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.ActiveSeriesCardinalityHandler(distributor, limits)))
343+
router.Path(path.Join(promPrefix, "/cardinality/active_native_histogram_metrics")).Methods("GET", "POST").Handler(cardinalityQueryStats.Wrap(querier.ActiveNativeHistogramMetricsHandler(distributor, limits)))
344+
router.Path(path.Join(promPrefix, "/format_query")).Methods("GET", "POST").Handler(formattingQueryStats.Wrap(promRouter))
345+
346+
if dispatcher != nil {
347+
router.Path("/api/v1/evaluate").Methods("POST").Handler(dispatcher)
348+
}
345349

346350
// Track execution time.
347351
return stats.NewWallTimeMiddleware().Wrap(router)

pkg/frontend/v2/frontend_scheduler_adapter.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@ func (a *frontendToSchedulerAdapter) frontendToSchedulerEnqueueRequest(
3838
Type: schedulerpb.ENQUEUE,
3939
QueryID: req.queryID,
4040
UserID: req.userID,
41-
HttpRequest: req.request,
41+
Payload: &schedulerpb.FrontendToScheduler_HttpRequest{HttpRequest: req.request},
4242
FrontendAddress: frontendAddr,
4343
StatsEnabled: req.statsEnabled,
4444
AdditionalQueueDimensions: addlQueueDims,

0 commit comments

Comments
 (0)