2222import org .apache .paimon .Snapshot ;
2323import org .apache .paimon .annotation .VisibleForTesting ;
2424import org .apache .paimon .data .BinaryRow ;
25- import org .apache .paimon .deletionvectors .append .AppendDeletionFileMaintainer ;
26- import org .apache .paimon .deletionvectors .append .UnawareAppendDeletionFileMaintainer ;
25+ import org .apache .paimon .deletionvectors .append .AppendDeleteFileMaintainer ;
26+ import org .apache .paimon .deletionvectors .append .BaseAppendDeleteFileMaintainer ;
2727import org .apache .paimon .index .IndexFileHandler ;
2828import org .apache .paimon .io .DataFileMeta ;
2929import org .apache .paimon .manifest .FileKind ;
3434import org .apache .paimon .table .source .EndOfScanException ;
3535import org .apache .paimon .table .source .ScanMode ;
3636import org .apache .paimon .table .source .snapshot .SnapshotReader ;
37- import org .apache .paimon .utils .Filter ;
3837import org .apache .paimon .utils .SnapshotManager ;
3938
4039import javax .annotation .Nullable ;
5655/**
5756 * Compact coordinator for append only tables.
5857 *
59- * <p>Note: {@link UnawareAppendTableCompactionCoordinator } scan files in snapshot, read APPEND and
60- * COMPACT snapshot then load those new files. It will try it best to generate compaction task for
61- * the restored files scanned in snapshot, but to reduce memory usage, it won't remain single file
62- * for a long time. After ten times scan, single file with one partition will be ignored and removed
63- * from memory, which means, it will not participate in compaction again until restart the
64- * compaction job.
58+ * <p>Note: {@link AppendCompactCoordinator } scan files in snapshot, read APPEND and COMPACT
59+ * snapshots, then loads those new files. It will try its best to generate compaction tasks for the
60+ * restored files scanned in snapshot, but to reduce memory usage, it won't remain single file for a
61+ * long time. After ten times scan, single file with one partition will be ignored and removed from
62+ * memory, which means, it will not participate in compaction again until restart the compaction
63+ * job.
6564 *
6665 * <p>When another task deletes files in the latest snapshot (including batch delete/update and overwrite),
6766 * the file in coordinator will still remain and participate in compaction task. When this happens,
6867 * compaction job will fail in commit stage, and fail-over to rescan the restored files in latest
6968 * snapshot.
7069 */
71- public class UnawareAppendTableCompactionCoordinator {
70+ public class AppendCompactCoordinator {
7271
7372 private static final int FILES_BATCH = 100_000 ;
7473
@@ -87,11 +86,11 @@ public class UnawareAppendTableCompactionCoordinator {
8786 final Map <BinaryRow , PartitionCompactCoordinator > partitionCompactCoordinators =
8887 new HashMap <>();
8988
90- public UnawareAppendTableCompactionCoordinator (FileStoreTable table , boolean isStreaming ) {
89+ public AppendCompactCoordinator (FileStoreTable table , boolean isStreaming ) {
9190 this (table , isStreaming , null );
9291 }
9392
94- public UnawareAppendTableCompactionCoordinator (
93+ public AppendCompactCoordinator (
9594 FileStoreTable table , boolean isStreaming , @ Nullable Predicate filter ) {
9695 checkArgument (table .primaryKeys ().isEmpty ());
9796 this .snapshotManager = table .snapshotManager ();
@@ -108,7 +107,7 @@ public UnawareAppendTableCompactionCoordinator(
108107 this .filesIterator = new FilesIterator (table , isStreaming , filter );
109108 }
110109
111- public List <UnawareAppendCompactionTask > run () {
110+ public List <AppendCompactTask > run () {
112111 // scan files in snapshot
113112 if (scan ()) {
114113 // do plan compact tasks
@@ -154,29 +153,20 @@ FilesIterator filesIterator() {
154153
155154 @ VisibleForTesting
156155 void notifyNewFiles (BinaryRow partition , List <DataFileMeta > files ) {
157- java .util .function .Predicate <DataFileMeta > filter =
158- file -> {
159- if (dvMaintainerCache == null
160- || dvMaintainerCache
161- .dvMaintainer (partition )
162- .getDeletionFile (file .fileName ())
163- == null ) {
164- return file .fileSize () < compactionFileSize ;
165- }
166- // if a data file has a deletion file, always be to compact.
167- return true ;
168- };
169- List <DataFileMeta > toCompact = files .stream ().filter (filter ).collect (Collectors .toList ());
156+ List <DataFileMeta > toCompact =
157+ files .stream ()
158+ .filter (file -> shouldCompact (partition , file ))
159+ .collect (Collectors .toList ());
170160 partitionCompactCoordinators
171161 .computeIfAbsent (partition , pp -> new PartitionCompactCoordinator (partition ))
172162 .addFiles (toCompact );
173163 }
174164
175165 @ VisibleForTesting
176166 // generate compaction task to the next stage
177- List <UnawareAppendCompactionTask > compactPlan () {
167+ List <AppendCompactTask > compactPlan () {
178168 // first loop to found compaction tasks
179- List <UnawareAppendCompactionTask > tasks =
169+ List <AppendCompactTask > tasks =
180170 partitionCompactCoordinators .values ().stream ()
181171 .flatMap (s -> s .plan ().stream ())
182172 .collect (Collectors .toList ());
@@ -213,18 +203,18 @@ public PartitionCompactCoordinator(BinaryRow partition) {
213203 this .partition = partition ;
214204 }
215205
216- public List <UnawareAppendCompactionTask > plan () {
206+ public List <AppendCompactTask > plan () {
217207 return pickCompact ();
218208 }
219209
220210 public BinaryRow partition () {
221211 return partition ;
222212 }
223213
224- private List <UnawareAppendCompactionTask > pickCompact () {
214+ private List <AppendCompactTask > pickCompact () {
225215 List <List <DataFileMeta >> waitCompact = agePack ();
226216 return waitCompact .stream ()
227- .map (files -> new UnawareAppendCompactionTask (partition , files ))
217+ .map (files -> new AppendCompactTask (partition , files ))
228218 .collect (Collectors .toList ());
229219 }
230220
@@ -260,14 +250,11 @@ private List<List<DataFileMeta>> agePack() {
260250 }
261251
262252 private List <List <DataFileMeta >> pack (Set <DataFileMeta > toCompact ) {
263- // we compact smaller files first
264- // step 1, sort files by file size, pick the smaller first
253+ // we don't know how many parallel compact workers there will be, so in order to pack
254+ // the bins more evenly, we sort the files by size first
265255 ArrayList <DataFileMeta > files = new ArrayList <>(toCompact );
266256 files .sort (Comparator .comparingLong (DataFileMeta ::fileSize ));
267257
268- // step 2, when files picked size greater than targetFileSize(meanwhile file num greater
269- // than minFileNum) or file numbers bigger than maxFileNum, we pack it to a compaction
270- // task
271258 List <List <DataFileMeta >> result = new ArrayList <>();
272259 FileBin fileBin = new FileBin ();
273260 for (DataFileMeta fileMeta : files ) {
@@ -278,15 +265,19 @@ private List<List<DataFileMeta>> pack(Set<DataFileMeta> toCompact) {
278265 }
279266 }
280267
281- if (fileBin .enoughInputFiles () || fileBin . containsTooHighDeleteFile () ) {
268+ if (fileBin .enoughInputFiles ()) {
282269 result .add (new ArrayList <>(fileBin .bin ));
283- fileBin .reset ();
284270 }
271+ // otherwise skip these files: too few small files remain to be worth a compaction task
272+
285273 return result ;
286274 }
287275
288276 private List <List <DataFileMeta >> packInDeletionVectorVMode (Set <DataFileMeta > toCompact ) {
289277 // we group the data files by their related index files.
278+ // In the subsequent compact task, if any files with deletion vectors are compacted, we
279+ // need to rewrite their corresponding deletion files. To avoid producing duplicate
280+ // deletion files, we must group the data files by the deletion file they belong to
290281 Map <String , List <DataFileMeta >> filesWithDV = new HashMap <>();
291282 Set <DataFileMeta > rest = new HashSet <>();
292283 for (DataFileMeta dataFile : toCompact ) {
@@ -301,13 +292,35 @@ private List<List<DataFileMeta>> packInDeletionVectorVMode(Set<DataFileMeta> toC
301292 }
302293 }
303294
304- List <List <DataFileMeta >> result = new ArrayList <>(filesWithDV .values ());
295+ // To avoid generating compact tasks that are too small, merge small groups together
296+ List <List <DataFileMeta >> dvGroups = new ArrayList <>(filesWithDV .values ());
297+ dvGroups .sort (Comparator .comparingLong (this ::fileSizeOfList ));
298+
299+ List <List <DataFileMeta >> result = new ArrayList <>();
300+ FileBin fileBin = new FileBin ();
301+ for (List <DataFileMeta > dvGroup : dvGroups ) {
302+ fileBin .addFiles (dvGroup );
303+ if (fileBin .enoughContent ()) {
304+ result .add (new ArrayList <>(fileBin .bin ));
305+ fileBin .reset ();
306+ }
307+ }
308+
309+ // files with deletion vectors must always be compacted, even if the bin is not full
310+ if (!fileBin .bin .isEmpty ()) {
311+ result .add (new ArrayList <>(fileBin .bin ));
312+ }
313+
305314 if (rest .size () > 1 ) {
306315 result .addAll (pack (rest ));
307316 }
308317 return result ;
309318 }
310319
320+ private long fileSizeOfList (List <DataFileMeta > list ) {
321+ return list .stream ().mapToLong (DataFileMeta ::fileSize ).sum ();
322+ }
323+
311324 /**
312325 * A file bin used by {@link PartitionCompactCoordinator} to decide whether files are ready to compact.
313326 */
@@ -321,6 +334,10 @@ public void reset() {
321334 totalFileSize = 0 ;
322335 }
323336
337+ public void addFiles (List <DataFileMeta > files ) {
338+ files .forEach (this ::addFile );
339+ }
340+
324341 public void addFile (DataFileMeta file ) {
325342 totalFileSize += file .fileSize () + openFileCost ;
326343 bin .add (file );
@@ -333,10 +350,6 @@ private boolean enoughContent() {
333350 private boolean enoughInputFiles () {
334351 return bin .size () >= minFileNum ;
335352 }
336-
337- private boolean containsTooHighDeleteFile () {
338- return bin .stream ().anyMatch (file -> tooHighDeleteRatio (partition , file ));
339- }
340353 }
341354 }
342355
@@ -345,8 +358,7 @@ private class DvMaintainerCache {
345358 private final IndexFileHandler indexFileHandler ;
346359
347360 /** Should be thread safe, ManifestEntryFilter will be invoked in many threads. */
348- private final Map <BinaryRow , UnawareAppendDeletionFileMaintainer > cache =
349- new ConcurrentHashMap <>();
361+ private final Map <BinaryRow , AppendDeleteFileMaintainer > cache = new ConcurrentHashMap <>();
350362
351363 private DvMaintainerCache (IndexFileHandler indexFileHandler ) {
352364 this .indexFileHandler = indexFileHandler ;
@@ -356,12 +368,12 @@ private void refresh() {
356368 this .cache .clear ();
357369 }
358370
359- private UnawareAppendDeletionFileMaintainer dvMaintainer (BinaryRow partition ) {
360- UnawareAppendDeletionFileMaintainer maintainer = cache .get (partition );
371+ private AppendDeleteFileMaintainer dvMaintainer (BinaryRow partition ) {
372+ AppendDeleteFileMaintainer maintainer = cache .get (partition );
361373 if (maintainer == null ) {
362374 synchronized (this ) {
363375 maintainer =
364- AppendDeletionFileMaintainer .forUnawareAppend (
376+ BaseAppendDeleteFileMaintainer .forUnawareAppend (
365377 indexFileHandler , snapshotManager .latestSnapshot (), partition );
366378 }
367379 cache .put (partition , maintainer );
@@ -420,17 +432,10 @@ private void assignNewIterator() {
420432 if (dvMaintainerCache != null ) {
421433 dvMaintainerCache .refresh ();
422434 }
423- Filter <ManifestEntry > entryFilter =
424- entry -> {
425- if (entry .file ().fileSize () < compactionFileSize ) {
426- return true ;
427- }
428-
429- return tooHighDeleteRatio (entry .partition (), entry .file ());
430- };
431435 currentIterator =
432436 snapshotReader
433- .withManifestEntryFilter (entryFilter )
437+ .withManifestEntryFilter (
438+ entry -> shouldCompact (entry .partition (), entry .file ()))
434439 .withSnapshot (snapshot )
435440 .readFileIterator ();
436441 }
@@ -458,6 +463,10 @@ public ManifestEntry next() {
458463 }
459464 }
460465
466+ private boolean shouldCompact (BinaryRow partition , DataFileMeta file ) {
467+ return file .fileSize () < compactionFileSize || tooHighDeleteRatio (partition , file );
468+ }
469+
461470 private boolean tooHighDeleteRatio (BinaryRow partition , DataFileMeta file ) {
462471 if (dvMaintainerCache != null ) {
463472 DeletionFile deletionFile =
@@ -472,7 +481,7 @@ private boolean tooHighDeleteRatio(BinaryRow partition, DataFileMeta file) {
472481 }
473482
474483 @ VisibleForTesting
475- UnawareAppendDeletionFileMaintainer dvMaintainer (BinaryRow partition ) {
484+ AppendDeleteFileMaintainer dvMaintainer (BinaryRow partition ) {
476485 return dvMaintainerCache .dvMaintainer (partition );
477486 }
478487}
0 commit comments