Skip to content

Commit fa4d9b9

Browse files
committed
Fix getting topic/partition for LogSegments/Directory with jmx
1 parent bf068d0 commit fa4d9b9

File tree

1 file changed

+28
-31
lines changed

1 file changed

+28
-31
lines changed

app/kafka/manager/jmx/KafkaJMX.scala

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import scala.util.matching.Regex
2323
import scala.util.{Failure, Try}
2424

2525
object KafkaJMX extends Logging {
26-
26+
2727
private[this] val defaultJmxConnectorProperties = Map[String, Any] (
2828
"jmx.remote.x.request.waiting.timeout" -> "3000",
2929
"jmx.remote.x.notification.fetch.timeout" -> "3000",
@@ -99,7 +99,7 @@ object KafkaMetrics {
9999
private def getBrokerTopicMeterMetrics(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, metricName: String, topicOption: Option[String]) = {
100100
getMeterMetric(mbsc, getObjectName(kafkaVersion, metricName, topicOption))
101101
}
102-
102+
103103
private def getSep(kafkaVersion: KafkaVersion) : String = {
104104
kafkaVersion match {
105105
case Kafka_0_8_1_1 => "\""
@@ -110,7 +110,7 @@ object KafkaMetrics {
110110
def getObjectName(kafkaVersion: KafkaVersion, name: String, topicOption: Option[String] = None) = {
111111
val sep = getSep(kafkaVersion)
112112
val topicAndName = kafkaVersion match {
113-
case Kafka_0_8_1_1 =>
113+
case Kafka_0_8_1_1 =>
114114
topicOption.map( topic => s"${sep}$topic-$name${sep}").getOrElse(s"${sep}AllTopics$name${sep}")
115115
case _ =>
116116
val topicProp = topicOption.map(topic => s",topic=$topic").getOrElse("")
@@ -127,11 +127,11 @@ object KafkaMetrics {
127127
/* Gauge, Value : 0 */
128128
private val replicaFetcherManagerMaxLag = new ObjectName(
129129
"kafka.server:type=ReplicaFetcherManager,name=MaxLag,clientId=Replica")
130-
130+
131131
/* Gauge, Value : 0 */
132132
private val kafkaControllerActiveControllerCount = new ObjectName(
133133
"kafka.controller:type=KafkaController,name=ActiveControllerCount")
134-
134+
135135
/* Gauge, Value : 0 */
136136
private val kafkaControllerOfflinePartitionsCount = new ObjectName(
137137
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount")
@@ -144,16 +144,18 @@ object KafkaMetrics {
144144
private val operatingSystemObjectName = new ObjectName("java.lang:type=OperatingSystem")
145145

146146
/* Log Segments */
147-
private val logSegmentObjectName = new ObjectName("kafka.log:type=Log,name=*-LogSegments")
147+
private val logSegmentObjectName = new ObjectName("kafka.log:type=Log,name=LogSegments,topic=*,partition=*")
148148

149-
private val directoryObjectName = new ObjectName("kafka.log:type=Log,name=*-Directory")
149+
private val directoryObjectName = new ObjectName("kafka.log:type=Log,name=Directory,topic=*,partition=*")
150150

151-
private val LogSegmentsNameRegex = new Regex("%s-LogSegments".format("""(.*)-(\d*)"""), "topic", "partition")
151+
// exp: kafka.log:type=Log,name=LogSegments,topic=WEB_post,partition=19/Value (ArrayList) = [baseOffset=0, created=1527666489299, logSize=55232565, indexSize=252680]
152+
// private val LogSegmentsNameRegex = new Regex("%s-LogSegments".format("""(.*)-(\d*)"""), "topic", "partition")
152153

153-
private val DirectoryNameRegex = new Regex("%s-Directory".format("""(.*)-(\d*)"""), "topic", "partition")
154+
// exp: kafka.log:type=Log,name=Directory,topic=WEB_post,partition=16/Value (String) = /log/kafka/WEB_post-16
155+
// private val DirectoryNameRegex = new Regex("%s-Directory".format("""(.*)-(\d*)"""), "topic", "partition")
154156

155157
val LogSegmentRegex = new Regex(
156-
"baseOffset=(.*), created=(.*), logSize=(.*), indexSize=(.*)",
158+
"baseOffset=(.*); created=(.*); logSize=(.*); indexSize=(.*)",
157159
"baseOffset", "created", "logSize", "indexSize"
158160
)
159161

@@ -172,7 +174,7 @@ object KafkaMetrics {
172174
case _: InstanceNotFoundException => OSMetric(0D, 0D)
173175
}
174176
}
175-
177+
176178
private def getMeterMetric(mbsc: MBeanServerConnection, name: ObjectName) = {
177179
import scala.collection.JavaConverters._
178180
try {
@@ -187,7 +189,7 @@ object KafkaMetrics {
187189
case _: InstanceNotFoundException => MeterMetric(0,0,0,0,0)
188190
}
189191
}
190-
192+
191193
private def getLongValue(attributes: Seq[Attribute], name: String) = {
192194
attributes.find(_.getName == name).map(_.getValue.asInstanceOf[Long]).getOrElse(0L)
193195
}
@@ -196,27 +198,22 @@ object KafkaMetrics {
196198
attributes.find(_.getName == name).map(_.getValue.asInstanceOf[Double]).getOrElse(0D)
197199
}
198200

199-
private def topicAndPartition(name: String, regex: Regex) = {
201+
private def topicAndPartition(objectName: ObjectName) = {
200202
try {
201-
val matches = regex.findAllIn(name).matchData.toSeq
202-
require(matches.size == 1)
203-
val m = matches.head
204-
205-
val topic = m.group("topic")
206-
val partition = m.group("partition").toInt
207-
203+
val topic = objectName.getKeyProperty("topic")
204+
val partition = objectName.getKeyProperty("partition").toInt
208205
(topic, partition)
209206
}
210207
catch {
211208
case e: Exception =>
212-
throw new IllegalStateException("Can't parse topic and partition from: <%s>".format(name), e)
209+
throw new IllegalStateException("Can't parse topic and partition from: <%s>".format(objectName), e)
213210
}
214211
}
215212

216213
private def queryValues[K, V](
217214
mbsc: MBeanServerConnection,
218215
objectName: ObjectName,
219-
keyConverter: String => K,
216+
keyConverter: ObjectName => K,
220217
valueConverter: Object => V
221218
) = {
222219
val logsSizeObjectNames = mbsc.queryNames(objectName, null).asScala.toSeq
@@ -228,12 +225,12 @@ object KafkaMetrics {
228225
private def queryValue[K, V](
229226
mbsc: MBeanServerConnection,
230227
objectName: ObjectName,
231-
keyConverter: String => K,
228+
keyConverter: ObjectName => K,
232229
valueConverter: Object => V
233230
) = {
234-
val name = objectName.getKeyProperty("name")
231+
// val name = objectName.getKeyProperty("name")
235232
val mbean = MBeanServerInvocationHandler.newProxyInstance(mbsc, objectName, classOf[GaugeMBean], true)
236-
(keyConverter(name), valueConverter(mbean.getValue))
233+
(keyConverter(objectName), valueConverter(mbean.getValue))
237234
}
238235

239236
private def parseLogSegment(str: String): LogSegment = {
@@ -259,7 +256,7 @@ object KafkaMetrics {
259256
queryValues(
260257
mbsc,
261258
logSegmentObjectName,
262-
key => topicAndPartition(key, LogSegmentsNameRegex),
259+
key => topicAndPartition(key),
263260
value => {
264261
val lst = value.asInstanceOf[ju.List[String]]
265262
lst.asScala.map(parseLogSegment).toSeq
@@ -271,7 +268,7 @@ object KafkaMetrics {
271268
queryValues(
272269
mbsc,
273270
directoryObjectName,
274-
key => topicAndPartition(key, DirectoryNameRegex),
271+
key => topicAndPartition(key),
275272
value => value.asInstanceOf[String]
276273
)
277274
}.toMap
@@ -355,10 +352,10 @@ case class MeterMetric(count: Long,
355352

356353
def +(o: MeterMetric) : MeterMetric = {
357354
MeterMetric(
358-
o.count + count,
359-
o.fifteenMinuteRate + fifteenMinuteRate,
360-
o.fiveMinuteRate + fiveMinuteRate,
361-
o.oneMinuteRate + oneMinuteRate,
355+
o.count + count,
356+
o.fifteenMinuteRate + fifteenMinuteRate,
357+
o.fiveMinuteRate + fiveMinuteRate,
358+
o.oneMinuteRate + oneMinuteRate,
362359
o.meanRate + meanRate)
363360
}
364361
}

0 commit comments

Comments
 (0)