Skip to content

Commit 54b515b

Browse files
authored
[paimon] Bump Paimon version from 1.0.1 to 1.2.0 (#1289)
1 parent e5c565c commit 54b515b

File tree

7 files changed

+54
-18
lines changed

7 files changed

+54
-18
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lakehouse/paimon/reader/ScanRecordWrapper.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.data.InternalMap;
2929
import org.apache.paimon.data.InternalRow;
3030
import org.apache.paimon.data.Timestamp;
31+
import org.apache.paimon.data.variant.Variant;
3132
import org.apache.paimon.types.DataType;
3233
import org.apache.paimon.types.RowKind;
3334
import org.apache.paimon.types.RowType;
@@ -157,6 +158,11 @@ public byte[] getBinary(int pos) {
157158
return flussRow.getBytes(pos);
158159
}
159160

161+
@Override
162+
public Variant getVariant(int pos) {
163+
throw new UnsupportedOperationException();
164+
}
165+
160166
@Override
161167
public InternalArray getArray(int pos) {
162168
throw new UnsupportedOperationException();

fluss-lake/fluss-lake-paimon/pom.xml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
<packaging>jar</packaging>
3434

3535
<properties>
36-
<paimon.version>1.0.1</paimon.version>
36+
<paimon.version>1.2.0</paimon.version>
3737
</properties>
3838

3939
<dependencies>
@@ -75,6 +75,14 @@
7575
<scope>test</scope>
7676
</dependency>
7777

78+
79+
<dependency>
80+
<groupId>org.apache.hadoop</groupId>
81+
<artifactId>hadoop-mapreduce-client-core</artifactId>
82+
<version>2.8.5</version>
83+
<scope>test</scope>
84+
</dependency>
85+
7886
<dependency>
7987
<groupId>org.apache.hadoop</groupId>
8088
<artifactId>hadoop-hdfs-client</artifactId>

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/FlussRecordAsPaimonRow.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.paimon.data.InternalMap;
2828
import org.apache.paimon.data.InternalRow;
2929
import org.apache.paimon.data.Timestamp;
30+
import org.apache.paimon.data.variant.Variant;
3031
import org.apache.paimon.types.DataType;
3132
import org.apache.paimon.types.RowKind;
3233
import org.apache.paimon.types.RowType;
@@ -189,6 +190,12 @@ public byte[] getBinary(int pos) {
189190
return internalRow.getBytes(pos);
190191
}
191192

193+
@Override
194+
public Variant getVariant(int pos) {
195+
throw new UnsupportedOperationException(
196+
"getVariant is not support for Fluss record currently.");
197+
}
198+
192199
@Override
193200
public InternalArray getArray(int pos) {
194201
throw new UnsupportedOperationException(

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import com.alibaba.fluss.lake.committer.LakeCommitter;
2222
import com.alibaba.fluss.metadata.TablePath;
2323

24+
import org.apache.paimon.CoreOptions;
2425
import org.apache.paimon.Snapshot;
2526
import org.apache.paimon.catalog.Catalog;
2627
import org.apache.paimon.data.BinaryRow;
2728
import org.apache.paimon.io.DataFileMeta;
29+
import org.apache.paimon.manifest.IndexManifestEntry;
2830
import org.apache.paimon.manifest.ManifestCommittable;
2931
import org.apache.paimon.manifest.ManifestEntry;
3032
import org.apache.paimon.operation.FileStoreCommit;
@@ -56,6 +58,7 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, Pai
5658
private final FileStoreTable fileStoreTable;
5759
private FileStoreCommit fileStoreCommit;
5860
private final TablePath tablePath;
61+
private static final ThreadLocal<Long> currentCommitSnapshotId = new ThreadLocal<>();
5962

6063
public PaimonLakeCommitter(PaimonCatalogProvider paimonCatalogProvider, TablePath tablePath)
6164
throws IOException {
@@ -77,18 +80,16 @@ public PaimonCommittable toCommittable(List<PaimonWriteResult> paimonWriteResult
7780
@Override
7881
public long commit(PaimonCommittable committable) throws IOException {
7982
ManifestCommittable manifestCommittable = committable.manifestCommittable();
80-
PaimonCommitCallback paimonCommitCallback = new PaimonCommitCallback();
8183
try {
8284
fileStoreCommit =
8385
fileStoreTable
8486
.store()
85-
.newCommit(
86-
FLUSS_LAKE_TIERING_COMMIT_USER,
87-
Collections.singletonList(paimonCommitCallback));
88-
fileStoreCommit.commit(manifestCommittable, Collections.emptyMap());
89-
return checkNotNull(
90-
paimonCommitCallback.commitSnapshotId,
91-
"Paimon committed snapshot id must be non-null.");
87+
.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, fileStoreTable);
88+
fileStoreCommit.commit(manifestCommittable, false);
89+
Long commitSnapshotId = currentCommitSnapshotId.get();
90+
currentCommitSnapshotId.remove();
91+
92+
return checkNotNull(commitSnapshotId, "Paimon committed snapshot id must be non-null.");
9293
} catch (Throwable t) {
9394
if (fileStoreCommit != null) {
9495
// if any error happen while commit, abort the commit to clean committable
@@ -100,7 +101,8 @@ public long commit(PaimonCommittable committable) throws IOException {
100101

101102
@Override
102103
public void abort(PaimonCommittable committable) throws IOException {
103-
fileStoreCommit = fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
104+
fileStoreCommit =
105+
fileStoreTable.store().newCommit(FLUSS_LAKE_TIERING_COMMIT_USER, fileStoreTable);
104106
fileStoreCommit.abort(committable.manifestCommittable().fileCommittables());
105107
}
106108

@@ -189,19 +191,29 @@ public void close() throws Exception {
189191

190192
private FileStoreTable getTable(TablePath tablePath) throws IOException {
191193
try {
192-
return (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
194+
FileStoreTable table =
195+
(FileStoreTable)
196+
paimonCatalog
197+
.getTable(toPaimon(tablePath))
198+
.copy(
199+
Collections.singletonMap(
200+
CoreOptions.COMMIT_CALLBACKS.key(),
201+
PaimonLakeCommitter.PaimonCommitCallback.class
202+
.getName()));
203+
204+
return table;
193205
} catch (Exception e) {
194206
throw new IOException("Failed to get table " + tablePath + " in Paimon.", e);
195207
}
196208
}
197209

198-
private static class PaimonCommitCallback implements CommitCallback {
199-
200-
private Long commitSnapshotId = null;
210+
/** A {@link CommitCallback} to save paimon commit snapshot info. */
211+
public static class PaimonCommitCallback implements CommitCallback {
201212

202213
@Override
203-
public void call(List<ManifestEntry> list, Snapshot snapshot) {
204-
this.commitSnapshotId = snapshot.id();
214+
public void call(
215+
List<ManifestEntry> list, List<IndexManifestEntry> indexFiles, Snapshot snapshot) {
216+
currentCommitSnapshotId.set(snapshot.id());
205217
}
206218

207219
@Override

fluss-lake/fluss-lake-paimon/src/main/resources/META-INF/NOTICE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
66

77
This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
88

9-
- org.apache.paimon:paimon-bundle:1.0.1
9+
- org.apache.paimon:paimon-bundle:1.2.0

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,6 +731,9 @@ private void doCreatePaimonTable(TablePath tablePath, Schema.Builder paimonSchem
731731
paimonSchemaBuilder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT());
732732
paimonSchemaBuilder.column(
733733
TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
734+
paimonSchemaBuilder.option(
735+
CoreOptions.COMMIT_CALLBACKS.key(),
736+
PaimonLakeCommitter.PaimonCommitCallback.class.getName());
734737
paimonCatalog.createDatabase(tablePath.getDatabaseName(), true);
735738
paimonCatalog.createTable(toPaimon(tablePath), paimonSchemaBuilder.build(), true);
736739
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@
9898
<curator.version>5.4.0</curator.version>
9999
<netty.version>4.1.104</netty.version>
100100
<arrow.version>15.0.0</arrow.version>
101-
<paimon.version>1.0.1</paimon.version>
101+
<paimon.version>1.2.0</paimon.version>
102102

103103
<fluss.hadoop.version>2.10.2</fluss.hadoop.version>
104104
<frocksdb.version>6.20.3-ververica-2.0</frocksdb.version>

0 commit comments

Comments
 (0)