Skip to content

Commit 36b0a90

Browse files
authored
tests(launcher): add a launcher test for read window aggregate push down (#18412)
This adds a launcher test for the read window aggregate push down to verify that it is done when a query is sent with the appropriate pattern, the output is correct, and that the metric is incremented that signals the push down happened.
1 parent 9f4e60d commit 36b0a90

File tree

3 files changed

+127
-9
lines changed

3 files changed

+127
-9
lines changed

cmd/influxd/launcher/launcher.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ type Launcher struct {
337337
secretStore string
338338

339339
featureFlags map[string]string
340+
flagger feature.Flagger
340341

341342
// Query options.
342343
concurrencyQuota int
@@ -849,7 +850,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
849850
Addr: m.httpBindAddress,
850851
}
851852

852-
flagger := feature.DefaultFlagger()
853+
m.flagger = feature.DefaultFlagger()
853854
if len(m.featureFlags) > 0 {
854855
f, err := overrideflagger.Make(m.featureFlags, feature.ByKey)
855856
if err != nil {
@@ -858,15 +859,15 @@ func (m *Launcher) run(ctx context.Context) (err error) {
858859
return err
859860
}
860861
m.log.Info("Running with feature flag overrides", zap.Any("overrides", m.featureFlags))
861-
flagger = f
862+
m.flagger = f
862863
}
863864

864865
var sessionSvc platform.SessionService
865866
{
866867
sessionSvc = session.NewService(session.NewStorage(inmem.NewSessionStore()), userSvc, userResourceSvc, authSvc, time.Duration(m.sessionLength)*time.Minute)
867868
sessionSvc = session.NewSessionMetrics(m.reg, sessionSvc)
868869
sessionSvc = session.NewSessionLogger(m.log.With(zap.String("service", "session")), sessionSvc)
869-
sessionSvc = session.NewServiceController(flagger, m.kvService, sessionSvc)
870+
sessionSvc = session.NewServiceController(m.flagger, m.kvService, sessionSvc)
870871
}
871872

872873
var labelSvc platform.LabelService
@@ -877,7 +878,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
877878
return err
878879
}
879880
ls := label.NewService(labelsStore)
880-
labelSvc = label.NewLabelController(flagger, m.kvService, ls)
881+
labelSvc = label.NewLabelController(m.flagger, m.kvService, ls)
881882
}
882883

883884
m.apibackend = &http.APIBackend{
@@ -925,7 +926,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
925926
OrgLookupService: m.kvService,
926927
WriteEventRecorder: infprom.NewEventRecorder("write"),
927928
QueryEventRecorder: infprom.NewEventRecorder("query"),
928-
Flagger: flagger,
929+
Flagger: m.flagger,
929930
FlagsHandler: feature.NewFlagsHandler(kithttp.ErrorHandler(0), feature.ByKey),
930931
}
931932

@@ -993,7 +994,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
993994
labelSvc = label.NewLabelMetrics(m.reg, labelSvc)
994995
newHandler := label.NewHTTPLabelHandler(m.log, labelSvc)
995996

996-
labelsHTTPServer = kithttp.NewFeatureHandler(feature.NewLabelPackage(), flagger, oldHandler, newHandler, newHandler.Prefix())
997+
labelsHTTPServer = kithttp.NewFeatureHandler(feature.NewLabelPackage(), m.flagger, oldHandler, newHandler, newHandler.Prefix())
997998
}
998999

9991000
// feature flagging for new authorization service
@@ -1017,7 +1018,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
10171018
authService = authorization.NewAuthLogger(authLogger, authService)
10181019

10191020
newHandler := authorization.NewHTTPAuthHandler(m.log, authService, ts, lookupSvc)
1020-
authHTTPServer = kithttp.NewFeatureHandler(feature.NewAuthPackage(), flagger, oldHandler, newHandler, newHandler.Prefix())
1021+
authHTTPServer = kithttp.NewFeatureHandler(feature.NewAuthPackage(), m.flagger, oldHandler, newHandler, newHandler.Prefix())
10211022
}
10221023

