From b4b9a508f69f48590646fc204e094deaafb2c711 Mon Sep 17 00:00:00 2001 From: Yi Shi Date: Wed, 18 Mar 2026 22:21:23 +0000 Subject: [PATCH 1/6] add rule config loading --- cmd/thanos/query_frontend.go | 23 +- pkg/queryfrontend/protection_config.go | 138 ++++++++++++ pkg/queryfrontend/protection_config_test.go | 233 ++++++++++++++++++++ 3 files changed, 390 insertions(+), 4 deletions(-) create mode 100644 pkg/queryfrontend/protection_config.go create mode 100644 pkg/queryfrontend/protection_config_test.go diff --git a/cmd/thanos/query_frontend.go b/cmd/thanos/query_frontend.go index 740b647485a..eb8b251d01a 100644 --- a/cmd/thanos/query_frontend.go +++ b/cmd/thanos/query_frontend.go @@ -4,6 +4,7 @@ package main import ( + "context" "net" "net/http" "time" @@ -43,9 +44,10 @@ import ( type queryFrontendConfig struct { queryfrontend.Config - http httpConfig - webDisableCORS bool - orgIdHeaders []string + http httpConfig + webDisableCORS bool + orgIdHeaders []string + protectionConfigPath string } func registerQueryFrontend(app *extkingpin.App) { @@ -174,6 +176,9 @@ func registerQueryFrontend(app *extkingpin.App) { cmd.Flag("query-frontend.slow-query-logs-user-header", "Set the value of the field remote_user in the slow query logs to the value of the given HTTP header. Falls back to reading the user from the basic auth header.").PlaceHolder("").Default("").StringVar(&cfg.CortexHandlerConfig.SlowQueryLogsUserHeader) + cmd.Flag("protection-config", "Path to YAML file containing query protection rules. If not set, protection engine starts with no rules."). 
+ Default("").StringVar(&cfg.protectionConfigPath) + reqLogConfig := extkingpin.RegisterRequestLoggingFlags(cmd) cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { @@ -309,7 +314,17 @@ func runQueryFrontend( } } - tripperWare, err := queryfrontend.NewTripperware(cfg.Config, reg, logger) + engine := queryfrontend.NewProtectionEngine(nil) + if cfg.protectionConfigPath != "" { + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + return queryfrontend.WatchConfig(ctx, engine, cfg.protectionConfigPath, logger, 30*time.Second) + }, func(err error) { + cancel() + }) + } + + tripperWare, err := queryfrontend.NewTripperware(cfg.Config, reg, logger, engine) if err != nil { return errors.Wrap(err, "setup tripperwares") } diff --git a/pkg/queryfrontend/protection_config.go b/pkg/queryfrontend/protection_config.go new file mode 100644 index 00000000000..e8f781a154d --- /dev/null +++ b/pkg/queryfrontend/protection_config.go @@ -0,0 +1,138 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package queryfrontend + +import ( + "context" + "crypto/sha256" + "os" + "regexp" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +// ProtectionConfig is the top-level structure of the protection config file. +type ProtectionConfig struct { + Rules []RuleConfig `yaml:"rules"` +} + +// RuleConfig is the config for a single protection rule. +type RuleConfig struct { + Name string `yaml:"name"` + Protection string `yaml:"protection"` + Action string `yaml:"action"` + Actor string `yaml:"actor"` + Enabled bool `yaml:"enabled"` + Args map[string]string `yaml:"args"` +} + +// loadRulesFromFile reads a YAML config file and constructs a list of Rules. 
+func loadRulesFromFile(path string) ([]*Rule, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, errors.Wrap(err, "read protection config file") + } + + var cfg ProtectionConfig + if err := yaml.Unmarshal(data, &cfg); err != nil { + return nil, errors.Wrap(err, "parse protection config") + } + + rules := make([]*Rule, 0, len(cfg.Rules)) + for _, rc := range cfg.Rules { + rule, err := buildRule(rc) + if err != nil { + return nil, errors.Wrapf(err, "build rule %q", rc.Name) + } + rules = append(rules, rule) + } + return rules, nil +} + +func buildRule(rc RuleConfig) (*Rule, error) { + factory, err := LookupProtection(rc.Protection) + if err != nil { + return nil, err + } + + protection, err := factory(rc.Args) + if err != nil { + return nil, errors.Wrap(err, "construct protection") + } + + action, err := parseAction(rc.Action) + if err != nil { + return nil, err + } + + if rc.Name == "" { + return nil, errors.New("name is required") + } + if rc.Actor == "" { + return nil, errors.New("actor is required") + } + actorRegex, err := regexp.Compile(rc.Actor) + if err != nil { + return nil, errors.Wrapf(err, "compile actor regex %q", rc.Actor) + } + + return NewRule(rc.Name, protection, action, actorRegex, rc.Enabled), nil +} + +func parseAction(s string) (RuleAction, error) { + switch s { + case "log": + return RuleActionLog, nil + case "block": + return RuleActionBlock, nil + default: + return RuleActionLog, errors.Errorf("unknown action %q, must be log or block", s) + } +} + +// WatchConfig watches the protection config file for changes and reloads the engine. +// It polls the file at the given interval, detecting changes via SHA256 checksum. +// Blocks until ctx is cancelled. 
+func WatchConfig(ctx context.Context, engine *ProtectionEngine, path string, logger log.Logger, interval time.Duration) error { + if err := reloadRules(engine, path, logger); err != nil { + return errors.Wrap(err, "load initial protection config") + } + + var previousChecksum [sha256.Size]byte + + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(interval): + data, err := os.ReadFile(path) + if err != nil { + level.Error(logger).Log("msg", "failed to read protection config", "path", path, "err", err) + continue + } + checksum := sha256.Sum256(data) + if checksum == previousChecksum { + continue + } + previousChecksum = checksum + if err := reloadRules(engine, path, logger); err != nil { + level.Error(logger).Log("msg", "failed to reload protection config", "path", path, "err", err) + } + } + } +} + +func reloadRules(engine *ProtectionEngine, path string, logger log.Logger) error { + rules, err := loadRulesFromFile(path) + if err != nil { + return errors.Wrap(err, "load protection rules") + } + engine.UpdateRules(rules) + level.Info(logger).Log("msg", "protection config reloaded", "path", path, "rules", len(rules)) + return nil +} diff --git a/pkg/queryfrontend/protection_config_test.go b/pkg/queryfrontend/protection_config_test.go new file mode 100644 index 00000000000..843f8f27c0a --- /dev/null +++ b/pkg/queryfrontend/protection_config_test.go @@ -0,0 +1,233 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +// Tests for protection config loading and file watching: +// LoadRulesFromFile (YAML parsing and rule construction) and WatchConfig (polling reload on file change). 
+package queryfrontend + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" +) + +func writeConfigFile(t *testing.T, dir, content string) string { + t.Helper() + path := filepath.Join(dir, "protection.yaml") + require.NoError(t, os.WriteFile(path, []byte(content), 0600)) + return path +} + +func TestLoadRulesFromFile_FileNotFound(t *testing.T) { + _, err := loadRulesFromFile("/nonexistent/path/protection.yaml") + require.Error(t, err) +} + +func TestLoadRulesFromFile_Valid(t *testing.T) { + dir := t.TempDir() + path := writeConfigFile(t, dir, ` +rules: + - name: noop-log + protection: noop + action: log + actor: ".*" + enabled: true + - name: noop-block + protection: noop + action: block + actor: "^admin$" + enabled: false +`) + + rules, err := loadRulesFromFile(path) + require.NoError(t, err) + require.Len(t, rules, 2) + require.Equal(t, "noop-log", rules[0].name) + require.Equal(t, RuleActionLog, rules[0].action) + require.True(t, rules[0].enabled) + require.Equal(t, "noop-block", rules[1].name) + require.Equal(t, RuleActionBlock, rules[1].action) + require.False(t, rules[1].enabled) +} + +func TestLoadRulesFromFile_InvalidYAML(t *testing.T) { + dir := t.TempDir() + path := writeConfigFile(t, dir, ` +rules: + - name: bad + protection: noop + action: log + actor: ".*" + args: + - foo + - bar +`) + + _, err := loadRulesFromFile(path) + require.Error(t, err) + require.Contains(t, err.Error(), "parse protection config") +} + +func TestLoadRulesFromFile_UnknownProtection(t *testing.T) { + dir := t.TempDir() + path := writeConfigFile(t, dir, ` +rules: + - name: bad + protection: nonexistent + action: log + actor: ".*" + enabled: true +`) + + _, err := loadRulesFromFile(path) + require.Error(t, err) + require.Contains(t, err.Error(), "nonexistent") +} + +func TestLoadRulesFromFile_UnknownAction(t *testing.T) { + dir := t.TempDir() + path := writeConfigFile(t, dir, ` +rules: + - name: 
bad + protection: noop + action: unknown + actor: ".*" + enabled: true +`) + + _, err := loadRulesFromFile(path) + require.Error(t, err) + require.Contains(t, err.Error(), "unknown action") +} + +func TestLoadRulesFromFile_EmptyName(t *testing.T) { + dir := t.TempDir() + path := writeConfigFile(t, dir, ` +rules: + - protection: noop + action: log + actor: ".*" + enabled: true +`) + + _, err := loadRulesFromFile(path) + require.Error(t, err) + require.Contains(t, err.Error(), "name is required") +} + +func TestLoadRulesFromFile_EmptyActor(t *testing.T) { + dir := t.TempDir() + path := writeConfigFile(t, dir, ` +rules: + - name: bad + protection: noop + action: log + enabled: true +`) + + _, err := loadRulesFromFile(path) + require.Error(t, err) + require.Contains(t, err.Error(), "actor is required") +} + +func TestLoadRulesFromFile_InvalidActorRegex(t *testing.T) { + dir := t.TempDir() + path := writeConfigFile(t, dir, ` +rules: + - name: bad + protection: noop + action: log + actor: "[" + enabled: true +`) + + _, err := loadRulesFromFile(path) + require.Error(t, err) + require.Contains(t, err.Error(), "compile actor regex") +} + +func TestWatchConfig_InitialLoad(t *testing.T) { + dir := t.TempDir() + path := writeConfigFile(t, dir, ` +rules: + - name: initial-rule + protection: noop + action: log + actor: ".*" + enabled: true +`) + + engine := NewProtectionEngine(nil) + ctx, cancel := context.WithCancel(context.Background()) + cancel() // cancel immediately so WatchConfig returns after initial load + + err := WatchConfig(ctx, engine, path, log.NewNopLogger(), time.Hour) + require.NoError(t, err) + + // Verify the rule was loaded by evaluating a request. 
+ result, err := engine.Evaluate(context.Background(), thanosQueryReq{actor: "anyone"}) + require.NoError(t, err) + require.NotNil(t, result) + require.Equal(t, "initial-rule", result.RuleName) +} + +func TestWatchConfig_InitialLoadFailure(t *testing.T) { + engine := NewProtectionEngine(nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err := WatchConfig(ctx, engine, "/nonexistent/path.yaml", log.NewNopLogger(), time.Hour) + require.Error(t, err) +} + +func TestWatchConfig_ReloadsOnFileChange(t *testing.T) { + dir := t.TempDir() + path := writeConfigFile(t, dir, ` +rules: + - name: rule-v1 + protection: noop + action: log + actor: ".*" + enabled: true +`) + + engine := NewProtectionEngine(nil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Run WatchConfig with a short interval in the background. + done := make(chan error, 1) + go func() { + done <- WatchConfig(ctx, engine, path, log.NewNopLogger(), 20*time.Millisecond) + }() + + // Wait for initial load: rule-v1 should be active. + require.Eventually(t, func() bool { + result, err := engine.Evaluate(context.Background(), thanosQueryReq{actor: "anyone"}) + return err == nil && result != nil && result.RuleName == "rule-v1" + }, time.Second, 5*time.Millisecond) + + // Update the file: rename the rule to rule-v2. + require.NoError(t, os.WriteFile(path, []byte(` +rules: + - name: rule-v2 + protection: noop + action: block + actor: ".*" + enabled: true +`), 0600)) + + // Wait for the reload to pick up the new rule name. 
+ require.Eventually(t, func() bool { + result, err := engine.Evaluate(context.Background(), thanosQueryReq{actor: "anyone"}) + return err == nil && result != nil && result.RuleName == "rule-v2" + }, time.Second, 5*time.Millisecond) + + cancel() + require.NoError(t, <-done) +} From 0cbaf1405494e7cf93f1f2a0c87309375800441b Mon Sep 17 00:00:00 2001 From: Yi Shi Date: Wed, 18 Mar 2026 22:22:06 +0000 Subject: [PATCH 2/6] add query protection middleware to query frontend roundtrip --- pkg/queryfrontend/roundtrip.go | 12 ++++++++++-- pkg/queryfrontend/roundtrip_test.go | 12 ++++++------ 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/pkg/queryfrontend/roundtrip.go b/pkg/queryfrontend/roundtrip.go index 6eaba9037b9..24dc209c641 100644 --- a/pkg/queryfrontend/roundtrip.go +++ b/pkg/queryfrontend/roundtrip.go @@ -34,7 +34,7 @@ const ( var labelValuesPattern = regexp.MustCompile("/api/v1/label/.+/values$") // NewTripperware returns a Tripperware which sends requests to different sub tripperwares based on the query type. 
-func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger) (queryrange.Tripperware, error) { +func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger, engine *ProtectionEngine) (queryrange.Tripperware, error) { var ( queryRangeLimits, labelsLimits queryrange.Limits err error @@ -63,7 +63,7 @@ func NewTripperware(config Config, reg prometheus.Registerer, logger log.Logger) queryRangeCodec, config.NumShards, config.CortexHandlerConfig.QueryStatsEnabled, - prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_range"}, reg), logger, config.ForwardHeaders) + prometheus.WrapRegistererWith(prometheus.Labels{"tripperware": "query_range"}, reg), logger, config.ForwardHeaders, engine) if err != nil { return nil, err } @@ -169,6 +169,7 @@ func newQueryRangeTripperware( reg prometheus.Registerer, logger log.Logger, forwardHeaders []string, + engine *ProtectionEngine, ) (queryrange.Tripperware, error) { queryRangeMiddleware := []queryrange.Middleware{queryrange.NewLimitsMiddleware(limits)} m := queryrange.NewInstrumentMiddlewareMetrics(reg) @@ -251,6 +252,13 @@ func newQueryRangeTripperware( NewRangeQueryLoggingMiddleware(logger, reg), ) + // Add protection middleware (inside logging so blocked queries are still logged). + queryRangeMiddleware = append( + queryRangeMiddleware, + queryrange.InstrumentMiddleware("protection", m, logger), + NewProtectionMiddleware(engine, logger, reg), + ) + return func(next http.RoundTripper) http.RoundTripper { rt := queryrange.NewRoundTripper(next, codec, forwardHeaders, queryRangeMiddleware...) 
return queryrange.RoundTripFunc(func(r *http.Request) (*http.Response, error) { diff --git a/pkg/queryfrontend/roundtrip_test.go b/pkg/queryfrontend/roundtrip_test.go index 01f35b6510a..7768cc08816 100644 --- a/pkg/queryfrontend/roundtrip_test.go +++ b/pkg/queryfrontend/roundtrip_test.go @@ -197,7 +197,7 @@ func TestRoundTripRetryMiddleware(t *testing.T) { Limits: defaultLimits, SplitQueriesByInterval: day, }, - }, nil, log.NewNopLogger(), + }, nil, log.NewNopLogger(), NewProtectionEngine(nil), ) testutil.Ok(t, err) @@ -370,7 +370,7 @@ func TestRoundTripSplitIntervalMiddleware(t *testing.T) { Limits: defaultLimits, SplitQueriesByInterval: tc.splitInterval, }, - }, nil, log.NewNopLogger(), + }, nil, log.NewNopLogger(), NewProtectionEngine(nil), ) testutil.Ok(t, err) @@ -469,7 +469,7 @@ func TestRoundTripQueryRangeCacheMiddleware(t *testing.T) { ResultsCacheConfig: cacheConf, SplitQueriesByInterval: day, }, - }, nil, log.NewNopLogger(), + }, nil, log.NewNopLogger(), NewProtectionEngine(nil), ) testutil.Ok(t, err) @@ -563,7 +563,7 @@ func TestRoundTripQueryCacheWithShardingMiddleware(t *testing.T) { SplitQueriesByInterval: day, }, CortexHandlerConfig: &transport.HandlerConfig{}, - }, nil, log.NewNopLogger(), + }, nil, log.NewNopLogger(), NewProtectionEngine(nil), ) testutil.Ok(t, err) @@ -688,7 +688,7 @@ func TestRoundTripLabelsCacheMiddleware(t *testing.T) { SplitQueriesByInterval: day, }, CortexHandlerConfig: &transport.HandlerConfig{}, - }, nil, log.NewNopLogger(), + }, nil, log.NewNopLogger(), NewProtectionEngine(nil), ) testutil.Ok(t, err) @@ -802,7 +802,7 @@ func TestRoundTripSeriesCacheMiddleware(t *testing.T) { ResultsCacheConfig: cacheConf, SplitQueriesByInterval: day, }, - }, nil, log.NewNopLogger(), + }, nil, log.NewNopLogger(), NewProtectionEngine(nil), ) testutil.Ok(t, err) From 6bf84333849a63a78f359f85630695259cf1ea7c Mon Sep 17 00:00:00 2001 From: Yi Shi Date: Wed, 18 Mar 2026 22:22:32 +0000 Subject: [PATCH 3/6] add query protection e2e test --- 
test/e2e/e2ethanos/services.go | 46 +++++++++++++++++++++++++ test/e2e/query_frontend_test.go | 61 +++++++++++++++++++++++++++++++++ 2 files changed, 107 insertions(+) diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index c8a9e7fc62d..a932673f88f 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -1050,6 +1050,52 @@ func NewQueryFrontend(e e2e.Environment, name, downstreamURL string, config quer }), "http") } +// NewQueryFrontendWithProtection starts a query-frontend with a protection config file mounted into the container. +// protectionConfigContent is the YAML content of the protection config. +func NewQueryFrontendWithProtection(e e2e.Environment, name, downstreamURL string, config queryfrontend.Config, cacheConfig queryfrontend.CacheProviderConfig, protectionConfigContent string) *e2eobs.Observable { + cacheConfigBytes, err := yaml.Marshal(cacheConfig) + if err != nil { + return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrapf(err, "marshal response cache config file: %v", cacheConfig))} + } + + f := e.Runnable(fmt.Sprintf("query-frontend-%s", name)). + WithPorts(map[string]int{"http": 8080}). 
+ Future() + + if err := os.MkdirAll(f.Dir(), 0750); err != nil { + return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "create dir"))} + } + + protectionConfigPath := filepath.Join(f.Dir(), "protection.yaml") + if err := os.WriteFile(protectionConfigPath, []byte(protectionConfigContent), 0600); err != nil { + return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "write protection config file"))} + } + + const containerProtectionConfigPath = "/etc/thanos/protection.yaml" + + flags := map[string]string{ + "--debug.name": fmt.Sprintf("query-frontend-%s", name), + "--http-address": ":8080", + "--query-frontend.downstream-url": downstreamURL, + "--log.level": infoLogLevel, + "--query-range.response-cache-config": string(cacheConfigBytes), + "--protection-config": containerProtectionConfigPath, + } + + if !config.QueryRangeConfig.AlignRangeWithStep { + flags["--no-query-range.align-range-with-step"] = "" + } + + return e2eobs.AsObservable(f.Init(e2e.StartOptions{ + Image: DefaultImage(), + Command: e2e.NewCommand("query-frontend", e2e.BuildArgs(flags)...), + Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), + User: strconv.Itoa(os.Getuid()), + WaitReadyBackoff: &defaultBackoffConfig, + Volumes: []string{protectionConfigPath + ":" + containerProtectionConfigPath + ":ro"}, + }), "http") +} + func NewReverseProxy(e e2e.Environment, name, tenantID, target string) *e2eobs.Observable { conf := fmt.Sprintf(` events { diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index 87e6070eb74..4474f6b75a3 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -1203,3 +1203,64 @@ func TestQueryFrontendAnalyze(t *testing.T) { require.Equal(t, true, r.MatchString(strings.TrimSpace(string(body)))) } + +func TestQueryFrontendProtection(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("qfe-protection") + testutil.Ok(t, err) + 
+ t.Cleanup(e2ethanos.CleanScenario(t, e)) + + q := e2ethanos.NewQuerierBuilder(e, "1").Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + t.Run("block action returns 400", func(t *testing.T) { + qfe := e2ethanos.NewQueryFrontendWithProtection(e, "block", "http://"+q.InternalEndpoint("http"), + queryfrontend.Config{}, + queryfrontend.CacheProviderConfig{Type: queryfrontend.INMEMORY}, + ` +rules: + - name: block-all + protection: noop + action: block + actor: ".*" + enabled: true +`) + testutil.Ok(t, e2e.StartAndWaitReady(qfe)) + + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/v1/query_range?query=up&start=0&end=3600&step=60", qfe.Endpoint("http")), nil) + require.NoError(t, err) + req.Header.Set("X-Source", "test-actor") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, resp.Body.Close()) }) + + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + t.Run("log action passes through", func(t *testing.T) { + qfe := e2ethanos.NewQueryFrontendWithProtection(e, "log", "http://"+q.InternalEndpoint("http"), + queryfrontend.Config{}, + queryfrontend.CacheProviderConfig{Type: queryfrontend.INMEMORY}, + ` +rules: + - name: log-all + protection: noop + action: log + actor: ".*" + enabled: true +`) + testutil.Ok(t, e2e.StartAndWaitReady(qfe)) + + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/api/v1/query_range?query=up&start=0&end=3600&step=60", qfe.Endpoint("http")), nil) + require.NoError(t, err) + req.Header.Set("X-Source", "test-actor") + + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, resp.Body.Close()) }) + + require.Equal(t, http.StatusOK, resp.StatusCode) + }) +} From c667ddaf04e029614bc543df15af9619402eaf44 Mon Sep 17 00:00:00 2001 From: Yi Shi Date: Wed, 18 Mar 2026 22:57:53 +0000 Subject: [PATCH 4/6] remove specific new query frontend protection, reuse NewQueryFrontend --- test/e2e/compatibility_test.go | 2 +-
test/e2e/e2ethanos/services.go | 68 +++++++++--------------------- test/e2e/native_histograms_test.go | 2 +- test/e2e/query_frontend_test.go | 21 ++++----- 4 files changed, 34 insertions(+), 59 deletions(-) diff --git a/test/e2e/compatibility_test.go b/test/e2e/compatibility_test.go index c910c96f3f3..0accd374773 100644 --- a/test/e2e/compatibility_test.go +++ b/test/e2e/compatibility_test.go @@ -363,5 +363,5 @@ func newQueryFrontendRunnable(e e2e.Environment, name, downstreamURL string) *e2 }, NumShards: 3, } - return e2ethanos.NewQueryFrontend(e, name, downstreamURL, config, inMemoryCacheConfig) + return e2ethanos.NewQueryFrontend(e, name, downstreamURL, config, inMemoryCacheConfig, "") } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index a932673f88f..c55d863f057 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -1003,12 +1003,19 @@ func (c *CompactorBuilder) Init(bucketConfig client.BucketConfig, relabelConfig })), "http") } -func NewQueryFrontend(e e2e.Environment, name, downstreamURL string, config queryfrontend.Config, cacheConfig queryfrontend.CacheProviderConfig) *e2eobs.Observable { +// NewQueryFrontend starts a query-frontend container. +// If protectionConfigContent is non-empty, the content is written to a file and mounted into the container +// with --protection-config pointing to it. +func NewQueryFrontend(e e2e.Environment, name, downstreamURL string, config queryfrontend.Config, cacheConfig queryfrontend.CacheProviderConfig, protectionConfigContent string) *e2eobs.Observable { cacheConfigBytes, err := yaml.Marshal(cacheConfig) if err != nil { return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrapf(err, "marshal response cache config file: %v", cacheConfig))} } + f := e.Runnable(fmt.Sprintf("query-frontend-%s", name)). + WithPorts(map[string]int{"http": 8080}). 
+ Future() + flags := map[string]string{ "--debug.name": fmt.Sprintf("query-frontend-%s", name), "--http-address": ":8080", @@ -1039,51 +1046,18 @@ func NewQueryFrontend(e e2e.Environment, name, downstreamURL string, config quer flags["--query-frontend.default-tenant"] = config.DefaultTenant } - return e2eobs.AsObservable(e.Runnable(fmt.Sprintf("query-frontend-%s", name)). - WithPorts(map[string]int{"http": 8080}). - Init(e2e.StartOptions{ - Image: DefaultImage(), - Command: e2e.NewCommand("query-frontend", e2e.BuildArgs(flags)...), - Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), - User: strconv.Itoa(os.Getuid()), - WaitReadyBackoff: &defaultBackoffConfig, - }), "http") -} - -// NewQueryFrontendWithProtection starts a query-frontend with a protection config file mounted into the container. -// protectionConfigContent is the YAML content of the protection config. -func NewQueryFrontendWithProtection(e e2e.Environment, name, downstreamURL string, config queryfrontend.Config, cacheConfig queryfrontend.CacheProviderConfig, protectionConfigContent string) *e2eobs.Observable { - cacheConfigBytes, err := yaml.Marshal(cacheConfig) - if err != nil { - return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrapf(err, "marshal response cache config file: %v", cacheConfig))} - } - - f := e.Runnable(fmt.Sprintf("query-frontend-%s", name)). - WithPorts(map[string]int{"http": 8080}). 
- Future() - - if err := os.MkdirAll(f.Dir(), 0750); err != nil { - return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "create dir"))} - } - - protectionConfigPath := filepath.Join(f.Dir(), "protection.yaml") - if err := os.WriteFile(protectionConfigPath, []byte(protectionConfigContent), 0600); err != nil { - return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "write protection config file"))} - } - - const containerProtectionConfigPath = "/etc/thanos/protection.yaml" - - flags := map[string]string{ - "--debug.name": fmt.Sprintf("query-frontend-%s", name), - "--http-address": ":8080", - "--query-frontend.downstream-url": downstreamURL, - "--log.level": infoLogLevel, - "--query-range.response-cache-config": string(cacheConfigBytes), - "--protection-config": containerProtectionConfigPath, - } - - if !config.QueryRangeConfig.AlignRangeWithStep { - flags["--no-query-range.align-range-with-step"] = "" + var volumes []string + if protectionConfigContent != "" { + if err := os.MkdirAll(f.Dir(), 0750); err != nil { + return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "create dir"))} + } + protectionConfigPath := filepath.Join(f.Dir(), "protection.yaml") + if err := os.WriteFile(protectionConfigPath, []byte(protectionConfigContent), 0600); err != nil { + return &e2eobs.Observable{Runnable: e2e.NewFailedRunnable(name, errors.Wrap(err, "write protection config file"))} + } + const containerProtectionConfigPath = "/etc/thanos/protection.yaml" + flags["--protection-config"] = containerProtectionConfigPath + volumes = []string{protectionConfigPath + ":" + containerProtectionConfigPath + ":ro"} } return e2eobs.AsObservable(f.Init(e2e.StartOptions{ @@ -1092,7 +1066,7 @@ func NewQueryFrontendWithProtection(e e2e.Environment, name, downstreamURL strin Readiness: e2e.NewHTTPReadinessProbe("http", "/-/ready", 200, 200), User: strconv.Itoa(os.Getuid()), WaitReadyBackoff: &defaultBackoffConfig, - 
Volumes: []string{protectionConfigPath + ":" + containerProtectionConfigPath + ":ro"}, + Volumes: volumes, }), "http") } diff --git a/test/e2e/native_histograms_test.go b/test/e2e/native_histograms_test.go index f2ece6adb75..8d53e0dfaf8 100644 --- a/test/e2e/native_histograms_test.go +++ b/test/e2e/native_histograms_test.go @@ -177,7 +177,7 @@ func TestQueryFrontendNativeHistograms(t *testing.T) { }, } - queryFrontend := e2ethanos.NewQueryFrontend(e, "query-frontend", "http://"+querier.InternalEndpoint("http"), queryfrontend.Config{}, inMemoryCacheConfig) + queryFrontend := e2ethanos.NewQueryFrontend(e, "query-frontend", "http://"+querier.InternalEndpoint("http"), queryfrontend.Config{}, inMemoryCacheConfig, "") testutil.Ok(t, e2e.StartAndWaitReady(queryFrontend)) rawRemoteWriteURL1 := "http://" + prom1.Endpoint("http") + "/api/v1/write" diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index 4474f6b75a3..9bc285a36c2 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -62,7 +62,7 @@ func TestQueryFrontend(t *testing.T) { } cfg := queryfrontend.Config{} - queryFrontend := e2ethanos.NewQueryFrontend(e, "1", "http://"+q.InternalEndpoint("http"), cfg, inMemoryCacheConfig) + queryFrontend := e2ethanos.NewQueryFrontend(e, "1", "http://"+q.InternalEndpoint("http"), cfg, inMemoryCacheConfig, "") testutil.Ok(t, e2e.StartAndWaitReady(queryFrontend)) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) @@ -466,7 +466,7 @@ func TestQueryFrontendMemcachedCache(t *testing.T) { } cfg := queryfrontend.Config{} - queryFrontend := e2ethanos.NewQueryFrontend(e, "1", "http://"+q.InternalEndpoint("http"), cfg, memCachedConfig) + queryFrontend := e2ethanos.NewQueryFrontend(e, "1", "http://"+q.InternalEndpoint("http"), cfg, memCachedConfig, "") testutil.Ok(t, e2e.StartAndWaitReady(queryFrontend)) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) @@ -646,7 +646,7 @@ func 
TestRangeQueryShardingWithRandomData(t *testing.T) { }, NumShards: 2, } - qfe := e2ethanos.NewQueryFrontend(e, "query-frontend", "http://"+q1.InternalEndpoint("http"), config, inMemoryCacheConfig) + qfe := e2ethanos.NewQueryFrontend(e, "query-frontend", "http://"+q1.InternalEndpoint("http"), config, inMemoryCacheConfig, "") testutil.Ok(t, e2e.StartAndWaitReady(qfe)) qryFunc := func() string { return `sum by (pod) (http_requests_total)` } @@ -700,7 +700,7 @@ func TestRangeQueryDynamicHorizontalSharding(t *testing.T) { SplitQueriesByInterval: 0, }, } - queryFrontend := e2ethanos.NewQueryFrontend(e, "1", "http://"+querier.InternalEndpoint("http"), cfg, inMemoryCacheConfig) + queryFrontend := e2ethanos.NewQueryFrontend(e, "1", "http://"+querier.InternalEndpoint("http"), cfg, inMemoryCacheConfig, "") testutil.Ok(t, e2e.StartAndWaitReady(queryFrontend)) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) @@ -851,7 +851,7 @@ func TestInstantQueryShardingWithRandomData(t *testing.T) { }, NumShards: 2, } - qfe := e2ethanos.NewQueryFrontend(e, "query-frontend", "http://"+q1.InternalEndpoint("http"), config, inMemoryCacheConfig) + qfe := e2ethanos.NewQueryFrontend(e, "query-frontend", "http://"+q1.InternalEndpoint("http"), config, inMemoryCacheConfig, "") testutil.Ok(t, e2e.StartAndWaitReady(qfe)) queryOpts := promclient.QueryOptions{Deduplicate: true} @@ -992,6 +992,7 @@ func TestQueryFrontendTenantForward(t *testing.T) { fmt.Sprintf("http://%s:%s", e.HostAddr(), tsPort), queryFrontendConfig, inMemoryCacheConfig, + "", ) testutil.Ok(t, e2e.StartAndWaitReady(queryFrontend)) @@ -1059,7 +1060,7 @@ func TestTenantQFEHTTPMetrics(t *testing.T) { } cfg := queryfrontend.Config{} - queryFrontend := e2ethanos.NewQueryFrontend(e, "1", "http://"+q.InternalEndpoint("http"), cfg, inMemoryCacheConfig) + queryFrontend := e2ethanos.NewQueryFrontend(e, "1", "http://"+q.InternalEndpoint("http"), cfg, inMemoryCacheConfig, "") ctx, cancel := 
context.WithTimeout(context.Background(), 1*time.Minute) t.Cleanup(cancel) @@ -1158,7 +1159,7 @@ func TestQueryFrontendExplain(t *testing.T) { qfe := e2ethanos.NewQueryFrontend(e, "1", "http://"+q.InternalEndpoint("http"), queryfrontend.Config{}, queryfrontend.CacheProviderConfig{ Type: queryfrontend.INMEMORY, - }) + }, "") testutil.Ok(t, e2e.StartAndWaitReady(qfe)) resp, err := http.Get(fmt.Sprintf("http://%s/api/v1/query_explain?query=time()&engine=thanos", qfe.Endpoint("http"))) @@ -1184,7 +1185,7 @@ func TestQueryFrontendAnalyze(t *testing.T) { qfe := e2ethanos.NewQueryFrontend(e, "1", "http://"+q.InternalEndpoint("http"), queryfrontend.Config{}, queryfrontend.CacheProviderConfig{ Type: queryfrontend.INMEMORY, - }) + }, "") testutil.Ok(t, e2e.StartAndWaitReady(qfe)) resp, err := http.Get(fmt.Sprintf("http://%s/api/v1/query?query=time()&engine=thanos&analyze=true", qfe.Endpoint("http"))) @@ -1215,7 +1216,7 @@ func TestQueryFrontendProtection(t *testing.T) { testutil.Ok(t, e2e.StartAndWaitReady(q)) t.Run("block action returns 400", func(t *testing.T) { - qfe := e2ethanos.NewQueryFrontendWithProtection(e, "block", "http://"+q.InternalEndpoint("http"), + qfe := e2ethanos.NewQueryFrontend(e, "block", "http://"+q.InternalEndpoint("http"), queryfrontend.Config{}, queryfrontend.CacheProviderConfig{Type: queryfrontend.INMEMORY}, ` @@ -1240,7 +1241,7 @@ rules: }) t.Run("log action passes through", func(t *testing.T) { - qfe := e2ethanos.NewQueryFrontendWithProtection(e, "log", "http://"+q.InternalEndpoint("http"), + qfe := e2ethanos.NewQueryFrontend(e, "log", "http://"+q.InternalEndpoint("http"), queryfrontend.Config{}, queryfrontend.CacheProviderConfig{Type: queryfrontend.INMEMORY}, ` From 232af5d5b65508e96d7fa39ef10c314c5c970457 Mon Sep 17 00:00:00 2001 From: Yi Shi Date: Thu, 19 Mar 2026 00:00:01 +0000 Subject: [PATCH 5/6] lint --- pkg/queryfrontend/protection_config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/pkg/queryfrontend/protection_config.go b/pkg/queryfrontend/protection_config.go index e8f781a154d..019e3e33d50 100644 --- a/pkg/queryfrontend/protection_config.go +++ b/pkg/queryfrontend/protection_config.go @@ -97,7 +97,7 @@ func parseAction(s string) (RuleAction, error) { // WatchConfig watches the protection config file for changes and reloads the engine. // It polls the file at the given interval, detecting changes via SHA256 checksum. -// Blocks until ctx is cancelled. +// Blocks until ctx is canceled. func WatchConfig(ctx context.Context, engine *ProtectionEngine, path string, logger log.Logger, interval time.Duration) error { if err := reloadRules(engine, path, logger); err != nil { return errors.Wrap(err, "load initial protection config") From 91bb2c3fd6d7d6e29946840e260a4d5a360812c9 Mon Sep 17 00:00:00 2001 From: Yi Shi Date: Fri, 20 Mar 2026 18:51:28 +0000 Subject: [PATCH 6/6] update protection config to use always-match instead of noop --- pkg/queryfrontend/protection_config_test.go | 20 ++++++++++---------- test/e2e/query_frontend_test.go | 4 ++-- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/queryfrontend/protection_config_test.go b/pkg/queryfrontend/protection_config_test.go index 843f8f27c0a..29d5544fb01 100644 --- a/pkg/queryfrontend/protection_config_test.go +++ b/pkg/queryfrontend/protection_config_test.go @@ -33,12 +33,12 @@ func TestLoadRulesFromFile_Valid(t *testing.T) { path := writeConfigFile(t, dir, ` rules: - name: noop-log - protection: noop + protection: always-match action: log actor: ".*" enabled: true - name: noop-block - protection: noop + protection: always-match action: block actor: "^admin$" enabled: false @@ -60,7 +60,7 @@ func TestLoadRulesFromFile_InvalidYAML(t *testing.T) { path := writeConfigFile(t, dir, ` rules: - name: bad - protection: noop + protection: always-match action: log actor: ".*" args: @@ -94,7 +94,7 @@ func TestLoadRulesFromFile_UnknownAction(t *testing.T) { path := writeConfigFile(t, 
dir, ` rules: - name: bad - protection: noop + protection: always-match action: unknown actor: ".*" enabled: true @@ -109,7 +109,7 @@ func TestLoadRulesFromFile_EmptyName(t *testing.T) { dir := t.TempDir() path := writeConfigFile(t, dir, ` rules: - - protection: noop + - protection: always-match action: log actor: ".*" enabled: true @@ -125,7 +125,7 @@ func TestLoadRulesFromFile_EmptyActor(t *testing.T) { path := writeConfigFile(t, dir, ` rules: - name: bad - protection: noop + protection: always-match action: log enabled: true `) @@ -140,7 +140,7 @@ func TestLoadRulesFromFile_InvalidActorRegex(t *testing.T) { path := writeConfigFile(t, dir, ` rules: - name: bad - protection: noop + protection: always-match action: log actor: "[" enabled: true @@ -156,7 +156,7 @@ func TestWatchConfig_InitialLoad(t *testing.T) { path := writeConfigFile(t, dir, ` rules: - name: initial-rule - protection: noop + protection: always-match action: log actor: ".*" enabled: true @@ -190,7 +190,7 @@ func TestWatchConfig_ReloadsOnFileChange(t *testing.T) { path := writeConfigFile(t, dir, ` rules: - name: rule-v1 - protection: noop + protection: always-match action: log actor: ".*" enabled: true @@ -216,7 +216,7 @@ rules: require.NoError(t, os.WriteFile(path, []byte(` rules: - name: rule-v2 - protection: noop + protection: always-match action: block actor: ".*" enabled: true diff --git a/test/e2e/query_frontend_test.go b/test/e2e/query_frontend_test.go index 9bc285a36c2..55f5ed17b8f 100644 --- a/test/e2e/query_frontend_test.go +++ b/test/e2e/query_frontend_test.go @@ -1222,7 +1222,7 @@ func TestQueryFrontendProtection(t *testing.T) { ` rules: - name: block-all - protection: noop + protection: always-match action: block actor: ".*" enabled: true @@ -1247,7 +1247,7 @@ rules: ` rules: - name: log-all - protection: noop + protection: always-match action: log actor: ".*" enabled: true