Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flink_cdc_base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ on:
description: "Jdk versions to test against."
required: false
type: string
default: "['8']"
default: "['11']"
flink-versions:
description: "Flink versions to test against."
required: false
Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/flink_cdc_ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
- name: Set JDK
uses: actions/setup-java@v4
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
cache: 'maven'
- name: Compiling jar packages
Expand All @@ -61,19 +61,19 @@ jobs:
name: Common Unit Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[8]"
java-versions: "[11]"
modules: "['core']"
pipeline-ut:
name: Pipeline Unit Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[8]"
java-versions: "[11]"
modules: "['mysql-pipeline', 'postgres-pipeline', 'oceanbase-pipeline', 'doris', 'elasticsearch', 'iceberg', 'kafka', 'maxcompute', 'paimon', 'starrocks', 'fluss', 'hudi']"
source-ut:
name: Source Unit Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[8]"
java-versions: "[11]"
modules: "['mysql-source', 'postgres-source', 'oracle', 'mongodb7', 'mongodb8', 'sqlserver', 'tidb', 'oceanbase-source', 'db2', 'vitess']"
pipeline_e2e:
strategy:
Expand All @@ -83,14 +83,14 @@ jobs:
name: Pipeline E2E Tests (${{ matrix.parallelism }}-Parallelism)
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[8]"
java-versions: "[11]"
flink-versions: "['1.19.3', '1.20.3']"
modules: "['pipeline_e2e']"
parallelism: ${{ matrix.parallelism }}
source_e2e:
name: Source E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[8]"
java-versions: "[11]"
flink-versions: "['1.19.3', '1.20.3']"
modules: "['source_e2e']"
12 changes: 6 additions & 6 deletions .github/workflows/flink_cdc_ci_nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
- name: Set JDK
uses: actions/setup-java@v4
with:
java-version: '11'
java-version: '17'
distribution: 'temurin'
cache: 'maven'
- name: Compiling jar packages
Expand All @@ -52,21 +52,21 @@ jobs:
name: Common Unit Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[11]"
java-versions: "[17]"
modules: "['core']"
pipeline-ut:
if: github.repository == 'apache/flink-cdc'
name: Pipeline Unit Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[11]"
java-versions: "[17]"
modules: "['mysql-pipeline', 'postgres-pipeline', 'oceanbase-pipeline', 'doris', 'elasticsearch', 'iceberg', 'kafka', 'maxcompute', 'paimon', 'starrocks', 'fluss', 'hudi']"
source-ut:
if: github.repository == 'apache/flink-cdc'
name: Source Unit Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[11]"
java-versions: "[17]"
modules: "['mysql-source', 'postgres-source', 'oracle', 'mongodb7', 'mongodb8', 'sqlserver', 'tidb', 'oceanbase-source', 'db2', 'vitess']"
pipeline_e2e:
if: github.repository == 'apache/flink-cdc'
Expand All @@ -77,7 +77,7 @@ jobs:
name: Pipeline E2E Tests (${{ matrix.parallelism }}-Parallelism)
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[11]"
java-versions: "[17]"
flink-versions: "['1.19.3', '1.20.3']"
modules: "['pipeline_e2e']"
parallelism: ${{ matrix.parallelism }}
Expand All @@ -86,6 +86,6 @@ jobs:
name: Source E2E Tests
uses: ./.github/workflows/flink_cdc_base.yml
with:
java-versions: "[11]"
java-versions: "[17]"
flink-versions: "['1.19.3', '1.20.3']"
modules: "['source_e2e']"
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,6 @@ limitations under the License.
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,10 @@ private List<String> fetchTableContent(Configuration conf, String basePath) thro
.orElse(null);

