Skip to content

Commit 538b8fa

Browse files
authored
[FLINK-38779] Route module supports standard RegExp replacement rules (#4190)
1 parent f5cec12 commit 538b8fa

12 files changed

Lines changed: 278 additions & 85 deletions

File tree

docs/content.zh/docs/core-concept/route.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,25 @@ route:
8585
description: route all tables in source_db to sink_db
8686
```
8787

88-
然后,`source_db` 库下所有的表都会被同步到 `sink_db` 库下。
88+
然后,`source_db` 库下所有的表都会被同步到 `sink_db` 库下。
89+
90+
## 高级:基于正则捕获组的替换规则
91+
92+
您可以在 `source-table` 字段中定义正则表达式的捕获组:
93+
94+
```yaml
95+
route:
96+
- source-table: db_(\.*).(\.*)_tbl
97+
sink-table: sink_db_$1.sink_table_$2
98+
```
99+
100+
这里我们创建了两个捕获组,分别用来匹配数据库名 `db_` 之后的后缀和表名 `_tbl` 之前的前缀。
101+
102+
以上游表 `db_foo.bar_tbl` 为例,我们将会从中提取出 `(foo, bar)` 作为捕获组,并且将其依次绑定到 `$1` 和 `$2` 变量中。
103+
因此,这张表将被路由到 `sink_db_foo.sink_table_bar` 下游表中。
104+
105+
{{< hint info >}}
106+
107+
注意:基于正则捕获组的替换规则无法与 `replace-symbol` 选项搭配使用。
108+
109+
{{< /hint >}}

docs/content/docs/core-concept/route.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,25 @@ route:
8686
description: route all tables in source_db to sink_db
8787
```
8888

89-
Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX` without hassle.
89+
Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX` without hassle.
90+
91+
## Advanced: RegExp Capturing & Replacement Rules
92+
93+
It is also possible to create capturing groups in `source-table` fields like this:
94+
95+
```yaml
96+
route:
97+
- source-table: db_(\.*).(\.*)_tbl
98+
sink-table: sink_db_$1.sink_table_$2
99+
```
100+
101+
Here we create two capturing groups matching database suffix and table prefix.
102+
103+
For upstream table `db_foo.bar_tbl`, capturing group `(foo, bar)` will be extracted and bound to `$1` and `$2`.
104+
As a result, such table will be routed to downstream table `sink_db_foo.sink_table_bar`.
105+
106+
{{< hint info >}}
107+
108+
Standard RegExp capturing could not be used with `replace-symbol` options.
109+
110+
{{< /hint >}}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java renamed to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java

Lines changed: 87 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,44 +15,105 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.flink.cdc.runtime.operators.schema.common;
18+
package org.apache.flink.cdc.common.route;
1919

2020
import org.apache.flink.api.java.tuple.Tuple3;
21+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2122
import org.apache.flink.cdc.common.event.TableId;
22-
import org.apache.flink.cdc.common.route.RouteRule;
2323
import org.apache.flink.cdc.common.schema.Selectors;
2424

2525
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
2626
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
2727
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
2828

29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
2932
import javax.annotation.Nonnull;
3033

3134
import java.time.Duration;
3235
import java.util.ArrayList;
36+
import java.util.Arrays;
3337
import java.util.List;
3438
import java.util.Set;
39+
import java.util.regex.Matcher;
40+
import java.util.regex.Pattern;
3541
import java.util.regex.PatternSyntaxException;
3642
import java.util.stream.Collectors;
3743

3844
/**
3945
* Calculates how upstream data change events should be dispatched to downstream tables. Returns one
4046
* or many destination Table IDs based on provided routing rules.
4147
*/
48+
@PublicEvolving
4249
public class TableIdRouter {
4350

44-
private final List<Tuple3<Selectors, String, String>> routes;
45-
private final LoadingCache<TableId, List<TableId>> routingCache;
51+
private static final Logger LOG = LoggerFactory.getLogger(TableIdRouter.class);
4652
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
4753

54+
private final List<Tuple3<Pattern, String, String>> routes;
55+
private final LoadingCache<TableId, List<TableId>> routingCache;
56+
57+
private static final String DOT_PLACEHOLDER = "_dot_placeholder_";
58+
59+
/**
60+
* Currently, The supported regular syntax is not exactly the same in {@link Selectors}.
61+
*
62+
* <p>The main discrepancies are :
63+
*
64+
* <p>1) {@link Selectors} use {@code ,} to split table names instead of `|`.
65+
*
66+
* <p>2) If there is a need to use a dot ({@code .}) in a regular expression to match any
67+
* character, it is necessary to escape the dot with a backslash.
68+
*
69+
* <p>3) The unescaped {@code .} is used as the separator of database and table name. When
70+
* converting to Debezium style, it is expected to be escaped to match the dot ({@code .})
71+
* literally instead of the meta-character.
72+
*/
73+
public static String convertTableListToRegExpPattern(String tables) {
74+
LOG.info("Rewriting CDC style table capture list: {}", tables);
75+
76+
// In CDC-style table matching, table names could be separated by `,` character.
77+
// Convert it to `|` as it's standard RegEx syntax.
78+
tables =
79+
Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
80+
LOG.info("Expression after replacing comma with vert separator: {}", tables);
81+
82+
// Essentially, we're just trying to swap escaped `\\.` and unescaped `.`.
83+
// In our table matching syntax, `\\.` means RegEx token matcher and `.` means database &
84+
// table name separator.
85+
// On the contrary, while we're matching TableId string, `\\.` means matching the "dot"
86+
// literal and `.` is the meta-character.
87+
88+
// Step 1: escape the dot with a backslash, but keep it as a placeholder (like `$`).
89+
// For example, `db\.*.tbl\.*` => `db$*.tbl$*`
90+
String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
91+
LOG.info("Expression after un-escaping dots as RegEx meta-character: {}", unescapedTables);
92+
93+
// Step 2: replace all remaining dots (`.`) to quoted version (`\.`), as a separator between
94+
// database and table names.
95+
// For example, `db$*.tbl$*` => `db$*\.tbl$*`
96+
String unescapedTablesWithDbTblSeparator = unescapedTables.replace(".", "\\.");
97+
LOG.info("Re-escaping dots as TableId delimiter: {}", unescapedTablesWithDbTblSeparator);
98+
99+
// Step 3: restore placeholder to normal RegEx matcher (`.`)
100+
// For example, `db$*\.tbl$*` => `db.*\.tbl.*`
101+
String standardRegExpTableCaptureList =
102+
unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, ".");
103+
LOG.info("Final standard RegExp table capture list: {}", standardRegExpTableCaptureList);
104+
105+
return standardRegExpTableCaptureList;
106+
}
107+
48108
public TableIdRouter(List<RouteRule> routingRules) {
49109
this.routes = new ArrayList<>();
50110
for (RouteRule rule : routingRules) {
51111
try {
52-
String tableInclusions = rule.sourceTable;
53-
Selectors selectors =
54-
new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
55-
routes.add(new Tuple3<>(selectors, rule.sinkTable, rule.replaceSymbol));
112+
routes.add(
113+
new Tuple3<>(
114+
Pattern.compile(convertTableListToRegExpPattern(rule.sourceTable)),
115+
rule.sinkTable,
116+
rule.replaceSymbol));
56117
} catch (PatternSyntaxException e) {
57118
throw new IllegalArgumentException(
58119
String.format(
@@ -80,7 +141,7 @@ public List<TableId> route(TableId sourceTableId) {
80141
private List<TableId> calculateRoute(TableId sourceTableId) {
81142
List<TableId> routedTableIds =
82143
routes.stream()
83-
.filter(route -> route.f0.isMatch(sourceTableId))
144+
.filter(route -> matches(route.f0, sourceTableId))
84145
.map(route -> resolveReplacement(sourceTableId, route))
85146
.collect(Collectors.toList());
86147
if (routedTableIds.isEmpty()) {
@@ -90,9 +151,14 @@ private List<TableId> calculateRoute(TableId sourceTableId) {
90151
}
91152

92153
private TableId resolveReplacement(
93-
TableId originalTable, Tuple3<Selectors, String, String> route) {
154+
TableId originalTable, Tuple3<Pattern, String, String> route) {
94155
if (route.f2 != null) {
95156
return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName()));
157+
} else {
158+
Matcher matcher = route.f0.matcher(originalTable.toString());
159+
if (matcher.find()) {
160+
return TableId.parse(matcher.replaceAll(route.f1));
161+
}
96162
}
97163
return TableId.parse(route.f1);
98164
}
@@ -111,18 +177,16 @@ public List<Set<TableId>> groupSourceTablesByRouteRule(Set<TableId> tableIdSet)
111177
if (routes.isEmpty()) {
112178
return new ArrayList<>();
113179
}
114-
List<Set<TableId>> routedTableIds =
115-
routes.stream()
116-
.map(
117-
route -> {
118-
return tableIdSet.stream()
119-
.filter(
120-
tableId -> {
121-
return route.f0.isMatch(tableId);
122-
})
123-
.collect(Collectors.toSet());
124-
})
125-
.collect(Collectors.toList());
126-
return routedTableIds;
180+
return routes.stream()
181+
.map(
182+
route ->
183+
tableIdSet.stream()
184+
.filter(tableId -> matches(route.f0, tableId))
185+
.collect(Collectors.toSet()))
186+
.collect(Collectors.toList());
187+
}
188+
189+
private static boolean matches(Pattern pattern, TableId tableId) {
190+
return pattern.matcher(tableId.toString()).matches();
127191
}
128192
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 2 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.flink.cdc.common.source.DataSource;
3131
import org.apache.flink.cdc.common.utils.StringUtils;
3232
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
33-
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions;
3433
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
3534
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
3635
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -45,7 +44,6 @@
4544

4645
import com.mysql.cj.conf.PropertyKey;
4746
import io.debezium.relational.RelationalDatabaseConnectorConfig;
48-
import io.debezium.relational.Tables;
4947
import org.slf4j.Logger;
5048
import org.slf4j.LoggerFactory;
5149

@@ -63,6 +61,7 @@
6361
import java.util.Set;
6462
import java.util.stream.Collectors;
6563

64+
import static org.apache.flink.cdc.common.route.TableIdRouter.convertTableListToRegExpPattern;
6665
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
6766
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
6867
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE;
@@ -231,7 +230,7 @@ public DataSource createDataSource(Context context) {
231230
}
232231

233232
if (scanBinlogNewlyAddedTableEnabled) {
234-
String newTables = validateTableAndReturnDebeziumStyle(tables);
233+
String newTables = convertTableListToRegExpPattern(tables);
235234
configFactory.tableList(newTables);
236235
configFactory.excludeTableList(tablesExclude);
237236

@@ -516,58 +515,6 @@ && doubleCompare(distributionFactorLower, 1.0d) <= 0,
516515

517516
private static final String DOT_PLACEHOLDER = "_$dot_placeholder$_";
518517

519-
/**
520-
* Currently, The supported regular syntax is not exactly the same in {@link Selectors} and
521-
* {@link Tables.TableFilter}.
522-
*
523-
* <p>The main distinction are :
524-
*
525-
* <p>1) {@link Selectors} use {@code ,} to split table names and {@link Tables.TableFilter} use
526-
* `|` to split table names.
527-
*
528-
* <p>2) If there is a need to use a dot ({@code .}) in a regular expression to match any
529-
* character, it is necessary to escape the dot with a backslash, refer to {@link
530-
* MySqlDataSourceOptions#TABLES}.
531-
*
532-
* <p>3) The unescaped {@code .} is used as the separator of database and table name. When
533-
* converting to Debezium style, it is expected to be escaped to match the dot ({@code .})
534-
* literally instead of the meta-character.
535-
*/
536-
private String validateTableAndReturnDebeziumStyle(String tables) {
537-
LOG.info("Rewriting CDC style table capture list: {}", tables);
538-
539-
// In CDC-style table matching, table names could be separated by `,` character.
540-
// Convert it to `|` as it's standard RegEx syntax.
541-
tables =
542-
Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
543-
LOG.info("Expression after replacing comma with vert separator: {}", tables);
544-
545-
// Essentially, we're just trying to swap escaped `\\.` and unescaped `.`.
546-
// In our table matching syntax, `\\.` means RegEx token matcher and `.` means database &
547-
// table name separator.
548-
// On the contrary, while we're matching TableId string, `\\.` means matching the "dot"
549-
// literal and `.` is the meta-character.
550-
551-
// Step 1: escape the dot with a backslash, but keep it as a placeholder (like `$`).
552-
// For example, `db\.*.tbl\.*` => `db$*.tbl$*`
553-
String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
554-
LOG.info("Expression after unescaping dots as RegEx meta-character: {}", unescapedTables);
555-
556-
// Step 2: replace all remaining dots (`.`) to quoted version (`\.`), as a separator between
557-
// database and table names.
558-
// For example, `db$*.tbl$*` => `db$*\.tbl$*`
559-
String unescapedTablesWithDbTblSeparator = unescapedTables.replace(".", "\\.");
560-
LOG.info("Re-escaping dots as TableId delimiter: {}", unescapedTablesWithDbTblSeparator);
561-
562-
// Step 3: restore placeholder to normal RegEx matcher (`.`)
563-
// For example, `db$*\.tbl$*` => `db.*\.tbl.*`
564-
String debeziumStyleTableCaptureList =
565-
unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, ".");
566-
LOG.info("Final Debezium-style table capture list: {}", debeziumStyleTableCaptureList);
567-
568-
return debeziumStyleTableCaptureList;
569-
}
570-
571518
/** Replaces the default timezone placeholder with session timezone, if applicable. */
572519
private static ZoneId getServerTimeZone(Configuration config) {
573520
final String serverTimeZone = config.get(SERVER_TIME_ZONE);

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema;
2929
import org.apache.flink.cdc.common.event.TableId;
3030
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
31+
import org.apache.flink.cdc.common.route.TableIdRouter;
3132
import org.apache.flink.cdc.common.schema.Column;
3233
import org.apache.flink.cdc.common.schema.Schema;
3334
import org.apache.flink.cdc.common.sink.MetadataApplier;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.cdc.common.event.TableId;
2323
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2424
import org.apache.flink.cdc.common.route.RouteRule;
25+
import org.apache.flink.cdc.common.route.TableIdRouter;
2526
import org.apache.flink.cdc.common.schema.Schema;
2627
import org.apache.flink.cdc.common.sink.MetadataApplier;
2728
import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
2727
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2828
import org.apache.flink.cdc.common.route.RouteRule;
29+
import org.apache.flink.cdc.common.route.TableIdRouter;
2930
import org.apache.flink.cdc.common.schema.Schema;
3031
import org.apache.flink.cdc.common.utils.SchemaUtils;
3132
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
3233
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
33-
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
3434
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
3535
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest;
3636
import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,11 @@
2525
import org.apache.flink.cdc.common.event.TableId;
2626
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2727
import org.apache.flink.cdc.common.route.RouteRule;
28+
import org.apache.flink.cdc.common.route.TableIdRouter;
2829
import org.apache.flink.cdc.common.schema.Schema;
2930
import org.apache.flink.cdc.common.sink.MetadataApplier;
3031
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
3132
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
32-
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
3333
import org.apache.flink.streaming.api.graph.StreamConfig;
3434
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
3535
import org.apache.flink.streaming.api.operators.ChainingStrategy;

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@
2626
import org.apache.flink.cdc.common.event.TableId;
2727
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
2828
import org.apache.flink.cdc.common.route.RouteRule;
29+
import org.apache.flink.cdc.common.route.TableIdRouter;
2930
import org.apache.flink.cdc.common.schema.Schema;
3031
import org.apache.flink.cdc.common.utils.SchemaUtils;
3132
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
3233
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
33-
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
3434
import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
3535
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
3636
import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;

0 commit comments

Comments
 (0)