Skip to content

Commit 699b54d

Browse files
author
guoxuanlin
committed
[FLINK-38831][route] The route configuration supports matching the first one
1 parent 42580dc commit 699b54d

26 files changed

Lines changed: 552 additions & 40 deletions

File tree

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2222
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
2323
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
24+
import org.apache.flink.cdc.common.route.RouteRule;
2425
import org.apache.flink.cdc.common.utils.Preconditions;
2526
import org.apache.flink.cdc.common.utils.StringUtils;
2627
import org.apache.flink.cdc.composer.definition.ModelDef;
@@ -80,6 +81,9 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
8081
private static final String ROUTE_REPLACE_SYMBOL = "replace-symbol";
8182
private static final String ROUTE_DESCRIPTION_KEY = "description";
8283

84+
// Pipeline keys
85+
private static final String ROUTE_MODE_KEY = "route-mode";
86+
8387
// Transform keys
8488
private static final String TRANSFORM_SOURCE_TABLE_KEY = "source-table";
8589
private static final String TRANSFORM_PROJECTION_KEY = "projection";
@@ -133,6 +137,8 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
133137
// it's not of plain data types and must be removed before calling toPipelineConfig.
134138
List<UdfDef> udfDefs = new ArrayList<>();
135139
final List<ModelDef> modelDefs = new ArrayList<>();
140+
// Use default route mode if not specified
141+
String routeMode = RouteRule.MatchMode.ALL_MATCH.getConfigValue();
136142
if (pipelineDefJsonNode.get(PIPELINE_KEY) != null) {
137143
Optional.ofNullable(
138144
((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
@@ -143,6 +149,13 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
143149
((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(MODEL_KEY))
144150
.map(node -> validateArray("model", node))
145151
.ifPresent(node -> modelDefs.addAll(parseModels(node)));
152+
153+
// Extract route-mode from pipeline if present
154+
JsonNode pipelineNode = pipelineDefJsonNode.get(PIPELINE_KEY);
155+
if (pipelineNode.has(ROUTE_MODE_KEY)) {
156+
routeMode = pipelineNode.get(ROUTE_MODE_KEY).asText();
157+
((ObjectNode) pipelineNode).remove(ROUTE_MODE_KEY);
158+
}
146159
}
147160

148161
// Pipeline configs are optional
@@ -189,7 +202,14 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
189202
pipelineConfig.addAll(userPipelineConfig);
190203

191204
return new PipelineDef(
192-
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
205+
sourceDef,
206+
sinkDef,
207+
routeDefs,
208+
routeMode,
209+
transformDefs,
210+
udfDefs,
211+
modelDefs,
212+
pipelineConfig);
193213
}
194214

195215
private SourceDef toSourceDef(JsonNode sourceNode) {

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,15 @@ void testUdfDefinition() throws Exception {
197197
assertThat(pipelineDef).isEqualTo(pipelineDefWithUdf);
198198
}
199199

200+
@Test
201+
void testRouteMode() throws Exception {
202+
URL resource =
203+
Resources.getResource("definitions/pipeline-definition-with-route-mode.yaml");
204+
YamlPipelineDefinitionParser parser = new YamlPipelineDefinitionParser();
205+
PipelineDef pipelineDef = parser.parse(new Path(resource.toURI()), new Configuration());
206+
assertThat(pipelineDef).isEqualTo(pipelineDefWithRouteMode);
207+
}
208+
200209
@Test
201210
void testSchemaEvolutionTypesConfiguration() throws Exception {
202211
testSchemaEvolutionTypesParsing(
@@ -669,4 +678,60 @@ void testParsingFullDefinitionFromString() throws Exception {
669678
ImmutableMap.<String, String>builder()
670679
.put("parallelism", "1")
671680
.build()));
681+
682+
private final PipelineDef pipelineDefWithRouteMode =
683+
new PipelineDef(
684+
new SourceDef(
685+
"mysql",
686+
null,
687+
Configuration.fromMap(
688+
ImmutableMap.<String, String>builder()
689+
.put("hostname", "localhost")
690+
.put("port", "3306")
691+
.put("username", "root")
692+
.put("password", "123456")
693+
.put("tables", "mydb.\\.*")
694+
.put("server-id", "5400-5404")
695+
.put("server-time-zone", "UTC")
696+
.build())),
697+
new SinkDef(
698+
"doris",
699+
null,
700+
Configuration.fromMap(
701+
ImmutableMap.<String, String>builder()
702+
.put("fenodes", "127.0.0.1:8030")
703+
.put("username", "root")
704+
.put("password", "")
705+
.build()),
706+
ImmutableSet.of(
707+
DROP_COLUMN,
708+
ALTER_COLUMN_TYPE,
709+
ADD_COLUMN,
710+
CREATE_TABLE,
711+
RENAME_COLUMN)),
712+
Arrays.asList(
713+
new RouteDef(
714+
"mydb.order_.*",
715+
"ods_db.ods_orders",
716+
null,
717+
"Merge all order sharded tables"),
718+
new RouteDef(
719+
"mydb.product_.*",
720+
"ods_db.ods_products",
721+
null,
722+
"Merge all product sharded tables"),
723+
new RouteDef(
724+
"mydb.*",
725+
"ods_db.ods_<>",
726+
"<>",
727+
"One-to-one mapping for other tables")),
728+
"first-match",
729+
Collections.emptyList(),
730+
Collections.emptyList(),
731+
Collections.emptyList(),
732+
Configuration.fromMap(
733+
ImmutableMap.<String, String>builder()
734+
.put("name", "mysql_to_doris_with_route_match_mode")
735+
.put("parallelism", "2")
736+
.build()));
672737
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# Example pipeline definition with route match-mode
17+
source:
18+
type: mysql
19+
hostname: localhost
20+
port: 3306
21+
username: root
22+
password: 123456
23+
tables: mydb.\.*
24+
server-id: 5400-5404
25+
server-time-zone: UTC
26+
27+
sink:
28+
type: doris
29+
fenodes: 127.0.0.1:8030
30+
username: root
31+
password: ""
32+
33+
route:
34+
- source-table: mydb.order_.*
35+
sink-table: ods_db.ods_orders
36+
description: "Merge all order sharded tables"
37+
- source-table: mydb.product_.*
38+
sink-table: ods_db.ods_products
39+
description: "Merge all product sharded tables"
40+
- source-table: mydb.*
41+
sink-table: ods_db.ods_<>
42+
replace-symbol: <>
43+
description: "One-to-one mapping for other tables"
44+
45+
pipeline:
46+
name: mysql_to_doris_with_route_match_mode
47+
parallelism: 2
48+
route-mode: first-match

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,43 @@ public class RouteRule implements Serializable {
2424

2525
private static final long serialVersionUID = 1L;
2626

27+
/** Match mode for routing rules. */
28+
public enum MatchMode {
29+
/** Match all applicable routing rules. */
30+
ALL_MATCH("all-match"),
31+
/** Match only the first applicable routing rule. */
32+
FIRST_MATCH("first-match");
33+
34+
private final String configValue;
35+
36+
MatchMode(String configValue) {
37+
this.configValue = configValue;
38+
}
39+
40+
public String getConfigValue() {
41+
return configValue;
42+
}
43+
44+
/**
45+
* Parse match mode from configuration value.
46+
*
47+
* @param value the configuration value
48+
* @return the corresponding MatchMode
49+
* @throws IllegalArgumentException if the value is invalid
50+
*/
51+
public static MatchMode fromConfigValue(String value) {
52+
for (MatchMode mode : values()) {
53+
if (mode.configValue.equalsIgnoreCase(value)) {
54+
return mode;
55+
}
56+
}
57+
throw new IllegalArgumentException(
58+
String.format(
59+
"Invalid match-mode value: %s. Allowed values are: %s, %s",
60+
value, FIRST_MATCH.configValue, ALL_MATCH.configValue));
61+
}
62+
}
63+
2764
public RouteRule(String sourceTable, String sinkTable) {
2865
this(sourceTable, sinkTable, null);
2966
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2121
import org.apache.flink.cdc.common.configuration.Configuration;
2222
import org.apache.flink.cdc.common.pipeline.RuntimeExecutionMode;
23+
import org.apache.flink.cdc.common.route.RouteRule;
2324
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
2425
import org.apache.flink.cdc.composer.PipelineComposer;
2526
import org.apache.flink.cdc.composer.PipelineExecution;
@@ -56,6 +57,7 @@ public class PipelineDef {
5657
private final SourceDef source;
5758
private final SinkDef sink;
5859
private final List<RouteDef> routes;
60+
private final String routeMode;
5961
private final List<TransformDef> transforms;
6062
private final List<UdfDef> udfs;
6163
private final List<ModelDef> models;
@@ -65,13 +67,15 @@ public PipelineDef(
6567
SourceDef source,
6668
SinkDef sink,
6769
List<RouteDef> routes,
70+
String routeMode,
6871
List<TransformDef> transforms,
6972
List<UdfDef> udfs,
7073
List<ModelDef> models,
7174
Configuration config) {
7275
this.source = source;
7376
this.sink = sink;
7477
this.routes = routes;
78+
this.routeMode = routeMode;
7579
this.transforms = transforms;
7680
this.udfs = udfs;
7781
this.models = models;
@@ -85,7 +89,34 @@ public PipelineDef(
8589
List<TransformDef> transforms,
8690
List<UdfDef> udfs,
8791
Configuration config) {
88-
this(source, sink, routes, transforms, udfs, new ArrayList<>(), config);
92+
this(
93+
source,
94+
sink,
95+
routes,
96+
RouteRule.MatchMode.ALL_MATCH.getConfigValue(),
97+
transforms,
98+
udfs,
99+
new ArrayList<>(),
100+
config);
101+
}
102+
103+
public PipelineDef(
104+
SourceDef source,
105+
SinkDef sink,
106+
List<RouteDef> routes,
107+
List<TransformDef> transforms,
108+
List<UdfDef> udfs,
109+
List<ModelDef> models,
110+
Configuration config) {
111+
this(
112+
source,
113+
sink,
114+
routes,
115+
RouteRule.MatchMode.ALL_MATCH.getConfigValue(),
116+
transforms,
117+
udfs,
118+
models,
119+
config);
89120
}
90121

91122
public SourceDef getSource() {
@@ -100,6 +131,10 @@ public List<RouteDef> getRoute() {
100131
return routes;
101132
}
102133

134+
public String getRouteMode() {
135+
return routeMode;
136+
}
137+
103138
public List<TransformDef> getTransforms() {
104139
return transforms;
105140
}
@@ -125,6 +160,8 @@ public String toString() {
125160
+ sink
126161
+ ", routes="
127162
+ routes
163+
+ ", routeMode="
164+
+ routeMode
128165
+ ", transforms="
129166
+ transforms
130167
+ ", udfs="
@@ -148,6 +185,7 @@ public boolean equals(Object o) {
148185
return Objects.equals(source, that.source)
149186
&& Objects.equals(sink, that.sink)
150187
&& Objects.equals(routes, that.routes)
188+
&& Objects.equals(routeMode, that.routeMode)
151189
&& Objects.equals(transforms, that.transforms)
152190
&& Objects.equals(udfs, that.udfs)
153191
&& Objects.equals(models, that.models)
@@ -156,7 +194,7 @@ public boolean equals(Object o) {
156194

157195
@Override
158196
public int hashCode() {
159-
return Objects.hash(source, sink, routes, transforms, udfs, models, config);
197+
return Objects.hash(source, sink, routes, routeMode, transforms, udfs, models, config);
160198
}
161199

162200
// ------------------------------------------------------------------------

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
220220
pipelineDef
221221
.getSink()
222222
.getIncludedSchemaEvolutionTypes()),
223-
pipelineDef.getRoute());
223+
pipelineDef.getRoute(),
224+
pipelineDef.getRouteMode());
224225

225226
} else {
226227
// Translate a regular topology for sources without distributed tables
@@ -235,7 +236,8 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
235236
pipelineDef
236237
.getSink()
237238
.getIncludedSchemaEvolutionTypes()),
238-
pipelineDef.getRoute());
239+
pipelineDef.getRoute(),
240+
pipelineDef.getRouteMode());
239241

240242
// Schema Operator ---(shuffled)---> Partitioning
241243
stream =

0 commit comments

Comments
 (0)