Skip to content

Commit acb2bb9

Browse files
committed
[core] Introduce 'compaction.delete-ratio-threshold' to reduce compaction
1 parent 6d8f341 commit acb2bb9

File tree

6 files changed

+86
-40
lines changed

6 files changed

+86
-40
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,12 @@
164164
<td>String</td>
165165
<td>Specifies the commit user prefix.</td>
166166
</tr>
167+
<tr>
168+
<td><h5>compaction.delete-ratio-threshold</h5></td>
169+
<td style="word-wrap: break-word;">0.1</td>
170+
<td>Double</td>
171+
<td>Ratio of the deleted rows in a data file to be forced compacted for append-only table.</td>
172+
</tr>
167173
<tr>
168174
<td><h5>compaction.force-up-level-0</h5></td>
169175
<td style="word-wrap: break-word;">false</td>

paimon-common/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -636,6 +636,14 @@ public class CoreOptions implements Serializable {
636636
"For file set [f_0,...,f_N], the minimum file number to trigger a compaction for "
637637
+ "append-only table.");
638638

639+
public static final ConfigOption<Double> COMPACTION_DELETE_RATIO_THRESHOLD =
640+
key("compaction.delete-ratio-threshold")
641+
.doubleType()
642+
.defaultValue(0.1)
643+
.withDescription(
644+
"Ratio of the deleted rows in a data file to be forced compacted for "
645+
+ "append-only table.");
646+
639647
public static final ConfigOption<ChangelogProducer> CHANGELOG_PRODUCER =
640648
key("changelog-producer")
641649
.enumType(ChangelogProducer.class)
@@ -2195,6 +2203,10 @@ public int compactionMinFileNum() {
21952203
return options.get(COMPACTION_MIN_FILE_NUM);
21962204
}
21972205

2206+
public double compactionDeleteRatioThreshold() {
2207+
return options.get(COMPACTION_DELETE_RATIO_THRESHOLD);
2208+
}
2209+
21982210
public long dynamicBucketTargetRowNum() {
21992211
return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM);
22002212
}

paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java

