Skip to content
This repository was archived by the owner on Aug 18, 2025. It is now read-only.

Commit 7a632e7

Browse files
dinalDina NimroditalIguazorkiguazioGal Topper
authored
Development (#70)
* IG 15394 - use new tsdb metric cache (#67) * new tsdb addFast * new AddFast Co-authored-by: Dina Nimrodi <dinan@iguazio.com> * new tsdb 0.11.2 (#68) Co-authored-by: Dina Nimrodi <dinan@iguazio.com> * fix step parsing in subQueries + fix inferring whether v3io needs to run aggregations (#66) * validate development unstable * Fix go.mod. go mod vendor. (#72) Co-authored-by: Gal Topper <galt@iguazio.com> Co-authored-by: Dina Nimrodi <dinan@iguazio.com> Co-authored-by: Tal Neiman <33829179+talIguaz@users.noreply.github.com> Co-authored-by: ork <ork@iguazio.com> Co-authored-by: Gal Topper <galt@iguaz.io> Co-authored-by: Gal Topper <galt@iguazio.com>
1 parent 8a3b020 commit 7a632e7

File tree

29 files changed

+982
-330
lines changed

29 files changed

+982
-330
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,3 +121,4 @@ Apache License 2.0, see [LICENSE](https://github.com/prometheus/prometheus/blob/
121121
[hub]: https://hub.docker.com/r/prom/prometheus/
122122
[circleci]: https://circleci.com/gh/prometheus/prometheus
123123
[quay]: https://quay.io/repository/prometheus/prometheus
124+

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ require (
4040
github.com/shurcooL/vfsgen v0.0.0-20181202132449-6a9ea43bcacd
4141
github.com/soheilhy/cmux v0.1.4
4242
github.com/stretchr/testify v1.4.0
43-
github.com/v3io/v3io-tsdb v0.10.12
43+
github.com/v3io/v3io-tsdb v0.11.2
4444
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80
4545
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
4646
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a
@@ -61,8 +61,8 @@ require (
6161

6262
replace (
6363
github.com/golang/lint => golang.org/x/lint v0.0.0-20190409202823-959b441ac422
64-
github.com/v3io/frames => github.com/v3io/frames v0.7.36
65-
github.com/v3io/v3io-tsdb => github.com/v3io/v3io-tsdb v0.10.12
64+
github.com/v3io/frames => github.com/v3io/frames v0.8.1
65+
github.com/v3io/v3io-tsdb => github.com/v3io/v3io-tsdb v0.11.2
6666
google.golang.org/grpc => google.golang.org/grpc v1.19.1
6767
k8s.io/klog => github.com/simonpasquier/klog-gokit v0.1.0
6868
)

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -388,12 +388,12 @@ github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
388388
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
389389
github.com/tinylib/msgp v1.1.1 h1:TnCZ3FIuKeaIy+F45+Cnp+caqdXGy4z74HvwXN+570Y=
390390
github.com/tinylib/msgp v1.1.1/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
391-
github.com/v3io/frames v0.7.36 h1:vWLJWbjvKMgWbCNrMj79wfKStcmvuAkh2UyfNGPIRfs=
392-
github.com/v3io/frames v0.7.36/go.mod h1:qUW5rLv+DK4pC0t52685E9HxqDBg3VFgourbbYfRmaI=
391+
github.com/v3io/frames v0.8.1 h1:wUWCX6RUEQ05SG3P9VtMBYh7gWrYNOiA81HcWXRXGZw=
392+
github.com/v3io/frames v0.8.1/go.mod h1:qQnXBGO3uLKwN9HVcQ2ggHlIs8xT+200uJaWUgDocpk=
393393
github.com/v3io/v3io-go v0.1.9 h1:etkrrRmrI++i8sxGfK/+13f41TxIMohYeZHwVUM62vw=
394394
github.com/v3io/v3io-go v0.1.9/go.mod h1:5poBlcjZG5TiexRTYI44PE6tHzZz5Z60w+iS899pWtc=
395-
github.com/v3io/v3io-tsdb v0.10.12 h1:+AfPHFJVxzpKtng0Hiz256ef9abzIwKwPuzB+0jG7w8=
396-
github.com/v3io/v3io-tsdb v0.10.12/go.mod h1:kp586KxTfROIGwb/nzNxwDbX2Wterxro+HbiZHmK548=
395+
github.com/v3io/v3io-tsdb v0.11.2 h1:knT+IwB+Bkd1aRPTIKltBnYrue1z+2EXmESJLkCexe8=
396+
github.com/v3io/v3io-tsdb v0.11.2/go.mod h1:l+WxedjLmjy/TThj2vhSW/OnpRw8C7dOntIXHmM/a7I=
397397
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
398398
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
399399
github.com/valyala/fasthttp v1.2.0 h1:dzZJf2IuMiclVjdw0kkT+f9u4YdrapbNyGAN47E/qnk=

promql/engine.go

Lines changed: 61 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -464,14 +464,14 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
464464
if s.Start == s.End && s.Interval == 0 {
465465
start := timeMilliseconds(s.Start)
466466
evaluator := &evaluator{
467-
startTimestamp: start,
468-
endTimestamp: start,
469-
interval: 1,
470-
ctx: ctxInnerEval,
471-
maxSamples: ng.maxSamplesPerQuery,
472-
defaultEvalInterval: GetDefaultEvaluationInterval(),
473-
logger: ng.logger,
474-
useV3ioAggregations: querier.(*tsdb.V3ioPromQuerier).UseV3ioAggregations(),
467+
startTimestamp: start,
468+
endTimestamp: start,
469+
interval: 1,
470+
ctx: ctxInnerEval,
471+
maxSamples: ng.maxSamplesPerQuery,
472+
defaultEvalInterval: GetDefaultEvaluationInterval(),
473+
logger: ng.logger,
474+
isAlreadyV3IOAggregated: querier.(*tsdb.V3ioPromQuerier).IsAlreadyAggregated,
475475
}
476476
val, err := evaluator.Eval(s.Expr)
477477
if err != nil {
@@ -507,14 +507,14 @@ func (ng *Engine) execEvalStmt(ctx context.Context, query *query, s *EvalStmt) (
507507

508508
// Range evaluation.
509509
evaluator := &evaluator{
510-
startTimestamp: timeMilliseconds(s.Start),
511-
endTimestamp: timeMilliseconds(s.End),
512-
interval: durationMilliseconds(s.Interval),
513-
ctx: ctxInnerEval,
514-
maxSamples: ng.maxSamplesPerQuery,
515-
defaultEvalInterval: GetDefaultEvaluationInterval(),
516-
logger: ng.logger,
517-
useV3ioAggregations: querier.(*tsdb.V3ioPromQuerier).UseV3ioAggregations(),
510+
startTimestamp: timeMilliseconds(s.Start),
511+
endTimestamp: timeMilliseconds(s.End),
512+
interval: durationMilliseconds(s.Interval),
513+
ctx: ctxInnerEval,
514+
maxSamples: ng.maxSamplesPerQuery,
515+
defaultEvalInterval: GetDefaultEvaluationInterval(),
516+
logger: ng.logger,
517+
isAlreadyV3IOAggregated: querier.(*tsdb.V3ioPromQuerier).IsAlreadyAggregated,
518518
}
519519
val, err := evaluator.Eval(s.Expr)
520520
if err != nil {
@@ -556,6 +556,7 @@ func (ng *Engine) cumulativeSubqueryOffset(path []Node) time.Duration {
556556
func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *EvalStmt) (storage.Querier, storage.Warnings, error) {
557557
var maxOffset time.Duration
558558
var aggregationWindow int64
559+
var subQueryStep int64
559560

560561
Inspect(s.Expr, func(node Node, path []Node) error {
561562
subqOffset := ng.cumulativeSubqueryOffset(path)
@@ -568,13 +569,17 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
568569
maxOffset = n.Offset + LookbackDelta + subqOffset
569570
}
570571
case *MatrixSelector:
571-
aggregationWindow = n.Range.Nanoseconds() / 1000000
572+
aggregationWindow = durationToInt64Millis(n.Range)
572573
if maxOffset < n.Range+subqOffset {
573574
maxOffset = n.Range + subqOffset
574575
}
575576
if n.Offset+n.Range+subqOffset > maxOffset {
576577
maxOffset = n.Offset + n.Range + subqOffset
577578
}
579+
case *SubqueryExpr:
580+
// Save the step if it is provided in a subquery rather than as an interval
581+
// Example query: `sum(metric)[1h:10m]` -> step=10m
582+
subQueryStep = durationToInt64Millis(n.Step)
578583
}
579584
return nil
580585
})
@@ -591,10 +596,16 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
591596
Inspect(s.Expr, func(node Node, path []Node) error {
592597
var set storage.SeriesSet
593598
var wrn storage.Warnings
599+
600+
// Get the Step from the sub query, in case the interval is not set.
601+
step := durationToInt64Millis(s.Interval)
602+
if step == 0 {
603+
step = subQueryStep
604+
}
594605
params := &storage.SelectParams{
595606
Start: timestamp.FromTime(s.Start),
596607
End: timestamp.FromTime(s.End),
597-
Step: durationToInt64Millis(s.Interval),
608+
Step: step,
598609
}
599610

600611
// We need to make sure we select the timerange selected by the subquery.
@@ -607,6 +618,9 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
607618

608619
switch n := node.(type) {
609620
case *VectorSelector:
621+
// Validate if the current query can be aggregated via v3io
622+
querier.(*tsdb.V3ioPromQuerier).UseAggregates = isV3ioEligibleQueryExpr(path)
623+
610624
params.Start = params.Start - durationMilliseconds(LookbackDelta)
611625
params.Func = extractFuncFromPath(path)
612626
params.By, params.Grouping = extractGroupsFromPath(path)
@@ -629,6 +643,9 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
629643
n.unexpandedSeriesSet = set
630644

631645
case *MatrixSelector:
646+
// Validate if the current query can be aggregated via v3io
647+
querier.(*tsdb.V3ioPromQuerier).UseAggregates = isV3ioEligibleQueryExpr(path)
648+
632649
params.Func = extractFuncFromPath(path)
633650
params.Range = durationMilliseconds(n.Range)
634651
// For all matrix queries we want to ensure that we have (end-start) + range selected
@@ -738,7 +755,7 @@ type evaluator struct {
738755
defaultEvalInterval int64
739756
logger log.Logger
740757

741-
useV3ioAggregations bool // Indicates whether v3io tsdb already queried and aggregated the data, or just returned raw data
758+
isAlreadyV3IOAggregated func(op string) bool // Indicates whether v3io tsdb already queried and aggregated the data, or just returned raw data
742759
}
743760

744761
// errorf causes a panic with the input formatted into an error.
@@ -980,7 +997,7 @@ func (ev *evaluator) eval(expr Expr) Value {
980997

981998
switch e := expr.(type) {
982999
case *AggregateExpr:
983-
if ev.useV3ioAggregations {
1000+
if ev.isAlreadyV3IOAggregated(e.Op.String()) {
9841001
return ev.emptyAggregation(e.Expr)
9851002
}
9861003
if s, ok := e.Param.(*StringLiteral); ok {
@@ -1038,9 +1055,10 @@ func (ev *evaluator) eval(expr Expr) Value {
10381055
// Evaluate any non-matrix arguments.
10391056
otherArgs := make([]Matrix, len(e.Args))
10401057
otherInArgs := make([]Vector, len(e.Args))
1058+
function := e.Func.Name
10411059
for i, e := range e.Args {
10421060
if i != matrixArgIndex {
1043-
if ev.useV3ioAggregations {
1061+
if ev.isAlreadyV3IOAggregated(function) {
10441062
return ev.emptyAggregation(e)
10451063
}
10461064

@@ -1226,14 +1244,14 @@ func (ev *evaluator) eval(expr Expr) Value {
12261244
offsetMillis := durationToInt64Millis(e.Offset)
12271245
rangeMillis := durationToInt64Millis(e.Range)
12281246
newEv := &evaluator{
1229-
endTimestamp: ev.endTimestamp - offsetMillis,
1230-
interval: ev.defaultEvalInterval,
1231-
ctx: ev.ctx,
1232-
currentSamples: ev.currentSamples,
1233-
maxSamples: ev.maxSamples,
1234-
defaultEvalInterval: ev.defaultEvalInterval,
1235-
logger: ev.logger,
1236-
useV3ioAggregations: ev.useV3ioAggregations,
1247+
endTimestamp: ev.endTimestamp - offsetMillis,
1248+
interval: ev.defaultEvalInterval,
1249+
ctx: ev.ctx,
1250+
currentSamples: ev.currentSamples,
1251+
maxSamples: ev.maxSamples,
1252+
defaultEvalInterval: ev.defaultEvalInterval,
1253+
logger: ev.logger,
1254+
isAlreadyV3IOAggregated: ev.isAlreadyV3IOAggregated,
12371255
}
12381256

12391257
if e.Step != 0 {
@@ -1996,28 +2014,34 @@ func isV3ioEligibleFunction(function string) bool {
19962014
return supportedV3ioFunctions[function]
19972015
}
19982016

1999-
func isV3ioEligibleQueryExpr(e Expr) bool {
2000-
switch expr := e.(type) {
2017+
func isV3ioEligibleQueryExpr(p []Node) bool {
2018+
if len(p) == 0 {
2019+
return true
2020+
}
2021+
switch n := p[len(p)-1].(type) {
20012022
case *AggregateExpr:
2002-
if !isV3ioEligibleAggregation(expr.Op) {
2023+
if !isV3ioEligibleAggregation(n.Op) {
20032024
return false
20042025
}
2005-
if expr.Without {
2026+
if n.Without {
20062027
return false
20072028
}
20082029
// Currently only supports non-nested functions.
20092030
// Not supported - avg(max_over_time(cpu[10m])), Supported - avg(cpu)
2010-
if e, ok := expr.Expr.(*Call); ok {
2031+
if e, ok := n.Expr.(*Call); ok {
20112032
if e.Func != nil {
20122033
return false
20132034
}
20142035
}
20152036
return true
20162037
case *Call:
2017-
return isV3ioEligibleFunction(expr.Func.Name)
2038+
return isV3ioEligibleFunction(n.Func.Name)
2039+
case *BinaryExpr:
2040+
// If we hit a binary expression we terminate since we only care about functions
2041+
// or aggregations over a single metric.
2042+
return false
20182043
}
2019-
2020-
return false
2044+
return isV3ioEligibleQueryExpr(p[:len(p)-1])
20212045
}
20222046

20232047
// btos returns 1 if b is true, 0 otherwise.

storage/tsdb/promtsdb.go

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,22 @@ type V3ioPromQuerier struct {
7575
logger logger.Logger
7676
mint, maxt int64
7777

78-
UseAggregatesConfig bool // Indicate whether or not to use v3io aggregations by default (passed from prometheus.yml)
79-
UseAggregates bool // Indicate whether the current query is eligible for using v3io aggregations (should be set after creating a Querier instance)
78+
UseAggregatesConfig bool // Indicate whether or not to use v3io aggregations by default (passed from prometheus.yml)
79+
UseAggregates bool // Indicate whether the current query is eligible for using v3io aggregations (should be set after creating a Querier instance)
80+
LastTSDBAggregatedAggr string
8081
}
8182

8283
func (promQuery *V3ioPromQuerier) UseV3ioAggregations() bool {
8384
return promQuery.UseAggregates && promQuery.UseAggregatesConfig
8485
}
8586

87+
func (promQuery *V3ioPromQuerier) IsAlreadyAggregated(op string) bool {
88+
if promQuery.UseV3ioAggregations() && promQuery.LastTSDBAggregatedAggr == op {
89+
return true
90+
}
91+
return false
92+
}
93+
8694
// Select returns a set of series that matches the given label matchers.
8795
func (promQuery *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*labels.Matcher) (storage.SeriesSet, storage.Warnings, error) {
8896
name, filter, function := match2filter(oms, promQuery.logger)
@@ -101,15 +109,23 @@ func (promQuery *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*l
101109
promQuery.logger.Debug("SelectParams: %+v", params)
102110
overTimeSuffix := "_over_time"
103111

112+
// Currently we do aggregations only for:
113+
// 1. All Cross-series aggregation
114+
// 2. Over-time aggregations where no Step or aggregationWindow was specified
115+
// 3. Over-time aggregation for v3io-tsdb compatible aggregates
116+
// 4. Down sampling - when only a step is provided
117+
// Note: in addition to the above cases, we also take into consider the `UseAggregatesConfig` configuration and of
118+
// course whether or not the requested aggregation is a valid v3io-tsdb aggregation
104119
if params.Func != "" {
105120
// only pass xx_over_time functions (just the xx part)
106121
// TODO: support count/stdxx, require changes in Prometheus: promql/functions.go, not calc aggregate twice
107122
if strings.HasSuffix(params.Func, overTimeSuffix) {
108-
if promQuery.UseAggregates && promQuery.UseAggregatesConfig {
123+
if promQuery.UseV3ioAggregations() {
109124
function = strings.TrimSuffix(params.Func, overTimeSuffix)
110125
} else {
111126
f := params.Func[0:3]
112-
if params.Step == 0 && (f == "min" || f == "max" || f == "sum" || f == "avg") {
127+
if params.Step == 0 && params.AggregationWindow == 0 &&
128+
(f == "min" || f == "max" || f == "sum" || f == "avg") {
113129
function = f
114130
} else {
115131
noAggr = true
@@ -120,6 +136,10 @@ func (promQuery *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*l
120136
}
121137
}
122138

139+
if function != "" && !noAggr {
140+
promQuery.LastTSDBAggregatedAggr = params.Func
141+
}
142+
123143
selectParams := &pquerier.SelectParams{Name: name,
124144
Functions: function,
125145
Step: params.Step,
@@ -128,6 +148,8 @@ func (promQuery *V3ioPromQuerier) Select(params *storage.SelectParams, oms ...*l
128148
To: promQuery.maxt,
129149
AggregationWindow: params.AggregationWindow}
130150

151+
promQuery.logger.DebugWith("Going to query tsdb", "params", selectParams,
152+
"UseAggregates", promQuery.UseAggregates, "UseAggregatesConfig", promQuery.UseAggregatesConfig)
131153
set, err := promQuery.v3ioQuerier.SelectProm(selectParams, noAggr)
132154
return &V3ioPromSeriesSet{s: set}, nil, err
133155
}
@@ -276,6 +298,11 @@ func (ls Labels) GetKey() (string, string, uint64) {
276298

277299
}
278300

301+
302+
func (ls Labels) HashWithName() uint64 {
303+
return ls.lbls.Hash()
304+
}
305+
279306
// create update expression
280307
func (ls Labels) GetExpr() string {
281308
var lblExprBuilder strings.Builder

0 commit comments

Comments
 (0)