Skip to content

Commit 71d00e4

Browse files
[test][flink] Fix Unstable test FlinkTableSinkITCase.testVersionMergeEngineWithTypeTimestampLTZ9 (apache#2139)
1 parent 593d27e commit 71d00e4

File tree

1 file changed

+32
-23
lines changed

1 file changed

+32
-23
lines changed

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,32 +1151,35 @@ void testVersionMergeEngineWithTypeBigint() throws Exception {
11511151
@Test
11521152
void testVersionMergeEngineWithTypeTimestamp() throws Exception {
11531153
tEnv.executeSql(
1154-
"create table merge_engine_with_version (a int not null primary key not enforced,"
1155-
+ " b string, ts TIMESTAMP(3)) with('table.merge-engine' = 'versioned',"
1156-
+ "'table.merge-engine.versioned.ver-column' = 'ts')");
1157-
1154+
"create table merge_engine_with_version (a int not null primary key not enforced,"
1155+
+ " b string, ts TIMESTAMP(3)) with('table.merge-engine' = 'versioned',"
1156+
+ "'table.merge-engine.versioned.ver-column' = 'ts')")
1157+
.await();
11581158
// insert once
11591159
tEnv.executeSql(
11601160
"INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
11611161
+ "(1, 'v1', TIMESTAMP '2024-12-27 12:00:00.123'), "
11621162
+ "(2, 'v2', TIMESTAMP '2024-12-27 12:00:00.123'), "
1163-
+ "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), "
1164-
+ "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123'),"
1165-
+ "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123');")
1163+
+ "(3, 'v3', TIMESTAMP '2024-12-27 12:00:00.123');")
11661164
.await();
1167-
11681165
CloseableIterator<Row> rowIter =
11691166
tEnv.executeSql("select * from merge_engine_with_version").collect();
1170-
1171-
// id=1 not update, but id=3 updated
11721167
List<String> expectedRows =
11731168
Arrays.asList(
11741169
"+I[1, v1, 2024-12-27T12:00:00.123]",
11751170
"+I[2, v2, 2024-12-27T12:00:00.123]",
1176-
"+I[3, v3, 2024-12-27T12:00:00.123]",
1171+
"+I[3, v3, 2024-12-27T12:00:00.123]");
1172+
assertResultsIgnoreOrder(rowIter, expectedRows, false);
1173+
1174+
// insert again. id=1 not update, but id=3 updated
1175+
tEnv.executeSql(
1176+
"INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
1177+
+ "(1, 'v11', TIMESTAMP '2024-12-27 11:59:59.123'), "
1178+
+ "(3, 'v33', TIMESTAMP '2024-12-27 12:00:00.123');");
1179+
expectedRows =
1180+
Arrays.asList(
11771181
"-U[3, v3, 2024-12-27T12:00:00.123]",
11781182
"+U[3, v33, 2024-12-27T12:00:00.123]");
1179-
11801183
assertResultsIgnoreOrder(rowIter, expectedRows, true);
11811184
}
11821185

@@ -1185,30 +1188,36 @@ void testVersionMergeEngineWithTypeTimestampLTZ9() throws Exception {
11851188

11861189
tEnv.getConfig().set("table.local-time-zone", "UTC");
11871190
tEnv.executeSql(
1188-
"create table merge_engine_with_version (a int not null primary key not enforced,"
1189-
+ " b string, ts TIMESTAMP(9) WITH LOCAL TIME ZONE ) with("
1190-
+ "'table.merge-engine' = 'versioned',"
1191-
+ "'table.merge-engine.versioned.ver-column' = 'ts')");
1191+
"create table merge_engine_with_version (a int not null primary key not enforced,"
1192+
+ " b string, ts TIMESTAMP(9) WITH LOCAL TIME ZONE ) with("
1193+
+ "'table.merge-engine' = 'versioned',"
1194+
+ "'table.merge-engine.versioned.ver-column' = 'ts')")
1195+
.await();
11921196

11931197
// insert once
11941198
tEnv.executeSql(
11951199
"INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
11961200
+ "(1, 'v1', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
11971201
+ "(2, 'v2', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
1198-
+ "(1, 'v11', CAST(TIMESTAMP '2024-12-27 12:00:00.123456788' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
1199-
+ "(3, 'v3', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
1200-
+ "(3, 'v33', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
1202+
+ "(3, 'v3', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
12011203
.await();
1202-
12031204
CloseableIterator<Row> rowIter =
12041205
tEnv.executeSql("select * from merge_engine_with_version").collect();
1205-
1206-
// id=1 not update, but id=3 updated
12071206
List<String> expectedRows =
12081207
Arrays.asList(
12091208
"+I[1, v1, 2024-12-27T12:00:00.123456789Z]",
12101209
"+I[2, v2, 2024-12-27T12:00:00.123456789Z]",
1211-
"+I[3, v3, 2024-12-27T12:00:00.123456789Z]",
1210+
"+I[3, v3, 2024-12-27T12:00:00.123456789Z]");
1211+
assertResultsIgnoreOrder(rowIter, expectedRows, false);
1212+
1213+
// insert again. id=1 not update, but id=3 updated
1214+
tEnv.executeSql(
1215+
"INSERT INTO merge_engine_with_version (a, b, ts) VALUES "
1216+
+ "(1, 'v11', CAST(TIMESTAMP '2024-12-27 12:00:00.123456788' AS TIMESTAMP(9) WITH LOCAL TIME ZONE)), "
1217+
+ "(3, 'v33', CAST(TIMESTAMP '2024-12-27 12:00:00.123456789' AS TIMESTAMP(9) WITH LOCAL TIME ZONE));")
1218+
.await();
1219+
expectedRows =
1220+
Arrays.asList(
12121221
"-U[3, v3, 2024-12-27T12:00:00.123456789Z]",
12131222
"+U[3, v33, 2024-12-27T12:00:00.123456789Z]");
12141223

0 commit comments

Comments
 (0)