diff --git a/quesma/model/bucket_aggregations/dateRange.go b/quesma/model/bucket_aggregations/dateRange.go
index 87afb2062..2b7e0e0f9 100644
--- a/quesma/model/bucket_aggregations/dateRange.go
+++ b/quesma/model/bucket_aggregations/dateRange.go
@@ -16,14 +16,14 @@ const UnboundedInterval = "*"
 // 1) in Clickhouse's proper format, e.g. toStartOfDay(subDate(now(), INTERVAL 3 week))
 // 2) * (UnboundedInterval), which means no bound
 type DateTimeInterval struct {
-	Begin string
-	End   string
+	begin string
+	end   string
 }
 
 func NewDateTimeInterval(begin, end string) DateTimeInterval {
 	return DateTimeInterval{
-		Begin: begin,
-		End:   end,
+		begin: begin,
+		end:   end,
 	}
 }
 
@@ -31,8 +31,8 @@ func NewDateTimeInterval(begin, end string) DateTimeInterval {
 // We query Clickhouse for this timestamp, as it's defined in Clickhouse's format, e.g. now()-1d.
 // It's only 1 more field to our SELECT query, so it shouldn't be a performance issue.
 func (interval DateTimeInterval) BeginTimestampToSQL() (sqlSelect model.Expr, selectNeeded bool) {
-	if interval.Begin != UnboundedInterval {
-		return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.Begin))), true
+	if interval.begin != UnboundedInterval {
+		return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.begin))), true
 	}
 	return nil, false
 }
@@ -41,21 +41,21 @@ func (interval DateTimeInterval) BeginTimestampToSQL() (sqlSelect model.Expr, se
 // We query Clickhouse for this timestamp, as it's defined in Clickhouse's format, e.g. now()-1d.
 // It's only 1 more field to our SELECT query, so it isn't a performance issue.
 func (interval DateTimeInterval) EndTimestampToSQL() (sqlSelect model.Expr, selectNeeded bool) {
-	if interval.End != UnboundedInterval {
-		return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.End))), true
+	if interval.end != UnboundedInterval {
+		return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.end))), true
 	}
 	return nil, false
 }
 
