diff --git a/docs/content.zh/docs/core-concept/route.md b/docs/content.zh/docs/core-concept/route.md index fa1a9535309..774915d3d6f 100644 --- a/docs/content.zh/docs/core-concept/route.md +++ b/docs/content.zh/docs/core-concept/route.md @@ -85,4 +85,25 @@ route: description: route all tables in source_db to sink_db ``` -然后,`source_db` 库下所有的表都会被同步到 `sink_db` 库下。 \ No newline at end of file +然后,`source_db` 库下所有的表都会被同步到 `sink_db` 库下。 + +## 高级:基于正则捕获组的替换规则 + +您可以在 `source-table` 字段中定义正则表达式的捕获组: + +```yaml +route: + - source-table: db_(\.*).(\.*)_tbl + sink-table: sink_db_$1.sink_table_$2 +``` + +这里我们创建了两个捕获组,分别用来匹配数据库名 `db_` 之后的后缀和表名 `_tbl` 之前的前缀。 + +以上游表 `db_foo.bar_tbl` 为例,我们将会从中提取出 `(foo, bar)` 作为捕获组,并且将其依次绑定到 `$1` 和 `$2` 变量中。 +因此,这张表将被路由到 `sink_db_foo.sink_table_bar` 下游表中。 + +{{< hint info >}} + +注意:基于正则捕获组的替换规则无法与 `replace-symbol` 选项搭配使用。 + +{{< /hint >}} diff --git a/docs/content/docs/core-concept/route.md b/docs/content/docs/core-concept/route.md index ca5855d5861..394f5a56a9e 100644 --- a/docs/content/docs/core-concept/route.md +++ b/docs/content/docs/core-concept/route.md @@ -86,4 +86,25 @@ route: description: route all tables in source_db to sink_db ``` -Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX` without hassle. \ No newline at end of file +Then, all tables including `source_db.XXX` will be routed to `sink_db.XXX` without hassle. + +## Advanced: RegExp Capturing & Replacement Rules + +It is also possible to create capturing groups in `source-table` fields like this: + +```yaml +route: + - source-table: db_(\.*).(\.*)_tbl + sink-table: sink_db_$1.sink_table_$2 +``` + +Here we create two capturing groups matching database suffix and table prefix. + +For upstream table `db_foo.bar_tbl`, capturing group `(foo, bar)` will be extracted and bound to `$1` and `$2`. +As a result, such table will be routed to downstream table `sink_db_foo.sink_table_bar`. + +{{< hint info >}} + +Standard RegExp capturing could not be used with `replace-symbol` options. + +{{< /hint >}} diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java similarity index 50% rename from flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java rename to flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java index 6131b7f4c49..06126b88a1d 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouter.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java @@ -15,23 +15,29 @@ * limitations under the License. */ -package org.apache.flink.cdc.runtime.operators.schema.common; +package org.apache.flink.cdc.common.route; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.cdc.common.annotation.PublicEvolving; import org.apache.flink.cdc.common.event.TableId; -import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader; import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nonnull; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; @@ -39,20 +45,75 @@ * Calculates how upstream data change events should be dispatched to downstream tables. Returns one * or many destination Table IDs based on provided routing rules. */ +@PublicEvolving public class TableIdRouter { - private final List> routes; - private final LoadingCache> routingCache; + private static final Logger LOG = LoggerFactory.getLogger(TableIdRouter.class); private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1); + private final List> routes; + private final LoadingCache> routingCache; + + private static final String DOT_PLACEHOLDER = "_dot_placeholder_"; + + /** + * Currently, The supported regular syntax is not exactly the same in {@link Selectors}. + * + *

The main discrepancies are : + * + *

1) {@link Selectors} use {@code ,} to split table names instead of `|`. + * + *

2) If there is a need to use a dot ({@code .}) in a regular expression to match any + * character, it is necessary to escape the dot with a backslash. + * + *

3) The unescaped {@code .} is used as the separator of database and table name. When + * converting to Debezium style, it is expected to be escaped to match the dot ({@code .}) + * literally instead of the meta-character. + */ + public static String convertTableListToRegExpPattern(String tables) { + LOG.info("Rewriting CDC style table capture list: {}", tables); + + // In CDC-style table matching, table names could be separated by `,` character. + // Convert it to `|` as it's standard RegEx syntax. + tables = + Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|")); + LOG.info("Expression after replacing comma with vert separator: {}", tables); + + // Essentially, we're just trying to swap escaped `\\.` and unescaped `.`. + // In our table matching syntax, `\\.` means RegEx token matcher and `.` means database & + // table name separator. + // On the contrary, while we're matching TableId string, `\\.` means matching the "dot" + // literal and `.` is the meta-character. + + // Step 1: escape the dot with a backslash, but keep it as a placeholder (like `$`). + // For example, `db\.*.tbl\.*` => `db$*.tbl$*` + String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER); + LOG.info("Expression after un-escaping dots as RegEx meta-character: {}", unescapedTables); + + // Step 2: replace all remaining dots (`.`) to quoted version (`\.`), as a separator between + // database and table names. + // For example, `db$*.tbl$*` => `db$*\.tbl$*` + String unescapedTablesWithDbTblSeparator = unescapedTables.replace(".", "\\."); + LOG.info("Re-escaping dots as TableId delimiter: {}", unescapedTablesWithDbTblSeparator); + + // Step 3: restore placeholder to normal RegEx matcher (`.`) + // For example, `db$*\.tbl$*` => `db.*\.tbl.*` + String standardRegExpTableCaptureList = + unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, "."); + LOG.info("Final standard RegExp table capture list: {}", standardRegExpTableCaptureList); + + return standardRegExpTableCaptureList; + } + public TableIdRouter(List routingRules) { this.routes = new ArrayList<>(); for (RouteRule rule : routingRules) { try { - String tableInclusions = rule.sourceTable; - Selectors selectors = - new Selectors.SelectorsBuilder().includeTables(tableInclusions).build(); - routes.add(new Tuple3<>(selectors, rule.sinkTable, rule.replaceSymbol)); + routes.add( + new Tuple3<>( + Pattern.compile(convertTableListToRegExpPattern(rule.sourceTable)), + rule.sinkTable, + rule.replaceSymbol)); } catch (PatternSyntaxException e) { throw new IllegalArgumentException( String.format( @@ -80,7 +141,7 @@ public List route(TableId sourceTableId) { private List calculateRoute(TableId sourceTableId) { List routedTableIds = routes.stream() - .filter(route -> route.f0.isMatch(sourceTableId)) + .filter(route -> matches(route.f0, sourceTableId)) .map(route -> resolveReplacement(sourceTableId, route)) .collect(Collectors.toList()); if (routedTableIds.isEmpty()) { @@ -90,9 +151,14 @@ private List calculateRoute(TableId sourceTableId) { } private TableId resolveReplacement( - TableId originalTable, Tuple3 route) { + TableId originalTable, Tuple3 route) { if (route.f2 != null) { return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName())); + } else { + Matcher matcher = route.f0.matcher(originalTable.toString()); + if (matcher.find()) { + return TableId.parse(matcher.replaceAll(route.f1)); + } } return TableId.parse(route.f1); } @@ -111,18 +177,16 @@ public List> groupSourceTablesByRouteRule(Set tableIdSet) if (routes.isEmpty()) { return new ArrayList<>(); } - List> routedTableIds = - routes.stream() - .map( - route -> { - return tableIdSet.stream() - .filter( - tableId -> { - return route.f0.isMatch(tableId); - }) - .collect(Collectors.toSet()); - }) - .collect(Collectors.toList()); - return routedTableIds; + return routes.stream() + .map( + route -> + tableIdSet.stream() + .filter(tableId -> matches(route.f0, tableId)) + .collect(Collectors.toSet())) + .collect(Collectors.toList()); + } + + private static boolean matches(Pattern pattern, TableId tableId) { + return pattern.matcher(tableId.toString()).matches(); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index b775cebb05e..1b3540da0bb 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -30,7 +30,6 @@ import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource; -import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory; import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; @@ -45,7 +44,6 @@ import com.mysql.cj.conf.PropertyKey; import io.debezium.relational.RelationalDatabaseConnectorConfig; -import io.debezium.relational.Tables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +61,7 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.cdc.common.route.TableIdRouter.convertTableListToRegExpPattern; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.CHUNK_META_GROUP_SIZE; @@ -231,7 +230,7 @@ public DataSource createDataSource(Context context) { } if (scanBinlogNewlyAddedTableEnabled) { - String newTables = validateTableAndReturnDebeziumStyle(tables); + String newTables = convertTableListToRegExpPattern(tables); configFactory.tableList(newTables); configFactory.excludeTableList(tablesExclude); @@ -516,58 +515,6 @@ && doubleCompare(distributionFactorLower, 1.0d) <= 0, private static final String DOT_PLACEHOLDER = "_$dot_placeholder$_"; - /** - * Currently, The supported regular syntax is not exactly the same in {@link Selectors} and - * {@link Tables.TableFilter}. - * - *

The main distinction are : - * - *

1) {@link Selectors} use {@code ,} to split table names and {@link Tables.TableFilter} use - * `|` to split table names. - * - *

2) If there is a need to use a dot ({@code .}) in a regular expression to match any - * character, it is necessary to escape the dot with a backslash, refer to {@link - * MySqlDataSourceOptions#TABLES}. - * - *

3) The unescaped {@code .} is used as the separator of database and table name. When - * converting to Debezium style, it is expected to be escaped to match the dot ({@code .}) - * literally instead of the meta-character. - */ - private String validateTableAndReturnDebeziumStyle(String tables) { - LOG.info("Rewriting CDC style table capture list: {}", tables); - - // In CDC-style table matching, table names could be separated by `,` character. - // Convert it to `|` as it's standard RegEx syntax. - tables = - Arrays.stream(tables.split(",")).map(String::trim).collect(Collectors.joining("|")); - LOG.info("Expression after replacing comma with vert separator: {}", tables); - - // Essentially, we're just trying to swap escaped `\\.` and unescaped `.`. - // In our table matching syntax, `\\.` means RegEx token matcher and `.` means database & - // table name separator. - // On the contrary, while we're matching TableId string, `\\.` means matching the "dot" - // literal and `.` is the meta-character. - - // Step 1: escape the dot with a backslash, but keep it as a placeholder (like `$`). - // For example, `db\.*.tbl\.*` => `db$*.tbl$*` - String unescapedTables = tables.replace("\\.", DOT_PLACEHOLDER); - LOG.info("Expression after unescaping dots as RegEx meta-character: {}", unescapedTables); - - // Step 2: replace all remaining dots (`.`) to quoted version (`\.`), as a separator between - // database and table names. - // For example, `db$*.tbl$*` => `db$*\.tbl$*` - String unescapedTablesWithDbTblSeparator = unescapedTables.replace(".", "\\."); - LOG.info("Re-escaping dots as TableId delimiter: {}", unescapedTablesWithDbTblSeparator); - - // Step 3: restore placeholder to normal RegEx matcher (`.`) - // For example, `db$*\.tbl$*` => `db.*\.tbl.*` - String debeziumStyleTableCaptureList = - unescapedTablesWithDbTblSeparator.replace(DOT_PLACEHOLDER, "."); - LOG.info("Final Debezium-style table capture list: {}", debeziumStyleTableCaptureList); - - return debeziumStyleTableCaptureList; - } - /** Replaces the default timezone placeholder with session timezone, if applicable. */ private static ZoneId getServerTimeZone(Configuration config) { final String serverTimeZone = config.get(SERVER_TIME_ZONE); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java index a2659561e58..8d56f72586e 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivator.java @@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEventWithPreSchema; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; +import org.apache.flink.cdc.common.route.TableIdRouter; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java index 8bea59548c6..640fb765caf 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; +import org.apache.flink.cdc.common.route.TableIdRouter; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java index f31cbabdcf3..15c0571ba16 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java @@ -26,11 +26,11 @@ import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; +import org.apache.flink.cdc.common.route.TableIdRouter; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils; import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator; -import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter; import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics; import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.distributed.event.SchemaChangeResponse; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java index 27012edca53..df990425afc 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java @@ -25,11 +25,11 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; +import org.apache.flink.cdc.common.route.TableIdRouter; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator; import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager; -import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java index feb1a38d5af..901ba168db0 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java @@ -26,11 +26,11 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; +import org.apache.flink.cdc.common.route.TableIdRouter; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils; import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator; -import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter; import org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics; import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest; import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse; diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java similarity index 53% rename from flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java rename to flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java index ba21b63eb53..b8e487700da 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java @@ -15,9 +15,10 @@ * limitations under the License. */ -package org.apache.flink.cdc.runtime.operators.schema.common; +package org.apache.flink.cdc.common.route; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.runtime.operators.schema.common.SchemaTestBase; import org.junit.jupiter.api.Test; @@ -26,6 +27,7 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; @@ -38,6 +40,17 @@ private static List testRoute(String tableId) { .collect(Collectors.toList()); } + private static String testConvert(String input) { + return TableIdRouter.convertTableListToRegExpPattern(input); + } + + @Test + void testConvertingDebeziumTableIdToStandardRegex() { + assertThat(testConvert("foo.bar")).isEqualTo("foo\\.bar"); + assertThat(testConvert("foo.bar,foo.baz")).isEqualTo("foo\\.bar|foo\\.baz"); + assertThat(testConvert("db.\\.*")).isEqualTo("db\\..*"); + } + @Test void testImplicitRoute() { assertThat(testRoute("db_0.table_1")).containsExactlyInAnyOrder("db_0.table_1"); @@ -129,6 +142,122 @@ void testGroupSourceTablesByRouteRule() { TableId.parse("db_5.table_1"), TableId.parse("db_5.table_2"), TableId.parse("db_5.table_3"))), + new HashSet<>(), + new HashSet<>(), new HashSet<>()); } + + @Test + void testRegExpCapturingGroupExpression() { + assertThat(Stream.of("re_1.table_1", "re_22.table_22", "re_333.table_333")) + .map(TableIdRouterTest::testRoute) + .map(List::toString) + .containsExactly( + "[database.another_table_with_111_index]", + "[database.another_table_with_222222_index]", + "[database.another_table_with_333333333_index]"); + + assertThat(Stream.of("inv_1.table_foo", "inv_22.table_bar", "inv_333.table_baz")) + .map(TableIdRouterTest::testRoute) + .map(List::toString) + .containsExactly("[table_foo.inv_1]", "[table_bar.inv_22]", "[table_baz.inv_333]"); + } + + private static List testStdRegExpRoute( + String sourceRouteRule, String sinkRouteRule, List sourceTables) { + TableIdRouter router = + new TableIdRouter(List.of(new RouteRule(sourceRouteRule, sinkRouteRule))); + return sourceTables.stream() + .map(TableId::parse) + .map(router::route) + .map(List::toString) + .collect(Collectors.toList()); + } + + @Test + void testRegExpComplexRouting() { + // Capture the entire database. + List tablesToRoute = + List.of("db1.tbl1", "db1.tbl2", "db1.tbl3", "db2.tbl2", "db2.tbl3", "db3.tbl3"); + assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.combined", tablesToRoute)) + .containsExactly( + "[db1.combined]", + "[db1.combined]", + "[db1.combined]", + "[db2.tbl2]", + "[db2.tbl3]", + "[db3.tbl3]"); + + // Capture the entire database and append prefixes. + assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.pre_$1", tablesToRoute)) + .containsExactly( + "[db1.pre_tbl1]", + "[db1.pre_tbl2]", + "[db1.pre_tbl3]", + "[db2.tbl2]", + "[db2.tbl3]", + "[db3.tbl3]"); + + // Capture the entire database and append suffixes. + assertThat(testStdRegExpRoute("db1.(\\.*)", "db1.$1_suf", tablesToRoute)) + .containsExactly( + "[db1.tbl1_suf]", + "[db1.tbl2_suf]", + "[db1.tbl3_suf]", + "[db2.tbl2]", + "[db2.tbl3]", + "[db3.tbl3]"); + + // Capture the entire database and append extract parts. + assertThat(testStdRegExpRoute("db1.tbl(\\.*)", "db1.no$1", tablesToRoute)) + .containsExactly( + "[db1.no1]", + "[db1.no2]", + "[db1.no3]", + "[db2.tbl2]", + "[db2.tbl3]", + "[db3.tbl3]"); + + // Capture databases and append database prefix. + assertThat(testStdRegExpRoute("(\\.*).tbl3", "pre_$1.tbl3", tablesToRoute)) + .containsExactly( + "[db1.tbl1]", + "[db1.tbl2]", + "[pre_db1.tbl3]", + "[db2.tbl2]", + "[pre_db2.tbl3]", + "[pre_db3.tbl3]"); + + // Capture databases and append database suffix. + assertThat(testStdRegExpRoute("(\\.*).tbl3", "$1_suf.tbl3", tablesToRoute)) + .containsExactly( + "[db1.tbl1]", + "[db1.tbl2]", + "[db1_suf.tbl3]", + "[db2.tbl2]", + "[db2_suf.tbl3]", + "[db3_suf.tbl3]"); + + // Capture databases and extract database parts. + assertThat(testStdRegExpRoute("db(\\.*).(tbl\\.*)", "no$1.$2", tablesToRoute)) + .containsExactly( + "[no1.tbl1]", + "[no1.tbl2]", + "[no1.tbl3]", + "[no2.tbl2]", + "[no2.tbl3]", + "[no3.tbl3]"); + + // Capture multiple parts and append extra tags. + assertThat( + testStdRegExpRoute( + "db(\\.*).tbl(\\.*)", "Database$1.Collection$2", tablesToRoute)) + .containsExactly( + "[Database1.Collection1]", + "[Database1.Collection2]", + "[Database1.Collection3]", + "[Database2.Collection2]", + "[Database2.Collection3]", + "[Database3.Collection3]"); + } } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java index 99d6929d1b5..40b40b46869 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java @@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; +import org.apache.flink.cdc.common.route.TableIdRouter; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.sink.MetadataApplier; diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java index 8086f650ef1..af273d13d4f 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.route.RouteRule; +import org.apache.flink.cdc.common.route.TableIdRouter; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent; @@ -62,7 +63,14 @@ public abstract class SchemaTestBase { new RouteRule("db_5.table_\\.*", "db_5.prefix_<>_suffix", "<>"), // Irrelevant routes - new RouteRule("foo", "bar", null)); + new RouteRule("foo", "bar", null), + + // Standard RegExp capturing rules + new RouteRule( + "re_\\d+.table_(\\.*)", + "database.another_table_with_$1$1$1_index", + null), + new RouteRule("(inv_\\d+).(table_\\.*)", "$2.$1", null)); protected static final TableIdRouter TABLE_ID_ROUTER = new TableIdRouter(ROUTING_RULES);