@@ -211,20 +211,23 @@ public class ClickHouseClient
211
211
212
212
private final ConnectorExpressionRewriter <ParameterizedExpression > connectorExpressionRewriter ;
213
213
private final AggregateFunctionRewriter <JdbcExpression , ?> aggregateFunctionRewriter ;
214
+ private final Optional <String > clusterName ;
214
215
private final Type uuidType ;
215
216
private final Type ipAddressType ;
216
217
private final AtomicReference <ClickHouseVersion > clickHouseVersion = new AtomicReference <>();
217
218
218
219
@ Inject
219
220
public ClickHouseClient (
220
- BaseJdbcConfig config ,
221
+ ClickHouseConfig config ,
222
+ BaseJdbcConfig baseJdbcConfig ,
221
223
ConnectionFactory connectionFactory ,
222
224
QueryBuilder queryBuilder ,
223
225
TypeManager typeManager ,
224
226
IdentifierMapping identifierMapping ,
225
227
RemoteQueryModifier queryModifier )
226
228
{
227
- super ("\" " , connectionFactory , queryBuilder , config .getJdbcTypesMappedToVarchar (), identifierMapping , queryModifier , false );
229
+ super ("\" " , connectionFactory , queryBuilder , baseJdbcConfig .getJdbcTypesMappedToVarchar (), identifierMapping , queryModifier , false );
230
+ this .clusterName = config .getClusterName ();
228
231
this .uuidType = typeManager .getType (new TypeSignature (StandardTypes .UUID ));
229
232
this .ipAddressType = typeManager .getType (new TypeSignature (StandardTypes .IPADDRESS ));
230
233
JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle (Types .BIGINT , Optional .of ("bigint" ), Optional .empty (), Optional .empty (), Optional .empty (), Optional .empty ());
@@ -253,6 +256,11 @@ public ClickHouseClient(
253
256
.build ());
254
257
}
255
258
259
+ private String getClusterInfo ()
260
+ {
261
+ return clusterName .map (" ON CLUSTER %s " ::formatted ).orElse ("" );
262
+ }
263
+
256
264
@ Override
257
265
public Optional <JdbcExpression > implementAggregation (ConnectorSession session , AggregateFunction aggregate , Map <String , ColumnHandle > assignments )
258
266
{
@@ -340,8 +348,9 @@ protected void copyTableSchema(ConnectorSession session, Connection connection,
340
348
// 1. create table tbl as tbl2
341
349
// 2. create table tbl1 ENGINE=<engine> as select * from tbl2
342
350
String sql = format (
343
- "CREATE TABLE %s AS %s " ,
351
+ "CREATE TABLE %s %s AS %s " ,
344
352
quoted (null , schemaName , newTableName ),
353
+ getClusterInfo (),
345
354
quoted (null , schemaName , tableName ));
346
355
try {
347
356
execute (session , connection , sql );
@@ -396,7 +405,7 @@ protected List<String> createTableSqls(RemoteTableName remoteTableName, List<Str
396
405
ClickHouseTableProperties .getSampleBy (tableProperties ).ifPresent (value -> tableOptions .add ("SAMPLE BY " + quoted (value )));
397
406
tableMetadata .getComment ().ifPresent (comment -> tableOptions .add (format ("COMMENT %s" , clickhouseVarcharLiteral (comment ))));
398
407
399
- return ImmutableList .of (format ("CREATE TABLE %s (%s) %s" , quoted (remoteTableName ), join (", " , columns ), join (" " , tableOptions .build ())));
408
+ return ImmutableList .of (format ("CREATE TABLE %s %s (%s) %s" , quoted (remoteTableName ), getClusterInfo ( ), join (", " , columns ), join (" " , tableOptions .build ())));
400
409
}
401
410
402
411
@ Override
@@ -461,8 +470,9 @@ public void setTableProperties(ConnectorSession session, JdbcTableHandle handle,
461
470
462
471
try (Connection connection = connectionFactory .openConnection (session )) {
463
472
String sql = format (
464
- "ALTER TABLE %s MODIFY %s" ,
473
+ "ALTER TABLE %s %s MODIFY %s" ,
465
474
quoted (handle .asPlainTable ().getRemoteTableName ()),
475
+ getClusterInfo (),
466
476
join (" " , tableOptions .build ()));
467
477
execute (session , connection , sql );
468
478
}
@@ -495,7 +505,7 @@ protected String getColumnDefinitionSql(ConnectorSession session, ColumnMetadata
495
505
protected void createSchema (ConnectorSession session , Connection connection , String remoteSchemaName )
496
506
throws SQLException
497
507
{
498
- execute (session , connection , "CREATE DATABASE " + quoted (remoteSchemaName ));
508
+ execute (session , connection , "CREATE DATABASE " + quoted (remoteSchemaName ) + getClusterInfo () );
499
509
}
500
510
501
511
@ Override
@@ -510,14 +520,14 @@ protected void dropSchema(ConnectorSession session, Connection connection, Strin
510
520
}
511
521
}
512
522
}
513
- execute (session , connection , "DROP DATABASE " + quoted (remoteSchemaName ));
523
+ execute (session , connection , "DROP DATABASE " + quoted (remoteSchemaName ) + getClusterInfo () );
514
524
}
515
525
516
526
@ Override
517
527
protected void renameSchema (ConnectorSession session , Connection connection , String remoteSchemaName , String newRemoteSchemaName )
518
528
throws SQLException
519
529
{
520
- execute (session , connection , "RENAME DATABASE " + quoted (remoteSchemaName ) + " TO " + quoted (newRemoteSchemaName ));
530
+ execute (session , connection , "RENAME DATABASE " + quoted (remoteSchemaName ) + " TO " + quoted (newRemoteSchemaName ) + getClusterInfo () );
521
531
}
522
532
523
533
@ Override
@@ -535,8 +545,9 @@ private void addColumn(ConnectorSession session, RemoteTableName table, ColumnMe
535
545
try (Connection connection = connectionFactory .openConnection (session )) {
536
546
String remoteColumnName = getIdentifierMapping ().toRemoteColumnName (getRemoteIdentifiers (connection ), column .getName ());
537
547
String sql = format (
538
- "ALTER TABLE %s ADD COLUMN %s" ,
548
+ "ALTER TABLE %s %s ADD COLUMN %s" ,
539
549
quoted (table ),
550
+ getClusterInfo (),
540
551
getColumnDefinitionSql (session , column , remoteColumnName ));
541
552
execute (session , connection , sql );
542
553
}
@@ -545,12 +556,42 @@ private void addColumn(ConnectorSession session, RemoteTableName table, ColumnMe
545
556
}
546
557
}
547
558
559
+ @ Override
560
+ protected void renameColumn (ConnectorSession session , Connection connection , RemoteTableName remoteTableName , String remoteColumnName , String newRemoteColumnName )
561
+ throws SQLException
562
+ {
563
+ execute (session , connection , format (
564
+ "ALTER TABLE %s RENAME COLUMN %s TO %s %s" ,
565
+ quoted (remoteTableName ),
566
+ quoted (remoteColumnName ),
567
+ quoted (newRemoteColumnName ),
568
+ getClusterInfo ()));
569
+ }
570
+
571
+ @ Override
572
+ public void dropColumn (ConnectorSession session , JdbcTableHandle handle , JdbcColumnHandle column )
573
+ {
574
+ try (Connection connection = connectionFactory .openConnection (session )) {
575
+ String remoteColumnName = getIdentifierMapping ().toRemoteColumnName (getRemoteIdentifiers (connection ), column .getColumnName ());
576
+ String sql = format (
577
+ "ALTER TABLE %s %s DROP COLUMN %s" ,
578
+ quoted (handle .asPlainTable ().getRemoteTableName ()),
579
+ getClusterInfo (),
580
+ quoted (remoteColumnName ));
581
+ execute (session , connection , sql );
582
+ }
583
+ catch (SQLException e ) {
584
+ throw new TrinoException (JDBC_ERROR , e );
585
+ }
586
+ }
587
+
548
588
@ Override
549
589
public void setTableComment (ConnectorSession session , JdbcTableHandle handle , Optional <String > comment )
550
590
{
551
591
String sql = format (
552
- "ALTER TABLE %s MODIFY COMMENT %s" ,
592
+ "ALTER TABLE %s %s MODIFY COMMENT %s" ,
553
593
quoted (handle .asPlainTable ().getRemoteTableName ()),
594
+ getClusterInfo (),
554
595
clickhouseVarcharLiteral (comment .orElse (NO_COMMENT )));
555
596
execute (session , sql );
556
597
}
@@ -559,8 +600,9 @@ public void setTableComment(ConnectorSession session, JdbcTableHandle handle, Op
559
600
public void setColumnComment (ConnectorSession session , JdbcTableHandle handle , JdbcColumnHandle column , Optional <String > comment )
560
601
{
561
602
String sql = format (
562
- "ALTER TABLE %s COMMENT COLUMN %s %s" ,
603
+ "ALTER TABLE %s %s COMMENT COLUMN %s %s" ,
563
604
quoted (handle .asPlainTable ().getRemoteTableName ()),
605
+ getClusterInfo (),
564
606
quoted (column .getColumnName ()),
565
607
clickhouseVarcharLiteral (comment .orElse ("" )));
566
608
execute (session , sql );
@@ -594,11 +636,19 @@ protected Optional<List<String>> getTableTypes()
594
636
protected void renameTable (ConnectorSession session , Connection connection , String catalogName , String remoteSchemaName , String remoteTableName , String newRemoteSchemaName , String newRemoteTableName )
595
637
throws SQLException
596
638
{
597
- execute (session , connection , format ("RENAME TABLE %s TO %s" ,
639
+ execute (session , connection , format ("RENAME TABLE %s %s TO %s" ,
598
640
quoted (catalogName , remoteSchemaName , remoteTableName ),
641
+ getClusterInfo (),
599
642
quoted (catalogName , newRemoteSchemaName , newRemoteTableName )));
600
643
}
601
644
645
+ @ Override
646
+ protected void dropTable (ConnectorSession session , RemoteTableName remoteTableName , boolean temporaryTable )
647
+ {
648
+ String sql = "DROP TABLE " + quoted (remoteTableName ) + getClusterInfo ();
649
+ execute (session , sql );
650
+ }
651
+
602
652
@ Override
603
653
protected Optional <BiFunction <String , Long , String >> limitFunction ()
604
654
{
0 commit comments