
Conversation

@scovich (Collaborator) commented May 6, 2025

What changes are proposed in this pull request?

Deletion vectors (and row tracking, eventually) rely on accurate file-level row indexes, but the kernel's default parquet reader does not provide them. That means we must rely on the position of rows in the data batches returned by each read, and we cannot apply optimizations such as stats-based row group skipping (see #860).

Add row index support to the default parquet reader, in the form of a new RowIndex variant of ReorderIndexTransform. Also start stubbing in a framework for internal metadata columns, in the form of InternalMetadataColumn that can be converted to a specially annotated StructField. Readers can add a row index column to their schema by passing the column name of their choosing to InternalMetadataColumn::RowIndex.as_struct_field. The default parquet reader recognizes that column and injects a transform to generate row indexes (with appropriate adjustments for any row group skipping that might occur).
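
For illustration only (not part of the original PR description), here is a minimal, kernel-internal sketch of how a read schema might request the row-index column. The import path, the example "id" column, and the "row_index" name are illustrative assumptions; only InternalMetadataColumn::RowIndex.as_struct_field comes from the description above.

use crate::schema::{DataType, StructField, StructType};

// Kernel-internal sketch: InternalMetadataColumn is pub(crate), so this is not an
// engine-facing API, and its exact module path is omitted here. The annotated field
// asks the default parquet reader to generate row indexes rather than read a
// physical column of that name.
let read_schema = StructType::new(vec![
    StructField::new("id", DataType::LONG, /* nullable */ true),
    InternalMetadataColumn::RowIndex.as_struct_field("row_index"),
]);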

Fixes #919

NOTE: If/when the arrow-rs parquet reader gains native support for row indexes, e.g. apache/arrow-rs#7307, we should switch to using that. Our solution here is not robust to advanced parquet reader features like page-level skipping, row-level predicate pushdown, etc.

How was this change tested?

TODO: I couldn't find any parquet files in our test set that have multiple row groups, so there is incomplete coverage of the code that adjusts for row group skipping. Ideally, we would have a test that skips the middle of three row groups, to verify that the sequence of row indexes correctly skips the row group as well.

New unit tests.

 pub(crate) struct ReorderIndex {
     pub(crate) index: usize,
-    transform: ReorderIndexTransform,
+    pub(crate) transform: ReorderIndexTransform,
@scovich (Collaborator, Author)

A consequence of removing the ReorderIndex::needs_transform method.

Comment on lines -229 to -237
fn needs_transform(&self) -> bool {
    match self.transform {
        // if we're casting or inserting null, we need to transform
        ReorderIndexTransform::Cast(_) | ReorderIndexTransform::Missing(_) => true,
        // if our nested ordering needs a transform, we need a transform
        ReorderIndexTransform::Nested(ref children) => ordering_needs_transform(children),
        // no transform needed
        ReorderIndexTransform::Identity => false,
    }
@scovich (Collaborator, Author)

The indirection and mutual recursion of this method plus ordering_needs_transform made the implementation more complex than it needed to be. See below.

@codecov (bot) commented May 6, 2025

Codecov Report

Attention: Patch coverage is 89.81481% with 22 lines in your changes missing coverage. Please review.

Project coverage is 85.19%. Comparing base (399baf5) to head (c284607).

Files with missing lines Patch % Lines
kernel/src/engine/arrow_utils.rs 90.56% 5 Missing and 5 partials ⚠️
kernel/src/engine/default/parquet.rs 90.16% 5 Missing and 1 partial ⚠️
kernel/src/schema/mod.rs 87.87% 4 Missing ⚠️
kernel/src/engine/parquet_row_group_skipping.rs 91.66% 0 Missing and 1 partial ⚠️
kernel/src/engine/sync/parquet.rs 75.00% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #920      +/-   ##
==========================================
+ Coverage   85.11%   85.19%   +0.07%     
==========================================
  Files          87       87              
  Lines       21874    22049     +175     
  Branches    21874    22049     +175     
==========================================
+ Hits        18619    18784     +165     
- Misses       2298     2306       +8     
- Partials      957      959       +2     

☔ View full report in Codecov by Sentry.

Comment on lines 463 to 473
let mut metas = vec![];
for url in urls {
    println!("url: {}", url);
    let location = Path::from_url_path(url.path()).unwrap();
    let meta = store.head(&location).await.unwrap();
    metas.push(FileMeta {
        location: url.clone(),
        last_modified: meta.last_modified.timestamp(),
        size: meta.size.try_into().unwrap(),
    });
}
@scovich (Collaborator, Author)

My async-foo was too weak to collect this from a stream... meh.
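
For reference, a sketch (not from the PR) of collecting the same FileMeta values through a stream instead of a for loop; it assumes the same urls, store, Path, and FileMeta bindings as the snippet above plus the futures crate.

use futures::stream::{self, StreamExt};

// Borrow the store once so each async block can copy the shared reference.
let store = &store;
let metas: Vec<FileMeta> = stream::iter(urls)
    .then(|url| async move {
        let location = Path::from_url_path(url.path()).unwrap();
        let meta = store.head(&location).await.unwrap();
        FileMeta {
            location: url,
            last_modified: meta.last_modified.timestamp(),
            // unwraps kept to mirror the test code above
            size: meta.size.try_into().unwrap(),
        }
    })
    .collect()
    .await;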

@scovich force-pushed the row-index-support branch from c284607 to ac33ce2 on May 12, 2025 at 21:50
@nicklan (Collaborator) left a comment

Looks great.

The only issue I see is that I don't think we actually test reading a parquet file where we skip any row groups.

I searched our whole repo and I don't think we actually have a test parquet file with more than one row group 🤦🏽. Might be worth adding something since we'll want that as a test for skipping as well.


/// Prepares to enumerate row indexes of rows in a parquet file, accounting for row group skipping.
pub(crate) struct RowIndexBuilder {
    row_group_starting_row_offsets: Vec<Range<i64>>,
Collaborator

nit: I think row_group_row_indexes, or row_group_row_index_ranges or similar, would make more sense as a name. This isn't really the offsets, it's the ranges of rows that each group covers.
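
A small illustration (not from the PR) of what the field holds, whatever it ends up being called: the per-row-group ranges of file-level row indexes. The row group lengths below are made up.

use std::ops::Range;

// Three hypothetical row groups of 100, 50, and 75 rows cover file-level row
// indexes 0..100, 100..150, and 150..225 respectively.
let row_group_lengths = [100i64, 50, 75];
let mut next_start = 0i64;
let ranges: Vec<Range<i64>> = row_group_lengths
    .iter()
    .map(|len| {
        let range = next_start..next_start + len;
        next_start += len;
        range
    })
    .collect();
assert_eq!(ranges, vec![0..100, 100..150, 150..225]);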

Comment on lines +106 to +113
let starting_offsets = match self.row_group_ordinals {
    Some(ordinals) => ordinals
        .iter()
        .map(|i| self.row_group_starting_row_offsets[*i].clone())
        .collect(),
    None => self.row_group_starting_row_offsets,
};
starting_offsets.into_iter().flatten()
Collaborator

We could avoid the clone with:

Suggested change
-let starting_offsets = match self.row_group_ordinals {
-    Some(ordinals) => ordinals
-        .iter()
-        .map(|i| self.row_group_starting_row_offsets[*i].clone())
-        .collect(),
-    None => self.row_group_starting_row_offsets,
-};
-starting_offsets.into_iter().flatten()
+if let Some(ordinals) = self.row_group_ordinals {
+    let mut keep = vec![false; self.row_group_starting_row_offsets.len()];
+    for i in ordinals.iter() {
+        keep[*i] = true;
+    }
+    let mut iter = keep.iter();
+    self.row_group_starting_row_offsets.retain(|_| *iter.next().unwrap());
+}
+self.row_group_starting_row_offsets.into_iter().flatten()

Trades against allocating the vec, so not sure it's actually much better.

@scovich (Collaborator, Author)

Clone of a Range<usize> should be super cheap -- no worse than a fat pointer. IMO it should be Copy, but 🤷
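
For context on the cost comparison (standard-library facts, not from the thread): Range<usize> is two machine words, the same footprint as a fat pointer, and it implements Clone but not Copy, in part because it is also an Iterator and implicit copies of iterators invite bugs.

use std::ops::Range;

// Same size as a fat pointer, so cloning it is trivially cheap.
assert_eq!(
    std::mem::size_of::<Range<usize>>(),
    std::mem::size_of::<&[u8]>()
);

// Clone but not Copy: the explicit .clone() in the snippet above is what lets the
// original range keep being used afterwards.
let r: Range<usize> = 10..20;
let r2 = r.clone();
assert_eq!(r, r2);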

num_rows
))
);
let field = field.clone(); // cheap aArc clone
Collaborator

nit:

Suggested change
- let field = field.clone(); // cheap aArc clone
+ let field = field.clone(); // cheap Arc clone

"./tests/data/data-reader-timestamp_ntz/tsNtzPartition=2013-07-05 17%3A01%3A00.123456/part-00001-336e3e5f-a202-4bd9-b117-28d871bbb639.c000.snappy.parquet",
"./tests/data/data-reader-timestamp_ntz/tsNtzPartition=2021-11-18 02%3A30%3A00.123456/part-00000-65fcd5cb-f2f3-44f4-96ef-f43825143ba9.c000.snappy.parquet",
].map(|p| {
//println!("p: {:?}", std::fs::canonicalize(PathBuf::from(p)).unwrap());
Collaborator

nit: remove

});
let mut metas = vec![];
for url in urls {
println!("url: {}", url);
Collaborator

nit: remove


/// When present in a [`StructField::metadata`], identifies which internal Delta metadata column the
/// field represents.
pub(crate) const INTERNAL_METADATA_COLUMN_KEY: &str = "delta.__internal__.metadata_column";
Collaborator

just a note, none of this delta.__internal__. stuff is actually in the protocol :)

@scovich (Collaborator, Author)

Correct. It's purely internal to kernel and the parquet reader.

Whether that's a good thing is a whole different question. I'm not fully convinced this is the right approach to requesting a row index from the parquet reader, but I don't know a better way. Approaches I've seen in the wild are:

  1. Define special annotations for otherwise "normal" columns, which the reader is expected to interpret as a special request if it sees them in the read schema. Spark takes this approach (via special struct field metadata entries), and Iceberg does as well (via special field ids for metadata columns); a sketch of this style follows the list. Downsides are:
    1. A reader can accidentally ignore the annotation, likely producing an all-NULL column because nothing with that name is physically present in the file. In the worst case, the file contains a column with that name, and we return file data instead.
    2. It's easy to accidentally propagate the annotation beyond the parquet reader and cause confusion downstream. In the worst case, a writer could even produce a parquet file that contains those annotations, or a table schema that includes them.
  2. Define specially-named columns, which the reader is expected to interpret as a special request if it sees them in the read schema. Delta does this with the CDC columns in data files. Downsides are:
    1. Same as (1) above, with the additional problem that it's vulnerable to name collisions (what if the table actually has a column named _row_index for some reason?).
  3. Define a special API call on the parquet reader that injects the metadata column into the output schema (usually as the last column), even though the read schema doesn't actually mention it. Avoids any ambiguity about what was requested. Downsides:
    1. The reader has to be ready for a result that contains columns the read schema didn't mention. And it has to be ordinal, not name-based (names could collide).
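
To make the annotation style (approach 1) concrete, here is a sketch that uses arrow's schema types purely for illustration; the metadata value string and the field name are assumptions, not part of the Delta protocol or the kernel's API, and only the key mirrors INTERNAL_METADATA_COLUMN_KEY above.

use std::collections::HashMap;
use arrow_schema::{DataType, Field};

// An otherwise ordinary field carries an annotation that a cooperating reader
// interprets as "generate row indexes for this column".
let row_index_request = Field::new("row_index", DataType::Int64, false).with_metadata(
    HashMap::from([(
        "delta.__internal__.metadata_column".to_string(),
        "row_index".to_string(), // assumed value, for illustration only
    )]),
);
// A reader that silently ignores the annotation would look for a physical column
// named "row_index" and most likely return all NULLs -- the failure mode described
// in downside 1 above.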

Note that special Delta columns such as row ids will pose the same problem for the kernel <-> engine interface that file row indexes pose for the kernel <-> parquet interface here. Ideally, whatever solution we come up with can handle both of those interfaces.

zachschuermann added a commit that referenced this pull request Sep 19, 2025
## What changes are proposed in this pull request?

This PR follows up on #1266 and adds support for reading the row index
metadata column to the default engine. The implementation directly
follows the approach proposed in #920 and slightly modifies it to match
the new metadata column API.

Quoting from #920 

> Deletion vectors (and row tracking, eventually) rely on accurate
file-level row indexes. But they're not implemented in the kernel's
default parquet reader. That means we must rely on the position of rows
in data batches returned by each read, and we cannot apply optimizations
such as stats-based row group skipping (see
#860).
>
> Add row index support to the default Parquet reader, in the form of a
new RowIndex variant of ReorderIndexTransform. [...] The default parquet
reader recognizes (the RowIndex metadata) column and injects a transform
to generate row indexes (with appropriate adjustments for any row group
skipping that might occur).
>
> Fixes #919
>
> NOTE: If/when arrow-rs parquet reader gains native support for row
indexes, e.g. apache/arrow-rs#7307, we should
switch to using that. Our solution here is not robust to advanced
parquet reader features like page-level skipping, row-level predicate
pushdown, etc.

### This PR affects the following public APIs

None - the breaking changes were introduced in #1266.

## How was this change tested?

New UT.

Co-authored-by: Zach Schuermann <[email protected]>
@lbhm (Collaborator) commented Sep 22, 2025

@scovich, do we want to close this one, considering that #1272 merged?

@scovich (Collaborator, Author) commented Sep 22, 2025

> do we want to close this one, considering that #1272 merged?

Yup, thanks!

@scovich closed this Sep 22, 2025