@@ -19,11 +19,16 @@ package kafka
1919import (
2020 "context"
2121 "crypto/tls"
22+ "encoding/binary"
23+ "encoding/json"
2224 "fmt"
25+ "strings"
2326 "time"
2427
2528 "github.com/Shopify/sarama"
2629 "github.com/nats-io/nats-kafka/server/conf"
30+ "github.com/riferrei/srclient"
31+ "github.com/santhosh-tekuri/jsonschema/v5"
2732)
2833
2934// Message represents a Kafka message.
@@ -61,6 +66,11 @@ type saramaConsumer struct {
6166 consumeErrCh chan error
6267
6368 cancel context.CancelFunc
69+
70+ schemaRegistryOn bool
71+ schemaRegistryClient srclient.ISchemaRegistryClient
72+ schemaType srclient.SchemaType
73+ pbDeserializer pbDeserializer
6474}
6575
6676// NewConsumer returns a new Kafka Consumer.
@@ -76,6 +86,7 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer,
7686 sc .Net .SASL .User = cc .SASL .User
7787 sc .Net .SASL .Password = cc .SASL .Password
7888 }
89+
7990 if sc .Net .SASL .Enable && cc .SASL .InsecureSkipVerify {
8091 sc .Net .TLS .Enable = true
8192 sc .Net .TLS .Config = & tls.Config {
@@ -101,6 +112,23 @@ func NewConsumer(cc conf.ConnectorConfig, dialTimeout time.Duration) (Consumer,
101112 tlsSkipVerify : cc .SASL .InsecureSkipVerify ,
102113 }
103114
115+ // If schema registry url and subject name both are set, enable schema registry integration
116+ if cc .SchemaRegistryURL != "" && cc .SubjectName != "" {
117+ cons .schemaRegistryClient = srclient .CreateSchemaRegistryClient (cc .SchemaRegistryURL )
118+
119+ switch strings .ToUpper (cc .SchemaType ) {
120+ case srclient .Json .String ():
121+ cons .schemaType = srclient .Json
122+ case srclient .Protobuf .String ():
123+ cons .schemaType = srclient .Protobuf
124+ cons .pbDeserializer = newDeserializer ()
125+ default :
126+ cons .schemaType = srclient .Avro
127+ }
128+
129+ cons .schemaRegistryOn = true
130+ }
131+
104132 if cons .groupMode {
105133 cg , err := sarama .NewConsumerGroup (cc .Brokers , cc .GroupID , sc )
106134 if err != nil {
@@ -165,14 +193,24 @@ func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) {
165193 case <- ctx .Done ():
166194 return Message {}, ctx .Err ()
167195 case cmsg := <- c .fetchCh :
168- return Message {
169- Topic : cmsg .Topic ,
170- Partition : int (cmsg .Partition ),
171- Offset : cmsg .Offset ,
196+ var deserializedValue = cmsg .Value
197+ var err error
198+ if c .schemaRegistryOn {
199+ deserializedValue , err = c .deserializePayload (cmsg .Value )
200+ }
172201
173- Key : cmsg .Key ,
174- Value : cmsg .Value ,
175- }, nil
202+ if err == nil {
203+ return Message {
204+ Topic : cmsg .Topic ,
205+ Partition : int (cmsg .Partition ),
206+ Offset : cmsg .Offset ,
207+
208+ Key : cmsg .Key ,
209+ Value : deserializedValue ,
210+ }, nil
211+ } else {
212+ return Message {}, err
213+ }
176214 case loopErr := <- c .consumeErrCh :
177215 return Message {}, loopErr
178216 }
@@ -182,14 +220,24 @@ func (c *saramaConsumer) Fetch(ctx context.Context) (Message, error) {
182220 case <- ctx .Done ():
183221 return Message {}, ctx .Err ()
184222 case cmsg := <- c .pc .Messages ():
185- return Message {
186- Topic : cmsg .Topic ,
187- Partition : int (cmsg .Partition ),
188- Offset : cmsg .Offset ,
189-
190- Key : cmsg .Key ,
191- Value : cmsg .Value ,
192- }, nil
223+ var deserializedValue = cmsg .Value
224+ var err error
225+ if c .schemaRegistryOn {
226+ deserializedValue , err = c .deserializePayload (cmsg .Value )
227+ }
228+
229+ if err == nil {
230+ return Message {
231+ Topic : cmsg .Topic ,
232+ Partition : int (cmsg .Partition ),
233+ Offset : cmsg .Offset ,
234+
235+ Key : cmsg .Key ,
236+ Value : deserializedValue ,
237+ }, nil
238+ } else {
239+ return Message {}, err
240+ }
193241 }
194242}
195243
@@ -261,3 +309,68 @@ func (c *saramaConsumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sa
261309
262310 return nil
263311}
312+
313+ // Retrieve the schema of the message and deserialize it.
314+ func (c * saramaConsumer ) deserializePayload (payload []byte ) ([]byte , error ) {
315+ // first byte of the payload is 0
316+ if payload [0 ] != byte (0 ) {
317+ return nil , fmt .Errorf ("failed to deserialize payload: magic byte is not 0" )
318+ }
319+
320+ // next 4 bytes contain the schema id
321+ schemaID := binary .BigEndian .Uint32 (payload [1 :5 ])
322+ schema , err := c .schemaRegistryClient .GetSchema (int (schemaID ))
323+ if err != nil {
324+ return nil , err
325+ }
326+
327+ var value []byte
328+ switch c .schemaType {
329+ case srclient .Avro :
330+ value , err = c .deserializeAvro (schema , payload [5 :])
331+ case srclient .Json :
332+ value , err = c .validateJsonSchema (schema , payload [5 :])
333+ case srclient .Protobuf :
334+ value , err = c .pbDeserializer .Deserialize (schema , payload [5 :])
335+ }
336+
337+ if err != nil {
338+ return nil , err
339+ }
340+
341+ return value , nil
342+ }
343+
344+ func (c * saramaConsumer ) deserializeAvro (schema * srclient.Schema , cleanPayload []byte ) ([]byte , error ) {
345+ codec := schema .Codec ()
346+ native , _ , err := codec .NativeFromBinary (cleanPayload )
347+ if err != nil {
348+ return nil , fmt .Errorf ("unable to deserailize avro: %w" , err )
349+ }
350+ value , err := codec .TextualFromNative (nil , native )
351+ if err != nil {
352+ return nil , fmt .Errorf ("failed to convert to json: %w" , err )
353+ }
354+
355+ return value , nil
356+ }
357+
358+ func (c * saramaConsumer ) validateJsonSchema (schema * srclient.Schema , cleanPayload []byte ) ([]byte , error ) {
359+ jsc , err := jsonschema .CompileString ("schema.json" , schema .Schema ())
360+ if err != nil {
361+ return nil , fmt .Errorf ("unable to parse json schema: %w" , err )
362+ }
363+
364+ var parsedMessage interface {}
365+ err = json .Unmarshal (cleanPayload , & parsedMessage )
366+ if err != nil {
367+ return nil , fmt .Errorf ("unable to parse json message: %w" , err )
368+ }
369+
370+ err = jsc .Validate (parsedMessage )
371+ if err != nil {
372+ return nil , fmt .Errorf ("json message validation failed: %w" , err )
373+ }
374+
375+ return cleanPayload , nil
376+ }
0 commit comments