Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 1f434d4

Browse files
authored
bucket_script (pipeline aggr) enhancements (#923)
Previously, `bucket_script` was handled only in the simplest case (which occurred surprisingly often): when it amounted to a simple count. Now the script may be based on more than one parent aggregation, and it also works for one particular example of a script. TODO: once some basic arithmetic parsing is implemented, it should work in all cases.
1 parent ca8ec4a commit 1f434d4

File tree

9 files changed

+460
-29
lines changed

9 files changed

+460
-29
lines changed

quesma/model/pipeline_aggregations/bucket_script.go

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,44 +4,85 @@ package pipeline_aggregations
44

55
import (
66
"context"
7+
"fmt"
78
"quesma/logger"
89
"quesma/model"
10+
"quesma/util"
11+
"strings"
912
)
1013

1114
type BucketScript struct {
12-
ctx context.Context
15+
*PipelineAggregation
16+
script string
1317
}
1418

15-
func NewBucketScript(ctx context.Context) BucketScript {
16-
return BucketScript{ctx: ctx}
19+
func NewBucketScript(ctx context.Context, script string) BucketScript {
20+
return BucketScript{script: script, PipelineAggregation: newPipelineAggregation(ctx, "_count")}
1721
}
1822

1923
func (query BucketScript) AggregationType() model.AggregationType {
2024
return model.PipelineMetricsAggregation // not sure
2125
}
2226

2327
func (query BucketScript) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
24-
if len(rows) == 0 {
25-
logger.WarnWithCtx(query.ctx).Msg("no rows returned for bucket script aggregation")
26-
return model.JsonMap{"value": 0}
27-
}
28-
var response []model.JsonMap
29-
for _, row := range rows {
30-
response = append(response, model.JsonMap{"value": row.Cols[0].Value})
31-
}
32-
return model.JsonMap{
33-
"buckets": response,
28+
const defaultValue = 0.
29+
switch {
30+
case query.script == "params.numerator != null && params.denominator != null && params.denominator != 0 ? params.numerator / params.denominator : 0":
31+
numerator := query.findFilterValue(rows, "numerator")
32+
denominator := query.findFilterValue(rows, "denominator")
33+
if denominator == 0 {
34+
return model.JsonMap{"value": defaultValue}
35+
}
36+
return model.JsonMap{"value": numerator / denominator}
37+
case len(rows) == 1:
38+
for _, row := range rows {
39+
return model.JsonMap{"value": util.ExtractNumeric64(row.LastColValue())}
40+
}
3441
}
42+
43+
logger.WarnWithCtx(query.ctx).Msgf("unexpected result in bucket_script: %s, len(rows): %d. Returning default.", query.String(), len(rows))
44+
return model.JsonMap{"value": defaultValue}
3545
}
3646

37-
func (query BucketScript) CalculateResultWhenMissing(*model.Query, []model.QueryResultRow) []model.QueryResultRow {
38-
return []model.QueryResultRow{}
47+
func (query BucketScript) CalculateResultWhenMissing(parentRows []model.QueryResultRow) []model.QueryResultRow {
48+
resultRows := make([]model.QueryResultRow, 0, len(parentRows))
49+
for _, parentRow := range parentRows {
50+
resultRow := parentRow.Copy()
51+
if len(resultRow.Cols) != 0 {
52+
resultRow.Cols[len(resultRow.Cols)-1].Value = util.ExtractNumeric64(parentRow.LastColValue())
53+
} else {
54+
logger.ErrorWithCtx(query.ctx).Msgf("unexpected empty parent row in bucket_script: %s", query.String())
55+
}
56+
resultRows = append(resultRows, resultRow)
57+
}
58+
return resultRows
3959
}
4060

4161
func (query BucketScript) String() string {
42-
return "bucket script"
62+
return fmt.Sprintf("bucket_script(isCount: %v, parent: %s, pathToParent: %v, parentBucketAggregation: %v, script: %v)",
63+
query.isCount, query.Parent, query.PathToParent, query.parentBucketAggregation, query.script)
4364
}
4465

4566
func (query BucketScript) PipelineAggregationType() model.PipelineAggregationType {
46-
return model.PipelineParentAggregation // not sure, maybe it's sibling.
67+
return model.PipelineParentAggregation // not sure, maybe it's sibling. change hasn't changed the result when running some tests.
68+
}
69+
70+
func (query BucketScript) findFilterValue(rows []model.QueryResultRow, filterName string) float64 {
71+
const defaultValue = 0.0
72+
for _, row := range rows {
73+
for _, col := range row.Cols {
74+
colName := col.ColName
75+
if !strings.HasSuffix(colName, "_col_0") {
76+
continue
77+
}
78+
colName = strings.TrimSuffix(colName, "_col_0")
79+
if strings.HasSuffix(colName, "-"+filterName) {
80+
return float64(util.ExtractInt64(col.Value))
81+
}
82+
}
83+
}
84+
85+
logger.WarnWithCtx(query.ctx).Msgf("could not find filter value for filter: %s, bucket_script: %s, len(rows): %d."+
86+
"Returning default", filterName, query.String(), len(rows))
87+
return defaultValue
4788
}

quesma/model/pipeline_aggregations/pipeline_aggregation.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func newPipelineAggregation(ctx context.Context, bucketsPath string) *PipelineAg
2727
const delimiter = ">"
2828
if len(bucketsPath) == 0 {
2929
logger.WarnWithCtx(ctx).Msgf("invalid bucketsPath: %s. Using empty string as parent.", bucketsPath)
30-
return &PipelineAggregation{}
30+
return &PipelineAggregation{isCount: true} // count, as it's the simplest case
3131
}
3232

3333
parent := ""
@@ -54,6 +54,10 @@ func (p *PipelineAggregation) IsCount() bool {
5454
return p.isCount
5555
}
5656

57+
func (p *PipelineAggregation) GetParentBucketAggregation() model.QueryType {
58+
return p.parentBucketAggregation
59+
}
60+
5761
func (p *PipelineAggregation) SetParentBucketAggregation(parentBucketAggregation model.QueryType) {
5862
p.parentBucketAggregation = parentBucketAggregation
5963
}

quesma/model/pipeline_query_type.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,5 +26,6 @@ type PipelineQueryType interface {
2626
GetPathToParent() []string
2727
IsCount() bool
2828

29+
GetParentBucketAggregation() QueryType
2930
SetParentBucketAggregation(parentBucketAggregation QueryType)
3031
}

quesma/queryparser/aggregation_parser_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,7 @@ func allAggregationTests() []testdata.AggregationTestCase {
666666
add(kibana_visualize.PipelineAggregationTests, "kibana-visualize/pipeline_agg_req")
667667
add(clients.KunkkaTests, "clients/kunkka")
668668
add(clients.OpheliaTests, "clients/ophelia")
669+
add(clients.CloverTests, "clients/clover")
669670

670671
return allTests
671672
}

quesma/queryparser/pancake_pipelines.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ func (p pancakePipelinesProcessor) selectPipelineRows(pipeline model.PipelineQue
1919
bucketAggregation *pancakeModelBucketAggregation) (
2020
result []model.QueryResultRow) {
2121

22+
if bucketAggregation == nil {
23+
return rows
24+
}
25+
2226
isCount := pipeline.IsCount()
2327
for _, row := range rows {
2428
newRow := model.QueryResultRow{Index: row.Index}

quesma/queryparser/pancake_sql_query_generation_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ func TestPancakeQueryGeneration(t *testing.T) {
6060
t.Skip("Need to implement order by top metrics (talk with Jacek, he has an idea)")
6161
}
6262

63+
if test.TestName == "multiple buckets_path(file:clients/clover,nr:1)" {
64+
t.Skip("Unskip after merge of auto_date_histogram")
65+
}
66+
6367
fmt.Println("i:", i, "test:", test.TestName)
6468

6569
jsonp, err := types.ParseJSON(test.QueryRequestJson)

quesma/queryparser/pipeline_aggregations.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"quesma/logger"
77
"quesma/model"
88
"quesma/model/pipeline_aggregations"
9+
"strings"
910
)
1011

1112
// CAUTION: maybe "return" everywhere isn't correct, as maybe there can be multiple pipeline aggregations at one level.
@@ -168,26 +169,29 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (
168169
if !ok {
169170
return
170171
}
171-
if bucketsPath != pipeline_aggregations.BucketsPathCount {
172+
if !strings.HasSuffix(bucketsPath, pipeline_aggregations.BucketsPathCount) {
172173
logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not '_count', but %s. Skipping this aggregation", bucketsPath)
173174
return
174175
}
175176

176-
// if ["script"]["source"] != "_value", skip the aggregation
177177
scriptRaw, exists := bucketScript["script"]
178178
if !exists {
179179
logger.WarnWithCtx(cw.Ctx).Msg("no script in bucket_script. Skipping this aggregation")
180180
return
181181
}
182+
if script, ok := scriptRaw.(string); ok {
183+
return pipeline_aggregations.NewBucketScript(cw.Ctx, script), true
184+
}
185+
182186
script, ok := scriptRaw.(QueryMap)
183187
if !ok {
184188
logger.WarnWithCtx(cw.Ctx).Msgf("script is not a map, but %T, value: %v. Skipping this aggregation", scriptRaw, scriptRaw)
185189
return
186190
}
187191
if sourceRaw, exists := script["source"]; exists {
188192
if source, ok := sourceRaw.(string); ok {
189-
if source != "_value" {
190-
logger.WarnWithCtx(cw.Ctx).Msgf("source is not '_value', but %s. Skipping this aggregation", source)
193+
if source != "_value" && source != "count * 1" {
194+
logger.WarnWithCtx(cw.Ctx).Msgf("source is not '_value'/'count * 1', but %s. Skipping this aggregation", source)
191195
return
192196
}
193197
} else {
@@ -200,10 +204,10 @@ func (cw *ClickhouseQueryTranslator) parseBucketScriptBasic(queryMap QueryMap) (
200204
}
201205

202206
// okay, we've checked everything, it's indeed a simple count
203-
return pipeline_aggregations.NewBucketScript(cw.Ctx), true
207+
return pipeline_aggregations.NewBucketScript(cw.Ctx, ""), true
204208
}
205209

206-
func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggregationName string) (bucketsPath string, success bool) {
210+
func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggregationName string) (bucketsPathStr string, success bool) {
207211
queryMap, ok := shouldBeQueryMap.(QueryMap)
208212
if !ok {
209213
logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a map, but %T, value: %v", aggregationName, shouldBeQueryMap, shouldBeQueryMap)
@@ -214,10 +218,27 @@ func (cw *ClickhouseQueryTranslator) parseBucketsPath(shouldBeQueryMap any, aggr
214218
logger.WarnWithCtx(cw.Ctx).Msg("no buckets_path in avg_bucket")
215219
return
216220
}
217-
bucketsPath, ok = bucketsPathRaw.(string)
218-
if !ok {
219-
logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a string, but %T, value: %v", bucketsPathRaw, bucketsPathRaw)
220-
return
221+
222+
switch bucketsPath := bucketsPathRaw.(type) {
223+
case string:
224+
return bucketsPath, true
225+
case QueryMap:
226+
// TODO: handle arbitrary nr of keys (and arbitrary scripts, because we also handle only one special case)
227+
if len(bucketsPath) == 1 || len(bucketsPath) == 2 {
228+
for _, bucketPath := range bucketsPath {
229+
if _, ok = bucketPath.(string); !ok {
230+
logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a map with string values, but %T. Skipping this aggregation", bucketPath)
231+
return
232+
}
233+
// Kinda weird to return just the first value, but seems working on all cases so far.
234+
// After fixing the TODO above, it should also get fixed.
235+
return bucketPath.(string), true
236+
}
237+
} else {
238+
logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path is not a map with one or two keys, but %d. Skipping this aggregation", len(bucketsPath))
239+
}
221240
}
222-
return bucketsPath, true
241+
242+
logger.WarnWithCtx(cw.Ctx).Msgf("buckets_path in wrong format, type: %T, value: %v", bucketsPathRaw, bucketsPathRaw)
243+
return
223244
}

0 commit comments

Comments
 (0)