Skip to content

Commit 0e6a8b9

Browse files
Igor Makhlingshatz73
authored andcommitted
TSDB-46: Prometheus MIN_OVER_TIME function omits the first value in a series (#182)
* improve debug log * add more logs * fix compilation issue and initialize logger * Apply fix to IterSortMerger. See also PR 174 * fix problem when querying multi partitions and getting data labelsets OOO * fix skipping first element issue * Add integration test for MIN, MAX aggregations * Revert "Add integration test for MIN, MAX aggregations" This reverts commit f8d0bfd. * cosmetic fixes; see comments in PR
1 parent b87487e commit 0e6a8b9

File tree

4 files changed

+66
-27
lines changed

4 files changed

+66
-27
lines changed

pkg/querier/querier.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/v3io/v3io-tsdb/pkg/utils"
3232
"sort"
3333
"strings"
34+
"time"
3435
)
3536

3637
// Create a new Querier interface
@@ -86,16 +87,18 @@ func (q *V3ioQuerier) SelectOverlap(name, functions string, step int64, windows
8687
func (q *V3ioQuerier) selectQry(
8788
name, functions string, step int64, windows []int, filter string) (set SeriesSet, err error) {
8889

89-
set = nullSeriesSet{}
90-
91-
filter = strings.Replace(filter, "__name__", "_name", -1)
92-
q.logger.DebugWith("Select query", "metric", name, "func", functions, "step", step, "filter", filter, "disableAllAggr", q.disableAllAggr, "disableClientAggr", q.disableClientAggr, "window", windows)
9390
err = q.partitionMngr.ReadAndUpdateSchema()
94-
9591
if err != nil {
9692
return nullSeriesSet{}, errors.Wrap(err, "Failed to read/update the TSDB schema.")
9793
}
9894

95+
set = nullSeriesSet{}
96+
97+
q.logger.Debug("Select query:\n\tMetric: %s\n\tStart Time: %s (%d)\n\tEnd Time: %s (%d)\n\tFunction: %s\n\t"+
98+
"Step: %d\n\tFilter: %s\n\tWindows: %v\n\tDisable All Aggr: %t\n\tDisable Client Aggr: %t",
99+
name, time.Unix(q.mint/1000, 0).String(), q.mint, time.Unix(q.maxt/1000, 0).String(), q.maxt, functions, step,
100+
filter, windows, q.disableAllAggr, q.disableClientAggr)
101+
99102
q.performanceReporter.WithTimer("QueryTimer", func() {
100103
filter = strings.Replace(filter, "__name__", "_name", -1)
101104

pkg/querier/series.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,18 @@ func (it *v3ioSeriesIterator) Seek(t int64) bool {
147147
t = it.mint
148148
}
149149

150+
// Check the first element
151+
t0, _ := it.iter.At()
152+
if t0 > it.maxt {
153+
return false
154+
}
155+
if t <= t0 {
156+
return true
157+
}
158+
150159
for {
151160
if it.iter.Next() {
152-
t0, _ := it.At()
161+
t0, _ := it.iter.At()
153162
if t0 > it.maxt {
154163
return false
155164
}

promtsdb/promtsdb.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"github.com/nuclio/logger"
7+
"github.com/pkg/errors"
78
"github.com/prometheus/prometheus/pkg/labels"
89
"github.com/prometheus/prometheus/storage"
910
"github.com/v3io/v3io-go-http"
@@ -12,17 +13,27 @@ import (
1213
"github.com/v3io/v3io-tsdb/pkg/config"
1314
"github.com/v3io/v3io-tsdb/pkg/querier"
1415
"github.com/v3io/v3io-tsdb/pkg/tsdb"
16+
"github.com/v3io/v3io-tsdb/pkg/utils"
1517
"strings"
1618
)
1719

1820
type V3ioPromAdapter struct {
19-
db *tsdb.V3ioAdapter
21+
db *tsdb.V3ioAdapter
22+
logger logger.Logger
2023
}
2124

2225
func NewV3ioProm(cfg *config.V3ioConfig, container *v3io.Container, logger logger.Logger) (*V3ioPromAdapter, error) {
2326

27+
if logger == nil {
28+
newLogger, err := utils.NewLogger(cfg.LogLevel)
29+
if err != nil {
30+
return nil, errors.Wrap(err, "Unable to initialize logger.")
31+
}
32+
logger = newLogger
33+
}
34+
2435
adapter, err := tsdb.NewV3ioAdapter(cfg, container, logger)
25-
newAdapter := V3ioPromAdapter{db: adapter}
36+
newAdapter := V3ioPromAdapter{db: adapter, logger: logger.GetChild("v3io-prom-adapter")}
2637
return &newAdapter, err
2738
}
2839

@@ -46,19 +57,22 @@ func (a *V3ioPromAdapter) Close() error {
4657

4758
func (a *V3ioPromAdapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
4859
v3ioQuerier, err := a.db.Querier(nil, mint, maxt)
49-
querier := V3ioPromQuerier{q: v3ioQuerier}
50-
return &querier, err
60+
promQuerier := V3ioPromQuerier{v3ioQuerier: v3ioQuerier, logger: a.logger.GetChild("v3io-prom-query")}
61+
return &promQuerier, err
5162
}
5263

5364
type V3ioPromQuerier struct {
54-
q *querier.V3ioQuerier
65+
v3ioQuerier *querier.V3ioQuerier
66+
logger logger.Logger
5567
}
5668

5769
// Select returns a set of series that matches the given label matchers.
58-
func (q *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error) {
59-
name, filter, functions := match2filter(oms)
70+
func (promQuery *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error) {
71+
name, filter, functions := match2filter(oms, promQuery.logger)
6072
noAggr := false
6173

74+
promQuery.logger.Debug("SelectParams: %+v", params)
75+
6276
if params.Func != "" {
6377
// only pass xx_over_time functions (just the xx part)
6478
// TODO: support count/stdxx, require changes in Prometheus: promql/functions.go, not calc aggregate twice
@@ -71,26 +85,27 @@ func (q *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*labels.Ma
7185
}
7286
}
7387
}
74-
set, err := q.q.SelectProm(name, functions, params.Step, filter, noAggr)
88+
set, err := promQuery.v3ioQuerier.SelectProm(name, functions, params.Step, filter, noAggr)
7589
return &V3ioPromSeriesSet{s: set}, err
7690
}
7791

7892
// LabelValues returns all potential values for a label name.
79-
func (q *V3ioPromQuerier) LabelValues(name string) ([]string, error) {
80-
return q.q.LabelValues(name)
93+
func (promQuery *V3ioPromQuerier) LabelValues(name string) ([]string, error) {
94+
return promQuery.v3ioQuerier.LabelValues(name)
8195
}
8296

8397
// Close releases the resources of the Querier.
84-
func (q *V3ioPromQuerier) Close() error {
98+
func (promQuery *V3ioPromQuerier) Close() error {
8599
return nil
86100
}
87101

88-
func match2filter(oms []*labels.Matcher) (string, string, string) {
89-
filter := []string{}
102+
func match2filter(oms []*labels.Matcher, logger logger.Logger) (string, string, string) {
103+
var filter []string
90104
agg := ""
91105
name := ""
92106

93107
for _, matcher := range oms {
108+
logger.Debug("Matcher: %+v", matcher)
94109
if matcher.Name == aggregate.AggregateLabel {
95110
agg = matcher.Value
96111
} else if matcher.Name == "__name__" && matcher.Type == labels.MatchEqual {
@@ -109,7 +124,8 @@ func match2filter(oms []*labels.Matcher) (string, string, string) {
109124
}
110125
}
111126
}
112-
return name, strings.Join(filter, " and "), agg
127+
filterExp := strings.Join(filter, " and ")
128+
return name, filterExp, agg
113129
}
114130

115131
type V3ioPromSeriesSet struct {

promtsdb/promtsdb_test.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,22 @@ such restriction.
2323
package promtsdb
2424

2525
import (
26+
"github.com/nuclio/logger"
2627
"github.com/prometheus/prometheus/pkg/labels"
2728
"github.com/stretchr/testify/suite"
29+
"github.com/v3io/v3io-tsdb/pkg/config"
30+
"github.com/v3io/v3io-tsdb/pkg/utils"
2831
"testing"
2932
)
3033

3134
type testPromTsdbSuite struct {
3235
suite.Suite
36+
logger logger.Logger
3337
}
3438

3539
func (suite *testPromTsdbSuite) TestMatch2filterEmpty() {
3640

37-
name, filter, aggr := match2filter(nil)
41+
name, filter, aggr := match2filter(nil, suite.logger)
3842

3943
suite.Require().Equal("", name)
4044
suite.Require().Equal("", filter)
@@ -46,7 +50,7 @@ func (suite *testPromTsdbSuite) TestMatch2filterEqual() {
4650
matchers := []*labels.Matcher{
4751
{Type: labels.MatchEqual, Name: "field", Value: "literal"},
4852
}
49-
name, filter, aggr := match2filter(matchers)
53+
name, filter, aggr := match2filter(matchers, suite.logger)
5054

5155
suite.Require().Equal("", name)
5256
suite.Require().Equal("field=='literal'", filter)
@@ -59,7 +63,7 @@ func (suite *testPromTsdbSuite) TestMatch2filterMultiple() {
5963
{Type: labels.MatchEqual, Name: "field1", Value: "literal1"},
6064
{Type: labels.MatchNotEqual, Name: "field2", Value: "literal2"},
6165
}
62-
name, filter, aggr := match2filter(matchers)
66+
name, filter, aggr := match2filter(matchers, suite.logger)
6367

6468
suite.Require().Equal("", name)
6569
suite.Require().Equal("field1=='literal1' and field2!='literal2'", filter)
@@ -72,7 +76,7 @@ func (suite *testPromTsdbSuite) TestMatch2filterMultipleWithName() {
7276
{Type: labels.MatchEqual, Name: "__name__", Value: "literal1"},
7377
{Type: labels.MatchNotEqual, Name: "field2", Value: "literal2"},
7478
}
75-
name, filter, aggr := match2filter(matchers)
79+
name, filter, aggr := match2filter(matchers, suite.logger)
7680

7781
suite.Require().Equal("literal1", name)
7882
suite.Require().Equal("field2!='literal2'", filter)
@@ -84,7 +88,7 @@ func (suite *testPromTsdbSuite) TestMatch2filterRegex() {
8488
matchers := []*labels.Matcher{
8589
{Type: labels.MatchRegexp, Name: "field", Value: ".*"},
8690
}
87-
name, filter, aggr := match2filter(matchers)
91+
name, filter, aggr := match2filter(matchers, suite.logger)
8892

8993
suite.Require().Equal("", name)
9094
suite.Require().Equal(`regexp_instr(field,'.*') == 0`, filter)
@@ -97,13 +101,20 @@ func (suite *testPromTsdbSuite) TestMatch2filterRegexMultiple() {
97101
{Type: labels.MatchRegexp, Name: "field1", Value: ".*"},
98102
{Type: labels.MatchNotRegexp, Name: "field2", Value: "..."},
99103
}
100-
name, filter, aggr := match2filter(matchers)
104+
name, filter, aggr := match2filter(matchers, suite.logger)
101105

102106
suite.Require().Equal("", name)
103107
suite.Require().Equal(`regexp_instr(field1,'.*') == 0 and regexp_instr(field2,'...') != 0`, filter)
104108
suite.Require().Equal("", aggr)
105109
}
106110

107111
func TestPromTsdbSuite(t *testing.T) {
108-
suite.Run(t, new(testPromTsdbSuite))
112+
log, err := utils.NewLogger(config.DefaultLogLevel)
113+
if err != nil {
114+
t.Fatalf("Unable to initialize logger. Error: %v", err)
115+
}
116+
117+
testSuit := new(testPromTsdbSuite)
118+
testSuit.logger = log
119+
suite.Run(t, testSuit)
109120
}

0 commit comments

Comments
 (0)