Skip to content

Commit 7103abb

Browse files
author
gavingaozhangmin
committed
Support pass threshold parameters
1 parent 97e3bb1 commit 7103abb

File tree

12 files changed

+149
-35
lines changed

12 files changed

+149
-35
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -288,16 +288,28 @@ public void enableForceGC() {
288288
if (forceGarbageCollection.compareAndSet(false, true)) {
289289
LOG.info("Forced garbage collection triggered by thread: {}", Thread.currentThread().getName());
290290
triggerGC(true, suspendMajorCompaction.get(),
291-
suspendMinorCompaction.get());
291+
suspendMinorCompaction.get(), majorCompactionThreshold, minorCompactionThreshold,
292+
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
292293
}
293294
}
294295

295-
public void enableForceGC(Boolean forceMajor, Boolean forceMinor) {
296+
public void enableForceGC(Boolean forceMajor, Boolean forceMinor,
297+
Double majorCompactionThreshold, Double minorCompactionThreshold,
298+
Long majorCompactionMaxTimeMillis, Long minorCompactionMaxTimeMillis) {
296299
if (forceGarbageCollection.compareAndSet(false, true)) {
297-
LOG.info("Forced garbage collection triggered by thread: {}, forceMajor: {}, forceMinor: {}",
298-
Thread.currentThread().getName(), forceMajor, forceMinor);
300+
LOG.info("Forced garbage collection triggered by thread: {}, forceMajor: {}, forceMinor: {}, "
301+
+ "majorCompactionThreshold :{}, minorCompactionThreshold: {}, "
302+
+ "majorCompactionMaxTimeMillis: {}, minorCompactionMaxTimeMillis: {}",
303+
Thread.currentThread().getName(), forceMajor, forceMinor, majorCompactionThreshold,
304+
minorCompactionThreshold, majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
299305
triggerGC(true, forceMajor == null ? suspendMajorCompaction.get() : !forceMajor,
300-
forceMinor == null ? suspendMinorCompaction.get() : !forceMinor);
306+
forceMinor == null ? suspendMinorCompaction.get() : !forceMinor,
307+
majorCompactionThreshold == null ? this.majorCompactionThreshold : majorCompactionThreshold,
308+
minorCompactionThreshold == null ? this.minorCompactionThreshold : minorCompactionThreshold,
309+
majorCompactionMaxTimeMillis == null
310+
? this.majorCompactionMaxTimeMillis : majorCompactionMaxTimeMillis,
311+
minorCompactionMaxTimeMillis == null
312+
? this.minorCompactionMaxTimeMillis : minorCompactionMaxTimeMillis);
301313
}
302314
}
303315

@@ -310,9 +322,14 @@ public void disableForceGC() {
310322

311323
Future<?> triggerGC(final boolean force,
312324
final boolean suspendMajor,
313-
final boolean suspendMinor) {
325+
final boolean suspendMinor,
326+
final double majorCompactionThreshold,
327+
final double minorCompactionThreshold,
328+
final long majorCompactionMaxTimeMillis,
329+
final long minorCompactionMaxTimeMillis) {
314330
return gcExecutor.submit(() -> {
315-
runWithFlags(force, suspendMajor, suspendMinor);
331+
runWithFlags(force, suspendMajor, suspendMinor, majorCompactionThreshold, minorCompactionThreshold,
332+
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
316333
});
317334
}
318335

@@ -322,7 +339,8 @@ Future<?> triggerGC() {
322339
final boolean suspendMinor = suspendMinorCompaction.get();
323340

324341
return gcExecutor.submit(() -> {
325-
runWithFlags(force, suspendMajor, suspendMinor);
342+
runWithFlags(force, suspendMajor, suspendMinor, majorCompactionThreshold, minorCompactionThreshold,
343+
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
326344
});
327345
}
328346

@@ -392,7 +410,8 @@ public void run() {
392410
boolean suspendMajor = suspendMajorCompaction.get();
393411
boolean suspendMinor = suspendMinorCompaction.get();
394412

395-
runWithFlags(force, suspendMajor, suspendMinor);
413+
runWithFlags(force, suspendMajor, suspendMinor, majorCompactionThreshold, minorCompactionThreshold,
414+
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
396415

397416
if (force) {
398417
// only set force to false if it had been true when the garbage
@@ -401,7 +420,9 @@ public void run() {
401420
}
402421
}
403422

404-
public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMinor) {
423+
public void runWithFlags(boolean force, boolean suspendMajor, boolean suspendMinor,
424+
double majorCompactionThreshold, double minorCompactionThreshold,
425+
long majorCompactionMaxTimeMillis, long minorCompactionMaxTimeMillis) {
405426
long threadStart = MathUtils.nowInNano();
406427
if (force) {
407428
LOG.info("Garbage collector thread forced to perform GC before expiry of wait time.");

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,11 @@ public void forceGC() {
260260
}
261261

262262
@Override
263-
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
264-
gcThread.enableForceGC(forceMajor, forceMinor);
263+
public void forceGC(Boolean forceMajor, Boolean forceMinor,
264+
Double majorCompactionThreshold, Double minorCompactionThreshold,
265+
Long majorCompactionMaxTimeMillis, Long minorCompactionMaxTimeMillis) {
266+
gcThread.enableForceGC(forceMajor, forceMinor, majorCompactionThreshold, minorCompactionThreshold,
267+
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
265268
}
266269

267270
@Override

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,9 @@ default void forceGC() {
232232
/**
233233
* Force trigger Garbage Collection with forceMajor or forceMinor parameter.
234234
*/
235-
default void forceGC(Boolean forceMajor, Boolean forceMinor) {
235+
default void forceGC(Boolean forceMajor, Boolean forceMinor,
236+
Double majorCompactionThreshold, Double minorCompactionThreshold,
237+
Long majorCompactionMaxTimeMillis, Long minorCompactionMaxTimeMillis) {
236238
return;
237239
}
238240

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,11 @@ public void forceGC() {
367367
}
368368

369369
@Override
370-
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
371-
interleavedLedgerStorage.forceGC(forceMajor, forceMinor);
370+
public void forceGC(Boolean forceMajor, Boolean forceMinor,
371+
Double majorCompactionThreshold, Double minorCompactionThreshold,
372+
Long majorCompactionMaxTimeMillis, Long minorCompactionMaxTimeMillis) {
373+
interleavedLedgerStorage.forceGC(forceMajor, forceMinor, majorCompactionThreshold, minorCompactionThreshold,
374+
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
372375
}
373376

374377
@Override

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,8 +506,12 @@ public void forceGC() {
506506
}
507507

508508
@Override
509-
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
510-
ledgerStorageList.stream().forEach(s -> s.forceGC(forceMajor, forceMinor));
509+
public void forceGC(Boolean forceMajor, Boolean forceMinor,
510+
Double majorCompactionThreshold, Double minorCompactionThreshold,
511+
Long majorCompactionMaxTimeMillis, Long minorCompactionMaxTimeMillis) {
512+
ledgerStorageList.stream().forEach(s -> s.forceGC(forceMajor, forceMinor,
513+
majorCompactionThreshold, minorCompactionThreshold,
514+
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis));
511515
}
512516

513517
@Override

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -265,8 +265,11 @@ public void forceGC() {
265265
}
266266

267267
@Override
268-
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
269-
gcThread.enableForceGC(forceMajor, forceMinor);
268+
public void forceGC(Boolean forceMajor, Boolean forceMinor,
269+
Double majorCompactionThreshold, Double minorCompactionThreshold,
270+
Long majorCompactionMaxTimeMillis, Long minorCompactionMaxTimeMillis) {
271+
gcThread.enableForceGC(forceMajor, forceMinor, majorCompactionThreshold, minorCompactionThreshold,
272+
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
270273
}
271274

272275
@Override

bookkeeper-server/src/main/java/org/apache/bookkeeper/server/http/service/TriggerGCService.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,28 @@ public HttpServiceResponse handle(HttpServiceRequest request) throws Exception {
7171
Map<String, Object> configMap = JsonUtil.fromJson(requestBody, HashMap.class);
7272
Boolean forceMajor = (Boolean) configMap.getOrDefault("forceMajor", null);
7373
Boolean forceMinor = (Boolean) configMap.getOrDefault("forceMinor", null);
74-
bookieServer.getBookie().getLedgerStorage().forceGC(forceMajor, forceMinor);
74+
Double majorCompactionThreshold = (Double) configMap.getOrDefault("majorCompactionThreshold",
75+
conf.getMajorCompactionThreshold());
76+
Double minorCompactionThreshold = (Double) configMap.getOrDefault("minorCompactionThreshold",
77+
conf.getMinorCompactionThreshold());
78+
Long majorCompactionMaxTimeMillis = configMap.get(
79+
"majorCompactionMaxTimeMillis") == null ? conf.getMajorCompactionMaxTimeMillis()
80+
: (long) (int) configMap.get("majorCompactionMaxTimeMillis");
81+
Long minorCompactionMaxTimeMillis = configMap.get(
82+
"minorCompactionMaxTimeMillis") == null ? conf.getMinorCompactionMaxTimeMillis()
83+
: (long) (int) configMap.get("majorCompactionMaxTimeMillis");
84+
85+
if (majorCompactionThreshold > 1.0f || majorCompactionThreshold < 0
86+
|| minorCompactionThreshold > 1.0f || minorCompactionThreshold < 0
87+
|| minorCompactionThreshold >= majorCompactionThreshold) {
88+
response.setCode(HttpServer.StatusCode.BAD_REQUEST);
89+
response.setBody("Bad request parameters");
90+
return response;
91+
}
92+
93+
bookieServer.getBookie().getLedgerStorage().forceGC(forceMajor, forceMinor,
94+
majorCompactionThreshold, minorCompactionThreshold,
95+
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
7596
}
7697

7798
String output = "Triggered GC on BookieServer: " + bookieServer.toString();

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,8 @@ public void testForceGarbageCollectionWhenDiskIsFull(boolean isForceCompactionAl
362362
// disable forceMajor and forceMinor
363363
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
364364
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
365-
getGCThread().triggerGC(true, true, true).get();
365+
getGCThread().triggerGC(true, true, true, majorCompactionThreshold, minorCompactionThreshold,
366+
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
366367
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
367368
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
368369
assertEquals(majorCompactionCntBeforeGC, majorCompactionCntAfterGC);
@@ -371,7 +372,8 @@ public void testForceGarbageCollectionWhenDiskIsFull(boolean isForceCompactionAl
371372
// enable forceMajor and forceMinor
372373
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
373374
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
374-
getGCThread().triggerGC(true, false, false).get();
375+
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
376+
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
375377
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
376378
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
377379
assertEquals(majorCompactionCntBeforeGC + 1, majorCompactionCntAfterGC);
@@ -380,7 +382,8 @@ public void testForceGarbageCollectionWhenDiskIsFull(boolean isForceCompactionAl
380382
// enable forceMajor and disable forceMinor
381383
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
382384
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
383-
getGCThread().triggerGC(true, false, true).get();
385+
getGCThread().triggerGC(true, false, true, majorCompactionThreshold, minorCompactionThreshold,
386+
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
384387
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
385388
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
386389
assertEquals(majorCompactionCntBeforeGC + 1, majorCompactionCntAfterGC);
@@ -389,7 +392,8 @@ public void testForceGarbageCollectionWhenDiskIsFull(boolean isForceCompactionAl
389392
// disable forceMajor and enable forceMinor
390393
majorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
391394
minorCompactionCntBeforeGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
392-
getGCThread().triggerGC(true, true, false).get();
395+
getGCThread().triggerGC(true, true, false, majorCompactionThreshold, minorCompactionThreshold,
396+
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
393397
majorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMajorCompactionCounter();
394398
minorCompactionCntAfterGC = getGCThread().getGarbageCollectionStatus().getMinorCompactionCounter();
395399
assertEquals(majorCompactionCntBeforeGC, majorCompactionCntAfterGC);
@@ -721,7 +725,8 @@ public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception {
721725
}
722726

723727
LOG.info("Finished deleting the ledgers contains most entries.");
724-
getGCThread().triggerGC(true, false, false).get();
728+
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
729+
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
725730

726731
assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime);
727732
assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
@@ -740,17 +745,20 @@ public void testMinorCompactionWithEntryLogPerLedgerEnabled() throws Exception {
740745
// Now, let's mark E1 as flushed, as its ledger L1 has been deleted already. In this case, the GC algorithm
741746
// should consider it for deletion.
742747
((DefaultEntryLogger) getGCThread().entryLogger).recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(1L);
743-
getGCThread().triggerGC(true, false, false).get();
748+
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
749+
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
744750
assertTrue("Found entry log file 1.log that should have been compacted in ledgerDirectory: "
745751
+ tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 1));
746752

747753
// Once removed the ledger L0, then deleting E0 is fine (only if it has been flushed).
748754
bkc.deleteLedger(lhs[0].getId());
749-
getGCThread().triggerGC(true, false, false).get();
755+
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
756+
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
750757
assertTrue("Found entry log file 0.log that should not have been compacted in ledgerDirectory: "
751758
+ tmpDirs.getDirs().get(0), TestUtils.hasAllLogFiles(tmpDirs.getDirs().get(0), 0));
752759
((DefaultEntryLogger) getGCThread().entryLogger).recentlyCreatedEntryLogsStatus.flushRotatedEntryLog(0L);
753-
getGCThread().triggerGC(true, false, false).get();
760+
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
761+
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
754762
assertTrue("Found entry log file 0.log that should have been compacted in ledgerDirectory: "
755763
+ tmpDirs.getDirs().get(0), TestUtils.hasNoneLogFiles(tmpDirs.getDirs().get(0), 0));
756764
}
@@ -868,7 +876,8 @@ public void testMinorCompactionWithNoWritableLedgerDirsButIsForceGCAllowWhenNoSp
868876
bkc.deleteLedger(lhs[2].getId());
869877

870878
LOG.info("Finished deleting the ledgers contains most entries.");
871-
getGCThread().triggerGC(true, false, false).get();
879+
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
880+
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis()).get();
872881

873882
// after garbage collection, major compaction should not be executed
874883
assertEquals(lastMajorCompactionTime, getGCThread().lastMajorCompactionTime);
@@ -1407,7 +1416,8 @@ public void testCancelledCompactionWhenShuttingDown() throws Exception {
14071416
bkc.deleteLedger(lhs[2].getId());
14081417
LOG.info("Finished deleting the ledgers contains most entries.");
14091418

1410-
getGCThread().triggerGC(true, false, false);
1419+
getGCThread().triggerGC(true, false, false, majorCompactionThreshold, minorCompactionThreshold,
1420+
baseConf.getMajorCompactionMaxTimeMillis(), baseConf.getMinorCompactionMaxTimeMillis());
14111421
getGCThread().throttler.cancelledAcquire();
14121422
waitUntilTrue(() -> {
14131423
try {

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,9 @@ public void testCompactionWithFileSizeCheck() throws Exception {
295295

296296
storage.deleteLedger(1);
297297
// only logId 1 will be compacted.
298-
gcThread.runWithFlags(true, true, false);
298+
gcThread.runWithFlags(true, true, false, conf.getMajorCompactionThreshold(),
299+
conf.getMinorCompactionThreshold(), conf.getMajorCompactionMaxTimeMillis(),
300+
conf.getMinorCompactionMaxTimeMillis());
299301

300302
// logId1 and logId2 should be compacted
301303
assertFalse(entryLogger.logExists(logId1));
@@ -362,7 +364,9 @@ public void testCompactionWithoutFileSizeCheck() throws Exception {
362364
assertTrue(entryLogMetaMap.containsKey(logId2));
363365
assertTrue(entryLogger.logExists(logId3));
364366

365-
gcThread.runWithFlags(true, true, false);
367+
gcThread.runWithFlags(true, true, false, conf.getMajorCompactionThreshold(),
368+
conf.getMinorCompactionThreshold(), conf.getMajorCompactionMaxTimeMillis(),
369+
conf.getMinorCompactionMaxTimeMillis());
366370

367371
assertTrue(entryLogger.logExists(logId1));
368372
assertTrue(entryLogger.logExists(logId2));

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/MockLedgerStorage.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,12 @@ public void forceGC() {
271271
}
272272

273273
@Override
274-
public void forceGC(Boolean forceMajor, Boolean forceMinor) {
275-
CompactableLedgerStorage.super.forceGC(forceMajor, forceMinor);
274+
public void forceGC(Boolean forceMajor, Boolean forceMinor,
275+
Double majorCompactionThreshold, Double minorCompactionThreshold,
276+
Long majorCompactionMaxTimeMillis, Long minorCompactionMaxTimeMillis) {
277+
CompactableLedgerStorage.super.forceGC(forceMajor, forceMinor,
278+
majorCompactionThreshold, minorCompactionThreshold,
279+
majorCompactionMaxTimeMillis, minorCompactionMaxTimeMillis);
276280
}
277281

278282
public void suspendMinorGC() {

0 commit comments

Comments
 (0)