 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.SnapshotManager;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -106,6 +107,18 @@ private static Stream<Arguments> tieringWriteArgs() {
                 Arguments.of(false, false));
     }
 
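+    // all eight combinations of (isPrimaryKeyTable, isPartitioned, isWriteOnly)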
+    private static Stream<Arguments> snapshotExpireArgs() {
+        return Stream.of(
+                Arguments.of(true, true, true),
+                Arguments.of(true, true, false),
+                Arguments.of(true, false, true),
+                Arguments.of(true, false, false),
+                Arguments.of(false, true, true),
+                Arguments.of(false, true, false),
+                Arguments.of(false, false, true),
+                Arguments.of(false, false, false));
+    }
+
     @ParameterizedTest
     @MethodSource("tieringWriteArgs")
     void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) throws Exception {
@@ -118,7 +131,11 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) thr
                         isPrimaryKeyTable ? "primary_key" : "log",
                         isPartitioned ? "partitioned" : "non_partitioned"));
         createTable(
-                tablePath, isPrimaryKeyTable, isPartitioned, isPrimaryKeyTable ? bucketNum : null);
+                tablePath,
+                isPrimaryKeyTable,
+                isPartitioned,
+                isPrimaryKeyTable ? bucketNum : null,
+                Collections.emptyMap());
         TableDescriptor descriptor =
                 TableDescriptor.builder()
                         .schema(
@@ -132,12 +149,6 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) thr
                         .build();
         TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
 
-        List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
-        SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
-                paimonLakeTieringFactory.getWriteResultSerializer();
-        SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
-                paimonLakeTieringFactory.getCommittableSerializer();
-
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
                 createLakeCommitter(tablePath)) {
             // there should be no missing snapshot
@@ -155,53 +166,9 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitioned) thr
                             }
                         }
                         : Collections.singletonMap(null, null);
-        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
-        // first, write data
-        for (int bucket = 0; bucket < bucketNum; bucket++) {
-            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
-                String partition = entry.getValue();
-                try (LakeWriter<PaimonWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
-                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
-                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
-                            isPrimaryKeyTable
-                                    ? genPrimaryKeyTableRecords(partition, bucket)
-                                    : genLogTableRecords(partition, bucket, 10);
-                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
-                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
-                    recordsByBucket.put(partitionBucket, expectRecords);
-                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
-                    for (LogRecord logRecord : writtenRecords) {
-                        lakeWriter.write(logRecord);
-                    }
-                    // serialize/deserialize writeResult
-                    PaimonWriteResult paimonWriteResult = lakeWriter.complete();
-                    byte[] serialized = writeResultSerializer.serialize(paimonWriteResult);
-                    paimonWriteResults.add(
-                            writeResultSerializer.deserialize(
-                                    writeResultSerializer.getVersion(), serialized));
-                }
-            }
-        }
 
-        // second, commit data
-        try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
-                createLakeCommitter(tablePath)) {
-            // serialize/deserialize committable
-            PaimonCommittable paimonCommittable = lakeCommitter.toCommittable(paimonWriteResults);
-            byte[] serialized = committableSerializer.serialize(paimonCommittable);
-            paimonCommittable =
-                    committableSerializer.deserialize(
-                            committableSerializer.getVersion(), serialized);
-            long snapshot =
-                    lakeCommitter.commit(
-                            paimonCommittable,
-                            toBucketOffsetsProperty(
-                                    tableBucketOffsets,
-                                    partitionIdAndName,
-                                    getPartitionKeys(tablePath)));
-            assertThat(snapshot).isEqualTo(1);
-        }
+        // first, write some data
+        writeData(tableInfo, recordsByBucket, partitionIdAndName);
 
         // then, check data
         for (int bucket = 0; bucket < 3; bucket++) {
@@ -401,6 +368,77 @@ void testThreePartitionTiering() throws Exception {
         }
     }
 
+    @ParameterizedTest
+    @MethodSource("snapshotExpireArgs")
+    void testSnapshotExpiration(
+            boolean isPrimaryKeyTable, boolean isPartitioned, boolean isWriteOnly)
+            throws Exception {
+        int bucketNum = 3;
+        TablePath tablePath =
+                TablePath.of(
+                        "paimon",
+                        String.format(
+                                "test_tiering_snapshot_expiration_table_%s_%s",
+                                isPrimaryKeyTable ? "primary_key" : "log",
+                                isPartitioned ? "partitioned" : "non_partitioned"));
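+        // retain at most two snapshots so expiration is easy to observe; note that
+        // Paimon skips compaction and snapshot expiration for write-only tables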
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "1");
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "2");
+        if (isWriteOnly) {
+            options.put(CoreOptions.WRITE_ONLY.key(), "true");
+        }
+        createTable(
+                tablePath,
+                isPrimaryKeyTable,
+                isPartitioned,
+                isPrimaryKeyTable ? bucketNum : null,
+                options);
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", org.apache.fluss.types.DataTypes.INT())
+                                        .column("c2", org.apache.fluss.types.DataTypes.STRING())
+                                        .column("c3", org.apache.fluss.types.DataTypes.STRING())
+                                        .build())
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+        // get the FileStoreTable to verify snapshots
+        FileStoreTable fileStoreTable =
+                (FileStoreTable) paimonCatalog.getTable(toPaimon(tablePath));
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+
+        // write some data to generate 2 snapshots
+        writeData(tableInfo, new HashMap<>(), partitionIdAndName);
+        writeData(tableInfo, new HashMap<>(), partitionIdAndName);
+        assertThat(snapshotManager.snapshotCount()).isEqualTo(2);
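+        // each writeData call commits exactly once, producing one new snapshot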
+
+        // write more data
+        for (int i = 0; i < 5; i++) {
+            writeData(tableInfo, new HashMap<>(), partitionIdAndName);
+            if (isWriteOnly) {
+                // for a write-only table, snapshots should never expire
+                assertThat(snapshotManager.snapshotCount()).isGreaterThan(2);
+            } else {
+                // otherwise, older snapshots should be expired down to the retained max of 2
+                assertThat(snapshotManager.snapshotCount()).isEqualTo(2);
+            }
+        }
+    }
+
     private void verifyLogTableRecordsMultiPartition(
             CloseableIterator<InternalRow> actualRecords,
             List<LogRecord> expectRecords,
@@ -726,7 +764,8 @@ private void createTable(
             TablePath tablePath,
             boolean isPrimaryTable,
             boolean isPartitioned,
-            @Nullable Integer numBuckets)
+            @Nullable Integer numBuckets,
+            Map<String, String> options)
             throws Exception {
         Schema.Builder builder =
                 Schema.newBuilder()
@@ -749,6 +788,7 @@ private void createTable(
         if (numBuckets != null) {
             builder.option(CoreOptions.BUCKET.key(), String.valueOf(numBuckets));
         }
+        builder.options(options);
         doCreatePaimonTable(tablePath, builder);
     }
 
@@ -804,4 +844,64 @@ private List<String> getPartitionKeys(TablePath tablePath) throws Exception {
         FileStoreTable fileStoreTable = (FileStoreTable) paimonCatalog.getTable(identifier);
         return fileStoreTable.partitionKeys();
     }
+
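+    // writes one batch of records per (partition, bucket), round-tripping each
+    // WriteResult and the final Committable through their serializers, then
+    // commits everything in a single lake commit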
+    private void writeData(
+            TableInfo tableInfo,
+            Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket,
+            Map<Long, String> partitionIdAndName)
+            throws Exception {
+        TablePath tablePath = tableInfo.getTablePath();
+        int bucketNum = tableInfo.getNumBuckets();
+        boolean isPrimaryKeyTable = tableInfo.hasPrimaryKey();
+
+        List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
+        SimpleVersionedSerializer<PaimonWriteResult> writeResultSerializer =
+                paimonLakeTieringFactory.getWriteResultSerializer();
+        SimpleVersionedSerializer<PaimonCommittable> committableSerializer =
+                paimonLakeTieringFactory.getCommittableSerializer();
+
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        // first, write data
+        for (int bucket = 0; bucket < bucketNum; bucket++) {
+            for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
+                try (LakeWriter<PaimonWriteResult> lakeWriter =
+                        createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
+                    Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
+                    Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
+                            isPrimaryKeyTable
+                                    ? genPrimaryKeyTableRecords(partition, bucket)
+                                    : genLogTableRecords(partition, bucket, 10);
+                    List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
+                    List<LogRecord> expectRecords = writeAndExpectRecords.f1;
+                    recordsByBucket.put(partitionBucket, expectRecords);
+                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), bucket), 10L);
+                    for (LogRecord logRecord : writtenRecords) {
+                        lakeWriter.write(logRecord);
+                    }
+                    // serialize/deserialize writeResult
+                    PaimonWriteResult paimonWriteResult = lakeWriter.complete();
+                    byte[] serialized = writeResultSerializer.serialize(paimonWriteResult);
+                    paimonWriteResults.add(
+                            writeResultSerializer.deserialize(
+                                    writeResultSerializer.getVersion(), serialized));
+                }
+            }
+        }
+
+        // second, commit data
+        try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter =
+                createLakeCommitter(tablePath)) {
+            // serialize/deserialize committable
+            PaimonCommittable paimonCommittable = lakeCommitter.toCommittable(paimonWriteResults);
+            byte[] serialized = committableSerializer.serialize(paimonCommittable);
+            paimonCommittable =
+                    committableSerializer.deserialize(
+                            committableSerializer.getVersion(), serialized);
+            lakeCommitter.commit(
+                    paimonCommittable,
+                    toBucketOffsetsProperty(
+                            tableBucketOffsets, partitionIdAndName, getPartitionKeys(tablePath)));
+        }
+    }
 }