Skip to content

Commit 693b561

Browse files
committed
[FLINK-38779] Route module supports standard RegExp replacement rules
1 parent c1a7d0b commit 693b561

5 files changed

Lines changed: 240 additions & 25 deletions

File tree

docs/content.zh/docs/core-concept/route.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,25 @@ route:
8585
description: route all tables in source_db to sink_db
8686
```
8787

88-
然后,`source_db` 库下所有的表都会被同步到 `sink_db` 库下。
88+
然后,`source_db` 库下所有的表都会被同步到 `sink_db` 库下。
89+
90+
## 高级:基于正则捕获组的替换规则
91+
92+
您可以在 `source-table` 字段中定义正则表达式的捕获组:
93+
94+
```yaml
95+
route:
96+
- source-table: db_(\.*).(\.*)_tbl
97+
sink-table: sink_db_$1.sink_table_$2
98+
```
99+
100+
这里我们创建了两个捕获组,分别用来匹配数据库名 `db_` 之后的后缀和表名 `_tbl` 之前的前缀。
101+
102+
以上游表 `db_foo.bar_tbl` 为例,我们将会从中提取出 `(foo, bar)` 作为捕获组,并且将其依次绑定到 `$1` 和 `$2` 变量中。
103+
因此,这张表将被路由到 `sink_db_foo.sink_table_bar` 下游表中。
104+
105+
{{< hint info >}}
106+
107+
注意:基于正则捕获组的替换规则无法与 `replace-symbol` 选项搭配使用。
108+
109+
{{< /hint >}}

docs/content/docs/core-concept/route.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,4 +86,25 @@ route:
8686
description: route all tables in source_db to sink_db
8787
```
8888

89-
Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX` without hassle.
89+
Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX` without hassle.
90+
91+
## Advanced: RegExp Capturing & Replacement Rules
92+
93+
It is also possible to create capturing groups in `source-table` fields like this:
94+
95+
```yaml
96+
route:
97+
- source-table: db_(\.*).(\.*)_tbl
98+
sink-table: sink_db_$1.sink_table_$2
99+
```
100+
101+
Here we create two capturing groups matching database suffix and table prefix.
102+
103+
For upstream table `db_foo.bar_tbl`, capturing group `(foo, bar)` will be extracted and bound to `$1` and `$2`.
104+
As a result, such table will be routed to downstream table `sink_db_foo.sink_table_bar`.
105+
106+
{{< hint info >}}
107+
108+
Standard RegExp capturing could not be used with `replace-symbol` options.
109+
110+
{{< /hint >}}

flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,23 @@
2020
import org.apache.flink.api.java.tuple.Tuple3;
2121
import org.apache.flink.cdc.common.event.TableId;
2222
import org.apache.flink.cdc.common.route.RouteRule;
23-
import org.apache.flink.cdc.common.schema.Selectors;
2423

2524
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
2625
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
2726
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
2827

28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
2931
import javax.annotation.Nonnull;
3032

3133
import java.time.Duration;
3234
import java.util.ArrayList;
35+
import java.util.Arrays;
3336
import java.util.List;
3437
import java.util.Set;
38+
import java.util.regex.Matcher;
39+
import java.util.regex.Pattern;
3540
import java.util.regex.PatternSyntaxException;
3641
import java.util.stream.Collectors;
3742

