6060 // DefaultEvaluationInterval is the default evaluation interval of
6161 // a subquery in milliseconds.
6262 DefaultEvaluationInterval int64
63+
64+ supportedV3ioFunctions = map [string ]bool {"max_over_time" : true ,
65+ "min_over_time" : true ,
66+ "avg_over_time" : true ,
67+ "sum_over_time" : true ,
68+ "count_over_time" : true ,
69+ "stddev_over_time" : true ,
70+ "stdvar_over_time" : true }
71+ supportedV3ioAggregations = map [ItemType ]bool {itemAvg : true ,
72+ itemCount : true ,
73+ itemSum : true ,
74+ itemMin : true ,
75+ itemMax : true ,
76+ itemStddev : true ,
77+ itemStdvar : true }
6378)
6479
6580// SetDefaultEvaluationInterval sets DefaultEvaluationInterval.
@@ -518,6 +533,8 @@ func (ng *Engine) cumulativeSubqueryOffset(path []Node) time.Duration {
518533
519534func (ng * Engine ) populateSeries (ctx context.Context , q storage.Queryable , s * EvalStmt ) (storage.Querier , storage.Warnings , error ) {
520535 var maxOffset time.Duration
536+ var aggregationWindow int64
537+
521538 Inspect (s .Expr , func (node Node , path []Node ) error {
522539 subqOffset := ng .cumulativeSubqueryOffset (path )
523540 switch n := node .(type ) {
@@ -529,6 +546,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
529546 maxOffset = n .Offset + LookbackDelta + subqOffset
530547 }
531548 case * MatrixSelector :
549+ aggregationWindow = n .Range .Nanoseconds () / 1000000
532550 if maxOffset < n .Range + subqOffset {
533551 maxOffset = n .Range + subqOffset
534552 }
@@ -556,21 +574,19 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
556574 Step : durationToInt64Millis (s .Interval ),
557575 }
558576
577+ querier .(* tsdb.V3ioPromQuerier ).UseAggregates = isV3ioEligibleQueryExpr (s .Expr )
578+
559579 switch n := node .(type ) {
560580 case * VectorSelector :
561581 params .Start = params .Start - durationMilliseconds (LookbackDelta )
562582 params .Func = extractFuncFromPath (path )
583+ params .AggregationWindow = aggregationWindow
563584 if n .Offset > 0 {
564585 offsetMilliseconds := durationMilliseconds (n .Offset )
565586 params .Start = params .Start - offsetMilliseconds
566587 params .End = params .End - offsetMilliseconds
567588 }
568589
569- switch e := s .Expr .(type ) {
570- case * AggregateExpr :
571- querier .(* tsdb.V3ioPromQuerier ).UseAggregates = isV3ioEligibleQueryExpr (e )
572- }
573-
574590 level .Debug (ng .logger ).Log ("msg" , "Querying v3io vector selector" ,
575591 "useV3ioAggregations" , querier .(* tsdb.V3ioPromQuerier ).UseAggregates ,
576592 "use3ioAggregationConfig" , querier .(* tsdb.V3ioPromQuerier ).UseAggregatesConfig )
@@ -587,6 +603,7 @@ func (ng *Engine) populateSeries(ctx context.Context, q storage.Queryable, s *Ev
587603 // For all matrix queries we want to ensure that we have (end-start) + range selected
588604 // this way we have `range` data before the start time
589605 params .Start = params .Start - durationMilliseconds (n .Range )
606+ params .AggregationWindow = aggregationWindow
590607 if n .Offset > 0 {
591608 offsetMilliseconds := durationMilliseconds (n .Offset )
592609 params .Start = params .Start - offsetMilliseconds
@@ -977,6 +994,10 @@ func (ev *evaluator) eval(expr Expr) Value {
977994 otherInArgs := make ([]Vector , len (e .Args ))
978995 for i , e := range e .Args {
979996 if i != matrixArgIndex {
997+ if ev .useV3ioAggregations {
998+ return ev .emptyAggregation (e )
999+ }
1000+
9801001 otherArgs [i ] = ev .eval (e ).(Matrix )
9811002 otherInArgs [i ] = Vector {Sample {}}
9821003 inArgs [i ] = otherInArgs [i ]
@@ -1906,33 +1927,34 @@ func (ev *evaluator) emptyAggregation(e Expr) Matrix {
19061927}
19071928
19081929func isV3ioEligibleAggregation (op ItemType ) bool {
1909- supportedV3ioAggregations := []ItemType {itemAvg , itemCount , itemSum , itemMin , itemMax , itemStddev , itemStdvar }
1910- return containsItemType (op , supportedV3ioAggregations )
1930+ return supportedV3ioAggregations [op ]
19111931}
19121932
1913- func isV3ioEligibleQueryExpr (e * AggregateExpr ) bool {
1914- if ! isV3ioEligibleAggregation (e .Op ) {
1915- return false
1916- }
1917- if e .Without {
1918- return false
1919- }
1920- // Currently only supports non-nested functions.
1921- // Not supported - avg(max_over_time(cpu[10m])), Supported - avg(cpu)
1922- if e , ok := e .Expr .(* Call ); ok {
1923- if e .Func != nil {
1924- return false
1925- }
1926- }
1927- return true
1933+ func isV3ioEligibleFunction (function string ) bool {
1934+ return supportedV3ioFunctions [function ]
19281935}
19291936
1930- func containsItemType (item ItemType , slice []ItemType ) bool {
1931- for _ , curr := range slice {
1932- if curr == item {
1933- return true
1937+ func isV3ioEligibleQueryExpr (e Expr ) bool {
1938+ switch expr := e .(type ) {
1939+ case * AggregateExpr :
1940+ if ! isV3ioEligibleAggregation (expr .Op ) {
1941+ return false
1942+ }
1943+ if expr .Without {
1944+ return false
19341945 }
1946+ // Currently only supports non-nested functions.
1947+ // Not supported - avg(max_over_time(cpu[10m])), Supported - avg(cpu)
1948+ if e , ok := expr .Expr .(* Call ); ok {
1949+ if e .Func != nil {
1950+ return false
1951+ }
1952+ }
1953+ return true
1954+ case * Call :
1955+ return isV3ioEligibleFunction (expr .Func .Name )
19351956 }
1957+
19361958 return false
19371959}
19381960
0 commit comments