Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 1735b98

Browse files
trzysiektrzysiek
andauthored
rate - new aggregation from Kibana 8.15 (#999)
Also tested manually, and it seems to be working, e.g. us: <img width="1728" alt="Screenshot 2025-01-26 at 17 09 41" src="https://github.com/user-attachments/assets/20085833-19f9-4e03-b5a2-5e8c02ebae7c" /> Elastic: <img width="1728" alt="Screenshot 2025-01-26 at 17 09 48" src="https://github.com/user-attachments/assets/bea2334e-9f3e-4451-9485-91c4622456a7" /> --------- Co-authored-by: trzysiek <[email protected]>
1 parent 9710f8d commit 1735b98

File tree

13 files changed

+1002
-33
lines changed

13 files changed

+1002
-33
lines changed

docs/public/docs/limitations.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ Currently supported:
3838
including: `boolean`, `match`, `match phrase`, `multi-match`, `query string`, `nested`, `match all`, `exists`, `prefix`, `range`, `term`, `terms`, `wildcard`
3939
- most popular [Aggregations](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html),
4040
including: `avg`, `cardinality`, `max`, `min`, `percentile ranks`, `percentiles`, `stats`, `sum`, `top hits`, `top metrics`, `value count`,
41-
`date histogram`, `date range`, `filter`, `filters`, `histogram`, `range`, `singificant terms`, `terms`, `ip prefix`, `ip range`, `geo_bounds`, `geohash_grid`
41+
`date histogram`, `date range`, `filter`, `filters`, `histogram`, `range`, `significant terms`, `terms`, `ip prefix`, `ip range`, `geo_bounds`, `geohash_grid`
4242

4343
Which as a result allows you to run Kibana/OSD queries and dashboards on data residing in ClickHouse/Hydrolix.
4444

platform/model/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/searc
2929
Min | :white_check_mark: | Histogram | :white_check_mark: | Moving percentiles | :x: |
3030
Percentile ranks | :white_check_mark: | IP prefix | :white_check_mark: | Normalize | :x: |
3131
Percentiles | :white_check_mark: | IP range | :white_check_mark: | Percentiles bucket | :x: |
32-
Rate | :x: | Missing | :x: | Serial differencing | :white_check_mark: |
32+
Rate | :white_check_mark: | Missing | :x: | Serial differencing | :white_check_mark: |
3333
Scripted metric | :x: | Multi-terms | :white_check_mark: | Stats bucket | :x: |
3434
Stats | :white_check_mark: | Nested | :x: | Sum bucket | :white_check_mark: |
3535
String stats | :x: | Parent | :x: |

platform/model/bucket_aggregations/date_histogram.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,15 @@ func (query *DateHistogram) getKey(row model.QueryResultRow) int64 {
225225
return row.Cols[len(row.Cols)-2].Value.(int64)
226226
}
227227

228+
func (query *DateHistogram) Interval() (interval time.Duration, ok bool) {
229+
if duration, err := util.ParseInterval(query.interval); err == nil {
230+
return duration, true
231+
} else {
232+
logger.WarnWithCtx(query.ctx).Msg(err.Error())
233+
return 0, false
234+
}
235+
}
236+
228237
func (query *DateHistogram) calculateResponseKeyInUTC(originalKey int64) int64 {
229238
if query.intervalType == DateHistogramCalendarInterval {
230239
return originalKey
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package metrics_aggregations
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"github.com/QuesmaOrg/quesma/platform/logger"
9+
"github.com/QuesmaOrg/quesma/platform/model"
10+
"github.com/QuesmaOrg/quesma/platform/util"
11+
"reflect"
12+
"strings"
13+
"time"
14+
)
15+
16+
// Rate is the "rate" metrics aggregation.
type Rate struct {
	ctx context.Context
	// unit is the time unit the rate is expressed in.
	unit RateUnit
	// multiplier scales the raw value in TranslateSqlResponseToJson; set by CalcAndSetMultiplier.
	multiplier float64
	// parentInterval is the interval passed to CalcAndSetMultiplier.
	parentInterval time.Duration
	// fieldPresent is the flag given to NewRate and reported by FieldPresent.
	fieldPresent bool
}

// RateUnit enumerates the time units accepted as the rate's "unit" parameter.
type RateUnit int

// RateMode is the value of the rate's "mode" parameter.
type RateMode string

// Supported rate units, ordered from the shortest to the longest.
const (
	second RateUnit = iota
	minute
	hour
	day
	week
	month
	quarter
	year
)

// Supported rate modes. RateModeInvalid marks a mode that could not be recognized.
const (
	RateModeSum        RateMode = "sum"
	RateModeValueCount RateMode = "value_count"
	RateModeInvalid    RateMode = "invalid"
)
44+
45+
// NewRate creates a new Rate aggregation, during parsing.
46+
// 'multiplier' and 'parentIntervalInMs' are set later, during pancake transformation.
47+
func NewRate(ctx context.Context, unit string, fieldPresent bool) (*Rate, error) {
48+
rateUnit, err := newRateUnit(ctx, unit)
49+
rate := &Rate{ctx: ctx, unit: rateUnit, fieldPresent: fieldPresent}
50+
if err != nil {
51+
rate.unit = second
52+
}
53+
return rate, err
54+
}
55+
56+
func (query *Rate) AggregationType() model.AggregationType {
57+
return model.MetricsAggregation
58+
}
59+
60+
func (query *Rate) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
61+
// rows[0] is either: val (1 column)
62+
// or parent date_histogram's key, val (2 columns)
63+
if len(rows) != 1 || (len(rows[0].Cols) != 1 && len(rows[0].Cols) != 2) {
64+
logger.ErrorWithCtx(query.ctx).Msgf("unexpected number of rows or columns returned for %s: %+v.", query.String(), rows)
65+
return model.JsonMap{"value": nil}
66+
}
67+
68+
parentVal, ok := util.ExtractNumeric64Maybe(rows[0].LastColValue())
69+
if !ok {
70+
logger.WarnWithCtx(query.ctx).Msgf("cannot extract numeric value from %v, %T", rows[0].Cols[0], rows[0].Cols[0].Value)
71+
return model.JsonMap{"value": nil}
72+
}
73+
74+
var (
75+
fix = 1.0 // e.g. 90/88 if there are 88 days in 3 months, but our calculations are based on 90 days
76+
thirtyDays = 30 * util.Day()
77+
needToCountDaysNr = query.parentInterval.Milliseconds()%thirtyDays.Milliseconds() == 0 &&
78+
(query.unit == second || query.unit == minute || query.unit == hour || query.unit == day || query.unit == week)
79+
weHaveParentDateHistogramKey = len(rows[0].Cols) == 2
80+
)
81+
82+
if needToCountDaysNr && weHaveParentDateHistogramKey {
83+
// Calculating 'fix':
84+
// We need to count days of every month, as it can be 28, 29, 30 or 31...
85+
// So that our average is correct (in Elastic it always is)
86+
parentDateHistogramKey, ok := rows[0].Cols[0].Value.(int64)
87+
if !ok {
88+
logger.WarnWithCtx(query.ctx).Msgf("cannot extract parent date_histogram key from %v, %T", rows[0].Cols[0], rows[0].Cols[0].Value)
89+
return model.JsonMap{"value": nil}
90+
}
91+
92+
someTime := time.UnixMilli(parentDateHistogramKey).Add(48 * time.Hour)
93+
// someTime.Day() is in [28, 31] U {1}. I want it to be >= 2, so I'm sure I'm in the right month for all timezones.
94+
for someTime.Day() == 1 || someTime.Day() > 25 {
95+
someTime = someTime.Add(24 * time.Hour)
96+
}
97+
98+
actualDays := 0
99+
currentDays := query.parentInterval.Milliseconds() / thirtyDays.Milliseconds() * 30 // e.g. 90 for 3 months date_histogram
100+
currentDaysConst := currentDays
101+
for currentDays > 0 {
102+
actualDays += util.DaysInMonth(someTime)
103+
currentDays -= 30
104+
someTime = someTime.AddDate(0, -1, 0)
105+
}
106+
fix = float64(currentDaysConst) / float64(actualDays)
107+
}
108+
109+
return model.JsonMap{"value": fix * parentVal * query.multiplier}
110+
}
111+
112+
func (query *Rate) CalcAndSetMultiplier(parentInterval time.Duration) {
113+
query.parentInterval = parentInterval
114+
if parentInterval.Milliseconds() == 0 {
115+
logger.ErrorWithCtx(query.ctx).Msgf("parent interval is 0, cannot calculate rate multiplier")
116+
return
117+
}
118+
119+
rate := query.unit.ToDuration(query.ctx)
120+
// unit month/quarter/year is special, only compatible with month/quarter/year calendar intervals
121+
if query.unit == month || query.unit == quarter || query.unit == year {
122+
oneMonth := 30 * util.Day()
123+
if parentInterval < oneMonth {
124+
logger.WarnWithCtx(query.ctx).Msgf("parent interval (%d ms) is not compatible with rate unit %s", parentInterval, query.unit.String(query.ctx))
125+
return
126+
}
127+
if query.unit == year {
128+
rate = 360 * util.Day() // round to 360 days, so year/month = 12, year/quarter = 3, as should be
129+
}
130+
}
131+
132+
if rate.Milliseconds()%parentInterval.Milliseconds() == 0 {
133+
query.multiplier = float64(rate.Milliseconds() / parentInterval.Milliseconds())
134+
} else {
135+
query.multiplier = float64(rate.Milliseconds()) / float64(parentInterval.Milliseconds())
136+
}
137+
}
138+
139+
func (query *Rate) String() string {
140+
return fmt.Sprintf("rate(unit: %s)", query.unit.String(query.ctx))
141+
}
142+
143+
func (query *Rate) FieldPresent() bool {
144+
return query.fieldPresent
145+
}
146+
147+
func newRateUnit(ctx context.Context, unit string) (RateUnit, error) {
148+
switch strings.ToLower(unit) {
149+
case "second":
150+
return second, nil
151+
case "minute":
152+
return minute, nil
153+
case "hour":
154+
return hour, nil
155+
case "day":
156+
return day, nil
157+
case "week":
158+
return week, nil
159+
case "month":
160+
return month, nil
161+
case "quarter":
162+
return quarter, nil
163+
case "year":
164+
return year, nil
165+
default:
166+
logger.WarnWithCtxAndThrottling(ctx, "rate", "unit", "invalid rate unit: %s", unit)
167+
return second, fmt.Errorf("invalid rate unit: %s", unit)
168+
}
169+
}
170+
171+
func (u RateUnit) String(ctx context.Context) string {
172+
switch u {
173+
case second:
174+
return "second"
175+
case minute:
176+
return "minute"
177+
case hour:
178+
return "hour"
179+
case day:
180+
return "day"
181+
case week:
182+
return "week"
183+
case month:
184+
return "month"
185+
case quarter:
186+
return "quarter"
187+
case year:
188+
return "year"
189+
default:
190+
// theoretically unreachable
191+
logger.WarnWithCtxAndThrottling(ctx, "rate", "unit", "invalid rate unit: %d", u)
192+
return "invalid"
193+
}
194+
}
195+
196+
func (u RateUnit) ToDuration(ctx context.Context) time.Duration {
197+
switch u {
198+
case second:
199+
return time.Second
200+
case minute:
201+
return time.Minute
202+
case hour:
203+
return time.Hour
204+
case day:
205+
return util.Day()
206+
case week:
207+
return 7 * util.Day()
208+
case month:
209+
return 30 * util.Day()
210+
case quarter:
211+
return 90 * util.Day()
212+
case year:
213+
return 365 * util.Day()
214+
default:
215+
logger.ErrorWithCtx(ctx).Msgf("invalid rate unit: %s", u.String(ctx))
216+
return 0
217+
}
218+
}
219+
220+
func NewRateMode(ctx context.Context, mode string) RateMode {
221+
switch mode {
222+
case "sum", "":
223+
return RateModeSum
224+
case "value_count":
225+
return RateModeValueCount
226+
default:
227+
logger.WarnWithCtxAndThrottling(ctx, "rate", "mode", "invalid rate mode: %s", mode)
228+
return RateModeInvalid
229+
}
230+
}
231+
232+
func (m RateMode) String() string {
233+
switch m {
234+
case RateModeSum:
235+
return "sum"
236+
case RateModeValueCount:
237+
return "value_count"
238+
case RateModeInvalid:
239+
return "invalid"
240+
default:
241+
return "invalid, but not RateModeInvalid"
242+
}
243+
}
244+
245+
// TODO make part of QueryType interface and implement for all aggregations
246+
// TODO add bad requests to tests
247+
// Doing so will ensure we see 100% of what we're interested in in our logs (now we see ~95%)
248+
func CheckParamsRate(ctx context.Context, paramsRaw any) error {
249+
requiredParams := map[string]string{
250+
"unit": "string",
251+
}
252+
optionalParams := map[string]string{
253+
"field": "string",
254+
"mode": "string",
255+
}
256+
257+
params, ok := paramsRaw.(model.JsonMap)
258+
if !ok {
259+
return fmt.Errorf("params is not a map, but %+v", paramsRaw)
260+
}
261+
262+
// check if required are present
263+
for paramName, paramType := range requiredParams {
264+
paramVal, exists := params[paramName]
265+
if !exists {
266+
return fmt.Errorf("required parameter %s not found in params", paramName)
267+
}
268+
if reflect.TypeOf(paramVal).Name() != paramType { // TODO I'll make a small rewrite to not use reflect here
269+
return fmt.Errorf("required parameter %s is not of type %s, but %T", paramName, paramType, paramVal)
270+
}
271+
}
272+
if _, err := newRateUnit(ctx, params["unit"].(string)); err != nil {
273+
return fmt.Errorf("invalid rate unit: %v", params["unit"])
274+
}
275+
276+
// check if only required/optional are present, and if present - that they have correct types
277+
for paramName := range params {
278+
if _, isRequired := requiredParams[paramName]; !isRequired {
279+
wantedType, isOptional := optionalParams[paramName]
280+
if !isOptional {
281+
return fmt.Errorf("unexpected parameter %s found in Rate params %v", paramName, params)
282+
}
283+
if reflect.TypeOf(params[paramName]).Name() != wantedType { // TODO I'll make a small rewrite to not use reflect here
284+
return fmt.Errorf("optional parameter %s is not of type %s, but %T", paramName, wantedType, params[paramName])
285+
}
286+
}
287+
}
288+
if mode, exists := params["mode"]; exists && NewRateMode(ctx, mode.(string)) == RateModeInvalid {
289+
return fmt.Errorf("invalid rate mode: %v", params["mode"])
290+
}
291+
292+
return nil
293+
}

platform/parsers/elastic_query_dsl/aggregation_parser.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/QuesmaOrg/quesma/platform/clickhouse"
88
"github.com/QuesmaOrg/quesma/platform/logger"
99
"github.com/QuesmaOrg/quesma/platform/model"
10+
"github.com/QuesmaOrg/quesma/platform/model/metrics_aggregations"
1011
"regexp"
1112
"slices"
1213
"strconv"
@@ -27,6 +28,8 @@ type metricsAggregation struct {
2728
Order string // Only for top_metrics
2829
IsFieldNameCompound bool // Only for a few aggregations, where we have only 1 field. It's a compound, so e.g. toHour(timestamp), not just "timestamp"
2930
sigma float64 // only for standard deviation
31+
unit string // only for rate
32+
mode string // only for rate
3033
}
3134

3235
type aggregationParser = func(queryMap QueryMap) (model.QueryType, error)
@@ -158,6 +161,25 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
158161
}, true
159162
}
160163

164+
if rate, exists := queryMap["rate"]; exists {
165+
if err := metrics_aggregations.CheckParamsRate(cw.Ctx, rate); err != nil {
166+
logger.WarnWithCtx(cw.Ctx).Msgf("rate aggregation has invalid parameters: %v. Skipping.", rate)
167+
return metricsAggregation{}, false
168+
}
169+
field := cw.parseFieldField(rate, "rate")
170+
var fields []model.Expr
171+
if field != nil {
172+
fields = append(fields, field)
173+
}
174+
const defaultMode = "sum"
175+
return metricsAggregation{
176+
AggrType: "rate",
177+
Fields: fields,
178+
// safe casts below, because of CheckParamsRate
179+
unit: cw.parseStringField(rate.(JsonMap), "unit", ""), // default doesn't matter, it's checked in CheckParamsRate, it will never be empty here
180+
mode: cw.parseStringField(rate.(JsonMap), "mode", defaultMode),
181+
}, true
182+
}
161183
if geoBounds, exists := queryMap["geo_bounds"]; exists {
162184
return metricsAggregation{
163185
AggrType: "geo_bounds",

0 commit comments

Comments
 (0)