174 changes: 161 additions & 13 deletions kernel/src/engine/arrow_utils.rs
@@ -63,6 +63,7 @@ macro_rules! prim_array_cmp {
pub(crate) use prim_array_cmp;

type FieldIndex = usize;
type FlattenedRangeIterator<T> = std::iter::Flatten<std::vec::IntoIter<Range<T>>>;
Collaborator commented:

The various call sites like fixup_parquet_read and reorder_struct_array should start using this instead of <RowIndexBuilder as IntoIterator>::IntoIter

Collaborator Author replied:

> The various call sites like fixup_parquet_read and reorder_struct_array should start using this instead of <RowIndexBuilder as IntoIterator>::IntoIter

I also did this as part of this PR. I searched for all occurrences of <RowIndexBuilder as IntoIterator> and replaced them.
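
For illustration, a minimal self-contained sketch of what the new alias flattens. The alias itself comes from this PR; `build_toy` and the ranges below are made-up stand-ins for `RowIndexBuilder::build`, not kernel code:

```rust
use std::ops::Range;

// The alias introduced in this PR:
type FlattenedRangeIterator<T> = std::iter::Flatten<std::vec::IntoIter<Range<T>>>;

// Toy stand-in for RowIndexBuilder::build(): flatten per-row-group index
// ranges into a single iterator of row indexes.
fn build_toy(ranges: Vec<Range<i64>>) -> FlattenedRangeIterator<i64> {
    ranges.into_iter().flatten()
}

fn main() {
    // Two surviving "row groups": rows 0-4 and rows 8-11 (rows 5-7 skipped).
    let indexes: Vec<i64> = build_toy(vec![0..5, 8..12]).collect();
    assert_eq!(indexes, vec![0, 1, 2, 3, 4, 8, 9, 10, 11]);
}
```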


/// contains information about a StructField matched to a parquet struct field
///
@@ -128,21 +129,36 @@ impl RowIndexBuilder {
// filtering is not idempotent and `with_row_groups` could be called more than once.
self.row_group_ordinals = Some(ordinals.to_vec())
}
}

impl IntoIterator for RowIndexBuilder {
type Item = i64;
type IntoIter = std::iter::Flatten<std::vec::IntoIter<Range<Self::Item>>>;

fn into_iter(self) -> Self::IntoIter {
/// Build an iterator of row indexes, filtering out row groups that were skipped.
///
/// # Errors
///
/// Returns an error if there are duplicate or out of bounds row group ordinals.
pub(crate) fn build(self) -> DeltaResult<FlattenedRangeIterator<i64>> {
let starting_offsets = match self.row_group_ordinals {
Some(ordinals) => ordinals
.iter()
.map(|i| self.row_group_row_index_ranges[*i].clone())
.collect(),
Some(ordinals) => {
let mut seen_ordinals = HashSet::new();
ordinals
.iter()
.map(|&i| {
// We verify that there are no duplicate or out of bounds ordinals
if !seen_ordinals.insert(i) {
return Err(Error::generic("Found duplicate row group ordinal"));
}
// We have to clone here to avoid modifying the original vector in each iteration
self.row_group_row_index_ranges
.get(i)
.cloned()
.ok_or_else(|| {
Error::generic(format!("Row group ordinal {i} is out of bounds"))
})
})
.try_collect()?
}
None => self.row_group_row_index_ranges,
};
starting_offsets.into_iter().flatten()
Ok(starting_offsets.into_iter().flatten())
}
}
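
For readers skimming the diff, here is a self-contained toy version of the validation that `build` now performs. The error messages mirror the PR, but `build_row_indexes` itself is only an illustration, not the kernel code:

```rust
use std::collections::HashSet;
use std::ops::Range;

// Toy version of the new validation: given per-row-group index ranges and an
// optional selection of row-group ordinals, reject duplicate or out-of-bounds
// ordinals, then flatten the surviving ranges into row indexes.
fn build_row_indexes(
    ranges: Vec<Range<i64>>,
    ordinals: Option<&[usize]>,
) -> Result<Vec<i64>, String> {
    let selected: Vec<Range<i64>> = match ordinals {
        Some(ordinals) => {
            let mut seen = HashSet::new();
            ordinals
                .iter()
                .map(|&i| {
                    if !seen.insert(i) {
                        return Err("Found duplicate row group ordinal".to_string());
                    }
                    ranges
                        .get(i)
                        .cloned()
                        .ok_or_else(|| format!("Row group ordinal {i} is out of bounds"))
                })
                .collect::<Result<_, _>>()?
        }
        None => ranges,
    };
    Ok(selected.into_iter().flatten().collect())
}

fn main() {
    // Row groups of 5, 3, and 4 rows; keep only groups 0 and 2.
    let ranges = vec![0..5, 5..8, 8..12];
    assert_eq!(
        build_row_indexes(ranges.clone(), Some(&[0, 2][..])).unwrap(),
        vec![0, 1, 2, 3, 4, 8, 9, 10, 11]
    );
    assert!(build_row_indexes(ranges, Some(&[3][..])).is_err());
}
```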

@@ -153,7 +169,7 @@ impl IntoIterator for RowIndexBuilder
pub(crate) fn fixup_parquet_read<T>(
batch: RecordBatch,
requested_ordering: &[ReorderIndex],
row_indexes: Option<&mut <RowIndexBuilder as IntoIterator>::IntoIter>,
row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
) -> DeltaResult<T>
where
StructArray: Into<T>,
@@ -750,7 +766,7 @@ type FieldArrayOpt = Option<(Arc<ArrowField>, Arc<dyn ArrowArray>)>;
pub(crate) fn reorder_struct_array(
input_data: StructArray,
requested_ordering: &[ReorderIndex],
mut row_indexes: Option<&mut <RowIndexBuilder as IntoIterator>::IntoIter>,
mut row_indexes: Option<&mut FlattenedRangeIterator<i64>>,
) -> DeltaResult<StructArray> {
debug!("Reordering {input_data:?} with ordering: {requested_ordering:?}");
if !ordering_needs_transform(requested_ordering) {
@@ -1153,6 +1169,41 @@ mod tests {
])
}

/// Helper function to create mock row group metadata for testing
fn create_mock_row_group(num_rows: i64) -> RowGroupMetaData {
use crate::parquet::basic::{Encoding, Type as PhysicalType};
use crate::parquet::file::metadata::ColumnChunkMetaData;
use crate::parquet::schema::types::Type;

// Create a minimal schema descriptor
let schema = Arc::new(SchemaDescriptor::new(Arc::new(
Type::group_type_builder("schema")
.with_fields(vec![Arc::new(
Type::primitive_type_builder("test_col", PhysicalType::INT32)
.build()
.unwrap(),
)])
.build()
.unwrap(),
)));

// Create a minimal column chunk metadata
let column_chunk = ColumnChunkMetaData::builder(schema.column(0))
.set_encodings(vec![Encoding::PLAIN])
.set_total_compressed_size(100)
.set_total_uncompressed_size(100)
.set_num_values(num_rows)
.build()
.unwrap();

RowGroupMetaData::builder(schema)
.set_num_rows(num_rows)
.set_total_byte_size(100)
.set_column_metadata(vec![column_chunk])
.build()
.unwrap()
}

#[test]
fn test_json_parsing() {
let requested_schema = Arc::new(ArrowSchema::new(vec![
@@ -1783,6 +1834,103 @@ mod tests {
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn test_row_index_builder_no_skipping() {
let row_groups = vec![
create_mock_row_group(5), // 5 rows: indexes 0-4
create_mock_row_group(3), // 3 rows: indexes 5-7
create_mock_row_group(4), // 4 rows: indexes 8-11
];

let builder = RowIndexBuilder::new(&row_groups);
let row_indexes: Vec<i64> = builder.build().unwrap().collect();

// Should produce consecutive indexes from 0 to 11
assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
}

#[test]
fn test_row_index_builder_with_skipping() {
let row_groups = vec![
create_mock_row_group(5), // 5 rows: indexes 0-4
create_mock_row_group(3), // 3 rows: indexes 5-7 (will be skipped)
create_mock_row_group(4), // 4 rows: indexes 8-11
create_mock_row_group(2), // 2 rows: indexes 12-13 (will be skipped)
];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[0, 2]);

let row_indexes: Vec<i64> = builder.build().unwrap().collect();

// Should produce indexes from row groups 0 and 2: [0-4] and [8-11]
assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 8, 9, 10, 11]);
}

#[test]
fn test_row_index_builder_single_row_group() {
let row_groups = vec![create_mock_row_group(7)];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[0]);

let row_indexes: Vec<i64> = builder.build().unwrap().collect();

assert_eq!(row_indexes, vec![0, 1, 2, 3, 4, 5, 6]);
}

#[test]
fn test_row_index_builder_empty_selection() {
let row_groups = vec![create_mock_row_group(3), create_mock_row_group(2)];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[]);

let row_indexes: Vec<i64> = builder.build().unwrap().collect();

// Should produce no indexes
assert_eq!(row_indexes, Vec::<i64>::new());
}

#[test]
fn test_row_index_builder_out_of_order_selection() {
let row_groups = vec![
create_mock_row_group(2), // 2 rows: indexes 0-1
create_mock_row_group(3), // 3 rows: indexes 2-4
create_mock_row_group(1), // 1 row: index 5
];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[2, 0]);

let row_indexes: Vec<i64> = builder.build().unwrap().collect();

// Should produce indexes in the order specified: group 2 first, then group 0
assert_eq!(row_indexes, vec![5, 0, 1]);
}

#[test]
fn test_row_index_builder_out_of_bounds_row_group_ordinals() {
let row_groups = vec![create_mock_row_group(2)];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[1]);

let result = builder.build();
assert_result_error_with_message(result, "Row group ordinal 1 is out of bounds");
}

#[test]
fn test_row_index_builder_duplicate_row_group_ordinals() {
let row_groups = vec![create_mock_row_group(2), create_mock_row_group(3)];

let mut builder = RowIndexBuilder::new(&row_groups);
builder.select_row_groups(&[1, 1]);

let result = builder.build();
assert_result_error_with_message(result, "Found duplicate row group ordinal");
}

#[test]
fn nested_indices() {
column_mapping_cases().into_iter().for_each(|mode| {
4 changes: 2 additions & 2 deletions kernel/src/engine/default/parquet.rs
@@ -338,7 +338,7 @@ impl FileOpener for ParquetOpener {
builder = builder.with_limit(limit)
}

let mut row_indexes = row_indexes.map(|rb| rb.into_iter());
let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
let stream = builder.with_batch_size(batch_size).build()?;

let stream = stream.map(move |rbr| {
@@ -416,7 +416,7 @@ impl FileOpener for PresignedUrlOpener {

let reader = builder.with_batch_size(batch_size).build()?;

let mut row_indexes = row_indexes.map(|rb| rb.into_iter());
let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
let stream = futures::stream::iter(reader);
let stream = stream.map(move |rbr| {
fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())
2 changes: 1 addition & 1 deletion kernel/src/engine/sync/parquet.rs
@@ -38,7 +38,7 @@ fn try_create_from_parquet(
builder = builder.with_row_group_filter(predicate.as_ref(), row_indexes.as_mut());
}

let mut row_indexes = row_indexes.map(|rb| rb.into_iter());
let mut row_indexes = row_indexes.map(|rb| rb.build()).transpose()?;
let stream = builder.build()?;
Ok(stream.map(move |rbr| fixup_parquet_read(rbr?, &requested_ordering, row_indexes.as_mut())))
}
55 changes: 33 additions & 22 deletions kernel/tests/read.rs
@@ -3,9 +3,10 @@ use std::path::PathBuf;
use std::sync::Arc;

use delta_kernel::actions::deletion_vector::split_vector;
use delta_kernel::arrow::array::AsArray as _;
use delta_kernel::arrow::array::{AsArray as _, BooleanArray};
use delta_kernel::arrow::compute::{concat_batches, filter_record_batch};
use delta_kernel::arrow::datatypes::{Int64Type, Schema as ArrowSchema};
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryFromKernel as _;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
@@ -14,7 +15,7 @@ use delta_kernel::expressions::{
};
use delta_kernel::parquet::file::properties::{EnabledStatistics, WriterProperties};
use delta_kernel::scan::state::{transform_to_logical, DvInfo, Stats};
use delta_kernel::scan::Scan;
use delta_kernel::scan::{Scan, ScanResult};
use delta_kernel::schema::{DataType, MetadataColumnSpec, Schema, StructField, StructType};
use delta_kernel::{Engine, FileMeta, Snapshot};

@@ -33,6 +34,23 @@ const PARQUET_FILE1: &str = "part-00000-a72b1fb3-f2df-41fe-a8f0-e65b746382dd-c00
const PARQUET_FILE2: &str = "part-00001-c506e79a-0bf8-4e2b-a42b-9731b2e490ae-c000.snappy.parquet";
const PARQUET_FILE3: &str = "part-00002-c506e79a-0bf8-4e2b-a42b-9731b2e490ff-c000.snappy.parquet";

/// Helper function to extract filtered data from a scan result, respecting row masks
fn extract_record_batch(
scan_result: ScanResult,
) -> Result<RecordBatch, Box<dyn std::error::Error>> {
Collaborator Author commented on lines +38 to +40:

This is sort of a prefactoring. As I understand #860, we currently never produce a mask for filter predicates (only for DVs). Once we resolve #860, this method will be handy.

let mask = scan_result.full_mask();
let record_batch = into_record_batch(scan_result.raw_data?);

if let Some(mask) = mask {
Ok(filter_record_batch(
&record_batch,
&BooleanArray::from(mask),
)?)
} else {
Ok(record_batch)
}
}

#[tokio::test]
async fn single_commit_two_add_files() -> Result<(), Box<dyn std::error::Error>> {
let batch = generate_simple_batch()?;
@@ -1382,24 +1400,18 @@ async fn test_row_index_metadata_column() -> Result<(), Box<dyn std::error::Erro
)
.await?;

storage
.put(
&Path::from(PARQUET_FILE1),
record_batch_to_bytes(&batch1).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE2),
record_batch_to_bytes(&batch2).into(),
)
.await?;
storage
.put(
&Path::from(PARQUET_FILE3),
record_batch_to_bytes(&batch3).into(),
)
.await?;
for (parquet_file, batch) in [
(PARQUET_FILE1, &batch1),
(PARQUET_FILE2, &batch2),
(PARQUET_FILE3, &batch3),
] {
storage
.put(
&Path::from(parquet_file),
record_batch_to_bytes(batch).into(),
)
.await?;
}

let location = Url::parse("memory:///")?;
let engine = Arc::new(DefaultEngine::new(
@@ -1422,8 +1434,7 @@ async fn test_row_index_metadata_column() -> Result<(), Box<dyn std::error::Erro
let stream = scan.execute(engine.clone())?;

for scan_result in stream {
let data = scan_result?.raw_data?;
let batch = into_record_batch(data);
let batch = extract_record_batch(scan_result?)?;
file_count += 1;

// Verify the schema structure