Fix Iceberg _pos to be file-global instead of task-local on split files #14808
res-life wants to merge 14 commits
Signed-off-by: Chong Gao <res_life@163.com>
Greptile Summary
This PR fixes a data-correctness bug where Iceberg

Confidence Score: 5/5

This PR is safe to merge. It fixes a real data-corruption bug in Iceberg split-file reads, uses resource-safe wrappers throughout, and the OOM-retry hardening is correctly structured at both the action and batch levels.

The two-level OOM protection (locals-first in FetchRowPosition.execute plus snapshot/restore at the withRetryNoSplit boundary) is logically complete. The second-reader path for global row-position lookup is correctly guarded to run only when _pos is actually projected and the file is split, and both split and non-split paths are covered by the new unit and integration tests.

No files require special attention: the logic in reader.scala and GpuParquetReaderPostProcessor.scala is well-covered by the new unit and integration tests.

Sequence Diagram
sequenceDiagram
participant RF as GpuReaderFactory
participant FPB as filterParquetBlocks
participant RR as RangedReader
participant FR as FullReader
participant PP as PostProcessor
RF->>RF: hasRowPositionMetadata check
alt _pos projected
RF->>FPB: filterParquetBlocks(file, requiredSchema)
FPB->>RR: file.newReader with range
RR-->>FPB: filteredBlocks (range-only row groups)
FPB->>FR: "file.copy(split=None).newReader"
FR-->>FPB: getFooter.getBlocks (all row groups)
FPB->>FPB: build startingPos to fileGlobalFirstRow map
FPB->>FPB: lookup each filteredBlock by startingPos
FPB-->>PP: blocksFirstRowIndices with global offsets
PP->>PP: FetchRowPosition emits file-global _pos
else no _pos
FPB->>RR: file.newReader with range
RR-->>FPB: filteredBlocks
FPB->>FPB: accumulate task-local indices
FPB-->>PP: blocksFirstRowIndices task-local
end
Note over PP: OOM Retry
PP->>PP: snapshot curBlockIndex and counters
PP->>PP: withRetryNoSplit restores snapshot on retry
PP->>PP: FetchRowPosition advances locals, commits after fromLongs
…acker id

- reader.scala: replace the bug-history comment with a brief description of the current logic.
- GpuParquetReaderPostProcessor.scala: name the enclosing withRetryNoSplit block instead of an unattributed "lambda".
- iceberg_test.py: drop internal tracker id from the regression test comment and describe the scenario the test exercises rather than reader internals.

Signed-off-by: Chong Gao <res_life@163.com>
Exercises the silent-corruption path on Merge-on-Read tables directly: single Parquet data file split across multiple scan tasks plus a positional delete file. The query does not project _pos; the Iceberg reader injects ROW_POSITION into the read schema to match against the delete list, so an incorrect _pos would silently drop or resurface rows without any user projection of the metadata column. The test asserts the GPU result matches CPU over the surviving rows.

Signed-off-by: Chong Gao <res_life@163.com>
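
To make the drop-or-resurface failure concrete, here is a minimal Python model of positional-delete matching on the second split of a file. It is not the plugin's Scala code; the row labels, the split boundary at row 500, and the delete positions are all made up for illustration.

```python
def surviving_rows(rows, positions, deleted_positions):
    """Keep rows whose _pos is not listed in the positional delete file."""
    return [row for row, pos in zip(rows, positions) if pos not in deleted_positions]


# Hypothetical second split of a data file: it holds file rows 500..999.
split_rows = [f"row-{i}" for i in range(500, 1000)]

# File-global positions recorded in the (hypothetical) delete file.
# Position 100 belongs to the first split, position 650 to this one.
deleted = {100, 650}

global_pos = list(range(500, 1000))  # correct: file-global _pos
local_pos = list(range(0, 500))      # buggy: task-local _pos restarting at 0

correct = surviving_rows(split_rows, global_pos, deleted)
buggy = surviving_rows(split_rows, local_pos, deleted)
```

With task-local positions, the row at local offset 100 (really file row 600) is dropped by mistake, while file row 650 survives even though it was deleted. Both results have the same row count, which is why the corruption is silent.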
A bare HashMap lookup on the filtered block's getStartingPos throws NoSuchElementException with only the offset, leaving distributed-job operators without a file path to diagnose against. Use getOrElse and raise an IllegalStateException naming the file and offset instead.

Signed-off-by: Chong Gao <res_life@163.com>
…ile-global

The previous fix used reader.getFooter on the same ParquetFileReader that was opened with ParquetReadOptions.withRange(start, end) for split scan tasks. ParquetFileReader.readFooter applies the range filter while parsing the footer, so getFooter().getBlocks() on a ranged reader returns only the row groups inside the range. Accumulating row counts over that filtered list reintroduces the task-local first-row indices the original commit was meant to eliminate. This is visible directly when the data file has multiple row groups and Iceberg's planner splits it across tasks, and indirectly on MoR reads where the resulting wrong _pos values mismatch the positional delete file and drop or resurface rows silently.

When file.split is defined, open a separate reader without the range to enumerate every row group in the file with its absolute byte offset (getStartingPos), build the file-global first-row-index map from that, then look up the ranged reader's filtered blocks against it. When file has no split, fall back to the existing single-reader path.

Signed-off-by: Chong Gao <res_life@163.com>
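
The lookup described above can be sketched as follows. This is a Python model under stated assumptions, not the Scala in reader.scala: `RowGroup`, the function names, the byte offsets, and the file path are hypothetical, and the error type stands in for the IllegalStateException mentioned in the earlier commit.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RowGroup:
    starting_pos: int  # absolute byte offset of the row group in the file
    row_count: int


def task_local_first_rows(split_groups):
    """Buggy accounting: accumulate row counts over the range-filtered list only."""
    first_rows, next_row = [], 0
    for group in split_groups:
        first_rows.append(next_row)
        next_row += group.row_count
    return first_rows


def file_global_first_rows(file_path, all_groups, split_groups):
    """Fixed accounting: index every row group in the file by byte offset,
    then look up each range-filtered row group against that index."""
    by_offset, next_row = {}, 0
    for group in all_groups:
        by_offset[group.starting_pos] = next_row
        next_row += group.row_count

    first_rows = []
    for group in split_groups:
        if group.starting_pos not in by_offset:
            # Name the file and the offset so the failure can be diagnosed.
            raise RuntimeError(
                f"row group at offset {group.starting_pos} not found in {file_path}")
        first_rows.append(by_offset[group.starting_pos])
    return first_rows


# Hypothetical file with three row groups; the split covers the last two.
all_groups = [RowGroup(4, 500), RowGroup(4100, 400), RowGroup(8200, 600)]
split_groups = all_groups[1:]

local = task_local_first_rows(split_groups)
fixed = file_global_first_rows("data/f.parquet", all_groups, split_groups)
```

In this example `local` restarts at 0 for the split, while `fixed` carries the 500 rows of the row group that lies outside the split's byte range.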
FetchRowPosition.execute already advances state in locals and commits to the processor only after CudfColumnVector.fromLongs succeeds, but the wrapping withRetryNoSplit covers the entire safeMap iteration in GpuParquetReaderPostProcessor.process. A later field action (UpCast, FillNull, GpuColumnVector.from, ...) in the same iteration can OOM and trigger a retry of the whole block, at which point execute() would run again against already-advanced counters and emit _pos values off by numRows.

Snapshot curBlockIndex / processedRowCount / processedBlockRowCounts before entering withRetryNoSplit and restore them at the top of each attempt, so the retry restarts from the same processor state regardless of which action inside the block raised the OOM.

Tighten the execute() comment to describe the local-commit contract rather than claim full retry safety in isolation.

Signed-off-by: Chong Gao <res_life@163.com>
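
The snapshot/restore pattern can be modeled in a few lines of Python. This is only a sketch: `PosState`, `FakeRetryOom`, `process_batch`, and the retry loop are hypothetical stand-ins for the processor counters, a retryable GPU OOM, and withRetryNoSplit, and the model tracks a single block for brevity.

```python
from dataclasses import dataclass, replace


class FakeRetryOom(Exception):
    """Stand-in for a retryable GPU out-of-memory error."""


@dataclass
class PosState:
    cur_block_index: int = 0
    processed_row_count: int = 0


def fetch_row_position(state, first_row, num_rows):
    """Emit _pos for one batch, committing the counter only after the
    'allocation' (building the list) has succeeded."""
    start = first_row + state.processed_row_count
    positions = list(range(start, start + num_rows))
    state.processed_row_count += num_rows
    return positions


def process_batch(state, first_row, num_rows, later_action, max_attempts=3):
    """Run all field actions for a batch, restoring the counters before every attempt."""
    snapshot = replace(state)  # copy taken before the retry block
    for _ in range(max_attempts):
        state.cur_block_index = snapshot.cur_block_index
        state.processed_row_count = snapshot.processed_row_count
        try:
            positions = fetch_row_position(state, first_row, num_rows)
            later_action()  # e.g. a FillNull allocation that may OOM
            return positions
        except FakeRetryOom:
            continue
    raise FakeRetryOom("out of attempts")


pending_failures = [True]  # the later action fails exactly once


def fill_null():
    if pending_failures:
        pending_failures.pop()
        raise FakeRetryOom()


state = PosState()
first = process_batch(state, first_row=500, num_rows=300, later_action=fill_null)
second = process_batch(state, first_row=500, num_rows=200, later_action=fill_null)
```

Without the restore at the top of each attempt, the retried first batch in this model would start at 800 instead of 500, which is the off-by-numRows shape the commit describes.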
Existing GpuPostProcessorSuite cases always build a ParquetFileInfoWithBlockMeta with a single block whose first-row index is 0, so they never exercise the multi-block state-traversal logic in FetchRowPosition.execute (incrementing localBlockIndex, picking blocksFirstRowIndices(localBlockIndex), accumulating processedBlockRowCounts across blocks). The integration tests cover this end-to-end but are slow to run.

Add a createMultiBlockParquetInfo helper that builds non-zero-based blocksFirstRowIndices and a new test that drives the processor across two blocks via three successive process() calls. The test asserts file-global _pos values 500..799, 800..999, 1000..1299, directly catching regressions where _pos restarts at the task-local 0 or fails to advance into the next block.

Signed-off-by: Chong Gao <res_life@163.com>
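
The traversal this test targets can be sketched in Python. The model below is hypothetical (it is not FetchRowPosition.execute), and it assumes two blocks with first-row indices 500 and 1000 and row counts 500 and 400, which is one layout consistent with the asserted ranges.

```python
def emit_positions(blocks, state, num_rows):
    """Emit file-global positions for one batch.

    blocks: list of (first_row_index, row_count) per row group in the split.
    state:  (block_index, rows_already_emitted_in_that_block).
    Returns the positions and the advanced state.
    """
    block_index, rows_done = state
    positions = []
    while num_rows > 0:
        first_row, row_count = blocks[block_index]
        take = min(num_rows, row_count - rows_done)
        start = first_row + rows_done
        positions.extend(range(start, start + take))
        rows_done += take
        num_rows -= take
        if rows_done == row_count:  # block exhausted: move to the next one
            block_index += 1
            rows_done = 0
    return positions, (block_index, rows_done)


blocks = [(500, 500), (1000, 400)]  # assumed layout, see lead-in
state = (0, 0)
batches = []
for batch_rows in (300, 200, 300):
    out, state = emit_positions(blocks, state, batch_rows)
    batches.append(out)
```

Under these assumptions the three batches come out as 500..799, 800..999, and 1000..1299, matching the ranges the test asserts.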
filterParquetBlocks now opens the second range-less Parquet reader only when the required schema actually projects ROW_POSITION (either by the user or by a positional delete filter). Ordinary Iceberg scans go back to the cheap ranged-reader accounting and avoid the extra footer fetch that was paid on every data-file scan task.

Signed-off-by: Chong Gao <res_life@163.com>
GpuCoalescingIcebergParquetReader.checkIfNeedToSplitDataBlock now splits whenever two adjacent blocks carry distinct GpuParquetReaderPostProcessor instances. Each Iceberg split owns its own post-processor with private block metadata and _pos counters, so two splits of the same physical Parquet file share a file path but must not be coalesced under one finalize call: the first split's post-processor would otherwise emit wrong _pos values and index past its block array for rows that belong to the second split.

Signed-off-by: Chong Gao <res_life@163.com>
NOTE: release/26.06 has been created from main. Please retarget your PR to release/26.06 if it should be included in the release.
… splits

The earlier "Split coalescing chunks per post-processor identity" check was effectively dead code: GpuMultiFileReader.populateCurrentBlockChunk only calls checkIfNeedToSplitDataBlock when the next block's filePath differs from the current chunk's filePath. Two Iceberg splits of the same physical Parquet file share their real path, so the parent skipped the override and appended the next block straight into the chunk while extraInfo (set once at currentFile == null) stayed pinned to the first split's post-processor. Finalize then ran P0 over rows from P1; the original _pos failure shape was still live.

Tag the path used by the parent equality check with the Iceberg split range when building ParquetSingleDataBlockMeta. Same-file same-split blocks still share the synthetic path and coalesce normally; same-file different-split blocks now present as different "files" to the parent, hit the path-change branch, invoke the override, and the postProcessor identity check forces a chunk split where the new chunk picks up its own extraInfo.

The tag is only used by the parent for in-memory equality (currentFile in populateCurrentBlockChunk); the real path is preserved on the post-processor and remains the value materialized for the _file metadata column.

Signed-off-by: Chong Gao <res_life@163.com>
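
Why the identity check never fired can be seen in a small Python model of the parent's chunking loop. This is a deliberately simplified sketch based only on the description above: `Block`, `coalesce`, and `split_on_processor` are hypothetical names, and the real populateCurrentBlockChunk applies other limits that are omitted here.

```python
from collections import namedtuple

Block = namedtuple("Block", ["file_path", "post_processor"])


def coalesce(blocks, need_split):
    """Group blocks into chunks. As in the description above, the override
    (need_split) is consulted only when the file path changes."""
    chunks, current = [], []
    for block in blocks:
        if current and block.file_path != current[-1].file_path:
            if need_split(current[-1], block):
                chunks.append(current)
                current = []
        current.append(block)
    if current:
        chunks.append(current)
    return chunks


def split_on_processor(prev_block, next_block):
    """The post-processor identity check."""
    return prev_block.post_processor != next_block.post_processor


# Two Iceberg splits of the same physical file: same path, different processors.
same_file = [Block("f.parquet", "P0"), Block("f.parquet", "P1")]
chunks = coalesce(same_file, split_on_processor)
```

Here `chunks` holds a single chunk containing both blocks, because the path never changes and `split_on_processor` is never called.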
… _pos retry test

Three follow-ups to PR review on the Iceberg _pos split-file fix:

1. GpuCoalescingIcebergParquetReader: the block comment around the synthetic split-tagged path claimed the tagged path was only used for the parent coalescer's equality checks. That was wrong: it is also stored as ParquetSingleDataBlockMeta.filePath, used as a key in the parent's per-file block map, logged, and passed to fileIO.newInputFile when blocks are read. The file still opens correctly because Hadoop FileSystem implementations strip the URI fragment when resolving the underlying path, but that is an implicit invariant. Rewrite the comment to document the full propagation, the FileSystem-fragment assumption it depends on, and the FileCache side effect (per-split cache keys, no cross-split sharing). Also point at the actual class name (MultiFileCoalescingPartitionReaderBase) instead of the file name. Behavior unchanged.

2. GpuPostProcessorSuite "FetchRowPosition emits file-global _pos across multiple blocks": the original batch shape (300 / 200 / 300 rows against blocks of 500 and 400) consumed block 0 in whole-batch increments, so no single process() call ever spanned a block boundary. The inner `if (curRowPos >= curBlockRowEnd)` branch in FetchRowPosition.execute was never reached mid-loop, leaving a mutation like `>` for `>=` undetected. Restructure the batches to 300 / 100 / 350 / 150, so the third batch straddles block 0 → block 1, and tighten assertPosRange to verify every element of each batch rather than just the first and last; a middle-row off-by-one would otherwise slip through.

3. Add GpuPostProcessorRetrySuite to exercise the snapshot/restore of FetchRowPosition counters around withRetryNoSplit in GpuParquetReaderPostProcessor.process. The fix exists because FetchRowPosition commits its counter advance to the processor as soon as fromLongs() succeeds, but a later field action in the same safeMap iteration (UpCast, FillNull, GpuColumnVector.from, ...) can OOM and cause withRetryNoSplit to rerun the whole block; without a pre-block snapshot/restore, the rerun would see already-advanced counters and emit _pos off by numRows. The new test projects _pos plus a missing optional field (which lowers to a FillNull action), injects a single GPU retry-OOM with skipCount=1 so FetchRowPosition.fromLongs succeeds and the next allocation (FillNull) fails, then asserts the produced _pos column matches the pre-retry expected sequence. A follow-up batch with no injected OOM confirms the counters end up where a single successful 300-row call would have left them, catching either a missing restore or an accidental double restore.

Signed-off-by: Chong Gao <res_life@163.com>
The prior "tag the path with the split range" approach broke regular Iceberg scans: Hadoop FileSystem implementations (including the local filesystem) do NOT strip URI fragments when resolving the underlying file, so the tagged path `…parquet#iceberg-split=…` flowed through ParquetSingleDataBlockMeta into fileIO.newInputFile and failed with FileNotFoundException on every coalesced read. test_iceberg_parquet_read_round_trip_all_types[COALESCING] reproduced this on local files.

Back out the synthetic-path mechanism and instead route _pos-projecting scans away from the coalescing reader at the factory:

- GpuReaderFactory.calcThreadConf: add `!hasRowPositionMetadata` to canUseCoalescing. Positional-delete and eq-delete cases are already excluded by hasNoDeletes, so this closes the only remaining path where coalescing could see ROW_POSITION in the required schema. The multi-thread and single-file readers finalize per IcebergPartitionedFile via findIcebergFile, so each split's post-processor is the one that finalizes its rows; same-file multi-split _pos is correct there.
- GpuCoalescingIcebergParquetReader.createParquetReader: revert to using info.filePath directly in ParquetSingleDataBlockMeta. Keep the postProcessor-identity check in checkIfNeedToSplitDataBlock as a defense-in-depth safety net (and update its comment to reflect that the _pos corruption it originally guarded is now prevented upstream by the factory routing change).

Signed-off-by: Chong Gao <res_life@163.com>
The second projected field in GpuPostProcessorRetrySuite used `rowPosId + 1` for its field id. ROW_POSITION.fieldId() is Integer.MAX_VALUE - 2 and FILE_PATH.fieldId() is Integer.MAX_VALUE - 1, so `rowPosId + 1` collided with FILE_PATH. ActionBuildingVisitor therefore lowered the second field to FetchFilePath, not FillNull, and the test was no longer exercising the intended "later FillNull allocation OOMs after FetchRowPosition committed counter state" retry path documented in the comment. Use an ordinary non-metadata field id (1) so the second field stays optional/missing/non-constant and lowers to FillNull as intended.

Signed-off-by: Chong Gao <res_life@163.com>
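
The collision is plain arithmetic on the two field ids quoted above. The short Python check below spells it out; `lower_field` is a hypothetical stand-in for the visitor's dispatch, reduced to the three outcomes this commit mentions.

```python
INT_MAX = 2**31 - 1  # value of Java's Integer.MAX_VALUE

ROW_POSITION_ID = INT_MAX - 2  # as stated in the commit message
FILE_PATH_ID = INT_MAX - 1     # as stated in the commit message


def lower_field(field_id):
    """Hypothetical model: pick the action for a missing optional field by its id."""
    if field_id == ROW_POSITION_ID:
        return "FetchRowPosition"
    if field_id == FILE_PATH_ID:
        return "FetchFilePath"
    return "FillNull"


colliding = lower_field(ROW_POSITION_ID + 1)  # the id the test used before the fix
fixed = lower_field(1)                        # an ordinary non-metadata id
```

In this model `colliding` resolves to the file-path action and `fixed` to the fill-null action, which is the difference the commit corrects.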
Fixes #14807.
Description
When Iceberg splits a single Parquet data file across multiple read tasks, GPU-side `_pos` values were computed task-locally starting from 0 instead of as the file-global row position. On CoW reads this produces wrong `_pos` column values; on MoR reads with positional deletes it causes silent data corruption because the wrong `_pos` is then matched against the delete file's position list.

Root cause

`reader.getRowGroups()` returns row groups filtered by the split's byte range (set via `ParquetReadOptions.withRange`). `ParquetFileReader.readFooter` also applies the range filter while parsing the footer, so `getFooter().getBlocks()` on a ranged reader returns only the range's row groups too. Accumulating cumulative row counts over a range-filtered block list yields task-local offsets starting at 0.

Fix

Open a second `ParquetFileReader` without the range to enumerate every row group in the file, build a map from absolute byte offset (`getStartingPos`) to file-global first-row index, then look up each filtered block via its `getStartingPos`. Both `FetchRowPosition` (the row-position metadata column) and positional-delete matching now see absolute positions within the file regardless of split.

Also makes `FetchRowPosition.execute` retry-safe under `withRetryNoSplit`: the per-block state (`curBlockIndex`, `processedRowCount`, `processedBlockRowCounts`) is advanced in locals and committed to the processor only after `CudfColumnVector.fromLongs` succeeds, so an OOM in column allocation does not leave the processor in a partially-advanced state.

Test plan

- `test_iceberg_read_pos_with_split_file` in `integration_tests/src/main/python/iceberg/iceberg_test.py` forces a single small file to be split across multiple scan tasks via `write.parquet.row-group-size-bytes = 4096`, `read.split.target-size = 4096`, `read.split.open-file-cost = 0`, and asserts GPU `_pos` matches CPU `_pos` over all 1500 rows.
- `test_iceberg_read_mor_with_pos_deletes_split_file` exercises the silent-corruption path directly: v2 Merge-on-Read table with positional delete files under the same split conditions; asserts GPU and CPU return the same surviving rows after `DELETE`.

Checklists
Documentation
Testing
(Please provide the names of the existing tests in the PR description.)
Performance
Signed-off-by: Chong Gao <chongg@nvidia.com>