2323import com .starrocks .connector .flink .catalog .StarRocksCatalog ;
2424import com .starrocks .connector .flink .catalog .StarRocksCatalogException ;
2525import com .starrocks .connector .flink .catalog .StarRocksColumn ;
26+ import com .starrocks .connector .flink .catalog .StarRocksTable ;
2627import org .slf4j .Logger ;
2728import org .slf4j .LoggerFactory ;
2829
2930import java .lang .reflect .InvocationTargetException ;
3031import java .lang .reflect .Method ;
32+ import java .util .List ;
3133import java .util .Optional ;
34+ import java .util .stream .Collectors ;
3235
3336/** An enriched {@code StarRocksCatalog} with more schema evolution abilities. */
3437public class StarRocksEnrichedCatalog extends StarRocksCatalog {
@@ -38,6 +41,70 @@ public StarRocksEnrichedCatalog(String jdbcUrl, String username, String password
3841
3942 private static final Logger LOG = LoggerFactory .getLogger (StarRocksEnrichedCatalog .class );
4043
44+ @ Override
45+ public void createTable (StarRocksTable table , boolean ignoreIfExists )
46+ throws StarRocksCatalogException {
47+ String createTableSql = buildCreateTableSql (table , ignoreIfExists );
48+ try {
49+ executeUpdateStatement (createTableSql );
50+ LOG .info (
51+ "Success to create table {}.{}, sql: {}" ,
52+ table .getDatabaseName (),
53+ table .getDatabaseName (),
54+ createTableSql );
55+ } catch (Exception e ) {
56+ LOG .error (
57+ "Failed to create table {}.{}, sql: {}" ,
58+ table .getDatabaseName (),
59+ table .getDatabaseName (),
60+ createTableSql ,
61+ e );
62+ throw new StarRocksCatalogException (
63+ String .format (
64+ "Failed to create table %s.%s" ,
65+ table .getDatabaseName (), table .getDatabaseName ()),
66+ e );
67+ }
68+ }
69+
70+ @ Override
71+ public void alterAddColumns (
72+ String databaseName ,
73+ String tableName ,
74+ List <StarRocksColumn > addColumns ,
75+ long timeoutSecond )
76+ throws StarRocksCatalogException {
77+ Preconditions .checkArgument (
78+ !StringUtils .isNullOrWhitespaceOnly (databaseName ),
79+ "database name cannot be null or empty." );
80+ Preconditions .checkArgument (
81+ !StringUtils .isNullOrWhitespaceOnly (tableName ),
82+ "table name cannot be null or empty." );
83+ Preconditions .checkArgument (!addColumns .isEmpty (), "Added columns should not be empty." );
84+
85+ String alterSql =
86+ buildAlterAddColumnsSql (databaseName , tableName , addColumns , timeoutSecond );
87+ try {
88+ long startTimeMillis = System .currentTimeMillis ();
89+ executeAlter (databaseName , tableName , alterSql , timeoutSecond );
90+ LOG .info (
91+ "Success to add columns to {}.{}, duration: {}ms, sql: {}" ,
92+ databaseName ,
93+ tableName ,
94+ System .currentTimeMillis () - startTimeMillis ,
95+ alterSql );
96+ } catch (Exception e ) {
97+ LOG .error (
98+ "Failed to add columns to {}.{}, sql: {}" ,
99+ databaseName ,
100+ tableName ,
101+ alterSql ,
102+ e );
103+ throw new StarRocksCatalogException (
104+ String .format ("Failed to add columns to %s.%s " , databaseName , tableName ), e );
105+ }
106+ }
107+
41108 public void truncateTable (String databaseName , String tableName )
42109 throws StarRocksCatalogException {
43110 checkTableArgument (databaseName , tableName );
@@ -137,6 +204,80 @@ public void alterColumnType(String databaseName, String tableName, StarRocksColu
137204 }
138205 }
139206
207+ private String buildAlterAddColumnsSql (
208+ String databaseName ,
209+ String tableName ,
210+ List <StarRocksColumn > addColumns ,
211+ long timeoutSecond ) {
212+ StringBuilder builder = new StringBuilder ();
213+ builder .append (String .format ("ALTER TABLE `%s`.`%s` " , databaseName , tableName ));
214+ String columnsStmt =
215+ addColumns .stream ()
216+ .map (col -> "ADD COLUMN " + buildColumnStmt (col ))
217+ .collect (Collectors .joining (", " ));
218+ builder .append (columnsStmt );
219+ builder .append (String .format (" PROPERTIES (\" timeout\" = \" %s\" )" , timeoutSecond ));
220+ builder .append (";" );
221+ return builder .toString ();
222+ }
223+
224+ private String buildCreateTableSql (StarRocksTable table , boolean ignoreIfExists ) {
225+ StringBuilder builder = new StringBuilder ();
226+ builder .append (
227+ String .format (
228+ "CREATE TABLE %s`%s`.`%s`" ,
229+ ignoreIfExists ? "IF NOT EXISTS " : "" ,
230+ table .getDatabaseName (),
231+ table .getTableName ()));
232+ builder .append (" (\n " );
233+ String columnsStmt =
234+ table .getColumns ().stream ()
235+ .map (this ::buildColumnStmt )
236+ .collect (Collectors .joining (",\n " ));
237+ builder .append (columnsStmt );
238+ builder .append ("\n ) " );
239+
240+ Preconditions .checkArgument (
241+ table .getTableType () == StarRocksTable .TableType .PRIMARY_KEY ,
242+ "Not support to build create table sql for table type " + table .getTableType ());
243+ Preconditions .checkArgument (
244+ table .getTableKeys ().isPresent (),
245+ "Can't build create table sql because there is no table keys" );
246+ String tableKeys =
247+ table .getTableKeys ().get ().stream ()
248+ .map (key -> "`" + key + "`" )
249+ .collect (Collectors .joining (", " ));
250+ builder .append (String .format ("PRIMARY KEY (%s)\n " , tableKeys ));
251+
252+ Preconditions .checkArgument (
253+ table .getDistributionKeys ().isPresent (),
254+ "Can't build create table sql because there is no distribution keys" );
255+ String distributionKeys =
256+ table .getDistributionKeys ().get ().stream ()
257+ .map (key -> "`" + key + "`" )
258+ .collect (Collectors .joining (", " ));
259+ builder .append (String .format ("DISTRIBUTED BY HASH (%s)" , distributionKeys ));
260+ if (table .getNumBuckets ().isPresent ()) {
261+ builder .append (" BUCKETS " );
262+ builder .append (table .getNumBuckets ().get ());
263+ }
264+ if (!table .getProperties ().isEmpty ()) {
265+ builder .append ("\n PROPERTIES (\n " );
266+ String properties =
267+ table .getProperties ().entrySet ().stream ()
268+ .map (
269+ entry ->
270+ String .format (
271+ "\" %s\" = \" %s\" " ,
272+ entry .getKey (), entry .getValue ()))
273+ .collect (Collectors .joining (",\n " ));
274+ builder .append (properties );
275+ builder .append ("\n )" );
276+ }
277+ builder .append (";" );
278+ return builder .toString ();
279+ }
280+
140281 private String buildTruncateTableSql (String databaseName , String tableName ) {
141282 return String .format ("TRUNCATE TABLE `%s`.`%s`;" , databaseName , tableName );
142283 }
@@ -171,6 +312,26 @@ private void executeUpdateStatement(String sql) throws StarRocksCatalogException
171312 }
172313 }
173314
315+ private void executeAlter (
316+ String databaseName , String tableName , String alterSql , long timeoutSecond )
317+ throws StarRocksCatalogException {
318+ try {
319+ Method m =
320+ getClass ()
321+ .getSuperclass ()
322+ .getDeclaredMethod (
323+ "executeAlter" ,
324+ String .class ,
325+ String .class ,
326+ String .class ,
327+ long .class );
328+ m .setAccessible (true );
329+ m .invoke (this , databaseName , tableName , alterSql , timeoutSecond );
330+ } catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e ) {
331+ throw new RuntimeException (e );
332+ }
333+ }
334+
174335 private void checkTableArgument (String databaseName , String tableName ) {
175336 Preconditions .checkArgument (
176337 !StringUtils .isNullOrWhitespaceOnly (databaseName ),
@@ -191,15 +352,25 @@ private String buildColumnStmt(StarRocksColumn column) {
191352 builder .append (" " );
192353 builder .append (column .isNullable () ? "NULL" : "NOT NULL" );
193354 if (column .getDefaultValue ().isPresent ()) {
194- builder .append (String .format (" DEFAULT \" %s\" " , column .getDefaultValue ().get ()));
355+ builder .append (
356+ String .format (
357+ " DEFAULT \" %s\" " ,
358+ escapeForDoubleQuotedSqlString (column .getDefaultValue ().get ())));
195359 }
196360
197361 if (column .getColumnComment ().isPresent ()) {
198- builder .append (String .format (" COMMENT \" %s\" " , column .getColumnComment ().get ()));
362+ builder .append (
363+ String .format (
364+ " COMMENT \" %s\" " ,
365+ escapeForDoubleQuotedSqlString (column .getColumnComment ().get ())));
199366 }
200367 return builder .toString ();
201368 }
202369
370+ private String escapeForDoubleQuotedSqlString (String value ) {
371+ return value .replace ("\\ " , "\\ \\ " ).replace ("\" " , "\\ \" " );
372+ }
373+
203374 private String getFullColumnType (
204375 String type , Optional <Integer > columnSize , Optional <Integer > decimalDigits ) {
205376 String dataType = type .toUpperCase ();
0 commit comments