Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit b97b980

Browse files
authored
Support SELECT ... FROM cluster("my_cluster", "test_db", "test_table") syntax (#1492)
<!-- A note on testing your PR --> <!-- Basic unit test run is executed against each commit in the PR. If you want to run a full integration test suite, you can trigger it by commenting with '/run-integration-tests' or '/run-it' -->
1 parent 0cca9aa commit b97b980

File tree

2 files changed

+143
-26
lines changed

2 files changed

+143
-26
lines changed

platform/frontend_connectors/schema_transformer.go

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/QuesmaOrg/quesma/platform/schema"
1616
"github.com/QuesmaOrg/quesma/platform/transformations"
1717
"sort"
18+
"strconv"
1819
"strings"
1920
)
2021

@@ -45,6 +46,48 @@ func (s *SchemaCheckPass) isFieldMapSyntaxEnabled(query *model.Query) bool {
4546
return enabled
4647
}
4748

49+
func (s *SchemaCheckPass) ApplySelectFromCluster(_ schema.Schema, query *model.Query) (*model.Query, error) {
50+
clusterName := s.cfg.ClusterName
51+
if clusterName == "" {
52+
return query, nil
53+
}
54+
55+
clusterLiteral := func(dbName, tableName string) model.LiteralExpr {
56+
return model.NewLiteral(fmt.Sprintf("cluster(%s, %s, %s)", strconv.Quote(clusterName), strconv.Quote(dbName), strconv.Quote(tableName)))
57+
}
58+
59+
var visitExpr func(expr model.Expr) model.Expr
60+
visitExpr = func(expr model.Expr) model.Expr {
61+
switch e := expr.(type) {
62+
case model.TableRef:
63+
return clusterLiteral(e.DatabaseName, e.Name)
64+
case *model.TableRef:
65+
newVal := clusterLiteral(e.DatabaseName, e.Name)
66+
return &newVal
67+
case *model.SelectCommand:
68+
newSelect := *e
69+
if e.FromClause != nil {
70+
newSelect.FromClause = visitExpr(e.FromClause)
71+
}
72+
return &newSelect
73+
case model.SelectCommand:
74+
newSelect := e
75+
if e.FromClause != nil {
76+
newSelect.FromClause = visitExpr(e.FromClause)
77+
}
78+
return newSelect
79+
default:
80+
return expr
81+
}
82+
}
83+
84+
if query.SelectCommand.FromClause != nil {
85+
query.SelectCommand.FromClause = visitExpr(query.SelectCommand.FromClause)
86+
}
87+
88+
return query, nil
89+
}
90+
4891
func (s *SchemaCheckPass) applyBooleanLiteralLowering(index schema.Schema, query *model.Query) (*model.Query, error) {
4992

5093
visitor := model.NewBaseVisitor()
@@ -1082,7 +1125,7 @@ func (s *SchemaCheckPass) Transform(plan *model.ExecutionPlan) (*model.Execution
10821125
{TransformationName: "MapTransformation", Transformation: s.applyMapTransformations},
10831126
{TransformationName: "MatchOperatorTransformation", Transformation: s.applyMatchOperator},
10841127
{TransformationName: "AggOverUnsupportedType", Transformation: s.checkAggOverUnsupportedType},
1085-
{TransformationName: "ClusterFunction", Transformation: s.applyFromClusterExpression},
1128+
{TransformationName: "ApplySelectFromCluster", Transformation: s.ApplySelectFromCluster},
10861129

10871130
// Section 4: compensations and checks
10881131
{TransformationName: "BooleanLiteralTransformation", Transformation: s.applyBooleanLiteralLowering},
@@ -1280,27 +1323,3 @@ func (s *SchemaCheckPass) applyMatchOperator(indexSchema schema.Schema, query *m
12801323
return query, nil
12811324

12821325
}
1283-
1284-
// applyFromClusterExpression transforms query so that `FROM table` becomes `FROM cluster(clusterName,table)` if applicable
1285-
func (s *SchemaCheckPass) applyFromClusterExpression(currentSchema schema.Schema, query *model.Query) (*model.Query, error) {
1286-
if s.cfg.ClusterName == "" {
1287-
return query, nil
1288-
}
1289-
visitor := model.NewBaseVisitor()
1290-
table, ok := s.tableDiscovery.TableDefinitions().Load(query.TableName)
1291-
if !ok {
1292-
return nil, fmt.Errorf("table %s not found", query.TableName)
1293-
}
1294-
if !table.ExistsOnAllNodes {
1295-
return query, nil
1296-
}
1297-
visitor.OverrideVisitTableRef = func(b *model.BaseExprVisitor, e model.TableRef) interface{} {
1298-
return model.NewFunction("cluster", model.NewLiteral(s.cfg.ClusterName), e)
1299-
}
1300-
logger.Debug().Msgf("applyClusterFunction: %s", s.cfg.ClusterName)
1301-
expr := query.SelectCommand.Accept(visitor)
1302-
if _, ok := expr.(*model.SelectCommand); ok {
1303-
query.SelectCommand = *expr.(*model.SelectCommand)
1304-
}
1305-
return query, nil
1306-
}

platform/frontend_connectors/schema_transformer_test.go

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package frontend_connectors
44

55
import (
6+
"fmt"
67
"github.com/QuesmaOrg/quesma/platform/clickhouse"
78
"github.com/QuesmaOrg/quesma/platform/common_table"
89
"github.com/QuesmaOrg/quesma/platform/config"
@@ -1868,7 +1869,7 @@ func Test_cluster(t *testing.T) {
18681869
expected: &model.Query{
18691870
TableName: "kibana_sample_data_ecommerce",
18701871
SelectCommand: model.SelectCommand{
1871-
FromClause: model.NewFunction("cluster", model.NewLiteral(clusterName), model.NewLiteral("kibana_sample_data_ecommerce")),
1872+
FromClause: model.NewLiteral(fmt.Sprintf("cluster(%s, %s, %s)", strconv.Quote(clusterName), strconv.Quote(""), strconv.Quote("kibana_sample_data_ecommerce"))),
18721873
Columns: []model.Expr{model.NewColumnRef("@timestamp"), model.NewColumnRef("order_date"), model.NewColumnRef("taxful_total_price")},
18731874
},
18741875
},
@@ -2152,3 +2153,100 @@ func Test_acceptIntsAsTimestamps(t *testing.T) {
21522153
})
21532154
}
21542155
}
2156+
2157+
func TestApplySelectFromCluster(t *testing.T) {
2158+
originalTableReference := model.NewTableRefWithDatabaseName("test_table", "test_db")
2159+
expectedTableReference := model.NewLiteral(`cluster("my_cluster", "test_db", "test_table")`)
2160+
cfg := &config.QuesmaConfiguration{
2161+
ClusterName: "my_cluster",
2162+
}
2163+
2164+
tests := []struct {
2165+
name string
2166+
query *model.Query
2167+
expected *model.Query
2168+
}{
2169+
{
2170+
name: "simple FROM <tableName> substitution",
2171+
query: &model.Query{
2172+
TableName: "test_table",
2173+
SelectCommand: model.SelectCommand{
2174+
FromClause: originalTableReference,
2175+
},
2176+
},
2177+
expected: &model.Query{
2178+
TableName: "test_table",
2179+
SelectCommand: model.SelectCommand{
2180+
FromClause: expectedTableReference,
2181+
},
2182+
},
2183+
},
2184+
{
2185+
name: "nested FROM (SELECT ...) substitution",
2186+
query: &model.Query{
2187+
SelectCommand: model.SelectCommand{
2188+
Columns: []model.Expr{model.NewFunction("sum", model.NewColumnRef("FirstColumn"))},
2189+
FromClause: model.SelectCommand{
2190+
Columns: []model.Expr{model.NewFunction("sum", model.NewColumnRef("SecondColumn"))},
2191+
FromClause: model.SelectCommand{
2192+
Columns: []model.Expr{model.NewFunction("sum", model.NewColumnRef("ThirdColumn"))},
2193+
FromClause: originalTableReference,
2194+
WhereClause: model.NewInfixExpr(
2195+
model.NewColumnRef("ThirdColumn"),
2196+
">=",
2197+
model.NewLiteral(50),
2198+
),
2199+
},
2200+
WhereClause: model.NewInfixExpr(
2201+
model.NewColumnRef("SecondColumn"),
2202+
">=",
2203+
model.NewLiteral(50),
2204+
),
2205+
},
2206+
WhereClause: model.NewInfixExpr(
2207+
model.NewColumnRef("FirstColumn"),
2208+
">=",
2209+
model.NewLiteral(50),
2210+
),
2211+
},
2212+
},
2213+
expected: &model.Query{
2214+
SelectCommand: model.SelectCommand{
2215+
Columns: []model.Expr{model.NewFunction("sum", model.NewColumnRef("FirstColumn"))},
2216+
FromClause: model.SelectCommand{
2217+
Columns: []model.Expr{model.NewFunction("sum", model.NewColumnRef("SecondColumn"))},
2218+
FromClause: model.SelectCommand{
2219+
Columns: []model.Expr{model.NewFunction("sum", model.NewColumnRef("ThirdColumn"))},
2220+
FromClause: expectedTableReference,
2221+
WhereClause: model.NewInfixExpr(
2222+
model.NewColumnRef("ThirdColumn"),
2223+
">=",
2224+
model.NewLiteral(50),
2225+
),
2226+
},
2227+
WhereClause: model.NewInfixExpr(
2228+
model.NewColumnRef("SecondColumn"),
2229+
">=",
2230+
model.NewLiteral(50),
2231+
),
2232+
},
2233+
WhereClause: model.NewInfixExpr(
2234+
model.NewColumnRef("FirstColumn"),
2235+
">=",
2236+
model.NewLiteral(50),
2237+
),
2238+
},
2239+
},
2240+
},
2241+
}
2242+
2243+
transform := NewSchemaCheckPass(cfg, nil, defaultSearchAfterStrategy)
2244+
2245+
for _, tt := range tests {
2246+
t.Run(tt.name, func(t *testing.T) {
2247+
result, err := transform.ApplySelectFromCluster(schema.Schema{}, tt.query)
2248+
assert.NoError(t, err)
2249+
assert.Equal(t, tt.expected, result)
2250+
})
2251+
}
2252+
}

0 commit comments

Comments
 (0)