Skip to content

Commit 723f1a3

Browse files
committed
fix test class
1 parent b548110 commit 723f1a3

2 files changed

Lines changed: 40 additions & 76 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/BinlogOnlyNewlyAddedTableITCase.java

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.junit.jupiter.api.AfterEach;
4545
import org.junit.jupiter.api.BeforeEach;
4646
import org.junit.jupiter.api.Test;
47-
import org.junit.jupiter.api.Timeout;
4847

4948
import java.sql.SQLException;
5049
import java.time.ZoneId;
@@ -57,19 +56,17 @@
5756
import java.util.Map;
5857
import java.util.Properties;
5958
import java.util.UUID;
60-
import java.util.concurrent.TimeUnit;
6159
import java.util.stream.Collectors;
6260

6361
import static java.lang.String.format;
6462

6563
/**
6664
* IT tests for binlog-only newly added table capture functionality using {@link
67-
* MySqlSource.scanBinlogNewlyAddedTableEnabled}.
65+
* MySqlSourceBuilder#scanBinlogNewlyAddedTableEnabled(boolean)}.
6866
*
6967
* <p>This test validates that tables matching the configured pattern are automatically captured
7068
* when they are created during binlog reading phase, without triggering snapshot phase.
7169
*/
72-
@Timeout(value = 300, unit = TimeUnit.SECONDS)
7370
class BinlogOnlyNewlyAddedTableITCase extends MySqlSourceTestBase {
7471

7572
private final UniqueDatabase testDatabase =
@@ -78,18 +75,6 @@ class BinlogOnlyNewlyAddedTableITCase extends MySqlSourceTestBase {
7875
@BeforeEach
7976
public void before() throws SQLException {
8077
testDatabase.createAndInitialize();
81-
82-
try (MySqlConnection connection = getConnection()) {
83-
connection.setAutoCommit(false);
84-
// Create an initial table to ensure binlog is active
85-
String tableId = testDatabase.getDatabaseName() + ".initial_table";
86-
connection.execute(
87-
format(
88-
"CREATE TABLE %s (id BIGINT PRIMARY KEY, value VARCHAR(100));",
89-
tableId));
90-
connection.execute(format("INSERT INTO %s VALUES (1, 'initial');", tableId));
91-
connection.commit();
92-
}
9378
}
9479

9580
@AfterEach
@@ -110,27 +95,45 @@ void testBinlogOnlyCaptureMultipleNewTables() throws Exception {
11095
@Test
11196
void testBinlogOnlyCaptureWithPatternMatching() throws Exception {
11297
// Test with wildcard pattern: capture tables like user_*
98+
// Flink CDC style: unescaped '.' is db/table separator, '\.' is regex any-char wildcard
11399
testBinlogOnlyCaptureWithPattern(
114-
testDatabase.getDatabaseName() + ".user_.*",
100+
testDatabase.getDatabaseName() + ".user_\\.*",
115101
"user_profiles",
116102
"user_settings",
117103
"user_logs");
118104
}
119105

120106
@Test
121107
void testBinlogOnlyCaptureWithDatabasePattern() throws Exception {
122-
// Test with database.* pattern
108+
// Test with database.* pattern (all tables in database)
109+
// Flink CDC style: unescaped '.' is db/table separator, '\.' is regex any-char wildcard
123110
testBinlogOnlyCaptureWithPattern(
124-
testDatabase.getDatabaseName() + ".*", "product_inventory", "product_catalog");
111+
testDatabase.getDatabaseName() + ".\\.*", "product_inventory", "product_catalog");
125112
}
126113

127114
private void testBinlogOnlyCapture(String... tableNames) throws Exception {
128-
String pattern = testDatabase.getDatabaseName() + ".(" + String.join("|", tableNames) + ")";
115+
String pattern =
116+
testDatabase.getDatabaseName() + "\\.(" + String.join("|", tableNames) + ")";
129117
testBinlogOnlyCaptureWithPattern(pattern, tableNames);
130118
}
131119

132120
private void testBinlogOnlyCaptureWithPattern(String tablePattern, String... tableNames)
133121
throws Exception {
122+
// Pre-create tables before starting source to satisfy startup validation.
123+
// With StartupOptions.latest(), no snapshot is taken - only binlog events after source
124+
// starts are captured.
125+
try (MySqlConnection preConnection = getConnection()) {
126+
preConnection.setAutoCommit(false);
127+
for (String tableName : tableNames) {
128+
String tableId = testDatabase.getDatabaseName() + "." + tableName;
129+
preConnection.execute(
130+
format(
131+
"CREATE TABLE %s (id BIGINT PRIMARY KEY, name VARCHAR(100), quantity INT);",
132+
tableId));
133+
}
134+
preConnection.commit();
135+
}
136+
134137
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
135138
env.setParallelism(1);
136139
env.enableCheckpointing(200L);
@@ -190,20 +193,14 @@ private void testBinlogOnlyCaptureWithPattern(String tablePattern, String... tab
190193
// Wait for job to start reading binlog
191194
Thread.sleep(2000);
192195

193-
// Create new tables and insert data
196+
// Insert/update/delete data - these are captured as binlog events
194197
List<String> expectedResults = new ArrayList<>();
195198
try (MySqlConnection connection = getConnection()) {
196199
connection.setAutoCommit(false);
197200

198201
for (String tableName : tableNames) {
199202
String tableId = testDatabase.getDatabaseName() + "." + tableName;
200203

201-
// Create table
202-
connection.execute(
203-
format(
204-
"CREATE TABLE %s (id BIGINT PRIMARY KEY, name VARCHAR(100), quantity INT);",
205-
tableId));
206-
207204
// Insert data - these should be captured as binlog events
208205
connection.execute(
209206
format(
@@ -241,6 +238,17 @@ private void testBinlogOnlyCaptureWithPattern(String tablePattern, String... tab
241238

242239
@Test
243240
void testBinlogOnlyDoesNotCaptureNonMatchingTables() throws Exception {
241+
// Pre-create matching table before starting source (required for startup validation)
242+
String matchingTable = testDatabase.getDatabaseName() + ".temp_test";
243+
try (MySqlConnection preConnection = getConnection()) {
244+
preConnection.setAutoCommit(false);
245+
preConnection.execute(
246+
format(
247+
"CREATE TABLE %s (id BIGINT PRIMARY KEY, value VARCHAR(100));",
248+
matchingTable));
249+
preConnection.commit();
250+
}
251+
244252
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
245253
env.setParallelism(1);
246254
env.enableCheckpointing(200L);
@@ -272,7 +280,8 @@ void testBinlogOnlyDoesNotCaptureNonMatchingTables() throws Exception {
272280
.build();
273281

274282
// Only capture tables matching temp_*
275-
String tablePattern = testDatabase.getDatabaseName() + ".temp_.*";
283+
// Flink CDC style: unescaped '.' is db/table separator, '\.' is regex any-char wildcard
284+
String tablePattern = testDatabase.getDatabaseName() + ".temp_\\.*";
276285

277286
MySqlSource<RowData> mySqlSource =
278287
MySqlSource.<RowData>builder()
@@ -302,15 +311,10 @@ void testBinlogOnlyDoesNotCaptureNonMatchingTables() throws Exception {
302311
try (MySqlConnection connection = getConnection()) {
303312
connection.setAutoCommit(false);
304313

305-
// Create a matching table
306-
String matchingTable = testDatabase.getDatabaseName() + ".temp_test";
307-
connection.execute(
308-
format(
309-
"CREATE TABLE %s (id BIGINT PRIMARY KEY, value VARCHAR(100));",
310-
matchingTable));
314+
// Insert into matching table (already exists)
311315
connection.execute(format("INSERT INTO %s VALUES (1, 'matched');", matchingTable));
312316

313-
// Create a non-matching table
317+
// Create and insert into non-matching table (will not be captured)
314318
String nonMatchingTable = testDatabase.getDatabaseName() + ".permanent_test";
315319
connection.execute(
316320
format(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/binlog_test.sql

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -35,44 +35,4 @@ CREATE TABLE initial_table (
3535
value VARCHAR(100)
3636
);
3737

38-
INSERT INTO initial_table VALUES (1, 'initial');
39-
40-
-- Example schema for dynamically created product tables (products_2024, etc.)
41-
-- These tables are created during test execution, not from this SQL file
42-
-- Schema reference:
43-
-- CREATE TABLE products_YYYY (
44-
-- id BIGINT PRIMARY KEY,
45-
-- name VARCHAR(100),
46-
-- quantity INT
47-
-- );
48-
49-
-- Example schema for dynamically created order tables (orders_2024, orders_2025, etc.)
50-
-- Schema reference:
51-
-- CREATE TABLE orders_YYYY (
52-
-- id BIGINT PRIMARY KEY,
53-
-- name VARCHAR(100),
54-
-- quantity INT
55-
-- );
56-
57-
-- Example schema for dynamically created user tables (user_profiles, user_settings, user_logs, etc.)
58-
-- Schema reference:
59-
-- CREATE TABLE user_* (
60-
-- id BIGINT PRIMARY KEY,
61-
-- name VARCHAR(100),
62-
-- quantity INT
63-
-- );
64-
65-
-- Example schema for dynamically created temp tables (temp_test, etc.)
66-
-- Schema reference:
67-
-- CREATE TABLE temp_* (
68-
-- id BIGINT PRIMARY KEY,
69-
-- value VARCHAR(100)
70-
-- );
71-
72-
-- Example schema for non-matching tables (permanent_test, etc.)
73-
-- These should not be captured when pattern doesn't match
74-
-- Schema reference:
75-
-- CREATE TABLE permanent_* (
76-
-- id BIGINT PRIMARY KEY,
77-
-- value VARCHAR(100)
78-
-- );
38+
INSERT INTO initial_table VALUES (1, 'initial');

0 commit comments

Comments
 (0)