1515 * limitations under the License.
1616 */
1717
18- package org .apache .flink .cdc .runtime . operators . schema . common ;
18+ package org .apache .flink .cdc .common . route ;
1919
2020import org .apache .flink .api .java .tuple .Tuple3 ;
21+ import org .apache .flink .cdc .common .annotation .PublicEvolving ;
2122import org .apache .flink .cdc .common .event .TableId ;
22- import org .apache .flink .cdc .common .route .RouteRule ;
2323import org .apache .flink .cdc .common .schema .Selectors ;
2424
2525import org .apache .flink .shaded .guava31 .com .google .common .cache .CacheBuilder ;
2626import org .apache .flink .shaded .guava31 .com .google .common .cache .CacheLoader ;
2727import org .apache .flink .shaded .guava31 .com .google .common .cache .LoadingCache ;
2828
29+ import org .slf4j .Logger ;
30+ import org .slf4j .LoggerFactory ;
31+
2932import javax .annotation .Nonnull ;
3033
3134import java .time .Duration ;
3235import java .util .ArrayList ;
36+ import java .util .Arrays ;
3337import java .util .List ;
3438import java .util .Set ;
39+ import java .util .regex .Matcher ;
40+ import java .util .regex .Pattern ;
3541import java .util .regex .PatternSyntaxException ;
3642import java .util .stream .Collectors ;
3743
3844/**
3945 * Calculates how upstream data change events should be dispatched to downstream tables. Returns one
4046 * or many destination Table IDs based on provided routing rules.
4147 */
48+ @ PublicEvolving
4249public class TableIdRouter {
4350
44- private final List <Tuple3 <Selectors , String , String >> routes ;
45- private final LoadingCache <TableId , List <TableId >> routingCache ;
51+ private static final Logger LOG = LoggerFactory .getLogger (TableIdRouter .class );
4652 private static final Duration CACHE_EXPIRE_DURATION = Duration .ofDays (1 );
4753
54+ private final List <Tuple3 <Pattern , String , String >> routes ;
55+ private final LoadingCache <TableId , List <TableId >> routingCache ;
56+
57+ private static final String DOT_PLACEHOLDER = "_dot_placeholder_" ;
58+
59+ /**
60+ * Currently, The supported regular syntax is not exactly the same in {@link Selectors}.
61+ *
62+ * <p>The main discrepancies are :
63+ *
64+ * <p>1) {@link Selectors} use {@code ,} to split table names instead of `|`.
65+ *
66+ * <p>2) If there is a need to use a dot ({@code .}) in a regular expression to match any
67+ * character, it is necessary to escape the dot with a backslash.
68+ *
69+ * <p>3) The unescaped {@code .} is used as the separator of database and table name. When
70+ * converting to Debezium style, it is expected to be escaped to match the dot ({@code .})
71+ * literally instead of the meta-character.
72+ */
73+ public static String convertTableListToRegExpPattern (String tables ) {
74+ LOG .info ("Rewriting CDC style table capture list: {}" , tables );
75+
76+ // In CDC-style table matching, table names could be separated by `,` character.
77+ // Convert it to `|` as it's standard RegEx syntax.
78+ tables =
79+ Arrays .stream (tables .split ("," )).map (String ::trim ).collect (Collectors .joining ("|" ));
80+ LOG .info ("Expression after replacing comma with vert separator: {}" , tables );
81+
82+ // Essentially, we're just trying to swap escaped `\\.` and unescaped `.`.
83+ // In our table matching syntax, `\\.` means RegEx token matcher and `.` means database &
84+ // table name separator.
85+ // On the contrary, while we're matching TableId string, `\\.` means matching the "dot"
86+ // literal and `.` is the meta-character.
87+
88+ // Step 1: escape the dot with a backslash, but keep it as a placeholder (like `$`).
89+ // For example, `db\.*.tbl\.*` => `db$*.tbl$*`
90+ String unescapedTables = tables .replace ("\\ ." , DOT_PLACEHOLDER );
91+ LOG .info ("Expression after un-escaping dots as RegEx meta-character: {}" , unescapedTables );
92+
93+ // Step 2: replace all remaining dots (`.`) to quoted version (`\.`), as a separator between
94+ // database and table names.
95+ // For example, `db$*.tbl$*` => `db$*\.tbl$*`
96+ String unescapedTablesWithDbTblSeparator = unescapedTables .replace ("." , "\\ ." );
97+ LOG .info ("Re-escaping dots as TableId delimiter: {}" , unescapedTablesWithDbTblSeparator );
98+
99+ // Step 3: restore placeholder to normal RegEx matcher (`.`)
100+ // For example, `db$*\.tbl$*` => `db.*\.tbl.*`
101+ String standardRegExpTableCaptureList =
102+ unescapedTablesWithDbTblSeparator .replace (DOT_PLACEHOLDER , "." );
103+ LOG .info ("Final standard RegExp table capture list: {}" , standardRegExpTableCaptureList );
104+
105+ return standardRegExpTableCaptureList ;
106+ }
107+
48108 public TableIdRouter (List <RouteRule > routingRules ) {
49109 this .routes = new ArrayList <>();
50110 for (RouteRule rule : routingRules ) {
51111 try {
52- String tableInclusions = rule .sourceTable ;
53- Selectors selectors =
54- new Selectors .SelectorsBuilder ().includeTables (tableInclusions ).build ();
55- routes .add (new Tuple3 <>(selectors , rule .sinkTable , rule .replaceSymbol ));
112+ routes .add (
113+ new Tuple3 <>(
114+ Pattern .compile (convertTableListToRegExpPattern (rule .sourceTable )),
115+ rule .sinkTable ,
116+ rule .replaceSymbol ));
56117 } catch (PatternSyntaxException e ) {
57118 throw new IllegalArgumentException (
58119 String .format (
@@ -80,7 +141,7 @@ public List<TableId> route(TableId sourceTableId) {
80141 private List <TableId > calculateRoute (TableId sourceTableId ) {
81142 List <TableId > routedTableIds =
82143 routes .stream ()
83- .filter (route -> route .f0 . isMatch ( sourceTableId ))
144+ .filter (route -> matches ( route .f0 , sourceTableId ))
84145 .map (route -> resolveReplacement (sourceTableId , route ))
85146 .collect (Collectors .toList ());
86147 if (routedTableIds .isEmpty ()) {
@@ -90,9 +151,14 @@ private List<TableId> calculateRoute(TableId sourceTableId) {
90151 }
91152
92153 private TableId resolveReplacement (
93- TableId originalTable , Tuple3 <Selectors , String , String > route ) {
154+ TableId originalTable , Tuple3 <Pattern , String , String > route ) {
94155 if (route .f2 != null ) {
95156 return TableId .parse (route .f1 .replace (route .f2 , originalTable .getTableName ()));
157+ } else {
158+ Matcher matcher = route .f0 .matcher (originalTable .toString ());
159+ if (matcher .find ()) {
160+ return TableId .parse (matcher .replaceAll (route .f1 ));
161+ }
96162 }
97163 return TableId .parse (route .f1 );
98164 }
@@ -111,18 +177,16 @@ public List<Set<TableId>> groupSourceTablesByRouteRule(Set<TableId> tableIdSet)
111177 if (routes .isEmpty ()) {
112178 return new ArrayList <>();
113179 }
114- List <Set <TableId >> routedTableIds =
115- routes .stream ()
116- .map (
117- route -> {
118- return tableIdSet .stream ()
119- .filter (
120- tableId -> {
121- return route .f0 .isMatch (tableId );
122- })
123- .collect (Collectors .toSet ());
124- })
125- .collect (Collectors .toList ());
126- return routedTableIds ;
180+ return routes .stream ()
181+ .map (
182+ route ->
183+ tableIdSet .stream ()
184+ .filter (tableId -> matches (route .f0 , tableId ))
185+ .collect (Collectors .toSet ()))
186+ .collect (Collectors .toList ());
187+ }
188+
189+ private static boolean matches (Pattern pattern , TableId tableId ) {
190+ return pattern .matcher (tableId .toString ()).matches ();
127191 }
128192}
0 commit comments