Skip to content

Commit 20d9176

Browse files
committed
alter column is asynchronous
1 parent ec2f9c4 commit 20d9176

2 files changed

Lines changed: 29 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/StarRocksMetadataApplierITCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ void testStarRocksRenameColumn() throws Exception {
313313
StarRocksContainer.STARROCKS_TABLE_NAME);
314314

315315
runJobWithEvents(generateRenameColumnEvents(tableId));
316-
316+
waitAlterDone(tableId, 60000L);
317317
List<String> actual = inspectTableSchema(tableId);
318318

319319
List<String> expected =

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-starrocks/src/test/java/org/apache/flink/cdc/connectors/starrocks/sink/utils/StarRocksSinkTestBase.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
4040
import org.testcontainers.lifecycle.Startables;
4141

42+
import java.sql.Connection;
4243
import java.sql.ResultSet;
4344
import java.sql.SQLException;
4445
import java.time.Duration;
@@ -227,6 +228,33 @@ public List<String> fetchTableContent(TableId tableId, int columnCount) throws S
227228
return results;
228229
}
229230

231+
// Starrocks alter column is asynchronous and does not support Light mode.
232+
public void waitAlterDone(TableId tableId, long timeout)
233+
throws SQLException, InterruptedException {
234+
Connection conn = STARROCKS_CONTAINER.createConnection("");
235+
conn.createStatement().execute(String.format("USE `%s`", tableId.getSchemaName()));
236+
long t0 = System.currentTimeMillis();
237+
while (System.currentTimeMillis() - t0 < timeout) {
238+
ResultSet rs =
239+
conn.createStatement()
240+
.executeQuery(
241+
String.format(
242+
"SHOW ALTER TABLE COLUMN WHERE TableName = '%s' ORDER BY CreateTime DESC LIMIT 1",
243+
tableId.getTableName()));
244+
if (rs.next()) {
245+
String state = rs.getString("State");
246+
if ("FINISHED".equals(state)) {
247+
return;
248+
}
249+
if ("CANCELLED".equals(state)) {
250+
throw new RuntimeException("Alter failed: " + rs.getString("Msg"));
251+
}
252+
}
253+
Thread.sleep(1000L);
254+
}
255+
throw new RuntimeException("Alter job timeout");
256+
}
257+
230258
public static <T> void assertEqualsInAnyOrder(List<T> expected, List<T> actual) {
231259
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
232260
}

0 commit comments

Comments
 (0)