Skip to content

Commit 51947eb

Browse files
author
guoxuanlin
committed
[FLINK-38831] Address review comments.
1 parent 699b54d commit 51947eb

28 files changed

Lines changed: 449 additions & 244 deletions

File tree

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
2222
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
2323
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
24-
import org.apache.flink.cdc.common.route.RouteRule;
2524
import org.apache.flink.cdc.common.utils.Preconditions;
2625
import org.apache.flink.cdc.common.utils.StringUtils;
2726
import org.apache.flink.cdc.composer.definition.ModelDef;
@@ -81,9 +80,6 @@ public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
8180
private static final String ROUTE_REPLACE_SYMBOL = "replace-symbol";
8281
private static final String ROUTE_DESCRIPTION_KEY = "description";
8382

84-
// Pipeline keys
85-
private static final String ROUTE_MODE_KEY = "route-mode";
86-
8783
// Transform keys
8884
private static final String TRANSFORM_SOURCE_TABLE_KEY = "source-table";
8985
private static final String TRANSFORM_PROJECTION_KEY = "projection";
@@ -137,8 +133,6 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
137133
// it's not of plain data types and must be removed before calling toPipelineConfig.
138134
List<UdfDef> udfDefs = new ArrayList<>();
139135
final List<ModelDef> modelDefs = new ArrayList<>();
140-
// Use default route mode if not specified
141-
String routeMode = RouteRule.MatchMode.ALL_MATCH.getConfigValue();
142136
if (pipelineDefJsonNode.get(PIPELINE_KEY) != null) {
143137
Optional.ofNullable(
144138
((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(UDF_KEY))
@@ -149,13 +143,6 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
149143
((ObjectNode) pipelineDefJsonNode.get(PIPELINE_KEY)).remove(MODEL_KEY))
150144
.map(node -> validateArray("model", node))
151145
.ifPresent(node -> modelDefs.addAll(parseModels(node)));
152-
153-
// Extract route-mode from pipeline if present
154-
JsonNode pipelineNode = pipelineDefJsonNode.get(PIPELINE_KEY);
155-
if (pipelineNode.has(ROUTE_MODE_KEY)) {
156-
routeMode = pipelineNode.get(ROUTE_MODE_KEY).asText();
157-
((ObjectNode) pipelineNode).remove(ROUTE_MODE_KEY);
158-
}
159146
}
160147

161148
// Pipeline configs are optional
@@ -202,14 +189,7 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
202189
pipelineConfig.addAll(userPipelineConfig);
203190

204191
return new PipelineDef(
205-
sourceDef,
206-
sinkDef,
207-
routeDefs,
208-
routeMode,
209-
transformDefs,
210-
udfDefs,
211-
modelDefs,
212-
pipelineConfig);
192+
sourceDef, sinkDef, routeDefs, transformDefs, udfDefs, modelDefs, pipelineConfig);
213193
}
214194

215195
private SourceDef toSourceDef(JsonNode sourceNode) {

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -725,13 +725,13 @@ void testParsingFullDefinitionFromString() throws Exception {
725725
"ods_db.ods_<>",
726726
"<>",
727727
"One-to-one mapping for other tables")),
728-
"first-match",
729728
Collections.emptyList(),
730729
Collections.emptyList(),
731730
Collections.emptyList(),
732731
Configuration.fromMap(
733732
ImmutableMap.<String, String>builder()
734733
.put("name", "mysql_to_doris_with_route_match_mode")
735734
.put("parallelism", "2")
735+
.put("route-mode", "first-match")
736736
.build()));
737737
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,22 @@ public class PipelineOptions {
7272
"EXCEPTION: Throw an exception to terminate the sync pipeline.")))
7373
.build());
7474

