Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 14c022b

Browse files
authored
[Grafana] Handle min_doc_count also in terms (#1323)
I do that by adding `count() >= min_doc_count DESC` at the beginning of the `ORDER BY` clause. That way, rows with `count() >= min_doc_count` are returned first, and the remaining rows can simply be filtered out afterwards. Before: <img width="1268" alt="Screenshot 2025-03-02 at 21 16 36" src="https://github.com/user-attachments/assets/2d194199-cd5d-4e8e-ac80-bc269d692822" /> After: <img width="1431" alt="Screenshot 2025-03-02 at 21 15 40" src="https://github.com/user-attachments/assets/29f51da1-83b3-4be9-927a-08d02e920baa" />
1 parent ca49adc commit 14c022b

File tree

4 files changed

+287
-11
lines changed

4 files changed

+287
-11
lines changed

platform/model/bucket_aggregations/terms.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
type Terms struct {
1717
ctx context.Context
1818
significant bool // true <=> significant_terms, false <=> terms
19+
minDocCount int
1920
// include is either:
2021
// - single value: then for strings, it can be a regex.
2122
// - array: then field must match exactly one of the values (never a regex)
@@ -28,8 +29,8 @@ type Terms struct {
2829
exclude any
2930
}
3031

31-
func NewTerms(ctx context.Context, significant bool, include, exclude any) Terms {
32-
return Terms{ctx: ctx, significant: significant, include: include, exclude: exclude}
32+
func NewTerms(ctx context.Context, significant bool, minDocCount int, include, exclude any) Terms {
33+
return Terms{ctx: ctx, significant: significant, minDocCount: minDocCount, include: include, exclude: exclude}
3334
}
3435

3536
func (query Terms) AggregationType() model.AggregationType {
@@ -45,6 +46,10 @@ func (query Terms) TranslateSqlResponseToJson(rows []model.QueryResultRow) model
4546
return model.JsonMap{"buckets": []model.JsonMap{}}
4647
}
4748

49+
if query.minDocCount > 1 {
50+
rows = query.NewRowsTransformer().Transform(query.ctx, rows)
51+
}
52+
4853
buckets := make([]model.JsonMap, 0, len(rows))
4954
for _, row := range rows {
5055
docCount := query.docCount(row)
@@ -187,8 +192,8 @@ func CheckParamsTerms(ctx context.Context, paramsRaw any) error {
187192
"value_type": "string",
188193
}
189194
logIfYouSeeThemParams := []string{
190-
"shard_size", "min_doc_count", "shard_min_doc_count",
191-
"show_term_doc_count_error", "collect_mode", "execution_hint", "value_type",
195+
"shard_size", "shard_min_doc_count", "show_term_doc_count_error",
196+
"collect_mode", "execution_hint", "value_type",
192197
}
193198

194199
params, ok := paramsRaw.(model.JsonMap)
@@ -245,3 +250,28 @@ func CheckParamsTerms(ctx context.Context, paramsRaw any) error {
245250

246251
return nil
247252
}
253+
254+
func (query Terms) NewRowsTransformer() model.QueryRowsTransformer {
255+
return &TermsRowsTransformer{minDocCount: int64(query.minDocCount)}
256+
}
257+
258+
type TermsRowsTransformer struct {
259+
minDocCount int64
260+
}
261+
262+
// TODO unify with other transformers
263+
func (qt TermsRowsTransformer) Transform(ctx context.Context, rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
264+
postprocessedRows := make([]model.QueryResultRow, 0, len(rowsFromDB))
265+
for _, row := range rowsFromDB {
266+
docCount, err := util.ExtractInt64(row.LastColValue())
267+
if err != nil {
268+
logger.ErrorWithCtx(ctx).Msgf("unexpected type for terms doc_count: %T, value: %v. Returning empty rows.",
269+
row.LastColValue(), row.LastColValue())
270+
return []model.QueryResultRow{}
271+
}
272+
if docCount >= qt.minDocCount {
273+
postprocessedRows = append(postprocessedRows, row)
274+
}
275+
}
276+
return postprocessedRows
277+
}

platform/parsers/elastic_query_dsl/pancake_aggregation_parser_buckets.go

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,14 @@ func (cw *ClickhouseQueryTranslator) parseTermsAggregation(aggregation *pancakeA
159159
return err
160160
}
161161

162+
const (
163+
defaultSize = 10
164+
defaultMinDocCount = 1
165+
)
166+
167+
minDocCount := cw.parseIntField(params, "min_doc_count", defaultMinDocCount)
162168
terms := bucket_aggregations.NewTerms(
163-
cw.Ctx, aggrName == "significant_terms", params["include"], params["exclude"],
169+
cw.Ctx, aggrName == "significant_terms", minDocCount, params["include"], params["exclude"],
164170
)
165171

166172
var didWeAddMissing, didWeUpdateFieldHere bool
@@ -178,18 +184,37 @@ func (cw *ClickhouseQueryTranslator) parseTermsAggregation(aggregation *pancakeA
178184
aggregation.filterOutEmptyKeyBucket = true
179185
}
180186

181-
const defaultSize = 10
182-
size := cw.parseSize(params, defaultSize)
183-
184187
orderBy, err := cw.parseOrder(params, []model.Expr{field})
185188
if err != nil {
186189
return err
187190
}
188191

189192
aggregation.queryType = terms
190193
aggregation.selectedColumns = append(aggregation.selectedColumns, field)
191-
aggregation.limit = size
194+
aggregation.limit = cw.parseSize(params, defaultSize)
192195
aggregation.orderBy = orderBy
196+
if minDocCount > 1 {
197+
// If you have a better solution, feel free to implement. This works, but adds another ORDER BY + we have to filter out rows later.
198+
//
199+
// We only want rows with (count() >= min_doc_count).
200+
// I think we can't do it in WHERE or HAVING clause, as it might affect the aggregations before/after in the aggregation tree.
201+
// Or at least it's not obvious, this solution is much easier.
202+
// We add the condition as the first ORDER BY. This way rows with count() < min_doc_count will be at the end, and we'll filter them out later.
203+
//
204+
// Example:
205+
// Without this trick, if we have rows (key, count): (k1, 1), (k2, 1), ..., (k_n, 1) and (a, 100), (b, 100)
206+
// (they'll be returned this way, because of some order by)
207+
// If we add the condition below, if min_doc_count>1, rows will be returned (a, 100), (b, 100), (k1, 1), (k2, 1), ...
208+
// We filter out (k1, 1), (k2, 1), ..., and we are fine.
209+
// Without this trick, if we do a query with a limit (without filtering in SQL), we could only receive (k1, 1), (k2, 1), ...,
210+
// and after filtering we'd have 0 rows to return.
211+
condition := model.NewInfixExpr(model.NewCountFunc(), ">=", model.NewLiteral(minDocCount))
212+
firstOrderBy := model.NewOrderByExpr(condition, model.DescOrder)
213+
aggregation.orderBy = append(
214+
[]model.OrderByExpr{firstOrderBy},
215+
aggregation.orderBy...,
216+
)
217+
}
193218
return nil
194219
}
195220

platform/parsers/elastic_query_dsl/pancake_sql_query_generation.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ func (p *pancakeSqlQueryGenerator) generatePartitionBy(groupByColumns []model.Al
4747

4848
// TODO: Implement more if needed.
4949
func (p *pancakeSqlQueryGenerator) generateAccumAggrFunctions(origExpr model.Expr, queryType model.QueryType) (accumExpr model.Expr, aggrFuncName string, err error) {
50-
switch origFunc := origExpr.(type) {
50+
switch origExprTyped := origExpr.(type) {
5151
case model.FunctionExpr:
52+
origFunc := origExprTyped
5253
switch origFunc.Name {
5354
case "sum", "sumOrNull", "min", "minOrNull", "max", "maxOrNull":
5455
return origExpr, origFunc.Name, nil
@@ -64,6 +65,11 @@ func (p *pancakeSqlQueryGenerator) generateAccumAggrFunctions(origExpr model.Exp
6465
return model.NewFunction(strings.Replace(origFunc.Name, "quantiles", "quantilesState", 1), origFunc.Args...),
6566
strings.Replace(origFunc.Name, "quantiles", "quantilesMerge", 1), nil
6667
}
68+
case model.InfixExpr:
69+
origInfix := origExprTyped
70+
if f, ok := origInfix.Left.(model.FunctionExpr); ok && f.Name == "count" {
71+
return origInfix, "sum", nil
72+
}
6773
}
6874
debugQueryType := "<nil>"
6975
if queryType != nil {

platform/testdata/grafana.go

Lines changed: 216 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import "github.com/QuesmaOrg/quesma/platform/model"
66

77
var GrafanaAggregationTests = []AggregationTestCase{
88
{ // [0]
9-
TestName: "simple max/min aggregation as 2 siblings",
9+
TestName: "format: epoch_millis",
1010
QueryRequestJson: `
1111
{
1212
"aggs": {
@@ -83,4 +83,219 @@ var GrafanaAggregationTests = []AggregationTestCase{
8383
"aggr__2__key_0"
8484
ORDER BY "aggr__2__key_0" ASC`,
8585
},
86+
{ // [1]
87+
TestName: "1x terms with min_doc_count, need to erase some rows with count < min_doc_count",
88+
QueryRequestJson: `
89+
{
90+
"aggs": {
91+
"2": {
92+
"terms": {
93+
"field": "extension.keyword",
94+
"size": 4,
95+
"min_doc_count": 40,
96+
"order": {
97+
"_key": "desc"
98+
}
99+
}
100+
}
101+
},
102+
"size": 0
103+
}`,
104+
ExpectedResponse: `
105+
{
106+
"aggregations": {
107+
"2": {
108+
"doc_count_error_upper_bound": 0,
109+
"sum_other_doc_count": 196,
110+
"buckets": [
111+
{
112+
"key": "zip",
113+
"doc_count": 40
114+
}
115+
]
116+
}
117+
},
118+
"hits": {
119+
"hits": [],
120+
"max_score": null,
121+
"total": {
122+
"relation": "eq",
123+
"value": 234
124+
}
125+
},
126+
"status": 200,
127+
"timed_out": false,
128+
"took": 1
129+
}`,
130+
ExpectedPancakeResults: []model.QueryResultRow{
131+
{Cols: []model.QueryResultCol{
132+
model.NewQueryResultCol("aggr__2__parent_count", int64(236)),
133+
model.NewQueryResultCol("aggr__2__key_0", "zip"),
134+
model.NewQueryResultCol("aggr__2__count", int64(40)),
135+
model.NewQueryResultCol("aggr__2__order_1", int64(1)),
136+
}},
137+
{Cols: []model.QueryResultCol{
138+
model.NewQueryResultCol("aggr__2__parent_count", int64(236)),
139+
model.NewQueryResultCol("aggr__2__key_0", "tar"),
140+
model.NewQueryResultCol("aggr__2__count", int64(30)),
141+
model.NewQueryResultCol("aggr__2__order_1", int64(0)),
142+
}},
143+
},
144+
ExpectedPancakeSQL: `
145+
SELECT sum(count(*)) OVER () AS "aggr__2__parent_count",
146+
"extension" AS "aggr__2__key_0", count(*) AS "aggr__2__count",
147+
count(*)>=40 AS "aggr__2__order_1"
148+
FROM __quesma_table_name
149+
GROUP BY "extension" AS "aggr__2__key_0"
150+
ORDER BY "aggr__2__order_1" DESC, "aggr__2__key_0" DESC
151+
LIMIT 5`,
152+
},
153+
{ // [2]
154+
TestName: "2x terms with min_doc_count",
155+
QueryRequestJson: `
156+
{
157+
"aggs": {
158+
"2": {
159+
"aggs": {
160+
"3": {
161+
"terms": {
162+
"field": "message"
163+
}
164+
}
165+
},
166+
"terms": {
167+
"field": "extension.keyword",
168+
"size": 4,
169+
"min_doc_count": 30,
170+
"order": {
171+
"_key": "desc"
172+
}
173+
}
174+
}
175+
},
176+
"size": 0
177+
}`,
178+
ExpectedResponse: `
179+
{
180+
"aggregations": {
181+
"2": {
182+
"doc_count_error_upper_bound": 0,
183+
"sum_other_doc_count": 164,
184+
"buckets": [
185+
{
186+
"3": {
187+
"doc_count_error_upper_bound": 0,
188+
"sum_other_doc_count": 9,
189+
"buckets": [
190+
{
191+
"key": 0,
192+
"doc_count": 18
193+
},
194+
{
195+
"key": 6680,
196+
"doc_count": 4
197+
}
198+
]
199+
},
200+
"key": "zip",
201+
"doc_count": 31
202+
},
203+
{
204+
"3": {
205+
"doc_count_error_upper_bound": 0,
206+
"sum_other_doc_count": 14,
207+
"buckets": [
208+
{
209+
"key": 0,
210+
"doc_count": 25
211+
},
212+
{
213+
"key": 1873,
214+
"doc_count": 2
215+
}
216+
]
217+
},
218+
"key": "tar",
219+
"doc_count": 41
220+
}
221+
]
222+
}
223+
},
224+
"hits": {
225+
"hits": [],
226+
"max_score": null,
227+
"total": {
228+
"relation": "eq",
229+
"value": 234
230+
}
231+
},
232+
"status": 200,
233+
"timed_out": false,
234+
"took": 1
235+
}`,
236+
ExpectedPancakeResults: []model.QueryResultRow{
237+
{Cols: []model.QueryResultCol{
238+
model.NewQueryResultCol("aggr__2__parent_count", int64(236)),
239+
model.NewQueryResultCol("aggr__2__key_0", "zip"),
240+
model.NewQueryResultCol("aggr__2__count", int64(31)),
241+
model.NewQueryResultCol("aggr__2__order_1", int64(0)),
242+
model.NewQueryResultCol("aggr__2__3__parent_count", int64(31)),
243+
model.NewQueryResultCol("aggr__2__3__key_0", 0),
244+
model.NewQueryResultCol("aggr__2__3__count", int64(18)),
245+
}},
246+
{Cols: []model.QueryResultCol{
247+
model.NewQueryResultCol("aggr__2__parent_count", int64(236)),
248+
model.NewQueryResultCol("aggr__2__key_0", "zip"),
249+
model.NewQueryResultCol("aggr__2__count", int64(31)),
250+
model.NewQueryResultCol("aggr__2__order_1", int64(1)),
251+
model.NewQueryResultCol("aggr__2__3__parent_count", int64(31)),
252+
model.NewQueryResultCol("aggr__2__3__key_0", 6680),
253+
model.NewQueryResultCol("aggr__2__3__count", int64(4)),
254+
}},
255+
{Cols: []model.QueryResultCol{
256+
model.NewQueryResultCol("aggr__2__parent_count", int64(236)),
257+
model.NewQueryResultCol("aggr__2__key_0", "tar"),
258+
model.NewQueryResultCol("aggr__2__count", int64(41)),
259+
model.NewQueryResultCol("aggr__2__order_1", int64(1)),
260+
model.NewQueryResultCol("aggr__2__3__parent_count", int64(41)),
261+
model.NewQueryResultCol("aggr__2__3__key_0", 0),
262+
model.NewQueryResultCol("aggr__2__3__count", int64(25)),
263+
}},
264+
{Cols: []model.QueryResultCol{
265+
model.NewQueryResultCol("aggr__2__parent_count", int64(236)),
266+
model.NewQueryResultCol("aggr__2__key_0", "tar"),
267+
model.NewQueryResultCol("aggr__2__count", int64(41)),
268+
model.NewQueryResultCol("aggr__2__order_1", int64(1)),
269+
model.NewQueryResultCol("aggr__2__3__parent_count", int64(41)),
270+
model.NewQueryResultCol("aggr__2__3__key_0", 1873),
271+
model.NewQueryResultCol("aggr__2__3__count", int64(2)),
272+
}},
273+
},
274+
ExpectedPancakeSQL: `
275+
SELECT "aggr__2__parent_count", "aggr__2__key_0", "aggr__2__count",
276+
"aggr__2__order_1", "aggr__2__3__parent_count", "aggr__2__3__key_0",
277+
"aggr__2__3__count"
278+
FROM (
279+
SELECT "aggr__2__parent_count", "aggr__2__key_0", "aggr__2__count",
280+
"aggr__2__order_1", "aggr__2__3__parent_count", "aggr__2__3__key_0",
281+
"aggr__2__3__count",
282+
dense_rank() OVER (ORDER BY "aggr__2__order_1" DESC, "aggr__2__key_0" DESC)
283+
AS "aggr__2__order_1_rank",
284+
dense_rank() OVER (PARTITION BY "aggr__2__key_0" ORDER BY
285+
"aggr__2__3__count" DESC, "aggr__2__3__key_0" ASC) AS
286+
"aggr__2__3__order_1_rank"
287+
FROM (
288+
SELECT sum(count(*)) OVER () AS "aggr__2__parent_count",
289+
"extension" AS "aggr__2__key_0",
290+
sum(count(*)) OVER (PARTITION BY "aggr__2__key_0") AS "aggr__2__count",
291+
sum(count(*)>=30) OVER (PARTITION BY "aggr__2__key_0") AS
292+
"aggr__2__order_1",
293+
sum(count(*)) OVER (PARTITION BY "aggr__2__key_0") AS
294+
"aggr__2__3__parent_count", "message" AS "aggr__2__3__key_0",
295+
count(*) AS "aggr__2__3__count"
296+
FROM __quesma_table_name
297+
GROUP BY "extension" AS "aggr__2__key_0", "message" AS "aggr__2__3__key_0"))
298+
WHERE ("aggr__2__order_1_rank"<=5 AND "aggr__2__3__order_1_rank"<=11)
299+
ORDER BY "aggr__2__order_1_rank" ASC, "aggr__2__3__order_1_rank" ASC`,
300+
},
86301
}

0 commit comments

Comments (0)