Skip to content

Commit a814a86

Browse files
committed
Add support for Flink 2.x.
1 parent c155638 commit a814a86

85 files changed

Lines changed: 3655 additions & 435 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/flink_cdc_base.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ jobs:
108108
109109
build_maven_parameter="${build_maven_parameter:+$build_maven_parameter }${{ inputs.custom-maven-parameter }}"
110110
111-
mvn --no-snapshot-updates -B -DskipTests -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} -Duser.timezone=$jvm_timezone verify
111+
mvn --no-snapshot-updates -B -DskipTests ${{ inputs.custom-maven-parameter }} -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} -Duser.timezone=$jvm_timezone verify
112112
113113
- name: Print JVM thread dumps when cancelled
114114
if: ${{ failure() }}

.github/workflows/flink_cdc_ci.yml

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ jobs:
6363
with:
6464
java-versions: "[11]"
6565
modules: "['core']"
66+
common_2_x:
67+
name: Common Unit Tests 2.x
68+
uses: ./.github/workflows/flink_cdc_base.yml
69+
with:
70+
java-versions: "[11]"
71+
flink-versions: "['2.2.0']"
72+
custom-maven-parameter: "-Pflink2"
73+
modules: "['core_2.x']"
6674
pipeline-ut:
6775
name: Pipeline Unit Tests
6876
uses: ./.github/workflows/flink_cdc_base.yml
@@ -88,6 +96,20 @@ jobs:
8896
flink-versions: "['1.19.3', '1.20.3']"
8997
modules: "['pipeline_e2e']"
9098
parallelism: ${{ matrix.parallelism }}
99+
pipeline_e2e_2_x:
100+
strategy:
101+
max-parallel: 2
102+
fail-fast: false
103+
matrix:
104+
parallelism: [ 1, 4 ]
105+
name: Pipeline E2E Tests 2.x (${{ matrix.parallelism }}-Parallelism)
106+
uses: ./.github/workflows/flink_cdc_base.yml
107+
with:
108+
java-versions: "[11]"
109+
flink-versions: "['2.2.0']"
110+
custom-maven-parameter: "-Pflink2"
111+
modules: "['pipeline_e2e_2.x']"
112+
parallelism: ${{ matrix.parallelism }}
91113
source_e2e:
92114
name: Source E2E Tests
93115
uses: ./.github/workflows/flink_cdc_base.yml

.github/workflows/flink_cdc_ci_nightly.yml

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ jobs:
5454
with:
5555
java-versions: "[17]"
5656
modules: "['core']"
57+
common_2_x:
58+
if: github.repository == 'apache/flink-cdc'
59+
name: Common Unit Tests 2.x
60+
uses: ./.github/workflows/flink_cdc_base.yml
61+
with:
62+
java-versions: "[17]"
63+
flink-versions: "['2.2.0']"
64+
custom-maven-parameter: "-Pflink2"
65+
modules: "['core_2.x']"
5766
pipeline-ut:
5867
if: github.repository == 'apache/flink-cdc'
5968
name: Pipeline Unit Tests
@@ -82,6 +91,21 @@ jobs:
8291
flink-versions: "['1.19.3', '1.20.3']"
8392
modules: "['pipeline_e2e']"
8493
parallelism: ${{ matrix.parallelism }}
94+
pipeline_e2e_2_x:
95+
if: github.repository == 'apache/flink-cdc'
96+
strategy:
97+
max-parallel: 2
98+
fail-fast: false
99+
matrix:
100+
parallelism: [ 1, 4 ]
101+
name: Pipeline E2E Tests 2.x (${{ matrix.parallelism }}-Parallelism)
102+
uses: ./.github/workflows/flink_cdc_base.yml
103+
with:
104+
java-versions: "[17]"
105+
custom-maven-parameter: "-Pflink2"
106+
flink-versions: "['2.2.0']"
107+
modules: "['pipeline_e2e_2.x']"
108+
parallelism: ${{ matrix.parallelism }}
85109
source_e2e:
86110
if: github.repository == 'apache/flink-cdc'
87111
name: Source E2E Tests

.github/workflows/modules.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@
1717
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values"
1818
]
1919

