Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions tools/querytee/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ProxyConfig struct {
RequestURLFilter *regexp.Regexp
InstrumentCompares bool
Goldfish goldfish.Config
BackendSelectionStrategy BackendSelectionStrategy
}

func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
Expand All @@ -67,6 +68,10 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {

// Register Goldfish configuration flags
cfg.Goldfish.RegisterFlags(f)

// The default pick mode is naive but if there's a preferred backend configured, we switch to preferred.
cfg.BackendSelectionStrategy = BackendSelectionStrategyNaive
f.Var(&cfg.BackendSelectionStrategy, "proxy.backend-selection-strategy", "Proxy backend selection strategy (preferred, fastest, naive). If naive (the default) is selected but there's a preferred backend configured, the 'preferred' strategy is used instead.")
}

type Route struct {
Expand Down Expand Up @@ -278,6 +283,7 @@ func (p *Proxy) Start() error {
p.logger,
comp,
p.cfg.InstrumentCompares,
p.cfg.BackendSelectionStrategy,
)

// Add Goldfish if configured
Expand Down Expand Up @@ -325,6 +331,7 @@ func (p *Proxy) Start() error {
p.logger,
comp,
p.cfg.InstrumentCompares,
p.cfg.BackendSelectionStrategy,
)

router.Path(route.Path).Methods(route.Methods...).Handler(endpoint)
Expand Down
123 changes: 89 additions & 34 deletions tools/querytee/proxy_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -39,6 +40,37 @@ type ComparisonSummary struct {
missingMetrics int
}

type BackendSelectionStrategy string

func (m *BackendSelectionStrategy) Set(value string) error {
switch value {
case "preferred":
*m = BackendSelectionStrategyPreferred
case "fastest":
*m = BackendSelectionStrategyFastest
case "naive":
*m = BackendSelectionStrategyNaive
default:
return fmt.Errorf("invalid goldfish pick mode %s", value)
}
return nil
}

func (m *BackendSelectionStrategy) String() string {
return string(*m)
}

const (
// Preferred mode picks the preferred backend response as CellA and the first non-preferred response as CellB.
BackendSelectionStrategyPreferred BackendSelectionStrategy = "preferred"

// Fastest mode sorts the responses by duration and picks the first two. First one (quickest) is used as CellA and second one is used as CellB.
BackendSelectionStrategyFastest BackendSelectionStrategy = "fastest"

// Naive mode picks the first two responses from the list without any sorting. First one is used as CellA and second one is used as CellB.
BackendSelectionStrategyNaive BackendSelectionStrategy = "naive"
)

type ProxyEndpoint struct {
backends []*ProxyBackend
metrics *ProxyMetrics
Expand All @@ -47,8 +79,8 @@ type ProxyEndpoint struct {

instrumentCompares bool

// Whether for this endpoint there's a preferred backend configured.
hasPreferredBackend bool
// How goldfish will pick the cells for comparison and which will be used as the final response.
selectionStrategy BackendSelectionStrategy

// The route name used to track metrics.
routeName string
Expand All @@ -75,23 +107,25 @@ func NewProxyEndpoint(
logger log.Logger,
comparator comparator.ResponsesComparator,
instrumentCompares bool,
backendSelectionStrategy BackendSelectionStrategy,
) *ProxyEndpoint {
hasPreferredBackend := false
for _, backend := range backends {
if backend.preferred {
hasPreferredBackend = true
break
if backendSelectionStrategy == BackendSelectionStrategyNaive {
for _, backend := range backends {
if backend.preferred {
backendSelectionStrategy = BackendSelectionStrategyPreferred
break
}
}
}

return &ProxyEndpoint{
backends: backends,
routeName: routeName,
metrics: metrics,
logger: logger,
comparator: comparator,
hasPreferredBackend: hasPreferredBackend,
instrumentCompares: instrumentCompares,
backends: backends,
routeName: routeName,
metrics: metrics,
logger: logger,
comparator: comparator,
selectionStrategy: backendSelectionStrategy,
instrumentCompares: instrumentCompares,
}
}

Expand Down Expand Up @@ -349,25 +383,7 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *Back

// Process with Goldfish if enabled and sampled
if goldfishSample && p.goldfishManager != nil && len(responses) >= 2 {
// Use preferred backend as Cell A, first non-preferred as Cell B
var cellAResp, cellBResp *BackendResponse

// Find preferred backend response
for _, resp := range responses {
if resp != nil && resp.backend.preferred {
cellAResp = resp
break
}
}

// Find first non-preferred backend response
for _, resp := range responses {
if resp != nil && !resp.backend.preferred {
cellBResp = resp
break
}
}

cellAResp, cellBResp := p.pickBackendResponses(responses, p.selectionStrategy)
if cellAResp != nil && cellBResp != nil {
tenantID, _, _ := tenant.ExtractTenantIDFromHTTPRequest(r)
level.Info(p.logger).Log("msg", "Processing query with Goldfish",
Expand All @@ -384,6 +400,45 @@ func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *Back
}
}

// pickBackendResponses picks the backend responses used for comparison, where CellA returns will be used as the final response.
func (p *ProxyEndpoint) pickBackendResponses(responses []*BackendResponse, selectionStrategy BackendSelectionStrategy) (*BackendResponse, *BackendResponse) {
if selectionStrategy == BackendSelectionStrategyFastest {
sort.Slice(responses, func(i, j int) bool {
return responses[i].duration < responses[j].duration
})

p.metrics.fastestBackendSelected.WithLabelValues(responses[0].backend.name, p.routeName).Inc()

return responses[0], responses[1]
}

if selectionStrategy == BackendSelectionStrategyNaive {
return responses[0], responses[1]
}

if selectionStrategy != BackendSelectionStrategyPreferred {
level.Error(p.logger).Log("msg", "invalid selection strategy for proxy", "selection_strategy", selectionStrategy)
return nil, nil
}

var cellAResp, cellBResp *BackendResponse
for _, resp := range responses {
if resp != nil && resp.backend.preferred {
cellAResp = resp
break
}
}

for _, resp := range responses {
if resp != nil && !resp.backend.preferred {
cellBResp = resp
break
}
}

return cellAResp, cellBResp
}

func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *BackendResponse) *BackendResponse {
var (
responses = make([]*BackendResponse, 0, len(p.backends))
Expand All @@ -395,7 +450,7 @@ func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *BackendResp
// - There's no preferred backend configured
// - Or this response is from the preferred backend
// - Or the preferred backend response has already been received and wasn't successful
if res.succeeded() && (!p.hasPreferredBackend || res.backend.preferred || preferredResponseReceived) {
if res.succeeded() && (!(p.selectionStrategy == BackendSelectionStrategyPreferred) || res.backend.preferred || preferredResponseReceived) {
return res
}

Expand Down
1 change: 1 addition & 0 deletions tools/querytee/proxy_endpoint_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func TestProxyEndpoint_GoldfishQueriesContinueAfterNonGoldfishComplete(t *testin
log.NewNopLogger(),
nil,
false,
BackendSelectionStrategyNaive,
)

// Create a parent span to simulate incoming request with tracing
Expand Down
10 changes: 5 additions & 5 deletions tools/querytee/proxy_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func createTestEndpoint(backends []*ProxyBackend, routeName string, comparator c
metrics := NewProxyMetrics(nil)
logger := log.NewNopLogger()

endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, comparator, instrumentCompares)
endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, comparator, instrumentCompares, BackendSelectionStrategyNaive)
handlerFactory := NewHandlerFactory(HandlerFactoryConfig{
Backends: backends,
Codec: queryrange.DefaultCodec,
Expand All @@ -55,7 +55,7 @@ func createTestEndpoint(backends []*ProxyBackend, routeName string, comparator c
func createTestEndpointWithMetrics(backends []*ProxyBackend, routeName string, comp comparator.ResponsesComparator, instrumentCompares bool, metrics *ProxyMetrics) *ProxyEndpoint {
logger := log.NewNopLogger()

endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, comp, instrumentCompares)
endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, comp, instrumentCompares, BackendSelectionStrategyNaive)
handlerFactory := NewHandlerFactory(HandlerFactoryConfig{
Backends: backends,
Codec: queryrange.DefaultCodec,
Expand Down Expand Up @@ -84,7 +84,7 @@ func createTestEndpointWithGoldfish(backends []*ProxyBackend, routeName string,
Metrics: metrics,
})

endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, nil, false)
endpoint := NewProxyEndpoint(backends, routeName, metrics, logger, nil, false, BackendSelectionStrategyNaive)
queryHandler := handlerFactory.CreateHandler(routeName, nil, false)
metricQueryHandler := handlerFactory.CreateHandler(routeName, nil, true)
endpoint.WithQueryHandlers(queryHandler, metricQueryHandler, queryrange.DefaultCodec)
Expand Down Expand Up @@ -168,7 +168,7 @@ func Test_ProxyEndpoint_waitBackendResponseForDownstream(t *testing.T) {

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, false)
endpoint := NewProxyEndpoint(testData.backends, "test", NewProxyMetrics(nil), log.NewNopLogger(), nil, false, BackendSelectionStrategyNaive)

// Send the responses from a dedicated goroutine.
resCh := make(chan *BackendResponse)
Expand Down Expand Up @@ -334,7 +334,7 @@ func Test_ProxyEndpoint_WriteRequests(t *testing.T) {
// endpoint := createTestEndpoint(backends, "test", nil, false)
metrics := NewProxyMetrics(nil)
logger := log.NewNopLogger()
endpoint := NewProxyEndpoint(backends, "test", metrics, logger, nil, false)
endpoint := NewProxyEndpoint(backends, "test", metrics, logger, nil, false, BackendSelectionStrategyNaive)

for _, tc := range []struct {
name string
Expand Down
8 changes: 8 additions & 0 deletions tools/querytee/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type ProxyMetrics struct {
// Sampling metrics
queriesSampled *prometheus.CounterVec
samplingDecisions *prometheus.CounterVec

fastestBackendSelected *prometheus.CounterVec
}

func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
Expand Down Expand Up @@ -61,6 +63,12 @@ func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
Name: "sampling_decisions_total",
Help: "Total number of sampling decisions made.",
}, []string{"tenant", "route", "decision"}),

fastestBackendSelected: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki_querytee",
Name: "fastest_backend_selected_total",
Help: "Total number of fastest backend selected.",
}, []string{"backend", "route"}),
}

return m
Expand Down