Skip to content

Commit 2ed84c8

Browse files
vderPiotr Fałdrowicz
and
Piotr Fałdrowicz
authored
valid support for nested records in avro kafkaSink (#8137)
* validation Test for nested record in kafkaSink * replacing types in runtimeObjType * recursive version * custom AvroSchemaTypeDefinitionExtractor * additional comment --------- Co-authored-by: Piotr Fałdrowicz <[email protected]>
1 parent 3ac857a commit 2ed84c8

File tree

9 files changed

+137
-20
lines changed

9 files changed

+137
-20
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package pl.touk.nussknacker.engine.schemedkafka.schema
2+
3+
object NestedRecord extends TestSchemaWithRecord {
4+
5+
val stringSchema: String =
6+
"""
7+
|{
8+
| "type": "record",
9+
| "name": "gskTest",
10+
| "fields": [
11+
| {
12+
| "name": "mapSimple",
13+
| "type": [
14+
| "null",
15+
| {
16+
| "type": "record",
17+
| "name": "messageSimple",
18+
| "fields": [
19+
| {
20+
| "name": "id",
21+
| "type": [
22+
| "null",
23+
| {
24+
| "type": "record",
25+
| "name": "messageSimple2",
26+
| "fields": [
27+
| {
28+
| "name": "id",
29+
| "type": [
30+
| "null",
31+
| "string"
32+
| ],
33+
| "default": null
34+
| }
35+
| ]
36+
| }
37+
| ],
38+
| "default": null
39+
| }
40+
| ]
41+
| }
42+
| ],
43+
| "default": null
44+
| }
45+
| ]
46+
|}
47+
""".stripMargin
48+
49+
val exampleData: Map[String, Any] = Map(
50+
"mapSimple" -> Map("id" -> "id"),
51+
)
52+
53+
val jsonMap: String =
54+
s"""
55+
|{
56+
| mapSimple: {id: "id"}
57+
|}
58+
""".stripMargin
59+
60+
}

engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/KafkaAvroSinkSpecMixin.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ trait KafkaAvroSinkSpecMixin {
1717
.register(fullnameTopic, FullNameV1.schema, 1, isKey = false)
1818
.register(fullnameTopic, FullNameV2.schema, 2, isKey = false)
1919
.register(fullnameTopic, PaymentV1.schema, 3, isKey = false)
20+
.register(fullnameTopic, NestedRecord.schema, 4, isKey = false)
2021
.build
2122

2223
val factory: SchemaRegistryClientFactory = MockSchemaRegistryClientFactory.confluentBased(schemaRegistryMockClient)

engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/UniversalKafkaSinkValidationSpec.scala

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,27 @@ class UniversalKafkaSinkValidationSpec extends KafkaAvroSpecMixin with KafkaAvro
6464
result.errors shouldBe Nil
6565
}
6666

67+
test("should validate nested record") {
68+
val result = validate(
69+
sinkKeyParamName.value -> "".spel,
70+
"mapSimple" -> """{id:{id:"10"}}""".spel,
71+
sinkRawEditorParamName.value -> "false".spel,
72+
sinkValidationModeParamName.value -> validationModeParam(ValidationMode.strict),
73+
topicParamName.value -> s"'${KafkaAvroSinkMockSchemaRegistry.fullnameTopic}'".spel,
74+
schemaVersionParamName.value -> "'4'".spel
75+
)
76+
77+
result.errors shouldBe Nil
78+
}
79+
6780
test("should validate latest version") {
6881
val result = validate(
6982
sinkKeyParamName.value -> "".spel,
7083
sinkValueParamName.value -> PaymentV1.exampleData.toSpELLiteral.spel,
7184
sinkRawEditorParamName.value -> "true".spel,
7285
sinkValidationModeParamName.value -> validationModeParam(ValidationMode.strict),
7386
topicParamName.value -> s"'${KafkaAvroSinkMockSchemaRegistry.fullnameTopic}'".spel,
74-
schemaVersionParamName.value -> s"'${SchemaVersionOption.LatestOptionName}'".spel
87+
schemaVersionParamName.value -> s"'3'".spel
7588
)
7689

7790
result.errors shouldBe Nil
@@ -119,7 +132,7 @@ class UniversalKafkaSinkValidationSpec extends KafkaAvroSpecMixin with KafkaAvro
119132
paramName = schemaVersionParamName,
120133
label = None,
121134
value = "'343543'",
122-
values = List("'latest'", "'1'", "'2'", "'3'"),
135+
values = List("'latest'", "'1'", "'2'", "'3'", "'4'"),
123136
nodeId = "id"
124137
) :: Nil
125138
}
@@ -131,7 +144,7 @@ class UniversalKafkaSinkValidationSpec extends KafkaAvroSpecMixin with KafkaAvro
131144
sinkRawEditorParamName.value -> "true".spel,
132145
sinkValidationModeParamName.value -> validationModeParam(ValidationMode.strict),
133146
topicParamName.value -> s"'${KafkaAvroSinkMockSchemaRegistry.fullnameTopic}'".spel,
134-
schemaVersionParamName.value -> s"'${SchemaVersionOption.LatestOptionName}'".spel
147+
schemaVersionParamName.value -> s"'3'".spel
135148
)
136149

