Skip to content

Commit b9437bf

Browse files
lvyanquanyuxiqian.yxq
andauthored
[FLINK-38358][pipeline-connector][mysql] Ignore trailing spaces in table capturing rules. (apache#4119)
--------- Co-authored-by: yuxiqian.yxq <yuxiqian.yxq@alibaba-inc.com>
1 parent 32ec9f5 commit b9437bf

2 files changed

Lines changed: 42 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,8 @@ private String validateTableAndReturnDebeziumStyle(String tables) {
538538

539539
// In CDC-style table matching, table names could be separated by `,` character.
540540
// Convert it to `|` as it's standard RegEx syntax.
541-
tables = tables.replace(",", "|");
541+
tables =
542+
Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
542543
LOG.info("Expression after replacing comma with vert separator: {}", tables);
543544

544545
// Essentially, we're just trying to swap escaped `\\.` and unescaped `.`.

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTablePatternMatchingTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,23 @@ void testMatchingTablesWithMultipleRules() {
166166
.containsExactlyInAnyOrder("db\\.tbl1|db2\\.tbl.*|db3\\.tbl3");
167167
}
168168

169+
@Test
170+
void testMatchingTablesWithSpacedRules() {
171+
List<String> spacedRules =
172+
Arrays.asList(
173+
"db.tbl1, db2.tbl\\.*, db3.tbl3",
174+
"db.tbl1 ,db2.tbl\\.* ,db3.tbl3",
175+
"db.tbl1 , db2.tbl\\.* , db3.tbl3");
176+
177+
Assertions.assertThat(spacedRules)
178+
.map(rule -> testGenericTableMatching(rule, null, false))
179+
.containsOnly(Arrays.asList("db.tbl1", "db2.tbl2", "db3.tbl3"));
180+
181+
Assertions.assertThat(spacedRules)
182+
.map(rule -> testGenericTableMatching(rule, null, true))
183+
.containsOnly(Arrays.asList("db\\.tbl1|db2\\.tbl.*|db3\\.tbl3"));
184+
}
185+
169186
@Test
170187
void testWildcardMatchingRealTables() throws Exception {
171188
String[] expected =
@@ -280,6 +297,29 @@ void testMultipleRulesWithRealTables() throws Exception {
280297
.containsExactlyInAnyOrder(expected);
281298
}
282299

300+
@Test
301+
void testMatchingRealTablesWithSpacedRules() throws Exception {
302+
String[] expected =
303+
new String[] {
304+
"CreateTableEvent{tableId=db.tbl1, schema=columns={`id` INT NOT NULL}, primaryKeys=id, options=()}",
305+
"DataChangeEvent{tableId=db.tbl1, before=[], after=[17], op=INSERT, meta=()}",
306+
"CreateTableEvent{tableId=db2.tbl2, schema=columns={`id` INT NOT NULL}, primaryKeys=id, options=()}",
307+
"DataChangeEvent{tableId=db2.tbl2, before=[], after=[17], op=INSERT, meta=()}",
308+
"CreateTableEvent{tableId=db3.tbl3, schema=columns={`id` INT NOT NULL}, primaryKeys=id, options=()}",
309+
"DataChangeEvent{tableId=db3.tbl3, before=[], after=[17], op=INSERT, meta=()}"
310+
};
311+
312+
Assertions.assertThat(
313+
getRealWorldMatchedTables(
314+
"db.tbl1 , db2.tbl\\.* , db3.tbl3", null, false, expected.length))
315+
.containsExactlyInAnyOrder(expected);
316+
317+
Assertions.assertThat(
318+
getRealWorldMatchedTables(
319+
"db.tbl1 , db2.tbl\\.* , db3.tbl3", null, true, expected.length))
320+
.containsExactlyInAnyOrder(expected);
321+
}
322+
283323
private static void initializeMySqlTables(List<Tuple2<String, String>> tableNames) {
284324
tableNames.forEach(
285325
tableName -> {

0 commit comments

Comments
 (0)