|
32 | 32 | import org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksType;
|
33 | 33 | import org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes.StarRocksTypeConverter;
|
34 | 34 |
|
| 35 | +import org.apache.maven.artifact.versioning.ComparableVersion; |
| 36 | + |
35 | 37 | import lombok.extern.slf4j.Slf4j;
|
36 | 38 |
|
37 | 39 | import java.sql.Connection;
|
| 40 | +import java.sql.ResultSet; |
38 | 41 | import java.sql.SQLException;
|
39 | 42 | import java.sql.Statement;
|
40 | 43 |
|
41 | 44 | @Slf4j
|
42 | 45 | public class SchemaUtils {
|
43 | 46 |
|
| 47 | + private static final String MIN_VERSION_TABLE_CHANGE_COLUMN = "3.3.2"; |
| 48 | + |
44 | 49 | private SchemaUtils() {}
|
45 | 50 |
|
46 | 51 | /**
|
@@ -111,33 +116,52 @@ && columnExists(
|
111 | 116 | public static void applySchemaChange(
|
112 | 117 | Connection connection, TablePath tablePath, AlterTableChangeColumnEvent event)
|
113 | 118 | throws SQLException {
|
114 |
| - StringBuilder sqlBuilder = |
115 |
| - new StringBuilder() |
116 |
| - .append("ALTER TABLE") |
117 |
| - .append(" ") |
118 |
| - .append(tablePath.getFullName()) |
| 119 | + ComparableVersion targetVersion = new ComparableVersion(MIN_VERSION_TABLE_CHANGE_COLUMN); |
| 120 | + ComparableVersion currentVersion; |
| 121 | + try (Statement statement = connection.createStatement(); |
| 122 | + ResultSet resultSet = |
| 123 | + statement.executeQuery("SELECT CURRENT_VERSION() as version")) { |
| 124 | + resultSet.next(); |
| 125 | + String version = resultSet.getString(1); |
| 126 | + log.debug("starrocks version: {}", version); |
| 127 | + String versionOne = version.split(" ")[0]; |
| 128 | + currentVersion = new ComparableVersion(versionOne); |
| 129 | + } |
| 130 | + |
| 131 | + if (currentVersion.compareTo(targetVersion) >= 0) { |
| 132 | + StringBuilder sqlBuilder = |
| 133 | + new StringBuilder() |
| 134 | + .append("ALTER TABLE") |
| 135 | + .append(" ") |
| 136 | + .append(tablePath.getFullName()) |
| 137 | + .append(" ") |
| 138 | + .append("RENAME COLUMN") |
| 139 | + .append(" ") |
| 140 | + .append(quoteIdentifier(event.getOldColumn())) |
| 141 | + .append(" TO ") |
| 142 | + .append(quoteIdentifier(event.getColumn().getName())); |
| 143 | + if (event.getColumn().getComment() != null) { |
| 144 | + sqlBuilder |
119 | 145 | .append(" ")
|
120 |
| - .append("RENAME COLUMN") |
| 146 | + .append("COMMENT ") |
| 147 | + .append("'") |
| 148 | + .append(event.getColumn().getComment()) |
| 149 | + .append("'"); |
| 150 | + } |
| 151 | + if (event.getAfterColumn() != null) { |
| 152 | + sqlBuilder |
121 | 153 | .append(" ")
|
122 |
| - .append(quoteIdentifier(event.getOldColumn())) |
123 |
| - .append(" TO ") |
124 |
| - .append(quoteIdentifier(event.getColumn().getName())); |
125 |
| - if (event.getColumn().getComment() != null) { |
126 |
| - sqlBuilder |
127 |
| - .append(" ") |
128 |
| - .append("COMMENT ") |
129 |
| - .append("'") |
130 |
| - .append(event.getColumn().getComment()) |
131 |
| - .append("'"); |
132 |
| - } |
133 |
| - if (event.getAfterColumn() != null) { |
134 |
| - sqlBuilder.append(" ").append("AFTER ").append(quoteIdentifier(event.getAfterColumn())); |
135 |
| - } |
| 154 | + .append("AFTER ") |
| 155 | + .append(quoteIdentifier(event.getAfterColumn())); |
| 156 | + } |
136 | 157 |
|
137 |
| - String changeColumnSQL = sqlBuilder.toString(); |
138 |
| - try (Statement statement = connection.createStatement()) { |
139 |
| - log.info("Executing change column SQL: " + changeColumnSQL); |
140 |
| - statement.execute(changeColumnSQL); |
| 158 | + String changeColumnSQL = sqlBuilder.toString(); |
| 159 | + try (Statement statement = connection.createStatement()) { |
| 160 | + log.info("Executing change column SQL: " + changeColumnSQL); |
| 161 | + statement.execute(changeColumnSQL); |
| 162 | + } |
| 163 | + } else { |
| 164 | + log.warn("versions prior to starrocks 3.3.2 do not support rename column operations"); |
141 | 165 | }
|
142 | 166 | }
|
143 | 167 |
|
|
0 commit comments