This repository was archived by the owner on Nov 7, 2025. It is now read-only.
54 changes: 54 additions & 0 deletions quesma/model/bucket_aggregations/auto_date_histogram.go
@@ -0,0 +1,54 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package bucket_aggregations

import (
"context"
"fmt"
"quesma/logger"
"quesma/model"
"time"
)

type AutoDateHistogram struct {
ctx context.Context
field model.Expr // name of the field, e.g. timestamp
bucketsNr int
key int64
}

func NewAutoDateHistogram(ctx context.Context, field model.Expr, bucketsNr int) *AutoDateHistogram {
return &AutoDateHistogram{ctx: ctx, field: field, bucketsNr: bucketsNr}
}

func (query *AutoDateHistogram) AggregationType() model.AggregationType {
return model.BucketAggregation
}

func (query *AutoDateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msgf("no rows returned for %s", query.String())
return make(model.JsonMap, 0)
}
return model.JsonMap{
"buckets": []model.JsonMap{{
"key": query.key,
"key_as_string": time.UnixMilli(query.key).Format("2006-01-02T15:04:05.000-07:00"),
"doc_count": rows[0].LastColValue(),
}},
"interval": "100y",
}
}

func (query *AutoDateHistogram) String() string {
return fmt.Sprintf("auto_date_histogram(field: %v, bucketsNr: %d)", model.AsString(query.field), query.bucketsNr)
}

func (query *AutoDateHistogram) GetField() model.Expr {
return query.field
}

func (query *AutoDateHistogram) SetKey(key int64) {
query.key = key
}
65 changes: 49 additions & 16 deletions quesma/model/bucket_aggregations/date_histogram.go
@@ -36,13 +36,15 @@ type DateHistogram struct {
interval string
timezone string
wantedTimezone *time.Location // key is in `timezone` time, and we need it to be UTC
ebmin int64 // extended bounds lower limit (epoch ms)
ebmax int64 // extended bounds upper limit (epoch ms)
minDocCount int
intervalType DateHistogramIntervalType
fieldDateTimeType clickhouse.DateTimeType
}

func NewDateHistogram(ctx context.Context, field model.Expr, interval, timezone string,
minDocCount int, intervalType DateHistogramIntervalType, fieldDateTimeType clickhouse.DateTimeType) *DateHistogram {
minDocCount int, ebmin, ebmax int64, intervalType DateHistogramIntervalType, fieldDateTimeType clickhouse.DateTimeType) *DateHistogram {

wantedTimezone, err := time.LoadLocation(timezone)
if err != nil {
@@ -51,7 +53,7 @@ func NewDateHistogram(ctx context.Context, field model.Expr, interval, timezone
}

return &DateHistogram{ctx: ctx, field: field, interval: interval, timezone: timezone, wantedTimezone: wantedTimezone,
minDocCount: minDocCount, intervalType: intervalType, fieldDateTimeType: fieldDateTimeType}
minDocCount: minDocCount, ebmin: ebmin, ebmax: ebmax, intervalType: intervalType, fieldDateTimeType: fieldDateTimeType}
}

func (typ DateHistogramIntervalType) String(ctx context.Context) string {
@@ -83,7 +85,8 @@ func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultR
// When query.minDocCount == DefaultMinDocCount, we need to return
// all buckets between the first bucket that matches documents and the last one.

if query.minDocCount == 0 {
fmt.Println("query.minDocCount", query.minDocCount, "query.ebmin", query.ebmin)
if query.minDocCount == 0 || query.ebmin != 0 {
rows = query.NewRowsTransformer().Transform(query.ctx, rows)
}

@@ -117,6 +120,7 @@ func (query *DateHistogram) String() string {
// only intervals <= days are needed
func (query *DateHistogram) intervalAsDuration() time.Duration {
var intervalInHoursOrLess string
//fmt.Println("query.interval", query.interval)
if strings.HasSuffix(query.interval, "d") {
// time.ParseDuration doesn't accept > hours, we need to convert days to hours
daysNr, err := strconv.Atoi(strings.TrimSuffix(query.interval, "d"))
@@ -128,7 +132,9 @@ func (query *DateHistogram) intervalAsDuration() time.Duration {
} else {
intervalInHoursOrLess = query.interval
}
//fmt.Println("intervalInHoursOrLess", intervalInHoursOrLess)
duration, _ := time.ParseDuration(intervalInHoursOrLess)
//fmt.Println("duration", duration)
return duration
}

@@ -226,7 +232,7 @@ func (query *DateHistogram) calculateResponseKey(originalKey int64) int64 {
}

func (query *DateHistogram) calculateKeyAsString(key int64) string {
return time.UnixMilli(key).UTC().Format("2006-01-02T15:04:05.000")
return time.UnixMilli(key).In(query.wantedTimezone).Format("2006/01/02 15:04:05")
}
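The change above affects how keys are rendered; an illustrative before/after for key 1700000000000 with a hypothetical wantedTimezone of Europe/Warsaw (UTC+1 in November):

// before: time.UnixMilli(key).UTC().Format("2006-01-02T15:04:05.000")            -> "2023-11-14T22:13:20.000"
// after:  time.UnixMilli(key).In(query.wantedTimezone).Format("2006/01/02 15:04:05") -> "2023/11/14 23:13:20"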

func (query *DateHistogram) OriginalKeyToKeyAsString(originalKey any) string {
@@ -239,31 +245,31 @@ func (query *DateHistogram) SetMinDocCountToZero() {
}

func (query *DateHistogram) NewRowsTransformer() model.QueryRowsTransformer {
differenceBetweenTwoNextKeys := int64(1)
if query.intervalType == DateHistogramCalendarInterval {
duration, err := kibana.ParseInterval(query.interval)
if err == nil {
differenceBetweenTwoNextKeys = duration.Milliseconds()
} else {
logger.ErrorWithCtx(query.ctx).Err(err)
differenceBetweenTwoNextKeys = 0
}
duration, err := kibana.ParseInterval(query.interval)
var differenceBetweenTwoNextKeys int64
if err == nil {
differenceBetweenTwoNextKeys = duration.Milliseconds()
} else {
logger.ErrorWithCtx(query.ctx).Msgf("failed to parse interval %s: %v", query.interval, err)
}
return &DateHistogramRowsTransformer{MinDocCount: query.minDocCount, differenceBetweenTwoNextKeys: differenceBetweenTwoNextKeys, EmptyValue: 0}
return &DateHistogramRowsTransformer{MinDocCount: query.minDocCount, differenceBetweenTwoNextKeys: differenceBetweenTwoNextKeys, EmptyValue: 0, ebmin: query.ebmin, ebmax: query.ebmax}
}

// we're sure len(row.Cols) >= 2

type DateHistogramRowsTransformer struct {
MinDocCount int
differenceBetweenTwoNextKeys int64 // if 0, we don't add keys
ebmin int64 // extended bounds lower limit (epoch ms)
ebmax int64 // extended bounds upper limit (epoch ms)
EmptyValue any
}

// if MinDocCount == 0, and we have buckets e.g. [key, value1], [key+10, value2], we need to insert [key+1, 0], [key+2, 0]...
// CAUTION: a different kind of postprocessing is needed for MinDocCount > 1, but I haven't seen any query with that yet, so not implementing it now.
func (qt *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromDB []model.QueryResultRow) []model.QueryResultRow {
if qt.MinDocCount != 0 || qt.differenceBetweenTwoNextKeys == 0 || len(rowsFromDB) < 2 {
if qt.MinDocCount != 0 || qt.differenceBetweenTwoNextKeys == 0 {
// we only add empty rows, when
// a) MinDocCount == 0
// b) we have valid differenceBetweenTwoNextKeys (>0)
@@ -277,7 +283,9 @@ func (qt *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromD

emptyRowsAdded := 0
postprocessedRows := make([]model.QueryResultRow, 0, len(rowsFromDB))
postprocessedRows = append(postprocessedRows, rowsFromDB[0])
if len(rowsFromDB) > 0 {
postprocessedRows = append(postprocessedRows, rowsFromDB[0])
}
for i := 1; i < len(rowsFromDB); i++ {
if len(rowsFromDB[i-1].Cols) < 2 || len(rowsFromDB[i].Cols) < 2 {
logger.ErrorWithCtx(ctx).Msgf(
@@ -297,6 +305,31 @@
}
postprocessedRows = append(postprocessedRows, rowsFromDB[i])
}

//fmt.Println("postprocessedRows 1", postprocessedRows, qt.getKey(postprocessedRows[0])*qt.differenceBetweenTwoNextKeys-qt.differenceBetweenTwoNextKeys, qt.ebmin, qt.differenceBetweenTwoNextKeys)
fmt.Println("pre: ", len(postprocessedRows), emptyRowsAdded)
if qt.ebmin == 0 { // ebmin == 0: extended bounds were not requested
return postprocessedRows
}

// no rows from DB: seed one empty bucket at the lower extended bound, so the loop below has a row to copy
if len(postprocessedRows) == 0 {
postprocessedRows = append(postprocessedRows, model.QueryResultRow{
Cols: []model.QueryResultCol{
{Value: (qt.ebmin+1000*60*60*2)/qt.differenceBetweenTwoNextKeys - 1}, // 1000*60*60*2: hard-coded 2h offset, in ms
{Value: qt.EmptyValue},
},
})
}
// append one empty bucket per interval, spanning the extended bounds (shifted by the hard-coded 2h offset)
for maybePreKey := (qt.ebmin + 1000*60*60*2) / qt.differenceBetweenTwoNextKeys; maybePreKey*qt.differenceBetweenTwoNextKeys < qt.ebmax+1000*60*60*2; maybePreKey++ {
preRow := postprocessedRows[0].Copy()
preRow.Cols[len(preRow.Cols)-2].Value = maybePreKey
preRow.Cols[len(preRow.Cols)-1].Value = qt.EmptyValue
postprocessedRows = append(postprocessedRows, preRow)
emptyRowsAdded++
}
return postprocessedRows
}
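To make the gap-filling concrete, a hypothetical run (keys are bucket indices and counts are invented; the key is assumed to sit in the second-to-last column, as in the code above):

qt := &DateHistogramRowsTransformer{MinDocCount: 0, differenceBetweenTwoNextKeys: 60000, EmptyValue: 0}
// rowsFromDB as (key, doc_count) pairs: (5, 3), (8, 7)
// Transform fills the gap: (5, 3), (6, 0), (7, 0), (8, 7)
// with ebmin/ebmax != 0, it additionally appends empty buckets covering the extended bounds
_ = qt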

4 changes: 3 additions & 1 deletion quesma/model/bucket_aggregations/filter.go
@@ -4,6 +4,7 @@ package bucket_aggregations

import (
"context"
"fmt"
"quesma/logger"
"quesma/model"
)
@@ -18,14 +19,15 @@ func NewFilterAgg(ctx context.Context, whereClause model.Expr) FilterAgg {
}

func (query FilterAgg) AggregationType() model.AggregationType {
return model.BucketAggregation
return model.MetricsAggregation
}

func (query FilterAgg) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
if len(rows) == 0 {
logger.WarnWithCtx(query.ctx).Msg("no rows returned for filter aggregation")
return make(model.JsonMap, 0)
}
fmt.Println("filter_agg", query.String(), rows)
return model.JsonMap{"doc_count": rows[0].Cols[0].Value}
}

5 changes: 4 additions & 1 deletion quesma/model/bucket_aggregations/filters.go
@@ -28,6 +28,9 @@ type Filter struct {
}

func NewFilter(name string, sql model.SimpleQuery) Filter {
if sql.WhereClause == nil {
sql.WhereClause = model.NewLiteral("True") // no WHERE clause means the filter matches all rows
}
return Filter{Name: name, Sql: sql}
}

@@ -36,7 +39,7 @@ func (query Filters) AggregationType() model.AggregationType {
}

func (query Filters) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
var value any = nil
var value any = 0.0
if len(rows) > 0 {
if len(rows[0].Cols) > 0 {
value = rows[0].Cols[len(rows[0].Cols)-1].Value
7 changes: 6 additions & 1 deletion quesma/model/metrics_aggregations/count.go
@@ -4,6 +4,7 @@ package metrics_aggregations

import (
"context"
"fmt"
"quesma/logger"
"quesma/model"
)
@@ -28,7 +29,11 @@ func (query Count) TranslateSqlResponseToJson(rows []model.QueryResultRow) model
if len(rows) > 1 {
logger.WarnWithCtx(query.ctx).Msg("More than one row returned for count aggregation")
}
return model.JsonMap{"doc_count": rows[0].Cols[0]}
if len(rows[0].Cols) == 0 {
return model.JsonMap{"doc_count": -1} // sentinel: result row unexpectedly has no columns
}
fmt.Println("COUNT", rows)
return model.JsonMap{"doc_count": rows[0].Cols[0].Value}
}

func (query Count) String() string {
Expand Down
8 changes: 4 additions & 4 deletions quesma/model/pipeline_aggregations/pipeline_aggregation.go
@@ -54,14 +54,14 @@ func (p *PipelineAggregation) IsCount() bool {
return p.isCount
}

func (p *PipelineAggregation) GetParentBucketAggregation() model.QueryType {
return p.parentBucketAggregation
}

func (p *PipelineAggregation) SetParentBucketAggregation(parentBucketAggregation model.QueryType) {
p.parentBucketAggregation = parentBucketAggregation
}

func (p *PipelineAggregation) GetParentBucketAggregation() model.QueryType {
return p.parentBucketAggregation
}

func (p *PipelineAggregation) getKey(row model.QueryResultRow) any {
if len(row.Cols) < 2 {
logger.WarnWithCtx(p.ctx).Msgf("row has less than 2 columns: %v", row)
1 change: 1 addition & 0 deletions quesma/model/pipeline_query_type.go
@@ -28,4 +28,5 @@ type PipelineQueryType interface {

GetParentBucketAggregation() QueryType
SetParentBucketAggregation(parentBucketAggregation QueryType)
}
22 changes: 22 additions & 0 deletions quesma/model/where_visitor.go
@@ -0,0 +1,22 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package model

// TODO: not a complete implementation, but it covers the current client use case
func FindTimestampLowerBound(expr Expr) (InfixExpr, bool) {
candidates := make([]InfixExpr, 0)
visitor := NewBaseVisitor()
visitor.OverrideVisitInfix = func(visitor *BaseExprVisitor, e InfixExpr) interface{} {
if e.Op == ">=" {
candidates = append(candidates, e)
} else if e.Op == "AND" {
e.Left.Accept(visitor)
e.Right.Accept(visitor)
}
return nil
}

expr.Accept(visitor)
if len(candidates) == 1 {
return candidates[0], true
}
return InfixExpr{}, false
}
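A usage sketch of the helper above; NewInfixExpr and NewColumnRef are assumed constructors in this package (only NewLiteral appears elsewhere in this diff):

where := NewInfixExpr(
NewInfixExpr(NewColumnRef("timestamp"), ">=", NewLiteral("'2024-01-01'")),
"AND",
NewInfixExpr(NewColumnRef("severity"), "=", NewLiteral("'error'")),
)
if lowerBound, ok := FindTimestampLowerBound(where); ok {
// lowerBound is the `timestamp >= '2024-01-01'` comparison;
// ok is false when there are zero or multiple >= candidates
_ = lowerBound
}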
19 changes: 19 additions & 0 deletions quesma/queryparser/aggregation_parser.go
@@ -3,6 +3,8 @@
package queryparser

import (
"fmt"
"github.com/k0kubun/pp"
"quesma/clickhouse"
"quesma/logger"
"quesma/model"
@@ -155,6 +157,23 @@ func (cw *ClickhouseQueryTranslator) tryMetricsAggregation(queryMap QueryMap) (m
}, true
}

if filterRaw, ok := queryMap["filter"]; ok {
if filter, ok := filterRaw.(QueryMap); ok {
whereClause := cw.parseQueryMap(filter).WhereClause
if whereClause == nil {
whereClause = model.NewLiteral("True")
}
fmt.Println("WHERE", whereClause)
return metricsAggregation{
AggrType: "filter",
Fields: []model.Expr{model.NewFunction("countIf", whereClause)},
}, true
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping", filterRaw, filterRaw)
}
}

return metricsAggregation{}, false
}
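For context, a sketch of what the filter branch above produces — the concrete filter is invented, and NewInfixExpr/NewColumnRef are assumed expression constructors:

// request: "aggs": {"errors": {"filter": {"term": {"status": "error"}}}}
whereClause := model.NewInfixExpr(model.NewColumnRef("status"), "=", model.NewLiteral("'error'"))
agg := metricsAggregation{
AggrType: "filter",
Fields: []model.Expr{model.NewFunction("countIf", whereClause)},
}
// renders as a single SQL column: countIf("status" = 'error');
// with no parseable filter it falls back to countIf(True), i.e. a plain count
_ = agg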

Expand Down
2 changes: 2 additions & 0 deletions quesma/queryparser/lucene/lucene_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package lucene

import (
"context"
"fmt"
"math"
"quesma/logger"
"quesma/model"
Expand Down Expand Up @@ -106,6 +107,7 @@ func (p *luceneParser) tokenizeQuery(query string) {
p.tokens = append(p.tokens, nextTokens...)
query = strings.TrimSpace(remainingQuery)
}
fmt.Println(p.tokens)
}

func (p *luceneParser) nextToken(query string) (tokens []token, remainingQuery string) {
Expand Down