Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 766c41b

Browse files
committed
Works I think :D
1 parent 2709c9e commit 766c41b

24 files changed

+1310
-532
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package bucket_aggregations
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"quesma/logger"
9+
"quesma/model"
10+
"time"
11+
)
12+
13+
type AutoDateHistogram struct {
14+
ctx context.Context
15+
field model.Expr // name of the field, e.g. timestamp
16+
bucketsNr int
17+
key int64
18+
}
19+
20+
func NewAutoDateHistogram(ctx context.Context, field model.Expr, bucketsNr int) *AutoDateHistogram {
21+
return &AutoDateHistogram{ctx: ctx, field: field, bucketsNr: bucketsNr}
22+
}
23+
24+
func (query *AutoDateHistogram) AggregationType() model.AggregationType {
25+
return model.BucketAggregation
26+
}
27+
28+
func (query *AutoDateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
29+
fmt.Println(rows)
30+
if len(rows) == 0 {
31+
logger.WarnWithCtx(query.ctx).Msgf("no rows returned for %s", query.String())
32+
return make(model.JsonMap, 0)
33+
}
34+
return model.JsonMap{
35+
"buckets": []model.JsonMap{{
36+
"key": query.key,
37+
"key_as_string": time.UnixMilli(query.key).Format("2006-01-02T15:04:05.000-07:00"),
38+
"doc_count": rows[0].LastColValue(),
39+
}},
40+
"interval": "100y",
41+
}
42+
}
43+
44+
func (query *AutoDateHistogram) String() string {
45+
return fmt.Sprintf("auto_date_histogram(field: %v, bucketsNr: %d)", model.AsString(query.field), query.bucketsNr)
46+
}
47+
48+
func (query *AutoDateHistogram) GetField() model.Expr {
49+
return query.field
50+
}
51+
52+
func (query *AutoDateHistogram) SetKey(key int64) {
53+
query.key = key
54+
}

quesma/model/bucket_aggregations/date_histogram.go

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultR
8585
// Implement default when query.minDocCount == DefaultMinDocCount, we need to return
8686
// all buckets between the first bucket that matches documents and the last one.
8787

