Skip to content

Commit 8e8f2a7

Browse files
authored
Merge pull request #184 from v3io/development
Development --> Master (0.0.14)
2 parents 53d3746 + 0e6a8b9 commit 8e8f2a7

File tree

7 files changed

+151
-36
lines changed

7 files changed

+151
-36
lines changed

pkg/querier/querier.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/v3io/v3io-tsdb/pkg/utils"
3232
"sort"
3333
"strings"
34+
"time"
3435
)
3536

3637
// Create a new Querier interface
@@ -86,16 +87,18 @@ func (q *V3ioQuerier) SelectOverlap(name, functions string, step int64, windows
8687
func (q *V3ioQuerier) selectQry(
8788
name, functions string, step int64, windows []int, filter string) (set SeriesSet, err error) {
8889

89-
set = nullSeriesSet{}
90-
91-
filter = strings.Replace(filter, "__name__", "_name", -1)
92-
q.logger.DebugWith("Select query", "metric", name, "func", functions, "step", step, "filter", filter, "disableAllAggr", q.disableAllAggr, "disableClientAggr", q.disableClientAggr, "window", windows)
9390
err = q.partitionMngr.ReadAndUpdateSchema()
94-
9591
if err != nil {
9692
return nullSeriesSet{}, errors.Wrap(err, "Failed to read/update the TSDB schema.")
9793
}
9894

95+
set = nullSeriesSet{}
96+
97+
q.logger.Debug("Select query:\n\tMetric: %s\n\tStart Time: %s (%d)\n\tEnd Time: %s (%d)\n\tFunction: %s\n\t"+
98+
"Step: %d\n\tFilter: %s\n\tWindows: %v\n\tDisable All Aggr: %t\n\tDisable Client Aggr: %t",
99+
name, time.Unix(q.mint/1000, 0).String(), q.mint, time.Unix(q.maxt/1000, 0).String(), q.maxt, functions, step,
100+
filter, windows, q.disableAllAggr, q.disableClientAggr)
101+
99102
q.performanceReporter.WithTimer("QueryTimer", func() {
100103
filter = strings.Replace(filter, "__name__", "_name", -1)
101104

pkg/querier/series.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,18 @@ func (it *v3ioSeriesIterator) Seek(t int64) bool {
147147
t = it.mint
148148
}
149149

150+
// Check the first element
151+
t0, _ := it.iter.At()
152+
if t0 > it.maxt {
153+
return false
154+
}
155+
if t <= t0 {
156+
return true
157+
}
158+
150159
for {
151160
if it.iter.Next() {
152-
t0, _ := it.At()
161+
t0, _ := it.iter.At()
153162
if t0 > it.maxt {
154163
return false
155164
}
@@ -251,7 +260,7 @@ type aggrSeriesIterator struct {
251260
// Advance an iterator to the specified time (t)
252261
func (s *aggrSeriesIterator) Seek(t int64) bool {
253262
if t <= s.set.baseTime {
254-
s.index = 0
263+
s.index = s.getNextValidCell(-1)
255264
return true
256265
}
257266

@@ -266,14 +275,16 @@ func (s *aggrSeriesIterator) Seek(t int64) bool {
266275
// Advance an iterator to the next time interval/bucket
267276
func (s *aggrSeriesIterator) Next() bool {
268277
// Advance the index to the next non-empty cell
269-
var nextIndex int
270-
for nextIndex = s.index + 1; nextIndex <= s.aggrSet.GetMaxCell() && !s.aggrSet.DoesCellHaveData(nextIndex); nextIndex++ {
271-
}
272-
273-
s.index = nextIndex
278+
s.index = s.getNextValidCell(s.index)
274279
return s.index <= s.aggrSet.GetMaxCell()
275280
}
276281

282+
func (s *aggrSeriesIterator) getNextValidCell(from int) (nextIndex int) {
283+
for nextIndex = from + 1; nextIndex <= s.aggrSet.GetMaxCell() && !s.aggrSet.DoesCellHaveData(nextIndex); nextIndex++ {
284+
}
285+
return
286+
}
287+
277288
// Return the time and value at the current bucket
278289
func (s *aggrSeriesIterator) At() (t int64, v float64) {
279290
val, _ := s.aggrSet.GetCellValue(s.aggrType, s.index)

pkg/tsdb/v3iotsdb.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ func (a *V3ioAdapter) connect() error {
151151
return errors.Wrapf(err, "Failed to initialize the TSDB partition manager at: %s", fullpath)
152152
}
153153

154-
a.logger.Info("Starting the V3IO TSDB client for the TSDB instance at '%s'", fullpath)
155154
a.logger.Debug("Running with the following TSDB configuration: %+v\n", a.cfg)
156155

157156
return nil

pkg/tsdb/v3iotsdb_integration_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,81 @@ func testQueryDataOverlappingWindowCase(test *testing.T, v3ioConfig *config.V3io
471471
}
472472
}
473473

474+
// Calling Seek instead of next for the first time while iterating over data (TSDB-43)
475+
func TestIgnoreNaNWhenSeekingAggSeries(t *testing.T) {
476+
v3ioConfig, err := tsdbtest.LoadV3ioConfig()
477+
if err != nil {
478+
t.Fatalf("unable to load configuration. Error: %v", err)
479+
}
480+
metricsName := "cpu"
481+
baseTime := int64(1532940510000)
482+
userLabels := utils.LabelsFromStrings("os", "linux", "iguaz", "yesplease")
483+
data := []tsdbtest.DataPoint{{Time: baseTime, Value: 300.3},
484+
{Time: baseTime + minuteInMillis, Value: 300.3},
485+
{Time: baseTime + 2*minuteInMillis, Value: 100.4},
486+
{Time: baseTime + 5*minuteInMillis, Value: 200.0}}
487+
from := int64(baseTime - 60*minuteInMillis)
488+
to := int64(baseTime + 6*minuteInMillis)
489+
step := int64(2 * minuteInMillis)
490+
agg := "avg"
491+
expected := map[string][]tsdbtest.DataPoint{
492+
"avg": {{baseTime, 300.3},
493+
{baseTime + step, 100.4},
494+
{baseTime + 2*step, 200}}}
495+
496+
adapter, teardown := tsdbtest.SetUpWithData(t, v3ioConfig, metricsName, data, userLabels)
497+
defer teardown()
498+
499+
qry, err := adapter.Querier(nil, from, to)
500+
if err != nil {
501+
t.Fatalf("Failed to create Querier. reason: %v", err)
502+
}
503+
504+
set, err := qry.Select(metricsName, agg, step, "")
505+
if err != nil {
506+
t.Fatalf("Failed to run Select. reason: %v", err)
507+
}
508+
509+
var counter int
510+
for counter = 0; set.Next(); counter++ {
511+
if set.Err() != nil {
512+
t.Fatalf("Failed to query metric. reason: %v", set.Err())
513+
}
514+
515+
series := set.At()
516+
agg := series.Labels().Get(aggregate.AggregateLabel)
517+
iter := series.Iterator()
518+
if iter.Err() != nil {
519+
t.Fatalf("Failed to query data series. reason: %v", iter.Err())
520+
}
521+
if !iter.Seek(0) {
522+
t.Fatal("Seek time returned false, iterator error:", iter.Err())
523+
}
524+
var actual []tsdbtest.DataPoint
525+
t0, v0 := iter.At()
526+
if iter.Err() != nil {
527+
t.Fatal("error iterating over series", iter.Err())
528+
}
529+
actual = append(actual, tsdbtest.DataPoint{Time: t0, Value: v0})
530+
for iter.Next() {
531+
t1, v1 := iter.At()
532+
533+
if iter.Err() != nil {
534+
t.Fatal("error iterating over series", iter.Err())
535+
}
536+
actual = append(actual, tsdbtest.DataPoint{Time: t1, Value: v1})
537+
}
538+
assert.ElementsMatch(t, expected[agg], actual)
539+
}
540+
541+
if set.Err() != nil {
542+
t.Fatalf("Failed to query metric. reason: %v", set.Err())
543+
}
544+
if counter == 0 && len(expected) > 0 {
545+
t.Fatalf("No data was received")
546+
}
547+
}
548+
474549
func TestCreateTSDB(t *testing.T) {
475550
v3ioConfig, err := tsdbtest.LoadV3ioConfig()
476551
if err != nil {

pkg/tsdbctl/info.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ func newInfoCommandeer(rootCommandeer *RootCommandeer) *infoCommandeer {
5353
}
5454

5555
cmd.Flags().BoolVarP(&commandeer.getNames, "names", "n", false,
56-
"Display the names of the metrics contained in the TSDB.")
56+
"Display the metric names in the TSDB.")
5757
cmd.Flags().BoolVarP(&commandeer.getCount, "performance", "m", false,
58-
"Display a count of the number of metric objects contained in the TSDB.")
58+
"Display the number of metric items in the TSDB.")
5959

6060
commandeer.cmd = cmd
6161

promtsdb/promtsdb.go

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"github.com/nuclio/logger"
7+
"github.com/pkg/errors"
78
"github.com/prometheus/prometheus/pkg/labels"
89
"github.com/prometheus/prometheus/storage"
910
"github.com/v3io/v3io-go-http"
@@ -12,17 +13,27 @@ import (
1213
"github.com/v3io/v3io-tsdb/pkg/config"
1314
"github.com/v3io/v3io-tsdb/pkg/querier"
1415
"github.com/v3io/v3io-tsdb/pkg/tsdb"
16+
"github.com/v3io/v3io-tsdb/pkg/utils"
1517
"strings"
1618
)
1719

1820
type V3ioPromAdapter struct {
19-
db *tsdb.V3ioAdapter
21+
db *tsdb.V3ioAdapter
22+
logger logger.Logger
2023
}
2124

2225
func NewV3ioProm(cfg *config.V3ioConfig, container *v3io.Container, logger logger.Logger) (*V3ioPromAdapter, error) {
2326

27+
if logger == nil {
28+
newLogger, err := utils.NewLogger(cfg.LogLevel)
29+
if err != nil {
30+
return nil, errors.Wrap(err, "Unable to initialize logger.")
31+
}
32+
logger = newLogger
33+
}
34+
2435
adapter, err := tsdb.NewV3ioAdapter(cfg, container, logger)
25-
newAdapter := V3ioPromAdapter{db: adapter}
36+
newAdapter := V3ioPromAdapter{db: adapter, logger: logger.GetChild("v3io-prom-adapter")}
2637
return &newAdapter, err
2738
}
2839

@@ -46,19 +57,22 @@ func (a *V3ioPromAdapter) Close() error {
4657

4758
func (a *V3ioPromAdapter) Querier(_ context.Context, mint, maxt int64) (storage.Querier, error) {
4859
v3ioQuerier, err := a.db.Querier(nil, mint, maxt)
49-
querier := V3ioPromQuerier{q: v3ioQuerier}
50-
return &querier, err
60+
promQuerier := V3ioPromQuerier{v3ioQuerier: v3ioQuerier, logger: a.logger.GetChild("v3io-prom-query")}
61+
return &promQuerier, err
5162
}
5263

5364
type V3ioPromQuerier struct {
54-
q *querier.V3ioQuerier
65+
v3ioQuerier *querier.V3ioQuerier
66+
logger logger.Logger
5567
}
5668

5769
// Select returns a set of series that matches the given label matchers.
58-
func (q *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error) {
59-
name, filter, functions := match2filter(oms)
70+
func (promQuery *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, error) {
71+
name, filter, functions := match2filter(oms, promQuery.logger)
6072
noAggr := false
6173

74+
promQuery.logger.Debug("SelectParams: %+v", params)
75+
6276
if params.Func != "" {
6377
// only pass xx_over_time functions (just the xx part)
6478
// TODO: support count/stdxx, require changes in Prometheus: promql/functions.go, not calc aggregate twice
@@ -71,26 +85,27 @@ func (q *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*labels.Ma
7185
}
7286
}
7387
}
74-
set, err := q.q.SelectProm(name, functions, params.Step, filter, noAggr)
88+
set, err := promQuery.v3ioQuerier.SelectProm(name, functions, params.Step, filter, noAggr)
7589
return &V3ioPromSeriesSet{s: set}, err
7690
}
7791

7892
// LabelValues returns all potential values for a label name.
79-
func (q *V3ioPromQuerier) LabelValues(name string) ([]string, error) {
80-
return q.q.LabelValues(name)
93+
func (promQuery *V3ioPromQuerier) LabelValues(name string) ([]string, error) {
94+
return promQuery.v3ioQuerier.LabelValues(name)
8195
}
8296

8397
// Close releases the resources of the Querier.
84-
func (q *V3ioPromQuerier) Close() error {
98+
func (promQuery *V3ioPromQuerier) Close() error {
8599
return nil
86100
}
87101

88-
func match2filter(oms []*labels.Matcher) (string, string, string) {
89-
filter := []string{}
102+
func match2filter(oms []*labels.Matcher, logger logger.Logger) (string, string, string) {
103+
var filter []string
90104
agg := ""
91105
name := ""
92106

93107
for _, matcher := range oms {
108+
logger.Debug("Matcher: %+v", matcher)
94109
if matcher.Name == aggregate.AggregateLabel {
95110
agg = matcher.Value
96111
} else if matcher.Name == "__name__" && matcher.Type == labels.MatchEqual {
@@ -109,7 +124,8 @@ func match2filter(oms []*labels.Matcher) (string, string, string) {
109124
}
110125
}
111126
}
112-
return name, strings.Join(filter, " and "), agg
127+
filterExp := strings.Join(filter, " and ")
128+
return name, filterExp, agg
113129
}
114130

115131
type V3ioPromSeriesSet struct {

promtsdb/promtsdb_test.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,22 @@ such restriction.
2323
package promtsdb
2424

2525
import (
26+
"github.com/nuclio/logger"
2627
"github.com/prometheus/prometheus/pkg/labels"
2728
"github.com/stretchr/testify/suite"
29+
"github.com/v3io/v3io-tsdb/pkg/config"
30+
"github.com/v3io/v3io-tsdb/pkg/utils"
2831
"testing"
2932
)
3033

3134
type testPromTsdbSuite struct {
3235
suite.Suite
36+
logger logger.Logger
3337
}
3438

3539
func (suite *testPromTsdbSuite) TestMatch2filterEmpty() {
3640

37-
name, filter, aggr := match2filter(nil)
41+
name, filter, aggr := match2filter(nil, suite.logger)
3842

3943
suite.Require().Equal("", name)
4044
suite.Require().Equal("", filter)
@@ -46,7 +50,7 @@ func (suite *testPromTsdbSuite) TestMatch2filterEqual() {
4650
matchers := []*labels.Matcher{
4751
{Type: labels.MatchEqual, Name: "field", Value: "literal"},
4852
}
49-
name, filter, aggr := match2filter(matchers)
53+
name, filter, aggr := match2filter(matchers, suite.logger)
5054

5155
suite.Require().Equal("", name)
5256
suite.Require().Equal("field=='literal'", filter)
@@ -59,7 +63,7 @@ func (suite *testPromTsdbSuite) TestMatch2filterMultiple() {
5963
{Type: labels.MatchEqual, Name: "field1", Value: "literal1"},
6064
{Type: labels.MatchNotEqual, Name: "field2", Value: "literal2"},
6165
}
62-
name, filter, aggr := match2filter(matchers)
66+
name, filter, aggr := match2filter(matchers, suite.logger)
6367

6468
suite.Require().Equal("", name)
6569
suite.Require().Equal("field1=='literal1' and field2!='literal2'", filter)
@@ -72,7 +76,7 @@ func (suite *testPromTsdbSuite) TestMatch2filterMultipleWithName() {
7276
{Type: labels.MatchEqual, Name: "__name__", Value: "literal1"},
7377
{Type: labels.MatchNotEqual, Name: "field2", Value: "literal2"},
7478
}
75-
name, filter, aggr := match2filter(matchers)
79+
name, filter, aggr := match2filter(matchers, suite.logger)
7680

7781
suite.Require().Equal("literal1", name)
7882
suite.Require().Equal("field2!='literal2'", filter)
@@ -84,7 +88,7 @@ func (suite *testPromTsdbSuite) TestMatch2filterRegex() {
8488
matchers := []*labels.Matcher{
8589
{Type: labels.MatchRegexp, Name: "field", Value: ".*"},
8690
}
87-
name, filter, aggr := match2filter(matchers)
91+
name, filter, aggr := match2filter(matchers, suite.logger)
8892

8993
suite.Require().Equal("", name)
9094
suite.Require().Equal(`regexp_instr(field,'.*') == 0`, filter)
@@ -97,13 +101,20 @@ func (suite *testPromTsdbSuite) TestMatch2filterRegexMultiple() {
97101
{Type: labels.MatchRegexp, Name: "field1", Value: ".*"},
98102
{Type: labels.MatchNotRegexp, Name: "field2", Value: "..."},
99103
}
100-
name, filter, aggr := match2filter(matchers)
104+
name, filter, aggr := match2filter(matchers, suite.logger)
101105

102106
suite.Require().Equal("", name)
103107
suite.Require().Equal(`regexp_instr(field1,'.*') == 0 and regexp_instr(field2,'...') != 0`, filter)
104108
suite.Require().Equal("", aggr)
105109
}
106110

107111
func TestPromTsdbSuite(t *testing.T) {
108-
suite.Run(t, new(testPromTsdbSuite))
112+
log, err := utils.NewLogger(config.DefaultLogLevel)
113+
if err != nil {
114+
t.Fatalf("Unable to initialize logger. Error: %v", err)
115+
}
116+
117+
testSuit := new(testPromTsdbSuite)
118+
testSuit.logger = log
119+
suite.Run(t, testSuit)
109120
}

0 commit comments

Comments
 (0)