Skip to content

Add support for nested records in avro kafkaSink #8137

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package pl.touk.nussknacker.engine.schemedkafka.schema

/**
 * Test fixture: an Avro schema with a doubly-nested record.
 * Shape: gskTest.mapSimple (nullable record messageSimple)
 *          -> messageSimple.id (nullable record messageSimple2)
 *            -> messageSimple2.id (nullable string)
 * Used to verify that the Avro Kafka sink validates nested records correctly.
 */
object NestedRecord extends TestSchemaWithRecord {

  val stringSchema: String =
    """
      |{
      |  "type": "record",
      |  "name": "gskTest",
      |  "fields": [
      |    {
      |      "name": "mapSimple",
      |      "type": [
      |        "null",
      |        {
      |          "type": "record",
      |          "name": "messageSimple",
      |          "fields": [
      |            {
      |              "name": "id",
      |              "type": [
      |                "null",
      |                {
      |                  "type": "record",
      |                  "name": "messageSimple2",
      |                  "fields": [
      |                    {
      |                      "name": "id",
      |                      "type": [
      |                        "null",
      |                        "string"
      |                      ],
      |                      "default": null
      |                    }
      |                  ]
      |                }
      |              ],
      |              "default": null
      |            }
      |          ]
      |        }
      |      ],
      |      "default": null
      |    }
      |  ]
      |}
    """.stripMargin

  // FIX: previously Map("mapSimple" -> Map("id" -> "id")), which is only one level deep
  // and does not conform to the schema above — messageSimple.id is itself a record
  // (messageSimple2), not a string. The nested shape matches the sink validation test
  // expression {id:{id:"10"}} used elsewhere in this PR.
  val exampleData: Map[String, Any] = Map(
    "mapSimple" -> Map("id" -> Map("id" -> "id")),
  )

  // JSON rendering of exampleData; nested to the same depth as the schema requires.
  val jsonMap: String =
    s"""
       |{
       |  mapSimple: {id: {id: "id"}}
       |}
    """.stripMargin

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ trait KafkaAvroSinkSpecMixin {
.register(fullnameTopic, FullNameV1.schema, 1, isKey = false)
.register(fullnameTopic, FullNameV2.schema, 2, isKey = false)
.register(fullnameTopic, PaymentV1.schema, 3, isKey = false)
.register(fullnameTopic, NestedRecord.schema, 4, isKey = false)
.build

val factory: SchemaRegistryClientFactory = MockSchemaRegistryClientFactory.confluentBased(schemaRegistryMockClient)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,27 @@ class UniversalKafkaSinkValidationSpec extends KafkaAvroSpecMixin with KafkaAvro
result.errors shouldBe Nil
}

test("should validate nested record") {
val result = validate(
sinkKeyParamName.value -> "".spel,
"mapSimple" -> """{id:{id:"10"}}""".spel,
sinkRawEditorParamName.value -> "false".spel,
sinkValidationModeParamName.value -> validationModeParam(ValidationMode.strict),
topicParamName.value -> s"'${KafkaAvroSinkMockSchemaRegistry.fullnameTopic}'".spel,
schemaVersionParamName.value -> "'4'".spel
)

result.errors shouldBe Nil
}

test("should validate latest version") {
val result = validate(
sinkKeyParamName.value -> "".spel,
sinkValueParamName.value -> PaymentV1.exampleData.toSpELLiteral.spel,
sinkRawEditorParamName.value -> "true".spel,
sinkValidationModeParamName.value -> validationModeParam(ValidationMode.strict),
topicParamName.value -> s"'${KafkaAvroSinkMockSchemaRegistry.fullnameTopic}'".spel,
schemaVersionParamName.value -> s"'${SchemaVersionOption.LatestOptionName}'".spel
schemaVersionParamName.value -> s"'3'".spel
)

result.errors shouldBe Nil
Expand Down Expand Up @@ -119,7 +132,7 @@ class UniversalKafkaSinkValidationSpec extends KafkaAvroSpecMixin with KafkaAvro
paramName = schemaVersionParamName,
label = None,
value = "'343543'",
values = List("'latest'", "'1'", "'2'", "'3'"),
values = List("'latest'", "'1'", "'2'", "'3'", "'4'"),
nodeId = "id"
) :: Nil
}
Expand All @@ -131,7 +144,7 @@ class UniversalKafkaSinkValidationSpec extends KafkaAvroSpecMixin with KafkaAvro
sinkRawEditorParamName.value -> "true".spel,
sinkValidationModeParamName.value -> validationModeParam(ValidationMode.strict),
topicParamName.value -> s"'${KafkaAvroSinkMockSchemaRegistry.fullnameTopic}'".spel,
schemaVersionParamName.value -> s"'${SchemaVersionOption.LatestOptionName}'".spel
schemaVersionParamName.value -> s"'3'".spel
)

