Skip to content

Commit cc5e221

Browse files
committed
[starrocks] Escape quoted default/comment literals in DDL
1 parent 7e5ab46 commit cc5e221

1 file changed

Lines changed: 173 additions & 2 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java

Lines changed: 173 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@
2323
import com.starrocks.connector.flink.catalog.StarRocksCatalog;
2424
import com.starrocks.connector.flink.catalog.StarRocksCatalogException;
2525
import com.starrocks.connector.flink.catalog.StarRocksColumn;
26+
import com.starrocks.connector.flink.catalog.StarRocksTable;
2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
2829

2930
import java.lang.reflect.InvocationTargetException;
3031
import java.lang.reflect.Method;
32+
import java.util.List;
3133
import java.util.Optional;
34+
import java.util.stream.Collectors;
3235

3336
/** An enriched {@code StarRocksCatalog} with more schema evolution abilities. */
3437
public class StarRocksEnrichedCatalog extends StarRocksCatalog {
@@ -38,6 +41,70 @@ public StarRocksEnrichedCatalog(String jdbcUrl, String username, String password
3841

3942
private static final Logger LOG = LoggerFactory.getLogger(StarRocksEnrichedCatalog.class);
4043

44+
@Override
45+
public void createTable(StarRocksTable table, boolean ignoreIfExists)
46+
throws StarRocksCatalogException {
47+
String createTableSql = buildCreateTableSql(table, ignoreIfExists);
48+
try {
49+
executeUpdateStatement(createTableSql);
50+
LOG.info(
51+
"Success to create table {}.{}, sql: {}",
52+
table.getDatabaseName(),
53+
table.getDatabaseName(),
54+
createTableSql);
55+
} catch (Exception e) {
56+
LOG.error(
57+
"Failed to create table {}.{}, sql: {}",
58+
table.getDatabaseName(),
59+
table.getDatabaseName(),
60+
createTableSql,
61+
e);
62+
throw new StarRocksCatalogException(
63+
String.format(
64+
"Failed to create table %s.%s",
65+
table.getDatabaseName(), table.getDatabaseName()),
66+
e);
67+
}
68+
}
69+
70+
@Override
71+
public void alterAddColumns(
72+
String databaseName,
73+
String tableName,
74+
List<StarRocksColumn> addColumns,
75+
long timeoutSecond)
76+
throws StarRocksCatalogException {
77+
Preconditions.checkArgument(
78+
!StringUtils.isNullOrWhitespaceOnly(databaseName),
79+
"database name cannot be null or empty.");
80+
Preconditions.checkArgument(
81+
!StringUtils.isNullOrWhitespaceOnly(tableName),
82+
"table name cannot be null or empty.");
83+
Preconditions.checkArgument(!addColumns.isEmpty(), "Added columns should not be empty.");
84+
85+
String alterSql =
86+
buildAlterAddColumnsSql(databaseName, tableName, addColumns, timeoutSecond);
87+
try {
88+
long startTimeMillis = System.currentTimeMillis();
89+
executeAlter(databaseName, tableName, alterSql, timeoutSecond);
90+
LOG.info(
91+
"Success to add columns to {}.{}, duration: {}ms, sql: {}",
92+
databaseName,
93+
tableName,
94+
System.currentTimeMillis() - startTimeMillis,
95+
alterSql);
96+
} catch (Exception e) {
97+
LOG.error(
98+
"Failed to add columns to {}.{}, sql: {}",
99+
databaseName,
100+
tableName,
101+
alterSql,
102+
e);
103+
throw new StarRocksCatalogException(
104+
String.format("Failed to add columns to %s.%s ", databaseName, tableName), e);
105+
}
106+
}
107+
41108
public void truncateTable(String databaseName, String tableName)
42109
throws StarRocksCatalogException {
43110
checkTableArgument(databaseName, tableName);
@@ -137,6 +204,80 @@ public void alterColumnType(String databaseName, String tableName, StarRocksColu
137204
}
138205
}
139206

207+
private String buildAlterAddColumnsSql(
208+
String databaseName,
209+
String tableName,
210+
List<StarRocksColumn> addColumns,
211+
long timeoutSecond) {
212+
StringBuilder builder = new StringBuilder();
213+
builder.append(String.format("ALTER TABLE `%s`.`%s` ", databaseName, tableName));
214+
String columnsStmt =
215+
addColumns.stream()
216+
.map(col -> "ADD COLUMN " + buildColumnStmt(col))
217+
.collect(Collectors.joining(", "));
218+
builder.append(columnsStmt);
219+
builder.append(String.format(" PROPERTIES (\"timeout\" = \"%s\")", timeoutSecond));
220+
builder.append(";");
221+
return builder.toString();
222+
}
223+
224+
private String buildCreateTableSql(StarRocksTable table, boolean ignoreIfExists) {
225+
StringBuilder builder = new StringBuilder();
226+
builder.append(
227+
String.format(
228+
"CREATE TABLE %s`%s`.`%s`",
229+
ignoreIfExists ? "IF NOT EXISTS " : "",
230+
table.getDatabaseName(),
231+
table.getTableName()));
232+
builder.append(" (\n");
233+
String columnsStmt =
234+
table.getColumns().stream()
235+
.map(this::buildColumnStmt)
236+
.collect(Collectors.joining(",\n"));
237+
builder.append(columnsStmt);
238+
builder.append("\n) ");
239+
240+
Preconditions.checkArgument(
241+
table.getTableType() == StarRocksTable.TableType.PRIMARY_KEY,
242+
"Not support to build create table sql for table type " + table.getTableType());
243+
Preconditions.checkArgument(
244+
table.getTableKeys().isPresent(),
245+
"Can't build create table sql because there is no table keys");
246+
String tableKeys =
247+
table.getTableKeys().get().stream()
248+
.map(key -> "`" + key + "`")
249+
.collect(Collectors.joining(", "));
250+
builder.append(String.format("PRIMARY KEY (%s)\n", tableKeys));
251+
252+
Preconditions.checkArgument(
253+
table.getDistributionKeys().isPresent(),
254+
"Can't build create table sql because there is no distribution keys");
255+
String distributionKeys =
256+
table.getDistributionKeys().get().stream()
257+
.map(key -> "`" + key + "`")
258+
.collect(Collectors.joining(", "));
259+
builder.append(String.format("DISTRIBUTED BY HASH (%s)", distributionKeys));
260+
if (table.getNumBuckets().isPresent()) {
261+
builder.append(" BUCKETS ");
262+
builder.append(table.getNumBuckets().get());
263+
}
264+
if (!table.getProperties().isEmpty()) {
265+
builder.append("\nPROPERTIES (\n");
266+
String properties =
267+
table.getProperties().entrySet().stream()
268+
.map(
269+
entry ->
270+
String.format(
271+
"\"%s\" = \"%s\"",
272+
entry.getKey(), entry.getValue()))
273+
.collect(Collectors.joining(",\n"));
274+
builder.append(properties);
275+
builder.append("\n)");
276+
}
277+
builder.append(";");
278+
return builder.toString();
279+
}
280+
140281
private String buildTruncateTableSql(String databaseName, String tableName) {
141282
return String.format("TRUNCATE TABLE `%s`.`%s`;", databaseName, tableName);
142283
}
@@ -171,6 +312,26 @@ private void executeUpdateStatement(String sql) throws StarRocksCatalogException
171312
}
172313
}
173314

