Skip to content

Commit 0e1d0cf

Browse files
committed
[test][flink] Fix Unstable test FlinkTableSinkITCase.testVersionMergeEngineWithTypeTimestampLTZ9
1 parent c6af64d commit 0e1d0cf

File tree

1 file changed

+41
-30
lines changed

1 file changed

+41
-30
lines changed

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 41 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.HashMap;
6363
import java.util.List;
6464
import java.util.Map;
65+
import java.util.concurrent.ExecutionException;
6566
import java.util.stream.Collectors;
6667
import java.util.stream.Stream;
6768

@@ -129,9 +130,9 @@ void before() {
129130
}
130131

131132
@AfterEach
132-
void after() {
133+
void after() throws ExecutionException, InterruptedException {
133134
tEnv.useDatabase(BUILTIN_DATABASE);
134-
tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB));
135+
tEnv.executeSql(String.format("drop database %s cascade", DEFAULT_DB)).await();
135136
}
136137

137138
@ParameterizedTest
@@ -1114,18 +1115,19 @@ void testUnsupportedStmtOnVersionMergeEngine() {
11141115
@Test
11151116
void testVersionMergeEngineWithTypeBigint() throws Exception {
11161117
tEnv.executeSql(
1117-
"create table merge_engine_with_version (a int not null primary key not enforced,"
1118-
+ " b string, ts bigint) with('table.merge-engine' = 'versioned',"
1119-
+ "'table.merge-engine.versioned.ver-column' = 'ts')");
1118+
"create table version_merge_engine_with_bigint (a int not null primary key not enforced,"
1119+
+ " b string, ts bigint) with('table.merge-engine' = 'versioned',"
1120+
+ "'table.merge-engine.versioned.ver-column' = 'ts')")
1121+
.await();
11201122

11211123
// insert once
11221124
tEnv.executeSql(
1123-
"insert into merge_engine_with_version (a, b, ts) VALUES "
1125+
"insert into version_merge_engine_with_bigint (a, b, ts) VALUES "
11241126
+ "(1, 'v1', 1000), (2, 'v2', 1000), (1, 'v11', 999), (3, 'v3', 1000)")
11251127
.await();
11261128

11271129
CloseableIterator<Row> rowIter =
1128-
tEnv.executeSql("select * from merge_engine_with_version").collect();
1130+
tEnv.executeSql("select * from version_merge_engine_with_bigint").collect();
11291131

11301132
// id=1 not update
11311133
List<String> expectedRows =
@@ -1151,32 +1153,35 @@ void testVersionMergeEngineWithTypeBigint() throws Exception {
11511153
@Test
11521154
void testVersionMergeEngineWithTypeTimestamp() throws Exception {
11531155
tEnv.executeSql(
1154-
"create table merge_engine_with_version (a int not null primary key not enforced,"
1155-
+ " b string, ts TIMESTAMP(3)) with('table.merge-engine' = 'versioned',"
1156-
+ "'table.merge-engine.versioned.ver-column' = 'ts')");
1157-
1156+
"create table merge_engine_with_version (a int not null primary key not enforced,"
1157+
+ " b string, ts TIMESTAMP(3)) with('table.merge-engine' = 'versioned',"
1158+
+ "'table.merge-engine.versioned.ver-column' = 'ts')")
1159+
.await();
11581160
// insert once
11591161
tEnv.executeSql(
11601162
"INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
11611163
+ "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123'), "
11621164
+ "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123'), "
1163-
+ "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), "
1164-
+ "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123'),"
1165-
+ "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123');")
1165+
+ "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123');")
11661166
.await();
1167-
11681167
CloseableIterator<Row> rowIter =
11691168
tEnv.executeSql("select * from merge_engine_with_version").collect();
1170-
1171-
// id=1 not update, but id=3 updated
11721169
List<String> expectedRows =
11731170
Arrays.asList(
11741171
"+I[1, v1, 2024-12-27T12:00:00.123]",
11751172
"+I[2, v2, 2024-12-27T12:00:00.123]",
1176-
"+I[3, v3, 2024-12-27T12:00:00.123]",
1173+
"+I[3, v3, 2024-12-27T12:00:00.123]");
1174+
assertResultsIgnoreOrder(rowIter, expectedRows, false);
1175+
1176+
// insert again. id=1 not update, but id=3 updated
1177+
tEnv.executeSql(
1178+
"INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
1179+
+ "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), "
1180+
+ "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123');");
1181+
expectedRows =
1182+
Arrays.asList(
11771183
"-U[3, v3, 2024-12-27T12:00:00.123]",
11781184
"+U[3, v33, 2024-12-27T12:00:00.123]");
1179-
11801185
assertResultsIgnoreOrder(rowIter, expectedRows, true);
11811186
}
11821187

@@ -1185,30 +1190,36 @@ void testVersionMergeEngineWithTypeTimestampLTZ9() throws Exception {
11851190

11861191
tEnv.getConfig().set("table.local-time-zone", "UTC");
11871192
tEnv.executeSql(
1188-
"create table merge_engine_with_version (a int not null primary key not enforced,"
1189-
+ " b string, ts TIMESTAMP(9) WITH LOCAL TIME ZONE ) with("
1190-
+ "'table.merge-engine' = 'versioned',"
1191-
+ "'table.merge-engine.versioned.ver-column' = 'ts')");
1193+
"create table merge_engine_with_version (a int not null primary key not enforced,"
1194+
+ " b string, ts TIMESTAMP(9) WITH LOCAL TIME ZONE ) with("
1195+
+ "'table.merge-engine' = 'versioned',"
1196+
+ "'table.merge-engine.versioned.ver-column' = 'ts')")
1197+
.await();
11921198

11931199
// insert once
11941200
tEnv.executeSql(
11951201
"INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
11961202
+ "(1, 'v1', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
11971203
+ "(2, 'v2', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
1198-
+ "(1, 'v11', CAST(TIMESTAMP '2024-12-27 12:00:00.123456788' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
1199-
+ "(3, 'v3', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
1200-
+ "(3, 'v33', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
1204+
+ "(3, 'v3', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
12011205
.await();
1202-
12031206
CloseableIterator<Row> rowIter =
12041207
tEnv.executeSql("select * from merge_engine_with_version").collect();
1205-
1206-
// id=1 not update, but id=3 updated
12071208
List<String> expectedRows =
12081209
Arrays.asList(
12091210
"+I[1, v1, 2024-12-27T12:00:00.123456789Z]",
12101211
"+I[2, v2, 2024-12-27T12:00:00.123456789Z]",
1211-
"+I[3, v3, 2024-12-27T12:00:00.123456789Z]",
1212+
"+I[3, v3, 2024-12-27T12:00:00.123456789Z]");
1213+
assertResultsIgnoreOrder(rowIter, expectedRows, false);
1214+
1215+
// insert again. id=1 not update, but id=3 updated
1216+
tEnv.executeSql(
1217+
"INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
1218+
+ "(1, 'v11', CAST(TIMESTAMP '2024-12-27 12:00:00.123456788' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
1219+
+ "(3, 'v33', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
1220+
.await();
1221+
expectedRows =
1222+
Arrays.asList(
12121223
"-U[3, v3, 2024-12-27T12:00:00.123456789Z]",
12131224
"+U[3, v33, 2024-12-27T12:00:00.123456789Z]");
12141225

0 commit comments

Comments
 (0)