Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/avro_encoder_add_schema_id_prefix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
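Added an optional `schema_id` option to the Avro encoder. When set, each encoded message is prefixed with the Confluent wire-format header: a zero magic byte followed by the schema ID as a 4-byte big-endian integer.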

authors: yazasnyal
1 change: 1 addition & 0 deletions lib/codecs/src/decoding/format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl From<&AvroDeserializerOptions> for AvroSerializerOptions {
fn from(value: &AvroDeserializerOptions) -> Self {
Self {
schema: value.schema.clone(),
schema_id: None,
}
}
}
Expand Down
64 changes: 58 additions & 6 deletions lib/codecs/src/encoding/format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,27 @@ pub struct AvroSerializerConfig {

impl AvroSerializerConfig {
/// Creates a new `AvroSerializerConfig`.
///
/// `schema` is the Avro schema as a JSON string; it is stored as given and only parsed
/// when `build` is called. `schema_id` is the optional Confluent schema ID; it is stored
/// unchecked here, and `build` returns an error if it is negative.
pub const fn new(schema: String) -> Self {
pub const fn new(schema: String, schema_id: Option<i32>) -> Self {
Self {
avro: AvroSerializerOptions { schema },
avro: AvroSerializerOptions { schema, schema_id },
}
}

/// Build the `AvroSerializer` from this configuration.
pub fn build(&self) -> Result<AvroSerializer, BuildError> {
let schema = apache_avro::Schema::parse_str(&self.avro.schema)
.map_err(|error| format!("Failed building Avro serializer: {error}"))?;
Ok(AvroSerializer { schema })

// Reject negative schema IDs at build time so a misconfiguration fails once, up front,
// instead of producing a bad prefix on every encoded event. Zero passes this check, so
// the message says "non-negative" rather than "positive".
if let Some(schema_id) = self.avro.schema_id
    && schema_id < 0
{
    return Err("Confluent Avro schema id must be a non-negative i32 number".into());
}

Ok(AvroSerializer {
schema,
schema_id: self.avro.schema_id,
})
}

/// The data type of events that are accepted by `AvroSerializer`.
Expand All @@ -50,18 +60,27 @@ pub struct AvroSerializerOptions {
))]
#[configurable(metadata(docs::human_name = "Schema JSON"))]
pub schema: String,
/// Confluent Avro schema ID
///
/// When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
/// containing a magic byte and a 4-byte big-endian schema ID).
///
/// Negative values are rejected when the serializer is built.
///
/// [wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
// The example is an integer literal (not the string "42") so that the generated reference
// docs list an integer example for this integer-typed option.
#[configurable(metadata(docs::examples = 42))]
pub schema_id: Option<i32>,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject negative schema IDs in Avro encoder config

The new schema_id field is declared as Option<i32>, and encode writes whatever value is provided directly into the Confluent wire-format prefix. This permits negative IDs (for example -1), which do not correspond to valid Schema Registry IDs and will produce Avro payloads that downstream consumers or broker-side schema validation cannot resolve. This issue is triggered whenever a negative schema_id is configured, so the config should enforce non-negative values (for example u32 or explicit validation in build).

Useful? React with 👍 / 👎.

}

/// Serializer that converts an `Event` to bytes using the Apache Avro format.
///
/// When a schema ID is present, the encoded Avro datum is preceded by a 5-byte prefix
/// (a zero magic byte plus the ID as 4 big-endian bytes).
#[derive(Debug, Clone)]
pub struct AvroSerializer {
/// Parsed Avro schema that each event is resolved against and encoded with.
schema: apache_avro::Schema,
/// Optional Confluent schema ID; `None` means no prefix is written.
schema_id: Option<i32>,
}

impl AvroSerializer {
/// Creates a new `AvroSerializer`.
///
/// `schema_id` is stored exactly as passed. Unlike `AvroSerializerConfig::build`, this
/// constructor performs no range check, so the caller is responsible for not passing a
/// negative ID.
pub const fn new(schema: apache_avro::Schema) -> Self {
Self { schema }
pub const fn new(schema: apache_avro::Schema, schema_id: Option<i32>) -> Self {
Self { schema, schema_id }
}
}

Expand All @@ -73,7 +92,13 @@ impl Encoder<Event> for AvroSerializer {
let value = apache_avro::to_value(log)?;
let value = value.resolve(&self.schema)?;
let bytes = apache_avro::to_avro_datum(&self.schema, value)?;

if let Some(schema_id) = self.schema_id {
buffer.put_slice(&[0x00]); // magic byte
buffer.put_slice(&schema_id.to_be_bytes()); // schema id data
}
buffer.put_slice(&bytes);

Ok(())
}
}
Expand Down Expand Up @@ -105,12 +130,39 @@ mod tests {
}
"#}
.to_owned();
let config = AvroSerializerConfig::new(schema);
let config = AvroSerializerConfig::new(schema, None);
let mut serializer = config.build().unwrap();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

assert_eq!(bytes.freeze(), b"\0\x06bar".as_slice());
}

#[test]
fn serialize_avro_with_schema_id() {
// Verifies that configuring a schema ID makes the serializer write the 5-byte prefix
// ahead of the Avro-encoded event.
let event = Event::Log(LogEvent::from(btreemap! {
"foo" => Value::from("bar")
}));
let schema = indoc! {r#"
{
"type": "record",
"name": "Log",
"fields": [
{
"name": "foo",
"type": ["string"]
}
]
}
"#}
.to_owned();
let config = AvroSerializerConfig::new(schema, Some(42));
let mut serializer = config.build().unwrap();
let mut bytes = BytesMut::new();

serializer.encode(event, &mut bytes).unwrap();

// Expected layout: 0x00 magic byte, then 42 as a 4-byte big-endian integer
// (00 00 00 2A), then the Avro payload `\0\x06bar` (the same payload bytes the
// prefix-less assertion in this module expects).
assert_eq!(bytes.freeze(), b"\0\0\0\0\x2A\0\x06bar".as_slice());
}
}
6 changes: 3 additions & 3 deletions lib/codecs/src/encoding/serializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl SerializerConfig {
pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
match self {
SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
AvroSerializerConfig::new(avro.schema.clone()).build()?,
AvroSerializerConfig::new(avro.schema.clone(), avro.schema_id).build()?,
)),
SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
Expand Down Expand Up @@ -354,7 +354,7 @@ impl SerializerConfig {
pub fn input_type(&self) -> DataType {
match self {
SerializerConfig::Avro { avro } => {
AvroSerializerConfig::new(avro.schema.clone()).input_type()
AvroSerializerConfig::new(avro.schema.clone(), avro.schema_id).input_type()
}
SerializerConfig::Cef(config) => config.input_type(),
SerializerConfig::Csv(config) => config.input_type(),
Expand All @@ -377,7 +377,7 @@ impl SerializerConfig {
pub fn schema_requirement(&self) -> schema::Requirement {
match self {
SerializerConfig::Avro { avro } => {
AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
AvroSerializerConfig::new(avro.schema.clone(), avro.schema_id).schema_requirement()
}
SerializerConfig::Cef(config) => config.schema_requirement(),
SerializerConfig::Csv(config) => config.schema_requirement(),
Expand Down
4 changes: 3 additions & 1 deletion lib/codecs/tests/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ fn roundtrip_avro(data_path: PathBuf, schema_path: PathBuf, reserialize: bool) {
let deserializer = AvroDeserializerConfig::new(schema.clone(), false)
.build()
.unwrap();
let mut serializer = AvroSerializerConfig::new(schema.clone()).build().unwrap();
let mut serializer = AvroSerializerConfig::new(schema.clone(), None)
.build()
.unwrap();

let (buf, event) = load_deserialize(&data_path, &deserializer);

Expand Down
22 changes: 18 additions & 4 deletions website/cue/reference/components/sinks/generated/amqp.cue
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,24 @@ generated: components: sinks: amqp: configuration: {
description: "Apache Avro-specific encoder options."
relevant_when: "codec = \"avro\""
required: true
type: object: options: schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
type: object: options: {
schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
}
schema_id: {
description: """
Confluent Avro schema ID

When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
containing a magic byte and a 4-byte big-endian schema ID).

[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
"""
required: false
type: int: examples: ["42"]
}
}
}
cef: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,24 @@ generated: components: sinks: aws_cloudwatch_logs: configuration: {
description: "Apache Avro-specific encoder options."
relevant_when: "codec = \"avro\""
required: true
type: object: options: schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
type: object: options: {
schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
}
schema_id: {
description: """
Confluent Avro schema ID

When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
containing a magic byte and a 4-byte big-endian schema ID).

[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
"""
required: false
type: int: examples: ["42"]
}
}
}
cef: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,24 @@ generated: components: sinks: aws_kinesis_firehose: configuration: {
description: "Apache Avro-specific encoder options."
relevant_when: "codec = \"avro\""
required: true
type: object: options: schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
type: object: options: {
schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
}
schema_id: {
description: """
Confluent Avro schema ID

When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
containing a magic byte and a 4-byte big-endian schema ID).

[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
"""
required: false
type: int: examples: ["42"]
}
}
}
cef: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,24 @@ generated: components: sinks: aws_kinesis_streams: configuration: {
description: "Apache Avro-specific encoder options."
relevant_when: "codec = \"avro\""
required: true
type: object: options: schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
type: object: options: {
schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
}
schema_id: {
description: """
Confluent Avro schema ID

When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
containing a magic byte and a 4-byte big-endian schema ID).

[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
"""
required: false
type: int: examples: ["42"]
}
}
}
cef: {
Expand Down
22 changes: 18 additions & 4 deletions website/cue/reference/components/sinks/generated/aws_s3.cue
Original file line number Diff line number Diff line change
Expand Up @@ -438,10 +438,24 @@ generated: components: sinks: aws_s3: configuration: {
description: "Apache Avro-specific encoder options."
relevant_when: "codec = \"avro\""
required: true
type: object: options: schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
type: object: options: {
schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
}
schema_id: {
description: """
Confluent Avro schema ID

When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
containing a magic byte and a 4-byte big-endian schema ID).

[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
"""
required: false
type: int: examples: ["42"]
}
}
}
cef: {
Expand Down
22 changes: 18 additions & 4 deletions website/cue/reference/components/sinks/generated/aws_sns.cue
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,24 @@ generated: components: sinks: aws_sns: configuration: {
description: "Apache Avro-specific encoder options."
relevant_when: "codec = \"avro\""
required: true
type: object: options: schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
type: object: options: {
schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
}
schema_id: {
description: """
Confluent Avro schema ID

When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
containing a magic byte and a 4-byte big-endian schema ID).

[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
"""
required: false
type: int: examples: ["42"]
}
}
}
cef: {
Expand Down
22 changes: 18 additions & 4 deletions website/cue/reference/components/sinks/generated/aws_sqs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,24 @@ generated: components: sinks: aws_sqs: configuration: {
description: "Apache Avro-specific encoder options."
relevant_when: "codec = \"avro\""
required: true
type: object: options: schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
type: object: options: {
schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
}
schema_id: {
description: """
Confluent Avro schema ID

When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
containing a magic byte and a 4-byte big-endian schema ID).

[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
"""
required: false
type: int: examples: ["42"]
}
}
}
cef: {
Expand Down
22 changes: 18 additions & 4 deletions website/cue/reference/components/sinks/generated/azure_blob.cue
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,24 @@ generated: components: sinks: azure_blob: configuration: {
description: "Apache Avro-specific encoder options."
relevant_when: "codec = \"avro\""
required: true
type: object: options: schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
type: object: options: {
schema: {
description: "The Avro schema."
required: true
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
}
schema_id: {
description: """
Confluent Avro schema ID

When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
containing a magic byte and a 4-byte big-endian schema ID).

[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
"""
required: false
type: int: examples: ["42"]
}
}
}
cef: {
Expand Down
Loading
Loading