Cluster mode for clickhouse connector #25447


Open
wants to merge 1 commit into master
9 changes: 9 additions & 0 deletions docs/src/main/sphinx/connector/clickhouse.md
@@ -70,6 +70,15 @@ driver documentation](https://clickhouse.com/docs/en/interfaces/jdbc/)
```{include} jdbc-authentication.fragment
```

### Cluster mode

If the ClickHouse server connected to via `connection-url` is part of a cluster, it is recommended to set this property. DDL statements are then executed on all nodes of the cluster by default, making a clustered ClickHouse as convenient to use as a stand-alone server.

```properties
clickhouse.cluster-name=examplecluster
```
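With the cluster name configured, DDL that the connector sends to ClickHouse gains an `ON CLUSTER` clause. A sketch of the effect, with illustrative names:

```sql
-- Without the property, a schema is created only on the connected node:
CREATE DATABASE web;

-- With the cluster name set to examplecluster, the connector issues:
CREATE DATABASE web ON CLUSTER examplecluster;
```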

### Multiple ClickHouse servers

If you have multiple ClickHouse servers you need to configure one
5 changes: 5 additions & 0 deletions plugin/trino-clickhouse/pom.xml
@@ -59,6 +59,11 @@
<artifactId>jakarta.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
@@ -211,20 +211,23 @@ public class ClickHouseClient

private final ConnectorExpressionRewriter<ParameterizedExpression> connectorExpressionRewriter;
private final AggregateFunctionRewriter<JdbcExpression, ?> aggregateFunctionRewriter;
private final Optional<String> clusterName;
private final Type uuidType;
private final Type ipAddressType;
private final AtomicReference<ClickHouseVersion> clickHouseVersion = new AtomicReference<>();

@Inject
public ClickHouseClient(
BaseJdbcConfig config,
ClickHouseConfig config,
BaseJdbcConfig baseJdbcConfig,
ConnectionFactory connectionFactory,
QueryBuilder queryBuilder,
TypeManager typeManager,
IdentifierMapping identifierMapping,
RemoteQueryModifier queryModifier)
{
super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
super("\"", connectionFactory, queryBuilder, baseJdbcConfig.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
this.clusterName = config.getClusterName();
this.uuidType = typeManager.getType(new TypeSignature(StandardTypes.UUID));
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
@@ -253,6 +256,11 @@ public ClickHouseClient(
.build());
}

private String getClusterInfo()
{
return clusterName.map(" ON CLUSTER %s "::formatted).orElse("");
}

Member: Adding ON CLUSTER to all queries is error-prone. Doesn't ClickHouse support configuring the cluster at the connection level, with a JDBC property or something similar?

Author: https://clickhouse.com/docs/sql-reference/distributed-ddl

But yes, it's not that simple, as you can read at https://clickhouse.com/docs/engines/table-engines/special/distributed

In short, you have to create/alter tables on each node using ON CLUSTER ..., and then create a "virtual" distributed table using the "real" tables as the data source.

It may be required to add this distributed table when creating the real ones; I'm not (yet) familiar with ClickHouse cluster mode.
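The distributed-table pattern mentioned in the review discussion above can be sketched as follows (hypothetical table and cluster names; this is ClickHouse-side setup, not part of this change):

```sql
-- Create the "real" table on every node of the cluster
CREATE TABLE example_db.events_local ON CLUSTER example_cluster
(
    id UInt64,
    ts DateTime
)
ENGINE = MergeTree
ORDER BY id;

-- Create the "virtual" table that routes reads and writes
-- to the per-node local tables
CREATE TABLE example_db.events ON CLUSTER example_cluster
AS example_db.events_local
ENGINE = Distributed(example_cluster, example_db, events_local, rand());
```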

@Override
public Optional<JdbcExpression> implementAggregation(ConnectorSession session, AggregateFunction aggregate, Map<String, ColumnHandle> assignments)
{
@@ -340,8 +348,9 @@ protected void copyTableSchema(ConnectorSession session, Connection connection,
// 1. create table tbl as tbl2
// 2. create table tbl1 ENGINE=<engine> as select * from tbl2
String sql = format(
"CREATE TABLE %s AS %s ",
"CREATE TABLE %s %s AS %s ",
quoted(null, schemaName, newTableName),
getClusterInfo(),
quoted(null, schemaName, tableName));
try {
execute(session, connection, sql);
@@ -396,7 +405,7 @@ protected List<String> createTableSqls(RemoteTableName remoteTableName, List<Str
ClickHouseTableProperties.getSampleBy(tableProperties).ifPresent(value -> tableOptions.add("SAMPLE BY " + quoted(value)));
tableMetadata.getComment().ifPresent(comment -> tableOptions.add(format("COMMENT %s", clickhouseVarcharLiteral(comment))));

return ImmutableList.of(format("CREATE TABLE %s (%s) %s", quoted(remoteTableName), join(", ", columns), join(" ", tableOptions.build())));
return ImmutableList.of(format("CREATE TABLE %s %s (%s) %s", quoted(remoteTableName), getClusterInfo(), join(", ", columns), join(" ", tableOptions.build())));
}

@Override
@@ -461,8 +470,9 @@ public void setTableProperties(ConnectorSession session, JdbcTableHandle handle,

try (Connection connection = connectionFactory.openConnection(session)) {
String sql = format(
"ALTER TABLE %s MODIFY %s",
"ALTER TABLE %s %s MODIFY %s",
quoted(handle.asPlainTable().getRemoteTableName()),
getClusterInfo(),
join(" ", tableOptions.build()));
execute(session, connection, sql);
}
@@ -495,7 +505,7 @@ protected String getColumnDefinitionSql(ConnectorSession session, ColumnMetadata
protected void createSchema(ConnectorSession session, Connection connection, String remoteSchemaName)
throws SQLException
{
execute(session, connection, "CREATE DATABASE " + quoted(remoteSchemaName));
execute(session, connection, "CREATE DATABASE " + quoted(remoteSchemaName) + getClusterInfo());
}

@Override
@@ -510,14 +520,14 @@ protected void dropSchema(ConnectorSession session, Connection connection, Strin
}
}
}
execute(session, connection, "DROP DATABASE " + quoted(remoteSchemaName));
execute(session, connection, "DROP DATABASE " + quoted(remoteSchemaName) + getClusterInfo());
}

@Override
protected void renameSchema(ConnectorSession session, Connection connection, String remoteSchemaName, String newRemoteSchemaName)
throws SQLException
{
execute(session, connection, "RENAME DATABASE " + quoted(remoteSchemaName) + " TO " + quoted(newRemoteSchemaName));
execute(session, connection, "RENAME DATABASE " + quoted(remoteSchemaName) + " TO " + quoted(newRemoteSchemaName) + getClusterInfo());
}

@Override
@@ -535,8 +545,9 @@ private void addColumn(ConnectorSession session, RemoteTableName table, ColumnMe
try (Connection connection = connectionFactory.openConnection(session)) {
String remoteColumnName = getIdentifierMapping().toRemoteColumnName(getRemoteIdentifiers(connection), column.getName());
String sql = format(
"ALTER TABLE %s ADD COLUMN %s",
"ALTER TABLE %s %s ADD COLUMN %s",
quoted(table),
getClusterInfo(),
getColumnDefinitionSql(session, column, remoteColumnName));
execute(session, connection, sql);
}
@@ -545,12 +556,42 @@ private void addColumn(ConnectorSession session, RemoteTableName table, ColumnMe
}
}

@Override
protected void renameColumn(ConnectorSession session, Connection connection, RemoteTableName remoteTableName, String remoteColumnName, String newRemoteColumnName)
throws SQLException
{
execute(session, connection, format(
"ALTER TABLE %s %s RENAME COLUMN %s TO %s",
quoted(remoteTableName),
getClusterInfo(),
quoted(remoteColumnName),
quoted(newRemoteColumnName)));
}

@Override
public void dropColumn(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column)
{
try (Connection connection = connectionFactory.openConnection(session)) {
String remoteColumnName = getIdentifierMapping().toRemoteColumnName(getRemoteIdentifiers(connection), column.getColumnName());
String sql = format(
"ALTER TABLE %s %s DROP COLUMN %s",
quoted(handle.asPlainTable().getRemoteTableName()),
getClusterInfo(),
quoted(remoteColumnName));
execute(session, connection, sql);
}
catch (SQLException e) {
throw new TrinoException(JDBC_ERROR, e);
}
}

@Override
public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Optional<String> comment)
{
String sql = format(
"ALTER TABLE %s MODIFY COMMENT %s",
"ALTER TABLE %s %s MODIFY COMMENT %s",
quoted(handle.asPlainTable().getRemoteTableName()),
getClusterInfo(),
clickhouseVarcharLiteral(comment.orElse(NO_COMMENT)));
execute(session, sql);
}
@@ -559,8 +600,9 @@ public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Op
public void setColumnComment(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Optional<String> comment)
{
String sql = format(
"ALTER TABLE %s COMMENT COLUMN %s %s",
"ALTER TABLE %s %s COMMENT COLUMN %s %s",
quoted(handle.asPlainTable().getRemoteTableName()),
getClusterInfo(),
quoted(column.getColumnName()),
clickhouseVarcharLiteral(comment.orElse("")));
execute(session, sql);
@@ -594,11 +636,19 @@ protected Optional<List<String>> getTableTypes()
protected void renameTable(ConnectorSession session, Connection connection, String catalogName, String remoteSchemaName, String remoteTableName, String newRemoteSchemaName, String newRemoteTableName)
throws SQLException
{
execute(session, connection, format("RENAME TABLE %s TO %s",
execute(session, connection, format("RENAME TABLE %s TO %s %s",
quoted(catalogName, remoteSchemaName, remoteTableName),
quoted(catalogName, newRemoteSchemaName, newRemoteTableName),
getClusterInfo()));
}

@Override
protected void dropTable(ConnectorSession session, RemoteTableName remoteTableName, boolean temporaryTable)
{
String sql = "DROP TABLE " + quoted(remoteTableName) + getClusterInfo();
execute(session, sql);
}

@Override
protected Optional<BiFunction<String, Long, String>> limitFunction()
{
@@ -16,6 +16,9 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import jakarta.validation.constraints.NotNull;

import java.util.Optional;

@DefunctConfig("clickhouse.legacy-driver")
public class ClickHouseConfig
@@ -35,4 +38,20 @@ public ClickHouseConfig setMapStringAsVarchar(boolean mapStringAsVarchar)
this.mapStringAsVarchar = mapStringAsVarchar;
return this;
}

private Optional<String> clusterName = Optional.empty();

Member: Move under mapStringAsVarchar.

@NotNull
public Optional<String> getClusterName()
{
return clusterName;
}

@Config("clickhouse.cluster-name")

Member: We should test all statements with this property. TestClickHouseConfig is insufficient.

@ConfigDescription("ClickHouse cluster name")
public ClickHouseConfig setClusterName(String clusterName)
{
this.clusterName = Optional.ofNullable(clusterName);
return this;
}
}
@@ -42,17 +42,20 @@ public class TestClickHouseConfig
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(ClickHouseConfig.class)
.setMapStringAsVarchar(false));
.setMapStringAsVarchar(false)
.setClusterName(null));
}

@Test
public void testExplicitPropertyMappings()
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("clickhouse.map-string-as-varchar", "true")
.put("clickhouse.cluster-name", "test")
.buildOrThrow();

ClickHouseConfig expected = new ClickHouseConfig()
.setClusterName("test")
.setMapStringAsVarchar(true);

assertFullMapping(properties, expected);