Skip to content

Commit 20c1397

Browse files
committed
feat(kafka): Allow encoder to add avro schema id prefix
1 parent 9f15e23 commit 20c1397

35 files changed

Lines changed: 585 additions & 127 deletions
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
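Add an optional `schema_id` setting to the Avro encoder. When set, each encoded message is prefixed with the Confluent wire-format header: a zero magic byte followed by the 4-byte big-endian schema ID.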
2+
3+
authors: yazasnyal

lib/codecs/src/decoding/format/avro.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ impl From<&AvroDeserializerOptions> for AvroSerializerOptions {
8282
fn from(value: &AvroDeserializerOptions) -> Self {
8383
Self {
8484
schema: value.schema.clone(),
85+
schema_id: None,
8586
}
8687
}
8788
}

lib/codecs/src/encoding/format/avro.rs

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,20 @@ pub struct AvroSerializerConfig {
1515

1616
impl AvroSerializerConfig {
1717
/// Creates a new `AvroSerializerConfig`.
18-
pub const fn new(schema: String) -> Self {
18+
pub const fn new(schema: String, schema_id: Option<i32>) -> Self {
1919
Self {
20-
avro: AvroSerializerOptions { schema },
20+
avro: AvroSerializerOptions { schema, schema_id },
2121
}
2222
}
2323

2424
/// Build the `AvroSerializer` from this configuration.
2525
pub fn build(&self) -> Result<AvroSerializer, BuildError> {
2626
let schema = apache_avro::Schema::parse_str(&self.avro.schema)
2727
.map_err(|error| format!("Failed building Avro serializer: {error}"))?;
28-
Ok(AvroSerializer { schema })
28+
Ok(AvroSerializer {
29+
schema,
30+
schema_id: self.avro.schema_id,
31+
})
2932
}
3033

3134
/// The data type of events that are accepted by `AvroSerializer`.
@@ -50,18 +53,27 @@ pub struct AvroSerializerOptions {
5053
))]
5154
#[configurable(metadata(docs::human_name = "Schema JSON"))]
5255
pub schema: String,
56+
/// Confluent Avro schema ID.
57+
///
58+
/// When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
59+
/// containing a magic byte and a 4-byte big-endian schema ID).
60+
///
61+
/// [wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
62+
#[configurable(metadata(docs::examples = "42"))]
63+
pub schema_id: Option<i32>,
5364
}
5465

5566
/// Serializer that converts an `Event` to bytes using the Apache Avro format.
5667
#[derive(Debug, Clone)]
5768
pub struct AvroSerializer {
5869
schema: apache_avro::Schema,
70+
schema_id: Option<i32>,
5971
}
6072

6173
impl AvroSerializer {
6274
/// Creates a new `AvroSerializer`.
63-
pub const fn new(schema: apache_avro::Schema) -> Self {
64-
Self { schema }
75+
pub const fn new(schema: apache_avro::Schema, schema_id: Option<i32>) -> Self {
76+
Self { schema, schema_id }
6577
}
6678
}
6779

@@ -73,7 +85,13 @@ impl Encoder<Event> for AvroSerializer {
7385
let value = apache_avro::to_value(log)?;
7486
let value = value.resolve(&self.schema)?;
7587
let bytes = apache_avro::to_avro_datum(&self.schema, value)?;
88+
89+
if let Some(schema_id) = self.schema_id {
90+
buffer.put_slice(&[0x00]); // magic byte
91+
buffer.put_slice(&schema_id.to_be_bytes()); // schema id data
92+
}
7693
buffer.put_slice(&bytes);
94+
7795
Ok(())
7896
}
7997
}
@@ -105,12 +123,39 @@ mod tests {
105123
}
106124
"#}
107125
.to_owned();
108-
let config = AvroSerializerConfig::new(schema);
126+
let config = AvroSerializerConfig::new(schema, None);
109127
let mut serializer = config.build().unwrap();
110128
let mut bytes = BytesMut::new();
111129

112130
serializer.encode(event, &mut bytes).unwrap();
113131

114132
assert_eq!(bytes.freeze(), b"\0\x06bar".as_slice());
115133
}
134+
135+
#[test]
136+
fn serialize_avro_with_schema_id() {
137+
let event = Event::Log(LogEvent::from(btreemap! {
138+
"foo" => Value::from("bar")
139+
}));
140+
let schema = indoc! {r#"
141+
{
142+
"type": "record",
143+
"name": "Log",
144+
"fields": [
145+
{
146+
"name": "foo",
147+
"type": ["string"]
148+
}
149+
]
150+
}
151+
"#}
152+
.to_owned();
153+
let config = AvroSerializerConfig::new(schema, Some(42));
154+
let mut serializer = config.build().unwrap();
155+
let mut bytes = BytesMut::new();
156+
157+
serializer.encode(event, &mut bytes).unwrap();
158+
159+
assert_eq!(bytes.freeze(), b"\0\0\0\0\x2A\0\x06bar".as_slice());
160+
}
116161
}

lib/codecs/src/encoding/serializer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ impl SerializerConfig {
288288
pub fn build(&self) -> Result<Serializer, Box<dyn std::error::Error + Send + Sync + 'static>> {
289289
match self {
290290
SerializerConfig::Avro { avro } => Ok(Serializer::Avro(
291-
AvroSerializerConfig::new(avro.schema.clone()).build()?,
291+
AvroSerializerConfig::new(avro.schema.clone(), avro.schema_id).build()?,
292292
)),
293293
SerializerConfig::Cef(config) => Ok(Serializer::Cef(config.build()?)),
294294
SerializerConfig::Csv(config) => Ok(Serializer::Csv(config.build()?)),
@@ -354,7 +354,7 @@ impl SerializerConfig {
354354
pub fn input_type(&self) -> DataType {
355355
match self {
356356
SerializerConfig::Avro { avro } => {
357-
AvroSerializerConfig::new(avro.schema.clone()).input_type()
357+
AvroSerializerConfig::new(avro.schema.clone(), avro.schema_id).input_type()
358358
}
359359
SerializerConfig::Cef(config) => config.input_type(),
360360
SerializerConfig::Csv(config) => config.input_type(),
@@ -377,7 +377,7 @@ impl SerializerConfig {
377377
pub fn schema_requirement(&self) -> schema::Requirement {
378378
match self {
379379
SerializerConfig::Avro { avro } => {
380-
AvroSerializerConfig::new(avro.schema.clone()).schema_requirement()
380+
AvroSerializerConfig::new(avro.schema.clone(), avro.schema_id).schema_requirement()
381381
}
382382
SerializerConfig::Cef(config) => config.schema_requirement(),
383383
SerializerConfig::Csv(config) => config.schema_requirement(),

lib/codecs/tests/avro.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ fn roundtrip_avro(data_path: PathBuf, schema_path: PathBuf, reserialize: bool) {
3838
let deserializer = AvroDeserializerConfig::new(schema.clone(), false)
3939
.build()
4040
.unwrap();
41-
let mut serializer = AvroSerializerConfig::new(schema.clone()).build().unwrap();
41+
let mut serializer = AvroSerializerConfig::new(schema.clone(), None)
42+
.build()
43+
.unwrap();
4244

4345
let (buf, event) = load_deserialize(&data_path, &deserializer);
4446

website/cue/reference/components/sinks/generated/amqp.cue

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,24 @@ generated: components: sinks: amqp: configuration: {
5555
description: "Apache Avro-specific encoder options."
5656
relevant_when: "codec = \"avro\""
5757
required: true
58-
type: object: options: schema: {
59-
description: "The Avro schema."
60-
required: true
61-
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
58+
type: object: options: {
59+
schema: {
60+
description: "The Avro schema."
61+
required: true
62+
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
63+
}
64+
schema_id: {
65+
description: """
66+
Confluent Avro schema ID
67+
68+
When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
69+
containing a magic byte and a 4-byte big-endian schema ID).
70+
71+
[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
72+
"""
73+
required: false
74+
type: int: examples: ["42"]
75+
}
6276
}
6377
}
6478
cef: {

website/cue/reference/components/sinks/generated/aws_cloudwatch_logs.cue

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,24 @@ generated: components: sinks: aws_cloudwatch_logs: configuration: {
251251
description: "Apache Avro-specific encoder options."
252252
relevant_when: "codec = \"avro\""
253253
required: true
254-
type: object: options: schema: {
255-
description: "The Avro schema."
256-
required: true
257-
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
254+
type: object: options: {
255+
schema: {
256+
description: "The Avro schema."
257+
required: true
258+
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
259+
}
260+
schema_id: {
261+
description: """
262+
Confluent Avro schema ID
263+
264+
When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
265+
containing a magic byte and a 4-byte big-endian schema ID).
266+
267+
[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
268+
"""
269+
required: false
270+
type: int: examples: ["42"]
271+
}
258272
}
259273
}
260274
cef: {

website/cue/reference/components/sinks/generated/aws_kinesis_firehose.cue

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,24 @@ generated: components: sinks: aws_kinesis_firehose: configuration: {
230230
description: "Apache Avro-specific encoder options."
231231
relevant_when: "codec = \"avro\""
232232
required: true
233-
type: object: options: schema: {
234-
description: "The Avro schema."
235-
required: true
236-
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
233+
type: object: options: {
234+
schema: {
235+
description: "The Avro schema."
236+
required: true
237+
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
238+
}
239+
schema_id: {
240+
description: """
241+
Confluent Avro schema ID
242+
243+
When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
244+
containing a magic byte and a 4-byte big-endian schema ID).
245+
246+
[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
247+
"""
248+
required: false
249+
type: int: examples: ["42"]
250+
}
237251
}
238252
}
239253
cef: {

website/cue/reference/components/sinks/generated/aws_kinesis_streams.cue

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -230,10 +230,24 @@ generated: components: sinks: aws_kinesis_streams: configuration: {
230230
description: "Apache Avro-specific encoder options."
231231
relevant_when: "codec = \"avro\""
232232
required: true
233-
type: object: options: schema: {
234-
description: "The Avro schema."
235-
required: true
236-
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
233+
type: object: options: {
234+
schema: {
235+
description: "The Avro schema."
236+
required: true
237+
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
238+
}
239+
schema_id: {
240+
description: """
241+
Confluent Avro schema ID
242+
243+
When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
244+
containing a magic byte and a 4-byte big-endian schema ID).
245+
246+
[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
247+
"""
248+
required: false
249+
type: int: examples: ["42"]
250+
}
237251
}
238252
}
239253
cef: {

website/cue/reference/components/sinks/generated/aws_s3.cue

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -438,10 +438,24 @@ generated: components: sinks: aws_s3: configuration: {
438438
description: "Apache Avro-specific encoder options."
439439
relevant_when: "codec = \"avro\""
440440
required: true
441-
type: object: options: schema: {
442-
description: "The Avro schema."
443-
required: true
444-
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
441+
type: object: options: {
442+
schema: {
443+
description: "The Avro schema."
444+
required: true
445+
type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"]
446+
}
447+
schema_id: {
448+
description: """
449+
Confluent Avro schema ID
450+
451+
When set, each message will use the [Confluent wire format][wire_format] (a 5-byte prefix
452+
containing a magic byte and a 4-byte big-endian schema ID).
453+
454+
[wire_format]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
455+
"""
456+
required: false
457+
type: int: examples: ["42"]
458+
}
445459
}
446460
}
447461
cef: {

0 commit comments

Comments
 (0)