List<FileSlice> fileSlices =
table.getSliceView().getAllLatestFileSlicesBeforeOrOn(latestInstant).values()
table
.getSliceView()
.getAllLatestFileSlicesBeforeOrOn(latestInstant)
.values()
.stream()
.flatMap(s -> s)
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ public byte[] serialize(Event event) {
reuseGenericRowData.setField(
5,
new GenericArrayData(
jsonSerializers.get(dataChangeEvent.tableId()).getSchema().primaryKeys()
jsonSerializers
.get(dataChangeEvent.tableId())
.getSchema()
.primaryKeys()
.stream()
.map(StringData::fromString)
.toArray()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ limitations under the License.
<paimon.version>1.3.1</paimon.version>
<hadoop.version>2.8.5</hadoop.version>
<hive.version>2.3.9</hive.version>
<mockito.version>3.4.6</mockito.version>
<mockito.version>3.12.4</mockito.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,17 @@ public PaimonWriter(
public Collection<MultiTableCommittable> prepareCommit() {
long startTime = System.currentTimeMillis();
List<MultiTableCommittable> committables =
writes.entrySet()
.parallelStream()
writes.entrySet().parallelStream()
.flatMap(
entry -> {
try {
// here we set it to lastCheckpointId+1 to
// avoid prepareCommit the same checkpointId with the first
// round.
return entry.getValue()
.prepareCommit(false, lastCheckpointId + 1).stream()
return entry
.getValue()
.prepareCommit(false, lastCheckpointId + 1)
.stream()
.map(
committable ->
MultiTableCommittable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,8 @@ public Table getTableSchemaFromChangeTable(Db2ChangeTable changeTable) throws SQ
// final List<Column> columns = columnEditors.subList(CHANGE_TABLE_DATA_COLUMN_OFFSET,
// columnEditors.size() - 1).stream()
final List<Column> columns =
columnEditors.subList(CHANGE_TABLE_DATA_COLUMN_OFFSET, columnEditors.size())
columnEditors
.subList(CHANGE_TABLE_DATA_COLUMN_OFFSET, columnEditors.size())
.stream()
.map(
c ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ private int getParallelism() {
"parallelism.default: 4",
"execution.checkpointing.interval: 300",
"state.backend.type: hashmap",
"env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false",
"env.java.default-opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/jdk.internal.loader=ALL-UNNAMED --add-opens=java.base/java.security=ALL-UNNAMED --add-exports=java.base/sun.net.www=ALL-UNNAMED -Doracle.jdbc.timezoneAsRegion=false",
"execution.checkpointing.savepoint-dir: file:///opt/flink",
"restart-strategy.type: off",
"pekko.ask.timeout: 60s",
// Set off-heap memory explicitly to avoid "java.lang.OutOfMemoryError: Direct
// buffer memory" error.
"taskmanager.memory.task.off-heap.size: 128mb",
Expand Down Expand Up @@ -403,7 +404,10 @@ public void waitUntilJobState(Duration timeout, JobStatus expectedStatus) {
}

protected String getFlinkDockerImageTag() {
return String.format("flink:%s-scala_2.12", flinkVersion);
if (System.getProperty("java.specification.version").equals("17")) {
return String.format("flink:%s-scala_2.12-java17", flinkVersion);
}
return String.format("flink:%s-scala_2.12-java11", flinkVersion);
}

private ExecResult executeAndCheck(GenericContainer<?> container, String... command) {
Expand Down
2 changes: 1 addition & 1 deletion flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ limitations under the License.
<executions>
<execution>
<id>copy-jars</id>
<phase>process-resources</phase>
<phase>package</phase>
<goals>
<goal>copy</goal>
</goals>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,10 @@ private String copyAndGetContainerPath(GenericContainer<?> container, String fil
}

private String getFlinkDockerImageTag() {
return String.format("flink:%s-scala_2.12", flinkVersion);
if (System.getProperty("java.specification.version").equals("17")) {
return String.format("flink:%s-scala_2.12-java17", flinkVersion);
}
return String.format("flink:%s-scala_2.12-java11", flinkVersion);
}

protected String getJdbcConnectorResourceName() {
Expand Down
105 changes: 95 additions & 10 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ limitations under the License.
<version.awaitility>4.2.0</version.awaitility>
<slf4j.version>1.7.36</slf4j.version>
<log4j.version>2.17.1</log4j.version>
<spotless.version>2.4.2</spotless.version>
<spotless.version>2.43.0</spotless.version>
<commons-lang3.version>3.12.0</commons-lang3.version>
<json-path.version>2.7.0</json-path.version>
<jackson.version>2.13.2</jackson.version>
Expand All @@ -98,6 +98,14 @@ limitations under the License.
<iceberg.version>1.6.1</iceberg.version>
<hive.version>2.3.9</hive.version>
<hadoop.version>3.3.4</hadoop.version>
<java.version>11</java.version>
<source.java.version>11</source.java.version>
<target.java.version>11</target.java.version>
<!-- Overwrite default values from parent pom.
IntelliJ IDEA is (sometimes?) using those values to choose target language level
and thus is changing back to java 1.6 on each maven re-import -->
<maven.compiler.source>${source.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -465,7 +473,7 @@ limitations under the License.
<configuration>
<java>
<googleJavaFormat>
<version>1.7</version>
<version>1.8</version>
<style>AOSP</style>
</googleJavaFormat>

Expand Down Expand Up @@ -561,6 +569,15 @@ limitations under the License.
does not understand some time zone which used in Ubuntu OS -->
<argLine>-Xms256m -Xmx2048m -Dmvn.forkNumber=${surefire.forkNumber}
-XX:+UseG1GC -Doracle.jdbc.timezoneAsRegion=false
--add-opens=java.base/java.lang=ALL-UNNAMED
--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent=ALL-UNNAMED
--add-opens=java.base/java.util=ALL-UNNAMED
--add-opens=java.sql/java.sql=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
--add-opens java.base/java.net=ALL-UNNAMED
--add-exports=java.base/sun.nio.ch=ALL-UNNAMED
</argLine>
</configuration>
</plugin>
Expand Down Expand Up @@ -674,8 +691,8 @@ limitations under the License.
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${maven.compiler.source}</source>
<target>${maven.compiler.target}</target>
<source>${source.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>

Expand All @@ -688,6 +705,29 @@ limitations under the License.
<generateBackupPoms>false</generateBackupPoms>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-enforcer-plugin</artifactId>
<executions>
<execution>
<id>enforce-maven</id>
<goals>
<goal>enforce</goal>
</goals>
<configuration>
<rules>
<requireMavenVersion>
<!-- enforce at least mvn version 3.1.1 (see FLINK-12447) -->
<version>[3.1.1,)</version>
</requireMavenVersion>
<requireJavaVersion>
<version>${source.java.version}</version>
</requireJavaVersion>
</rules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down Expand Up @@ -774,26 +814,71 @@ limitations under the License.
</build>
</profile>
<profile>
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we're running tests against Java 17 in nightly builds, shall we create a java-17-target profile and activate it with [17,)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

<id>java-8-target</id>
<id>java-11-target</id>
<activation>
<jdk>[1.8,11)</jdk>
<jdk>[11,17)</jdk>
</activation>
<properties>
<java.version>1.8</java.version>
<java.version>11</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
<compilerArgs combine.children="append">
<arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
<arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
<arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
<arg>--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>java-11-target</id>
<id>java-17-target</id>
<activation>
<jdk>[11,)</jdk>
<jdk>[17,)</jdk>
</activation>
<properties>
<java.version>11</java.version>
<java.version>17</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>17</source>
<target>17</target>
<compilerArgs combine.children="append">
<arg>--add-exports=java.base/sun.net.util=ALL-UNNAMED</arg>
<arg>--add-exports=java.management/sun.management=ALL-UNNAMED</arg>
<arg>--add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED</arg>
<arg>--add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED</arg>
<arg>--add-exports=java.base/java.util=ALL-UNNAMED</arg>
<arg>--add-opens</arg>
<arg>java.base/java.lang=ALL-UNNAMED</arg>
<arg>--add-opens</arg>
<arg>java.base/java.lang.reflect=ALL-UNNAMED</arg>
<arg>--add-opens</arg>
<arg>java.base/java.util.concurrent=ALL-UNNAMED</arg>
<arg>--add-opens</arg>
<arg>java.base/java.util=ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
Loading