diff --git a/examples/simple-json/src/main/java/io/apicurio/registry/examples/simple/json/SimpleJsonSchemaExample.java b/examples/simple-json/src/main/java/io/apicurio/registry/examples/simple/json/SimpleJsonSchemaExample.java index d34c35ee1f..ce5b0be884 100644 --- a/examples/simple-json/src/main/java/io/apicurio/registry/examples/simple/json/SimpleJsonSchemaExample.java +++ b/examples/simple-json/src/main/java/io/apicurio/registry/examples/simple/json/SimpleJsonSchemaExample.java @@ -22,6 +22,7 @@ import io.apicurio.registry.rest.client.models.IfArtifactExists; import io.apicurio.registry.rest.client.models.VersionContent; import io.apicurio.registry.rest.client.models.VersionMetaData; +import io.apicurio.registry.serde.config.KafkaSerdeConfig; import io.apicurio.registry.serde.config.SerdeConfig; import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer; import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer; @@ -184,6 +185,8 @@ private static Producer createKafkaProducer() { // Use the Apicurio Registry provided Kafka Serializer for JSON Schema props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSchemaKafkaSerializer.class.getName()); + // Send metadata in message headers (includes the schema coordinates and the message bean type) + props.putIfAbsent(KafkaSerdeConfig.ENABLE_HEADERS, true); // Configure Service Registry location props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL); @@ -215,6 +218,8 @@ private static KafkaConsumer createKafkaConsumer() { // Use the Apicurio Registry provided Kafka Deserializer for JSON Schema props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSchemaKafkaDeserializer.class.getName()); + // Read metadata from message headers (includes the schema coordinates and the message bean type) + props.putIfAbsent(KafkaSerdeConfig.ENABLE_HEADERS, true); // Configure Service Registry location props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);