20+
MODULES_CORE_2_X = [
21+
"flink-cdc-cli",
22+
"flink-cdc-common",
23+
"flink-cdc-composer",
24+
"flink-cdc-runtime",
25+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values",
26+
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values-2.x"
27+
]
28+
2029
MODULES_PIPELINE_CONNECTORS = [
2130
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris",
2231
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch",
@@ -136,12 +145,17 @@
136145
"flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests"
137146
]
138147

148+
MODULES_PIPELINE_E2E_2_X = [
149+
"flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x"
150+
]
151+
139152
MODULES_SOURCE_E2E = [
140153
"flink-cdc-e2e-tests/flink-cdc-source-e2e-tests"
141154
]
142155

143156
ALL_MODULES = set(
144157
MODULES_CORE +
158+
MODULES_CORE_2_X +
145159
MODULES_PIPELINE_CONNECTORS +
146160
MODULES_MYSQL_SOURCE +
147161
MODULES_MYSQL_PIPELINE +
@@ -169,13 +183,18 @@
169183
MODULES_SOURCE_E2E
170184
)
171185

186+
# Modules that require the flink2 Maven profile to be activated
187+
ALL_MODULES_FLINK2 = ALL_MODULES | set(MODULES_PIPELINE_E2E_2_X)
188+
172189
test_modules = set()
173190
compile_modules = set()
174191

175192
for module in INPUT_MODULES.split(', '):
176-
module_list = set(globals()['MODULES_' + module.upper().replace('-', '_')])
193+
module_list = set(globals()['MODULES_' + module.upper().replace('-', '_').replace('.', '_')])
177194
test_modules |= module_list
178-
if module == 'source_e2e' or module == 'pipeline_e2e':
195+
if module == 'pipeline_e2e_2.x':
196+
compile_modules |= ALL_MODULES_FLINK2
197+
elif module == 'source_e2e' or module == 'pipeline_e2e':
179198
compile_modules |= ALL_MODULES
180199
else:
181200
compile_modules |= module_list

flink-cdc-cli/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,30 @@ limitations under the License.
8282

8383
</dependencies>
8484

85+
<profiles>
86+
<profile>
87+
<id>flink2</id>
88+
<properties>
89+
<flink.version>${flink.2.x.version}</flink.version>
90+
</properties>
91+
<build>
92+
<plugins>
93+
<plugin>
94+
<groupId>org.apache.maven.plugins</groupId>
95+
<artifactId>maven-dependency-plugin</artifactId>
96+
</plugin>
97+
<plugin>
98+
<groupId>org.apache.maven.plugins</groupId>
99+
<artifactId>maven-surefire-plugin</artifactId>
100+
<configuration>
101+
<additionalClasspathElements>
102+
<additionalClasspathElement>${project.build.directory}/flink2-extra-libs/flink-shaded-guava-${flink.2.x.shaded.guava.version}.jar</additionalClasspathElement>
103+
</additionalClasspathElements>
104+
</configuration>
105+
</plugin>
106+
</plugins>
107+
</build>
108+
</profile>
109+
</profiles>
110+
85111
</project>

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.cdc.composer.PipelineExecution;
2828
import org.apache.flink.configuration.DeploymentOptions;
2929
import org.apache.flink.core.fs.Path;
30-
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
3130
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
3231

3332
import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
@@ -173,7 +172,12 @@ private static SavepointRestoreSettings createSavepointRestoreSettings(
173172
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
174173
ConfigurationUtils.getClaimModeClass());
175174
} else {
176-
restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue();
175+
try {
176+
restoreMode =
177+
ConfigurationUtils.getClaimModeClass().getField("DEFAULT").get(null);
178+
} catch (NoSuchFieldException | IllegalAccessException e) {
179+
throw new RuntimeException("Failed to get default restore mode.", e);
180+
}
177181
}
178182
// allowNonRestoredState is always false because all operators are predefined.
179183

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,11 @@ public static Class<?> getClaimModeClass() {
7272
try {
7373
return Class.forName("org.apache.flink.runtime.jobgraph.RestoreMode");
7474
} catch (ClassNotFoundException e) {
75-
throw new RuntimeException(e);
75+
try {
76+
return Class.forName("org.apache.flink.core.execution.RecoveryClaimMode");
77+
} catch (ClassNotFoundException classNotFoundException) {
78+
throw new RuntimeException(classNotFoundException);
79+
}
7680
}
7781
}
7882
}

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
package org.apache.flink.cdc.cli;
1919

