Skip to content

[feat/query] Update the Prometheus-related code to utilize the new annotations package, and adjust the signatures of the query methods to support context passing. #4319

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 136 additions & 81 deletions go.mod

Large diffs are not rendered by default.

1,168 changes: 470 additions & 698 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/m3ninx/index/segment/fst/fst_terms_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (f *fstTermsIter) Next() bool {

if f.firstNext {
f.firstNext = false
if err := f.iter.Reset(f.opts.fst, nil, nil, nil); err != nil {
if err := f.iter.Reset(f.opts.fst, nil, nil, nil, true); err != nil {
f.handleIterErr(err)
return false
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/api/v1/handler/prom/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

jsoniter "github.com/json-iterator/go"
promql "github.com/prometheus/prometheus/promql/parser"
promstorage "github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"

xhttp "github.com/m3db/m3/src/x/net/http"
)
Expand Down Expand Up @@ -68,7 +68,7 @@ type response struct {
}

// Respond responds with HTTP OK status code and writes response JSON to response body.
func Respond(w http.ResponseWriter, data interface{}, warnings promstorage.Warnings) error {
func Respond(w http.ResponseWriter, data interface{}, warnings annotations.Annotations) error {
statusMessage := statusSuccess
var warningStrings []string
for _, warning := range warnings {
Expand Down
30 changes: 14 additions & 16 deletions src/query/api/v1/handler/prom/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@ package prom
import (
"context"
"errors"
"log/slog"
"os"
"time"

"github.com/go-kit/kit/log"
kitlogzap "github.com/go-kit/kit/log/zap"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
promstorage "github.com/prometheus/prometheus/storage"
"go.uber.org/zap/zapcore"

"github.com/m3db/m3/src/x/instrument"
"github.com/prometheus/prometheus/util/annotations"
)

type mockQuerier struct {
Expand All @@ -44,12 +42,13 @@ type mockSeriesSet struct {
promstorage.SeriesSet
}

func (m *mockSeriesSet) Next() bool { return false }
func (m *mockSeriesSet) At() promstorage.Series { return nil }
func (m *mockSeriesSet) Err() error { return nil }
func (m *mockSeriesSet) Warnings() promstorage.Warnings { return nil }
func (m *mockSeriesSet) Next() bool { return false }
func (m *mockSeriesSet) At() promstorage.Series { return nil }
func (m *mockSeriesSet) Err() error { return nil }
func (m *mockSeriesSet) Warnings() annotations.Annotations { return nil }

func (q *mockQuerier) Select(
ctx context.Context,
sortSeries bool,
hints *promstorage.SelectHints,
labelMatchers ...*labels.Matcher,
Expand All @@ -60,11 +59,11 @@ func (q *mockQuerier) Select(
return &mockSeriesSet{mockOptions: q.mockOptions}
}

func (*mockQuerier) LabelValues(string, ...*labels.Matcher) ([]string, promstorage.Warnings, error) {
func (*mockQuerier) LabelValues(context.Context, string, *promstorage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, errors.New("not implemented")
}

func (*mockQuerier) LabelNames(...*labels.Matcher) ([]string, promstorage.Warnings, error) {
func (*mockQuerier) LabelNames(context.Context, *promstorage.LabelHints, ...*labels.Matcher) ([]string, annotations.Annotations, error) {
return nil, nil, errors.New("not implemented")
}

Expand All @@ -84,7 +83,7 @@ type mockQueryable struct {
mockOptions
}

func (q *mockQueryable) Querier(_ context.Context, _, _ int64) (promstorage.Querier, error) {
func (q *mockQueryable) Querier(_, _ int64) (promstorage.Querier, error) {
return &mockQuerier{mockOptions: q.mockOptions}, nil
}

Expand All @@ -94,10 +93,9 @@ func durationMilliseconds(d time.Duration) int64 {

func newMockPromQLEngine() *promql.Engine {
var (
instrumentOpts = instrument.NewOptions()
kitLogger = kitlogzap.NewZapSugarLogger(instrumentOpts.Logger(), zapcore.InfoLevel)
opts = promql.EngineOpts{
Logger: log.With(kitLogger, "component", "query engine"),
slogLogger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
opts = promql.EngineOpts{
Logger: slogLogger,
MaxSamples: 100,
Timeout: 1 * time.Minute,
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 {
Expand Down
8 changes: 6 additions & 2 deletions src/query/api/v1/handler/prom/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ var (
return nil, err
}
return engine.NewRangeQuery(
context.Background(),
queryable,
nil,
params.Query,
params.Start.ToTime(),
params.End.ToTime(),
Expand All @@ -79,7 +81,9 @@ var (
return nil, err
}
return engine.NewInstantQuery(
context.Background(),
queryable,
nil,
params.Query,
params.Now)
}
Expand Down Expand Up @@ -178,7 +182,7 @@ func (h *readHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

for _, warn := range resultMetadata.Warnings {
res.Warnings = append(res.Warnings, errors.New(warn.Message))
res.Warnings.Add(errors.New(warn.Message))
}

query := params.Query
Expand Down Expand Up @@ -288,7 +292,7 @@ func (h *readHandler) limitReturnedData(query string,
}

for _, d := range m {
datapointCount := len(d.Points)
datapointCount := len(d.Floats)
if fetchOpts.ReturnedSeriesLimit > 0 && series+1 > fetchOpts.ReturnedSeriesLimit {
limited = true
break
Expand Down
26 changes: 13 additions & 13 deletions src/query/api/v1/handler/prom/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,9 @@ func TestLimitedReturnedDataVector(t *testing.T) {

r := &promql.Result{
Value: promql.Vector{
{Point: promql.Point{T: 1, V: 1.0}},
{Point: promql.Point{T: 2, V: 2.0}},
{Point: promql.Point{T: 3, V: 3.0}},
{T: 1, F: 1.0},
{T: 2, F: 2.0},
{T: 3, F: 3.0},
},
}

Expand Down Expand Up @@ -450,17 +450,17 @@ func TestLimitedReturnedDataMatrix(t *testing.T) {

r := &promql.Result{
Value: promql.Matrix{
{Points: []promql.Point{
{T: 1, V: 1.0},
{Floats: []promql.FPoint{
{T: 1, F: 1.0},
}},
{Points: []promql.Point{
{T: 1, V: 1.0},
{T: 2, V: 2.0},
{Floats: []promql.FPoint{
{T: 1, F: 1.0},
{T: 2, F: 2.0},
}},
{Points: []promql.Point{
{T: 1, V: 1.0},
{T: 2, V: 2.0},
{T: 3, V: 3.0},
{Floats: []promql.FPoint{
{T: 1, F: 1.0},
{T: 2, F: 2.0},
{T: 3, F: 3.0},
}},
},
}
Expand Down Expand Up @@ -580,7 +580,7 @@ func TestLimitedReturnedDataMatrix(t *testing.T) {
seriesCount := len(m)
datapointCount := 0
for _, d := range m {
datapointCount += len(d.Points)
datapointCount += len(d.Floats)
}

if limited.Limited {
Expand Down
6 changes: 3 additions & 3 deletions src/query/api/v1/middleware/rewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ type fakeQueryable struct {
calculatedEndTime time.Time
}

func (f *fakeQueryable) Querier(ctx context.Context, mint, maxt int64) (promstorage.Querier, error) {
func (f *fakeQueryable) Querier(mint, maxt int64) (promstorage.Querier, error) {
f.calculatedStartTime = xtime.FromUnixMillis(mint)
f.calculatedEndTime = xtime.FromUnixMillis(maxt)
// fail here to cause prometheus to give up on query execution
Expand All @@ -371,9 +371,9 @@ func (f *fakeQueryable) calculateQueryBounds(
var query promql.Query
if f.instant {
// startTime and endTime are the same for instant queries
query, err = f.engine.NewInstantQuery(f, q, start)
query, err = f.engine.NewInstantQuery(context.Background(), f, nil, q, start)
} else {
query, err = f.engine.NewRangeQuery(f, q, start, end, step)
query, err = f.engine.NewRangeQuery(context.Background(), f, nil, q, start, end, step)
}
if err != nil {
return err
Expand Down
12 changes: 5 additions & 7 deletions src/query/api/v1/middleware/rewrite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ package middleware
import (
"bytes"
"io/ioutil"
"log/slog"
"net/http"
"net/http/httptest"
"net/url"
"os"
"testing"
"time"

"github.com/go-kit/kit/log"
kitlogzap "github.com/go-kit/kit/log/zap"
"github.com/gorilla/mux"
"github.com/prometheus/prometheus/promql"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

"github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions"
"github.com/m3db/m3/src/query/storage/m3/storagemetadata"
Expand Down Expand Up @@ -355,10 +354,9 @@ func durationMilliseconds(d time.Duration) int64 {

func makeBaseOpts(t *testing.T, r *mux.Router, addPromEngine bool) Options {
var (
instrumentOpts = instrument.NewOptions()
kitLogger = kitlogzap.NewZapSugarLogger(instrumentOpts.Logger(), zapcore.InfoLevel)
engineOpts = promql.EngineOpts{
Logger: log.With(kitLogger, "component", "query engine"),
slogLogger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
engineOpts = promql.EngineOpts{
Logger: slogLogger,
MaxSamples: 100,
Timeout: 1 * time.Minute,
NoStepSubqueryIntervalFn: func(rangeMillis int64) int64 {
Expand Down
10 changes: 4 additions & 6 deletions src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,19 @@ package server
import (
"context"
"fmt"
"log/slog"
"math/rand"
"net"
"net/http"
"os"
"strings"
"time"

"github.com/go-kit/kit/log"
kitlogzap "github.com/go-kit/kit/log/zap"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
extprom "github.com/prometheus/client_golang/prometheus"
prometheuspromql "github.com/prometheus/prometheus/promql"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc"
Expand Down Expand Up @@ -1376,9 +1374,9 @@ func newPromQLEngine(

instrumentOpts.Logger().Debug("creating new PromQL engine", zap.Duration("lookbackDelta", lookbackDelta))
var (
kitLogger = kitlogzap.NewZapSugarLogger(instrumentOpts.Logger(), zapcore.InfoLevel)
opts = prometheuspromql.EngineOpts{
Logger: log.With(kitLogger, "component", "prometheus_engine"),
slogLogger = slog.New(slog.NewJSONHandler(os.Stdout, nil))
opts = prometheuspromql.EngineOpts{
Logger: slogLogger,
Reg: registry,
MaxSamples: cfg.Query.Prometheus.MaxSamplesPerQueryOrDefault(),
Timeout: cfg.Query.TimeoutOrDefault(),
Expand Down
Loading