
Enable skip_rows in the chunked parquet reader. #18130


Open · wants to merge 46 commits into base: branch-25.08

Conversation


@mhaseeb123 mhaseeb123 commented Feb 28, 2025

Description

Closes #16186
Closes #16249

This PR adds multiple bug fixes required to enable the `skip_rows` option for the chunked Parquet reader.

CC: @etseidl
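
For reference, once this lands the standard chunked-reader flow should honor skip_rows/num_rows. A minimal C++ sketch using the public libcudf API (the file name and read limits are illustrative; the limits mirror the Python repro further down):

#include <cudf/io/parquet.hpp>

int main()
{
  auto const options =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{"example.parquet"})
      .skip_rows(6001)  // previously not supported by the chunked reader
      .num_rows(999)
      .build();

  // chunk_read_limit = 0 (unlimited output chunk size), pass_read_limit ~1 GB
  cudf::io::chunked_parquet_reader reader(0, 1'024'000'000, options);
  while (reader.has_next()) {
    auto chunk = reader.read_chunk();  // cudf::io::table_with_metadata
    // ... consume chunk.tbl ...
  }
  return 0;
}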

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.


copy-pr-bot bot commented Feb 28, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.


@github-actions github-actions bot added the libcudf label Feb 28, 2025
@mhaseeb123 mhaseeb123 added the bug, 2 - In Progress, cuIO, and breaking labels Feb 28, 2025
@mhaseeb123 mhaseeb123 changed the title to 🚧 Enable skip_rows in the chunked parquet reader. Feb 28, 2025
@Matt711 (Contributor) left a comment

I think we can support skipping row groups in cudf.polars (xref #16257) once this PR goes through.

@github-actions github-actions bot added the Python label Feb 28, 2025
@mhaseeb123 mhaseeb123 requested a review from nvdbaranec March 1, 2025 04:31
@@ -539,19 +539,24 @@ struct get_page_span {
auto const column_page_end = page_row_index + page_offsets[column_index + 1];
auto const num_pages = column_page_end - column_page_start;
bool const is_list = chunks[column_index].max_level[level_type::REPETITION] > 0;
// list rows can span page boundaries, so it is not always safe to filter page spans
// based on `start_row` alone; select all pages instead.
if (is_list) {
@mhaseeb123 (Member Author) commented Mar 1, 2025

Not 100% sure about this fix yet, but it seems like we do end up needing more pages than those found with start_row and end_row for list columns in some cases. I am guessing the page_row_index doesn't contain exact row indices for list columns? Maybe @etseidl or @nvdbaranec can elaborate if possible.

Question: There shouldn't be any performance penalties for this change as we do a trim pass later on with accurate row counts for lists pages, right?

The following case fails without this change (not all pages for the list column `c` are selected).

import cudf
from cudf.testing import assert_eq
import pyarrow as pa
from io import BytesIO

data = [
    {
        "a": "g",
        "b": {
            "b_a": 10,
            "b_b": {"b_b_b": None, "b_b_a": 2},
        },
        "c": None,
    },
    {"a": None, "b": {"b_a": None, "b_b": None}, "c": [15, 16]},
    {"a": "j", "b": None, "c": [8, 10]},
    {"a": None, "b": {"b_a": None, "b_b": None}, "c": None},
    None,
    {
        "a": None,
        "b": {"b_a": None, "b_b": {"b_b_b": 1}},
        "c": [18, 19],
    },
    {"a": None, "b": None, "c": None},
] * 1000

pa_struct = pa.Table.from_pydict({"struct": data})
df = cudf.DataFrame.from_arrow(pa_struct)

buffer = BytesIO()
df.to_parquet(buffer, row_group_size_rows=7000, max_page_size_rows=100)

# Read with the low-memory (chunked) Parquet reader
with cudf.option_context("io.parquet.low_memory", True):
    actual = cudf.read_parquet(
        [buffer],
        _chunk_read_limit=0,
        _pass_read_limit=1024000000,
        nrows=999,
        skip_rows=6001,
    ).reset_index(drop=True)
expected = cudf.read_parquet(buffer, nrows=999, skip_rows=6001).reset_index(
    drop=True
)
assert_eq(expected, actual)
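
For context, the body of the added branch is truncated in the hunk above. Per the added comment ("so select all pages"), it presumably amounts to roughly the following; this is a sketch only, assuming the functor returns a {start, end} pair of page indices:

// Sketch: for list columns, page row indices are not exact, so select the
// column's full page span and let the later trim pass recompute accurate
// row counts. (Hypothetical body; the actual change may differ.)
if (is_list) {
  return {static_cast<size_t>(page_offsets[column_index]),
          static_cast<size_t>(page_offsets[column_index + 1])};
}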

Contributor commented

I'm not sure this is the right way to do this. If I'm understanding what's going on here, this will just arbitrarily return the entire row group, which I'm pretty sure will cause us to read all pages for all columns in that row group; that reverts us to being just a row-group reader whenever lists exist.

I think the only real option here, if you have lists, is to do the following for each eligible row group:

  • march through the entire row group, using the row estimates as we do now.

  • for each subpass that we read in, after we compute the real row counts (roughly at the point where `size_t first_page_index = 0;` appears), re-apply skip_rows/num_rows; see the sketch after this list. If we have managed to read a subpass that contains none of the rows we care about, early out and move on to the next subpass.

  • stop early if we cross the last row index we care about.
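
A sketch of that per-subpass trimming, with all names hypothetical (not the reader's actual members):

#include <algorithm>
#include <cstddef>
#include <optional>
#include <utility>

// Given a subpass's row range [sp_begin, sp_end) and the requested window
// [skip_rows, skip_rows + num_rows), return the rows to decode, or nullopt
// if the subpass contains none of the wanted rows (early out / stop).
std::optional<std::pair<std::size_t, std::size_t>> trim_subpass(std::size_t sp_begin,
                                                                std::size_t sp_end,
                                                                std::size_t skip_rows,
                                                                std::size_t num_rows)
{
  auto const wanted_begin = skip_rows;
  auto const wanted_end   = skip_rows + num_rows;
  if (sp_end <= wanted_begin || sp_begin >= wanted_end) { return std::nullopt; }
  return std::pair{std::max(sp_begin, wanted_begin), std::min(sp_end, wanted_end)};
}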

max_row = std::min<size_t>(max_row, pass_end);
// Make sure we don't skip past the max rows.
CUDF_EXPECTS(max_row > subpass.skip_rows, "Unexpected subpass row count");
subpass.num_rows = max_row - subpass.skip_rows;
@mhaseeb123 (Member Author) commented

Catch the infinite for loop here if it occurs (presumably, if max_row ended up at or below subpass.skip_rows, the subtraction for subpass.num_rows would wrap and the read loop would never make progress).


@mhaseeb123 mhaseeb123 marked this pull request as draft May 14, 2025 06:13
@GPUtester GPUtester moved this to In Progress in cuDF Python May 14, 2025
@mhaseeb123 (Member Author) commented

Accidentally clicked Ready for review instead of Update branch. Will mark this PR ready once it actually is.

@mhaseeb123 mhaseeb123 requested a review from nvdbaranec May 20, 2025 05:55
@mhaseeb123 mhaseeb123 changed the base branch from branch-25.06 to branch-25.08 May 20, 2025 18:57
@mhaseeb123 (Member Author) commented

/ok to test

Comment on lines +1399 to +1402
thrust::for_each(rmm::exec_policy_nosync(_stream),
thrust::counting_iterator<size_t>(0),
thrust::counting_iterator(pass.pages.size()),
compute_page_num_rows_from_chunk_rows{pass.pages, pass.chunks});
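
Aside, for readers unfamiliar with the pattern above: thrust::for_each over a counting-iterator range invokes the functor once per index, here once per page. A minimal self-contained analogue (hypothetical functor, illustrative only):

#include <cstddef>
#include <thrust/device_vector.h>
#include <thrust/execution_policy.h>
#include <thrust/for_each.h>
#include <thrust/iterator/counting_iterator.h>

struct fill_with_index {
  int* out;
  __device__ void operator()(std::size_t i) const { out[i] = static_cast<int>(i); }
};

int main()
{
  thrust::device_vector<int> v(8);
  // One functor invocation per index in [0, v.size()).
  thrust::for_each(thrust::device,
                   thrust::counting_iterator<std::size_t>(0),
                   thrust::counting_iterator<std::size_t>(v.size()),
                   fill_with_index{thrust::raw_pointer_cast(v.data())});
  return 0;
}
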
Contributor commented

I vaguely don't like the idea of putting an accurate computation for row counts inside a function named estimates. This is probably the right place for it, though, so maybe the function name should be more specific.

@mhaseeb123 (Member Author) commented

Updated the function name and added an input argument indicating whether we are writing estimates or not.
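
Roughly the shape of that resolution, with illustrative names only (the PR's actual identifiers may differ):

struct pass_intermediate_data;  // reader-internal pass state (forward declaration)

// One routine fills page row counts; the flag records whether the values
// written are estimates or exact counts. Hypothetical signature.
void generate_page_row_counts(pass_intermediate_data& pass, bool write_exact_counts);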

@mhaseeb123 mhaseeb123 changed the title from 🚧 Enable skip_rows in the chunked parquet reader. to Enable skip_rows in the chunked parquet reader. May 22, 2025
@mhaseeb123 mhaseeb123 added the 3 - Ready for Review label and removed the 2 - In Progress label May 22, 2025
@mhaseeb123 mhaseeb123 marked this pull request as ready for review May 22, 2025 21:29
@mhaseeb123 mhaseeb123 requested a review from nvdbaranec May 23, 2025 20:17
Labels

  • 3 - Ready for Review: Ready for review by team
  • breaking: Breaking change
  • bug: Something isn't working
  • cuIO: cuIO issue
  • libcudf: Affects libcudf (C++/CUDA) code.
  • Python: Affects Python cuDF API.
Projects
Status: In Progress
3 participants