Schema Caching issue while Pushing Kafka Message #2282
-
Hello, @santanirudha. First of all, thank you very much for the detailed explanation. After checking the code, those artifact references are the same: the equals method will return true for that pair, so the value should be returned if it is present in the cache, even if the globalId and the contentId are null. Since this can be considered a bug, do you mind opening a new issue with the same description that you added here (I can do it if you want)? Again, thank you very much!
-
Related: #2483
-
I am using the 2.2.0.Final version of the Apicurio libraries. I configured a KafkaProducer with Apicurio properties, with StringSerializer as the key serializer and JsonSchemaKafkaSerializer as the value serializer. When my producer serializes a message, it goes through the serialize method in AbstractKafkaSerializer, which fetches the schema using:
SchemaLookupResult<T> schema = getSchemaResolver().resolveSchema(new KafkaSerdeRecord<>(resolverMetadata, data));
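For readers who want to reproduce the setup, the producer configuration described above can be sketched roughly as follows. This is only a sketch under assumptions: the broker address and registry URL are placeholders that are not in the original post, the fully qualified serializer class names and the `apicurio.registry.url` property key are given as I understand them for the 2.x serdes, and `ProducerConfigSketch` is a hypothetical helper name.

```java
import java.util.Properties;

// Hypothetical helper: collects the producer settings described in the post.
final class ProducerConfigSketch {

    // Builds the properties a KafkaProducer would be created with.
    // Both arguments are placeholders supplied by the caller.
    static Properties build(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Key serializer: plain strings.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Value serializer: Apicurio JSON Schema serializer (class name assumed).
        props.put("value.serializer",
                "io.apicurio.registry.serde.jsonschema.JsonSchemaKafkaSerializer");
        // Registry endpoint (property key assumed to be "apicurio.registry.url").
        props.put("apicurio.registry.url", registryUrl);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092",
                "http://localhost:8080/apis/registry/v2");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaProducer` constructor; no resolver is configured here, which matches the post's use of the default resolver.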
I am using DefaultSchemaResolver as my SchemaResolver. The resolveSchema method in DefaultSchemaResolver for the first time performs the following steps:
The next time we push a message to the Kafka topic, step 1 works as expected. In step 2, however, index1 now contains the ArtifactReference key stored earlier, so index1.get(key) should return a non-null value, but it returns null. The reason is that the ArtifactReference key in the index1 map is:
ArtifactReference [groupId=my-group, artifactId=share-price-json, version=1, globalId=1, contentId=1]
while the key we are looking for is:
ArtifactReference [groupId=my-group, artifactId=share-price-json, version=1, globalId=null, contentId=null]
In step 5 the artifact reference is built with non-null globalId and contentId values, but the lookup uses an ArtifactReference built without globalId and contentId. This is why caching never takes effect when producing Kafka messages.
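The mechanism described here can be illustrated with a small, self-contained sketch. The two key classes below are hypothetical stand-ins and are not Apicurio's ArtifactReference: one compares all five fields, the other compares only groupId, artifactId and version. Whether the real class behaves like the first or the second variant depends on its own equals and hashCode implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class CacheKeySketch {

    // Hypothetical key whose equality includes globalId and contentId.
    static final class AllFieldsKey {
        final String groupId, artifactId, version;
        final Long globalId, contentId;

        AllFieldsKey(String g, String a, String v, Long globalId, Long contentId) {
            this.groupId = g; this.artifactId = a; this.version = v;
            this.globalId = globalId; this.contentId = contentId;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof AllFieldsKey)) return false;
            AllFieldsKey k = (AllFieldsKey) o;
            return Objects.equals(groupId, k.groupId)
                    && Objects.equals(artifactId, k.artifactId)
                    && Objects.equals(version, k.version)
                    && Objects.equals(globalId, k.globalId)
                    && Objects.equals(contentId, k.contentId);
        }

        @Override public int hashCode() {
            return Objects.hash(groupId, artifactId, version, globalId, contentId);
        }
    }

    // Hypothetical key whose equality uses only the artifact coordinates.
    static final class CoordinatesKey {
        final String groupId, artifactId, version;
        final Long globalId, contentId; // carried along, ignored by equals/hashCode

        CoordinatesKey(String g, String a, String v, Long globalId, Long contentId) {
            this.groupId = g; this.artifactId = a; this.version = v;
            this.globalId = globalId; this.contentId = contentId;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof CoordinatesKey)) return false;
            CoordinatesKey k = (CoordinatesKey) o;
            return Objects.equals(groupId, k.groupId)
                    && Objects.equals(artifactId, k.artifactId)
                    && Objects.equals(version, k.version);
        }

        @Override public int hashCode() {
            return Objects.hash(groupId, artifactId, version);
        }
    }

    // Stores a key with ids set, then looks it up with ids left null.
    static boolean hitsWithAllFields() {
        Map<AllFieldsKey, String> index = new HashMap<>();
        index.put(new AllFieldsKey("my-group", "share-price-json", "1", 1L, 1L), "schema");
        return index.get(new AllFieldsKey("my-group", "share-price-json", "1", null, null)) != null;
    }

    static boolean hitsWithCoordinatesOnly() {
        Map<CoordinatesKey, String> index = new HashMap<>();
        index.put(new CoordinatesKey("my-group", "share-price-json", "1", 1L, 1L), "schema");
        return index.get(new CoordinatesKey("my-group", "share-price-json", "1", null, null)) != null;
    }

    public static void main(String[] args) {
        System.out.println("all-fields key hit: " + hitsWithAllFields());
        System.out.println("coordinates-only key hit: " + hitsWithCoordinatesOnly());
    }
}
```

With the all-fields key the second lookup misses, which is the symptom reported above; with the coordinates-only key it hits.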
Note: This isn't the case when consuming messages from Kafka. There the caching works as expected, because the deserialize method in AbstractKafkaDeserializer uses the resolveSchemaByArtifactReference method in DefaultSchemaResolver, which fetches the schema using only the globalId and stores it in index2, also keyed only by globalId. I have also found that the resolveSchemaByGlobalId method in AbstractSchemaResolver has the following code:
This commented-out code is what actually makes the caching work!
Can someone please verify the issue and comment?