315+
private void executeAlter(
316+
String databaseName, String tableName, String alterSql, long timeoutSecond)
317+
throws StarRocksCatalogException {
318+
try {
319+
Method m =
320+
getClass()
321+
.getSuperclass()
322+
.getDeclaredMethod(
323+
"executeAlter",
324+
String.class,
325+
String.class,
326+
String.class,
327+
long.class);
328+
m.setAccessible(true);
329+
m.invoke(this, databaseName, tableName, alterSql, timeoutSecond);
330+
} catch (InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
331+
throw new RuntimeException(e);
332+
}
333+
}
334+
174335
private void checkTableArgument(String databaseName, String tableName) {
175336
Preconditions.checkArgument(
176337
!StringUtils.isNullOrWhitespaceOnly(databaseName),
@@ -191,15 +352,25 @@ private String buildColumnStmt(StarRocksColumn column) {
191352
builder.append(" ");
192353
builder.append(column.isNullable() ? "NULL" : "NOT NULL");
193354
if (column.getDefaultValue().isPresent()) {
194-
builder.append(String.format(" DEFAULT \"%s\"", column.getDefaultValue().get()));
355+
builder.append(
356+
String.format(
357+
" DEFAULT \"%s\"",
358+
escapeForDoubleQuotedSqlString(column.getDefaultValue().get())));
195359
}
196360

197361
if (column.getColumnComment().isPresent()) {
198-
builder.append(String.format(" COMMENT \"%s\"", column.getColumnComment().get()));
362+
builder.append(
363+
String.format(
364+
" COMMENT \"%s\"",
365+
escapeForDoubleQuotedSqlString(column.getColumnComment().get())));
199366
}
200367
return builder.toString();
201368
}
202369

370+
private String escapeForDoubleQuotedSqlString(String value) {
371+
return value.replace("\\", "\\\\").replace("\"", "\\\"");
372+
}
373+
203374
private String getFullColumnType(
204375
String type, Optional<Integer> columnSize, Optional<Integer> decimalDigits) {
205376
String dataType = type.toUpperCase();

0 commit comments

Comments
 (0)