10231024
var oldSessionHandler nethttp.Handler
@@ -1033,8 +1034,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
10331034
http.WithResourceHandler(onboardHTTPServer),
10341035
http.WithResourceHandler(authHTTPServer),
10351036
http.WithResourceHandler(labelsHTTPServer),
1036-
http.WithResourceHandler(kithttp.NewFeatureHandler(feature.SessionService(), flagger, oldSessionHandler, sessionHTTPServer.SignInResourceHandler(), sessionHTTPServer.SignInResourceHandler().Prefix())),
1037-
http.WithResourceHandler(kithttp.NewFeatureHandler(feature.SessionService(), flagger, oldSessionHandler, sessionHTTPServer.SignOutResourceHandler(), sessionHTTPServer.SignOutResourceHandler().Prefix())),
1037+
http.WithResourceHandler(kithttp.NewFeatureHandler(feature.SessionService(), m.flagger, oldSessionHandler, sessionHTTPServer.SignInResourceHandler(), sessionHTTPServer.SignInResourceHandler().Prefix())),
1038+
http.WithResourceHandler(kithttp.NewFeatureHandler(feature.SessionService(), m.flagger, oldSessionHandler, sessionHTTPServer.SignOutResourceHandler(), sessionHTTPServer.SignOutResourceHandler().Prefix())),
10381039
http.WithResourceHandler(userHTTPServer.MeResourceHandler()),
10391040
http.WithResourceHandler(userHTTPServer.UserResourceHandler()),
10401041
)

cmd/influxd/launcher/launcher_helpers.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@ import (
2020
"github.com/influxdata/influxdb/v2/bolt"
2121
influxdbcontext "github.com/influxdata/influxdb/v2/context"
2222
"github.com/influxdata/influxdb/v2/http"
23+
"github.com/influxdata/influxdb/v2/kit/feature"
2324
"github.com/influxdata/influxdb/v2/mock"
2425
"github.com/influxdata/influxdb/v2/pkg/httpc"
2526
"github.com/influxdata/influxdb/v2/pkger"
2627
"github.com/influxdata/influxdb/v2/query"
28+
dto "github.com/prometheus/client_model/go"
29+
"github.com/prometheus/common/expfmt"
2730
)
2831