137150
result.errors shouldBe CustomNodeError(

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schema/AvroSchemaBasedParameter.scala

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,27 @@ object AvroSchemaBasedParameter {
2828
/*
2929
We extract editor form from Avro schema
3030
*/
31-
def apply(schema: Schema, restrictedParamNames: Set[ParameterName])(
31+
def apply(
32+
schema: Schema,
33+
restrictedParamNames: Set[ParameterName],
34+
avroSchemaTypeDefinitionExtractor: AvroSchemaTypeDefinitionExtractor
35+
)(
3236
implicit nodeId: NodeId
3337
): ValidatedNel[ProcessCompilationError, SchemaBasedParameter] =
34-
toSchemaBasedParameter(schema, paramName = None, defaultValue = None, restrictedParamNames = restrictedParamNames)
38+
toSchemaBasedParameter(
39+
schema,
40+
paramName = None,
41+
defaultValue = None,
42+
restrictedParamNames = restrictedParamNames,
43+
avroSchemaTypeDefinitionExtractor
44+
)
3545

3646
private def toSchemaBasedParameter(
3747
schema: Schema,
3848
paramName: Option[ParameterName],
3949
defaultValue: Option[Expression],
40-
restrictedParamNames: Set[ParameterName]
50+
restrictedParamNames: Set[ParameterName],
51+
avroSchemaTypeDefinitionExtractor: AvroSchemaTypeDefinitionExtractor
4152
)(implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, SchemaBasedParameter] = {
4253
import cats.implicits.{catsStdInstancesForList, toTraverseOps}
4354

@@ -65,15 +76,16 @@ object AvroSchemaBasedParameter {
6576
schema = recordField.schema(),
6677
paramName = Some(concatName),
6778
defaultValue,
68-
restrictedParamNames
79+
restrictedParamNames,
80+
avroSchemaTypeDefinitionExtractor
6981
)
7082
}
7183
sinkValueValidated.map(sinkValueParam => fieldName -> sinkValueParam)
7284
}
7385
listOfValidatedParams.sequence.map(l => ListMap(l: _*)).map(SchemaBasedRecordParameter)
7486
}
7587
} else {
76-
Valid(AvroSinkSingleValueParameter(paramName, schema, defaultValue))
88+
Valid(AvroSinkSingleValueParameter(paramName, schema, defaultValue, avroSchemaTypeDefinitionExtractor))
7789
}
7890
}
7991

