Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flink_cdc_base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ jobs:

build_maven_parameter="${build_maven_parameter:+$build_maven_parameter }${{ inputs.custom-maven-parameter }}"

mvn --no-snapshot-updates -B -DskipTests -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} -Duser.timezone=$jvm_timezone verify
mvn --no-snapshot-updates -B -DskipTests ${{ inputs.custom-maven-parameter }} -pl $compile_modules -am install && mvn --no-snapshot-updates -B $build_maven_parameter -pl $modules -DspecifiedParallelism=${{ inputs.parallelism }} -Duser.timezone=$jvm_timezone verify

- name: Print JVM thread dumps when cancelled
if: ${{ failure() }}
Expand Down
22 changes: 22 additions & 0 deletions .github/workflows/flink_cdc_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ jobs:
with:
java-versions: "[11]"
modules: "['core']"
common_2_x:
name: Common Unit Tests 2.x
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[11]"
flink-versions: "['2.2.0']"
custom-maven-parameter: "-Pflink2"
modules: "['core_2.x']"
pipeline-ut:
name: Pipeline Unit Tests
uses: ./.github/workflows/flink_cdc_base.yml
Expand All @@ -88,6 +96,20 @@ jobs:
flink-versions: "['1.19.3', '1.20.3']"
modules: "['pipeline_e2e']"
parallelism: ${{ matrix.parallelism }}
pipeline_e2e_2_x:
strategy:
max-parallel: 2
fail-fast: false
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests 2.x (${{ matrix.parallelism }}-Parallelism)
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[11]"
flink-versions: "['2.2.0']"
custom-maven-parameter: "-Pflink2"
modules: "['pipeline_e2e']"
parallelism: ${{ matrix.parallelism }}
source_e2e:
name: Source E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
Expand Down
24 changes: 24 additions & 0 deletions .github/workflows/flink_cdc_ci_nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ jobs:
with:
java-versions: "[17]"
modules: "['core']"
common_2_x:
if: github.repository == 'apache/flink-cdc'
name: Common Unit Tests 2.x
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[17]"
flink-versions: "['2.2.0']"
custom-maven-parameter: "-Pflink2"
modules: "['core_2.x']"
pipeline-ut:
if: github.repository == 'apache/flink-cdc'
name: Pipeline Unit Tests
Expand Down Expand Up @@ -82,6 +91,21 @@ jobs:
flink-versions: "['1.19.3', '1.20.3']"
modules: "['pipeline_e2e']"
parallelism: ${{ matrix.parallelism }}
pipeline_e2e_2_x:
if: github.repository == 'apache/flink-cdc'
strategy:
max-parallel: 2
fail-fast: false
matrix:
parallelism: [ 1, 4 ]
name: Pipeline E2E Tests 2.x (${{ matrix.parallelism }}-Parallelism)
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[17]"
custom-maven-parameter: "-Pflink2"
flink-versions: "['2.2.0']"
modules: "['pipeline_e2e_2.x']"
parallelism: ${{ matrix.parallelism }}
source_e2e:
if: github.repository == 'apache/flink-cdc'
name: Source E2E Tests
Expand Down
11 changes: 10 additions & 1 deletion .github/workflows/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values"
]

MODULES_CORE_2_X = [
"flink-cdc-cli",
"flink-cdc-common",
"flink-cdc-composer",
"flink-cdc-runtime",
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values"
]

