Chunk UnnestExec output batches to bound memory usage and preserve stacked unnest semantics #20866
Draft
kosiew wants to merge 16 commits into apache:main from
Conversation
Drain each upstream RecordBatch in smaller contiguous row slices based on the session batch_size, preventing full materialization of large batches at once. For recursive unnesting (depth > 1), fall back to single-row slices to maintain bounded memory use while preserving recursive semantics.
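The slicing policy this commit describes can be sketched with a simplified, self-contained model. The `slice_plan` name and signature are hypothetical; the real code slices Arrow `RecordBatch`es rather than computing offset/length pairs up front:

```rust
// Simplified model of the chunked-draining policy: split `total_rows`
// input rows into contiguous (offset, len) slices of at most `batch_size`
// rows. For recursive unnesting (depth > 1), fall back to single-row
// slices so output expansion cannot be underestimated.
fn slice_plan(total_rows: usize, batch_size: usize, depth: usize) -> Vec<(usize, usize)> {
    let step = if depth > 1 { 1 } else { batch_size.max(1) };
    let mut slices = Vec::new();
    let mut offset = 0;
    while offset < total_rows {
        let len = step.min(total_rows - offset);
        slices.push((offset, len));
        offset += len;
    }
    slices
}

fn main() {
    // 10 input rows with batch_size = 4 at depth 1: slices of 4, 4, 2.
    assert_eq!(slice_plan(10, 4, 1), vec![(0, 4), (4, 4), (8, 2)]);
    // At depth 2, every slice covers exactly one row.
    assert_eq!(slice_plan(3, 4, 2), vec![(0, 1), (1, 1), (2, 1)]);
}
```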
Consolidate List / LargeList / FixedSizeList dispatching into a single helper function as_list_array_type. This simplifies the code by replacing multiple match blocks with a uniform call. Any future changes to list datatypes or null semantics will now only require adjustments in as_list_array_type and its trait implementations.
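The consolidation can be illustrated with a simplified model. The real `as_list_array_type` helper downcasts Arrow `List` / `LargeList` / `FixedSizeList` arrays behind a trait; here a plain enum stands in for those array types:

```rust
// Stand-in for the three Arrow list array types (the real helper
// operates on arrow-rs arrays, not this enum).
enum ListKind {
    List(Vec<Vec<i32>>),
    LargeList(Vec<Vec<i32>>),
    FixedSizeList(Vec<Vec<i32>>),
}

// One helper replaces the repeated match blocks at every call site:
// callers no longer need to know which list variant they hold.
fn as_list_array_type(arr: &ListKind) -> &Vec<Vec<i32>> {
    match arr {
        ListKind::List(v) | ListKind::LargeList(v) | ListKind::FixedSizeList(v) => v,
    }
}

fn main() {
    let arr = ListKind::LargeList(vec![vec![1, 2], vec![3]]);
    assert_eq!(as_list_array_type(&arr), &vec![vec![1, 2], vec![3]]);
}
```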
Introduce new tests for unnest_chunks_high_fanout_batches:
- Test handling of list columns with varying lengths.
- Validate row output with nulls dropped when preserve_nulls=false.
- Verify recursive unnesting works correctly under memory pressure.
Introduce PendingBatch struct to streamline UnnestStream. Encapsulate row slicing and cursor advancement in PendingBatch methods, reducing complexity and improving code readability. Replace existing fields in UnnestStream and update relevant methods to utilize the new structure.
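The `PendingBatch` idea can be sketched as a struct holding a batch plus a cursor, handing out contiguous row slices until the batch is exhausted. The field and method shapes below are simplified assumptions; the real struct wraps a `RecordBatch`:

```rust
// Minimal model of PendingBatch: row slicing and cursor advancement
// live in one place instead of loose fields on UnnestStream.
struct PendingBatch {
    num_rows: usize,
    cursor: usize,
}

impl PendingBatch {
    fn new(num_rows: usize) -> Self {
        Self { num_rows, cursor: 0 }
    }

    /// Returns the next (offset, len) slice of at most `row_count` rows,
    /// advancing the cursor, or None once all rows have been consumed.
    fn take_next_slice(&mut self, row_count: usize) -> Option<(usize, usize)> {
        if self.cursor >= self.num_rows {
            return None;
        }
        let len = row_count.min(self.num_rows - self.cursor);
        let offset = self.cursor;
        self.cursor += len;
        Some((offset, len))
    }
}

fn main() {
    let mut pending = PendingBatch::new(5);
    assert_eq!(pending.take_next_slice(2), Some((0, 2)));
    assert_eq!(pending.take_next_slice(2), Some((2, 2)));
    assert_eq!(pending.take_next_slice(2), Some((4, 1)));
    assert_eq!(pending.take_next_slice(2), None);
}
```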
Remove redundant scaffolding in PendingBatch within unnest.rs by integrating exhaustion checks into the existing logic. Additionally, extract common functionality into helper functions in mod.rs to reduce repetition in new unnest chunking tests, improving readability and maintainability.
Disable row-slice chunking in UnnestExec only when unnesting an internal __unnest_placeholder(...) input at depth == 1 to handle the stacked-unnest case. Retain the existing fallback for depth > 1. Added a regression test in mod.rs to validate the output order of a two-stage unnest pipeline.
Add UnnestExec::disable_chunking_for_stacked_unnest to centralize detection logic for chunking. Introduce a helper function is_internal_unnest_placeholder to streamline checks. Replace inline "__unnest_placeholder(" verification in execute with calls to the new disable_chunking method.
Simplify `PendingBatch::take_next_slice` to only manage state and slice. Introduce `next_pending_slice_row_count` to streamline policy selection, encapsulating the `disable_chunking_for_stacked_unnest` logic. Update `build_next_pending_batch` to compute row count using the new helper and pass it to `PendingBatch::take_next_slice`.
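The policy selection described here can be sketched as a pure function. The `next_pending_slice_row_count` name comes from the commit, but its exact signature is an assumption made for illustration:

```rust
// Choose how many rows the next pending slice should cover, given the
// session batch size, the recursion depth, and whether chunking is
// disabled for a stacked unnest.
fn next_pending_slice_row_count(
    remaining_rows: usize,
    batch_size: usize,
    depth: usize,
    disable_chunking_for_stacked_unnest: bool,
) -> usize {
    if disable_chunking_for_stacked_unnest {
        // Stacked unnest: emit all remaining input in one slice to
        // preserve the expected output ordering.
        remaining_rows
    } else if depth > 1 {
        // Recursive unnest: one row at a time keeps expansion bounded.
        1
    } else {
        batch_size.max(1).min(remaining_rows)
    }
}

fn main() {
    assert_eq!(next_pending_slice_row_count(10, 4, 1, false), 4);
    assert_eq!(next_pending_slice_row_count(10, 4, 2, false), 1);
    assert_eq!(next_pending_slice_row_count(10, 4, 1, true), 10);
}
```

With the policy isolated like this, `PendingBatch::take_next_slice` only has to manage state and slicing, as the commit describes.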
Implement a reusable function to count nodes by operator name in the physical plan. Update test `unnest_chunks_stacked_unnest_preserves_order` to build the physical plan directly from the query DataFrame. Assert the presence of exactly 2 `UnnestExec` nodes using the new helper, and reuse the same DataFrame for result collection. This change enhances test stability by removing reliance on formatted EXPLAIN output, minimizing issues with formatting.
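Counting operators by name is a simple tree walk. The real helper recurses over an `Arc<dyn ExecutionPlan>`; a plain struct models the plan tree in this sketch:

```rust
// Stand-in for a physical plan node (the real code walks ExecutionPlan
// children and matches on the operator's display name).
struct PlanNode {
    name: &'static str,
    children: Vec<PlanNode>,
}

// Count how many nodes in the plan tree carry the target operator name.
fn count_nodes_by_name(node: &PlanNode, target: &str) -> usize {
    let here = usize::from(node.name == target);
    here + node
        .children
        .iter()
        .map(|child| count_nodes_by_name(child, target))
        .sum::<usize>()
}

fn main() {
    // Two stacked UnnestExec nodes over a scan, as the regression
    // test asserts.
    let plan = PlanNode {
        name: "UnnestExec",
        children: vec![PlanNode {
            name: "UnnestExec",
            children: vec![PlanNode { name: "DataSourceExec", children: vec![] }],
        }],
    };
    assert_eq!(count_nodes_by_name(&plan, "UnnestExec"), 2);
}
```

Asserting on the plan tree directly avoids the brittleness of matching formatted EXPLAIN output, which is the stability win the commit describes.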
Add a dedicated module for unnest chunking tests in datafusion/core/tests/dataframe. Move relevant helper functions and tests to a new focused file, ensuring mod.rs remains cleaner. This organizes the test suite for chunking into a more cohesive structure.
Simplify control flow in unnest.rs by inlining redundant state transitions and removing unused helpers. Reduce code duplication in unnest_chunks.rs by extracting nested-list batch construction into a single reusable helper for both recursive and stacked-unnest tests.
Add a shared unnest-placeholder marker in unnest.rs with helper functions for creation and inspection, re-exported from lib.rs. Update the planner in utils.rs to tag internal placeholder aliases with metadata during inner projection. Modify UnnestExec to check input field metadata using the shared helper instead of relying on string matching. Also, implement a unit test in unnest.rs to confirm that chunking is disabled via metadata, removing the brittle string dependency.
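The metadata-marker scheme can be sketched as follows. The helper names mirror those introduced in the commit, but the key's string value and the `HashMap`-based signatures are assumptions; the real helpers operate on Arrow `Field` metadata:

```rust
use std::collections::HashMap;

// Hypothetical value for the shared marker key (the actual string
// lives in datafusion_common::unnest).
const UNNEST_PLACEHOLDER_METADATA_KEY: &str = "__datafusion.unnest.placeholder";

// Metadata to attach to an internal placeholder alias during planning.
fn unnest_placeholder_field_metadata() -> HashMap<String, String> {
    HashMap::from([(UNNEST_PLACEHOLDER_METADATA_KEY.to_string(), "true".to_string())])
}

// The executor-side check: presence of the key marks a placeholder
// field, replacing the old "__unnest_placeholder(" string matching.
fn is_unnest_placeholder_field(metadata: &HashMap<String, String>) -> bool {
    metadata.contains_key(UNNEST_PLACEHOLDER_METADATA_KEY)
}

fn main() {
    let tagged = unnest_placeholder_field_metadata();
    assert!(is_unnest_placeholder_field(&tagged));
    assert!(!is_unnest_placeholder_field(&HashMap::new()));
}
```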
Include a test helper for expected placeholder aliases with metadata. Update `test_transform_bottom_unnest_recursive` to convert inner projection aliases to fields and assert `is_unnest_placeholder_field(...)` before physical planning. Also, revise existing planner test expectations to align with the new metadata-bearing placeholder aliases.
Simplify build_batch by removing the UnnestingResult wrapper and computing max_recursion more idiomatically. Flatten and sort the unnested outputs directly before regrouping by the original column index. Reduce duplicate scanning in list_unnest_at_level by creating repeat_mask in a single pass per column. Optimize flatten_struct_cols using HashSet::contains for direct checks.
Which issue does this PR close?
Rationale for this change
`unnest` currently builds output for an entire input batch before emitting any results. For high-fanout list columns, this can cause very large intermediate batches and substantial memory growth, especially in query shapes like `unnest` + `group by`, where downstream operators cannot make progress until the expanded batch is produced. Issue #20788 shows this clearly: unnesting large array columns can expand a modest input into tens of millions of rows, driving memory usage far beyond what users expect from a streaming execution engine.
This change improves the physical `unnest` execution path so it emits smaller, size-bounded output batches instead of materializing the full expansion of each input batch at once. The goal is to reduce peak memory usage while preserving existing query results and ordering behavior, including recursive and stacked `unnest` cases.

What changes are included in this PR?
This PR introduces chunked output production for `UnnestExec` and adds planner/executor coordination for stacked `unnest` handling. Key changes include:

- Add planner-internal field metadata helpers in `datafusion_common::unnest` to mark unnest placeholder columns: `UNNEST_PLACEHOLDER_METADATA_KEY`, `unnest_placeholder_field_metadata`, `is_unnest_placeholder_field`.
- Export those helpers from `datafusion_common::lib` for reuse across crates.
- Update SQL recursive-unnest rewriting to attach placeholder metadata using `alias_with_metadata(...)` rather than relying only on placeholder names.
- Extend `UnnestExec` with chunked draining of input batches, sized by the session `batch_size`.
- Add conservative fallback behavior for recursive unnest (`depth > 1`) by processing one input row at a time, avoiding underestimation of output expansion.
- Detect stacked `unnest` inputs via placeholder field metadata and disable chunking in that specific case to preserve expected ordering/semantics.
- Refactor some internal unnest code for clarity and maintainability, e.g. the `as_list_array_type` helper.
- Add regression and behavior tests covering `preserve_nulls = false` and `unnest` ordering.

Are these changes tested?
Yes.
This PR adds dataframe-level regression tests in `datafusion/core/tests/dataframe/unnest_chunks.rs` that validate both correctness and chunk sizing behavior under small batch-size settings. These tests cover:

- null handling when `preserve_nulls` is disabled
- `unnest` preserving output order

It also adds executor/unit coverage in `datafusion/physical-plan/src/unnest.rs` to verify stacked-unnest detection via field metadata.

Are there any user-facing changes?
Yes, in behavior/performance characteristics, but not in SQL syntax.
Users should see lower peak memory usage and more streaming-friendly execution for queries that unnest large list columns, because `UnnestExec` now emits bounded output batches instead of expanding an entire input batch at once.

There is also a small public API surface addition in `datafusion_common`, where unnest placeholder metadata helpers/constants are now exported for cross-crate coordination. This is intended for internal planner/executor integration rather than general end-user usage.

LLM-generated code disclosure
This PR includes LLM-generated code and comments. All LLM-generated content has been manually reviewed and tested.