Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 36 additions & 36 deletions quesma/model/bucket_aggregations/dateRange.go
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

only very very minor style improvements in this file

Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,23 @@ const UnboundedInterval = "*"
// 1) in Clickhouse's proper format, e.g. toStartOfDay(subDate(now(), INTERVAL 3 week))
// 2) * (UnboundedInterval), which means no bound
type DateTimeInterval struct {
Begin string
End string
begin string
end string
}

func NewDateTimeInterval(begin, end string) DateTimeInterval {
return DateTimeInterval{
Begin: begin,
End: end,
begin: begin,
end: end,
}
}

// BeginTimestampToSQL returns SQL select for the begin timestamp, and a boolean indicating if the select is needed
// We query Clickhouse for this timestamp, as it's defined in Clickhouse's format, e.g. now()-1d.
// It's only 1 more field to our SELECT query, so it shouldn't be a performance issue.
func (interval DateTimeInterval) BeginTimestampToSQL() (sqlSelect model.Expr, selectNeeded bool) {
if interval.Begin != UnboundedInterval {
return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.Begin))), true
if interval.begin != UnboundedInterval {
return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.begin))), true
}
return nil, false
}
Expand All @@ -41,21 +41,21 @@ func (interval DateTimeInterval) BeginTimestampToSQL() (sqlSelect model.Expr, se
// We query Clickhouse for this timestamp, as it's defined in Clickhouse's format, e.g. now()-1d.
// It's only 1 more field to our SELECT query, so it isn't a performance issue.
func (interval DateTimeInterval) EndTimestampToSQL() (sqlSelect model.Expr, selectNeeded bool) {
if interval.End != UnboundedInterval {
return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.End))), true
if interval.end != UnboundedInterval {
return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.end))), true
}
return nil, false
}

