Skip to content

Commit cfb986b

Browse files
[tests] Add integration test for WAL image mode change log of Primary Key (#2217)
1 parent 1a3c51e commit cfb986b

File tree

2 files changed

+108
-7
lines changed

2 files changed

+108
-7
lines changed

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlinkTableSinkITCase.java

Lines changed: 73 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -66,6 +66,7 @@
6666
import java.util.stream.Stream;
6767

6868
import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
69+
import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertQueryResultExactOrder;
6970
import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
7071
import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
7172
import static org.apache.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
@@ -1384,4 +1385,76 @@ void testDeleteBehaviorForInsertStmt(String deleteBehavior) throws Exception {
13841385
assertResultsIgnoreOrder(rowIter, expectedRows, true);
13851386
}
13861387
}
1388+
1389+
@Test
1390+
void testWalModeWithDefaultMergeEngineAndAggregation() throws Exception {
1391+
// use single parallelism to make result ordering stable
1392+
tEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
1393+
1394+
String tableName = "wal_mode_pk_table";
1395+
// Create a table with WAL mode and default merge engine
1396+
tEnv.executeSql(
1397+
String.format(
1398+
"create table %s ("
1399+
+ " id int not null,"
1400+
+ " category string,"
1401+
+ " amount bigint,"
1402+
+ " primary key (id) not enforced"
1403+
+ ") with ('table.changelog.image' = 'wal')",
1404+
tableName));
1405+
1406+
// Insert initial data
1407+
tEnv.executeSql(
1408+
String.format(
1409+
"INSERT INTO %s VALUES "
1410+
+ "(1, 'A', 100), "
1411+
+ "(2, 'B', 200), "
1412+
+ "(3, 'A', 150), "
1413+
+ "(4, 'B', 250)",
1414+
tableName))
1415+
.await();
1416+
1417+
// Use batch mode to update and delete records
1418+
tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 120 WHERE id = 1").await();
1419+
tBatchEnv.executeSql("UPDATE " + tableName + " SET amount = 180 WHERE id = 3").await();
1420+
tBatchEnv.executeSql("DELETE FROM " + tableName + " WHERE id = 4").await();
1421+
1422+
// Do aggregation on the table and verify ChangelogNormalize node is generated
1423+
String aggQuery =
1424+
String.format(
1425+
"SELECT category, SUM(amount) as total_amount FROM %s /*+ OPTIONS('scan.startup.mode' = 'earliest') */ GROUP BY category",
1426+
tableName);
1427+
1428+
// Explain the aggregation query to check for ChangelogNormalize
1429+
String aggPlan = tEnv.explainSql(aggQuery);
1430+
// ChangelogNormalize should be present to normalize the changelog for aggregation
1431+
// In Flink, when the source produces changelog with primary key semantics (I, UA, D),
1432+
// a ChangelogNormalize operator is inserted before aggregation
1433+
assertThat(aggPlan).contains("ChangelogNormalize");
1434+
1435+
// Expected aggregation results:
1436+
// Category A: 120 (id=1) + 180 (id=3) = 300
1437+
// Category B: 200 (id=2) = 200 (id=4 was deleted)
1438+
List<String> expectedAggResults =
1439+
Arrays.asList(
1440+
"+I[A, 100]",
1441+
"+I[B, 200]",
1442+
"-U[A, 100]",
1443+
"+U[A, 250]",
1444+
"-U[B, 200]",
1445+
"+U[B, 450]",
1446+
"-U[A, 250]",
1447+
"+U[A, 150]",
1448+
"-U[A, 150]",
1449+
"+U[A, 270]",
1450+
"-U[A, 270]",
1451+
"+U[A, 120]",
1452+
"-U[A, 120]",
1453+
"+U[A, 300]",
1454+
"-U[B, 450]",
1455+
"+U[B, 200]");
1456+
1457+
// Collect results with timeout
1458+
assertQueryResultExactOrder(tEnv, aggQuery, expectedAggResults);
1459+
}
13871460
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/testutils/FlinkRowAssertionsUtils.java

