This is a thin Scala wrapper for the Kafka Streams API. It does not aim to provide a fully Scala-idiomatic API, but rather to make the original API simpler to use from Scala. In particular, it provides the following adjustments:
- Scala lambda expressions can be used directly
- when aggregating and counting, counts are converted from Java `Long`s to Scala `Long`s
- when using a `flatMap` operation, this lets you use a Scala `Iterable`
- Serdes (Serializers/Deserializers) can be implicitly found in the scope
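For example, here is a minimal sketch of the lambda, `Iterable`, and implicit-Serde points (the topic name `sentences` is a placeholder; Serde handling is detailed below):

```scala
import org.apache.kafka.common.serialization.Serdes

// the Serde is picked up implicitly (see below)
implicit val stringSerde = Serdes.String()

// a plain Scala lambda returning a plain Scala Iterable
val words: KStreamS[String, String] = KStreamBuilderS
  .stream[String, String]("sentences")
  .flatMapValues { sentence => sentence.split(" ").toSeq }
```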
This API also contains a few Serdes (Serializers/Deserializers):
- to convert Scala `Int`/`Long`/`Double` to/from their binary representation
- to convert Scala `Int`/`Long`/`Double` to/from their string representation
- to convert case classes to/from JSON
Finally, the API provides the following extensions:
- `KStreamS.split()` (see documentation below)
The main objects are:
- `KStreamBuilderS` as the entry point to build streams or tables
- `KStreamS` as a wrapper around `KStream`
- `KGroupedStreamS` as a wrapper around `KGroupedStream`
- `KTableS` as a wrapper around `KTable`
- `KGroupedTableS` as a wrapper around `KGroupedTable`
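For illustration, here is a minimal sketch (topic and store names are placeholders) showing which wrapper type each step of a simple pipeline returns, with the required Serdes declared as implicits (see below):

```scala
import com.github.aseigneurin.kafka.serialization.scala._
import org.apache.kafka.common.serialization.Serdes

implicit val stringSerde = Serdes.String()
implicit val longSerde = LongAsStringSerde

// each step returns the corresponding wrapper type
val stream: KStreamS[String, String] = KStreamBuilderS.stream[String, String]("my-stream")
val grouped: KGroupedStreamS[String, String] = stream.groupByKey
val counts: KTableS[String, Long] = grouped.count("Counts")
```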
With the original Java API, you would create an instance of `KStreamBuilder`, then use it to create streams or tables. Here, `KStreamBuilderS` is an object that can be used directly:
val stream: KStreamS[String, String] = KStreamBuilderS.stream[String, String]("my-stream")
val table: KTableS[String, String] = KStreamBuilderS.table[String, String]("my-table")

When starting the application, you just need to unwrap the `KStreamBuilder` by calling `KStreamBuilderS.inner`:
val streams = new KafkaStreams(KStreamBuilderS.inner, config)
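For reference, `config` here is a plain Kafka Streams configuration; a minimal sketch (the application id and broker address are placeholders):

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val config = new Properties()
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-scala-app")      // placeholder application id
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker address
```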
It is a common mistake to forget to specify Serdes when using the Java API, resulting in class cast errors when objects are serialized or deserialized. To work around this issue, this API requires Serdes to be used. Most of the time, it is enough to declare your Serdes as implicit values, and they will be picked up automatically:
implicit val stringSerde: Serde[String] = Serdes.String()
implicit val userSerde: Serde[User] = new MyUserSerde
val usersByIdStream = KStreamBuilderS.stream[String, User]("users-by-id")

Resolution is based on the type of the object to serialize/deserialize, so make sure you have a Serde of the appropriate type. If not, you should see an error such as:
Error:(87, 80) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[String]
If, on the other hand, you have multiple Serdes for the same type, you might see the following error:
Error:(88, 80) ambiguous implicit values:
both value stringSerde2 of type org.apache.kafka.common.serialization.Serde[String]
and value stringSerde1 of type org.apache.kafka.common.serialization.Serde[String]
match expected type org.apache.kafka.common.serialization.Serde[String]
In this case, just pass the Serde explicitly:
val usersByIdStream = KStreamBuilderS.stream[String, User]("users-by-id")(stringSerde, userSerde)

To convert Scala `Int`/`Long`/`Double` to/from their binary representation:
import com.github.aseigneurin.kafka.serialization.scala._
implicit val intSerde = IntSerde
implicit val longSerde = LongSerde
implicit val doubleSerde = DoubleSerde

To convert Scala `Int`/`Long`/`Double` to/from string representation:
import com.github.aseigneurin.kafka.serialization.scala._
implicit val intSerde = IntAsStringSerde
implicit val longSerde = LongAsStringSerde
implicit val doubleSerde = DoubleAsStringSerde

To convert case classes to/from JSON:
- define a case class
- create an instance of `JsonSerde` with the case class as the generic type
Example:
import com.github.aseigneurin.kafka.serialization.scala._
case class User(name: String)
implicit val stringSerde = Serdes.String
implicit val userSerde = new JsonSerde[User]
// read JSON -> case class
KStreamBuilderS.stream[String, User]("users")
.mapValues { user => user.name }
.to("names")
// write case class -> JSON
KStreamBuilderS.stream[String, String]("names")
.mapValues { name => User(name) }
  .to("users")

This repository contains a Scala version of the Java Word Count Demo.
Here is the code to implement a word count:
val props = new Properties()
// ...
implicit val stringSerde = Serdes.String
implicit val longSerde = LongAsStringSerde
val source = KStreamBuilderS.stream[String, String]("streams-file-input")
val counts: KTableS[String, Long] = source
.flatMapValues { value => value.toLowerCase(Locale.getDefault).split(" ") }
.map { (_, value) => (value, value) }
.groupByKey
.count("Counts")
counts.to("streams-wordcount-output")
val streams = new KafkaStreams(KStreamBuilderS.inner, props)
streams.start()

The `split()` method applies a predicate and returns two `KStreamS`s, one with the messages that match the predicate, and another one with the messages that don't match.
The two KStreamSs are returned in a tuple that can be easily deconstructed:
def isValidMessage(v: ...): Boolean = ???
val (goodMessages, badMessages) = deserializedMessages.split((k, v) => isValidMessage(v))
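For a slightly fuller sketch, reusing the `User` case class and `JsonSerde` from the JSON example above (the topic names and the validation rule are placeholders), the two halves can be routed to separate topics:

```scala
implicit val stringSerde = Serdes.String()
implicit val userSerde = new JsonSerde[User]

// hypothetical validation rule
def isValidMessage(user: User): Boolean = user.name.nonEmpty

val deserializedMessages = KStreamBuilderS.stream[String, User]("users")

// one stream with the messages that match the predicate, one with the rest
val (goodMessages, badMessages) = deserializedMessages.split((_, user) => isValidMessage(user))

goodMessages.to("valid-users")
badMessages.to("invalid-users")
```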