result.errors shouldBe CustomNodeError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,27 @@ object AvroSchemaBasedParameter {
/*
We extract editor form from Avro schema
*/
def apply(schema: Schema, restrictedParamNames: Set[ParameterName])(
def apply(
schema: Schema,
restrictedParamNames: Set[ParameterName],
avroSchemaTypeDefinitionExtractor: AvroSchemaTypeDefinitionExtractor
)(
implicit nodeId: NodeId
): ValidatedNel[ProcessCompilationError, SchemaBasedParameter] =
toSchemaBasedParameter(schema, paramName = None, defaultValue = None, restrictedParamNames = restrictedParamNames)
toSchemaBasedParameter(
schema,
paramName = None,
defaultValue = None,
restrictedParamNames = restrictedParamNames,
avroSchemaTypeDefinitionExtractor
)

private def toSchemaBasedParameter(
schema: Schema,
paramName: Option[ParameterName],
defaultValue: Option[Expression],
restrictedParamNames: Set[ParameterName]
restrictedParamNames: Set[ParameterName],
avroSchemaTypeDefinitionExtractor: AvroSchemaTypeDefinitionExtractor
)(implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, SchemaBasedParameter] = {
import cats.implicits.{catsStdInstancesForList, toTraverseOps}

Expand Down Expand Up @@ -65,15 +76,16 @@ object AvroSchemaBasedParameter {
schema = recordField.schema(),
paramName = Some(concatName),
defaultValue,
restrictedParamNames
restrictedParamNames,
avroSchemaTypeDefinitionExtractor
)
}
sinkValueValidated.map(sinkValueParam => fieldName -> sinkValueParam)
}
listOfValidatedParams.sequence.map(l => ListMap(l: _*)).map(SchemaBasedRecordParameter)
}
} else {
Valid(AvroSinkSingleValueParameter(paramName, schema, defaultValue))
Valid(AvroSinkSingleValueParameter(paramName, schema, defaultValue, avroSchemaTypeDefinitionExtractor))
}
}

