diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
index 5e15835a8b4..5eb925da130 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
@@ -29,7 +29,7 @@ limitations under the License.
flink-cdc-pipeline-connector-paimon
- 1.3.1
+ 1.4.1
2.8.5
2.3.9
3.12.4
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
index 8758bcbcbc0..c3fefd3a227 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java
@@ -115,8 +115,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) {
multiTableCommittable.getDatabase(),
multiTableCommittable.getTable(),
checkpointId,
- multiTableCommittable.kind(),
- multiTableCommittable.wrappedCommittable()));
+ multiTableCommittable.commitMessage()));
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
index bba9e90cb4e..c709253cc13 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java
@@ -130,13 +130,13 @@ private TableWriteImpl> newTableWrite(FileStoreTable table) {
if (memoryPoolFactory != null) {
return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
} else {
- return (TableWriteImpl>)
- tableWrite.withMemoryPool(
+ return tableWrite.withMemoryPoolFactory(
+ new MemoryPoolFactory(
memoryPool != null
? memoryPool
: new HeapMemorySegmentPool(
table.coreOptions().writeBufferSize(),
- table.coreOptions().pageSize()));
+ table.coreOptions().pageSize())));
}
}
@@ -159,11 +159,6 @@ public SinkRecord write(InternalRow internalRow, int i) throws Exception {
return write.writeAndReturn(internalRow, i);
}
- @Override
- public SinkRecord toLogRecord(SinkRecord record) {
- return write.toLogRecord(record);
- }
-
@Override
public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
write.compact(partition, bucket, fullCompaction);
@@ -191,8 +186,7 @@ public List prepareCommit(boolean waitCompaction, long checkpointId
try {
for (CommitMessage committable :
write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)) {
- committables.add(
- new Committable(checkpointId, Committable.Kind.FILE, committable));
+ committables.add(new Committable(checkpointId, committable));
}
} catch (Exception e) {
throw new IOException(e);
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index 53764cf5a30..381c87b249c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -1044,8 +1044,7 @@ private MultiTableCommittable correctCheckpointId(MultiTableCommittable committa
committable.getDatabase(),
committable.getTable(),
checkpointId++,
- committable.kind(),
- committable.wrappedCommittable());
+ committable.commitMessage());
}
private static class MockCommitRequestImpl extends CommitRequestImpl {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
index e065d13a843..ca54cbc51c9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java
@@ -39,18 +39,18 @@ void testChannel() {
computer.setup(4);
List commits =
Arrays.asList(
- new MultiTableCommittable("database", "table1", 1L, null, null),
- new MultiTableCommittable("database", "table2", 1L, null, null),
- new MultiTableCommittable("database", "table1", 1L, null, null),
- new MultiTableCommittable("database", "table5", 1L, null, null),
- new MultiTableCommittable("database", "table3", 1L, null, null),
- new MultiTableCommittable("database", "table8", 1L, null, null),
- new MultiTableCommittable("database", "table5", 1L, null, null),
- new MultiTableCommittable("database", "table1", 1L, null, null),
- new MultiTableCommittable("database", "table9", 1L, null, null),
- new MultiTableCommittable("database", "table5", 1L, null, null),
- new MultiTableCommittable("database", "table3", 1L, null, null),
- new MultiTableCommittable("database", "table8", 1L, null, null));
+ new MultiTableCommittable("database", "table1", 1L, null),
+ new MultiTableCommittable("database", "table2", 1L, null),
+ new MultiTableCommittable("database", "table1", 1L, null),
+ new MultiTableCommittable("database", "table5", 1L, null),
+ new MultiTableCommittable("database", "table3", 1L, null),
+ new MultiTableCommittable("database", "table8", 1L, null),
+ new MultiTableCommittable("database", "table5", 1L, null),
+ new MultiTableCommittable("database", "table1", 1L, null),
+ new MultiTableCommittable("database", "table9", 1L, null),
+ new MultiTableCommittable("database", "table5", 1L, null),
+ new MultiTableCommittable("database", "table3", 1L, null),
+ new MultiTableCommittable("database", "table8", 1L, null));
Map> map = new HashMap<>();
commits.forEach(
(commit) -> {
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 88130ce233d..24e6213ed9e 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -32,7 +32,7 @@ limitations under the License.
1.20
8.0.27
1.2.14_flink-${flink-major-1.20}
- 1.3.1
+ 1.4.1
false
flink-${flink.version}-bin-scala_${scala.binary.version}.tgz
https://dlcdn.apache.org/flink/flink-${flink.version}