Description
Problem
Identical Protobuf schemas are being flagged as not equal. As a result, we receive "Schema not found" (HTTP status 404, SR error code 40403) when the serializer looks up the schema while producing a message.
- (Scenario 1) To resolve this, we need to set normalize.schemas = TRUE in the ProtobufSerializer (a minimal sketch follows below).
- (Scenario 2) For Protobuf Structs it still does not work even with normalize.schemas = TRUE; removing the struct resolves the issue.
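For scenario 1, enabling normalization in the serializer configuration is the workaround. A minimal sketch, reusing the client setup and message class from the full snippet further down (the other settings mirror that snippet):

# Scenario 1 workaround (sketch): with normalization enabled, the schema
# derived from the compiled module matches the one already registered in
# Confluent Cloud, so the lookup succeeds.
protobuf_serializer = ProtobufSerializer(
    approvals_create_queue_pb2.ApprovalsCreateQueue,
    schema_registry_client,
    {
        'use.deprecated.format': False,
        'normalize.schemas': True,   # without this the lookup returns 404 / 40403
        'auto.register.schemas': False,
        'use.latest.version': False,
    },
)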
Steps to reproduce scenario 1
- Create a Protobuf schema without a struct
- Add the Protobuf schema to Confluent Schema Registry via Confluent Cloud (see the registration sketch after this list)
- Use confluent-kafka-python to publish a Kafka message with ProtobufSerializer normalize.schemas = FALSE against the Schema Registry (hosted on Confluent Cloud)
- Schema Registry returns 404
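The registration step can also be done programmatically instead of through the Confluent Cloud UI; a rough sketch with the Python client (credentials are placeholders, the .proto path is hypothetical, and the subject name assumes the default <topic>-value naming strategy):

# Sketch: register the .proto source from step 1 under the subject that the
# producer will later look up.
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

schema_registry_client = SchemaRegistryClient({
    'url': 'https://***.***.azure.confluent.cloud',
    'basic.auth.user.info': '***',
})
schema_str = open('approvals_create_queue.proto').read()  # hypothetical path
schema_id = schema_registry_client.register_schema(
    'test-topic-value',  # subject for topic "test-topic"
    Schema(schema_str, schema_type='PROTOBUF'),
)
print('Registered schema id:', schema_id)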
Steps to reproduce scenario 2
- Create a Protobuf schema with a struct
- Add the Protobuf schema to Confluent Schema Registry via Confluent Cloud
- Use confluent-kafka-python to publish a Kafka message with ProtobufSerializer normalize.schemas = TRUE against the Schema Registry (hosted on Confluent Cloud)
- Schema Registry returns 404
Code snippet for scenario 2
from uuid import uuid4

import protobuf.approvals_create_queue_pb2 as approvals_create_queue_pb2
from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import (
    MessageField,
    SerializationContext,
    StringSerializer,
)


def delivery_report(err, msg):
    """
    Reports the failure or success of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.
    """
    if err is not None:
        print("Delivery failed for User record {}: {}".format(msg.key(), err))
        return
    print(
        'User record {} successfully produced to {} [{}] at offset {}'.format(
            msg.key(), msg.topic(), msg.partition(), msg.offset()
        )
    )


def main():
    topic = "test-topic"

    schema_registry_conf = {
        'url': 'https://***.***.azure.confluent.cloud',
        'basic.auth.user.info': '***',
    }

    bootstrap_servers = "***.***.gcp.confluent.cloud:9092"
    producer_conf = {
        'bootstrap.servers': bootstrap_servers,
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'PLAIN',
        'sasl.username': '***',
        'sasl.password': '***',
    }

    schema_registry_client = SchemaRegistryClient(schema_registry_conf)
    string_serializer = StringSerializer('utf8')
    protobuf_serializer = ProtobufSerializer(
        approvals_create_queue_pb2.ApprovalsCreateQueue,
        schema_registry_client,
        {
            'use.deprecated.format': False,
            'normalize.schemas': True,
            'auto.register.schemas': False,
            'use.latest.version': False,
        },
    )

    producer = Producer(producer_conf)

    print("Producing user records to topic {}. ^C to exit.".format(topic))
    while True:
        # Serve on_delivery callbacks from previous calls to produce()
        producer.poll(0.0)
        try:
            user = approvals_create_queue_pb2.ApprovalsCreateQueue(
                scope='test_scope',
                org_id=1,
                request_id='test_request_id',
                request_user_id=1,
                payload={
                    'test_key': 'test_value',
                },
                user_id=1,
            )
            producer.produce(
                topic=topic,
                partition=0,
                key=string_serializer(str(uuid4())),
                value=protobuf_serializer(
                    user, SerializationContext(topic, MessageField.VALUE)
                ),
                on_delivery=delivery_report,
            )
            break
        except (KeyboardInterrupt, EOFError):
            break
        except ValueError:
            print("Invalid input, discarding record...")
            continue

    print("\nFlushing records...")
    producer.flush()


if __name__ == '__main__':
    main()
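With auto.register.schemas disabled and normalization on, the failing lookup surfaces as an exception from the serializer call. A sketch for confirming the 404 / 40403 inside the loop above (the attribute names on SchemaRegistryError are assumptions about the client's error class; adjust if your version differs):

# Sketch: catch the registry error around the serializer call to confirm the
# 404 / 40403 instead of letting it propagate out of produce().
from confluent_kafka.schema_registry.error import SchemaRegistryError

try:
    value = protobuf_serializer(
        user, SerializationContext(topic, MessageField.VALUE)
    )
except SchemaRegistryError as e:
    # Observed here: HTTP 404 with Schema Registry error code 40403
    print('Schema lookup failed:', e.http_status_code, e.error_code, e.error_message)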
Protobuf schema with struct
syntax = "proto3";
package test.approvals_create_queue;
import "google/protobuf/struct.proto";
message ApprovalsCreateQueue {
  string scope = 1;
  int32 org_id = 2;
  string request_id = 3;
  int32 request_user_id = 4;
  google.protobuf.Struct payload = 5;
  int32 user_id = 6;
}
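To compare this .proto against what the registry actually holds, the latest registered version can be fetched and printed; a small sketch using the schema_registry_client from the snippet above, assuming the default test-topic-value subject name:

# Sketch: fetch the latest registered schema for manual comparison with the
# local .proto above (subject name assumes the default <topic>-value strategy).
registered = schema_registry_client.get_latest_version('test-topic-value')
print('id:', registered.schema_id, 'version:', registered.version)
print(registered.schema.schema_str)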
confluent-kafka-python v2.6.1 (I've also tested with v1.8.2)