Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 40ac249

Browse files
authored
Introducing datetime doris transformation (#1503)
This PR introduces an abstract logical __quesma_date_hour function to be used at the parsing level, and adds a new DateTimeFunction transformation that lowers it to a backend-specific implementation.
1 parent cb30918 commit 40ac249

File tree

8 files changed

+86
-23
lines changed

8 files changed

+86
-23
lines changed

platform/database_common/log_manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ type LogManagerIFace interface {
7777
CountMultiple(ctx context.Context, tables ...*Table) (int64, error)
7878
Count(ctx context.Context, table *Table) (int64, error)
7979
GetTableDefinitions() (TableMap, error)
80+
GetBackendConnector() quesma_api.BackendConnector
8081
}
8182

8283
func NewTableMap() *TableMap {
@@ -427,3 +428,7 @@ func (c *ChTableConfig) GetAttributes() []Attribute {
427428
func (l *LogManager) IsInTransparentProxyMode() bool {
428429
return l.cfg.TransparentProxy
429430
}
431+
432+
func (l *LogManager) GetBackendConnector() quesma_api.BackendConnector {
433+
return l.chDb
434+
}

platform/frontend_connectors/schema_transformer.go

Lines changed: 68 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/QuesmaOrg/quesma/platform/parsers/elastic_query_dsl"
1515
"github.com/QuesmaOrg/quesma/platform/schema"
1616
"github.com/QuesmaOrg/quesma/platform/transformations"
17+
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
1718
"sort"
1819
"strconv"
1920
"strings"
@@ -901,7 +902,6 @@ func (s *SchemaCheckPass) applyRuntimeMappings(indexSchema schema.Schema, query
901902
func (s *SchemaCheckPass) convertQueryDateTimeFunctionToClickhouse(indexSchema schema.Schema, query *model.Query) (*model.Query, error) {
902903

903904
visitor := model.NewBaseVisitor()
904-
905905
visitor.OverrideVisitFunction = func(b *model.BaseExprVisitor, e model.FunctionExpr) interface{} {
906906

907907
switch e.Name {
@@ -926,7 +926,36 @@ func (s *SchemaCheckPass) convertQueryDateTimeFunctionToClickhouse(indexSchema s
926926
query.SelectCommand = *expr.(*model.SelectCommand)
927927
}
928928
return query, nil
929+
}
930+
931+
// it converts our internal date/time related functions to Doris functions
932+
func (s *SchemaCheckPass) convertQueryDateTimeFunctionToDoris(indexSchema schema.Schema, query *model.Query) (*model.Query, error) {
933+
934+
visitor := model.NewBaseVisitor()
935+
visitor.OverrideVisitFunction = func(b *model.BaseExprVisitor, e model.FunctionExpr) interface{} {
936+
937+
switch e.Name {
938+
939+
case model.DateHourFunction:
940+
if len(e.Args) != 1 {
941+
return e
942+
}
943+
return model.NewFunction("HOUR", e.Args[0].Accept(b).(model.Expr))
944+
945+
// TODO this is a place for other date/time related functions
946+
// add more
929947

948+
default:
949+
return visitFunction(b, e)
950+
}
951+
}
952+
953+
expr := query.SelectCommand.Accept(visitor)
954+
955+
if _, ok := expr.(*model.SelectCommand); ok {
956+
query.SelectCommand = *expr.(*model.SelectCommand)
957+
}
958+
return query, nil
930959
}
931960

932961
func (s *SchemaCheckPass) checkAggOverUnsupportedType(indexSchema schema.Schema, query *model.Query) (*model.Query, error) {
@@ -1116,21 +1145,46 @@ func (s *SchemaCheckPass) Transform(plan *model.ExecutionPlan) (*model.Execution
11161145
{TransformationName: "FullTextFieldTransformation", Transformation: s.applyFullTextField},
11171146
{TransformationName: "TimestampFieldTransformation", Transformation: s.applyTimestampField},
11181147
{TransformationName: "ApplySearchAfterParameter", Transformation: s.applySearchAfterParameter},
1119-
1120-
// Section 3: clickhouse specific transformations
1121-
{TransformationName: "QuesmaDateFunctions", Transformation: s.convertQueryDateTimeFunctionToClickhouse},
1122-
{TransformationName: "IpTransformation", Transformation: s.applyIpTransformations},
1123-
{TransformationName: "GeoTransformation", Transformation: s.applyGeoTransformations},
1124-
{TransformationName: "ArrayTransformation", Transformation: s.applyArrayTransformations},
1125-
{TransformationName: "MapTransformation", Transformation: s.applyMapTransformations},
1126-
{TransformationName: "MatchOperatorTransformation", Transformation: s.applyMatchOperator},
1127-
{TransformationName: "AggOverUnsupportedType", Transformation: s.checkAggOverUnsupportedType},
1128-
{TransformationName: "ApplySelectFromCluster", Transformation: s.ApplySelectFromCluster},
1129-
1130-
// Section 4: compensations and checks
1131-
{TransformationName: "BooleanLiteralTransformation", Transformation: s.applyBooleanLiteralLowering},
11321148
}
11331149

1150+
// Section 3: backend specific transformations
1151+
// fallback to clickhouse date functions if no backend connector is set
1152+
if plan.BackendConnector == nil {
1153+
transformationChain = append(transformationChain, struct {
1154+
TransformationName string
1155+
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
1156+
}{TransformationName: "QuesmaDateFunctions", Transformation: s.convertQueryDateTimeFunctionToClickhouse})
1157+
} else {
1158+
if plan.BackendConnector.GetId() == quesma_api.ClickHouseSQLBackend {
1159+
transformationChain = append(transformationChain, struct {
1160+
TransformationName string
1161+
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
1162+
}{TransformationName: "QuesmaDateFunctions", Transformation: s.convertQueryDateTimeFunctionToClickhouse})
1163+
}
1164+
1165+
if plan.BackendConnector.GetId() == quesma_api.DorisSQLBackend {
1166+
transformationChain = append(transformationChain, struct {
1167+
TransformationName string
1168+
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
1169+
}{TransformationName: "QuesmaDateFunctions", Transformation: s.convertQueryDateTimeFunctionToDoris})
1170+
}
1171+
}
1172+
transformationChain = append(transformationChain,
1173+
[]struct {
1174+
TransformationName string
1175+
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
1176+
}{
1177+
{TransformationName: "IpTransformation", Transformation: s.applyIpTransformations},
1178+
{TransformationName: "GeoTransformation", Transformation: s.applyGeoTransformations},
1179+
{TransformationName: "ArrayTransformation", Transformation: s.applyArrayTransformations},
1180+
{TransformationName: "MapTransformation", Transformation: s.applyMapTransformations},
1181+
{TransformationName: "MatchOperatorTransformation", Transformation: s.applyMatchOperator},
1182+
{TransformationName: "AggOverUnsupportedType", Transformation: s.checkAggOverUnsupportedType},
1183+
{TransformationName: "ApplySelectFromCluster", Transformation: s.ApplySelectFromCluster},
1184+
{TransformationName: "BooleanLiteralTransformation", Transformation: s.applyBooleanLiteralLowering},
1185+
}...,
1186+
)
1187+
11341188
for k, query := range plan.Queries {
11351189
var err error
11361190

platform/frontend_connectors/search.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
521521
logger.InfoWithCtx(ctx).Msgf("Input SQL query %d: %s", i, query.SelectCommand.String())
522522
}
523523
}
524+
plan.BackendConnector = q.logManager.GetBackendConnector()
524525
err = q.transformQueries(plan)
525526

526527
if err != nil {

platform/model/query.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ package model
55
import (
66
"github.com/QuesmaOrg/quesma/platform/parsers/painful"
77
"github.com/QuesmaOrg/quesma/platform/schema"
8+
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
89
"time"
910
)
1011

@@ -120,6 +121,8 @@ type ExecutionPlan struct {
120121
MergeSiblingResults func(plan *ExecutionPlan, results [][]QueryResultRow) (*ExecutionPlan, [][]QueryResultRow)
121122

122123
SiblingQueries map[int][]int // Map of query IDs to their sibling query IDs
124+
125+
BackendConnector quesma_api.BackendConnector // Backend connector used for executing the queries
123126
}
124127

125128
// NewExecutionPlan creates a new instance of model.ExecutionPlan

platform/parsers/elastic_query_dsl/aggregation_parser.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ func (cw *ClickhouseQueryTranslator) parseFieldFromScriptField(queryMap QueryMap
406406
wantedRegex := regexp.MustCompile(`^doc\['(\w+)']\.value\.(?:getHour\(\)|hourOfDay)$`)
407407
matches := wantedRegex.FindStringSubmatch(source)
408408
if len(matches) == 2 {
409-
return model.NewFunction("toHour", model.NewColumnRef(matches[1])), true
409+
return model.NewFunction(model.DateHourFunction, model.NewColumnRef(matches[1])), true
410410
}
411411

412412
// b) source: "if (doc['field_name_1'].value == doc['field_name_2'].value") { return 1; } else { return 0; }"

platform/parsers/elastic_query_dsl/aggregation_parser_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -704,9 +704,9 @@ func Test_parseFieldFromScriptField(t *testing.T) {
704704
expectedMatch model.Expr
705705
expectedSuccess bool
706706
}{
707-
{goodQueryMap("doc['field1'].value.getHour()"), model.NewFunction("toHour", model.NewColumnRef("field1")), true},
707+
{goodQueryMap("doc['field1'].value.getHour()"), model.NewFunction(model.DateHourFunction, model.NewColumnRef("field1")), true},
708708
{goodQueryMap("doc['field1'].value.getHour() + doc['field2'].value.getHour()"), nil, false},
709-
{goodQueryMap("doc['field1'].value.hourOfDay"), model.NewFunction("toHour", model.NewColumnRef("field1")), true},
709+
{goodQueryMap("doc['field1'].value.hourOfDay"), model.NewFunction(model.DateHourFunction, model.NewColumnRef("field1")), true},
710710
{goodQueryMap("doc['field1'].value"), nil, false},
711711
{goodQueryMap("value.getHour() + doc['field2'].value.getHour()"), nil, false},
712712
{QueryMap{}, nil, false},

platform/testdata/opensearch-visualize/aggregation_requests.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1342,8 +1342,8 @@ var AggregationTests = []testdata.AggregationTestCase{
13421342
}},
13431343
},
13441344
ExpectedPancakeSQL: `
1345-
SELECT maxOrNull(toHour("timestamp")) AS "metric__maxAgg_col_0",
1346-
minOrNull(toHour("timestamp")) AS "metric__minAgg_col_0"
1345+
SELECT maxOrNull(__quesma_date_hour("timestamp")) AS "metric__maxAgg_col_0",
1346+
minOrNull(__quesma_date_hour("timestamp")) AS "metric__minAgg_col_0"
13471347
FROM ` + TableName + ``,
13481348
},
13491349
{ // [9]
@@ -1457,9 +1457,9 @@ var AggregationTests = []testdata.AggregationTestCase{
14571457
}},
14581458
},
14591459
ExpectedPancakeSQL: `
1460-
SELECT toHour("timestamp") AS "aggr__2__key_0", count(*) AS "aggr__2__count"
1460+
SELECT __quesma_date_hour("timestamp") AS "aggr__2__key_0", count(*) AS "aggr__2__count"
14611461
FROM ` + TableName + `
1462-
GROUP BY toHour("timestamp") AS "aggr__2__key_0"
1462+
GROUP BY __quesma_date_hour("timestamp") AS "aggr__2__key_0"
14631463
ORDER BY "aggr__2__key_0" ASC`,
14641464
},
14651465
}

platform/testdata/opensearch-visualize/pipeline_aggregation_requests.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -850,7 +850,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{
850850
ExpectedPancakeSQL: `
851851
SELECT toInt64(toUnixTimestamp64Milli("timestamp") / 600000) AS "aggr__2__key_0",
852852
count(*) AS "aggr__2__count",
853-
sumOrNull(toHour("timestamp")) AS "metric__2__1-metric_col_0"
853+
sumOrNull(__quesma_date_hour("timestamp")) AS "metric__2__1-metric_col_0"
854854
FROM __quesma_table_name
855855
GROUP BY toInt64(toUnixTimestamp64Milli("timestamp") / 600000) AS "aggr__2__key_0"
856856
ORDER BY "aggr__2__key_0" ASC`,
@@ -1726,7 +1726,7 @@ var PipelineAggregationTests = []testdata.AggregationTestCase{
17261726
ExpectedPancakeSQL: `
17271727
SELECT toInt64(toUnixTimestamp64Milli("timestamp") / 600000) AS "aggr__2__key_0",
17281728
count(*) AS "aggr__2__count",
1729-
sumOrNull(toHour("timestamp")) AS "metric__2__1-metric_col_0"
1729+
sumOrNull(__quesma_date_hour("timestamp")) AS "metric__2__1-metric_col_0"
17301730
FROM __quesma_table_name
17311731
GROUP BY toInt64(toUnixTimestamp64Milli("timestamp") / 600000) AS "aggr__2__key_0"
17321732
ORDER BY "aggr__2__key_0" ASC`,

0 commit comments

Comments
 (0)