Skip to content

Commit 84dfe5a

Browse files
committed
Update base Kafka test code
1 parent f4d76ab commit 84dfe5a

File tree

4 files changed

+30
-5
lines changed

4 files changed

+30
-5
lines changed

build.sbt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ val scalaTestScalaCheckVersion = "3.2.9.0"
2222
val akkaStreamsJson = "0.8.0"
2323
val diffxVersion = "0.5.6"
2424
val testContainersVersion = "0.39.8"
25+
val testContainersJavaVersion = "1.16.2"
2526

2627
val flagsFor12 = Seq(
2728
"-Xlint:_",
@@ -88,7 +89,9 @@ lazy val core = project
8889
"com.softwaremill.diffx" %% "diffx-scalatest" % diffxVersion % Test,
8990
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
9091
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test,
91-
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersVersion % Test
92+
"com.dimafeng" %% "testcontainers-scala-scalatest" % testContainersVersion % Test,
93+
"com.dimafeng" %% "testcontainers-scala-kafka" % testContainersVersion % Test,
94+
"org.testcontainers" % "kafka" % testContainersJavaVersion % Test
9295
)
9396
)
9497

core/src/main/scala/io/aiven/guardian/kafka/KafkaClient.scala

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@ import java.util.Base64
2323
/** A Kafka Client that uses Alpakka Kafka Consumer under the hood to create a stream of events from a Kafka cluster. To
2424
* configure the Alpakka Kafka Consumer use the standard typesafe configuration i.e. akka.kafka.consumer (note that the
2525
* `keySerializer` and `valueSerializer` are hardcoded so you cannot override this).
26+
* @param configure
27+
* A way to configure the underlying Kafka consumer settings
2628
* @param system
2729
* A classic `ActorSystem`
2830
* @param kafkaClusterConfig
2931
* Additional cluster configuration that is needed
3032
*/
31-
class KafkaClient()(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
33+
class KafkaClient(
34+
configure: Option[ConsumerSettings[Array[Byte], Array[Byte]] => ConsumerSettings[Array[Byte], Array[Byte]]] = None
35+
)(implicit system: ActorSystem, kafkaClusterConfig: KafkaCluster)
3236
extends KafkaClientInterface
3337
with StrictLogging {
3438
override type CursorContext = Committable
@@ -37,8 +41,10 @@ class KafkaClient()(implicit system: ActorSystem, kafkaClusterConfig: KafkaClust
3741
if (kafkaClusterConfig.topics.isEmpty)
3842
logger.warn("Kafka Cluster configuration has no topics set")
3943

40-
private[kafka] val consumerSettings =
41-
ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
44+
private[kafka] val consumerSettings = {
45+
val base = ConsumerSettings(system, new ByteArrayDeserializer, new ByteArrayDeserializer)
46+
configure.fold(base)(block => block(base))
47+
}
4248

4349
private[kafka] val subscriptions = Subscriptions.topics(kafkaClusterConfig.topics)
4450

core/src/test/scala/io/aiven/guardian/kafka/Generators.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,18 @@ object Generators {
109109
max: Int = 100,
110110
padTimestampsMillis: Int = 10
111111
): Gen[KafkaDataWithTimePeriod] = for {
112-
topic <- Gen.alphaStr
112+
topic <- kafkaTopic
113113
records <- Generators.kafkaReducedConsumerRecordsGen(topic, min, max, padTimestampsMillis)
114114
head = records.head
115115
last = records.last
116116

117117
duration <- Gen.choose[Long](head.timestamp, last.timestamp - 1).map(millis => FiniteDuration(millis, MILLISECONDS))
118118
} yield KafkaDataWithTimePeriod(records, duration)
119119

120+
/** Generator for a valid Kafka topic that can be used in actual Kafka clusters
121+
*/
122+
lazy val kafkaTopic: Gen[String] = for {
123+
size <- Gen.choose(1, 249)
124+
topic <- Gen.listOfN(size, Gen.oneOf(Gen.alphaChar, Gen.numChar, Gen.const('-'), Gen.const('.'), Gen.const('_')))
125+
} yield topic.mkString
120126
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package io.aiven.guardian.kafka
2+
3+
import akka.testkit.TestKitBase
4+
import com.dimafeng.testcontainers.ForAllTestContainer
5+
import com.dimafeng.testcontainers.KafkaContainer
6+
import org.scalatest.Suite
7+
8+
trait KafkaClusterTest extends ForAllTestContainer with TestKitBase { this: Suite =>
9+
override lazy val container: KafkaContainer = new KafkaContainer()
10+
}

0 commit comments

Comments
 (0)