Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1003,8 +1003,8 @@ mod tests {
use super::*;
use crate::{
arrow::array::{
Array, BooleanArray, Int32Array, Int64Array, ListArray, ListBuilder, MapBuilder,
MapFieldNames, RecordBatch, StringArray, StringBuilder, StructArray,
Array, BooleanArray, Int32Array, Int64Array, LargeStringArray, LargeStringBuilder,
ListArray, ListBuilder, MapBuilder, MapFieldNames, RecordBatch, StructArray,
},
arrow::datatypes::{DataType as ArrowDataType, Field, Schema},
arrow::json::ReaderBuilder,
Expand Down Expand Up @@ -1044,19 +1044,19 @@ mod tests {

fn create_string_map_builder(
nullable_values: bool,
) -> MapBuilder<StringBuilder, StringBuilder> {
) -> MapBuilder<LargeStringBuilder, LargeStringBuilder> {
MapBuilder::new(
Some(MapFieldNames {
entry: "key_value".to_string(),
key: "key".to_string(),
value: "value".to_string(),
}),
StringBuilder::new(),
StringBuilder::new(),
LargeStringBuilder::new(),
LargeStringBuilder::new(),
)
.with_values_field(Field::new(
"value".to_string(),
ArrowDataType::Utf8,
ArrowDataType::LargeUtf8,
nullable_values,
))
}
Expand Down Expand Up @@ -1527,15 +1527,15 @@ mod tests {
.into();

let schema = Arc::new(Schema::new(vec![
Field::new("appId", ArrowDataType::Utf8, false),
Field::new("appId", ArrowDataType::LargeUtf8, false),
Field::new("version", ArrowDataType::Int64, false),
Field::new("lastUpdated", ArrowDataType::Int64, true),
]));

let expected = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["app_id"])),
Arc::new(LargeStringArray::from(vec!["app_id"])),
Arc::new(Int64Array::from(vec![0_i64])),
Arc::new(Int64Array::from(vec![None::<i64>])),
],
Expand Down Expand Up @@ -1570,11 +1570,13 @@ mod tests {
vec![
Arc::new(Int64Array::from(vec![Some(0)])),
Arc::new(Int64Array::from(vec![None::<i64>])),
Arc::new(StringArray::from(vec![Some("UNKNOWN")])),
Arc::new(LargeStringArray::from(vec![Some("UNKNOWN")])),
operation_parameters,
Arc::new(StringArray::from(vec![Some(format!("v{KERNEL_VERSION}"))])),
Arc::new(StringArray::from(vec![None::<String>])),
Arc::new(StringArray::from(vec![commit_info_txn_id])),
Arc::new(LargeStringArray::from(vec![Some(format!(
"v{KERNEL_VERSION}"
))])),
Arc::new(LargeStringArray::from(vec![None::<String>])),
Arc::new(LargeStringArray::from(vec![commit_info_txn_id])),
],
)
.unwrap();
Expand Down Expand Up @@ -1605,8 +1607,8 @@ mod tests {
let expected = RecordBatch::try_new(
record_batch.schema(),
vec![
Arc::new(StringArray::from(vec!["my.domain"])),
Arc::new(StringArray::from(vec!["config_value"])),
Arc::new(LargeStringArray::from(vec!["my.domain"])),
Arc::new(LargeStringArray::from(vec!["config_value"])),
Arc::new(BooleanArray::from(vec![false])),
],
)
Expand Down Expand Up @@ -1859,7 +1861,7 @@ mod tests {
.unwrap()
.into();

let list_field = Arc::new(Field::new("element", ArrowDataType::Utf8, false));
let list_field = Arc::new(Field::new("element", ArrowDataType::LargeUtf8, false));
let protocol_fields = vec![
Field::new("minReaderVersion", ArrowDataType::Int32, false),
Field::new("minWriterVersion", ArrowDataType::Int32, false),
Expand All @@ -1876,13 +1878,13 @@ mod tests {
];
let schema = Arc::new(Schema::new(protocol_fields.clone()));

let string_builder = StringBuilder::new();
let string_builder = LargeStringBuilder::new();
let mut list_builder = ListBuilder::new(string_builder).with_field(list_field.clone());
list_builder.values().append_value("columnMapping");
list_builder.append(true);
let reader_features_array = list_builder.finish();

let string_builder = StringBuilder::new();
let string_builder = LargeStringBuilder::new();
let mut list_builder = ListBuilder::new(string_builder).with_field(list_field.clone());
list_builder.values().append_value("deletionVectors");
list_builder.append(true);
Expand Down
16 changes: 15 additions & 1 deletion kernel/src/engine/arrow_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl TryFromKernel<&DataType> for ArrowDataType {
match t {
DataType::Primitive(p) => {
match p {
PrimitiveType::String => Ok(ArrowDataType::Utf8),
PrimitiveType::String => Ok(ArrowDataType::LargeUtf8),
PrimitiveType::Long => Ok(ArrowDataType::Int64), // undocumented type
PrimitiveType::Integer => Ok(ArrowDataType::Int32),
PrimitiveType::Short => Ok(ArrowDataType::Int16),
Expand Down Expand Up @@ -341,4 +341,18 @@ mod tests {
.contains("Incorrect Variant Schema"));
Ok(())
}

/// Round-trips the kernel `STRING` type through the Arrow conversion layer.
#[test]
fn test_string_uses_large_type() -> DeltaResult<()> {
    // Kernel -> Arrow: STRING must map to LargeUtf8 (64-bit offsets) so that
    // string columns do not hit the 2GB overflow panics of 32-bit offsets.
    assert_eq!(
        ArrowDataType::try_from_kernel(&DataType::STRING)?,
        ArrowDataType::LargeUtf8
    );

    // Arrow -> kernel: LargeUtf8 must be recognised as STRING again.
    assert_eq!(
        DataType::try_from_arrow(&ArrowDataType::LargeUtf8)?,
        DataType::STRING
    );

    Ok(())
}
}
65 changes: 44 additions & 21 deletions kernel/src/engine/arrow_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,24 @@

pub use crate::engine::arrow_utils::fix_nested_null_masks;

/// Helper function to get a string value from an array that could be either Utf8 or LargeUtf8
fn get_string_value(array: &dyn Array, index: usize) -> String {
match array.data_type() {
ArrowDataType::Utf8 => array.as_string::<i32>().value(index).to_string(),
ArrowDataType::LargeUtf8 => array.as_string::<i64>().value(index).to_string(),
_ => panic!("Expected string array, got {:?}", array.data_type()),

Check failure on line 28 in kernel/src/engine/arrow_data.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

`panic` should not be present in production code

Check failure on line 28 in kernel/src/engine/arrow_data.rs

View workflow job for this annotation

GitHub Actions / build (macOS-latest)

`panic` should not be present in production code
}
}

/// Helper function to get a string slice from an array that could be either Utf8 or LargeUtf8
fn get_string_slice<'a>(array: &'a dyn Array, index: usize) -> &'a str {

Check failure on line 33 in kernel/src/engine/arrow_data.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

the following explicit lifetimes could be elided: 'a

Check failure on line 33 in kernel/src/engine/arrow_data.rs

View workflow job for this annotation

GitHub Actions / build (macOS-latest)

the following explicit lifetimes could be elided: 'a
match array.data_type() {
ArrowDataType::Utf8 => array.as_string::<i32>().value(index),
ArrowDataType::LargeUtf8 => array.as_string::<i64>().value(index),
_ => panic!("Expected string array, got {:?}", array.data_type()),

Check failure on line 37 in kernel/src/engine/arrow_data.rs

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest)

`panic` should not be present in production code

Check failure on line 37 in kernel/src/engine/arrow_data.rs

View workflow job for this annotation

GitHub Actions / build (macOS-latest)

`panic` should not be present in production code
}
}

/// ArrowEngineData holds an Arrow `RecordBatch`, implements `EngineData` so the kernel can extract from it.
///
/// WARNING: Row visitors require that all leaf columns of the record batch have correctly computed
Expand Down Expand Up @@ -100,8 +118,7 @@

fn get(&self, row_index: usize, index: usize) -> String {
let arry = self.value(row_index);
let sarry = arry.as_string::<i32>();
sarry.value(index).to_string()
get_string_value(arry.as_ref(), index)
}

fn materialize(&self, row_index: usize) -> Vec<String> {
Expand All @@ -118,14 +135,13 @@
let offsets = self.offsets();
let start_offset = offsets[row_index] as usize;
let count = offsets[row_index + 1] as usize - start_offset;
let keys = self.keys().as_string::<i32>();
for (idx, map_key) in keys.iter().enumerate().skip(start_offset).take(count) {
if let Some(map_key) = map_key {
if key == map_key {
// found the item
let vals = self.values().as_string::<i32>();
return Some(vals.value(idx));
}
let keys = self.keys();

for idx in start_offset..(start_offset + count) {
let map_key = get_string_slice(keys.as_ref(), idx);
if key == map_key {
// found the item
return Some(get_string_slice(self.values().as_ref(), idx));
}
}
None
Expand All @@ -134,12 +150,13 @@
fn materialize(&self, row_index: usize) -> HashMap<String, String> {
let mut ret = HashMap::new();
let map_val = self.value(row_index);
let keys = map_val.column(0).as_string::<i32>();
let values = map_val.column(1).as_string::<i32>();
for (key, value) in keys.iter().zip(values.iter()) {
if let (Some(key), Some(value)) = (key, value) {
ret.insert(key.into(), value.into());
}
let keys_array = map_val.column(0);
let values_array = map_val.column(1);

for idx in 0..keys_array.len() {
let key = get_string_value(keys_array.as_ref(), idx);
let value = get_string_value(values_array.as_ref(), idx);
ret.insert(key, value);
}
ret
}
Expand Down Expand Up @@ -277,19 +294,21 @@
data_type: &DataType,
col: &'a dyn Array,
) -> DeltaResult<&'a dyn GetData<'a>> {
use ArrowDataType::Utf8;
use ArrowDataType::{LargeUtf8, Utf8};
let is_string_type = |dt: &ArrowDataType| matches!(dt, Utf8 | LargeUtf8);
let col_as_list = || {
if let Some(array) = col.as_list_opt::<i32>() {
(array.value_type() == Utf8).then_some(array as _)
is_string_type(&array.value_type()).then_some(array as _)
} else if let Some(array) = col.as_list_opt::<i64>() {
(array.value_type() == Utf8).then_some(array as _)
is_string_type(&array.value_type()).then_some(array as _)
} else {
None
}
};
let col_as_map = || {
col.as_map_opt().and_then(|array| {
(array.key_type() == &Utf8 && array.value_type() == &Utf8).then_some(array as _)
(is_string_type(array.key_type()) && is_string_type(array.value_type()))
.then_some(array as _)
})
};
let result: Result<&'a dyn GetData<'a>, _> = match data_type {
Expand All @@ -299,7 +318,11 @@
}
&DataType::STRING => {
debug!("Pushing string array for {}", ColumnName::new(path));
col.as_string_opt().map(|a| a as _).ok_or("string")
// Try both Utf8 (i32) and LargeUtf8 (i64) string types
col.as_string_opt::<i32>()
.map(|a| a as _)
.or_else(|| col.as_string_opt::<i64>().map(|a| a as _))
.ok_or("string")
}
&DataType::INTEGER => {
debug!("Pushing int32 array for {}", ColumnName::new(path));
Expand Down
9 changes: 8 additions & 1 deletion kernel/src/engine/arrow_expression/evaluate_expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,12 +337,19 @@ pub fn evaluate_predicate(
(Expression::Literal(_), Expression::Column(_)) => {
let left = evaluate_expression(left, batch, None)?;
let right = evaluate_expression(right, batch, None)?;
// Handle both Utf8 (i32) and LargeUtf8 (i64) string types
if let Some(string_arr) = left.as_string_opt::<i32>() {
if let Some(list_arr) = right.as_list_opt::<i32>() {
let result = in_list_utf8(string_arr, list_arr)?;
return Ok(result);
}
}
if let Some(string_arr) = left.as_string_opt::<i64>() {
if let Some(list_arr) = right.as_list_opt::<i32>() {
let result = in_list_utf8(string_arr, list_arr)?;
return Ok(result);
}
}

use ArrowDataType::*;
prim_array_cmp! {
Expand Down Expand Up @@ -1041,7 +1048,7 @@ mod tests {
let result2 = coalesce_arrays(&[arr1, arr2], Some(&DataType::STRING));
assert_result_error_with_message(
result2,
"Requested result type Utf8 does not match arrays' data type Int32",
"Requested result type LargeUtf8 does not match arrays' data type Int32",
);
}

Expand Down
4 changes: 2 additions & 2 deletions kernel/src/engine/arrow_expression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Scalar {
Byte(val) => append_val_as!(array::Int8Builder, *val),
Float(val) => append_val_as!(array::Float32Builder, *val),
Double(val) => append_val_as!(array::Float64Builder, *val),
String(val) => append_val_as!(array::StringBuilder, val),
String(val) => append_val_as!(array::LargeStringBuilder, val),
Boolean(val) => append_val_as!(array::BooleanBuilder, *val),
Timestamp(val) | TimestampNtz(val) => {
// timezone was already set at builder construction time
Expand Down Expand Up @@ -167,7 +167,7 @@ impl Scalar {
DataType::BYTE => append_null_as!(array::Int8Builder),
DataType::FLOAT => append_null_as!(array::Float32Builder),
DataType::DOUBLE => append_null_as!(array::Float64Builder),
DataType::STRING => append_null_as!(array::StringBuilder),
DataType::STRING => append_null_as!(array::LargeStringBuilder),
DataType::BOOLEAN => append_null_as!(array::BooleanBuilder),
DataType::TIMESTAMP | DataType::TIMESTAMP_NTZ => {
append_null_as!(array::TimestampMicrosecondBuilder)
Expand Down
18 changes: 9 additions & 9 deletions kernel/src/engine/arrow_expression/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::{Add, Div, Mul, Sub};

use crate::arrow::array::{
create_array, Array, ArrayRef, BooleanArray, GenericStringArray, Int32Array, Int32Builder,
ListArray, MapArray, MapBuilder, MapFieldNames, StringArray, StringBuilder, StructArray,
LargeStringBuilder, ListArray, MapArray, MapBuilder, MapFieldNames, StringArray, StructArray,
};
use crate::arrow::buffer::{BooleanBuffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use crate::arrow::compute::kernels::cmp::{gt_eq, lt};
Expand Down Expand Up @@ -231,7 +231,7 @@ fn test_literal_complex_type_array() {
let expected_values = [Some(1), Some(2), None, Some(3)];
let expected_keys = (0..10).flat_map(|_| expected_keys.iter().cloned());
let expected_values = (0..10).flat_map(|_| expected_values.iter().cloned());
let map_keys = map_array.keys().as_string::<i32>();
let map_keys = map_array.keys().as_string::<i64>();
assert!(expected_keys.zip(map_keys).all(|(a, b)| a == b.unwrap()));
let map_values = map_array
.values()
Expand Down Expand Up @@ -266,11 +266,11 @@ fn test_invalid_array_sides() {

#[test]
fn test_str_arrays() {
let values = GenericStringArray::<i32>::from(vec![
let values = GenericStringArray::<i64>::from(vec![
"hi", "bye", "hi", "hi", "bye", "bye", "hi", "bye", "hi",
]);
let offsets = OffsetBuffer::new(ScalarBuffer::from(vec![0, 3, 6, 9]));
let field = Arc::new(Field::new("item", DataType::Utf8, true));
let field = Arc::new(Field::new("item", DataType::LargeUtf8, true));
let arr_field = Arc::new(Field::new("item", DataType::List(field.clone()), true));
let schema = Schema::new([arr_field.clone()]);
let array = ListArray::new(field.clone(), offsets, Arc::new(values), None);
Expand Down Expand Up @@ -664,12 +664,12 @@ fn test_null_row() {
Arc::new(StructArray::new_null(
[
Arc::new(Field::new("a", DataType::Int32, true)),
Arc::new(Field::new("b", DataType::Utf8, false)),
Arc::new(Field::new("b", DataType::LargeUtf8, false)),
]
.into(),
1,
)),
create_array!(Utf8, [None::<String>]),
create_array!(LargeUtf8, [None::<String>]),
],
)
.unwrap();
Expand Down Expand Up @@ -723,15 +723,15 @@ fn test_create_one() {

let expected_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Utf8, true),
Field::new("b", DataType::LargeUtf8, true),
Field::new("c", DataType::Int32, false),
Field::new("d", DataType::Int32, true),
]));
let expected = RecordBatch::try_new(
expected_schema,
vec![
create_array!(Int32, [1]),
create_array!(Utf8, ["B"]),
create_array!(LargeUtf8, ["B"]),
create_array!(Int32, [3]),
create_array!(Int32, [None]),
],
Expand Down Expand Up @@ -879,7 +879,7 @@ fn test_scalar_map() -> DeltaResult<()> {
let arrow_array = scalar_map.to_array(2)?;
let map_array = arrow_array.as_any().downcast_ref::<MapArray>().unwrap();

let key_builder = StringBuilder::new();
let key_builder = LargeStringBuilder::new();
let val_builder = Int32Builder::new();
let names = MapFieldNames {
entry: "key_values".to_string(),
Expand Down
10 changes: 10 additions & 0 deletions kernel/src/engine/arrow_get_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ impl<'a> GetData<'a> for GenericByteArray<GenericStringType<i32>> {
}
}

/// `GetData` for `LargeUtf8` string columns (64-bit offsets), mirroring the `Utf8` (i32) impl.
impl<'a> GetData<'a> for GenericByteArray<GenericStringType<i64>> {
    /// Returns the string at `row_index`, or `None` when that slot is null.
    ///
    /// `_field_name` is unused by this implementation.
    fn get_str(&'a self, row_index: usize, _field_name: &str) -> DeltaResult<Option<&'a str>> {
        // `then` only reads the value when the validity check passes.
        let value = self.is_valid(row_index).then(|| self.value(row_index));
        Ok(value)
    }
}

impl<'a, OffsetSize> GetData<'a> for GenericListArray<OffsetSize>
where
OffsetSize: OffsetSizeTrait,
Expand Down
Loading
Loading