Lines changed: 35 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,8 @@
2525
import java.util.ArrayList;
2626
import java.util.List;
2727
import java.util.concurrent.CompletableFuture;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
2830
import java.util.concurrent.TimeUnit;
2931
import java.util.concurrent.TimeoutException;
3032
import java.util.stream.Collectors;
@@ -34,6 +36,15 @@
3436
/** Utility class providing assertion methods for Flink test results. */
3537
public class FlinkRowAssertionsUtils {
3638

39+
// Use a daemon thread pool for hasNext checks to avoid blocking test shutdown
40+
private static final ExecutorService EXECUTOR =
41+
Executors.newCachedThreadPool(
42+
r -> {
43+
Thread t = new Thread(r, "FlinkRowAssertionsUtils-hasNext-checker");
44+
t.setDaemon(true);
45+
return t;
46+
});
47+
3748
private FlinkRowAssertionsUtils() {}
3849

3950
public static void assertRowResultsIgnoreOrder(
@@ -109,11 +120,23 @@ protected static List<String> collectRowsWithTimeout(
109120
long deadlineTimeMs = startTimeMs + maxWaitTime.toMillis();
110121
try {
111122
for (int i = 0; i < expectedCount; i++) {
123+
long remainingTimeMs = deadlineTimeMs - System.currentTimeMillis();
124+
if (remainingTimeMs <= 0) {
125+
// Deadline exceeded, throw timeout error immediately
126+
throw timeoutError(
127+
System.currentTimeMillis() - startTimeMs,
128+
expectedCount,
129+
actual.size(),
130+
actual);
131+
}
112132
// Wait for next record with timeout
113-
if (!waitForNextWithTimeout(
114-
iterator, Math.max(deadlineTimeMs - System.currentTimeMillis(), 1_000))) {
133+
if (!waitForNextWithTimeout(iterator, remainingTimeMs)) {
134+
115135
throw timeoutError(
116-
System.currentTimeMillis() - startTimeMs, expectedCount, actual.size());
136+
System.currentTimeMillis() - startTimeMs,
137+
expectedCount,
138+
actual.size(),
139+
actual);
117140
}
118141
if (iterator.hasNext()) {
119142
actual.add(iterator.next().toString());
@@ -152,21 +175,26 @@ protected static List<String> collectRowsWithTimeout(
152175
}
153176

154177
private static AssertionError timeoutError(
155-
long elapsedTime, int expectedCount, int actualCount) {
178+
long elapsedTime, int expectedCount, int actualCount, List<String> actualRecords) {
156179
return new AssertionError(
157180
String.format(
158181
"Timeout after waiting %d ms for Flink job results. "
159182
+ "Expected %d records but only received %d. "
160-
+ "This might indicate a job hang or insufficient data generation.",
161-
elapsedTime, expectedCount, actualCount));
183+
+ "This might indicate a job hang or insufficient data generation.%n"
184+
+ "Actual records received: %s",
185+
elapsedTime, expectedCount, actualCount, actualRecords));
162186
}
163187

164188
private static boolean waitForNextWithTimeout(
165189
CloseableIterator<Row> iterator, long maxWaitTime) {
166-
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(iterator::hasNext);
190+
CompletableFuture<Boolean> future =
191+
CompletableFuture.supplyAsync(iterator::hasNext, EXECUTOR);
167192
try {
168193
return future.get(maxWaitTime, TimeUnit.MILLISECONDS);
169194
} catch (TimeoutException e) {
195+
// Timeout occurred - cancel the future and return false
196+
// Note: The thread may still be blocked in hasNext(), but as a daemon thread,
197+
// it won't prevent JVM shutdown
170198
future.cancel(true);
171199
return false;
172200
} catch (Exception e) {

0 commit comments

Comments
 (0)