Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 6d7bbbe

Browse files
pdelewski and mieciu authored
Support for cluster function as table expression (#1408)
This PR is about supporting `FROM cluster(<name>, <table>)` --------- Signed-off-by: Przemyslaw Delewski <[email protected]> Co-authored-by: Przemysław Hejman <[email protected]>
1 parent 2deec42 commit 6d7bbbe

File tree

5 files changed

+212
-3
lines changed

5 files changed

+212
-3
lines changed

platform/clickhouse/clickhouse.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ type discoveredTable struct {
137137
createTableQuery string
138138
timestampFieldName string
139139
virtualTable bool
140+
existsOnAllNodes bool // => table can be queried using `FROM cluster(<clusterName>,<tableName>)` syntax
140141
}
141142

142143
func (lm *LogManager) ReloadTables() {

platform/clickhouse/table.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ type Table struct {
2828

2929
DiscoveredTimestampFieldName *string
3030

31-
VirtualTable bool
31+
VirtualTable bool
32+
ExistsOnAllNodes bool
3233
}
3334

3435
func (t *Table) createTableOurFieldsString() []string {

platform/clickhouse/table_discovery.go

Lines changed: 98 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,17 @@ func (td *tableDiscovery) ReloadTableDefinitions() {
217217
configuredTables = td.configureTables(tables, databaseName)
218218
}
219219
}
220+
var tablePresence map[string][]TablePresence
221+
if td.cfg.ClusterName != "" {
222+
var err error
223+
tablePresence, err = td.getTablePresenceAcrossClusters(databaseName)
224+
if err != nil {
225+
logger.Warn().Msgf("could not get table presence across clusters: %v", err)
226+
}
227+
logger.Info().Msgf("Table presence across clusters: %v", tablePresence)
228+
configuredTables = td.populateClusterNodes(configuredTables, databaseName, tablePresence)
229+
}
230+
220231
configuredTables = td.readVirtualTables(configuredTables)
221232

222233
td.ReloadTablesError = nil
@@ -328,7 +339,7 @@ func (td *tableDiscovery) configureTables(tables map[string]map[string]columnMet
328339
comment := td.tableComment(databaseName, table)
329340
createTableQuery := td.createTableQuery(databaseName, table)
330341
// we assume here that @timestamp field is always present in the table, or it's explicitly configured
331-
configuredTables[table] = discoveredTable{table, databaseName, columns, indexConfig, comment, createTableQuery, "", false}
342+
configuredTables[table] = discoveredTable{table, databaseName, columns, indexConfig, comment, createTableQuery, "", false, false}
332343
}
333344
} else {
334345
notConfiguredTables = append(notConfiguredTables, table)
@@ -358,7 +369,7 @@ func (td *tableDiscovery) autoConfigureTables(tables map[string]map[string]colum
358369
maybeTimestampField = td.tableTimestampField(databaseName, table, ClickHouse)
359370
}
360371
const isVirtualTable = false
361-
configuredTables[table] = discoveredTable{table, databaseName, columns, config.IndexConfiguration{}, comment, createTableQuery, maybeTimestampField, isVirtualTable}
372+
configuredTables[table] = discoveredTable{table, databaseName, columns, config.IndexConfiguration{}, comment, createTableQuery, maybeTimestampField, isVirtualTable, false}
362373

363374
}
364375
for tableName, table := range configuredTables {
@@ -415,6 +426,7 @@ func (td *tableDiscovery) populateTableDefinitions(configuredTables map[string]d
415426
CreateTableQuery: resTable.createTableQuery,
416427
DiscoveredTimestampFieldName: timestampFieldName,
417428
VirtualTable: resTable.virtualTable,
429+
ExistsOnAllNodes: resTable.existsOnAllNodes,
418430
}
419431
if containsAttributes(resTable.columnTypes) {
420432
table.Config.Attributes = []Attribute{NewDefaultStringAttribute()}
@@ -699,6 +711,90 @@ func (td *tableDiscovery) enrichTableWithMapFields(inputTable map[string]map[str
699711
return outputTable
700712
}
701713

714+
// TablePresence records how widely a single table is deployed across the
// hosts of one ClickHouse cluster.
type TablePresence struct {
	// Database and Table identify the table this record refers to.
	Database, Table string
	// FoundNodes is the number of distinct hosts on which the table was found;
	// TotalNodes is the number of distinct hosts belonging to the cluster.
	FoundNodes, TotalNodes int
	// ExistsOnAllNodes reports whether the table was found on every host of
	// the cluster.
	ExistsOnAllNodes bool
}
721+
722+
func (td *tableDiscovery) getTablePresenceAcrossClusters(database string) (map[string][]TablePresence, error) {
723+
// Step 1: Get all cluster names
724+
clusterQuery := `SELECT DISTINCT cluster FROM system.clusters ORDER BY cluster`
725+
rows, err := td.dbConnPool.Query(context.Background(), clusterQuery)
726+
if err != nil {
727+
return nil, fmt.Errorf("failed to query cluster list: %w", err)
728+
}
729+
defer rows.Close()
730+
731+
var clusters []string
732+
for rows.Next() {
733+
var cluster string
734+
if err := rows.Scan(&cluster); err != nil {
735+
return nil, fmt.Errorf("failed to scan cluster name: %w", err)
736+
}
737+
clusters = append(clusters, cluster)
738+
}
739+
740+
// Step 2: For each cluster, safely query for the given database
741+
presenceData := make(map[string][]TablePresence)
742+
743+
for _, cluster := range clusters {
744+
query := `
745+
WITH (
746+
SELECT count(DISTINCT host_name)
747+
FROM system.clusters
748+
WHERE cluster = ?
749+
) AS total_nodes
750+
751+
SELECT
752+
database,
753+
name AS table_name,
754+
count(DISTINCT hostName()) AS found_nodes,
755+
total_nodes,
756+
count(DISTINCT hostName()) = total_nodes AS exists_on_all_nodes
757+
FROM cluster(?, system.tables)
758+
WHERE database = ?
759+
GROUP BY database, name, total_nodes
760+
`
761+
762+
rows, err := td.dbConnPool.Query(context.Background(), query, cluster, cluster, database)
763+
if err != nil {
764+
return nil, fmt.Errorf("failed to query tables for cluster %s: %w", cluster, err)
765+
}
766+
defer rows.Close()
767+
768+
var tables []TablePresence
769+
for rows.Next() {
770+
var tp TablePresence
771+
if err := rows.Scan(&tp.Database, &tp.Table, &tp.FoundNodes, &tp.TotalNodes, &tp.ExistsOnAllNodes); err != nil {
772+
return nil, fmt.Errorf("failed to scan table row: %w", err)
773+
}
774+
tables = append(tables, tp)
775+
}
776+
777+
if len(tables) > 0 {
778+
presenceData[cluster] = tables
779+
}
780+
}
781+
782+
return presenceData, nil
783+
}
784+
785+
func (td *tableDiscovery) populateClusterNodes(configuredTables map[string]discoveredTable, databaseName string, tablePresence map[string][]TablePresence) map[string]discoveredTable {
786+
for _, tables := range tablePresence {
787+
for _, table := range tables {
788+
if table.Database == databaseName {
789+
if discoTable, ok := configuredTables[table.Table]; ok {
790+
discoTable.existsOnAllNodes = table.ExistsOnAllNodes
791+
}
792+
}
793+
}
794+
}
795+
return configuredTables
796+
}
797+
702798
func (td *tableDiscovery) readTables(database string) (map[string]map[string]columnMetadata, error) {
703799
logger.Debug().Msgf("describing tables: %s", database)
704800

platform/frontend_connectors/schema_transformer.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,6 +1008,7 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err
10081008
{TransformationName: "MapTransformation", Transformation: s.applyMapTransformations},
10091009
{TransformationName: "MatchOperatorTransformation", Transformation: s.applyMatchOperator},
10101010
{TransformationName: "AggOverUnsupportedType", Transformation: s.checkAggOverUnsupportedType},
1011+
{TransformationName: "ClusterFunction", Transformation: s.applyFromClusterExpression},
10111012

10121013
// Section 4: compensations and checks
10131014
{TransformationName: "BooleanLiteralTransformation", Transformation: s.applyBooleanLiteralLowering},
@@ -1192,3 +1193,27 @@ func (s *SchemaCheckPass) applyMatchOperator(indexSchema schema.Schema, query *m
11921193
return query, nil
11931194

11941195
}
1196+
1197+
// applyFromClusterExpression transforms query so that `FROM table` becomes `FROM cluster(clusterName,table)` if applicable
1198+
func (s *SchemaCheckPass) applyFromClusterExpression(currentSchema schema.Schema, query *model.Query) (*model.Query, error) {
1199+
if s.cfg.ClusterName == "" {
1200+
return query, nil
1201+
}
1202+
visitor := model.NewBaseVisitor()
1203+
table, ok := s.tableDiscovery.TableDefinitions().Load(query.TableName)
1204+
if !ok {
1205+
return nil, fmt.Errorf("table %s not found", query.TableName)
1206+
}
1207+
if !table.ExistsOnAllNodes {
1208+
return query, nil
1209+
}
1210+
visitor.OverrideVisitTableRef = func(b *model.BaseExprVisitor, e model.TableRef) interface{} {
1211+
return model.NewFunction("cluster", model.NewLiteral(s.cfg.ClusterName), e)
1212+
}
1213+
logger.Debug().Msgf("applyClusterFunction: %s", s.cfg.ClusterName)
1214+
expr := query.SelectCommand.Accept(visitor)
1215+
if _, ok := expr.(*model.SelectCommand); ok {
1216+
query.SelectCommand = *expr.(*model.SelectCommand)
1217+
}
1218+
return query, nil
1219+
}

platform/frontend_connectors/schema_transformer_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1764,3 +1764,89 @@ func Test_mapKeys(t *testing.T) {
17641764
}
17651765

17661766
}
1767+
1768+
func Test_cluster(t *testing.T) {
1769+
indexConfig := map[string]config.IndexConfiguration{
1770+
"kibana_sample_data_ecommerce": {},
1771+
}
1772+
fields := map[schema.FieldName]schema.Field{
1773+
"@timestamp": {PropertyName: "@timestamp", InternalPropertyName: "@timestamp", InternalPropertyType: "DateTime64", Type: schema.QuesmaTypeDate},
1774+
"order_date": {PropertyName: "order_date", InternalPropertyName: "order_date", InternalPropertyType: "DateTime64", Type: schema.QuesmaTypeDate},
1775+
"taxful_total_price": {PropertyName: "taxful_total_price", InternalPropertyName: "taxful_total_price", InternalPropertyType: "Float64", Type: schema.QuesmaTypeFloat},
1776+
}
1777+
1778+
indexSchema := schema.Schema{
1779+
Fields: fields,
1780+
}
1781+
1782+
tableMap := clickhouse.NewTableMap()
1783+
1784+
tableDiscovery := clickhouse.NewEmptyTableDiscovery()
1785+
tableDiscovery.TableMap = tableMap
1786+
for indexName := range indexConfig {
1787+
table := clickhouse.NewEmptyTable(indexName)
1788+
table.ExistsOnAllNodes = true
1789+
tableMap.Store(indexName, table)
1790+
}
1791+
1792+
clickhouseUrl := &config.Url{
1793+
Scheme: "clickhouse",
1794+
Host: "localhost:9000",
1795+
}
1796+
1797+
clusterName := "my_cluster"
1798+
1799+
clickhouseConnector := config.RelationalDbConfiguration{
1800+
ConnectorType: "clickhouse-os",
1801+
Url: clickhouseUrl,
1802+
ClusterName: clusterName,
1803+
}
1804+
transform := NewSchemaCheckPass(&config.QuesmaConfiguration{IndexConfig: indexConfig, ClickHouse: clickhouseConnector, ClusterName: clusterName}, tableDiscovery, defaultSearchAfterStrategy)
1805+
1806+
tests := []struct {
1807+
name string
1808+
query *model.Query
1809+
expected *model.Query
1810+
}{
1811+
{
1812+
name: "simple array",
1813+
query: &model.Query{
1814+
TableName: "kibana_sample_data_ecommerce",
1815+
SelectCommand: model.SelectCommand{
1816+
FromClause: model.NewTableRef("kibana_sample_data_ecommerce"),
1817+
Columns: []model.Expr{model.NewWildcardExpr},
1818+
},
1819+
},
1820+
expected: &model.Query{
1821+
TableName: "kibana_sample_data_ecommerce",
1822+
SelectCommand: model.SelectCommand{
1823+
FromClause: model.NewFunction("cluster", model.NewLiteral(clusterName), model.NewLiteral("kibana_sample_data_ecommerce")),
1824+
Columns: []model.Expr{model.NewColumnRef("@timestamp"), model.NewColumnRef("order_date"), model.NewColumnRef("taxful_total_price")},
1825+
},
1826+
},
1827+
},
1828+
}
1829+
asString := func(query *model.Query) string {
1830+
return query.SelectCommand.String()
1831+
}
1832+
1833+
for i, tt := range tests {
1834+
t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) {
1835+
tt.query.Schema = indexSchema
1836+
tt.query.Indexes = []string{tt.query.TableName}
1837+
actual, err := transform.Transform([]*model.Query{tt.query})
1838+
assert.NoError(t, err)
1839+
1840+
if err != nil {
1841+
t.Fatal(err)
1842+
}
1843+
1844+
assert.True(t, len(actual) == 1, "len queries == 1")
1845+
1846+
expectedJson := asString(tt.expected)
1847+
actualJson := asString(actual[0])
1848+
1849+
assert.Equal(t, expectedJson, actualJson)
1850+
})
1851+
}
1852+
}

0 commit comments

Comments
 (0)