
Faust Generic Serializer  #108

@Irate-Walrus

Description


Feature Request ✨

While I may be treating the package incorrectly, it appears that, by design, there is one serializer per schema. I was interested in a generic serializer that takes any registered Record/AvroModel subclass and attempts to serialize/deserialize it. Additionally, the ability to register a specific schema as the value/key schema of a Confluent Kafka topic was desirable.

I quickly whipped up an example:

  • It operates by identifying the registered model using the Record namespace.
  • Models must be registered before serialization is attempted; registration also pushes the schema to the Confluent registry.
  • It will attempt to deserialize any Avro message that contains a Confluent schema id.
  • register_to_topic will attempt to register the schema as a Confluent topic value or key schema.
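For context, the serialize/deserialize bullets above follow the Confluent wire format: one magic byte followed by a 4-byte big-endian schema id, then the schemaless Avro payload. A dependency-free sketch of just the header handling (the helper names are illustrative, not part of the codec):

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format: 1 magic byte + 4-byte big-endian schema id


def pack_header(schema_id: int) -> bytes:
    """Build the 5-byte Confluent header that precedes the Avro payload."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id)


def unpack_header(message: bytes) -> int:
    """Extract the schema id from a Confluent-framed message."""
    if len(message) <= 5:
        raise ValueError("message is too small to decode")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("message does not start with magic byte")
    return schema_id
```

Anything after the first five bytes is the schemaless Avro body, which is why the codec can recover the writer schema for any message carrying a registered id.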

Considerations 🔍️:

  • Does not inherit from Serializer or MessageSerializer (due to the perceived one-schema-per-serializer constraint).
  • Not sure how reliable the Record namespace is for model identification.
  • Potential to pre-register all of these model classes using some common base class? (This would also allow all subclasses to be preset to use this codec.)
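The common-base-class idea in the last consideration could be sketched with `__init_subclass__`, which runs as each subclass is defined. This is a minimal, dependency-free illustration of the pattern only (the `AutoRegisterModel`/`Order` names are hypothetical; a real base class would call the codec's register_model instead of filling a dict):

```python
from typing import Any, Dict, Type


class AutoRegisterModel:
    """Base class whose subclasses register themselves at class-definition time."""

    registry: Dict[str, Type["AutoRegisterModel"]] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Key the registry on a namespace-like identifier; a real codec
        # would register the Avro schema here as well.
        AutoRegisterModel.registry[f"{cls.__module__}.{cls.__name__}"] = cls


class Order(AutoRegisterModel):
    pass  # Order is registered the moment its class body executes
```

One caveat with doing this eagerly: every subclass definition would hit the schema registry at import time, so deferring the network call until first use may be preferable.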

Questions 🤔:

  • It appears _loads and _dumps must be synchronous for Faust compatibility; would there be any way to use an AsyncSchemaRegistryClient and support async Avro writers within these functions?

Notes 📝:

  • First ever public github feature request!! 🎉
  • Would love feedback and suggestions, as I'm already using this myself.
```python
import struct
from io import BytesIO
from typing import Any, Dict, Optional, Type

from dataclasses_avroschema import AvroModel
from fastavro import parse_schema, schemaless_reader, schemaless_writer
from faust.serializers import codecs
from faust.serializers.codecs import Codec
from schema_registry.client import SchemaRegistryClient
from schema_registry.client.errors import ClientError

# First byte of the Confluent wire format (magic byte + 4-byte schema id)
MAGIC_BYTE = 0


class AvroModelCodec(Codec):
    schemaregistry_client: SchemaRegistryClient

    def __init__(self, client: SchemaRegistryClient) -> None:
        super().__init__()
        self.schemaregistry_client = client
        # Per-instance lookup tables populated by register_model()
        self.ns_to_id: Dict[str, int] = {}
        self.id_to_schema: Dict[int, Any] = {}

    def register_to_topic(
        self,
        topic: str,
        *,
        key_model: Optional[Type[AvroModel]] = None,
        value_model: Optional[Type[AvroModel]] = None,
    ) -> int:
        """Register an AvroModel to the Codec, Schema Registry, and Confluent Kafka topic."""
        if key_model and issubclass(key_model, AvroModel):
            self.schemaregistry_client.register(f"{topic}-key", key_model.avro_schema())
            schema_id = self.register_model(key_model)
        elif value_model and issubclass(value_model, AvroModel):
            self.schemaregistry_client.register(f"{topic}-value", value_model.avro_schema())
            schema_id = self.register_model(value_model)
        else:
            raise ValueError("No valid Input Model")
        return schema_id

    def register_model(self, model: Type[AvroModel]) -> int:
        """Register an AvroModel to the Codec and Schema Registry."""
        schema_dict = model.avro_schema_to_python()
        schema_id = self.schemaregistry_client.register(schema_dict['name'], model.avro_schema())
        self.ns_to_id[model._options.namespace] = schema_id
        self.id_to_schema[schema_id] = parse_schema(schema_dict)
        return schema_id

    def _dumps(self, model_dict: Dict[str, Any]) -> bytes:
        """Serialize an AvroModel dict."""
        # Identify the registered model by its Faust namespace
        schema_id = self.ns_to_id.get(model_dict['__faust']['ns'])

        if not schema_id:
            raise ValueError("Unregistered Model")

        with BytesIO() as payload:
            payload.write(struct.pack(">bI", MAGIC_BYTE, schema_id))
            schemaless_writer(payload, self.id_to_schema[schema_id], model_dict)
            return payload.getvalue()

    def _loads(self, message: bytes) -> Optional[Dict[str, Any]]:
        """Deserialize a message via its Confluent schema id."""
        if message is None:
            return None

        if len(message) <= 5:
            raise ValueError("message is too small to decode")

        with BytesIO(message) as payload:
            magic, schema_id = struct.unpack(">bI", payload.read(5))

            if magic != MAGIC_BYTE:
                raise ValueError("message does not start with magic byte")

            writer_schema = self.id_to_schema.get(schema_id)
            if not writer_schema:
                try:
                    schema = self.schemaregistry_client.get_by_id(schema_id)
                    writer_schema = parse_schema(schema.schema)
                    self.id_to_schema[schema_id] = writer_schema
                except ClientError as e:
                    raise ValueError(f"unable to fetch schema with id {schema_id}: {e}")

            return schemaless_reader(payload, writer_schema)


# config() is assumed to come from the application's settings layer
avro_model_codec = AvroModelCodec(client=SchemaRegistryClient(url=config('KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL')))
codecs.register('avro-model', avro_model_codec)
```
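For reference, the namespace lookup in _dumps relies on the `__faust` metadata blob that Faust includes when it serializes a Record, mapping the namespace back to a registered schema id. A dependency-free stand-in (the `myapp.models.Order` namespace and id `7` are illustrative only):

```python
# Mapping populated by register_model(): Faust namespace -> Confluent schema id
ns_to_id = {"myapp.models.Order": 7}

# Faust Records serialize with a "__faust" metadata blob carrying the
# namespace ("ns"); _dumps() uses it to pick the writer schema.
model_dict = {
    "id": 1,
    "amount": 9.99,
    "__faust": {"ns": "myapp.models.Order"},
}

schema_id = ns_to_id.get(model_dict["__faust"]["ns"])
```

This is also where the reliability concern above bites: if a Record is defined without a stable namespace, the lookup returns None and _dumps raises.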

Metadata

Labels: enhancement (New feature or request)