Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1384,74 +1384,4 @@ void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws Exception {
assertResultsIgnoreOrder(rowIter, expectedRows, true);
}
}

@Test
void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
String tableName = "wal_mode_pk_table";
// Create a table with WAL mode and default merge engine
tEnv.executeSql(
String.format(
"create table %s ("
+ " id int not null,"
+ " category string,"
+ " amount bigint,"
+ " primary key (id) not enforced"
+ ") with ('table.changelog.image' = 'wal')",
tableName));

// Insert initial data
tEnv.executeSql(
String.format(
"INSERT INTO %s VALUES "
+ "(1, 'A', 100), "
+ "(2, 'B', 200), "
+ "(3, 'A', 150), "
+ "(4, 'B', 250)",
tableName))
.await();

// Use batch mode to update and delete records
tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE id = 1").await();
tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE id = 3").await();
tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id = 4").await();

// Do aggregation on the table and verify ChangelogNormalize node is generated
String aggQuery =
String.format(
"SELECT category, SUM(amount) as total_amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
tableName);

// Explain the aggregation query to check for ChangelogNormalize
String aggPlan = tEnv.explainSql(aggQuery);
// ChangelogNormalize should be present to normalize the changelog for aggregation
// In Flink, when the source produces changelog with primary key semantics (I, UA, D),
// a ChangelogNormalize operator is inserted before aggregation
assertThat(aggPlan).contains("ChangelogNormalize");

// Execute the aggregation and verify the result
CloseableIterator<Row> aggIter = tEnv.executeSql(aggQuery).collect();

// Expected aggregation results:
// Category A: 120 (id=1) + 180 (id=3) = 300
// Category B: 200 (id=2) = 200 (id=4 was deleted)
List<String> expectedAggResults =
Arrays.asList(
"+I[A, 100]",
"-U[A, 100]",
"+U[A, 250]",
"-U[A, 250]",
"+U[A, 150]",
"-U[A, 150]",
"+U[A, 270]",
"-U[A, 270]",
"+U[A, 120]",
"-U[A, 120]",
"+U[A, 300]",
"+I[B, 250]",
"-D[B, 250]",
"+I[B, 200]");

// Collect results with timeout
assertResultsIgnoreOrder(aggIter, expectedAggResults, true);
}
}