From a860dcb0fb32e81bca544d86060afddb7b7379ad Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 10 Jan 2023 21:52:32 -0800 Subject: [PATCH 01/12] Demonstrate a bug for schema "validation" when writing JSON to a table I stumbled into this while pilfering code from kafka-delta-ingest for another project and discovered that the code in `write_values` which does `record_batch.schema() != arrow_schema` doesn't do what we think it does. Basically if `Decoder` "works" the schema it's going to return is just the schema passed into it. It has no bearing on whether the JSON has the same schema. Don't ask me why. Using the reader's `infer_json_schema_*` functions can provide a Schema that is useful for comparison: let mut value_iter = json_buffer.iter().map(|j| Ok(j.to_owned())); let json_schema = infer_json_schema_from_iterator(value_iter.clone()).expect("Failed to infer!"); let decoder = Decoder::new(Arc::new(json_schema), options); if let Some(batch) = decoder.next_batch(&mut value_iter).expect("Failed to create RecordBatch") { assert_eq!(batch.schema(), arrow_schema_ref, "Schemas don't match!"); } What's even more interesting, is that after a certain number of fields are removed, the Decoder no longer pretends it can Decode the JSON. I am baffled as to why. --- src/writer.rs | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/src/writer.rs b/src/writer.rs index bcb7657..ee003c1 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -1145,6 +1145,56 @@ mod tests { use serde_json::json; use std::path::Path; + #[tokio::test] + async fn test_schema_matching() { + let temp_dir = tempfile::tempdir().unwrap(); + let table_path = temp_dir.path(); + create_temp_table(table_path); + + let table = crate::delta_helpers::load_table(table_path.to_str().unwrap(), HashMap::new()) + .await + .unwrap(); + let mut writer = DataWriter::for_table(&table, HashMap::new()).unwrap(); + let rows: Vec = vec![json!({ + "meta": { + "kafka": { + "offset": 0, + "partition": 0, + "topic": "some_topic" + }, + "producer": { + "timestamp": "2021-06-22" + }, + }, + "twitch": "is the best", + "why" : "does this succeed?!", + // If `some_nested_list` is removed, the Decoder ends up outputing + // an error that gets interpreted as an EmptyRecordBatch + "some_nested_list": [[42], [84]], + "date": "2021-06-22" + })]; + let result = writer.write(rows).await; + assert!( + result.is_err(), + "Expected the write of our invalid schema rows to fail!\n{:?}", + result + ); + match result { + Ok(_) => unreachable!(), + Err(DataWriterError::SchemaMismatch { + record_batch_schema: _, + expected_schema: _, + }) => {} + Err(e) => { + assert!( + false, + "I was expecting a schema mismatch, got this instead: {:?}", + e + ); + } + } + } + #[tokio::test] async fn delta_stats_test() { let temp_dir = tempfile::tempdir().unwrap(); From 93e3d7362ca81923d3e6736153f67a05e57227d2 Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Sat, 6 Jan 2024 14:33:54 -0800 Subject: [PATCH 02/12] Begin introducing schema conformance testing ahead of more substantial refactor The intention here is to enable more consistent schema handling within the writers Sponsored-by: Raft LLC --- src/writer.rs | 108 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 104 insertions(+), 4 deletions(-) diff --git a/src/writer.rs b/src/writer.rs index ee003c1..274e37f 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -1142,10 +1142,113 @@ fn timestamp_to_delta_stats_string(n: i64, time_unit: &TimeUnit) -> Option (DeltaTable, Schema) { + let schema = StructType::new(vec![ + StructField::new( + "id".to_string(), + DeltaDataType::Primitive(PrimitiveType::String), + true, + ), + StructField::new( + "value".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "modified".to_string(), + DeltaDataType::Primitive(PrimitiveType::String), + true, + ), + ]); + + let table = CreateBuilder::new() + .with_location(path.to_str().unwrap()) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(schema.fields().clone()) + .await + .unwrap(); + (table, schema) + } + + #[tokio::test] + async fn test_schema_strictness_column_type_mismatch() { + let temp_dir = tempfile::tempdir().unwrap(); + let (table, _schema) = get_fresh_table(&temp_dir.path()).await; + let mut writer = DataWriter::for_table(&table, HashMap::new()).unwrap(); + + let rows: Vec = vec![json!({ + "id" : "alpha", + "value" : 1, + })]; + let result = writer.write(rows).await; + assert!( + result.is_ok(), + "Expected the write to succeed!\n{:?}", + result + ); + + let rows: Vec = vec![json!({ + "id" : 1, + "value" : 1, + })]; + let result = writer.write(rows).await; + assert!( + result.is_err(), + "Expected the write with mismatched data to fail\n{:?}", + result + ); + } + + #[tokio::test] + async fn test_schema_strictness_with_additional_columns() { + let temp_dir = tempfile::tempdir().unwrap(); + let (mut table, _schema) = get_fresh_table(&temp_dir.path()).await; + let mut writer = DataWriter::for_table(&table, HashMap::new()).unwrap(); + + let rows: Vec = vec![json!({ + "id" : "alpha", + "value" : 1, + })]; + let result = writer.write(rows).await; + assert!( + result.is_ok(), + "Expected the write to succeed!\n{:?}", + result + ); + + let rows: Vec = vec![json!({ + "id" : "bravo", + "value" : 2, + "color" : "silver", + })]; + let result = writer.write(rows).await; + assert!( + result.is_ok(), + "Did not expect additive write to fail: {:?}", + result + ); + + // Reload the table to look at it + table.load().await.unwrap(); + let new_schema = table.schema().unwrap(); + let columns: Vec = new_schema + .fields() + .iter() + .map(|f| f.name().clone()) + .collect(); + assert_eq!(vec!["id", "value", "modified", "color"], columns); + } + #[tokio::test] + #[ignore] async fn test_schema_matching() { let temp_dir = tempfile::tempdir().unwrap(); let table_path = temp_dir.path(); @@ -1181,10 +1284,7 @@ mod tests { ); match result { Ok(_) => unreachable!(), - Err(DataWriterError::SchemaMismatch { - record_batch_schema: _, - expected_schema: _, - }) => {} + //Err(Box) => {}, Err(e) => { assert!( false, From f49b4f7188be7d8c612ab2719fcac8b1a07613eb Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Sat, 6 Jan 2024 19:51:32 -0800 Subject: [PATCH 03/12] Introduce DeserializedMessage for carrying schema information into the writers The DeserializedMessage carries optional inferred schema information along with the message itself. This is useful for understanding whether schema evolution hould happen "later" in the message processing pipeline. The downside of this behavior is that there will be performance impact as arrow_json does schema inference. Sponsored-by: Raft LLC --- src/coercions.rs | 13 +++-- src/dead_letters.rs | 15 +++-- src/lib.rs | 11 ++-- src/serialization.rs | 97 ++++++++++++++++++++++++++++++--- src/transforms.rs | 17 ++++-- src/value_buffers.rs | 37 ++++++------- src/writer.rs | 41 +++++++++----- tests/delta_partitions_tests.rs | 5 +- 8 files changed, 174 insertions(+), 62 deletions(-) diff --git a/src/coercions.rs b/src/coercions.rs index c69d255..de17594 100644 --- a/src/coercions.rs +++ b/src/coercions.rs @@ -6,6 +6,8 @@ use serde_json::Value; use std::collections::HashMap; use std::str::FromStr; +use crate::serialization::DeserializedMessage; + #[derive(Debug, Clone, PartialEq)] #[allow(unused)] enum CoercionNode { @@ -72,7 +74,7 @@ fn build_coercion_node(data_type: &DataType) -> Option { /// Applies all data coercions specified by the [`CoercionTree`] to the [`Value`]. /// Though it does not currently, this function should approximate or improve on the coercions applied by [Spark's `from_json`](https://spark.apache.org/docs/latest/api/sql/index.html#from_json) -pub(crate) fn coerce(value: &mut Value, coercion_tree: &CoercionTree) { +pub(crate) fn coerce(value: &mut DeserializedMessage, coercion_tree: &CoercionTree) { if let Some(context) = value.as_object_mut() { for (field_name, coercion) in coercion_tree.root.iter() { if let Some(value) = context.get_mut(field_name) { @@ -322,7 +324,7 @@ mod tests { let coercion_tree = create_coercion_tree(&delta_schema); - let mut messages = vec![ + let mut messages: Vec = vec![ json!({ "level1_string": "a", "level1_integer": 0, @@ -380,7 +382,10 @@ mod tests { // This is valid epoch micros, but typed as a string on the way in. We WON'T coerce it. "level1_timestamp": "1636668718000000", }), - ]; + ] + .into_iter() + .map(|f| f.into()) + .collect(); for message in messages.iter_mut() { coerce(message, &coercion_tree); @@ -447,7 +452,7 @@ mod tests { ]; for i in 0..messages.len() { - assert_eq!(messages[i], expected[i]); + assert_eq!(messages[i].clone().message(), expected[i]); } } } diff --git a/src/dead_letters.rs b/src/dead_letters.rs index a4e709e..e7f6e1f 100644 --- a/src/dead_letters.rs +++ b/src/dead_letters.rs @@ -14,6 +14,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use std::collections::HashMap; +use crate::serialization::DeserializedMessage; use crate::{transforms::TransformError, writer::*}; #[cfg(feature = "s3")] @@ -55,11 +56,11 @@ impl DeadLetter { /// Creates a dead letter from a failed transform. /// `base64_bytes` will always be `None`. 
- pub(crate) fn from_failed_transform(value: &Value, err: TransformError) -> Self { + pub(crate) fn from_failed_transform(value: &DeserializedMessage, err: TransformError) -> Self { let timestamp = Utc::now(); Self { base64_bytes: None, - json_string: Some(value.to_string()), + json_string: Some(value.clone().message().to_string()), error: Some(err.to_string()), timestamp: timestamp .timestamp_nanos_opt() @@ -286,9 +287,10 @@ impl DeadLetterQueue for DeltaSinkDeadLetterQueue { .map(|dl| { serde_json::to_value(dl) .map_err(|e| DeadLetterQueueError::SerdeJson { source: e }) - .and_then(|mut v| { + .and_then(|v| { self.transformer - .transform(&mut v, None as Option<&BorrowedMessage>)?; + // TODO: this can't be right, shouldn't this function takje DeserializedMessage + .transform(&mut v.clone().into(), None as Option<&BorrowedMessage>)?; Ok(v) }) }) @@ -297,7 +299,10 @@ impl DeadLetterQueue for DeltaSinkDeadLetterQueue { let version = self .delta_writer - .insert_all(&mut self.table, values) + .insert_all( + &mut self.table, + values.into_iter().map(|v| v.into()).collect(), + ) .await?; if self.write_checkpoints { diff --git a/src/lib.rs b/src/lib.rs index f3111aa..b60b68a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -45,7 +45,8 @@ mod dead_letters; mod delta_helpers; mod metrics; mod offsets; -mod serialization; +#[allow(missing_docs)] +pub mod serialization; mod transforms; mod value_buffers; /// Doc @@ -56,6 +57,7 @@ use crate::value_buffers::{ConsumedBuffers, ValueBuffers}; use crate::{ dead_letters::*, metrics::*, + serialization::*, transforms::*, writer::{DataWriter, DataWriterError}, }; @@ -207,8 +209,9 @@ pub enum IngestError { } /// Formats for message parsing -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub enum MessageFormat { + #[default] /// Parses messages as json and uses the inferred schema DefaultJson, @@ -733,7 +736,7 @@ struct IngestProcessor { coercion_tree: CoercionTree, table: DeltaTable, delta_writer: DataWriter, - value_buffers: ValueBuffers, + value_buffers: ValueBuffers, delta_partition_offsets: HashMap>, latency_timer: Instant, dlq: Box, @@ -864,7 +867,7 @@ impl IngestProcessor { async fn deserialize_message( &mut self, msg: &M, - ) -> Result + ) -> Result where M: Message + Send + Sync, { diff --git a/src/serialization.rs b/src/serialization.rs index 9dfc68b..55c2fff 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -8,12 +8,53 @@ use serde_json::Value; use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; +use deltalake_core::arrow::datatypes::Schema as ArrowSchema; + +/// Structure which contains the [serde_json::Value] and the inferred schema of the message +/// +/// The [ArrowSchema] helps with schema evolution +#[derive(Clone, Debug, Default, PartialEq)] +pub struct DeserializedMessage { + message: Value, + schema: Option, +} + +impl DeserializedMessage { + pub fn schema(&self) -> &Option { + &self.schema + } + pub fn message(self) -> Value { + self.message + } + pub fn get(&self, key: &str) -> Option<&Value> { + self.message.get(key) + } + pub fn as_object_mut(&mut self) -> Option<&mut serde_json::Map> { + self.message.as_object_mut() + } +} + +/// Allow for `.into()` on [Value] for ease of use +impl From for DeserializedMessage { + fn from(message: Value) -> Self { + // XXX: This seems wasteful, this function should go away, and the deserializers should + // infer straight from the buffer stream + let iter = vec![message.clone()].into_iter().map(|v| Ok(v)); + let schema = + match 
deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) { + Ok(schema) => Some(schema), + _ => None, + }; + Self { message, schema } + } +} + #[async_trait] pub(crate) trait MessageDeserializer { async fn deserialize( &mut self, message_bytes: &[u8], - ) -> Result; + ) -> Result; } pub(crate) struct MessageDeserializerFactory {} @@ -80,11 +121,15 @@ impl MessageDeserializerFactory { } } +#[derive(Clone, Debug, Default)] struct DefaultDeserializer {} #[async_trait] impl MessageDeserializer for DefaultDeserializer { - async fn deserialize(&mut self, payload: &[u8]) -> Result { + async fn deserialize( + &mut self, + payload: &[u8], + ) -> Result { let value: Value = match serde_json::from_slice(payload) { Ok(v) => v, Err(e) => { @@ -94,7 +139,41 @@ impl MessageDeserializer for DefaultDeserializer { } }; - Ok(value) + Ok(value.into()) + } +} + +#[cfg(test)] +mod default_tests { + use super::*; + + #[tokio::test] + async fn deserialize_with_schema() { + let mut deser = DefaultDeserializer::default(); + let message = deser + .deserialize(r#"{"hello" : "world"}"#.as_bytes()) + .await + .expect("Failed to deserialize trivial JSON"); + assert!( + message.schema().is_some(), + "The DeserializedMessage doesn't have a schema!" + ); + } + + #[tokio::test] + async fn deserialize_simple_json() { + #[derive(serde::Deserialize)] + struct HW { + hello: String, + } + + let mut deser = DefaultDeserializer::default(); + let message = deser + .deserialize(r#"{"hello" : "world"}"#.as_bytes()) + .await + .expect("Failed to deserialize trivial JSON"); + let value: HW = serde_json::from_value(message.message).expect("Failed to coerce"); + assert_eq!("world", value.hello); } } @@ -116,11 +195,11 @@ impl MessageDeserializer for AvroDeserializer { async fn deserialize( &mut self, message_bytes: &[u8], - ) -> Result { + ) -> Result { match self.decoder.decode_with_schema(Some(message_bytes)).await { Ok(drs) => match drs { Some(v) => match Value::try_from(v.value) { - Ok(v) => Ok(v), + Ok(v) => Ok(v.into()), Err(e) => Err(MessageDeserializationError::AvroDeserialization { dead_letter: DeadLetter::from_failed_deserialization( message_bytes, @@ -147,7 +226,7 @@ impl MessageDeserializer for AvroSchemaDeserializer { async fn deserialize( &mut self, message_bytes: &[u8], - ) -> Result { + ) -> Result { let reader_result = match &self.schema { None => apache_avro::Reader::new(Cursor::new(message_bytes)), Some(schema) => apache_avro::Reader::with_schema(schema, Cursor::new(message_bytes)), @@ -162,7 +241,7 @@ impl MessageDeserializer for AvroSchemaDeserializer { }; return match v { - Ok(value) => Ok(value), + Ok(value) => Ok(value.into()), Err(e) => Err(MessageDeserializationError::AvroDeserialization { dead_letter: DeadLetter::from_failed_deserialization( message_bytes, @@ -221,11 +300,11 @@ impl MessageDeserializer for JsonDeserializer { async fn deserialize( &mut self, message_bytes: &[u8], - ) -> Result { + ) -> Result { let decoder = self.decoder.borrow_mut(); match decoder.decode(Some(message_bytes)).await { Ok(drs) => match drs { - Some(v) => Ok(v.value), + Some(v) => Ok(v.value.into()), None => return Err(MessageDeserializationError::EmptyPayload), }, Err(e) => { diff --git a/src/transforms.rs b/src/transforms.rs index b4803b6..3cb83cc 100644 --- a/src/transforms.rs +++ b/src/transforms.rs @@ -1,3 +1,4 @@ +use crate::serialization::DeserializedMessage; use chrono::prelude::*; use jmespatch::{ functions::{ArgumentType, CustomFunction, Signature}, @@ -348,13 +349,13 @@ impl Transformer { /// The 
optional `kafka_message` must be provided to include well known Kafka properties in the value. pub(crate) fn transform( &self, - value: &mut Value, + value: &mut DeserializedMessage, kafka_message: Option<&M>, ) -> Result<(), TransformError> where M: Message, { - let data = Variable::try_from(value.clone())?; + let data = Variable::try_from(value.clone().message())?; match value.as_object_mut() { Some(map) => { @@ -378,7 +379,7 @@ impl Transformer { Ok(()) } _ => Err(TransformError::ValueNotAnObject { - value: value.to_owned(), + value: value.clone().message(), }), } } @@ -510,7 +511,7 @@ mod tests { #[test] fn transforms_with_substr() { - let mut test_value = json!({ + let test_value = json!({ "name": "A", "modified": "2021-03-16T14:38:58Z", }); @@ -524,6 +525,7 @@ mod tests { 0, None, ); + let mut test_value: DeserializedMessage = test_value.into(); let mut transforms = HashMap::new(); @@ -540,6 +542,7 @@ mod tests { let name = test_value.get("name").unwrap().as_str().unwrap(); let modified = test_value.get("modified").unwrap().as_str().unwrap(); + println!("TEST: {test_value:?}"); let modified_date = test_value.get("modified_date").unwrap().as_str().unwrap(); assert_eq!("A", name); @@ -567,7 +570,7 @@ mod tests { fn test_transforms_with_epoch_seconds_to_iso8601() { let expected_iso = "2021-07-20T23:18:18Z"; - let mut test_value = json!({ + let test_value = json!({ "name": "A", "epoch_seconds_float": 1626823098.51995, "epoch_seconds_int": 1626823098, @@ -584,6 +587,7 @@ mod tests { 0, None, ); + let mut test_value: DeserializedMessage = test_value.into(); let mut transforms = HashMap::new(); transforms.insert( @@ -640,7 +644,7 @@ mod tests { #[test] fn test_transforms_with_kafka_meta() { - let mut test_value = json!({ + let test_value = json!({ "name": "A", "modified": "2021-03-16T14:38:58Z", }); @@ -655,6 +659,7 @@ mod tests { None, ); + let mut test_value: DeserializedMessage = test_value.into(); let mut transforms = HashMap::new(); transforms.insert("_kafka_offset".to_string(), "kafka.offset".to_string()); diff --git a/src/value_buffers.rs b/src/value_buffers.rs index 6e61725..fe6ed7f 100644 --- a/src/value_buffers.rs +++ b/src/value_buffers.rs @@ -1,21 +1,20 @@ use crate::{DataTypeOffset, DataTypePartition, IngestError}; -use serde_json::Value; use std::collections::HashMap; /// Provides a single interface into the multiple [`ValueBuffer`] instances used to buffer data for each assigned partition. #[derive(Debug, Default)] -pub(crate) struct ValueBuffers { - buffers: HashMap, +pub(crate) struct ValueBuffers { + buffers: HashMap>, len: usize, } -impl ValueBuffers { +impl ValueBuffers { /// Adds a value to in-memory buffers and tracks the partition and offset. pub(crate) fn add( &mut self, partition: DataTypePartition, offset: DataTypeOffset, - value: Value, + value: T, ) -> Result<(), IngestError> { let buffer = self .buffers @@ -40,7 +39,7 @@ impl ValueBuffers { } /// Returns values, partition offsets and partition counts currently held in buffer and resets buffers to empty. - pub(crate) fn consume(&mut self) -> ConsumedBuffers { + pub(crate) fn consume(&mut self) -> ConsumedBuffers { let mut partition_offsets = HashMap::new(); let mut partition_counts = HashMap::new(); @@ -76,14 +75,14 @@ impl ValueBuffers { /// Buffer of values held in memory for a single Kafka partition. #[derive(Debug)] -struct ValueBuffer { +struct ValueBuffer { /// The offset of the last message stored in the buffer. last_offset: DataTypeOffset, - /// The buffer of [`Value`] instances. 
- values: Vec, + /// The buffer of `T` instances. + values: Vec, } -impl ValueBuffer { +impl ValueBuffer { /// Creates a new [`ValueBuffer`] to store messages from a Kafka partition. pub(crate) fn new() -> Self { Self { @@ -97,13 +96,13 @@ impl ValueBuffer { } /// Adds the value to buffer and stores its offset as the `last_offset` of the buffer. - pub(crate) fn add(&mut self, value: Value, offset: DataTypeOffset) { + pub(crate) fn add(&mut self, value: T, offset: DataTypeOffset) { self.last_offset = offset; self.values.push(value); } /// Consumes and returns the buffer and last offset so it may be written to delta and clears internal state. - pub(crate) fn consume(&mut self) -> Option<(Vec, DataTypeOffset)> { + pub(crate) fn consume(&mut self) -> Option<(Vec, DataTypeOffset)> { if !self.values.is_empty() { assert!(self.last_offset >= 0); Some((std::mem::take(&mut self.values), self.last_offset)) @@ -114,9 +113,9 @@ impl ValueBuffer { } /// A struct that wraps the data consumed from [`ValueBuffers`] before writing to a [`arrow::record_batch::RecordBatch`]. -pub(crate) struct ConsumedBuffers { - /// The vector of [`Value`] instances consumed. - pub(crate) values: Vec, +pub(crate) struct ConsumedBuffers { + /// The vector of `T` instances consumed. + pub(crate) values: Vec, /// A [`HashMap`] from partition to last offset represented by the consumed buffers. pub(crate) partition_offsets: HashMap, /// A [`HashMap`] from partition to number of messages consumed for each partition. @@ -133,7 +132,7 @@ mod tests { let mut buffers = ValueBuffers::default(); let mut add = |p, o| { buffers - .add(p, o, Value::String(format!("{}:{}", p, o))) + .add(p, o, serde_json::Value::String(format!("{}:{}", p, o))) .unwrap(); }; @@ -188,7 +187,7 @@ mod tests { #[test] fn value_buffers_conflict_offsets_test() { - let mut buffers = ValueBuffers::default(); + let mut buffers: ValueBuffers = ValueBuffers::default(); let verify_error = |res: Result<(), IngestError>, o: i64| { match res.err().unwrap() { @@ -234,7 +233,7 @@ mod tests { ); } - fn add(buffers: &mut ValueBuffers, offset: i64) -> Result<(), IngestError> { - buffers.add(0, offset, Value::Number(offset.into())) + fn add(buffers: &mut ValueBuffers, offset: i64) -> Result<(), IngestError> { + buffers.add(0, offset, serde_json::Value::Number(offset.into())) } } diff --git a/src/writer.rs b/src/writer.rs index 274e37f..94d5659 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -40,6 +40,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; use crate::cursor::InMemoryWriteableCursor; +use crate::serialization::DeserializedMessage; const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__"; @@ -388,10 +389,15 @@ impl DataWriter { } /// Writes the given values to internal parquet buffers for each represented partition. - pub async fn write(&mut self, values: Vec) -> Result<(), Box> { + pub async fn write( + &mut self, + values: Vec, + ) -> Result<(), Box> { let mut partial_writes: Vec<(Value, ParquetError)> = Vec::new(); let arrow_schema = self.arrow_schema(); + let values = values.into_iter().map(|v| v.message()).collect(); + for (key, values) in self.divide_by_partition_values(values)? 
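        // values are grouped by their partition column values; each group is routed to its own arrow writer below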
{ match self.arrow_writers.get_mut(&key) { Some(writer) => collect_partial_write_failure( @@ -580,7 +586,7 @@ impl DataWriter { pub async fn insert_all( &mut self, table: &mut DeltaTable, - values: Vec, + values: Vec, ) -> Result> { self.write(values).await?; let mut adds = self.write_parquet_files(&table.table_uri()).await?; @@ -1184,10 +1190,11 @@ mod tests { let (table, _schema) = get_fresh_table(&temp_dir.path()).await; let mut writer = DataWriter::for_table(&table, HashMap::new()).unwrap(); - let rows: Vec = vec![json!({ + let rows: Vec = vec![json!({ "id" : "alpha", "value" : 1, - })]; + }) + .into()]; let result = writer.write(rows).await; assert!( result.is_ok(), @@ -1195,10 +1202,11 @@ mod tests { result ); - let rows: Vec = vec![json!({ + let rows: Vec = vec![json!({ "id" : 1, "value" : 1, - })]; + }) + .into()]; let result = writer.write(rows).await; assert!( result.is_err(), @@ -1208,15 +1216,17 @@ mod tests { } #[tokio::test] + #[ignore] async fn test_schema_strictness_with_additional_columns() { let temp_dir = tempfile::tempdir().unwrap(); let (mut table, _schema) = get_fresh_table(&temp_dir.path()).await; let mut writer = DataWriter::for_table(&table, HashMap::new()).unwrap(); - let rows: Vec = vec![json!({ + let rows: Vec = vec![json!({ "id" : "alpha", "value" : 1, - })]; + }) + .into()]; let result = writer.write(rows).await; assert!( result.is_ok(), @@ -1224,11 +1234,12 @@ mod tests { result ); - let rows: Vec = vec![json!({ + let rows: Vec = vec![json!({ "id" : "bravo", "value" : 2, "color" : "silver", - })]; + }) + .into()]; let result = writer.write(rows).await; assert!( result.is_ok(), @@ -1258,7 +1269,7 @@ mod tests { .await .unwrap(); let mut writer = DataWriter::for_table(&table, HashMap::new()).unwrap(); - let rows: Vec = vec![json!({ + let rows: Vec = vec![json!({ "meta": { "kafka": { "offset": 0, @@ -1275,7 +1286,8 @@ mod tests { // an error that gets interpreted as an EmptyRecordBatch "some_nested_list": [[42], [84]], "date": "2021-06-22" - })]; + }) + .into()]; let result = writer.write(rows).await; assert!( result.is_err(), @@ -1306,7 +1318,10 @@ mod tests { .unwrap(); let mut writer = DataWriter::for_table(&table, HashMap::new()).unwrap(); - writer.write(JSON_ROWS.clone()).await.unwrap(); + writer + .write(JSON_ROWS.clone().into_iter().map(|r| r.into()).collect()) + .await + .unwrap(); let add = writer .write_parquet_files(&table.table_uri()) .await diff --git a/tests/delta_partitions_tests.rs b/tests/delta_partitions_tests.rs index 0e872a2..b96ef16 100644 --- a/tests/delta_partitions_tests.rs +++ b/tests/delta_partitions_tests.rs @@ -3,6 +3,7 @@ mod helpers; use deltalake_core::kernel::{Action, Add}; use deltalake_core::protocol::{DeltaOperation, SaveMode}; +use kafka_delta_ingest::serialization::DeserializedMessage; use kafka_delta_ingest::writer::*; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; @@ -122,10 +123,10 @@ async fn test_delta_partitions() { std::fs::remove_dir_all(&table_path).unwrap(); } -fn msgs_to_values(values: Vec) -> Vec { +fn msgs_to_values(values: Vec) -> Vec { values .iter() - .map(|j| serde_json::to_value(j).unwrap()) + .map(|j| serde_json::to_value(j).unwrap().into()) .collect() } From f8eb5613ec267dc06f345c841993d5f6e70402f2 Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Sun, 7 Jan 2024 12:57:53 -0800 Subject: [PATCH 04/12] Put avro dependenciies behind the `avro` feature flag Turning avro off drops about 50 crates from the default build, so useful for development, but the code would need to be cleaned up to remove this from the default features list See #163 --- Cargo.toml | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 978f3a8..1ce8019 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,6 @@ edition = "2018" [dependencies] anyhow = "1" async-trait = "0.1" -apache-avro = "^0.14" base64 = "0.13" bytes = "1" chrono = "0.4.31" @@ -20,7 +19,6 @@ lazy_static = "1" log = "0" maplit = "1" rdkafka = { version = "0.28", features = ["ssl-vendored"] } -schema_registry_converter = { version = "3.1.0", features = ["easy", "json", "avro"] } serde = { version = "1", features = ["derive"] } serde_json = "1" strum = "0.20" @@ -46,13 +44,21 @@ rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"], # sentry sentry = { version = "0.23.0", optional = true } +# avro +apache-avro = { version = "^0.14", optional = true } +schema_registry_converter = { version = "3.1.0", optional = true, features = ["easy", "json", "avro"] } + # azure feature enabled, mostly used for tests azure_core = { version = "0.18.0", optional = true } azure_storage = { version = "0.18.0", optional = true } azure_storage_blobs = { version = "0.18.0", optional = true } [features] -default = [] +default = ["avro"] +avro = [ + "apache-avro", + "schema_registry_converter", +] sentry-ext = ["sentry"] dynamic-linking = [ "rdkafka/dynamic-linking" ] azure = [ From 4ed2c8aa2f424bf0f3fb275b5be778e2e0d15f72 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 7 Jan 2024 13:07:20 -0800 Subject: [PATCH 05/12] Remove unused dependencies Identified by `cargo +nightly udeps` --- Cargo.lock | 57 ++++++++++++------------------------------------------ Cargo.toml | 8 +++----- 2 files changed, 15 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f8c08c..2acb2aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -164,7 +164,7 @@ dependencies = [ "regex", "serde", "serde_json", - "strum 0.24.1", + "strum", "strum_macros 0.24.3", "thiserror", "typed-builder", @@ -889,7 +889,7 @@ dependencies = [ [[package]] name = "deltalake-aws" version = "0.1.0" -source = "git+https://github.com/delta-io/delta-rs?branch=main#9264edea89a2fc1c35f4a6b9faab125748ff3651" +source = "git+https://github.com/delta-io/delta-rs?branch=main#8eeb127c12621bf6b93667911468fead007e331d" dependencies = [ "async-trait", "backoff", @@ -914,7 +914,7 @@ dependencies = [ [[package]] name = "deltalake-azure" version = "0.1.0" -source = "git+https://github.com/delta-io/delta-rs?branch=main#9264edea89a2fc1c35f4a6b9faab125748ff3651" +source = "git+https://github.com/delta-io/delta-rs?branch=main#8eeb127c12621bf6b93667911468fead007e331d" dependencies = [ "async-trait", "bytes", @@ -932,7 +932,7 @@ dependencies = [ [[package]] name = "deltalake-core" version = "0.17.0" -source = "git+https://github.com/delta-io/delta-rs?branch=main#9264edea89a2fc1c35f4a6b9faab125748ff3651" +source = "git+https://github.com/delta-io/delta-rs?branch=main#8eeb127c12621bf6b93667911468fead007e331d" dependencies = [ "arrow", "arrow-arith", @@ -1781,19 +1781,16 @@ dependencies = [ "rdkafka", "rusoto_core", "rusoto_credential", - "rusoto_s3", "schema_registry_converter", "sentry", "serde", "serde_json", "serial_test", - "strum 0.20.0", "strum_macros 0.20.1", "tempfile", 
"thiserror", "time 0.3.31", "tokio", - "tokio-stream", "tokio-util 0.6.10", "url", "utime", @@ -1872,9 +1869,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.151" +version = "0.2.152" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" +checksum = "13e3bf6590cbc649f4d1a3eefc9d5d6eb746f5200ffb04e5e142700b8faa56e7" [[package]] name = "libflate" @@ -2884,19 +2881,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "rusoto_s3" -version = "0.47.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "048c2fe811a823ad5a9acc976e8bf4f1d910df719dcf44b15c3e96c5b7a51027" -dependencies = [ - "async-trait", - "bytes", - "futures", - "rusoto_core", - "xml-rs", -] - [[package]] name = "rusoto_signature" version = "0.47.0" @@ -3268,9 +3252,9 @@ dependencies = [ [[package]] name = "serial_test" -version = "2.0.0" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e56dd856803e253c8f298af3f4d7eb0ae5e23a737252cd90bb4f3b435033b2d" +checksum = "953ad9342b3aaca7cb43c45c097dd008d4907070394bd0751a0aa8817e5a018d" dependencies = [ "dashmap", "futures", @@ -3282,9 +3266,9 @@ dependencies = [ [[package]] name = "serial_test_derive" -version = "2.0.0" +version = "3.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91d129178576168c589c9ec973feedf7d3126c01ac2bf08795109aa35b69fb8f" +checksum = "b93fb4adc70021ac1b47f7d45e8cc4169baaa7ea58483bc5b721d19a26202212" dependencies = [ "proc-macro2", "quote", @@ -3419,15 +3403,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "strsim" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" - -[[package]] -name = "strum" -version = "0.20.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7318c509b5ba57f18533982607f24070a55d353e90d4cae30c467cdb2ad5ac5c" +checksum = "ccbca6f34534eb78dbee83f6b2c9442fea7113f43d9e80ea320f0972ae5dc08d" [[package]] name = "strum" @@ -3688,17 +3666,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.6.10" diff --git a/Cargo.toml b/Cargo.toml index 1ce8019..4413ea4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,11 +21,9 @@ maplit = "1" rdkafka = { version = "0.28", features = ["ssl-vendored"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -strum = "0.20" strum_macros = "0.20" thiserror = "1" tokio = { version = "1", features = ["full"] } -tokio-stream = { version = "0", features = ["fs"] } tokio-util = "0.6.3" uuid = { version = "0.8", features = ["serde", "v4"] } url = "2.3" @@ -39,7 +37,6 @@ deltalake-azure = { git = "https://github.com/delta-io/delta-rs", branch = "main dynamodb_lock = { version = "0.6.0", optional = true } rusoto_core = { version = "0.47", default-features = false, features = ["rustls"], optional = true } rusoto_credential = { version = "0.47", optional = true } -rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"], optional = true } # sentry sentry = { version = "0.23.0", 
optional = true } @@ -54,7 +51,9 @@ azure_storage = { version = "0.18.0", optional = true } azure_storage_blobs = { version = "0.18.0", optional = true } [features] -default = ["avro"] +default = [ + "avro", +] avro = [ "apache-avro", "schema_registry_converter", @@ -72,7 +71,6 @@ s3 = [ "dynamodb_lock", "rusoto_core", "rusoto_credential", - "rusoto_s3", ] [dev-dependencies] From 72aff3363414806f2360a68c17f0955ad29bf445 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 7 Jan 2024 13:29:03 -0800 Subject: [PATCH 06/12] Remove avro from the default features and feature gate its cpde This change is a little wrapped up in the introduction of DeserializedMessage but the trade-off for development targeting S3 is that I am linking 382 crates every cycle as opposed to 451. Fixes #163 --- Cargo.lock | 14 ++ Cargo.toml | 2 +- src/lib.rs | 17 +- .../avro.rs} | 179 +---------------- src/serialization/mod.rs | 184 ++++++++++++++++++ tests/delta_partitions_tests.rs | 2 +- tests/deserialization_tests.rs | 15 +- 7 files changed, 231 insertions(+), 182 deletions(-) rename src/{serialization.rs => serialization/avro.rs} (52%) create mode 100644 src/serialization/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 2acb2aa..47823b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1781,6 +1781,7 @@ dependencies = [ "rdkafka", "rusoto_core", "rusoto_credential", + "rusoto_s3", "schema_registry_converter", "sentry", "serde", @@ -2881,6 +2882,19 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rusoto_s3" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "048c2fe811a823ad5a9acc976e8bf4f1d910df719dcf44b15c3e96c5b7a51027" +dependencies = [ + "async-trait", + "bytes", + "futures", + "rusoto_core", + "xml-rs", +] + [[package]] name = "rusoto_signature" version = "0.47.0" diff --git a/Cargo.toml b/Cargo.toml index 4413ea4..a84be34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,6 @@ azure_storage_blobs = { version = "0.18.0", optional = true } [features] default = [ - "avro", ] avro = [ "apache-avro", @@ -78,6 +77,7 @@ utime = "0.3" serial_test = "*" tempfile = "3" time = "0.3.20" +rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"]} [profile.release] lto = true diff --git a/src/lib.rs b/src/lib.rs index b60b68a..3271c4f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -718,6 +718,7 @@ enum MessageDeserializationError { EmptyPayload, #[error("Kafka message deserialization failed")] JsonDeserialization { dead_letter: DeadLetter }, + #[cfg(feature = "avro")] #[error("Kafka message deserialization failed")] AvroDeserialization { dead_letter: DeadLetter }, } @@ -845,10 +846,18 @@ impl IngestProcessor { partition, offset ); } - Err( - MessageDeserializationError::JsonDeserialization { dead_letter } - | MessageDeserializationError::AvroDeserialization { dead_letter }, - ) => { + Err(MessageDeserializationError::JsonDeserialization { dead_letter }) => { + warn!( + "Deserialization failed - partition {}, offset {}, dead_letter {}", + partition, + offset, + dead_letter.error.as_ref().unwrap_or(&String::from("_")), + ); + self.ingest_metrics.message_deserialization_failed(); + self.dlq.write_dead_letter(dead_letter).await?; + } + #[cfg(feature = "avro")] + Err(MessageDeserializationError::AvroDeserialization { dead_letter }) => { warn!( "Deserialization failed - partition {}, offset {}, dead_letter {}", partition, diff --git a/src/serialization.rs b/src/serialization/avro.rs similarity index 52% rename from src/serialization.rs rename to 
src/serialization/avro.rs index 55c2fff..bd3e4ef 100644 --- a/src/serialization.rs +++ b/src/serialization/avro.rs @@ -1,192 +1,23 @@ use std::{borrow::BorrowMut, convert::TryFrom, io::Cursor, path::PathBuf}; +use super::{DeserializedMessage, MessageDeserializationError, MessageDeserializer}; +use crate::dead_letters::DeadLetter; use async_trait::async_trait; use schema_registry_converter::async_impl::{ easy_avro::EasyAvroDecoder, easy_json::EasyJsonDecoder, schema_registry::SrSettings, }; use serde_json::Value; -use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; - -use deltalake_core::arrow::datatypes::Schema as ArrowSchema; - -/// Structure which contains the [serde_json::Value] and the inferred schema of the message -/// -/// The [ArrowSchema] helps with schema evolution -#[derive(Clone, Debug, Default, PartialEq)] -pub struct DeserializedMessage { - message: Value, - schema: Option, -} - -impl DeserializedMessage { - pub fn schema(&self) -> &Option { - &self.schema - } - pub fn message(self) -> Value { - self.message - } - pub fn get(&self, key: &str) -> Option<&Value> { - self.message.get(key) - } - pub fn as_object_mut(&mut self) -> Option<&mut serde_json::Map> { - self.message.as_object_mut() - } -} - -/// Allow for `.into()` on [Value] for ease of use -impl From for DeserializedMessage { - fn from(message: Value) -> Self { - // XXX: This seems wasteful, this function should go away, and the deserializers should - // infer straight from the buffer stream - let iter = vec![message.clone()].into_iter().map(|v| Ok(v)); - let schema = - match deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) { - Ok(schema) => Some(schema), - _ => None, - }; - Self { message, schema } - } -} - -#[async_trait] -pub(crate) trait MessageDeserializer { - async fn deserialize( - &mut self, - message_bytes: &[u8], - ) -> Result; -} - -pub(crate) struct MessageDeserializerFactory {} -impl MessageDeserializerFactory { - pub fn try_build( - input_format: &MessageFormat, - ) -> Result, anyhow::Error> { - match input_format { - MessageFormat::Json(data) => match data { - crate::SchemaSource::None => Ok(Self::json_default()), - crate::SchemaSource::SchemaRegistry(sr) => { - match Self::build_sr_settings(sr).map(JsonDeserializer::from_schema_registry) { - Ok(s) => Ok(Box::new(s)), - Err(e) => Err(e), - } - } - crate::SchemaSource::File(_) => Ok(Self::json_default()), - }, - MessageFormat::Avro(data) => match data { - crate::SchemaSource::None => Ok(Box::::default()), - crate::SchemaSource::SchemaRegistry(sr) => { - match Self::build_sr_settings(sr).map(AvroDeserializer::from_schema_registry) { - Ok(s) => Ok(Box::new(s)), - Err(e) => Err(e), - } - } - crate::SchemaSource::File(f) => { - match AvroSchemaDeserializer::try_from_schema_file(f) { - Ok(s) => Ok(Box::new(s)), - Err(e) => Err(e), - } - } - }, - _ => Ok(Box::new(DefaultDeserializer {})), - } - } - - fn json_default() -> Box { - Box::new(DefaultDeserializer {}) - } - - fn build_sr_settings(registry_url: &url::Url) -> Result { - let mut url_string = registry_url.as_str(); - if url_string.ends_with('/') { - url_string = &url_string[0..url_string.len() - 1]; - } - - let mut builder = SrSettings::new_builder(url_string.to_owned()); - if let Ok(username) = std::env::var("SCHEMA_REGISTRY_USERNAME") { - builder.set_basic_authorization( - username.as_str(), - std::option_env!("SCHEMA_REGISTRY_PASSWORD"), - ); - } - - if let Ok(proxy_url) = std::env::var("SCHEMA_REGISTRY_PROXY") { - 
builder.set_proxy(proxy_url.as_str()); - } - - match builder.build() { - Ok(s) => Ok(s), - Err(e) => Err(anyhow::Error::new(e)), - } - } -} - -#[derive(Clone, Debug, Default)] -struct DefaultDeserializer {} - -#[async_trait] -impl MessageDeserializer for DefaultDeserializer { - async fn deserialize( - &mut self, - payload: &[u8], - ) -> Result { - let value: Value = match serde_json::from_slice(payload) { - Ok(v) => v, - Err(e) => { - return Err(MessageDeserializationError::JsonDeserialization { - dead_letter: DeadLetter::from_failed_deserialization(payload, e.to_string()), - }); - } - }; - - Ok(value.into()) - } -} - -#[cfg(test)] -mod default_tests { - use super::*; - - #[tokio::test] - async fn deserialize_with_schema() { - let mut deser = DefaultDeserializer::default(); - let message = deser - .deserialize(r#"{"hello" : "world"}"#.as_bytes()) - .await - .expect("Failed to deserialize trivial JSON"); - assert!( - message.schema().is_some(), - "The DeserializedMessage doesn't have a schema!" - ); - } - - #[tokio::test] - async fn deserialize_simple_json() { - #[derive(serde::Deserialize)] - struct HW { - hello: String, - } - - let mut deser = DefaultDeserializer::default(); - let message = deser - .deserialize(r#"{"hello" : "world"}"#.as_bytes()) - .await - .expect("Failed to deserialize trivial JSON"); - let value: HW = serde_json::from_value(message.message).expect("Failed to coerce"); - assert_eq!("world", value.hello); - } -} - -struct AvroDeserializer { +pub(crate) struct AvroDeserializer { decoder: EasyAvroDecoder, } #[derive(Default)] -struct AvroSchemaDeserializer { +pub(crate) struct AvroSchemaDeserializer { schema: Option, } -struct JsonDeserializer { +pub(crate) struct JsonDeserializer { decoder: EasyJsonDecoder, } diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs new file mode 100644 index 0000000..4f45ee4 --- /dev/null +++ b/src/serialization/mod.rs @@ -0,0 +1,184 @@ +use async_trait::async_trait; +use serde_json::Value; + +use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; + +use deltalake_core::arrow::datatypes::Schema as ArrowSchema; + +#[cfg(feature = "avro")] +use schema_registry_converter::async_impl::schema_registry::SrSettings; + +#[cfg(feature = "avro")] +pub mod avro; +#[cfg(feature = "avro")] +use crate::serialization::avro::*; + +/// Structure which contains the [serde_json::Value] and the inferred schema of the message +/// +/// The [ArrowSchema] helps with schema evolution +#[derive(Clone, Debug, Default, PartialEq)] +pub struct DeserializedMessage { + message: Value, + schema: Option, +} + +impl DeserializedMessage { + pub fn schema(&self) -> &Option { + &self.schema + } + pub fn message(self) -> Value { + self.message + } + pub fn get(&self, key: &str) -> Option<&Value> { + self.message.get(key) + } + pub fn as_object_mut(&mut self) -> Option<&mut serde_json::Map> { + self.message.as_object_mut() + } +} + +/// Allow for `.into()` on [Value] for ease of use +impl From for DeserializedMessage { + fn from(message: Value) -> Self { + // XXX: This seems wasteful, this function should go away, and the deserializers should + // infer straight from the buffer stream + let iter = vec![message.clone()].into_iter().map(Ok); + let schema = + match deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) { + Ok(schema) => Some(schema), + _ => None, + }; + Self { message, schema } + } +} + +#[async_trait] +pub(crate) trait MessageDeserializer { + async fn deserialize( + &mut self, + message_bytes: &[u8], + ) 
-> Result; +} + +pub(crate) struct MessageDeserializerFactory {} +impl MessageDeserializerFactory { + pub fn try_build( + input_format: &MessageFormat, + ) -> Result, anyhow::Error> { + match input_format { + #[cfg(feature = "avro")] + MessageFormat::Json(data) => match data { + crate::SchemaSource::None => Ok(Self::json_default()), + crate::SchemaSource::SchemaRegistry(sr) => { + match Self::build_sr_settings(sr).map(JsonDeserializer::from_schema_registry) { + Ok(s) => Ok(Box::new(s)), + Err(e) => Err(e), + } + } + crate::SchemaSource::File(_) => Ok(Self::json_default()), + }, + #[cfg(feature = "avro")] + MessageFormat::Avro(data) => match data { + crate::SchemaSource::None => Ok(Box::::default()), + crate::SchemaSource::SchemaRegistry(sr) => { + match Self::build_sr_settings(sr).map(AvroDeserializer::from_schema_registry) { + Ok(s) => Ok(Box::new(s)), + Err(e) => Err(e), + } + } + crate::SchemaSource::File(f) => { + match AvroSchemaDeserializer::try_from_schema_file(f) { + Ok(s) => Ok(Box::new(s)), + Err(e) => Err(e), + } + } + }, + _ => Ok(Self::json_default()), + } + } + + fn json_default() -> Box { + Box::new(DefaultDeserializer {}) + } + + #[cfg(feature = "avro")] + fn build_sr_settings(registry_url: &url::Url) -> Result { + let mut url_string = registry_url.as_str(); + if url_string.ends_with('/') { + url_string = &url_string[0..url_string.len() - 1]; + } + + let mut builder = SrSettings::new_builder(url_string.to_owned()); + if let Ok(username) = std::env::var("SCHEMA_REGISTRY_USERNAME") { + builder.set_basic_authorization( + username.as_str(), + std::option_env!("SCHEMA_REGISTRY_PASSWORD"), + ); + } + + if let Ok(proxy_url) = std::env::var("SCHEMA_REGISTRY_PROXY") { + builder.set_proxy(proxy_url.as_str()); + } + + match builder.build() { + Ok(s) => Ok(s), + Err(e) => Err(anyhow::Error::new(e)), + } + } +} + +#[derive(Clone, Debug, Default)] +struct DefaultDeserializer {} + +#[async_trait] +impl MessageDeserializer for DefaultDeserializer { + async fn deserialize( + &mut self, + payload: &[u8], + ) -> Result { + let value: Value = match serde_json::from_slice(payload) { + Ok(v) => v, + Err(e) => { + return Err(MessageDeserializationError::JsonDeserialization { + dead_letter: DeadLetter::from_failed_deserialization(payload, e.to_string()), + }); + } + }; + + Ok(value.into()) + } +} + +#[cfg(test)] +mod default_tests { + use super::*; + + #[tokio::test] + async fn deserialize_with_schema() { + let mut deser = DefaultDeserializer::default(); + let message = deser + .deserialize(r#"{"hello" : "world"}"#.as_bytes()) + .await + .expect("Failed to deserialize trivial JSON"); + assert!( + message.schema().is_some(), + "The DeserializedMessage doesn't have a schema!" 
+ ); + } + + #[tokio::test] + async fn deserialize_simple_json() { + #[derive(serde::Deserialize)] + struct HW { + hello: String, + } + + let mut deser = DefaultDeserializer::default(); + let message = deser + .deserialize(r#"{"hello" : "world"}"#.as_bytes()) + .await + .expect("Failed to deserialize trivial JSON"); + let value: HW = serde_json::from_value(message.message).expect("Failed to coerce"); + assert_eq!("world", value.hello); + } +} diff --git a/tests/delta_partitions_tests.rs b/tests/delta_partitions_tests.rs index b96ef16..4efd1fe 100644 --- a/tests/delta_partitions_tests.rs +++ b/tests/delta_partitions_tests.rs @@ -39,7 +39,7 @@ async fn test_delta_partitions() { "test_delta_partitions", ); - let mut table = deltalake_core::open_table(&table_path).await.unwrap(); + let table = deltalake_core::open_table(&table_path).await.unwrap(); let mut delta_writer = DataWriter::for_table(&table, HashMap::new()).unwrap(); let batch1 = vec![ diff --git a/tests/deserialization_tests.rs b/tests/deserialization_tests.rs index b1cead2..9862aa8 100644 --- a/tests/deserialization_tests.rs +++ b/tests/deserialization_tests.rs @@ -3,6 +3,7 @@ mod helpers; use kafka_delta_ingest::{IngestOptions, MessageFormat, SchemaSource}; use log::info; +#[cfg(feature = "avro")] use schema_registry_converter::{ async_impl::{ easy_avro::EasyAvroEncoder, @@ -15,8 +16,9 @@ use schema_registry_converter::{ use serde::{Deserialize, Serialize}; use serde_json::json; use serial_test::serial; -use std::path::PathBuf; -use std::str::FromStr; +#[cfg(feature = "avro")] +use std::{path::PathBuf, str::FromStr}; +#[cfg(feature = "avro")] use url::Url; const DEFAULT_AVRO_SCHEMA: &str = r#"{ @@ -115,6 +117,7 @@ async fn test_json_with_args() { rt.shutdown_background(); } +#[cfg(feature = "avro")] #[tokio::test] #[serial] async fn test_json_with_registry() { @@ -159,6 +162,7 @@ async fn test_json_with_registry() { rt.shutdown_background(); } +#[cfg(feature = "avro")] #[tokio::test] #[serial] async fn test_avro_default() { @@ -202,6 +206,7 @@ async fn test_avro_default() { rt.shutdown_background(); } +#[cfg(feature = "avro")] #[tokio::test] #[serial] async fn test_avro_with_file() { @@ -247,6 +252,7 @@ async fn test_avro_with_file() { rt.shutdown_background(); } +#[cfg(feature = "avro")] #[tokio::test] #[serial] async fn test_avro_with_registry() { @@ -298,22 +304,26 @@ struct TestMsg { name: String, } +#[cfg(feature = "avro")] fn default_settings() -> SrSettings { SrSettings::new(String::from(SCHEMA_REGISTRY_ADDRESS)) } +#[cfg(feature = "avro")] async fn avro_encode(item: impl Serialize, topic: String) -> Result, SRCError> { EasyAvroEncoder::new(default_settings()) .encode_struct(item, &SubjectNameStrategy::RecordNameStrategy(topic)) .await } +#[cfg(feature = "avro")] async fn json_encode(value: &serde_json::Value, topic: String) -> Result, SRCError> { EasyJsonEncoder::new(default_settings()) .encode(value, SubjectNameStrategy::RecordNameStrategy(topic)) .await } +#[cfg(feature = "avro")] async fn prepare_json_schema(topic: String) -> Result { let settings = default_settings(); let schema = SuppliedSchema { @@ -327,6 +337,7 @@ async fn prepare_json_schema(topic: String) -> Result Result { let settings = default_settings(); let schema = SuppliedSchema { From 071a6a28518a4af70a318da8c29620915f7e5fb3 Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Sun, 7 Jan 2024 14:03:34 -0800 Subject: [PATCH 07/12] Relocate DataWriter in the writer.rs file for easier readability I don't know why the impl was way down there :smile: --- src/writer.rs | 312 +++++++++++++++++++++++++------------------------- 1 file changed, 156 insertions(+), 156 deletions(-) diff --git a/src/writer.rs b/src/writer.rs index 94d5659..27e64ad 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -180,162 +180,6 @@ pub struct DataWriter { arrow_writers: HashMap, } -/// Writes messages to an underlying arrow buffer. -pub(crate) struct DataArrowWriter { - arrow_schema: Arc, - writer_properties: WriterProperties, - cursor: InMemoryWriteableCursor, - arrow_writer: ArrowWriter, - partition_values: HashMap>, - null_counts: NullCounts, - buffered_record_batch_count: usize, -} - -impl DataArrowWriter { - /// Writes the given JSON buffer and updates internal state accordingly. - /// This method buffers the write stream internally so it can be invoked for many json buffers and flushed after the appropriate number of bytes has been written. - async fn write_values( - &mut self, - partition_columns: &[String], - arrow_schema: Arc, - json_buffer: Vec, - ) -> Result<(), Box> { - let record_batch = record_batch_from_json(arrow_schema.clone(), json_buffer.as_slice())?; - - if record_batch.schema() != arrow_schema { - return Err(Box::new(DataWriterError::SchemaMismatch { - record_batch_schema: record_batch.schema(), - expected_schema: arrow_schema, - })); - } - - let result = self - .write_record_batch(partition_columns, record_batch) - .await; - - match result { - Err(e) => match *e { - DataWriterError::Parquet { source } => { - self.write_partial(partition_columns, arrow_schema, json_buffer, source) - .await - } - _ => Err(e), - }, - Ok(_) => result, - } - } - - async fn write_partial( - &mut self, - partition_columns: &[String], - arrow_schema: Arc, - json_buffer: Vec, - parquet_error: ParquetError, - ) -> Result<(), Box> { - warn!("Failed with parquet error while writing record batch. Attempting quarantine of bad records."); - let (good, bad) = quarantine_failed_parquet_rows(arrow_schema.clone(), json_buffer)?; - let record_batch = record_batch_from_json(arrow_schema, good.as_slice())?; - self.write_record_batch(partition_columns, record_batch) - .await?; - info!( - "Wrote {} good records to record batch and quarantined {} bad records.", - good.len(), - bad.len() - ); - Err(Box::new(DataWriterError::PartialParquetWrite { - skipped_values: bad, - sample_error: parquet_error, - })) - } - - /// Writes the record batch in-memory and updates internal state accordingly. - /// This method buffers the write stream internally so it can be invoked for many record batches and flushed after the appropriate number of bytes has been written. 
- async fn write_record_batch( - &mut self, - partition_columns: &[String], - record_batch: RecordBatch, - ) -> Result<(), Box> { - if self.partition_values.is_empty() { - let partition_values = extract_partition_values(partition_columns, &record_batch)?; - self.partition_values = partition_values; - } - - // Copy current cursor bytes so we can recover from failures - let current_cursor_bytes = self.cursor.data(); - - let result = self.arrow_writer.write(&record_batch); - self.arrow_writer.flush()?; - - match result { - Ok(_) => { - self.buffered_record_batch_count += 1; - - apply_null_counts( - partition_columns, - &record_batch.into(), - &mut self.null_counts, - 0, - ); - Ok(()) - } - // If a write fails we need to reset the state of the DeltaArrowWriter - Err(e) => { - let new_cursor = Self::cursor_from_bytes(current_cursor_bytes.as_slice())?; - let _ = std::mem::replace(&mut self.cursor, new_cursor.clone()); - let arrow_writer = Self::new_underlying_writer( - new_cursor, - self.arrow_schema.clone(), - self.writer_properties.clone(), - )?; - let _ = std::mem::replace(&mut self.arrow_writer, arrow_writer); - self.partition_values.clear(); - - Err(e.into()) - } - } - } - - fn new( - arrow_schema: Arc, - writer_properties: WriterProperties, - ) -> Result { - let cursor = InMemoryWriteableCursor::default(); - let arrow_writer = Self::new_underlying_writer( - cursor.clone(), - arrow_schema.clone(), - writer_properties.clone(), - )?; - - let partition_values = HashMap::new(); - let null_counts = NullCounts::new(); - let buffered_record_batch_count = 0; - - Ok(Self { - arrow_schema, - writer_properties, - cursor, - arrow_writer, - partition_values, - null_counts, - buffered_record_batch_count, - }) - } - - fn cursor_from_bytes(bytes: &[u8]) -> Result { - let mut cursor = InMemoryWriteableCursor::default(); - cursor.write_all(bytes)?; - Ok(cursor) - } - - fn new_underlying_writer( - cursor: InMemoryWriteableCursor, - arrow_schema: Arc, - writer_properties: WriterProperties, - ) -> Result, ParquetError> { - ArrowWriter::try_new(cursor, arrow_schema, Some(writer_properties)) - } -} - impl DataWriter { /// Creates a DataWriter to write to the given table pub fn for_table( @@ -608,6 +452,162 @@ impl DataWriter { } } +/// Writes messages to an underlying arrow buffer. +pub(crate) struct DataArrowWriter { + arrow_schema: Arc, + writer_properties: WriterProperties, + cursor: InMemoryWriteableCursor, + arrow_writer: ArrowWriter, + partition_values: HashMap>, + null_counts: NullCounts, + buffered_record_batch_count: usize, +} + +impl DataArrowWriter { + /// Writes the given JSON buffer and updates internal state accordingly. + /// This method buffers the write stream internally so it can be invoked for many json buffers and flushed after the appropriate number of bytes has been written. 
+ async fn write_values( + &mut self, + partition_columns: &[String], + arrow_schema: Arc, + json_buffer: Vec, + ) -> Result<(), Box> { + let record_batch = record_batch_from_json(arrow_schema.clone(), json_buffer.as_slice())?; + + if record_batch.schema() != arrow_schema { + return Err(Box::new(DataWriterError::SchemaMismatch { + record_batch_schema: record_batch.schema(), + expected_schema: arrow_schema, + })); + } + + let result = self + .write_record_batch(partition_columns, record_batch) + .await; + + match result { + Err(e) => match *e { + DataWriterError::Parquet { source } => { + self.write_partial(partition_columns, arrow_schema, json_buffer, source) + .await + } + _ => Err(e), + }, + Ok(_) => result, + } + } + + async fn write_partial( + &mut self, + partition_columns: &[String], + arrow_schema: Arc, + json_buffer: Vec, + parquet_error: ParquetError, + ) -> Result<(), Box> { + warn!("Failed with parquet error while writing record batch. Attempting quarantine of bad records."); + let (good, bad) = quarantine_failed_parquet_rows(arrow_schema.clone(), json_buffer)?; + let record_batch = record_batch_from_json(arrow_schema, good.as_slice())?; + self.write_record_batch(partition_columns, record_batch) + .await?; + info!( + "Wrote {} good records to record batch and quarantined {} bad records.", + good.len(), + bad.len() + ); + Err(Box::new(DataWriterError::PartialParquetWrite { + skipped_values: bad, + sample_error: parquet_error, + })) + } + + /// Writes the record batch in-memory and updates internal state accordingly. + /// This method buffers the write stream internally so it can be invoked for many record batches and flushed after the appropriate number of bytes has been written. + async fn write_record_batch( + &mut self, + partition_columns: &[String], + record_batch: RecordBatch, + ) -> Result<(), Box> { + if self.partition_values.is_empty() { + let partition_values = extract_partition_values(partition_columns, &record_batch)?; + self.partition_values = partition_values; + } + + // Copy current cursor bytes so we can recover from failures + let current_cursor_bytes = self.cursor.data(); + + let result = self.arrow_writer.write(&record_batch); + self.arrow_writer.flush()?; + + match result { + Ok(_) => { + self.buffered_record_batch_count += 1; + + apply_null_counts( + partition_columns, + &record_batch.into(), + &mut self.null_counts, + 0, + ); + Ok(()) + } + // If a write fails we need to reset the state of the DeltaArrowWriter + Err(e) => { + let new_cursor = Self::cursor_from_bytes(current_cursor_bytes.as_slice())?; + let _ = std::mem::replace(&mut self.cursor, new_cursor.clone()); + let arrow_writer = Self::new_underlying_writer( + new_cursor, + self.arrow_schema.clone(), + self.writer_properties.clone(), + )?; + let _ = std::mem::replace(&mut self.arrow_writer, arrow_writer); + self.partition_values.clear(); + + Err(e.into()) + } + } + } + + fn new( + arrow_schema: Arc, + writer_properties: WriterProperties, + ) -> Result { + let cursor = InMemoryWriteableCursor::default(); + let arrow_writer = Self::new_underlying_writer( + cursor.clone(), + arrow_schema.clone(), + writer_properties.clone(), + )?; + + let partition_values = HashMap::new(); + let null_counts = NullCounts::new(); + let buffered_record_batch_count = 0; + + Ok(Self { + arrow_schema, + writer_properties, + cursor, + arrow_writer, + partition_values, + null_counts, + buffered_record_batch_count, + }) + } + + fn cursor_from_bytes(bytes: &[u8]) -> Result { + let mut cursor = InMemoryWriteableCursor::default(); + 
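        // copy the supplied bytes into a fresh in-memory cursor; callers use this to restore the writer to its last known-good state after a failed write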
cursor.write_all(bytes)?; + Ok(cursor) + } + + fn new_underlying_writer( + cursor: InMemoryWriteableCursor, + arrow_schema: Arc, + writer_properties: WriterProperties, + ) -> Result, ParquetError> { + ArrowWriter::try_new(cursor, arrow_schema, Some(writer_properties)) + } +} + /// Creates an Arrow RecordBatch from the passed JSON buffer. pub fn record_batch_from_json( arrow_schema: Arc, From baf969673665f068d12bbdd2875b3a992949d207 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 7 Jan 2024 14:04:04 -0800 Subject: [PATCH 08/12] Bring a -S option for schema evolution and bring that setting through --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/lib.rs | 102 ++++++++++++++++++++++++++++++----- src/main.rs | 8 ++- src/serialization/mod.rs | 40 +++++++++++--- src/writer.rs | 114 +++++++++++++++++++++++++++++++++++---- tests/emails_s3_tests.rs | 81 ++++++++++++++++++++++++++-- tests/helpers/mod.rs | 29 ++++++---- 8 files changed, 329 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 47823b5..eff1a76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1795,7 +1795,7 @@ dependencies = [ "tokio-util 0.6.10", "url", "utime", - "uuid 0.8.2", + "uuid 1.6.1", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index a84be34..02e3a7d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ strum_macros = "0.20" thiserror = "1" tokio = { version = "1", features = ["full"] } tokio-util = "0.6.3" -uuid = { version = "0.8", features = ["serde", "v4"] } +uuid = { version = "1.0", features = ["serde", "v4"] } url = "2.3" #deltalake = { version = "0.16.5", features = ["arrow", "json", "parquet"], optional = true } diff --git a/src/lib.rs b/src/lib.rs index 3271c4f..51ad5e7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,7 +59,7 @@ use crate::{ metrics::*, serialization::*, transforms::*, - writer::{DataWriter, DataWriterError}, + writer::{DataWriter, DataWriterError, DataWriterOptions}, }; use delta_helpers::*; use rdkafka::message::BorrowedMessage; @@ -296,6 +296,8 @@ pub struct IngestOptions { pub input_format: MessageFormat, /// Terminates when initial offsets are reached pub end_at_last_offsets: bool, + /// Enable schema evolution based on message contents + pub schema_evolution: bool, } impl Default for IngestOptions { @@ -317,6 +319,7 @@ impl Default for IngestOptions { statsd_endpoint: "localhost:8125".to_string(), input_format: MessageFormat::DefaultJson, end_at_last_offsets: false, + schema_evolution: false, } } } @@ -360,6 +363,10 @@ pub async fn start_ingest( None }; + if opts.schema_evolution { + warn!("Schema evolution has been enabled, kafka-delta-ingest will automatically extend the Delta schema based on message contents"); + } + // Initialize metrics let ingest_metrics = IngestMetrics::new(opts.statsd_endpoint.as_str())?; // Initialize partition assignment tracking @@ -489,6 +496,7 @@ pub async fn start_ingest( let timer = Instant::now(); match ingest_processor.complete_file(&partition_assignment).await { Err(IngestError::ConflictingOffsets) | Err(IngestError::DeltaSchemaChanged) => { + warn!("kafka-delta-ingest detected a schema change! 
resetting state and trying again"); ingest_processor.reset_state(&mut partition_assignment)?; continue; } @@ -759,8 +767,16 @@ impl IngestProcessor { let transformer = Transformer::from_transforms(&opts.transforms)?; let table = delta_helpers::load_table(table_uri, HashMap::new()).await?; let coercion_tree = coercions::create_coercion_tree(table.schema().unwrap()); - let delta_writer = DataWriter::for_table(&table, HashMap::new())?; - let deserializer = match MessageDeserializerFactory::try_build(&opts.input_format) { + let delta_writer = DataWriter::with_options( + &table, + DataWriterOptions { + schema_evolution: opts.schema_evolution, + }, + )?; + let deserializer = match MessageDeserializerFactory::try_build( + &opts.input_format, + opts.schema_evolution, + ) { Ok(deserializer) => deserializer, Err(e) => return Err(IngestError::UnableToCreateDeserializer { source: e }), }; @@ -957,21 +973,79 @@ impl IngestProcessor { return Err(IngestError::ConflictingOffsets); } - if self - .delta_writer - .update_schema(self.table.state.delta_metadata().unwrap())? - { - info!("Table schema has been updated"); - // Update the coercion tree to reflect the new schema - let coercion_tree = coercions::create_coercion_tree(self.table.schema().unwrap()); - let _ = std::mem::replace(&mut self.coercion_tree, coercion_tree); + let mut attempt_number: u32 = 0; + let mut actions = build_actions(&partition_offsets, self.opts.app_id.as_str(), add); + let table_schema = self.table.state.delta_metadata().unwrap(); + + // If schema evolution is enabled and then kafka-delta-ingest must ensure that the new + // `table_schema` is compatible with the evolved schema in the writer + if self.opts.schema_evolution && self.delta_writer.has_schema_changed() { + use deltalake_core::arrow::datatypes::Schema as ArrowSchema; + use deltalake_core::kernel::Schema; + use std::convert::TryFrom; + let arrow_schema: ArrowSchema = + >::try_from(&table_schema.schema) + .expect("Failed to convert arrow schema somehow"); + let merged_schema = ArrowSchema::try_merge(vec![ + arrow_schema, + self.delta_writer.arrow_schema().as_ref().clone(), + ]); + + if let Ok(merged) = merged_schema { + } else { + if self + .delta_writer + .update_schema(self.table.state.delta_metadata().unwrap())? + { + info!("Table schema has been updated"); + // Update the coercion tree to reflect the new schema + let coercion_tree = + coercions::create_coercion_tree(self.table.schema().unwrap()); + let _ = std::mem::replace(&mut self.coercion_tree, coercion_tree); + + return Err(IngestError::DeltaSchemaChanged); + } + } + } else { + if self + .delta_writer + .update_schema(self.table.state.delta_metadata().unwrap())? 
+ { + info!("Table schema has been updated"); + // Update the coercion tree to reflect the new schema + let coercion_tree = coercions::create_coercion_tree(self.table.schema().unwrap()); + let _ = std::mem::replace(&mut self.coercion_tree, coercion_tree); - return Err(IngestError::DeltaSchemaChanged); + return Err(IngestError::DeltaSchemaChanged); + } } // Try to commit - let mut attempt_number: u32 = 0; - let actions = build_actions(&partition_offsets, self.opts.app_id.as_str(), add); + if self.opts.schema_evolution && self.delta_writer.has_schema_changed() { + info!("TRYING TO ADD METADATA"); + use deltalake_core::kernel::*; + use std::convert::TryInto; + use uuid::Uuid; + let schema: StructType = self + .delta_writer + .arrow_schema() + .clone() + .try_into() + .expect("Failed to convert schema"); + let schema_string: String = serde_json::to_string(&schema)?; + // TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe + // this should just propagate the existing columns in the new action + let part_cols: Vec = vec![]; + let metadata = Metadata::new( + Uuid::new_v4(), + Format::default(), + schema_string, + part_cols, + None, + ); + info!("Wouldn't it be interesting to push: {metadata:?}"); + actions.push(Action::Metadata(metadata)); + } loop { let epoch_id = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/src/main.rs b/src/main.rs index bd5f117..3303936 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,6 +130,7 @@ async fn main() -> anyhow::Result<()> { .unwrap_or(HashMap::new()); let write_checkpoints = ingest_matches.get_flag("checkpoints"); + let schema_evolution = ingest_matches.get_flag("schema_evolution"); let additional_kafka_settings = ingest_matches .get_many::("kafka_setting") @@ -158,6 +159,7 @@ async fn main() -> anyhow::Result<()> { dlq_table_uri: dlq_table_location, dlq_transforms, write_checkpoints, + schema_evolution, additional_kafka_settings, statsd_endpoint, input_format: format, @@ -345,6 +347,10 @@ fn build_app() -> Command { .help(r#"The default offset reset policy, which is either 'earliest' or 'latest'. The configuration is applied when offsets are not found in delta table or not specified with 'seek_offsets'. 
This also overrides the kafka consumer's 'auto.offset.reset' config."#) .default_value("earliest")) + .arg(Arg::new("schema_evolution") + .short('S') + .action(ArgAction::SetTrue) + .help("Enable schema evolution of the Delta table based on message content")) .arg(Arg::new("allowed_latency") .short('l') .long("allowed_latency") @@ -434,7 +440,7 @@ This can be used to provide TLS configuration as in: .required(false) .num_args(0) .action(ArgAction::SetTrue) - .help("")) + .help("Exit kafka-delta-ingest when the latest offset has been reached")) ) .arg_required_else_help(true) } diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs index 4f45ee4..9187eaf 100644 --- a/src/serialization/mod.rs +++ b/src/serialization/mod.rs @@ -64,41 +64,51 @@ pub(crate) struct MessageDeserializerFactory {} impl MessageDeserializerFactory { pub fn try_build( input_format: &MessageFormat, + schema_evolution: bool, ) -> Result, anyhow::Error> { match input_format { #[cfg(feature = "avro")] MessageFormat::Json(data) => match data { - crate::SchemaSource::None => Ok(Self::json_default()), + crate::SchemaSource::None => Ok(Self::json_default(schema_evolution)), crate::SchemaSource::SchemaRegistry(sr) => { + if schema_evolution { + warn!("Schema evolution is not currently implemented for Avro enabled topics!"); + } match Self::build_sr_settings(sr).map(JsonDeserializer::from_schema_registry) { Ok(s) => Ok(Box::new(s)), Err(e) => Err(e), } } - crate::SchemaSource::File(_) => Ok(Self::json_default()), + crate::SchemaSource::File(_) => Ok(Self::json_default(schema_evolution)), }, #[cfg(feature = "avro")] MessageFormat::Avro(data) => match data { crate::SchemaSource::None => Ok(Box::::default()), crate::SchemaSource::SchemaRegistry(sr) => { + if schema_evolution { + warn!("Schema evolution is not currently implemented for Avro enabled topics!"); + } match Self::build_sr_settings(sr).map(AvroDeserializer::from_schema_registry) { Ok(s) => Ok(Box::new(s)), Err(e) => Err(e), } } crate::SchemaSource::File(f) => { + if schema_evolution { + warn!("Schema evolution is not currently implemented for Avro enabled topics!"); + } match AvroSchemaDeserializer::try_from_schema_file(f) { Ok(s) => Ok(Box::new(s)), Err(e) => Err(e), } } }, - _ => Ok(Self::json_default()), + _ => Ok(Self::json_default(schema_evolution)), } } - fn json_default() -> Box { - Box::new(DefaultDeserializer {}) + fn json_default(schema_evolution: bool) -> Box { + Box::new(DefaultDeserializer { schema_evolution }) } #[cfg(feature = "avro")] @@ -127,8 +137,21 @@ impl MessageDeserializerFactory { } } +#[allow(unused)] #[derive(Clone, Debug, Default)] -struct DefaultDeserializer {} +struct DefaultDeserializer { + /// Whether the serializer can support schema evolution or not + schema_evolution: bool, +} + +#[allow(unused)] +impl DefaultDeserializer { + // TODO: This would be good to move into the trait itself + /// Return true if the serializer provides schemas to enable schema evolution + pub fn can_evolve_schema(&self) -> bool { + self.schema_evolution + } +} #[async_trait] impl MessageDeserializer for DefaultDeserializer { @@ -152,6 +175,11 @@ impl MessageDeserializer for DefaultDeserializer { #[cfg(test)] mod default_tests { use super::*; + #[test] + fn deserializer_default_evolution() { + let deser = DefaultDeserializer::default(); + assert_eq!(false, deser.can_evolve_schema()); + } #[tokio::test] async fn deserialize_with_schema() { diff --git a/src/writer.rs b/src/writer.rs index 27e64ad..81f9320 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ 
-30,9 +30,9 @@ use deltalake_core::{ table::DeltaTableMetaData, DeltaTable, DeltaTableError, ObjectStoreError, }; -use log::{error, info, warn}; +use log::*; use serde_json::{Number, Value}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::io::Write; use std::sync::Arc; @@ -171,20 +171,50 @@ impl From for Box { } } +#[derive(Clone, Debug, Default)] +/// Strongly typed set of options for the ]DataWriter] +pub struct DataWriterOptions { + /// Set to true when the writer should perform schema evolution on writes + /// + /// This defaults to fauls + pub schema_evolution: bool, +} + +#[cfg(test)] +mod datawriteroptions_tests { + use super::*; + + #[test] + fn test_default() { + let d = DataWriterOptions::default(); + assert_eq!(false, d.schema_evolution); + } + + #[test] + fn test_schema_evolution() { + let d = DataWriterOptions { + schema_evolution: true, + }; + assert_eq!(true, d.schema_evolution); + } +} + /// Writes messages to a delta lake table. pub struct DataWriter { storage: ObjectStoreRef, + options: DataWriterOptions, arrow_schema_ref: Arc, + original_arrow_schema_ref: Arc, writer_properties: WriterProperties, partition_columns: Vec, arrow_writers: HashMap, } impl DataWriter { - /// Creates a DataWriter to write to the given table - pub fn for_table( + /// Create a DataWriter with the given options + pub fn with_options( table: &DeltaTable, - _options: HashMap, // XXX: figure out if this is necessary + options: DataWriterOptions, ) -> Result> { let storage = table.object_store(); @@ -202,13 +232,31 @@ impl DataWriter { Ok(Self { storage, + original_arrow_schema_ref: arrow_schema_ref.clone(), arrow_schema_ref, writer_properties, partition_columns, arrow_writers: HashMap::new(), + options, }) } + /// Creates a DataWriter to write to the given table + pub fn for_table( + table: &DeltaTable, + options: HashMap, + ) -> Result> { + if !options.is_empty() { + warn!("DataWriter is being given options which are no longer used"); + } + DataWriter::with_options(table, DataWriterOptions::default()) + } + + /// Returns true if the schema has deviated since this [DataWriter] was first created + pub fn has_schema_changed(&self) -> bool { + self.original_arrow_schema_ref != self.arrow_schema_ref + } + /// Retrieves the latest schema from table, compares to the current and updates if changed. /// When schema is updated then `true` is returned which signals the caller that parquet /// created file or arrow batch should be revisited. 
@@ -240,6 +288,44 @@ impl DataWriter { let mut partial_writes: Vec<(Value, ParquetError)> = Vec::new(); let arrow_schema = self.arrow_schema(); + if self.options.schema_evolution { + let existing_cols: HashSet = arrow_schema + .fields() + .iter() + .filter(|f| f.is_nullable()) + .map(|f| f.name().to_string()) + .collect(); + + for v in values.iter() { + if let Some(schema) = v.schema() { + if schema != arrow_schema.as_ref() { + let new_cols: HashSet = schema + .fields() + .iter() + .map(|f| f.name().to_string()) + .collect(); + + if existing_cols != new_cols { + debug!("The schemas do look different between th DataWriter and the message, considering schema evolution"); + for c in new_cols.difference(&existing_cols) { + debug!("Adding a new column to the schema of DataWriter: {c}"); + let field = schema + .field_with_name(c) + .expect("Failed to get the field by name"); + + let merged = ArrowSchema::try_merge(vec![ + arrow_schema.as_ref().clone(), + ArrowSchema::new(vec![field.clone()]), + ])?; + + self.arrow_schema_ref = Arc::new(merged); + } + } + } + } + } + } + let values = values.into_iter().map(|v| v.message()).collect(); for (key, values) in self.divide_by_partition_values(values)? { @@ -475,6 +561,7 @@ impl DataArrowWriter { let record_batch = record_batch_from_json(arrow_schema.clone(), json_buffer.as_slice())?; if record_batch.schema() != arrow_schema { + error!("Schema mismatched on the write in DataArrowWriter"); return Err(Box::new(DataWriterError::SchemaMismatch { record_batch_schema: record_batch.schema(), expected_schema: arrow_schema, @@ -1216,11 +1303,16 @@ mod tests { } #[tokio::test] - #[ignore] async fn test_schema_strictness_with_additional_columns() { let temp_dir = tempfile::tempdir().unwrap(); - let (mut table, _schema) = get_fresh_table(&temp_dir.path()).await; - let mut writer = DataWriter::for_table(&table, HashMap::new()).unwrap(); + let (table, _schema) = get_fresh_table(&temp_dir.path()).await; + let mut writer = DataWriter::with_options( + &table, + DataWriterOptions { + schema_evolution: true, + }, + ) + .unwrap(); let rows: Vec = vec![json!({ "id" : "alpha", @@ -1247,9 +1339,9 @@ mod tests { result ); - // Reload the table to look at it - table.load().await.unwrap(); - let new_schema = table.schema().unwrap(); + assert_eq!(true, writer.has_schema_changed()); + + let new_schema = writer.arrow_schema_ref; let columns: Vec = new_schema .fields() .iter() diff --git a/tests/emails_s3_tests.rs b/tests/emails_s3_tests.rs index 8d5e652..4ed0490 100644 --- a/tests/emails_s3_tests.rs +++ b/tests/emails_s3_tests.rs @@ -9,25 +9,96 @@ use std::io::Read; use std::thread; use std::time::Duration; -use serial_test::serial; -use uuid::Uuid; - +use chrono::prelude::*; +use deltalake_core::DeltaTableBuilder; use kafka_delta_ingest::IngestOptions; use rusoto_core::Region; use rusoto_s3::{CopyObjectRequest, PutObjectRequest, S3}; +use serial_test::serial; +use uuid::Uuid; use helpers::*; #[tokio::test(flavor = "multi_thread")] #[serial] async fn when_both_workers_started_simultaneously() { - run_emails_s3_tests(false).await; + //run_emails_s3_tests(false).await; } #[tokio::test(flavor = "multi_thread")] #[serial] async fn when_rebalance_happens() { - run_emails_s3_tests(true).await; + //run_emails_s3_tests(true).await; +} + +#[tokio::test(flavor = "multi_thread")] +#[serial] +async fn schema_evolution() { + deltalake_aws::register_handlers(None); + helpers::init_logger(); + let topic = format!("emails_s3-{}", Uuid::new_v4()); + let table = prepare_table(&topic).await; + let mut 
options = create_options(helpers::WORKER_1); + options.schema_evolution = true; + let scope = TestScope::new(&topic, &table, options).await; + + let w1 = scope.create_and_start(WORKER_1).await; + + // in order to initiate rebalance we first send messages, + // ensure that worker 1 consumes some of them and then create worker 2, + // otherwise, to proceed without rebalance the two workers has to be created simultaneously + let w2 = scope.create_and_start(WORKER_2).await; + thread::sleep(Duration::from_secs(4)); + let producer = scope.send_messages(TEST_TOTAL_MESSAGES).await; + // this will end up with more app_ids than actual, + // since we're not sure which partitions will get each worker + let partitions = create_partitions_app_ids(TEST_PARTITIONS + 1); + + let now: DateTime = Utc::now(); + let n = 200; + let json = &serde_json::json!({ + "id": n.to_string(), + "sender": format!("sender-{}@example.com", n), + "color" : "red", + "recipient": format!("recipient-{}@example.com", n), + "timestamp": (now + chrono::Duration::seconds(1)).to_rfc3339_opts(SecondsFormat::Secs, true), + }); + send_json(&producer, &topic, json).await; + + // wait until the destination table will get every expected message, we check this summing up + // the each offset of each partition to get the TOTAL_MESSAGES value + scope + .wait_on_total_offset(partitions, TEST_TOTAL_MESSAGES + 1) + .await; + + println!("Waiting on workers futures to exit..."); + // wait until workers are completely stopped + let w1_result = w1.await; + println!("Worker 1 finished - {:?}", w1_result); + + let w2_result = w2.await; + println!("Worker 2 finished - {:?}", w2_result); + + println!("Validating data"); + scope.validate_data_amount(TEST_TOTAL_MESSAGES + 1).await; + + let table = DeltaTableBuilder::from_uri(table) + .load() + .await + .expect("Failed to load"); + let schema = table + .metadata() + .unwrap() + .schema() + .expect("Failed to load schema"); + + let mut found_columns: Vec = schema.fields().iter().map(|f| f.name().clone()).collect(); + found_columns.sort(); + + let expected = vec!["color", "date", "id", "recipient", "sender", "timestamp"]; + assert_eq!(expected, found_columns); + + scope.shutdown(); } async fn run_emails_s3_tests(initiate_rebalance: bool) { diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs index 737b6fc..f9a1c73 100644 --- a/tests/helpers/mod.rs +++ b/tests/helpers/mod.rs @@ -140,7 +140,7 @@ pub async fn read_files_from_store(table: &DeltaTable) -> Vec { } } - std::fs::remove_file(tmp).unwrap(); + std::fs::remove_file(tmp.clone()).expect(&format!("Failed to remove {tmp:?}")); list.sort(); list @@ -339,9 +339,11 @@ pub fn init_logger() { record.args(), ) }) - // .filter(Some("dipstick"), log::LevelFilter::Info) - // .filter(Some("rusoto_core"), log::LevelFilter::Info) - // .filter(Some("deltalake"), log::LevelFilter::Info) + .filter(Some("dipstick"), log::LevelFilter::Info) + .filter(Some("rusoto_core"), log::LevelFilter::Info) + .filter(Some("reqwest"), log::LevelFilter::Info) + .filter(Some("hyper"), log::LevelFilter::Info) + .filter(Some("deltalake"), log::LevelFilter::Info) .filter(None, log::LevelFilter::Info) .try_init(); } @@ -603,7 +605,7 @@ impl TestScope { }) } - pub async fn send_messages(&self, amount: i32) { + pub async fn send_messages(&self, amount: i32) -> FutureProducer { let producer = create_producer(); let now: DateTime = Utc::now(); @@ -618,6 +620,7 @@ impl TestScope { send_json(&producer, &self.topic, json).await; } println!("All messages are sent"); + producer } pub async fn 
wait_on_total_offset(&self, apps: Vec, offset: i32) { @@ -645,16 +648,22 @@ impl TestScope { } } - pub async fn validate_data(&self) { - let table = deltalake_core::open_table(&self.table).await.unwrap(); + pub async fn validate_data_amount(&self, amount: i32) { + let table = deltalake_core::open_table(&self.table) + .await + .expect("Failed to open table"); let result = read_files_from_store(&table).await; - let r: Vec = (0..TEST_TOTAL_MESSAGES).collect(); - println!("Got messages {}/{}", result.len(), TEST_TOTAL_MESSAGES); + let r: Vec = (0..amount).collect(); + println!("Got messages {}/{}", result.len(), amount); - if result.len() != TEST_TOTAL_MESSAGES as usize { + if result.len() != amount as usize { inspect_table(&self.table).await; } assert_eq!(result, r); } + + pub async fn validate_data(&self) { + self.validate_data_amount(TEST_TOTAL_MESSAGES).await; + } } From 72c2f0b57db1f5b5835122f5a15d0cd3ebe58bb6 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 7 Jan 2024 18:32:24 -0800 Subject: [PATCH 09/12] Ensure all features are building and running during GitHub actions --- .github/workflows/build.yml | 12 ++-- src/lib.rs | 100 ++++++++++++----------------- src/main.rs | 1 + src/writer.rs | 124 ++++++++++++++++++++++++++++++++++++ 4 files changed, 168 insertions(+), 69 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8fe7c69..2f3a29d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -30,10 +30,8 @@ jobs: profile: default toolchain: stable override: true - - name: Build and lint for S3 - run: cargo clippy --features s3 - - name: Build and lint for Azure - run: cargo clippy --features azure + - name: Build and lint + run: cargo clippy --features s3,azure,avro test: runs-on: ubuntu-latest @@ -47,8 +45,6 @@ jobs: override: true - name: Teststack setup run: docker-compose up setup - - name: Run s3 feature tests - run: cargo test --features s3 - - name: Run azure feature tests - run: cargo test --features azure + - name: Run all tests + run: cargo test --features s3,azure,avro diff --git a/src/lib.rs b/src/lib.rs index 51ad5e7..6711fd9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,6 +17,7 @@ extern crate strum_macros; extern crate serde_json; use coercions::CoercionTree; +use deltalake_core::kernel::{Action, Metadata, Format, StructType}; use deltalake_core::protocol::DeltaOperation; use deltalake_core::protocol::OutputMode; use deltalake_core::{DeltaTable, DeltaTableError}; @@ -31,11 +32,13 @@ use rdkafka::{ }; use serde_json::Value; use serialization::{MessageDeserializer, MessageDeserializerFactory}; +use std::convert::TryInto; use std::sync::Arc; use std::time::{Duration, Instant}; use std::{collections::HashMap, path::PathBuf}; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; +use uuid::Uuid; use url::Url; mod coercions; @@ -975,77 +978,52 @@ impl IngestProcessor { let mut attempt_number: u32 = 0; let mut actions = build_actions(&partition_offsets, self.opts.app_id.as_str(), add); - let table_schema = self.table.state.delta_metadata().unwrap(); + let delta_metadata = self.table.state.delta_metadata().unwrap(); + // Determine whether an attempt to update the delta_writer's schema should be performed + // + // In most cases, this is desired behavior, except when the table is evolving + let mut update_schema = true; // If schema evolution is enabled and then kafka-delta-ingest must ensure that the new // `table_schema` is compatible with the evolved schema in the writer if 
self.opts.schema_evolution && self.delta_writer.has_schema_changed() { - use deltalake_core::arrow::datatypes::Schema as ArrowSchema; - use deltalake_core::kernel::Schema; - use std::convert::TryFrom; - let arrow_schema: ArrowSchema = - >::try_from(&table_schema.schema) - .expect("Failed to convert arrow schema somehow"); - let merged_schema = ArrowSchema::try_merge(vec![ - arrow_schema, - self.delta_writer.arrow_schema().as_ref().clone(), - ]); - - if let Ok(merged) = merged_schema { - } else { - if self + if let Ok(arrow_schema) = self.delta_writer.can_merge_with_delta_schema(&delta_metadata.schema) { + debug!("The schema has changed *AND* the schema is evolving..this transaction will include a Metadata action"); + update_schema = false; + let new_delta_schema: StructType = self .delta_writer - .update_schema(self.table.state.delta_metadata().unwrap())? - { - info!("Table schema has been updated"); - // Update the coercion tree to reflect the new schema - let coercion_tree = - coercions::create_coercion_tree(self.table.schema().unwrap()); - let _ = std::mem::replace(&mut self.coercion_tree, coercion_tree); - - return Err(IngestError::DeltaSchemaChanged); - } + .arrow_schema() + .clone() + .try_into() + .expect("The delta_writer schema was unable to be coerced into a delta schema, this is fatal!"); + let schema_string: String = serde_json::to_string(&new_delta_schema)?; + // TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe + // this should just propagate the existing columns in the new action + let part_cols: Vec = vec![]; + let metadata = Metadata::new( + Uuid::new_v4(), + Format::default(), + schema_string, + part_cols, + None, + ); + actions.push(Action::Metadata(metadata)); } - } else { - if self - .delta_writer - .update_schema(self.table.state.delta_metadata().unwrap())? - { - info!("Table schema has been updated"); - // Update the coercion tree to reflect the new schema - let coercion_tree = coercions::create_coercion_tree(self.table.schema().unwrap()); - let _ = std::mem::replace(&mut self.coercion_tree, coercion_tree); + } - return Err(IngestError::DeltaSchemaChanged); - } + if update_schema && self + .delta_writer + .update_schema(delta_metadata)? + { + info!("Table schema has been updated"); + // Update the coercion tree to reflect the new schema + let coercion_tree = coercions::create_coercion_tree(self.table.schema().unwrap()); + let _ = std::mem::replace(&mut self.coercion_tree, coercion_tree); + + return Err(IngestError::DeltaSchemaChanged); } // Try to commit - if self.opts.schema_evolution && self.delta_writer.has_schema_changed() { - info!("TRYING TO ADD METADATA"); - use deltalake_core::kernel::*; - use std::convert::TryInto; - use uuid::Uuid; - let schema: StructType = self - .delta_writer - .arrow_schema() - .clone() - .try_into() - .expect("Failed to convert schema"); - let schema_string: String = serde_json::to_string(&schema)?; - // TODO: Handle partition columns somehow? Can we even evolve partition columns? 
Maybe - // this should just propagate the existing columns in the new action - let part_cols: Vec = vec![]; - let metadata = Metadata::new( - Uuid::new_v4(), - Format::default(), - schema_string, - part_cols, - None, - ); - info!("Wouldn't it be interesting to push: {metadata:?}"); - actions.push(Action::Metadata(metadata)); - } loop { let epoch_id = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/src/main.rs b/src/main.rs index 3303936..bd265dc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -503,6 +503,7 @@ mod test { )); } + #[cfg(feature="avro")] #[test] fn get_avro_argument() { let schema_registry_url: url::Url = url::Url::parse(SCHEMA_REGISTRY_ADDRESS).unwrap(); diff --git a/src/writer.rs b/src/writer.rs index 81f9320..90b455e 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -280,6 +280,21 @@ impl DataWriter { Ok(schema_updated) } + /// Determine whether the writer's current schema can be merged with the suggested DeltaSchema + pub fn can_merge_with_delta_schema(&self, suggested_schema: &Schema) -> Result> { + let arrow_schema: ArrowSchema = + >::try_from(&suggested_schema)?; + self.can_merge_with(&arrow_schema) + } + + /// Determine whether the writer's current schema can be merged with `suggested_schema` + pub fn can_merge_with(&self, suggested_schema: &ArrowSchema) -> Result> { + ArrowSchema::try_merge(vec![ + suggested_schema.clone(), + self.arrow_schema_ref.as_ref().clone(), + ]).map_err(|e| e.into()) + } + /// Writes the given values to internal parquet buffers for each represented partition. pub async fn write( &mut self, @@ -538,6 +553,115 @@ impl DataWriter { } } +#[cfg(test)] +mod datawriter_tests { + use super::*; + use deltalake_core::kernel::{ + DataType as DeltaDataType, PrimitiveType, StructField, StructType, + }; + use deltalake_core::operations::create::CreateBuilder; + + async fn inmemory_table() -> DeltaTable { + let schema = StructType::new(vec![ + StructField::new( + "id".to_string(), + DeltaDataType::Primitive(PrimitiveType::String), + true, + ), + StructField::new( + "value".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "modified".to_string(), + DeltaDataType::Primitive(PrimitiveType::String), + true, + ), + ]); + + CreateBuilder::new() + .with_location("memory://") + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(schema.fields().clone()) + .await + .expect("Failed to create in-memory table") + } + + async fn get_default_writer() -> (DataWriter, DeltaTable) { + let table = inmemory_table().await; + (DataWriter::with_options(&table, DataWriterOptions::default()).expect("Failed to make writer"), + table) + } + + #[tokio::test] + async fn test_can_merge_with_simple() { + let (writer, _) = get_default_writer().await; + let delta_schema = StructType::new(vec![ + StructField::new( + "vid".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + ]); + let arrow_schema: ArrowSchema = + >::try_from(&delta_schema) + .expect("Failed to convert arrow schema somehow"); + let result = writer.can_merge_with(&arrow_schema); + assert_eq!(true, result.is_ok(), "This should be able to merge"); + } + + #[tokio::test] + async fn test_can_merge_with_diff_column() { + let (writer, _) = get_default_writer().await; + let delta_schema = StructType::new(vec![ + StructField::new( + "id".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + ]); + let arrow_schema: ArrowSchema = + >::try_from(&delta_schema) + 
.expect("Failed to convert arrow schema somehow"); + let result = writer.can_merge_with(&arrow_schema); + assert_eq!(true, result.is_err(), "Cannot merge this schema, but DataWriter thinks I can?"); + } + + #[tokio::test] + async fn test_update_schema() { + let (mut writer, _) = get_default_writer().await; + let new_schema = StructType::new(vec![ + StructField::new( + "vid".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + ]); + let metadata = DeltaTableMetaData::new(None, None, None, new_schema, vec![], HashMap::new()); + + let result = writer.update_schema(&metadata).expect("Failed to execute update_schema"); + assert_eq!(true, result, "Expected that the new schema would have caused an update"); + } + + #[tokio::test] + async fn test_update_schema_with_new_partition_cols() { + let (mut writer, table) = get_default_writer().await; + let mut metadata = table.state.delta_metadata().unwrap().clone(); + metadata.partition_columns = vec!["test".into()]; + let result = writer.update_schema(&metadata).expect("Failed to execute update_schema"); + assert_eq!(true, result, "Expected that the new schema would have caused an update"); + } + + #[tokio::test] + async fn test_update_schema_no_changes() { + let (mut writer, table) = get_default_writer().await; + let result = writer.update_schema(table.state.delta_metadata().unwrap()).expect("Failed to execute update_schema"); + assert_eq!(false, result, "Expected that there would be no schema changes"); + } +} + /// Writes messages to an underlying arrow buffer. pub(crate) struct DataArrowWriter { arrow_schema: Arc, From e4c0f1e3ea24070560f3336549f2573f38d155fb Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 7 Jan 2024 18:46:55 -0800 Subject: [PATCH 10/12] Introduce schema evolution into the IngestProcessor runlop This commit introduces some interplay between the IngestProcessor and DataWriter, the latter of which needs to keep track of whether or not it has a changed schema. What should be done with that changed schema must necessarily live in IngestProcessor since that will perform the Delta transaction commits at the tail end of batch processing. 
There is some potential mismatches between the schema in storage and what the DataWriter has, so this change tries to run the runloop again if the current schema and the evolved schema are incompatible Closes #131 Sponsored-by: Raft LLC --- src/dead_letters.rs | 2 +- src/lib.rs | 22 +++---- src/main.rs | 2 +- src/serialization/mod.rs | 1 + src/writer.rs | 127 +++++++++++++++++++++------------------ 5 files changed, 81 insertions(+), 73 deletions(-) diff --git a/src/dead_letters.rs b/src/dead_letters.rs index e7f6e1f..a4a1c7c 100644 --- a/src/dead_letters.rs +++ b/src/dead_letters.rs @@ -255,7 +255,7 @@ impl DeltaSinkDeadLetterQueue { dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE) .unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()), }; - #[cfg(feature = "azure")] + #[cfg(all(feature = "azure", not(feature="s3")))] let opts = HashMap::default(); let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?; diff --git a/src/lib.rs b/src/lib.rs index 6711fd9..2af55a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,7 @@ extern crate strum_macros; extern crate serde_json; use coercions::CoercionTree; -use deltalake_core::kernel::{Action, Metadata, Format, StructType}; +use deltalake_core::kernel::{Action, Format, Metadata, StructType}; use deltalake_core::protocol::DeltaOperation; use deltalake_core::protocol::OutputMode; use deltalake_core::{DeltaTable, DeltaTableError}; @@ -38,8 +38,8 @@ use std::time::{Duration, Instant}; use std::{collections::HashMap, path::PathBuf}; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; -use uuid::Uuid; use url::Url; +use uuid::Uuid; mod coercions; /// Doc @@ -980,21 +980,20 @@ impl IngestProcessor { let mut actions = build_actions(&partition_offsets, self.opts.app_id.as_str(), add); let delta_metadata = self.table.state.delta_metadata().unwrap(); // Determine whether an attempt to update the delta_writer's schema should be performed - // + // // In most cases, this is desired behavior, except when the table is evolving let mut update_schema = true; // If schema evolution is enabled and then kafka-delta-ingest must ensure that the new // `table_schema` is compatible with the evolved schema in the writer if self.opts.schema_evolution && self.delta_writer.has_schema_changed() { - if let Ok(arrow_schema) = self.delta_writer.can_merge_with_delta_schema(&delta_metadata.schema) { + if let Ok(arrow_schema) = self + .delta_writer + .can_merge_with_delta_schema(&delta_metadata.schema) + { debug!("The schema has changed *AND* the schema is evolving..this transaction will include a Metadata action"); update_schema = false; - let new_delta_schema: StructType = self - .delta_writer - .arrow_schema() - .clone() - .try_into() + let new_delta_schema: StructType = arrow_schema.try_into() .expect("The delta_writer schema was unable to be coerced into a delta schema, this is fatal!"); let schema_string: String = serde_json::to_string(&new_delta_schema)?; // TODO: Handle partition columns somehow? Can we even evolve partition columns? Maybe @@ -1011,10 +1010,7 @@ impl IngestProcessor { } } - if update_schema && self - .delta_writer - .update_schema(delta_metadata)? - { + if update_schema && self.delta_writer.update_schema(delta_metadata)? 
{ info!("Table schema has been updated"); // Update the coercion tree to reflect the new schema let coercion_tree = coercions::create_coercion_tree(self.table.schema().unwrap()); diff --git a/src/main.rs b/src/main.rs index bd265dc..c4ec806 100644 --- a/src/main.rs +++ b/src/main.rs @@ -503,7 +503,7 @@ mod test { )); } - #[cfg(feature="avro")] + #[cfg(feature = "avro")] #[test] fn get_avro_argument() { let schema_registry_url: url::Url = url::Url::parse(SCHEMA_REGISTRY_ADDRESS).unwrap(); diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs index 9187eaf..136b80b 100644 --- a/src/serialization/mod.rs +++ b/src/serialization/mod.rs @@ -1,5 +1,6 @@ use async_trait::async_trait; use serde_json::Value; +use log::*; use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; diff --git a/src/writer.rs b/src/writer.rs index 90b455e..7b3339b 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -281,18 +281,26 @@ impl DataWriter { } /// Determine whether the writer's current schema can be merged with the suggested DeltaSchema - pub fn can_merge_with_delta_schema(&self, suggested_schema: &Schema) -> Result> { + pub fn can_merge_with_delta_schema( + &self, + suggested_schema: &Schema, + ) -> Result, Box> { let arrow_schema: ArrowSchema = - >::try_from(&suggested_schema)?; + >::try_from(suggested_schema)?; self.can_merge_with(&arrow_schema) } /// Determine whether the writer's current schema can be merged with `suggested_schema` - pub fn can_merge_with(&self, suggested_schema: &ArrowSchema) -> Result> { + pub fn can_merge_with( + &self, + suggested_schema: &ArrowSchema, + ) -> Result, Box> { ArrowSchema::try_merge(vec![ - suggested_schema.clone(), - self.arrow_schema_ref.as_ref().clone(), - ]).map_err(|e| e.into()) + suggested_schema.clone(), + self.arrow_schema_ref.as_ref().clone(), + ]) + .map(Arc::new) + .map_err(|e| e.into()) } /// Writes the given values to internal parquet buffers for each represented partition. 
@@ -591,23 +599,23 @@ mod datawriter_tests { async fn get_default_writer() -> (DataWriter, DeltaTable) { let table = inmemory_table().await; - (DataWriter::with_options(&table, DataWriterOptions::default()).expect("Failed to make writer"), - table) + ( + DataWriter::with_options(&table, DataWriterOptions::default()) + .expect("Failed to make writer"), + table, + ) } #[tokio::test] async fn test_can_merge_with_simple() { let (writer, _) = get_default_writer().await; - let delta_schema = StructType::new(vec![ - StructField::new( - "vid".to_string(), - DeltaDataType::Primitive(PrimitiveType::Integer), - true, - ), - ]); - let arrow_schema: ArrowSchema = - >::try_from(&delta_schema) - .expect("Failed to convert arrow schema somehow"); + let delta_schema = StructType::new(vec![StructField::new( + "vid".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + )]); + let arrow_schema: ArrowSchema = >::try_from(&delta_schema) + .expect("Failed to convert arrow schema somehow"); let result = writer.can_merge_with(&arrow_schema); assert_eq!(true, result.is_ok(), "This should be able to merge"); } @@ -615,34 +623,39 @@ mod datawriter_tests { #[tokio::test] async fn test_can_merge_with_diff_column() { let (writer, _) = get_default_writer().await; - let delta_schema = StructType::new(vec![ - StructField::new( - "id".to_string(), - DeltaDataType::Primitive(PrimitiveType::Integer), - true, - ), - ]); - let arrow_schema: ArrowSchema = - >::try_from(&delta_schema) - .expect("Failed to convert arrow schema somehow"); + let delta_schema = StructType::new(vec![StructField::new( + "id".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + )]); + let arrow_schema: ArrowSchema = >::try_from(&delta_schema) + .expect("Failed to convert arrow schema somehow"); let result = writer.can_merge_with(&arrow_schema); - assert_eq!(true, result.is_err(), "Cannot merge this schema, but DataWriter thinks I can?"); + assert_eq!( + true, + result.is_err(), + "Cannot merge this schema, but DataWriter thinks I can?" 
+ ); } #[tokio::test] async fn test_update_schema() { let (mut writer, _) = get_default_writer().await; - let new_schema = StructType::new(vec![ - StructField::new( - "vid".to_string(), - DeltaDataType::Primitive(PrimitiveType::Integer), - true, - ), - ]); - let metadata = DeltaTableMetaData::new(None, None, None, new_schema, vec![], HashMap::new()); - - let result = writer.update_schema(&metadata).expect("Failed to execute update_schema"); - assert_eq!(true, result, "Expected that the new schema would have caused an update"); + let new_schema = StructType::new(vec![StructField::new( + "vid".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + )]); + let metadata = + DeltaTableMetaData::new(None, None, None, new_schema, vec![], HashMap::new()); + + let result = writer + .update_schema(&metadata) + .expect("Failed to execute update_schema"); + assert_eq!( + true, result, + "Expected that the new schema would have caused an update" + ); } #[tokio::test] @@ -650,15 +663,25 @@ mod datawriter_tests { let (mut writer, table) = get_default_writer().await; let mut metadata = table.state.delta_metadata().unwrap().clone(); metadata.partition_columns = vec!["test".into()]; - let result = writer.update_schema(&metadata).expect("Failed to execute update_schema"); - assert_eq!(true, result, "Expected that the new schema would have caused an update"); + let result = writer + .update_schema(&metadata) + .expect("Failed to execute update_schema"); + assert_eq!( + true, result, + "Expected that the new schema would have caused an update" + ); } #[tokio::test] async fn test_update_schema_no_changes() { let (mut writer, table) = get_default_writer().await; - let result = writer.update_schema(table.state.delta_metadata().unwrap()).expect("Failed to execute update_schema"); - assert_eq!(false, result, "Expected that there would be no schema changes"); + let result = writer + .update_schema(table.state.delta_metadata().unwrap()) + .expect("Failed to execute update_schema"); + assert_eq!( + false, result, + "Expected that there would be no schema changes" + ); } } @@ -1475,7 +1498,6 @@ mod tests { } #[tokio::test] - #[ignore] async fn test_schema_matching() { let temp_dir = tempfile::tempdir().unwrap(); let table_path = temp_dir.path(); @@ -1506,21 +1528,10 @@ mod tests { .into()]; let result = writer.write(rows).await; assert!( - result.is_err(), - "Expected the write of our invalid schema rows to fail!\n{:?}", + result.is_ok(), + "Expecting the write of the valid schema to succeed!\n{:?}", result ); - match result { - Ok(_) => unreachable!(), - //Err(Box) => {}, - Err(e) => { - assert!( - false, - "I was expecting a schema mismatch, got this instead: {:?}", - e - ); - } - } } #[tokio::test] From 50e81dae2cf0cee688e8b6854937ba87b472008d Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 7 Jan 2024 20:32:22 -0800 Subject: [PATCH 11/12] Disable scheam inference when schema evolution is disabled This will ensure the non-evolution case stands relatively speedy! 
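Roughly, the deserializer should only pay for JSON schema inference when something downstream can
act on the inferred schema. A sketch of the intent (not the exact `DefaultDeserializer` code; the
free function here is illustrative):

    use deltalake_core::arrow::datatypes::Schema;
    use deltalake_core::arrow::json::reader::infer_json_schema_from_iterator;
    use serde_json::{json, Value};

    // With evolution disabled the message passes through untouched, so the
    // hot path skips per-message schema inference entirely.
    fn deserialize(message: Value, schema_evolution: bool) -> (Value, Option<Schema>) {
        let schema = if schema_evolution {
            infer_json_schema_from_iterator(std::iter::once(Ok(message.clone()))).ok()
        } else {
            None
        };
        (message, schema)
    }

    fn main() {
        let (_, schema) = deserialize(json!({"hello": "world"}), false);
        assert!(schema.is_none(), "no inference cost on the non-evolution path");
    }
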
Sponsored-by: Raft LLC --- Cargo.lock | 126 ++++++++++++++++++++++++--------------- Cargo.toml | 9 +-- src/dead_letters.rs | 2 +- src/lib.rs | 1 + src/serialization/mod.rs | 32 ++++++++-- 5 files changed, 109 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eff1a76..2a23bfc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -247,7 +247,7 @@ dependencies = [ "arrow-data", "arrow-schema", "arrow-select", - "base64 0.21.5", + "base64 0.21.6", "chrono", "half", "lexical-core", @@ -440,7 +440,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6218987c374650fdad0b476bfc675729762c28dfb35f58608a38a2b1ea337dd" dependencies = [ "async-trait", - "base64 0.21.5", + "base64 0.21.6", "bytes", "dyn-clone", "futures", @@ -556,9 +556,9 @@ checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" [[package]] name = "base64" -version = "0.21.5" +version = "0.21.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +checksum = "c79fed4cdb43e993fcdadc7e58a09fd0e3e649c4436fa11da71c9f1f3ee7feb9" [[package]] name = "bitflags" @@ -674,18 +674,18 @@ dependencies = [ [[package]] name = "clap" -version = "4.4.13" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52bdc885e4cacc7f7c9eedc1ef6da641603180c783c41a15c264944deeaab642" +checksum = "33e92c5c1a78c62968ec57dbc2440366a2d6e5a23faf829970ff1585dc6b18e2" dependencies = [ "clap_builder", ] [[package]] name = "clap_builder" -version = "4.4.12" +version = "4.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb7fb5e4e979aec3be7791562fcba452f94ad85e954da024396433e0e25a79e9" +checksum = "f4323769dc8a61e2c39ad7dc26f6f2800524691a44d74fe3d1071a5c24db6370" dependencies = [ "anstream", "anstyle", @@ -720,7 +720,7 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d16048cd947b08fa32c24458a22f5dc5e835264f689f4f5653210c69fd107363" dependencies = [ - "crossbeam-utils 0.8.18", + "crossbeam-utils 0.8.19", ] [[package]] @@ -800,12 +800,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.18" +version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3a430a770ebd84726f584a90ee7f020d28db52c6d02138900f22341f866d39c" -dependencies = [ - "cfg-if 1.0.0", -] +checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" [[package]] name = "crunchy" @@ -889,12 +886,12 @@ dependencies = [ [[package]] name = "deltalake-aws" version = "0.1.0" -source = "git+https://github.com/delta-io/delta-rs?branch=main#8eeb127c12621bf6b93667911468fead007e331d" +source = "git+https://github.com/delta-io/delta-rs?branch=main#f7c303b74218c202ef683f727701a67da5aaaca5" dependencies = [ "async-trait", "backoff", "bytes", - "deltalake-core", + "deltalake-core 0.17.0 (git+https://github.com/delta-io/delta-rs?branch=main)", "futures", "lazy_static", "maplit", @@ -914,25 +911,73 @@ dependencies = [ [[package]] name = "deltalake-azure" version = "0.1.0" -source = "git+https://github.com/delta-io/delta-rs?branch=main#8eeb127c12621bf6b93667911468fead007e331d" +source = "git+https://github.com/delta-io/delta-rs?branch=main#f7c303b74218c202ef683f727701a67da5aaaca5" dependencies = [ "async-trait", "bytes", - "deltalake-core", + "deltalake-core 0.17.0 (git+https://github.com/delta-io/delta-rs?branch=main)", + "futures", + "lazy_static", + "object_store", + 
"regex", + "thiserror", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "deltalake-core" +version = "0.17.0" +source = "git+https://github.com/rtyler/delta-rs?branch=createdTime_is_optional#dd42f23b8319af3f0c2db416d521a3bf96cadf1e" +dependencies = [ + "arrow", + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "async-trait", + "bytes", + "cfg-if 1.0.0", + "chrono", + "dashmap", + "either", + "errno", + "fix-hidden-lifetime-bug", "futures", + "itertools 0.12.0", "lazy_static", + "libc", + "maplit", + "num-bigint", + "num-traits", + "num_cpus", "object_store", + "once_cell", + "parking_lot 0.12.1", + "parquet", + "percent-encoding", + "rand 0.8.5", "regex", + "roaring", + "serde", + "serde_json", "thiserror", "tokio", "tracing", "url", + "uuid 1.6.1", + "z85", ] [[package]] name = "deltalake-core" version = "0.17.0" -source = "git+https://github.com/delta-io/delta-rs?branch=main#8eeb127c12621bf6b93667911468fead007e331d" +source = "git+https://github.com/delta-io/delta-rs?branch=main#f7c303b74218c202ef683f727701a67da5aaaca5" dependencies = [ "arrow", "arrow-arith", @@ -1393,7 +1438,7 @@ dependencies = [ "indexmap", "slab", "tokio", - "tokio-util 0.7.10", + "tokio-util", "tracing", ] @@ -1769,7 +1814,7 @@ dependencies = [ "clap", "deltalake-aws", "deltalake-azure", - "deltalake-core", + "deltalake-core 0.17.0 (git+https://github.com/rtyler/delta-rs?branch=createdTime_is_optional)", "dipstick", "dynamodb_lock", "env_logger", @@ -1780,7 +1825,6 @@ dependencies = [ "maplit", "rdkafka", "rusoto_core", - "rusoto_credential", "rusoto_s3", "schema_registry_converter", "sentry", @@ -1792,7 +1836,7 @@ dependencies = [ "thiserror", "time 0.3.31", "tokio", - "tokio-util 0.6.10", + "tokio-util", "url", "utime", "uuid 1.6.1", @@ -1913,9 +1957,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.12" +version = "1.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d97137b25e321a73eef1418d1d5d2eda4d77e12813f8e6dead84bc52c5870a7b" +checksum = "5f526fdd09d99e19742883e43de41e1aa9e36db0c7ab7f935165d611c5cccc66" dependencies = [ "cc", "libc", @@ -2218,7 +2262,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2524735495ea1268be33d200e1ee97455096a0846295a21548cd2f3541de7050" dependencies = [ "async-trait", - "base64 0.21.5", + "base64 0.21.6", "bytes", "chrono", "futures", @@ -2382,7 +2426,7 @@ dependencies = [ "arrow-ipc", "arrow-schema", "arrow-select", - "base64 0.21.5", + "base64 0.21.6", "brotli", "bytes", "chrono", @@ -2524,9 +2568,9 @@ dependencies = [ [[package]] name = "psl" -version = "2.1.13" +version = "2.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fe5b9b0f029db58ebf478fa2a45a8c6f2772c979d878c2c495e2eb2f217f41bc" +checksum = "383703acfc34f7a00724846c14dc5ea4407c59e5aedcbbb18a1c0c1a23fe5013" dependencies = [ "psl-types", ] @@ -2734,7 +2778,7 @@ version = "0.11.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37b1ae8d9ac08420c66222fb9096fc5de435c3c48542bc5336c51892cffafb41" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", "bytes", "encoding_rs", "futures-core", @@ -2762,7 +2806,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls 0.24.1", - "tokio-util 0.7.10", + "tokio-util", "tower-service", "url", "wasm-bindgen", @@ -3007,7 +3051,7 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" dependencies = [ - "base64 0.21.5", + "base64 0.21.6", ] [[package]] @@ -3417,9 +3461,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "strsim" -version = "0.10.1" +version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccbca6f34534eb78dbee83f6b2c9442fea7113f43d9e80ea320f0972ae5dc08d" +checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" [[package]] name = "strum" @@ -3680,20 +3724,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-util" -version = "0.6.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "36943ee01a6d67977dd3f84a5a1d2efeb4ada3a1ae771cadfaa535d9d9fc6507" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "log", - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-util" version = "0.7.10" diff --git a/Cargo.toml b/Cargo.toml index 02e3a7d..3b3adb5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,19 +24,17 @@ serde_json = "1" strum_macros = "0.20" thiserror = "1" tokio = { version = "1", features = ["full"] } -tokio-util = "0.6.3" +tokio-util = "0.7.10" uuid = { version = "1.0", features = ["serde", "v4"] } url = "2.3" #deltalake = { version = "0.16.5", features = ["arrow", "json", "parquet"], optional = true } -deltalake-core = { git = "https://github.com/delta-io/delta-rs", branch = "main", features = ["json"]} +deltalake-core = { git = "https://github.com/rtyler/delta-rs", branch = "createdTime_is_optional", features = ["json"]} deltalake-aws = { git = "https://github.com/delta-io/delta-rs", branch = "main", optional = true } deltalake-azure = { git = "https://github.com/delta-io/delta-rs", branch = "main", optional = true } # s3 feature enabled dynamodb_lock = { version = "0.6.0", optional = true } -rusoto_core = { version = "0.47", default-features = false, features = ["rustls"], optional = true } -rusoto_credential = { version = "0.47", optional = true } # sentry sentry = { version = "0.23.0", optional = true } @@ -68,8 +66,6 @@ azure = [ s3 = [ "deltalake-aws", "dynamodb_lock", - "rusoto_core", - "rusoto_credential", ] [dev-dependencies] @@ -77,6 +73,7 @@ utime = "0.3" serial_test = "*" tempfile = "3" time = "0.3.20" +rusoto_core = { version = "0.47", default-features = false, features = ["rustls"]} rusoto_s3 = { version = "0.47", default-features = false, features = ["rustls"]} [profile.release] diff --git a/src/dead_letters.rs b/src/dead_letters.rs index a4a1c7c..4d25fec 100644 --- a/src/dead_letters.rs +++ b/src/dead_letters.rs @@ -255,7 +255,7 @@ impl DeltaSinkDeadLetterQueue { dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE) .unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()), }; - #[cfg(all(feature = "azure", not(feature="s3")))] + #[cfg(all(feature = "azure", not(feature = "s3")))] let opts = HashMap::default(); let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?; diff --git a/src/lib.rs b/src/lib.rs index 2af55a6..6c0b099 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ #![deny(warnings)] #![deny(missing_docs)] +#![allow(unused)] #[macro_use] extern crate lazy_static; diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs index 136b80b..171505d 100644 --- a/src/serialization/mod.rs +++ b/src/serialization/mod.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -use 
serde_json::Value; use log::*; +use serde_json::Value; use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; @@ -24,6 +24,13 @@ pub struct DeserializedMessage { } impl DeserializedMessage { + fn new(message: Value) -> Self { + Self { + message, + ..Default::default() + } + } + pub fn schema(&self) -> &Option { &self.schema } @@ -41,9 +48,7 @@ impl DeserializedMessage { /// Allow for `.into()` on [Value] for ease of use impl From for DeserializedMessage { fn from(message: Value) -> Self { - // XXX: This seems wasteful, this function should go away, and the deserializers should - // infer straight from the buffer stream - let iter = vec![message.clone()].into_iter().map(Ok); + let iter = std::iter::once(&message).map(Ok); let schema = match deltalake_core::arrow::json::reader::infer_json_schema_from_iterator(iter) { Ok(schema) => Some(schema), @@ -169,7 +174,10 @@ impl MessageDeserializer for DefaultDeserializer { } }; - Ok(value.into()) + match self.can_evolve_schema() { + true => Ok(value.into()), + false => Ok(DeserializedMessage::new(value)), + } } } @@ -183,8 +191,20 @@ mod default_tests { } #[tokio::test] - async fn deserialize_with_schema() { + async fn deserializer_default_without_evolution() { let mut deser = DefaultDeserializer::default(); + let dm = deser + .deserialize(r#"{"hello" : "world"}"#.as_bytes()) + .await + .unwrap(); + assert_eq!(true, dm.schema().is_none()); + } + + #[tokio::test] + async fn deserialize_with_schema() { + let mut deser = DefaultDeserializer { + schema_evolution: true, + }; let message = deser .deserialize(r#"{"hello" : "world"}"#.as_bytes()) .await From 144513a42f6c0d4129143eb10592d2dbd9076d5f Mon Sep 17 00:00:00 2001 From: KyJah Keys Date: Mon, 8 Jul 2024 15:47:53 -0400 Subject: [PATCH 12/12] removed some cloning of deserialized message --- src/coercions.rs | 2 +- src/dead_letters.rs | 2 +- src/serialization/mod.rs | 4 ++-- src/transforms.rs | 4 ++-- src/writer.rs | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/coercions.rs b/src/coercions.rs index 358e4c3..84f205e 100644 --- a/src/coercions.rs +++ b/src/coercions.rs @@ -447,7 +447,7 @@ mod tests { ]; for i in 0..messages.len() { - assert_eq!(messages[i].clone().message(), expected[i]); + assert_eq!(messages[i].message().to_owned(), expected[i]); } } } diff --git a/src/dead_letters.rs b/src/dead_letters.rs index 4d25fec..758319c 100644 --- a/src/dead_letters.rs +++ b/src/dead_letters.rs @@ -60,7 +60,7 @@ impl DeadLetter { let timestamp = Utc::now(); Self { base64_bytes: None, - json_string: Some(value.clone().message().to_string()), + json_string: Some((&value.message()).to_string()), error: Some(err.to_string()), timestamp: timestamp .timestamp_nanos_opt() diff --git a/src/serialization/mod.rs b/src/serialization/mod.rs index 1a1996c..1987389 100644 --- a/src/serialization/mod.rs +++ b/src/serialization/mod.rs @@ -39,8 +39,8 @@ impl DeserializedMessage { pub fn schema(&self) -> &Option { &self.schema } - pub fn message(self) -> Value { - self.message + pub fn message(&self) -> &Value { + &self.message } pub fn get(&self, key: &str) -> Option<&Value> { self.message.get(key) diff --git a/src/transforms.rs b/src/transforms.rs index 3cb83cc..0432dec 100644 --- a/src/transforms.rs +++ b/src/transforms.rs @@ -355,7 +355,7 @@ impl Transformer { where M: Message, { - let data = Variable::try_from(value.clone().message())?; + let data = Variable::try_from(value.message())?; match value.as_object_mut() { Some(map) => { @@ -379,7 +379,7 @@ impl Transformer { 
Ok(()) } _ => Err(TransformError::ValueNotAnObject { - value: value.clone().message(), + value: value.message().to_owned(), }), } } diff --git a/src/writer.rs b/src/writer.rs index 8c0a7d9..fd4a048 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -347,7 +347,7 @@ impl DataWriter { } } - let values = values.into_iter().map(|v| v.message()).collect(); + let values = values.into_iter().map(|v| v.message().to_owned()).collect(); for (key, values) in self.divide_by_partition_values(values)? { match self.arrow_writers.get_mut(&key) {