Skip to content

Commit 375a622

Browse files
committed
Add support for Flink 2.x.
1 parent c155638 commit 375a622

54 files changed

Lines changed: 2658 additions & 102 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/flink_cdc_ci.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,19 @@ jobs:
8888
flink-versions: "['1.19.3', '1.20.3']"
8989
modules: "['pipeline_e2e']"
9090
parallelism: ${{ matrix.parallelism }}
91+
pipeline_e2e_2_x:
92+
strategy:
93+
max-parallel: 2
94+
fail-fast: false
95+
matrix:
96+
parallelism: [ 1, 4 ]
97+
name: Pipeline E2E Tests 2.x (${{ matrix.parallelism }}-Parallelism)
98+
uses: ./.github/workflows/flink_cdc_base.yml
99+
with:
100+
java-versions: "[11]"
101+
flink-versions: "['2.2.0']"
102+
modules: "['pipeline_e2e_2.x']"
103+
parallelism: ${{ matrix.parallelism }}
91104
source_e2e:
92105
name: Source E2E Tests
93106
uses: ./.github/workflows/flink_cdc_base.yml

.github/workflows/flink_cdc_ci_nightly.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,20 @@ jobs:
8282
flink-versions: "['1.19.3', '1.20.3']"
8383
modules: "['pipeline_e2e']"
8484
parallelism: ${{ matrix.parallelism }}
85+
pipeline_e2e_2_x:
86+
if: github.repository == 'apache/flink-cdc'
87+
strategy:
88+
max-parallel: 2
89+
fail-fast: false
90+
matrix:
91+
parallelism: [ 1, 4 ]
92+
name: Pipeline E2E Tests 2.x (${{ matrix.parallelism }}-Parallelism)
93+
uses: ./.github/workflows/flink_cdc_base.yml
94+
with:
95+
java-versions: "[17]"
96+
flink-versions: "['2.2.0']"
97+
modules: "['pipeline_e2e_2.x']"
98+
parallelism: ${{ matrix.parallelism }}
8599
source_e2e:
86100
if: github.repository == 'apache/flink-cdc'
87101
name: Source E2E Tests

.github/workflows/modules.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,10 @@
136136
"flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests"
137137
]
138138

139+
MODULES_PIPELINE_E2E_2_X = [
140+
"flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests-2.x"
141+
]
142+
139143
MODULES_SOURCE_E2E = [
140144
"flink-cdc-e2e-tests/flink-cdc-source-e2e-tests"
141145
]
@@ -166,16 +170,17 @@
166170
MODULES_FLUSS +
167171
MODULES_HUDI +
168172
MODULES_PIPELINE_E2E +
173+
MODULES_PIPELINE_E2E_2_X +
169174
MODULES_SOURCE_E2E
170175
)
171176

172177
test_modules = set()
173178
compile_modules = set()
174179

175180
for module in INPUT_MODULES.split(', '):
176-
module_list = set(globals()['MODULES_' + module.upper().replace('-', '_')])
181+
module_list = set(globals()['MODULES_' + module.upper().replace('-', '_').replace('.', '_')])
177182
test_modules |= module_list
178-
if module == 'source_e2e' or module == 'pipeline_e2e':
183+
if module == 'source_e2e' or module == 'pipeline_e2e' or module == 'pipeline_e2e_2.x':
179184
compile_modules |= ALL_MODULES
180185
else:
181186
compile_modules |= module_list

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/CliFrontend.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.flink.cdc.composer.PipelineExecution;
2828
import org.apache.flink.configuration.DeploymentOptions;
2929
import org.apache.flink.core.fs.Path;
30-
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
3130
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
3231

3332
import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
@@ -173,7 +172,12 @@ private static SavepointRestoreSettings createSavepointRestoreSettings(
173172
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
174173
ConfigurationUtils.getClaimModeClass());
175174
} else {
176-
restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue();
175+
try {
176+
restoreMode =
177+
ConfigurationUtils.getClaimModeClass().getField("DEFAULT").get(null);
178+
} catch (NoSuchFieldException | IllegalAccessException e) {
179+
throw new RuntimeException("Failed to get default restore mode.", e);
180+
}
177181
}
178182
// allowNonRestoredState is always false because all operators are predefined.
179183

flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/utils/ConfigurationUtils.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,11 @@ public static Class<?> getClaimModeClass() {
7272
try {
7373
return Class.forName("org.apache.flink.runtime.jobgraph.RestoreMode");
7474
} catch (ClassNotFoundException e) {
75-
throw new RuntimeException(e);
75+
try {
76+
return Class.forName("org.apache.flink.core.execution.RecoveryClaimMode");
77+
} catch (ClassNotFoundException classNotFoundException) {
78+
throw new RuntimeException(classNotFoundException);
79+
}
7680
}
7781
}
7882
}

flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/CliFrontendTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717

1818
package org.apache.flink.cdc.cli;
1919

20+
import org.apache.flink.cdc.cli.utils.ConfigurationUtils;
2021
import org.apache.flink.cdc.composer.PipelineComposer;
2122
import org.apache.flink.cdc.composer.PipelineExecution;
2223
import org.apache.flink.cdc.composer.definition.PipelineDef;
23-
import org.apache.flink.core.execution.RestoreMode;
2424
import org.apache.flink.core.fs.Path;
2525

2626
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
@@ -107,7 +107,8 @@ void testSavePointConfiguration() throws Exception {
107107
"-n");
108108
assertThat(executor.getFlinkConfig().get(SAVEPOINT_PATH))
109109
.isEqualTo(flinkHome() + "/savepoints/savepoint-1");
110-
assertThat(executor.getFlinkConfig().get(RESTORE_MODE)).isEqualTo(RestoreMode.NO_CLAIM);
110+
assertThat(executor.getFlinkConfig().get(RESTORE_MODE))
111+
.isEqualTo(getRestoreModeEnum("NO_CLAIM"));
111112
assertThat(executor.getFlinkConfig().get(SAVEPOINT_IGNORE_UNCLAIMED_STATE)).isTrue();
112113
}
113114

@@ -225,6 +226,10 @@ private CliExecutor createExecutor(String... args) throws Exception {
225226
return CliFrontend.createExecutor(parser.parse(cliOptions, args));
226227
}
227228

229+
private Object getRestoreModeEnum(String name) throws Exception {
230+
return Enum.valueOf((Class<Enum>) ConfigurationUtils.getClaimModeClass(), name);
231+
}
232+
228233
private String pipelineDef() throws Exception {
229234
URL resource = Resources.getResource("definitions/pipeline-definition-full.yaml");
230235
return Paths.get(resource.toURI()).toString();

flink-cdc-common/pom.xml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,29 @@ limitations under the License.
4444
</plugins>
4545
</build>
4646

47+
<profiles>
48+
<profile>
49+
<id>flink2</id>
50+
<properties>
51+
<flink.version>${flink.2.x.version}</flink.version>
52+
</properties>
53+
<build>
54+
<plugins>
55+
<plugin>
56+
<groupId>org.apache.maven.plugins</groupId>
57+
<artifactId>maven-compiler-plugin</artifactId>
58+
<configuration>
59+
<excludes>
60+
<!-- SinkFunction is moved to legacy package in Flink 2.x -->
61+
<exclude>org/apache/flink/cdc/common/sink/FlinkSinkFunctionProvider.java</exclude>
62+
<!-- SourceFunction is moved to legacy package in Flink 2.x -->
63+
<exclude>org/apache/flink/cdc/common/source/FlinkSourceFunctionProvider.java</exclude>
64+
</excludes>
65+
</configuration>
66+
</plugin>
67+
</plugins>
68+
</build>
69+
</profile>
70+
</profiles>
71+
4772
</project>
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.common.exceptions;
19+
20+
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
22+
import javax.annotation.Nullable;
23+
24+
/**
25+
* Exception for validation errors (e.g. invalid options, unsupported options). Defined in
26+
* flink-cdc-common to avoid runtime dependency on Flink table API (e.g. {@code
27+
* org.apache.flink.table.api.ValidationException}), which may not be on the classpath when running
28+
* with different Flink versions.
29+
*/
30+
@PublicEvolving
31+
public class ValidationException extends RuntimeException {
32+
33+
public ValidationException(String message) {
34+
super(message);
35+
}
36+
37+
public ValidationException(String message, @Nullable Throwable cause) {
38+
super(message, cause);
39+
}
40+
}

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/factories/FactoryHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import org.apache.flink.cdc.common.configuration.ConfigOption;
2222
import org.apache.flink.cdc.common.configuration.Configuration;
2323
import org.apache.flink.cdc.common.configuration.FallbackKey;
24+
import org.apache.flink.cdc.common.exceptions.ValidationException;
2425
import org.apache.flink.cdc.common.utils.Preconditions;
2526
import org.apache.flink.configuration.ReadableConfig;
26-
import org.apache.flink.table.api.ValidationException;
2727

2828
import java.util.Arrays;
2929
import java.util.HashMap;

flink-cdc-common/src/main/java/org/apache/flink/cdc/common/sink/EventSinkProvider.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.flink.api.connector.sink2.Sink;
2121
import org.apache.flink.cdc.common.annotation.PublicEvolving;
22-
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
2322

2423
/**
2524
* A marker interface used to provide an event sink for writing change events to external systems.

0 commit comments

Comments
 (0)