MessageSerializer.encode... functions not serializing proper Avro #87

@xgamer4

Description

Describe the bug
The MessageSerializer.encode... functions don't output valid Avro. When a record is serialized with MessageSerializer/FaustSerializer, the resulting binary doesn't conform to the Avro specification and can't be parsed correctly by anything that isn't a MessageSerializer instance.

This is particularly problematic when using registry-schema with Faust, since the recommended Faust integration examples use MessageSerializer. The expected behavior is that MessageSerializer serializes the record to Avro, which is then passed to whatever backend Faust uses (likely Kafka). Because the output isn't actually valid Avro, any other program not backed by a MessageSerializer will try to read and parse the message (since from all appearances it should be proper Avro backed by a schema in a schema registry) and fail.
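For context, the framing the serializer emits is a magic byte followed by a 4-byte big-endian schema id, then the schemaless Avro payload (see the serializer code quoted below in Thoughts). A minimal stdlib sketch of that framing and how a consumer would have to strip it (function names `frame`/`unframe` are mine, not the library's):

```python
import struct

MAGIC_BYTE = 0  # the serializer's leading byte


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the 5-byte header: magic byte + big-endian schema id."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes):
    """Split a framed message back into (schema_id, plain Avro payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a framed message")
    return schema_id, message[5:]


# 0x54 is the Avro zigzag-varint encoding of the long 42
framed = frame(7, b"\x54")
schema_id, payload = unframe(framed)
print(schema_id, payload)  # 7 b'T'
```

Only the `payload` part is Avro; the first five bytes are the out-of-band header that trips up other readers.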

To Reproduce

import io

import fastavro
# Import paths below assume python-schema-registry-client; they may differ by version.
from schema_registry.client import SchemaRegistryClient
from schema_registry.client.schema import AvroSchema
from schema_registry.serializers import MessageSerializer

schema = {"type": "record",
          "name": "Example",
          "fields": [{"name": "ID", "type": "long"}],
          }
schema = AvroSchema(schema)
schema_client = SchemaRegistryClient(url='http://schema-registry:8081')
serializer = MessageSerializer(schema_client)
maybe_avro = serializer.encode_record_with_schema('example', schema, {'ID': 42})
fastavro_deserialized = fastavro.schemaless_reader(io.BytesIO(maybe_avro), schema.schema)
serializer_deserialized = serializer.decode_message(maybe_avro)
print(fastavro_deserialized)
print(serializer_deserialized)

This outputs:

{'ID': 0}
{'ID': 42}
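The `{'ID': 0}` result is consistent with fastavro starting its read at the 5-byte header: the leading magic byte `0x00` happens to be the zigzag-varint encoding of the long `0`, so `schemaless_reader` decodes the header's first byte as `ID = 0` and never reaches the real payload. A small sketch of the zigzag decode (the helper is illustrative, not fastavro internals):

```python
def read_zigzag_long(data: bytes):
    """Decode one Avro zigzag-varint long; returns (value, bytes_consumed)."""
    shift, accum, pos = 0, 0, 0
    while True:
        b = data[pos]
        pos += 1
        accum |= (b & 0x7F) << shift
        if not (b & 0x80):
            break
        shift += 7
    return (accum >> 1) ^ -(accum & 1), pos


# Framed message: magic byte 0x00, schema id 1, then zigzag(42) = 0x54
framed = bytes([0x00, 0, 0, 0, 1, 0x54])
print(read_zigzag_long(framed))  # (0, 1): the magic byte reads as the long 0
print(read_zigzag_long(b"\x54"))  # (42, 1): the actual payload, header stripped
```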

Expected behavior
The above code should output:

{'ID': 42}
{'ID': 42}

Thoughts
The reason this happens appears to be that MessageSerializer prepends the schema id to the binary output (message_serializer.py, lines 103 on):

        with ContextStringIO() as outf:
            # Write the magic byte and schema ID in network byte order (big endian)
            outf.write(struct.pack(">bI", MAGIC_BYTE, schema_id))

            # write the record to the rest of the buffer
            writer(record, outf)

            return outf.getvalue()

This looks like it's done to keep track of the schema id for deserialization, but it breaks every other way of deserializing the message.

Unfortunately, I don't have a particularly compelling fix to propose. Avro defines a header in the object container format, and the proper/ideal way to handle this would be to include the id in that header (or just store the schema itself under the predefined avro.schema key). fastavro.writer supports header metadata, but using it would mean switching the serialization logic from fastavro.schemaless_writer to fastavro.writer. Probably simple... maybe... but I'm not sure how reading would work or what else would need to change.

Alternatively, if this break is intentional, the Faust Integration document should be updated to note that this generates out-of-spec Avro that must either be read by a MessageSerializer or manually stripped of its 5-byte header to recover valid Avro.

Labels: bug (Something isn't working), help wanted (Extra attention is needed)