Skip to content

Commit 9c9f720

Browse files
linguoxuanyuxiqian
authored andcommitted
[FLINK-38831][route] The route configuration supports matching the first one
1 parent 91ae677 commit 9c9f720

30 files changed

Lines changed: 723 additions & 38 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ SELECT * FROM test_table;
100100
pipeline:
101101
name: Sync MySQL Database to Doris
102102
parallelism: 2
103+
route-mode: ALL_MATCH
103104
user-defined-function:
104105
- name: addone
105106
classpath: com.example.functions.AddOneFunctionClass

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,15 @@ void testUdfDefinitionWithOptions() throws Exception {
206206
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdfOptions);
207207
}
208208

209+
@Test
210+
void testRouteMode() throws Exception {
211+
URL resource =
212+
Resources.getResource("definitions/pipeline-definition-with-route-mode.yaml");
213+
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
214+
PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
215+
assertThat(pipelineDef).isEqualTo(pipelineDefWithRouteMode);
216+
}
217+
209218
@Test
210219
void testSchemaEvolutionTypesConfiguration() throws Exception {
211220
testSchemaEvolutionTypesParsing(
@@ -716,4 +725,60 @@ void testParsingFullDefinitionFromString() throws Exception {
716725
ImmutableMap.<String, String>builder()
717726
.put("parallelism", "1")
718727
.build()));
728+
729+
private final PipelineDef pipelineDefWithRouteMode =
730+
new PipelineDef(
731+
new SourceDef(
732+
"mysql",
733+
null,
734+
Configuration.fromMap(
735+
ImmutableMap.<String, String>builder()
736+
.put("hostname", "localhost")
737+
.put("port", "3306")
738+
.put("username", "root")
739+
.put("password", "123456")
740+
.put("tables", "mydb.\\.*")
741+
.put("server-id", "5400-5404")
742+
.put("server-time-zone", "UTC")
743+
.build())),
744+
new SinkDef(
745+
"doris",
746+
null,
747+
Configuration.fromMap(
748+
ImmutableMap.<String, String>builder()
749+
.put("fenodes", "127.0.0.1:8030")
750+
.put("username", "root")
751+
.put("password", "")
752+
.build()),
753+
ImmutableSet.of(
754+
DROP_COLUMN,
755+
ALTER_COLUMN_TYPE,
756+
ADD_COLUMN,
757+
CREATE_TABLE,
758+
RENAME_COLUMN)),
759+
Arrays.asList(
760+
new RouteDef(
761+
"mydb.order_.*",
762+
"ods_db.ods_orders",
763+
null,
764+
"Merge all order sharded tables"),
765+
new RouteDef(
766+
"mydb.product_.*",
767+
"ods_db.ods_products",
768+
null,
769+
"Merge all product sharded tables"),
770+
new RouteDef(
771+
"mydb.*",
772+
"ods_db.ods_<>",
773+
"<>",
774+
"One-to-one mapping for other tables")),
775+
Collections.emptyList(),
776+
Collections.emptyList(),
777+
Collections.emptyList(),
778+
Configuration.fromMap(
779+
ImmutableMap.<String, String>builder()
780+
.put("name", "mysql_to_doris_with_route_match_mode")
781+
.put("parallelism", "2")
782+
.put("route-mode", "FIRST_MATCH")
783+
.build()));
719784
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# Example pipeline definition with route match-mode
17+
source:
18+
type: mysql
19+
hostname: localhost
20+
port: 3306
21+
username: root
22+
password: 123456
23+
tables: mydb.\.*
24+
server-id: 5400-5404
25+
server-time-zone: UTC
26+
27+
sink:
28+
type: doris
29+
fenodes: 127.0.0.1:8030
30+
username: root
31+
password: ""
32+
33+
route:
34+
- source-table: mydb.order_.*
35+
sink-table: ods_db.ods_orders
36+
description: "Merge all order sharded tables"
37+
- source-table: mydb.product_.*
38+
sink-table: ods_db.ods_products
39+
description: "Merge all product sharded tables"
40+
- source-table: mydb.*
41+
sink-table: ods_db.ods_<>
42+
replace-symbol: <>
43+
description: "One-to-one mapping for other tables"
44+
45+
pipeline:
46+
name: mysql_to_doris_with_route_match_mode
47+
parallelism: 2
48+
route-mode: FIRST_MATCH

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,22 @@ public class PipelineOptions {
7272
"EXCEPTION: Throw an exception to terminate the sync pipeline.")))
7373
.build());
7474