func (interval DateTimeInterval) ToWhereClause(fieldName string) model.Expr {
func (interval DateTimeInterval) ToWhereClause(field model.Expr) model.Expr {
begin, isBegin := interval.BeginTimestampToSQL()
end, isEnd := interval.EndTimestampToSQL()

if isBegin {
begin = model.NewInfixExpr(model.NewColumnRef(fieldName), ">=", begin)
begin = model.NewInfixExpr(field, ">=", begin)
}
if isEnd {
end = model.NewInfixExpr(model.NewColumnRef(fieldName), "<", end)
end = model.NewInfixExpr(field, "<", end)
}

if isBegin && isEnd {
Expand All @@ -65,20 +65,20 @@ func (interval DateTimeInterval) ToWhereClause(fieldName string) model.Expr {
} else if isEnd {
return end
} else {
return model.NewLiteral("TRUE")
return model.TrueExpr
}
}

type DateRange struct {
ctx context.Context
FieldName string
Format string
Intervals []DateTimeInterval
SelectColumnsNr int // how many columns we add to the query because of date_range aggregation, e.g. SELECT x,y,z -> 3
field model.Expr
format string
intervals []DateTimeInterval
selectColumnsNr int // how many columns we add to the query because of date_range aggregation, e.g. SELECT x,y,z -> 3
}

func NewDateRange(ctx context.Context, fieldName string, format string, intervals []DateTimeInterval, selectColumnsNr int) DateRange {
return DateRange{ctx: ctx, FieldName: fieldName, Format: format, Intervals: intervals, SelectColumnsNr: selectColumnsNr}
func NewDateRange(ctx context.Context, field model.Expr, format string, intervals []DateTimeInterval, selectColumnsNr int) DateRange {
return DateRange{ctx: ctx, field: field, format: format, intervals: intervals, selectColumnsNr: selectColumnsNr}
}

func (query DateRange) AggregationType() model.AggregationType {
Expand All @@ -92,15 +92,15 @@ func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) m
}

response := make([]model.JsonMap, 0)
startIteration := len(rows[0].Cols) - 1 - query.SelectColumnsNr
startIteration := len(rows[0].Cols) - 1 - query.selectColumnsNr
if startIteration < 0 || startIteration >= len(rows[0].Cols) {
logger.ErrorWithCtx(query.ctx).Msgf(
"unexpected column nr in aggregation response, startIteration: %d, len(rows[0].Cols): %d",
startIteration, len(rows[0].Cols),
)
return nil
}
for intervalIdx, columnIdx := 0, startIteration; intervalIdx < len(query.Intervals); intervalIdx++ {
for intervalIdx, columnIdx := 0, startIteration; intervalIdx < len(query.intervals); intervalIdx++ {
responseForInterval, nextColumnIdx := query.responseForInterval(&rows[0], intervalIdx, columnIdx)
response = append(response, responseForInterval)
columnIdx = nextColumnIdx
Expand All @@ -111,7 +111,7 @@ func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) m
}

func (query DateRange) String() string {
return "date_range, intervals: " + fmt.Sprintf("%v", query.Intervals)
return "date_range, intervals: " + fmt.Sprintf("%v", query.intervals)
}

func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalIdx, columnIdx int) (
Expand All @@ -123,7 +123,7 @@ func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalId

var from, to int64
var fromString, toString string
if query.Intervals[intervalIdx].Begin == UnboundedInterval {
if query.intervals[intervalIdx].begin == UnboundedInterval {
fromString = UnboundedInterval
} else {
if columnIdx >= len(row.Cols) {
Expand All @@ -137,7 +137,7 @@ func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalId
columnIdx++
}

if query.Intervals[intervalIdx].End == UnboundedInterval {
if query.intervals[intervalIdx].end == UnboundedInterval {
toString = UnboundedInterval
} else {
if columnIdx >= len(row.Cols) {
Expand Down Expand Up @@ -173,16 +173,16 @@ func (query DateRange) DoesNotHaveGroupBy() bool {
}

func (query DateRange) CombinatorGroups() (result []CombinatorGroup) {
for intervalIdx, interval := range query.Intervals {
for intervalIdx, interval := range query.intervals {
prefix := fmt.Sprintf("range_%d__", intervalIdx)
if len(query.Intervals) == 1 {
if len(query.intervals) == 1 {
prefix = ""
}
result = append(result, CombinatorGroup{
idx: intervalIdx,
Prefix: prefix,
Key: prefix, // TODO: we need translate date to real time
WhereClause: interval.ToWhereClause(query.FieldName),
WhereClause: interval.ToWhereClause(query.field),
})
}
return
Expand All @@ -199,23 +199,23 @@ func (query DateRange) CombinatorTranslateSqlResponseToJson(subGroup CombinatorG
}

// TODO: we need translate relative to real time
interval := query.Intervals[subGroup.idx]
if interval.Begin != UnboundedInterval {
response["from"] = interval.Begin
response["from_as_string"] = interval.Begin
interval := query.intervals[subGroup.idx]
if interval.begin != UnboundedInterval {
response["from"] = interval.begin
response["from_as_string"] = interval.begin
}
if interval.End != UnboundedInterval {
response["to"] = interval.End
response["to_as_string"] = interval.End
if interval.end != UnboundedInterval {
response["to"] = interval.end
response["to_as_string"] = interval.end
}

return response
}

func (query DateRange) CombinatorSplit() []model.QueryType {
result := make([]model.QueryType, 0, len(query.Intervals))
for _, interval := range query.Intervals {
result = append(result, NewDateRange(query.ctx, query.FieldName, query.Format, []DateTimeInterval{interval}, query.SelectColumnsNr))
result := make([]model.QueryType, 0, len(query.intervals))
for _, interval := range query.intervals {
result = append(result, NewDateRange(query.ctx, query.field, query.format, []DateTimeInterval{interval}, query.selectColumnsNr))
}
return result
}
6 changes: 0 additions & 6 deletions quesma/model/bucket_aggregations/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"strings"
)

const keyedDefaultValue = false

var IntervalInfiniteRange = math.NaN()

type Interval struct {
Expand Down Expand Up @@ -91,10 +89,6 @@ func NewRange(ctx context.Context, expr model.Expr, intervals []Interval, keyed
return Range{ctx, expr, intervals, keyed}
}

func NewRangeWithDefaultKeyed(ctx context.Context, expr model.Expr, intervals []Interval) Range {
return Range{ctx, expr, intervals, keyedDefaultValue}
}

func (query Range) AggregationType() model.AggregationType {
return model.BucketAggregation
}
Expand Down
66 changes: 39 additions & 27 deletions quesma/queryparser/aggregation_date_range_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,32 @@
package queryparser

import (
"fmt"
"quesma/logger"
"quesma/model/bucket_aggregations"
"unicode"
)

func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(dateRange QueryMap) (bucket_aggregations.DateRange, error) {
var err error
var fieldName, format string

if field, exists := dateRange["field"]; exists {
if fieldNameRaw, ok := field.(string); ok {
fieldName = cw.ResolveField(cw.Ctx, fieldNameRaw)
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("field specified for date range aggregation is not a string. Using empty. Querymap: %v", dateRange)
}
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("no field specified for date range aggregation. Using empty. Querymap: %v", dateRange)
}
var ranges []any
var ok bool
if formatRaw, exists := dateRange["format"]; exists {
func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(aggregation *pancakeAggregationTreeNode, params QueryMap) (err error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder about type signature? Why pass aggregation as argument, shouldn't we just return optional queryType?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, after reading later it become obvious why you did that way.

var format string
if formatRaw, exists := params["format"]; exists {
if formatParsed, ok := formatRaw.(string); ok {
format = formatParsed
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("format specified for date range aggregation is not a string. Using empty. Querymap: %v", dateRange)
logger.WarnWithCtx(cw.Ctx).Msgf("format specified for date range aggregation is not a string. Using empty. Params: %v", params)
}
}
if rangesRaw, exists := dateRange["ranges"]; exists {

var ranges []any
var ok bool
if rangesRaw, exists := params["ranges"]; exists {
if ranges, ok = rangesRaw.([]any); !ok {
logger.WarnWithCtx(cw.Ctx).Msgf("ranges specified for date range aggregation is not an array. Using empty. Querymap: %v", dateRange)
return fmt.Errorf("ranges specified for date range aggregation is not an array, params: %v", params)
}
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("no ranges specified for date range aggregation. Using empty. Querymap: %v", dateRange)
return fmt.Errorf("no ranges specified for date range aggregation, params: %v", params)
}

intervals := make([]bucket_aggregations.DateTimeInterval, 0, len(ranges))
selectColumnsNr := len(ranges) // we query Clickhouse for every unbounded part of interval (begin and end)
for _, Range := range ranges {
Expand All @@ -47,12 +39,12 @@ func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(dateRange QueryMa
if fromRaw, ok := from.(string); ok {
intervalBegin, err = cw.parseDateTimeInClickhouseMathLanguage(fromRaw)
if err != nil {
return bucket_aggregations.DateRange{}, err
return err
}
selectColumnsNr++
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("from specified for date range aggregation is not a string. Querymap: %v "+
"Using default (unbounded).", dateRange)
logger.WarnWithCtx(cw.Ctx).Msgf("from specified for date range aggregation is not a string, params: %v "+
"using default (unbounded).", params)
intervalBegin = bucket_aggregations.UnboundedInterval
}
} else {
Expand All @@ -63,20 +55,40 @@ func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(dateRange QueryMa
if toRaw, ok := to.(string); ok {
intervalEnd, err = cw.parseDateTimeInClickhouseMathLanguage(toRaw)
if err != nil {
return bucket_aggregations.DateRange{}, err
return err
}
selectColumnsNr++
} else {
logger.WarnWithCtx(cw.Ctx).Msgf("To specified for date range aggregation is not a string. Querymap: %v "+
"Using default (unbounded).", dateRange)
logger.WarnWithCtx(cw.Ctx).Msgf("To specified for date range aggregation is not a string, params: %v "+
"using default (unbounded).", params)
intervalEnd = bucket_aggregations.UnboundedInterval
}
} else {
intervalEnd = bucket_aggregations.UnboundedInterval
}
intervals = append(intervals, bucket_aggregations.NewDateTimeInterval(intervalBegin, intervalEnd))
}
return bucket_aggregations.NewDateRange(cw.Ctx, fieldName, format, intervals, selectColumnsNr), nil

// TODO: keep for reference as relative time, but no longer needed
/*
for _, interval := range dateRangeParsed.Intervals {

aggregation.selectedColumns = append(aggregation.selectedColumns, interval.ToSQLSelectQuery(dateRangeParsed.FieldName))

if sqlSelect, selectNeeded := interval.BeginTimestampToSQL(); selectNeeded {
aggregation.selectedColumns = append(aggregation.selectedColumns, sqlSelect)
}
if sqlSelect, selectNeeded := interval.EndTimestampToSQL(); selectNeeded {
aggregation.selectedColumns = append(aggregation.selectedColumns, sqlSelect)
}
}*/

field := cw.parseFieldField(params, "date_range")
if field == nil {
return fmt.Errorf("no field specified for date range aggregation, params: %v", params)
}
aggregation.queryType = bucket_aggregations.NewDateRange(cw.Ctx, field, format, intervals, selectColumnsNr)
return nil
}

// parseDateTimeInClickhouseMathLanguage parses dateTime from Clickhouse's format
Expand Down
17 changes: 16 additions & 1 deletion quesma/queryparser/aggregation_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package queryparser

import (
"fmt"
"quesma/clickhouse"
"quesma/logger"
"quesma/model"
Expand Down Expand Up @@ -175,7 +176,11 @@ func (cw *ClickhouseQueryTranslator) parseTopHits(queryMap QueryMap) (parsedTopH
const defaultSize = 1
size := cw.parseSize(params, defaultSize)

orderBy := cw.parseOrder(params, queryMap, []model.Expr{})
orderBy, err := cw.parseOrder(params, []model.Expr{})
if err != nil {
logger.WarnWithCtx(cw.Ctx).Msgf("error parsing order in top_hits: %v", err)
return
}
if len(orderBy) == 1 && orderBy[0].IsCountDesc() { // we don't need count DESC
orderBy = []model.OrderByExpr{}
}
Expand Down Expand Up @@ -287,6 +292,16 @@ func (cw *ClickhouseQueryTranslator) parseStringField(queryMap QueryMap, fieldNa
return defaultValue
}

// parseArrayField looks up fieldName in queryMap and returns its value as a
// []any. It returns an error when the key is missing or when the stored value
// is not an array.
func (cw *ClickhouseQueryTranslator) parseArrayField(queryMap QueryMap, fieldName string) ([]any, error) {
	valueRaw, exists := queryMap[fieldName]
	if !exists {
		return nil, fmt.Errorf("array field '%s' not found in aggregation queryMap: %v", fieldName, queryMap)
	}
	asArray, ok := valueRaw.([]any)
	if !ok {
		return nil, fmt.Errorf("%s is not an array, but %T, value: %v", fieldName, valueRaw, valueRaw)
	}
	return asArray, nil
}

// parseFieldFieldMaybeScript is basically almost a copy of parseFieldField above, but it also handles a basic script, if "field" is missing.
func (cw *ClickhouseQueryTranslator) parseFieldFieldMaybeScript(shouldBeMap any, aggregationType string) (field model.Expr, isFromScript bool) {
Map, ok := shouldBeMap.(QueryMap)
Expand Down
Loading
Loading