Skip to content

Commit ec2f9c4

Browse files
committed
fix
1 parent c05fd79 commit ec2f9c4

1 file changed

Lines changed: 28 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/main/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksEnrichedCatalog.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.lang.reflect.InvocationTargetException;
3030
import java.lang.reflect.Method;
31+
import java.util.Optional;
3132

3233
/** An enriched {@code StarRocksCatalog} with more schema evolution abilities. */
3334
public class StarRocksEnrichedCatalog extends StarRocksCatalog {
@@ -113,7 +114,13 @@ public void alterColumnType(String databaseName, String tableName, StarRocksColu
113114
"column name cannot be null or empty.");
114115
String alterSql =
115116
buildAlterColumnTypeSql(
116-
databaseName, tableName, column.getColumnName(), column.getDataType());
117+
databaseName,
118+
tableName,
119+
column.getColumnName(),
120+
getFullColumnType(
121+
column.getDataType(),
122+
column.getColumnSize(),
123+
column.getDecimalDigits()));
117124
try {
118125
long startTimeMillis = System.currentTimeMillis();
119126
executeUpdateStatement(alterSql);
@@ -181,4 +188,24 @@ private void checkTableArgument(String databaseName, String tableName) {
181188
!StringUtils.isNullOrWhitespaceOnly(tableName),
182189
"Table name cannot be null or empty.");
183190
}
191+
192+
private String getFullColumnType(
193+
String type, Optional<Integer> columnSize, Optional<Integer> decimalDigits) {
194+
String dataType = type.toUpperCase();
195+
switch (dataType) {
196+
case "DECIMAL":
197+
Preconditions.checkArgument(
198+
columnSize.isPresent(), "DECIMAL type must have column size");
199+
Preconditions.checkArgument(
200+
decimalDigits.isPresent(), "DECIMAL type must have decimal digits");
201+
return String.format("DECIMAL(%d, %s)", columnSize.get(), decimalDigits.get());
202+
case "CHAR":
203+
case "VARCHAR":
204+
Preconditions.checkArgument(
205+
columnSize.isPresent(), type + " type must have column size");
206+
return String.format("%s(%d)", dataType, columnSize.get());
207+
default:
208+
return dataType;
209+
}
210+
}
184211
}

0 commit comments

Comments
 (0)