Commit 3736f3b

platinumhamburg authored and vamossagar12 committed
[flink] Revert unstable test case FlinkTableSinkITCase.testWalModeWithDefaultMergeEngineAndAggregation (#2215)
1 parent 55e7fb4 commit 3736f3b

1 file changed (+0, −70 lines)

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 0 additions & 70 deletions
@@ -1384,74 +1384,4 @@ void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws Exception {
             assertResultsIgnoreOrder(rowIter, expectedRows, true);
         }
     }
-
-    @Test
-    void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
-        String tableName = "wal_mode_pk_table";
-        // Create a table with WAL mode and default merge engine
-        tEnv.executeSql(
-                String.format(
-                        "create table %s ("
-                                + " id int not null,"
-                                + " category string,"
-                                + " amount bigint,"
-                                + " primary key (id) not enforced"
-                                + ") with ('table.changelog.image' = 'wal')",
-                        tableName));
-
-        // Insert initial data
-        tEnv.executeSql(
-                        String.format(
-                                "INSERT INTO %s VALUES "
-                                        + "(1, 'A', 100), "
-                                        + "(2, 'B', 200), "
-                                        + "(3, 'A', 150), "
-                                        + "(4, 'B', 250)",
-                                tableName))
-                .await();
-
-        // Use batch mode to update and delete records
-        tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE id = 1").await();
-        tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE id = 3").await();
-        tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id = 4").await();
-
-        // Do aggregation on the table and verify ChangelogNormalize node is generated
-        String aggQuery =
-                String.format(
-                        "SELECT category, SUM(amount) as total_amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
-                        tableName);
-
-        // Explain the aggregation query to check for ChangelogNormalize
-        String aggPlan = tEnv.explainSql(aggQuery);
-        // ChangelogNormalize should be present to normalize the changelog for aggregation
-        // In Flink, when the source produces changelog with primary key semantics (I, UA, D),
-        // a ChangelogNormalize operator is inserted before aggregation
-        assertThat(aggPlan).contains("ChangelogNormalize");
-
-        // Execute the aggregation and verify the result
-        CloseableIterator<Row> aggIter = tEnv.executeSql(aggQuery).collect();
-
-        // Expected aggregation results:
-        // Category A: 120 (id=1) + 180 (id=3) = 300
-        // Category B: 200 (id=2) = 200 (id=4 was deleted)
-        List<String> expectedAggResults =
-                Arrays.asList(
-                        "+I[A, 100]",
-                        "-U[A, 100]",
-                        "+U[A, 250]",
-                        "-U[A, 250]",
-                        "+U[A, 150]",
-                        "-U[A, 150]",
-                        "+U[A, 270]",
-                        "-U[A, 270]",
-                        "+U[A, 120]",
-                        "-U[A, 120]",
-                        "+U[A, 300]",
-                        "+I[B, 250]",
-                        "-D[B, 250]",
-                        "+I[B, 200]");
-
-        // Collect results with timeout
-        assertResultsIgnoreOrder(aggIter, expectedAggResults, true);
-    }
 }
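
Why the expected changelog made this test order-sensitive (this is a reading of the hard-coded expectedAggResults list, not something the commit itself states): ChangelogNormalize keeps the latest row per primary key and turns the upsert stream into a retract stream, so each update reaches the SUM aggregation as a -U(old)/+U(new) pair, and the expected list pins one specific arrival order of those records. A worked trace under that assumed order, with source records written as (id, category, amount), reproduces the list exactly:

    +I(1, A, 100)  ->  +I[A, 100]
    +I(3, A, 150)  ->  -U[A, 100], +U[A, 250]
    -U(1, A, 100)  ->  -U[A, 250], +U[A, 150]    retract old amount of id=1
    +U(1, A, 120)  ->  -U[A, 150], +U[A, 270]    apply new amount of id=1
    -U(3, A, 150)  ->  -U[A, 270], +U[A, 120]    retract old amount of id=3
    +U(3, A, 180)  ->  -U[A, 120], +U[A, 300]    apply new amount of id=3
    +I(4, B, 250)  ->  +I[B, 250]
    -D(4, B, 250)  ->  -D[B, 250]                group B is now empty
    +I(2, B, 200)  ->  +I[B, 200]

Note what this trace assumes: both change records for id=1 are consumed before those for id=3, and the insert of id=2 reaches the aggregation only after id=4 has been inserted and deleted. If ordering is guaranteed only per bucket, as is typical for log-based storage, rows with different keys can arrive in other interleavings, and each interleaving produces a different but equally correct -U/+U sequence. assertResultsIgnoreOrder cannot absorb that variation because the intermediate sums themselves differ, which is presumably why the test was flagged as unstable and reverted.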
