
Commit 84b8fec

KAFKA-14486 Move LogCleanerManager to storage module (apache#19216)
Move LogCleanerManager and related classes to storage module and rewrite in Java.

Reviewers: TengYao Chi <[email protected]>, Jun Rao <[email protected]>, Mickael Maison <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent eb88e78 commit 84b8fec

18 files changed, +1347 -935 lines
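
The LogCleaner.scala diff below shows the Scala side now handing Java types straight to the rewritten manager (`new LogCleanerManager(logDirs.asJava, logs, logDirFailureChannel)`). As a rough illustration only, a Java caller constructing the migrated manager might look like the sketch below; the parameter types are inferred from that call site and the log-dir path is hypothetical.

    import java.io.File;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.storage.internals.log.LogCleanerManager;
    import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
    import org.apache.kafka.storage.internals.log.UnifiedLog;

    public class CleanerManagerConstructionSketch {
        public static void main(String[] args) {
            // The rewritten manager takes plain Java types: a list of log dirs and a
            // concurrent map of logs, mirroring the Scala call site in the diff below.
            List<File> logDirs = List.of(new File("/tmp/kafka-logs"));   // hypothetical dir
            ConcurrentMap<TopicPartition, UnifiedLog> logs = new ConcurrentHashMap<>();
            LogDirFailureChannel failureChannel = new LogDirFailureChannel(logDirs.size());
            LogCleanerManager cleanerManager = new LogCleanerManager(logDirs, logs, failureChannel);
        }
    }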

Diff for: checkstyle/import-control-storage.xml (+1)
@@ -80,6 +80,7 @@
   <subpackage name="storage.internals">
     <allow pkg="kafka.server"/>
     <allow pkg="kafka.log"/>
+    <allow pkg="kafka.utils"/>
     <allow pkg="com.fasterxml.jackson" />
     <allow pkg="com.yammer.metrics.core" />
     <allow pkg="org.apache.kafka.common" />

Diff for: core/src/main/scala/kafka/log/LogCleaner.scala (+14 -48)
@@ -21,11 +21,11 @@ import java.io.{File, IOException}
 import java.lang.{Long => JLong}
 import java.nio._
 import java.util
-import java.util.Date
+import java.util.{Date, Optional}
 import java.util.concurrent.TimeUnit
 import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
 import kafka.server.{BrokerReconfigurable, KafkaConfig}
-import kafka.utils.{Logging, Pool}
+import kafka.utils.Logging
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
@@ -36,12 +36,13 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleaningAbortedException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex, UnifiedLog}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleanerManager, LogCleaningAbortedException, LogCleaningException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, LogToClean, OffsetMap, PreCleanStats, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex, UnifiedLog}
 import org.apache.kafka.storage.internals.utils.Throttler
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Iterable, Seq, Set, mutable}
+import scala.jdk.OptionConverters.{RichOption, RichOptional}
 import scala.util.control.ControlThrowable
 
 /**
@@ -93,13 +94,13 @@ import scala.util.control.ControlThrowable
  *
  * @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated.
  * @param logDirs The directories where offset checkpoints reside
- * @param logs The pool of logs
+ * @param logs The map of logs
  * @param logDirFailureChannel The channel used to add offline log dirs that may be encountered when cleaning the log
  * @param time A way to control the passage of time
  */
 class LogCleaner(initialConfig: CleanerConfig,
                  val logDirs: Seq[File],
-                 val logs: Pool[TopicPartition, UnifiedLog],
+                 val logs: util.concurrent.ConcurrentMap[TopicPartition, UnifiedLog],
                  val logDirFailureChannel: LogDirFailureChannel,
                  time: Time = Time.SYSTEM) extends Logging with BrokerReconfigurable {
   // Visible for test.
@@ -109,7 +110,7 @@ class LogCleaner(initialConfig: CleanerConfig,
   @volatile private var config = initialConfig
 
   /* for managing the state of partitions being cleaned. package-private to allow access in tests */
-  private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel)
+  private[log] val cleanerManager = new LogCleanerManager(logDirs.asJava, logs, logDirFailureChannel)
 
   /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
   private[log] val throttler = new Throttler(config.maxIoBytesPerSecond, 300, "cleaner-io", "bytes", time)
@@ -249,7 +250,7 @@ class LogCleaner(initialConfig: CleanerConfig,
    * @param partitionToRemove The topicPartition to be removed, default none
    */
   def updateCheckpoints(dataDir: File, partitionToRemove: Option[TopicPartition] = None): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, partitionToRemove = partitionToRemove)
+    cleanerManager.updateCheckpoints(dataDir, Optional.empty(), partitionToRemove.toJava)
   }
 
   /**
@@ -300,7 +301,7 @@ class LogCleaner(initialConfig: CleanerConfig,
    * @param topicPartitions The collection of topicPartitions to be resumed cleaning
    */
   def resumeCleaning(topicPartitions: Iterable[TopicPartition]): Unit = {
-    cleanerManager.resumeCleaning(topicPartitions)
+    cleanerManager.resumeCleaning(topicPartitions.toList.asJava)
   }
 
   /**
@@ -314,7 +315,7 @@ class LogCleaner(initialConfig: CleanerConfig,
    * @return A boolean indicating whether the work has completed before timeout
    */
   def awaitCleaned(topicPartition: TopicPartition, offset: Long, maxWaitMs: Long = 60000L): Boolean = {
-    def isCleaned = cleanerManager.allCleanerCheckpoints.get(topicPartition).fold(false)(_ >= offset)
+    def isCleaned = Option(cleanerManager.allCleanerCheckpoints.get(topicPartition)).fold(false)(_ >= offset)
     var remainingWaitMs = maxWaitMs
     while (!isCleaned && remainingWaitMs > 0) {
       val sleepTime = math.min(100, remainingWaitMs)
@@ -331,7 +332,7 @@ class LogCleaner(initialConfig: CleanerConfig,
    * @return A list of log partitions that retention threads can safely work on
    */
   def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
-    cleanerManager.pauseCleaningForNonCompactedPartitions()
+    cleanerManager.pauseCleaningForNonCompactedPartitions().asScala.map(entry => (entry.getKey, entry.getValue))
   }
 
   // Only for testing
@@ -409,7 +410,7 @@ class LogCleaner(initialConfig: CleanerConfig,
   @throws(classOf[LogCleaningException])
   private def cleanFilthiestLog(): Boolean = {
     val preCleanStats = new PreCleanStats()
-    val ltc = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats)
+    val ltc = cleanerManager.grabFilthiestCompactedLog(time, preCleanStats).toScala
     val cleaned = ltc match {
       case None =>
         false
@@ -424,7 +425,7 @@ class LogCleaner(initialConfig: CleanerConfig,
          case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage, e)
        }
     }
-    val deletable: Iterable[(TopicPartition, UnifiedLog)] = cleanerManager.deletableLogs()
+    val deletable = cleanerManager.deletableLogs().asScala
     try {
       deletable.foreach { case (_, log) =>
         try {
@@ -435,7 +436,7 @@ class LogCleaner(initialConfig: CleanerConfig,
         }
       }
     } finally {
-      cleanerManager.doneDeleting(deletable.map(_._1))
+      cleanerManager.doneDeleting(deletable.keys.toList.asJava)
     }
 
     cleaned
@@ -1150,25 +1151,6 @@ private[log] class Cleaner(val id: Int,
   }
 }
 
-/**
- * A simple struct for collecting pre-clean stats
- */
-private class PreCleanStats {
-  var maxCompactionDelayMs = 0L
-  var delayedPartitions = 0
-  var cleanablePartitions = 0
-
-  def updateMaxCompactionDelay(delayMs: Long): Unit = {
-    maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs)
-    if (delayMs > 0) {
-      delayedPartitions += 1
-    }
-  }
-  def recordCleanablePartitions(numOfCleanables: Int): Unit = {
-    cleanablePartitions = numOfCleanables
-  }
-}
-
 /**
  * A simple struct for collecting stats about log cleaning
  */
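
PreCleanStats, removed from LogCleaner.scala above, now lives in the storage module as a Java class (it is imported from org.apache.kafka.storage.internals.log in the first hunk). A minimal Java sketch reconstructed from the removed Scala struct; the actual class may expose its state differently, so the accessors here are assumptions.

    // Sketch of the Java counterpart to the removed Scala PreCleanStats,
    // reconstructed from the Scala code above; accessor names are assumptions.
    public class PreCleanStats {
        private long maxCompactionDelayMs = 0L;
        private int delayedPartitions = 0;
        private int cleanablePartitions = 0;

        // Track the largest compaction delay seen so far and count delayed partitions.
        public void updateMaxCompactionDelay(long delayMs) {
            maxCompactionDelayMs = Math.max(maxCompactionDelayMs, delayMs);
            if (delayMs > 0) {
                delayedPartitions++;
            }
        }

        public void recordCleanablePartitions(int numOfCleanables) {
            cleanablePartitions = numOfCleanables;
        }

        public long maxCompactionDelayMs() {
            return maxCompactionDelayMs;
        }

        public int delayedPartitions() {
            return delayedPartitions;
        }

        public int cleanablePartitions() {
            return cleanablePartitions;
        }
    }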
@@ -1221,22 +1203,6 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
 
 }
 
-/**
- * Helper class for a log, its topic/partition, the first cleanable position, the first uncleanable dirty position,
- * and whether it needs compaction immediately.
- */
-private case class LogToClean(topicPartition: TopicPartition,
-                              log: UnifiedLog,
-                              firstDirtyOffset: Long,
-                              uncleanableOffset: Long,
-                              needCompactionNow: Boolean = false) extends Ordered[LogToClean] {
-  val cleanBytes: Long = log.logSegments(-1, firstDirtyOffset).asScala.map(_.size.toLong).sum
-  val (firstUncleanableOffset, cleanableBytes) = LogCleanerManager.calculateCleanableBytes(log, firstDirtyOffset, uncleanableOffset)
-  val totalBytes: Long = cleanBytes + cleanableBytes
-  val cleanableRatio: Double = cleanableBytes / totalBytes.toDouble
-  override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
-}
-
 /**
  * This is a helper class to facilitate tracking transaction state while cleaning the log. It maintains a set
  * of the ongoing aborted and committed transactions as the cleaner is working its way through the log. This
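
LogToClean likewise moved to the storage module as a Java class (see the added import in the first hunk). A rough Java sketch of the same idea, based on the removed Scala case class; because the Java signature of LogCleanerManager.calculateCleanableBytes is not shown in this diff, the sketch takes the byte counts as constructor arguments instead of deriving them, so it is illustrative only.

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.storage.internals.log.UnifiedLog;

    // Sketch of a Java counterpart to the removed Scala LogToClean case class.
    // Logs are ordered by their cleanable ratio so the "filthiest" one is compacted first.
    public class LogToCleanSketch implements Comparable<LogToCleanSketch> {
        private final TopicPartition topicPartition;
        private final UnifiedLog log;
        private final long firstDirtyOffset;
        private final boolean needCompactionNow;
        private final long cleanBytes;      // bytes in already-cleaned segments
        private final long cleanableBytes;  // bytes eligible for cleaning this round

        public LogToCleanSketch(TopicPartition topicPartition, UnifiedLog log, long firstDirtyOffset,
                                boolean needCompactionNow, long cleanBytes, long cleanableBytes) {
            this.topicPartition = topicPartition;
            this.log = log;
            this.firstDirtyOffset = firstDirtyOffset;
            this.needCompactionNow = needCompactionNow;
            this.cleanBytes = cleanBytes;
            this.cleanableBytes = cleanableBytes;
        }

        public long totalBytes() {
            return cleanBytes + cleanableBytes;
        }

        public double cleanableRatio() {
            return cleanableBytes / (double) totalBytes();
        }

        @Override
        public int compareTo(LogToCleanSketch that) {
            return (int) Math.signum(this.cleanableRatio() - that.cleanableRatio());
        }
    }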
