Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 148 additions & 10 deletions kernel/src/engine/arrow_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ macro_rules! prim_array_cmp {
pub(crate) use prim_array_cmp;

type FieldIndex = usize;
type FlattenedRangeIterator<T> = std::iter::Flatten<std::vec::IntoIter<Range<T>>>;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The various call sites like fixup_parquet_read and reorder_struct_array should start using this instead of <RowIndexBuilder as IntoIterator>::IntoIter

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The various call sites like fixup_parquet_read and reorder_struct_array should start using this instead of <RowIndexBuilder as IntoIterator>::IntoIter

I also did this as part of this PR. I searched for all occurrences of <RowIndexBuilder as IntoIterator> and replaced them.


/// contains information about a StructField matched to a parquet struct field
///
Expand Down Expand Up @@ -132,16 +133,26 @@ impl RowIndexBuilder {

impl IntoIterator for RowIndexBuilder {
type Item = i64;
type IntoIter = std::iter::Flatten<std::vec::IntoIter<Range<Self::Item>>>;
type IntoIter = FlattenedRangeIterator<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
let starting_offsets = match self.row_group_ordinals {
Some(ordinals) => ordinals
.iter()
.map(|i| self.row_group_row_index_ranges[*i].clone())
.collect(),
None => self.row_group_row_index_ranges,
};
let starting_offsets =
match self.row_group_ordinals {
Some(ordinals) => {
// We generally ignore invalid row group ordinals, but in non-optimized builds,
// we verify that all ordinals are within bounds
debug_assert!(
ordinals.iter().all(|&i| i < self.row_group_row_index_ranges.len()),
"All row group ordinals must be within bounds of row_group_row_index_ranges"
);
// We have to clone here to avoid modifying the original vector in each iteration
ordinals
.iter()
.filter_map(|&i| self.row_group_row_index_ranges.get(i).cloned())
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think clone is a reasonable trade-off here, since (1) we are only cloning Ranges (i.e., two i64s) and (2) preventing the clone would require us to modify the underlying vector as we iterate (costly) or to design something like a FilteredRowIndexIterator that does not collect intermediate results.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, IMO Range<i64> should be Copy -- it's only 16 bytes, the same size as &[T] which is Copy.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While working on this, I learned that Range<T> is not Copy because copyable iterators can be confusing.

Thus, we are only left with cloning as far as I'm aware.

.collect()
}
None => self.row_group_row_index_ranges,
};
starting_offsets.into_iter().flatten()
}
}
Expand All @@ -153,7 +164,7 @@ impl IntoIterator for RowIndexBuilder {
pub(crate) fn fixup_parquet_read<T>(
batch: RecordBatch,
requested_ordering: &[ReorderIndex],
row_indexes: Option<&mut <RowIndexBuilder as IntoIterator>::IntoIter>,
row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
) -> DeltaResult<T>
where
StructArray: Into<T>,
Expand Down Expand Up @@ -750,7 +761,7 @@ type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
pub(crate) fn reorder_struct_array(
input_data: StructArray,
requested_ordering: &[ReorderIndex],
mut row_indexes: Option<&mut <RowIndexBuilder as IntoIterator>::IntoIter>,
mut row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
) -> DeltaResult<StructArray> {
debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
if !ordering_needs_transform(requested_ordering) {
Expand Down Expand Up @@ -1153,6 +1164,41 @@ mod tests {
])
}

/// Helper function to create mock row group metadata for testing
fn create_mock_row_group(num_rows: i64) -> RowGroupMetaData {
use crate::parquet::basic::{Encoding, Type as PhysicalType};
use crate::parquet::file::metadata::ColumnChunkMetaData;
use crate::parquet::schema::types::Type;

// Create a minimal schema descriptor
let schema = Arc::new(SchemaDescriptor::new(Arc::new(
Type::group_type_builder("schema")
.with_fields(vec![Arc::new(
Type::primitive_type_builder("test_col", PhysicalType::INT32)
.build()
.unwrap(),
)])
.build()
.unwrap(),
)));

// Create a minimal column chunk metadata
let column_chunk = ColumnChunkMetaData::builder(schema.column(0))
.set_encodings(vec![Encoding::PLAIN])
.set_total_compressed_size(100)
.set_total_uncompressed_size(100)
.set_num_values(num_rows)
.build()
.unwrap();

RowGroupMetaData::builder(schema)
.set_num_rows(num_rows)
.set_total_byte_size(100)
.set_column_metadata(vec![column_chunk])
.build()
.unwrap()
}

#[test]
fn test_json_parsing() {
let requested_schema = Arc::new(ArrowSchema::new(vec![
Expand Down Expand Up @@ -1783,6 +1829,98 @@ mod tests {
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn test_row_index_builder_no_skipping() {
let row_groups = vec![
create_mock_row_group(5), // 5 rows: indexes 0-4
create_mock_row_group(3), // 3 rows: indexes 5-7
create_mock_row_group(4), // 4 rows: indexes 8-11
];

let builder = RowIndexBuilder::new(&row_groups);
let row_indexes: Vec<i64> = builder.into_iter().collect();

// Should produce consecutive indexes from 0 to 11
assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
}

#[test]
fn test_row_index_builder_with_skipping() {
let row_groups = vec![
create_mock_row_group(5), // 5 rows: indexes 0-4
create_mock_row_group(3), // 3 rows: indexes 5-7 (will be skipped)
create_mock_row_group(4), // 4 rows: indexes 8-11
create_mock_row_group(2), // 2 rows: indexes 12-13 (will be skipped)
];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[0, 2]);

let row_indexes: Vec<i64> = builder.into_iter().collect();

// Should produce indexes from row groups 0 and 2: [0-4] and [8-11]
assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 8, 9, 10, 11]);
}

#[test]
fn test_row_index_builder_single_row_group() {
let row_groups = vec![create_mock_row_group(7)];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[0]);

let row_indexes: Vec<i64> = builder.into_iter().collect();

assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 5, 6]);
}

#[test]
fn test_row_index_builder_empty_selection() {
let row_groups = vec![create_mock_row_group(3), create_mock_row_group(2)];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[]);

let row_indexes: Vec<i64> = builder.into_iter().collect();

// Should produce no indexes
assert_eq!(row_indexes, Vec::<i64>::new());
}

