diff --git a/kernel/src/action_reconciliation/log_replay.rs b/kernel/src/action_reconciliation/log_replay.rs
index a0e923715..e1b76480b 100644
--- a/kernel/src/action_reconciliation/log_replay.rs
+++ b/kernel/src/action_reconciliation/log_replay.rs
@@ -479,7 +479,7 @@ mod tests {
     use std::collections::HashSet;

     use super::*;
-    use crate::arrow::array::StringArray;
+    use crate::arrow::array::LargeStringArray;
     use crate::utils::test_utils::{action_batch, parse_json_batch};
     use crate::Error;
@@ -487,7 +487,7 @@ mod tests {
     /// Helper function to create test batches from JSON strings
     fn create_batch(json_strings: Vec<&str>) -> DeltaResult<ActionsBatch> {
-        let actions = parse_json_batch(StringArray::from(json_strings));
+        let actions = parse_json_batch(LargeStringArray::from(json_strings));
         Ok(ActionsBatch::new(actions, true))
     }
@@ -557,7 +557,7 @@ mod tests {
     #[test]
     fn test_action_reconciliation_visitor_boundary_cases_for_tombstone_expiration(
     ) -> DeltaResult<()> {
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            r#"{"remove":{"path":"exactly_at_threshold","deletionTimestamp":100,"dataChange":true,"partitionValues":{}}}"#,
            r#"{"remove":{"path":"one_below_threshold","deletionTimestamp":99,"dataChange":true,"partitionValues":{}}}"#,
            r#"{"remove":{"path":"one_above_threshold","deletionTimestamp":101,"dataChange":true,"partitionValues":{}}}"#,
@@ -592,7 +592,7 @@ mod tests {
     #[test]
     fn test_action_reconciliation_visitor_file_actions_in_batch() -> DeltaResult<()> {
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            r#"{"add":{"path":"file1","partitionValues":{"c1":"6","c2":"a"},"size":452,"modificationTime":1670892998137,"dataChange":true}}"#,
         ]
         .into();
@@ -626,7 +626,7 @@ mod tests {
     #[test]
     fn test_action_reconciliation_visitor_file_actions_with_deletion_vectors() -> DeltaResult<()> {
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            // Add action for file1 with deletion vector
            r#"{"add":{"path":"file1","partitionValues":{},"size":635,"modificationTime":100,"dataChange":true,"deletionVector":{"storageType":"ONE","pathOrInlineDv":"dv1","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
            // Remove action for file1 with a different deletion vector
@@ -662,7 +662,7 @@ mod tests {
     #[test]
     fn test_action_reconciliation_visitor_already_seen_non_file_actions() -> DeltaResult<()> {
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
            r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#,
            r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1677811175819}}"#,
@@ -697,7 +697,7 @@ mod tests {
     #[test]
     fn test_action_reconciliation_visitor_duplicate_non_file_actions() -> DeltaResult<()> {
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#,
            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":123456789}}"#, // Duplicate txn
            r#"{"txn":{"appId":"app2","version":1,"lastUpdated":123456789}}"#, // Different app ID
@@ -855,7 +855,7 @@ mod tests {
     #[test]
     fn test_action_reconciliation_visitor_txn_retention() -> DeltaResult<()> {
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            // Transaction with old timestamp (should be filtered)
            r#"{"txn":{"appId":"app1","version":1,"lastUpdated":100}}"#,
            // Transaction with recent timestamp (should be kept)
@@ -1153,7 +1153,7 @@ mod tests {
     #[test]
     fn test_action_reconciliation_processor_error_propagation() -> DeltaResult<()> {
         // Test that errors from the visitor are properly propagated by the processor
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            // This will create valid data that parses correctly
            r#"{"add":{"path":"test","partitionValues":{},"size":100,"modificationTime":123,"dataChange":true}}"#,
        ].into();
diff --git a/kernel/src/actions/crc.rs b/kernel/src/actions/crc.rs
index e9265b180..1b8591f73 100644
--- a/kernel/src/actions/crc.rs
+++ b/kernel/src/actions/crc.rs
@@ -144,7 +144,7 @@ mod tests {
     use std::sync::Arc;

-    use crate::arrow::array::StringArray;
+    use crate::arrow::array::LargeStringArray;
     use crate::actions::{Format, Metadata, Protocol};
     use crate::engine::sync::SyncEngine;
@@ -238,7 +238,7 @@ mod tests {
         // convert JSON -> StringArray -> (string)EngineData -> actual CRC EngineData
         let json_string = crc_json.to_string();
-        let json_strings = StringArray::from(vec![json_string.as_str()]);
+        let json_strings = LargeStringArray::from(vec![json_string.as_str()]);
         let engine_data = string_array_to_engine_data(json_strings);
         let engine = SyncEngine::new();
         let json_handler = engine.json_handler();
diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs
index ae7669912..ec060b1d9 100644
--- a/kernel/src/actions/mod.rs
+++ b/kernel/src/actions/mod.rs
@@ -1003,8 +1003,8 @@ mod tests {
     use super::*;
     use crate::{
         arrow::array::{
-            Array, BooleanArray, Int32Array, Int64Array, ListArray, ListBuilder, MapBuilder,
-            MapFieldNames, RecordBatch, StringArray, StringBuilder, StructArray,
+            Array, BooleanArray, Int32Array, Int64Array, LargeStringArray, ListArray, ListBuilder,
+            MapBuilder, MapFieldNames, RecordBatch, StringBuilder, StructArray,
         },
         arrow::datatypes::{DataType as ArrowDataType, Field, Schema},
         arrow::json::ReaderBuilder,
@@ -1527,7 +1527,7 @@ mod tests {
         .into();

         let schema = Arc::new(Schema::new(vec![
-            Field::new("appId", ArrowDataType::Utf8, false),
+            Field::new("appId", ArrowDataType::LargeUtf8, false),
             Field::new("version", ArrowDataType::Int64, false),
             Field::new("lastUpdated", ArrowDataType::Int64, true),
         ]));
@@ -1535,7 +1535,7 @@ mod tests {
         let expected = RecordBatch::try_new(
             schema,
             vec![
-                Arc::new(StringArray::from(vec!["app_id"])),
+                Arc::new(LargeStringArray::from(vec!["app_id"])),
                 Arc::new(Int64Array::from(vec![0_i64])),
                 Arc::new(Int64Array::from(vec![None::<i64>])),
             ],
         )
@@ -1570,11 +1570,11 @@ mod tests {
             vec![
                 Arc::new(Int64Array::from(vec![Some(0)])),
                 Arc::new(Int64Array::from(vec![None::<i64>])),
-                Arc::new(StringArray::from(vec![Some("UNKNOWN")])),
+                Arc::new(LargeStringArray::from(vec![Some("UNKNOWN")])),
                 operation_parameters,
-                Arc::new(StringArray::from(vec![Some(format!("v{KERNEL_VERSION}"))])),
-                Arc::new(StringArray::from(vec![None::<String>])),
-                Arc::new(StringArray::from(vec![commit_info_txn_id])),
+                Arc::new(LargeStringArray::from(vec![Some(format!("v{KERNEL_VERSION}"))])),
+                Arc::new(LargeStringArray::from(vec![None::<String>])),
+                Arc::new(LargeStringArray::from(vec![commit_info_txn_id])),
             ],
         )
         .unwrap();
@@ -1605,8 +1605,8 @@ mod tests {
         let expected = RecordBatch::try_new(
             record_batch.schema(),
             vec![
-                Arc::new(StringArray::from(vec!["my.domain"])),
-                Arc::new(StringArray::from(vec!["config_value"])),
+                Arc::new(LargeStringArray::from(vec!["my.domain"])),
+                Arc::new(LargeStringArray::from(vec!["config_value"])),
                 Arc::new(BooleanArray::from(vec![false])),
             ],
         )
diff --git a/kernel/src/actions/set_transaction.rs b/kernel/src/actions/set_transaction.rs
index 4fb6ff0be..48bc85c0d 100644
--- a/kernel/src/actions/set_transaction.rs
+++ b/kernel/src/actions/set_transaction.rs
@@ -101,7 +101,7 @@ mod tests {
     use crate::utils::test_utils::parse_json_batch;
     use crate::Snapshot;

-    use crate::arrow::array::StringArray;
+    use crate::arrow::array::LargeStringArray;
     use itertools::Itertools;

     fn get_latest_transactions(
@@ -197,7 +197,7 @@ mod tests {
     #[test]
     fn test_visitor_retention_with_null_last_updated() {
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            r#"{"txn":{"appId":"app_with_time","version":1,"lastUpdated":100}}"#,
            r#"{"txn":{"appId":"app_without_time","version":2}}"#,
        ]
diff --git a/kernel/src/actions/visitors.rs b/kernel/src/actions/visitors.rs
index 390abcda9..b5b57f1c8 100644
--- a/kernel/src/actions/visitors.rs
+++ b/kernel/src/actions/visitors.rs
@@ -682,7 +682,7 @@ impl RowVisitor for InCommitTimestampVisitor {
 mod tests {
     use super::*;

-    use crate::arrow::array::StringArray;
+    use crate::arrow::array::LargeStringArray;
     use crate::engine::sync::SyncEngine;
     use crate::expressions::{column_expr_ref, Expression};
@@ -778,7 +778,7 @@ mod tests {
     #[test]
     fn test_parse_add_partitioned() {
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#,
            r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
            r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
@@ -831,7 +831,7 @@ mod tests {
     #[test]
     fn test_parse_remove_partitioned() {
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
            r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#,
            r#"{"remove":{"path":"c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452}}"#,
@@ -865,7 +865,7 @@ mod tests {
     #[test]
     fn test_parse_txn() {
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
r#"{"commitInfo":{"timestamp":1670892998177,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"c1\",\"c2\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"1356"},"engineInfo":"Apache-Spark/3.3.1 Delta-Lake/2.2.0","txnId":"046a258f-45e3-4657-b0bf-abfb0f76681c"}}"#, r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#, r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, @@ -900,7 +900,7 @@ mod tests { fn test_parse_domain_metadata() { // note: we process commit_1, commit_0 since the visitor expects things in reverse order. // these come from the 'more recent' commit - let json_strings: StringArray = vec![ + let json_strings: LargeStringArray = vec![ r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, r#"{"domainMetadata":{"domain": "zach1","configuration":"cfg1","removed": true}}"#, r#"{"domainMetadata":{"domain": "zach2","configuration":"cfg2","removed": false}}"#, @@ -912,7 +912,7 @@ mod tests { .into(); let commit_1 = parse_json_batch(json_strings); // these come from the 'older' commit - let json_strings: StringArray = vec![ + let json_strings: LargeStringArray = vec![ r#"{"domainMetadata":{"domain": "zach1","configuration":"old_cfg1","removed": true}}"#, r#"{"domainMetadata":{"domain": "zach2","configuration":"old_cfg2","removed": false}}"#, r#"{"domainMetadata":{"domain": "zach3","configuration":"old_cfg3","removed": false}}"#, @@ -1097,7 +1097,7 @@ mod tests { // Helper function to reduce duplication in tests fn run_timestamp_visitor_test(json_strings: Vec<&str>, expected_timestamp: Option) { - let json_strings: StringArray = json_strings.into(); + let json_strings: LargeStringArray = json_strings.into(); let batch = parse_json_batch(json_strings); let batch = transform_batch(batch); let mut visitor = InCommitTimestampVisitor::default(); diff --git a/kernel/src/engine/arrow_conversion.rs b/kernel/src/engine/arrow_conversion.rs index 42049d26a..778aae51d 100644 --- a/kernel/src/engine/arrow_conversion.rs +++ b/kernel/src/engine/arrow_conversion.rs @@ -122,7 +122,7 @@ impl TryFromKernel<&DataType> for ArrowDataType { match t { DataType::Primitive(p) => { match p { - PrimitiveType::String => Ok(ArrowDataType::Utf8), + PrimitiveType::String => Ok(ArrowDataType::LargeUtf8), PrimitiveType::Long => Ok(ArrowDataType::Int64), // undocumented type PrimitiveType::Integer => Ok(ArrowDataType::Int32), PrimitiveType::Short => Ok(ArrowDataType::Int16), diff --git a/kernel/src/engine/arrow_data.rs b/kernel/src/engine/arrow_data.rs index 1ce2154f8..4e5391dfa 100644 --- a/kernel/src/engine/arrow_data.rs +++ b/kernel/src/engine/arrow_data.rs @@ -299,7 +299,12 @@ impl ArrowEngineData { } 
&DataType::STRING => { debug!("Pushing string array for {}", ColumnName::new(path)); - col.as_string_opt().map(|a| a as _).ok_or("string") + // Try LargeStringArray first (our new default) + col.as_string_opt::() + .map(|a| a as _) + // Fall back to regular StringArray for compatibility + .or_else(|| col.as_string_opt::().map(|a| a as _)) + .ok_or("string") } &DataType::INTEGER => { debug!("Pushing int32 array for {}", ColumnName::new(path)); @@ -345,7 +350,9 @@ mod tests { use crate::actions::{get_commit_schema, Metadata, Protocol}; use crate::arrow::array::types::Int32Type; - use crate::arrow::array::{Array, AsArray, Int32Array, RecordBatch, StringArray}; + use crate::arrow::array::{ + Array, AsArray, Int32Array, LargeStringArray, RecordBatch, StringArray, + }; use crate::arrow::datatypes::{ DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, }; @@ -362,7 +369,7 @@ mod tests { fn test_md_extract() -> DeltaResult<()> { let engine = SyncEngine::new(); let handler = engine.json_handler(); - let json_strings: StringArray = vec![ + let json_strings: LargeStringArray = vec![ r#"{"metaData":{"id":"aff5cb91-8cd9-4195-aef9-446908507302","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"c3\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["c1","c2"],"configuration":{},"createdTime":1670892997849}}"#, ] .into(); @@ -381,7 +388,7 @@ mod tests { fn test_protocol_extract() -> DeltaResult<()> { let engine = SyncEngine::new(); let handler = engine.json_handler(); - let json_strings: StringArray = vec![ + let json_strings: LargeStringArray = vec![ r#"{"protocol": {"minReaderVersion": 3, "minWriterVersion": 7, "readerFeatures": ["rw1"], "writerFeatures": ["rw1", "w2"]}}"#, ] .into(); diff --git a/kernel/src/engine/arrow_expression/mod.rs b/kernel/src/engine/arrow_expression/mod.rs index 556347dd9..aa44b3b55 100644 --- a/kernel/src/engine/arrow_expression/mod.rs +++ b/kernel/src/engine/arrow_expression/mod.rs @@ -86,7 +86,7 @@ impl Scalar { Byte(val) => append_val_as!(array::Int8Builder, *val), Float(val) => append_val_as!(array::Float32Builder, *val), Double(val) => append_val_as!(array::Float64Builder, *val), - String(val) => append_val_as!(array::StringBuilder, val), + String(val) => append_val_as!(array::LargeStringBuilder, val), Boolean(val) => append_val_as!(array::BooleanBuilder, *val), Timestamp(val) | TimestampNtz(val) => { // timezone was already set at builder construction time @@ -167,7 +167,7 @@ impl Scalar { DataType::BYTE => append_null_as!(array::Int8Builder), DataType::FLOAT => append_null_as!(array::Float32Builder), DataType::DOUBLE => append_null_as!(array::Float64Builder), - DataType::STRING => append_null_as!(array::StringBuilder), + DataType::STRING => append_null_as!(array::LargeStringBuilder), DataType::BOOLEAN => append_null_as!(array::BooleanBuilder), DataType::TIMESTAMP | DataType::TIMESTAMP_NTZ => { append_null_as!(array::TimestampMicrosecondBuilder) diff --git a/kernel/src/engine/arrow_get_data.rs b/kernel/src/engine/arrow_get_data.rs index fbed64df1..f272741f2 100644 --- a/kernel/src/engine/arrow_get_data.rs +++ b/kernel/src/engine/arrow_get_data.rs @@ -51,6 +51,16 @@ impl<'a> GetData<'a> for GenericByteArray> { } } +impl<'a> GetData<'a> for GenericByteArray> { + fn get_str(&'a self, row_index: usize, _field_name: &str) -> 
DeltaResult> { + if self.is_valid(row_index) { + Ok(Some(self.value(row_index))) + } else { + Ok(None) + } + } +} + impl<'a, OffsetSize> GetData<'a> for GenericListArray where OffsetSize: OffsetSizeTrait, diff --git a/kernel/src/engine/arrow_utils.rs b/kernel/src/engine/arrow_utils.rs index 21cb246f3..11c362925 100644 --- a/kernel/src/engine/arrow_utils.rs +++ b/kernel/src/engine/arrow_utils.rs @@ -18,7 +18,8 @@ use crate::{ use crate::arrow::array::{ cast::AsArray, make_array, new_null_array, Array as ArrowArray, BooleanArray, GenericListArray, - MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray, + GenericStringArray, LargeStringArray, MapArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, + StringArray, StructArray, }; use crate::arrow::buffer::NullBuffer; use crate::arrow::compute::concat_batches; @@ -1012,16 +1013,25 @@ pub(crate) fn parse_json( schema: SchemaRef, ) -> DeltaResult> { let json_strings: RecordBatch = ArrowEngineData::try_from_engine_data(json_strings)?.into(); - let json_strings = json_strings - .column(0) - .as_any() - .downcast_ref::() - .ok_or_else(|| { - Error::generic("Expected json_strings to be a StringArray, found something else") - })?; + let column = json_strings.column(0); + let array_ref = column.as_any(); let schema = Arc::new(ArrowSchema::try_from_kernel(schema.as_ref())?); - let result = parse_json_impl(json_strings, schema)?; - Ok(Box::new(ArrowEngineData::new(result))) + + // Try LargeStringArray first (our new default) + if let Some(large_strings) = array_ref.downcast_ref::() { + let result = parse_json_impl(large_strings, schema)?; + return Ok(Box::new(ArrowEngineData::new(result))); + } + + // Fall back to regular StringArray for compatibility + if let Some(strings) = array_ref.downcast_ref::() { + let result = parse_json_impl(strings, schema)?; + return Ok(Box::new(ArrowEngineData::new(result))); + } + + Err(Error::generic( + "Expected json_strings to be a StringArray or LargeStringArray, found something else", + )) } // Raw arrow implementation of the json parsing. Separate from the public function for testing. @@ -1029,7 +1039,10 @@ pub(crate) fn parse_json( // NOTE: This code is really inefficient because arrow lacks the native capability to perform robust // StringArray -> StructArray JSON parsing. See https://github.com/apache/arrow-rs/issues/6522. If // that shortcoming gets fixed upstream, this method can simplify or hopefully even disappear. 
-fn parse_json_impl(json_strings: &StringArray, schema: ArrowSchemaRef) -> DeltaResult<RecordBatch> {
+fn parse_json_impl<O: OffsetSizeTrait>(
+    json_strings: &GenericStringArray<O>,
+    schema: ArrowSchemaRef,
+) -> DeltaResult<RecordBatch> {
     if json_strings.is_empty() {
         return Ok(RecordBatch::new_empty(schema));
     }
@@ -1227,49 +1240,51 @@ mod tests {
     fn test_json_parsing() {
         let requested_schema = Arc::new(ArrowSchema::new(vec![
             ArrowField::new("a", ArrowDataType::Int32, true),
-            ArrowField::new("b", ArrowDataType::Utf8, true),
+            ArrowField::new("b", ArrowDataType::LargeUtf8, true),
             ArrowField::new("c", ArrowDataType::Int32, true),
         ]));
         let input: Vec<&str> = vec![];
-        let result = parse_json_impl(&input.into(), requested_schema.clone()).unwrap();
+        let result =
+            parse_json_impl(&LargeStringArray::from(input), requested_schema.clone()).unwrap();
         assert_eq!(result.num_rows(), 0);

         let input: Vec<Option<&str>> = vec![Some("")];
-        let result = parse_json_impl(&input.into(), requested_schema.clone());
+        let result = parse_json_impl(&LargeStringArray::from(input), requested_schema.clone());
         result.expect_err("empty string");

         let input: Vec<Option<&str>> = vec![Some(" \n\t")];
-        let result = parse_json_impl(&input.into(), requested_schema.clone());
+        let result = parse_json_impl(&LargeStringArray::from(input), requested_schema.clone());
         result.expect_err("empty string");

         let input: Vec<Option<&str>> = vec![Some(r#""a""#)];
-        let result = parse_json_impl(&input.into(), requested_schema.clone());
+        let result = parse_json_impl(&LargeStringArray::from(input), requested_schema.clone());
         result.expect_err("invalid string");

         let input: Vec<Option<&str>> = vec![Some(r#"{ "a": 1"#)];
-        let result = parse_json_impl(&input.into(), requested_schema.clone());
+        let result = parse_json_impl(&LargeStringArray::from(input), requested_schema.clone());
         result.expect_err("incomplete object");

         let input: Vec<Option<&str>> = vec![Some("{}{}")];
-        let result = parse_json_impl(&input.into(), requested_schema.clone());
+        let result = parse_json_impl(&LargeStringArray::from(input), requested_schema.clone());
         assert!(matches!(
             result.unwrap_err(),
             Error::Generic(s) if s == "Malformed JSON: Multiple JSON objects"
         ));

         let input: Vec<Option<&str>> = vec![Some(r#"{} { "a": 1"#)];
-        let result = parse_json_impl(&input.into(), requested_schema.clone());
+        let result = parse_json_impl(&LargeStringArray::from(input), requested_schema.clone());
         assert!(matches!(
             result.unwrap_err(),
             Error::Generic(s) if s == "Malformed JSON: Multiple JSON objects"
         ));

         let input: Vec<Option<&str>> = vec![Some(r#"{ "a": 1"#), Some(r#", "b"}"#)];
-        let result = parse_json_impl(&input.into(), requested_schema.clone());
+        let result = parse_json_impl(&LargeStringArray::from(input), requested_schema.clone());
         result.expect_err("split object");

         let input: Vec<Option<&str>> = vec![None, Some(r#"{"a": 1, "b": "2", "c": 3}"#), None];
-        let result = parse_json_impl(&input.into(), requested_schema.clone()).unwrap();
+        let result =
+            parse_json_impl(&LargeStringArray::from(input), requested_schema.clone()).unwrap();
         assert_eq!(result.num_rows(), 3);
         assert_eq!(result.column(0).null_count(), 2);
         assert_eq!(result.column(1).null_count(), 2);
@@ -1281,19 +1296,61 @@ mod tests {
         // See issue#1139: https://github.com/delta-io/delta-kernel-rs/issues/1139
         let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
             "long_val",
-            ArrowDataType::Utf8,
+            ArrowDataType::LargeUtf8,
             true,
         )]));
         let long_string = "a".repeat(1_000_000); // 1MB string
         let json_string = format!(r#"{{"long_val": "{long_string}"}}"#);
         let input: Vec<Option<&str>> = vec![Some(&json_string)];
-        let batch = parse_json_impl(&input.into(), schema.clone()).unwrap();
+        let batch = parse_json_impl(&LargeStringArray::from(input), schema.clone()).unwrap();
         assert_eq!(batch.num_rows(), 1);
-        let long_col = batch.column(0).as_string::<i32>();
+        let long_col = batch.column(0).as_string::<i64>();
         assert_eq!(long_col.value(0), long_string);
     }

+    #[test]
+    #[ignore] // Expensive test - allocates significant memory
+    fn test_large_string_no_overflow() {
+        // Test case for issue#3790: https://github.com/delta-io/delta-rs/issues/3790
+        // This test verifies that LargeStringArray with i64 offsets can handle
+        // string data that would overflow i32 offsets (>2.1GB total)
+
+        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+            "data",
+            ArrowDataType::LargeUtf8,
+            true,
+        )]));
+
+        // Create multiple moderately large strings that together exceed i32::MAX
+        // Using 100 strings of 30MB each = ~3GB total (exceeds i32::MAX of 2.1GB)
+        let large_string = "x".repeat(30_000_000); // 30MB string
+        let mut inputs = Vec::new();
+
+        for _ in 0..100 {
+            let json_string = format!(r#"{{"data": "{large_string}"}}"#);
+            // Store owned strings to avoid lifetime issues
+            inputs.push(json_string);
+        }
+
+        // Convert to references for the array
+        let string_refs: Vec<Option<&str>> = inputs.iter().map(|s| Some(s.as_str())).collect();
+
+        // This should work with LargeStringArray (i64 offsets)
+        let large_string_array = LargeStringArray::from(string_refs);
+
+        // Parse the JSON strings
+        let batch = parse_json_impl(&large_string_array, schema.clone()).unwrap();
+
+        // Verify the results
+        assert_eq!(batch.num_rows(), 100);
+        let data_col = batch.column(0).as_string::<i64>();
+
+        // Check first and last values to ensure data integrity
+        assert_eq!(data_col.value(0), large_string);
+        assert_eq!(data_col.value(99), large_string);
+    }
+
     #[test]
     fn simple_mask_indices() {
         column_mapping_cases().into_iter().for_each(|mode| {
@@ -2783,7 +2840,7 @@ mod tests {
         )]));
         let data = RecordBatch::try_new(
             schema.clone(),
-            vec![Arc::new(StringArray::from(vec!["string1", "string2"]))],
+            vec![Arc::new(LargeStringArray::from(vec!["string1", "string2"]))],
         )?;
         let data: Box<dyn EngineData> = Box::new(ArrowEngineData::new(data));
         let filtered_data = FilteredEngineData::with_all_rows_selected(data);
@@ -2805,7 +2862,7 @@ mod tests {
         )]));
         let record_batch = RecordBatch::try_new(
             schema.clone(),
-            vec![Arc::new(StringArray::from(vec![
+            vec![Arc::new(LargeStringArray::from(vec![
                 "row0", "row1", "row2", "row3",
             ]))],
         )?;
diff --git a/kernel/src/engine/default/json.rs b/kernel/src/engine/default/json.rs
index 5237b67c7..6d98ccc2a 100644
--- a/kernel/src/engine/default/json.rs
+++ b/kernel/src/engine/default/json.rs
@@ -253,7 +253,7 @@ mod tests {
     use std::task::Waker;

     use crate::actions::get_commit_schema;
-    use crate::arrow::array::{AsArray, Int32Array, RecordBatch, StringArray};
+    use crate::arrow::array::{AsArray, Int32Array, LargeStringArray, RecordBatch};
     use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
     use crate::engine::arrow_data::ArrowEngineData;
     use crate::engine::default::executor::tokio::{
@@ -482,7 +482,7 @@ mod tests {
         let store = Arc::new(LocalFileSystem::new());
         let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new()));

-        let json_strings = StringArray::from(vec![
+        let json_strings = LargeStringArray::from(vec![
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#, r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#, @@ -500,7 +500,7 @@ mod tests { fn test_parse_json_drop_field() { let store = Arc::new(LocalFileSystem::new()); let handler = DefaultJsonHandler::new(store, Arc::new(TokioBackgroundExecutor::new())); - let json_strings = StringArray::from(vec![ + let json_strings = LargeStringArray::from(vec![ r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2, "maxRowId": 3}}}"#, ]); let output_schema = get_commit_schema().clone(); @@ -732,8 +732,10 @@ mod tests { DataType::Utf8, true, )])); - let batch = - RecordBatch::try_new(schema.clone(), vec![Arc::new(StringArray::from(values))])?; + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(LargeStringArray::from(values))], + )?; Ok(Box::new(ArrowEngineData::new(batch))) } diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 7c325db99..061ac6650 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -755,7 +755,7 @@ pub fn selection_vector( // some utils that are used in file_stream.rs and state.rs tests #[cfg(test)] pub(crate) mod test_utils { - use crate::arrow::array::StringArray; + use crate::arrow::array::LargeStringArray; use crate::scan::state_info::StateInfo; use crate::schema::StructType; use crate::utils::test_utils::string_array_to_engine_data; @@ -796,7 +796,7 @@ pub(crate) mod test_utils { .collect(); json_strings.push(r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#.to_string()); - let json_strings_array: StringArray = + let json_strings_array: LargeStringArray = json_strings.iter().map(|s| s.as_str()).collect_vec().into(); let parsed = handler @@ -813,7 +813,7 @@ pub(crate) mod test_utils { // The schema is provided as null columns affect equality checks. 
     pub(crate) fn add_batch_simple(output_schema: SchemaRef) -> Box<dyn EngineData> {
         let handler = SyncJsonHandler {};
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
            r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
        ]
@@ -828,7 +828,7 @@ pub(crate) mod test_utils {
     // The schema is provided as null columns affect equality checks.
     pub(crate) fn add_batch_for_row_id(output_schema: SchemaRef) -> Box<dyn EngineData> {
         let handler = SyncJsonHandler {};
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","baseRowId": 42, "tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
            r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none", "delta.enableRowTracking": "true", "delta.rowTracking.materializedRowIdColumnName":"row_id_col"},"createdTime":1677811175819}}"#,
        ]
@@ -842,7 +842,7 @@ pub(crate) mod test_utils {
     // An add batch with a removed file parsed with the schema provided
     pub(crate) fn add_batch_with_remove(output_schema: SchemaRef) -> Box<dyn EngineData> {
         let handler = SyncJsonHandler {};
-        let json_strings: StringArray = vec![
+        let json_strings: LargeStringArray = vec![
            r#"{"remove":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","deletionTimestamp":1677811194426,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":635,"tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#, @@ -858,7 +858,7 @@ pub(crate) mod test_utils { // add batch with a `date` partition col pub(crate) fn add_batch_with_partition_col() -> Box { let handler = SyncJsonHandler {}; - let json_strings: StringArray = vec![ + let json_strings: LargeStringArray = vec![ r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#, r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","partitionValues": {"date": "2017-12-11"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#, diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index de6c3e362..c57009602 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -430,7 +430,7 @@ mod tests { use test_utils::{add_commit, delta_path_for_version}; use crate::actions::Protocol; - use crate::arrow::array::StringArray; + use crate::arrow::array::LargeStringArray; use crate::arrow::record_batch::RecordBatch; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::default::executor::tokio::TokioBackgroundExecutor; @@ -708,7 +708,7 @@ mod tests { checkpoint1[2]["partitionColumns"] = serde_json::to_value(["some_partition_column"])?; let handler = engine.json_handler(); - let json_strings: StringArray = checkpoint1 + let json_strings: LargeStringArray = checkpoint1 .into_iter() .map(|json| json.to_string()) .collect::>() @@ -1395,7 +1395,7 @@ mod tests { ]; let handler = engine.json_handler(); - let json_strings: StringArray = checkpoint_data + let json_strings: LargeStringArray = checkpoint_data .into_iter() .map(|json| json.to_string()) .collect::>() diff --git a/kernel/src/utils.rs b/kernel/src/utils.rs index e77a3c893..3675f7351 100644 --- a/kernel/src/utils.rs +++ b/kernel/src/utils.rs @@ -149,7 +149,7 @@ 
pub(crate) mod test_utils { use crate::actions::{ get_all_actions_schema, Add, Cdc, CommitInfo, Metadata, Protocol, Remove, }; - use crate::arrow::array::{RecordBatch, StringArray}; + use crate::arrow::array::{LargeStringArray, RecordBatch}; use crate::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use crate::engine::arrow_data::ArrowEngineData; use crate::engine::sync::SyncEngine; @@ -234,15 +234,15 @@ pub(crate) mod test_utils { assert_eq!(into_record_batch(actual), into_record_batch(expected)); } - pub(crate) fn string_array_to_engine_data(string_array: StringArray) -> Box { - let string_field = Arc::new(Field::new("a", DataType::Utf8, true)); + pub(crate) fn string_array_to_engine_data(string_array: LargeStringArray) -> Box { + let string_field = Arc::new(Field::new("a", DataType::LargeUtf8, true)); let schema = Arc::new(ArrowSchema::new(vec![string_field])); let batch = RecordBatch::try_new(schema, vec![Arc::new(string_array)]) .expect("Can't convert to record batch"); Box::new(ArrowEngineData::new(batch)) } - pub(crate) fn parse_json_batch(json_strings: StringArray) -> Box { + pub(crate) fn parse_json_batch(json_strings: LargeStringArray) -> Box { let engine = SyncEngine::new(); let json_handler = engine.json_handler(); let output_schema = get_all_actions_schema().clone(); @@ -252,7 +252,7 @@ pub(crate) mod test_utils { } pub(crate) fn action_batch() -> Box { - let json_strings: StringArray = vec![ + let json_strings: LargeStringArray = vec![ r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#, r#"{"remove":{"path":"part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet","deletionTimestamp":1670892998135,"dataChange":true,"partitionValues":{"c1":"4","c2":"c"},"size":452}}"#, r#"{"commitInfo":{"timestamp":1677811178585,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"10","numOutputBytes":"635"},"engineInfo":"Databricks-Runtime/","txnId":"a6a94671-55ef-450e-9546-b8465b9147de"}}"#,