20+
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
2021
import org.apache.flink.cdc.composer.PipelineComposer;
2122
import org.apache.flink.cdc.composer.PipelineExecution;
2223
import org.apache.flink.cdc.composer.definition.PipelineDef;
23-
import org.apache.flink.core.execution.RestoreMode;
2424
import org.apache.flink.core.fs.Path;
2525

2626
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
@@ -107,7 +107,8 @@ void testSavePointConfiguration() throws Exception {
107107
"-n");
108108
assertThat(executor.getFlinkConfig().get(SAVEPOINT_PATH))
109109
.isEqualTo(flinkHome() + "/savepoints/savepoint-1");
110-
assertThat(executor.getFlinkConfig().get(RESTORE_MODE)).isEqualTo(RestoreMode.NO_CLAIM);
110+
assertThat(executor.getFlinkConfig().get(RESTORE_MODE))
111+
.isEqualTo(getRestoreModeEnum("NO_CLAIM"));
111112
assertThat(executor.getFlinkConfig().get(SAVEPOINT_IGNORE_UNCLAIMED_STATE)).isTrue();
112113
}
113114

@@ -225,6 +226,10 @@ private CliExecutor createExecutor(String... args) throws Exception {
225226
return CliFrontend.createExecutor(parser.parse(cliOptions, args));
226227
}
227228

229+
private Object getRestoreModeEnum(String name) throws Exception {
230+
return Enum.valueOf((Class<Enum>) ConfigurationUtils.getClaimModeClass(), name);
231+
}
232+
228233
private String pipelineDef() throws Exception {
229234
URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
230235
return Paths.get(resource.toURI()).toString();

flink-cdc-common/pom.xml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,42 @@ limitations under the License.
4444
</plugins>
4545
</build>
4646

47+
<profiles>
48+
<profile>
49+
<id>flink2</id>
50+
<properties>
51+
<flink.version>${flink.2.x.version}</flink.version>
52+
</properties>
53+
<build>
54+
<plugins>
55+
<plugin>
56+
<groupId>org.apache.maven.plugins</groupId>
57+
<artifactId>maven-compiler-plugin</artifactId>
58+
<configuration>
59+
<excludes>
60+
<!-- SinkFunction is moved to legacy package in Flink 2.x -->
61+
<exclude>org/apache/flink/cdc/common/sink/FlinkSinkFunctionProvider.java</exclude>
62+
<!-- SourceFunction is moved to legacy package in Flink 2.x -->
63+
<exclude>org/apache/flink/cdc/common/source/FlinkSourceFunctionProvider.java</exclude>
64+
</excludes>
65+
</configuration>
66+
</plugin>
67+
<plugin>
68+
<groupId>org.apache.maven.plugins</groupId>
69+
<artifactId>maven-dependency-plugin</artifactId>
70+
</plugin>
71+
<plugin>
72+
<groupId>org.apache.maven.plugins</groupId>
73+
<artifactId>maven-surefire-plugin</artifactId>
74+
<configuration>
75+
<additionalClasspathElements>
76+
<additionalClasspathElement>${project.build.directory}/flink2-extra-libs/flink-shaded-guava-${flink.2.x.shaded.guava.version}.jar</additionalClasspathElement>
77+
</additionalClasspathElements>
78+
</configuration>
79+
</plugin>
80+
</plugins>
81+
</build>
82+
</profile>
83+
</profiles>
84+
4785
</project>
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.exceptions;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
22+
import javax.annotation.Nullable;
23+
24+
/**
25+
* Exception for validation errors (e.g. invalid options, unsupported options). Defined in
26+
* flink-cdc-common to avoid runtime dependency on Flink table API (e.g. {@code
27+
* org.apache.flink.table.api.ValidationException}), which may not be on the classpath when running
28+
* with different Flink versions.
29+
*/
30+
@PublicEvolving
31+
public class ValidationException extends RuntimeException {
32+
33+
public ValidationException(String message) {
34+
super(message);
35+
}
36+
37+
public ValidationException(String message, @Nullable Throwable cause) {
38+
super(message, cause);
39+
}
40+
}

0 commit comments

Comments
 (0)