Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 976ccff

Browse files
authored
Merge branch 'main' into fuzzy_search_support
2 parents 653e1e9 + cd6fe36 commit 976ccff

30 files changed

+3417
-381
lines changed

bin/ingest-clickhouse/requirements.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ idna==3.7
55
lz4==4.3.3
66
ndjson==0.3.1
77
pytz==2024.1
8-
requests==2.32.3
8+
requests==2.32.4
99
tabulate==0.9.0
1010
ujson==5.10.0
11-
urllib3==2.2.2
11+
urllib3==2.5.0
1212
zstandard==0.22.0

ci/it/testcases/utils.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,11 @@ func setupQuesma(ctx context.Context, quesmaConfig string) (testcontainers.Conta
198198
log.Println("No QUESMA_IT_VERSION environment variable set, watch out for stale images!")
199199
quesmaVersion = "nightly"
200200
}
201+
quesmaDockerImage := fmt.Sprintf("quesma/quesma:%s", quesmaVersion)
202+
log.Printf("Using Quesma docker image: %s", quesmaDockerImage)
201203

202204
quesmaReq := testcontainers.ContainerRequest{
203-
Image: fmt.Sprintf("quesma/quesma:%s", quesmaVersion),
205+
Image: quesmaDockerImage,
204206
ExposedPorts: []string{"0.0.0.0::9999/tcp", "0.0.0.0::8080/tcp"},
205207
Env: map[string]string{
206208
"QUESMA_CONFIG_FILE": "/configuration/conf.yaml",
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package frontend_connectors
4+
5+
const (
6+
CLickhouseDateHourFunction = "toHour"
7+
ClickhouseFromUnixTimeFunction64mili = "fromUnixTimestamp64Milli"
8+
ClickhouseFromUnixTimeFunction = "fromUnixTimestamp"
9+
ClickhouseToTimezone = "toTimezone"
10+
ClickhousetoUnixTimestamp64Milli = "toUnixTimestamp64Milli"
11+
DorisDateHourFunction = "HOUR"
12+
DorisFromUnixTimeFunction = "FROM_UNIXTIME"
13+
DorisFromUnixTimeFunction64mili = "FROM_MILLISECOND"
14+
)

platform/frontend_connectors/schema_transformer.go

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ import (
2020
"strings"
2121
)
2222

23+
type TransformationsChain struct {
24+
TransformationName string
25+
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
26+
}
27+
2328
type SchemaCheckPass struct {
2429
cfg *config.QuesmaConfiguration
2530
tableDiscovery database_common.TableDiscovery
@@ -910,10 +915,15 @@ func (s *SchemaCheckPass) convertQueryDateTimeFunctionToClickhouse(indexSchema s
910915
if len(e.Args) != 1 {
911916
return e
912917
}
913-
return model.NewFunction("toHour", e.Args[0].Accept(b).(model.Expr))
918+
return model.NewFunction(CLickhouseDateHourFunction, e.Args[0].Accept(b).(model.Expr))
919+
920+
case model.FromUnixTimeFunction64mili:
921+
args := b.VisitChildren(e.Args)
922+
return model.NewFunction(ClickhouseFromUnixTimeFunction64mili, args...)
914923

915-
// TODO this is a place for over date/time related functions
916-
// add more
924+
case model.FromUnixTimeFunction:
925+
args := b.VisitChildren(e.Args)
926+
return model.NewFunction(ClickhouseFromUnixTimeFunction, args...)
917927

918928
default:
919929
return visitFunction(b, e)
@@ -940,11 +950,14 @@ func (s *SchemaCheckPass) convertQueryDateTimeFunctionToDoris(indexSchema schema
940950
if len(e.Args) != 1 {
941951
return e
942952
}
943-
return model.NewFunction("HOUR", e.Args[0].Accept(b).(model.Expr))
944-
945-
// TODO this is a place for over date/time related functions
946-
// add more
947-
953+
return model.NewFunction(DorisDateHourFunction, e.Args[0].Accept(b).(model.Expr))
954+
case model.FromUnixTimeFunction:
955+
args := b.VisitChildren(e.Args)
956+
return model.NewFunction(DorisFromUnixTimeFunction, args...)
957+
958+
case model.FromUnixTimeFunction64mili:
959+
args := b.VisitChildren(e.Args)
960+
return model.NewFunction(DorisFromUnixTimeFunction64mili, args...)
948961
default:
949962
return visitFunction(b, e)
950963
}
@@ -1083,7 +1096,7 @@ func (s *SchemaCheckPass) acceptIntsAsTimestamps(indexSchema schema.Schema, quer
10831096
}
10841097
}
10851098
if ok {
1086-
if f, okF := model.ToFunction(expr); okF && f.Name == "fromUnixTimestamp64Milli" && len(f.Args) == 1 {
1099+
if f, okF := model.ToFunction(expr); okF && f.Name == model.FromUnixTimeFunction64mili && len(f.Args) == 1 {
10871100
if l, okL := model.ToLiteral(f.Args[0]); okL {
10881101
if _, exists := l.Format(); exists { // heuristics: it's a date <=> it has a format
10891102
return model.NewInfixExpr(col, e.Op, f.Args[0])
@@ -1096,16 +1109,16 @@ func (s *SchemaCheckPass) acceptIntsAsTimestamps(indexSchema schema.Schema, quer
10961109
}
10971110

10981111
visitor.OverrideVisitFunction = func(b *model.BaseExprVisitor, f model.FunctionExpr) interface{} {
1099-
if f.Name == "toUnixTimestamp64Milli" && len(f.Args) == 1 {
1112+
if f.Name == ClickhousetoUnixTimestamp64Milli && len(f.Args) == 1 {
11001113
if col, ok := model.ExtractColRef(f.Args[0]); ok && table.IsInt(col.ColumnName) {
11011114
// erases toUnixTimestamp64Milli
11021115
return f.Args[0]
11031116
}
11041117
}
1105-
if f.Name == "toTimezone" && len(f.Args) == 2 {
1118+
if f.Name == ClickhouseToTimezone && len(f.Args) == 2 {
11061119
if col, ok := model.ExtractColRef(f.Args[0]); ok && table.IsInt(col.ColumnName) {
11071120
// adds fromUnixTimestamp64Milli
1108-
return model.NewFunction("toTimezone", model.NewFunction("fromUnixTimestamp64Milli", f.Args[0]), f.Args[1])
1121+
return model.NewFunction(ClickhouseToTimezone, model.NewFunction(model.FromUnixTimeFunction64mili, f.Args[0]), f.Args[1])
11091122
}
11101123
}
11111124
return visitFunction(b, f)
@@ -1119,12 +1132,8 @@ func (s *SchemaCheckPass) acceptIntsAsTimestamps(indexSchema schema.Schema, quer
11191132
return query, nil
11201133
}
11211134

1122-
func (s *SchemaCheckPass) Transform(plan *model.ExecutionPlan) (*model.ExecutionPlan, error) {
1123-
1124-
transformationChain := []struct {
1125-
TransformationName string
1126-
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
1127-
}{
1135+
func (s *SchemaCheckPass) makeTransformations(backendConnectorType quesma_api.BackendConnectorType) []TransformationsChain {
1136+
transformationChain := []TransformationsChain{
11281137
// Section 1: from logical to physical
11291138
{TransformationName: "PhysicalFromExpressionTransformation", Transformation: s.applyPhysicalFromExpression},
11301139
{TransformationName: "WildcardExpansion", Transformation: s.applyWildcardExpansion},
@@ -1149,31 +1158,23 @@ func (s *SchemaCheckPass) Transform(plan *model.ExecutionPlan) (*model.Execution
11491158

11501159
// Section 3: backend specific transformations
11511160
// fallback to clickhouse date functions if no backend connector is set
1152-
if plan.BackendConnector == nil {
1161+
1162+
if backendConnectorType == quesma_api.ClickHouseSQLBackend {
11531163
transformationChain = append(transformationChain, struct {
11541164
TransformationName string
11551165
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
11561166
}{TransformationName: "QuesmaDateFunctions", Transformation: s.convertQueryDateTimeFunctionToClickhouse})
1157-
} else {
1158-
if plan.BackendConnector.GetId() == quesma_api.ClickHouseSQLBackend {
1159-
transformationChain = append(transformationChain, struct {
1160-
TransformationName string
1161-
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
1162-
}{TransformationName: "QuesmaDateFunctions", Transformation: s.convertQueryDateTimeFunctionToClickhouse})
1163-
}
1164-
1165-
if plan.BackendConnector.GetId() == quesma_api.DorisSQLBackend {
1166-
transformationChain = append(transformationChain, struct {
1167-
TransformationName string
1168-
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
1169-
}{TransformationName: "QuesmaDateFunctions", Transformation: s.convertQueryDateTimeFunctionToDoris})
1170-
}
11711167
}
1172-
transformationChain = append(transformationChain,
1173-
[]struct {
1168+
1169+
if backendConnectorType == quesma_api.DorisSQLBackend {
1170+
transformationChain = append(transformationChain, struct {
11741171
TransformationName string
11751172
Transformation func(schema.Schema, *model.Query) (*model.Query, error)
1176-
}{
1173+
}{TransformationName: "QuesmaDateFunctions", Transformation: s.convertQueryDateTimeFunctionToDoris})
1174+
}
1175+
1176+
transformationChain = append(transformationChain,
1177+
[]TransformationsChain{
11771178
{TransformationName: "IpTransformation", Transformation: s.applyIpTransformations},
11781179
{TransformationName: "GeoTransformation", Transformation: s.applyGeoTransformations},
11791180
{TransformationName: "ArrayTransformation", Transformation: s.applyArrayTransformations},
@@ -1184,7 +1185,20 @@ func (s *SchemaCheckPass) Transform(plan *model.ExecutionPlan) (*model.Execution
11841185
{TransformationName: "BooleanLiteralTransformation", Transformation: s.applyBooleanLiteralLowering},
11851186
}...,
11861187
)
1188+
return transformationChain
1189+
}
1190+
1191+
func (s *SchemaCheckPass) determineBackendConnectorType(plan *model.ExecutionPlan) quesma_api.BackendConnectorType {
1192+
if plan != nil && plan.BackendConnector != nil {
1193+
return plan.BackendConnector.GetId()
1194+
}
1195+
return quesma_api.ClickHouseSQLBackend
1196+
}
1197+
1198+
func (s *SchemaCheckPass) Transform(plan *model.ExecutionPlan) (*model.ExecutionPlan, error) {
11871199

1200+
backendConnectorType := s.determineBackendConnectorType(plan)
1201+
transformationChain := s.makeTransformations(backendConnectorType)
11881202
for k, query := range plan.Queries {
11891203
var err error
11901204

platform/frontend_connectors/schema_transformer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2039,7 +2039,7 @@ func Test_acceptIntsAsTimestamps(t *testing.T) {
20392039
model.NewInfixExpr(
20402040
model.NewFunction("timeZoneOffset", model.NewFunction(
20412041
"toTimezone",
2042-
model.NewFunction("fromUnixTimestamp64Milli", model.NewColumnRef("timestampInt")),
2042+
model.NewFunction(model.FromUnixTimeFunction64mili, model.NewColumnRef("timestampInt")),
20432043
model.NewLiteral("'Europe/Warsaw'")),
20442044
),
20452045
"*",

platform/frontend_connectors/search_opensearch_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func TestSearchOpensearch(t *testing.T) {
5555
assert.NoError(t, err, "no ParseQuery error")
5656
assert.True(t, len(queries) > 0, "len queries > 0")
5757
whereClause := model.AsString(queries[0].SelectCommand.WhereClause)
58+
// This checks where clause after parsing and before transformations
5859
assert.Contains(t, tt.WantedSql, whereClause, "contains wanted sql")
5960

6061
for _, wantedQuery := range tt.WantedQueries {

platform/frontend_connectors/search_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func TestAsyncSearchHandler(t *testing.T) {
8383
},
8484
}
8585

86-
for i, tt := range testdata.TestsAsyncSearch {
86+
for i, tt := range testdata.TestsAsyncSearchAfterTransformations {
8787
t.Run(util.PrettyTestName(tt.Name, i), func(t *testing.T) {
8888
conn, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false)
8989
db := backend_connectors.NewClickHouseBackendConnectorWithConnection("", conn)
@@ -302,7 +302,7 @@ func TestSearchHandler(t *testing.T) {
302302
},
303303
}
304304

305-
for i, tt := range testdata.TestsSearch {
305+
for i, tt := range testdata.TestsSearchAfterTransformations {
306306
t.Run(util.PrettyTestName(tt.Name, i), func(t *testing.T) {
307307
var conn *sql.DB
308308
var mock sqlmock.Sqlmock
@@ -433,7 +433,7 @@ func TestSearchHandlerNoAttrsConfig(t *testing.T) {
433433
},
434434
}
435435

436-
for i, tt := range testdata.TestsSearchNoAttrs {
436+
for i, tt := range testdata.TestsSearchNoAttrsAfterTransformations {
437437
t.Run(util.PrettyTestName(tt.Name, i), func(t *testing.T) {
438438
conn, mock := util.InitSqlMockWithPrettyPrint(t, false)
439439
defer conn.Close()
@@ -482,7 +482,7 @@ func TestAsyncSearchFilter(t *testing.T) {
482482
},
483483
},
484484
}
485-
for i, tt := range testdata.TestSearchFilter {
485+
for i, tt := range testdata.TestSearchFilterAfterTransformations {
486486
t.Run(util.PrettyTestName(tt.Name, i), func(t *testing.T) {
487487
var conn *sql.DB
488488
var mock sqlmock.Sqlmock

platform/frontend_connectors/terms_enum_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ func testHandleTermsEnumRequest(t *testing.T, requestBody []byte, fieldName stri
128128
ctx = context.WithValue(context.Background(), tracing.RequestIdCtxKey, "test")
129129
qt := &elastic_query_dsl.ClickhouseQueryTranslator{Table: table, Ctx: ctx, Schema: s.Tables[schema.IndexName(testTableName)]}
130130
// Here we additionally verify that terms for `_tier` are **NOT** included in the SQL query
131-
expectedQuery1 := fmt.Sprintf(`SELECT DISTINCT %s FROM %s WHERE (("epoch_time">=fromUnixTimestamp(1709036700) AND "epoch_time"<=fromUnixTimestamp(1709037659)) AND ("epoch_time_datetime64">=fromUnixTimestamp64Milli(1709036700000) AND "epoch_time_datetime64"<=fromUnixTimestamp64Milli(1709037659999))) LIMIT 13`, fieldName, testTableName)
132-
expectedQuery2 := fmt.Sprintf(`SELECT DISTINCT %s FROM %s WHERE (("epoch_time">=fromUnixTimestamp(1709036700) AND "epoch_time"<=fromUnixTimestamp(1709037659)) AND ("epoch_time_datetime64">=fromUnixTimestamp64Milli(1709036700000) AND "epoch_time_datetime64"<=fromUnixTimestamp64Milli(1709037659999))) LIMIT 13`, fieldName, testTableName)
131+
expectedQuery1 := fmt.Sprintf(`SELECT DISTINCT %s FROM %s WHERE (("epoch_time">=__quesma_from_unixtime(1709036700) AND "epoch_time"<=__quesma_from_unixtime(1709037659)) AND ("epoch_time_datetime64">=__quesma_from_unixtime64mili(1709036700000) AND "epoch_time_datetime64"<=__quesma_from_unixtime64mili(1709037659999))) LIMIT 13`, fieldName, testTableName)
132+
expectedQuery2 := fmt.Sprintf(`SELECT DISTINCT %s FROM %s WHERE (("epoch_time">=__quesma_from_unixtime(1709036700) AND "epoch_time"<=__quesma_from_unixtime(1709037659)) AND ("epoch_time_datetime64">=__quesma_from_unixtime64mili(1709036700000) AND "epoch_time_datetime64"<=__quesma_from_unixtime64mili(1709037659999))) LIMIT 13`, fieldName, testTableName)
133133

134134
// Once in a while `AND` conditions could be swapped, so we match both cases
135135
mock.ExpectQuery(fmt.Sprintf("%s|%s", regexp.QuoteMeta(expectedQuery1), regexp.QuoteMeta(expectedQuery2))).

platform/ingest/ingest_validator.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func validateNumericRange(columnType string, value interface{}) (isValid bool) {
8888
}
8989
}
9090

91-
func validateNumericType(columnType string, incomingValueType string, value interface{}) (isValid bool) {
91+
func validateNumericType(columnName, columnType string, incomingValueType string, value interface{}) (isValid bool) {
9292
if isFloatingPointType(columnType) && isNumericType(incomingValueType) {
9393
return true
9494
}
@@ -101,19 +101,19 @@ func validateNumericType(columnType string, incomingValueType string, value inte
101101
if valueAsStr, ok := value.(string); ok {
102102
return util.IsFloat(valueAsStr)
103103
} else {
104-
logger.Error().Msgf("Invalid value type for column of type %s: %T, value: %v", columnType, value, value)
104+
logger.Error().Msgf("Invalid value type for column [%s] of type %s: %T, value: %v", columnName, columnType, value, value)
105105
}
106106
}
107107
if isIntegerType(columnType) && incomingValueType == "String" {
108108
if valueAsStr, ok := value.(string); ok && util.IsInt(valueAsStr) {
109109
valueAsInt, err := util.ToInt64(valueAsStr)
110110
if err != nil {
111-
logger.Error().Msgf("Failed to convert value to int: %v", valueAsStr)
111+
logger.Error().Msgf("Failed to convert value to int: %v when processing data for column [%s]", valueAsStr, columnName)
112112
return false
113113
}
114114
return validateNumericRange(columnType, valueAsInt)
115115
} else {
116-
logger.Error().Msgf("Invalid value type for column of type %s: %T, value: %v", columnType, value, value)
116+
logger.Error().Msgf("Invalid value type for column [%s] of type %s: %T, value: %v", columnName, columnType, value, value)
117117
}
118118
}
119119

@@ -131,7 +131,7 @@ func validateValueAgainstType(fieldName string, value interface{}, columnType da
131131
columnTypeName := removeLowCardinality(columnType.Name)
132132

133133
if isNumericType(columnTypeName) {
134-
if incomingValueType, isBaseType := incomingValueType.(database_common.BaseType); isBaseType && validateNumericType(columnTypeName, incomingValueType.Name, value) {
134+
if incomingValueType, isBaseType := incomingValueType.(database_common.BaseType); isBaseType && validateNumericType(fieldName, columnTypeName, incomingValueType.Name, value) {
135135
// Numeric types match!
136136
return true
137137
}

platform/model/constants.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ const (
77
FullTextFieldNamePlaceHolder = "__quesma_fulltext_field_name"
88
TimestampFieldName = "@timestamp"
99

10-
DateHourFunction = "__quesma_date_hour"
11-
MatchOperator = "__quesma_match"
10+
DateHourFunction = "__quesma_date_hour"
11+
MatchOperator = "__quesma_match"
12+
FromUnixTimeFunction = "__quesma_from_unixtime"
13+
FromUnixTimeFunction64mili = "__quesma_from_unixtime64mili"
1214
)

0 commit comments

Comments
 (0)