Skip to content

Commit 6f5ceac

Browse files
committed
update
1 parent b1ae2e3 commit 6f5ceac

3 files changed

Lines changed: 131 additions & 15 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
* {@link JdbcConnection} connection extension used for connecting to Postgres instances.
5757
*
5858
* @author Horia Chiorean
59-
* <p>Copied from Debezium 1.9.8-Final with three additional methods:
59+
* <p>Copied from Debezium 1.9.8-Final with the following modifications:
6060
* <ul>
6161
* <li>Constructor PostgresConnection( Configuration config, PostgresValueConverterBuilder
6262
* valueConverterBuilder, ConnectionFactory factory) to allow passing a custom
@@ -66,6 +66,13 @@
6666
* <li>override isTableUniqueIndexIncluded: Copied DBZ-5398 from Debezium 2.0.0.Final to fix
6767
* https://github.com/ververica/flink-cdc-connectors/issues/2710. Remove this comment
6868
* after bumping debezium version to 2.0.0.Final.
69+
* <li>FLINK-38965: Modified doReadTableColumn to filter out columns from other tables that
70+
* might be returned due to PostgreSQL LIKE wildcard matching. The underscore '_' matches
71+
* any single character, and '%' matches any sequence of characters. For example, when
72+
* querying table 'user_sink', the LIKE pattern may also match 'userbsink' (due to '_');
73+
* when querying table 'user%data' (where % is a literal character in the table name), the
74+
* LIKE pattern may also match 'user_test_data' (due to '%'). See also:
75+
* https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java#L1327
6976
* </ul>
7077
*/
7178
public class PostgresConnection extends JdbcConnection {
@@ -697,9 +704,11 @@ private Optional<ColumnEditor> doReadTableColumn(
697704
ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
698705
throws SQLException {
699706
// FLINK-38965: Filter out columns from other tables that might be returned due to
700-
// PostgreSQL LIKE wildcard matching (underscore '_' matches any single character).
701-
// For example, when querying 'ndi_pg_user_sink_1', the LIKE pattern may also match
702-
// 'ndi_pg_userbsink_1' because '_' acts as a wildcard.
707+
// PostgreSQL LIKE wildcard matching. The underscore '_' matches any single character,
708+
// and '%' matches any sequence of characters. For example:
709+
// - When querying 'user_sink', the pattern may also match 'userbsink' (due to '_')
710+
// - When querying 'user%data' (where % is literal), it may match 'user_test_data' (due to
711+
// '%')
703712
final String resultTableName = columnMetadata.getString(3);
704713
if (!tableId.table().equals(resultTableName)) {
705714
return Optional.empty();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/SimilarTableNamesITCase.java

Lines changed: 81 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,18 @@
4848

4949
/**
5050
* IT tests for FLINK-38965: Fix PostgreSQL CDC connector issue when table names contain underscore
51-
* that matches other tables due to LIKE wildcard behavior.
51+
* or percent characters that match other tables due to LIKE wildcard behavior.
52+
*
53+
* <p>PostgreSQL LIKE wildcards:
54+
*
55+
* <ul>
56+
* <li>'_' (underscore) matches any single character. E.g., 'user_sink' matches 'userbsink'
57+
* <li>'%' (percent) matches any sequence of characters. E.g., 'user%data' matches
58+
* 'user_test_data'
59+
* </ul>
60+
*
61+
* <p>When table names contain these special characters, JDBC metadata queries using LIKE may return
62+
* columns from unintended tables.
5263
*/
5364
@Timeout(value = 300, unit = TimeUnit.SECONDS)
5465
class SimilarTableNamesITCase extends PostgresTestBase {
@@ -135,17 +146,64 @@ void testReadTableWithSimilarNameUnderscore() throws Exception {
135146
}
136147

137148
/**
138-
* Test reading from both similar-named tables to verify they can be captured independently
149+
* Test that when capturing CDC events for table 'user%data' (which contains a literal '%'
150+
* character), we don't accidentally capture events from 'user_test_data' which would match due
151+
* to PostgreSQL's LIKE wildcard behavior (percent '%' matches any sequence of characters).
152+
*
153+
* <p>When querying for table 'user%data', the LIKE pattern may also match 'user_test_data'
154+
* because '%' matches the '_test_' sequence.
155+
*/
156+
@Test
157+
void testReadTableWithSimilarNamePercent() throws Exception {
158+
StreamTableEnvironment tEnv = createTableEnv();
159+
160+
// Only capture events from 'user%data' table (note: % is a literal character in table name)
161+
tEnv.executeSql(createSourceDDL("target_table", "user%data"));
162+
163+
TableResult result = tEnv.executeSql("SELECT * FROM target_table");
164+
CloseableIterator<Row> iterator = result.collect();
165+
166+
try {
167+
// Verify snapshot data (3 rows from user%data only)
168+
List<String> expectedSnapshotData =
169+
Arrays.asList(
170+
"+I[201, percent_1, Tianjin]",
171+
"+I[202, percent_2, Dalian]",
172+
"+I[203, percent_3, Qingdao]");
173+
assertRowsEquals(collectRows(iterator, 3), expectedSnapshotData);
174+
175+
// Perform DML on user_test_data table - these should NOT be captured
176+
executeDmlOperationsOnTestDataTable();
177+
178+
// Perform DML on target table - these SHOULD be captured
179+
try (Connection conn =
180+
getJdbcConnection(
181+
POSTGRES_CONTAINER, similarNamesDatabase.getDatabaseName());
182+
Statement stmt = conn.createStatement()) {
183+
stmt.execute(
184+
"INSERT INTO similar_names.\"user%data\" VALUES (204, 'percent_4', 'Xiamen')");
185+
}
186+
187+
// Should only see the insert from target table, not user_test_data table
188+
List<String> expectedStreamData = Arrays.asList("+I[204, percent_4, Xiamen]");
189+
assertRowsEquals(collectRows(iterator, 1), expectedStreamData);
190+
} finally {
191+
closeResourcesAndWaitForJobTermination(iterator, result);
192+
}
193+
}
194+
195+
/**
196+
* Test reading all tables matching 'ndi_pg_user.*' to verify they are captured independently
139197
* without interference.
140198
*/
141199
@Test
142-
void testReadBothSimilarNamedTables() throws Exception {
200+
void testReadAllSimilarNamedTables() throws Exception {
143201
StreamTableEnvironment tEnv = createTableEnv();
144202

145-
// Capture events from both tables using regex pattern
146-
tEnv.executeSql(createSourceDDL("all_tables", "ndi_pg_user.*"));
203+
// Capture events from underscore-related tables using regex pattern
204+
tEnv.executeSql(createSourceDDL("underscore_tables", "ndi_pg_user.*"));
147205

148-
TableResult result = tEnv.executeSql("SELECT * FROM all_tables");
206+
TableResult result = tEnv.executeSql("SELECT * FROM underscore_tables");
149207
CloseableIterator<Row> iterator = result.collect();
150208

151209
try {
@@ -229,7 +287,7 @@ private void assertRowsEquals(List<String> actual, List<String> expected) {
229287
expected.stream().sorted().collect(Collectors.toList()));
230288
}
231289

232-
/** Executes DML operations on both tables for streaming phase testing. */
290+
/** Executes DML operations on target and similar-named tables for streaming phase testing. */
233291
private void executeDmlOperations() throws Exception {
234292
try (Connection conn =
235293
getJdbcConnection(
@@ -248,6 +306,22 @@ private void executeDmlOperations() throws Exception {
248306
}
249307
}
250308

309+
/** Executes DML operations on user_test_data table for '%' wildcard testing. */
310+
private void executeDmlOperationsOnTestDataTable() throws Exception {
311+
try (Connection conn =
312+
getJdbcConnection(
313+
POSTGRES_CONTAINER, similarNamesDatabase.getDatabaseName());
314+
Statement stmt = conn.createStatement()) {
315+
// Insert into user_test_data table (should NOT be captured when only listening to
316+
// user%data)
317+
stmt.execute(
318+
"INSERT INTO similar_names.user_test_data VALUES (304, 'test_4', 'Harbin')");
319+
// Update user_test_data table
320+
stmt.execute(
321+
"UPDATE similar_names.user_test_data SET address = 'Changchun' WHERE id = 301");
322+
}
323+
}
324+
251325
/** Properly closes resources and waits for job termination. */
252326
private void closeResourcesAndWaitForJobTermination(
253327
CloseableIterator<Row> iterator, TableResult result) throws Exception {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/similar_names.sql

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
-- See the License for the specific language governing permissions and
1414
-- limitations under the License.
1515

16-
-- FLINK-38965: Test case for similar table names with underscore
17-
-- This tests the fix for PostgreSQL LIKE wildcard matching issue
18-
-- where underscore '_' matches any single character, causing
19-
-- 'ndi_pg_user_sink_1' to also match 'ndi_pg_userbsink_1'
16+
-- FLINK-38965: Test case for similar table names with underscore or percent characters
17+
-- This tests the fix for PostgreSQL LIKE wildcard matching issue:
18+
-- - underscore '_' matches any single character
19+
-- - percent '%' matches any sequence of characters
20+
-- For example, 'user_sink' may match 'userbsink' (due to '_')
21+
-- and 'user%data' may match 'user_test_data' (due to '%')
2022

2123
DROP SCHEMA IF EXISTS similar_names CASCADE;
2224
CREATE SCHEMA similar_names;
@@ -49,3 +51,34 @@ INSERT INTO ndi_pg_userbsink_1
4951
VALUES (101, 'userb_1', 'Guangzhou'),
5052
(102, 'userb_2', 'Shenzhen'),
5153
(103, 'userb_3', 'Chengdu');
54+
55+
-- Table 3: user%data (tests '%' wildcard scenario)
56+
-- The table name contains a '%' character, which acts as a wildcard in a LIKE pattern.
57+
-- When querying for table 'user%data', the LIKE pattern may also match
58+
-- 'user_test_data' because '%' matches any sequence of characters.
59+
CREATE TABLE "user%data" (
60+
id INTEGER NOT NULL PRIMARY KEY,
61+
name VARCHAR(255) NOT NULL,
62+
address VARCHAR(1024)
63+
);
64+
ALTER TABLE "user%data" REPLICA IDENTITY FULL;
65+
66+
INSERT INTO "user%data"
67+
VALUES (201, 'percent_1', 'Tianjin'),
68+
(202, 'percent_2', 'Dalian'),
69+
(203, 'percent_3', 'Qingdao');
70+
71+
-- Table 4: user_test_data (similar to 'user%data' when % is treated as wildcard)
72+
-- This table name would match the LIKE pattern for 'user%data'
73+
-- because '%' matches '_test_' sequence.
74+
CREATE TABLE user_test_data (
75+
id INTEGER NOT NULL PRIMARY KEY,
76+
name VARCHAR(255) NOT NULL,
77+
address VARCHAR(1024)
78+
);
79+
ALTER TABLE user_test_data REPLICA IDENTITY FULL;
80+
81+
INSERT INTO user_test_data
82+
VALUES (301, 'test_1', 'Harbin'),
83+
(302, 'test_2', 'Changchun'),
84+
(303, 'test_3', 'Shenyang');

0 commit comments

Comments
 (0)