2424import org .apache .paimon .compact .CompactUnit ;
2525import org .apache .paimon .data .InternalRow ;
2626import org .apache .paimon .io .DataFileMeta ;
27+ import org .apache .paimon .io .RecordLevelExpire ;
2728import org .apache .paimon .mergetree .SortedRun ;
2829import org .apache .paimon .operation .metrics .CompactionMetrics ;
2930
@@ -43,11 +44,11 @@ public class MergeTreeCompactTask extends CompactTask {
4344 private final CompactRewriter rewriter ;
4445 private final int outputLevel ;
4546 private final Supplier <CompactDeletionFile > compactDfSupplier ;
46-
4747 private final List <List <SortedRun >> partitioned ;
48-
4948 private final boolean dropDelete ;
5049 private final int maxLevel ;
50+ @ Nullable private final RecordLevelExpire recordLevelExpire ;
51+ private final boolean forceRewriteAllFiles ;
5152
5253 // metric
5354 private int upgradeFilesNum ;
@@ -60,7 +61,9 @@ public MergeTreeCompactTask(
6061 boolean dropDelete ,
6162 int maxLevel ,
6263 @ Nullable CompactionMetrics .Reporter metricsReporter ,
63- Supplier <CompactDeletionFile > compactDfSupplier ) {
64+ Supplier <CompactDeletionFile > compactDfSupplier ,
65+ @ Nullable RecordLevelExpire recordLevelExpire ,
66+ boolean forceRewriteAllFiles ) {
6467 super (metricsReporter );
6568 this .minFileSize = minFileSize ;
6669 this .rewriter = rewriter ;
@@ -69,6 +72,8 @@ public MergeTreeCompactTask(
6972 this .partitioned = new IntervalPartition (unit .files (), keyComparator ).partition ();
7073 this .dropDelete = dropDelete ;
7174 this .maxLevel = maxLevel ;
75+ this .recordLevelExpire = recordLevelExpire ;
76+ this .forceRewriteAllFiles = forceRewriteAllFiles ;
7277
7378 this .upgradeFilesNum = 0 ;
7479 }
@@ -116,17 +121,21 @@ protected String logMetric(
116121 }
117122
118123 private void upgrade (DataFileMeta file , CompactResult toUpdate ) throws Exception {
119- if (outputLevel != maxLevel || file . deleteRowCount (). map ( d -> d == 0 ). orElse ( false ) ) {
120- CompactResult upgradeResult = rewriter . upgrade ( outputLevel , file );
121- toUpdate . merge ( upgradeResult );
122- upgradeFilesNum ++;
123- } else {
124- // files with delete records should not be upgraded directly to max level
125- List < List < SortedRun >> candidate = new ArrayList <>( );
126- candidate . add ( new ArrayList <>() );
127- candidate . get ( 0 ). add ( SortedRun . fromSingle ( file )) ;
128- rewriteImpl ( candidate , toUpdate );
124+ if (outputLevel == maxLevel ) {
125+ if ( containsExpiredRecords ( file )
126+ || containsDeleteRecords ( file )
127+ || forceRewriteAllFiles ) {
128+ List < List < SortedRun >> candidate = new ArrayList <>();
129+ candidate . add ( new ArrayList <>());
130+ candidate . get ( 0 ). add ( SortedRun . fromSingle ( file ) );
131+ rewriteImpl ( candidate , toUpdate );
132+ return ;
133+ }
129134 }
135+
136+ CompactResult upgradeResult = rewriter .upgrade (outputLevel , file );
137+ toUpdate .merge (upgradeResult );
138+ upgradeFilesNum ++;
130139 }
131140
132141 private void rewrite (List <List <SortedRun >> candidate , CompactResult toUpdate ) throws Exception {
@@ -154,4 +163,12 @@ private void rewriteImpl(List<List<SortedRun>> candidate, CompactResult toUpdate
154163 toUpdate .merge (rewriteResult );
155164 candidate .clear ();
156165 }
166+
167+ private boolean containsDeleteRecords (DataFileMeta file ) {
168+ return file .deleteRowCount ().map (d -> d > 0 ).orElse (true );
169+ }
170+
171+ private boolean containsExpiredRecords (DataFileMeta file ) {
172+ return recordLevelExpire != null && recordLevelExpire .isExpireFile (file );
173+ }
157174}
0 commit comments