#[test]
fn test_row_index_builder_out_of_order_selection() {
let row_groups = vec![
create_mock_row_group(2), // 2 rows: indexes 0-1
create_mock_row_group(3), // 3 rows: indexes 2-4
create_mock_row_group(1), // 1 row: index 5
];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[2, 0]);

let row_indexes: Vec<i64> = builder.into_iter().collect();

// Should produce indexes in the order specified: group 2 first, then group 0
assert_eq!(row_indexes, vec![5, 0, 1]);
}

#[test]
#[cfg(debug_assertions)]
#[should_panic(
expected = "All row group ordinals must be within bounds of row_group_row_index_ranges"
)]
fn test_row_index_builder_expect_debug_panic_for_out_of_bounds_row_group_ordinals() {
let row_groups = vec![create_mock_row_group(2)];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[1]);

// This should panic because the row group ordinal is out of bounds
// NOTE: This panic only happens in non-optimized builds
let row_indexes: Vec<i64> = builder.into_iter().collect();
assert_eq!(row_indexes, vec![0, 1]);
}

#[test]
fn nested_indices() {
column_mapping_cases().into_iter().for_each(|mode| {
Expand Down
55 changes: 33 additions & 22 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::path::PathBuf;
use std::sync::Arc;

use delta_kernel::actions::deletion_vector::split_vector;
use delta_kernel::arrow::array::AsArray as _;
use delta_kernel::arrow::array::{AsArray as _, BooleanArray};
use delta_kernel::arrow::compute::{concat_batches, filter_record_batch};
use delta_kernel::arrow::datatypes::{Int64Type, Schema as ArrowSchema};
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryFromKernel as _;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
Expand All @@ -14,7 +15,7 @@ use delta_kernel::expressions::{
};
use delta_kernel::parquet::file::properties::{EnabledStatistics, WriterProperties};
use delta_kernel::scan::state::{transform_to_logical, DvInfo, Stats};
use delta_kernel::scan::Scan;
use delta_kernel::scan::{Scan, ScanResult};
use delta_kernel::schema::{DataType, MetadataColumnSpec, Schema, StructField, StructType};
use delta_kernel::{Engine, FileMeta, Snapshot};

Expand All @@ -33,6 +34,23 @@ const PARQUET_FILE1: &str = "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c00
const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet";
const PARQUET_FILE3: &str = "part-00002-c506e79a-0bf8-4e2b-a42b-9731b2e490ff-c000.snappy.parquet";

/// Helper function to extract filtered data from a scan result, respecting row masks
fn extract_record_batch(
scan_result: ScanResult,
) -> Result<RecordBatch, Box<dyn std::error::Error>> {
Comment on lines +38 to +40
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is sort of a prefactoring. As I understand #860, we currently never produce a mask for filter predicates (only for DVs). Once we resolve #860, this method will be handy.

let mask = scan_result.full_mask();
let record_batch = into_record_batch(scan_result.raw_data?);

if let Some(mask) = mask {
Ok(filter_record_batch(
&record_batch,
&BooleanArray::from(mask),
)?)
} else {
Ok(record_batch)
}
}

#[tokio::test]
async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_simple_batch()?;
Expand Down Expand Up @@ -1382,24 +1400,18 @@ async fn test_row_index_metadata_column() -> Result<(), Box<dyn std::error::Erro
)
.await?;

storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch1).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes(&batch2).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE3),
record_batch_to_bytes(&batch3).into(),
)
.await?;
for (parquet_file, batch) in [
(PARQUET_FILE1, &batch1),
(PARQUET_FILE2, &batch2),
(PARQUET_FILE3, &batch3),
] {
storage
.put(
&Path::from(parquet_file),
record_batch_to_bytes(batch).into(),
)
.await?;
}

let location = Url::parse("memory:///")?;
let engine = Arc::new(DefaultEngine::new(
Expand All @@ -1422,8 +1434,7 @@ async fn test_row_index_metadata_column() -> Result<(), Box<dyn std::error::Erro
let stream = scan.execute(engine.clone())?;

for scan_result in stream {
let data = scan_result?.raw_data?;
let batch = into_record_batch(data);
let batch = extract_record_batch(scan_result?)?;
file_count += 1;

// Verify the schema structure
Expand Down
Loading