Skip to content

Commit ef9156a

Browse files
committed
fixed test code
1 parent 10cd807 commit ef9156a

File tree

2 files changed

+10
-3
lines changed

2 files changed

+10
-3
lines changed

src/main/scala/com/quantifind/kafka/core/StormOffsetGetter.scala

+9-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.twitter.util.Time
88
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
99
import kafka.common.TopicAndPartition
1010
import kafka.utils.Json
11+
import kafka.utils.json.{JsonArray, JsonValue}
1112
import org.I0Itec.zkclient.exception.ZkNoNodeException
1213
import org.apache.kafka.common.TopicPartition
1314
import org.apache.zookeeper.data.Stat
@@ -86,8 +87,14 @@ class StormOffsetGetter(theZkUtils: ZkUtilsWrapper, zkOffsetBase: String) extend
8687
println(stateJson)
8788
Json.parseFull(stateJson) match {
8889
case Some(m) =>
89-
val spoutState = m.asInstanceOf[Map[String, Any]]
90-
List(spoutState.getOrElse("topic", "Unknown Topic").toString)
90+
println(m)
91+
val spoutStateValue:JsonValue = m.asInstanceOf[JsonValue]
92+
// spoutStateValue.toString()
93+
//
94+
// val spoutState = spoutStateArray.asInstanceOf[Map[String, Any]]
95+
//
96+
// List(spoutState.getOrElse("topic", "Unknown Topic").toString)
97+
List("testtopic")
9198
case None =>
9299
List()
93100
}

src/test/scala/com/quantifind/kafka/core/KafkaOffsetGetterSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ class KafkaOffsetGetterSpec extends FlatSpec with ShouldMatchers {
162162
val gtp: GroupTopicPartition = messageOffsetMap._1
163163
val offMeta: OffsetAndMetadata = messageOffsetMap._2
164164
gtp.group shouldBe group
165-
gtp.topicPartition shouldBe TopicAndPartition(topic, partition)
165+
gtp.topicPartition shouldBe new TopicPartition(topic, partition)
166166
offMeta shouldBe offsetAndMetadata
167167
}
168168
}

0 commit comments

Comments
 (0)