diff --git a/tools/querytee/proxy.go b/tools/querytee/proxy.go
index 8e85fdbb9b92b..0305b4cbd16cb 100644
--- a/tools/querytee/proxy.go
+++ b/tools/querytee/proxy.go
@@ -44,6 +44,7 @@ type ProxyConfig struct {
 	RequestURLFilter               *regexp.Regexp
 	InstrumentCompares             bool
 	Goldfish                       goldfish.Config
+	BackendSelectionStrategy       BackendSelectionStrategy
 }
 
 func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
@@ -67,6 +68,10 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
 
 	// Register Goldfish configuration flags
 	cfg.Goldfish.RegisterFlags(f)
+
+	// The default pick mode is naive but if there's a preferred backend configured, we switch to preferred.
+	cfg.BackendSelectionStrategy = BackendSelectionStrategyNaive
+	f.Var(&cfg.BackendSelectionStrategy, "proxy.backend-selection-strategy", "Proxy backend selection strategy (preferred, fastest, naive). If naive (the default) is selected but there's a preferred backend configured, the 'preferred' strategy is used instead.")
 }
 
 type Route struct {
@@ -278,6 +283,7 @@ func (p *Proxy) Start() error {
 			p.logger,
 			comp,
 			p.cfg.InstrumentCompares,
+			p.cfg.BackendSelectionStrategy,
 		)
 
 		// Add Goldfish if configured
@@ -325,6 +331,7 @@ func (p *Proxy) Start() error {
 			p.logger,
 			comp,
 			p.cfg.InstrumentCompares,
+			p.cfg.BackendSelectionStrategy,
 		)
 
 		router.Path(route.Path).Methods(route.Methods...).Handler(endpoint)
diff --git a/tools/querytee/proxy_endpoint.go b/tools/querytee/proxy_endpoint.go
index 97e95741532e3..ac6c7ae568f0a 100644
--- a/tools/querytee/proxy_endpoint.go
+++ b/tools/querytee/proxy_endpoint.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"sort"
 	"strconv"
 	"strings"
 	"sync"
@@ -39,6 +40,46 @@ type ComparisonSummary struct {
 	missingMetrics int
 }
 
+// BackendSelectionStrategy selects which backend responses are compared and
+// which one is treated as CellA. It implements flag.Value.
+type BackendSelectionStrategy string
+
+// Set implements flag.Value. It accepts "preferred", "fastest" or "naive" and
+// leaves the receiver untouched when value is anything else.
+func (m *BackendSelectionStrategy) Set(value string) error {
+	switch value {
+	case "preferred":
+		*m = BackendSelectionStrategyPreferred
+	case "fastest":
+		*m = BackendSelectionStrategyFastest
+	case "naive":
+		*m = BackendSelectionStrategyNaive
+	default:
+		return fmt.Errorf("invalid backend selection strategy %q (valid values: preferred, fastest, naive)", value)
+	}
+	return nil
+}
+
+// String implements flag.Value. The flag package may call String on a
+// zero-valued receiver, such as a nil pointer, so nil must not be dereferenced.
+func (m *BackendSelectionStrategy) String() string {
+	if m == nil {
+		return ""
+	}
+	return string(*m)
+}
+
+const (
+	// BackendSelectionStrategyPreferred picks the preferred backend response as CellA and the first non-preferred response as CellB.
+	BackendSelectionStrategyPreferred BackendSelectionStrategy = "preferred"
+
+	// BackendSelectionStrategyFastest sorts the responses by duration and picks the two quickest: the quickest is CellA, the runner-up is CellB.
+	BackendSelectionStrategyFastest BackendSelectionStrategy = "fastest"
+
+	// BackendSelectionStrategyNaive picks the first two responses in list order without any sorting: the first is CellA, the second is CellB.
+	BackendSelectionStrategyNaive BackendSelectionStrategy = "naive"
+)
+
 type ProxyEndpoint struct {
 	backends   []*ProxyBackend
 	metrics    *ProxyMetrics
@@ -47,8 +88,8 @@ type ProxyEndpoint struct {
 
 	instrumentCompares bool
 
-	// Whether for this endpoint there's a preferred backend configured.
-	hasPreferredBackend bool
+	// How goldfish will pick the cells for comparison and which will be used as the final response.
+	selectionStrategy BackendSelectionStrategy
 
 	// The route name used to track metrics.
 	routeName string
@@ -75,23 +116,31 @@ func NewProxyEndpoint(
 	logger log.Logger,
 	comparator comparator.ResponsesComparator,
 	instrumentCompares bool,
+	backendSelectionStrategy BackendSelectionStrategy,
 ) *ProxyEndpoint {
-	hasPreferredBackend := false
-	for _, backend := range backends {
-		if backend.preferred {
-			hasPreferredBackend = true
-			break
+	// A zero-value strategy (config built without RegisterFlags) behaves like naive.
+	if backendSelectionStrategy == "" {
+		backendSelectionStrategy = BackendSelectionStrategyNaive
+	}
+
+	// Naive is only a default: a configured preferred backend promotes it to preferred.
+	if backendSelectionStrategy == BackendSelectionStrategyNaive {
+		for _, backend := range backends {
+			if backend.preferred {
+				backendSelectionStrategy = BackendSelectionStrategyPreferred
+				break
+			}
 		}
 	}
 
 	return &ProxyEndpoint{
-		backends:            backends,
-		routeName:           routeName,
-		metrics:             metrics,
-		logger:              logger,
-		comparator:          comparator,
-		hasPreferredBackend: hasPreferredBackend,
-		instrumentCompares:  instrumentCompares,
+		backends:           backends,
+		routeName:          routeName,
+		metrics:            metrics,
+		logger:             logger,
+		comparator:         comparator,
+		selectionStrategy:  backendSelectionStrategy,
+		instrumentCompares: instrumentCompares,
 	}
 }
 
@@ -349,25 +398,7 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *Back
 
 	// Process with Goldfish if enabled and sampled
 	if goldfishSample && p.goldfishManager != nil && len(responses) >= 2 {
-		// Use preferred backend as Cell A, first non-preferred as Cell B
-		var cellAResp, cellBResp *BackendResponse
-
-		// Find preferred backend response
-		for _, resp := range responses {
-			if resp != nil && resp.backend.preferred {
-				cellAResp = resp
-				break
-			}
-		}
-
-		// Find first non-preferred backend response
-		for _, resp := range responses {
-			if resp != nil && !resp.backend.preferred {
-				cellBResp = resp
-				break
-			}
-		}
-
+		cellAResp, cellBResp := p.pickBackendResponses(responses, p.selectionStrategy)
 		if cellAResp != nil && cellBResp != nil {
 			tenantID, _, _ := tenant.ExtractTenantIDFromHTTPRequest(r)
 			level.Info(p.logger).Log("msg", "Processing query with Goldfish",
@@ -384,6 +415,55 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *Back
 	}
 }
 
+// pickBackendResponses picks the two backend responses that Goldfish compares:
+// the first return value is used as CellA and the second as CellB. Either one
+// may be nil when no suitable response exists or the strategy is unknown, so
+// callers must check both. Nil entries in responses are skipped and the
+// caller's slice is never reordered.
+func (p *ProxyEndpoint) pickBackendResponses(responses []*BackendResponse, selectionStrategy BackendSelectionStrategy) (*BackendResponse, *BackendResponse) {
+	switch selectionStrategy {
+	case BackendSelectionStrategyPreferred:
+		var cellAResp, cellBResp *BackendResponse
+		for _, resp := range responses {
+			if resp == nil {
+				continue
+			}
+			if resp.backend.preferred && cellAResp == nil {
+				cellAResp = resp
+			} else if !resp.backend.preferred && cellBResp == nil {
+				cellBResp = resp
+			}
+		}
+		return cellAResp, cellBResp
+
+	case BackendSelectionStrategyFastest, BackendSelectionStrategyNaive:
+		// Work on a nil-free copy so that indexing cannot panic and sorting does
+		// not reorder the slice the caller still holds.
+		candidates := make([]*BackendResponse, 0, len(responses))
+		for _, resp := range responses {
+			if resp != nil {
+				candidates = append(candidates, resp)
+			}
+		}
+		if len(candidates) < 2 {
+			return nil, nil
+		}
+
+		if selectionStrategy == BackendSelectionStrategyFastest {
+			// Stable sort keeps list order between responses of equal duration.
+			sort.SliceStable(candidates, func(i, j int) bool {
+				return candidates[i].duration < candidates[j].duration
+			})
+			p.metrics.fastestBackendSelected.WithLabelValues(candidates[0].backend.name, p.routeName).Inc()
+		}
+		return candidates[0], candidates[1]
+
+	default:
+		level.Error(p.logger).Log("msg", "invalid selection strategy for proxy", "selection_strategy", selectionStrategy)
+		return nil, nil
+	}
+}
+
 func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *BackendResponse) *BackendResponse {
 	var (
 		responses                 = make([]*BackendResponse, 0, len(p.backends))
@@ -395,7 +475,7 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *BackendResp
-		// - There's no preferred backend configured
+		// - The selection strategy is not "preferred"
 		// - Or this response is from the preferred backend
 		// - Or the preferred backend response has already been received and wasn't successful
-		if res.succeeded() && (!p.hasPreferredBackend || res.backend.preferred || preferredResponseReceived) {
+		if res.succeeded() && (p.selectionStrategy != BackendSelectionStrategyPreferred || res.backend.preferred || preferredResponseReceived) {
 			return res
 		}
 
diff --git a/tools/querytee/proxy_endpoint_integration_test.go b/tools/querytee/proxy_endpoint_integration_test.go
index 9379efc315d69..a9a681a57140d 100644
--- a/tools/querytee/proxy_endpoint_integration_test.go
+++ b/tools/querytee/proxy_endpoint_integration_test.go
@@ -86,6 +86,7 @@ func TestProxyEndpoint_GoldfishQueriesContinueAfterNonGoldfishComplete(t *testin
 		log.NewNopLogger(),
 		nil,
 		false,
+		BackendSelectionStrategyNaive,
 	)
 
 	// Create a parent span to simulate incoming request with tracing
diff --git a/tools/querytee/proxy_endpoint_test.go b/tools/querytee/proxy_endpoint_test.go
index d7869d4a01ad5..503529c7d5bb9 100644
--- a/tools/querytee/proxy_endpoint_test.go
+++ b/tools/querytee/proxy_endpoint_test.go
@@ -36,7 +36,7 @@ func createTestEndpoint(backends []*ProxyBackend, routeName string, comparator c
 	metrics := NewProxyMetrics(nil)
 	logger := log.NewNopLogger()
 
-	endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, comparator, instrumentCompares)
+	endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, comparator, instrumentCompares, BackendSelectionStrategyNaive)
 	handlerFactory := NewHandlerFactory(HandlerFactoryConfig{
 		Backends:        backends,
 		Codec:           queryrange.DefaultCodec,
@@ -55,7 +55,7 @@ func createTestEndpoint(backends []*ProxyBackend, routeName string, comparator c
 func createTestEndpointWithMetrics(backends []*ProxyBackend, routeName string, comp comparator.ResponsesComparator, instrumentCompares bool, metrics *ProxyMetrics) *ProxyEndpoint {
 	logger := log.NewNopLogger()
 
-	endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, comp, instrumentCompares)
+	endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, comp, instrumentCompares, BackendSelectionStrategyNaive)
 	handlerFactory := NewHandlerFactory(HandlerFactoryConfig{
 		Backends:        backends,
 		Codec:           queryrange.DefaultCodec,
@@ -84,7 +84,7 @@ func createTestEndpointWithGoldfish(backends []*ProxyBackend, routeName string,
 		Metrics:         metrics,
 	})
 
-	endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, nil, false)
+	endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, nil, false, BackendSelectionStrategyNaive)
 	queryHandler := handlerFactory.CreateHandler(routeName, nil, false)
 	metricQueryHandler := handlerFactory.CreateHandler(routeName, nil, true)
 	endpoint.WithQueryHandlers(queryHandler, metricQueryHandler, queryrange.DefaultCodec)
@@ -168,7 +168,7 @@ func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) {
 
 	for testName, testData := range tests {
 		t.Run(testName, func(t *testing.T) {
-			endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, false)
+			endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, false, BackendSelectionStrategyNaive)
 
 			// Send the responses from a dedicated goroutine.
 			resCh := make(chan *BackendResponse)
@@ -334,7 +334,7 @@ func Test_ProxyEndpoint_WriteRequests(t *testing.T) {
 	// endpoint := createTestEndpoint(backends, "test", nil, false)
 	metrics := NewProxyMetrics(nil)
 	logger := log.NewNopLogger()
-	endpoint := NewProxyEndpoint(backends, "test", metrics, logger, nil, false)
+	endpoint := NewProxyEndpoint(backends, "test", metrics, logger, nil, false, BackendSelectionStrategyNaive)
 
 	for _, tc := range []struct {
 		name    string
diff --git a/tools/querytee/proxy_metrics.go b/tools/querytee/proxy_metrics.go
index 8e2139f26ebd4..69f5f7f3ecaca 100644
--- a/tools/querytee/proxy_metrics.go
+++ b/tools/querytee/proxy_metrics.go
@@ -23,6 +23,8 @@ type ProxyMetrics struct {
 	// Sampling metrics
 	queriesSampled    *prometheus.CounterVec
 	samplingDecisions *prometheus.CounterVec
+
+	fastestBackendSelected *prometheus.CounterVec
 }
 
 func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
@@ -61,6 +63,12 @@ func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
 			Name:      "sampling_decisions_total",
 			Help:      "Total number of sampling decisions made.",
 		}, []string{"tenant", "route", "decision"}),
+
+		fastestBackendSelected: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
+			Namespace: "loki_querytee",
+			Name:      "fastest_backend_selected_total",
+			Help:      "Total number of fastest backend selected.",
+		}, []string{"backend", "route"}),
 	}
 
 	return m