diff --git a/cmd/v2_test_objects.go b/cmd/v2_test_objects.go index abea85034..395698e33 100644 --- a/cmd/v2_test_objects.go +++ b/cmd/v2_test_objects.go @@ -422,8 +422,8 @@ func (p *QueryTransformationPipeline) ComposeResult(results [][]model.QueryResul type QueryTransformer1 struct { } -func (p *QueryTransformer1) Transform(queries []*model.Query) ([]*model.Query, error) { - logger.Debug().Msg("SimpleQueryTransformationPipeline: Transform") +func (p *QueryTransformer1) Transform(ctx context.Context, queries []*model.Query) ([]*model.Query, error) { + logger.DebugWithCtx(ctx).Msg("SimpleQueryTransformationPipeline: Transform") // Do basic transformation return queries, nil diff --git a/platform/clickhouse/schema.go b/platform/clickhouse/schema.go index de46f15d1..565b6e334 100644 --- a/platform/clickhouse/schema.go +++ b/platform/clickhouse/schema.go @@ -75,10 +75,6 @@ const ( Invalid ) -func (c *Column) String() string { - return fmt.Sprintf("%s %s", c.Name, c.Type.String()) -} - func (t BaseType) String() string { return t.Name } @@ -326,6 +322,21 @@ func NewEmptyTable(tableName string) *Table { return &Table{Name: tableName, Config: NewChTableConfigNoAttrs()} } +func (col *Column) String() string { + return fmt.Sprintf("%s %s", col.Name, col.Type.String()) +} + +// IsDatetime <=> is it DateTime or Date (but NOT DateTime64) +func (col *Column) IsDatetime() bool { + isDatetime := strings.HasPrefix(col.Type.String(), "DateTime") || strings.HasPrefix(col.Type.String(), "Date") + isDatetime64 := strings.HasPrefix(col.Type.String(), "DateTime64") + return isDatetime && !isDatetime64 +} + +func (col *Column) IsDatetime64() bool { + return strings.HasPrefix(col.Type.String(), "DateTime64") +} + func (col *Column) isArray() bool { return col.Type.isArray() } diff --git a/platform/clickhouse/util.go b/platform/clickhouse/util.go index 1b6f430ff..e42838100 100644 --- a/platform/clickhouse/util.go +++ b/platform/clickhouse/util.go @@ -3,13 +3,7 @@ package clickhouse import ( - "bytes" - "fmt" - "github.com/QuesmaOrg/quesma/platform/logger" - "github.com/QuesmaOrg/quesma/platform/model" - "github.com/goccy/go-json" "strings" - "time" ) // Code doesn't need to be pretty, 99.9% it's just for our purposes @@ -103,85 +97,3 @@ func parseTypeFromShowColumns(typ, name string) (Type, string) { } return parseTypeRec(typ, name) } - -func PrettyJson(jsonStr string) string { - var prettyJSON bytes.Buffer - if err := json.Indent(&prettyJSON, []byte(jsonStr), "", " "); err != nil { - return fmt.Sprintf("PrettyJson err: %v\n", err) - } - return prettyJSON.String() -} - -// TimestampGroupBy returns string to be used in the select part of Clickhouse query, when grouping by timestamp interval. -// e.g. -// - timestampGroupBy("@timestamp", DateTime64, 30 seconds) --> toInt64(toUnixTimestamp64Milli(`@timestamp`)/30000) -// - timestampGroupBy("@timestamp", DateTime, 30 seconds) --> toInt64(toUnixTimestamp(`@timestamp`)/30) -func TimestampGroupBy(timestampField model.Expr, typ DateTimeType, groupByInterval time.Duration) model.Expr { - - createAExp := func(innerFuncName string, interval int64) model.Expr { - toUnixTsFunc := model.NewInfixExpr( - model.NewFunction(innerFuncName, timestampField), - " / ", // TODO nasty hack to make our string-based tests pass. Operator should not contain spaces obviously - model.NewLiteral(interval)) - return model.NewFunction("toInt64", toUnixTsFunc) - } - - switch typ { - case DateTime64: - // as string: fmt.Sprintf("toInt64(toUnixTimestamp(`%s`)/%f)", timestampFieldName, groupByInterval.Seconds()) - return createAExp("toUnixTimestamp64Milli", groupByInterval.Milliseconds()) - case DateTime: - return createAExp("toUnixTimestamp", groupByInterval.Milliseconds()/1000) - default: - logger.Error().Msgf("invalid timestamp fieldname: %s", timestampFieldName) - return model.NewLiteral("invalid") // maybe create new type InvalidExpr? - } -} - -func TimestampGroupByWithTimezone(timestampField model.Expr, typ DateTimeType, - groupByInterval time.Duration, timezone string) model.Expr { - - // If no timezone, or timezone is default (UTC), we just return TimestampGroupBy(...) - if timezone == "" { - return TimestampGroupBy(timestampField, typ, groupByInterval) - } - - createAExp := func(innerFuncName string, interval, offsetMultiplier int64) model.Expr { - var offset model.Expr - offset = model.NewFunction( - "timeZoneOffset", - model.NewFunction( - "toTimezone", - timestampField, model.NewLiteral("'"+timezone+"'"), - ), - ) - if offsetMultiplier != 1 { - offset = model.NewInfixExpr(offset, "*", model.NewLiteral(offsetMultiplier)) - } - - unixTsWithOffset := model.NewInfixExpr( - model.NewFunction(innerFuncName, timestampField), - "+", - offset, - ) - - groupByExpr := model.NewInfixExpr( - model.NewParenExpr(unixTsWithOffset), - " / ", // TODO nasty hack to make our string-based tests pass. Operator should not contain spaces obviously - model.NewLiteral(interval), - ) - - return model.NewFunction("toInt64", groupByExpr) - } - - switch typ { - case DateTime64: - // e.g: (toUnixTimestamp64Milli("timestamp")+timeZoneOffset(toTimezone("timestamp",'Europe/Warsaw'))*1000) / 600000 - return createAExp("toUnixTimestamp64Milli", groupByInterval.Milliseconds(), 1000) - case DateTime: - return createAExp("toUnixTimestamp", groupByInterval.Milliseconds()/1000, 1) - default: - logger.Error().Msgf("invalid timestamp fieldname: %s", timestampFieldName) - return model.NewLiteral("invalid") // maybe create new type InvalidExpr? - } -} diff --git a/platform/frontend_connectors/schema_search_after_transformer.go b/platform/frontend_connectors/schema_search_after_transformer.go index ee0750d44..e299a3ee9 100644 --- a/platform/frontend_connectors/schema_search_after_transformer.go +++ b/platform/frontend_connectors/schema_search_after_transformer.go @@ -3,6 +3,7 @@ package frontend_connectors import ( + "context" "fmt" "github.com/QuesmaOrg/quesma/platform/logger" "github.com/QuesmaOrg/quesma/platform/model" @@ -160,7 +161,7 @@ func (s searchAfterStrategyBasicAndFast) transform(query *model.Query, searchAft return query, nil } -func (s *SchemaCheckPass) applySearchAfterParameter(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applySearchAfterParameter(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { searchAfterParsed, err := s.searchAfterStrategy.validateAndParse(query, indexSchema) if err != nil { return nil, err diff --git a/platform/frontend_connectors/schema_search_after_transformer_test.go b/platform/frontend_connectors/schema_search_after_transformer_test.go index dc454ddda..c6511f56c 100644 --- a/platform/frontend_connectors/schema_search_after_transformer_test.go +++ b/platform/frontend_connectors/schema_search_after_transformer_test.go @@ -3,6 +3,7 @@ package frontend_connectors import ( + "context" "fmt" "github.com/QuesmaOrg/quesma/platform/clickhouse" "github.com/QuesmaOrg/quesma/platform/config" @@ -157,7 +158,7 @@ func Test_applySearchAfterParameter(t *testing.T) { tc.transformedQueryExpected.SearchAfter = tc.searchAfter transformer := NewSchemaCheckPass(&config.QuesmaConfiguration{IndexConfig: indexConfig}, tableDiscovery, strategy) - actual, err := transformer.applySearchAfterParameter(Schema, tc.query) + actual, err := transformer.applySearchAfterParameter(context.Background(), Schema, tc.query) assert.Equal(t, tc.errorExpected, err != nil, "Expected error: %v, got: %v", tc.errorExpected, err) if err == nil { assert.Equal(t, diff --git a/platform/frontend_connectors/schema_transformer.go b/platform/frontend_connectors/schema_transformer.go index 3dbeebfb4..c317e647a 100644 --- a/platform/frontend_connectors/schema_transformer.go +++ b/platform/frontend_connectors/schema_transformer.go @@ -3,6 +3,7 @@ package frontend_connectors import ( + "context" "fmt" "github.com/QuesmaOrg/quesma/platform/clickhouse" "github.com/QuesmaOrg/quesma/platform/common_table" @@ -12,6 +13,9 @@ import ( "github.com/QuesmaOrg/quesma/platform/model/typical_queries" "github.com/QuesmaOrg/quesma/platform/schema" "github.com/QuesmaOrg/quesma/platform/transformations" + "github.com/QuesmaOrg/quesma/platform/util" + "github.com/k0kubun/pp" + "slices" "sort" "strings" ) @@ -43,7 +47,7 @@ func (s *SchemaCheckPass) isFieldMapSyntaxEnabled(query *model.Query) bool { return enabled } -func (s *SchemaCheckPass) applyBooleanLiteralLowering(index schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyBooleanLiteralLowering(ctx context.Context, index schema.Schema, query *model.Query) (*model.Query, error) { visitor := model.NewBaseVisitor() @@ -79,7 +83,7 @@ func (s *SchemaCheckPass) applyBooleanLiteralLowering(index schema.Schema, query // SELECT * FROM "kibana_sample_data_logs" WHERE isIPAddressInRange(CAST(COALESCE(lhs,'0.0.0.0') AS "String"),rhs) - COALESCE is used to handle NULL values // // e.g.: isIPAddressInRange(CAST(COALESCE(IP_ADDR_COLUMN_NAME,'0.0.0.0') AS "String"),'10.10.10.0/24') -func (s *SchemaCheckPass) applyIpTransformations(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyIpTransformations(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { fromTable := query.TableName @@ -125,7 +129,7 @@ func (s *SchemaCheckPass) applyIpTransformations(indexSchema schema.Schema, quer field, found := indexSchema.ResolveFieldByInternalName(lhsValue) if !found { - logger.Error().Msgf("Field %s not found in schema for table %s, should never happen here", lhsValue, fromTable) + logger.ErrorWithCtx(ctx).Msgf("Field %s not found in schema for table %s, should never happen here", lhsValue, fromTable) } if !field.Type.Equal(schema.QuesmaTypeIp) { return model.NewInfixExpr(lhs.(model.Expr), e.Op, rhs.(model.Expr)) @@ -177,7 +181,7 @@ func (s *SchemaCheckPass) applyIpTransformations(indexSchema schema.Schema, quer return query, nil } -func (s *SchemaCheckPass) applyGeoTransformations(schemaInstance schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyGeoTransformations(ctx context.Context, schemaInstance schema.Schema, query *model.Query) (*model.Query, error) { replace := make(map[string]model.Expr) @@ -296,7 +300,7 @@ func (s *SchemaCheckPass) applyGeoTransformations(schemaInstance schema.Schema, return query, nil } -func (s *SchemaCheckPass) applyArrayTransformations(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyArrayTransformations(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { arrayTypeResolver := arrayTypeResolver{indexSchema: indexSchema} @@ -339,7 +343,7 @@ func (s *SchemaCheckPass) applyArrayTransformations(indexSchema schema.Schema, q return query, nil } -func (s *SchemaCheckPass) applyMapTransformations(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyMapTransformations(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { mapResolver := mapTypeResolver{indexSchema: indexSchema} @@ -396,7 +400,7 @@ func (s *SchemaCheckPass) computeListIndexPrefixesToGroup() []string { return groupIndexesPrefix } -func (s *SchemaCheckPass) applyPhysicalFromExpression(currentSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyPhysicalFromExpression(ctx context.Context, currentSchema schema.Schema, query *model.Query) (*model.Query, error) { if query.TableName == model.SingleTableNamePlaceHolder { logger.Warn().Msg("applyPhysicalFromExpression: physical table name is not set") @@ -527,7 +531,7 @@ func (s *SchemaCheckPass) applyPhysicalFromExpression(currentSchema schema.Schem } -func (s *SchemaCheckPass) applyWildcardExpansion(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyWildcardExpansion(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { var newColumns []model.Expr var hasWildcard bool @@ -579,7 +583,7 @@ func (s *SchemaCheckPass) applyWildcardExpansion(indexSchema schema.Schema, quer return query, nil } -func (s *SchemaCheckPass) applyFullTextField(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyFullTextField(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { var fullTextFields []string @@ -648,7 +652,7 @@ func (s *SchemaCheckPass) applyFullTextField(indexSchema schema.Schema, query *m } -func (s *SchemaCheckPass) applyTimestampField(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyTimestampField(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { var timestampColumnName string @@ -718,7 +722,273 @@ func (s *SchemaCheckPass) applyTimestampField(indexSchema schema.Schema, query * } -func (s *SchemaCheckPass) applyFieldEncoding(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyTimestampFieldd(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { + table, ok := s.tableDiscovery.TableDefinitions().Load(query.TableName) + if !ok { + logger.WarnWithCtx(ctx).Msgf("table %s not found", query.TableName) + return query, nil + } + + type scopeType = int + const ( + datetime scopeType = iota + datetime64 + none + ) + scope := none + + visitor := model.NewBaseVisitor() + + // we look for: (timestamp_field OP fromUnixTimestamp) + visitor.OverrideVisitInfix = func(b *model.BaseExprVisitor, e model.InfixExpr) interface{} { + visitChildren := func() model.InfixExpr { + return model.NewInfixExpr(e.Left.Accept(b).(model.Expr), e.Op, e.Right.Accept(b).(model.Expr)) + } + + fmt.Println("KK start 1", e) + + // check if timestamp_field is ok + colRef, ok := e.Left.(model.ColumnRef) + fmt.Println("KK start 2", colRef, ok) + if !ok { + return visitChildren() + } + field, ok := indexSchema.ResolveField(colRef.ColumnName) + fmt.Println("KK start 3", field, ok) + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %s not found in schema for table %s", colRef.ColumnName, query.TableName) + return visitChildren() + } + col, ok := table.Cols[field.InternalPropertyName.AsString()] + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %s not found in table %s", field.InternalPropertyName.AsString(), query.TableName) + return visitChildren() + } + fmt.Println("KK start 3", e, col, ok) + isDatetime := col.IsDatetime() + isDateTime64 := col.IsDatetime64() + fmt.Println("KK start 4", isDatetime, isDateTime64) + if !isDatetime && !isDateTime64 { + return visitChildren() + } + + // check if operator is ok + op := strings.TrimSpace(e.Op) + fmt.Println("KK start 5", op) + if !slices.Contains([]string{"=", "!=", ">", "<", ">=", "<=", "/"}, op) { + return visitChildren() + } + + // check if right side is a function we want + tsFunc, ok := e.Right.(model.FunctionExpr) + if !ok { + fmt.Println("koniec") + return visitChildren() + } + if tsFunc.Name != model.FromUnixTimestampMs && tsFunc.Name != model.ToUnixTimestampMs { + //fmt.Println("wtf, name:", tsFunc.Name) + return visitChildren() + } + if len(tsFunc.Args) != 1 { + logger.WarnWithCtx(ctx).Msgf("invalid number of arguments for %s function", tsFunc.Name) + return visitChildren() + } + + arg := tsFunc.Args[0].Accept(b).(model.Expr) + pp.Println("KK 74 ARG", tsFunc.Args[0], arg) + if isDateTime64 { + clickhouseFunc := model.ClickhouseFromUnixTimestampMsToDatetime64Function + return model.NewInfixExpr(colRef, e.Op, model.NewFunction(clickhouseFunc, arg)) + } else if isDatetime { + fmt.Println("KK 79l", arg) + tsAny, isLiteral := arg.(model.LiteralExpr) + if !isLiteral { + logger.WarnWithCtx(ctx).Msgf("invalid argument for %s function: %v. isn't literal, but %T", tsFunc.Name, arg, arg) + return visitChildren() + } + ts, isNumber := util.ExtractNumeric64Maybe(tsAny.Value) + if !isNumber { + logger.WarnWithCtx(ctx).Msgf("invalid argument for %s function: %v. isn't integer, but %T", tsFunc.Name, arg, arg) + return visitChildren() + } + + clickhouseFunc := model.ClickhouseFromUnixTimestampMsToDatetimeFunction + return model.NewInfixExpr(colRef, e.Op, model.NewFunction(clickhouseFunc, model.NewLiteral(int64(ts/1000)))) + } + + return visitChildren() // unreachable + } + + // we look for: toUnixTimestamp(timestamp_field) or fromUnixTimestamp(TimeLiteral) + visitor.OverrideVisitFunction = func(b *model.BaseExprVisitor, e model.FunctionExpr) interface{} { + visitChildren := func() model.FunctionExpr { + return model.NewFunction(e.Name, b.VisitChildren(e.Args)...) + } + + scopeBefore := scope + defer func() { scope = scopeBefore }() + + toUnix := func() interface{} { + if len(e.Args) != 1 { + logger.WarnWithCtx(ctx).Msgf("invalid number of arguments for %s function", e.Name) + return visitChildren() + } + colRef, ok := e.Args[0].(model.ColumnRef) + fmt.Printf("KK colref %v ok %v\n", colRef, ok) + if !ok { + if f, ok := e.Args[0].(model.FunctionExpr); ok && strings.ToLower(f.Name) == "coalesce" && len(f.Args) > 1 { + colRef, ok = f.Args[0].(model.ColumnRef) + if !ok { + logger.WarnWithCtx(ctx).Msgf("invalid argument for %s function: %v. isn't column reference, but %T", e.Name, f.Args[0], f.Args[0]) + return visitChildren() + } + } + } + fmt.Println("KK f start 2", e, colRef) + field, ok := indexSchema.ResolveField(colRef.ColumnName) + fmt.Println("KK f start 2.5", field, ok) + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %s not found in schema for table %s", colRef.ColumnName, query.TableName) + return visitChildren() + } + col, ok := table.Cols[field.InternalPropertyName.AsString()] + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %s not found in table %s", field.InternalPropertyName.AsString(), query.TableName) + return visitChildren() + } + isDatetime := col.IsDatetime() + isDateTime64 := col.IsDatetime64() + fmt.Println("KK f start 3", e, isDatetime, isDateTime64) + if !isDatetime && !isDateTime64 { + return visitChildren() + } + + var clickhouseFunc string + if isDateTime64 { + scope = datetime64 + clickhouseFunc = model.ClickhouseToUnixTimestampMsFromDatetime64Function + } else if isDatetime { + scope = datetime + clickhouseFunc = model.ClickhouseToUnixTimestampMsFromDatetimeFunction + } + + return model.NewFunction(clickhouseFunc, b.VisitChildren(e.Args)...) + } + + fromUnix := func() interface{} { + if len(e.Args) != 1 { + logger.WarnWithCtx(ctx).Msgf("invalid number of arguments for %s function", e.Name) + return visitChildren() + } + + children := b.VisitChildren(e.Args) + var clickhouseFunc string + switch scope { + case datetime: + clickhouseFunc = model.ClickhouseFromUnixTimestampMsToDatetimeFunction + default: + pp.Println("Children", children) + /*threeDigitsOfPrecisionSuffice := utcTs.UnixNano()%1_000_000 == 0 + if threeDigitsOfPrecisionSuffice { + return model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(utcTs.UnixMilli())), true + } else { + return model.NewFunction( + "toDateTime64", + model.NewInfixExpr( + model.NewLiteral(utcTs.UnixNano()), + "/", + model.NewLiteral(1_000_000_000), + ), + model.NewLiteral(9), + ), true + }*/ + clickhouseFunc = model.ClickhouseFromUnixTimestampMsToDatetime64Function + } + + return model.NewFunction(clickhouseFunc, b.VisitChildren(e.Args)...) + } + + switch e.Name { + case model.ToUnixTimestampMs: + fmt.Println("KK f START ToUnix", e) + return toUnix() + case model.FromUnixTimestampMs: + fmt.Println("KK f START FromUnix", e) + return fromUnix() + default: + fmt.Println("wtf, name:", e.Name) + return visitChildren() + } + } + + // we look for: DurationLiteral/TimeLiteral + visitor.OverrideVisitLiteral = func(b *model.BaseExprVisitor, l model.LiteralExpr) interface{} { + pp.Println("visitor literal", l) + if timeL, ok := l.Value.(model.TimeLiteral); ok { + ts := timeL.Value + fmt.Println("eee", ts, scope) + switch scope { + case datetime: + return model.NewLiteral(ts.Unix()) + default: + threeDigitsOfPrecisionSuffice := ts.UnixNano()%1_000_000 == 0 + fmt.Println("three?", threeDigitsOfPrecisionSuffice) + if threeDigitsOfPrecisionSuffice { + return model.NewLiteral(ts.UnixMilli()) + } else { + return model.NewFunction( + "toDateTime64", + model.NewInfixExpr( + model.NewLiteral(ts.UnixNano()), + "/", + model.NewLiteral(1_000_000_000), + ), + model.NewLiteral(9), + ) + } + } + } else { + fmt.Println(l.Value) + } + + msLiteral, ok := l.Value.(model.DurationLiteral) + if !ok { + return l.Clone() + } + + fmt.Println("LOL", msLiteral) + + field, ok := indexSchema.ResolveField(msLiteral.TimestampField.ColumnName) + fmt.Println("1 LOL", msLiteral, field, ok) + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %v not found in schema for table %s", msLiteral.TimestampField, query.TableName) + return l.Clone() + } + col, ok := table.Cols[field.InternalPropertyName.AsString()] + fmt.Println("1LOL", msLiteral, col) + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %s not found in table %s", field.InternalPropertyName.AsString(), query.TableName) + return l.Clone() + } + + fmt.Println("2LOL", msLiteral, col.IsDatetime()) + + if col.IsDatetime() { + return model.NewLiteral(msLiteral.Value.Milliseconds() / 1000) + } + return model.NewLiteral(msLiteral.Value.Milliseconds()) + } + + expr := query.SelectCommand.Accept(visitor) + if _, ok := expr.(*model.SelectCommand); ok { + query.SelectCommand = *expr.(*model.SelectCommand) + } + + return query, nil +} + +func (s *SchemaCheckPass) applyFieldEncoding(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { + table, ok := s.tableDiscovery.TableDefinitions().Load(query.TableName) if !ok { return nil, fmt.Errorf("table %s not found", query.TableName) @@ -737,7 +1007,7 @@ func (s *SchemaCheckPass) applyFieldEncoding(indexSchema schema.Schema, query *m // This is workaround. // Our query parse resolves columns sometimes. Here we detect it and skip the resolution. if _, ok := indexSchema.ResolveFieldByInternalName(e.ColumnName); ok { - logger.Debug().Msgf("Got field already resolved %s", e.ColumnName) // Reduced to debug as it was really noisy + logger.DebugWithCtx(ctx).Msgf("Got field already resolved %s", e.ColumnName) // Reduced to debug as it was really noisy return e } @@ -815,7 +1085,7 @@ func (s *SchemaCheckPass) applyFieldEncoding(indexSchema schema.Schema, query *m return query, nil } -func (s *SchemaCheckPass) applyRuntimeMappings(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyRuntimeMappings(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { if query.RuntimeMappings == nil { return query, nil @@ -852,7 +1122,7 @@ func (s *SchemaCheckPass) applyRuntimeMappings(indexSchema schema.Schema, query } // it convers out internal date time related fuction to clickhouse functions -func (s *SchemaCheckPass) convertQueryDateTimeFunctionToClickhouse(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) convertQueryDateTimeFunctionToClickhouse(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { visitor := model.NewBaseVisitor() @@ -883,7 +1153,7 @@ func (s *SchemaCheckPass) convertQueryDateTimeFunctionToClickhouse(indexSchema s } -func (s *SchemaCheckPass) checkAggOverUnsupportedType(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) checkAggOverUnsupportedType(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { aggFunctionPrefixes := []string{"sum", "avg", "quantiles"} @@ -902,7 +1172,7 @@ func (s *SchemaCheckPass) checkAggOverUnsupportedType(indexSchema schema.Schema, if col, ok := indexSchema.ResolveFieldByInternalName(columnRef.ColumnName); ok { for _, dbTypePrefix := range dbTypePrefixes { if strings.HasPrefix(col.InternalPropertyType, dbTypePrefix) { - logger.Warn().Msgf("Aggregation '%s' over unsupported type '%s' in column '%s'", e.Name, dbTypePrefix, col.InternalPropertyName.AsString()) + logger.WarnWithCtx(ctx).Msgf("Aggregation '%s' over unsupported type '%s' in column '%s'", e.Name, dbTypePrefix, col.InternalPropertyName.AsString()) args := b.VisitChildren(e.Args) args[0] = model.NewLiteral("NULL") return model.NewFunction(e.Name, args...) @@ -913,7 +1183,7 @@ func (s *SchemaCheckPass) checkAggOverUnsupportedType(indexSchema schema.Schema, // attributes values are always string, if access, ok := e.Args[0].(model.ArrayAccess); ok { if access.ColumnRef.ColumnName == clickhouse.AttributesValuesColumn { - logger.Warn().Msgf("Unsupported case. Aggregation '%s' over attribute named: '%s'", e.Name, access.Index) + logger.WarnWithCtx(ctx).Msgf("Unsupported case. Aggregation '%s' over attribute named: '%s'", e.Name, access.Index) args := b.VisitChildren(e.Args) args[0] = model.NewLiteral("NULL") return model.NewFunction(e.Name, args...) @@ -969,25 +1239,26 @@ func columnsToAliasedColumns(columns []model.Expr) []model.Expr { return aliasedColumns } -func (s *SchemaCheckPass) applyAliasColumns(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyAliasColumns(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { query.SelectCommand.Columns = columnsToAliasedColumns(query.SelectCommand.Columns) return query, nil } -func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, error) { +func (s *SchemaCheckPass) Transform(ctx context.Context, queries []*model.Query) ([]*model.Query, error) { transformationChain := []struct { TransformationName string - Transformation func(schema.Schema, *model.Query) (*model.Query, error) + Transformation func(context.Context, schema.Schema, *model.Query) (*model.Query, error) }{ // Section 1: from logical to physical {TransformationName: "PhysicalFromExpressionTransformation", Transformation: s.applyPhysicalFromExpression}, {TransformationName: "WildcardExpansion", Transformation: s.applyWildcardExpansion}, {TransformationName: "RuntimeMappings", Transformation: s.applyRuntimeMappings}, - {TransformationName: "AllNecessaryCommonTransformations", Transformation: func(schema schema.Schema, query *model.Query) (*model.Query, error) { - return transformations.ApplyAllNecessaryCommonTransformations(query, schema, s.cfg.MapFieldsDiscoveringEnabled) + {TransformationName: "AllNecessaryCommonTransformations", Transformation: func(ctx context.Context, schema schema.Schema, query *model.Query) (*model.Query, error) { + return transformations.ApplyAllNecessaryCommonTransformations(ctx, query, schema, s.cfg.MapFieldsDiscoveringEnabled) }}, {TransformationName: "AliasColumnsTransformation", Transformation: s.applyAliasColumns}, + {TransformationName: "UnixTimestampToDateTimeTransformation", Transformation: s.applyTimestampFieldd}, // Section 2: generic schema based transformations // @@ -1029,7 +1300,7 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err inputQuery = query.SelectCommand.String() } - query, err = transformation.Transformation(query.Schema, query) + query, err = transformation.Transformation(ctx, query.Schema, query) if err != nil { return nil, err } @@ -1044,11 +1315,14 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err } queries[k] = query + + //pp.Println("KK 1", query.SelectCommand) + //fmt.Println("KK 2", model.AsString(query.SelectCommand)) } return queries, nil } -func (s *SchemaCheckPass) applyMatchOperator(indexSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyMatchOperator(ctx context.Context, indexSchema schema.Schema, query *model.Query) (*model.Query, error) { visitor := model.NewBaseVisitor() @@ -1201,7 +1475,7 @@ func (s *SchemaCheckPass) applyMatchOperator(indexSchema schema.Schema, query *m } // applyFromClusterExpression transforms query so that `FROM table` becomes `FROM cluster(clusterName,table)` if applicable -func (s *SchemaCheckPass) applyFromClusterExpression(currentSchema schema.Schema, query *model.Query) (*model.Query, error) { +func (s *SchemaCheckPass) applyFromClusterExpression(ctx context.Context, currentSchema schema.Schema, query *model.Query) (*model.Query, error) { if s.cfg.ClusterName == "" { return query, nil } diff --git a/platform/frontend_connectors/schema_transformer_test.go b/platform/frontend_connectors/schema_transformer_test.go index b16773a98..cd0bc406a 100644 --- a/platform/frontend_connectors/schema_transformer_test.go +++ b/platform/frontend_connectors/schema_transformer_test.go @@ -3,6 +3,7 @@ package frontend_connectors import ( + "context" "github.com/QuesmaOrg/quesma/platform/clickhouse" "github.com/QuesmaOrg/quesma/platform/common_table" "github.com/QuesmaOrg/quesma/platform/config" @@ -103,7 +104,7 @@ func TestApplyTimestampField(t *testing.T) { tt.query.Schema = indexSchema tt.query.Indexes = []string{tt.query.TableName} - actual, err := transform.applyTimestampField(indexSchema, tt.query) + actual, err := transform.applyTimestampField(context.Background(), indexSchema, tt.query) assert.NoError(t, err) assert.Equal(t, model.AsString(tt.expected.SelectCommand), model.AsString(actual.SelectCommand)) @@ -500,7 +501,7 @@ func Test_ipRangeTransform(t *testing.T) { q.Indexes = []string{q.TableName} } - resultQueries, err := transform.Transform(queries[k]) + resultQueries, err := transform.Transform(context.Background(), queries[k]) assert.NoError(t, err) assert.Equal(t, expectedQueries[k].SelectCommand.String(), resultQueries[0].SelectCommand.String()) }) @@ -739,7 +740,7 @@ func Test_arrayType(t *testing.T) { t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) { tt.query.Schema = indexSchema tt.query.Indexes = []string{tt.query.TableName} - actual, err := transform.Transform([]*model.Query{tt.query}) + actual, err := transform.Transform(context.Background(), []*model.Query{tt.query}) assert.NoError(t, err) if err != nil { @@ -814,7 +815,7 @@ func TestApplyWildCard(t *testing.T) { }, } - actual, err := transform.applyWildcardExpansion(indexSchema, query) + actual, err := transform.applyWildcardExpansion(context.Background(), indexSchema, query) if err != nil { t.Fatal(err) @@ -1063,7 +1064,7 @@ func TestApplyPhysicalFromExpression(t *testing.T) { expectedAsString := model.AsString(tt.expected) - actual, err := transform.applyPhysicalFromExpression(indexSchema, query) + actual, err := transform.applyPhysicalFromExpression(context.Background(), indexSchema, query) if err != nil { t.Fatal(err) @@ -1212,7 +1213,7 @@ func TestFullTextFields(t *testing.T) { expectedAsString := model.AsString(tt.expected) - actual, err := transform.applyFullTextField(indexSchema, query) + actual, err := transform.applyFullTextField(context.Background(), indexSchema, query) if err != nil { t.Fatal(err) @@ -1519,7 +1520,7 @@ func Test_applyMatchOperator(t *testing.T) { InternalPropertyType: "String", Type: schema.QuesmaTypeKeyword, } - actual, err := transform.applyMatchOperator(indexSchema, tt.query) + actual, err := transform.applyMatchOperator(context.Background(), indexSchema, tt.query) if err != nil { t.Fatal(err) } @@ -1621,7 +1622,7 @@ func Test_checkAggOverUnsupportedType(t *testing.T) { t.Fatal("schema not found") } - actual, err := transform.checkAggOverUnsupportedType(indexSchema, tt.query) + actual, err := transform.checkAggOverUnsupportedType(context.Background(), indexSchema, tt.query) if err != nil { t.Fatal(err) } @@ -1802,12 +1803,13 @@ func Test_mapKeys(t *testing.T) { t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) { tt.query.Schema = indexSchema tt.query.Indexes = []string{tt.query.TableName} + var actual []*model.Query var err error if indexConfig[tt.query.TableName].EnableFieldMapSyntax { - actual, err = transformPass.Transform([]*model.Query{tt.query}) + actual, err = transformPass.Transform(context.Background(), []*model.Query{tt.query}) } else { - actual, err = noTransformPass.Transform([]*model.Query{tt.query}) + actual, err = noTransformPass.Transform(context.Background(), []*model.Query{tt.query}) } assert.NoError(t, err) @@ -1895,7 +1897,7 @@ func Test_cluster(t *testing.T) { t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) { tt.query.Schema = indexSchema tt.query.Indexes = []string{tt.query.TableName} - actual, err := transform.Transform([]*model.Query{tt.query}) + actual, err := transform.Transform(context.Background(), []*model.Query{tt.query}) assert.NoError(t, err) if err != nil { diff --git a/platform/frontend_connectors/search.go b/platform/frontend_connectors/search.go index 586797463..a3c7c907d 100644 --- a/platform/frontend_connectors/search.go +++ b/platform/frontend_connectors/search.go @@ -362,9 +362,9 @@ type AsyncQuery struct { startTime time.Time } -func (q *QueryRunner) transformQueries(plan *model.ExecutionPlan) error { +func (q *QueryRunner) transformQueries(ctx context.Context, plan *model.ExecutionPlan) error { var err error - plan.Queries, err = q.transformationPipeline.Transform(plan.Queries) + plan.Queries, err = q.transformationPipeline.Transform(ctx, plan.Queries) if err != nil { return fmt.Errorf("error transforming queries: %v", err) } @@ -516,7 +516,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin logger.ErrorWithCtxAndReason(ctx, "Quesma generated invalid SQL query").Msg(queriesBodyConcat) goto logErrorAndReturn } - err = q.transformQueries(plan) + err = q.transformQueries(ctx, plan) if err != nil { goto logErrorAndReturn } diff --git a/platform/frontend_connectors/search_common_table_test.go b/platform/frontend_connectors/search_common_table_test.go index 6db23bcf2..d9539a820 100644 --- a/platform/frontend_connectors/search_common_table_test.go +++ b/platform/frontend_connectors/search_common_table_test.go @@ -185,7 +185,12 @@ func TestSearchCommonTable(t *testing.T) { } }`, WantedSql: []string{ - `SELECT countIf("@timestamp"=toInt64(toUnixTimestamp(toStartOfDay(subDate(now(), INTERVAL 3 week)))) AND "@timestamp"=toInt64(toUnixTimestamp('2024-04-14'))) AS "range_2__aggr__2__count" FROM quesma_common_table WHERE ("__quesma_index_name"='logs-1' OR "__quesma_index_name"='logs-2') -- optimizations: pancake(half)`, + `SELECT countIf("@timestamp"=toStartOfDay(subDate(now(),INTERVAL 3 week)) AND "@timestamp"=fromUnixTimestamp64Milli(1713052800000)) AS "range_2__aggr__2__count" ` + + `FROM quesma_common_table ` + + `WHERE ("__quesma_index_name"='logs-1' OR "__quesma_index_name"='logs-2') +-- optimizations: pancake(half)`, `SELECT "@timestamp", "message", "__quesma_index_name" FROM quesma_common_table WHERE ("__quesma_index_name"='logs-1' OR "__quesma_index_name"='logs-2') LIMIT 10`, }, // we need to return some rows, otherwise pancakes will fail @@ -240,8 +245,8 @@ func TestSearchCommonTable(t *testing.T) { tableMap.Store("logs-1", &clickhouse.Table{ Name: "logs-1", Cols: map[string]*clickhouse.Column{ - "@timestamp": {Name: "@timestamp"}, - "message": {Name: "message"}, + "@timestamp": {Name: "@timestamp", Type: clickhouse.NewBaseType("DateTime64")}, + "message": {Name: "message", Type: clickhouse.NewBaseType("String")}, }, VirtualTable: true, }) @@ -256,8 +261,8 @@ func TestSearchCommonTable(t *testing.T) { tableMap.Store("logs-2", &clickhouse.Table{ Name: "logs-2", Cols: map[string]*clickhouse.Column{ - "@timestamp": {Name: "@timestamp"}, - "message": {Name: "message"}, + "@timestamp": {Name: "@timestamp", Type: clickhouse.NewBaseType("DateTime64")}, + "message": {Name: "message", Type: clickhouse.NewBaseType("String")}, }, VirtualTable: true, }) @@ -272,8 +277,8 @@ func TestSearchCommonTable(t *testing.T) { tableMap.Store("logs-3", &clickhouse.Table{ Name: "logs-3", Cols: map[string]*clickhouse.Column{ - "@timestamp": {Name: "@timestamp"}, - "message": {Name: "message"}, + "@timestamp": {Name: "@timestamp", Type: clickhouse.NewBaseType("DateTime64")}, + "message": {Name: "message", Type: clickhouse.NewBaseType("String")}, }, VirtualTable: false, }) @@ -289,8 +294,8 @@ func TestSearchCommonTable(t *testing.T) { tableMap.Store(common_table.TableName, &clickhouse.Table{ Name: common_table.TableName, Cols: map[string]*clickhouse.Column{ - "@timestamp": {Name: "@timestamp"}, - "message": {Name: "message"}, + "@timestamp": {Name: "@timestamp", Type: clickhouse.NewBaseType("DateTime64")}, + "message": {Name: "message", Type: clickhouse.NewBaseType("String")}, common_table.IndexNameColumn: {Name: common_table.IndexNameColumn}, }, }) @@ -389,6 +394,8 @@ func TestSearchCommonTable(t *testing.T) { t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) } + // You can replace above with this when debugging - much better errors + // db, mock := util.InitSqlMockWithPrettyPrint(t, true) defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, tableMap) diff --git a/platform/frontend_connectors/search_opensearch_test.go b/platform/frontend_connectors/search_opensearch_test.go index 54565d605..78cbc90a0 100644 --- a/platform/frontend_connectors/search_opensearch_test.go +++ b/platform/frontend_connectors/search_opensearch_test.go @@ -11,6 +11,7 @@ import ( "github.com/QuesmaOrg/quesma/platform/parsers/elastic_query_dsl" "github.com/QuesmaOrg/quesma/platform/schema" "github.com/QuesmaOrg/quesma/platform/testdata" + transformations_delete "github.com/QuesmaOrg/quesma/platform/transformations-delete" "github.com/QuesmaOrg/quesma/platform/types" "github.com/QuesmaOrg/quesma/platform/util" "github.com/stretchr/testify/assert" @@ -52,9 +53,15 @@ func TestSearchOpensearch(t *testing.T) { body, parseErr := types.ParseJSON(tt.QueryJson) assert.NoError(t, parseErr) plan, err := cw.ParseQuery(body) - queries := plan.Queries assert.NoError(t, err, "no ParseQuery error") + + queries := plan.Queries + for j, query := range queries { + queries[j], err = transformations_delete.ApplyNecessaryTransformations(context.Background(), query, &table, s.Tables[tableName]) + assert.NoError(t, err) + } assert.True(t, len(queries) > 0, "len queries > 0") + whereClause := model.AsString(queries[0].SelectCommand.WhereClause) assert.Contains(t, tt.WantedSql, whereClause, "contains wanted sql") diff --git a/platform/frontend_connectors/search_test.go b/platform/frontend_connectors/search_test.go index 26774771c..d86ab52e8 100644 --- a/platform/frontend_connectors/search_test.go +++ b/platform/frontend_connectors/search_test.go @@ -193,6 +193,10 @@ func TestSearchHandler(t *testing.T) { Name: "@timestamp", Type: clickhouse.NewBaseType("DateTime64"), }, + "tsPrec9": { + Name: "@tsPrec9", + Type: clickhouse.NewBaseType("DateTime64"), + }, "user_id": { Name: "user_id", Type: clickhouse.NewBaseType("String"), @@ -268,6 +272,7 @@ func TestSearchHandler(t *testing.T) { "type": {PropertyName: "type", InternalPropertyName: "type", Type: schema.QuesmaTypeKeyword}, "task.enabled": {PropertyName: "task.enabled", InternalPropertyName: "task_enabled", Type: schema.QuesmaTypeBoolean}, "@timestamp": {PropertyName: "@timestamp", InternalPropertyName: "@timestamp", Type: schema.QuesmaTypeDate}, + "tsPrec9": {PropertyName: "tsPrec9", InternalPropertyName: "tsPrec9", Type: schema.QuesmaTypeDate}, "user.id": {PropertyName: "user.id", InternalPropertyName: "user_id", Type: schema.QuesmaTypeKeyword}, "tags": {PropertyName: "tags", InternalPropertyName: "tags", Type: schema.QuesmaTypeKeyword}, "age": {PropertyName: "age", InternalPropertyName: "age", Type: schema.QuesmaTypeInteger}, @@ -303,6 +308,9 @@ func TestSearchHandler(t *testing.T) { for i, tt := range testdata.TestsSearch { t.Run(util.PrettyTestName(tt.Name, i), func(t *testing.T) { + if i != 49 { + t.Skip() + } var conn *sql.DB var mock sqlmock.Sqlmock if len(tt.WantedRegexes) > 0 { @@ -552,6 +560,7 @@ func TestHandlingDateTimeFields(t *testing.T) { Cols: map[string]*clickhouse.Column{ "timestamp": {Name: "timestamp", Type: clickhouse.NewBaseType("DateTime")}, "timestamp64": {Name: "timestamp64", Type: clickhouse.NewBaseType("DateTime64")}, + "@timestamp": {Name: "@timestamp", Type: clickhouse.NewBaseType("DateTime64")}, }, } query := func(fieldName string) string { @@ -586,40 +595,44 @@ func TestHandlingDateTimeFields(t *testing.T) { }` } expectedSelectStatement := map[string]string{ - dateTimeTimestampField: `SELECT toInt64(toUnixTimestamp("timestamp") / 60) AS "aggr__0__key_0", - count(*) AS "aggr__0__count" - FROM __quesma_table_name - WHERE ((("timestamp64">=fromUnixTimestamp64Milli(1706542596491) AND - "timestamp64"<=fromUnixTimestamp64Milli(1706551896491)) AND ("timestamp">= - fromUnixTimestamp(1706542596) AND "timestamp"<=fromUnixTimestamp(1706551896))) - AND NOT (("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND - "@timestamp"<=fromUnixTimestamp64Milli(1706551896491)))) - GROUP BY toInt64(toUnixTimestamp("timestamp") / 60) AS "aggr__0__key_0" - ORDER BY "aggr__0__key_0" ASC`, - dateTime64TimestampField: `SELECT toInt64(toUnixTimestamp64Milli("timestamp64") / 60000) AS - "aggr__0__key_0", count(*) AS "aggr__0__count" - FROM __quesma_table_name - WHERE ((("timestamp64">=fromUnixTimestamp64Milli(1706542596491) AND - "timestamp64"<=fromUnixTimestamp64Milli(1706551896491)) AND ("timestamp">= - fromUnixTimestamp(1706542596) AND "timestamp"<=fromUnixTimestamp(1706551896))) - AND NOT (("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND - "@timestamp"<=fromUnixTimestamp64Milli(1706551896491)))) - GROUP BY toInt64(toUnixTimestamp64Milli("timestamp64") / 60000) AS - "aggr__0__key_0" - ORDER BY "aggr__0__key_0" ASC`, - dateTime64OurTimestampField: `SELECT toInt64(toUnixTimestamp64Milli("@timestamp") / 60000) AS "aggr__0__key_0" - , count(*) AS "aggr__0__count" - FROM __quesma_table_name - WHERE ((("timestamp64">=fromUnixTimestamp64Milli(1706542596491) AND - "timestamp64"<=fromUnixTimestamp64Milli(1706551896491)) AND ("timestamp">= - fromUnixTimestamp(1706542596) AND "timestamp"<=fromUnixTimestamp(1706551896))) - AND NOT (("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND - "@timestamp"<=fromUnixTimestamp64Milli(1706551896491)))) - GROUP BY toInt64(toUnixTimestamp64Milli("@timestamp") / 60000) AS - "aggr__0__key_0" - ORDER BY "aggr__0__key_0" ASC`, + dateTimeTimestampField: ` + SELECT toInt64(toUnixTimestamp("timestamp") / 60) AS "aggr__0__key_0", + count(*) AS "aggr__0__count" + FROM __quesma_table_name + WHERE ((("timestamp64">=fromUnixTimestamp64Milli(1706542596491) AND + "timestamp64"<=fromUnixTimestamp64Milli(1706551896491)) AND ("timestamp">= + fromUnixTimestamp(1706542596) AND "timestamp"<=fromUnixTimestamp(1706551896))) + AND NOT (("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND + "@timestamp"<=fromUnixTimestamp64Milli(1706551896491)))) + GROUP BY toInt64(toUnixTimestamp("timestamp") / 60) AS "aggr__0__key_0" + ORDER BY "aggr__0__key_0" ASC`, + dateTime64TimestampField: ` + SELECT toInt64(toUnixTimestamp64Milli("timestamp64") / 60000) AS + "aggr__0__key_0", count(*) AS "aggr__0__count" + FROM __quesma_table_name + WHERE ((("timestamp64">=fromUnixTimestamp64Milli(1706542596491) AND + "timestamp64"<=fromUnixTimestamp64Milli(1706551896491)) AND ("timestamp">= + fromUnixTimestamp(1706542596) AND "timestamp"<=fromUnixTimestamp(1706551896))) + AND NOT (("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND + "@timestamp"<=fromUnixTimestamp64Milli(1706551896491)))) + GROUP BY toInt64(toUnixTimestamp64Milli("timestamp64") / 60000) AS + "aggr__0__key_0" + ORDER BY "aggr__0__key_0" ASC`, + dateTime64OurTimestampField: ` + SELECT toInt64(toUnixTimestamp64Milli("@timestamp") / 60000) AS "aggr__0__key_0" + , count(*) AS "aggr__0__count" + FROM __quesma_table_name + WHERE ((("timestamp64">=fromUnixTimestamp64Milli(1706542596491) AND + "timestamp64"<=fromUnixTimestamp64Milli(1706551896491)) AND ("timestamp">= + fromUnixTimestamp(1706542596) AND "timestamp"<=fromUnixTimestamp(1706551896))) + AND NOT (("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND + "@timestamp"<=fromUnixTimestamp64Milli(1706551896491)))) + GROUP BY toInt64(toUnixTimestamp64Milli("@timestamp") / 60000) AS + "aggr__0__key_0" + ORDER BY "aggr__0__key_0" ASC`, } + // logger.InitSimpleLoggerForTestsWarnLevel() conn, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false) defer conn.Close() db := backend_connectors.NewClickHouseBackendConnectorWithConnection("", conn) diff --git a/platform/frontend_connectors/terms_enum.go b/platform/frontend_connectors/terms_enum.go index 26ce4d123..f596bc86f 100644 --- a/platform/frontend_connectors/terms_enum.go +++ b/platform/frontend_connectors/terms_enum.go @@ -13,6 +13,7 @@ import ( "github.com/QuesmaOrg/quesma/platform/parsers/elastic_query_dsl" "github.com/QuesmaOrg/quesma/platform/schema" "github.com/QuesmaOrg/quesma/platform/transformations" + transformations_delete "github.com/QuesmaOrg/quesma/platform/transformations-delete" "github.com/QuesmaOrg/quesma/platform/types" "github.com/QuesmaOrg/quesma/platform/v2/core/diag" "github.com/QuesmaOrg/quesma/platform/v2/core/tracing" @@ -97,7 +98,13 @@ func handleTermsEnumRequest(ctx context.Context, body types.JSON, lm clickhouse. where := qt.ParseAutocomplete(indexFilter, field, prefixString, caseInsensitive) selectQuery := buildAutocompleteQuery(field, qt.Table.Name, where.WhereClause, size) - selectQuery, err = transformations.ApplyAllNecessaryCommonTransformations(selectQuery, qt.Schema, isFieldMapSyntaxEnabled) + selectQuery, err = transformations_delete.ApplyNecessaryTransformations(ctx, selectQuery, lm.FindTable(qt.Table.Name), qt.Schema) + if err != nil { + logger.ErrorWithCtx(ctx).Err(err).Msg("error applying necessary transformations") + return json.Marshal(emptyTermsEnumResponse()) + } + + selectQuery, err = transformations.ApplyAllNecessaryCommonTransformations(ctx, selectQuery, qt.Schema, isFieldMapSyntaxEnabled) if err == nil { dbQueryCtx, cancel := context.WithCancel(ctx) // TODO this will be used to cancel goroutine that is executing the query diff --git a/platform/model/bucket_aggregations/dateRange.go b/platform/model/bucket_aggregations/dateRange.go index 0dd0d3c74..c5ac22c94 100644 --- a/platform/model/bucket_aggregations/dateRange.go +++ b/platform/model/bucket_aggregations/dateRange.go @@ -10,52 +10,34 @@ import ( "time" ) -const UnboundedInterval = "*" +var UnboundedInterval model.Expr = nil + +const UnboundedIntervalString = "*" // DateTimeInterval represents a date range. Both Begin and End are either: // 1) in Clickhouse's proper format, e.g. toStartOfDay(subDate(now(), INTERVAL 3 week)) // 2) * (UnboundedInterval), which means no bound type DateTimeInterval struct { - begin string - end string + begin model.Expr + end model.Expr } -func NewDateTimeInterval(begin, end string) DateTimeInterval { +func NewDateTimeInterval(begin, end model.Expr) DateTimeInterval { return DateTimeInterval{ begin: begin, end: end, } } -// BeginTimestampToSQL returns SQL select for the begin timestamp, and a boolean indicating if the select is needed -// We query Clickhouse for this timestamp, as it's defined in Clickhouse's format, e.g. now()-1d. -// It's only 1 more field to our SELECT query, so it shouldn't be a performance issue. -func (interval DateTimeInterval) BeginTimestampToSQL() (sqlSelect model.Expr, selectNeeded bool) { - if interval.begin != UnboundedInterval { - return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.begin))), true - } - return nil, false -} - -// EndTimestampToSQL returns SQL select for the end timestamp, and a boolean indicating if the select is needed -// We query Clickhouse for this timestamp, as it's defined in Clickhouse's format, e.g. now()-1d. -// It's only 1 more field to our SELECT query, so it isn't a performance issue. -func (interval DateTimeInterval) EndTimestampToSQL() (sqlSelect model.Expr, selectNeeded bool) { - if interval.end != UnboundedInterval { - return model.NewFunction("toInt64", model.NewFunction("toUnixTimestamp", model.NewLiteral(interval.end))), true - } - return nil, false -} - func (interval DateTimeInterval) ToWhereClause(field model.Expr) model.Expr { - begin, isBegin := interval.BeginTimestampToSQL() - end, isEnd := interval.EndTimestampToSQL() - + var begin, end model.Expr + isBegin := interval.begin != UnboundedInterval + isEnd := interval.end != UnboundedInterval if isBegin { - begin = model.NewInfixExpr(field, ">=", begin) + begin = model.NewInfixExpr(field, ">=", interval.begin) } if isEnd { - end = model.NewInfixExpr(field, "<", end) + end = model.NewInfixExpr(field, "<", interval.end) } if isBegin && isEnd { @@ -69,6 +51,7 @@ func (interval DateTimeInterval) ToWhereClause(field model.Expr) model.Expr { } } +// TODO support time_zone type DateRange struct { ctx context.Context field model.Expr @@ -111,7 +94,7 @@ func (query DateRange) TranslateSqlResponseToJson(rows []model.QueryResultRow) m } func (query DateRange) String() string { - return "date_range, intervals: " + fmt.Sprintf("%v", query.intervals) + return fmt.Sprintf("date_range, intervals: %v", query.intervals) } func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalIdx, columnIdx int) ( @@ -124,7 +107,7 @@ func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalId var from, to int64 var fromString, toString string if query.intervals[intervalIdx].begin == UnboundedInterval { - fromString = UnboundedInterval + fromString = UnboundedIntervalString } else { if columnIdx >= len(row.Cols) { logger.ErrorWithCtx(query.ctx).Msgf("trying to read column after columns length, query: %v, row: %v", query, row) @@ -138,7 +121,7 @@ func (query DateRange) responseForInterval(row *model.QueryResultRow, intervalId } if query.intervals[intervalIdx].end == UnboundedInterval { - toString = UnboundedInterval + toString = UnboundedIntervalString } else { if columnIdx >= len(row.Cols) { logger.ErrorWithCtx(query.ctx).Msgf("trying to read column after columns length, query: %v, row: %v", query, row) @@ -201,7 +184,7 @@ func (query DateRange) CombinatorTranslateSqlResponseToJson(subGroup CombinatorG // TODO: we need translate relative to real time interval := query.intervals[subGroup.idx] if interval.begin != UnboundedInterval { - response["from"] = interval.begin + response["from"] = model.AsString(interval.begin) response["from_as_string"] = interval.begin } if interval.end != UnboundedInterval { diff --git a/platform/model/bucket_aggregations/date_histogram.go b/platform/model/bucket_aggregations/date_histogram.go index 49083f653..2569a7e15 100644 --- a/platform/model/bucket_aggregations/date_histogram.go +++ b/platform/model/bucket_aggregations/date_histogram.go @@ -31,7 +31,8 @@ const ( type DateHistogram struct { ctx context.Context - field model.Expr // name of the field, e.g. timestamp + field model.Expr // full expression as in SQL, e.g. toInt64(toUnixTimestamp(timestamp)) + timestampColumn model.ColumnRef interval string timezone string defaultFormat bool // how we format "key_as_string". If not default, it's milliseconds @@ -40,11 +41,10 @@ type DateHistogram struct { extendedBoundsMax int64 minDocCount int intervalType DateHistogramIntervalType - fieldDateTimeType clickhouse.DateTimeType } -func NewDateHistogram(ctx context.Context, field model.Expr, interval, timezone, format string, minDocCount int, - extendedBoundsMin, extendedBoundsMax int64, intervalType DateHistogramIntervalType, fieldDateTimeType clickhouse.DateTimeType) *DateHistogram { +func NewDateHistogram(ctx context.Context, field model.Expr, timestampColumn model.ColumnRef, interval, timezone, format string, + minDocCount int, extendedBoundsMin, extendedBoundsMax int64, intervalType DateHistogramIntervalType) *DateHistogram { wantedTimezone, err := time.LoadLocation(timezone) if err != nil { @@ -54,9 +54,9 @@ func NewDateHistogram(ctx context.Context, field model.Expr, interval, timezone, defaultFormat := format != "epoch_millis" - return &DateHistogram{ctx: ctx, field: field, interval: interval, timezone: timezone, wantedTimezone: wantedTimezone, - minDocCount: minDocCount, extendedBoundsMin: extendedBoundsMin, extendedBoundsMax: extendedBoundsMax, - intervalType: intervalType, fieldDateTimeType: fieldDateTimeType, defaultFormat: defaultFormat} + return &DateHistogram{ctx: ctx, field: field, timestampColumn: timestampColumn, interval: interval, timezone: timezone, + wantedTimezone: wantedTimezone, minDocCount: minDocCount, extendedBoundsMin: extendedBoundsMin, + extendedBoundsMax: extendedBoundsMax, intervalType: intervalType, defaultFormat: defaultFormat} } func (typ DateHistogramIntervalType) String(ctx context.Context) string { @@ -162,13 +162,46 @@ func (query *DateHistogram) generateSQLForFixedInterval() model.Expr { interval, err := util.ParseInterval(query.interval) if err != nil { logger.ErrorWithCtx(query.ctx).Msg(err.Error()) + return model.InvalidExpr } - dateTimeType := query.fieldDateTimeType - if query.fieldDateTimeType == clickhouse.Invalid { - logger.ErrorWithCtx(query.ctx).Msgf("invalid date type for DateHistogram %+v. Using DateTime64 as default.", query) - dateTimeType = defaultDateTimeType + + fmt.Println("DH, timestampColumn", query.timestampColumn) + + var groupBy model.InfixExpr + // If no timezone, or timezone is default (UTC) + if query.timezone == "" { + groupBy = model.NewInfixExpr( + model.NewFunction(model.ToUnixTimestampMs, query.field), + " / ", // TODO nasty hack to make our string-based tests pass. Operator should not contain spaces obviously + model.NewDurationLiteral(interval, query.timestampColumn), + ) + } else { + offset := model.NewInfixExpr( + model.NewFunction( + "timeZoneOffset", + model.NewFunction( + "toTimezone", + query.field, model.NewLiteralSingleQuoteString(query.timezone), + ), + ), + "*", + model.NewDurationLiteral(time.Second, query.timestampColumn), + ) + + unixTsWithOffset := model.NewInfixExpr( + model.NewFunction(model.ToUnixTimestampMs, query.field), + "+", + offset, + ) + + groupBy = model.NewInfixExpr( + model.NewParenExpr(unixTsWithOffset), + " / ", // TODO nasty hack to make our string-based tests pass. Operator should not contain spaces obviously + model.NewDurationLiteral(interval, query.timestampColumn), + ) } - return clickhouse.TimestampGroupByWithTimezone(query.field, dateTimeType, interval, query.timezone) + + return model.NewFunction("toInt64", groupBy) } func (query *DateHistogram) generateSQLForCalendarInterval() model.Expr { diff --git a/platform/model/bucket_aggregations/ip_range.go b/platform/model/bucket_aggregations/ip_range.go index e8c4be1c5..9c51225c8 100644 --- a/platform/model/bucket_aggregations/ip_range.go +++ b/platform/model/bucket_aggregations/ip_range.go @@ -114,12 +114,12 @@ func (interval IpInterval) ToWhereClause(field model.Expr) model.Expr { // hasBeginInResponse returns true if we should add 'from' field to the response. // We do that <=> begin is not 0.0.0.0 (unbounded) func (interval IpInterval) hasBeginInResponse() bool { - return interval.begin != UnboundedInterval && netip.MustParseAddr(interval.begin) != netip.MustParseAddr("::") + return interval.begin != UnboundedIntervalString && netip.MustParseAddr(interval.begin) != netip.MustParseAddr("::") } // hasEndInResponse returns true if we should add 'to' field to the response. func (interval IpInterval) hasEndInResponse() bool { - return interval.end != UnboundedInterval + return interval.end != UnboundedIntervalString } // String returns key part of the response, e.g. "1.0-2.0", or "*-6.55" diff --git a/platform/model/constants.go b/platform/model/constants.go index a409d9c24..b6b1dd34d 100644 --- a/platform/model/constants.go +++ b/platform/model/constants.go @@ -9,4 +9,11 @@ const ( DateHourFunction = "__quesma_date_hour" MatchOperator = "__quesma_match" + + FromUnixTimestampMs = "__quesma_from_unix_timestamp_ms" + ToUnixTimestampMs = "__quesma_to_unix_timestamp_ms" + ClickhouseFromUnixTimestampMsToDatetime64Function = "fromUnixTimestamp64Milli" + ClickhouseFromUnixTimestampMsToDatetimeFunction = "fromUnixTimestamp" + ClickhouseToUnixTimestampMsFromDatetime64Function = "toUnixTimestamp64Milli" + ClickhouseToUnixTimestampMsFromDatetimeFunction = "toUnixTimestamp" ) diff --git a/platform/model/expr.go b/platform/model/expr.go index 316ff0ae5..d7b89fe5a 100644 --- a/platform/model/expr.go +++ b/platform/model/expr.go @@ -5,6 +5,7 @@ package model import ( "fmt" "strconv" + "time" ) // Expr is a generic representation of an expression which is a part of the SQL query. @@ -96,9 +97,35 @@ type ( Value any EscapeType EscapeType // only meaningful if Value is string } + TimeLiteral struct { + Value time.Time + TimestampField ColumnRef + } + DurationLiteral struct { + Value time.Duration + TimestampField ColumnRef + } EscapeType string ) +func ToTimeLiteral(expr Expr) (tl TimeLiteral, ok bool) { + if literal, ok := expr.(LiteralExpr); ok { + if timeLiteral, ok := literal.Value.(TimeLiteral); ok { + return timeLiteral, true + } + } + return TimeLiteral{}, false +} + +func ToDurationLiteral(expr Expr) (dl DurationLiteral, ok bool) { + if literal, ok := expr.(LiteralExpr); ok { + if durationLiteral, ok := literal.Value.(DurationLiteral); ok { + return durationLiteral, true + } + } + return DurationLiteral{}, false +} + const ( NormalNotEscaped EscapeType = "normal" // used in 90% cases, everywhere but not in 'LIKE' exprs NotEscapedLikePrefix EscapeType = "like_prefix" // used in 'LIKE' exprs, will be rendered 'value%' @@ -149,6 +176,14 @@ func NewLiteral(value any) LiteralExpr { return LiteralExpr{Value: value, EscapeType: NormalNotEscaped} } +func NewTimeLiteral(value time.Time, timestampField ColumnRef) LiteralExpr { + return NewLiteral(TimeLiteral{Value: value, TimestampField: timestampField}) +} + +func NewDurationLiteral(value time.Duration, timestampField ColumnRef) LiteralExpr { + return NewLiteral(DurationLiteral{Value: value, TimestampField: timestampField}) +} + // NewLiteralSingleQuoteString simply does: string -> 'string', anything_else -> anything_else func NewLiteralSingleQuoteString(value any) LiteralExpr { switch v := value.(type) { diff --git a/platform/model/expr_string_renderer.go b/platform/model/expr_string_renderer.go index 57dbbcaba..c80dc74da 100644 --- a/platform/model/expr_string_renderer.go +++ b/platform/model/expr_string_renderer.go @@ -89,6 +89,10 @@ func (v *renderer) VisitLiteral(l LiteralExpr) interface{} { logger.WarnWithThrottling("unknown_literal", "VisitLiteral %s", val) return escapeStringNormal(val) // like normal } + case DurationLiteral: + return fmt.Sprintf("%v", val.Value) + case TimeLiteral: + return fmt.Sprintf("%v", val.Value.UnixMilli()) default: return fmt.Sprintf("%v", val) } diff --git a/platform/model/transformation_pipeline.go b/platform/model/transformation_pipeline.go index e9e000a2c..686ba273d 100644 --- a/platform/model/transformation_pipeline.go +++ b/platform/model/transformation_pipeline.go @@ -2,6 +2,8 @@ // SPDX-License-Identifier: Elastic-2.0 package model +import "context" + type TransformationPipeline struct { transformers []QueryTransformer } @@ -10,10 +12,10 @@ func NewTransformationPipeline() *TransformationPipeline { return &TransformationPipeline{} } -func (o *TransformationPipeline) Transform(queries []*Query) ([]*Query, error) { +func (o *TransformationPipeline) Transform(ctx context.Context, queries []*Query) ([]*Query, error) { var err error for _, transformer := range o.transformers { - queries, err = transformer.Transform(queries) + queries, err = transformer.Transform(ctx, queries) if err != nil { return nil, err } diff --git a/platform/model/transformers.go b/platform/model/transformers.go index 4adb7fe0d..0b5cb0d12 100644 --- a/platform/model/transformers.go +++ b/platform/model/transformers.go @@ -5,7 +5,7 @@ package model import "context" type QueryTransformer interface { - Transform(query []*Query) ([]*Query, error) + Transform(ctx context.Context, query []*Query) ([]*Query, error) } type ResultTransformer interface { diff --git a/platform/model/where_visitor.go b/platform/model/where_visitor.go index 4bb9628d1..ec25e4165 100644 --- a/platform/model/where_visitor.go +++ b/platform/model/where_visitor.go @@ -3,8 +3,7 @@ package model import ( - "github.com/QuesmaOrg/quesma/platform/util" - "math" + "time" ) // FindTimestampLowerBound returns y if there is "x>=y" or "x>y" in the WHERE clause, but only as a single top-level expression. @@ -14,23 +13,17 @@ import ( // If there are multiple such expressions, we return the smallest one. // // TODO: add upper bound here too, when bucket_nr=1 in auto_date_histogram (only use case of this function), it's not needed. -func FindTimestampLowerBound(field ColumnRef, whereClause Expr) (timestampInMillis int64, found bool) { - timestampInMillis = math.MaxInt64 +func FindTimestampLowerBound(field ColumnRef, whereClause Expr) (lowerBoundTs time.Time, found bool) { visitor := NewBaseVisitor() visitor.OverrideVisitInfix = func(visitor *BaseExprVisitor, e InfixExpr) interface{} { if columnRef, ok := e.Left.(ColumnRef); ok && columnRef == field && e.Op == ">=" || e.Op == ">" { - if fun, ok := e.Right.(FunctionExpr); ok && fun.Name == "fromUnixTimestamp64Milli" && len(fun.Args) == 1 { + if fun, ok := e.Right.(FunctionExpr); ok && fun.Name == FromUnixTimestampMs && len(fun.Args) == 1 { if rhs, ok := fun.Args[0].(LiteralExpr); ok { - if rhsInt64, ok := util.ExtractInt64Maybe(rhs.Value); ok { - timestampInMillis = min(timestampInMillis, rhsInt64) - found = true - } - } - } else if fun, ok := e.Right.(FunctionExpr); ok && fun.Name == "fromUnixTimestamp" && len(fun.Args) == 1 { - if rhs, ok := fun.Args[0].(LiteralExpr); ok { - if rhsInt64, ok := util.ExtractInt64Maybe(rhs.Value); ok { - timestampInMillis = min(timestampInMillis, rhsInt64*1000) // seconds -> milliseconds - found = true + if timestamp, ok := rhs.Value.(TimeLiteral); ok { + if !found || timestamp.Value.Before(lowerBoundTs) { + lowerBoundTs = timestamp.Value + found = true + } } } } diff --git a/platform/optimize/pipeline.go b/platform/optimize/pipeline.go index a70af5d17..fe814cae5 100644 --- a/platform/optimize/pipeline.go +++ b/platform/optimize/pipeline.go @@ -3,6 +3,7 @@ package optimize import ( + "context" "github.com/QuesmaOrg/quesma/platform/config" "github.com/QuesmaOrg/quesma/platform/model" "strings" @@ -74,7 +75,7 @@ func (s *OptimizePipeline) findConfig(transformer OptimizeTransformer, queries [ return !transformer.IsEnabledByDefault(), make(map[string]string) } -func (s *OptimizePipeline) Transform(queries []*model.Query) ([]*model.Query, error) { +func (s *OptimizePipeline) Transform(ctx context.Context, queries []*model.Query) ([]*model.Query, error) { if len(queries) == 0 { return queries, nil diff --git a/platform/optimize/pipeline_test.go b/platform/optimize/pipeline_test.go index aea28d84a..91e6314a1 100644 --- a/platform/optimize/pipeline_test.go +++ b/platform/optimize/pipeline_test.go @@ -3,6 +3,7 @@ package optimize import ( + "context" "fmt" "github.com/QuesmaOrg/quesma/platform/config" "github.com/QuesmaOrg/quesma/platform/model" @@ -59,7 +60,7 @@ func Test_cacheQueries(t *testing.T) { }, } pipeline := NewOptimizePipeline(&cfg) - optimized, err := pipeline.Transform(queries) + optimized, err := pipeline.Transform(context.Background(), queries) if err != nil { t.Fatalf("error optimizing query: %v", err) } @@ -212,7 +213,7 @@ func Test_dateTrunc(t *testing.T) { }, } pipeline := NewOptimizePipeline(&cfg) - optimized, err := pipeline.Transform(queries) + optimized, err := pipeline.Transform(context.Background(), queries) if err != nil { t.Fatalf("error optimizing query: %v", err) @@ -448,7 +449,7 @@ func Test_materialized_view_replace(t *testing.T) { }, } pipeline := NewOptimizePipeline(&cfg) - optimized, err := pipeline.Transform(queries) + optimized, err := pipeline.Transform(context.Background(), queries) if err != nil { t.Fatalf("error optimizing query: %v", err) diff --git a/platform/parsers/elastic_query_dsl/aggregation_date_range_parser.go b/platform/parsers/elastic_query_dsl/aggregation_date_range_parser.go index 9a388556b..79901e0b6 100644 --- a/platform/parsers/elastic_query_dsl/aggregation_date_range_parser.go +++ b/platform/parsers/elastic_query_dsl/aggregation_date_range_parser.go @@ -4,8 +4,8 @@ package elastic_query_dsl import ( "fmt" + "github.com/QuesmaOrg/quesma/platform/model" "github.com/QuesmaOrg/quesma/platform/model/bucket_aggregations" - "unicode" ) func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(aggregation *pancakeAggregationTreeNode, params QueryMap) (err error) { @@ -13,6 +13,11 @@ func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(aggregation *panc if field == nil { return fmt.Errorf("no field specified for date range aggregation, params: %v", params) } + colRef, ok := field.(model.ColumnRef) + if !ok { + return fmt.Errorf("field is not a column reference, but %T, field: %v", field, field) + } + format := cw.parseStringField(params, "format", "") ranges, err := cw.parseArrayField(params, "ranges") if err != nil { @@ -27,19 +32,19 @@ func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(aggregation *panc return fmt.Errorf("range is not a map, but %T, range: %v", rangeRaw, rangeRaw) } - const defaultIntervalBound = bucket_aggregations.UnboundedInterval - intervalBegin := defaultIntervalBound + const defaultIntervalBound = bucket_aggregations.UnboundedIntervalString + var intervalBegin model.Expr if from := cw.parseStringField(rangeMap, "from", defaultIntervalBound); from != defaultIntervalBound { - intervalBegin, err = cw.parseDateTimeInClickhouseMathLanguage(from) + intervalBegin, err = cw.parseDateTimeInClickhouseMathLanguage(from, colRef) if err != nil { return err } selectColumnsNr++ } - intervalEnd := bucket_aggregations.UnboundedInterval + var intervalEnd model.Expr if to := cw.parseStringField(rangeMap, "to", defaultIntervalBound); to != defaultIntervalBound { - intervalEnd, err = cw.parseDateTimeInClickhouseMathLanguage(to) + intervalEnd, err = cw.parseDateTimeInClickhouseMathLanguage(to, colRef) if err != nil { return err } @@ -55,35 +60,18 @@ func (cw *ClickhouseQueryTranslator) parseDateRangeAggregation(aggregation *panc // parseDateTimeInClickhouseMathLanguage parses dateTime from Clickhouse's format // It's described here: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-daterange-aggregation.html // Maybe not 100% of it is implemented, not sure. -func (cw *ClickhouseQueryTranslator) parseDateTimeInClickhouseMathLanguage(dateTime string) (string, error) { +func (cw *ClickhouseQueryTranslator) parseDateTimeInClickhouseMathLanguage(dateTime string, field model.ColumnRef) (model.Expr, error) { // So far we've seen only either: - // 1. 2024-01-01 format - if cw.isSimpleDate(dateTime) { - return "'" + dateTime + "'", nil + // 1. 2024-01-01 format TODO update + dateManager := NewDateManager(cw.Ctx) + if parsed := dateManager.ParseDateUsualFormat(dateTime, field); parsed != nil { + return parsed, nil } // 2. expressions like now() or now()-1d - res, err := cw.parseDateMathExpression(dateTime) + res, err := cw.parseDateMathExpression(dateTime, field) if err != nil { - return "", err + return nil, err } return res, nil } - -// isSimpleDate returns true if the given dateTime is a simple date string in format 2024-04-15 -func (cw *ClickhouseQueryTranslator) isSimpleDate(dateTime string) bool { - if len(dateTime) != len("2024-04-15") { - return false - } - for _, idx := range []int{0, 1, 2, 3, 5, 6, 8, 9} { - if !unicode.IsDigit(rune(dateTime[idx])) { - return false - } - } - for _, idx := range []int{4, 7} { - if dateTime[idx] != '-' { - return false - } - } - return true -} diff --git a/platform/parsers/elastic_query_dsl/aggregation_parser.go b/platform/parsers/elastic_query_dsl/aggregation_parser.go index d7602b4e8..d732181a4 100644 --- a/platform/parsers/elastic_query_dsl/aggregation_parser.go +++ b/platform/parsers/elastic_query_dsl/aggregation_parser.go @@ -270,7 +270,7 @@ func (cw *ClickhouseQueryTranslator) parseFieldField(shouldBeMap any, aggregatio } if fieldRaw, ok := Map["field"]; ok { if field, ok := fieldRaw.(string); ok { - return model.NewColumnRef(ResolveField(cw.Ctx, field, cw.Schema)) // model.NewSelectColumnTableField(cw.Table.ResolveField(cw.Ctx, field)) // remove this resolve? we do all transforms after parsing is done? + return model.NewColumnRef(field) } else { logger.WarnWithCtx(cw.Ctx).Msgf("field is not a string, but %T, value: %v", fieldRaw, fieldRaw) } diff --git a/platform/parsers/elastic_query_dsl/date_expr.go b/platform/parsers/elastic_query_dsl/date_expr.go index 13279953d..92e8b2064 100644 --- a/platform/parsers/elastic_query_dsl/date_expr.go +++ b/platform/parsers/elastic_query_dsl/date_expr.go @@ -5,6 +5,7 @@ package elastic_query_dsl import ( "errors" "fmt" + "github.com/QuesmaOrg/quesma/platform/model" "strconv" "strings" "time" @@ -99,7 +100,7 @@ func ParseDateMathExpression(input string) (*DateMathExpression, error) { } type DateMathExpressionRenderer interface { - RenderSQL(expression *DateMathExpression) (string, error) + RenderExpr(expression *DateMathExpression) (model.Expr, error) } const DateMathExpressionFormatLiteral = "literal" @@ -123,11 +124,9 @@ func DateMathExpressionRendererFactory(format string) DateMathExpressionRenderer type DateMathAsClickhouseIntervals struct{} -func (b *DateMathAsClickhouseIntervals) RenderSQL(expression *DateMathExpression) (string, error) { +func (b *DateMathAsClickhouseIntervals) RenderExpr(expression *DateMathExpression) (model.Expr, error) { - var result string - - result = "now()" + result := model.NewFunction("now") for _, interval := range expression.intervals { @@ -147,10 +146,10 @@ func (b *DateMathAsClickhouseIntervals) RenderSQL(expression *DateMathExpression unit, err := b.parseTimeUnit(interval.unit) if err != nil { - return "", fmt.Errorf("invalid time unit: %s", interval.unit) + return nil, fmt.Errorf("invalid time unit: %s", interval.unit) } - result = fmt.Sprintf("%s(%s, INTERVAL %d %s)", op, result, amount, unit) + result = model.NewFunction(op, result, model.NewLiteral(fmt.Sprintf("INTERVAL %d %s", amount, unit))) } var roundingFunction = map[string]string{ @@ -163,9 +162,9 @@ func (b *DateMathAsClickhouseIntervals) RenderSQL(expression *DateMathExpression if expression.rounding != "" { if function, ok := roundingFunction[string(expression.rounding)]; ok { - result = fmt.Sprintf("%s(%s)", function, result) + result = model.NewFunction(function, result) } else { - return "", fmt.Errorf("invalid rounding unit: %s", expression.rounding) + return nil, fmt.Errorf("invalid rounding unit: %s", expression.rounding) } } @@ -197,9 +196,7 @@ type DateMathExpressionAsLiteral struct { now time.Time } -func (b *DateMathExpressionAsLiteral) RenderSQL(expression *DateMathExpression) (string, error) { - - const format = "2006-01-02 15:04:05" +func (b *DateMathExpressionAsLiteral) RenderExpr(expression *DateMathExpression) (model.Expr, error) { result := b.now @@ -234,7 +231,7 @@ func (b *DateMathExpressionAsLiteral) RenderSQL(expression *DateMathExpression) result = result.AddDate(amount, 0, 0) default: - return "", fmt.Errorf("unsupported time unit: %s", interval.unit) + return nil, fmt.Errorf("unsupported time unit: %s", interval.unit) } } @@ -254,8 +251,8 @@ func (b *DateMathExpressionAsLiteral) RenderSQL(expression *DateMathExpression) result = time.Date(result.Year(), 1, 1, 0, 0, 0, 0, result.Location()) default: - return "", fmt.Errorf("unsupported rounding unit: %s", expression.rounding) + return nil, fmt.Errorf("unsupported rounding unit: %s", expression.rounding) } - return fmt.Sprintf("'%s'", result.Format(format)), nil + return model.NewFunction(model.FromUnixTimestampMs, model.NewLiteral(model.TimeLiteral{Value: result})), nil } diff --git a/platform/parsers/elastic_query_dsl/date_expr_test.go b/platform/parsers/elastic_query_dsl/date_expr_test.go index 95dcb652e..f92ab15d7 100644 --- a/platform/parsers/elastic_query_dsl/date_expr_test.go +++ b/platform/parsers/elastic_query_dsl/date_expr_test.go @@ -3,10 +3,12 @@ package elastic_query_dsl import ( + "github.com/QuesmaOrg/quesma/platform/model" "github.com/QuesmaOrg/quesma/platform/util" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "testing" + "time" ) func TestParseDateMathExpression(t *testing.T) { @@ -34,12 +36,12 @@ func TestParseDateMathExpression(t *testing.T) { func Test_parseDateTimeInClickhouseMathLanguage(t *testing.T) { exprs := map[string]string{ "now": "now()", - "now-15m": "subDate(now(), INTERVAL 15 minute)", - "now-15m+5s": "addDate(subDate(now(), INTERVAL 15 minute), INTERVAL 5 second)", + "now-15m": "subDate(now(),INTERVAL 15 minute)", + "now-15m+5s": "addDate(subDate(now(),INTERVAL 15 minute),INTERVAL 5 second)", "now-": "now()", - "now-15m+/M": "toStartOfMonth(subDate(now(), INTERVAL 15 minute))", - "now-15m/d": "toStartOfDay(subDate(now(), INTERVAL 15 minute))", - "now-15m+5s/w": "toStartOfWeek(addDate(subDate(now(), INTERVAL 15 minute), INTERVAL 5 second))", + "now-15m+/M": "toStartOfMonth(subDate(now(),INTERVAL 15 minute))", + "now-15m/d": "toStartOfDay(subDate(now(),INTERVAL 15 minute))", + "now-15m+5s/w": "toStartOfWeek(addDate(subDate(now(),INTERVAL 15 minute),INTERVAL 5 second))", "now-/Y": "toStartOfYear(now())", } @@ -55,38 +57,39 @@ func Test_parseDateTimeInClickhouseMathLanguage(t *testing.T) { return } - resultExpr, err := renderer.RenderSQL(dt) + resultExpr, err := renderer.RenderExpr(dt) assert.NoError(t, err) if err != nil { return } - assert.Equal(t, expected, resultExpr) + assert.Equal(t, expected, model.AsString(resultExpr)) }) } } func Test_DateMathExpressionAsLiteral(t *testing.T) { + now := time.Date(2024, 5, 17, 12, 1, 2, 3, time.UTC) tests := []struct { input string - expected string + expected time.Time }{ - {"now", "'2024-05-17 12:01:02'"}, - {"now-15m", "'2024-05-17 11:46:02'"}, - {"now-15m+5s", "'2024-05-17 11:46:07'"}, - {"now-", "'2024-05-17 12:01:02'"}, - {"now-15m+/M", "'2024-05-01 00:00:00'"}, - {"now-15m/d", "'2024-05-17 00:00:00'"}, - {"now-15m+5s/w", "'2024-05-12 00:00:00'"}, // week starts on Sunday here so 2024-05-12 is the start of the week - {"now-/Y", "'2024-01-01 00:00:00'"}, - {"now-2M", "'2024-03-17 12:01:02'"}, - {"now-1y", "'2023-05-17 12:01:02'"}, - {"now-1w", "'2024-05-10 12:01:02'"}, - {"now-1s", "'2024-05-17 12:01:01'"}, - {"now-1m", "'2024-05-17 12:00:02'"}, - {"now-1d", "'2024-05-16 12:01:02'"}, + {"now", now}, + {"now-15m", now.Add(-15 * time.Minute)}, + {"now-15m+5s", now.Add(-15 * time.Minute).Add(5 * time.Second)}, + {"now-", now}, + {"now-15m+/M", time.Date(2024, 5, 1, 0, 0, 0, 0, time.UTC)}, + {"now-15m/d", time.Date(2024, 5, 17, 0, 0, 0, 0, time.UTC)}, + {"now-15m+5s/w", time.Date(2024, 5, 12, 0, 0, 0, 0, time.UTC)}, // week starts on Sunday here so 2024-05-12 is the start of the week + {"now-/Y", time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)}, + {"now-2M", now.AddDate(0, -2, 0)}, + {"now-1y", now.AddDate(-1, 0, 0)}, + {"now-1w", now.Add(-7 * 24 * time.Hour)}, + {"now-1s", now.Add(-1 * time.Second)}, + {"now-1m", now.Add(-1 * time.Minute)}, + {"now-1d", now.Add(-24 * time.Hour)}, } for i, test := range tests { @@ -102,14 +105,14 @@ func Test_DateMathExpressionAsLiteral(t *testing.T) { // this renderer is single use, so we can't reuse it renderer := DateMathExpressionRendererFactory(DateMathExpressionFormatLiteralTest) - resultExpr, err := renderer.RenderSQL(dt) + resultExpr, err := renderer.RenderExpr(dt) assert.NoError(t, err) if err != nil { return } - assert.Equal(t, test.expected, resultExpr) + assert.Equal(t, model.NewFunction(model.FromUnixTimestampMs, model.NewLiteral(model.TimeLiteral{Value: test.expected})), resultExpr) }) } } diff --git a/platform/parsers/elastic_query_dsl/dates.go b/platform/parsers/elastic_query_dsl/dates.go index 819c624fc..763b87e03 100644 --- a/platform/parsers/elastic_query_dsl/dates.go +++ b/platform/parsers/elastic_query_dsl/dates.go @@ -5,10 +5,10 @@ package elastic_query_dsl import ( "context" - "github.com/QuesmaOrg/quesma/platform/clickhouse" - "github.com/QuesmaOrg/quesma/platform/logger" + "fmt" "github.com/QuesmaOrg/quesma/platform/model" "github.com/QuesmaOrg/quesma/platform/util" + "github.com/k0kubun/pp" "strconv" "time" ) @@ -21,8 +21,8 @@ func NewDateManager(ctx context.Context) DateManager { return DateManager{ctx} } -var acceptableDateTimeFormats = []string{"2006", "2006-01", "2006-01-02", "2006-01-02", "2006-01-02T15", - "2006-01-02T15:04", "2006-01-02T15:04:05", "2006-01-02T15:04:05Z07", "2006-01-02T15:04:05Z07:00"} +var acceptableDateTimeFormats = []string{time.RFC3339Nano, "2006", "2006-01", "2006-01-02", "2006-01-02", "2006-01-02T15", + "2006-01-02T15:04", "2006-01-02T15:04:05", "2006-01-02T15:04:05.99999999Z", "2006-01-02T15:04:05Z07", "2006-01-02T15:04:05Z07:00"} // parseStrictDateOptionalTimeOrEpochMillis parses date, which is in [strict_date_optional_time || epoch_millis] format // (https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html) @@ -56,6 +56,8 @@ func (dm DateManager) parseStrictDateOptionalTimeOrEpochMillis(date any) (utcTim // https://github.com/relvacode/iso8601/pull/26 for _, format := range acceptableDateTimeFormats { if t, err := time.Parse(format, asString); err == nil { + + fmt.Println(t, "format:", format) return t, true } } @@ -66,30 +68,36 @@ func (dm DateManager) parseStrictDateOptionalTimeOrEpochMillis(date any) (utcTim // ParseDateUsualFormat parses date expression, which is in [strict_date_optional_time || epoch_millis] format // (https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html) // It's most usual format for date in Kibana, used e.g. in Query DSL's range, or date_histogram. -func (dm DateManager) ParseDateUsualFormat(exprFromRequest any, datetimeType clickhouse.DateTimeType) ( - resultExpr model.Expr, parsingSucceeded bool) { - if utcTs, success := dm.parseStrictDateOptionalTimeOrEpochMillis(exprFromRequest); success { - switch datetimeType { - case clickhouse.DateTime64: - threeDigitsOfPrecisionSuffice := utcTs.UnixNano()%1_000_000 == 0 - if threeDigitsOfPrecisionSuffice { - return model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(utcTs.UnixMilli())), true - } else { - return model.NewFunction( - "toDateTime64", - model.NewInfixExpr( - model.NewLiteral(utcTs.UnixNano()), - "/", - model.NewLiteral(1_000_000_000), - ), - model.NewLiteral(9), - ), true +func (dm DateManager) ParseDateUsualFormat(exprFromRequest any, field model.ColumnRef) (resultExpr model.Expr) { + if unixTsInMs, success := dm.parseStrictDateOptionalTimeOrEpochMillis(exprFromRequest); success { + pp.Println("DDDD", unixTsInMs, unixTsInMs.Nanosecond()) + return model.NewFunction(model.FromUnixTimestampMs, model.NewTimeLiteral(unixTsInMs, field)) + } + return nil + + /* + if utcTs, success := dm.parseStrictDateOptionalTimeOrEpochMillis(exprFromRequest); success { + switch datetimeType { + case clickhouse.DateTime64: + threeDigitsOfPrecisionSuffice := utcTs.UnixNano()%1_000_000 == 0 + if threeDigitsOfPrecisionSuffice { + return model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(utcTs.UnixMilli())), true + } else { + return model.NewFunction( + "toDateTime64", + model.NewInfixExpr( + model.NewLiteral(utcTs.UnixNano()), + "/", + model.NewLiteral(1_000_000_000), + ), + model.NewLiteral(9), + ), true + } + case clickhouse.DateTime: + return model.NewFunction("fromUnixTimestamp", model.NewLiteral(utcTs.Unix())), true + default: + logger.WarnWithCtx(dm.ctx).Msgf("Unknown datetimeType: %v", datetimeType) } - case clickhouse.DateTime: - return model.NewFunction("fromUnixTimestamp", model.NewLiteral(utcTs.Unix())), true - default: - logger.WarnWithCtx(dm.ctx).Msgf("Unknown datetimeType: %v", datetimeType) } - } - return nil, false + */ } diff --git a/platform/parsers/elastic_query_dsl/dates_test.go b/platform/parsers/elastic_query_dsl/dates_test.go index 5a7c9905e..9d1dfc10a 100644 --- a/platform/parsers/elastic_query_dsl/dates_test.go +++ b/platform/parsers/elastic_query_dsl/dates_test.go @@ -39,8 +39,11 @@ func TestDateManager_parseStrictDateOptionalTimeOrEpochMillis(t *testing.T) { {"2024-02-25T25:00:00", empty, false}, {"2024-02-25T13:00:00+05", time.UnixMilli(1708848000000), true}, {"2024-02-25T13:00:00+05:00", time.UnixMilli(1708848000000), true}, + {"2024-02-25T13:00:00.1", time.UnixMilli(1708866000100), true}, {"2024-02-25T13:00:00.123", time.UnixMilli(1708866000123), true}, {"2024-02-25T13:00:00.123Z", time.UnixMilli(1708866000123), true}, + {"2024-02-25T13:00:00.03123584", time.Unix(1708866000, 31235840), true}, + {"2024-02-25T13:00:00.03123584Z", time.Unix(1708866000, 31235840), true}, {"2024-02-25T13:00:00.123456789", time.Unix(1708866000, 123456789), true}, {"2024-02-25T13:00:00.123456789Z", time.Unix(1708866000, 123456789), true}, } @@ -48,8 +51,8 @@ func TestDateManager_parseStrictDateOptionalTimeOrEpochMillis(t *testing.T) { t.Run(util.PrettyTestName(fmt.Sprintf("%v", tt.input), i), func(t *testing.T) { dm := NewDateManager(context.Background()) gotUnixTs, gotParsingSucceeded := dm.parseStrictDateOptionalTimeOrEpochMillis(tt.input) - assert.Truef(t, tt.wantedTimestamp.Equal(gotUnixTs), "MissingInDateHistogramToUnixTimestamp(%v)", tt.input) - assert.Equalf(t, tt.wantedParsingSucceeded, gotParsingSucceeded, "MissingInDateHistogramToUnixTimestamp(%v)", tt.input) + assert.Truef(t, tt.wantedTimestamp.Equal(gotUnixTs), "wanted %v, got %v", tt.wantedTimestamp, gotUnixTs) + assert.Equalf(t, tt.wantedParsingSucceeded, gotParsingSucceeded, "wanted %v, got %v", tt.wantedParsingSucceeded, gotParsingSucceeded) }) } } diff --git a/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_buckets.go b/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_buckets.go index 371635672..fbb8e0db1 100644 --- a/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_buckets.go +++ b/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_buckets.go @@ -6,7 +6,6 @@ package elastic_query_dsl import ( "fmt" "github.com/H0llyW00dzZ/cidr" - "github.com/QuesmaOrg/quesma/platform/clickhouse" "github.com/QuesmaOrg/quesma/platform/logger" "github.com/QuesmaOrg/quesma/platform/model" "github.com/QuesmaOrg/quesma/platform/model/bucket_aggregations" @@ -107,13 +106,16 @@ func (cw *ClickhouseQueryTranslator) parseHistogram(aggregation *pancakeAggregat func (cw *ClickhouseQueryTranslator) parseDateHistogram(aggregation *pancakeAggregationTreeNode, params QueryMap) (err error) { field := cw.parseFieldField(params, "date_histogram") - dateTimeType := cw.Table.GetDateTimeTypeFromExpr(cw.Ctx, field) + colRef, ok := field.(model.ColumnRef) + if !ok { + return fmt.Errorf("field is not a column reference, but %T, value: %v", field, field) + } weAddedMissing := false if missingRaw, exists := params["missing"]; exists { if missing, ok := missingRaw.(string); ok { dateManager := NewDateManager(cw.Ctx) - if missingExpr, parsingOk := dateManager.ParseDateUsualFormat(missing, dateTimeType); parsingOk { + if missingExpr := dateManager.ParseDateUsualFormat(missing, colRef); missingExpr != nil { field = model.NewFunction("COALESCE", field, missingExpr) weAddedMissing = true } else { @@ -140,12 +142,8 @@ func (cw *ClickhouseQueryTranslator) parseDateHistogram(aggregation *pancakeAggr interval, intervalType := cw.extractInterval(params) // TODO GetDateTimeTypeFromExpr can be moved and it should take cw.Schema as an argument - if dateTimeType == clickhouse.Invalid { - logger.WarnWithCtx(cw.Ctx).Msgf("invalid date time type for field %s", field) - } - dateHistogram := bucket_aggregations.NewDateHistogram(cw.Ctx, - field, interval, timezone, format, minDocCount, ebMin, ebMax, intervalType, dateTimeType) + field, colRef, interval, timezone, format, minDocCount, ebMin, ebMax, intervalType) aggregation.queryType = dateHistogram columnSql := dateHistogram.GenerateSQL() @@ -506,7 +504,7 @@ func (cw *ClickhouseQueryTranslator) parseIpRange(aggregation *pancakeAggregatio if endExclusive.IsValid() { end = endExclusive.String() } else { // invalid means endInclusive was already the biggest possible value (ff...ff) - end = bucket_aggregations.UnboundedInterval + end = bucket_aggregations.UnboundedIntervalString } } else { return fmt.Errorf("invalid mask: %s", maskIfExists) @@ -515,8 +513,8 @@ func (cw *ClickhouseQueryTranslator) parseIpRange(aggregation *pancakeAggregatio key = &maskIfExists } } else { - begin = cw.parseStringField(rangeRaw.(QueryMap), "from", bucket_aggregations.UnboundedInterval) - end = cw.parseStringField(rangeRaw.(QueryMap), "to", bucket_aggregations.UnboundedInterval) + begin = cw.parseStringField(rangeRaw.(QueryMap), "from", bucket_aggregations.UnboundedIntervalString) + end = cw.parseStringField(rangeRaw.(QueryMap), "to", bucket_aggregations.UnboundedIntervalString) } ranges = append(ranges, bucket_aggregations.NewIpInterval(begin, end, key)) } diff --git a/platform/parsers/elastic_query_dsl/pancake_sql_query_generation_test.go b/platform/parsers/elastic_query_dsl/pancake_sql_query_generation_test.go index 895296d5f..d7bd380e6 100644 --- a/platform/parsers/elastic_query_dsl/pancake_sql_query_generation_test.go +++ b/platform/parsers/elastic_query_dsl/pancake_sql_query_generation_test.go @@ -6,9 +6,11 @@ import ( "context" "fmt" "github.com/QuesmaOrg/quesma/platform/clickhouse" + "github.com/QuesmaOrg/quesma/platform/logger" "github.com/QuesmaOrg/quesma/platform/model" "github.com/QuesmaOrg/quesma/platform/model/bucket_aggregations" "github.com/QuesmaOrg/quesma/platform/schema" + transformations_delete "github.com/QuesmaOrg/quesma/platform/transformations-delete" "github.com/QuesmaOrg/quesma/platform/types" "github.com/QuesmaOrg/quesma/platform/util" "github.com/k0kubun/pp" @@ -21,12 +23,13 @@ const TableName = model.SingleTableNamePlaceHolder func TestPancakeQueryGeneration(t *testing.T) { - // logger.InitSimpleLoggerForTestsWarnLevel() + logger.InitSimpleLoggerForTestsWarnLevel() table := clickhouse.Table{ Cols: map[string]*clickhouse.Column{ "@timestamp": {Name: "@timestamp", Type: clickhouse.NewBaseType("DateTime64")}, "timestamp": {Name: "timestamp", Type: clickhouse.NewBaseType("DateTime64")}, "order_date": {Name: "order_date", Type: clickhouse.NewBaseType("DateTime64")}, + "reqTimeSec": {Name: "reqTimeSec", Type: clickhouse.NewBaseType("DateTime64")}, "message": {Name: "message", Type: clickhouse.NewBaseType("String")}, "bytes_gauge": {Name: "bytes_gauge", Type: clickhouse.NewBaseType("UInt64")}, "customer_birth_date": {Name: "customer_birth_date", Type: clickhouse.NewBaseType("DateTime")}, @@ -37,7 +40,14 @@ func TestPancakeQueryGeneration(t *testing.T) { } currentSchema := schema.Schema{ - Fields: nil, + Fields: map[schema.FieldName]schema.Field{ + "@timestamp": {PropertyName: "@timestamp", InternalPropertyName: "@timestamp", Type: schema.QuesmaTypeDate}, + "timestamp": {PropertyName: "timestamp", InternalPropertyName: "timestamp", Type: schema.QuesmaTypeDate}, + "order_date": {PropertyName: "order_date", InternalPropertyName: "order_date", Type: schema.QuesmaTypeDate}, + "reqTimeSec": {PropertyName: "reqTimeSec", InternalPropertyName: "reqTimeSec", Type: schema.QuesmaTypeDate}, + "customer_birth_date": {PropertyName: "customer_birth_date", InternalPropertyName: "customer_birth_date", Type: schema.QuesmaTypeDate}, + "customer_birth_date_datetime64": {PropertyName: "customer_birth_date_datetime64", InternalPropertyName: "customer_birth_date_datetime64", Type: schema.QuesmaTypeDate}, + }, Aliases: nil, ExistsInDataSource: false, DatabaseName: "", @@ -78,6 +88,11 @@ func TestPancakeQueryGeneration(t *testing.T) { pancakeSqls, err := cw.PancakeParseAggregationJson(jsonp, false) assert.NoError(t, err) + for j, pancake := range pancakeSqls { + pancakeSqls[j], err = transformations_delete.ApplyNecessaryTransformations(context.Background(), pancake, &table, currentSchema) + assert.NoError(t, err) + } + assert.True(t, len(pancakeSqls) >= 1, "pancakeSqls should have at least one query") if len(pancakeSqls) < 1 { return diff --git a/platform/parsers/elastic_query_dsl/pancake_transformer.go b/platform/parsers/elastic_query_dsl/pancake_transformer.go index 16e3ae48c..c93d7945a 100644 --- a/platform/parsers/elastic_query_dsl/pancake_transformer.go +++ b/platform/parsers/elastic_query_dsl/pancake_transformer.go @@ -387,7 +387,7 @@ func (a *pancakeTransformer) transformAutoDateHistogram(layers []*pancakeModelLa if layer.nextBucketAggregation != nil { if autoDateHistogram, ok := layer.nextBucketAggregation.queryType.(*bucket_aggregations.AutoDateHistogram); ok { if tsLowerBound, found := model.FindTimestampLowerBound(autoDateHistogram.GetField(), whereClause); found { - autoDateHistogram.SetKey(tsLowerBound) + autoDateHistogram.SetKey(tsLowerBound.UnixMilli()) } else { logger.WarnWithCtx(a.ctx).Msgf("could not find timestamp lower bound (field: %v, where clause: %v)", autoDateHistogram.GetField(), whereClause) diff --git a/platform/parsers/elastic_query_dsl/query_parser.go b/platform/parsers/elastic_query_dsl/query_parser.go index a0c30d9cc..431cff3f4 100644 --- a/platform/parsers/elastic_query_dsl/query_parser.go +++ b/platform/parsers/elastic_query_dsl/query_parser.go @@ -750,22 +750,26 @@ func (cw *ClickhouseQueryTranslator) parseNested(queryMap QueryMap) model.Simple return model.NewSimpleQueryInvalid() } -func (cw *ClickhouseQueryTranslator) parseDateMathExpression(expr string) (string, error) { +func (cw *ClickhouseQueryTranslator) parseDateMathExpression(expr string, timestampField model.ColumnRef) (model.Expr, error) { expr = strings.ReplaceAll(expr, "'", "") exp, err := ParseDateMathExpression(expr) if err != nil { - return "", err + return nil, err } builder := DateMathExpressionRendererFactory(cw.DateMathRenderer) if builder == nil { - return "", fmt.Errorf("no date math expression renderer found: %s", cw.DateMathRenderer) + return nil, fmt.Errorf("no date math expression renderer found: %s", cw.DateMathRenderer) } - sql, err := builder.RenderSQL(exp) + sql, err := builder.RenderExpr(exp) if err != nil { - return "", err + return nil, err + } + + if ts, ok := model.ToTimeLiteral(sql); ok { + return model.NewTimeLiteral(ts.Value, timestampField), nil } return sql, nil @@ -786,8 +790,6 @@ func (cw *ClickhouseQueryTranslator) parseRange(queryMap QueryMap) model.SimpleQ const dateInSchemaExpected = true for fieldName, v := range queryMap { - fieldName = ResolveField(cw.Ctx, fieldName, cw.Schema) - fieldType := cw.Table.GetDateTimeType(cw.Ctx, ResolveField(cw.Ctx, fieldName, cw.Schema), dateInSchemaExpected) stmts := make([]model.Expr, 0) if _, ok := v.(QueryMap); !ok { @@ -810,20 +812,23 @@ func (cw *ClickhouseQueryTranslator) parseRange(queryMap QueryMap) model.SimpleQ // Numbers use just 3rd var finalValue model.Expr - doneParsing, isQuoted := false, len(value) > 2 && value[0] == '\'' && value[len(value)-1] == '\'' + areWeDoneParsing := func() bool { + return finalValue != nil + } + isQuoted := len(value) > 2 && value[0] == '\'' && value[len(value)-1] == '\'' switch fieldType { case clickhouse.DateTime, clickhouse.DateTime64: // TODO add support for "time_zone" parameter in ParseDateUsualFormat - finalValue, doneParsing = dateManager.ParseDateUsualFormat(value, fieldType) // stage 1 - if !doneParsing && (op == "gte" || op == "lte" || op == "gt" || op == "lt") { // stage 2 - parsed, err := cw.parseDateMathExpression(value) + finalValue = dateManager.ParseDateUsualFormat(value, model.NewColumnRef(fieldName)) // stage 1 + if !areWeDoneParsing() && (op == "gte" || op == "lte" || op == "gt" || op == "lt") { // stage 2 + parsed, err := cw.parseDateMathExpression(value, model.NewColumnRef(fieldName)) + fmt.Println("QQQ parsed: ", parsed, cw.DateMathRenderer) if err == nil { - doneParsing = true - finalValue = model.NewLiteral(parsed) + finalValue = parsed } } - if !doneParsing && isQuoted { // stage 3 - finalValue, doneParsing = dateManager.ParseDateUsualFormat(value[1:len(value)-1], fieldType) + if !areWeDoneParsing() && isQuoted { // stage 3 + finalValue = dateManager.ParseDateUsualFormat(value[1:len(value)-1], model.NewColumnRef(fieldName)) } case clickhouse.Invalid: if isQuoted { @@ -835,16 +840,18 @@ func (cw *ClickhouseQueryTranslator) parseRange(queryMap QueryMap) model.SimpleQ } if isNumber { finalValue = model.NewLiteral(unquoted) - doneParsing = true } } default: logger.ErrorWithCtx(cw.Ctx).Msgf("invalid DateTime type for field: %s, parsed dateTime value: %s", fieldName, value) } - if !doneParsing { + if !areWeDoneParsing() { finalValue = defaultValue } + fmt.Println("finalValue1: ", finalValue) + + fmt.Println("finalValue2: ", finalValue) field := model.NewColumnRef(fieldName) switch op { diff --git a/platform/parsers/elastic_query_dsl/query_parser_range_test.go b/platform/parsers/elastic_query_dsl/query_parser_range_test.go index 1c9b74a8f..8906da84c 100644 --- a/platform/parsers/elastic_query_dsl/query_parser_range_test.go +++ b/platform/parsers/elastic_query_dsl/query_parser_range_test.go @@ -31,7 +31,7 @@ var parseRangeTests = []parseRangeTest{ `CREATE TABLE ` + tableName + ` ( "message" String, "timestamp" DateTime64(3, 'UTC') ) ENGINE = Memory`, - `("timestamp">=fromUnixTimestamp64Milli(1706881636029) AND "timestamp"<=fromUnixTimestamp64Milli(1707486436029))`, + `("timestamp">=__quesma_from_unix_timestamp_ms(1706881636029) AND "timestamp"<=__quesma_from_unix_timestamp_ms(1707486436029))`, }, { "parseDateTimeBestEffort", @@ -45,7 +45,7 @@ var parseRangeTests = []parseRangeTest{ `CREATE TABLE ` + tableName + ` ( "message" String, "timestamp" DateTime ) ENGINE = Memory`, - `("timestamp">=fromUnixTimestamp(1706881636) AND "timestamp"<=fromUnixTimestamp(1707486436))`, + `("timestamp">=__quesma_from_unix_timestamp_ms(1706881636029) AND "timestamp"<=__quesma_from_unix_timestamp_ms(1707486436029))`, }, { "numeric range", @@ -71,7 +71,7 @@ var parseRangeTests = []parseRangeTest{ `CREATE TABLE ` + tableName + ` ( "message" String, "timestamp" DateTime64(3, 'UTC') ) ENGINE = Memory`, - `("timestamp">=fromUnixTimestamp64Milli(1706881636000) AND "timestamp"<=fromUnixTimestamp64Milli(1707486436000))`, + `("timestamp">=__quesma_from_unix_timestamp_ms(1706881636000) AND "timestamp"<=__quesma_from_unix_timestamp_ms(1707486436000))`, }, } diff --git a/platform/parsers/elastic_query_dsl/query_parser_test.go b/platform/parsers/elastic_query_dsl/query_parser_test.go index 41d509355..ddb4d74f7 100644 --- a/platform/parsers/elastic_query_dsl/query_parser_test.go +++ b/platform/parsers/elastic_query_dsl/query_parser_test.go @@ -13,6 +13,7 @@ import ( "github.com/QuesmaOrg/quesma/platform/persistence" "github.com/QuesmaOrg/quesma/platform/schema" "github.com/QuesmaOrg/quesma/platform/testdata" + transformations_delete "github.com/QuesmaOrg/quesma/platform/transformations-delete" "github.com/QuesmaOrg/quesma/platform/types" "github.com/QuesmaOrg/quesma/platform/util" "github.com/QuesmaOrg/quesma/platform/v2/core/diag" @@ -71,6 +72,12 @@ func TestQueryParserStringAttrConfig(t *testing.T) { plan, errQuery := cw.ParseQuery(body) queries := plan.Queries assert.NoError(t, errQuery, "no ParseQuery error") + + for j, query := range queries { + queries[j], err = transformations_delete.ApplyNecessaryTransformations(context.Background(), query, table, s.Tables[schema.IndexName(tableName)]) + assert.NoError(t, err) + } + assert.True(t, len(queries) > 0, "len queries > 0") var simpleListQuery *model.Query for _, query := range queries { diff --git a/platform/processors/base_processor.go b/platform/processors/base_processor.go index d215e2cb2..434df9a11 100644 --- a/platform/processors/base_processor.go +++ b/platform/processors/base_processor.go @@ -4,6 +4,7 @@ package processors import ( + "context" "github.com/QuesmaOrg/quesma/platform/logger" "github.com/QuesmaOrg/quesma/platform/model" quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core" @@ -68,7 +69,7 @@ func (p *BaseProcessor) Handle(metadata map[string]interface{}, messages ...any) if err != nil { mError = multierror.Append(mError, err) } - queries, err := p.QueryTransformationPipeline.Transform(executionPlan.Queries) + queries, err := p.QueryTransformationPipeline.Transform(context.Background(), executionPlan.Queries) if err != nil { mError = multierror.Append(mError, err) } diff --git a/platform/testdata/aggregation_requests.go b/platform/testdata/aggregation_requests.go index 73b39d8e1..f9c14d915 100644 --- a/platform/testdata/aggregation_requests.go +++ b/platform/testdata/aggregation_requests.go @@ -3,19 +3,10 @@ package testdata import ( - "github.com/QuesmaOrg/quesma/platform/clickhouse" "github.com/QuesmaOrg/quesma/platform/model" "math" - "time" ) -var timestampGroupByClause = model.AsString(clickhouse.TimestampGroupBy( - model.NewColumnRef("@timestamp"), clickhouse.DateTime64, 30*time.Second)) - -func groupBySQL(fieldName string, typ clickhouse.DateTimeType, groupByInterval time.Duration) string { - return model.AsString(clickhouse.TimestampGroupBy(model.NewColumnRef(fieldName), typ, groupByInterval)) -} - const fullTextFieldName = `"` + model.FullTextFieldNamePlaceHolder + `"` // TODO change some tests to size > 0, and track_total_hits different values @@ -4484,15 +4475,14 @@ var AggregationTests = []AggregationTestCase{ }}, }, ExpectedPancakeSQL: ` - SELECT countIf("timestamp"=toInt64(toUnixTimestamp(toStartOfDay(subDate(now(), - INTERVAL 3 week)))) AND "timestamp"=toInt64(toUnixTimestamp('2024-04-14'))) AS - "range_2__aggr__2__count" - FROM ` + TableName + ` - WHERE ("timestamp">=fromUnixTimestamp64Milli(1712388530059) AND "timestamp"<=fromUnixTimestamp64Milli(1713288530059))`, + SELECT countIf("timestamp"=toStartOfDay(subDate(now(), INTERVAL 3 week)) AND + "timestamp"=fromUnixTimestamp64Milli(1713052800000)) AS + "range_2__aggr__2__count" + FROM __quesma_table_name + WHERE ("timestamp">=fromUnixTimestamp64Milli(1712388530059) AND "timestamp"<= + fromUnixTimestamp64Milli(1713288530059))`, }, { // [23] TestName: "significant terms aggregation: same as terms for now", diff --git a/platform/testdata/dashboard-1/aggregation_requests.go b/platform/testdata/dashboard-1/aggregation_requests.go index b71b79937..341bc0419 100644 --- a/platform/testdata/dashboard-1/aggregation_requests.go +++ b/platform/testdata/dashboard-1/aggregation_requests.go @@ -199,8 +199,8 @@ var AggregationTests = []testdata.AggregationTestCase{ count(*) AS "aggr__0__1__count", avgOrNull("rspContentLen") AS "metric__0__1__2_col_0" FROM __quesma_table_name - WHERE ("reqTimeSec">='2024-04-24T10:55:23.606Z' AND "reqTimeSec"<= - '2024-04-24T11:10:23.606Z') + WHERE ("reqTimeSec">=fromUnixTimestamp64Milli(1713956123606) AND + "reqTimeSec"<=fromUnixTimestamp64Milli(1713957023606)) GROUP BY floor("rspContentLen"/2e+06)*2e+06 AS "aggr__0__key_0", floor("rspContentLen"/2e+06)*2e+06 AS "aggr__0__1__key_0")) ORDER BY "aggr__0__order_1_rank" ASC, "aggr__0__1__order_1_rank" ASC`, @@ -460,8 +460,8 @@ var AggregationTests = []testdata.AggregationTestCase{ count(*) AS "aggr__0__1__count", quantiles(0.950000)("latency") AS "metric__0__1__2_col_0" FROM __quesma_table_name - WHERE ("reqTimeSec">='2024-04-24T11:15:46.279Z' AND "reqTimeSec"<= - '2024-04-24T11:30:46.279Z') + WHERE ("reqTimeSec">=fromUnixTimestamp64Milli(1713957346279) AND + "reqTimeSec"<=fromUnixTimestamp64Milli(1713958246279)) GROUP BY toInt64((toUnixTimestamp64Milli("reqTimeSec")+timeZoneOffset( toTimezone("reqTimeSec", 'Europe/Warsaw'))*1000) / 30000) AS "aggr__0__key_0", floor("billingRegion"/0.5)*0.5 AS "aggr__0__1__key_0")) diff --git a/platform/testdata/opensearch_requests.go b/platform/testdata/opensearch_requests.go index fa0ae2d18..f2186b433 100644 --- a/platform/testdata/opensearch_requests.go +++ b/platform/testdata/opensearch_requests.go @@ -80,7 +80,7 @@ var OpensearchSearchTests = []SearchTestCase{ "track_total_hits": true }`, WantedSql: []string{ - `("__timestamp">=fromUnixTimestamp64Milli(1712236698149) AND "__timestamp"<=fromUnixTimestamp64Milli(1712237598149))`, + `("-@timestamp">=fromUnixTimestamp64Milli(1712236698149) AND "-@timestamp"<=fromUnixTimestamp64Milli(1712237598149))`, }, WantedQueryType: model.ListAllFields, WantedQueries: []string{ @@ -171,7 +171,7 @@ var OpensearchSearchTests = []SearchTestCase{ "track_total_hits": true }`, WantedSql: []string{ - `("__timestamp">=fromUnixTimestamp64Milli(1712236698149) AND "__timestamp"<=fromUnixTimestamp64Milli(1712237598149))`, + `("-@timestamp">=fromUnixTimestamp64Milli(1712236698149) AND "-@timestamp"<=fromUnixTimestamp64Milli(1712237598149))`, }, WantedQueryType: model.Normal, WantedQueries: []string{ diff --git a/platform/testdata/requests.go b/platform/testdata/requests.go index 019c4a41e..3476f7b1a 100644 --- a/platform/testdata/requests.go +++ b/platform/testdata/requests.go @@ -156,8 +156,8 @@ var TestsAsyncSearch = []AsyncSearchTestCase{ FROM ( SELECT "host_name" FROM __quesma_table_name - WHERE (("@timestamp">=fromUnixTimestamp64Milli(1706009236820) AND "@timestamp" - <=fromUnixTimestamp64Milli(1706010136820)) AND "message" iLIKE '%user%') + WHERE (("@timestamp">=fromUnixTimestamp64Milli(1706009236820) AND + "@timestamp"<=fromUnixTimestamp64Milli(1706010136820)) AND "message" iLIKE '%user%') LIMIT 20000) GROUP BY "host_name" AS "aggr__sample__top_values__key_0" ORDER BY "aggr__sample__top_values__count" DESC, @@ -941,10 +941,10 @@ var TestsAsyncSearch = []AsyncSearchTestCase{ []string{` SELECT "properties_isreg" FROM __quesma_table_name - WHERE ((("@timestamp">=fromUnixTimestamp64Milli(1710171234276) AND "@timestamp" - <=fromUnixTimestamp64Milli(1710172134276)) AND ("@timestamp">= - fromUnixTimestamp64Milli(1710171234276) AND "@timestamp"<= - fromUnixTimestamp64Milli(1710172134276))) AND "properties_isreg" IS NOT NULL) + WHERE ((("@timestamp">=fromUnixTimestamp64Milli(1710171234276) AND + "@timestamp"<=fromUnixTimestamp64Milli(1710172134276)) AND + ("@timestamp">=fromUnixTimestamp64Milli(1710171234276) AND + "@timestamp"<=fromUnixTimestamp64Milli(1710172134276))) AND "properties_isreg" IS NOT NULL) LIMIT 100`, }, false, @@ -1490,15 +1490,6 @@ var TestsSearch = []SearchTestCase{ "query": "user", "lenient": true } - }, - { - "range": { - "@timestamp": { - "format": "strict_date_optional_time", - "gte": "2024-01-22T09:26:10.299Z", - "lte": "2024-01-22T09:41:10.299Z" - } - } } ], "should": [], @@ -1530,9 +1521,8 @@ var TestsSearch = []SearchTestCase{ } `, []string{ - `(` + fullTextFieldName + ` iLIKE '%user%' AND ("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp"<=fromUnixTimestamp64Milli(1705916470299)))`, - `((` + fullTextFieldName + ` iLIKE '%user%' AND ("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp"<=fromUnixTimestamp64Milli(1705916470299))) ` + - `AND "stream.namespace" IS NOT NULL)`, + fullTextFieldName + ` iLIKE '%user%'`, + fullTextFieldName + ` iLIKE '%user%' AND "stream.namespace" IS NOT NULL`, }, model.Normal, []string{}, @@ -1544,7 +1534,7 @@ var TestsSearch = []SearchTestCase{ "stream_namespace" AS "aggr__suggestions__key_0", count(*) AS "aggr__suggestions__count" FROM __quesma_table_name - WHERE ("message" iLIKE '%user%' AND ("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp"<=fromUnixTimestamp64Milli(1705916470299))) + WHERE "message" iLIKE '%user%' GROUP BY "stream_namespace" AS "aggr__suggestions__key_0" ORDER BY "aggr__suggestions__count" DESC, "aggr__suggestions__key_0" ASC LIMIT 11`, @@ -1578,15 +1568,6 @@ var TestsSearch = []SearchTestCase{ ], "minimum_should_match": 1 } - }, - { - "range": { - "@timestamp": { - "format": "strict_date_optional_time", - "gte": "2024-01-22T14:34:35.873Z", - "lte": "2024-01-22T14:49:35.873Z" - } - } } ], "should": [], @@ -1617,9 +1598,8 @@ var TestsSearch = []SearchTestCase{ } `, []string{ - `(("service.name"='admin' AND ("@timestamp">=fromUnixTimestamp64Milli(1705934075873) AND "@timestamp"<=fromUnixTimestamp64Milli(1705934975873))) ` + - `AND "namespace" IS NOT NULL)`, - `("service.name"='admin' AND ("@timestamp">=fromUnixTimestamp64Milli(1705934075873) AND "@timestamp"<=fromUnixTimestamp64Milli(1705934975873)))`, + `"service.name"='admin' AND "namespace" IS NOT NULL`, + `"service.name"='admin'`, }, model.Normal, []string{}, @@ -1629,7 +1609,7 @@ var TestsSearch = []SearchTestCase{ "namespace" AS "aggr__suggestions__key_0", count(*) AS "aggr__suggestions__count" FROM __quesma_table_name - WHERE ("service_name"='admin' AND ("@timestamp">=fromUnixTimestamp64Milli(1705934075873) AND "@timestamp"<=fromUnixTimestamp64Milli(1705934975873))) + WHERE "service_name"='admin' GROUP BY "namespace" AS "aggr__suggestions__key_0" ORDER BY "aggr__suggestions__count" DESC, "aggr__suggestions__key_0" ASC LIMIT 11`, @@ -1670,15 +1650,6 @@ var TestsSearch = []SearchTestCase{ "match_phrase": { "host.name": "poseidon" } - }, - { - "range": { - "@timestamp": { - "format": "strict_date_optional_time", - "gte": "2024-01-29T15:36:36.491Z", - "lte": "2024-01-29T18:11:36.491Z" - } - } } ], "must": [], @@ -1696,11 +1667,9 @@ var TestsSearch = []SearchTestCase{ "track_total_hits": true }`, []string{ - `(("message" __quesma_match '%User logged out%' AND "host.name" __quesma_match '%poseidon%') ` + - `AND ("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND "@timestamp"<=fromUnixTimestamp64Milli(1706551896491)))`, - `((("message" __quesma_match '%User logged out%' AND "host.name" __quesma_match '%poseidon%') ` + - `AND ("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND "@timestamp"<=fromUnixTimestamp64Milli(1706551896491))) ` + - `AND "stream.namespace" IS NOT NULL)`, + `("message" __quesma_match '%User logged out%' AND "host.name" __quesma_match '%poseidon%')`, + `("message" __quesma_match '%User logged out%' AND "host.name" __quesma_match '%poseidon%') ` + + `AND "stream.namespace" IS NOT NULL`, }, model.Normal, []string{}, @@ -1712,8 +1681,7 @@ var TestsSearch = []SearchTestCase{ "stream_namespace" AS "aggr__suggestions__key_0", count(*) AS "aggr__suggestions__count" FROM __quesma_table_name - WHERE (("message" ILIKE '%User logged out%' AND "host_name" ILIKE '%poseidon%') - AND ("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND "@timestamp"<=fromUnixTimestamp64Milli(1706551896491))) + WHERE ("message" ILIKE '%User logged out%' AND "host_name" ILIKE '%poseidon%') GROUP BY "stream_namespace" AS "aggr__suggestions__key_0" ORDER BY "aggr__suggestions__count" DESC, "aggr__suggestions__key_0" ASC LIMIT 11`, @@ -1751,15 +1719,6 @@ var TestsSearch = []SearchTestCase{ "query": "user", "type": "best_fields" } - }, - { - "range": { - "@timestamp": { - "format": "strict_date_optional_time", - "gte": "2024-01-22T09:26:10.299Z", - "lte": "2024-01-22T09:41:10.299Z" - } - } } ], "must": [], @@ -1776,11 +1735,7 @@ var TestsSearch = []SearchTestCase{ "terminate_after": 100000, "timeout": "1000ms" }`, - []string{ - `((` + fullTextFieldName + ` iLIKE '%user%' AND ("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp"<=fromUnixTimestamp64Milli(1705916470299))) ` + - `AND "namespace" IS NOT NULL)`, - `(` + fullTextFieldName + ` iLIKE '%user%' AND ("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp"<=fromUnixTimestamp64Milli(1705916470299)))`, - }, + []string{fullTextFieldName + ` iLIKE '%user%'`}, model.Normal, []string{}, []string{ @@ -1789,7 +1744,7 @@ var TestsSearch = []SearchTestCase{ "namespace" AS "aggr__suggestions__key_0", count(*) AS "aggr__suggestions__count" FROM __quesma_table_name - WHERE ("message" iLIKE '%user%' AND ("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp"<=fromUnixTimestamp64Milli(1705916470299))) + WHERE "message" iLIKE '%user%' GROUP BY "namespace" AS "aggr__suggestions__key_0" ORDER BY "aggr__suggestions__count" DESC, "aggr__suggestions__key_0" ASC LIMIT 11`, @@ -1830,15 +1785,6 @@ var TestsSearch = []SearchTestCase{ "match_phrase": { "host.name": "poseidon" } - }, - { - "range": { - "@timestamp": { - "format": "strict_date_optional_time", - "gte": "2024-01-29T15:36:36.491Z", - "lte": "2024-01-29T18:11:36.491Z" - } - } } ], "must": [], @@ -1855,13 +1801,7 @@ var TestsSearch = []SearchTestCase{ "terminate_after": 100000, "timeout": "1000ms" }`, - []string{ - `(("message" __quesma_match '%User logged out%' AND "host.name" __quesma_match '%poseidon%') ` + - `AND ("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND "@timestamp"<=fromUnixTimestamp64Milli(1706551896491))) ` + - `AND "namespace" IS NOT NULL)`, - `(("message" __quesma_match '%User logged out%' AND "host.name" __quesma_match '%poseidon%') ` + - `AND ("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND "@timestamp"<=fromUnixTimestamp64Milli(1706551896491)))`, - }, + []string{`("message" __quesma_match '%User logged out%' AND "host.name" __quesma_match '%poseidon%')`}, model.Normal, []string{}, []string{ @@ -1870,8 +1810,7 @@ var TestsSearch = []SearchTestCase{ "namespace" AS "aggr__suggestions__key_0", count(*) AS "aggr__suggestions__count" FROM __quesma_table_name - WHERE (("message" ILIKE '%User logged out%' AND "host_name" ILIKE '%poseidon%') - AND ("@timestamp">=fromUnixTimestamp64Milli(1706542596491) AND "@timestamp"<=fromUnixTimestamp64Milli(1706551896491))) + WHERE ("message" ILIKE '%User logged out%' AND "host_name" ILIKE '%poseidon%') GROUP BY "namespace" AS "aggr__suggestions__key_0" ORDER BY "aggr__suggestions__count" DESC, "aggr__suggestions__key_0" ASC LIMIT 11`, @@ -1909,15 +1848,6 @@ var TestsSearch = []SearchTestCase{ "query": "user", "type": "best_fields" } - }, - { - "range": { - "@timestamp": { - "format": "strict_date_optional_time", - "gte": "2024-01-22T09:26:10.299Z", - "lte": "2024-01-22T09:41:10.299Z" - } - } } ], "must": [], @@ -1934,10 +1864,7 @@ var TestsSearch = []SearchTestCase{ "terminate_after": 100000, "timeout": "1000ms" }`, - []string{ - `((` + fullTextFieldName + ` iLIKE '%user%' AND ("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp"<=fromUnixTimestamp64Milli(1705916470299))) AND "namespace" IS NOT NULL)`, - `(` + fullTextFieldName + ` iLIKE '%user%' AND ("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp"<=fromUnixTimestamp64Milli(1705916470299)))`, - }, + []string{fullTextFieldName + ` iLIKE '%user%'`}, model.Normal, []string{}, []string{ @@ -1946,7 +1873,7 @@ var TestsSearch = []SearchTestCase{ "namespace" AS "aggr__suggestions__key_0", count(*) AS "aggr__suggestions__count" FROM __quesma_table_name - WHERE ("message" iLIKE '%user%' AND ("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp"<=fromUnixTimestamp64Milli(1705916470299))) + WHERE "message" iLIKE '%user%' GROUP BY "namespace" AS "aggr__suggestions__key_0" ORDER BY "aggr__suggestions__count" DESC, "aggr__suggestions__key_0" ASC LIMIT 11`, @@ -2156,9 +2083,7 @@ var TestsSearch = []SearchTestCase{ }, "track_total_hits": false }`, - []string{ - `("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp" = toDateTime64('2024-05-24 13:32:47.307',3))`, - }, + []string{`("@timestamp">=fromUnixTimestamp64Milli(1705915570299) AND "@timestamp" = toDateTime64('2024-05-24 13:32:47.307',3))`}, model.ListAllFields, // TestSearchHandler is pretty blunt with config loading so the test below can't be used. // We will probably refactor it as we move forwards with schema which will get even more side-effecting @@ -2213,7 +2138,6 @@ var TestsSearch = []SearchTestCase{ }`, []string{`("cliIP" IN tuple('2601:204:c503:c240:9c41:5531:ad94:4d90', '50.116.43.98', '75.246.0.64') AND ("@timestamp">=fromUnixTimestamp64Milli(1715817600000) AND "@timestamp"<=fromUnixTimestamp64Milli(1715990399000)))`}, model.ListAllFields, - //[]model.Query{withLimit(justSimplestWhere(`("cliIP" IN ('2601:204:c503:c240:9c41:5531:ad94:4d90','50.116.43.98','75.246.0.64') AND ("@timestamp">=parseDateTime64BestEffort('2024-05-16T00:00:00') AND "@timestamp"<=parseDateTime64BestEffort('2024-05-17T23:59:59')))`), 1)}, []string{ `SELECT "message" ` + `FROM ` + TableName + ` ` + @@ -2505,6 +2429,44 @@ Men\\'s Clothing \\\\ %' LIMIT 10`}, []string{}, }, { // [49] + "DateTime(N), N > 3 (9 here, should be the same for all)", + `{ + "query": { + "bool": { + "filter": [ + { + "exists": { + "field": "tsPrec9" + } + }, + { + "range": { + "tsPrec9": { + "format": "date_time", + "gte": "2025-03-18T10:04:36.03123584Z", + "lte": "2025-03-18T10:04:36.03123584Z" + } + } + } + ], + "must": [], + "must_not": [], + "should": [] + } + }, + "track_total_hits": false + }`, + []string{`("tsPrec9" IS NOT NULL AND ("tsPrec9">='2025-03-18T10:04:36.03123584Z' AND "tsPrec9"<='2025-03-18T10:04:36.03123584Z'))`}, + model.ListAllFields, + []string{ + `SELECT "message" FROM ` + TableName + ` ` + + `WHERE ("tsPrec9" IS NOT NULL AND ("tsPrec9">=fromUnixTimestamp64Milli(toDateTime64(1742292276031235840/1000000000,9)) AND ` + + `"tsPrec9"<=fromUnixTimestamp64Milli(toDateTime64(1742292276031235840/1000000000,9)))) ` + + `LIMIT 10`, + }, + []string{}, + }, + { // [50] "_index term", `{ "query": { /*one comment */ @@ -2563,14 +2525,6 @@ var TestsSearchNoAttrs = []SearchTestCase{ "query": { "bool": { "filter": [ - { - "range": { - "@timestamp": { - "gte": "2024-01-25T13:22:45.968Z", - "lte": "2024-01-25T13:37:45.968Z" - } - } - }, { "exists": { "field": "summary" @@ -2590,10 +2544,10 @@ var TestsSearchNoAttrs = []SearchTestCase{ }, "track_total_hits": false }`, - []string{`((("@timestamp">=fromUnixTimestamp64Milli(1706188965968) AND "@timestamp"<=fromUnixTimestamp64Milli(1706189865968)) AND "summary" IS NOT NULL) AND NOT ("run_once" IS NOT NULL))`}, + []string{`("summary" IS NOT NULL AND NOT ("run_once" IS NOT NULL))`}, model.ListAllFields, []string{ - `SELECT "@timestamp", "message" FROM __quesma_table_name WHERE ((("@timestamp">=fromUnixTimestamp64Milli(1706188965968) AND "@timestamp"<=fromUnixTimestamp64Milli(1706189865968)) AND NULL IS NOT NULL) AND NOT (NULL IS NOT NULL)) LIMIT 10`, + `SELECT "@timestamp", "message" FROM __quesma_table_name WHERE (NULL IS NOT NULL AND NOT (NULL IS NOT NULL)) LIMIT 10`, }, []string{}, }, @@ -2641,9 +2595,9 @@ var TestSearchFilter = []SearchTestCase{ []string{}, model.Normal, []string{}, - []string{ - `SELECT toInt64(toUnixTimestamp64Milli("@timestamp") / 30000) AS "aggr__0__key_0" - , count(*) AS "aggr__0__count" + []string{` + SELECT toInt64(toUnixTimestamp64Milli("@timestamp") / 30000) AS "aggr__0__key_0", + count(*) AS "aggr__0__count" FROM __quesma_table_name GROUP BY toInt64(toUnixTimestamp64Milli("@timestamp") / 30000) AS "aggr__0__key_0" @@ -2759,12 +2713,13 @@ var TestSearchFilter = []SearchTestCase{ []string{}, model.Normal, []string{}, - []string{ - `SELECT sum(count(*)) OVER () AS "metric____quesma_total_count_col_0", + []string{` + SELECT sum(count(*)) OVER () AS "metric____quesma_total_count_col_0", toInt64(toUnixTimestamp64Milli("@timestamp") / 30000) AS "aggr__0__key_0", count(*) AS "aggr__0__count" FROM __quesma_table_name - WHERE ("@timestamp">=fromUnixTimestamp64Milli(1727858503270) AND "@timestamp"<=fromUnixTimestamp64Milli(1727859403270)) + WHERE ("@timestamp">=fromUnixTimestamp64Milli(1727858503270) AND + "@timestamp"<=fromUnixTimestamp64Milli(1727859403270)) GROUP BY toInt64(toUnixTimestamp64Milli("@timestamp") / 30000) AS "aggr__0__key_0" ORDER BY "aggr__0__key_0" ASC`, diff --git a/platform/transformations-delete/transformations.go b/platform/transformations-delete/transformations.go new file mode 100644 index 000000000..121503016 --- /dev/null +++ b/platform/transformations-delete/transformations.go @@ -0,0 +1,277 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package transformations_delete + +import ( + "context" + "fmt" + "github.com/QuesmaOrg/quesma/platform/clickhouse" + "github.com/QuesmaOrg/quesma/platform/logger" + "github.com/QuesmaOrg/quesma/platform/model" + "github.com/QuesmaOrg/quesma/platform/schema" + "github.com/QuesmaOrg/quesma/platform/util" + "github.com/k0kubun/pp" + "slices" + "strings" +) + +func ApplyNecessaryTransformations(ctx context.Context, query *model.Query, table *clickhouse.Table, indexSchema schema.Schema) (*model.Query, error) { + + type scopeType = int + const ( + datetime scopeType = iota + datetime64 + none + ) + scope := none + + visitor := model.NewBaseVisitor() + + // we look for: (timestamp_field OP fromUnixTimestamp) + visitor.OverrideVisitInfix = func(b *model.BaseExprVisitor, e model.InfixExpr) interface{} { + visitChildren := func() model.InfixExpr { + return model.NewInfixExpr(e.Left.Accept(b).(model.Expr), e.Op, e.Right.Accept(b).(model.Expr)) + } + + fmt.Println("KK start 1", e) + + // check if timestamp_field is ok + colRef, ok := e.Left.(model.ColumnRef) + fmt.Println("KK start 2", colRef, ok) + if !ok { + return visitChildren() + } + field, ok := indexSchema.ResolveField(colRef.ColumnName) + fmt.Println("KK start 3", field, ok) + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %s not found in schema for table %s", colRef.ColumnName, query.TableName) + return visitChildren() + } + col, ok := table.Cols[field.InternalPropertyName.AsString()] + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %s not found in table %s", field.InternalPropertyName.AsString(), query.TableName) + return visitChildren() + } + fmt.Println("KK start 3", e, col, ok) + isDatetime := col.IsDatetime() + isDateTime64 := col.IsDatetime64() + fmt.Println("KK start 4", isDatetime, isDateTime64) + if !isDatetime && !isDateTime64 { + return visitChildren() + } + + // check if operator is ok + op := strings.TrimSpace(e.Op) + fmt.Println("KK start 5", op) + if !slices.Contains([]string{"=", "!=", ">", "<", ">=", "<=", "/"}, op) { + return visitChildren() + } + + // check if right side is a function we want + tsFunc, ok := e.Right.(model.FunctionExpr) + if !ok { + fmt.Println("koniec") + return visitChildren() + } + if tsFunc.Name != model.FromUnixTimestampMs && tsFunc.Name != model.ToUnixTimestampMs { + //fmt.Println("wtf, name:", tsFunc.Name) + return visitChildren() + } + if len(tsFunc.Args) != 1 { + logger.WarnWithCtx(ctx).Msgf("invalid number of arguments for %s function", tsFunc.Name) + return visitChildren() + } + + arg := tsFunc.Args[0].Accept(b).(model.Expr) + pp.Println("KK 74 ARG", tsFunc.Args[0], arg) + if isDateTime64 { + clickhouseFunc := model.ClickhouseFromUnixTimestampMsToDatetime64Function + return model.NewInfixExpr(colRef, e.Op, model.NewFunction(clickhouseFunc, arg)) + } else if isDatetime { + fmt.Println("KK 79l", arg) + tsAny, isLiteral := arg.(model.LiteralExpr) + if !isLiteral { + logger.WarnWithCtx(ctx).Msgf("invalid argument for %s function: %v. isn't literal, but %T", tsFunc.Name, arg, arg) + return visitChildren() + } + ts, isNumber := util.ExtractNumeric64Maybe(tsAny.Value) + if !isNumber { + logger.WarnWithCtx(ctx).Msgf("invalid argument for %s function: %v. isn't integer, but %T", tsFunc.Name, arg, arg) + return visitChildren() + } + + clickhouseFunc := model.ClickhouseFromUnixTimestampMsToDatetimeFunction + return model.NewInfixExpr(colRef, e.Op, model.NewFunction(clickhouseFunc, model.NewLiteral(int64(ts/1000)))) + } + + return visitChildren() // unreachable + } + + // we look for: toUnixTimestamp(timestamp_field) or fromUnixTimestamp(TimeLiteral) + visitor.OverrideVisitFunction = func(b *model.BaseExprVisitor, e model.FunctionExpr) interface{} { + visitChildren := func() model.FunctionExpr { + return model.NewFunction(e.Name, b.VisitChildren(e.Args)...) + } + + scopeBefore := scope + defer func() { scope = scopeBefore }() + + toUnix := func() interface{} { + if len(e.Args) != 1 { + logger.WarnWithCtx(ctx).Msgf("invalid number of arguments for %s function", e.Name) + return visitChildren() + } + colRef, ok := e.Args[0].(model.ColumnRef) + fmt.Printf("KK colref %v ok %v\n", colRef, ok) + if !ok { + if f, ok := e.Args[0].(model.FunctionExpr); ok && strings.ToLower(f.Name) == "coalesce" && len(f.Args) > 1 { + colRef, ok = f.Args[0].(model.ColumnRef) + if !ok { + logger.WarnWithCtx(ctx).Msgf("invalid argument for %s function: %v. isn't column reference, but %T", e.Name, f.Args[0], f.Args[0]) + return visitChildren() + } + } + } + fmt.Println("KK f start 2", e, colRef) + field, ok := indexSchema.ResolveField(colRef.ColumnName) + fmt.Println("KK f start 2.5", field, ok) + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %s not found in schema for table %s", colRef.ColumnName, query.TableName) + return visitChildren() + } + col, ok := table.Cols[field.InternalPropertyName.AsString()] + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %s not found in table %s", field.InternalPropertyName.AsString(), query.TableName) + return visitChildren() + } + isDatetime := col.IsDatetime() + isDateTime64 := col.IsDatetime64() + fmt.Println("KK f start 3", e, isDatetime, isDateTime64) + if !isDatetime && !isDateTime64 { + return visitChildren() + } + + var clickhouseFunc string + if isDateTime64 { + scope = datetime64 + clickhouseFunc = model.ClickhouseToUnixTimestampMsFromDatetime64Function + } else if isDatetime { + scope = datetime + clickhouseFunc = model.ClickhouseToUnixTimestampMsFromDatetimeFunction + } + + return model.NewFunction(clickhouseFunc, b.VisitChildren(e.Args)...) + } + + fromUnix := func() interface{} { + if len(e.Args) != 1 { + logger.WarnWithCtx(ctx).Msgf("invalid number of arguments for %s function", e.Name) + return visitChildren() + } + + children := b.VisitChildren(e.Args) + var clickhouseFunc string + switch scope { + case datetime: + clickhouseFunc = model.ClickhouseFromUnixTimestampMsToDatetimeFunction + default: + pp.Println("Children", children) + /*threeDigitsOfPrecisionSuffice := utcTs.UnixNano()%1_000_000 == 0 + if threeDigitsOfPrecisionSuffice { + return model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(utcTs.UnixMilli())), true + } else { + return model.NewFunction( + "toDateTime64", + model.NewInfixExpr( + model.NewLiteral(utcTs.UnixNano()), + "/", + model.NewLiteral(1_000_000_000), + ), + model.NewLiteral(9), + ), true + }*/ + clickhouseFunc = model.ClickhouseFromUnixTimestampMsToDatetime64Function + } + + return model.NewFunction(clickhouseFunc, b.VisitChildren(e.Args)...) + } + + switch e.Name { + case model.ToUnixTimestampMs: + fmt.Println("KK f START ToUnix", e) + return toUnix() + case model.FromUnixTimestampMs: + fmt.Println("KK f START FromUnix", e) + return fromUnix() + default: + fmt.Println("wtf, name:", e.Name) + return visitChildren() + } + } + + // we look for: DurationLiteral/TimeLiteral + visitor.OverrideVisitLiteral = func(b *model.BaseExprVisitor, l model.LiteralExpr) interface{} { + pp.Println("visitor literal", l) + if timeL, ok := l.Value.(model.TimeLiteral); ok { + ts := timeL.Value + fmt.Println("eee", ts, scope) + switch scope { + case datetime: + return model.NewLiteral(ts.Unix()) + default: + threeDigitsOfPrecisionSuffice := ts.UnixNano()%1_000_000 == 0 + fmt.Println("three?", threeDigitsOfPrecisionSuffice) + if threeDigitsOfPrecisionSuffice { + return model.NewLiteral(ts.UnixMilli()) + } else { + return model.NewFunction( + "toDateTime64", + model.NewInfixExpr( + model.NewLiteral(ts.UnixNano()), + "/", + model.NewLiteral(1_000_000_000), + ), + model.NewLiteral(9), + ) + } + } + } else { + fmt.Println(l.Value) + } + + msLiteral, ok := l.Value.(model.DurationLiteral) + if !ok { + return l.Clone() + } + + fmt.Println("LOL", msLiteral) + + field, ok := indexSchema.ResolveField(msLiteral.TimestampField.ColumnName) + fmt.Println("1 LOL", msLiteral, field, ok) + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %v not found in schema for table %s", msLiteral.TimestampField, query.TableName) + return l.Clone() + } + col, ok := table.Cols[field.InternalPropertyName.AsString()] + fmt.Println("1LOL", msLiteral, col) + if !ok { + logger.WarnWithCtx(ctx).Msgf("field %s not found in table %s", field.InternalPropertyName.AsString(), query.TableName) + return l.Clone() + } + + fmt.Println("2LOL", msLiteral, col.IsDatetime()) + + if col.IsDatetime() { + return model.NewLiteral(msLiteral.Value.Milliseconds() / 1000) + } + return model.NewLiteral(msLiteral.Value.Milliseconds()) + } + + expr := query.SelectCommand.Accept(visitor) + + if _, ok := expr.(*model.SelectCommand); ok { + query.SelectCommand = *expr.(*model.SelectCommand) + } + return query, nil + +} diff --git a/platform/transformations/necessary_transformations.go b/platform/transformations/necessary_transformations.go index fb07d6b63..864a7d879 100644 --- a/platform/transformations/necessary_transformations.go +++ b/platform/transformations/necessary_transformations.go @@ -3,6 +3,7 @@ package transformations import ( + "context" "fmt" "github.com/QuesmaOrg/quesma/platform/common_table" "github.com/QuesmaOrg/quesma/platform/model" @@ -13,7 +14,7 @@ import ( // TODO it should be removed and transformations should be in 1 place. // Only introduced temporarly e.g. for terms_enum to be able to get transformed // (terms_enum package can't import frontend_conntectors package) -func ApplyAllNecessaryCommonTransformations(query *model.Query, schema schema.Schema, isFieldMapSyntaxEnabled bool) (*model.Query, error) { +func ApplyAllNecessaryCommonTransformations(ctx context.Context, query *model.Query, schema schema.Schema, isFieldMapSyntaxEnabled bool) (*model.Query, error) { query, err := ApplyFieldMapSyntax(schema, query, isFieldMapSyntaxEnabled) if err != nil { return nil, err diff --git a/platform/util/utils.go b/platform/util/utils.go index 39b678abb..72cea7ace 100644 --- a/platform/util/utils.go +++ b/platform/util/utils.go @@ -429,39 +429,7 @@ func AssertSqlEqual(t *testing.T, expected, actual string) { fmt.Printf("%s\n", SqlPrettyPrint([]byte(actual))) actualLines := strings.Split(actual, "\n") expectedLines := strings.Split(expected, "\n") - pp.Println("-- First diff: ") - for i, aLine := range actualLines { - if i >= len(expectedLines) { - fmt.Println("Actual is longer than expected") - break - } - eLine := expectedLines[i] - if aLine != eLine { - if i > 0 { - fmt.Println(" ", actualLines[i-1]) - } - fmt.Println(" actual:", aLine) - if i+1 < len(actualLines) { - fmt.Println(" ", actualLines[i+1]) - } - fmt.Println() - if i > 0 { - fmt.Println(" ", expectedLines[i-1]) - } - fmt.Println("expected:", eLine) - if i+1 < len(expectedLines) { - fmt.Println(" ", expectedLines[i+1]) - } - - for j := range min(len(aLine), len(eLine)) { - if aLine[j] != eLine[j] { - fmt.Printf("First diff in line %d at index %d (actual: %c, expected: %c)\n", i, j, aLine[j], eLine[j]) - break - } - } - break - } - } + pp.Printf("-- First diff: \n%s\n", firstDiff(expectedLines, actualLines)) t.Errorf("Expected: %s\n\nactual: %s", expected, actual) } } @@ -854,6 +822,10 @@ func InitSqlMockWithPrettySqlAndPrint(t *testing.T, matchExpectationsInOrder boo fmt.Printf("%s\n", mismatch.expected) pp.Printf("---- %s Actual pretty:\n", t.Name()) fmt.Printf("%s\n", mismatch.actual) + + actualPretty := strings.Split(SqlPrettyPrint([]byte(mismatch.actual)), "\n") + expectedPretty := strings.Split(SqlPrettyPrint([]byte(mismatch.expected)), "\n") + pp.Printf("-- First diff: \n%s\n", firstDiff(expectedPretty, actualPretty)) } } }) @@ -1041,3 +1013,41 @@ func ReadResponseBody(resp *http.Response) ([]byte, error) { resp.Body = io.NopCloser(bytes.NewBuffer(respBody)) return respBody, nil } + +func firstDiff(expectedLines, actualLines []string) string { + var diff strings.Builder + for i, aLine := range actualLines { + if i >= len(expectedLines) { + fmt.Println("Actual is longer than expected") + break + } + eLine := expectedLines[i] + if aLine != eLine { + if i > 0 { + diff.WriteString(fmt.Sprintf("\t\t%s", actualLines[i-1])) + } + diff.WriteString(fmt.Sprintf(" actual: %s", aLine)) + if i+1 < len(actualLines) { + diff.WriteString(fmt.Sprintf("\t\t%s", actualLines[i+1])) + } + diff.WriteString("\n") + if i > 0 { + diff.WriteString(fmt.Sprintf("\t\t%s", expectedLines[i-1])) + } + diff.WriteString(fmt.Sprintf(" expected: %s", eLine)) + if i+1 < len(expectedLines) { + diff.WriteString(fmt.Sprintf("\t\t%s", expectedLines[i+1])) + } + + for j := range min(len(aLine), len(eLine)) { + if aLine[j] != eLine[j] { + diff.WriteString(fmt.Sprintf("First diff in line %d at index %d (actual: %c, expected: %c)\n", i, j, aLine[j], eLine[j])) + break + } + } + break + } + } + + return diff.String() +}