Skip to content

Support pass threshold parameters #3778

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,22 @@ public void enableForceGC() {
if (forceGarbageCollection.compareAndSet(false, true)) {
LOG.info("Forced garbage collection triggered by thread: {}", Thread.currentThread().getName());
triggerGC(true, suspendMajorCompaction.get(),
suspendMinorCompaction.get());
suspendMinorCompaction.get(), majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
}
}

public void enableForceGC(boolean forceMajor, boolean forceMinor) {
public void enableForceGC(boolean forceMajor, boolean forceMinor,
double majorCompactionThreshold, double minorCompactionThreshold,
long majorCompactionMaxTimeMillis, long minorCompactionMaxTimeMillis) {
if (forceGarbageCollection.compareAndSet(false, true)) {
LOG.info("Forced garbage collection triggered by thread: {}, forceMajor: {}, forceMinor: {}",
Thread.currentThread().getName(), forceMajor, forceMinor);
triggerGC(true, !forceMajor, !forceMinor);
LOG.info("Forced garbage collection triggered by thread: {}, forceMajor: {}, forceMinor: {}, "
+ "majorCompactionThreshold :{}, minorCompactionThreshold: {}, "
+ "majorCompactionMaxTimeMillis: {}, minorCompactionMaxTimeMillis: {}",
Thread.currentThread().getName(), forceMajor, forceMinor, majorCompactionThreshold,
minorCompactionThreshold, majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
triggerGC(true, !forceMajor, !forceMinor, majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
}
}

Expand All @@ -309,9 +316,14 @@ public void disableForceGC() {

Future<?> triggerGC(final boolean force,
final boolean suspendMajor,
final boolean suspendMinor) {
final boolean suspendMinor,
final double majorCompactionThreshold,
final double minorCompactionThreshold,
final long majorCompactionMaxTimeMillis,
final long minorCompactionMaxTimeMillis) {
return gcExecutor.submit(() -> {
runWithFlags(force, suspendMajor, suspendMinor);
runWithFlags(force, suspendMajor, suspendMinor, majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
});
}

Expand All @@ -321,7 +333,8 @@ Future<?> triggerGC() {
final boolean suspendMinor = suspendMinorCompaction.get();

return gcExecutor.submit(() -> {
runWithFlags(force, suspendMajor, suspendMinor);
runWithFlags(force, suspendMajor, suspendMinor, majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
});
}

Expand Down Expand Up @@ -391,7 +404,8 @@ public void run() {
boolean suspendMajor = suspendMajorCompaction.get();
boolean suspendMinor = suspendMinorCompaction.get();

runWithFlags(force, suspendMajor, suspendMinor);
runWithFlags(force, suspendMajor, suspendMinor, majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);

if (force) {
// only set force to false if it had been true when the garbage
Expand All @@ -400,7 +414,9 @@ public void run() {
}
}

public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMinor) {
public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMinor,
double majorCompactionThreshold, double minorCompactionThreshold,
long majorCompactionMaxTimeMillis, long minorCompactionMaxTimeMillis) {
long threadStart = MathUtils.nowInNano();
if (force) {
LOG.info("Garbage collector thread forced to perform GC before expiry of wait time.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,11 @@ public void forceGC() {
}

@Override
public void forceGC(boolean forceMajor, boolean forceMinor) {
gcThread.enableForceGC(forceMajor, forceMinor);
public void forceGC(boolean forceMajor, boolean forceMinor,
double majorCompactionThreshold, double minorCompactionThreshold,
long majorCompactionMaxTimeMillis, long minorCompactionMaxTimeMillis) {
gcThread.enableForceGC(forceMajor, forceMinor, majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ default void forceGC() {
/**
* Force trigger Garbage Collection with forceMajor or forceMinor parameter.
*/
default void forceGC(boolean forceMajor, boolean forceMinor) {
default void forceGC(boolean forceMajor, boolean forceMinor,
double majorCompactionThreshold, double minorCompactionThreshold,
long majorCompactionMaxTimeMillis, long minorCompactionMaxTimeMillis) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,11 @@ public void forceGC() {
}

@Override
public void forceGC(boolean forceMajor, boolean forceMinor) {
interleavedLedgerStorage.forceGC(forceMajor, forceMinor);
public void forceGC(boolean forceMajor, boolean forceMinor,
double majorCompactionThreshold, double minorCompactionThreshold,
long majorCompactionMaxTimeMillis, long minorCompactionMaxTimeMillis) {
interleavedLedgerStorage.forceGC(forceMajor, forceMinor, majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,12 @@ public void forceGC() {
}

@Override
public void forceGC(boolean forceMajor, boolean forceMinor) {
ledgerStorageList.stream().forEach(s -> s.forceGC(forceMajor, forceMinor));
public void forceGC(boolean forceMajor, boolean forceMinor,
double majorCompactionThreshold, double minorCompactionThreshold,
long majorCompactionMaxTimeMillis, long minorCompactionMaxTimeMillis) {
ledgerStorageList.stream().forEach(s -> s.forceGC(forceMajor, forceMinor,
majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,11 @@ public void forceGC() {
}

@Override
public void forceGC(boolean forceMajor, boolean forceMinor) {
gcThread.enableForceGC(forceMajor, forceMinor);
public void forceGC(boolean forceMajor, boolean forceMinor,
double majorCompactionThreshold, double minorCompactionThreshold,
long majorCompactionMaxTimeMillis, long minorCompactionMaxTimeMillis) {
gcThread.enableForceGC(forceMajor, forceMinor, majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,29 @@ public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {

forceMajor = Boolean.parseBoolean(configMap.getOrDefault("forceMajor", forceMajor).toString());
forceMinor = Boolean.parseBoolean(configMap.getOrDefault("forceMinor", forceMinor).toString());
ledgerStorage.forceGC(forceMajor, forceMinor);

double majorCompactionThreshold = (double) configMap.getOrDefault("majorCompactionThreshold",
conf.getMajorCompactionThreshold());
double minorCompactionThreshold = (double) configMap.getOrDefault("minorCompactionThreshold",
conf.getMinorCompactionThreshold());
long majorCompactionMaxTimeMillis = configMap.get(
"majorCompactionMaxTimeMillis") == null ? conf.getMajorCompactionMaxTimeMillis()
: (long) (int) configMap.get("majorCompactionMaxTimeMillis");
long minorCompactionMaxTimeMillis = configMap.get(
"minorCompactionMaxTimeMillis") == null ? conf.getMinorCompactionMaxTimeMillis()
: (long) (int) configMap.get("majorCompactionMaxTimeMillis");

if (majorCompactionThreshold > 1.0f || majorCompactionThreshold < 0
|| minorCompactionThreshold > 1.0f || minorCompactionThreshold < 0
|| minorCompactionThreshold >= majorCompactionThreshold) {
response.setCode(HttpServer.StatusCode.BAD_REQUEST);
response.setBody("Bad request parameters");
return response;
}

ledgerStorage.forceGC(forceMajor, forceMinor,
majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
}

String output = "Triggered GC on BookieServer: " + bookieServer.getBookieId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,8 @@ public void testForceGarbageCollectionWhenDiskIsFull(boolean isForceCompactionAl
// disable forceMajor and forceMinor
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
getGCThread().triggerGC(true, true, true).get();
getGCThread().triggerGC(true, true, true, majorCompactionThreshold, minorCompactionThreshold,
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
assertEquals(majorCompactionCntBeforeGC, majorCompactionCntAfterGC);
Expand All @@ -371,7 +372,8 @@ public void testForceGarbageCollectionWhenDiskIsFull(boolean isForceCompactionAl
// enable forceMajor and forceMinor
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
getGCThread().triggerGC(true, false, false).get();
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
assertEquals(majorCompactionCntBeforeGC + 1, majorCompactionCntAfterGC);
Expand All @@ -380,7 +382,8 @@ public void testForceGarbageCollectionWhenDiskIsFull(boolean isForceCompactionAl
// enable forceMajor and disable forceMinor
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
getGCThread().triggerGC(true, false, true).get();
getGCThread().triggerGC(true, false, true, majorCompactionThreshold, minorCompactionThreshold,
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
assertEquals(majorCompactionCntBeforeGC + 1, majorCompactionCntAfterGC);
Expand All @@ -389,7 +392,8 @@ public void testForceGarbageCollectionWhenDiskIsFull(boolean isForceCompactionAl
// disable forceMajor and enable forceMinor
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
getGCThread().triggerGC(true, true, false).get();
getGCThread().triggerGC(true, true, false, majorCompactionThreshold, minorCompactionThreshold,
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
assertEquals(majorCompactionCntBeforeGC, majorCompactionCntAfterGC);
Expand Down Expand Up @@ -721,7 +725,8 @@ public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception {
}

LOG.info("Finished deleting the ledgers contains most entries.");
getGCThread().triggerGC(true, false, false).get();
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();

assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime);
assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
Expand All @@ -740,17 +745,20 @@ public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception {
// Now, let's mark E1 as flushed, as its ledger L1 has been deleted already. In this case, the GC algorithm
// should consider it for deletion.
((DefaultEntryLogger) getGCThread().entryLogger).recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L);
getGCThread().triggerGC(true, false, false).get();
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
assertTrue("Found entry log file 1.log that should have been compacted in ledgerDirectory: "
+ tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 1));

// Once removed the ledger L0, then deleting E0 is fine (only if it has been flushed).
bkc.deleteLedger(lhs[0].getId());
getGCThread().triggerGC(true, false, false).get();
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
assertTrue("Found entry log file 0.log that should not have been compacted in ledgerDirectory: "
+ tmpDirs.getDirs().get(0), TestUtils.hasAllLogFiles(tmpDirs.getDirs().get(0), 0));
((DefaultEntryLogger) getGCThread().entryLogger).recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L);
getGCThread().triggerGC(true, false, false).get();
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
assertTrue("Found entry log file 0.log that should have been compacted in ledgerDirectory: "
+ tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 0));
}
Expand Down Expand Up @@ -868,7 +876,8 @@ public void testMinorCompactionWithNoWritableLedgerDirsButIsForceGCAllowWhenNoSp
bkc.deleteLedger(lhs[2].getId());

LOG.info("Finished deleting the ledgers contains most entries.");
getGCThread().triggerGC(true, false, false).get();
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();

// after garbage collection, major compaction should not be executed
assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime);
Expand Down Expand Up @@ -1407,7 +1416,8 @@ public void testCancelledCompactionWhenShuttingDown() throws Exception {
bkc.deleteLedger(lhs[2].getId());
LOG.info("Finished deleting the ledgers contains most entries.");

getGCThread().triggerGC(true, false, false);
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis());
getGCThread().throttler.cancelledAcquire();
waitUntilTrue(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,9 @@ public void testCompactionWithFileSizeCheck() throws Exception {

storage.deleteLedger(1);
// only logId 1 will be compacted.
gcThread.runWithFlags(true, true, false);
gcThread.runWithFlags(true, true, false, conf.getMajorCompactionThreshold(),
conf.getMinorCompactionThreshold(), conf.getMajorCompactionMaxTimeMillis(),
conf.getMinorCompactionMaxTimeMillis());

// logId1 and logId2 should be compacted
assertFalse(entryLogger.logExists(logId1));
Expand Down Expand Up @@ -362,7 +364,9 @@ public void testCompactionWithoutFileSizeCheck() throws Exception {
assertTrue(entryLogMetaMap.containsKey(logId2));
assertTrue(entryLogger.logExists(logId3));

gcThread.runWithFlags(true, true, false);
gcThread.runWithFlags(true, true, false, conf.getMajorCompactionThreshold(),
conf.getMinorCompactionThreshold(), conf.getMajorCompactionMaxTimeMillis(),
conf.getMinorCompactionMaxTimeMillis());

assertTrue(entryLogger.logExists(logId1));
assertTrue(entryLogger.logExists(logId2));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,12 @@ public void forceGC() {
}

@Override
public void forceGC(boolean forceMajor, boolean forceMinor) {
CompactableLedgerStorage.super.forceGC(forceMajor, forceMinor);
public void forceGC(boolean forceMajor, boolean forceMinor,
double majorCompactionThreshold, double minorCompactionThreshold,
long majorCompactionMaxTimeMillis, long minorCompactionMaxTimeMillis) {
CompactableLedgerStorage.super.forceGC(forceMajor, forceMinor,
majorCompactionThreshold, minorCompactionThreshold,
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
}

public void suspendMinorGC() {
Expand Down
Loading