Skip to content

Commit c9ffab7

Browse files
authored
[FLINK-38831][runtime] Route configuration support FIRST_MATCH mode (apache#4212)
1 parent bcb4a4b commit c9ffab7

File tree

34 files changed

+921
-45
lines changed

34 files changed

+921
-45
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ SELECT * FROM test_table;
100100
pipeline:
101101
name: Sync MySQL Database to Doris
102102
parallelism: 2
103+
route-mode: ALL_MATCH
103104
user-defined-function:
104105
- name: addone
105106
classpath: com.example.functions.AddOneFunctionClass

docs/content.zh/docs/core-concept/data-pipeline.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ under the License.
118118
| `parallelism` | pipeline的全局并发度,默认值是1。 | optional |
119119
| `local-time-zone` | 作业级别的本地时区。 | optional |
120120
| `execution.runtime-mode` | pipeline 的运行模式,包含 STREAMING 和 BATCH,默认值是 STREAMING。 | optional |
121+
| `route-mode` | [路由]({{< ref "docs/core-concept/route" >}}#路由模式)规则的匹配模式。可选值:`ALL_MATCH`(默认,应用所有匹配的规则)或 `FIRST_MATCH`(只应用第一个匹配的规则)。 | optional |
121122
| `schema.change.behavior` | 如何处理 [schema 变更]({{< ref "docs/core-concept/schema-evolution" >}})。可选值:[`exception`]({{< ref "docs/core-concept/schema-evolution" >}}#exception-mode)、[`evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#evolve-mode)、[`try_evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#tryevolve-mode)、[`lenient`]({{< ref "docs/core-concept/schema-evolution" >}}#lenient-mode)(默认值)或 [`ignore`]({{< ref "docs/core-concept/schema-evolution" >}}#ignore-mode)。 | optional |
122123
| `schema.operator.uid` | Schema 算子的唯一 ID。此 ID 用于算子间通信,必须在所有算子中保持唯一。**已废弃**:请使用 `operator.uid.prefix` 代替。 | optional |
123124
| `schema-operator.rpc-timeout` | SchemaOperator 等待下游 SchemaChangeEvent 应用完成的超时时间,默认值是 3 分钟。 | optional |

docs/content.zh/docs/core-concept/route.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,29 @@ under the License.
3939

4040
一个 Route 模块可以包含一个或多个 source-table/sink-table 规则。
4141

42+
# 路由模式
43+
默认情况下,所有匹配的路由规则都会被应用到表上。你可以在 pipeline 配置中通过 `route-mode` 选项来改变这一行为:
44+
45+
|| 描述 |
46+
|--------------|-------------------------------------------------|
47+
| `ALL_MATCH` | 应用所有匹配的路由规则到表上。这是默认模式。 |
48+
| `FIRST_MATCH`| 只应用第一个匹配的路由规则,并停止后续规则的计算。 |
49+
50+
例如,使用 `FIRST_MATCH` 模式:
51+
52+
```yaml
53+
pipeline:
54+
name: Sync MySQL Database to Doris
55+
parallelism: 2
56+
route-mode: FIRST_MATCH
57+
```
58+
59+
{{< hint info >}}
60+
61+
当使用 `FIRST_MATCH` 模式时,路由规则会按照定义的顺序进行计算。第一个匹配源表的规则会被应用,后续的规则将被跳过。
62+
63+
{{< /hint >}}
64+
4265
# 示例
4366
## 路由一个 Data Source 表到一个 Data Sink 表
4467
如果同步一个 `mydb` 数据库中的 `web_order` 表到一个相同库的 `ods_web_order` 表,我们可以使用下面的 yaml 文件来定义这个路由:

docs/content/docs/core-concept/data-pipeline.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ Note that whilst the parameters are each individually optional, at least one of
120120
| `parallelism` | The global parallelism of the pipeline. Defaults to 1. | optional |
121121
| `local-time-zone` | The local time zone defines current session time zone id. | optional |
122122
| `execution.runtime-mode` | The runtime mode of the pipeline includes STREAMING and BATCH, with the default value being STREAMING. | optional |
123+
| `route-mode` | The matching mode for [route]({{< ref "docs/core-concept/route" >}}#route-mode) rules. One of: `ALL_MATCH` (default, apply all matching rules) or `FIRST_MATCH` (apply only the first matching rule). | optional |
123124
| `schema.change.behavior` | How to handle [changes in schema]({{< ref "docs/core-concept/schema-evolution" >}}). One of: [`exception`]({{< ref "docs/core-concept/schema-evolution" >}}#exception-mode), [`evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#evolve-mode), [`try_evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#tryevolve-mode), [`lenient`]({{< ref "docs/core-concept/schema-evolution" >}}#lenient-mode) (default) or [`ignore`]({{< ref "docs/core-concept/schema-evolution" >}}#ignore-mode). | optional |
124125
| `schema.operator.uid` | The unique ID for schema operator. This ID will be used for inter-operator communications and must be unique across operators. **Deprecated**: use `operator.uid.prefix` instead. | optional |
125126
| `schema-operator.rpc-timeout` | The timeout time for SchemaOperator to wait downstream SchemaChangeEvent applying finished, the default value is 3 minutes. | optional |

docs/content/docs/core-concept/route.md

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ under the License.
2828
**Route** specifies the rule of matching a list of source-table and mapping to sink-table. The most typical scenario is the merge of sub-databases and sub-tables, routing multiple upstream source tables to the same sink table.
2929

3030
# Parameters
31-
To describe a route, the follows are required:
31+
To describe a route, the follows are required:
3232

3333
| parameter | meaning | optional/required |
3434
|----------------|---------------------------------------------------------------------------------------------|-------------------|
@@ -39,6 +39,29 @@ To describe a route, the follows are required:
3939

4040
A route module can contain a list of source-table/sink-table rules.
4141

42+
# Route Mode
43+
By default, all matching route rules are applied to a table. You can configure the `route-mode` option in the pipeline configuration to change this behavior:
44+
45+
| Value | Description |
46+
|--------------|------------------------------------------------------------------------|
47+
| `ALL_MATCH` | Apply all matching route rules to a table. This is the default mode. |
48+
| `FIRST_MATCH`| Apply only the first matching route rule and stop evaluation. |
49+
50+
For example, to use `FIRST_MATCH` mode:
51+
52+
```yaml
53+
pipeline:
54+
name: Sync MySQL Database to Doris
55+
parallelism: 2
56+
route-mode: FIRST_MATCH
57+
```
58+
59+
{{< hint info >}}
60+
61+
When using `FIRST_MATCH` mode, route rules are evaluated in the order they are defined. The first rule that matches the source table will be applied, and subsequent rules will be skipped.
62+
63+
{{< /hint >}}
64+
4265
# Example
4366
## Route one Data Source table to one Data Sink table
4467
if synchronize the table `web_order` in the database `mydb` to a Doris table `ods_web_order`, we can use this yaml file to define this route:

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,15 @@ void testUdfDefinitionWithOptions() throws Exception {
206206
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions);
207207
}
208208

209+
@Test
210+
void testRouteMode() throws Exception {
211+
URL resource =
212+
Resources.getResource("definitions/pipeline-definition-with-route-mode.yaml");
213+
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
214+
PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
215+
assertThat(pipelineDef).isEqualTo(pipelineDefWithRouteMode);
216+
}
217+
209218
@Test
210219
void testSchemaEvolutionTypesConfiguration() throws Exception {
211220
testSchemaEvolutionTypesParsing(
@@ -716,4 +725,60 @@ void testParsingFullDefinitionFromString() throws Exception {
716725
ImmutableMap.<String, String>builder()
717726
.put("parallelism", "1")
718727
.build()));
728+
729+
private final PipelineDef pipelineDefWithRouteMode =
730+
new PipelineDef(
731+
new SourceDef(
732+
"mysql",
733+
null,
734+
Configuration.fromMap(
735+
ImmutableMap.<String, String>builder()
736+
.put("hostname", "localhost")
737+
.put("port", "3306")
738+
.put("username", "root")
739+
.put("password", "123456")
740+
.put("tables", "mydb.\\.*")
741+
.put("server-id", "5400-5404")
742+
.put("server-time-zone", "UTC")
743+
.build())),
744+
new SinkDef(
745+
"doris",
746+
null,
747+
Configuration.fromMap(
748+
ImmutableMap.<String, String>builder()
749+
.put("fenodes", "127.0.0.1:8030")
750+
.put("username", "root")
751+
.put("password", "")
752+
.build()),
753+
ImmutableSet.of(
754+
DROP_COLUMN,
755+
ALTER_COLUMN_TYPE,
756+
ADD_COLUMN,
757+
CREATE_TABLE,
758+
RENAME_COLUMN)),
759+
Arrays.asList(
760+
new RouteDef(
761+
"mydb.order_.*",
762+
"ods_db.ods_orders",
763+
null,
764+
"Merge all order sharded tables"),
765+
new RouteDef(
766+
"mydb.product_.*",
767+
"ods_db.ods_products",
768+
null,
769+
"Merge all product sharded tables"),
770+
new RouteDef(
771+
"mydb.*",
772+
"ods_db.ods_<>",
773+
"<>",
774+
"One-to-one mapping for other tables")),
775+
Collections.emptyList(),
776+
Collections.emptyList(),
777+
Collections.emptyList(),
778+
Configuration.fromMap(
779+
ImmutableMap.<String, String>builder()
780+
.put("name", "mysql_to_doris_with_route_match_mode")
781+
.put("parallelism", "2")
782+
.put("route-mode", "FIRST_MATCH")
783+
.build()));
719784
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# Example pipeline definition with route match-mode
17+
source:
18+
type: mysql
19+
hostname: localhost
20+
port: 3306
21+
username: root
22+
password: 123456
23+
tables: mydb.\.*
24+
server-id: 5400-5404
25+
server-time-zone: UTC
26+
27+
sink:
28+
type: doris
29+
fenodes: 127.0.0.1:8030
30+
username: root
31+
password: ""
32+
33+
route:
34+
- source-table: mydb.order_.*
35+
sink-table: ods_db.ods_orders
36+
description: "Merge all order sharded tables"
37+
- source-table: mydb.product_.*
38+
sink-table: ods_db.ods_products
39+
description: "Merge all product sharded tables"
40+
- source-table: mydb.*
41+
sink-table: ods_db.ods_<>
42+
replace-symbol: <>
43+
description: "One-to-one mapping for other tables"
44+
45+
pipeline:
46+
name: mysql_to_doris_with_route_match_mode
47+
parallelism: 2
48+
route-mode: FIRST_MATCH

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,22 @@ public class PipelineOptions {
7272
"EXCEPTION: Throw an exception to terminate the sync pipeline.")))
7373
.build());
7474

75+
public static final ConfigOption<RouteMode> PIPELINE_ROUTE_MODE =
76+
ConfigOptions.key("route-mode")
77+
.enumType(RouteMode.class)
78+
.defaultValue(RouteMode.ALL_MATCH)
79+
.withDescription(
80+
Description.builder()
81+
.text("Match mode for routing rules. ")
82+
.linebreak()
83+
.add(
84+
ListElement.list(
85+
text(
86+
"ALL_MATCH: Apply all matching route rules to a table."),
87+
text(
88+
"FIRST_MATCH: Apply only the first matching route rule and stop evaluation.")))
89+
.build());
90+
7591
public static final ConfigOption<String> PIPELINE_LOCAL_TIME_ZONE =
7692
ConfigOptions.key("local-time-zone")
7793
.stringType()
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.pipeline;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
22+
/** Route mode for routing rules. */
23+
@PublicEvolving
24+
public enum RouteMode {
25+
/** Match all applicable routing rules. */
26+
ALL_MATCH,
27+
/** Match only the first applicable routing rule. */
28+
FIRST_MATCH;
29+
}

0 commit comments

Comments
 (0)