Expand All @@ -96,9 +108,10 @@ object AvroSinkSingleValueParameter {
def apply(
paramName: Option[ParameterName],
schema: Schema,
defaultValue: Option[Expression]
defaultValue: Option[Expression],
avroSchemaTypeDefinitionExtractor: AvroSchemaTypeDefinitionExtractor
): SingleSchemaBasedParameter = {
val typing = AvroSchemaTypeDefinitionExtractor.typeDefinition(schema)
val typing = avroSchemaTypeDefinitionExtractor.typeDefinition(schema)
val name = paramName.getOrElse(sinkValueParamName)
val parameter = (
if (schema.isNullable) Parameter.optional(name, typing) else Parameter(name, typing)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.serialization._
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.serialization.jsonpayload.ConfluentJsonPayloadKafkaSerializer
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.formatter.AvroMessageReader
import pl.touk.nussknacker.engine.schemedkafka.typed.AvroSchemaTypeDefinitionExtractor
import pl.touk.nussknacker.engine.schemedkafka.typed.{
AvroSchemaTypeDefinitionExtractor,
AvroSchemaTypeDefinitionExtractorWithUnderlyingMap
}
import pl.touk.nussknacker.engine.util.parameters.{SchemaBasedParameter, SingleSchemaBasedParameter}

sealed trait ParsedSchemaSupport[+S <: ParsedSchema] extends UniversalSchemaSupport {
Expand Down Expand Up @@ -114,7 +117,16 @@ class AvroSchemaSupport(kafkaConfig: KafkaConfig) extends ParsedSchemaSupport[Av
rawParameter: Parameter,
restrictedParamNames: Set[ParameterName]
)(implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, SchemaBasedParameter] =
extractParameter(schema, rawMode, validationMode, rawParameter, restrictedParamNames)
extractParameter(
schema,
rawMode,
validationMode,
rawParameter,
restrictedParamNames,
// We need a custom AvroSchemaTypeDefinitionExtractor here, as otherwise SpelExpressionValidator raises errors
// when raw mode is off and the sink schema contains nested records
AvroSchemaTypeDefinitionExtractorWithUnderlyingMap
)

override def extractParameterForTests(schema: ParsedSchema)(
implicit nodeId: NodeId
Expand All @@ -132,7 +144,8 @@ class AvroSchemaSupport(kafkaConfig: KafkaConfig) extends ParsedSchemaSupport[Av
rawMode: Boolean,
validationMode: ValidationMode,
rawParameter: Parameter,
restrictedParamNames: Set[ParameterName]
restrictedParamNames: Set[ParameterName],
avroSchemaTypeDefinitionExtractor: AvroSchemaTypeDefinitionExtractor = AvroSchemaTypeDefinitionExtractor
)(implicit nodeId: NodeId): ValidatedNel[ProcessCompilationError, SchemaBasedParameter] = {
if (rawMode) {
Validated.Valid(
Expand All @@ -142,7 +155,7 @@ class AvroSchemaSupport(kafkaConfig: KafkaConfig) extends ParsedSchemaSupport[Av
)
)
} else {
AvroSchemaBasedParameter(schema.cast().rawSchema(), restrictedParamNames)
AvroSchemaBasedParameter(schema.cast().rawSchema(), restrictedParamNames, avroSchemaTypeDefinitionExtractor)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pl.touk.nussknacker.engine.schemedkafka.sink

import cats.data.NonEmptyList
import io.confluent.kafka.schemaregistry.ParsedSchema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.formats.avro.typeutils.NkSerializableParsedSchema
import pl.touk.nussknacker.engine.ModelConfig
import pl.touk.nussknacker.engine.api.{LazyParameter, MetaData, NodeId, Params}
Expand Down Expand Up @@ -41,6 +42,8 @@ import pl.touk.nussknacker.engine.util.sinkvalue.SinkValue
*/
object UniversalKafkaSinkFactory {

private val genericRecordClass = classOf[GenericRecord]

private val paramsDeterminedAfterSchema = List(
Parameter.optional[CharSequence](sinkKeyParamName).copy(isLazyParameter = true),
Parameter[Boolean](sinkRawEditorParamName).copy(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ package pl.touk.nussknacker.engine.schemedkafka.typed
import org.apache.avro.{LogicalTypes, Schema}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.generic.GenericData.EnumSymbol
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass, TypedNull, TypingResult}
import pl.touk.nussknacker.engine.api.typed.typing.{Typed, TypedClass, TypedNull, TypingResult, Unknown}

import java.nio.ByteBuffer
import java.time.{Instant, LocalDate, LocalTime}
import java.util.UUID

object AvroSchemaTypeDefinitionExtractor extends AvroSchemaTypeDefinitionExtractor(Typed.typedClass[GenericRecord])

object AvroSchemaTypeDefinitionExtractorWithUnderlyingMap
extends AvroSchemaTypeDefinitionExtractor(
Typed.genericTypeClass(classOf[java.util.Map[_, _]], List(Typed[String], Unknown))
)

class AvroSchemaTypeDefinitionExtractor(recordUnderlyingType: TypedClass) {

import scala.jdk.CollectionConverters._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import pl.touk.nussknacker.engine.definition.component.parameter.StandardParamet
import pl.touk.nussknacker.engine.graph.expression.Expression
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer._
import pl.touk.nussknacker.engine.schemedkafka.schema.AvroSchemaBasedParameter
import pl.touk.nussknacker.engine.schemedkafka.typed.AvroSchemaTypeDefinitionExtractorWithUnderlyingMap
import pl.touk.nussknacker.engine.util.parameters.{SchemaBasedRecordParameter, SingleSchemaBasedParameter}

class AvroSchemaBasedParameterTest extends AnyFunSuite with Matchers {
Expand Down Expand Up @@ -55,7 +56,7 @@ class AvroSchemaBasedParameterTest extends AnyFunSuite with Matchers {
.nullDefault()
.endRecord()

val result = AvroSchemaBasedParameter(recordSchema, Set.empty)
val result = AvroSchemaBasedParameter(recordSchema, Set.empty, AvroSchemaTypeDefinitionExtractorWithUnderlyingMap)
.valueOr(e => fail(e.toString))
.asInstanceOf[SchemaBasedRecordParameter]
StandardParameterEnrichment.enrichParameterDefinitions(
Expand Down Expand Up @@ -83,7 +84,7 @@ class AvroSchemaBasedParameterTest extends AnyFunSuite with Matchers {

test("typing result to AvroSinkPrimitiveValueParameter") {
val longSchema = SchemaBuilder.builder().longType()
val result = AvroSchemaBasedParameter(longSchema, Set.empty)
val result = AvroSchemaBasedParameter(longSchema, Set.empty, AvroSchemaTypeDefinitionExtractorWithUnderlyingMap)
.valueOr(e => fail(e.toString))
.asInstanceOf[SingleSchemaBasedParameter]
StandardParameterEnrichment.enrichParameterDefinitions(
Expand All @@ -110,7 +111,8 @@ class AvroSchemaBasedParameterTest extends AnyFunSuite with Matchers {
.longType()
.noDefault()
.endRecord()
val result = AvroSchemaBasedParameter(recordSchema, restrictedNames)
val result =
AvroSchemaBasedParameter(recordSchema, restrictedNames, AvroSchemaTypeDefinitionExtractorWithUnderlyingMap)
result shouldBe Invalid(
NonEmptyList.one(
CustomNodeError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.api.typed.typing.Typed
import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransformer.sinkValueParamName
import pl.touk.nussknacker.engine.schemedkafka.schema.AvroSchemaBasedParameter
import pl.touk.nussknacker.engine.schemedkafka.typed.AvroSchemaTypeDefinitionExtractorWithUnderlyingMap
import pl.touk.nussknacker.engine.util.sinkvalue.SinkValue
import pl.touk.nussknacker.engine.util.sinkvalue.SinkValueData.{SinkRecordValue, SinkSingleValue, SinkValue}

Expand Down Expand Up @@ -38,7 +39,10 @@ class AvroSinkValueTest extends AnyFunSuite with Matchers {

val parameterValues = Params.fromRawValuesMap(Map(ParameterName("a") -> value, ParameterName("b.c") -> value))

val sinkParam = AvroSchemaBasedParameter(recordSchema, Set.empty).valueOr(e => fail(e.toString))
val sinkParam =
AvroSchemaBasedParameter(recordSchema, Set.empty, AvroSchemaTypeDefinitionExtractorWithUnderlyingMap).valueOr(e =>
fail(e.toString)
)

val fields: Map[String, SinkValue] = SinkValue
.applyUnsafe(sinkParam, parameterValues)
Expand All @@ -55,7 +59,10 @@ class AvroSinkValueTest extends AnyFunSuite with Matchers {
val longSchema = SchemaBuilder.builder().longType()
val value = LazyParameter.pure(java.lang.Long.valueOf(1L), Typed[java.lang.Long])
val parameterValues = Params.fromRawValuesMap(Map(sinkValueParamName -> value))
val sinkParam = AvroSchemaBasedParameter(longSchema, Set.empty).valueOr(e => fail(e.toString))
val sinkParam =
AvroSchemaBasedParameter(longSchema, Set.empty, AvroSchemaTypeDefinitionExtractorWithUnderlyingMap).valueOr(e =>
fail(e.toString)
)

SinkValue
.applyUnsafe(sinkParam, parameterValues)
Expand Down