From 28361ccb1b1bc65ebb967b85818eaa55f2b1e06e Mon Sep 17 00:00:00 2001
From: Seungha Lee
Date: Tue, 3 Mar 2020 14:59:46 +0900
Subject: [PATCH] Upgrade dependencies & separate common version literals into
 val + trivial code cleanups

---
 .../actor/cluster/KafkaStateActor.scala       | 22 ++++-----
 app/kafka/manager/model/ActorModel.scala      | 14 +++---
 .../utils/two40/GroupMetadataManager.scala    | 29 +-----------
 build.sbt                                     | 47 ++++++++++---------
 4 files changed, 44 insertions(+), 68 deletions(-)

diff --git a/app/kafka/manager/actor/cluster/KafkaStateActor.scala b/app/kafka/manager/actor/cluster/KafkaStateActor.scala
index ba6dcab2e..702a7d929 100644
--- a/app/kafka/manager/actor/cluster/KafkaStateActor.scala
+++ b/app/kafka/manager/actor/cluster/KafkaStateActor.scala
@@ -130,7 +130,7 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
           val options = new DescribeConsumerGroupsOptions
           options.timeoutMs(1000)
           client.describeConsumerGroups(groupList.asJava, options).all().whenComplete {
-            (mapGroupDescription, error) => mapGroupDescription.asScala.foreach {
+            (mapGroupDescription, _) => mapGroupDescription.asScala.foreach {
               case (group, desc) =>
                 enqueue.offer(group -> desc.members().asScala.map(m => MemberMetadata.from(group, desc, m)).toList)
             }
@@ -438,7 +438,7 @@ object ConsumerInstanceSubscriptions extends Logging {
     import org.json4s.jackson.JsonMethods.parse
     import org.json4s.scalaz.JsonScalaz.field
     val json = parse(jsonString)
-    val subs: Map[String, Int] = field[Map[String,Int]]("subscription")(json).fold({ e =>
+    val subs: Map[String, Int] = field[Map[String,Int]]("subscription")(json).fold({ _ =>
       error(s"[consumer=$consumer] Failed to parse consumer instance subscriptions : $id : $jsonString"); Map.empty}, identity)
     new ConsumerInstanceSubscriptions(id, subs)
   }
@@ -483,10 +483,8 @@ trait OffsetCache extends Logging {
     // Get partition leader broker information
     val optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getTopicPartitionLeaders(topic)

-    val clientId = "partitionOffsetGetter"
     val time = -1
     val nOffsets = 1
-    val simpleConsumerBufferSize = 256 * 1024
     val currentActiveBrokerSet:Set[String] = getBrokerList().list.map(_.host).toSet

     val partitionsByBroker = optPartitionsWithLeaders.map {
@@ -672,13 +670,13 @@
     val (partitionOffsets, partitionOwners) = consumerType match {
       case ZKManagedConsumer =>
         val partitionOffsets = for {
-          td <- optTopic
+          _ <- optTopic
           tpi <- optTpi
         } yield {
           readConsumerOffsetByTopicPartition(consumer, topic, tpi)
         }
         val partitionOwners = for {
-          td <- optTopic
+          _ <- optTopic
           tpi <- optTpi
         } yield {
           readConsumerOwnerByTopicPartition(consumer, topic, tpi)
@@ -686,13 +684,13 @@
         (partitionOffsets, partitionOwners)
       case KafkaManagedConsumer =>
         val partitionOffsets = for {
-          td <- optTopic
+          _ <- optTopic
           tpi <- optTpi
         } yield {
           readKafkaManagedConsumerOffsetByTopicPartition(consumer, topic, tpi)
         }
         val partitionOwners = for {
-          td <- optTopic
+          _ <- optTopic
           tpi <- optTpi
         } yield {
           readKafkaManagedConsumerOwnerByTopicPartition(consumer, topic, tpi)
@@ -818,7 +816,7 @@ case class OffsetCacheActive(curator: CuratorFramework
         IndexedSeq.empty[ConsumerNameAndType]
       } { data: java.util.Map[String, ChildData] =>
         data.asScala.filter{
-          case (consumer, childData) =>
+          case (_, childData) =>
             if (clusterContext.config.filterConsumers)
             // Defining "inactive consumer" as a consumer that is missing one of three children ids/ offsets/ or owners/
              childData.getStat.getNumChildren > 2
@@ -1203,7 +1201,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
       states.map(_.map{case (part, state) =>
         val partition = part.toInt
         val descJson = parse(state)
-        val leaderID = field[Int]("leader")(descJson).fold({ e =>
+        val leaderID = field[Int]("leader")(descJson).fold({ _ =>
           log.error(s"[topic=$topic] Failed to get partitions from topic json $state"); 0}, identity)
         val leader = targetBrokers.find(_.id == leaderID)
         (partition, leader)
@@ -1444,7 +1442,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
     if (shutdown) {
       return
     }
-    var optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getPartitionLeaders(topic)
+    val optPartitionsWithLeaders : Option[List[(Int, Option[BrokerIdentity])]] = getPartitionLeaders(topic)
     optPartitionsWithLeaders match {
       case Some(leaders) =>
         leaders.foreach(leader => {
@@ -1492,7 +1490,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
       try {
         kafkaConsumer = Option(new KafkaConsumer(consumerProperties))
         val request = tpList.map(f => new TopicPartition(f._1.topic(), f._1.partition()))
-        var tpOffsetMapOption = kafkaConsumer.map(_.endOffsets(request.asJavaCollection).asScala)
+        val tpOffsetMapOption = kafkaConsumer.map(_.endOffsets(request.asJavaCollection).asScala)
         var topicOffsetMap: Map[Int, Long] = null
         tpOffsetMapOption.foreach(tpOffsetMap =>
           tpOffsetMap.keys.foreach(tp => {
diff --git a/app/kafka/manager/model/ActorModel.scala b/app/kafka/manager/model/ActorModel.scala
index 2881b4fe4..d91916e98 100644
--- a/app/kafka/manager/model/ActorModel.scala
+++ b/app/kafka/manager/model/ActorModel.scala
@@ -267,7 +267,7 @@ object ActorModel {
       val portResult = fieldExtended[Int]("port")(json)
       val jmxPortResult = fieldExtended[Int]("jmx_port")(json)
       val hostPortResult: JsonScalaz.Result[(String, Map[SecurityProtocol, Int])] = json.findField(_._1 == "endpoints").map(_ => fieldExtended[List[String]]("endpoints")(json))
-        .fold((hostResult |@| portResult |@| DEFAULT_SECURE)((a, b, c) => (a, Map(PLAINTEXT.asInstanceOf[SecurityProtocol] -> b)))){
+        .fold((hostResult |@| portResult |@| DEFAULT_SECURE)((a, b, _) => (a, Map(PLAINTEXT.asInstanceOf[SecurityProtocol] -> b)))){
         r =>
           r.flatMap {
             endpointList =>
@@ -296,7 +296,7 @@
             }
             result
           } else {
-            (hostResult |@| portResult |@| DEFAULT_SECURE)((a, b, c) => (a, Map(PLAINTEXT.asInstanceOf[SecurityProtocol] -> b)))
+            (hostResult |@| portResult |@| DEFAULT_SECURE)((a, b, _) => (a, Map(PLAINTEXT.asInstanceOf[SecurityProtocol] -> b)))
           }
         }
       }
@@ -304,11 +304,11 @@
       tpl <- hostPortResult
       host = tpl._1
       port = tpl._2
-      secure = (tpl._2.contains(PLAINTEXT) && tpl._2.size > 1) || (!tpl._2.contains(PLAINTEXT) && tpl._2.nonEmpty)
-      nonSecure = tpl._2.contains(PLAINTEXT)
+      secure = (port.contains(PLAINTEXT) && port.size > 1) || (!port.contains(PLAINTEXT) && port.nonEmpty)
+      nonSecure = port.contains(PLAINTEXT)
       jmxPort <- jmxPortResult
     } yield {
-      BrokerIdentity(brokerId, host, jmxPort, secure, nonSecure, tpl._2)
+      BrokerIdentity(brokerId, host, jmxPort, secure, nonSecure, port)
     }
   }
 }
@@ -494,7 +494,7 @@ import scala.language.reflectiveCalls
   private[this] def getPartitionReplicaMap(td: TopicDescription) : Map[String, List[Int]] = {
     // Get the topic description information
     val descJson = parse(td.description._2)
-    field[Map[String,List[Int]]]("partitions")(descJson).fold({ e =>
+    field[Map[String,List[Int]]]("partitions")(descJson).fold({ _ =>
logger.error(s"[topic=${td.topic}] Failed to get partitions from topic json ${td.description._2}") Map.empty }, identity) @@ -550,7 +550,7 @@ import scala.language.reflectiveCalls try { val resultOption: Option[(Int,Map[String, String])] = td.config.map { configString => val configJson = parse(configString._2) - val configMap : Map[String, String] = field[Map[String,String]]("config")(configJson).fold({ e => + val configMap : Map[String, String] = field[Map[String,String]]("config")(configJson).fold({ _ => logger.error(s"Failed to parse topic config ${configString._2}") Map.empty }, identity) diff --git a/app/kafka/manager/utils/two40/GroupMetadataManager.scala b/app/kafka/manager/utils/two40/GroupMetadataManager.scala index 5d702d5d3..aa087e448 100644 --- a/app/kafka/manager/utils/two40/GroupMetadataManager.scala +++ b/app/kafka/manager/utils/two40/GroupMetadataManager.scala @@ -133,12 +133,6 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs } object GroupMetadata { - private val validPreviousStates: Map[GroupState, Set[GroupState]] = - Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead), - CompletingRebalance -> Set(PreparingRebalance), - Stable -> Set(CompletingRebalance), - PreparingRebalance -> Set(Stable, CompletingRebalance, Empty), - Empty -> Set(PreparingRebalance)) def loadGroup(groupId: String, initialState: GroupState, @@ -179,7 +173,7 @@ class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) e private[two40] val lock = new ReentrantLock - private var state: GroupState = initialState + private val state: GroupState = initialState var currentStateTimestamp: Option[Long] = Some(time.milliseconds()) var protocolType: Option[String] = None var generationId = 0 @@ -187,14 +181,10 @@ class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) e private var protocol: Option[String] = None private val members = new mutable.HashMap[String, MemberMetadata] - private val pendingMembers = new mutable.HashSet[String] - private var numMembersAwaitingJoin = 0 private val supportedProtocols = new mutable.HashMap[String, Integer]().withDefaultValue(0) private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset] private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]() - private var receivedTransactionalOffsetCommits = false - private var receivedConsumerOffsetCommits = false var newMemberAdded: Boolean = false @@ -828,23 +818,6 @@ object GroupMetadataManager { } } - private def parseOffsets(offsetKey: OffsetKey, payload: ByteBuffer): (Option[String], Option[String]) = { - val groupId = offsetKey.key.group - val topicPartition = offsetKey.key.topicPartition - val keyString = s"offset_commit::group=$groupId,partition=$topicPartition" - - val offset = GroupMetadataManager.readOffsetMessageValue(payload) - val valueString = if (offset == null) { - "" - } else { - if (offset.metadata.isEmpty) - s"offset=${offset.offset}" - else - s"offset=${offset.offset},metadata=${offset.metadata}" - } - - (Some(keyString), Some(valueString)) - } } case class GroupTopicPartition(group: String, topicPartition: TopicPartition) { diff --git a/build.sbt b/build.sbt index 9a1084abf..002fa64ee 100644 --- a/build.sbt +++ b/build.sbt @@ -21,35 +21,40 @@ assemblyMergeStrategy in assembly := { case other => (assemblyMergeStrategy 
in assembly).value(other) } +val akkaVersion = "2.6.2" +val curatorVersion = "4.2.0" +val json4sVersion = "3.6.7" +val kafkaVersion = "2.4.0" + libraryDependencies ++= Seq( - "com.typesafe.akka" %% "akka-actor" % "2.5.19", - "com.typesafe.akka" %% "akka-slf4j" % "2.5.19", + "com.typesafe.akka" %% "akka-actor" % akkaVersion, + "com.typesafe.akka" %% "akka-slf4j" % akkaVersion, "com.google.code.findbugs" % "jsr305" % "3.0.2", - "org.webjars" %% "webjars-play" % "2.6.3", - "org.webjars" % "bootstrap" % "4.3.1", - "org.webjars" % "jquery" % "3.3.1-2", + "org.webjars" %% "webjars-play" % "2.8.0", + "org.webjars" % "bootstrap" % "4.4.1", + "org.webjars" % "jquery" % "3.4.0", "org.webjars" % "backbonejs" % "1.3.3", "org.webjars" % "underscorejs" % "1.9.0", "org.webjars" % "dustjs-linkedin" % "2.7.2", "org.webjars" % "octicons" % "4.3.0", - "org.apache.curator" % "curator-framework" % "2.12.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), - "org.apache.curator" % "curator-recipes" % "2.12.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), - "org.json4s" %% "json4s-jackson" % "3.6.5", - "org.json4s" %% "json4s-scalaz" % "3.6.5", - "org.slf4j" % "log4j-over-slf4j" % "1.7.25", - "com.adrianhurt" %% "play-bootstrap" % "1.4-P26-B4" exclude("com.typesafe.play", "*"), + "org.apache.curator" % "curator-framework" % curatorVersion exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), + "org.apache.curator" % "curator-recipes" % curatorVersion exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), + "org.json4s" %% "json4s-jackson" % json4sVersion, + "org.json4s" %% "json4s-scalaz" % json4sVersion, + "org.slf4j" % "log4j-over-slf4j" % "1.7.30", + "com.adrianhurt" %% "play-bootstrap" % "1.5.1-P27-B3" exclude("com.typesafe.play", "*"), "org.clapper" %% "grizzled-slf4j" % "1.3.3", - "org.apache.kafka" %% "kafka" % "2.4.0" exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), - "org.apache.kafka" % "kafka-streams" % "2.2.0", - "com.beachape" %% "enumeratum" % "1.5.13", - "com.github.ben-manes.caffeine" % "caffeine" % "2.6.2", - "com.typesafe.play" %% "play-logback" % "2.6.21", - "org.scalatest" %% "scalatest" % "3.0.5" % "test", - "org.scalatestplus.play" %% "scalatestplus-play" % "3.1.2" % "test", - "org.apache.curator" % "curator-test" % "2.12.0" % "test", - "org.mockito" % "mockito-core" % "1.10.19" % "test", + "org.apache.kafka" %% "kafka" % kafkaVersion exclude("log4j","log4j") exclude("org.slf4j", "slf4j-log4j12") force(), + "org.apache.kafka" % "kafka-streams" % kafkaVersion, + "com.beachape" %% "enumeratum" % "1.5.14", + "com.github.ben-manes.caffeine" % "caffeine" % "2.8.0", + "com.typesafe.play" %% "play-logback" % "2.8.0", + "org.scalatest" %% "scalatest" % "3.1.0" % "test", + "org.scalatestplus.play" %% "scalatestplus-play" % "4.0.3" % "test", + "org.apache.curator" % "curator-test" % curatorVersion % "test", + "org.mockito" % "mockito-core" % "3.2.4" % "test", "com.yammer.metrics" % "metrics-core" % "2.2.0" force(), - "com.unboundid" % "unboundid-ldapsdk" % "4.0.9" + "com.unboundid" % "unboundid-ldapsdk" % "4.0.14" ) routesGenerator := InjectedRoutesGenerator
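
Note (appended after the diff; not part of the patch, so git am is unaffected): the build.sbt change follows the usual sbt pattern of hoisting a shared version literal into a val. A minimal standalone sketch of that pattern, using one illustrative version val rather than CMAK's full list:

// build.sbt sketch: declare the shared literal once, reference it everywhere.
val exampleKafkaVersion = "2.4.0"

libraryDependencies ++= Seq(
  // Both artifacts reference the same val, so they can no longer drift apart
  // the way "kafka" (2.4.0) and "kafka-streams" (2.2.0) had before this patch.
  "org.apache.kafka" %% "kafka" % exampleKafkaVersion,
  "org.apache.kafka" % "kafka-streams" % exampleKafkaVersion
)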
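Likewise, a sketch of the recurring source cleanup in the Scala files: bindings that are never read become `_`, and `var`s that are never reassigned become `val`s. The fold below mirrors the json4s-scalaz pattern in ConsumerInstanceSubscriptions; the sample JSON is hypothetical, and the wildcard import stands in for the narrower imports the real file uses:

import org.json4s.jackson.JsonMethods.parse
import org.json4s.scalaz.JsonScalaz._

object UnusedBindingSketch extends App {
  val jsonString = """{"subscription": {"my-topic": 2}}"""

  // fold(onFailure, onSuccess): the failure branch only supplies a default,
  // so binding the error as `_` (instead of a named but unused `e`) makes
  // the intent explicit, which is the same change this patch applies.
  val subs: Map[String, Int] =
    field[Map[String, Int]]("subscription")(parse(jsonString))
      .fold(_ => Map.empty[String, Int], identity)

  println(subs) // Map(my-topic -> 2)
}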