diff --git a/README.md b/README.md index ab17cfa66cb..8b818a92349 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ SELECT * FROM test_table; pipeline: name: Sync MySQL Database to Doris parallelism: 2 + route-mode: ALL_MATCH user-defined-function: - name: addone classpath: com.example.functions.AddOneFunctionClass diff --git a/docs/content.zh/docs/core-concept/data-pipeline.md b/docs/content.zh/docs/core-concept/data-pipeline.md index a2edc08548b..6bde6e46202 100644 --- a/docs/content.zh/docs/core-concept/data-pipeline.md +++ b/docs/content.zh/docs/core-concept/data-pipeline.md @@ -118,6 +118,7 @@ under the License. | `parallelism` | pipeline的全局并发度,默认值是1。 | optional | | `local-time-zone` | 作业级别的本地时区。 | optional | | `execution.runtime-mode` | pipeline 的运行模式,包含 STREAMING 和 BATCH,默认值是 STREAMING。 | optional | +| `route-mode` | [路由]({{< ref "docs/core-concept/route" >}}#路由模式)规则的匹配模式。可选值:`ALL_MATCH`(默认,应用所有匹配的规则)或 `FIRST_MATCH`(只应用第一个匹配的规则)。 | optional | | `schema.change.behavior` | 如何处理 [schema 变更]({{< ref "docs/core-concept/schema-evolution" >}})。可选值:[`exception`]({{< ref "docs/core-concept/schema-evolution" >}}#exception-mode)、[`evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#evolve-mode)、[`try_evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#tryevolve-mode)、[`lenient`]({{< ref "docs/core-concept/schema-evolution" >}}#lenient-mode)(默认值)或 [`ignore`]({{< ref "docs/core-concept/schema-evolution" >}}#ignore-mode)。 | optional | | `schema.operator.uid` | Schema 算子的唯一 ID。此 ID 用于算子间通信,必须在所有算子中保持唯一。**已废弃**:请使用 `operator.uid.prefix` 代替。 | optional | | `schema-operator.rpc-timeout` | SchemaOperator 等待下游 SchemaChangeEvent 应用完成的超时时间,默认值是 3 分钟。 | optional | diff --git a/docs/content.zh/docs/core-concept/route.md b/docs/content.zh/docs/core-concept/route.md index 774915d3d6f..88428f85339 100644 --- a/docs/content.zh/docs/core-concept/route.md +++ b/docs/content.zh/docs/core-concept/route.md @@ -39,6 +39,29 @@ under the License. 一个 Route 模块可以包含一个或多个 source-table/sink-table 规则。 +# 路由模式 +默认情况下,所有匹配的路由规则都会被应用到表上。你可以在 pipeline 配置中通过 `route-mode` 选项来改变这一行为: + +| 值 | 描述 | +|--------------|-------------------------------------------------| +| `ALL_MATCH` | 应用所有匹配的路由规则到表上。这是默认模式。 | +| `FIRST_MATCH`| 只应用第一个匹配的路由规则,并停止后续规则的计算。 | + +例如,使用 `FIRST_MATCH` 模式: + +```yaml +pipeline: + name: Sync MySQL Database to Doris + parallelism: 2 + route-mode: FIRST_MATCH +``` + +{{< hint info >}} + +当使用 `FIRST_MATCH` 模式时,路由规则会按照定义的顺序进行计算。第一个匹配源表的规则会被应用,后续的规则将被跳过。 + +{{< /hint >}} + # 示例 ## 路由一个 Data Source 表到一个 Data Sink 表 如果同步一个 `mydb` 数据库中的 `web_order` 表到一个相同库的 `ods_web_order` 表,我们可以使用下面的 yaml 文件来定义这个路由: diff --git a/docs/content/docs/core-concept/data-pipeline.md b/docs/content/docs/core-concept/data-pipeline.md index e8ab9262407..2a9a27b5008 100644 --- a/docs/content/docs/core-concept/data-pipeline.md +++ b/docs/content/docs/core-concept/data-pipeline.md @@ -120,6 +120,7 @@ Note that whilst the parameters are each individually optional, at least one of | `parallelism` | The global parallelism of the pipeline. Defaults to 1. | optional | | `local-time-zone` | The local time zone defines current session time zone id. | optional | | `execution.runtime-mode` | The runtime mode of the pipeline includes STREAMING and BATCH, with the default value being STREAMING. | optional | +| `route-mode` | The matching mode for [route]({{< ref "docs/core-concept/route" >}}#route-mode) rules. One of: `ALL_MATCH` (default, apply all matching rules) or `FIRST_MATCH` (apply only the first matching rule). | optional | | `schema.change.behavior` | How to handle [changes in schema]({{< ref "docs/core-concept/schema-evolution" >}}). One of: [`exception`]({{< ref "docs/core-concept/schema-evolution" >}}#exception-mode), [`evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#evolve-mode), [`try_evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#tryevolve-mode), [`lenient`]({{< ref "docs/core-concept/schema-evolution" >}}#lenient-mode) (default) or [`ignore`]({{< ref "docs/core-concept/schema-evolution" >}}#ignore-mode). | optional | | `schema.operator.uid` | The unique ID for schema operator. This ID will be used for inter-operator communications and must be unique across operators. **Deprecated**: use `operator.uid.prefix` instead. | optional | | `schema-operator.rpc-timeout` | The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes. | optional | diff --git a/docs/content/docs/core-concept/route.md b/docs/content/docs/core-concept/route.md index 394f5a56a9e..2c1de4435b0 100644 --- a/docs/content/docs/core-concept/route.md +++ b/docs/content/docs/core-concept/route.md @@ -28,7 +28,7 @@ under the License. **Route** specifies the rule of matching a list of source-table and mapping to sink-table. The most typical scenario is the merge of sub-databases and sub-tables, routing multiple upstream source tables to the same sink table. # Parameters -To describe a route, the follows are required: +To describe a route, the follows are required: | parameter | meaning | optional/required | |----------------|---------------------------------------------------------------------------------------------|-------------------| @@ -39,6 +39,29 @@ To describe a route, the follows are required: A route module can contain a list of source-table/sink-table rules. +# Route Mode +By default, all matching route rules are applied to a table. You can configure the `route-mode` option in the pipeline configuration to change this behavior: + +| Value | Description | +|--------------|------------------------------------------------------------------------| +| `ALL_MATCH` | Apply all matching route rules to a table. This is the default mode. | +| `FIRST_MATCH`| Apply only the first matching route rule and stop evaluation. | + +For example, to use `FIRST_MATCH` mode: + +```yaml +pipeline: + name: Sync MySQL Database to Doris + parallelism: 2 + route-mode: FIRST_MATCH +``` + +{{< hint info >}} + +When using `FIRST_MATCH` mode, route rules are evaluated in the order they are defined. The first rule that matches the source table will be applied, and subsequent rules will be skipped. + +{{< /hint >}} + # Example ## Route one Data Source table to one Data Sink table if synchronize the table `web_order` in the database `mydb` to a Doris table `ods_web_order`, we can use this yaml file to define this route: diff --git a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java index bbfafcf8e69..8349c297708 100644 --- a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java +++ b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java @@ -206,6 +206,15 @@ void testUdfDefinitionWithOptions() throws Exception { assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions); } + @Test + void testRouteMode() throws Exception { + URL resource = + Resources.getResource("definitions/pipeline-definition-with-route-mode.yaml"); + YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser(); + PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration()); + assertThat(pipelineDef).isEqualTo(pipelineDefWithRouteMode); + } + @Test void testSchemaEvolutionTypesConfiguration() throws Exception { testSchemaEvolutionTypesParsing( @@ -716,4 +725,60 @@ void testParsingFullDefinitionFromString() throws Exception { ImmutableMap.builder() .put("parallelism", "1") .build())); + + private final PipelineDef pipelineDefWithRouteMode = + new PipelineDef( + new SourceDef( + "mysql", + null, + Configuration.fromMap( + ImmutableMap.builder() + .put("hostname", "localhost") + .put("port", "3306") + .put("username", "root") + .put("password", "123456") + .put("tables", "mydb.\\.*") + .put("server-id", "5400-5404") + .put("server-time-zone", "UTC") + .build())), + new SinkDef( + "doris", + null, + Configuration.fromMap( + ImmutableMap.builder() + .put("fenodes", "127.0.0.1:8030") + .put("username", "root") + .put("password", "") + .build()), + ImmutableSet.of( + DROP_COLUMN, + ALTER_COLUMN_TYPE, + ADD_COLUMN, + CREATE_TABLE, + RENAME_COLUMN)), + Arrays.asList( + new RouteDef( + "mydb.order_.*", + "ods_db.ods_orders", + null, + "Merge all order sharded tables"), + new RouteDef( + "mydb.product_.*", + "ods_db.ods_products", + null, + "Merge all product sharded tables"), + new RouteDef( + "mydb.*", + "ods_db.ods_<>", + "<>", + "One-to-one mapping for other tables")), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList(), + Configuration.fromMap( + ImmutableMap.builder() + .put("name", "mysql_to_doris_with_route_match_mode") + .put("parallelism", "2") + .put("route-mode", "FIRST_MATCH") + .build())); } diff --git a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml new file mode 100644 index 00000000000..94be5207538 --- /dev/null +++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-with-route-mode.yaml @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Example pipeline definition with route match-mode +source: + type: mysql + hostname: localhost + port: 3306 + username: root + password: 123456 + tables: mydb.\.* + server-id: 5400-5404 + server-time-zone: UTC + +sink: + type: doris + fenodes: 127.0.0.1:8030 + username: root + password: "" + +route: + - source-table: mydb.order_.* + sink-table: ods_db.ods_orders + description: "Merge all order sharded tables" + - source-table: mydb.product_.* + sink-table: ods_db.ods_products + description: "Merge all product sharded tables" + - source-table: mydb.* + sink-table: ods_db.ods_<> + replace-symbol: <> + description: "One-to-one mapping for other tables" + +pipeline: + name: mysql_to_doris_with_route_match_mode + parallelism: 2 + route-mode: FIRST_MATCH diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java index f27c37f7a6d..443d6145b74 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java @@ -72,6 +72,22 @@ public class PipelineOptions { "EXCEPTION: Throw an exception to terminate the sync pipeline."))) .build()); + public static final ConfigOption PIPELINE_ROUTE_MODE = + ConfigOptions.key("route-mode") + .enumType(RouteMode.class) + .defaultValue(RouteMode.ALL_MATCH) + .withDescription( + Description.builder() + .text("Match mode for routing rules. ") + .linebreak() + .add( + ListElement.list( + text( + "ALL_MATCH: Apply all matching route rules to a table."), + text( + "FIRST_MATCH: Apply only the first matching route rule and stop evaluation."))) + .build()); + public static final ConfigOption PIPELINE_LOCAL_TIME_ZONE = ConfigOptions.key("local-time-zone") .stringType() diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java new file mode 100644 index 00000000000..bd39b0006e2 --- /dev/null +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/RouteMode.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.common.pipeline; + +import org.apache.flink.cdc.common.annotation.PublicEvolving; + +/** Route mode for routing rules. */ +@PublicEvolving +public enum RouteMode { + /** Match all applicable routing rules. */ + ALL_MATCH, + /** Match only the first applicable routing rule. */ + FIRST_MATCH; +} diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java index 06126b88a1d..9c9d7d016a4 100755 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java +++ b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java @@ -20,6 +20,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.cdc.common.annotation.PublicEvolving; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.schema.Selectors; import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; @@ -34,7 +35,10 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -53,6 +57,7 @@ public class TableIdRouter { private final List> routes; private final LoadingCache> routingCache; + private final RouteMode routeMode; private static final String DOT_PLACEHOLDER = "_dot_placeholder_"; @@ -106,6 +111,11 @@ public static String convertTableListToRegExpPattern(String tables) { } public TableIdRouter(List routingRules) { + this(routingRules, RouteMode.ALL_MATCH); + } + + public TableIdRouter(List routingRules, RouteMode routeMode) { + this.routeMode = routeMode; this.routes = new ArrayList<>(); for (RouteRule rule : routingRules) { try { @@ -139,11 +149,19 @@ public List route(TableId sourceTableId) { } private List calculateRoute(TableId sourceTableId) { - List routedTableIds = - routes.stream() - .filter(route -> matches(route.f0, sourceTableId)) - .map(route -> resolveReplacement(sourceTableId, route)) - .collect(Collectors.toList()); + List routedTableIds = new ArrayList<>(); + + for (Tuple3 route : routes) { + if (matches(route.f0, sourceTableId)) { + routedTableIds.add(resolveReplacement(sourceTableId, route)); + + // If match mode is FIRST_MATCH, stop after the first match + if (routeMode == RouteMode.FIRST_MATCH) { + break; + } + } + } + if (routedTableIds.isEmpty()) { routedTableIds.add(sourceTableId); } @@ -177,13 +195,36 @@ public List> groupSourceTablesByRouteRule(Set tableIdSet) if (routes.isEmpty()) { return new ArrayList<>(); } - return routes.stream() - .map( - route -> - tableIdSet.stream() - .filter(tableId -> matches(route.f0, tableId)) - .collect(Collectors.toSet())) - .collect(Collectors.toList()); + + if (routeMode == RouteMode.ALL_MATCH) { + return routes.stream() + .map( + route -> + tableIdSet.stream() + .filter(tableId -> matches(route.f0, tableId)) + .collect(Collectors.toSet())) + .collect(Collectors.toList()); + } else if (routeMode == RouteMode.FIRST_MATCH) { + Map matchingTableIds = new HashMap<>(); + for (TableId tableId : tableIdSet) { + for (int i = 0; i < routes.size(); i++) { + if (routes.get(i).f0.matcher(tableId.toString()).matches()) { + matchingTableIds.put(tableId, i); + break; + } + } + } + List> routeGroups = new ArrayList<>(routes.size()); + for (int i = 0; i < routes.size(); i++) { + routeGroups.add(new HashSet<>()); + } + for (Map.Entry entry : matchingTableIds.entrySet()) { + routeGroups.get(entry.getValue()).add(entry.getKey()); + } + return routeGroups; + } else { + throw new IllegalArgumentException("Unexpected route mode: " + routeMode); + } } private static boolean matches(Pattern pattern, TableId tableId) { diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java index 985613e131d..de31732b2b8 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.configuration.Configuration; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.RuntimeExecutionMode; import org.apache.flink.cdc.common.types.LocalZonedTimestampType; import org.apache.flink.cdc.composer.PipelineComposer; @@ -32,6 +33,7 @@ import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE; import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE; +import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_ROUTE_MODE; /** * Definition of a pipeline. @@ -100,6 +102,10 @@ public List getRoute() { return routes; } + public RouteMode getRouteMode() { + return config.get(PIPELINE_ROUTE_MODE); + } + public List getTransforms() { return transforms; } @@ -125,6 +131,8 @@ public String toString() { + sink + ", routes=" + routes + + ", routeMode=" + + getRouteMode() + ", transforms=" + transforms + ", udfs=" diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index 6de9045af39..1854b05032b 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -220,7 +220,8 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef) pipelineDef .getSink() .getIncludedSchemaEvolutionTypes()), - pipelineDef.getRoute()); + pipelineDef.getRoute(), + pipelineDef.getRouteMode()); } else { // Translate a regular topology for sources without distributed tables @@ -235,7 +236,8 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef) pipelineDef .getSink() .getIncludedSchemaEvolutionTypes()), - pipelineDef.getRoute()); + pipelineDef.getRoute(), + pipelineDef.getRouteMode()); // Schema Operator ---(shuffled)---> Partitioning stream = diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java index 8f31803b3f7..7340b3e80d6 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java @@ -18,7 +18,9 @@ package org.apache.flink.cdc.composer.flink.translator; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -55,29 +57,44 @@ public SchemaOperatorTranslator( this.timezone = timezone; } + @VisibleForTesting public DataStream translateRegular( DataStream input, int parallelism, MetadataApplier metadataApplier, List routes) { - return translateRegular(input, parallelism, false, metadataApplier, routes); + return translateRegular( + input, parallelism, false, metadataApplier, routes, RouteMode.ALL_MATCH); } + @VisibleForTesting public DataStream translateRegular( DataStream input, int parallelism, boolean isBatchMode, MetadataApplier metadataApplier, List routes) { + return translateRegular( + input, parallelism, isBatchMode, metadataApplier, routes, RouteMode.ALL_MATCH); + } + + public DataStream translateRegular( + DataStream input, + int parallelism, + boolean isBatchMode, + MetadataApplier metadataApplier, + List routes, + RouteMode routeMode) { return isBatchMode ? addRegularSchemaBatchOperator( - input, parallelism, metadataApplier, routes, timezone) + input, parallelism, metadataApplier, routes, routeMode, timezone) : addRegularSchemaOperator( input, parallelism, metadataApplier, routes, + routeMode, schemaChangeBehavior, timezone); } @@ -86,9 +103,16 @@ public DataStream translateDistributed( DataStream input, int parallelism, MetadataApplier metadataApplier, - List routes) { + List routes, + RouteMode routeMode) { return addDistributedSchemaOperator( - input, parallelism, metadataApplier, routes, schemaChangeBehavior, timezone); + input, + parallelism, + metadataApplier, + routes, + routeMode, + schemaChangeBehavior, + timezone); } @Deprecated @@ -101,6 +125,7 @@ private DataStream addRegularSchemaOperator( int parallelism, MetadataApplier metadataApplier, List routes, + RouteMode routeMode, SchemaChangeBehavior schemaChangeBehavior, String timezone) { List routingRules = new ArrayList<>(); @@ -118,6 +143,7 @@ private DataStream addRegularSchemaOperator( new SchemaOperatorFactory( metadataApplier, routingRules, + routeMode, rpcTimeOut, schemaChangeBehavior, timezone)); @@ -130,6 +156,7 @@ private DataStream addRegularSchemaBatchOperator( int parallelism, MetadataApplier metadataApplier, List routes, + RouteMode routeMode, String timezone) { List routingRules = new ArrayList<>(); for (RouteDef route : routes) { @@ -143,7 +170,8 @@ private DataStream addRegularSchemaBatchOperator( input.transform( "SchemaBatchOperator", new EventTypeInfo(), - new BatchSchemaOperator(routingRules, metadataApplier, timezone)); + new BatchSchemaOperator( + routingRules, routeMode, metadataApplier, timezone)); stream.uid(schemaOperatorUid).setParallelism(parallelism); return stream; } @@ -153,6 +181,7 @@ private DataStream addDistributedSchemaOperator( int parallelism, MetadataApplier metadataApplier, List routes, + RouteMode routeMode, SchemaChangeBehavior schemaChangeBehavior, String timezone) { Preconditions.checkArgument( @@ -181,6 +210,7 @@ private DataStream addDistributedSchemaOperator( .SchemaOperatorFactory( metadataApplier, routingRules, + routeMode, rpcTimeOut, schemaChangeBehavior, timezone)) diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java index d01d987d492..7a01193c3de 100644 --- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java +++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerITCase.java @@ -31,6 +31,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; @@ -1588,6 +1589,146 @@ private List generateDecimalColumnEvents(String tableNamePrefix) { return events; } + @ParameterizedTest + @EnumSource + void testRouteModeFirstMatch(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + TableId routedTable1 = TableId.tableId("default_namespace", "default_schema", "routed1"); + TableId routedTable2 = TableId.tableId("default_namespace", "default_schema", "routed2"); + TableId routedAll = TableId.tableId("default_namespace", "default_schema", "routed_all"); + List routeDef = + Arrays.asList( + new RouteDef(TABLE_1.toString(), routedTable1.toString(), null, null), + new RouteDef(TABLE_2.toString(), routedTable2.toString(), null, null), + new RouteDef( + "default_namespace.default_schema.table\\.*", + routedAll.toString(), + null, + null)); + + // Setup pipeline with first-match route mode + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + pipelineConfig.set(PipelineOptions.PIPELINE_ROUTE_MODE, RouteMode.FIRST_MATCH); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + routeDef, + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + // With first-match mode, tables should only be routed to their first matching route + List routed1Results = ValuesDatabase.getResults(routedTable1); + assertThat(routed1Results) + .contains( + "default_namespace.default_schema.routed1:col1=2;newCol3=x", + "default_namespace.default_schema.routed1:col1=3;newCol3="); + List routed2Results = ValuesDatabase.getResults(routedTable2); + assertThat(routed2Results) + .contains( + "default_namespace.default_schema.routed2:col1=1;col2=1", + "default_namespace.default_schema.routed2:col1=2;col2=2", + "default_namespace.default_schema.routed2:col1=3;col2=3"); + + List allResults = ValuesDatabase.getAllResults(); + assertThat(allResults).noneMatch(result -> result.startsWith(routedAll.toString())); + } + + @ParameterizedTest + @EnumSource + void testRouteModeAllMatch(ValuesDataSink.SinkApi sinkApi) throws Exception { + FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster(); + + // Setup value source + Configuration sourceConfig = new Configuration(); + sourceConfig.set( + ValuesDataSourceOptions.EVENT_SET_ID, + ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_MULTI_TABLES); + SourceDef sourceDef = + new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig); + + // Setup value sink + Configuration sinkConfig = new Configuration(); + sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true); + sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi); + SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig); + + TableId routedTable1 = TableId.tableId("default_namespace", "default_schema", "routed1"); + TableId routedTable2 = TableId.tableId("default_namespace", "default_schema", "routed2"); + TableId routedAll = TableId.tableId("default_namespace", "default_schema", "routed_all"); + List routeDef = + Arrays.asList( + new RouteDef(TABLE_1.toString(), routedTable1.toString(), null, null), + new RouteDef(TABLE_2.toString(), routedTable2.toString(), null, null), + new RouteDef( + "default_namespace.default_schema.table\\.*", + routedAll.toString(), + null, + null)); + + // Setup pipeline with all-match route mode + Configuration pipelineConfig = new Configuration(); + pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1); + pipelineConfig.set( + PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE); + pipelineConfig.set(PipelineOptions.PIPELINE_ROUTE_MODE, RouteMode.ALL_MATCH); + PipelineDef pipelineDef = + new PipelineDef( + sourceDef, + sinkDef, + routeDef, + Collections.emptyList(), + Collections.emptyList(), + pipelineConfig); + + // Execute the pipeline + PipelineExecution execution = composer.compose(pipelineDef); + execution.execute(); + + // Check result in ValuesDatabase + // With all-match mode, tables should be routed to all matching routes + List routed1Results = ValuesDatabase.getResults(routedTable1); + assertThat(routed1Results) + .contains( + "default_namespace.default_schema.routed1:col1=2;newCol3=x", + "default_namespace.default_schema.routed1:col1=3;newCol3="); + List routed2Results = ValuesDatabase.getResults(routedTable2); + assertThat(routed2Results) + .contains( + "default_namespace.default_schema.routed2:col1=1;col2=1", + "default_namespace.default_schema.routed2:col1=2;col2=2", + "default_namespace.default_schema.routed2:col1=3;col2=3"); + + List routedAllResults = ValuesDatabase.getResults(routedAll); + assertThat(routedAllResults).isNotEmpty(); + assertThat(routedAllResults.stream().filter(s -> s.contains("routed_all")).count()) + .isGreaterThan(0); + } + BinaryRecordData generate(Schema schema, Object... fields) { return (new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0]))) .generate( diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java index c9169388d3b..870ddfa7274 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/RouteE2eITCase.java @@ -1183,4 +1183,145 @@ void testExtremeMergeTableRoute(boolean batchMode) throws Exception { .toArray(String[]::new)); extremeRouteTestDatabase.dropDatabase(); } + + @ParameterizedTest(name = "isFirstMatch: {0}") + @ValueSource(booleans = {true, false}) + void testMultipleRouteWithRouteMode(boolean isFirstMatch) throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: values\n" + + "route:\n" + + " - source-table: %s.(TABLEALPHA|TABLEBETA)\n" + + " sink-table: NEW_%s.ALPHABET\n" + + " - source-table: %s.(TABLEBETA|TABLEGAMMA)\n" + + " sink-table: NEW_%s.BETAGAMM\n" + + "\n" + + "pipeline:\n" + + " route-mode: %s\n" + + " parallelism: %d", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + routeTestDatabase.getDatabaseName(), + routeTestDatabase.getDatabaseName(), + routeTestDatabase.getDatabaseName(), + routeTestDatabase.getDatabaseName(), + routeTestDatabase.getDatabaseName(), + isFirstMatch ? "FIRST_MATCH" : "ALL_MATCH", + parallelism); + submitPipelineJob(pipelineJob); + waitUntilJobRunning(Duration.ofSeconds(30)); + if (isFirstMatch) { + validateResult( + routeDbNameFormatter, + "CreateTableEvent{tableId=NEW_%s.ALPHABET, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + "CreateTableEvent{tableId=NEW_%s.BETAGAMM, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + "CreateTableEvent{tableId=%s.TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[1008, 8], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[1009, 8.1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[1010, 10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[1011, 11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[2011, 11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[2012, 12], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[2013, 13], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[2014, 14], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[3015, Amber], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[3016, Black], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[3017, Cyan], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[3018, Denim], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4019, Yosemite], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4020, El Capitan], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4021, Sierra], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4022, High Sierra], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4023, Mojave], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4024, Catalina], op=INSERT, meta=()}"); + + } else { + validateResult( + routeDbNameFormatter, + "CreateTableEvent{tableId=NEW_%s.ALPHABET, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + "CreateTableEvent{tableId=NEW_%s.BETAGAMM, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + "CreateTableEvent{tableId=%s.TABLEDELTA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17)}, primaryKeys=ID, options=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[1008, 8], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[1009, 8.1], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[1010, 10], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[1011, 11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[2011, 11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[2012, 12], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[2013, 13], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[2014, 14], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[2011, 11], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[2012, 12], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[2013, 13], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[2014, 14], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[3015, Amber], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[3016, Black], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[3017, Cyan], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[3018, Denim], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4019, Yosemite], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4020, El Capitan], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4021, Sierra], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4022, High Sierra], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4023, Mojave], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[4024, Catalina], op=INSERT, meta=()}"); + } + + generateIncrementalChanges(); + if (isFirstMatch) { + validateResult( + routeDbNameFormatter, + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[3007, 7], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[2014, 14], after=[2014, 2014], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[3019, Emerald], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[4024, Catalina], after=[], op=DELETE, meta=()}"); + + } else { + validateResult( + routeDbNameFormatter, + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[3007, 7], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[2014, 14], after=[2014, 2014], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[2014, 14], after=[2014, 2014], op=UPDATE, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[3019, Emerald], op=INSERT, meta=()}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[4024, Catalina], after=[], op=DELETE, meta=()}"); + } + + generateSchemaChanges(); + + if (isFirstMatch) { + validateResult( + routeDbNameFormatter, + "AddColumnEvent{tableId=NEW_%s.ALPHABET, addedColumns=[ColumnWithPosition{column=`NAME` VARCHAR(17), position=AFTER, existedColumnName=VERSION}]}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", + "AddColumnEvent{tableId=NEW_%s.ALPHABET, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=NAME}]}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", + "RenameColumnEvent{tableId=NEW_%s.BETAGAMM, nameMapping={VERSION=VERSION_EX}}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10003, Fluorite], op=INSERT, meta=()}", + "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}"); + } else { + validateResult( + routeDbNameFormatter, + "AddColumnEvent{tableId=NEW_%s.ALPHABET, addedColumns=[ColumnWithPosition{column=`NAME` VARCHAR(17), position=AFTER, existedColumnName=VERSION}]}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10001, 12, Derrida], op=INSERT, meta=()}", + "AddColumnEvent{tableId=NEW_%s.ALPHABET, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=NAME}]}", + "AddColumnEvent{tableId=NEW_%s.BETAGAMM, addedColumns=[ColumnWithPosition{column=`VERSION_EX` VARCHAR(17), position=AFTER, existedColumnName=VERSION}]}", + "DataChangeEvent{tableId=NEW_%s.ALPHABET, before=[], after=[10002, null, null, 15], op=INSERT, meta=()}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10002, null, 15], op=INSERT, meta=()}", + "AlterColumnTypeEvent{tableId=NEW_%s.BETAGAMM, typeMapping={VERSION=STRING}, oldTypeMapping={VERSION=VARCHAR(17)}}", + "DataChangeEvent{tableId=NEW_%s.BETAGAMM, before=[], after=[10003, null, Fluorite], op=INSERT, meta=()}", + "DropColumnEvent{tableId=%s.TABLEDELTA, droppedColumnNames=[VERSION]}", + "DataChangeEvent{tableId=%s.TABLEDELTA, before=[], after=[10004], op=INSERT, meta=()}"); + } + } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java index 640fb765caf..af0cd6f4cb7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.route.TableIdRouter; @@ -87,6 +88,7 @@ public abstract class SchemaRegistry implements OperatorCoordinator, Coordinatio protected final MetadataApplier metadataApplier; protected final Duration rpcTimeout; protected final List routingRules; + protected final RouteMode routeMode; protected final SchemaChangeBehavior behavior; // ------------------------- @@ -104,6 +106,7 @@ protected SchemaRegistry( ExecutorService coordinatorExecutor, MetadataApplier metadataApplier, List routingRules, + RouteMode routeMode, SchemaChangeBehavior schemaChangeBehavior, Duration rpcTimeout) { this.context = context; @@ -111,6 +114,7 @@ protected SchemaRegistry( this.coordinatorExecutor = coordinatorExecutor; this.metadataApplier = metadataApplier; this.routingRules = routingRules; + this.routeMode = routeMode; this.rpcTimeout = rpcTimeout; this.behavior = schemaChangeBehavior; } @@ -127,7 +131,7 @@ public void start() throws Exception { if (this.schemaManager == null) { this.schemaManager = new SchemaManager(); } - this.router = new TableIdRouter(routingRules); + this.router = new TableIdRouter(routingRules, routeMode); } @Override diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java index 8626eafbcbf..f9aad19b856 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinator.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.annotation.VisibleForTesting; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.schema.Schema; @@ -109,6 +110,7 @@ public SchemaCoordinator( ExecutorService coordinatorExecutor, MetadataApplier metadataApplier, List routingRules, + RouteMode routeMode, SchemaChangeBehavior schemaChangeBehavior, Duration rpcTimeout) { super( @@ -117,6 +119,7 @@ public SchemaCoordinator( coordinatorExecutor, metadataApplier, routingRules, + routeMode, schemaChangeBehavior, rpcTimeout); this.schemaChangeThreadPool = Executors.newSingleThreadExecutor(); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java index f028f4cc95c..a5cf25a7f13 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaCoordinatorProvider.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.runtime.operators.schema.distributed; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -39,6 +40,7 @@ public class SchemaCoordinatorProvider implements OperatorCoordinator.Provider { private final String operatorName; private final MetadataApplier metadataApplier; private final List routingRules; + private final RouteMode routeMode; private final SchemaChangeBehavior schemaChangeBehavior; private final Duration rpcTimeout; @@ -47,12 +49,14 @@ public SchemaCoordinatorProvider( String operatorName, MetadataApplier metadataApplier, List routingRules, + RouteMode routeMode, SchemaChangeBehavior schemaChangeBehavior, Duration rpcTimeout) { this.operatorID = operatorID; this.operatorName = operatorName; this.metadataApplier = metadataApplier; this.routingRules = routingRules; + this.routeMode = routeMode; this.schemaChangeBehavior = schemaChangeBehavior; this.rpcTimeout = rpcTimeout; } @@ -75,6 +79,7 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) throws Ex coordinatorExecutor, metadataApplier, routingRules, + routeMode, schemaChangeBehavior, rpcTimeout); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java index 9ec2b2fc9b3..7964efd3f60 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.route.TableIdRouter; @@ -71,13 +72,16 @@ public class SchemaOperator extends AbstractStreamOperatorAdapter private final String timezone; private final SchemaChangeBehavior schemaChangeBehavior; private final List routingRules; + private final RouteMode routeMode; public SchemaOperator( List routingRules, + RouteMode routeMode, Duration rpcTimeOut, SchemaChangeBehavior schemaChangeBehavior, String timezone) { this.routingRules = routingRules; + this.routeMode = routeMode; this.rpcTimeOut = rpcTimeOut; this.schemaChangeBehavior = schemaChangeBehavior; this.timezone = timezone; @@ -100,7 +104,7 @@ public void open() throws Exception { subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); upstreamSchemaTable = HashBasedTable.create(); evolvedSchemaMap = new HashMap<>(); - tableIdRouter = new TableIdRouter(routingRules); + tableIdRouter = new TableIdRouter(routingRules, routeMode); derivator = new SchemaDerivator(); this.schemaOperatorMetrics = new SchemaOperatorMetrics( diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java index 52378c15f3c..5652f28ad73 100755 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperatorFactory.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -41,18 +42,23 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory private final MetadataApplier metadataApplier; private final List routingRules; + private final RouteMode routeMode; private final SchemaChangeBehavior schemaChangeBehavior; private final Duration rpcTimeout; public SchemaOperatorFactory( MetadataApplier metadataApplier, List routingRules, + RouteMode routeMode, Duration rpcTimeout, SchemaChangeBehavior schemaChangeBehavior, String timezone) { - super(new SchemaOperator(routingRules, rpcTimeout, schemaChangeBehavior, timezone)); + super( + new SchemaOperator( + routingRules, routeMode, rpcTimeout, schemaChangeBehavior, timezone)); this.metadataApplier = metadataApplier; this.routingRules = routingRules; + this.routeMode = routeMode; this.schemaChangeBehavior = schemaChangeBehavior; this.rpcTimeout = rpcTimeout; } @@ -65,6 +71,7 @@ public OperatorCoordinator.Provider getCoordinatorProvider( operatorName, metadataApplier, routingRules, + routeMode, schemaChangeBehavior, rpcTimeout); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java index 8ed7d0916f8..e830b1f2ad5 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/BatchSchemaOperator.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.route.TableIdRouter; @@ -58,6 +59,7 @@ public class BatchSchemaOperator extends AbstractStreamOperatorAdapter // Final fields that are set in constructor private final String timezone; private final List routingRules; + private final RouteMode routeMode; // Transient fields that are set during open() private transient volatile Map originalSchemaMap; @@ -69,10 +71,14 @@ public class BatchSchemaOperator extends AbstractStreamOperatorAdapter private boolean alreadyMergedCreateTableTables = false; public BatchSchemaOperator( - List routingRules, MetadataApplier metadataApplier, String timezone) { + List routingRules, + RouteMode routeMode, + MetadataApplier metadataApplier, + String timezone) { this.chainingStrategy = ChainingStrategy.ALWAYS; this.timezone = timezone; this.routingRules = routingRules; + this.routeMode = routeMode; this.metadataApplier = metadataApplier; } @@ -89,7 +95,7 @@ public void open() throws Exception { super.open(); this.originalSchemaMap = new HashMap<>(); this.evolvedSchemaMap = new HashMap<>(); - this.router = new TableIdRouter(routingRules); + this.router = new TableIdRouter(routingRules, routeMode); this.derivator = new SchemaDerivator(); this.schemaManager = new SchemaManager(SchemaChangeBehavior.IGNORE); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java index 53344dc87ff..3a241aac489 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.schema.Schema; @@ -100,6 +101,7 @@ public SchemaCoordinator( ExecutorService coordinatorExecutor, MetadataApplier metadataApplier, List routes, + RouteMode routeMode, SchemaChangeBehavior schemaChangeBehavior, Duration rpcTimeout) { super( @@ -108,6 +110,7 @@ public SchemaCoordinator( coordinatorExecutor, metadataApplier, routes, + routeMode, schemaChangeBehavior, rpcTimeout); this.schemaChangeThreadPool = Executors.newSingleThreadExecutor(); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java index 253b52cac06..cedc9d37cb7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.runtime.operators.schema.regular; import org.apache.flink.cdc.common.annotation.Internal; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -39,6 +40,7 @@ public class SchemaCoordinatorProvider implements OperatorCoordinator.Provider { private final String operatorName; private final MetadataApplier metadataApplier; private final List routingRules; + private final RouteMode routeMode; private final SchemaChangeBehavior schemaChangeBehavior; private final Duration rpcTimeout; @@ -47,12 +49,14 @@ public SchemaCoordinatorProvider( String operatorName, MetadataApplier metadataApplier, List routingRules, + RouteMode routeMode, SchemaChangeBehavior schemaChangeBehavior, Duration rpcTimeout) { this.operatorID = operatorID; this.operatorName = operatorName; this.metadataApplier = metadataApplier; this.routingRules = routingRules; + this.routeMode = routeMode; this.schemaChangeBehavior = schemaChangeBehavior; this.rpcTimeout = rpcTimeout; } @@ -75,6 +79,7 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) throws Ex coordinatorExecutor, metadataApplier, routingRules, + routeMode, schemaChangeBehavior, rpcTimeout); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java index 7dd9a210a0f..b659b7234f7 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.route.TableIdRouter; @@ -76,6 +77,7 @@ public class SchemaOperator extends AbstractStreamOperatorAdapter private final Duration rpcTimeout; private final SchemaChangeBehavior schemaChangeBehavior; private final List routingRules; + private final RouteMode routeMode; // Transient fields that are set during open() private transient int subTaskId; @@ -88,24 +90,26 @@ public class SchemaOperator extends AbstractStreamOperatorAdapter @VisibleForTesting public SchemaOperator(List routingRules) { - this(routingRules, DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT); + this(routingRules, RouteMode.ALL_MATCH, DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT); } @VisibleForTesting - public SchemaOperator(List routingRules, Duration rpcTimeOut) { - this(routingRules, rpcTimeOut, SchemaChangeBehavior.EVOLVE); + public SchemaOperator(List routingRules, RouteMode routeMode, Duration rpcTimeOut) { + this(routingRules, routeMode, rpcTimeOut, SchemaChangeBehavior.EVOLVE); } @VisibleForTesting public SchemaOperator( List routingRules, + RouteMode routeMode, Duration rpcTimeOut, SchemaChangeBehavior schemaChangeBehavior) { - this(routingRules, rpcTimeOut, schemaChangeBehavior, "UTC"); + this(routingRules, routeMode, rpcTimeOut, schemaChangeBehavior, "UTC"); } public SchemaOperator( List routingRules, + RouteMode routeMode, Duration rpcTimeOut, SchemaChangeBehavior schemaChangeBehavior, String timezone) { @@ -114,6 +118,7 @@ public SchemaOperator( this.schemaChangeBehavior = schemaChangeBehavior; this.timezone = timezone; this.routingRules = routingRules; + this.routeMode = routeMode; } @Override @@ -134,7 +139,7 @@ public void open() throws Exception { this.subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(); this.originalSchemaMap = new HashMap<>(); this.evolvedSchemaMap = new HashMap<>(); - this.router = new TableIdRouter(routingRules); + this.router = new TableIdRouter(routingRules, routeMode); this.derivator = new SchemaDerivator(); } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java index 630acc90463..6afc3a13840 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java @@ -19,6 +19,7 @@ import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.event.Event; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.sink.MetadataApplier; @@ -40,18 +41,23 @@ public class SchemaOperatorFactory extends SimpleOperatorFactory private final MetadataApplier metadataApplier; private final List routingRules; + private final RouteMode routeMode; private final SchemaChangeBehavior schemaChangeBehavior; private final Duration rpcTimeout; public SchemaOperatorFactory( MetadataApplier metadataApplier, List routingRules, + RouteMode routeMode, Duration rpcTimeout, SchemaChangeBehavior schemaChangeBehavior, String timezone) { - super(new SchemaOperator(routingRules, rpcTimeout, schemaChangeBehavior, timezone)); + super( + new SchemaOperator( + routingRules, routeMode, rpcTimeout, schemaChangeBehavior, timezone)); this.metadataApplier = metadataApplier; this.routingRules = routingRules; + this.routeMode = routeMode; this.schemaChangeBehavior = schemaChangeBehavior; this.rpcTimeout = rpcTimeout; } @@ -64,6 +70,7 @@ public OperatorCoordinator.Provider getCoordinatorProvider( operatorName, metadataApplier, routingRules, + routeMode, schemaChangeBehavior, rpcTimeout); } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java index b8e487700da..abfc475df51 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/common/route/TableIdRouterTest.java @@ -18,6 +18,7 @@ package org.apache.flink.cdc.common.route; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.runtime.operators.schema.common.SchemaTestBase; import org.junit.jupiter.api.Test; @@ -166,7 +167,9 @@ void testRegExpCapturingGroupExpression() { private static List testStdRegExpRoute( String sourceRouteRule, String sinkRouteRule, List sourceTables) { TableIdRouter router = - new TableIdRouter(List.of(new RouteRule(sourceRouteRule, sinkRouteRule))); + new TableIdRouter( + List.of(new RouteRule(sourceRouteRule, sinkRouteRule)), + RouteMode.ALL_MATCH); return sourceTables.stream() .map(TableId::parse) .map(router::route) diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java index 40b40b46869..55d612b86bc 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaDerivatorTest.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEvent; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.event.TruncateTableEvent; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.route.TableIdRouter; @@ -615,7 +616,8 @@ void testDeduceMergedCreateTableEvent() { new RouteRule("db_3.table_\\.*", "db_3.table_merged"), // Broadcast tables new RouteRule("db_4.table_1", "db_4.table_a"), - new RouteRule("db_4.table_1", "db_4.table_b"))); + new RouteRule("db_4.table_1", "db_4.table_b")), + RouteMode.ALL_MATCH); List createTableEvents = Arrays.asList( new CreateTableEvent( diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java index af273d13d4f..42c78b03027 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaTestBase.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.route.RouteRule; import org.apache.flink.cdc.common.route.TableIdRouter; import org.apache.flink.cdc.common.types.DataType; @@ -72,7 +73,8 @@ public abstract class SchemaTestBase { null), new RouteRule("(inv_\\d+).(table_\\.*)", "$2.$1", null)); - protected static final TableIdRouter TABLE_ID_ROUTER = new TableIdRouter(ROUTING_RULES); + protected static final TableIdRouter TABLE_ID_ROUTER = + new TableIdRouter(ROUTING_RULES, RouteMode.ALL_MATCH); protected static BinaryRecordData genBinRec(String rowType, Object... fields) { return (new BinaryRecordDataGenerator(quickGenRow(rowType).toArray(new DataType[0]))) diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java new file mode 100644 index 00000000000..14555947efd --- /dev/null +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/common/TableIdRouterMatchModeTest.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.operators.schema.common; + +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.RouteMode; +import org.apache.flink.cdc.common.route.RouteRule; +import org.apache.flink.cdc.common.route.TableIdRouter; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat; + +/** Unit test for {@link TableIdRouter} with match-mode support. */ +public class TableIdRouterMatchModeTest { + + @Test + void testFirstMatchMode() { + // Setup routing rules for first-match mode + List routingRules = + Arrays.asList( + // Sharded tables should be merged + new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"), + new RouteRule("mydb.product_\\.*", "ods_db.ods_products"), + // Catch-all rule for one-to-one mapping + new RouteRule("mydb.\\.*", "ods_db.ods_<>", "<>")); + + TableIdRouter router = new TableIdRouter(routingRules, RouteMode.FIRST_MATCH); + + // Test sharded order tables - should match first rule and stop + assertThat(route(router, "mydb.order_1")).containsExactly("ods_db.ods_orders"); + assertThat(route(router, "mydb.order_2")).containsExactly("ods_db.ods_orders"); + assertThat(route(router, "mydb.order_100")).containsExactly("ods_db.ods_orders"); + + // Test sharded product tables - should match second rule and stop + assertThat(route(router, "mydb.product_1")).containsExactly("ods_db.ods_products"); + assertThat(route(router, "mydb.product_2")).containsExactly("ods_db.ods_products"); + + // Test non-sharded tables - should match third rule (catch-all) + assertThat(route(router, "mydb.user")).containsExactly("ods_db.ods_user"); + assertThat(route(router, "mydb.customer")).containsExactly("ods_db.ods_customer"); + assertThat(route(router, "mydb.config")).containsExactly("ods_db.ods_config"); + } + + @Test + void testAllMatchMode() { + // Setup routing rules for all-match mode (default behavior) + List routingRules = + Arrays.asList( + new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"), + new RouteRule("mydb.\\.*", "ods_db.ods_<>", "<>")); + + TableIdRouter router = new TableIdRouter(routingRules, RouteMode.ALL_MATCH); + + // Test sharded order tables - should match BOTH rules + assertThat(route(router, "mydb.order_1")) + .containsExactlyInAnyOrder("ods_db.ods_orders", "ods_db.ods_order_1"); + assertThat(route(router, "mydb.order_2")) + .containsExactlyInAnyOrder("ods_db.ods_orders", "ods_db.ods_order_2"); + + // Test non-sharded tables - should match only second rule + assertThat(route(router, "mydb.user")).containsExactly("ods_db.ods_user"); + } + + @Test + void testFirstMatchWithNoMatchingRules() { + List routingRules = + Arrays.asList( + new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"), + new RouteRule("mydb.product_\\.*", "ods_db.ods_products")); + + TableIdRouter router = new TableIdRouter(routingRules, RouteMode.FIRST_MATCH); + + // Table that doesn't match any rule should route to itself (implicit routing) + assertThat(route(router, "otherdb.user")).containsExactly("otherdb.user"); + } + + @Test + void testAllMatchWithMultipleMatchingRules() { + // Setup multiple overlapping rules + List routingRules = + Arrays.asList( + new RouteRule("db.table_\\.*", "db.merged_1"), + new RouteRule("db.table_\\.*", "db.merged_2"), + new RouteRule("db.table_\\.*", "db.merged_3")); + + TableIdRouter router = new TableIdRouter(routingRules, RouteMode.ALL_MATCH); + + // Should match all three rules + assertThat(route(router, "db.table_1")) + .containsExactlyInAnyOrder("db.merged_1", "db.merged_2", "db.merged_3"); + } + + private static List route(TableIdRouter router, String tableId) { + return router.route(TableId.parse(tableId)).stream() + .map(TableId::toString) + .collect(Collectors.toList()); + } + + @Test + void testGroupSourceTablesByRouteRuleFirstMatch() { + List routingRules = + Arrays.asList( + new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"), + new RouteRule("mydb.product_\\.*", "ods_db.ods_products"), + new RouteRule("mydb.\\.*", "ods_db.ods_<>", "<>")); + + TableIdRouter router = new TableIdRouter(routingRules, RouteMode.FIRST_MATCH); + + Set tableIdSet = + new HashSet<>( + Arrays.asList( + TableId.parse("mydb.order_1"), + TableId.parse("mydb.order_2"), + TableId.parse("mydb.order_100"), + TableId.parse("mydb.product_1"), + TableId.parse("mydb.product_2"), + TableId.parse("mydb.user"), + TableId.parse("mydb.customer"), + TableId.parse("mydb.config"))); + + List> groups = router.groupSourceTablesByRouteRule(tableIdSet); + + assertThat(groups).hasSize(3); + + assertThat(groups.get(0)) + .containsExactlyInAnyOrder( + TableId.parse("mydb.order_1"), + TableId.parse("mydb.order_2"), + TableId.parse("mydb.order_100")); + + assertThat(groups.get(1)) + .containsExactlyInAnyOrder( + TableId.parse("mydb.product_1"), TableId.parse("mydb.product_2")); + + assertThat(groups.get(2)) + .containsExactlyInAnyOrder( + TableId.parse("mydb.user"), + TableId.parse("mydb.customer"), + TableId.parse("mydb.config")); + } + + @Test + void testGroupSourceTablesByRouteRuleFirstMatchWithUnmatchedTables() { + List routingRules = + Arrays.asList( + new RouteRule("mydb.order_\\.*", "ods_db.ods_orders"), + new RouteRule("mydb.product_\\.*", "ods_db.ods_products")); + + TableIdRouter router = new TableIdRouter(routingRules, RouteMode.FIRST_MATCH); + + Set tableIdSet = + new HashSet<>( + Arrays.asList( + TableId.parse("mydb.order_1"), + TableId.parse("mydb.order_2"), + TableId.parse("mydb.product_1"), + TableId.parse("otherdb.user"), + TableId.parse("otherdb.customer"))); + + List> groups = router.groupSourceTablesByRouteRule(tableIdSet); + + assertThat(groups).hasSize(2); + + // First group: order tables + assertThat(groups.get(0)) + .containsExactlyInAnyOrder( + TableId.parse("mydb.order_1"), TableId.parse("mydb.order_2")); + + // Second group: product tables + assertThat(groups.get(1)).containsExactlyInAnyOrder(TableId.parse("mydb.product_1")); + + // Unmatched tables (otherdb.user, otherdb.customer) are not in any group + assertThat(groups.stream().flatMap(Set::stream).collect(Collectors.toList())) + .doesNotContain(TableId.parse("otherdb.user"), TableId.parse("otherdb.customer")); + } + + @Test + void testGroupSourceTablesByRouteRuleFirstMatchWithOverlappingRules() { + List routingRules = + Arrays.asList( + new RouteRule("db.table_\\.*", "db.merged_1"), + new RouteRule("db.table_\\.*", "db.merged_2"), + new RouteRule("db.table_\\.*", "db.merged_3")); + + TableIdRouter router = new TableIdRouter(routingRules, RouteMode.FIRST_MATCH); + + Set tableIdSet = + new HashSet<>( + Arrays.asList( + TableId.parse("db.table_1"), + TableId.parse("db.table_2"), + TableId.parse("db.table_3"))); + + List> groups = router.groupSourceTablesByRouteRule(tableIdSet); + + assertThat(groups).hasSize(3); + + // All tables should match only the first rule (FIRST_MATCH mode) + assertThat(groups.get(0)) + .containsExactlyInAnyOrder( + TableId.parse("db.table_1"), + TableId.parse("db.table_2"), + TableId.parse("db.table_3")); + + // Second and third groups should be empty + assertThat(groups.get(1)).isEmpty(); + assertThat(groups.get(2)).isEmpty(); + } +} diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java index 7ece5d20c87..4245887f21c 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java @@ -28,6 +28,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.event.TruncateTableEvent; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; @@ -107,6 +108,7 @@ void testLenientSchemaEvolution() throws Exception { () -> new SchemaOperator( ROUTING_RULES, + RouteMode.ALL_MATCH, Duration.ofMinutes(3), SchemaChangeBehavior.LENIENT, "UTC"), @@ -247,6 +249,7 @@ void testIgnoreSchemaEvolution() throws Exception { () -> new SchemaOperator( ROUTING_RULES, + RouteMode.ALL_MATCH, Duration.ofMinutes(3), SchemaChangeBehavior.IGNORE, "UTC"), @@ -349,6 +352,7 @@ void testExceptionSchemaEvolution() throws Exception { () -> new SchemaOperator( ROUTING_RULES, + RouteMode.ALL_MATCH, Duration.ofMinutes(3), SchemaChangeBehavior.EXCEPTION, "UTC"), diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java index 740c77bd49d..7bdf37ed49a 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java @@ -32,6 +32,7 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.exceptions.SchemaEvolveException; import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; @@ -87,7 +88,8 @@ void testEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new SchemaOperator( + new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(30), behavior); RegularEventOperatorTestHarness harness = RegularEventOperatorTestHarness.withDurationAndBehavior( schemaOperator, 17, Duration.ofSeconds(3), behavior); @@ -365,7 +367,8 @@ void testTryEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new SchemaOperator( + new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(30), behavior); RegularEventOperatorTestHarness harness = RegularEventOperatorTestHarness.withDurationAndBehavior( schemaOperator, 17, Duration.ofSeconds(3), behavior); @@ -643,7 +646,8 @@ void testExceptionEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EXCEPTION; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new SchemaOperator( + new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(30), behavior); RegularEventOperatorTestHarness harness = RegularEventOperatorTestHarness.withDurationAndBehavior( schemaOperator, 17, Duration.ofSeconds(3), behavior); @@ -737,7 +741,8 @@ void testIgnoreEvolveSchema() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.IGNORE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new SchemaOperator( + new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(30), behavior); RegularEventOperatorTestHarness harness = RegularEventOperatorTestHarness.withDurationAndBehavior( schemaOperator, 17, Duration.ofSeconds(3), behavior); @@ -1033,7 +1038,8 @@ void testEvolveSchemaWithFailure() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new SchemaOperator( + new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(30), behavior); RegularEventOperatorTestHarness harness = RegularEventOperatorTestHarness.withDurationAndFineGrainedBehaviorWithError( schemaOperator, @@ -1131,7 +1137,8 @@ void testTryEvolveSchemaWithFailure() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.TRY_EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new SchemaOperator( + new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(30), behavior); // All types of schema change events will be sent to the sink // AddColumn and RenameColumn events will always fail @@ -1461,7 +1468,8 @@ void testFineGrainedSchemaEvolves() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.EVOLVE; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new SchemaOperator( + new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(30), behavior); // All types of schema change events will be sent to the sink // AddColumn and RenameColumn events will always fail @@ -1795,7 +1803,8 @@ void testLenientSchemaEvolves() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new SchemaOperator( + new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(30), behavior); RegularEventOperatorTestHarness harness = RegularEventOperatorTestHarness.withDurationAndBehavior( schemaOperator, 17, Duration.ofSeconds(3), behavior); @@ -2196,7 +2205,8 @@ void testLenientEvolveTweaks() throws Exception { SchemaChangeBehavior behavior = SchemaChangeBehavior.LENIENT; SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior); + new SchemaOperator( + new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(30), behavior); RegularEventOperatorTestHarness harness = RegularEventOperatorTestHarness.withDurationAndBehavior( schemaOperator, 17, Duration.ofSeconds(3), behavior); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java index bd7ed4da086..b547a820b53 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.event.DataChangeEvent; import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataTypes; import org.apache.flink.cdc.common.types.RowType; @@ -122,7 +123,7 @@ void testProcessElement() throws Exception { @Test void testProcessSchemaChangeEventWithTimeOut() throws Exception { SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(1)); + new SchemaOperator(new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(1)); RegularEventOperatorTestHarness harness = RegularEventOperatorTestHarness.withDuration( schemaOperator, 1, Duration.ofSeconds(3)); @@ -141,7 +142,7 @@ void testProcessSchemaChangeEventWithTimeOut() throws Exception { @Test void testProcessSchemaChangeEventWithOutTimeOut() throws Exception { SchemaOperator schemaOperator = - new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30)); + new SchemaOperator(new ArrayList<>(), RouteMode.ALL_MATCH, Duration.ofSeconds(30)); RegularEventOperatorTestHarness harness = RegularEventOperatorTestHarness.withDuration( schemaOperator, 1, Duration.ofSeconds(3)); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java index cbc1f407d28..3e81ec82b5c 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java @@ -20,6 +20,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter; @@ -99,6 +100,7 @@ public DistributedEventOperatorTestHarness( Executors.newFixedThreadPool(1), new CollectingMetadataApplier(applyDuration), new ArrayList<>(), + RouteMode.ALL_MATCH, SchemaChangeBehavior.LENIENT, rpcTimeout); this.schemaRegistryGateway = new TestingSchemaRegistryGateway(schemaCoordinator); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java index 8a3a21ba664..9b2eb2ee4af 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/RegularEventOperatorTestHarness.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.PipelineOptions; +import org.apache.flink.cdc.common.pipeline.RouteMode; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter; @@ -124,6 +125,7 @@ private RegularEventOperatorTestHarness( new CollectingMetadataApplier( schemaEvolveDuration, enabledEventTypes, errorsOnEventTypes), new ArrayList<>(), + RouteMode.ALL_MATCH, behavior, rpcTimeout); schemaRegistryGateway = new TestingSchemaRegistryGateway(schemaRegistry);