2020import org .apache .flink .api .java .tuple .Tuple3 ;
2121import org .apache .flink .cdc .common .event .TableId ;
2222import org .apache .flink .cdc .common .route .RouteRule ;
23- import org .apache .flink .cdc .common .schema .Selectors ;
2423
2524import org .apache .flink .shaded .guava31 .com .google .common .cache .CacheBuilder ;
2625import org .apache .flink .shaded .guava31 .com .google .common .cache .CacheLoader ;
2726import org .apache .flink .shaded .guava31 .com .google .common .cache .LoadingCache ;
2827
28+ import org .slf4j .Logger ;
29+ import org .slf4j .LoggerFactory ;
30+
2931import javax .annotation .Nonnull ;
3032
3133import java .time .Duration ;
3234import java .util .ArrayList ;
35+ import java .util .Arrays ;
3336import java .util .List ;
3437import java .util .Set ;
38+ import java .util .regex .Matcher ;
39+ import java .util .regex .Pattern ;
3540import java .util .regex .PatternSyntaxException ;
3641import java .util .stream .Collectors ;
3742
4146 */
4247public class TableIdRouter {
4348
44- private final List <Tuple3 <Selectors , String , String >> routes ;
45- private final LoadingCache <TableId , List <TableId >> routingCache ;
// Class-wide SLF4J logger.
49+ private static final Logger LOG = LoggerFactory .getLogger (TableIdRouter .class );
// One-day duration; presumably the expiry applied to routingCache -- the cache
// construction is outside this view, confirm there.
4650 private static final Duration CACHE_EXPIRE_DURATION = Duration .ofDays (1 );
4751
// One entry per routing rule, in rule order:
//   f0 = compiled source-table pattern, f1 = sink table, f2 = replace symbol (may be null).
52+ private final List <Tuple3 <Pattern , String , String >> routes ;
// Maps a source TableId to its list of routed TableIds.
// NOTE(review): the loader and eviction policy are not visible in this chunk.
53+ private final LoadingCache <TableId , List <TableId >> routingCache ;
54+
// Marker text associated with dot handling while rewriting table capture lists.
55+ private static final String DOT_PLACEHOLDER = "_dot_placeholder_" ;
56+
57+ private static Pattern validateTableListToRegExpPattern (String tables ) {
58+ LOG .info ("Rewriting CDC style table capture list: {}" , tables );
59+
60+ // In CDC-style table matching, table names could be separated by `,` character.
61+ // Convert it to `|` as it's standard RegEx syntax.
62+ tables =
63+ Arrays .stream (tables .split ("," )).map (String ::trim ).collect (Collectors .joining ("|" ));
64+ LOG .info ("Expression after replacing comma with vert separator: {}" , tables );
65+
66+ // Essentially, we're just trying to swap escaped `\\.` and unescaped `.`.
67+ // In our table matching syntax, `\\.` means RegEx token matcher and `.` means database &
68+ // table name separator.
69+ // On the contrary, while we're matching TableId string, `\\.` means matching the "dot"
70+ // literal and `.` is the meta-character.
71+
72+ // Step 1: escape the dot with a backslash, but keep it as a placeholder (like `$`).
73+ // For example, `db\.*.tbl\.*` => `db$*.tbl$*`
74+ String unescapedTables = tables .replace ("\\ ." , DOT_PLACEHOLDER );
75+ LOG .info ("Expression after un-escaping dots as RegEx meta-character: {}" , unescapedTables );
76+
77+ // Step 2: replace all remaining dots (`.`) to quoted version (`\.`), as a separator between
78+ // database and table names.
79+ // For example, `db$*.tbl$*` => `db$*\.tbl$*`
80+ String unescapedTablesWithDbTblSeparator = unescapedTables .replace ("." , "\\ ." );
81+ LOG .info ("Re-escaping dots as TableId delimiter: {}" , unescapedTablesWithDbTblSeparator );
82+
83+ // Step 3: restore placeholder to normal RegEx matcher (`.`)
84+ // For example, `db$*\.tbl$*` => `db.*\.tbl.*`
85+ String standardRegExpTableCaptureList =
86+ unescapedTablesWithDbTblSeparator .replace (DOT_PLACEHOLDER , "." );
87+ LOG .info ("Final standard RegExp table capture list: {}" , standardRegExpTableCaptureList );
88+
89+ return Pattern .compile (standardRegExpTableCaptureList );
90+ }
91+
4892 public TableIdRouter (List <RouteRule > routingRules ) {
4993 this .routes = new ArrayList <>();
5094 for (RouteRule rule : routingRules ) {
5195 try {
52- String tableInclusions = rule .sourceTable ;
53- Selectors selectors =
54- new Selectors .SelectorsBuilder ().includeTables (tableInclusions ).build ();
55- routes .add (new Tuple3 <>(selectors , rule .sinkTable , rule .replaceSymbol ));
96+ routes .add (
97+ new Tuple3 <>(
98+ validateTableListToRegExpPattern (rule .sourceTable ),
99+ rule .sinkTable ,
100+ rule .replaceSymbol ));
56101 } catch (PatternSyntaxException e ) {
57102 throw new IllegalArgumentException (
58103 String .format (
@@ -80,7 +125,7 @@ public List<TableId> route(TableId sourceTableId) {
80125 private List <TableId > calculateRoute (TableId sourceTableId ) {
81126 List <TableId > routedTableIds =
82127 routes .stream ()
83- .filter (route -> route .f0 . isMatch ( sourceTableId ))
128+ .filter (route -> matches ( route .f0 , sourceTableId ))
84129 .map (route -> resolveReplacement (sourceTableId , route ))
85130 .collect (Collectors .toList ());
86131 if (routedTableIds .isEmpty ()) {
@@ -90,9 +135,14 @@ private List<TableId> calculateRoute(TableId sourceTableId) {
90135 }
91136
92137 private TableId resolveReplacement (
93- TableId originalTable , Tuple3 <Selectors , String , String > route ) {
138+ TableId originalTable , Tuple3 <Pattern , String , String > route ) {
94139 if (route .f2 != null ) {
95140 return TableId .parse (route .f1 .replace (route .f2 , originalTable .getTableName ()));
141+ } else {
142+ Matcher matcher = route .f0 .matcher (originalTable .toString ());
143+ if (matcher .find ()) {
144+ return TableId .parse (matcher .replaceAll (route .f1 ));
145+ }
96146 }
97147 return TableId .parse (route .f1 );
98148 }
@@ -111,18 +161,16 @@ public List<Set<TableId>> groupSourceTablesByRouteRule(Set<TableId> tableIdSet)
111161 if (routes .isEmpty ()) {
112162 return new ArrayList <>();
113163 }
114- List <Set <TableId >> routedTableIds =
115- routes .stream ()
116- .map (
117- route -> {
118- return tableIdSet .stream ()
119- .filter (
120- tableId -> {
121- return route .f0 .isMatch (tableId );
122- })
123- .collect (Collectors .toSet ());
124- })
125- .collect (Collectors .toList ());
126- return routedTableIds ;
164+ return routes .stream ()
165+ .map (
166+ route ->
167+ tableIdSet .stream ()
168+ .filter (tableId -> matches (route .f0 , tableId ))
169+ .collect (Collectors .toSet ()))
170+ .collect (Collectors .toList ());
171+ }
172+
173+ private static boolean matches (Pattern pattern , TableId tableId ) {
174+ return pattern .matcher (tableId .toString ()).matches ();
127175 }
128176}
0 commit comments