Lines changed: 40 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@
2525
import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
2626
import org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer;
2727
import org.apache.paimon.index.IndexFileHandler;
28-
import org.apache.paimon.index.IndexFileMeta;
2928
import org.apache.paimon.io.DataFileMeta;
3029
import org.apache.paimon.manifest.FileKind;
3130
import org.apache.paimon.manifest.ManifestEntry;
3231
import org.apache.paimon.predicate.Predicate;
3332
import org.apache.paimon.table.FileStoreTable;
33+
import org.apache.paimon.table.source.DeletionFile;
3434
import org.apache.paimon.table.source.EndOfScanException;
3535
import org.apache.paimon.table.source.ScanMode;
3636
import org.apache.paimon.table.source.snapshot.SnapshotReader;
@@ -78,6 +78,7 @@ public class UnawareAppendTableCompactionCoordinator {
7878
private final SnapshotManager snapshotManager;
7979
private final long targetFileSize;
8080
private final long compactionFileSize;
81+
private final double deleteThreshold;
8182
private final long openFileCost;
8283
private final int minFileNum;
8384
private final DvMaintainerCache dvMaintainerCache;
@@ -86,10 +87,6 @@ public class UnawareAppendTableCompactionCoordinator {
8687
final Map<BinaryRow, PartitionCompactCoordinator> partitionCompactCoordinators =
8788
new HashMap<>();
8889

89-
public UnawareAppendTableCompactionCoordinator(FileStoreTable table) {
90-
this(table, true);
91-
}
92-
9390
public UnawareAppendTableCompactionCoordinator(FileStoreTable table, boolean isStreaming) {
9491
this(table, isStreaming, null);
9592
}
@@ -101,6 +98,7 @@ public UnawareAppendTableCompactionCoordinator(
10198
CoreOptions options = table.coreOptions();
10299
this.targetFileSize = options.targetFileSize(false);
103100
this.compactionFileSize = options.compactionFileSize(false);
101+
this.deleteThreshold = options.compactionDeleteRatioThreshold();
104102
this.openFileCost = options.splitOpenFileCost();
105103
this.minFileNum = options.compactionMinFileNum();
106104
this.dvMaintainerCache =
@@ -274,13 +272,13 @@ private List<List<DataFileMeta>> pack(Set<DataFileMeta> toCompact) {
274272
FileBin fileBin = new FileBin();
275273
for (DataFileMeta fileMeta : files) {
276274
fileBin.addFile(fileMeta);
277-
if (fileBin.binFull()) {
275+
if (fileBin.enoughContent()) {
278276
result.add(new ArrayList<>(fileBin.bin));
279-
// remove it from coordinator memory, won't join in compaction again
280277
fileBin.reset();
281278
}
282279
}
283-
if (fileBin.fileNum >= minFileNum) {
280+
281+
if (fileBin.enoughInputFiles() || fileBin.containsTooHighDeleteFile()) {
284282
result.add(new ArrayList<>(fileBin.bin));
285283
fileBin.reset();
286284
}
@@ -289,11 +287,13 @@ private List<List<DataFileMeta>> pack(Set<DataFileMeta> toCompact) {
289287

290288
private List<List<DataFileMeta>> packInDeletionVectorVMode(Set<DataFileMeta> toCompact) {
291289
// we group the data files by their related index files.
292-
Map<IndexFileMeta, List<DataFileMeta>> filesWithDV = new HashMap<>();
290+
Map<String, List<DataFileMeta>> filesWithDV = new HashMap<>();
293291
Set<DataFileMeta> rest = new HashSet<>();
294292
for (DataFileMeta dataFile : toCompact) {
295-
IndexFileMeta indexFile =
296-
dvMaintainerCache.dvMaintainer(partition).getIndexFile(dataFile.fileName());
293+
String indexFile =
294+
dvMaintainerCache
295+
.dvMaintainer(partition)
296+
.getIndexFilePath(dataFile.fileName());
297297
if (indexFile == null) {
298298
rest.add(dataFile);
299299
} else {
@@ -314,23 +314,28 @@ private List<List<DataFileMeta>> packInDeletionVectorVMode(Set<DataFileMeta> toC
314314
private class FileBin {
315315
List<DataFileMeta> bin = new ArrayList<>();
316316
long totalFileSize = 0;
317-
int fileNum = 0;
318317

319318
public void reset() {
320319
bin.forEach(toCompact::remove);
321320
bin.clear();
322321
totalFileSize = 0;
323-
fileNum = 0;
324322
}
325323

326324
public void addFile(DataFileMeta file) {
327325
totalFileSize += file.fileSize() + openFileCost;
328-
fileNum++;
329326
bin.add(file);
330327
}
331328

332-
public boolean binFull() {
333-
return totalFileSize >= targetFileSize * 50 && fileNum >= minFileNum;
329+
private boolean enoughContent() {
330+
return bin.size() > 1 && totalFileSize >= targetFileSize * 50;
331+
}
332+
333+
private boolean enoughInputFiles() {
334+
return bin.size() >= minFileNum;
335+
}
336+
337+
private boolean containsTooHighDeleteFile() {
338+
return bin.stream().anyMatch(file -> tooHighDeleteRatio(partition, file));
334339
}
335340
}
336341
}
@@ -421,12 +426,7 @@ private void assignNewIterator() {
421426
return true;
422427
}
423428

424-
if (dvMaintainerCache != null) {
425-
return dvMaintainerCache
426-
.dvMaintainer(entry.partition())
427-
.hasDeletionFile(entry.fileName());
428-
}
429-
return false;
429+
return tooHighDeleteRatio(entry.partition(), entry.file());
430430
};
431431
currentIterator =
432432
snapshotReader
@@ -457,4 +457,22 @@ public ManifestEntry next() {
457457
}
458458
}
459459
}
460+
461+
private boolean tooHighDeleteRatio(BinaryRow partition, DataFileMeta file) {
462+
if (dvMaintainerCache != null) {
463+
DeletionFile deletionFile =
464+
dvMaintainerCache.dvMaintainer(partition).getDeletionFile(file.fileName());
465+
if (deletionFile != null) {
466+
Long cardinality = deletionFile.cardinality();
467+
long rowCount = file.rowCount();
468+
return cardinality == null || cardinality > rowCount * deleteThreshold;
469+
}
470+
}
471+
return false;
472+
}
473+
474+
@VisibleForTesting
475+
UnawareAppendDeletionFileMaintainer dvMaintainer(BinaryRow partition) {
476+
return dvMaintainerCache.dvMaintainer(partition);
477+
}
460478
}

paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class UnawareAppendDeletionFileMaintainer implements AppendDeletionFileMa
6262
Map<String, DeletionFile> deletionFiles) {
6363
this.indexFileHandler = indexFileHandler;
6464
this.partition = partition;
65-
this.dataFileToDeletionFile = deletionFiles;
65+
this.dataFileToDeletionFile = new HashMap<>(deletionFiles);
6666
// the deletion of data files is independent
6767
// just create an empty maintainer
6868
this.maintainer = new DeletionVectorsMaintainer.Factory(indexFileHandler).create();
@@ -100,14 +100,14 @@ public int getBucket() {
100100
return UNAWARE_BUCKET;
101101
}
102102

103-
public boolean hasDeletionFile(String dataFile) {
104-
return this.dataFileToDeletionFile.containsKey(dataFile);
105-
}
106-
107103
public DeletionFile getDeletionFile(String dataFile) {
108104
return this.dataFileToDeletionFile.get(dataFile);
109105
}
110106

107+
public void putDeletionFile(String dataFile, DeletionFile deletionFile) {
108+
this.dataFileToDeletionFile.put(dataFile, deletionFile);
109+
}
110+
111111
public DeletionVector getDeletionVector(String dataFile) {
112112
DeletionFile deletionFile = getDeletionFile(dataFile);
113113
if (deletionFile != null) {
@@ -151,15 +151,9 @@ public List<IndexManifestEntry> persist() {
151151
return result;
152152
}
153153

154-
public IndexFileMeta getIndexFile(String dataFile) {
154+
public String getIndexFilePath(String dataFile) {
155155
DeletionFile deletionFile = getDeletionFile(dataFile);
156-
if (deletionFile == null) {
157-
return null;
158-
} else {
159-
IndexManifestEntry entry =
160-
this.indexNameToEntry.get(new Path(deletionFile.path()).getName());
161-
return entry == null ? null : entry.indexFile();
162-
}
156+
return deletionFile == null ? null : deletionFile.path();
163157
}
164158

165159
@VisibleForTesting

paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ private void recreate() {
8181
new org.apache.paimon.fs.Path(tempDir.toString()),
8282
tableSchema);
8383
compactionCoordinator =
84-
new UnawareAppendTableCompactionCoordinator(appendOnlyFileStoreTable);
84+
new UnawareAppendTableCompactionCoordinator(appendOnlyFileStoreTable, true);
8585
write = (AppendOnlyFileStoreWrite) appendOnlyFileStoreTable.store().newWrite(commitUser);
8686
}
8787

paimon-core/src/test/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinatorTest.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
package org.apache.paimon.append;
2020

21-
import org.apache.paimon.CoreOptions;
2221
import org.apache.paimon.data.BinaryRow;
22+
import org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer;
2323
import org.apache.paimon.fs.FileIO;
2424
import org.apache.paimon.fs.local.LocalFileIO;
2525
import org.apache.paimon.io.DataFileMeta;
@@ -29,6 +29,7 @@
2929
import org.apache.paimon.schema.TableSchema;
3030
import org.apache.paimon.table.FileStoreTable;
3131
import org.apache.paimon.table.FileStoreTableFactory;
32+
import org.apache.paimon.table.source.DeletionFile;
3233
import org.apache.paimon.table.source.EndOfScanException;
3334
import org.apache.paimon.types.DataTypes;
3435

@@ -42,6 +43,8 @@
4243
import java.util.List;
4344
import java.util.UUID;
4445

46+
import static org.apache.paimon.CoreOptions.COMPACTION_MIN_FILE_NUM;
47+
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
4548
import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
4649
import static org.apache.paimon.stats.StatsTestUtils.newSimpleStats;
4750
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
@@ -172,6 +175,18 @@ public void testBatchScanEmptyTable() {
172175
.satisfies(anyCauseMatches(EndOfScanException.class));
173176
}
174177

178+
@Test
179+
public void testDeleteRatio() {
180+
List<DataFileMeta> files =
181+
generateNewFiles(
182+
2, appendOnlyFileStoreTable.coreOptions().targetFileSize(false) / 3 + 1);
183+
UnawareAppendDeletionFileMaintainer dvMaintainer =
184+
compactionCoordinator.dvMaintainer(partition);
185+
DeletionFile deletionFile = new DeletionFile("", 0, 0, 11L);
186+
dvMaintainer.putDeletionFile(files.get(0).fileName(), deletionFile);
187+
assertTasks(files, 1);
188+
}
189+
175190
private void assertTasks(List<DataFileMeta> files, int taskNum) {
176191
compactionCoordinator.notifyNewFiles(partition, files);
177192
List<UnawareAppendCompactionTask> tasks = compactionCoordinator.compactPlan();
@@ -184,7 +199,8 @@ private static Schema schema() {
184199
schemaBuilder.column("f1", DataTypes.STRING());
185200
schemaBuilder.column("f2", DataTypes.STRING());
186201
schemaBuilder.column("f3", DataTypes.STRING());
187-
schemaBuilder.option(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "3");
202+
schemaBuilder.option(COMPACTION_MIN_FILE_NUM.key(), "3");
203+
schemaBuilder.option(DELETION_VECTORS_ENABLED.key(), "true");
188204
return schemaBuilder.build();
189205
}
190206

@@ -199,7 +215,7 @@ public void createCoordinator() throws Exception {
199215
FileStoreTableFactory.create(
200216
fileIO, new org.apache.paimon.fs.Path(tempDir.toString()), tableSchema);
201217
compactionCoordinator =
202-
new UnawareAppendTableCompactionCoordinator(appendOnlyFileStoreTable);
218+
new UnawareAppendTableCompactionCoordinator(appendOnlyFileStoreTable, true);
203219
partition = BinaryRow.EMPTY_ROW;
204220
}
205221

@@ -215,7 +231,7 @@ private DataFileMeta newFile(long fileSize) {
215231
return new DataFileMeta(
216232
UUID.randomUUID().toString(),
217233
fileSize,
218-
1,
234+
100,
219235
row(0),
220236
row(0),
221237
newSimpleStats(0, 1),

0 commit comments

Comments
 (0)