MODULES_PIPELINE_CONNECTORS = [
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-doris",
"flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch",
Expand Down Expand Up @@ -142,6 +150,7 @@

ALL_MODULES = set(
MODULES_CORE +
MODULES_CORE_2_X +
MODULES_PIPELINE_CONNECTORS +
MODULES_MYSQL_SOURCE +
MODULES_MYSQL_PIPELINE +
Expand Down Expand Up @@ -173,7 +182,7 @@
compile_modules = set()

for module in INPUT_MODULES.split(', '):
module_list = set(globals()['MODULES_' + module.upper().replace('-', '_')])
module_list = set(globals()['MODULES_' + module.upper().replace('-', '_').replace('.', '_')])
test_modules |= module_list
if module == 'source_e2e' or module == 'pipeline_e2e':
compile_modules |= ALL_MODULES
Expand Down
29 changes: 29 additions & 0 deletions flink-cdc-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,33 @@ limitations under the License.

</dependencies>

<profiles>
<profile>
<id>flink2</id>
<properties>
<flink.version>${flink.2.x.version}</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-flink2-compat</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.cdc.composer.PipelineExecution;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

import org.apache.flink.shaded.guava31.com.google.common.base.Joiner;
Expand Down Expand Up @@ -173,7 +172,13 @@ private static SavepointRestoreSettings createSavepointRestoreSettings(
commandLine.getOptionValue(SAVEPOINT_CLAIM_MODE),
ConfigurationUtils.getClaimModeClass());
} else {
restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue();
try {
restoreMode =
ConfigurationUtils.getClaimModeClass().getField("DEFAULT").get(null);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(
"Failed to get DEFAULT value from RestoreMode class.", e);
}
}
// allowNonRestoredState is always false because all operators are predefined.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ private static Map<String, String> flattenConfigMap(

public static Class<?> getClaimModeClass() {
try {
return Class.forName("org.apache.flink.core.execution.RestoreMode");
} catch (ClassNotFoundException ignored) {
return Class.forName("org.apache.flink.core.execution.RecoveryClaimMode");
} catch (ClassNotFoundException classNotFoundException) {
try {
return Class.forName("org.apache.flink.runtime.jobgraph.RestoreMode");
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
return Class.forName("org.apache.flink.core.execution.RestoreMode");
} catch (ClassNotFoundException ignored) {
try {
return Class.forName("org.apache.flink.runtime.jobgraph.RestoreMode");
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ void testSavePointConfiguration() throws Exception {
"-n");
assertThat(executor.getFlinkConfig().get(SAVEPOINT_PATH))
.isEqualTo(flinkHome() + "/savepoints/savepoint-1");
assertThat(executor.getFlinkConfig().get(RESTORE_MODE)).isEqualTo(RestoreMode.NO_CLAIM);
assertThat(executor.getFlinkConfig().get(RESTORE_MODE).toString())
.isEqualTo(RestoreMode.NO_CLAIM.toString());
assertThat(executor.getFlinkConfig().get(SAVEPOINT_IGNORE_UNCLAIMED_STATE)).isTrue();
}

Expand Down
29 changes: 29 additions & 0 deletions flink-cdc-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,33 @@ limitations under the License.
</plugins>
</build>

<profiles>
<profile>
<id>flink2</id>
<properties>
<flink.version>${flink.2.x.version}</flink.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-flink2-compat</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>

</project>
43 changes: 43 additions & 0 deletions flink-cdc-composer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,47 @@ limitations under the License.
</dependency>
</dependencies>

<profiles>
<profile>
<id>flink2</id>
<properties>
<flink.version>${flink.2.x.version}</flink.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-flink2-compat</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>flink1</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-flink1-compat</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.cdc.composer.flink.translator;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.runtime.jobgraph.OperatorID;
Expand Down Expand Up @@ -87,5 +88,10 @@ public DataStream<Event> addPreWriteTopology(DataStream<Event> inputDataStream)
public SinkWriter<Event> createWriter(InitContext context) throws IOException {
return null;
}

@Override
public SinkWriter<Event> createWriter(WriterInitContext context) throws IOException {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public DistributedSourceFunction(int numOfTables, boolean distributedTables) {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
iotaCounter = 0;
subTaskId = getRuntimeContext().getIndexOfThisSubtask();
parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
subTaskId = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
parallelism = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
if (distributedTables) {
tables =
IntStream.range(0, numOfTables)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,39 @@ limitations under the License.
</plugins>
</build>

<profiles>
<profile>
<id>flink2</id>
<properties>
<flink.version>${flink.2.x.version}</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.2.x.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-flink2-compat</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.ChangeEvent;
Expand All @@ -36,6 +37,7 @@
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.values.ValuesDatabase;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -101,8 +103,17 @@ public SinkWriter<Event> createWriter(InitContext context) {
return new ValuesSinkWriter(
materializedInMemory,
print,
context.getSubtaskId(),
context.getNumberOfParallelSubtasks());
context.getTaskInfo().getIndexOfThisSubtask(),
context.getTaskInfo().getNumberOfParallelSubtasks());
}

@Override
public SinkWriter<Event> createWriter(WriterInitContext context) throws IOException {
return new ValuesSinkWriter(
materializedInMemory,
print,
context.getTaskInfo().getIndexOfThisSubtask(),
context.getTaskInfo().getNumberOfParallelSubtasks());
}
}

Expand Down
Loading
Loading