Skip to content

Commit

Permalink
We need to enable headers so that the bean type is sent for each kafk…
Browse files Browse the repository at this point in the history
…a message (#6022)
  • Loading branch information
EricWittmann authored Mar 6, 2025
1 parent 2aa24b9 commit 0e5fafd
Showing 1 changed file with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.apicurio.registry.rest.client.models.IfArtifactExists;
import io.apicurio.registry.rest.client.models.VersionContent;
import io.apicurio.registry.rest.client.models.VersionMetaData;
import io.apicurio.registry.serde.config.KafkaSerdeConfig;
import io.apicurio.registry.serde.config.SerdeConfig;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaDeserializer;
import io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer;
Expand Down Expand Up @@ -184,6 +185,8 @@ private static Producer<Object, Object> createKafkaProducer() {
// Use the Apicurio Registry provided Kafka Serializer for JSON Schema
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSchemaKafkaSerializer.class.getName());
// Send metadata in message headers (includes the schema coordinates and the message bean type)
props.putIfAbsent(KafkaSerdeConfig.ENABLE_HEADERS, true);

// Configure Service Registry location
props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
Expand Down Expand Up @@ -215,6 +218,8 @@ private static KafkaConsumer<Long, MessageBean> createKafkaConsumer() {
// Use the Apicurio Registry provided Kafka Deserializer for JSON Schema
props.putIfAbsent(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonSchemaKafkaDeserializer.class.getName());
// Read metadata from message headers (includes the schema coordinates and the message bean type)
props.putIfAbsent(KafkaSerdeConfig.ENABLE_HEADERS, true);

// Configure Service Registry location
props.putIfAbsent(SerdeConfig.REGISTRY_URL, REGISTRY_URL);
Expand Down

0 comments on commit 0e5fafd

Please sign in to comment.