Skip to content

Commit d5aa7be

Browse files
authored
Add configurable deserializer and fix build (#109)
* Add MyConfigurableDeserializer example * Update tpolecat and add warn exclusion for scalaPB * Update README and MyConfigurableDeserializer
1 parent 10473f3 commit d5aa7be

File tree

4 files changed

+89
-1
lines changed

4 files changed

+89
-1
lines changed

README.md

+45
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,48 @@ message Person {
6969
repeated PhoneNumber phones = 4;
7070
}
7171
```
72+
73+
## Configurable example
74+
`io.example.conduktor.custom.deserializers.MyConfigurableDeserializer`
75+
76+
[located here](./src/main/scala/io/example/conduktor/custom/deserializers/MyConfigurableDeserializer.scala)
77+
78+
This example allow to show deserializer configuration to change it's behavior.
79+
To configure the behabor, the Deserializer check for a `output` property in it's configuration.
80+
81+
### Passthrough mode :
82+
With configuration :
83+
```properties
84+
output=passthrough
85+
```
86+
The data on record are not de coded and returned as-is in bytes array form.
87+
88+
### Config mode :
89+
With configuration :
90+
```properties
91+
output=config
92+
```
93+
The configuration is returned on each record deserialization.
94+
For example with configuration
95+
```properties
96+
output=config
97+
other.property=some value
98+
```
99+
Will always return JSON like
100+
```json
101+
{
102+
"output": "config",
103+
"other.property": "some value"
104+
}
105+
```
106+
107+
### Constant mode :
108+
109+
With configuration output defined to something else other than `config` or `passthrough` and not empty like:
110+
```properties
111+
output=some constant output
112+
```
113+
The Deserializer will always return String value like
114+
```json
115+
"some constant output"
116+
```

build.sbt

+6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import org.typelevel.scalacoptions.ScalacOptions
2+
13
name := "my_custom_deserializers"
24
version := sys.env.getOrElse("CREATED_TAG", "0.1")
35
scalaVersion := "2.13.10"
@@ -8,6 +10,10 @@ libraryDependencies ++= Seq(
810
"com.thesamet.scalapb.common-protos" %% "proto-google-common-protos-scalapb_0.11" % "2.9.6-0"
911
)
1012

13+
Compile / tpolecatExcludeOptions ++= Set(
14+
ScalacOptions.warnNonUnitStatement, // for scalaPB gen sources
15+
)
16+
1117
assembly / assemblyJarName := "plugins.jar"
1218

1319
// ## Github Packages publish configs

project/plugins.sbt

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2")
1+
addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.0")
22

33
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package io.example.conduktor.custom.deserializers
2+
3+
import org.apache.kafka.common.serialization.Deserializer
4+
5+
import java.util
6+
import scala.jdk.CollectionConverters.MapHasAsScala
7+
8+
case object MyConfigurableDeserializerException
9+
extends RuntimeException(
10+
"ConfigurableDeserializer fail when its `::configure` method is called without `output` property"
11+
)
12+
13+
sealed trait Output
14+
final case class Constant(value: String) extends Output
15+
final case class Config(config: util.Map[String, _]) extends Output
16+
final case object Passthrough extends Output
17+
final case object Unconfigured extends Output
18+
19+
final class MyConfigurableDeserializer extends Deserializer[Any] {
20+
21+
var output: Output = Unconfigured
22+
23+
override def deserialize(topic: String, data: Array[Byte]): Any = output match {
24+
case Constant(value) => value
25+
case Config(config) => config
26+
case Passthrough => data
27+
case Unconfigured => throw MyConfigurableDeserializerException
28+
}
29+
30+
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit =
31+
configs.asScala.get("output").map(_.asInstanceOf[String]) match {
32+
case Some("config") => output = Config(configs)
33+
case Some("passthrough") => output = Passthrough
34+
case Some(value) => output = Constant(value)
35+
case None => throw MyConfigurableDeserializerException
36+
}
37+
}

0 commit comments

Comments
 (0)