Skip to content

Commit 585540a

Browse files
committed
[core] Introduce 'compaction.total-size-threshold' to do full compaction
1 parent 020ed14 commit 585540a

File tree

10 files changed

+257
-203
lines changed

10 files changed

+257
-203
lines changed

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -733,6 +733,13 @@ public InlineElement getDescription() {
733733
"Implying how often to perform an optimization compaction, this configuration is used to "
734734
+ "ensure the query timeliness of the read-optimized system table.");
735735

736+
public static final ConfigOption<MemorySize> COMPACTION_TOTAL_SIZE_THRESHOLD =
737+
key("compaction.total-size-threshold")
738+
.memoryType()
739+
.noDefaultValue()
740+
.withDescription(
741+
"When total size is smaller than this threshold, force a full compaction.");
742+
736743
public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
737744
key("compaction.min.file-num")
738745
.intType()
@@ -2364,6 +2371,11 @@ public Duration optimizedCompactionInterval() {
23642371
return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
23652372
}
23662373

2374+
@Nullable
2375+
public MemorySize compactionTotalSizeThreshold() {
2376+
return options.get(COMPACTION_TOTAL_SIZE_THRESHOLD);
2377+
}
2378+
23672379
public int numSortedRunStopTrigger() {
23682380
Integer stopTrigger = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
23692381
if (stopTrigger == null) {
@@ -2416,9 +2428,12 @@ public int sortedRunSizeRatio() {
24162428
return options.get(COMPACTION_SIZE_RATIO);
24172429
}
24182430

2419-
public OffPeakHours offPeakHours() {
2420-
return OffPeakHours.create(
2421-
options.get(COMPACT_OFFPEAK_START_HOUR), options.get(COMPACT_OFFPEAK_END_HOUR));
2431+
public int compactOffPeakStartHour() {
2432+
return options.get(COMPACT_OFFPEAK_START_HOUR);
2433+
}
2434+
2435+
public int compactOffPeakEndHour() {
2436+
return options.get(COMPACT_OFFPEAK_END_HOUR);
24222437
}
24232438

24242439
public int compactOffPeakRatio() {

paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ForceUpLevel0Compaction.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,24 @@
2121
import org.apache.paimon.compact.CompactUnit;
2222
import org.apache.paimon.mergetree.LevelSortedRun;
2323

24+
import javax.annotation.Nullable;
25+
2426
import java.util.List;
2527
import java.util.Optional;
28+
import java.util.concurrent.atomic.AtomicInteger;
2629

2730
/** A {@link CompactStrategy} to force compacting level 0 files. */
2831
public class ForceUpLevel0Compaction implements CompactStrategy {
2932

3033
private final UniversalCompaction universal;
34+
@Nullable private final Integer maxCompactInterval;
35+
@Nullable private final AtomicInteger compactTriggerCount;
3136

32-
public ForceUpLevel0Compaction(UniversalCompaction universal) {
37+
public ForceUpLevel0Compaction(
38+
UniversalCompaction universal, @Nullable Integer maxCompactInterval) {
3339
this.universal = universal;
40+
this.maxCompactInterval = maxCompactInterval;
41+
this.compactTriggerCount = maxCompactInterval == null ? null : new AtomicInteger(0);
3442
}
3543

3644
@Override
@@ -40,6 +48,26 @@ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
4048
return pick;
4149
}
4250

43-
return universal.forcePickL0(numLevels, runs);
51+
if (maxCompactInterval == null || compactTriggerCount == null) {
52+
return universal.forcePickL0(numLevels, runs);
53+
}
54+
55+
compactTriggerCount.getAndIncrement();
56+
if (compactTriggerCount.compareAndSet(maxCompactInterval, 0)) {
57+
if (LOG.isDebugEnabled()) {
58+
LOG.debug(
59+
"Universal compaction due to max lookup compaction interval {}.",
60+
maxCompactInterval);
61+
}
62+
return universal.forcePickL0(numLevels, runs);
63+
} else {
64+
if (LOG.isDebugEnabled()) {
65+
LOG.debug(
66+
"Skip universal compaction due to lookup compaction trigger count {} is less than the max interval {}.",
67+
compactTriggerCount.get(),
68+
maxCompactInterval);
69+
}
70+
return Optional.empty();
71+
}
4472
}
4573
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.mergetree.compact;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.annotation.VisibleForTesting;
23+
import org.apache.paimon.compact.CompactUnit;
24+
import org.apache.paimon.mergetree.LevelSortedRun;
25+
import org.apache.paimon.options.MemorySize;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import javax.annotation.Nullable;
31+
32+
import java.time.Duration;
33+
import java.util.List;
34+
import java.util.Optional;
35+
36+
/** Trigger full compaction. */
37+
public class FullCompactTrigger {
38+
39+
private static final Logger LOG = LoggerFactory.getLogger(FullCompactTrigger.class);
40+
41+
@Nullable private final Long fullCompactionInterval;
42+
@Nullable private final Long totalSizeThreshold;
43+
44+
@Nullable private Long lastFullCompaction;
45+
46+
public FullCompactTrigger(
47+
@Nullable Long fullCompactionInterval, @Nullable Long totalSizeThreshold) {
48+
this.fullCompactionInterval = fullCompactionInterval;
49+
this.totalSizeThreshold = totalSizeThreshold;
50+
}
51+
52+
@Nullable
53+
public static FullCompactTrigger create(CoreOptions options) {
54+
Duration interval = options.optimizedCompactionInterval();
55+
MemorySize threshold = options.compactionTotalSizeThreshold();
56+
if (interval == null && threshold == null) {
57+
return null;
58+
}
59+
return new FullCompactTrigger(
60+
interval == null ? null : interval.toMillis(),
61+
threshold == null ? null : threshold.getBytes());
62+
}
63+
64+
public Optional<CompactUnit> tryFullCompact(int numLevels, List<LevelSortedRun> runs) {
65+
int maxLevel = numLevels - 1;
66+
if (fullCompactionInterval != null) {
67+
if (lastFullCompaction == null
68+
|| currentTimeMillis() - lastFullCompaction > fullCompactionInterval) {
69+
LOG.debug("Universal compaction due to full compaction interval");
70+
updateLastFullCompaction();
71+
return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
72+
}
73+
}
74+
if (totalSizeThreshold != null) {
75+
long totalSize = 0;
76+
for (LevelSortedRun run : runs) {
77+
totalSize += run.run().totalSize();
78+
}
79+
if (totalSize < totalSizeThreshold) {
80+
return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
81+
}
82+
}
83+
return Optional.empty();
84+
}
85+
86+
public void updateLastFullCompaction() {
87+
lastFullCompaction = currentTimeMillis();
88+
}
89+
90+
@VisibleForTesting
91+
long currentTimeMillis() {
92+
return System.currentTimeMillis();
93+
}
94+
}

paimon-api/src/main/java/org/apache/paimon/OffPeakHours.java renamed to paimon-core/src/main/java/org/apache/paimon/mergetree/compact/OffPeakHours.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,55 +16,52 @@
1616
* limitations under the License.
1717
*/
1818

19-
package org.apache.paimon;
19+
package org.apache.paimon.mergetree.compact;
20+
21+
import org.apache.paimon.CoreOptions;
22+
23+
import javax.annotation.Nullable;
2024

2125
import java.time.LocalDateTime;
2226

23-
/** OffPeakHours. */
27+
/** OffPeakHours to control compaction ratio by hours. */
2428
public abstract class OffPeakHours {
2529

26-
public abstract boolean isOffPeak();
27-
2830
public abstract boolean isOffPeak(int targetHour);
2931

30-
public static final OffPeakHours DISABLED =
31-
new OffPeakHours() {
32-
@Override
33-
public boolean isOffPeak() {
34-
return false;
35-
}
32+
public abstract int currentRatio();
3633

37-
@Override
38-
public boolean isOffPeak(int targetHour) {
39-
return false;
40-
}
41-
};
34+
@Nullable
35+
public static OffPeakHours create(CoreOptions options) {
36+
return create(
37+
options.compactOffPeakStartHour(),
38+
options.compactOffPeakEndHour(),
39+
options.compactOffPeakRatio());
40+
}
4241

43-
public static OffPeakHours create(int startHour, int endHour) {
42+
@Nullable
43+
public static OffPeakHours create(int startHour, int endHour, int compactOffPeakRatio) {
4444
if (startHour == -1 && endHour == -1) {
45-
return DISABLED;
45+
return null;
4646
}
4747

4848
if (startHour == endHour) {
49-
return DISABLED;
49+
return null;
5050
}
5151

52-
return new OffPeakHoursImpl(startHour, endHour);
52+
return new OffPeakHoursImpl(startHour, endHour, compactOffPeakRatio);
5353
}
5454

5555
private static class OffPeakHoursImpl extends OffPeakHours {
5656

5757
private final int startHour;
5858
private final int endHour;
59+
private final int compactOffPeakRatio;
5960

60-
OffPeakHoursImpl(int startHour, int endHour) {
61+
OffPeakHoursImpl(int startHour, int endHour, int compactOffPeakRatio) {
6162
this.startHour = startHour;
6263
this.endHour = endHour;
63-
}
64-
65-
@Override
66-
public boolean isOffPeak() {
67-
return isOffPeak(LocalDateTime.now().getHour());
64+
this.compactOffPeakRatio = compactOffPeakRatio;
6865
}
6966

7067
@Override
@@ -74,5 +71,10 @@ public boolean isOffPeak(int targetHour) {
7471
}
7572
return targetHour < endHour || startHour <= targetHour;
7673
}
74+
75+
@Override
76+
public int currentRatio() {
77+
return isOffPeak(LocalDateTime.now().getHour()) ? compactOffPeakRatio : 0;
78+
}
7779
}
7880
}

0 commit comments

Comments
 (0)