1919
2020import org .apache .fluss .lake .committer .BucketOffset ;
2121import org .apache .fluss .lake .committer .CommittedLakeSnapshot ;
22+ import org .apache .fluss .lake .committer .CommitterInitContext ;
2223import org .apache .fluss .lake .committer .LakeCommitter ;
2324import org .apache .fluss .metadata .TablePath ;
2425import org .apache .fluss .shaded .jackson2 .com .fasterxml .jackson .databind .JsonNode ;
3233import org .apache .paimon .manifest .ManifestCommittable ;
3334import org .apache .paimon .manifest .ManifestEntry ;
3435import org .apache .paimon .manifest .SimpleFileEntry ;
35- import org .apache .paimon .operation .FileStoreCommit ;
3636import org .apache .paimon .table .FileStoreTable ;
3737import org .apache .paimon .table .sink .CommitCallback ;
38+ import org .apache .paimon .table .sink .TableCommitImpl ;
3839import org .apache .paimon .utils .SnapshotManager ;
3940
4041import javax .annotation .Nullable ;
4142
4243import java .io .IOException ;
43- import java .util .Collections ;
44+ import java .util .HashMap ;
4445import java .util .List ;
4546import java .util .Map ;
4647
@@ -55,14 +56,21 @@ public class PaimonLakeCommitter implements LakeCommitter<PaimonWriteResult, Pai
5556
5657 private final Catalog paimonCatalog ;
5758 private final FileStoreTable fileStoreTable ;
58- private FileStoreCommit fileStoreCommit ;
59+ private TableCommitImpl tableCommit ;
5960 private static final ThreadLocal <Long > currentCommitSnapshotId = new ThreadLocal <>();
6061 private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper ();
6162
62- public PaimonLakeCommitter (PaimonCatalogProvider paimonCatalogProvider , TablePath tablePath )
63+ public PaimonLakeCommitter (
64+ PaimonCatalogProvider paimonCatalogProvider , CommitterInitContext committerInitContext )
6365 throws IOException {
6466 this .paimonCatalog = paimonCatalogProvider .get ();
65- this .fileStoreTable = getTable (tablePath );
67+ this .fileStoreTable =
68+ getTable (
69+ committerInitContext .tablePath (),
70+ committerInitContext
71+ .tableInfo ()
72+ .getTableConfig ()
73+ .isDataLakeAutoSnapshotExpiration ());
6674 }
6775
6876 @ Override
@@ -82,29 +90,26 @@ public long commit(PaimonCommittable committable, Map<String, String> snapshotPr
8290 snapshotProperties .forEach (manifestCommittable ::addProperty );
8391
8492 try {
85- fileStoreCommit =
86- fileStoreTable
87- .store ()
88- .newCommit (FLUSS_LAKE_TIERING_COMMIT_USER , fileStoreTable );
89- fileStoreCommit .commit (manifestCommittable , false );
93+ tableCommit = fileStoreTable .newCommit (FLUSS_LAKE_TIERING_COMMIT_USER );
94+ tableCommit .commit (manifestCommittable );
95+
9096 Long commitSnapshotId = currentCommitSnapshotId .get ();
9197 currentCommitSnapshotId .remove ();
9298
9399 return checkNotNull (commitSnapshotId , "Paimon committed snapshot id must be non-null." );
94100 } catch (Throwable t ) {
95- if (fileStoreCommit != null ) {
101+ if (tableCommit != null ) {
96102 // if any error happen while commit, abort the commit to clean committable
97- fileStoreCommit .abort (manifestCommittable .fileCommittables ());
103+ tableCommit .abort (manifestCommittable .fileCommittables ());
98104 }
99105 throw new IOException (t );
100106 }
101107 }
102108
103109 @ Override
104110 public void abort (PaimonCommittable committable ) throws IOException {
105- fileStoreCommit =
106- fileStoreTable .store ().newCommit (FLUSS_LAKE_TIERING_COMMIT_USER , fileStoreTable );
107- fileStoreCommit .abort (committable .manifestCommittable ().fileCommittables ());
111+ tableCommit = fileStoreTable .newCommit (FLUSS_LAKE_TIERING_COMMIT_USER );
112+ tableCommit .abort (committable .manifestCommittable ().fileCommittables ());
108113 }
109114
110115 @ Nullable
@@ -191,8 +196,8 @@ private Snapshot getCommittedLatestSnapshotOfLake(String commitUser) throws IOEx
191196 @ Override
192197 public void close () throws Exception {
193198 try {
194- if (fileStoreCommit != null ) {
195- fileStoreCommit .close ();
199+ if (tableCommit != null ) {
200+ tableCommit .close ();
196201 }
197202 if (paimonCatalog != null ) {
198203 paimonCatalog .close ();
@@ -202,19 +207,20 @@ public void close() throws Exception {
202207 }
203208 }
204209
205- private FileStoreTable getTable (TablePath tablePath ) throws IOException {
210+ private FileStoreTable getTable (TablePath tablePath , boolean isAutoSnapshotExpiration )
211+ throws IOException {
206212 try {
207- FileStoreTable table =
208- ( FileStoreTable )
209- paimonCatalog
210- . getTable ( toPaimon ( tablePath ))
211- . copy (
212- Collections . singletonMap (
213- CoreOptions . COMMIT_CALLBACKS . key (),
214- PaimonLakeCommitter . PaimonCommitCallback . class
215- . getName () ));
216-
217- return table ;
213+ FileStoreTable table = ( FileStoreTable ) paimonCatalog . getTable ( toPaimon ( tablePath ));
214+
215+ Map < String , String > dynamicOptions = new HashMap <>();
216+ dynamicOptions . put (
217+ CoreOptions . COMMIT_CALLBACKS . key (),
218+ PaimonLakeCommitter . PaimonCommitCallback . class . getName ());
219+ dynamicOptions . put (
220+ CoreOptions . WRITE_ONLY . key (),
221+ isAutoSnapshotExpiration ? Boolean . FALSE . toString () : Boolean . TRUE . toString ( ));
222+
223+ return table . copy ( dynamicOptions ) ;
218224 } catch (Exception e ) {
219225 throw new IOException ("Failed to get table " + tablePath + " in Paimon." , e );
220226 }
0 commit comments