-func (interval DateTimeInterval) ToWhereClause(fieldName string) model.Expr {
+func (interval DateTimeInterval) ToWhereClause(field model.Expr) model.Expr {
 	begin, isBegin := interval.BeginTimestampToSQL()
 	end, isEnd := interval.EndTimestampToSQL()
 
 	if isBegin {
-		begin = model.NewInfixExpr(model.NewColumnRef(fieldName), ">=", begin)
+		begin = model.NewInfixExpr(field, ">=", begin)
 	}
 	if isEnd {
-		end = model.NewInfixExpr(model.NewColumnRef(fieldName), "<", end)
+		end = model.NewInfixExpr(field, "<", end)
 	}
 
 	if isBegin && isEnd {
@@ -65,20 +65,20 @@ func (interval DateTimeInterval) ToWhereClause(fieldName string) model.Expr {
 	} else if isEnd {
 		return end
 	} else {
-		return model.NewLiteral("TRUE")
+		return model.TrueExpr
 	}
 }
 
 type DateRange struct {
 	ctx context.Context
-	FieldName       string
-	Format          string
-	Intervals       []DateTimeInterval
-	SelectColumnsNr int // how many columns we add to the query because of date_range aggregation, e.g. SELECT x,y,z -> 3
+	field           model.Expr
+	format          string
+	intervals       []DateTimeInterval
+	selectColumnsNr int // how many columns we add to the query because of date_range aggregation, e.g.
SELECT x,y,z -> 3 } -func NewDateRange(ctx context.Context, fieldName string, format string, intervals []DateTimeInterval, selectColumnsNr int) DateRange { - return DateRange{ctx: ctx, FieldName: fieldName, Format: format, Intervals: intervals, SelectColumnsNr: selectColumnsNr} +func NewDateRange(ctx context.Context, field model.Expr, format string, intervals []DateTimeInterval, selectColumnsNr int) DateRange { + return DateRange{ctx: ctx, field: field, format: format, intervals: intervals, selectColumnsNr: selectColumnsNr} } func (query DateRange) AggregationType() model.AggregationType { @@ -92,7 +92,7 @@ func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) m } response := make([]model.JsonMap, 0) - startIteration := len(rows[0].Cols) - 1 - query.SelectColumnsNr + startIteration := len(rows[0].Cols) - 1 - query.selectColumnsNr if startIteration < 0 || startIteration >= len(rows[0].Cols) { logger.ErrorWithCtx(query.ctx).Msgf( "unexpected column nr in aggregation response, startIteration: %d, len(rows[0].Cols): %d", @@ -100,7 +100,7 @@ func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) m ) return nil } - for intervalIdx, columnIdx := 0, startIteration; intervalIdx < len(query.Intervals); intervalIdx++ { + for intervalIdx, columnIdx := 0, startIteration; intervalIdx < len(query.intervals); intervalIdx++ { responseForInterval, nextColumnIdx := query.responseForInterval(&rows[0], intervalIdx, columnIdx) response = append(response, responseForInterval) columnIdx = nextColumnIdx @@ -111,7 +111,7 @@ func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) m } func (query DateRange) String() string { - return "date_range, intervals: " + fmt.Sprintf("%v", query.Intervals) + return "date_range, intervals: " + fmt.Sprintf("%v", query.intervals) } func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalIdx, columnIdx int) ( @@ -123,7 +123,7 @@ func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalId var from, to int64 var fromString, toString string - if query.Intervals[intervalIdx].Begin == UnboundedInterval { + if query.intervals[intervalIdx].begin == UnboundedInterval { fromString = UnboundedInterval } else { if columnIdx >= len(row.Cols) { @@ -137,7 +137,7 @@ func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalId columnIdx++ } - if query.Intervals[intervalIdx].End == UnboundedInterval { + if query.intervals[intervalIdx].end == UnboundedInterval { toString = UnboundedInterval } else { if columnIdx >= len(row.Cols) { @@ -173,16 +173,16 @@ func (query DateRange) DoesNotHaveGroupBy() bool { } func (query DateRange) CombinatorGroups() (result []CombinatorGroup) { - for intervalIdx, interval := range query.Intervals { + for intervalIdx, interval := range query.intervals { prefix := fmt.Sprintf("range_%d__", intervalIdx) - if len(query.Intervals) == 1 { + if len(query.intervals) == 1 { prefix = "" } result = append(result, CombinatorGroup{ idx: intervalIdx, Prefix: prefix, Key: prefix, // TODO: we need translate date to real time - WhereClause: interval.ToWhereClause(query.FieldName), + WhereClause: interval.ToWhereClause(query.field), }) } return @@ -199,23 +199,23 @@ func (query DateRange) CombinatorTranslateSqlResponseToJson(subGroup CombinatorG } // TODO: we need translate relative to real time - interval := query.Intervals[subGroup.idx] - if interval.Begin != UnboundedInterval { - response["from"] = interval.Begin - response["from_as_string"] = 
interval.Begin + interval := query.intervals[subGroup.idx] + if interval.begin != UnboundedInterval { + response["from"] = interval.begin + response["from_as_string"] = interval.begin } - if interval.End != UnboundedInterval { - response["to"] = interval.End - response["to_as_string"] = interval.End + if interval.end != UnboundedInterval { + response["to"] = interval.end + response["to_as_string"] = interval.end } return response } func (query DateRange) CombinatorSplit() []model.QueryType { - result := make([]model.QueryType, 0, len(query.Intervals)) - for _, interval := range query.Intervals { - result = append(result, NewDateRange(query.ctx, query.FieldName, query.Format, []DateTimeInterval{interval}, query.SelectColumnsNr)) + result := make([]model.QueryType, 0, len(query.intervals)) + for _, interval := range query.intervals { + result = append(result, NewDateRange(query.ctx, query.field, query.format, []DateTimeInterval{interval}, query.selectColumnsNr)) } return result } diff --git a/quesma/model/bucket_aggregations/range.go b/quesma/model/bucket_aggregations/range.go index 531185ccd..92c3c3b83 100644 --- a/quesma/model/bucket_aggregations/range.go +++ b/quesma/model/bucket_aggregations/range.go @@ -12,8 +12,6 @@ import ( "strings" ) -const keyedDefaultValue = false - var IntervalInfiniteRange = math.NaN() type Interval struct { @@ -91,10 +89,6 @@ func NewRange(ctx context.Context, expr model.Expr, intervals []Interval, keyed return Range{ctx, expr, intervals, keyed} } -func NewRangeWithDefaultKeyed(ctx context.Context, expr model.Expr, intervals []Interval) Range { - return Range{ctx, expr, intervals, keyedDefaultValue} -} - func (query Range) AggregationType() model.AggregationType { return model.BucketAggregation } diff --git a/quesma/queryparser/aggregation_date_range_parser.go b/quesma/queryparser/aggregation_date_range_parser.go index 82218c68a..b369f6af2 100644 --- a/quesma/queryparser/aggregation_date_range_parser.go +++ b/quesma/queryparser/aggregation_date_range_parser.go @@ -3,80 +3,53 @@ package queryparser import ( - "quesma/logger" + "fmt" "quesma/model/bucket_aggregations" "unicode" ) -func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(dateRange QueryMap) (bucket_aggregations.DateRange, error) { - var err error - var fieldName, format string - - if field, exists := dateRange["field"]; exists { - if fieldNameRaw, ok := field.(string); ok { - fieldName = cw.ResolveField(cw.Ctx, fieldNameRaw) - } else { - logger.WarnWithCtx(cw.Ctx).Msgf("field specified for date range aggregation is not a string. Using empty. Querymap: %v", dateRange) - } - } else { - logger.WarnWithCtx(cw.Ctx).Msgf("no field specified for date range aggregation. Using empty. Querymap: %v", dateRange) - } - var ranges []any - var ok bool - if formatRaw, exists := dateRange["format"]; exists { - if formatParsed, ok := formatRaw.(string); ok { - format = formatParsed - } else { - logger.WarnWithCtx(cw.Ctx).Msgf("format specified for date range aggregation is not a string. Using empty. 
Querymap: %v", dateRange) - } +func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(aggregation *pancakeAggregationTreeNode, params QueryMap) (err error) { + field := cw.parseFieldField(params, "date_range") + if field == nil { + return fmt.Errorf("no field specified for date range aggregation, params: %v", params) } - if rangesRaw, exists := dateRange["ranges"]; exists { - if ranges, ok = rangesRaw.([]any); !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("ranges specified for date range aggregation is not an array. Using empty. Querymap: %v", dateRange) - } - } else { - logger.WarnWithCtx(cw.Ctx).Msgf("no ranges specified for date range aggregation. Using empty. Querymap: %v", dateRange) + format := cw.parseStringField(params, "format", "") + ranges, err := cw.parseArrayField(params, "ranges") + if err != nil { + return err } + intervals := make([]bucket_aggregations.DateTimeInterval, 0, len(ranges)) selectColumnsNr := len(ranges) // we query Clickhouse for every unbounded part of interval (begin and end) - for _, Range := range ranges { - rangeMap := Range.(QueryMap) - var intervalBegin, intervalEnd string - from, exists := rangeMap["from"] - if exists { - if fromRaw, ok := from.(string); ok { - intervalBegin, err = cw.parseDateTimeInClickhouseMathLanguage(fromRaw) - if err != nil { - return bucket_aggregations.DateRange{}, err - } - selectColumnsNr++ - } else { - logger.WarnWithCtx(cw.Ctx).Msgf("from specified for date range aggregation is not a string. Querymap: %v "+ - "Using default (unbounded).", dateRange) - intervalBegin = bucket_aggregations.UnboundedInterval + for _, rangeRaw := range ranges { + rangeMap, ok := rangeRaw.(QueryMap) + if !ok { + return fmt.Errorf("range is not a map, but %T, range: %v", rangeRaw, rangeRaw) + } + + const defaultIntervalBound = bucket_aggregations.UnboundedInterval + intervalBegin := defaultIntervalBound + if from := cw.parseStringField(rangeMap, "from", defaultIntervalBound); from != defaultIntervalBound { + intervalBegin, err = cw.parseDateTimeInClickhouseMathLanguage(from) + if err != nil { + return err } - } else { - intervalBegin = bucket_aggregations.UnboundedInterval + selectColumnsNr++ } - to, exists := rangeMap["to"] - if exists { - if toRaw, ok := to.(string); ok { - intervalEnd, err = cw.parseDateTimeInClickhouseMathLanguage(toRaw) - if err != nil { - return bucket_aggregations.DateRange{}, err - } - selectColumnsNr++ - } else { - logger.WarnWithCtx(cw.Ctx).Msgf("To specified for date range aggregation is not a string. 
Querymap: %v "+ - "Using default (unbounded).", dateRange) - intervalEnd = bucket_aggregations.UnboundedInterval + + intervalEnd := bucket_aggregations.UnboundedInterval + if to := cw.parseStringField(rangeMap, "to", defaultIntervalBound); to != defaultIntervalBound { + intervalEnd, err = cw.parseDateTimeInClickhouseMathLanguage(to) + if err != nil { + return err } - } else { - intervalEnd = bucket_aggregations.UnboundedInterval + selectColumnsNr++ } intervals = append(intervals, bucket_aggregations.NewDateTimeInterval(intervalBegin, intervalEnd)) } - return bucket_aggregations.NewDateRange(cw.Ctx, fieldName, format, intervals, selectColumnsNr), nil + + aggregation.queryType = bucket_aggregations.NewDateRange(cw.Ctx, field, format, intervals, selectColumnsNr) + return nil } // parseDateTimeInClickhouseMathLanguage parses dateTime from Clickhouse's format diff --git a/quesma/queryparser/aggregation_parser.go b/quesma/queryparser/aggregation_parser.go index 1ed834211..cc7af3e8d 100644 --- a/quesma/queryparser/aggregation_parser.go +++ b/quesma/queryparser/aggregation_parser.go @@ -3,6 +3,7 @@ package queryparser import ( + "fmt" "quesma/clickhouse" "quesma/logger" "quesma/model" @@ -175,7 +176,11 @@ func (cw *ClickhouseQueryTranslator) parseTopHits(queryMap QueryMap) (parsedTopH const defaultSize = 1 size := cw.parseSize(params, defaultSize) - orderBy := cw.parseOrder(params, queryMap, []model.Expr{}) + orderBy, err := cw.parseOrder(params, []model.Expr{}) + if err != nil { + logger.WarnWithCtx(cw.Ctx).Msgf("error parsing order in top_hits: %v", err) + return + } if len(orderBy) == 1 && orderBy[0].IsCountDesc() { // we don't need count DESC orderBy = []model.OrderByExpr{} } @@ -287,6 +292,16 @@ func (cw *ClickhouseQueryTranslator) parseStringField(queryMap QueryMap, fieldNa return defaultValue } +func (cw *ClickhouseQueryTranslator) parseArrayField(queryMap QueryMap, fieldName string) ([]any, error) { + if valueRaw, exists := queryMap[fieldName]; exists { + if asArray, ok := valueRaw.([]any); ok { + return asArray, nil + } + return nil, fmt.Errorf("%s is not an array, but %T, value: %v", fieldName, valueRaw, valueRaw) + } + return nil, fmt.Errorf("array field '%s' not found in aggregation queryMap: %v", fieldName, queryMap) +} + // parseFieldFieldMaybeScript is basically almost a copy of parseFieldField above, but it also handles a basic script, if "field" is missing. 
func (cw *ClickhouseQueryTranslator) parseFieldFieldMaybeScript(shouldBeMap any, aggregationType string) (field model.Expr, isFromScript bool) { Map, ok := shouldBeMap.(QueryMap) diff --git a/quesma/queryparser/filters_aggregation.go b/quesma/queryparser/filters_aggregation.go index c44cf4972..150a92bd9 100644 --- a/quesma/queryparser/filters_aggregation.go +++ b/quesma/queryparser/filters_aggregation.go @@ -3,41 +3,27 @@ package queryparser import ( - "quesma/logger" + "fmt" "quesma/model" "quesma/model/bucket_aggregations" + "sort" ) -func (cw *ClickhouseQueryTranslator) parseFilters(queryMap QueryMap) (success bool, filtersAggr bucket_aggregations.Filters) { - filtersAggr = bucket_aggregations.NewFiltersEmpty(cw.Ctx) - - filtersRaw, exists := queryMap["filters"] +func (cw *ClickhouseQueryTranslator) parseFilters(aggregation *pancakeAggregationTreeNode, params QueryMap) error { + filtersParamRaw, exists := params["filters"] if !exists { - return + return fmt.Errorf("filters is not a map, but %T, value: %v", params, params) } - - filtersMap, ok := filtersRaw.(QueryMap) + filtersParam, ok := filtersParamRaw.(QueryMap) if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("filters is not a map, but %T, value: %v. Using empty.", filtersRaw, filtersRaw) - return - } - nested, exists := filtersMap["filters"] - if !exists { - logger.WarnWithCtx(cw.Ctx).Msgf("filters is not a map, but %T, value: %v. Skipping filters.", filtersRaw, filtersRaw) - return - } - nestedMap, ok := nested.(QueryMap) - if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("filters is not a map, but %T, value: %v. Skipping filters.", nested, nested) - return + return fmt.Errorf("filters is not a map, but %T, value: %v", filtersParamRaw, filtersParamRaw) } - filters := make([]bucket_aggregations.Filter, 0, len(nestedMap)) - for name, filterRaw := range nestedMap { + filters := make([]bucket_aggregations.Filter, 0, len(filtersParam)) + for name, filterRaw := range filtersParam { filterMap, ok := filterRaw.(QueryMap) if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("filter is not a map, but %T, value: %v. Skipping.", filterRaw, filterRaw) - continue + return fmt.Errorf("filter is not a map, but %T, value: %v", filterRaw, filterRaw) } filter := cw.parseQueryMap(filterMap) if filter.WhereClause == nil { @@ -46,5 +32,11 @@ func (cw *ClickhouseQueryTranslator) parseFilters(queryMap QueryMap) (success bo } filters = append(filters, bucket_aggregations.NewFilter(name, filter)) } - return true, bucket_aggregations.NewFilters(cw.Ctx, filters) + + sort.Slice(filters, func(i, j int) bool { + return filters[i].Name < filters[j].Name + }) + aggregation.queryType = bucket_aggregations.NewFilters(cw.Ctx, filters) + aggregation.isKeyed = true + return nil } diff --git a/quesma/queryparser/pancake_aggregation_parser.go b/quesma/queryparser/pancake_aggregation_parser.go index 053ee990c..5b685d5b8 100644 --- a/quesma/queryparser/pancake_aggregation_parser.go +++ b/quesma/queryparser/pancake_aggregation_parser.go @@ -167,9 +167,7 @@ func (cw *ClickhouseQueryTranslator) pancakeParseAggregation(aggregationName str } // 4. Bucket aggregations. They introduce new subaggregations, even if no explicit subaggregation defined on this level. 
- // bucketAggrPresent, err := cw.pancakeTryBucketAggregation(aggregation, queryMap) - _, err := cw.pancakeTryBucketAggregation(aggregation, queryMap) - if err != nil { + if err := cw.pancakeTryBucketAggregation(aggregation, queryMap); err != nil { return nil, err } diff --git a/quesma/queryparser/pancake_aggregation_parser_buckets.go b/quesma/queryparser/pancake_aggregation_parser_buckets.go index 4fd87c785..0bf7ec445 100644 --- a/quesma/queryparser/pancake_aggregation_parser_buckets.go +++ b/quesma/queryparser/pancake_aggregation_parser_buckets.go @@ -5,378 +5,310 @@ package queryparser import ( "fmt" + "github.com/pkg/errors" "quesma/clickhouse" "quesma/logger" "quesma/model" "quesma/model/bucket_aggregations" - "sort" "strconv" "strings" ) -func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pancakeAggregationTreeNode, queryMap QueryMap) (success bool, err error) { - - success = true // returned in most cases - if histogramRaw, ok := queryMap["histogram"]; ok { - histogram, ok := histogramRaw.(QueryMap) - if !ok { - return false, fmt.Errorf("histogram is not a map, but %T, value: %v", histogramRaw, histogramRaw) - } +func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pancakeAggregationTreeNode, queryMap QueryMap) error { + aggregationHandlers := []struct { + name string + handler func(*pancakeAggregationTreeNode, QueryMap) error + }{ + {"histogram", cw.parseHistogram}, + {"date_histogram", cw.parseDateHistogram}, + {"terms", func(node *pancakeAggregationTreeNode, params QueryMap) error { + return cw.parseTermsAggregation(node, params, "terms") + }}, + {"filters", cw.parseFilters}, + {"sampler", cw.parseSampler}, + {"random_sampler", cw.parseRandomSampler}, + {"date_range", cw.parseDateRangeAggregation}, + {"range", cw.parseRangeAggregation}, + {"auto_date_histogram", cw.parseAutoDateHistogram}, + {"geotile_grid", cw.parseGeotileGrid}, + {"significant_terms", func(node *pancakeAggregationTreeNode, params QueryMap) error { + return cw.parseTermsAggregation(node, params, "significant_terms") + }}, + {"multi_terms", cw.parseMultiTerms}, + {"composite", cw.parseComposite}, + } - var interval float64 - intervalRaw, ok := histogram["interval"] - if !ok { - return false, fmt.Errorf("interval not found in histogram: %v", histogram) - } - switch intervalTyped := intervalRaw.(type) { - case string: - var err error - interval, err = strconv.ParseFloat(intervalTyped, 64) - if err != nil { - return false, fmt.Errorf("failed to parse interval: %v", intervalRaw) + for _, aggr := range aggregationHandlers { + if paramsRaw, ok := queryMap[aggr.name]; ok { + if params, ok := paramsRaw.(QueryMap); ok { + delete(queryMap, aggr.name) + return aggr.handler(aggregation, params) } - case int: - interval = float64(intervalTyped) - case float64: - interval = intervalTyped - default: - interval = 1.0 - logger.WarnWithCtx(cw.Ctx).Msgf("unexpected type of interval: %T, value: %v. 
Will use 1.0.", intervalTyped, intervalTyped) + return fmt.Errorf("%s is not a map, but %T, value: %v", aggr.name, paramsRaw, paramsRaw) } - minDocCount := cw.parseMinDocCount(histogram) - aggregation.queryType = bucket_aggregations.NewHistogram(cw.Ctx, interval, minDocCount) + } - field, _ := cw.parseFieldFieldMaybeScript(histogram, "histogram") - field, didWeAddMissing := cw.addMissingParameterIfPresent(field, histogram) - if !didWeAddMissing { - aggregation.filterOutEmptyKeyBucket = true - } + return nil +} - var col model.Expr - if interval != 1.0 { - // col as string is: fmt.Sprintf("floor(%s / %f) * %f", fieldNameProperlyQuoted, interval, interval) - col = model.NewInfixExpr( - model.NewFunction("floor", model.NewInfixExpr(field, "/", model.NewLiteral(interval))), - "*", - model.NewLiteral(interval), - ) - } else { - col = field +// paramsRaw - in a proper request should be of QueryMap type. +func (cw *ClickhouseQueryTranslator) parseHistogram(aggregation *pancakeAggregationTreeNode, params QueryMap) (err error) { + const defaultInterval = 1.0 + var interval float64 + intervalRaw, ok := params["interval"] + if !ok { + return fmt.Errorf("interval not found in histogram: %v", params) + } + switch intervalTyped := intervalRaw.(type) { + case string: + interval, err = strconv.ParseFloat(intervalTyped, 64) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("failed to parse interval: %v", intervalRaw)) } + case int: + interval = float64(intervalTyped) + case float64: + interval = intervalTyped + default: + interval = 1.0 + logger.WarnWithCtx(cw.Ctx).Msgf("unexpected type of interval: %T, value: %v. Will use default (%v)", intervalTyped, intervalTyped, defaultInterval) + } - aggregation.selectedColumns = append(aggregation.selectedColumns, col) - aggregation.orderBy = append(aggregation.orderBy, model.NewOrderByExprWithoutOrder(col)) - - delete(queryMap, "histogram") - return success, nil + minDocCount := cw.parseMinDocCount(params) + field, _ := cw.parseFieldFieldMaybeScript(params, "histogram") + field, didWeAddMissing := cw.addMissingParameterIfPresent(field, params) + if !didWeAddMissing { + aggregation.filterOutEmptyKeyBucket = true } - if dateHistogramRaw, ok := queryMap["date_histogram"]; ok { - dateHistogram, ok := dateHistogramRaw.(QueryMap) - if !ok { - return false, fmt.Errorf("date_histogram is not a map, but %T, value: %v", dateHistogramRaw, dateHistogramRaw) - } - field := cw.parseFieldField(dateHistogram, "date_histogram") - dateTimeType := cw.Table.GetDateTimeTypeFromExpr(cw.Ctx, field) - - weAddedMissing := false - if missingRaw, exists := dateHistogram["missing"]; exists { - if missing, ok := missingRaw.(string); ok { - dateManager := NewDateManager(cw.Ctx) - if missingExpr, parsingOk := dateManager.ParseDateUsualFormat(missing, dateTimeType); parsingOk { - field = model.NewFunction("COALESCE", field, missingExpr) - weAddedMissing = true - } else { - logger.ErrorWithCtx(cw.Ctx).Msgf("unknown format of missing in date_histogram: %v. Skipping it.", missing) - } - } else { - logger.ErrorWithCtx(cw.Ctx).Msgf("missing %v is not a string, but: %T. 
Skipping it.", missingRaw, missingRaw) - } - } - if !weAddedMissing { - // if we don't add missing, we need to filter out nulls later - aggregation.filterOutEmptyKeyBucket = true - } - ebMin, ebMax := bucket_aggregations.NoExtendedBound, bucket_aggregations.NoExtendedBound - if extendedBounds, exists := dateHistogram["extended_bounds"].(QueryMap); exists { - ebMin = cw.parseInt64Field(extendedBounds, "min", bucket_aggregations.NoExtendedBound) - ebMax = cw.parseInt64Field(extendedBounds, "max", bucket_aggregations.NoExtendedBound) - } + if interval != 1.0 { + // column as string is: fmt.Sprintf("floor(%s / %f) * %f", fieldNameProperlyQuoted, interval, interval) + field = model.NewInfixExpr( + model.NewFunction("floor", model.NewInfixExpr(field, "/", model.NewLiteral(interval))), + "*", + model.NewLiteral(interval), + ) + } - minDocCount := cw.parseMinDocCount(dateHistogram) - timezone := cw.parseStringField(dateHistogram, "time_zone", "") - interval, intervalType := cw.extractInterval(dateHistogram) - // TODO GetDateTimeTypeFromExpr can be moved and it should take cw.Schema as an argument + aggregation.queryType = bucket_aggregations.NewHistogram(cw.Ctx, interval, minDocCount) + aggregation.selectedColumns = append(aggregation.selectedColumns, field) + aggregation.orderBy = append(aggregation.orderBy, model.NewOrderByExprWithoutOrder(field)) + return nil +} - if dateTimeType == clickhouse.Invalid { - logger.WarnWithCtx(cw.Ctx).Msgf("invalid date time type for field %s", field) +// paramsRaw - in a proper request should be of QueryMap type. +func (cw *ClickhouseQueryTranslator) parseDateHistogram(aggregation *pancakeAggregationTreeNode, params QueryMap) (err error) { + field := cw.parseFieldField(params, "date_histogram") + dateTimeType := cw.Table.GetDateTimeTypeFromExpr(cw.Ctx, field) + + weAddedMissing := false + if missingRaw, exists := params["missing"]; exists { + if missing, ok := missingRaw.(string); ok { + dateManager := NewDateManager(cw.Ctx) + if missingExpr, parsingOk := dateManager.ParseDateUsualFormat(missing, dateTimeType); parsingOk { + field = model.NewFunction("COALESCE", field, missingExpr) + weAddedMissing = true + } else { + logger.ErrorWithCtx(cw.Ctx).Msgf("unknown format of missing in date_histogram: %v. Skipping it.", missing) + } + } else { + logger.ErrorWithCtx(cw.Ctx).Msgf("missing %v is not a string, but: %T. 
Skipping it.", missingRaw, missingRaw) } + } + if !weAddedMissing { + // if we don't add missing, we need to filter out nulls later + aggregation.filterOutEmptyKeyBucket = true + } - dateHistogramAggr := bucket_aggregations.NewDateHistogram( - cw.Ctx, field, interval, timezone, minDocCount, ebMin, ebMax, intervalType, dateTimeType) - aggregation.queryType = dateHistogramAggr + ebMin, ebMax := bucket_aggregations.NoExtendedBound, bucket_aggregations.NoExtendedBound + if extendedBounds, exists := params["extended_bounds"].(QueryMap); exists { + ebMin = cw.parseInt64Field(extendedBounds, "min", bucket_aggregations.NoExtendedBound) + ebMax = cw.parseInt64Field(extendedBounds, "max", bucket_aggregations.NoExtendedBound) + } - sqlQuery := dateHistogramAggr.GenerateSQL() - aggregation.selectedColumns = append(aggregation.selectedColumns, sqlQuery) - aggregation.orderBy = append(aggregation.orderBy, model.NewOrderByExprWithoutOrder(sqlQuery)) + minDocCount := cw.parseMinDocCount(params) + timezone := cw.parseStringField(params, "time_zone", "") + interval, intervalType := cw.extractInterval(params) + // TODO GetDateTimeTypeFromExpr can be moved and it should take cw.Schema as an argument - delete(queryMap, "date_histogram") - return success, nil + if dateTimeType == clickhouse.Invalid { + logger.WarnWithCtx(cw.Ctx).Msgf("invalid date time type for field %s", field) } - if autoDateHistogram := cw.parseAutoDateHistogram(queryMap["auto_date_histogram"]); autoDateHistogram != nil { - aggregation.queryType = autoDateHistogram - delete(queryMap, "auto_date_histogram") - return - } - for _, termsType := range []string{"terms", "significant_terms"} { - termsRaw, ok := queryMap[termsType] - if !ok { - continue - } - terms, ok := termsRaw.(QueryMap) - if !ok { - return false, fmt.Errorf("%s is not a map, but %T, value: %v", termsType, termsRaw, termsRaw) - } - fieldExpression := cw.parseFieldField(terms, termsType) - fieldExpression, didWeAddMissing := cw.addMissingParameterIfPresent(fieldExpression, terms) - if !didWeAddMissing { - aggregation.filterOutEmptyKeyBucket = true - } + dateHistogram := bucket_aggregations.NewDateHistogram(cw.Ctx, + field, interval, timezone, minDocCount, ebMin, ebMax, intervalType, dateTimeType) + aggregation.queryType = dateHistogram - const defaultSize = 10 - size := cw.parseSize(terms, defaultSize) - orderBy := cw.parseOrder(terms, queryMap, []model.Expr{fieldExpression}) - aggregation.queryType = bucket_aggregations.NewTerms(cw.Ctx, termsType == "significant_terms", orderBy[0]) // TODO probably full, not [0] - aggregation.selectedColumns = append(aggregation.selectedColumns, fieldExpression) - aggregation.limit = size - aggregation.orderBy = orderBy + columnSql := dateHistogram.GenerateSQL() + aggregation.selectedColumns = append(aggregation.selectedColumns, columnSql) + aggregation.orderBy = append(aggregation.orderBy, model.NewOrderByExprWithoutOrder(columnSql)) + return nil +} - delete(queryMap, termsType) - return success, nil +// paramsRaw - in a proper request should be of QueryMap type. 
+// aggrName - "terms" or "significant_terms" +func (cw *ClickhouseQueryTranslator) parseTermsAggregation(aggregation *pancakeAggregationTreeNode, params QueryMap, aggrName string) error { + field := cw.parseFieldField(params, aggrName) + field, didWeAddMissing := cw.addMissingParameterIfPresent(field, params) + if !didWeAddMissing { + aggregation.filterOutEmptyKeyBucket = true } - if multiTermsRaw, exists := queryMap["multi_terms"]; exists { - multiTerms, ok := multiTermsRaw.(QueryMap) - if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("multi_terms is not a map, but %T, value: %v", multiTermsRaw, multiTermsRaw) - } - const defaultSize = 10 - size := cw.parseIntField(multiTerms, "size", defaultSize) + const defaultSize = 10 + size := cw.parseSize(params, defaultSize) + orderBy, err := cw.parseOrder(params, []model.Expr{field}) + if err != nil { + return err + } - aggregation.limit = size + aggregation.queryType = bucket_aggregations.NewTerms(cw.Ctx, aggrName == "significant_terms", orderBy[0]) // TODO probably full, not [0] + aggregation.selectedColumns = append(aggregation.selectedColumns, field) + aggregation.limit = size + aggregation.orderBy = orderBy + return nil +} - var fieldsNr int - if termsRaw, exists := multiTerms["terms"]; exists { - terms, ok := termsRaw.([]any) - if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("terms is not an array, but %T, value: %v. Using empty array", termsRaw, termsRaw) - } - fieldsNr = len(terms) - columns := make([]model.Expr, 0, fieldsNr) - for _, term := range terms { - columns = append(columns, cw.parseFieldField(term, "multi_terms")) - } - aggregation.selectedColumns = append(aggregation.selectedColumns, columns...) - aggregation.orderBy = append(aggregation.orderBy, cw.parseOrder(multiTerms, queryMap, columns)...) - } else { - logger.WarnWithCtx(cw.Ctx).Msg("no terms in multi_terms") - } +func (cw *ClickhouseQueryTranslator) parseSampler(aggregation *pancakeAggregationTreeNode, params QueryMap) error { + const defaultSize = 100 + aggregation.queryType = bucket_aggregations.NewSampler(cw.Ctx, cw.parseIntField(params, "shard_size", defaultSize)) + return nil +} - aggregation.queryType = bucket_aggregations.NewMultiTerms(cw.Ctx, fieldsNr) - aggregation.limit = size +func (cw *ClickhouseQueryTranslator) parseRandomSampler(aggregation *pancakeAggregationTreeNode, params QueryMap) error { + const defaultProbability = 0.0 // theoretically it's required + const defaultSeed = 0 + aggregation.queryType = bucket_aggregations.NewRandomSampler(cw.Ctx, + cw.parseFloatField(params, "probability", defaultProbability), + cw.parseIntField(params, "seed", defaultSeed), + ) + return nil +} - delete(queryMap, "multi_terms") - return success, nil +func (cw *ClickhouseQueryTranslator) parseRangeAggregation(aggregation *pancakeAggregationTreeNode, params QueryMap) error { + ranges, err := cw.parseArrayField(params, "ranges") + if err != nil { + return err } - if rangeRaw, ok := queryMap["range"]; ok { - rangeMap, ok := rangeRaw.(QueryMap) - if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("range is not a map, but %T, value: %v. 
Using empty map", rangeRaw, rangeRaw) - } - Range := cw.parseRangeAggregation(rangeMap) - aggregation.queryType = Range - if Range.Keyed { - aggregation.isKeyed = true - } - delete(queryMap, "range") - return success, nil + intervals := make([]bucket_aggregations.Interval, 0, len(ranges)) + for _, Range := range ranges { + rangePartMap := Range.(QueryMap) + from := cw.parseFloatField(rangePartMap, "from", bucket_aggregations.IntervalInfiniteRange) + to := cw.parseFloatField(rangePartMap, "to", bucket_aggregations.IntervalInfiniteRange) + intervals = append(intervals, bucket_aggregations.NewInterval(from, to)) } - if dateRangeRaw, ok := queryMap["date_range"]; ok { - dateRange, ok := dateRangeRaw.(QueryMap) - if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("date_range is not a map, but %T, value: %v. Using empty map", dateRangeRaw, dateRangeRaw) - } - dateRangeParsed, err := cw.parseDateRangeAggregation(dateRange) - if err != nil { - logger.ErrorWithCtx(cw.Ctx).Err(err).Msg("failed to parse date_range aggregation") - return false, err - } - aggregation.queryType = dateRangeParsed - // TODO: keep for reference as relative time, but no longer needed - /* - for _, interval := range dateRangeParsed.Intervals { - aggregation.selectedColumns = append(aggregation.selectedColumns, interval.ToSQLSelectQuery(dateRangeParsed.FieldName)) - - if sqlSelect, selectNeeded := interval.BeginTimestampToSQL(); selectNeeded { - aggregation.selectedColumns = append(aggregation.selectedColumns, sqlSelect) - } - if sqlSelect, selectNeeded := interval.EndTimestampToSQL(); selectNeeded { - aggregation.selectedColumns = append(aggregation.selectedColumns, sqlSelect) - } - }*/ - - delete(queryMap, "date_range") - return success, nil - } - if geoTileGridRaw, ok := queryMap["geotile_grid"]; ok { - geoTileGrid, ok := geoTileGridRaw.(QueryMap) - if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("geotile_grid is not a map, but %T, value: %v", geoTileGridRaw, geoTileGridRaw) - } - var precisionZoom float64 - precisionRaw, ok := geoTileGrid["precision"] - if ok { - switch cutValueTyped := precisionRaw.(type) { - case float64: - precisionZoom = cutValueTyped - } - } - field := cw.parseFieldField(geoTileGrid, "geotile_grid") - aggregation.queryType = bucket_aggregations.NewGeoTileGrid(cw.Ctx) - - // That's bucket (group by) formula for geotile_grid - // zoom/x/y - // SELECT precisionZoom as zoom, - // FLOOR(((toFloat64("Location::lon") + 180.0) / 360.0) * POWER(2, zoom)) AS x_tile, - // FLOOR( - // ( - // 1 - LOG(TAN(RADIANS(toFloat64("Location::lat"))) + (1 / COS(RADIANS(toFloat64("Location::lat"))))) / PI() - // ) / 2.0 * POWER(2, zoom) - // ) AS y_tile, count() - // FROM - // kibana_sample_data_flights Group by zoom, x_tile, y_tile - - zoomLiteral := model.NewLiteral(precisionZoom) - - fieldName, err := strconv.Unquote(model.AsString(field)) - if err != nil { - return false, err + const keyedDefault = false + keyed := keyedDefault + if keyedRaw, exists := params["keyed"]; exists { + var ok bool + if keyed, ok = keyedRaw.(bool); !ok { + logger.WarnWithCtx(cw.Ctx).Msgf("keyed is not a bool, but %T, value: %v", keyedRaw, keyedRaw) } - lon := model.NewGeoLon(fieldName) - lat := model.NewGeoLat(fieldName) - - toFloatFunLon := model.NewFunction("toFloat64", lon) - var infixX model.Expr - infixX = model.NewParenExpr(model.NewInfixExpr(toFloatFunLon, "+", model.NewLiteral(180.0))) - infixX = model.NewParenExpr(model.NewInfixExpr(infixX, "/", model.NewLiteral(360.0))) - infixX = model.NewInfixExpr(infixX, "*", - model.NewFunction("POWER", 
model.NewLiteral(2), zoomLiteral)) - xTile := model.NewFunction("FLOOR", infixX) - toFloatFunLat := model.NewFunction("toFloat64", lat) - radians := model.NewFunction("RADIANS", toFloatFunLat) - tan := model.NewFunction("TAN", radians) - cos := model.NewFunction("COS", radians) - Log := model.NewFunction("LOG", model.NewInfixExpr(tan, "+", - model.NewParenExpr(model.NewInfixExpr(model.NewLiteral(1), "/", cos)))) - - FloorContent := model.NewInfixExpr( - model.NewInfixExpr( - model.NewParenExpr( - model.NewInfixExpr(model.NewInfixExpr(model.NewLiteral(1), "-", Log), "/", - model.NewLiteral("PI()"))), "/", - model.NewLiteral(2.0)), "*", - model.NewFunction("POWER", model.NewLiteral(2), zoomLiteral)) - yTile := model.NewFunction("FLOOR", FloorContent) - - aggregation.selectedColumns = append(aggregation.selectedColumns, model.NewLiteral(fmt.Sprintf("CAST(%f AS Float32)", precisionZoom))) - aggregation.selectedColumns = append(aggregation.selectedColumns, xTile) - aggregation.selectedColumns = append(aggregation.selectedColumns, yTile) - - delete(queryMap, "geotile_grid") - return success, err } - if sampler, ok := queryMap["sampler"]; ok { - aggregation.queryType = cw.parseSampler(sampler) - delete(queryMap, "sampler") - return - } - if randomSampler, ok := queryMap["random_sampler"]; ok { - aggregation.queryType = cw.parseRandomSampler(randomSampler) - delete(queryMap, "random_sampler") - return - } - if isFilters, filterAggregation := cw.parseFilters(queryMap); isFilters { - sort.Slice(filterAggregation.Filters, func(i, j int) bool { // stable order is required for tests and caching - return filterAggregation.Filters[i].Name < filterAggregation.Filters[j].Name - }) - aggregation.isKeyed = true - aggregation.queryType = filterAggregation - delete(queryMap, "filters") - return - } - if composite, ok := queryMap["composite"]; ok { - aggregation.queryType, err = cw.parseComposite(aggregation, composite) - delete(queryMap, "composite") - return err == nil, err - } - success = false - return + + field := cw.parseFieldField(params, "range") + aggregation.queryType = bucket_aggregations.NewRange(cw.Ctx, field, intervals, keyed) + aggregation.isKeyed = keyed + return nil } -// samplerRaw - in a proper request should be of QueryMap type. -func (cw *ClickhouseQueryTranslator) parseSampler(samplerRaw any) bucket_aggregations.Sampler { - const defaultSize = 100 - sampler, ok := samplerRaw.(QueryMap) - if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("sampler is not a map, but %T, value: %v", samplerRaw, samplerRaw) - return bucket_aggregations.NewSampler(cw.Ctx, defaultSize) +func (cw *ClickhouseQueryTranslator) parseAutoDateHistogram(aggregation *pancakeAggregationTreeNode, params QueryMap) error { + fieldRaw := cw.parseFieldField(params, "auto_date_histogram") + if field, ok := fieldRaw.(model.ColumnRef); ok { + bucketsNr := cw.parseIntField(params, "buckets", 10) + aggregation.queryType = bucket_aggregations.NewAutoDateHistogram(cw.Ctx, field, bucketsNr) + return nil } - return bucket_aggregations.NewSampler(cw.Ctx, cw.parseIntField(sampler, "shard_size", defaultSize)) + + return fmt.Errorf("error parsing 'field' in auto_date_histogram; field type: %T, value: %v", fieldRaw, fieldRaw) } -// randomSamplerRaw - in a proper request should be of QueryMap type. 
-func (cw *ClickhouseQueryTranslator) parseRandomSampler(randomSamplerRaw any) bucket_aggregations.RandomSampler { - const defaultProbability = 0.0 // theoretically it's required - const defaultSeed = 0 - randomSampler, ok := randomSamplerRaw.(QueryMap) - if !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("sampler is not a map, but %T, value: %v", randomSamplerRaw, randomSamplerRaw) - return bucket_aggregations.NewRandomSampler(cw.Ctx, defaultProbability, defaultSeed) +func (cw *ClickhouseQueryTranslator) parseMultiTerms(aggregation *pancakeAggregationTreeNode, params QueryMap) error { + terms, err := cw.parseArrayField(params, "terms") + if err != nil { + return err } - return bucket_aggregations.NewRandomSampler( - cw.Ctx, - cw.parseFloatField(randomSampler, "probability", defaultProbability), - cw.parseIntField(randomSampler, "seed", defaultSeed), - ) -} -func (cw *ClickhouseQueryTranslator) parseAutoDateHistogram(paramsRaw any) *bucket_aggregations.AutoDateHistogram { - params, ok := paramsRaw.(QueryMap) - if !ok { - return nil + fieldsNr := len(terms) + columns := make([]model.Expr, 0, fieldsNr) + for _, term := range terms { + columns = append(columns, cw.parseFieldField(term, "multi_terms")) } - fieldRaw := cw.parseFieldField(params, "auto_date_histogram") - var field model.ColumnRef - if field, ok = fieldRaw.(model.ColumnRef); !ok { - logger.WarnWithCtx(cw.Ctx).Msgf("field is not a string, but %T, value: %v. Skipping auto_date_histogram", fieldRaw, fieldRaw) - return nil + orderBy, err := cw.parseOrder(params, columns) + if err != nil { + return err } - bucketsNr := cw.parseIntField(params, "buckets", 10) - return bucket_aggregations.NewAutoDateHistogram(cw.Ctx, field, bucketsNr) + aggregation.orderBy = append(aggregation.orderBy, orderBy...) + aggregation.selectedColumns = append(aggregation.selectedColumns, columns...) 
+ + const defaultSize = 10 + aggregation.limit = cw.parseSize(params, defaultSize) + aggregation.queryType = bucket_aggregations.NewMultiTerms(cw.Ctx, fieldsNr) + return nil +} + +func (cw *ClickhouseQueryTranslator) parseGeotileGrid(aggregation *pancakeAggregationTreeNode, params QueryMap) error { + const defaultPrecisionZoom = 7.0 + precisionZoom := cw.parseFloatField(params, "precision", defaultPrecisionZoom) + field := cw.parseFieldField(params, "geotile_grid") + + // That's bucket (group by) formula for geotile_grid + // zoom/x/y + // SELECT precisionZoom as zoom, + // FLOOR(((toFloat64("Location::lon") + 180.0) / 360.0) * POWER(2, zoom)) AS x_tile, + // FLOOR( + // ( + // 1 - LOG(TAN(RADIANS(toFloat64("Location::lat"))) + (1 / COS(RADIANS(toFloat64("Location::lat"))))) / PI() + // ) / 2.0 * POWER(2, zoom) + // ) AS y_tile, count() + // FROM + // kibana_sample_data_flights Group by zoom, x_tile, y_tile + + zoomLiteral := model.NewLiteral(precisionZoom) + + fieldName, err := strconv.Unquote(model.AsString(field)) + if err != nil { + return err + } + lon := model.NewGeoLon(fieldName) + lat := model.NewGeoLat(fieldName) + + toFloatFunLon := model.NewFunction("toFloat64", lon) + var infixX model.Expr + infixX = model.NewParenExpr(model.NewInfixExpr(toFloatFunLon, "+", model.NewLiteral(180.0))) + infixX = model.NewParenExpr(model.NewInfixExpr(infixX, "/", model.NewLiteral(360.0))) + infixX = model.NewInfixExpr(infixX, "*", + model.NewFunction("POWER", model.NewLiteral(2), zoomLiteral)) + xTile := model.NewFunction("FLOOR", infixX) + toFloatFunLat := model.NewFunction("toFloat64", lat) + radians := model.NewFunction("RADIANS", toFloatFunLat) + tan := model.NewFunction("TAN", radians) + cos := model.NewFunction("COS", radians) + Log := model.NewFunction("LOG", model.NewInfixExpr(tan, "+", + model.NewParenExpr(model.NewInfixExpr(model.NewLiteral(1), "/", cos)))) + + FloorContent := model.NewInfixExpr( + model.NewInfixExpr( + model.NewParenExpr( + model.NewInfixExpr(model.NewInfixExpr(model.NewLiteral(1), "-", Log), "/", + model.NewLiteral("PI()"))), "/", + model.NewLiteral(2.0)), "*", + model.NewFunction("POWER", model.NewLiteral(2), zoomLiteral)) + yTile := model.NewFunction("FLOOR", FloorContent) + + aggregation.queryType = bucket_aggregations.NewGeoTileGrid(cw.Ctx) + aggregation.selectedColumns = append(aggregation.selectedColumns, model.NewLiteral(fmt.Sprintf("CAST(%f AS Float32)", precisionZoom))) + aggregation.selectedColumns = append(aggregation.selectedColumns, xTile) + aggregation.selectedColumns = append(aggregation.selectedColumns, yTile) + return nil } // compositeRaw - in a proper request should be of QueryMap type. // TODO: In geotile_grid, without order specidfied, Elastic returns sort by key (a/b/c earlier than x/y/z if a all fields :( and can't regex that easily. 
+ // (TODO Maybe we can, don't want to waste time for this now https://stackoverflow.com/questions/3533408/regex-i-want-this-and-that-and-that-in-any-order) + "message": {Name: "message", Type: clickhouse.NewBaseType("String")}, + }, + Created: true, + }) + + test := func(t *testing.T, handlerName string, testcase testdata.FullSearchTestCase) { + db, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false) + defer db.Close() + + for i, expectedSQL := range testcase.ExpectedSQLs { + rows := sqlmock.NewRows([]string{testcase.ExpectedSQLResults[i][0].Cols[0].ColName}) + for _, row := range testcase.ExpectedSQLResults[i] { + rows.AddRow(row.Cols[0].Value) + } + mock.ExpectQuery(expectedSQL).WillReturnRows(rows) + } + + queryRunner := NewQueryRunnerDefaultForTests(db, &DefaultConfig, tableName, table, s) + + var response []byte + var err error + + if handlerName == "handleSearch" { + response, err = queryRunner.handleSearch(ctx, tableName, types.MustJSON(testcase.QueryRequestJson)) + } else if handlerName == "handleAsyncSearch" { + response, err = queryRunner.handleAsyncSearch( + ctx, tableName, types.MustJSON(testcase.QueryRequestJson), defaultAsyncSearchTimeout, true) + } + if err != nil { + t.Fatal(err) + } + assert.NoError(t, err) + + if err := mock.ExpectationsWereMet(); err != nil { + assert.NoError(t, err, "there were unfulfilled expections:") + } + + var responseMap model.JsonMap + err = json.Unmarshal(response, &responseMap) + if err != nil { + pp.Println("Response", string(response)) + } + assert.NoError(t, err, "error unmarshalling search API response:") + + var responsePart model.JsonMap + if handlerName == "handleSearch" { + responsePart = responseMap + } else { + responsePart = responseMap["response"].(model.JsonMap) + } + + assert.NotNil(t, testcase.ExpectedResponse, "ExpectedResponse is nil") + expectedResponseMap, err := util.JsonToMap(testcase.ExpectedResponse) + assert.NoError(t, err, "error unmarshalling expected response:") + + actualMinusExpected, expectedMinusActual := util.MapDifference(responsePart, + expectedResponseMap, []string{}, true, true) + acceptableDifference := []string{"took", "_shards", "timed_out"} + + pp.Println("expected", expectedResponseMap) + pp.Println("actual", responsePart) + + assert.True(t, util.AlmostEmpty(actualMinusExpected, acceptableDifference), "actualMinusExpected: %v", actualMinusExpected) + assert.True(t, util.AlmostEmpty(expectedMinusActual, acceptableDifference), "expectedMinusActual: %v", expectedMinusActual) + } + + handlers := []string{"handleSearch", "handleAsyncSearch"} + for i, tt := range testdata.FullSearchRequests { + for _, handlerName := range handlers[:1] { + t.Run(strconv.Itoa(i)+" "+tt.Name, func(t *testing.T) { + test(t, handlerName, tt) + }) + } + } +} diff --git a/quesma/testdata/invalid_requests.go b/quesma/testdata/invalid_requests.go new file mode 100644 index 000000000..e3a2a7f3d --- /dev/null +++ b/quesma/testdata/invalid_requests.go @@ -0,0 +1,284 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package testdata + +// InvalidAggregationTests are still WIP, not used in tests yet. 
+var InvalidAggregationTests = []AggregationTestCase{ + { + TestName: "Kibana 8.15, Metrics: Aggregation: Rate, invalid Unit (10)", //reason [eaggs] > reason + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "1": { + "rate": { + "field": "DistanceKilometers", + "unit": "10" + } + } + }, + "script_fields": {}, + "size": 0, + "stored_fields": [ + "*" + ], + "track_total_hits": true + }`, + ExpectedResponse: ` + { + "error": { + "caused_by": { + "reason": "Unsupported unit 10", + "type": "illegal_argument_exception" + }, + "reason": "[1:59] [rate] failed to parse field [unit]", + "root_cause": [ + { + "reason": "[1:59] [rate] failed to parse field [unit]", + "type": "x_content_parse_exception" + } + ], + "type": "x_content_parse_exception" + }, + "status": 400 + } (400 status code)`, + }, + { + TestName: "Kibana 8.15, Metrics: Aggregation: Rate, invalid Unit (abc)", //reason [eaggs] > reason + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "1": { + "rate": { + "field": "DistanceKilometers", + "unit": "abc" + } + } + }, + "script_fields": {}, + "size": 0, + "stored_fields": [ + "*" + ], + "track_total_hits": true + }`, + ExpectedResponse: ` + { + "error": { + "caused_by": { + "reason": "Unsupported unit 10", + "type": "illegal_argument_exception" + }, + "reason": "[1:59] [rate] failed to parse field [unit]", + "root_cause": [ + { + "reason": "[1:59] [rate] failed to parse field [unit]", + "type": "x_content_parse_exception" + } + ], + "type": "x_content_parse_exception" + }, + "status": 400 + } (400 status code)`, + }, + { + TestName: "Kibana 8.15, Metrics: Aggregation: Rate, valid Unit (month), but bad surrounding aggregations", //reason [eaggs] > reason + QueryRequestJson: ` +{ + "_source": { + "excludes": [] + }, + "aggs": { + "1": { + "rate": { + "field": "DistanceKilometers", + "unit": "month" + } + } + }, + "script_fields": {}, + "size": 0, + "stored_fields": [ + "*" + ], + "track_total_hits": true +}`, + + ExpectedResponse: ` +{ + "completion_time_in_millis": 1731585426907, + "error": { + "caused_by": { + "caused_by": { + "caused_by": { + "reason": "The rate aggregation can only be used inside a date histogram aggregation or composite aggregation with one date histogram value source", + "type": "illegal_argument_exception" + }, + "reason": "The rate aggregation can only be used inside a date histogram aggregation or composite aggregation with one date histogram value source", + "type": "illegal_argument_exception" + }, + "failed_shards": [ + { + "index": "kibana_sample_data_flights", + "node": "SqOwBNLfS0yt1lgl8XzEdA", + "reason": { + "reason": "The rate aggregation can only be used inside a date histogram aggregation or composite aggregation with one date histogram value source", + "type": "illegal_argument_exception" + }, + "shard": 0 + } + ], + "grouped": true, + "phase": "query", + "reason": "all shards failed", + "type": "search_phase_execution_exception" + }, + "reason": "error while executing search", + "type": "status_exception" + }, + "expiration_time_in_millis": 1731585486899, + "id": "FnoxVjUxSnRJUnZHNmVCUHZaLTQwbXccU3FPd0JOTGZTMHl0MWxnbDhYekVkQToxMDIzMQ==", + "is_partial": true, + "is_running": false, + "response": { + "_shards": { + "failed": 1, + "failures": [ + { + "index": "kibana_sample_data_flights", + "node": "SqOwBNLfS0yt1lgl8XzEdA", + "reason": { + "reason": "The rate aggregation can only be used inside a date histogram aggregation or composite aggregation with one date histogram value source", + "type": 
"illegal_argument_exception" + }, + "shard": 0 + } + ], + "skipped": 0, + "successful": 0, + "total": 1 + }, + "hits": { + "hits": [], + "max_score": null, + "total": { + "relation": "gte", + "value": 0 + } + }, + "num_reduce_phases": 0, + "terminated_early": false, + "timed_out": false, + "took": 8 + }, + "start_time_in_millis": 1731585426899 +} (400 status code)`, + }, + { + TestName: "Kibana 8.15, Metrics: Aggregation: Rate, invalid Unit (10)", //reason [eaggs] > reason + QueryRequestJson: ` + { + "_source": { + "excludes": [] + }, + "aggs": { + "2": { + "aggs": { + "1": { + "rate": { + "field": "DistanceKilometers", + "unit": "month" + } + } + }, + "date_histogram": { + "field": "timestamp", + "fixed_interval": "30s", + "min_doc_count": 1, + "time_zone": "Europe/Warsaw" + } + } + }, + "script_fields": {}, + "size": 0, + "stored_fields": [ + "*" + ], + "track_total_hits": true + }`, + ExpectedResponse: ` + { + "completion_time_in_millis": 1731585496445, + "error": { + "caused_by": { + "caused_by": { + "caused_by": { + "reason": "Cannot use month-based rate unit [month] with fixed interval based histogram, only week, day, hour, minute and second are supported for this histogram", + "type": "illegal_argument_exception" + }, + "reason": "Cannot use month-based rate unit [month] with fixed interval based histogram, only week, day, hour, minute and second are supported for this histogram", + "type": "illegal_argument_exception" + }, + "failed_shards": [ + { + "index": "kibana_sample_data_flights", + "node": "SqOwBNLfS0yt1lgl8XzEdA", + "reason": { + "reason": "Cannot use month-based rate unit [month] with fixed interval based histogram, only week, day, hour, minute and second are supported for this histogram", + "type": "illegal_argument_exception" + }, + "shard": 0 + } + ], + "grouped": true, + "phase": "query", + "reason": "all shards failed", + "type": "search_phase_execution_exception" + }, + "reason": "error while executing search", + "type": "status_exception" + }, + "expiration_time_in_millis": 1731585556279, + "id": "FlU1MWhKNzZsVDh1RGhCS2xpeGFqUXccU3FPd0JOTGZTMHl0MWxnbDhYekVkQToxMTA3Ng==", + "is_partial": true, + "is_running": false, + "response": { + "_shards": { + "failed": 1, + "failures": [ + { + "index": "kibana_sample_data_flights", + "node": "SqOwBNLfS0yt1lgl8XzEdA", + "reason": { + "reason": "Cannot use month-based rate unit [month] with fixed interval based histogram, only week, day, hour, minute and second are supported for this histogram", + "type": "illegal_argument_exception" + }, + "shard": 0 + } + ], + "skipped": 0, + "successful": 0, + "total": 1 + }, + "hits": { + "hits": [], + "max_score": null, + "total": { + "relation": "gte", + "value": 0 + } + }, + "num_reduce_phases": 0, + "terminated_early": false, + "timed_out": false, + "took": 166 + }, + "start_time_in_millis": 1731585496279 + }`, + }, +}