75+
public static final ConfigOption<String> PIPELINE_ROUTE_MODE =
76+
ConfigOptions.key("route-mode")
77+
.stringType()
78+
.defaultValue(RouteMode.ALL_MATCH.toString())
79+
.withDescription(
80+
Description.builder()
81+
.text("Match mode for routing rules. ")
82+
.linebreak()
83+
.add(
84+
ListElement.list(
85+
text(
86+
"all-match: Apply all matching route rules to a table."),
87+
text(
88+
"first-match: Apply only the first matching route rule and stop evaluation.")))
89+
.build());
90+
7591
public static final ConfigOption<String> PIPELINE_LOCAL_TIME_ZONE =
7692
ConfigOptions.key("local-time-zone")
7793
.stringType()
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.pipeline;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
22+
/** Route mode for routing rules. */
23+
@PublicEvolving
24+
public enum RouteMode {
25+
/** Match all applicable routing rules. */
26+
ALL_MATCH("all-match"),
27+
/** Match only the first applicable routing rule. */
28+
FIRST_MATCH("first-match");
29+
30+
private final String value;
31+
32+
RouteMode(String value) {
33+
this.value = value;
34+
}
35+
36+
@Override
37+
public String toString() {
38+
return value;
39+
}
40+
41+
/**
42+
* Parse RouteMode from string. Supports both underscore (ALL_MATCH) and hyphen (all-match)
43+
* formats, case-insensitive.
44+
*
45+
* @param value the string value to parse
46+
* @return the corresponding RouteMode
47+
* @throws IllegalArgumentException if the value cannot be parsed
48+
*/
49+
public static RouteMode fromString(String value) {
50+
if (value == null) {
51+
throw new IllegalArgumentException("RouteMode value cannot be null");
52+
}
53+
for (RouteMode mode : RouteMode.values()) {
54+
if (mode.value.equalsIgnoreCase(value)) {
55+
return mode;
56+
}
57+
}
58+
throw new IllegalArgumentException(
59+
String.format(
60+
"Invalid RouteMode value: '%s'. Expected one of: [all-match, first-match, ALL-MATCH, FIRST-MATCH]",
61+
value));
62+
}
63+
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/route/RouteRule.java

Lines changed: 0 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,43 +24,6 @@ public class RouteRule implements Serializable {
2424

2525
private static final long serialVersionUID = 1L;
2626

27-
/** Match mode for routing rules. */
28-
public enum MatchMode {
29-
/** Match all applicable routing rules. */
30-
ALL_MATCH("all-match"),
31-
/** Match only the first applicable routing rule. */
32-
FIRST_MATCH("first-match");
33-
34-
private final String configValue;
35-
36-
MatchMode(String configValue) {
37-
this.configValue = configValue;
38-
}
39-
40-
public String getConfigValue() {
41-
return configValue;
42-
}
43-
44-
/**
45-
* Parse match mode from configuration value.
46-
*
47-
* @param value the configuration value
48-
* @return the corresponding MatchMode
49-
* @throws IllegalArgumentException if the value is invalid
50-
*/
51-
public static MatchMode fromConfigValue(String value) {
52-
for (MatchMode mode : values()) {
53-
if (mode.configValue.equalsIgnoreCase(value)) {
54-
return mode;
55-
}
56-
}
57-
throw new IllegalArgumentException(
58-
String.format(
59-
"Invalid match-mode value: %s. Allowed values are: %s, %s",
60-
value, FIRST_MATCH.configValue, ALL_MATCH.configValue));
61-
}
62-
}
63-
6427
public RouteRule(String sourceTable, String sinkTable) {
6528
this(sourceTable, sinkTable, null);
6629
}

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/PipelineDef.java

Lines changed: 8 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919

2020
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2121
import org.apache.flink.cdc.common.configuration.Configuration;
22+
import org.apache.flink.cdc.common.pipeline.RouteMode;
2223
import org.apache.flink.cdc.common.pipeline.RuntimeExecutionMode;
23-
import org.apache.flink.cdc.common.route.RouteRule;
2424
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
2525
import org.apache.flink.cdc.composer.PipelineComposer;
2626
import org.apache.flink.cdc.composer.PipelineExecution;
@@ -33,6 +33,7 @@
3333

3434
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_EXECUTION_RUNTIME_MODE;
3535
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_LOCAL_TIME_ZONE;
36+
import static org.apache.flink.cdc.common.pipeline.PipelineOptions.PIPELINE_ROUTE_MODE;
3637

3738
/**
3839
* Definition of a pipeline.
@@ -57,7 +58,6 @@ public class PipelineDef {
5758
private final SourceDef source;
5859
private final SinkDef sink;
5960
private final List<RouteDef> routes;
60-
private final String routeMode;
6161
private final List<TransformDef> transforms;
6262
private final List<UdfDef> udfs;
6363
private final List<ModelDef> models;
@@ -67,15 +67,13 @@ public PipelineDef(
6767
SourceDef source,
6868
SinkDef sink,
6969
List<RouteDef> routes,
70-
String routeMode,
7170
List<TransformDef> transforms,
7271
List<UdfDef> udfs,
7372
List<ModelDef> models,
7473
Configuration config) {
7574
this.source = source;
7675
this.sink = sink;
7776
this.routes = routes;
78-
this.routeMode = routeMode;
7977
this.transforms = transforms;
8078
this.udfs = udfs;
8179
this.models = models;
@@ -89,34 +87,7 @@ public PipelineDef(
8987
List<TransformDef> transforms,
9088
List<UdfDef> udfs,
9189
Configuration config) {
92-
this(
93-
source,
94-
sink,
95-
routes,
96-
RouteRule.MatchMode.ALL_MATCH.getConfigValue(),
97-
transforms,
98-
udfs,
99-
new ArrayList<>(),
100-
config);
101-
}
102-
103-
public PipelineDef(
104-
SourceDef source,
105-
SinkDef sink,
106-
List<RouteDef> routes,
107-
List<TransformDef> transforms,
108-
List<UdfDef> udfs,
109-
List<ModelDef> models,
110-
Configuration config) {
111-
this(
112-
source,
113-
sink,
114-
routes,
115-
RouteRule.MatchMode.ALL_MATCH.getConfigValue(),
116-
transforms,
117-
udfs,
118-
models,
119-
config);
90+
this(source, sink, routes, transforms, udfs, new ArrayList<>(), config);
12091
}
12192

12293
public SourceDef getSource() {
@@ -131,8 +102,9 @@ public List<RouteDef> getRoute() {
131102
return routes;
132103
}
133104

134-
public String getRouteMode() {
135-
return routeMode;
105+
public RouteMode getRouteMode() {
106+
String routeModeStr = config.get(PIPELINE_ROUTE_MODE);
107+
return RouteMode.fromString(routeModeStr);
136108
}
137109

138110
public List<TransformDef> getTransforms() {
@@ -161,7 +133,7 @@ public String toString() {
161133
+ ", routes="
162134
+ routes
163135
+ ", routeMode="
164-
+ routeMode
136+
+ getRouteMode()
165137
+ ", transforms="
166138
+ transforms
167139
+ ", udfs="
@@ -185,7 +157,6 @@ public boolean equals(Object o) {
185157
return Objects.equals(source, that.source)
186158
&& Objects.equals(sink, that.sink)
187159
&& Objects.equals(routes, that.routes)
188-
&& Objects.equals(routeMode, that.routeMode)
189160
&& Objects.equals(transforms, that.transforms)
190161
&& Objects.equals(udfs, that.udfs)
191162
&& Objects.equals(models, that.models)
@@ -194,7 +165,7 @@ public boolean equals(Object o) {
194165

195166
@Override
196167
public int hashCode() {
197-
return Objects.hash(source, sink, routes, routeMode, transforms, udfs, models, config);
168+
return Objects.hash(source, sink, routes, transforms, udfs, models, config);
198169
}
199170

200171
// ------------------------------------------------------------------------

0 commit comments

Comments
 (0)