Skip to content

Commit 4a2c9ce

Browse files
committed
Cluster mode for clickhouse connector
1 parent d64c1c8 commit 4a2c9ce

File tree

7 files changed

+112
-14
lines changed

7 files changed

+112
-14
lines changed

docs/src/main/sphinx/connector/clickhouse.md

+9
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,15 @@ driver documentation](https://clickhouse.com/docs/en/interfaces/jdbc/)
7070
```{include} jdbc-authentication.fragment
7171
```
7272

73+
### Cluster mode
74+
75+
If the clickhouse connected by `connection-url` belongs to the cluster mode, it is recommended to add this item, so that the ddl statement will be executed on all nodes of the cluster by default,
76+
which will be as convenient as the stand-alone clickhouse.
77+
78+
```properties
79+
cluster-name=examplecluster
80+
```
81+
7382
### Multiple ClickHouse servers
7483

7584
If you have multiple ClickHouse servers you need to configure one

plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java

+62-12
Original file line numberDiff line numberDiff line change
@@ -211,20 +211,23 @@ public class ClickHouseClient
211211

212212
private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;
213213
private final AggregateFunctionRewriter<JdbcExpression, ?> aggregateFunctionRewriter;
214+
private final Optional<String> clusterName;
214215
private final Type uuidType;
215216
private final Type ipAddressType;
216217
private final AtomicReference<ClickHouseVersion> clickHouseVersion = new AtomicReference<>();
217218

218219
@Inject
219220
public ClickHouseClient(
220-
BaseJdbcConfig config,
221+
ClickHouseConfig config,
222+
BaseJdbcConfig baseJdbcConfig,
221223
ConnectionFactory connectionFactory,
222224
QueryBuilder queryBuilder,
223225
TypeManager typeManager,
224226
IdentifierMapping identifierMapping,
225227
RemoteQueryModifier queryModifier)
226228
{
227-
super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
229+
super("\"", connectionFactory, queryBuilder, baseJdbcConfig.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
230+
this.clusterName = config.getClusterName();
228231
this.uuidType = typeManager.getType(new TypeSignature(StandardTypes.UUID));
229232
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
230233
JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
@@ -253,6 +256,11 @@ public ClickHouseClient(
253256
.build());
254257
}
255258

259+
private String getClusterInfo()
260+
{
261+
return clusterName.map(" ON CLUSTER %s "::formatted).orElse("");
262+
}
263+
256264
@Override
257265
public Optional<JdbcExpression> implementAggregation(ConnectorSession session, AggregateFunction aggregate, Map<String, ColumnHandle> assignments)
258266
{
@@ -340,8 +348,9 @@ protected void copyTableSchema(ConnectorSession session, Connection connection,
340348
// 1. create table tbl as tbl2
341349
// 2. create table tbl1 ENGINE=<engine> as select * from tbl2
342350
String sql = format(
343-
"CREATE TABLE %s AS %s ",
351+
"CREATE TABLE %s %s AS %s ",
344352
quoted(null, schemaName, newTableName),
353+
getClusterInfo(),
345354
quoted(null, schemaName, tableName));
346355
try {
347356
execute(session, connection, sql);
@@ -396,7 +405,7 @@ protected List<String> createTableSqls(RemoteTableName remoteTableName, List<Str
396405
ClickHouseTableProperties.getSampleBy(tableProperties).ifPresent(value -> tableOptions.add("SAMPLE BY " + quoted(value)));
397406
tableMetadata.getComment().ifPresent(comment -> tableOptions.add(format("COMMENT %s", clickhouseVarcharLiteral(comment))));
398407

399-
return ImmutableList.of(format("CREATE TABLE %s (%s) %s", quoted(remoteTableName), join(", ", columns), join(" ", tableOptions.build())));
408+
return ImmutableList.of(format("CREATE TABLE %s %s (%s) %s", quoted(remoteTableName), getClusterInfo(), join(", ", columns), join(" ", tableOptions.build())));
400409
}
401410

402411
@Override
@@ -461,8 +470,9 @@ public void setTableProperties(ConnectorSession session, JdbcTableHandle handle,
461470

462471
try (Connection connection = connectionFactory.openConnection(session)) {
463472
String sql = format(
464-
"ALTER TABLE %s MODIFY %s",
473+
"ALTER TABLE %s %s MODIFY %s",
465474
quoted(handle.asPlainTable().getRemoteTableName()),
475+
getClusterInfo(),
466476
join(" ", tableOptions.build()));
467477
execute(session, connection, sql);
468478
}
@@ -495,7 +505,7 @@ protected String getColumnDefinitionSql(ConnectorSession session, ColumnMetadata
495505
protected void createSchema(ConnectorSession session, Connection connection, String remoteSchemaName)
496506
throws SQLException
497507
{
498-
execute(session, connection, "CREATE DATABASE " + quoted(remoteSchemaName));
508+
execute(session, connection, "CREATE DATABASE " + quoted(remoteSchemaName) + getClusterInfo());
499509
}
500510

501511
@Override
@@ -510,14 +520,14 @@ protected void dropSchema(ConnectorSession session, Connection connection, Strin
510520
}
511521
}
512522
}
513-
execute(session, connection, "DROP DATABASE " + quoted(remoteSchemaName));
523+
execute(session, connection, "DROP DATABASE " + quoted(remoteSchemaName) + getClusterInfo());
514524
}
515525

516526
@Override
517527
protected void renameSchema(ConnectorSession session, Connection connection, String remoteSchemaName, String newRemoteSchemaName)
518528
throws SQLException
519529
{
520-
execute(session, connection, "RENAME DATABASE " + quoted(remoteSchemaName) + " TO " + quoted(newRemoteSchemaName));
530+
execute(session, connection, "RENAME DATABASE " + quoted(remoteSchemaName) + " TO " + quoted(newRemoteSchemaName) + getClusterInfo());
521531
}
522532

523533
@Override
@@ -535,8 +545,9 @@ private void addColumn(ConnectorSession session, RemoteTableName table, ColumnMe
535545
try (Connection connection = connectionFactory.openConnection(session)) {
536546
String remoteColumnName = getIdentifierMapping().toRemoteColumnName(getRemoteIdentifiers(connection), column.getName());
537547
String sql = format(
538-
"ALTER TABLE %s ADD COLUMN %s",
548+
"ALTER TABLE %s %s ADD COLUMN %s",
539549
quoted(table),
550+
getClusterInfo(),
540551
getColumnDefinitionSql(session, column, remoteColumnName));
541552
execute(session, connection, sql);
542553
}
@@ -545,12 +556,42 @@ private void addColumn(ConnectorSession session, RemoteTableName table, ColumnMe
545556
}
546557
}
547558

559+
@Override
560+
protected void renameColumn(ConnectorSession session, Connection connection, RemoteTableName remoteTableName, String remoteColumnName, String newRemoteColumnName)
561+
throws SQLException
562+
{
563+
execute(session, connection, format(
564+
"ALTER TABLE %s RENAME COLUMN %s TO %s %s",
565+
quoted(remoteTableName),
566+
quoted(remoteColumnName),
567+
quoted(newRemoteColumnName),
568+
getClusterInfo()));
569+
}
570+
571+
@Override
572+
public void dropColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column)
573+
{
574+
try (Connection connection = connectionFactory.openConnection(session)) {
575+
String remoteColumnName = getIdentifierMapping().toRemoteColumnName(getRemoteIdentifiers(connection), column.getColumnName());
576+
String sql = format(
577+
"ALTER TABLE %s %s DROP COLUMN %s",
578+
quoted(handle.asPlainTable().getRemoteTableName()),
579+
getClusterInfo(),
580+
getColumnDefinitionSql(session, new ColumnMetadata(column.getColumnName(), column.getColumnType()), remoteColumnName));
581+
execute(session, connection, sql);
582+
}
583+
catch (SQLException e) {
584+
throw new TrinoException(JDBC_ERROR, e);
585+
}
586+
}
587+
548588
@Override
549589
public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Optional<String> comment)
550590
{
551591
String sql = format(
552-
"ALTER TABLE %s MODIFY COMMENT %s",
592+
"ALTER TABLE %s %s MODIFY COMMENT %s",
553593
quoted(handle.asPlainTable().getRemoteTableName()),
594+
getClusterInfo(),
554595
clickhouseVarcharLiteral(comment.orElse(NO_COMMENT)));
555596
execute(session, sql);
556597
}
@@ -559,8 +600,9 @@ public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Op
559600
public void setColumnComment(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Optional<String> comment)
560601
{
561602
String sql = format(
562-
"ALTER TABLE %s COMMENT COLUMN %s %s",
603+
"ALTER TABLE %s %s COMMENT COLUMN %s %s",
563604
quoted(handle.asPlainTable().getRemoteTableName()),
605+
getClusterInfo(),
564606
quoted(column.getColumnName()),
565607
clickhouseVarcharLiteral(comment.orElse("")));
566608
execute(session, sql);
@@ -594,11 +636,19 @@ protected Optional<List<String>> getTableTypes()
594636
protected void renameTable(ConnectorSession session, Connection connection, String catalogName, String remoteSchemaName, String remoteTableName, String newRemoteSchemaName, String newRemoteTableName)
595637
throws SQLException
596638
{
597-
execute(session, connection, format("RENAME TABLE %s TO %s",
639+
execute(session, connection, format("RENAME TABLE %s %s TO %s",
598640
quoted(catalogName, remoteSchemaName, remoteTableName),
641+
getClusterInfo(),
599642
quoted(catalogName, newRemoteSchemaName, newRemoteTableName)));
600643
}
601644

645+
@Override
646+
protected void dropTable(ConnectorSession session, RemoteTableName remoteTableName, boolean temporaryTable)
647+
{
648+
String sql = "DROP TABLE " + quoted(remoteTableName) + getClusterInfo();
649+
execute(session, sql);
650+
}
651+
602652
@Override
603653
protected Optional<BiFunction<String, Long, String>> limitFunction()
604654
{

plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseConfig.java

+20
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
*/
1414
package io.trino.plugin.clickhouse;
1515

16+
import java.util.Optional;
17+
import jakarta.validation.constraints.NotNull;
18+
1619
import io.airlift.configuration.Config;
1720
import io.airlift.configuration.ConfigDescription;
1821
import io.airlift.configuration.DefunctConfig;
@@ -35,4 +38,21 @@ public ClickHouseConfig setMapStringAsVarchar(boolean mapStringAsVarchar)
3538
this.mapStringAsVarchar = mapStringAsVarchar;
3639
return this;
3740
}
41+
42+
private Optional<String> clusterName = Optional.empty();
43+
44+
@NotNull
45+
public Optional<String> getClusterName()
46+
{
47+
return clusterName;
48+
}
49+
50+
@Config("clickhouse.cluster-name")
51+
@ConfigDescription("ClickHouse cluster name")
52+
public ClickHouseConfig setClusterName(String clusterName)
53+
{
54+
this.clusterName = Optional.ofNullable(clusterName);
55+
return this;
56+
}
57+
3858
}

plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConfig.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ public class TestClickHouseConfig
4242
public void testDefaults()
4343
{
4444
assertRecordedDefaults(recordDefaults(ClickHouseConfig.class)
45-
.setMapStringAsVarchar(false));
45+
.setMapStringAsVarchar(false)
46+
.setClusterName(null));
4647
}
4748

4849
@Test
@@ -53,7 +54,8 @@ public void testExplicitPropertyMappings()
5354
.buildOrThrow();
5455

5556
ClickHouseConfig expected = new ClickHouseConfig()
56-
.setMapStringAsVarchar(true);
57+
.setMapStringAsVarchar(true)
58+
.setClusterName("example-cluster");
5759

5860
assertFullMapping(properties, expected);
5961
}

plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHousePlugin.java

+15
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,19 @@ public void testCreateConnector()
3636
new TestingConnectorContext())
3737
.shutdown();
3838
}
39+
40+
@Test
41+
public void testCreateClusterConnector()
42+
{
43+
Plugin plugin = new ClickHousePlugin();
44+
ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories());
45+
factory.create(
46+
"test",
47+
ImmutableMap.of(
48+
"connection-url", "jdbc:clickhouse://test",
49+
"cluster-name", "test_cluster",
50+
"bootstrap.quiet", "true"),
51+
new TestingConnectorContext())
52+
.shutdown();
53+
}
3954
}

testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-all/clickhouse.properties

+1
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ connector.name=clickhouse
22
connection-url=jdbc:clickhouse://host1.invalid:8123/
33
connection-user=exampleuser
44
connection-password=examplepassword
5+
cluster-name=test_cluster

testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-clickhouse/clickhouse.properties

+1
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,4 @@ connector.name=clickhouse
22
connection-url=jdbc:clickhouse://clickhouse:8124/
33
connection-user=test
44
connection-password=test
5+
cluster-name=test_cluster

0 commit comments

Comments
 (0)