Skip to content

Commit 3a2a3bf

Browse files
authored
[FLINK-38965][postgres] Fix LIKE wildcard matching issue for similar table names in PostgreSQL CDC connector (#4239)
1 parent f78e5ee commit 3a2a3bf

3 files changed

Lines changed: 443 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/connection/PostgresConnection.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
* {@link JdbcConnection} connection extension used for connecting to Postgres instances.
5757
*
5858
* @author Horia Chiorean
59-
* <p>Copied from Debezium 1.9.8-Final with three additional methods:
59+
* <p>Copied from Debezium 1.9.8-Final with the following modifications:
6060
* <ul>
6161
* <li>Constructor PostgresConnection( Configuration config, PostgresValueConverterBuilder
6262
* valueConverterBuilder, ConnectionFactory factory) to allow passing a custom
@@ -66,6 +66,13 @@
6666
* <li>override isTableUniqueIndexIncluded: Copied DBZ-5398 from Debezium 2.0.0.Final to fix
6767
* https://github.com/ververica/flink-cdc-connectors/issues/2710. Remove this comment
6868
* after bumping debezium version to 2.0.0.Final.
69+
* <li>FLINK-38965: Modified doReadTableColumn to filter out columns from other tables that
70+
* might be returned due to PostgreSQL LIKE wildcard matching. The underscore '_' matches
71+
* any single character, and '%' matches any sequence of characters. For example, when
72+
* querying table 'user_sink', the LIKE pattern may also match 'userbsink' (due to '_');
73+
* when querying table 'user%data' (where % is a literal character in the table name), the
74+
* LIKE pattern may also match 'user_test_data' (due to '%'). See also:
75+
* https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java#L1327
6976
* </ul>
7077
*/
7178
public class PostgresConnection extends JdbcConnection {
@@ -696,6 +703,17 @@ public Optional<Column> readColumnForDecoder(
696703
private Optional<ColumnEditor> doReadTableColumn(
697704
ResultSet columnMetadata, TableId tableId, Tables.ColumnNameFilter columnFilter)
698705
throws SQLException {
706+
// FLINK-38965: Filter out columns from other tables that might be returned due to
707+
// PostgreSQL LIKE wildcard matching. The underscore '_' matches any single character,
708+
// and '%' matches any sequence of characters. For example:
709+
// - When querying 'user_sink', the pattern may also match 'userbsink' (due to '_')
710+
// - When querying 'user%data' (where % is literal), it may match 'user_test_data' (due to
711+
// '%')
712+
final String resultTableName = columnMetadata.getString(3);
713+
if (!tableId.table().equals(resultTableName)) {
714+
return Optional.empty();
715+
}
716+
699717
final String columnName = columnMetadata.getString(4);
700718
if (columnFilter == null
701719
|| columnFilter.matches(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.postgres.source;
19+
20+
import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
21+
import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
22+
import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy;
23+
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
24+
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
25+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
26+
import org.apache.flink.table.api.TableResult;
27+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
28+
import org.apache.flink.test.util.MiniClusterWithClientResource;
29+
import org.apache.flink.types.Row;
30+
import org.apache.flink.util.CloseableIterator;
31+
32+
import org.junit.jupiter.api.AfterEach;
33+
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.api.Test;
35+
import org.junit.jupiter.api.Timeout;
36+
import org.junit.jupiter.api.extension.RegisterExtension;
37+
38+
import java.sql.Connection;
39+
import java.sql.Statement;
40+
import java.util.ArrayList;
41+
import java.util.Arrays;
42+
import java.util.List;
43+
import java.util.concurrent.TimeUnit;
44+
import java.util.stream.Collectors;
45+
46+
import static java.lang.String.format;
47+
import static org.assertj.core.api.Assertions.assertThat;
48+
49+
/**
50+
* IT tests for FLINK-38965: Fix PostgreSQL CDC connector issue when table names contain underscore
51+
* or percent characters that match other tables due to LIKE wildcard behavior.
52+
*
53+
* <p>PostgreSQL LIKE wildcards:
54+
*
55+
* <ul>
56+
* <li>'_' (underscore) matches any single character. E.g., 'user_sink' matches 'userbsink'
57+
* <li>'%' (percent) matches any sequence of characters. E.g., 'user%data' matches
58+
* 'user_test_data'
59+
* </ul>
60+
*
61+
* <p>When table names contain these special characters, JDBC metadata queries using LIKE may return
62+
* columns from unintended tables.
63+
*/
64+
@Timeout(value = 300, unit = TimeUnit.SECONDS)
65+
class SimilarTableNamesITCase extends PostgresTestBase {
66+
67+
private static final int DEFAULT_PARALLELISM = 2;
68+
private static final String DB_NAME_PREFIX = "postgres";
69+
private static final String SCHEMA_NAME = "similar_names";
70+
71+
@RegisterExtension
72+
public final ExternalResourceProxy<MiniClusterWithClientResource> miniClusterResource =
73+
new ExternalResourceProxy<>(
74+
new MiniClusterWithClientResource(
75+
new MiniClusterResourceConfiguration.Builder()
76+
.setNumberTaskManagers(1)
77+
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
78+
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
79+
.withHaLeadershipControl()
80+
.build()));
81+
82+
private final UniqueDatabase similarNamesDatabase =
83+
new UniqueDatabase(
84+
POSTGRES_CONTAINER,
85+
DB_NAME_PREFIX,
86+
SCHEMA_NAME,
87+
POSTGRES_CONTAINER.getUsername(),
88+
POSTGRES_CONTAINER.getPassword());
89+
90+
private String slotName;
91+
92+
@BeforeEach
93+
public void before() {
94+
similarNamesDatabase.createAndInitialize();
95+
this.slotName = getSlotName();
96+
}
97+
98+
@AfterEach
99+
public void after() throws Exception {
100+
Thread.sleep(1000L);
101+
similarNamesDatabase.removeSlot(slotName);
102+
}
103+
104+
// ==================== Test Cases ====================
105+
106+
/**
107+
* Test that when capturing CDC events for table 'ndi_pg_user_sink_1', we don't accidentally
108+
* capture events from 'ndi_pg_userbsink_1' which would match due to PostgreSQL's LIKE wildcard
109+
* behavior (underscore '_' matches any single character).
110+
*
111+
* <p>Before the fix (FLINK-38965), this test would fail with: "java.lang.IllegalStateException:
112+
* Duplicate key Optional.empty"
113+
*/
114+
@Test
115+
void testReadTableWithSimilarNameUnderscore() throws Exception {
116+
StreamTableEnvironment tEnv = createTableEnv();
117+
118+
// Only capture events from 'ndi_pg_user_sink_1' table
119+
tEnv.executeSql(createSourceDDL("target_table", "ndi_pg_user_sink_1"));
120+
121+
TableResult result = tEnv.executeSql("SELECT * FROM target_table");
122+
CloseableIterator<Row> iterator = result.collect();
123+
124+
try {
125+
// Verify snapshot data (3 rows from ndi_pg_user_sink_1)
126+
List<String> expectedSnapshotData =
127+
Arrays.asList(
128+
"+I[1, user_1, Shanghai]",
129+
"+I[2, user_2, Beijing]",
130+
"+I[3, user_3, Hangzhou]");
131+
assertRowsEquals(collectRows(iterator, 3), expectedSnapshotData);
132+
133+
// Perform DML operations on both tables
134+
executeDmlOperations();
135+
136+
// Verify streaming events - should only contain events from ndi_pg_user_sink_1
137+
List<String> expectedStreamData =
138+
Arrays.asList(
139+
"+I[4, user_4, Wuhan]",
140+
"-U[1, user_1, Shanghai]",
141+
"+U[1, user_1, Suzhou]");
142+
assertRowsEquals(collectRows(iterator, 3), expectedStreamData);
143+
} finally {
144+
closeResourcesAndWaitForJobTermination(iterator, result);
145+
}
146+
}
147+
148+
/**
149+
* Test that when capturing CDC events for table 'user%data' (which contains a literal '%'
150+
* character), we don't accidentally capture events from 'user_test_data' which would match due
151+
* to PostgreSQL's LIKE wildcard behavior (percent '%' matches any sequence of characters).
152+
*
153+
* <p>When querying for table 'user%data', the LIKE pattern may also match 'user_test_data'
154+
* because '%' matches the '_test_' sequence.
155+
*/
156+
@Test
157+
void testReadTableWithSimilarNamePercent() throws Exception {
158+
StreamTableEnvironment tEnv = createTableEnv();
159+
160+
// Only capture events from 'user%data' table (note: % is a literal character in table name)
161+
tEnv.executeSql(createSourceDDL("target_table", "user%data"));
162+
163+
TableResult result = tEnv.executeSql("SELECT * FROM target_table");
164+
CloseableIterator<Row> iterator = result.collect();
165+
166+
try {
167+
// Verify snapshot data (3 rows from user%data only)
168+
List<String> expectedSnapshotData =
169+
Arrays.asList(
170+
"+I[201, percent_1, Tianjin]",
171+
"+I[202, percent_2, Dalian]",
172+
"+I[203, percent_3, Qingdao]");
173+
assertRowsEquals(collectRows(iterator, 3), expectedSnapshotData);
174+
175+
// Perform DML on user_test_data table - these should NOT be captured
176+
executeDmlOperationsOnTestDataTable();
177+
178+
// Perform DML on target table - these SHOULD be captured
179+
try (Connection conn =
180+
getJdbcConnection(
181+
POSTGRES_CONTAINER, similarNamesDatabase.getDatabaseName());
182+
Statement stmt = conn.createStatement()) {
183+
stmt.execute(
184+
"INSERT INTO similar_names.\"user%data\" VALUES (204, 'percent_4', 'Xiamen')");
185+
}
186+
187+
// Should only see the insert from target table, not user_test_data table
188+
List<String> expectedStreamData = Arrays.asList("+I[204, percent_4, Xiamen]");
189+
assertRowsEquals(collectRows(iterator, 1), expectedStreamData);
190+
} finally {
191+
closeResourcesAndWaitForJobTermination(iterator, result);
192+
}
193+
}
194+
195+
/**
196+
* Test reading from all similar-named tables to verify they can be captured independently
197+
* without interference.
198+
*/
199+
@Test
200+
void testReadAllSimilarNamedTables() throws Exception {
201+
StreamTableEnvironment tEnv = createTableEnv();
202+
203+
// Capture events from underscore-related tables using regex pattern
204+
tEnv.executeSql(createSourceDDL("underscore_tables", "ndi_pg_user.*"));
205+
206+
TableResult result = tEnv.executeSql("SELECT * FROM underscore_tables");
207+
CloseableIterator<Row> iterator = result.collect();
208+
209+
try {
210+
// Verify snapshot data (3 rows from each table = 6 total)
211+
List<String> expectedSnapshotData =
212+
Arrays.asList(
213+
// From ndi_pg_user_sink_1
214+
"+I[1, user_1, Shanghai]",
215+
"+I[2, user_2, Beijing]",
216+
"+I[3, user_3, Hangzhou]",
217+
// From ndi_pg_userbsink_1
218+
"+I[101, userb_1, Guangzhou]",
219+
"+I[102, userb_2, Shenzhen]",
220+
"+I[103, userb_3, Chengdu]");
221+
assertRowsEquals(collectRows(iterator, 6), expectedSnapshotData);
222+
} finally {
223+
closeResourcesAndWaitForJobTermination(iterator, result);
224+
}
225+
}
226+
227+
// ==================== Helper Methods ====================
228+
229+
/** Creates and configures the StreamTableEnvironment. */
230+
private StreamTableEnvironment createTableEnv() {
231+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
232+
env.setParallelism(DEFAULT_PARALLELISM);
233+
env.enableCheckpointing(200L);
234+
return StreamTableEnvironment.create(env);
235+
}
236+
237+
/** Creates the source DDL for the given table name pattern. */
238+
private String createSourceDDL(String flinkTableName, String pgTablePattern) {
239+
return format(
240+
"CREATE TABLE %s ("
241+
+ " id INT NOT NULL,"
242+
+ " name STRING,"
243+
+ " address STRING,"
244+
+ " PRIMARY KEY (id) NOT ENFORCED"
245+
+ ") WITH ("
246+
+ " 'connector' = 'postgres-cdc',"
247+
+ " 'hostname' = '%s',"
248+
+ " 'port' = '%s',"
249+
+ " 'username' = '%s',"
250+
+ " 'password' = '%s',"
251+
+ " 'database-name' = '%s',"
252+
+ " 'schema-name' = '%s',"
253+
+ " 'table-name' = '%s',"
254+
+ " 'scan.startup.mode' = 'initial',"
255+
+ " 'scan.incremental.snapshot.enabled' = 'true',"
256+
+ " 'scan.incremental.snapshot.chunk.size' = '100',"
257+
+ " 'decoding.plugin.name' = 'pgoutput',"
258+
+ " 'slot.name' = '%s',"
259+
+ " 'scan.lsn-commit.checkpoints-num-delay' = '1'"
260+
+ ")",
261+
flinkTableName,
262+
similarNamesDatabase.getHost(),
263+
similarNamesDatabase.getDatabasePort(),
264+
similarNamesDatabase.getUsername(),
265+
similarNamesDatabase.getPassword(),
266+
similarNamesDatabase.getDatabaseName(),
267+
SCHEMA_NAME,
268+
pgTablePattern,
269+
slotName);
270+
}
271+
272+
/** Collects specified number of rows from the iterator. */
273+
private List<String> collectRows(CloseableIterator<Row> iterator, int expectedCount) {
274+
List<String> rows = new ArrayList<>();
275+
int count = 0;
276+
while (count < expectedCount && iterator.hasNext()) {
277+
rows.add(iterator.next().toString());
278+
count++;
279+
}
280+
return rows;
281+
}
282+
283+
/** Asserts that actual rows match expected rows (order-insensitive). */
284+
private void assertRowsEquals(List<String> actual, List<String> expected) {
285+
assertThat(actual.stream().sorted().collect(Collectors.toList()))
286+
.containsExactlyInAnyOrderElementsOf(
287+
expected.stream().sorted().collect(Collectors.toList()));
288+
}
289+
290+
/** Executes DML operations on target and similar-named tables for streaming phase testing. */
291+
private void executeDmlOperations() throws Exception {
292+
try (Connection conn =
293+
getJdbcConnection(
294+
POSTGRES_CONTAINER, similarNamesDatabase.getDatabaseName());
295+
Statement stmt = conn.createStatement()) {
296+
// Insert into target table
297+
stmt.execute(
298+
"INSERT INTO similar_names.ndi_pg_user_sink_1 VALUES (4, 'user_4', 'Wuhan')");
299+
// Insert into similar-named table (should NOT be captured when only listening to
300+
// target)
301+
stmt.execute(
302+
"INSERT INTO similar_names.ndi_pg_userbsink_1 VALUES (104, 'userb_4', 'Nanjing')");
303+
// Update target table
304+
stmt.execute(
305+
"UPDATE similar_names.ndi_pg_user_sink_1 SET address = 'Suzhou' WHERE id = 1");
306+
}
307+
}
308+
309+
/** Executes DML operations on user_test_data table for '%' wildcard testing. */
310+
private void executeDmlOperationsOnTestDataTable() throws Exception {
311+
try (Connection conn =
312+
getJdbcConnection(
313+
POSTGRES_CONTAINER, similarNamesDatabase.getDatabaseName());
314+
Statement stmt = conn.createStatement()) {
315+
// Insert into user_test_data table (should NOT be captured when only listening to
316+
// user%data)
317+
stmt.execute(
318+
"INSERT INTO similar_names.user_test_data VALUES (304, 'test_4', 'Harbin')");
319+
// Update user_test_data table
320+
stmt.execute(
321+
"UPDATE similar_names.user_test_data SET address = 'Changchun' WHERE id = 301");
322+
}
323+
}
324+
325+
/** Properly closes resources and waits for job termination. */
326+
private void closeResourcesAndWaitForJobTermination(
327+
CloseableIterator<Row> iterator, TableResult result) throws Exception {
328+
iterator.close();
329+
result.getJobClient()
330+
.ifPresent(
331+
client -> {
332+
client.cancel();
333+
try {
334+
client.getJobExecutionResult().get(30, TimeUnit.SECONDS);
335+
} catch (Exception e) {
336+
// Job cancelled, expected
337+
}
338+
});
339+
}
340+
}

0 commit comments

Comments
 (0)