@@ -96,9 +108,10 @@ object AvroSinkSingleValueParameter {
96108
def apply(
97109
paramName: Option[ParameterName],
98110
schema: Schema,
99-
defaultValue: Option[Expression]
111+
defaultValue: Option[Expression],
112+
avroSchemaTypeDefinitionExtractor: AvroSchemaTypeDefinitionExtractor
100113
): SingleSchemaBasedParameter = {
101-
val typing = AvroSchemaTypeDefinitionExtractor.typeDefinition(schema)
114+
val typing = avroSchemaTypeDefinitionExtractor.typeDefinition(schema)
102115
val name = paramName.getOrElse(sinkValueParamName)
103116
val parameter = (
104117
if (schema.isNullable) Parameter.optional(name, typing) else Parameter(name, typing)

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/ParsedSchemaSupport.scala

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,10 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{
3535
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.serialization._
3636
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.serialization.jsonpayload.ConfluentJsonPayloadKafkaSerializer
3737
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.formatter.AvroMessageReader
38-
import pl.touk.nussknacker.engine.schemedkafka.typed.AvroSchemaTypeDefinitionExtractor
38+
import pl.touk.nussknacker.engine.schemedkafka.typed.{
39+
AvroSchemaTypeDefinitionExtractor,
40+
AvroSchemaTypeDefinitionExtractorWithUnderlyingMap
41+
}
3942
import pl.touk.nussknacker.engine.util.parameters.{SchemaBasedParameter, SingleSchemaBasedParameter}
4043

4144
sealed trait ParsedSchemaSupport[+S <: ParsedSchema] extends UniversalSchemaSupport {
@@ -114,7 +117,16 @@ class AvroSchemaSupport(kafkaConfig: KafkaConfig) extends ParsedSchemaSupport[Av
114117
rawParameter: Parameter,
115118
restrictedParamNames: Set[ParameterName]
116119
)(implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, SchemaBasedParameter] =
117-
extractParameter(schema, rawMode, validationMode, rawParameter, restrictedParamNames)
120+
extractParameter(
121+
schema,
122+
rawMode,
123+
validationMode,
124+
rawParameter,
125+
restrictedParamNames,
126+
// We need custom AvroSchemaTypeDefinitionExtractor here as otherwise SpelExpressionValidator rises an errors
127+
// when rawmode is off and sink schema contains nested records
128+
AvroSchemaTypeDefinitionExtractorWithUnderlyingMap
129+
)
118130

119131
override def extractParameterForTests(schema: ParsedSchema)(
120132
implicit nodeId: NodeId
@@ -132,7 +144,8 @@ class AvroSchemaSupport(kafkaConfig: KafkaConfig) extends ParsedSchemaSupport[Av
132144
rawMode: Boolean,
133145
validationMode: ValidationMode,
134146
rawParameter: Parameter,
135-
restrictedParamNames: Set[ParameterName]
147+
restrictedParamNames: Set[ParameterName],
148+
avroSchemaTypeDefinitionExtractor: AvroSchemaTypeDefinitionExtractor = AvroSchemaTypeDefinitionExtractor
136149
)(implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, SchemaBasedParameter] = {
137150
if (rawMode) {
138151
Validated.Valid(
@@ -142,7 +155,7 @@ class AvroSchemaSupport(kafkaConfig: KafkaConfig) extends ParsedSchemaSupport[Av
142155
)
143156
)
144157
} else {
145-
AvroSchemaBasedParameter(schema.cast().rawSchema(), restrictedParamNames)
158+
AvroSchemaBasedParameter(schema.cast().rawSchema(), restrictedParamNames, avroSchemaTypeDefinitionExtractor)
146159
}
147160
}
148161

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.schemedkafka.sink
22

33
import cats.data.NonEmptyList
44
import io.confluent.kafka.schemaregistry.ParsedSchema
5+
import org.apache.avro.generic.GenericRecord
56
import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
67
import pl.touk.nussknacker.engine.ModelConfig
78
import pl.touk.nussknacker.engine.api.{LazyParameter, MetaData, NodeId, Params}
@@ -41,6 +42,8 @@ import pl.touk.nussknacker.engine.util.sinkvalue.SinkValue
4142
*/
4243
object UniversalKafkaSinkFactory {
4344

45+
private val genericRecordClass = classOf[GenericRecord]
46+
4447
private val paramsDeterminedAfterSchema = List(
4548
Parameter.optional[CharSequence](sinkKeyParamName).copy(isLazyParameter = true),
4649
Parameter[Boolean](sinkRawEditorParamName).copy(

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/typed/AvroSchemaTypeDefinitionExtractor.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,19 @@ package pl.touk.nussknacker.engine.schemedkafka.typed
33
import org.apache.avro.{LogicalTypes, Schema}
44
import org.apache.avro.generic.{GenericData, GenericRecord}
55
import org.apache.avro.generic.GenericData.EnumSymbol
6-
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass, TypedNull, TypingResult}
6+
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass, TypedNull, TypingResult, Unknown}
77

88
import java.nio.ByteBuffer
99
import java.time.{Instant, LocalDate, LocalTime}
1010
import java.util.UUID
1111

1212
object AvroSchemaTypeDefinitionExtractor extends AvroSchemaTypeDefinitionExtractor(Typed.typedClass[GenericRecord])
1313

14+
object AvroSchemaTypeDefinitionExtractorWithUnderlyingMap
15+
extends AvroSchemaTypeDefinitionExtractor(
16+
Typed.genericTypeClass(classOf[java.util.Map[_, _]], List(Typed[String], Unknown))
17+
)
18+
1419
class AvroSchemaTypeDefinitionExtractor(recordUnderlyingType: TypedClass) {
1520

1621
import scala.jdk.CollectionConverters._

utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/AvroSinkValueParameterTest.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import pl.touk.nussknacker.engine.definition.component.parameter.StandardParamet
1414
import pl.touk.nussknacker.engine.graph.expression.Expression
1515
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer._
1616
import pl.touk.nussknacker.engine.schemedkafka.schema.AvroSchemaBasedParameter
17+
import pl.touk.nussknacker.engine.schemedkafka.typed.AvroSchemaTypeDefinitionExtractorWithUnderlyingMap
1718
import pl.touk.nussknacker.engine.util.parameters.{SchemaBasedRecordParameter, SingleSchemaBasedParameter}
1819

1920
class AvroSchemaBasedParameterTest extends AnyFunSuite with Matchers {
@@ -55,7 +56,7 @@ class AvroSchemaBasedParameterTest extends AnyFunSuite with Matchers {
5556
.nullDefault()
5657
.endRecord()
5758

58-
val result = AvroSchemaBasedParameter(recordSchema, Set.empty)
59+
val result = AvroSchemaBasedParameter(recordSchema, Set.empty, AvroSchemaTypeDefinitionExtractorWithUnderlyingMap)
5960
.valueOr(e => fail(e.toString))
6061
.asInstanceOf[SchemaBasedRecordParameter]
6162
StandardParameterEnrichment.enrichParameterDefinitions(
@@ -83,7 +84,7 @@ class AvroSchemaBasedParameterTest extends AnyFunSuite with Matchers {
8384

8485
test("typing result to AvroSinkPrimitiveValueParameter") {
8586
val longSchema = SchemaBuilder.builder().longType()
86-
val result = AvroSchemaBasedParameter(longSchema, Set.empty)
87+
val result = AvroSchemaBasedParameter(longSchema, Set.empty, AvroSchemaTypeDefinitionExtractorWithUnderlyingMap)
8788
.valueOr(e => fail(e.toString))
8889
.asInstanceOf[SingleSchemaBasedParameter]
8990
StandardParameterEnrichment.enrichParameterDefinitions(
@@ -110,7 +111,8 @@ class AvroSchemaBasedParameterTest extends AnyFunSuite with Matchers {
110111
.longType()
111112
.noDefault()
112113
.endRecord()
113-
val result = AvroSchemaBasedParameter(recordSchema, restrictedNames)
114+
val result =
115+
AvroSchemaBasedParameter(recordSchema, restrictedNames, AvroSchemaTypeDefinitionExtractorWithUnderlyingMap)
114116
result shouldBe Invalid(
115117
NonEmptyList.one(
116118
CustomNodeError(

utils/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/AvroSinkValueTest.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import pl.touk.nussknacker.engine.api.parameter.ParameterName
88
import pl.touk.nussknacker.engine.api.typed.typing.Typed
99
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer.sinkValueParamName
1010
import pl.touk.nussknacker.engine.schemedkafka.schema.AvroSchemaBasedParameter
11+
import pl.touk.nussknacker.engine.schemedkafka.typed.AvroSchemaTypeDefinitionExtractorWithUnderlyingMap
1112
import pl.touk.nussknacker.engine.util.sinkvalue.SinkValue
1213
import pl.touk.nussknacker.engine.util.sinkvalue.SinkValueData.{SinkRecordValue, SinkSingleValue, SinkValue}
1314

@@ -38,7 +39,10 @@ class AvroSinkValueTest extends AnyFunSuite with Matchers {
3839

3940
val parameterValues = Params.fromRawValuesMap(Map(ParameterName("a") -> value, ParameterName("b.c") -> value))
4041

41-
val sinkParam = AvroSchemaBasedParameter(recordSchema, Set.empty).valueOr(e => fail(e.toString))
42+
val sinkParam =
43+
AvroSchemaBasedParameter(recordSchema, Set.empty, AvroSchemaTypeDefinitionExtractorWithUnderlyingMap).valueOr(e =>
44+
fail(e.toString)
45+
)
4246

4347
val fields: Map[String, SinkValue] = SinkValue
4448
.applyUnsafe(sinkParam, parameterValues)
@@ -55,7 +59,10 @@ class AvroSinkValueTest extends AnyFunSuite with Matchers {
5559
val longSchema = SchemaBuilder.builder().longType()
5660
val value = LazyParameter.pure(java.lang.Long.valueOf(1L), Typed[java.lang.Long])
5761
val parameterValues = Params.fromRawValuesMap(Map(sinkValueParamName -> value))
58-
val sinkParam = AvroSchemaBasedParameter(longSchema, Set.empty).valueOr(e => fail(e.toString))
62+
val sinkParam =
63+
AvroSchemaBasedParameter(longSchema, Set.empty, AvroSchemaTypeDefinitionExtractorWithUnderlyingMap).valueOr(e =>
64+
fail(e.toString)
65+
)
5966

6067
SinkValue
6168
.applyUnsafe(sinkParam, parameterValues)

0 commit comments

Comments
 (0)