Description
I am building a streaming application that ingests messages with varying schemas and analyses a subset of their fields. Which fields are analysed is decided from information held in each message's schema.
For example, any producer can publish a message to the topic this service consumes from, as long as the message is Avro-encoded and its schema is registered in the shared Schema Registry. This app will then analyse all of the message's string fields and publish the results.
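Selecting "all string fields" needs slightly more than `field['type'] == 'string'` if producers use optional fields, because Avro represents a nullable string as a union such as `["null", "string"]`. Below is a minimal sketch, assuming the schema is available as a plain dict with a `fields` list (as with `in_schema.schema['fields']` further down). `is_string_type` and `string_field_names` are hypothetical helper names, and nested records are not descended into.

```python
from typing import Any, Dict, List


def is_string_type(avro_type: Any) -> bool:
    """True for 'string', a dict type such as {'type': 'string', 'logicalType': 'uuid'},
    or a union (list) that contains one of these."""
    if avro_type == "string":
        return True
    if isinstance(avro_type, dict):
        # Complex types (array, map, record, ...) have a different 'type' and are rejected
        return avro_type.get("type") == "string"
    if isinstance(avro_type, list):
        # Union, e.g. ["null", "string"] for an optional string field
        return any(is_string_type(branch) for branch in avro_type)
    return False


def string_field_names(record_schema: Dict[str, Any]) -> List[str]:
    """Names of the top-level fields of a record schema whose type is (or may be) a string."""
    return [f["name"] for f in record_schema.get("fields", []) if is_string_type(f["type"])]
```

For a union field the decoded value may be `None`, so the analysis step should allow for that.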
Decoding the message works well using Faust's raw codec and this library's AvroMessageSerializer, but this doesn't provide access to the schema definition, which is retrieved inside the MessageSerializer.decode_message method.
To receive a message and access both the decoded message and its schema, I have extracted the relevant part of the MessageSerializer.decode_message method into a standalone function:
```python
import struct

from schema_registry.serializers.errors import SerializerError
from schema_registry.serializers.message_serializer import ContextStringIO, MAGIC_BYTE


def decode_schema_id(message: bytes) -> int:
    """
    Decode the schema ID from a Kafka message that has been encoded for use with the Schema Registry.

    This function is an extension to the python-schema-registry-client, which only provides the deserialised message.

    Args:
        message: message to be decoded

    Returns:
        int: The ID of the schema the message was encoded with
    """
    if len(message) <= 5:
        raise SerializerError("message is too small to decode")
    with ContextStringIO(message) as payload:
        # Header: 1 magic byte followed by a 4-byte big-endian schema ID
        magic, schema_id = struct.unpack(">bI", payload.read(5))
    if magic != MAGIC_BYTE:
        raise SerializerError("message does not start with magic byte")
    return schema_id
```

From the message bytes, I can then get the schema ID, use the SchemaRegistryClient to fetch the schema, and use the AvroMessageSerializer to decode the message:
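```python
import faust
from schema_registry.client import SchemaRegistryClient
from schema_registry.serializers import AvroMessageSerializer

app = faust.App('analysis-app', broker='KAFKA_BROKER_URL')
in_topic = app.topic('in-data', value_serializer='raw')  # raw bytes keep the schema ID header
out_topic = app.topic('out-data')
sr_client = SchemaRegistryClient(url='SCHEMA_REGISTRY_URL')
avro_serdes = AvroMessageSerializer(sr_client)


def analyse_text(value):
    """Hypothetical helper holding the actual string analysis (named apart from the agent)."""
    ...


@app.agent(in_topic)
async def analyse(stream):
    async for message_bytes in stream:
        # Look up the writer schema via the ID embedded in the message header
        in_schema_id = decode_schema_id(message_bytes)
        in_schema = sr_client.get_by_id(in_schema_id)
        message = avro_serdes.decode_message(message_bytes)
        for field in in_schema.schema['fields']:
            if field['type'] == 'string':
                result = analyse_text(message[field['name']])
                await out_topic.send(value=result)
```

I would like to be able to get the schema ID and/or the schema for a decoded message via the MessageSerializer API.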
Access to these could be provided via additional methods like the one above. Alternatively, and perhaps more cleanly, MessageSerializer.decode_message could return an object containing the schema_id, the schema and the payload rather than just the payload, especially given that these are already extracted or retrieved within this method.
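The second option could look roughly like the sketch below. This is a hypothetical API, not something the library currently provides: `DecodedMessage`, `split_header` and `decode_with_schema` are made-up names, and the schema lookup and body decoding are passed in as callables so the sketch runs without a registry. The header parsing mirrors `decode_schema_id` above (magic byte `0`, then a 4-byte big-endian schema ID).

```python
import struct
from dataclasses import dataclass
from typing import Any, Callable, Tuple

MAGIC_BYTE = 0  # same value as the library's MAGIC_BYTE


@dataclass(frozen=True)
class DecodedMessage:
    """Hypothetical result type: everything decode_message already has in hand."""
    schema_id: int
    schema: Any
    payload: Any


def split_header(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, encoded body)."""
    if len(message) <= 5:
        raise ValueError("message is too small to decode")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("message does not start with magic byte")
    return schema_id, message[5:]


def decode_with_schema(
    message: bytes,
    get_schema: Callable[[int], Any],
    decode_body: Callable[[Any, bytes], Any],
) -> DecodedMessage:
    """Parse the header once, look the schema up once, and return all three pieces together."""
    schema_id, body = split_header(message)
    schema = get_schema(schema_id)
    return DecodedMessage(schema_id=schema_id, schema=schema, payload=decode_body(schema, body))


# Stand-ins for illustration only: a dict instead of the registry, UTF-8 instead of Avro.
framed = struct.pack(">bI", MAGIC_BYTE, 42) + b"hello"
decoded = decode_with_schema(framed, {42: "fake-schema"}.__getitem__, lambda schema, body: body.decode())
# decoded == DecodedMessage(schema_id=42, schema='fake-schema', payload='hello')
```

In the agent above, `get_schema` could be `sr_client.get_by_id` and `decode_body` could wrap an Avro reader such as `fastavro.schemaless_reader`, assuming the returned schema object exposes a parsed Avro schema. Compared with the workaround, the header is parsed once and the schema is fetched once per message.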