@@ -41,18 +46,58 @@
4146
*/
4247
public class TableIdRouter {
4348

44-
private final List<Tuple3<Selectors, String, String>> routes;
45-
private final LoadingCache<TableId, List<TableId>> routingCache;
49+
private static final Logger LOG = LoggerFactory.getLogger(TableIdRouter.class);
4650
private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
4751

52+
private final List<Tuple3<Pattern, String, String>> routes;
53+
private final LoadingCache<TableId, List<TableId>> routingCache;
54+
55+
private static final String DOT_PLACEHOLDER = "_dot_placeholder_";
56+
57+
private static Pattern validateTableListToRegExpPattern(String tables) {
58+
LOG.info("Rewriting CDC style table capture list: {}", tables);
59+
60+
// In CDC-style table matching, table names could be separated by `,` character.
61+
// Convert it to `|` as it's standard RegEx syntax.
62+
tables =
63+
Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|"));
64+
LOG.info("Expression after replacing comma with vert separator: {}", tables);
65+
66+
// Essentially, we're just trying to swap escaped `\\.` and unescaped `.`.
67+
// In our table matching syntax, `\\.` means RegEx token matcher and `.` means database &
68+
// table name separator.
69+
// On the contrary, while we're matching TableId string, `\\.` means matching the "dot"
70+
// literal and `.` is the meta-character.
71+
72+
// Step 1: escape the dot with a backslash, but keep it as a placeholder (like `$`).
73+
// For example, `db\.*.tbl\.*` => `db$*.tbl$*`
74+
String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER);
75+
LOG.info("Expression after un-escaping dots as RegEx meta-character: {}", unescapedTables);
76+
77+
// Step 2: replace all remaining dots (`.`) to quoted version (`\.`), as a separator between
78+
// database and table names.
79+
// For example, `db$*.tbl$*` => `db$*\.tbl$*`
80+
String unescapedTablesWithDbTblSeparator = unescapedTables.replace(".", "\\.");
81+
LOG.info("Re-escaping dots as TableId delimiter: {}", unescapedTablesWithDbTblSeparator);
82+
83+
// Step 3: restore placeholder to normal RegEx matcher (`.`)
84+
// For example, `db$*\.tbl$*` => `db.*\.tbl.*`
85+
String standardRegExpTableCaptureList =
86+
unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, ".");
87+
LOG.info("Final standard RegExp table capture list: {}", standardRegExpTableCaptureList);
88+
89+
return Pattern.compile(standardRegExpTableCaptureList);
90+
}
91+
4892
public TableIdRouter(List<RouteRule> routingRules) {
4993
this.routes = new ArrayList<>();
5094
for (RouteRule rule : routingRules) {
5195
try {
52-
String tableInclusions = rule.sourceTable;
53-
Selectors selectors =
54-
new Selectors.SelectorsBuilder().includeTables(tableInclusions).build();
55-
routes.add(new Tuple3<>(selectors, rule.sinkTable, rule.replaceSymbol));
96+
routes.add(
97+
new Tuple3<>(
98+
validateTableListToRegExpPattern(rule.sourceTable),
99+
rule.sinkTable,
100+
rule.replaceSymbol));
56101
} catch (PatternSyntaxException e) {
57102
throw new IllegalArgumentException(
58103
String.format(
@@ -80,7 +125,7 @@ public List<TableId> route(TableId sourceTableId) {
80125
private List<TableId> calculateRoute(TableId sourceTableId) {
81126
List<TableId> routedTableIds =
82127
routes.stream()
83-
.filter(route -> route.f0.isMatch(sourceTableId))
128+
.filter(route -> matches(route.f0, sourceTableId))
84129
.map(route -> resolveReplacement(sourceTableId, route))
85130
.collect(Collectors.toList());
86131
if (routedTableIds.isEmpty()) {
@@ -90,9 +135,14 @@ private List<TableId> calculateRoute(TableId sourceTableId) {
90135
}
91136

92137
private TableId resolveReplacement(
93-
TableId originalTable, Tuple3<Selectors, String, String> route) {
138+
TableId originalTable, Tuple3<Pattern, String, String> route) {
94139
if (route.f2 != null) {
95140
return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName()));
141+
} else {
142+
Matcher matcher = route.f0.matcher(originalTable.toString());
143+
if (matcher.find()) {
144+
return TableId.parse(matcher.replaceAll(route.f1));
145+
}
96146
}
97147
return TableId.parse(route.f1);
98148
}
@@ -111,18 +161,16 @@ public List<Set<TableId>> groupSourceTablesByRouteRule(Set<TableId> tableIdSet)
111161
if (routes.isEmpty()) {
112162
return new ArrayList<>();
113163
}
114-
List<Set<TableId>> routedTableIds =
115-
routes.stream()
116-
.map(
117-
route -> {
118-
return tableIdSet.stream()
119-
.filter(
120-
tableId -> {
121-
return route.f0.isMatch(tableId);
122-
})
123-
.collect(Collectors.toSet());
124-
})
125-
.collect(Collectors.toList());
126-
return routedTableIds;
164+
return routes.stream()
165+
.map(
166+
route ->
167+
tableIdSet.stream()
168+
.filter(tableId -> matches(route.f0, tableId))
169+
.collect(Collectors.toSet()))
170+
.collect(Collectors.toList());
171+
}
172+
173+
private static boolean matches(Pattern pattern, TableId tableId) {
174+
return pattern.matcher(tableId.toString()).matches();
127175
}
128176
}

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,14 @@ public abstract class SchemaTestBase {
6262
new RouteRule("db_5.table_\\.*", "db_5.prefix_<>_suffix", "<>"),
6363

6464
// Irrelevant routes
65-
new RouteRule("foo", "bar", null));
65+
new RouteRule("foo", "bar", null),
66+
67+
// Standard RegExp capturing rules
68+
new RouteRule(
69+
"re_\\d+.table_(\\.*)",
70+
"database.another_table_with_$1$1$1_index",
71+
null),
72+
new RouteRule("(inv_\\d+).(table_\\.*)", "$2.$1", null));
6673

6774
protected static final TableIdRouter TABLE_ID_ROUTER = new TableIdRouter(ROUTING_RULES);
6875

flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.runtime.operators.schema.common;
1919

2020
import org.apache.flink.cdc.common.event.TableId;
21+
import org.apache.flink.cdc.common.route.RouteRule;
2122

2223
import org.junit.jupiter.api.Test;
2324

@@ -26,6 +27,7 @@
2627
import java.util.List;
2728
import java.util.Set;
2829
import java.util.stream.Collectors;
30+
import java.util.stream.Stream;
2931

3032
import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
3133

@@ -129,6 +131,122 @@ void testGroupSourceTablesByRouteRule() {
129131
TableId.parse("db_5.table_1"),
130132
TableId.parse("db_5.table_2"),
131133
TableId.parse("db_5.table_3"))),
134+
new HashSet<>(),
135+
new HashSet<>(),
132136
new HashSet<>());
133137
}
138+
139+
@Test
140+
void testRegExpCapturingGroupExpression() {
141+
assertThat(Stream.of("re_1.table_1", "re_22.table_22", "re_333.table_333"))
142+
.map(TableIdRouterTest::testRoute)
143+
.map(List::toString)
144+
.containsExactly(
145+
"[database.another_table_with_111_index]",
146+
"[database.another_table_with_222222_index]",
147+
"[database.another_table_with_333333333_index]");
148+
149+
assertThat(Stream.of("inv_1.table_foo", "inv_22.table_bar", "inv_333.table_baz"))
150+
.map(TableIdRouterTest::testRoute)
151+
.map(List::toString)
152+
.containsExactly("[table_foo.inv_1]", "[table_bar.inv_22]", "[table_baz.inv_333]");
153+
}
154+
155+
private static List<String> testStdRegExpRoute(
156+
String sourceRouteRule, String sinkRouteRule, List<String> sourceTables) {
157+
TableIdRouter router =
158+
new TableIdRouter(List.of(new RouteRule(sourceRouteRule, sinkRouteRule)));
159+
return sourceTables.stream()
160+
.map(TableId::parse)
161+
.map(router::route)
162+
.map(List::toString)
163+
.collect(Collectors.toList());
164+
}
165+
166+
@Test
167+
void testRegExpComplexRouting() {
168+
// Capture the entire database.
169+
List<String> tablesToRoute =
170+
List.of("db1.tbl1", "db1.tbl2", "db1.tbl3", "db2.tbl2", "db2.tbl3", "db3.tbl3");
171+
assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.combined", tablesToRoute))
172+
.containsExactly(
173+
"[db1.combined]",
174+
"[db1.combined]",
175+
"[db1.combined]",
176+
"[db2.tbl2]",
177+
"[db2.tbl3]",
178+
"[db3.tbl3]");
179+
180+
// Capture the entire database and append prefixes.
181+
assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.pre_$1", tablesToRoute))
182+
.containsExactly(
183+
"[db1.pre_tbl1]",
184+
"[db1.pre_tbl2]",
185+
"[db1.pre_tbl3]",
186+
"[db2.tbl2]",
187+
"[db2.tbl3]",
188+
"[db3.tbl3]");
189+
190+
// Capture the entire database and append suffixes.
191+
assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.$1_suf", tablesToRoute))
192+
.containsExactly(
193+
"[db1.tbl1_suf]",
194+
"[db1.tbl2_suf]",
195+
"[db1.tbl3_suf]",
196+
"[db2.tbl2]",
197+
"[db2.tbl3]",
198+
"[db3.tbl3]");
199+
200+
// Capture the entire database and append extract parts.
201+
assertThat(testStdRegExpRoute("db1.tbl(\\.*)", "db1.no$1", tablesToRoute))
202+
.containsExactly(
203+
"[db1.no1]",
204+
"[db1.no2]",
205+
"[db1.no3]",
206+
"[db2.tbl2]",
207+
"[db2.tbl3]",
208+
"[db3.tbl3]");
209+
210+
// Capture databases and append database prefix.
211+
assertThat(testStdRegExpRoute("(\\.*).tbl3", "pre_$1.tbl3", tablesToRoute))
212+
.containsExactly(
213+
"[db1.tbl1]",
214+
"[db1.tbl2]",
215+
"[pre_db1.tbl3]",
216+
"[db2.tbl2]",
217+
"[pre_db2.tbl3]",
218+
"[pre_db3.tbl3]");
219+
220+
// Capture databases and append database suffix.
221+
assertThat(testStdRegExpRoute("(\\.*).tbl3", "$1_suf.tbl3", tablesToRoute))
222+
.containsExactly(
223+
"[db1.tbl1]",
224+
"[db1.tbl2]",
225+
"[db1_suf.tbl3]",
226+
"[db2.tbl2]",
227+
"[db2_suf.tbl3]",
228+
"[db3_suf.tbl3]");
229+
230+
// Capture databases and extract database parts.
231+
assertThat(testStdRegExpRoute("db(\\.*).(tbl\\.*)", "no$1.$2", tablesToRoute))
232+
.containsExactly(
233+
"[no1.tbl1]",
234+
"[no1.tbl2]",
235+
"[no1.tbl3]",
236+
"[no2.tbl2]",
237+
"[no2.tbl3]",
238+
"[no3.tbl3]");
239+
240+
// Capture multiple parts and append extra tags.
241+
assertThat(
242+
testStdRegExpRoute(
243+
"db(\\.*).tbl(\\.*)", "Database$1.Collection$2", tablesToRoute))
244+
.containsExactly(
245+
"[Database1.Collection1]",
246+
"[Database1.Collection2]",
247+
"[Database1.Collection3]",
248+
"[Database2.Collection2]",
249+
"[Database2.Collection3]",
250+
"[Database3.Collection3]");
251+
}
134252
}

0 commit comments

Comments
 (0)