Skip to content

Commit 0a5a239

Browse files
committed
upgrade paimon to 1.4.1
Signed-off-by: Pei Yu <125331682@qq.com>
1 parent eb3cdc0 commit 0a5a239

6 files changed

Lines changed: 20 additions & 28 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ limitations under the License.
2929
<artifactId>flink-cdc-pipeline-connector-paimon</artifactId>
3030

3131
<properties>
32-
<paimon.version>1.3.1</paimon.version>
32+
<paimon.version>1.4.1</paimon.version>
3333
<hadoop.version>2.8.5</hadoop.version>
3434
<hive.version>2.3.9</hive.version>
3535
<mockito.version>3.12.4</mockito.version>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PreCommitOperator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,7 @@ public void prepareSnapshotPreBarrier(long checkpointId) {
115115
multiTableCommittable.getDatabase(),
116116
multiTableCommittable.getTable(),
117117
checkpointId,
118-
multiTableCommittable.kind(),
119-
multiTableCommittable.wrappedCommittable()));
118+
multiTableCommittable.commitMessage()));
120119
}
121120
}
122121

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/StoreSinkWriteImpl.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,13 @@ private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
130130
if (memoryPoolFactory != null) {
131131
return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
132132
} else {
133-
return (TableWriteImpl<?>)
134-
tableWrite.withMemoryPool(
133+
return tableWrite.withMemoryPoolFactory(
134+
new MemoryPoolFactory(
135135
memoryPool != null
136136
? memoryPool
137137
: new HeapMemorySegmentPool(
138138
table.coreOptions().writeBufferSize(),
139-
table.coreOptions().pageSize()));
139+
table.coreOptions().pageSize())));
140140
}
141141
}
142142

@@ -159,11 +159,6 @@ public SinkRecord write(InternalRow internalRow, int i) throws Exception {
159159
return write.writeAndReturn(internalRow, i);
160160
}
161161

162-
@Override
163-
public SinkRecord toLogRecord(SinkRecord record) {
164-
return write.toLogRecord(record);
165-
}
166-
167162
@Override
168163
public void compact(BinaryRow partition, int bucket, boolean fullCompaction) throws Exception {
169164
write.compact(partition, bucket, fullCompaction);
@@ -191,8 +186,7 @@ public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId
191186
try {
192187
for (CommitMessage committable :
193188
write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)) {
194-
committables.add(
195-
new Committable(checkpointId, Committable.Kind.FILE, committable));
189+
committables.add(new Committable(checkpointId, committable));
196190
}
197191
} catch (Exception e) {
198192
throw new IOException(e);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1044,8 +1044,7 @@ private MultiTableCommittable correctCheckpointId(MultiTableCommittable committa
10441044
committable.getDatabase(),
10451045
committable.getTable(),
10461046
checkpointId++,
1047-
committable.kind(),
1048-
committable.wrappedCommittable());
1047+
committable.commitMessage());
10491048
}
10501049

10511050
private static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/TestMultiTableCommittableChannelComputer.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,18 @@ void testChannel() {
3939
computer.setup(4);
4040
List<MultiTableCommittable> commits =
4141
Arrays.asList(
42-
new MultiTableCommittable("database", "table1", 1L, null, null),
43-
new MultiTableCommittable("database", "table2", 1L, null, null),
44-
new MultiTableCommittable("database", "table1", 1L, null, null),
45-
new MultiTableCommittable("database", "table5", 1L, null, null),
46-
new MultiTableCommittable("database", "table3", 1L, null, null),
47-
new MultiTableCommittable("database", "table8", 1L, null, null),
48-
new MultiTableCommittable("database", "table5", 1L, null, null),
49-
new MultiTableCommittable("database", "table1", 1L, null, null),
50-
new MultiTableCommittable("database", "table9", 1L, null, null),
51-
new MultiTableCommittable("database", "table5", 1L, null, null),
52-
new MultiTableCommittable("database", "table3", 1L, null, null),
53-
new MultiTableCommittable("database", "table8", 1L, null, null));
42+
new MultiTableCommittable("database", "table1", 1L, null),
43+
new MultiTableCommittable("database", "table2", 1L, null),
44+
new MultiTableCommittable("database", "table1", 1L, null),
45+
new MultiTableCommittable("database", "table5", 1L, null),
46+
new MultiTableCommittable("database", "table3", 1L, null),
47+
new MultiTableCommittable("database", "table8", 1L, null),
48+
new MultiTableCommittable("database", "table5", 1L, null),
49+
new MultiTableCommittable("database", "table1", 1L, null),
50+
new MultiTableCommittable("database", "table9", 1L, null),
51+
new MultiTableCommittable("database", "table5", 1L, null),
52+
new MultiTableCommittable("database", "table3", 1L, null),
53+
new MultiTableCommittable("database", "table8", 1L, null));
5454
Map<Integer, Set<String>> map = new HashMap<>();
5555
commits.forEach(
5656
(commit) -> {

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ limitations under the License.
3232
<flink-major-1.20>1.20</flink-major-1.20>
3333
<mysql.driver.version>8.0.27</mysql.driver.version>
3434
<starrocks.connector.version>1.2.14_flink-${flink-major-1.20}</starrocks.connector.version>
35-
<paimon.version>1.3.1</paimon.version>
35+
<paimon.version>1.4.1</paimon.version>
3636
<flink.release.download.skip>false</flink.release.download.skip>
3737
<flink.release.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.release.name>
3838
<flink.release.mirror>https://dlcdn.apache.org/flink/flink-${flink.version}</flink.release.mirror>

0 commit comments

Comments
 (0)