From 0a5a239c15ea4878153559d8363a68e4bf7b9360 Mon Sep 17 00:00:00 2001 From: Pei Yu <125331682@qq.com> Date: Wed, 29 Apr 2026 01:22:27 +0800 Subject: [PATCH] upgrade paimon to 1.4.1 Signed-off-by: Pei Yu <125331682@qq.com> --- .../pom.xml | 2 +- .../paimon/sink/v2/PreCommitOperator.java | 3 +-- .../paimon/sink/v2/StoreSinkWriteImpl.java | 14 ++++------- .../paimon/sink/v2/PaimonSinkITCase.java | 3 +-- ...tMultiTableCommittableChannelComputer.java | 24 +++++++++---------- .../flink-cdc-pipeline-e2e-tests/pom.xml | 2 +- 6 files changed, 20 insertions(+), 28 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml index 5e15835a8b4..5eb925da130 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml @@ -29,7 +29,7 @@ limitations under the License. flink-cdc-pipeline-connector-paimon - 1.3.1 + 1.4.1 2.8.5 2.3.9 3.12.4 diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java index 8758bcbcbc0..c3fefd3a227 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java @@ -115,8 +115,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) { multiTableCommittable.getDatabase(), multiTableCommittable.getTable(), checkpointId, - multiTableCommittable.kind(), - multiTableCommittable.wrappedCommittable())); + multiTableCommittable.commitMessage())); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java index bba9e90cb4e..c709253cc13 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java @@ -130,13 +130,13 @@ private TableWriteImpl newTableWrite(FileStoreTable table) { if (memoryPoolFactory != null) { return tableWrite.withMemoryPoolFactory(memoryPoolFactory); } else { - return (TableWriteImpl) - tableWrite.withMemoryPool( + return tableWrite.withMemoryPoolFactory( + new MemoryPoolFactory( memoryPool != null ? memoryPool : new HeapMemorySegmentPool( table.coreOptions().writeBufferSize(), - table.coreOptions().pageSize())); + table.coreOptions().pageSize()))); } } @@ -159,11 +159,6 @@ public SinkRecord write(InternalRow internalRow, int i) throws Exception { return write.writeAndReturn(internalRow, i); } - @Override - public SinkRecord toLogRecord(SinkRecord record) { - return write.toLogRecord(record); - } - @Override public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception { write.compact(partition, bucket, fullCompaction); @@ -191,8 +186,7 @@ public List prepareCommit(boolean waitCompaction, long checkpointId try { for (CommitMessage committable : write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)) { - committables.add( - new Committable(checkpointId, Committable.Kind.FILE, committable)); + committables.add(new Committable(checkpointId, committable)); } } catch (Exception e) { throw new IOException(e); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index 53764cf5a30..381c87b249c 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -1044,8 +1044,7 @@ private MultiTableCommittable correctCheckpointId(MultiTableCommittable committa committable.getDatabase(), committable.getTable(), checkpointId++, - committable.kind(), - committable.wrappedCommittable()); + committable.commitMessage()); } private static class MockCommitRequestImpl extends CommitRequestImpl { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java index e065d13a843..ca54cbc51c9 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java @@ -39,18 +39,18 @@ void testChannel() { computer.setup(4); List commits = Arrays.asList( - new MultiTableCommittable("database", "table1", 1L, null, null), - new MultiTableCommittable("database", "table2", 1L, null, null), - new MultiTableCommittable("database", "table1", 1L, null, null), - new MultiTableCommittable("database", "table5", 1L, null, null), - new MultiTableCommittable("database", "table3", 1L, null, null), - new MultiTableCommittable("database", "table8", 1L, null, null), - new MultiTableCommittable("database", "table5", 1L, null, null), - new MultiTableCommittable("database", "table1", 1L, null, null), - new MultiTableCommittable("database", "table9", 1L, null, null), - new MultiTableCommittable("database", "table5", 1L, null, null), - new MultiTableCommittable("database", "table3", 1L, null, null), - new MultiTableCommittable("database", "table8", 1L, null, null)); + new MultiTableCommittable("database", "table1", 1L, null), + new MultiTableCommittable("database", "table2", 1L, null), + new MultiTableCommittable("database", "table1", 1L, null), + new MultiTableCommittable("database", "table5", 1L, null), + new MultiTableCommittable("database", "table3", 1L, null), + new MultiTableCommittable("database", "table8", 1L, null), + new MultiTableCommittable("database", "table5", 1L, null), + new MultiTableCommittable("database", "table1", 1L, null), + new MultiTableCommittable("database", "table9", 1L, null), + new MultiTableCommittable("database", "table5", 1L, null), + new MultiTableCommittable("database", "table3", 1L, null), + new MultiTableCommittable("database", "table8", 1L, null)); Map> map = new HashMap<>(); commits.forEach( (commit) -> { diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index 88130ce233d..24e6213ed9e 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -32,7 +32,7 @@ limitations under the License. 1.20 8.0.27 1.2.14_flink-${flink-major-1.20} - 1.3.1 + 1.4.1 false flink-${flink.version}-bin-scala_${scala.binary.version}.tgz https://dlcdn.apache.org/flink/flink-${flink.version}