1 change: 1 addition & 0 deletions README.md
@@ -100,6 +100,7 @@ SELECT * FROM test_table;
pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
  route-mode: ALL_MATCH
  user-defined-function:
    - name: addone
      classpath: com.example.functions.AddOneFunctionClass
1 change: 1 addition & 0 deletions docs/content.zh/docs/core-concept/data-pipeline.md
@@ -118,6 +118,7 @@ under the License.
| `parallelism` | The global parallelism of the pipeline. Defaults to 1. | optional |
| `local-time-zone` | The job-level local time zone. | optional |
| `execution.runtime-mode` | The runtime mode of the pipeline, either STREAMING or BATCH. Defaults to STREAMING. | optional |
| `route-mode` | The matching mode for [route]({{< ref "docs/core-concept/route" >}}#路由模式) rules. One of: `ALL_MATCH` (default, apply all matching rules) or `FIRST_MATCH` (apply only the first matching rule). | optional |
| `schema.change.behavior` | How to handle [schema changes]({{< ref "docs/core-concept/schema-evolution" >}}). One of: [`exception`]({{< ref "docs/core-concept/schema-evolution" >}}#exception-mode), [`evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#evolve-mode), [`try_evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#tryevolve-mode), [`lenient`]({{< ref "docs/core-concept/schema-evolution" >}}#lenient-mode) (default) or [`ignore`]({{< ref "docs/core-concept/schema-evolution" >}}#ignore-mode). | optional |
| `schema.operator.uid` | The unique ID of the schema operator. This ID is used for inter-operator communication and must be unique across all operators. **Deprecated**: use `operator.uid.prefix` instead. | optional |
| `schema-operator.rpc-timeout` | The timeout for SchemaOperator to wait for the downstream SchemaChangeEvent to finish being applied. Defaults to 3 minutes. | optional |
23 changes: 23 additions & 0 deletions docs/content.zh/docs/core-concept/route.md
@@ -39,6 +39,29 @@ under the License.

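A Route module can contain one or more source-table/sink-table rules.

# Route Mode
By default, all matching route rules are applied to a table. You can change this behavior with the `route-mode` option in the pipeline configuration:

| Value         | Description                                                          |
|---------------|----------------------------------------------------------------------|
| `ALL_MATCH`   | Apply all matching route rules to a table. This is the default mode. |
| `FIRST_MATCH` | Apply only the first matching route rule and stop evaluating later rules. |

For example, to use `FIRST_MATCH` mode: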

```yaml
pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
  route-mode: FIRST_MATCH
```

{{< hint info >}}

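When using `FIRST_MATCH` mode, route rules are evaluated in the order they are defined. The first rule that matches the source table is applied, and subsequent rules are skipped.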

{{< /hint >}}

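# Example
## Route one Data Source table to one Data Sink table
To synchronize the table `web_order` in the database `mydb` to the table `ods_web_order` in the same database, we can use the following yaml file to define the route: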
1 change: 1 addition & 0 deletions docs/content/docs/core-concept/data-pipeline.md
@@ -120,6 +120,7 @@ Note that whilst the parameters are each individually optional, at least one of
| `parallelism` | The global parallelism of the pipeline. Defaults to 1. | optional |
| `local-time-zone` | The job-level local time zone, used as the current session time zone ID. | optional |
| `execution.runtime-mode` | The runtime mode of the pipeline includes STREAMING and BATCH, with the default value being STREAMING. | optional |
| `route-mode` | The matching mode for [route]({{< ref "docs/core-concept/route" >}}#route-mode) rules. One of: `ALL_MATCH` (default, apply all matching rules) or `FIRST_MATCH` (apply only the first matching rule). | optional |
| `schema.change.behavior` | How to handle [changes in schema]({{< ref "docs/core-concept/schema-evolution" >}}). One of: [`exception`]({{< ref "docs/core-concept/schema-evolution" >}}#exception-mode), [`evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#evolve-mode), [`try_evolve`]({{< ref "docs/core-concept/schema-evolution" >}}#tryevolve-mode), [`lenient`]({{< ref "docs/core-concept/schema-evolution" >}}#lenient-mode) (default) or [`ignore`]({{< ref "docs/core-concept/schema-evolution" >}}#ignore-mode). | optional |
| `schema.operator.uid` | The unique ID for schema operator. This ID will be used for inter-operator communications and must be unique across operators. **Deprecated**: use `operator.uid.prefix` instead. | optional |
| `schema-operator.rpc-timeout` | The timeout for SchemaOperator to wait for the downstream SchemaChangeEvent to finish being applied. Defaults to 3 minutes. | optional |
25 changes: 24 additions & 1 deletion docs/content/docs/core-concept/route.md
@@ -28,7 +28,7 @@ under the License.
**Route** specifies rules that match a list of source tables and map them to a sink table. The most typical scenario is merging sharded databases and tables, routing multiple upstream source tables to the same sink table.

# Parameters
To describe a route, the following are required:

| parameter | meaning | optional/required |
|----------------|---------------------------------------------------------------------------------------------|-------------------|
@@ -39,6 +39,29 @@ To describe a route, the follows are required:

A route module can contain a list of source-table/sink-table rules.

# Route Mode
By default, all matching route rules are applied to a table. You can configure the `route-mode` option in the pipeline configuration to change this behavior:

| Value | Description |
|--------------|------------------------------------------------------------------------|
| `ALL_MATCH` | Apply all matching route rules to a table. This is the default mode. |
| `FIRST_MATCH`| Apply only the first matching route rule and stop evaluation. |

For example, to use `FIRST_MATCH` mode:

```yaml
pipeline:
  name: Sync MySQL Database to Doris
  parallelism: 2
  route-mode: FIRST_MATCH
```

{{< hint info >}}

When using `FIRST_MATCH` mode, route rules are evaluated in the order they are defined. The first rule that matches the source table will be applied, and subsequent rules will be skipped.

{{< /hint >}}
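
The difference between the two modes can be sketched in a few lines of Java. This is a simplified, hypothetical model and not Flink CDC's actual routing code: `RouteModeSketch`, `Rule`, and `resolveSinkTables` are illustrative names, the source pattern is treated as a plain Java regular expression (the real table selector syntax differs), and a table that matches no rule simply yields an empty list here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical, simplified model of route-mode resolution (not the Flink CDC implementation).
class RouteModeSketch {
    enum RouteMode { ALL_MATCH, FIRST_MATCH }

    // A route rule reduced to a source-table regex and a fixed sink table name.
    static final class Rule {
        final Pattern sourcePattern;
        final String sinkTable;

        Rule(String sourceRegex, String sinkTable) {
            this.sourcePattern = Pattern.compile(sourceRegex);
            this.sinkTable = sinkTable;
        }
    }

    // Walks the rules in definition order and collects the sink tables of matching rules.
    static List<String> resolveSinkTables(List<Rule> rules, String sourceTable, RouteMode mode) {
        List<String> sinks = new ArrayList<>();
        for (Rule rule : rules) {
            if (rule.sourcePattern.matcher(sourceTable).matches()) {
                sinks.add(rule.sinkTable);
                if (mode == RouteMode.FIRST_MATCH) {
                    break; // stop at the first matching rule
                }
            }
        }
        return sinks;
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(
                new Rule("mydb\\.order_.*", "ods_db.ods_orders"),
                new Rule("mydb\\..*", "ods_db.ods_all"));

        // Both rules match, so ALL_MATCH yields two sink tables.
        System.out.println(resolveSinkTables(rules, "mydb.order_01", RouteMode.ALL_MATCH));
        // FIRST_MATCH stops after the first matching rule.
        System.out.println(resolveSinkTables(rules, "mydb.order_01", RouteMode.FIRST_MATCH));
    }
}
```

In this sketch, rule order only matters under `FIRST_MATCH`: placing the broad `mydb\\..*` rule first would shadow the more specific order rule.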

# Example
## Route one Data Source table to one Data Sink table
To synchronize the table `web_order` in the database `mydb` to a Doris table `ods_web_order`, we can use this yaml file to define the route:
@@ -206,6 +206,15 @@ void testUdfDefinitionWithOptions() throws Exception {
        assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions);
    }

    @Test
    void testRouteMode() throws Exception {
        URL resource =
                Resources.getResource("definitions/pipeline-definition-with-route-mode.yaml");
        YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
        PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
        assertThat(pipelineDef).isEqualTo(pipelineDefWithRouteMode);
    }

@Test
void testSchemaEvolutionTypesConfiguration() throws Exception {
testSchemaEvolutionTypesParsing(
@@ -716,4 +725,60 @@ void testParsingFullDefinitionFromString() throws Exception {
                            ImmutableMap.<String, String>builder()
                                    .put("parallelism", "1")
                                    .build()));

    private final PipelineDef pipelineDefWithRouteMode =
            new PipelineDef(
                    new SourceDef(
                            "mysql",
                            null,
                            Configuration.fromMap(
                                    ImmutableMap.<String, String>builder()
                                            .put("hostname", "localhost")
                                            .put("port", "3306")
                                            .put("username", "root")
                                            .put("password", "123456")
                                            .put("tables", "mydb.\\.*")
                                            .put("server-id", "5400-5404")
                                            .put("server-time-zone", "UTC")
                                            .build())),
                    new SinkDef(
                            "doris",
                            null,
                            Configuration.fromMap(
                                    ImmutableMap.<String, String>builder()
                                            .put("fenodes", "127.0.0.1:8030")
                                            .put("username", "root")
                                            .put("password", "")
                                            .build()),
                            ImmutableSet.of(
                                    DROP_COLUMN,
                                    ALTER_COLUMN_TYPE,
                                    ADD_COLUMN,
                                    CREATE_TABLE,
                                    RENAME_COLUMN)),
                    Arrays.asList(
                            new RouteDef(
                                    "mydb.order_.*",
                                    "ods_db.ods_orders",
                                    null,
                                    "Merge all order sharded tables"),
                            new RouteDef(
                                    "mydb.product_.*",
                                    "ods_db.ods_products",
                                    null,
                                    "Merge all product sharded tables"),
                            new RouteDef(
                                    "mydb.*",
                                    "ods_db.ods_<>",
                                    "<>",
                                    "One-to-one mapping for other tables")),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    Collections.emptyList(),
                    Configuration.fromMap(
                            ImmutableMap.<String, String>builder()
                                    .put("name", "mysql_to_doris_with_route_match_mode")
                                    .put("parallelism", "2")
                                    .put("route-mode", "FIRST_MATCH")
                                    .build()));
}
@@ -0,0 +1,48 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Example pipeline definition with route match-mode
source:
  type: mysql
  hostname: localhost
  port: 3306
  username: root
  password: 123456
  tables: mydb.\.*
  server-id: 5400-5404
  server-time-zone: UTC

sink:
  type: doris
  fenodes: 127.0.0.1:8030
  username: root
  password: ""

route:
  - source-table: mydb.order_.*
    sink-table: ods_db.ods_orders
    description: "Merge all order sharded tables"
  - source-table: mydb.product_.*
    sink-table: ods_db.ods_products
    description: "Merge all product sharded tables"
  - source-table: mydb.*
    sink-table: ods_db.ods_<>
    replace-symbol: <>
    description: "One-to-one mapping for other tables"

pipeline:
  name: mysql_to_doris_with_route_match_mode
  parallelism: 2
  route-mode: FIRST_MATCH
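
The third route in this definition relies on `replace-symbol`. As a rough illustration, assuming the symbol in `sink-table` is replaced by the source table's name, the substitution could look like the sketch below. `ReplaceSymbolSketch` and `applyReplaceSymbol` are hypothetical names for illustration only and do not come from the Flink CDC code base.

```java
// Hypothetical sketch of replace-symbol substitution; the assumed semantics are
// that the symbol in the sink-table template is replaced by the source table name.
class ReplaceSymbolSketch {

    static String applyReplaceSymbol(
            String sinkTemplate, String replaceSymbol, String sourceTableName) {
        if (replaceSymbol == null || replaceSymbol.isEmpty()) {
            // No symbol configured: the sink table name is used as written.
            return sinkTemplate;
        }
        // String.replace performs a literal (non-regex) substitution.
        return sinkTemplate.replace(replaceSymbol, sourceTableName);
    }

    public static void main(String[] args) {
        // With the route above, a source table named "customer" would map to this sink.
        System.out.println(applyReplaceSymbol("ods_db.ods_<>", "<>", "customer"));
    }
}
```

Under `FIRST_MATCH`, a table such as `mydb.order_01` would be claimed by the first route and never reach this catch-all rule, which is what makes a "specific rules first, fallback last" layout workable.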
@@ -72,6 +72,22 @@ public class PipelineOptions {
"EXCEPTION: Throw an exception to terminate the sync pipeline.")))
.build());

    public static final ConfigOption<RouteMode> PIPELINE_ROUTE_MODE =
            ConfigOptions.key("route-mode")
                    .enumType(RouteMode.class)
                    .defaultValue(RouteMode.ALL_MATCH)
                    .withDescription(
                            Description.builder()
                                    .text("Match mode for routing rules. ")
                                    .linebreak()
                                    .add(
                                            ListElement.list(
                                                    text(
                                                            "ALL_MATCH: Apply all matching route rules to a table."),
                                                    text(
                                                            "FIRST_MATCH: Apply only the first matching route rule and stop evaluation.")))
                                    .build());

    public static final ConfigOption<String> PIPELINE_LOCAL_TIME_ZONE =
            ConfigOptions.key("local-time-zone")
                    .stringType()
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.common.pipeline;

import org.apache.flink.cdc.common.annotation.PublicEvolving;

/** Route mode for routing rules. */
@PublicEvolving
public enum RouteMode {
    /** Match all applicable routing rules. */
    ALL_MATCH,
    /** Match only the first applicable routing rule. */
    FIRST_MATCH;
}
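
For readers unfamiliar with enum-typed options, the following standalone sketch shows one way a raw `route-mode` string could be turned into an enum value with `ALL_MATCH` as the fallback. It does not use Flink CDC's `Configuration` or `ConfigOption` classes. `RouteModeParsingSketch` and `parseRouteMode` are hypothetical, and the case-insensitive handling is an assumption of this sketch rather than documented behavior.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: resolving a "route-mode" string into an enum with a default.
class RouteModeParsingSketch {
    // Local copy of the enum so the sketch is self-contained.
    enum RouteMode { ALL_MATCH, FIRST_MATCH }

    static RouteMode parseRouteMode(Map<String, String> pipelineConfig) {
        String raw = pipelineConfig.get("route-mode");
        if (raw == null) {
            // Option not set: fall back to the default mode.
            return RouteMode.ALL_MATCH;
        }
        // Assumption: accept any letter case. Enum.valueOf throws
        // IllegalArgumentException for values that are not enum constants.
        return RouteMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parseRouteMode(Map.of("route-mode", "FIRST_MATCH")));
        System.out.println(parseRouteMode(Map.of("parallelism", "2")));
    }
}
```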