88-
if query.minDocCount == 0 {
88+
fmt.Println("query.minDocCount", query.minDocCount, "query.ebmin", query.ebmin)
89+
if query.minDocCount == 0 || query.ebmin != 0 {
8990
rows = query.NewRowsTransformer().Transform(query.ctx, rows)
9091
}
9192

@@ -99,7 +100,7 @@ func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultR
99100
responseKey := query.calculateResponseKey(originalKey)
100101

101102
response = append(response, model.JsonMap{
102-
//OriginalKeyName: originalKey,
103+
OriginalKeyName: originalKey,
103104
"key": responseKey,
104105
"doc_count": docCount,
105106
"key_as_string": query.calculateKeyAsString(responseKey),
@@ -119,6 +120,7 @@ func (query *DateHistogram) String() string {
119120
// only intervals <= days are needed
120121
func (query *DateHistogram) intervalAsDuration() time.Duration {
121122
var intervalInHoursOrLess string
123+
//fmt.Println("query.interval", query.interval)
122124
if strings.HasSuffix(query.interval, "d") {
123125
// time.ParseDuration doesn't accept > hours, we need to convert days to hours
124126
daysNr, err := strconv.Atoi(strings.TrimSuffix(query.interval, "d"))
@@ -130,7 +132,9 @@ func (query *DateHistogram) intervalAsDuration() time.Duration {
130132
} else {
131133
intervalInHoursOrLess = query.interval
132134
}
135+
//fmt.Println("intervalInHoursOrLess", intervalInHoursOrLess)
133136
duration, _ := time.ParseDuration(intervalInHoursOrLess)
137+
//fmt.Println("duration", duration)
134138
return duration
135139
}
136140

@@ -228,7 +232,7 @@ func (query *DateHistogram) calculateResponseKey(originalKey int64) int64 {
228232
}
229233

230234
func (query *DateHistogram) calculateKeyAsString(key int64) string {
231-
return time.UnixMilli(key).In(query.wantedTimezone).Format("2006-01-02T15:04:05.000-07:00")
235+
return time.UnixMilli(key).In(query.wantedTimezone).Format("2006/01/02 15:04:05")
232236
}
233237

234238
func (query *DateHistogram) OriginalKeyToKeyAsString(originalKey any) string {
@@ -241,16 +245,14 @@ func (query *DateHistogram) SetMinDocCountToZero() {
241245
}
242246

243247
func (query *DateHistogram) NewRowsTransformer() model.QueryRowsTransformer {
244-
differenceBetweenTwoNextKeys := int64(1)
245-
if query.intervalType == DateHistogramCalendarInterval {
246-
duration, err := kibana.ParseInterval(query.interval)
247-
if err == nil {
248-
differenceBetweenTwoNextKeys = duration.Milliseconds()
249-
} else {
250-
logger.ErrorWithCtx(query.ctx).Err(err)
251-
differenceBetweenTwoNextKeys = 0
252-
}
248+
duration, err := kibana.ParseInterval(query.interval)
249+
var differenceBetweenTwoNextKeys int64
250+
if err == nil {
251+
differenceBetweenTwoNextKeys = duration.Milliseconds()
252+
} else {
253+
logger.ErrorWithCtx(query.ctx).Err(err)
253254
}
255+
fmt.Println("differenceBetweenTwoNextKeys", differenceBetweenTwoNextKeys)
254256
return &DateHistogramRowsTransformer{MinDocCount: query.minDocCount, differenceBetweenTwoNextKeys: differenceBetweenTwoNextKeys, EmptyValue: 0, ebmin: query.ebmin, ebmax: query.ebmax}
255257
}
256258

@@ -267,7 +269,7 @@ type DateHistogramRowsTransformer struct {
267269
// if MinDocCount == 0, and we have buckets e.g. [key, value1], [key+10, value2], we need to insert [key+1, 0], [key+2, 0]...
268270
// CAUTION: a different kind of postprocessing is needed for MinDocCount > 1, but I haven't seen any query with that yet, so not implementing it now.
269271
func (qt *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
270-
if qt.MinDocCount != 0 || qt.differenceBetweenTwoNextKeys == 0 || len(rowsFromDB) < 1 {
272+
if qt.MinDocCount != 0 || qt.differenceBetweenTwoNextKeys == 0 {
271273
// we only add empty rows, when
272274
// a) MinDocCount == 0
273275
// b) we have valid differenceBetweenTwoNextKeys (>0)
@@ -281,7 +283,9 @@ func (qt *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromD
281283

282284
emptyRowsAdded := 0
283285
postprocessedRows := make([]model.QueryResultRow, 0, len(rowsFromDB))
284-
postprocessedRows = append(postprocessedRows, rowsFromDB[0])
286+
if len(rowsFromDB) > 0 {
287+
postprocessedRows = append(postprocessedRows, rowsFromDB[0])
288+
}
285289
for i := 1; i < len(rowsFromDB); i++ {
286290
if len(rowsFromDB[i-1].Cols) < 2 || len(rowsFromDB[i].Cols) < 2 {
287291
logger.ErrorWithCtx(ctx).Msgf(
@@ -301,17 +305,31 @@ func (qt *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromD
301305
}
302306
postprocessedRows = append(postprocessedRows, rowsFromDB[i])
303307
}
304-
/*
305-
fmt.Println("postprocessedRows 1", postprocessedRows, qt.getKey(postprocessedRows[0])-qt.differenceBetweenTwoNextKeys, qt.ebmin, (qt.getKey(postprocessedRows[0])-qt.differenceBetweenTwoNextKeys)*qt.differenceBetweenTwoNextKeys)
306-
307-
for maybePreKey, i := qt.getKey(postprocessedRows[0])-qt.differenceBetweenTwoNextKeys, 0; i < 96; maybePreKey*qt.differenceBetweenTwoNextKeys >= qt.ebminmaybePreKey, i = maybePreKey-qt.differenceBetweenTwoNextKeys, i+1 {
308-
preRow := postprocessedRows[0].Copy()
309-
preRow.Cols[len(preRow.Cols)-2].Value = maybePreKey
310-
preRow.Cols[len(preRow.Cols)-1].Value = qt.EmptyValue
311-
postprocessedRows = append([]model.QueryResultRow{preRow}, postprocessedRows...)
312-
emptyRowsAdded++
313-
}
314-
*/
308+
309+
//fmt.Println("postprocessedRows 1", postprocessedRows, qt.getKey(postprocessedRows[0])*qt.differenceBetweenTwoNextKeys-qt.differenceBetweenTwoNextKeys, qt.ebmin, qt.differenceBetweenTwoNextKeys)
310+
fmt.Println("pre: ", len(postprocessedRows), emptyRowsAdded)
311+
if qt.ebmin == 0 {
312+
return postprocessedRows
313+
}
314+
315+
if len(postprocessedRows) == 0 {
316+
postprocessedRows = append(postprocessedRows, model.QueryResultRow{
317+
Cols: []model.QueryResultCol{
318+
{Value: (qt.ebmin+1000*60*60*2)/qt.differenceBetweenTwoNextKeys - 1},
319+
{Value: qt.EmptyValue},
320+
},
321+
})
322+
}
323+
// gk*d-d = d(gk - 1)
324+
// gk*d-2d = d (gk-2) = d(gk-1) - d
325+
for maybePreKey := (qt.ebmin + 1000*60*60*2) / qt.differenceBetweenTwoNextKeys; maybePreKey*qt.differenceBetweenTwoNextKeys < qt.ebmax+1000*60*60*2; maybePreKey++ {
326+
preRow := postprocessedRows[0].Copy()
327+
preRow.Cols[len(preRow.Cols)-2].Value = maybePreKey
328+
preRow.Cols[len(preRow.Cols)-1].Value = qt.EmptyValue
329+
postprocessedRows = append(postprocessedRows, preRow)
330+
emptyRowsAdded++
331+
}
332+
fmt.Println("post:", len(postprocessedRows), emptyRowsAdded)
315333
return postprocessedRows
316334
}
317335

quesma/model/bucket_aggregations/filter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package bucket_aggregations
44

55
import (
66
"context"
7+
"fmt"
78
"quesma/logger"
89
"quesma/model"
910
)
@@ -18,14 +19,15 @@ func NewFilterAgg(ctx context.Context, whereClause model.Expr) FilterAgg {
1819
}
1920

2021
func (query FilterAgg) AggregationType() model.AggregationType {
21-
return model.BucketAggregation
22+
return model.MetricsAggregation
2223
}
2324

2425
func (query FilterAgg) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
2526
if len(rows) == 0 {
2627
logger.WarnWithCtx(query.ctx).Msg("no rows returned for filter aggregation")
2728
return make(model.JsonMap, 0)
2829
}
30+
fmt.Println("filter_agg", query.String(), rows)
2931
return model.JsonMap{"doc_count": rows[0].Cols[0].Value}
3032
}
3133

quesma/model/bucket_aggregations/filters.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ type Filter struct {
2828
}
2929

3030
func NewFilter(name string, sql model.SimpleQuery) Filter {
31+
if sql.WhereClause == nil {
32+
sql.WhereClause = model.NewLiteral("True")
33+
}
3134
return Filter{Name: name, Sql: sql}
3235
}
3336

@@ -36,7 +39,7 @@ func (query Filters) AggregationType() model.AggregationType {
3639
}
3740

3841
func (query Filters) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
39-
var value any = nil
42+
var value any = 0.0
4043
if len(rows) > 0 {
4144
if len(rows[0].Cols) > 0 {
4245
value = rows[0].Cols[len(rows[0].Cols)-1].Value

quesma/model/metrics_aggregations/count.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package metrics_aggregations
44

55
import (
66
"context"
7+
"fmt"
78
"quesma/logger"
89
"quesma/model"
910
)
@@ -28,7 +29,11 @@ func (query Count) TranslateSqlResponseToJson(rows []model.QueryResultRow) model
2829
if len(rows) > 1 {
2930
logger.WarnWithCtx(query.ctx).Msg("More than one row returned for count aggregation")
3031
}
31-
return model.JsonMap{"doc_count": rows[0].Cols[0]}
32+
if len(rows[0].Cols) == 0 {
33+
return model.JsonMap{"doc_count": -1}
34+
}
35+
fmt.Println("COUNT", rows)
36+
return model.JsonMap{"doc_count": rows[0].Cols[0].Value}
3237
}
3338

3439
func (query Count) String() string {

quesma/model/pipeline_aggregations/bucket_script.go

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,55 +5,85 @@ package pipeline_aggregations
55
import (
66
"context"
77
"fmt"
8-
"quesma/logger"
98
"quesma/model"
109
"quesma/util"
10+
"strings"
1111
)
1212

1313
type BucketScript struct {
14+
script string
1415
*PipelineAggregation
1516
}
1617

17-
func NewBucketScript(ctx context.Context) BucketScript {
18-
return BucketScript{PipelineAggregation: newPipelineAggregation(ctx, "_count")}
18+
func NewBucketScript(ctx context.Context, script string) BucketScript {
19+
return BucketScript{script: script, PipelineAggregation: newPipelineAggregation(ctx, "_count")}
1920
}
2021

2122
func (query BucketScript) AggregationType() model.AggregationType {
22-
return model.PipelineBucketAggregation // not sure
23+
return model.PipelineMetricsAggregation // not sure
2324
}
2425

2526
func (query BucketScript) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
26-
fmt.Println("bucket_script", rows)
27+
//fmt.Println("bucket_script", query.String(), rows)
2728
if len(rows) == 0 {
28-
logger.WarnWithCtx(query.ctx).Msg("no rows returned for bucket script aggregation")
29+
//logger.WarnWithCtx(query.ctx).Msg("no rows returned for bucket script aggregation")
2930
}
30-
for _, row := range rows {
31-
return model.JsonMap{"value": int64(util.ExtractInt64(row.LastColValue()))}
31+
const defaultValue = 0.
32+
switch query.script {
33+
case "params.numerator != null && params.denominator != null && params.denominator != 0 ? params.numerator / params.denominator : 0":
34+
numerator := query.findFilterValue("numerator", rows)
35+
denominator := query.findFilterValue("denominator", rows)
36+
if denominator == 0 {
37+
return model.JsonMap{"value": defaultValue}
38+
}
39+
return model.JsonMap{"value": numerator / denominator}
40+
default:
41+
for _, row := range rows {
42+
return model.JsonMap{"value": util.ExtractInt64(row.LastColValue())}
43+
}
3244
}
33-
return model.JsonMap{"value": 0.}
45+
46+
return model.JsonMap{"value": 0.0}
3447
}
3548

3649
func (query BucketScript) CalculateResultWhenMissing(parentRows []model.QueryResultRow) []model.QueryResultRow {
37-
fmt.Println("bucket_script", parentRows)
50+
//fmt.Println("bucket_script", query.String(), parentRows[:max(0, len(parentRows))])
3851
if len(parentRows) == 0 {
39-
logger.WarnWithCtx(query.ctx).Msg("no rows returned for bucket script aggregation")
52+
//logger.WarnWithCtx(query.ctx).Msg("no rows returned for bucket script aggregation")
4053
return parentRows
4154
}
4255
resultRows := make([]model.QueryResultRow, 0, len(parentRows))
4356
for _, parentRow := range parentRows {
4457
resultRow := parentRow.Copy()
4558
resultRow.Cols[len(resultRow.Cols)-1].Value = float64(util.ExtractInt64(parentRow.LastColValue()))
46-
fmt.Printf("last col %T %v", resultRow.LastColValue(), resultRow.LastColValue())
59+
//fmt.Printf("last col %T %v", resultRow.LastColValue(), resultRow.LastColValue())
4760
resultRows = append(resultRows, resultRow)
4861
}
4962
return resultRows
5063
}
5164

5265
func (query BucketScript) String() string {
53-
return fmt.Sprintf("bucket script(isCount: %v, parent: %s, pathToParent: %v, parentBucketAggregation: %v)",
54-
query.isCount, query.Parent, query.PathToParent, query.parentBucketAggregation)
66+
return fmt.Sprintf("bucket script(isCount: %v, parent: %s, pathToParent: %v, parentBucketAggregation: %v, script: %v)",
67+
query.isCount, query.Parent, query.PathToParent, query.parentBucketAggregation, query.script)
5568
}
5669

5770
func (query BucketScript) PipelineAggregationType() model.PipelineAggregationType {
58-
return model.PipelineParentAggregation // not sure, maybe it's sibling.
71+
return model.PipelineParentAggregation // not sure, maybe it's sibling. doesnt change the result
72+
}
73+
74+
func (query BucketScript) findFilterValue(filterName string, rows []model.QueryResultRow) float64 {
75+
for _, row := range rows {
76+
for _, col := range row.Cols {
77+
//fmt.Println("col", col)
78+
colName := col.ColName
79+
if !strings.HasSuffix(colName, "_col_0") {
80+
continue
81+
}
82+
colName = strings.TrimSuffix(colName, "_col_0")
83+
if strings.HasSuffix(colName, "-"+filterName) {
84+
return float64(util.ExtractInt64(col.Value))
85+
}
86+
}
87+
}
88+
return 0
5989
}

quesma/model/where_visitor.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package model
2+
3+
// TODO: it's not 100% full/proper implementation, but works in the client case
4+
func FindTimestampLowerBound(expr Expr) (InfixExpr, bool) {
5+
candidates := make([]InfixExpr, 0)
6+
visitor := NewBaseVisitor()
7+
visitor.OverrideVisitInfix = func(visitor *BaseExprVisitor, e InfixExpr) interface{} {
8+
if e.Op == ">=" {
9+
candidates = append(candidates, e)
10+
} else if e.Op == "AND" {
11+
e.Left.Accept(visitor)
12+
e.Right.Accept(visitor)
13+
}
14+
return nil
15+
}
16+
17+
expr.Accept(visitor)
18+
if len(candidates) == 1 {
19+
return candidates[0], true
20+
}
21+
return InfixExpr{}, false
22+
}

0 commit comments

Comments
 (0)