Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 54d168a

Browse files
authored
Geo type refactor (#761)
Signed-off-by: Rafał Strzaliński <[email protected]>
1 parent 9f822e6 commit 54d168a

File tree

11 files changed

+175
-193
lines changed

11 files changed

+175
-193
lines changed

quesma/model/geo_type.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package model
4+
5+
const QuesmaGeoLatFunction = "__quesma_geo_lat"
6+
const QuesmaGeoLonFunction = "__quesma_geo_lon"
7+
8+
func NewGeoLat(propertyName string) Expr {
9+
return NewFunction(QuesmaGeoLatFunction, NewColumnRef(propertyName))
10+
}
11+
12+
func NewGeoLon(propertyName string) Expr {
13+
return NewFunction(QuesmaGeoLonFunction, NewColumnRef(propertyName))
14+
}

quesma/model/metrics_aggregations/top_hits.go

Lines changed: 3 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,8 @@ package metrics_aggregations
44

55
import (
66
"context"
7-
"encoding/json"
87
"quesma/logger"
98
"quesma/model"
10-
"quesma/schema"
11-
"strconv"
12-
"strings"
139
)
1410

1511
type TopHits struct {
@@ -50,49 +46,11 @@ func (query TopHits) TranslateSqlResponseToJson(rows []model.QueryResultRow) mod
5046
sourceMap := model.JsonMap{}
5147

5248
for _, col := range valuesForHits {
53-
var withoutQuotes string
54-
if unquoted, err := strconv.Unquote(col.ColName); err == nil {
55-
withoutQuotes = unquoted
56-
} else {
57-
withoutQuotes = col.ColName
58-
}
59-
colName, _ := strings.CutPrefix(withoutQuotes, `windowed_`)
6049

61-
if col.ColType.Name == schema.QuesmaTypePoint.Name {
62-
hits := make(model.JsonMap)
63-
// TODO suffixes (::lat, ::lon) hardcoded for now
64-
// due to insufficient information in the schema
65-
if strings.Contains(col.ColName, "::lon") {
66-
hits["lon"] = col.ExtractValue(query.ctx)
67-
colName = strings.TrimSuffix(col.ColName, "::lon")
68-
}
69-
if strings.Contains(col.ColName, "::lat") {
70-
hits["lat"] = col.ExtractValue(query.ctx)
71-
colName = strings.TrimSuffix(col.ColName, "::lat")
72-
}
73-
if _, ok := sourceMap[colName]; ok {
74-
currentHits := sourceMap[colName].(model.JsonMap)
75-
for k, v := range currentHits {
76-
hits[k] = v
77-
}
78-
sourceMap[colName] = hits
79-
} else {
80-
sourceMap[colName] = hits
81-
}
50+
value := col.ExtractValue(query.ctx)
51+
52+
sourceMap[col.ColName] = value
8253

83-
} else {
84-
value := col.ExtractValue(query.ctx)
85-
// TODO: this is hack, we should not assume this is location
86-
if strings.HasSuffix(col.ColName, "Location") {
87-
if valueStr, ok := value.(string); ok {
88-
var valueJson model.JsonMap
89-
if err := json.Unmarshal([]byte(valueStr), &valueJson); err == nil {
90-
value = valueJson
91-
}
92-
}
93-
}
94-
sourceMap[col.ColName] = value
95-
}
9654
}
9755

9856
elem := model.JsonMap{

quesma/queryparser/pancake_aggregation_parser_buckets.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -236,21 +236,21 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
236236

237237
zoomLiteral := model.NewLiteral(precisionZoom)
238238

239-
// TODO columns names should be created according to the schema
240-
var lon = model.AsString(field)
241-
lon = strings.Trim(lon, "\"")
242-
lon = lon + "::lon"
243-
var lat = model.AsString(field)
244-
lat = strings.Trim(lat, "\"")
245-
lat = lat + "::lat"
246-
toFloatFunLon := model.NewFunction("toFloat64", model.NewColumnRef(lon))
239+
fieldName, err := strconv.Unquote(model.AsString(field))
240+
if err != nil {
241+
return false, err
242+
}
243+
lon := model.NewGeoLon(fieldName)
244+
lat := model.NewGeoLat(fieldName)
245+
246+
toFloatFunLon := model.NewFunction("toFloat64", lon)
247247
var infixX model.Expr
248248
infixX = model.NewParenExpr(model.NewInfixExpr(toFloatFunLon, "+", model.NewLiteral(180.0)))
249249
infixX = model.NewParenExpr(model.NewInfixExpr(infixX, "/", model.NewLiteral(360.0)))
250250
infixX = model.NewInfixExpr(infixX, "*",
251251
model.NewFunction("POWER", model.NewLiteral(2), zoomLiteral))
252252
xTile := model.NewFunction("FLOOR", infixX)
253-
toFloatFunLat := model.NewFunction("toFloat64", model.NewColumnRef(lat))
253+
toFloatFunLat := model.NewFunction("toFloat64", lat)
254254
radians := model.NewFunction("RADIANS", toFloatFunLat)
255255
tan := model.NewFunction("TAN", radians)
256256
cos := model.NewFunction("COS", radians)

quesma/queryparser/pancake_aggregation_parser_metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,8 @@ func generateMetricSelectedColumns(ctx context.Context, metricsAggr metricsAggre
108108
if col, ok := firstExpr.(model.ColumnRef); ok {
109109
colName := col.ColumnName
110110
// TODO we have create columns according to the schema
111-
latColumn := model.NewColumnRef(colName + "::lat")
112-
lonColumn := model.NewColumnRef(colName + "::lon")
111+
latColumn := model.NewGeoLat(colName)
112+
lonColumn := model.NewGeoLon(colName)
113113
castLat := model.NewFunction("CAST", latColumn, model.NewLiteral(fmt.Sprintf("'%s'", "Float")))
114114
castLon := model.NewFunction("CAST", lonColumn, model.NewLiteral(fmt.Sprintf("'%s'", "Float")))
115115
result = append(result, model.NewFunction("avgOrNull", castLat))

quesma/queryparser/pancake_sql_query_generation_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ func TestPancakeQueryGeneration(t *testing.T) {
143143

144144
actualMinusExpected, expectedMinusActual := util.MapDifference(pancakeJson,
145145
expectedAggregationsPart, acceptableDifference, true, true)
146+
146147
if len(actualMinusExpected) != 0 {
147148
pp.Println("ACTUAL diff", actualMinusExpected)
148149
}

quesma/queryparser/pancake_top_hits.go

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"quesma/model"
88
"quesma/model/metrics_aggregations"
99
"strconv"
10-
"strings"
1110
)
1211

1312
func (p *pancakeSqlQueryGenerator) quotedLiteral(name string) model.LiteralExpr {
@@ -83,25 +82,14 @@ func (p *pancakeSqlQueryGenerator) generateTopHitsQuery(aggregation *pancakeMode
8382
groupTableName := "group_table"
8483
hitTableName := "hit_table"
8584

86-
hitTableLiteral := func(reference string) model.Expr {
87-
return model.NewLiteral(strconv.Quote(hitTableName) + "." + strconv.Quote(reference))
88-
}
8985
groupTableLiteral := func(reference string) model.Expr {
9086
return model.NewLiteral(strconv.Quote(groupTableName) + "." + strconv.Quote(reference))
9187
}
9288

9389
convertColumnRefToHitTable := func(expr model.Expr) model.Expr {
9490
switch exprTyped := expr.(type) {
9591
case model.ColumnRef:
96-
// TODO: hack alert, we treat geo here in unique way
97-
if strings.HasSuffix(exprTyped.ColumnName, "Location") {
98-
return model.NewFunction("map",
99-
model.NewLiteral("'lat'"),
100-
hitTableLiteral(exprTyped.ColumnName+"::lat"),
101-
model.NewLiteral("'lon'"),
102-
hitTableLiteral(exprTyped.ColumnName+"::lon"),
103-
)
104-
}
92+
10593
return model.ColumnRef{
10694
TableAlias: hitTableName,
10795
ColumnName: exprTyped.ColumnName,

quesma/quesma/schema_transformer.go

Lines changed: 99 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -152,63 +152,117 @@ func (s *SchemaCheckPass) applyIpTransformations(indexSchema schema.Schema, quer
152152
return query, nil
153153
}
154154

155-
func (s *SchemaCheckPass) applyGeoTransformations(currentSchema schema.Schema, query *model.Query) (*model.Query, error) {
156-
fromTable := query.TableName
155+
func (s *SchemaCheckPass) applyGeoTransformations(schemaInstance schema.Schema, query *model.Query) (*model.Query, error) {
156+
157+
replace := make(map[string]model.Expr)
158+
159+
for _, field := range schemaInstance.Fields {
160+
if field.Type.Name == schema.QuesmaTypePoint.Name {
161+
162+
lon := model.NewColumnRef(field.InternalPropertyName.AsString() + "::lon")
163+
lat := model.NewColumnRef(field.InternalPropertyName.AsString() + "::lat")
164+
165+
// This is a workaround. Clickhouse Point is defined as Tuple. We need to know the type of the tuple.
166+
// In this step we merge two columns into single map here. Map is in elastic format.
167+
168+
// In this point we assume that Quesma point type is stored into two separate columns.
169+
replace[field.PropertyName.AsString()] = model.NewFunction("map",
170+
model.NewLiteral("'lat'"),
171+
lat,
172+
model.NewLiteral("'lon'"),
173+
lon)
174+
175+
// these a just if we need multifields support
176+
replace[field.PropertyName.AsString()+".lat"] = lat
177+
replace[field.PropertyName.AsString()+".lon"] = lon
178+
179+
// if the point is stored as a single column, we need to extract the lat and lon
180+
//replace[field.PropertyName.AsString()] = model.NewFunction("give_me_point", model.NewColumnRef(field.InternalPropertyName.AsString()))
181+
//replace[field.PropertyName.AsString()+".lat"] = model.NewFunction("give_me_lat", model.NewColumnRef(field.InternalPropertyName.AsString()))
182+
//replace[field.PropertyName.AsString()+".lon"] = model.NewFunction("give_me_lon", model.NewColumnRef(field.InternalPropertyName.AsString()))
183+
184+
}
185+
}
157186

158187
visitor := model.NewBaseVisitor()
159-
visitor.OverrideVisitSelectCommand = func(b *model.BaseExprVisitor, e model.SelectCommand) interface{} {
160-
if s.schemaRegistry == nil {
161-
logger.Error().Msg("Schema registry is not set")
162-
return e
163-
}
164-
schemaInstance, exists := s.schemaRegistry.FindSchema(schema.TableName(fromTable))
165-
if !exists {
166-
logger.Error().Msgf("Schema fot table %s not found", fromTable)
167-
return e
168-
}
169-
var groupBy []model.Expr
170-
for _, expr := range e.GroupBy {
171-
groupByExpr := expr.Accept(b).(model.Expr)
172-
if col, ok := expr.(model.ColumnRef); ok {
173-
// This checks if the column is of type point
174-
// and if it is, it appends the lat and lon columns to the group by clause
175-
field := schemaInstance.Fields[schema.FieldName(col.ColumnName)]
176-
if field.Type.Name == schema.QuesmaTypePoint.Name {
177-
// TODO suffixes ::lat, ::lon are hardcoded for now
178-
groupBy = append(groupBy, model.NewColumnRef(field.InternalPropertyName.AsString()+"::lat"))
179-
groupBy = append(groupBy, model.NewColumnRef(field.InternalPropertyName.AsString()+"::lon"))
180-
} else {
181-
groupBy = append(groupBy, groupByExpr)
188+
189+
visitor.OverrideVisitColumnRef = func(b *model.BaseExprVisitor, e model.ColumnRef) interface{} {
190+
if expr, ok := replace[e.ColumnName]; ok {
191+
return expr
192+
}
193+
return e
194+
}
195+
196+
visitor.OverrideVisitFunction = func(b *model.BaseExprVisitor, e model.FunctionExpr) interface{} {
197+
198+
var suffix string
199+
switch e.Name {
200+
case model.QuesmaGeoLatFunction:
201+
suffix = ".lat"
202+
case model.QuesmaGeoLonFunction:
203+
suffix = ".lon"
204+
}
205+
206+
if suffix != "" && len(e.Args) == 1 {
207+
if col, ok := e.Args[0].(model.ColumnRef); ok {
208+
if expr, ok := replace[col.ColumnName+suffix]; ok {
209+
return expr
182210
}
183-
} else {
184-
groupBy = append(groupBy, groupByExpr)
185211
}
186212
}
187-
var columns []model.Expr
188-
for _, expr := range e.Columns {
213+
214+
return model.NewFunction(e.Name, b.VisitChildren(e.Args)...)
215+
}
216+
217+
visitor.OverrideVisitSelectCommand = func(v *model.BaseExprVisitor, query model.SelectCommand) interface{} {
218+
var columns, groupBy []model.Expr
219+
var orderBy []model.OrderByExpr
220+
from := query.FromClause
221+
where := query.WhereClause
222+
223+
for _, expr := range query.Columns {
224+
var alias string
189225
if col, ok := expr.(model.ColumnRef); ok {
190-
// This checks if the column is of type point
191-
// and if it is, it appends the lat and lon columns to the select clause
192-
field := schemaInstance.Fields[schema.FieldName(col.ColumnName)]
193-
if field.Type.Name == schema.QuesmaTypePoint.Name {
194-
// TODO suffixes ::lat, ::lon are hardcoded for now
195-
columns = append(columns, model.NewColumnRef(field.InternalPropertyName.AsString()+"::lat"))
196-
columns = append(columns, model.NewColumnRef(field.InternalPropertyName.AsString()+"::lon"))
197-
} else {
198-
columns = append(columns, expr.Accept(b).(model.Expr))
226+
if _, ok := replace[col.ColumnName]; ok {
227+
alias = col.ColumnName
199228
}
200-
} else {
201-
columns = append(columns, expr.Accept(b).(model.Expr))
202229
}
230+
231+
col := expr.Accept(v).(model.Expr)
232+
233+
if alias != "" {
234+
col = model.NewAliasedExpr(col, alias)
235+
}
236+
237+
columns = append(columns, col)
238+
}
239+
for _, expr := range query.GroupBy {
240+
groupBy = append(groupBy, expr.Accept(v).(model.Expr))
241+
}
242+
for _, expr := range query.OrderBy {
243+
orderBy = append(orderBy, expr.Accept(v).(model.OrderByExpr))
244+
}
245+
if query.FromClause != nil {
246+
from = query.FromClause.Accept(v).(model.Expr)
247+
}
248+
if query.WhereClause != nil {
249+
where = query.WhereClause.Accept(v).(model.Expr)
203250
}
204251

205-
var fromClause model.Expr
206-
if e.FromClause != nil {
207-
fromClause = e.FromClause.Accept(b).(model.Expr)
252+
var namedCTEs []*model.CTE
253+
if query.NamedCTEs != nil {
254+
for _, cte := range query.NamedCTEs {
255+
namedCTEs = append(namedCTEs, cte.Accept(v).(*model.CTE))
256+
}
208257
}
209258

210-
return model.NewSelectCommand(columns, groupBy, e.OrderBy,
211-
fromClause, e.WhereClause, e.LimitBy, e.Limit, e.SampleLimit, e.IsDistinct, e.NamedCTEs)
259+
var limitBy []model.Expr
260+
if query.LimitBy != nil {
261+
for _, expr := range query.LimitBy {
262+
limitBy = append(limitBy, expr.Accept(v).(model.Expr))
263+
}
264+
}
265+
return model.NewSelectCommand(columns, groupBy, orderBy, from, where, limitBy, query.Limit, query.SampleLimit, query.IsDistinct, namedCTEs)
212266
}
213267

214268
expr := query.SelectCommand.Accept(visitor)
@@ -329,9 +383,6 @@ func (s *SchemaCheckPass) applyWildcardExpansion(indexSchema schema.Schema, quer
329383

330384
cols := make([]string, 0, len(indexSchema.Fields))
331385
for _, col := range indexSchema.Fields {
332-
if col.Type.Name == schema.QuesmaTypePoint.Name { // Temporary workaround for kibana_flights
333-
continue
334-
}
335386
cols = append(cols, col.InternalPropertyName.AsString())
336387
}
337388
sort.Strings(cols)
@@ -520,38 +571,6 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err
520571
return queries, nil
521572
}
522573

523-
type GeoIpResultTransformer struct {
524-
schemaRegistry schema.Registry
525-
fromTable string
526-
}
527-
528-
func (g *GeoIpResultTransformer) Transform(result [][]model.QueryResultRow) ([][]model.QueryResultRow, error) {
529-
if g.schemaRegistry == nil {
530-
logger.Error().Msg("Schema registry is not set")
531-
return result, nil
532-
}
533-
schemaInstance, exists := g.schemaRegistry.FindSchema(schema.TableName(g.fromTable))
534-
if !exists {
535-
logger.Error().Msgf("Schema fot table %s not found", g.fromTable)
536-
return result, nil
537-
}
538-
for i, rows := range result {
539-
for j, row := range rows {
540-
for k, col := range row.Cols {
541-
if strings.Contains(col.ColName, "::lat") {
542-
colType := schemaInstance.Fields[schema.FieldName(strings.TrimSuffix(col.ColName, "::lat"))].Type
543-
result[i][j].Cols[k].ColType = colType
544-
}
545-
if strings.Contains(col.ColName, "::lon") {
546-
colType := schemaInstance.Fields[schema.FieldName(strings.TrimSuffix(col.ColName, "::lon"))].Type
547-
result[i][j].Cols[k].ColType = colType
548-
}
549-
}
550-
}
551-
}
552-
return result, nil
553-
}
554-
555574
// ArrayResultTransformer is a transformer that transforms array columns into string representation
556575
type ArrayResultTransformer struct {
557576
}

quesma/quesma/schema_transformer_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -258,8 +258,11 @@ func Test_ipRangeTransform(t *testing.T) {
258258
TableName: "kibana_sample_data_flights",
259259
SelectCommand: model.SelectCommand{
260260
FromClause: model.NewTableRef("kibana_sample_data_flights"),
261-
Columns: []model.Expr{model.NewColumnRef("DestLocation::lat"),
262-
model.NewColumnRef("DestLocation::lon")},
261+
Columns: []model.Expr{model.NewAliasedExpr(model.NewFunction("map",
262+
model.NewLiteral("'lat'"),
263+
model.NewColumnRef("DestLocation::lat"),
264+
model.NewLiteral("'lon'"),
265+
model.NewColumnRef("DestLocation::lon")), "DestLocation")},
263266
}},
264267
}
265268
queries := [][]*model.Query{

quesma/quesma/search.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -764,7 +764,6 @@ func (q *QueryRunner) postProcessResults(table *clickhouse.Table, results [][]mo
764764
transformer model.ResultTransformer
765765
}{
766766
{"replaceColumNamesWithFieldNames", &replaceColumNamesWithFieldNames{}},
767-
{"geoIpResultTransformer", &GeoIpResultTransformer{schemaRegistry: q.schemaRegistry, fromTable: table.Name}},
768767
{"arrayResultTransformer", &ArrayResultTransformer{}},
769768
}
770769

0 commit comments

Comments
 (0)