Skip to content

Commit 9d81d70

Browse files
committed
[core] Simplify force rewrite files in Compact Task
1 parent 9c9452e commit 9d81d70

File tree

11 files changed

+157
-128
lines changed

11 files changed

+157
-128
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@
183183
<td>Ratio of the deleted rows in a data file to be forced compacted for append-only table.</td>
184184
</tr>
185185
<tr>
186-
<td><h5>compaction.force-compact-all-files</h5></td>
186+
<td><h5>compaction.force-rewrite-all-files</h5></td>
187187
<td style="word-wrap: break-word;">false</td>
188188
<td>Boolean</td>
189189
<td>Whether to force pick all files for a full compaction. Usually seen in a compaction task to external paths.</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ public class CoreOptions implements Serializable {
151151
+ ExternalPathStrategy.SPECIFIC_FS
152152
+ ", should be the prefix scheme of the external path, now supported are s3 and oss.");
153153

154-
public static final ConfigOption<Boolean> COMPACTION_FORCE_COMPACT_ALL_FILES =
155-
key("compaction.force-compact-all-files")
154+
public static final ConfigOption<Boolean> COMPACTION_FORCE_REWRITE_ALL_FILES =
155+
key("compaction.force-rewrite-all-files")
156156
.booleanType()
157157
.defaultValue(false)
158158
.withDescription(
@@ -2473,8 +2473,8 @@ public String externalSpecificFS() {
24732473
return options.get(DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS);
24742474
}
24752475

2476-
public Boolean forceCompactAllFiles() {
2477-
return options.get(COMPACTION_FORCE_COMPACT_ALL_FILES);
2476+
public Boolean forceRewriteAllFiles() {
2477+
return options.get(COMPACTION_FORCE_REWRITE_ALL_FILES);
24782478
}
24792479

24802480
public String partitionTimestampFormatter() {

paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public class BucketedAppendCompactManager extends CompactFutureManager {
6060
private final PriorityQueue<DataFileMeta> toCompact;
6161
private final int minFileNum;
6262
private final long targetFileSize;
63-
private final boolean forceCompactAllFiles;
63+
private final boolean forceRewriteAllFiles;
6464
private final CompactRewriter rewriter;
6565

6666
private List<DataFileMeta> compacting;
@@ -73,7 +73,7 @@ public BucketedAppendCompactManager(
7373
@Nullable DeletionVectorsMaintainer dvMaintainer,
7474
int minFileNum,
7575
long targetFileSize,
76-
boolean forceCompactAllFiles,
76+
boolean forceRewriteAllFiles,
7777
CompactRewriter rewriter,
7878
@Nullable CompactionMetrics.Reporter metricsReporter) {
7979
this.executor = executor;
@@ -82,7 +82,7 @@ public BucketedAppendCompactManager(
8282
this.toCompact.addAll(restored);
8383
this.minFileNum = minFileNum;
8484
this.targetFileSize = targetFileSize;
85-
this.forceCompactAllFiles = forceCompactAllFiles;
85+
this.forceRewriteAllFiles = forceRewriteAllFiles;
8686
this.rewriter = rewriter;
8787
this.metricsReporter = metricsReporter;
8888
}
@@ -102,7 +102,7 @@ private void triggerFullCompaction() {
102102
"A compaction task is still running while the user "
103103
+ "forces a new compaction. This is unexpected.");
104104
// if all files are force picked or deletion vector enables, always trigger compaction.
105-
if (!forceCompactAllFiles
105+
if (!forceRewriteAllFiles
106106
&& (toCompact.isEmpty()
107107
|| (dvMaintainer == null && toCompact.size() < FULL_COMPACT_MIN_FILE))) {
108108
return;
@@ -118,7 +118,7 @@ private void triggerFullCompaction() {
118118
dvMaintainer,
119119
toCompact,
120120
targetFileSize,
121-
forceCompactAllFiles,
121+
forceRewriteAllFiles,
122122
rewriter,
123123
metricsReporter));
124124
recordCompactionsQueuedRequest();
@@ -243,28 +243,28 @@ public static class FullCompactTask extends CompactTask {
243243
private final DeletionVectorsMaintainer dvMaintainer;
244244
private final LinkedList<DataFileMeta> toCompact;
245245
private final long targetFileSize;
246-
private final boolean forceCompactAllFiles;
246+
private final boolean forceRewriteAllFiles;
247247
private final CompactRewriter rewriter;
248248

249249
public FullCompactTask(
250250
DeletionVectorsMaintainer dvMaintainer,
251251
Collection<DataFileMeta> inputs,
252252
long targetFileSize,
253-
boolean forceCompactAllFiles,
253+
boolean forceRewriteAllFiles,
254254
CompactRewriter rewriter,
255255
@Nullable CompactionMetrics.Reporter metricsReporter) {
256256
super(metricsReporter);
257257
this.dvMaintainer = dvMaintainer;
258258
this.toCompact = new LinkedList<>(inputs);
259259
this.targetFileSize = targetFileSize;
260-
this.forceCompactAllFiles = forceCompactAllFiles;
260+
this.forceRewriteAllFiles = forceRewriteAllFiles;
261261
this.rewriter = rewriter;
262262
}
263263

264264
@Override
265265
protected CompactResult doCompact() throws Exception {
266266
// remove large files
267-
while (!forceCompactAllFiles && !toCompact.isEmpty()) {
267+
while (!forceRewriteAllFiles && !toCompact.isEmpty()) {
268268
DataFileMeta file = toCompact.peekFirst();
269269
// the data file with deletion file always need to be compacted.
270270
if (file.fileSize() >= targetFileSize && !hasDeletionFile(file)) {
@@ -289,7 +289,7 @@ protected CompactResult doCompact() throws Exception {
289289
small++;
290290
}
291291
}
292-
if (forceCompactAllFiles
292+
if (forceRewriteAllFiles
293293
|| (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE)) {
294294
return compact(null, toCompact, rewriter);
295295
} else {

paimon-core/src/main/java/org/apache/paimon/compact/CompactUnit.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,31 +25,40 @@
2525
import java.util.List;
2626

2727
/** A files unit for compaction. */
28-
public interface CompactUnit {
28+
public class CompactUnit {
2929

30-
int outputLevel();
30+
private final int outputLevel;
31+
private final List<DataFileMeta> files;
32+
private final boolean fileRewrite;
3133

32-
List<DataFileMeta> files();
34+
public CompactUnit(int outputLevel, List<DataFileMeta> files, boolean fileRewrite) {
35+
this.outputLevel = outputLevel;
36+
this.files = files;
37+
this.fileRewrite = fileRewrite;
38+
}
39+
40+
public int outputLevel() {
41+
return outputLevel;
42+
}
43+
44+
public List<DataFileMeta> files() {
45+
return files;
46+
}
47+
48+
public boolean fileRewrite() {
49+
return fileRewrite;
50+
}
3351

34-
static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) {
52+
public static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) {
3553
List<DataFileMeta> files = new ArrayList<>();
3654
for (LevelSortedRun run : runs) {
3755
files.addAll(run.run().files());
3856
}
39-
return fromFiles(outputLevel, files);
57+
return fromFiles(outputLevel, files, false);
4058
}
4159

42-
static CompactUnit fromFiles(int outputLevel, List<DataFileMeta> files) {
43-
return new CompactUnit() {
44-
@Override
45-
public int outputLevel() {
46-
return outputLevel;
47-
}
48-
49-
@Override
50-
public List<DataFileMeta> files() {
51-
return files;
52-
}
53-
};
60+
public static CompactUnit fromFiles(
61+
int outputLevel, List<DataFileMeta> files, boolean fileRewrite) {
62+
return new CompactUnit(outputLevel, files, fileRewrite);
5463
}
5564
}

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.ArrayList;
3333
import java.util.List;
3434
import java.util.Optional;
35-
import java.util.stream.Collectors;
3635

3736
/** Compact strategy to decide which files to select for compaction. */
3837
public interface CompactStrategy {
@@ -56,16 +55,19 @@ static Optional<CompactUnit> pickFullCompaction(
5655
List<LevelSortedRun> runs,
5756
@Nullable RecordLevelExpire recordLevelExpire,
5857
@Nullable DeletionVectorsMaintainer dvMaintainer,
59-
boolean forceCompactAllFiles) {
58+
boolean forceRewriteAllFiles) {
6059
int maxLevel = numLevels - 1;
6160
if (runs.isEmpty()) {
6261
// no sorted run, no need to compact
6362
return Optional.empty();
64-
} else if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {
63+
}
64+
65+
// only max level files
66+
if ((runs.size() == 1 && runs.get(0).level() == maxLevel)) {
6567
List<DataFileMeta> filesToBeCompacted = new ArrayList<>();
6668

6769
for (DataFileMeta file : runs.get(0).run().files()) {
68-
if (forceCompactAllFiles) {
70+
if (forceRewriteAllFiles) {
6971
// add all files when force compacted
7072
filesToBeCompacted.add(file);
7173
} else if (recordLevelExpire != null && recordLevelExpire.isExpireFile(file)) {
@@ -78,27 +80,14 @@ static Optional<CompactUnit> pickFullCompaction(
7880
}
7981
}
8082

81-
if (LOG.isDebugEnabled()) {
82-
LOG.debug(
83-
"Pick these files which have expired records or dv index for full compaction: {}",
84-
filesToBeCompacted.stream()
85-
.map(
86-
file ->
87-
String.format(
88-
"(%s, %d, %d)",
89-
file.fileName(),
90-
file.level(),
91-
file.fileSize()))
92-
.collect(Collectors.joining(", ")));
93-
}
94-
95-
if (!filesToBeCompacted.isEmpty()) {
96-
return Optional.of(CompactUnit.fromFiles(maxLevel, filesToBeCompacted));
97-
} else {
83+
if (filesToBeCompacted.isEmpty()) {
9884
return Optional.empty();
9985
}
100-
} else {
101-
return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
86+
87+
return Optional.of(CompactUnit.fromFiles(maxLevel, filesToBeCompacted, true));
10288
}
89+
90+
// full compaction
91+
return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
10392
}
10493
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.mergetree.compact;
20+
21+
import org.apache.paimon.compact.CompactResult;
22+
import org.apache.paimon.compact.CompactTask;
23+
import org.apache.paimon.compact.CompactUnit;
24+
import org.apache.paimon.io.DataFileMeta;
25+
import org.apache.paimon.mergetree.SortedRun;
26+
import org.apache.paimon.operation.metrics.CompactionMetrics;
27+
28+
import javax.annotation.Nullable;
29+
30+
import java.util.List;
31+
32+
import static java.util.Collections.singletonList;
33+
34+
/** Compact task for file rewrite compaction. */
35+
public class FileRewriteCompactTask extends CompactTask {
36+
37+
private final CompactRewriter rewriter;
38+
private final int outputLevel;
39+
private final List<DataFileMeta> files;
40+
private final boolean dropDelete;
41+
42+
public FileRewriteCompactTask(
43+
CompactRewriter rewriter,
44+
CompactUnit unit,
45+
boolean dropDelete,
46+
@Nullable CompactionMetrics.Reporter metricsReporter) {
47+
super(metricsReporter);
48+
this.rewriter = rewriter;
49+
this.outputLevel = unit.outputLevel();
50+
this.files = unit.files();
51+
this.dropDelete = dropDelete;
52+
}
53+
54+
@Override
55+
protected CompactResult doCompact() throws Exception {
56+
CompactResult result = new CompactResult();
57+
for (DataFileMeta file : files) {
58+
rewriteFile(file, result);
59+
}
60+
return result;
61+
}
62+
63+
private void rewriteFile(DataFileMeta file, CompactResult toUpdate) throws Exception {
64+
List<List<SortedRun>> candidate = singletonList(singletonList(SortedRun.fromSingle(file)));
65+
toUpdate.merge(rewriter.rewrite(outputLevel, dropDelete, candidate));
66+
}
67+
}

0 commit comments

Comments
 (0)