75+
public static final ConfigOption<RouteMode> PIPELINE_ROUTE_MODE =
76+
ConfigOptions.key("route-mode")
77+
.enumType(RouteMode.class)
78+
.defaultValue(RouteMode.ALL_MATCH)
79+
.withDescription(
80+
Description.builder()
81+
.text("Match mode for routing rules. ")
82+
.linebreak()
83+
.add(
84+
ListElement.list(
85+
text(
86+
"ALL_MATCH: Apply all matching route rules to a table."),
87+
text(
88+
"FIRST_MATCH: Apply only the first matching route rule and stop evaluation.")))
89+
.build());
90+
7591
public static final ConfigOption<String> PIPELINE_LOCAL_TIME_ZONE =
7692
ConfigOptions.key("local-time-zone")
7793
.stringType()
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.pipeline;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
22+
/** Route mode for routing rules. */
23+
@PublicEvolving
24+
public enum RouteMode {
25+
/** Match all applicable routing rules. */
26+
ALL_MATCH,
27+
/** Match only the first applicable routing rule. */
28+
FIRST_MATCH;
29+
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/TableIdRouter.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.api.java.tuple.Tuple3;
2121
import org.apache.flink.cdc.common.annotation.PublicEvolving;
2222
import org.apache.flink.cdc.common.event.TableId;
23+
import org.apache.flink.cdc.common.pipeline.RouteMode;
2324
import org.apache.flink.cdc.common.schema.Selectors;
2425

2526
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
@@ -53,6 +54,7 @@ public class TableIdRouter {
5354

5455
private final List<Tuple3<Pattern, String, String>> routes;
5556
private final LoadingCache<TableId, List<TableId>> routingCache;
57+
private final RouteMode routeMode;
5658

5759
private static final String DOT_PLACEHOLDER = "_dot_placeholder_";
5860

@@ -105,7 +107,8 @@ public static String convertTableListToRegExpPattern(String tables) {
105107
return standardRegExpTableCaptureList;
106108
}
107109

108-
public TableIdRouter(List<RouteRule> routingRules) {
110+
public TableIdRouter(List<RouteRule> routingRules, RouteMode routeMode) {
111+
this.routeMode = routeMode;
109112
this.routes = new ArrayList<>();
110113
for (RouteRule rule : routingRules) {
111114
try {
@@ -139,11 +142,19 @@ public List<TableId> route(TableId sourceTableId) {
139142
}
140143

141144
private List<TableId> calculateRoute(TableId sourceTableId) {
142-
List<TableId> routedTableIds =
143-
routes.stream()
144-
.filter(route -> matches(route.f0, sourceTableId))
145-
.map(route -> resolveReplacement(sourceTableId, route))
146-
.collect(Collectors.toList());
145+
List<TableId> routedTableIds = new ArrayList<>();
146+
147+
for (Tuple3<Pattern, String, String> route : routes) {
148+
if (matches(route.f0, sourceTableId)) {
149+
routedTableIds.add(resolveReplacement(sourceTableId, route));
150+
151+
// If match mode is FIRST_MATCH, stop after the first match
152+
if (routeMode == RouteMode.FIRST_MATCH) {
153+
break;
154+
}
155+
}
156+
}
157+
147158
if (routedTableIds.isEmpty()) {
148159
routedTableIds.add(sourceTableId);
149160
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2121
import org.apache.flink.cdc.common.configuration.Configuration;
22+
import org.apache.flink.cdc.common.pipeline.RouteMode;
2223
import org.apache.flink.cdc.common.pipeline.RuntimeExecutionMode;
2324
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
2425
import org.apache.flink.cdc.composer.PipelineComposer;
@@ -32,6 +33,7 @@
3233

3334
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE;
3435
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
36+
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_ROUTE_MODE;
3537

3638
/**
3739
* Definition of a pipeline.
@@ -100,6 +102,10 @@ public List<RouteDef> getRoute() {
100102
return routes;
101103
}
102104

105+
public RouteMode getRouteMode() {
106+
return config.get(PIPELINE_ROUTE_MODE);
107+
}
108+
103109
public List<TransformDef> getTransforms() {
104110
return transforms;
105111
}
@@ -125,6 +131,8 @@ public String toString() {
125131
+ sink
126132
+ ", routes="
127133
+ routes
134+
+ ", routeMode="
135+
+ getRouteMode()
128136
+ ", transforms="
129137
+ transforms
130138
+ ", udfs="

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
220220
pipelineDef
221221
.getSink()
222222
.getIncludedSchemaEvolutionTypes()),
223-
pipelineDef.getRoute());
223+
pipelineDef.getRoute(),
224+
pipelineDef.getRouteMode());
224225

225226
} else {
226227
// Translate a regular topology for sources without distributed tables
@@ -235,7 +236,8 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
235236
pipelineDef
236237
.getSink()
237238
.getIncludedSchemaEvolutionTypes()),
238-
pipelineDef.getRoute());
239+
pipelineDef.getRoute(),
240+
pipelineDef.getRouteMode());
239241

240242
// Schema Operator ---(shuffled)---> Partitioning
241243
stream =

0 commit comments

Comments
 (0)