diff --git a/platform/backend_connectors/basic_sql_backend_connector.go b/platform/backend_connectors/basic_sql_backend_connector.go
index 4a284ede6..94b665237 100644
--- a/platform/backend_connectors/basic_sql_backend_connector.go
+++ b/platform/backend_connectors/basic_sql_backend_connector.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"database/sql"
 	"github.com/QuesmaOrg/quesma/platform/config"
+	"github.com/QuesmaOrg/quesma/platform/logger"
 	quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
 )
 
@@ -64,6 +65,7 @@ func (p *BasicSqlBackendConnector) Ping() error {
 }
 
 func (p *BasicSqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
+	logger.Info().Msgf("query sql: %s", query)
 	rows, err := p.connection.QueryContext(ctx, query, args...)
 	if err != nil {
 		return nil, err
@@ -72,10 +74,12 @@ func (p *BasicSqlBackendConnector) Query(ctx context.Context, query string, args
 }
 
 func (p *BasicSqlBackendConnector) QueryRow(ctx context.Context, query string, args ...interface{}) quesma_api.Row {
+	logger.Info().Msgf("queryRow sql: %s", query)
 	return p.connection.QueryRowContext(ctx, query, args...)
 }
 
 func (p *BasicSqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
+	logger.Info().Msgf("Exec sql: %s", query)
 	if len(args) == 0 {
 		_, err := p.connection.ExecContext(ctx, query)
 		return err
diff --git a/platform/clickhouse/connection.go b/platform/clickhouse/connection.go
index fc0e1d1ed..1396b3421 100644
--- a/platform/clickhouse/connection.go
+++ b/platform/clickhouse/connection.go
@@ -6,9 +6,7 @@ import (
 	"crypto/tls"
 	"database/sql"
 	"fmt"
-	"github.com/ClickHouse/clickhouse-go/v2"
 	"github.com/QuesmaOrg/quesma/platform/backend_connectors"
-	"github.com/QuesmaOrg/quesma/platform/buildinfo"
 	"github.com/QuesmaOrg/quesma/platform/config"
 	"github.com/QuesmaOrg/quesma/platform/logger"
 	quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
@@ -19,40 +17,68 @@ import (
 
 func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {
 
-	options := clickhouse.Options{Addr: []string{c.ClickHouse.Url.Host}}
-	if c.ClickHouse.User != "" || c.ClickHouse.Password != "" || c.ClickHouse.Database != "" {
+	dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true",
+		c.ClickHouse.User,
+		c.ClickHouse.Password,
+		c.ClickHouse.Url.Host,
+		c.ClickHouse.Database)
 
-		options.Auth = clickhouse.Auth{
-			Username: c.ClickHouse.User,
-			Password: c.ClickHouse.Password,
-			Database: c.ClickHouse.Database,
-		}
-	}
-	if !c.ClickHouse.DisableTLS {
-		options.TLS = tlsConfig
-	}
-
-	info := struct {
-		Name    string
-		Version string
-	}{
-		Name:    "quesma",
-		Version: buildinfo.Version,
+	db, err := sql.Open("mysql", dsn)
+	if err != nil {
+		logger.Error().Err(err).Msg("failed to initialize Doris connection pool")
+		return nil
 	}
 
-	// Setting limit here is not working. It causes runtime error.
-	// Set it after opening the connection.
-	//
-	// options.MaxIdleConns = 50
-	// options.MaxOpenConns = 50
-	// options.ConnMaxLifetime = 0
+	db.SetMaxOpenConns(30)
+	db.SetMaxIdleConns(10)
+	db.SetConnMaxLifetime(time.Hour)
 
-	options.ClientInfo.Products = append(options.ClientInfo.Products, info)
+	if err := db.Ping(); err != nil {
+		logger.Error().Err(err).Msg("failed to ping Doris server")
+		return nil
+	}
 
-	return clickhouse.OpenDB(&options)
+	logger.Info().Msg("Doris connection pool initialized successfully")
+	return db
 }
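+
+// NOTE (editorial sketch, not part of the patch): sql.Open("mysql", ...) only works if a
+// MySQL-protocol driver is registered with database/sql somewhere in the binary —
+// go-sql-driver/mysql is the usual choice and is assumed here — and time.Hour above
+// needs "time" in this file's imports. The same initialization in isolation:
+//
+//	import (
+//		"database/sql"
+//		"fmt"
+//		"time"
+//
+//		_ "github.com/go-sql-driver/mysql" // registers the "mysql" driver for sql.Open
+//	)
+//
+//	func openDoris(user, password, host, database string) (*sql.DB, error) {
+//		// Doris FE speaks the MySQL protocol, so a plain MySQL DSN works.
+//		dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true", user, password, host, database)
+//		db, err := sql.Open("mysql", dsn)
+//		if err != nil {
+//			return nil, err
+//		}
+//		db.SetMaxOpenConns(30)          // cap concurrent connections to the FE
+//		db.SetMaxIdleConns(10)          // keep a small warm pool
+//		db.SetConnMaxLifetime(time.Hour)
+//		return db, db.Ping()            // fail fast on bad credentials / unreachable FE
+//	}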
 
+//func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {
+//
+//	options := clickhouse.Options{Addr: []string{c.ClickHouse.Url.Host}}
+//	if c.ClickHouse.User != "" || c.ClickHouse.Password != "" || c.ClickHouse.Database != "" {
+//
+//		options.Auth = clickhouse.Auth{
+//			Username: c.ClickHouse.User,
+//			Password: c.ClickHouse.Password,
+//			Database: c.ClickHouse.Database,
+//		}
+//	}
+//	if !c.ClickHouse.DisableTLS {
+//		options.TLS = tlsConfig
+//	}
+//
+//	info := struct {
+//		Name    string
+//		Version string
+//	}{
+//		Name:    "quesma",
+//		Version: buildinfo.Version,
+//	}
+//
+//	// Setting limit here is not working. It causes runtime error.
+//	// Set it after opening the connection.
+//	//
+//	// options.MaxIdleConns = 50
+//	// options.MaxOpenConns = 50
+//	// options.ConnMaxLifetime = 0
+//
+//	options.ClientInfo.Products = append(options.ClientInfo.Products, info)
+//
+//	return clickhouse.OpenDB(&options)
+//
+//}
+
 func InitDBConnectionPool(c *config.QuesmaConfiguration) quesma_api.BackendConnector {
 	if c.ClickHouse.Url == nil {
 		return nil
diff --git a/platform/clickhouse/quesma_communicator.go b/platform/clickhouse/quesma_communicator.go
index 47b2a28e8..aa8de3207 100644
--- a/platform/clickhouse/quesma_communicator.go
+++ b/platform/clickhouse/quesma_communicator.go
@@ -223,7 +223,15 @@ func read(ctx context.Context, rows quesma_api.Rows, selectFields []string, rowT
 		}
 		resultRow := model.QueryResultRow{Cols: make([]model.QueryResultCol, len(selectFields))}
 		for i, field := range selectFields {
-			resultRow.Cols[i] = model.QueryResultCol{ColName: field, Value: rowToScan[i]}
+			var val interface{}
+			switch v := rowToScan[i].(type) {
+			// just for doris query
+			case []uint8:
+				val = string(v)
+			default:
+				val = v
+			}
+			resultRow.Cols[i] = model.QueryResultCol{ColName: field, Value: val}
 		}
 		resultRows = append(resultRows, resultRow)
 	}
diff --git a/platform/clickhouse/schema.go b/platform/clickhouse/schema.go
index de46f15d1..a50660421 100644
--- a/platform/clickhouse/schema.go
+++ b/platform/clickhouse/schema.go
@@ -73,6 +73,7 @@ const (
 	DateTime64 DateTimeType = iota
 	DateTime
 	Invalid
+	datetime
 )
 
 func (c *Column) String() string {
@@ -220,35 +221,75 @@ func NewBaseType(clickHouseTypeName string) BaseType {
 
 // this is catch all type for all types we do not exlicitly support
 type UnknownType struct{}
 
-func ResolveType(clickHouseTypeName string) reflect.Type {
-	switch clickHouseTypeName {
-	case "String", "LowCardinality(String)", "UUID", "FixedString":
+func ResolveType(dorisTypeName string) reflect.Type {
+	dorisTypeName = strings.ToLower(dorisTypeName)
+	switch dorisTypeName {
+	case "char", "varchar", "string", "text":
 		return reflect.TypeOf("")
-	case "DateTime64", "DateTime", "Date", "DateTime64(3)":
+	case "date", "datetime", "datev2", "datetimev2":
 		return reflect.TypeOf(time.Time{})
-	case "UInt8", "UInt16", "UInt32", "UInt64":
-		return reflect.TypeOf(uint64(0))
-	case "Int8", "Int16", "Int32":
+	case "tinyint", "smallint", "int":
 		return reflect.TypeOf(int32(0))
-	case "Int64":
+	case "bigint":
 		return reflect.TypeOf(int64(0))
-	case "Float32", "Float64":
-		return reflect.TypeOf(float64(0))
-	case "Point":
-		return reflect.TypeOf(Point{})
-	case "Bool":
+	case "largeint":
+		return reflect.TypeOf("") // LargeInt is typically handled as string due to size
+	case "boolean":
 		return reflect.TypeOf(true)
-	case "JSON":
+	case "float":
+		return reflect.TypeOf(float32(0))
+	case "double":
+		return reflect.TypeOf(float64(0))
+	case "decimal", "decimalv2", "decimal32", "decimal64", "decimal128":
+		return reflect.TypeOf("") // Decimals often handled as strings for precision
+	case "json":
+		return reflect.TypeOf(map[string]interface{}{})
+	case "hll":
+		return reflect.TypeOf([]byte{}) // HLL is a binary type
+	case "bitmap":
+		return reflect.TypeOf([]byte{}) // Bitmap is also binary
+	case "array":
+		return reflect.TypeOf([]interface{}{})
+	case "map":
 		return reflect.TypeOf(map[string]interface{}{})
-	case "Map(String, Nullable(String))", "Map(String, String)", "Map(LowCardinality(String), String)", "Map(LowCardinality(String), Nullable(String))":
-		return reflect.TypeOf(map[string]string{})
-	case "Unknown":
-		return reflect.TypeOf(UnknownType{})
+	case "struct":
+		return reflect.TypeOf(map[string]interface{}{})
+	case "null":
+		return reflect.TypeOf(nil)
 	}
 
 	return nil
 }
 
+//func ResolveType(clickHouseTypeName string) reflect.Type {
+//	switch clickHouseTypeName {
+//	case "String", "LowCardinality(String)", "UUID", "FixedString":
+//		return reflect.TypeOf("")
+//	case "DateTime64", "DateTime", "Date", "DateTime64(3)":
+//		return reflect.TypeOf(time.Time{})
+//	case "UInt8", "UInt16", "UInt32", "UInt64":
+//		return reflect.TypeOf(uint64(0))
+//	case "Int8", "Int16", "Int32":
+//		return reflect.TypeOf(int32(0))
+//	case "Int64":
+//		return reflect.TypeOf(int64(0))
+//	case "Float32", "Float64":
+//		return reflect.TypeOf(float64(0))
+//	case "Point":
+//		return reflect.TypeOf(Point{})
+//	case "Bool":
+//		return reflect.TypeOf(true)
+//	case "JSON":
+//		return reflect.TypeOf(map[string]interface{}{})
+//	case "Map(String, Nullable(String))", "Map(String, String)", "Map(LowCardinality(String), String)", "Map(LowCardinality(String), Nullable(String))":
+//		return reflect.TypeOf(map[string]string{})
+//	case "Unknown":
+//		return reflect.TypeOf(UnknownType{})
+//	}
+//
+//	return nil
+//}
+
 // 'value': value of a field, from unmarshalled JSON
 // 'valueOrigin': name of the field (for error messages)
 func NewType(value any, valueOrigin string) (Type, error) {
@@ -428,7 +469,7 @@ func NewDefaultBoolAttribute() Attribute {
 }
 
 func (dt DateTimeType) String() string {
-	return []string{"DateTime64", "DateTime", "Invalid"}[dt]
+	return []string{"DateTime64", "DateTime", "Invalid", "datetime"}[dt]
 }
 
 func IsColumnAttributes(colName string) bool {
diff --git a/platform/clickhouse/table.go b/platform/clickhouse/table.go
index 576f2ebcd..127ee7e40 100644
--- a/platform/clickhouse/table.go
+++ b/platform/clickhouse/table.go
@@ -106,10 +106,10 @@ func (t *Table) GetDateTimeType(ctx context.Context, fieldName string, dateInSch
 	if col, ok := t.Cols[fieldName]; ok {
 		typeName := col.Type.String()
 		// hasPrefix, not equal, because we can have DateTime64(3) and we want to catch it
-		if strings.HasPrefix(typeName, "DateTime64") {
+		if strings.HasPrefix(typeName, "datetime") {
 			return DateTime64
 		}
-		if strings.HasPrefix(typeName, "DateTime") {
+		if strings.HasPrefix(typeName, "date") {
 			return DateTime
 		}
 	}
diff --git a/platform/clickhouse/table_discovery.go b/platform/clickhouse/table_discovery.go
index 4a1947f86..019da3304 100644
--- a/platform/clickhouse/table_discovery.go
+++ b/platform/clickhouse/table_discovery.go
@@ -802,7 +802,8 @@ func (td *tableDiscovery) readTables(database string) (map[string]map[string]col
 		return map[string]map[string]columnMetadata{}, fmt.Errorf("database connection pool is nil, cannot describe tables")
 	}
 
-	rows, err := td.dbConnPool.Query(context.Background(), "SELECT table, name, type, comment FROM system.columns WHERE database = ?", database)
+	// keep the query parameterized; interpolating the database name into the SQL string invites injection
+	rows, err := td.dbConnPool.Query(context.Background(), "SELECT table_name, column_name, data_type, column_comment FROM information_schema.columns WHERE table_schema = ?", database)
 	if err != nil {
 		err = end_user_errors.GuessClickhouseErrorType(err).InternalDetails("reading list of columns from system.columns")
@@ -861,18 +862,18 @@ func (td *tableDiscovery) getTimestampFieldForClickHouse(database, table string)
 }
 
 func (td *tableDiscovery) tableComment(database, table string) (comment string) {
-	err := td.dbConnPool.QueryRow(context.Background(), "SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table).Scan(&comment)
-	if err != nil {
-		logger.Error().Msgf("could not get table comment: %v", err)
-	}
+	//err := td.dbConnPool.QueryRow(context.Background(), "SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table).Scan(&comment)
+	//if err != nil {
+	//	logger.Error().Msgf("could not get table comment: %v", err)
+	//}
 	return comment
 }
 
 func (td *tableDiscovery) createTableQuery(database, table string) (ddl string) {
-	err := td.dbConnPool.QueryRow(context.Background(), "SELECT create_table_query FROM system.tables WHERE database = ? and table = ? ", database, table).Scan(&ddl)
-	if err != nil {
-		logger.Error().Msgf("could not get create table statement: %v", err)
-	}
+	//err := td.dbConnPool.QueryRow(context.Background(), "SELECT create_table_query FROM system.tables WHERE database = ? and table = ? ", database, table).Scan(&ddl)
+	//if err != nil {
+	//	logger.Error().Msgf("could not get create table statement: %v", err)
+	//}
 	return ddl
 }
diff --git a/platform/clickhouse/type_adapter.go b/platform/clickhouse/type_adapter.go
index 7d4171269..08c562f41 100644
--- a/platform/clickhouse/type_adapter.go
+++ b/platform/clickhouse/type_adapter.go
@@ -10,39 +10,78 @@ import (
 type SchemaTypeAdapter struct {
 }
 
+//func (c SchemaTypeAdapter) Convert(s string) (schema.QuesmaType, bool) {
+//	for isArray(s) {
+//		s = arrayType(s)
+//	}
+//	switch {
+//	case strings.HasPrefix(s, "Unknown"):
+//		return schema.QuesmaTypeUnknown, true
+//	case strings.HasPrefix(s, "Tuple"):
+//		return schema.QuesmaTypeObject, true
+//	}
+//
+//	switch s {
+//	case "String":
+//		return schema.QuesmaTypeText, true
+//	case "LowCardinality(String)", "UUID", "FixedString":
+//		return schema.QuesmaTypeKeyword, true
+//	case "Int", "Int8", "Int16", "Int32", "Int64":
+//		return schema.QuesmaTypeLong, true
+//	case "UInt8", "UInt16", "UInt32", "UInt64", "UInt128", "UInt256":
+//		return schema.QuesmaTypeInteger, true
+//	case "Bool":
+//		return schema.QuesmaTypeBoolean, true
+//	case "Float32", "Float64":
+//		return schema.QuesmaTypeFloat, true
+//	case "DateTime", "DateTime64":
+//		return schema.QuesmaTypeTimestamp, true
+//	case "Date":
+//		return schema.QuesmaTypeDate, true
+//	case "Point":
+//		return schema.QuesmaTypePoint, true
+//	case "Map(String, Nullable(String))", "Map(String, String)", "Map(LowCardinality(String), Nullable(String))", "Map(LowCardinality(String), String)",
+//		"Map(String, Int)", "Map(LowCardinality(String), Int)", "Map(String, Nullable(Int))", "Map(LowCardinality(String), Nullable(Int))":
+//		return schema.QuesmaTypeMap, true
+//	default:
+//		return schema.QuesmaTypeUnknown, false
+//	}
+//}
+
 func (c SchemaTypeAdapter) Convert(s string) (schema.QuesmaType, bool) {
+	s = strings.ToUpper(s)
+
 	for isArray(s) {
 		s = arrayType(s)
+		s = strings.ToUpper(s)
 	}
+
 	switch {
-	case strings.HasPrefix(s, "Unknown"):
+	case strings.HasPrefix(s, "UNKNOWN"):
 		return schema.QuesmaTypeUnknown, true
-	case strings.HasPrefix(s, "Tuple"):
+	case strings.HasPrefix(s, "STRUCT"):
 		return schema.QuesmaTypeObject, true
+	case strings.HasPrefix(s, "MAP"):
+		return schema.QuesmaTypeMap, true
 	}
 
 	switch s {
-	case "String":
-		return schema.QuesmaTypeText, true
-	case "LowCardinality(String)", "UUID", "FixedString":
-		return schema.QuesmaTypeKeyword, true
-	case "Int", "Int8", "Int16", "Int32", "Int64":
-		return schema.QuesmaTypeLong, true
-	case "UInt8", "UInt16", "UInt32", "UInt64", "UInt128", "UInt256":
-		return schema.QuesmaTypeInteger, true
-	case "Bool":
+	case "BOOLEAN":
 		return schema.QuesmaTypeBoolean, true
-	case "Float32", "Float64":
+	case "TINYINT", "SMALLINT", "INT", "BIGINT", "LARGEINT":
+		return schema.QuesmaTypeLong, true
+	case "FLOAT", "DOUBLE":
 		return schema.QuesmaTypeFloat, true
-	case "DateTime", "DateTime64":
-		return schema.QuesmaTypeTimestamp, true
-	case "Date":
+	case "DECIMAL", "DECIMAL32", "DECIMAL64", "DECIMAL128":
+		return schema.QuesmaTypeKeyword, true // mapping to keyword type
+	case "STRING", "CHAR", "VARCHAR":
+		return schema.QuesmaTypeText, true
+	//case "DATETIME", "DATETIMEV2":
+	//	return schema.QuesmaTypeTimestamp, true
+	case "DATE", "DATEV2", "DATETIME", "DATETIMEV2":
 		return schema.QuesmaTypeDate, true
-	case "Point":
-		return schema.QuesmaTypePoint, true
-	case "Map(String, Nullable(String))", "Map(String, String)", "Map(LowCardinality(String), Nullable(String))", "Map(LowCardinality(String), String)",
-		"Map(String, Int)", "Map(LowCardinality(String), Int)", "Map(String, Nullable(Int))", "Map(LowCardinality(String), Nullable(Int))":
-		return schema.QuesmaTypeMap, true
+	case "JSON", "VARIANT":
+		return schema.QuesmaTypeObject, true
 	default:
 		return schema.QuesmaTypeUnknown, false
 	}
 }
diff --git a/platform/clickhouse/util.go b/platform/clickhouse/util.go
index 1b6f430ff..afd5faaa7 100644
--- a/platform/clickhouse/util.go
+++ b/platform/clickhouse/util.go
@@ -118,20 +118,25 @@ func PrettyJson(jsonStr string) string {
 //   - timestampGroupBy("@timestamp", DateTime, 30 seconds) --> toInt64(toUnixTimestamp(`@timestamp`)/30)
 func TimestampGroupBy(timestampField model.Expr, typ DateTimeType, groupByInterval time.Duration) model.Expr {
 
-	createAExp := func(innerFuncName string, interval int64) model.Expr {
+	createAExp := func(innerFuncName string, interval int64, offsetMultiplier int64) model.Expr {
 		toUnixTsFunc := model.NewInfixExpr(
 			model.NewFunction(innerFuncName, timestampField),
-			" / ", // TODO nasty hack to make our string-based tests pass. Operator should not contain spaces obviously
-			model.NewLiteral(interval))
-		return model.NewFunction("toInt64", toUnixTsFunc)
+			" * ",
+			model.NewLiteral(offsetMultiplier))
+
+		toUnixTsFunc = model.NewInfixExpr(toUnixTsFunc, " / ", model.NewLiteral(interval))
+		return model.NewFunction("FLOOR", toUnixTsFunc)
 	}
 
 	switch typ {
 	case DateTime64:
 		// as string: fmt.Sprintf("toInt64(toUnixTimestamp(`%s`)/%f)", timestampFieldName, groupByInterval.Seconds())
-		return createAExp("toUnixTimestamp64Milli", groupByInterval.Milliseconds())
+		return createAExp("UNIX_TIMESTAMP", groupByInterval.Milliseconds(), 1000)
 	case DateTime:
-		return createAExp("toUnixTimestamp", groupByInterval.Milliseconds()/1000)
+		return createAExp("UNIX_TIMESTAMP", groupByInterval.Milliseconds()/1000, 1)
 	default:
 		logger.Error().Msgf("invalid timestamp fieldname: %s", timestampFieldName)
 		return model.NewLiteral("invalid") // maybe create new type InvalidExpr?
@@ -142,46 +147,46 @@ func TimestampGroupByWithTimezone(timestampField model.Expr, typ DateTimeType,
 	groupByInterval time.Duration, timezone string) model.Expr {
 
 	// If no timezone, or timezone is default (UTC), we just return TimestampGroupBy(...)
-	if timezone == "" {
-		return TimestampGroupBy(timestampField, typ, groupByInterval)
-	}
-
-	createAExp := func(innerFuncName string, interval, offsetMultiplier int64) model.Expr {
-		var offset model.Expr
-		offset = model.NewFunction(
-			"timeZoneOffset",
-			model.NewFunction(
-				"toTimezone",
-				timestampField, model.NewLiteral("'"+timezone+"'"),
-			),
-		)
-		if offsetMultiplier != 1 {
-			offset = model.NewInfixExpr(offset, "*", model.NewLiteral(offsetMultiplier))
-		}
-
-		unixTsWithOffset := model.NewInfixExpr(
-			model.NewFunction(innerFuncName, timestampField),
-			"+",
-			offset,
-		)
-
-		groupByExpr := model.NewInfixExpr(
-			model.NewParenExpr(unixTsWithOffset),
-			" / ", // TODO nasty hack to make our string-based tests pass. Operator should not contain spaces obviously
-			model.NewLiteral(interval),
-		)
-
-		return model.NewFunction("toInt64", groupByExpr)
-	}
-
-	switch typ {
-	case DateTime64:
-		// e.g: (toUnixTimestamp64Milli("timestamp")+timeZoneOffset(toTimezone("timestamp",'Europe/Warsaw'))*1000) / 600000
-		return createAExp("toUnixTimestamp64Milli", groupByInterval.Milliseconds(), 1000)
-	case DateTime:
-		return createAExp("toUnixTimestamp", groupByInterval.Milliseconds()/1000, 1)
-	default:
-		logger.Error().Msgf("invalid timestamp fieldname: %s", timestampFieldName)
-		return model.NewLiteral("invalid") // maybe create new type InvalidExpr?
-	}
+	//if timezone == "" {
+	return TimestampGroupBy(timestampField, typ, groupByInterval)
+	//}
+	//
+	//createAExp := func(innerFuncName string, interval, offsetMultiplier int64) model.Expr {
+	//	var offset model.Expr
+	//	offset = model.NewFunction(
+	//		"timeZoneOffset",
+	//		model.NewFunction(
+	//			"toTimezone",
+	//			timestampField, model.NewLiteral("'"+timezone+"'"),
+	//		),
+	//	)
+	//	if offsetMultiplier != 1 {
+	//		offset = model.NewInfixExpr(offset, "*", model.NewLiteral(offsetMultiplier))
+	//	}
+	//
+	//	unixTsWithOffset := model.NewInfixExpr(
+	//		model.NewFunction(innerFuncName, timestampField),
+	//		"+",
+	//		offset,
+	//	)
+	//
+	//	groupByExpr := model.NewInfixExpr(
+	//		model.NewParenExpr(unixTsWithOffset),
+	//		" / ", // TODO nasty hack to make our string-based tests pass. Operator should not contain spaces obviously
+	//		model.NewLiteral(interval),
+	//	)
+	//
+	//	return model.NewFunction("toInt64", groupByExpr)
+	//}
+	//
+	//switch typ {
+	//case DateTime64:
+	//	// e.g: (toUnixTimestamp64Milli("timestamp")+timeZoneOffset(toTimezone("timestamp",'Europe/Warsaw'))*1000) / 600000
+	//	return createAExp("UNIX_TIMESTAMP", groupByInterval.Milliseconds(), 1000)
+	//case DateTime:
+	//	return createAExp("UNIX_TIMESTAMP", groupByInterval.Milliseconds()/1000, 1)
+	//default:
+	//	logger.Error().Msgf("invalid timestamp fieldname: %s", timestampFieldName)
+	//	return model.NewLiteral("invalid") // maybe create new type InvalidExpr?
+	//}
 }
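// With the rewrite above, a date_histogram bucket key renders for Doris as, e.g.
// (sketch; backtick quoting comes from the renderer changes further down):
//
//	FLOOR(UNIX_TIMESTAMP(`@timestamp`) * 1000 / 600000)  -- DateTime64, 10m interval
//	FLOOR(UNIX_TIMESTAMP(`@timestamp`) * 1 / 30)         -- DateTime,   30s interval
//
// UNIX_TIMESTAMP returns seconds, so the offsetMultiplier of 1000 keeps DateTime64
// bucket keys in the epoch-millis form downstream code expects.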
diff --git a/platform/connectors/connector.go b/platform/connectors/connector.go
index 1bf131412..7082da073 100644
--- a/platform/connectors/connector.go
+++ b/platform/connectors/connector.go
@@ -3,10 +3,8 @@
 package connectors
 
 import (
-	"fmt"
 	"github.com/QuesmaOrg/quesma/platform/clickhouse"
 	"github.com/QuesmaOrg/quesma/platform/config"
-	"github.com/QuesmaOrg/quesma/platform/licensing"
 	"github.com/QuesmaOrg/quesma/platform/logger"
 	"github.com/QuesmaOrg/quesma/platform/telemetry"
 	quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
@@ -27,14 +25,14 @@ func (c *ConnectorManager) GetConnector() *clickhouse.LogManager {
 	if len(c.connectors) == 0 {
 		panic("No connectors found")
 	}
-	conn := c.connectors[0]
-	if !c.connectors[0].GetConnector().IsInTransparentProxyMode() {
-		go func() {
-			if err := conn.LicensingCheck(); err != nil {
-				licensing.PanicWithLicenseViolation(fmt.Errorf("connector [%s] reported licensing issue: [%v]", conn.Type(), err))
-			}
-		}()
-	}
+	//conn := c.connectors[0]
+	//if !c.connectors[0].GetConnector().IsInTransparentProxyMode() {
+	//	go func() {
+	//		if err := conn.LicensingCheck(); err != nil {
+	//			licensing.PanicWithLicenseViolation(fmt.Errorf("connector [%s] reported licensing issue: [%v]", conn.Type(), err))
+	//		}
+	//	}()
+	//}
 	return c.connectors[0].GetConnector()
 }
diff --git a/platform/frontend_connectors/schema_array_transformer.go b/platform/frontend_connectors/schema_array_transformer.go
index 5f5f3ffdd..7fe87241a 100644
--- a/platform/frontend_connectors/schema_array_transformer.go
+++ b/platform/frontend_connectors/schema_array_transformer.go
@@ -16,6 +16,8 @@ import (
 //
 //
 
+const array = "array"
+
 type functionWithCombinator struct {
 	baseFunctionName string
 	isArray          bool
@@ -28,7 +30,7 @@ type functionWithCombinator struct {
 func (f functionWithCombinator) String() string {
 	result := f.baseFunctionName
 	if f.isArray {
-		result = result + "Array"
+		result = result + "Array" // combinator suffix stays capitalized; the lowercase `array` const matches Doris column type names
 	}
 	if f.isIf {
 		result = result + "If"
@@ -94,7 +96,7 @@ func NewArrayTypeVisitor(resolver arrayTypeResolver) (exprVisitor model.ExprVisi
 		column, ok := e.Left.(model.ColumnRef)
 		if ok {
 			dbType := resolver.dbColumnType(column.ColumnName)
-			if strings.HasPrefix(dbType, "Array") {
+			if strings.HasPrefix(dbType, array) {
 				op := strings.TrimSpace(e.Op)
 				opUpperCase := strings.ToUpper(op)
 				switch {
@@ -109,7 +111,8 @@ func NewArrayTypeVisitor(resolver arrayTypeResolver) (exprVisitor model.ExprVisi
 
 				case op == "=":
 					return model.NewFunction("has", e.Left, e.Right.Accept(b).(model.Expr))
-
+				case op == model.MatchOperator && dbType == array:
+					return model.NewFunction("ARRAY_CONTAINS", e.Left, e.Right.Accept(b).(model.Expr))
 				default:
 					anyError = true
 					// add context to log line below (already introduced in unmerged Krzysiek's PR)
@@ -134,7 +137,7 @@ func NewArrayTypeVisitor(resolver arrayTypeResolver) (exprVisitor model.ExprVisi
 			column, ok := arg.(model.ColumnRef)
 			if ok {
 				dbType := resolver.dbColumnType(column.ColumnName)
-				if strings.HasPrefix(dbType, "Array") {
+				if strings.HasPrefix(dbType, array) {
 					funcParsed := parseFunctionWithCombinator(e.Name)
 					funcParsed.isArray = true
 					childGotArrayFunc = true
@@ -163,7 +166,7 @@ func NewArrayTypeVisitor(resolver arrayTypeResolver) (exprVisitor model.ExprVisi
 
 	visitor.OverrideVisitColumnRef = func(b *model.BaseExprVisitor, e model.ColumnRef) interface{} {
 		dbType := resolver.dbColumnType(e.ColumnName)
-		if strings.HasPrefix(dbType, "Array") {
+		if strings.HasPrefix(dbType, array) {
 			// add context to log line below (already introduced in unmerged Krzysiek's PR)
 			logger.WarnWithReason("unhandled array column ref").Msgf("column '%v' ('%v')", e.ColumnName, dbType)
 		}
@@ -181,7 +184,7 @@ func checkIfGroupingByArrayColumn(selectCommand model.SelectCommand, resolver ar
 	findArrayColumn.OverrideVisitColumnRef = func(b *model.BaseExprVisitor, e model.ColumnRef) interface{} {
 		dbType := resolver.dbColumnType(e.ColumnName)
-		if strings.HasPrefix(dbType, "Array") {
+		if strings.HasPrefix(dbType, array) {
 			columnIsArray = true
 		}
 		return e
@@ -231,7 +234,7 @@ func NewArrayJoinVisitor(resolver arrayTypeResolver) model.ExprVisitor {
 	visitor.OverrideVisitColumnRef = func(b *model.BaseExprVisitor, e model.ColumnRef) interface{} {
 		dbType := resolver.dbColumnType(e.ColumnName)
-		if strings.HasPrefix(dbType, "Array") {
+		if strings.HasPrefix(dbType, array) {
 			return model.NewFunction("arrayJoin", model.NewFunction("arrayDistinct", e))
 		}
 		return e
diff --git a/platform/frontend_connectors/schema_search_after_transformer.go b/platform/frontend_connectors/schema_search_after_transformer.go
index ee0750d44..49d36431c 100644
--- a/platform/frontend_connectors/schema_search_after_transformer.go
+++ b/platform/frontend_connectors/schema_search_after_transformer.go
@@ -125,7 +125,7 @@ func (s searchAfterStrategyBasicAndFast) validateAndParse(query *model.Query, in
 			if number, isNumber := util.ExtractNumeric64Maybe(searchAfterValue); isNumber {
 				if number >= 0 && util.IsFloat64AnInt64(number) {
 					// this param will always be timestamp in milliseconds, as we create it like this while rendering hits
-					searchAfterParsed[i] = model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(int64(number)))
+					searchAfterParsed[i] = model.NewFunction("FROM_UNIXTIME", model.NewLiteral(int64(number)/1000))
 				} else {
 					return nil, fmt.Errorf("for basicAndFast strategy, search_after must be a unix timestamp in milliseconds")
 				}
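// Round-trip sketch for the search_after change above (assuming the cursor is the
// epoch-millis value rendered into hits, as the comment in the hunk states):
//
//	cursor := int64(1712345678901) // from the previous page
//	expr := model.NewFunction("FROM_UNIXTIME", model.NewLiteral(cursor/1000))
//	// renders as FROM_UNIXTIME(1712345678); Doris's FROM_UNIXTIME takes seconds,
//	// so sub-second precision is truncated and ties within one second need an
//	// ORDER BY tiebreaker to avoid repeating hits across pages.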
diff --git a/platform/frontend_connectors/schema_transformer.go b/platform/frontend_connectors/schema_transformer.go
index bbd7bcf6d..7283c4cf7 100644
--- a/platform/frontend_connectors/schema_transformer.go
+++ b/platform/frontend_connectors/schema_transformer.go
@@ -186,15 +186,19 @@ func (s *SchemaCheckPass) applyGeoTransformations(schemaInstance schema.Schema,
 		lon := model.NewColumnRef(field.InternalPropertyName.AsString() + "_lon")
 		lat := model.NewColumnRef(field.InternalPropertyName.AsString() + "_lat")
 
+		lonFun := model.NewFunction("CAST", model.NewAliasedExpr(model.NewColumnRef(field.InternalPropertyName.AsString()+"_lon"), "string"))
+		latFun := model.NewFunction("CAST", model.NewAliasedExpr(model.NewColumnRef(field.InternalPropertyName.AsString()+"_lat"), "string"))
+
 		// This is a workaround. Clickhouse Point is defined as Tuple. We need to know the type of the tuple.
 		// In this step we merge two columns into single map here. Map is in elastic format.
 		// In this point we assume that Quesma point type is stored into two separate columns.
-		replace[field.InternalPropertyName.AsString()] = model.NewFunction("map",
-			model.NewLiteral("'lat'"),
-			lat,
-			model.NewLiteral("'lon'"),
-			lon)
+		replace[field.InternalPropertyName.AsString()] = model.NewFunction("CONCAT",
+			model.NewLiteral("'{\"lat\":'"),
+			latFun,
+			model.NewLiteral("',\"lon\":'"),
+			lonFun,
+			model.NewLiteral("'}'"))
 
 		// these a just if we need multifields support
 		replace[field.InternalPropertyName.AsString()+".lat"] = lat
@@ -307,7 +311,7 @@ func (s *SchemaCheckPass) applyArrayTransformations(indexSchema schema.Schema, q
 	hasArrayColumn := false
 	for _, col := range allColumns {
 		dbType := arrayTypeResolver.dbColumnType(col.ColumnName)
-		if strings.HasPrefix(dbType, "Array") {
+		if strings.HasPrefix(dbType, "array") {
 			hasArrayColumn = true
 			break
 		}
@@ -617,6 +621,9 @@ func (s *SchemaCheckPass) applyFullTextField(indexSchema schema.Schema, query *m
 		if (strings.ToUpper(e.Op) == "LIKE" || strings.ToUpper(e.Op) == "ILIKE") && model.AsString(e.Right) == "'%'" {
 			return model.NewLiteral(true)
 		}
+		if strings.ToUpper(e.Op) == "MATCH" {
+			return model.NewLiteral(true)
+		}
 		return model.NewLiteral(false)
 	}
 
@@ -864,7 +871,7 @@ func (s *SchemaCheckPass) convertQueryDateTimeFunctionToClickhouse(indexSchema s
 		if len(e.Args) != 1 {
 			return e
 		}
-		return model.NewFunction("toHour", e.Args[0].Accept(b).(model.Expr))
+		return model.NewFunction("HOUR", e.Args[0].Accept(b).(model.Expr))
 
 		// TODO this is a place for over date/time related functions
 		// add more
@@ -1095,6 +1102,55 @@ func (s *SchemaCheckPass) applyMatchOperator(indexSchema schema.Schema, query *m
 		}
 	}
 
+	if okLeft && okRight && e.Op == model.MatchPhraseOperator {
+		field, found := indexSchema.ResolveFieldByInternalName(lhsCol.ColumnName)
+		if !found {
+			// indexSchema won't find attributes columns, that's why this check
+			if clickhouse.IsColumnAttributes(lhsCol.ColumnName) {
+				colIsAttributes = true
+			} else {
+				logger.Error().Msgf("Field %s not found in schema for table %s, should never happen here", lhsCol.ColumnName, query.TableName)
+				goto experimental
+			}
+		}
+
+		rhsValue = strings.TrimPrefix(rhsValue, "'")
+		rhsValue = strings.TrimSuffix(rhsValue, "'")
+
+		matchParse := func() model.Expr {
+			return model.NewInfixExpr(lhs, "MATCH_PHRASE", rhs.Clone())
+		}
+		equal := func() model.Expr {
+			return model.NewInfixExpr(lhs, "=", rhs.Clone())
+		}
+
+		// handling case when e.Left is an array access
+		if lhsIsArrayAccess {
+			if colIsAttributes || field.IsMapWithStringValues() { // attributes always have string values, so match_phrase
+				return matchParse()
+			} else {
+				return equal()
+			}
+		}
+
+		// handling case when e.Left is a simple column ref
+		// TODO: improve? we may be MATCH_PHRASE'ing too much
+		switch field.Type.String() {
+		case schema.QuesmaTypeInteger.Name, schema.QuesmaTypeLong.Name, schema.QuesmaTypeUnsignedLong.Name, schema.QuesmaTypeFloat.Name, schema.QuesmaTypeBoolean.Name:
+			rhs.Value = strings.Trim(rhsValue, "%")
+			rhs.EscapeType = model.NormalNotEscaped
+			return equal()
+		case schema.QuesmaTypeKeyword.Name:
+			return equal()
+		default:
+			if rhsValue == "%%" { // ILIKE '%%' has terrible performance, but semantically means "is not null", hence this transformation
+				return model.NewInfixExpr(lhs, "IS", model.NewLiteral("NOT NULL"))
+			}
+			return matchParse()
+		}
+
+	}
+
 	if okLeft && okRight && e.Op == model.MatchOperator {
 		field, found := indexSchema.ResolveFieldByInternalName(lhsCol.ColumnName)
 		if !found {
@@ -1111,7 +1167,7 @@ func (s *SchemaCheckPass) applyMatchOperator(indexSchema schema.Schema, query *m
 		rhsValue = strings.TrimSuffix(rhsValue, "'")
 
 		ilike := func() model.Expr {
-			return model.NewInfixExpr(lhs, "ILIKE", rhs.Clone())
+			return model.NewInfixExpr(lhs, "MATCH", rhs.Clone())
 		}
 		equal := func() model.Expr {
 			return model.NewInfixExpr(lhs, "=", rhs.Clone())
diff --git a/platform/model/base_visitor.go b/platform/model/base_visitor.go
index 6e2086337..aa37a2f26 100644
--- a/platform/model/base_visitor.go
+++ b/platform/model/base_visitor.go
@@ -12,6 +12,7 @@ type BaseExprVisitor struct {
 	OverrideVisitNestedProperty func(b *BaseExprVisitor, e NestedProperty) interface{}
 	OverrideVisitArrayAccess    func(b *BaseExprVisitor, e ArrayAccess) interface{}
 	OverrideVisitOrderByExpr    func(b *BaseExprVisitor, e OrderByExpr) interface{}
+	OverrideVisitGroupByExpr    func(b *BaseExprVisitor, e GroupByExpr) interface{}
 	OverrideVisitDistinctExpr   func(b *BaseExprVisitor, e DistinctExpr) interface{}
 	OverrideVisitTableRef       func(b *BaseExprVisitor, e TableRef) interface{}
 	OverrideVisitAliasedExpr    func(b *BaseExprVisitor, e AliasedExpr) interface{}
@@ -107,6 +108,13 @@ func (v *BaseExprVisitor) VisitTableRef(e TableRef) interface{} {
 	return e
 }
 
+func (v *BaseExprVisitor) VisitGroupByExpr(e GroupByExpr) interface{} {
+	if v.OverrideVisitGroupByExpr != nil {
+		return v.OverrideVisitGroupByExpr(v, e)
+	}
+	return GroupByExpr{Expr: e.Expr.Accept(v).(Expr), GroupAlias: e.GroupAlias}
+}
+
 func (v *BaseExprVisitor) VisitOrderByExpr(e OrderByExpr) interface{} {
 	if v.OverrideVisitOrderByExpr != nil {
 		return v.OverrideVisitOrderByExpr(v, e)
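// The two getKey rewrites below cope with the MySQL wire format: Doris returns
// DATETIME columns and many computed keys as text, which the connector surfaces
// as strings (see the []uint8 -> string conversion in quesma_communicator.go
// above), so a bucket key may arrive as "1712345678" or an RFC 3339 date string
// rather than an int64.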
diff --git a/platform/model/bucket_aggregations/date_histogram.go b/platform/model/bucket_aggregations/date_histogram.go
index 49083f653..b5ed9688d 100644
--- a/platform/model/bucket_aggregations/date_histogram.go
+++ b/platform/model/bucket_aggregations/date_histogram.go
@@ -101,7 +101,11 @@ func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultR
 		if docCountAsInt < int64(query.minDocCount) {
 			continue
 		}
-		originalKey := query.getKey(row)
+		originalKey, err := query.getKey(row)
+		if err != nil {
+			logger.ErrorWithCtx(query.ctx).Msgf("error parsing key: %v", err)
+			continue
+		}
 		responseKey := query.calculateResponseKey(originalKey)
 		var keyAsString string
 		if query.defaultFormat {
@@ -221,8 +225,24 @@ func (query *DateHistogram) generateSQLForCalendarInterval() model.Expr {
 	return model.InvalidExpr
 }
 
-func (query *DateHistogram) getKey(row model.QueryResultRow) int64 {
-	return row.Cols[len(row.Cols)-2].Value.(int64)
+func (query *DateHistogram) getKey(row model.QueryResultRow) (int64, error) {
+	value := row.Cols[len(row.Cols)-2].Value
+	switch v := value.(type) {
+	case int64:
+		return v, nil
+	case string:
+		// Trying to parse a string as an int64 (e.g. a Unix clock)
+		if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
+			return parsed, nil
+		}
+		// Or try parsing a date string like "2024-01-01"
+		if t, err := time.Parse(time.RFC3339, v); err == nil {
+			return t.Unix(), nil
+		}
+		return 0, fmt.Errorf("invalid date format: %v", v)
+	default:
+		return 0, fmt.Errorf("unsupported key type: %T", v)
+	}
 }
 
 func (query *DateHistogram) Interval() (interval time.Duration, ok bool) {
@@ -416,7 +436,21 @@ func (qt *DateHistogramRowsTransformer) Transform(ctx context.Context, rowsFromD
 }
 
 func (qt *DateHistogramRowsTransformer) getKey(row model.QueryResultRow) int64 {
-	return row.Cols[len(row.Cols)-2].Value.(int64)
+	value := row.Cols[len(row.Cols)-2].Value
+	switch v := value.(type) {
+	case int64:
+		return v
+	case string:
+		val, err := strconv.ParseInt(v, 10, 64)
+		if err != nil {
+			logger.Error().Msgf("string conversion to int64 failed, value: %v, err: %v", v, err)
+			return 0
+		}
+		return val
+	default:
+		logger.Error().Msgf("unsupported key type: %T", v)
+		return 0
+	}
 }
 
 func (qt *DateHistogramRowsTransformer) nextKey(key int64) int64 {
diff --git a/platform/model/constants.go b/platform/model/constants.go
index a409d9c24..7561826c5 100644
--- a/platform/model/constants.go
+++ b/platform/model/constants.go
@@ -7,6 +7,7 @@ const (
 	FullTextFieldNamePlaceHolder = "__quesma_fulltext_field_name"
 	TimestampFieldName           = "@timestamp"
 
-	DateHourFunction = "__quesma_date_hour"
-	MatchOperator    = "__quesma_match"
+	DateHourFunction    = "__quesma_date_hour"
+	MatchOperator       = "__quesma_match"
+	MatchPhraseOperator = "__quesma_match_phrase" // for doris match expression. https://doris.apache.org/docs/sql-manual/basic-element/operators/conditional-operators/full-text-search-operators
 )
diff --git a/platform/model/expr.go b/platform/model/expr.go
index 316ff0ae5..88ab7ca31 100644
--- a/platform/model/expr.go
+++ b/platform/model/expr.go
@@ -4,7 +4,7 @@ package model
 
 import (
 	"fmt"
-	"strconv"
+	"github.com/QuesmaOrg/quesma/platform/util"
 )
 
 // Expr is a generic representation of an expression which is a part of the SQL query.
@@ -202,6 +202,19 @@ func (t TableRef) Accept(v ExprVisitor) interface{} {
 	return v.VisitTableRef(t)
 }
 
+type GroupByExpr struct {
+	Expr       Expr
+	GroupAlias string
+}
+
+func (g GroupByExpr) Accept(v ExprVisitor) interface{} {
+	return v.VisitGroupByExpr(g)
+}
+
+func NewGroupByExpr(expr Expr, groupAlias string) GroupByExpr {
+	return GroupByExpr{Expr: expr, GroupAlias: groupAlias}
+}
+
 type OrderByDirection int8
 
 const (
@@ -252,7 +265,11 @@ func NewAliasedExpr(expr Expr, alias string) AliasedExpr {
 func (a AliasedExpr) Accept(v ExprVisitor) interface{} { return v.VisitAliasedExpr(a) }
 
 func (a AliasedExpr) AliasRef() LiteralExpr {
-	return LiteralExpr{Value: strconv.Quote(a.Alias)}
+	return LiteralExpr{Value: util.BackquoteIdentifier(a.Alias)}
+}
+
+func (g GroupByExpr) GroupAliasRef() LiteralExpr {
+	return LiteralExpr{Value: util.BackquoteIdentifier(g.GroupAlias)}
 }
 
 // WindowFunction representation e.g. `SUM(x) OVER (PARTITION BY y ORDER BY z)`
@@ -339,6 +356,7 @@ type ExprVisitor interface {
 	VisitNestedProperty(e NestedProperty) interface{}
 	VisitArrayAccess(e ArrayAccess) interface{}
 	VisitOrderByExpr(e OrderByExpr) interface{}
+	VisitGroupByExpr(e GroupByExpr) interface{}
 	VisitDistinctExpr(e DistinctExpr) interface{}
 	VisitTableRef(e TableRef) interface{}
 	VisitAliasedExpr(e AliasedExpr) interface{}
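// Rendering sketch for the new GroupByExpr and the backquote switch above
// (assuming util.BackquoteIdentifier wraps a name in backticks, MySQL-style):
//
//	g := model.NewGroupByExpr(model.NewColumnRef("host"), "aggr__0__key_0")
//	g.GroupAliasRef() // renders as `aggr__0__key_0` — an identifier, not a literal
//
// strconv.Quote produced "aggr__0__key_0", which MySQL-dialect servers such as
// Doris parse as a string literal rather than a column reference.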
return fmt.Sprintf("%s AS %s", e.Expr.Accept(v).(string), e.Alias) + } + return fmt.Sprintf("%s AS %s", e.Expr.Accept(v).(string), util.BackquoteIdentifier(e.Alias)) } func (v *renderer) VisitSelectCommand(c SelectCommand) interface{} { @@ -244,7 +245,7 @@ func (v *renderer) VisitSelectCommand(c SelectCommand) interface{} { sb.WriteString(AsString(c.FromClause)) } else { // Nested sub-query - sb.WriteString(fmt.Sprintf("(%s)", AsString(c.FromClause))) + sb.WriteString(fmt.Sprintf("(%s) tmp_tab", AsString(c.FromClause))) } } if c.WhereClause != nil { @@ -252,7 +253,7 @@ func (v *renderer) VisitSelectCommand(c SelectCommand) interface{} { sb.WriteString(AsString(c.WhereClause)) } if c.SampleLimit > 0 { - sb.WriteString(fmt.Sprintf(" LIMIT %d)", c.SampleLimit)) + sb.WriteString(fmt.Sprintf(" LIMIT %d) tmp_tab ", c.SampleLimit)) } groupBy := make([]string, 0, len(c.GroupBy)) @@ -260,9 +261,29 @@ func (v *renderer) VisitSelectCommand(c SelectCommand) interface{} { groupBy = append(groupBy, AsString(col)) } if len(groupBy) > 0 { - sb.WriteString(" GROUP BY ") - fullGroupBy := groupBy - sb.WriteString(strings.Join(fullGroupBy, ", ")) + var usedKeys []string + if c.SampleLimit > 0 { + // for doris group by query + usedColumns := make(map[string]bool) + for _, col := range append(c.Columns, c.GroupBy...) { + for _, usedCol := range GetUsedColumns(col) { + usedColumns[AsString(usedCol)] = true + } + } + usedKeys = make([]string, 0, len(usedColumns)) + for key := range usedColumns { + usedKeys = append(usedKeys, key) + } + } + if usedKeys != nil { + sort.Strings(usedKeys) + sb.WriteString(" GROUP BY ") + sb.WriteString(strings.Join(usedKeys, ", ")) + } else { + sb.WriteString(" GROUP BY ") + fullGroupBy := groupBy + sb.WriteString(strings.Join(fullGroupBy, ", ")) + } } orderBy := make([]string, 0, len(c.OrderBy)) @@ -296,7 +317,12 @@ func (v *renderer) VisitWindowFunction(f WindowFunction) interface{} { } var sb strings.Builder - stmtWithoutOrderBy := fmt.Sprintf("%s(%s) OVER (", f.Name, strings.Join(args, ", ")) + var stmtWithoutOrderBy string + if f.Name == "" { + stmtWithoutOrderBy = fmt.Sprintf("%s OVER (", strings.Join(args, ", ")) + } else { + stmtWithoutOrderBy = fmt.Sprintf("%s(%s) OVER (", f.Name, strings.Join(args, ", ")) + } sb.WriteString(stmtWithoutOrderBy) if len(f.PartitionBy) > 0 { diff --git a/platform/model/highlighter.go b/platform/model/highlighter.go index 06c22393d..3e932c3f3 100644 --- a/platform/model/highlighter.go +++ b/platform/model/highlighter.go @@ -56,7 +56,7 @@ func (h *Highlighter) SetTokensToHighlight(selectCmd SelectCommand) { visitor := NewBaseVisitor() visitor.OverrideVisitInfix = func(b *BaseExprVisitor, e InfixExpr) interface{} { switch e.Op { - case "iLIKE", "ILIKE", "LIKE", "IN", "=", MatchOperator: + case "iLIKE", "ILIKE", "LIKE", "IN", "=", MatchOperator, MatchPhraseOperator: lhs, isColumnRef := e.Left.(ColumnRef) rhs, isLiteral := e.Right.(LiteralExpr) if isLiteral && isColumnRef { // we only highlight in this case diff --git a/platform/model/metrics_aggregations/quantile.go b/platform/model/metrics_aggregations/quantile.go index 8f1b9bb22..ae1941cd4 100644 --- a/platform/model/metrics_aggregations/quantile.go +++ b/platform/model/metrics_aggregations/quantile.go @@ -117,6 +117,11 @@ func (query Quantile) processResult(colName string, percentileReturnedByClickhou percentileIsNanOrInvalid = math.IsNaN(percentileTyped[0]) percentile = percentileTyped[0] } + case float64: + // Handle direct float64 value (not in array) + percentileAsArrayLen = 1 + 
percentileIsNanOrInvalid = math.IsNaN(percentileTyped) + percentile = percentileTyped case []time.Time: percentileAsArrayLen = len(percentileTyped) if len(percentileTyped) > 0 { diff --git a/platform/model/noop_visitor.go b/platform/model/noop_visitor.go index 09b885801..4d4d519fe 100644 --- a/platform/model/noop_visitor.go +++ b/platform/model/noop_visitor.go @@ -14,6 +14,7 @@ func (NoOpVisitor) VisitPrefixExpr(e PrefixExpr) interface{} { return e func (NoOpVisitor) VisitNestedProperty(e NestedProperty) interface{} { return e } func (NoOpVisitor) VisitArrayAccess(e ArrayAccess) interface{} { return e } func (NoOpVisitor) VisitOrderByExpr(e OrderByExpr) interface{} { return e } +func (NoOpVisitor) VisitGroupByExpr(e GroupByExpr) interface{} { return e } func (NoOpVisitor) VisitDistinctExpr(e DistinctExpr) interface{} { return e } func (NoOpVisitor) VisitTableRef(e TableRef) interface{} { return e } func (NoOpVisitor) VisitAliasedExpr(e AliasedExpr) interface{} { return e } diff --git a/platform/model/typical_queries/hits.go b/platform/model/typical_queries/hits.go index 19f4a3554..d54d882db 100644 --- a/platform/model/typical_queries/hits.go +++ b/platform/model/typical_queries/hits.go @@ -190,6 +190,8 @@ func (query Hits) addAndHighlightHit(hit *model.SearchHit, resultRow *model.Quer switch valueAsString := vals[i].(type) { case string: hit.Highlight[columnName] = query.highlighter.HighlightValue(fieldName, valueAsString) + case int64: + hit.Highlight[columnName] = query.highlighter.HighlightValue(fieldName, strconv.FormatInt(valueAsString, 10)) case *string: if valueAsString != nil { hit.Highlight[columnName] = query.highlighter.HighlightValue(fieldName, *valueAsString) diff --git a/platform/model/where_visitor.go b/platform/model/where_visitor.go index 4bb9628d1..4a1bfb4e2 100644 --- a/platform/model/where_visitor.go +++ b/platform/model/where_visitor.go @@ -26,7 +26,7 @@ func FindTimestampLowerBound(field ColumnRef, whereClause Expr) (timestampInMill found = true } } - } else if fun, ok := e.Right.(FunctionExpr); ok && fun.Name == "fromUnixTimestamp" && len(fun.Args) == 1 { + } else if fun, ok := e.Right.(FunctionExpr); ok && fun.Name == "FROM_UNIXTIME" && len(fun.Args) == 1 { if rhs, ok := fun.Args[0].(LiteralExpr); ok { if rhsInt64, ok := util.ExtractInt64Maybe(rhs.Value); ok { timestampInMillis = min(timestampInMillis, rhsInt64*1000) // seconds -> milliseconds diff --git a/platform/optimize/split_time_range.go b/platform/optimize/split_time_range.go index ffa67035f..f6f92283b 100644 --- a/platform/optimize/split_time_range.go +++ b/platform/optimize/split_time_range.go @@ -82,14 +82,12 @@ func (s splitTimeRange) checkAndFindTimeLimits(selectCommand *model.SelectComman if columnName, ok := e.Left.(model.ColumnRef); ok && columnName.ColumnName == orderByColumnName { switch e.Op { case "<", "<=": - if functionExpr, ok := e.Right.(model.FunctionExpr); ok && - (functionExpr.Name == "fromUnixTimestamp64Milli" || functionExpr.Name == "fromUnixTimestamp") { + if functionExpr, ok := e.Right.(model.FunctionExpr); ok && (functionExpr.Name == "FROM_UNIXTIME") { upperBoundValue := functionExpr.Args[0].(model.LiteralExpr).Value.(int64) upperLimit = &timeRangeLimit{value: upperBoundValue, funcName: functionExpr.Name} } case ">", ">=": - if functionExpr, ok := e.Right.(model.FunctionExpr); ok && - (functionExpr.Name == "fromUnixTimestamp64Milli" || functionExpr.Name == "fromUnixTimestamp") { + if functionExpr, ok := e.Right.(model.FunctionExpr); ok && (functionExpr.Name == "FROM_UNIXTIME") { 
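// Both visitors now key on FROM_UNIXTIME, whose argument is in seconds, so the
// optimizer's bounds stay consistent with the dates.go rendering below, e.g.
// (sketch of a rewritten range filter):
//
//	WHERE `@timestamp` >= FROM_UNIXTIME(1712345000)
//	  AND `@timestamp` <  FROM_UNIXTIME(1712348600)
//
// FindTimestampLowerBound multiplies the extracted value by 1000 to report the
// bound back in milliseconds.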
 					lowerBoundValue := functionExpr.Args[0].(model.LiteralExpr).Value.(int64)
 					lowerLimit = &timeRangeLimit{value: lowerBoundValue, funcName: functionExpr.Name}
 				}
diff --git a/platform/parsers/elastic_query_dsl/aggregation_parser.go b/platform/parsers/elastic_query_dsl/aggregation_parser.go
index d7602b4e8..6bdee40cf 100644
--- a/platform/parsers/elastic_query_dsl/aggregation_parser.go
+++ b/platform/parsers/elastic_query_dsl/aggregation_parser.go
@@ -285,7 +285,13 @@ func (cw *ClickhouseQueryTranslator) parseIntField(queryMap QueryMap, fieldName
 		if asFloat, ok := valueRaw.(float64); ok {
 			return int(asFloat)
 		}
-		logger.WarnWithCtx(cw.Ctx).Msgf("%s is not an float64, but %T, value: %v. Using default: %d", fieldName, valueRaw, valueRaw, defaultValue)
+		if asString, ok := valueRaw.(string); ok {
+			if intValue, err := strconv.Atoi(asString); err == nil {
+				return intValue
+			}
+			logger.WarnWithCtx(cw.Ctx).Msgf("%s is a string but cannot be converted to int, value: %v. Using default: %d", fieldName, asString, defaultValue)
+			return defaultValue
+		}
+		logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a float64 or string, but %T, value: %v. Using default: %d", fieldName, valueRaw, valueRaw, defaultValue)
 	}
 	return defaultValue
 }
@@ -399,7 +405,7 @@ func (cw *ClickhouseQueryTranslator) parseFieldFromScriptField(queryMap QueryMap
 	wantedRegex := regexp.MustCompile(`^doc\['(\w+)']\.value\.(?:getHour\(\)|hourOfDay)$`)
 	matches := wantedRegex.FindStringSubmatch(source)
 	if len(matches) == 2 {
-		return model.NewFunction("toHour", model.NewColumnRef(matches[1])), true
+		return model.NewFunction("HOUR", model.NewColumnRef(matches[1])), true
 	}
 
 	// b) source: "if (doc['field_name_1'].value == doc['field_name_2'].value") { return 1; } else { return 0; }"
diff --git a/platform/parsers/elastic_query_dsl/dates.go b/platform/parsers/elastic_query_dsl/dates.go
index 819c624fc..4fb2af962 100644
--- a/platform/parsers/elastic_query_dsl/dates.go
+++ b/platform/parsers/elastic_query_dsl/dates.go
@@ -24,6 +24,8 @@ func NewDateManager(ctx context.Context) DateManager {
 var acceptableDateTimeFormats = []string{"2006", "2006-01", "2006-01-02", "2006-01-02", "2006-01-02T15",
 	"2006-01-02T15:04", "2006-01-02T15:04:05", "2006-01-02T15:04:05Z07", "2006-01-02T15:04:05Z07:00"}
 
+const FROM_UNIXTIME = "FROM_UNIXTIME"
+
 // parseStrictDateOptionalTimeOrEpochMillis parses date, which is in [strict_date_optional_time || epoch_millis] format
 // (https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html)
 func (dm DateManager) parseStrictDateOptionalTimeOrEpochMillis(date any) (utcTimestamp time.Time, parsingSucceeded bool) {
@@ -73,7 +75,7 @@ func (dm DateManager) ParseDateUsualFormat(exprFromRequest any, datetimeType cli
 	case clickhouse.DateTime64:
 		threeDigitsOfPrecisionSuffice := utcTs.UnixNano()%1_000_000 == 0
 		if threeDigitsOfPrecisionSuffice {
-			return model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(utcTs.UnixMilli())), true
+			return model.NewFunction(FROM_UNIXTIME, model.NewLiteral(utcTs.UnixMilli()/1000)), true
 		} else {
 			return model.NewFunction(
 				"toDateTime64",
@@ -86,7 +88,7 @@ func (dm DateManager) ParseDateUsualFormat(exprFromRequest any, datetimeType cli
 		), true
 	case clickhouse.DateTime:
-		return model.NewFunction("fromUnixTimestamp", model.NewLiteral(utcTs.Unix())), true
+		return model.NewFunction(FROM_UNIXTIME, model.NewLiteral(utcTs.Unix())), true
 	default:
 		logger.WarnWithCtx(dm.ctx).Msgf("Unknown datetimeType: %v", datetimeType)
 	}
diff --git a/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_buckets.go b/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_buckets.go
index 371635672..96e3d0f94 100644
--- a/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_buckets.go
+++ b/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_buckets.go
@@ -368,13 +368,13 @@ func (cw *ClickhouseQueryTranslator) parseGeotileGrid(aggregation *pancakeAggreg
 	radians := model.NewFunction("RADIANS", lat)
 	tan := model.NewFunction("TAN", radians)
 	cos := model.NewFunction("COS", radians)
-	Log := model.NewFunction("LOG", model.NewInfixExpr(tan, "+",
+	Ln := model.NewFunction("LN", model.NewInfixExpr(tan, "+",
 		model.NewParenExpr(model.NewInfixExpr(model.NewLiteral(1), "/", cos))))
 
 	FloorContent := model.NewInfixExpr(
 		model.NewInfixExpr(
 			model.NewParenExpr(
-				model.NewInfixExpr(model.NewInfixExpr(model.NewLiteral(1), "-", Log), "/",
+				model.NewInfixExpr(model.NewInfixExpr(model.NewLiteral(1), "-", Ln), "/",
 					model.NewLiteral("PI()"))),
 			"/",
 			model.NewLiteral(2.0)),
 		"*",
 		model.NewFunction("POWER", model.NewLiteral(2), zoomLiteral))
diff --git a/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_metrics.go b/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_metrics.go
index 22f6e8cc6..73149a0e9 100644
--- a/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_metrics.go
+++ b/platform/parsers/elastic_query_dsl/pancake_aggregation_parser_metrics.go
@@ -24,24 +24,32 @@ func generateMetricSelectedColumns(ctx context.Context, metricsAggr metricsAggre
 
 	switch metricsAggr.AggrType {
 	case "sum", "min", "max", "avg":
-		result = []model.Expr{model.NewFunction(metricsAggr.AggrType+"OrNull", getFirstExpression())}
+		result = []model.Expr{model.NewFunction(metricsAggr.AggrType, getFirstExpression())}
 	case "quantile":
+		firstField := metricsAggr.Fields[0]
+		var columnName string
+		if colRef, ok := firstField.(model.ColumnRef); ok {
+			columnName = colRef.ColumnName
+		}
+
 		// Sorting here useful mostly for determinism in tests.
 		// It wasn't there before, and everything worked fine. We could safely remove it, if needed.
 		usersPercents := util.MapKeysSortedByValue(metricsAggr.Percentiles)
 		result = make([]model.Expr, 0, len(usersPercents))
 		for _, usersPercent := range usersPercents {
 			percentAsFloat := metricsAggr.Percentiles[usersPercent]
-			result = append(result, model.FunctionExpr{
-				// Rare function that has two brackets: quantiles(0.5)(x)
-				// https://clickhouse.com/docs/en/sql-reference/aggregate-functions/reference/quantiles
-				Name: fmt.Sprintf("quantiles(%f)", percentAsFloat),
-				Args: []model.Expr{getFirstExpression()}},
-			)
+			// https://doris.apache.org/docs/sql-manual/sql-functions/aggregate-functions/percentile-approx
+			result = append(result, model.NewFunction("PERCENTILE_APPROX", model.NewColumnRef(columnName), model.NewLiteral(percentAsFloat)))
+			//result = append(result, model.FunctionExpr{
+			//	// Rare function that has two brackets: PERCENTILE_APPROX(x, 0.5)
+			//	// https://doris.apache.org/docs/sql-manual/sql-functions/aggregate-functions/percentile-approx
+			//	Name: fmt.Sprintf("PERCENTILE_APPROX(%f)", percentAsFloat),
+			//	Args: []model.Expr{getFirstExpression()}},
+			//)
 		}
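+		// Example: percents {50, 95} on `load_time` now yield (sketch of rendered SQL)
+		//   PERCENTILE_APPROX(`load_time`, 0.5), PERCENTILE_APPROX(`load_time`, 0.95)
+		// Doris passes the percent as a second argument, so the ClickHouse-style
+		// two-bracket quantiles(0.5)(x) FunctionExpr hack is no longer needed.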
usersPercents := util.MapKeysSortedByValue(metricsAggr.Percentiles) result = make([]model.Expr, 0, len(usersPercents)) for _, usersPercent := range usersPercents { percentAsFloat := metricsAggr.Percentiles[usersPercent] - result = append(result, model.FunctionExpr{ - // Rare function that has two brackets: quantiles(0.5)(x) - // https://clickhouse.com/docs/en/sql-reference/aggregate-functions/reference/quantiles - Name: fmt.Sprintf("quantiles(%f)", percentAsFloat), - Args: []model.Expr{getFirstExpression()}}, - ) + // https://doris.apache.org/docs/sql-manual/sql-functions/aggregate-functions/percentile-approx + result = append(result, model.NewFunction("PERCENTILE_APPROX", model.NewColumnRef(columnName), model.NewLiteral(percentAsFloat))) + //result = append(result, model.FunctionExpr{ + // // Rare function that has two brackets: PERCENTILE_APPROX(x, 0.5) + // //https://doris.apache.org/docs/sql-manual/sql-functions/aggregate-functions/percentile-approx + // Name: fmt.Sprintf("PERCENTILE_APPROX(%f)", percentAsFloat), + // Args: []model.Expr{getFirstExpression()}}, + //) } case "cardinality": // In ElasticSearch it is approximate algorithm - result = []model.Expr{model.NewFunction("uniq", getFirstExpression())} + result = []model.Expr{model.NewFunction("NDV", getFirstExpression())} case "value_count": result = []model.Expr{model.NewCountFunc(getFirstExpression())} @@ -51,10 +59,10 @@ func generateMetricSelectedColumns(ctx context.Context, metricsAggr metricsAggre result = make([]model.Expr, 0, 4) result = append(result, model.NewCountFunc(expr), - model.NewFunction("minOrNull", expr), - model.NewFunction("maxOrNull", expr), - model.NewFunction("avgOrNull", expr), - model.NewFunction("sumOrNull", expr)) + model.NewFunction("min", expr), + model.NewFunction("max", expr), + model.NewFunction("avg", expr), + model.NewFunction("sum", expr)) case "top_hits": innerFieldsAsSelect := make([]model.Expr, len(metricsAggr.Fields)) @@ -71,7 +79,7 @@ func generateMetricSelectedColumns(ctx context.Context, metricsAggr metricsAggre if len(metricsAggr.Fields) > 0 { switch metrics_aggregations.NewRateMode(ctx, metricsAggr.mode) { case metrics_aggregations.RateModeSum: - result = []model.Expr{model.NewFunction("sumOrNull", getFirstExpression())} + result = []model.Expr{model.NewFunction("sum", getFirstExpression())} case metrics_aggregations.RateModeValueCount: result = []model.Expr{model.NewCountFunc(getFirstExpression())} default: @@ -103,12 +111,12 @@ func generateMetricSelectedColumns(ctx context.Context, metricsAggr metricsAggre } addColumn("count") - addColumn("minOrNull") - addColumn("maxOrNull") - addColumn("avgOrNull") - addColumn("sumOrNull") + addColumn("min") + addColumn("max") + addColumn("avg") + addColumn("sum") - result = append(result, model.NewFunction("sumOrNull", model.NewInfixExpr(expr, "*", expr))) + result = append(result, model.NewFunction("sum", model.NewInfixExpr(expr, "*", expr))) addColumn("varPop") addColumn("varSamp") @@ -124,8 +132,8 @@ func generateMetricSelectedColumns(ctx context.Context, metricsAggr metricsAggre // TODO we have create columns according to the schema latColumn := model.NewGeoLat(colName) lonColumn := model.NewGeoLon(colName) - result = append(result, model.NewFunction("avgOrNull", latColumn)) - result = append(result, model.NewFunction("avgOrNull", lonColumn)) + result = append(result, model.NewFunction("avg", latColumn)) + result = append(result, model.NewFunction("avg", lonColumn)) result = append(result, model.NewCountFunc()) } case "geo_bounds": @@ -138,9 
+146,9 @@ func generateMetricSelectedColumns(ctx context.Context, metricsAggr metricsAggre // TODO we have create columns according to the schema latColumn := model.NewGeoLat(colName) lonColumn := model.NewGeoLon(colName) - result = append(result, model.NewFunction("minOrNull", lonColumn)) + result = append(result, model.NewFunction("min", lonColumn)) result = append(result, model.NewFunction("argMinOrNull", latColumn, lonColumn)) - result = append(result, model.NewFunction("minOrNull", latColumn)) + result = append(result, model.NewFunction("min", latColumn)) result = append(result, model.NewFunction("argMinOrNull", lonColumn, latColumn)) } default: diff --git a/platform/parsers/elastic_query_dsl/pancake_sql_query_generation.go b/platform/parsers/elastic_query_dsl/pancake_sql_query_generation.go index cde74b046..9b2d51b41 100644 --- a/platform/parsers/elastic_query_dsl/pancake_sql_query_generation.go +++ b/platform/parsers/elastic_query_dsl/pancake_sql_query_generation.go @@ -29,6 +29,14 @@ func (p *pancakeSqlQueryGenerator) aliasedExprArrayToExpr(aliasedExprs []model.A return exprs } +func (p *pancakeSqlQueryGenerator) groupByExprArrayToExpr(groupExprs []model.GroupByExpr) []model.Expr { + exprs := make([]model.Expr, 0, len(groupExprs)) + for _, groupExpr := range groupExprs { + exprs = append(exprs, groupExpr) + } + return exprs +} + func (p *pancakeSqlQueryGenerator) aliasedExprArrayToLiteralExpr(aliasedExprs []model.AliasedExpr) []model.Expr { exprs := make([]model.Expr, 0, len(aliasedExprs)) for _, aliasedExpr := range aliasedExprs { @@ -37,10 +45,16 @@ func (p *pancakeSqlQueryGenerator) aliasedExprArrayToLiteralExpr(aliasedExprs [] return exprs } -func (p *pancakeSqlQueryGenerator) generatePartitionBy(groupByColumns []model.AliasedExpr) []model.Expr { +func (p *pancakeSqlQueryGenerator) generatePartitionBy(groupByColumns []model.GroupByExpr, useGroupByColumn bool) []model.Expr { partitionBy := make([]model.Expr, 0) for _, col := range groupByColumns { - partitionBy = append(partitionBy, col.AliasRef()) + if useGroupByColumn { + if colRef, ok := col.Expr.(model.ColumnRef); ok { + partitionBy = append(partitionBy, model.NewLiteral(colRef.ColumnName)) + continue + } + } + partitionBy = append(partitionBy, col.GroupAliasRef()) } return partitionBy } @@ -51,7 +65,7 @@ func (p *pancakeSqlQueryGenerator) generateAccumAggrFunctions(origExpr model.Exp case model.FunctionExpr: origFunc := origExprTyped switch origFunc.Name { - case "sum", "sumOrNull", "min", "minOrNull", "max", "maxOrNull": + case "sum", "sumOrNull", "min", "max": return origExpr, origFunc.Name, nil case "count", "countIf": return model.NewFunction(origFunc.Name, origFunc.Args...), "sum", nil @@ -59,11 +73,33 @@ func (p *pancakeSqlQueryGenerator) generateAccumAggrFunctions(origExpr model.Exp // TODO: I debate whether make that default // This is ClickHouse specific: https://clickhouse.com/docs/en/sql-reference/aggregate-functions/combinators return model.NewFunction(origFunc.Name+"State", origFunc.Args...), origFunc.Name + "Merge", nil + case "NDV": + firstArg := origFunc.Args[0] + var columnName string + if colRef, ok := firstArg.(model.ColumnRef); ok { + columnName = colRef.ColumnName + } + return model.NewFunction("APPROX_COUNT_DISTINCT", model.NewColumnRef(columnName)), "", nil } - if strings.HasPrefix(origFunc.Name, "quantiles") { - return model.NewFunction(strings.Replace(origFunc.Name, "quantiles", "quantilesState", 1), origFunc.Args...), - strings.Replace(origFunc.Name, "quantiles", "quantilesMerge", 1), nil + if 
+		if strings.HasPrefix(origFunc.Name, "PERCENTILE_APPROX") {
+			if len(origFunc.Args) < 2 {
+				return nil, "", fmt.Errorf("invalid PERCENTILE_APPROX call, expected 2 arguments")
+			}
+			// pass column and percentile through unchanged; no merge phase in Doris
+			return model.NewFunction("PERCENTILE_APPROX", origFunc.Args[0], origFunc.Args[1]), "", nil
 		}
 	case model.InfixExpr:
 		origInfix := origExprTyped
@@ -79,7 +115,7 @@ func (p *pancakeSqlQueryGenerator) generateAccumAggrFunctions(origExpr model.Exp
 		fmt.Errorf("not implemented, queryType: %s, origExpr: %s", debugQueryType, model.AsString(origExpr))
 }

-func (p *pancakeSqlQueryGenerator) generateMetricSelects(metric *pancakeModelMetricAggregation, groupByColumns []model.AliasedExpr, hasMoreBucketAggregations bool) (addSelectColumns []model.AliasedExpr, err error) {
+func (p *pancakeSqlQueryGenerator) generateMetricSelects(metric *pancakeModelMetricAggregation, groupByColumns []model.GroupByExpr, hasMoreBucketAggregations bool) (addSelectColumns []model.AliasedExpr, err error) {
 	for columnId, column := range metric.selectedColumns {
 		finalColumn := column

@@ -89,7 +125,7 @@ func (p *pancakeSqlQueryGenerator) generateMetricSelects(metric *pancakeModelMet
 				return nil, err
 			}
 			finalColumn = model.NewWindowFunction(aggFunctionName, []model.Expr{partColumn},
-				p.generatePartitionBy(groupByColumns), []model.OrderByExpr{})
+				p.generatePartitionBy(groupByColumns, false), []model.OrderByExpr{})
 		}
 		aliasedColumn := model.NewAliasedExpr(finalColumn, metric.InternalNameForCol(columnId))
 		addSelectColumns = append(addSelectColumns, aliasedColumn)
@@ -97,19 +133,23 @@ func (p *pancakeSqlQueryGenerator) generateMetricSelects(metric *pancakeModelMet
 	return
 }

-func (p *pancakeSqlQueryGenerator) isPartOf(column model.Expr, aliasedColumns []model.AliasedExpr) *model.AliasedExpr {
+func (p *pancakeSqlQueryGenerator) isPartOf(column model.Expr, aliasedColumns []model.GroupByExpr) *model.AliasedExpr {
 	for _, aliasedColumn := range aliasedColumns {
-		if model.PartlyImplementedIsEqual(column, aliasedColumn) {
-			return &aliasedColumn
+		if model.PartlyImplementedIsEqual(column, aliasedColumn.Expr) {
+			result := model.AliasedExpr{
+				Expr:  aliasedColumn.Expr,
+				Alias: aliasedColumn.GroupAlias,
+			}
+			return &result
 		}
 	}
 	return nil
 }

-func (p *pancakeSqlQueryGenerator) isPartOfOrderBy(alias model.AliasedExpr, orderByColumns []model.OrderByExpr) bool {
+func (p *pancakeSqlQueryGenerator) isPartOfOrderBy(alias model.GroupByExpr, orderByColumns []model.OrderByExpr) bool {
 	for _, orderBy := range orderByColumns {
 		if orderByLiteral, ok := orderBy.Expr.(model.LiteralExpr); ok {
-			if alias.AliasRef().Value == orderByLiteral.Value {
+			if alias.GroupAlias == orderByLiteral.Value {
 				return true
 			}
 		}
@@ -117,27 +157,27 @@ func (p *pancakeSqlQueryGenerator) isPartOfOrderBy(alias model.AliasedExpr, orde
 	return false
 }

-func (p *pancakeSqlQueryGenerator) addPotentialParentCount(bucketAggregation *pancakeModelBucketAggregation, groupByColumns []model.AliasedExpr) []model.AliasedExpr {
+func (p *pancakeSqlQueryGenerator) addPotentialParentCount(bucketAggregation *pancakeModelBucketAggregation, groupByColumns []model.GroupByExpr) []model.AliasedExpr {
 	if query_util.IsAnyKindOfTerms(bucketAggregation.queryType) {
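+		// NOTE(doris port): useGroupByColumn=true makes PARTITION BY reference the
+		// raw column (e.g. PARTITION BY `severity`) instead of the SELECT alias
+		// (PARTITION BY "aggr__0__key_0"), assuming Doris, like MySQL, does not
+		// allow aliases from the same SELECT inside OVER (...).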
model.NewWindowFunction("sum", []model.Expr{model.NewCountFunc()}, - p.generatePartitionBy(groupByColumns), []model.OrderByExpr{}) + p.generatePartitionBy(groupByColumns, true), []model.OrderByExpr{}) parentCountAliasedColumn := model.NewAliasedExpr(parentCountColumn, bucketAggregation.InternalNameForParentCount()) return []model.AliasedExpr{parentCountAliasedColumn} } return []model.AliasedExpr{} } -func (p *pancakeSqlQueryGenerator) generateBucketSqlParts(query *pancakeModel, bucketAggregation *pancakeModelBucketAggregation, groupByColumns []model.AliasedExpr, hasMoreBucketAggregations bool) ( - addSelectColumns, addGroupBys, addRankColumns []model.AliasedExpr, addRankWheres []model.Expr, addRankOrderBys []model.OrderByExpr, err error) { +func (p *pancakeSqlQueryGenerator) generateBucketSqlParts(query *pancakeModel, bucketAggregation *pancakeModelBucketAggregation, groupByColumns []model.GroupByExpr, hasMoreBucketAggregations bool) ( + addSelectColumns []model.AliasedExpr, addGroupBys []model.GroupByExpr, addRankColumns []model.AliasedExpr, addRankWheres []model.Expr, addRankOrderBys []model.OrderByExpr, err error) { // For some group by such as terms, we need total count. We add it in this method. addSelectColumns = append(addSelectColumns, p.addPotentialParentCount(bucketAggregation, groupByColumns)...) for columnId, column := range bucketAggregation.selectedColumns { - aliasedColumn := model.NewAliasedExpr(column, bucketAggregation.InternalNameForKey(columnId)) - addSelectColumns = append(addSelectColumns, aliasedColumn) - addGroupBys = append(addGroupBys, aliasedColumn) + columnAliasString := bucketAggregation.InternalNameForKey(columnId) + addSelectColumns = append(addSelectColumns, model.NewAliasedExpr(column, columnAliasString)) + addGroupBys = append(addGroupBys, model.NewGroupByExpr(column, columnAliasString)) } // build count for aggr @@ -146,7 +186,7 @@ func (p *pancakeSqlQueryGenerator) generateBucketSqlParts(query *pancakeModel, b partCountColumn := model.NewCountFunc() countColumn = model.NewWindowFunction("sum", []model.Expr{partCountColumn}, - p.generatePartitionBy(append(groupByColumns, addGroupBys...)), []model.OrderByExpr{}) + p.generatePartitionBy(append(groupByColumns, addGroupBys...), true), []model.OrderByExpr{}) } else { countColumn = model.NewCountFunc() } @@ -162,7 +202,7 @@ func (p *pancakeSqlQueryGenerator) generateBucketSqlParts(query *pancakeModel, b rankColumn := p.isPartOf(orderBy.Expr, append(append(groupByColumns, addGroupBys...), // We need count before window functions - model.NewAliasedExpr(model.NewCountFunc(), bucketAggregation.InternalNameForCount()))) + model.NewGroupByExpr(model.NewCountFunc(), bucketAggregation.InternalNameForCount()))) if rankColumn != nil { // rank is part of group by if direction == model.DefaultOrder { direction = model.AscOrder // primarily needed for tests @@ -181,7 +221,7 @@ func (p *pancakeSqlQueryGenerator) generateBucketSqlParts(query *pancakeModel, b return nil, nil, nil, nil, nil, err } orderByExpr = model.NewWindowFunction(aggFunctionName, []model.Expr{partColumn}, - p.generatePartitionBy(append(groupByColumns, addGroupBys...)), []model.OrderByExpr{}) + p.generatePartitionBy(append(groupByColumns, addGroupBys...), false), []model.OrderByExpr{}) } aliasedExpr := model.NewAliasedExpr(orderByExpr, bucketAggregation.InternalNameForOrderBy(columnId)) addSelectColumns = append(addSelectColumns, aliasedExpr) @@ -194,12 +234,12 @@ func (p *pancakeSqlQueryGenerator) generateBucketSqlParts(query *pancakeModel, b // We order 
 	// We order by count, but add key to get right dense_rank()
 	for _, addedGroupByAlias := range addGroupBys {
 		if !p.isPartOfOrderBy(addedGroupByAlias, rankOrderBy) {
-			rankOrderBy = append(rankOrderBy, model.NewOrderByExpr(addedGroupByAlias.AliasRef(), model.AscOrder))
+			rankOrderBy = append(rankOrderBy, model.NewOrderByExpr(addedGroupByAlias.GroupAliasRef(), model.AscOrder))
 		}
 	}

 	rankColumn := model.NewWindowFunction("dense_rank", []model.Expr{},
-		p.generatePartitionBy(groupByColumns), rankOrderBy)
+		p.generatePartitionBy(groupByColumns, false), rankOrderBy)
 	aliasedRank := model.NewAliasedExpr(rankColumn, bucketAggregation.InternalNameForOrderBy(1)+"_rank")
 	addRankColumns = append(addRankColumns, aliasedRank)

@@ -223,21 +263,28 @@ func (p *pancakeSqlQueryGenerator) addIfCombinator(column model.Expr, whereClaus
 	case model.FunctionExpr:
 		splits := strings.SplitN(function.Name, "(", 2)
 		baseFunctionName := splits[0]
-		functionSuffix := ""
-		if len(splits) > 1 {
-			functionSuffix = "(" + splits[1]
-		}
 		if function.Name == "count" {
-			return model.NewFunction("countIf", whereClause), nil
+			// Doris has no -If combinators: a conditional count becomes SUM(IF(cond, 1, 0))
+			ifFunction := model.NewFunction("IF", whereClause, model.NewLiteral(1), model.NewLiteral(0))
+			return model.NewFunction("SUM", ifFunction), nil
 		} else if strings.HasSuffix(baseFunctionName, "If") && len(function.Args) > 0 {
 			newArgs := make([]model.Expr, len(function.Args))
 			copy(newArgs, function.Args)
 			newArgs[len(newArgs)-1] = model.And([]model.Expr{newArgs[len(newArgs)-1], whereClause})
 			return model.NewFunction(function.Name, newArgs...), nil
+		} else if (function.Name == "sum" || function.Name == "avg") && len(function.Args) > 0 {
+			// aggregate IF(cond, col, NULL): aggregate functions skip NULLs, which
+			// keeps the filter semantics (NewLiteral renders the string verbatim,
+			// i.e. a bare NULL)
+			ifFunction := model.NewFunction("IF", whereClause, function.Args[0], model.NewLiteral("NULL"))
+			return model.NewFunction(baseFunctionName, ifFunction), nil
 		} else if len(function.Args) == 1 {
-			// https://clickhouse.com/docs/en/sql-reference/aggregate-functions/combinators#-if
-			return model.NewFunction(baseFunctionName+"If"+functionSuffix, function.Args[0], whereClause), nil
+			// min, max and friends: same NULL-fill trick as above
+			return model.NewFunction(baseFunctionName, model.NewFunction("IF", whereClause, function.Args[0], model.NewLiteral("NULL"))), nil
 		} else {
 			return nil, fmt.Errorf("not implemented -iF for func with more than one argument: %s", model.AsString(column))
 		}
@@ -289,7 +336,7 @@ func (p *pancakeSqlQueryGenerator) generateSelectCommand(aggregation *pancakeMod
 	rankColumns := make([]model.AliasedExpr, 0)
 	rankWheres := make([]model.Expr, 0)
 	rankOrderBys := make([]model.OrderByExpr, 0)
-	groupBys := make([]model.AliasedExpr, 0)
+	groupBys := make([]model.GroupByExpr, 0)

 	type addIfCombinator struct {
 		selectNr int
@@ -397,7 +444,7 @@ func (p *pancakeSqlQueryGenerator) generateSelectCommand(aggregation *pancakeMod
 		resultQuery = &model.SelectCommand{
 			Columns:     p.aliasedExprArrayToExpr(selectColumns),
-			GroupBy:     p.aliasedExprArrayToExpr(groupBys),
+			GroupBy:     p.groupByExprArrayToExpr(groupBys),
 			WhereClause: aggregation.whereClause,
 			FromClause:  model.NewTableRef(model.SingleTableNamePlaceHolder),
 			OrderBy:     orderBy,
@@ -408,7 +455,7 @@ func (p *pancakeSqlQueryGenerator) generateSelectCommand(aggregation *pancakeMod
 	} else {
 		windowCte := model.SelectCommand{
 			Columns:     p.aliasedExprArrayToExpr(selectColumns),
-			GroupBy:     p.aliasedExprArrayToExpr(groupBys),
+			GroupBy:     p.groupByExprArrayToExpr(groupBys),
 			WhereClause: aggregation.whereClause,
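+			// NOTE(doris port): groupByExprArrayToExpr keeps the GROUP BY entries as
+			// GroupByExpr values, so the renderer can decide how to print them for
+			// Doris instead of reusing ClickHouse-style alias literals.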
 			FromClause:  model.NewTableRef(model.SingleTableNamePlaceHolder),
 			SampleLimit: aggregation.sampleLimit,
diff --git a/platform/parsers/elastic_query_dsl/pancake_top_hits.go b/platform/parsers/elastic_query_dsl/pancake_top_hits.go
index 4414daa41..29c4004a7 100644
--- a/platform/parsers/elastic_query_dsl/pancake_top_hits.go
+++ b/platform/parsers/elastic_query_dsl/pancake_top_hits.go
@@ -6,11 +6,12 @@ import (
 	"fmt"
 	"github.com/QuesmaOrg/quesma/platform/model"
 	"github.com/QuesmaOrg/quesma/platform/model/metrics_aggregations"
+	"github.com/QuesmaOrg/quesma/platform/util"
 	"strconv"
 )

 func (p *pancakeSqlQueryGenerator) quotedLiteral(name string) model.LiteralExpr {
-	return model.NewLiteral(strconv.Quote(name))
+	return model.NewLiteral(util.BackquoteIdentifier(name))
 }

 // generateSimpleTopHitsQuery generates an SQL for top_hits/top_metrics
@@ -42,7 +43,7 @@ func (p *pancakeSqlQueryGenerator) generateSimpleTopHitsQuery(topHits *pancakeMo
 func (p *pancakeSqlQueryGenerator) generateTopHitsQuery(aggregation *pancakeModel,
 	combinatorWhere []model.Expr,
 	topHits *pancakeModelMetricAggregation,
-	groupBys []model.AliasedExpr,
+	groupBys []model.GroupByExpr,
 	selectColumns []model.AliasedExpr,
 	origQuery *model.SelectCommand) (*model.SelectCommand, error) {

@@ -83,7 +84,8 @@ func (p *pancakeSqlQueryGenerator) generateTopHitsQuery(aggregation *pancakeMode
 	hitTableName := "hit_table"

 	groupTableLiteral := func(reference string) model.Expr {
-		return model.NewLiteral(strconv.Quote(groupTableName) + "." + strconv.Quote(reference))
+		return model.NewLiteral(util.BackquoteIdentifier(groupTableName) + "." + util.BackquoteIdentifier(reference))
 	}

 	convertColumnRefToHitTable := func(expr model.Expr) model.Expr {
@@ -101,9 +103,9 @@ func (p *pancakeSqlQueryGenerator) generateTopHitsQuery(aggregation *pancakeMode
 	var joinExprs []model.Expr
 	var partitionByExprs []model.Expr
 	for _, groupBy := range groupBys {
-		partitionByExprs = append(partitionByExprs, groupTableLiteral(groupBy.Alias))
+		partitionByExprs = append(partitionByExprs, groupTableLiteral(groupBy.GroupAlias))
 		joinExprs = append(joinExprs, model.NewInfixExpr(
-			groupTableLiteral(groupBy.Alias),
+			groupTableLiteral(groupBy.GroupAlias),
 			"=",
 			convertColumnRefToHitTable(groupBy.Expr)))
 	}
diff --git a/platform/parsers/elastic_query_dsl/pancake_transformer.go b/platform/parsers/elastic_query_dsl/pancake_transformer.go
index 16e3ae48c..be793c17f 100644
--- a/platform/parsers/elastic_query_dsl/pancake_transformer.go
+++ b/platform/parsers/elastic_query_dsl/pancake_transformer.go
@@ -174,7 +174,8 @@ func (a *pancakeTransformer) createLayer(previousAggrNames []string, childAggreg
 		case model.BucketAggregation:
 			filter, isFilter := childAgg.queryType.(bucket_aggregations.FilterAgg)
 			if isFilter && len(childAgg.children) == 0 {
-				childAgg.selectedColumns = append(childAgg.selectedColumns, model.NewFunction("countIf", filter.WhereClause))
+				ifFunction := model.NewFunction("IF", filter.WhereClause, model.NewLiteral(1), model.NewLiteral(0))
+				childAgg.selectedColumns = append(childAgg.selectedColumns, model.NewFunction("SUM", ifFunction))
 				metric, err := a.metricAggregationTreeNodeToModel(previousAggrNames, childAgg)
 				if err != nil {
 					return nil, err
diff --git a/platform/parsers/elastic_query_dsl/query_parser.go b/platform/parsers/elastic_query_dsl/query_parser.go
index a0c30d9cc..59eed3d69 100644
--- a/platform/parsers/elastic_query_dsl/query_parser.go
+++ b/platform/parsers/elastic_query_dsl/query_parser.go
@@ -355,40 +355,41 @@ func (cw *ClickhouseQueryTranslator) parseIds(queryMap QueryMap) model.SimpleQue
 		timestampColumnName = model.TimestampFieldName
 	}

-	if column, ok := cw.Table.Cols[timestampColumnName]; ok {
-		switch column.Type.String() {
-		case clickhouse.DateTime64.String():
-			idToSql = func(id string) (model.Expr, error) {
-				precision, success := util.FindTimestampPrecision(id[1 : len(id)-1]) // strip quotes added above
-				if !success {
-					return nil, fmt.Errorf("invalid timestamp format: %s", id)
-				}
-				return model.NewFunction("toDateTime64", model.NewLiteral(id), model.NewLiteral(precision)), nil
-			}
-		case clickhouse.DateTime.String():
-			idToSql = func(id string) (model.Expr, error) {
-				return model.NewFunction("toDateTime", model.NewLiteral(id)), nil
-			}
-		default:
-			logger.ErrorWithCtx(cw.Ctx).Msgf("timestamp field of unsupported type %s", column.Type.String())
-			return model.NewSimpleQueryInvalid()
-		}
-	} else {
-		logger.ErrorWithCtx(cw.Ctx).Msgf("timestamp field %s not found in schema", timestampColumnName)
-		return model.NewSimpleQueryInvalid()
-	}
+	// NOTE(doris port): the ClickHouse-specific toDateTime()/toDateTime64()
+	// conversion was removed; ids are compared as plain literals below.

 	var whereStmt model.Expr
 	switch len(ids) {
 	case 0:
 		whereStmt = model.FalseExpr // timestamp IN [] <=> false
 	case 1:
-		sql, err := idToSql(ids[0])
-		if err != nil {
-			logger.ErrorWithCtx(cw.Ctx).Msgf("error converting id to sql: %v", err)
-			return model.NewSimpleQueryInvalid()
-		}
-		whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " = ", sql)
+		whereStmt = model.NewInfixExpr(model.NewColumnRef(timestampColumnName), " = ", model.NewLiteral(ids[0]))
 	default:
 		idsAsExprs := make([]model.Expr, len(ids))
 		for i, id := range ids {
@@ -584,9 +585,15 @@ func (cw *ClickhouseQueryTranslator) parseMatch(queryMap QueryMap, matchPhrase b
 			computedIdMatchingQuery := cw.parseIds(QueryMap{"values": []interface{}{subQuery}})
 			statements = append(statements, computedIdMatchingQuery.WhereClause)
 		} else {
-			fullLiteral := model.NewLiteralWithEscapeType("'"+subQuery+"'", model.NotEscapedLikeFull)
-			simpleStat := model.NewInfixExpr(model.NewColumnRef(fieldName), model.MatchOperator, fullLiteral)
-			statements = append(statements, simpleStat)
+			if matchPhrase {
+				fullLiteral := model.NewLiteralWithEscapeType("'"+subQuery+"'", model.NotEscapedLikeFull)
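+				// NOTE(doris port): assumes MatchPhraseOperator renders Doris'
+				// MATCH_PHRASE (e.g. `message` MATCH_PHRASE 'server error'), which
+				// requires an inverted index on the matched column.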
+				simpleStat := model.NewInfixExpr(model.NewColumnRef(fieldName), model.MatchPhraseOperator, fullLiteral)
+				statements = append(statements, simpleStat)
+			} else {
+				fullLiteral := model.NewLiteralWithEscapeType("'"+subQuery+"'", model.NotEscapedLikeFull)
+				simpleStat := model.NewInfixExpr(model.NewColumnRef(fieldName), model.MatchOperator, fullLiteral)
+				statements = append(statements, simpleStat)
+			}
 		}
 	}
 	return model.NewSimpleQuery(model.Or(statements), true)
@@ -648,7 +655,7 @@ func (cw *ClickhouseQueryTranslator) parseMultiMatch(queryMap QueryMap) model.Si
 	i := 0
 	for _, field := range fields {
 		for _, subQ := range subQueries {
-			simpleStat := model.NewInfixExpr(model.NewColumnRef(field), "iLIKE", model.NewLiteral("'%"+subQ+"%'"))
+			simpleStat := model.NewInfixExpr(model.NewColumnRef(field), "MATCH", model.NewLiteral("'"+subQ+"'"))
 			sqls[i] = simpleStat
 			i++
 		}
diff --git a/platform/util/utils.go b/platform/util/utils.go
index 464a6feb4..f4f77ff68 100644
--- a/platform/util/utils.go
+++ b/platform/util/utils.go
@@ -759,6 +759,22 @@ func SingleQuote(value string) string {
 	return "'" + value + "'"
 }

+// BackquoteIdentifier is a simple helper function: str -> `str`;
+// already-backquoted input is returned unchanged
+func BackquoteIdentifier(identifier string) string {
+	if len(identifier) >= 2 && identifier[0] == '`' && identifier[len(identifier)-1] == '`' {
+		return identifier
+	}
+	return "`" + identifier + "`"
+}
+
+// ContainsKeyword reports whether str is a reserved word that must be
+// backquoted when used as an identifier (currently only "string")
+func ContainsKeyword(str string) bool {
+	return strings.EqualFold(str, "string")
+}
+
 // IsSingleQuoted checks if a string is single-quoted
 func IsSingleQuoted(s string) bool {
 	return len(s) >= 2 && s[0] == '\'' && s[len(s)-1] == '\''