183 changes: 143 additions & 40 deletions kernel/src/engine/arrow_utils.rs
@@ -2,6 +2,7 @@

use std::collections::HashSet;
use std::io::{BufRead, BufReader};
use std::ops::Range;
use std::sync::Arc;

use crate::engine::ensure_data_types::DataTypeCompat;
@@ -14,16 +15,18 @@ use crate::{

use crate::arrow::array::{
cast::AsArray, make_array, new_null_array, Array as ArrowArray, GenericListArray,
OffsetSizeTrait, RecordBatch, StringArray, StructArray,
OffsetSizeTrait, PrimitiveArray, RecordBatch, StringArray, StructArray,
};
use crate::arrow::buffer::NullBuffer;
use crate::arrow::compute::concat_batches;
use crate::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields,
DataType as ArrowDataType, Field as ArrowField, FieldRef as ArrowFieldRef, Fields, Int64Type,
SchemaRef as ArrowSchemaRef,
};
use crate::arrow::json::{LineDelimitedWriter, ReaderBuilder};
use crate::parquet::file::metadata::RowGroupMetaData;
use crate::parquet::{arrow::ProjectionMask, schema::types::SchemaDescriptor};
use crate::schema::InternalMetadataColumn;
use itertools::Itertools;
use tracing::debug;

@@ -66,17 +69,63 @@ pub(crate) fn make_arrow_error(s: impl Into<String>) -> Error {
.with_backtrace()
}

/// Prepares to enumerate row indexes of rows in a parquet file, accounting for row group skipping.
pub(crate) struct RowIndexBuilder {
row_group_starting_row_offsets: Vec<Range<i64>>,
Collaborator:
nit: I think row_group_row_indexes, or row_group_row_index_ranges or similar, would make more sense as a name. This isn't really the offsets, it's the ranges of rows that each group covers.

row_group_ordinals: Option<Vec<usize>>,
}
impl RowIndexBuilder {
pub(crate) fn new(row_groups: &[RowGroupMetaData]) -> Self {
let mut row_group_starting_row_offsets = vec![];
let mut offset = 0;
for row_group in row_groups {
let num_rows = row_group.num_rows();
row_group_starting_row_offsets.push(offset..offset + num_rows);
offset += num_rows;
}
Self {
row_group_starting_row_offsets,
row_group_ordinals: None,
}
}

/// Only produce row indexes for the row groups specified by the ordinals that survived row
/// group skipping. The ordinals must be in 0..num_row_groups.
pub(crate) fn select_row_groups(&mut self, ordinals: &[usize]) {
// NOTE: Don't apply the filtering until we actually build the iterator, because the
// filtering is not idempotent and `with_row_groups` could be called more than once.
self.row_group_ordinals = Some(ordinals.to_vec())
}
}

impl IntoIterator for RowIndexBuilder {
type Item = i64;
type IntoIter = std::iter::Flatten<std::vec::IntoIter<Range<Self::Item>>>;

fn into_iter(self) -> Self::IntoIter {
let starting_offsets = match self.row_group_ordinals {
Some(ordinals) => ordinals
.iter()
.map(|i| self.row_group_starting_row_offsets[*i].clone())
.collect(),
None => self.row_group_starting_row_offsets,
};
starting_offsets.into_iter().flatten()
Collaborator (on lines +106 to +113):
We could avoid the clone with:

Suggested change: replace

    let starting_offsets = match self.row_group_ordinals {
        Some(ordinals) => ordinals
            .iter()
            .map(|i| self.row_group_starting_row_offsets[*i].clone())
            .collect(),
        None => self.row_group_starting_row_offsets,
    };
    starting_offsets.into_iter().flatten()

with

    if let Some(ordinals) = self.row_group_ordinals {
        let mut keep = vec![false; self.row_group_starting_row_offsets.len()];
        for i in ordinals.iter() {
            keep[*i] = true;
        }
        let mut iter = keep.iter();
        self.row_group_starting_row_offsets.retain(|_| *iter.next().unwrap());
    }
    self.row_group_starting_row_offsets.into_iter().flatten()

Trades against allocating the vec, so not sure it's actually much better.

Collaborator (author):
Clone of a Range<usize> should be super cheap -- no worse than a fat pointer. IMO it should be Copy, but 🤷

}
}
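
For a concrete picture of what the builder produces, here is a small standalone sketch. It deliberately avoids the kernel's `RowIndexBuilder` and the parquet crate's `RowGroupMetaData` (both crate-internal in this context) and just mirrors the same logic with plain `Range<i64>` values: build one range of file-level row indexes per row group, keep only the surviving ordinals, and flatten. The row-group sizes and ordinals are made up for illustration.

```rust
use std::ops::Range;

fn main() {
    // Row group sizes as laid out in the file: 3, 2, and 4 rows.
    let row_group_sizes = [3_i64, 2, 4];

    // Equivalent of RowIndexBuilder::new: one range of file-level row indexes per group.
    let mut offset = 0;
    let ranges: Vec<Range<i64>> = row_group_sizes
        .iter()
        .copied()
        .map(|n| {
            let r = offset..offset + n;
            offset += n;
            r
        })
        .collect();
    assert_eq!(ranges, vec![0..3, 3..5, 5..9]);

    // Equivalent of select_row_groups(&[0, 2]) followed by into_iter(): keep only the
    // surviving groups' ranges, then flatten them into one stream of row indexes.
    let surviving = [0_usize, 2];
    let row_indexes: Vec<i64> = surviving.iter().flat_map(|i| ranges[*i].clone()).collect();

    // Rows of the skipped middle group (indexes 3 and 4) never appear.
    assert_eq!(row_indexes, vec![0, 1, 2, 5, 6, 7, 8]);
}
```

The important property is that the indexes stay file-level: skipped row groups leave gaps rather than renumbering, which is why the builder must account for skipped groups instead of just counting surviving rows.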

/// Applies post-processing to data read from parquet files. This includes `reorder_struct_array` to
/// ensure schema compatibility, as well as `fix_nested_null_masks` to ensure that leaf columns have
/// accurate null masks that row visitors rely on for correctness.
pub(crate) fn fixup_parquet_read<T>(
batch: RecordBatch,
requested_ordering: &[ReorderIndex],
row_indexes: &mut impl Iterator<Item = i64>,
) -> DeltaResult<T>
where
StructArray: Into<T>,
{
let data = reorder_struct_array(batch.into(), requested_ordering)?;
let data = reorder_struct_array(batch.into(), requested_ordering, row_indexes)?;
let data = fix_nested_null_masks(data);
Ok(data.into())
}
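
As an aside on the `where StructArray: Into<T>` bound above: it lets the caller pick the output type, and `RecordBatch` works because arrow provides a `From<StructArray>` impl. A minimal sketch of that conversion, using the arrow crate directly rather than the kernel's `crate::arrow` re-exports (the column name and values here are made up):

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, Int64Array, RecordBatch, StructArray};
use arrow::datatypes::{DataType, Field};

fn main() {
    // A one-column StructArray, the shape that reorder_struct_array hands back.
    let col: ArrayRef = Arc::new(Int64Array::from(vec![0_i64, 1, 2]));
    let field = Arc::new(Field::new("row_index", DataType::Int64, false));
    let data = StructArray::from(vec![(field, col)]);

    // The StructArray: Into<T> bound is satisfied with T = RecordBatch.
    let batch: RecordBatch = data.into();
    assert_eq!(batch.num_rows(), 3);
    assert_eq!(batch.num_columns(), 1);
}
```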
@@ -188,7 +237,7 @@ where
#[derive(Debug, PartialEq)]
pub(crate) struct ReorderIndex {
pub(crate) index: usize,
transform: ReorderIndexTransform,
pub(crate) transform: ReorderIndexTransform,
Collaborator (author):
A consequence of removing the ReorderIndex::needs_transform method.

}

#[derive(Debug, PartialEq)]
@@ -201,6 +250,8 @@ pub(crate) enum ReorderIndexTransform {
Identity,
/// Data is missing, fill in with a null column
Missing(ArrowFieldRef),
/// Row index column requested, compute it
RowIndex(ArrowFieldRef),
}

impl ReorderIndex {
@@ -224,17 +275,8 @@ impl ReorderIndex {
ReorderIndex::new(index, ReorderIndexTransform::Missing(field))
}

/// Check if this reordering requires a transformation anywhere. See comment below on
/// [`ordering_needs_transform`] to understand why this is needed.
fn needs_transform(&self) -> bool {
match self.transform {
// if we're casting or inserting null, we need to transform
ReorderIndexTransform::Cast(_) | ReorderIndexTransform::Missing(_) => true,
// if our nested ordering needs a transform, we need a transform
ReorderIndexTransform::Nested(ref children) => ordering_needs_transform(children),
// no transform needed
ReorderIndexTransform::Identity => false,
}
Collaborator (author, on lines -229 to -237):
The indirection and mutual recursion of this method plus ordering_needs_transform made the implementation more complex than it needed to be. See below.

fn row_index(index: usize, field: ArrowFieldRef) -> Self {
ReorderIndex::new(index, ReorderIndexTransform::RowIndex(field))
}
}

@@ -413,7 +455,13 @@ fn get_indices(
// some fields are missing, but they might be nullable, need to insert them into the reorder_indices
for (requested_position, field) in requested_schema.fields().enumerate() {
if !found_fields.contains(field.name()) {
if field.nullable {
if let Ok(InternalMetadataColumn::RowIndex) = field.try_into() {
debug!("Inserting a row index column: {}", field.name());
reorder_indices.push(ReorderIndex::row_index(
requested_position,
Arc::new(field.try_into()?),
));
} else if field.nullable {
debug!("Inserting missing and nullable field: {}", field.name());
reorder_indices.push(ReorderIndex::missing(
requested_position,
@@ -473,22 +521,19 @@ pub(crate) fn generate_mask(

/// Check if an ordering requires transforming the data in any way. This is true if the indices are
/// NOT in ascending order (so we have to reorder things), or if we need to do any transformation on
/// the data read from parquet. We check the ordering here, and also call
/// `ReorderIndex::needs_transform` on each element to check for other transforms, and to check
/// `Nested` variants recursively.
/// the data read from parquet. The check is recursive over `Nested` transforms.
fn ordering_needs_transform(requested_ordering: &[ReorderIndex]) -> bool {
if requested_ordering.is_empty() {
return false;
}
// we have >=1 element. check that the first element doesn't need a transform
if requested_ordering[0].needs_transform() {
return true;
}
// Check for all elements if we need a transform. This is true if any elements are not in order
// (i.e. element[i].index < element[i+1].index), or any element needs a transform
requested_ordering
.windows(2)
.any(|ri| (ri[0].index >= ri[1].index) || ri[1].needs_transform())
let requested_ordering = &mut requested_ordering.iter().enumerate();
requested_ordering.any(|(i, ReorderIndex { index, transform })| {
use ReorderIndexTransform::*;
match (index, transform) {
(index, _) if *index != i => (),
(_, Cast(_) | Missing(_) | RowIndex(_)) => (),
(_, Nested(ref children)) if ordering_needs_transform(children) => (),
(_, Identity | Nested(_)) => return false, // fully ordered and trivial so far
}
true // out of order or non-trivial transform detected
})
}
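
A simplified model of the rewritten check may help (not the kernel's types): each entry below is a pair of the requested output index and a flag standing in for "the transform is trivial" (Identity, or a Nested transform whose children are themselves in order and trivial). The ordering needs a transform if any entry is out of place or non-trivial.

```rust
// Simplified stand-in for ordering_needs_transform over (output index, is_trivial) pairs.
fn needs_transform(ordering: &[(usize, bool)]) -> bool {
    ordering
        .iter()
        .enumerate()
        .any(|(position, (index, trivial))| *index != position || !*trivial)
}

fn main() {
    // In order and trivial: the parquet data can be passed through untouched.
    assert!(!needs_transform(&[(0, true), (1, true)]));
    // Out of order: columns must be rearranged even though each transform is trivial.
    assert!(needs_transform(&[(1, true), (0, true)]));
    // In order, but one column needs a cast, a null fill, or a computed row index.
    assert!(needs_transform(&[(0, true), (1, false)]));
}
```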

// we use this as a placeholder for an array and its associated field. We can fill in a Vec of None
@@ -500,6 +545,7 @@ type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
pub(crate) fn reorder_struct_array(
input_data: StructArray,
requested_ordering: &[ReorderIndex],
row_indexes: &mut impl Iterator<Item = i64>,
) -> DeltaResult<StructArray> {
debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
if !ordering_needs_transform(requested_ordering) {
@@ -532,8 +578,11 @@ pub(crate) fn reorder_struct_array(
match input_cols[parquet_position].data_type() {
ArrowDataType::Struct(_) => {
let struct_array = input_cols[parquet_position].as_struct().clone();
let result_array =
Arc::new(reorder_struct_array(struct_array, children)?);
let result_array = Arc::new(reorder_struct_array(
struct_array,
children,
row_indexes,
)?);
// create the new field specifying the correct order for the struct
let new_field = Arc::new(ArrowField::new_struct(
input_fields[parquet_position].name(),
@@ -578,6 +627,21 @@ pub(crate) fn reorder_struct_array(
let field = field.clone(); // cheap Arc clone
final_fields_cols[reorder_index.index] = Some((field, null_array));
}
ReorderIndexTransform::RowIndex(field) => {
let row_index_array: PrimitiveArray<Int64Type> =
row_indexes.take(num_rows).collect();
require!(
row_index_array.len() == num_rows,
Error::internal_error(format!(
"Row index iterator exhausted after only {} of {} rows",
row_index_array.len(),
num_rows
))
);
let field = field.clone(); // cheap aArc clone
Collaborator:
nit:

Suggested change: replace

    let field = field.clone(); // cheap aArc clone

with

    let field = field.clone(); // cheap Arc clone

final_fields_cols[reorder_index.index] =
Some((field, Arc::new(row_index_array)));
}
}
}
let num_cols = final_fields_cols.len();
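
The `RowIndex` arm above leans on two behaviors worth calling out: a `PrimitiveArray<Int64Type>` can be collected straight from an iterator of `i64`, and `Iterator::take` consumes exactly `num_rows` items, so a shared iterator resumes at the right position for the next batch. A small standalone sketch (arrow crate used directly; the index values are made up):

```rust
use arrow::array::{Array, PrimitiveArray};
use arrow::datatypes::Int64Type;

fn main() {
    // Pretend row-group skipping left file-level row indexes 0..3 and 5..9.
    let mut row_indexes = (0_i64..3).chain(5..9);

    // First batch covers 4 rows: take exactly that many indexes and build the column.
    let batch1: PrimitiveArray<Int64Type> = row_indexes.by_ref().take(4).collect();
    assert_eq!(batch1.len(), 4);
    assert_eq!(batch1.value(3), 5); // skipped rows 3 and 4 never appear

    // The iterator resumes where the previous batch stopped.
    let batch2: PrimitiveArray<Int64Type> = row_indexes.take(3).collect();
    assert_eq!(batch2.value(0), 6);
}
```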
Expand All @@ -603,7 +667,14 @@ fn reorder_list<O: OffsetSizeTrait>(
let (list_field, offset_buffer, maybe_sa, null_buf) = list_array.into_parts();
if let Some(struct_array) = maybe_sa.as_struct_opt() {
let struct_array = struct_array.clone();
let result_array = Arc::new(reorder_struct_array(struct_array, children)?);
// WARNING: We cannot naively plumb through our caller's row index iterator, because each
// array element of a given row must replicate the row's index and empty arrays must drop
// that row's index. For now, just don't support it (Delta doesn't need the capability).
let result_array = Arc::new(reorder_struct_array(
struct_array,
children,
&mut std::iter::empty(),
)?);
let new_list_field = Arc::new(ArrowField::new_struct(
list_field.name(),
result_array.fields().clone(),
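
To make the WARNING above concrete, here is a hedged standalone sketch (arrow crate used directly, values made up) of why the caller's flat row-index iterator cannot simply be reused inside a list's children: the child array has one entry per list element, not per row, so per-row indexes would have to be replicated for multi-element rows and dropped for empty ones.

```rust
use arrow::array::{Array, ListArray};
use arrow::datatypes::Int32Type;

fn main() {
    // Three rows: [10, 20], [], [30]. Their file-level row indexes are 0, 1, 2.
    let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(10), Some(20)]),
        Some(vec![]),
        Some(vec![Some(30)]),
    ]);
    assert_eq!(list.len(), 3); // three rows at the list level
    assert_eq!(list.values().len(), 3); // also three child elements, but aligned differently:
    // a row-index column inside the child would need to be [0, 0, 2] (row 0 replicated,
    // row 1 dropped), while naively taking three items from the caller's iterator gives [0, 1, 2].
}
```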
@@ -944,6 +1015,31 @@ mod tests {
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn simple_row_index_field() {
let row_index_field = InternalMetadataColumn::RowIndex.as_struct_field("my_row_index");
let arrow_row_index_field = ArrowField::try_from(&row_index_field).unwrap();
let requested_schema = Arc::new(StructType::new([
StructField::not_null("i", DataType::INTEGER),
row_index_field,
StructField::nullable("i2", DataType::INTEGER),
]));
let parquet_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("i", ArrowDataType::Int32, false),
ArrowField::new("i2", ArrowDataType::Int32, true),
]));
let (mask_indices, reorder_indices) =
get_requested_indices(&requested_schema, &parquet_schema).unwrap();
let expect_mask = vec![0, 1];
let expect_reorder = vec![
ReorderIndex::identity(0),
ReorderIndex::identity(2),
ReorderIndex::row_index(1, Arc::new(arrow_row_index_field)),
];
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn nested_indices() {
let requested_schema = Arc::new(StructType::new([
@@ -1336,7 +1432,7 @@ mod tests {
fn simple_reorder_struct() {
let arry = make_struct_array();
let reorder = vec![ReorderIndex::identity(1), ReorderIndex::identity(0)];
let ordered = reorder_struct_array(arry, &reorder).unwrap();
let ordered = reorder_struct_array(arry, &reorder, &mut std::iter::empty()).unwrap();
assert_eq!(ordered.column_names(), vec!["c", "b"]);
}

@@ -1375,19 +1471,25 @@ mod tests {
ReorderIndex::nested(
0,
vec![
ReorderIndex::identity(0),
ReorderIndex::identity(1),
ReorderIndex::identity(2),
ReorderIndex::missing(
2,
3,
Arc::new(ArrowField::new("s", ArrowDataType::Utf8, true)),
),
ReorderIndex::row_index(
0,
Arc::new(ArrowField::new("r", ArrowDataType::Int64, false)),
),
],
),
];
let ordered = reorder_struct_array(nested, &reorder).unwrap();
let mut row_indexes = [0, 1, 2, 3, 4].into_iter();
let ordered = reorder_struct_array(nested, &reorder, &mut row_indexes).unwrap();
assert_eq!(row_indexes.next(), Some(4));
assert_eq!(ordered.column_names(), vec!["struct2", "struct1"]);
let ordered_s2 = ordered.column(0).as_struct();
assert_eq!(ordered_s2.column_names(), vec!["b", "c", "s"]);
assert_eq!(ordered_s2.column_names(), vec!["r", "b", "c", "s"]);
let ordered_s1 = ordered.column(1).as_struct();
assert_eq!(ordered_s1.column_names(), vec!["c", "b"]);
}
@@ -1431,7 +1533,8 @@ mod tests {
0,
vec![ReorderIndex::identity(1), ReorderIndex::identity(0)],
)];
let ordered = reorder_struct_array(struct_array, &reorder).unwrap();
let ordered =
reorder_struct_array(struct_array, &reorder, &mut std::iter::empty()).unwrap();
let ordered_list_col = ordered.column(0).as_list::<i32>();
for i in 0..ordered_list_col.len() {
let array_item = ordered_list_col.value(i);