2932
// TestLauncher is a test wrapper for launcher.Launcher.
@@ -224,6 +227,7 @@ func (tl *TestLauncher) MustExecuteQuery(query string) *QueryResults {
224227
// Callers of ExecuteQuery must call Done on the returned QueryResults.
225228
func (tl *TestLauncher) ExecuteQuery(q string) (*QueryResults, error) {
226229
ctx := influxdbcontext.SetAuthorizer(context.Background(), mock.NewMockAuthorizer(true, nil))
230+
ctx, _ = feature.Annotate(ctx, tl.flagger)
227231
fq, err := tl.QueryController().Query(ctx, &query.Request{
228232
Authorization: tl.Auth,
229233
OrganizationID: tl.Auth.OrgID,
@@ -408,6 +412,25 @@ func (tl *TestLauncher) HTTPClient(tb testing.TB) *httpc.Client {
408412
return tl.httpClient
409413
}
410414

415+
func (tl *TestLauncher) Metrics(tb testing.TB) (metrics map[string]*dto.MetricFamily) {
416+
req := tl.HTTPClient(tb).
417+
Get("/metrics").
418+
RespFn(func(resp *nethttp.Response) error {
419+
if resp.StatusCode != nethttp.StatusOK {
420+
return fmt.Errorf("unexpected status code: %d %s", resp.StatusCode, resp.Status)
421+
}
422+
defer func() { _ = resp.Body.Close() }()
423+
424+
var parser expfmt.TextParser
425+
metrics, _ = parser.TextToMetricFamilies(resp.Body)
426+
return nil
427+
})
428+
if err := req.Do(context.Background()); err != nil {
429+
tb.Fatal(err)
430+
}
431+
return metrics
432+
}
433+
411434
// QueryResult wraps a single flux.Result with some helper methods.
412435
type QueryResult struct {
413436
t *testing.T

cmd/influxd/launcher/query_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"html/template"
99
"io"
10+
"io/ioutil"
1011
"math/rand"
1112
nethttp "net/http"
1213
"strings"
@@ -15,6 +16,7 @@ import (
1516
"time"
1617

1718
"github.com/influxdata/flux"
19+
"github.com/influxdata/flux/csv"
1820
"github.com/influxdata/flux/execute"
1921
"github.com/influxdata/flux/execute/executetest"
2022
"github.com/influxdata/flux/lang"
@@ -745,3 +747,95 @@ from(bucket: "%s")
745747
t.Fatal(err)
746748
}
747749
}
750+
751+
func TestLauncher_Query_PushDownWindowAggregate(t *testing.T) {
752+
l := launcher.RunTestLauncherOrFail(t, ctx,
753+
"--feature-flags", "pushDownWindowAggregateCount=true")
754+
l.SetupOrFail(t)
755+
defer l.ShutdownOrFail(t, ctx)
756+
757+
l.WritePointsOrFail(t, `
758+
m0,k=k0 f=0i 0
759+
m0,k=k0 f=1i 1000000000
760+
m0,k=k0 f=2i 2000000000
761+
m0,k=k0 f=3i 3000000000
762+
m0,k=k0 f=4i 4000000000
763+
m0,k=k0 f=5i 5000000000
764+
m0,k=k0 f=6i 6000000000
765+
m0,k=k0 f=5i 7000000000
766+
m0,k=k0 f=0i 8000000000
767+
m0,k=k0 f=6i 9000000000
768+
m0,k=k0 f=6i 10000000000
769+
m0,k=k0 f=7i 11000000000
770+
m0,k=k0 f=5i 12000000000
771+
m0,k=k0 f=8i 13000000000
772+
m0,k=k0 f=9i 14000000000
773+
m0,k=k0 f=5i 15000000000
774+
`)
775+
776+
getReadRequestCount := func() uint64 {
777+
const metricName = "query_influxdb_source_read_request_duration_seconds"
778+
mf := l.Metrics(t)[metricName]
779+
if mf != nil {
780+
for _, m := range mf.Metric {
781+
for _, label := range m.Label {
782+
if label.GetName() == "op" && label.GetValue() == "readWindowAggregate" {
783+
return m.Histogram.GetSampleCount()
784+
}
785+
}
786+
}
787+
}
788+
return 0
789+
}
790+
791+
for _, tt := range []struct {
792+
name string
793+
q string
794+
res string
795+
}{
796+
{
797+
name: "count",
798+
q: `
799+
from(bucket: v.bucket)
800+
|> range(start: 1970-01-01T00:00:00Z, stop: 1970-01-01T00:00:15Z)
801+
|> aggregateWindow(every: 5s, fn: count)
802+
|> drop(columns: ["_start", "_stop"])
803+
`,
804+
res: `
805+
#datatype,string,long,long,string,string,string,dateTime:RFC3339
806+
#group,false,false,false,true,true,true,false
807+
#default,_result,,,,,,
808+
,result,table,_value,_field,_measurement,k,_time
809+
,,0,5,f,m0,k0,1970-01-01T00:00:05Z
810+
,,0,5,f,m0,k0,1970-01-01T00:00:10Z
811+
,,0,5,f,m0,k0,1970-01-01T00:00:15Z
812+
`,
813+
},
814+
} {
815+
t.Run(tt.name, func(t *testing.T) {
816+
wantCount := getReadRequestCount() + 1
817+
818+
prelude := fmt.Sprintf("v = {bucket: \"%s\", timeRangeStart: 1970-01-01T00:00:00Z, timeRangeStop: 1970-01-01T00:00:15Z}", l.Bucket.Name)
819+
queryStr := prelude + "\n" + tt.q
820+
res := l.MustExecuteQuery(queryStr)
821+
defer res.Done()
822+
got := flux.NewSliceResultIterator(res.Results)
823+
defer got.Release()
824+
825+
dec := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
826+
want, err := dec.Decode(ioutil.NopCloser(strings.NewReader(tt.res)))
827+
if err != nil {
828+
t.Fatal(err)
829+
}
830+
defer want.Release()
831+
832+
if err := executetest.EqualResultIterators(want, got); err != nil {
833+
t.Fatal(err)
834+
}
835+
836+
if want, got := wantCount, getReadRequestCount(); want != got {
837+
t.Fatalf("unexpected sample count -want/+got:\n\t- %d\n\t+ %d", want, got)
838+
}
839+
})
840+
}
841+
}

0 commit comments

Comments
 (0)