Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,32 +165,41 @@ private[iceberg] case object FetchRowPosition extends ColumnAction {
val numRows = ctx.numRows
val rowPoses = new Array[Long](numRows)
val processor = ctx.processor
var curBlockRowCount = processor.parquetInfo.blocks(processor.curBlockIndex).getRowCount
var curBlockRowStart =
processor.parquetInfo.blocksFirstRowIndices(processor.curBlockIndex)

// The wrapping withRetryNoSplit may rerun this lambda after an OOM. Keep all advancing
Comment thread
res-life marked this conversation as resolved.
Outdated
// state in locals here and commit back to the processor only after fromLongs() succeeds,
// so a retry restarts from the same processor state.
var localBlockIndex = processor.curBlockIndex
var localProcessedRowCount = processor.processedRowCount
var localProcessedBlockRowCounts = processor.processedBlockRowCounts

var curBlockRowCount = processor.parquetInfo.blocks(localBlockIndex).getRowCount
var curBlockRowStart = processor.parquetInfo.blocksFirstRowIndices(localBlockIndex)
var curBlockRowEnd = curBlockRowStart + curBlockRowCount
var curRowPos = curBlockRowStart + processor.processedRowCount -
processor.processedBlockRowCounts
var curRowPos = curBlockRowStart + localProcessedRowCount - localProcessedBlockRowCounts

for (i <- 0 until numRows) {
if (curRowPos >= curBlockRowEnd) {
// switch to next block
processor.curBlockIndex += 1
processor.processedBlockRowCounts += curBlockRowCount
curRowPos = processor.parquetInfo.blocksFirstRowIndices(processor.curBlockIndex)
localBlockIndex += 1
localProcessedBlockRowCounts += curBlockRowCount
curRowPos = processor.parquetInfo.blocksFirstRowIndices(localBlockIndex)

curBlockRowCount = processor.parquetInfo.blocks(processor.curBlockIndex).getRowCount
curBlockRowStart =
processor.parquetInfo.blocksFirstRowIndices(processor.curBlockIndex)
curBlockRowCount = processor.parquetInfo.blocks(localBlockIndex).getRowCount
curBlockRowStart = processor.parquetInfo.blocksFirstRowIndices(localBlockIndex)
curBlockRowEnd = curBlockRowStart + curBlockRowCount
}

rowPoses(i) = curRowPos
curRowPos += 1
processor.processedRowCount += 1
localProcessedRowCount += 1
}

CudfColumnVector.fromLongs(rowPoses: _*)
val result = CudfColumnVector.fromLongs(rowPoses: _*)
processor.curBlockIndex = localBlockIndex
processor.processedRowCount = localProcessedRowCount
processor.processedBlockRowCounts = localProcessedBlockRowCounts
result
}

override def display(indent: Int): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,29 @@ trait GpuIcebergParquetReader extends Iterator[ColumnarBatch] with AutoCloseable
withResource(file.newReader(conf.metrics)) { reader =>
val fileSchema = reader.getFileMetaData.getSchema

val rowGroupFirstRowIndices = new Array[Long](reader.getRowGroups.size())
// Build file-global first-row index per row group from the FULL footer.
// reader.getRowGroups() is filtered by the split's byte range (set via
// ParquetReadOptions.withRange), so accumulating over it would produce
// task-local offsets starting at 0. That feeds wrong _pos values into
// FetchRowPosition and, more critically, into positional-delete matching
// on MoR reads, causing silent data corruption. The footer's getBlocks()
// is the unfiltered list, so the offsets here are absolute within the
// file. Key by getStartingPos so we can look up each filtered block.
Comment thread
res-life marked this conversation as resolved.
Outdated
val firstRowIndexByStartingPos =
scala.collection.mutable.HashMap.empty[Long, Long]
var accumulatedRowCount = 0L
for (i <- 0 until reader.getRowGroups.size()) {
rowGroupFirstRowIndices(i) = accumulatedRowCount
accumulatedRowCount += reader.getRowGroups.get(i).getRowCount
val allBlocks = reader.getFooter.getBlocks
var i = 0
while (i < allBlocks.size()) {
val b = allBlocks.get(i)
firstRowIndexByStartingPos(b.getStartingPos) = accumulatedRowCount
accumulatedRowCount += b.getRowCount
i += 1
}
val (typeWithIds, fileReadSchema) = projectSchema(fileSchema, requiredSchema)
val filteredBlocks = filterRowGroups(reader, requiredSchema, typeWithIds, file.filter)
val blockFirstRowIndices = filteredBlocks.map(b => rowGroupFirstRowIndices(b._2))
val blockFirstRowIndices = filteredBlocks.map(b =>
firstRowIndexByStartingPos(b._1.getStartingPos))
Comment thread
res-life marked this conversation as resolved.
Outdated
val blocks = clipBlocksToSchema(fileReadSchema, filteredBlocks.map(_._1))

val sqlConf = SQLConf.get
Expand Down
25 changes: 25 additions & 0 deletions integration_tests/src/main/python/iceberg/iceberg_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,31 @@ def setup_iceberg_table(spark):
conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})


@iceberg
@ignore_order(local=True)
@pytest.mark.parametrize('reader_type', rapids_reader_types)
Comment thread
res-life marked this conversation as resolved.
def test_iceberg_read_pos_with_split_file(spark_tmp_table_factory, reader_type):
# Regression for nvbug 6174911: when Iceberg splits a single data file
# across multiple read tasks on row-group byte boundaries, _pos must remain
# the file-global row position, not task-local. Three table properties
Comment thread
res-life marked this conversation as resolved.
Outdated
# together force the split on a tiny dataset: small row-group size so the
# file has many row groups, a tiny split target, and zero per-file open
# cost so the planner is willing to split a small file.
table = get_full_table_name(spark_tmp_table_factory)
def setup_iceberg_table(spark):
spark.sql(f"CREATE TABLE {table} (id BIGINT) USING ICEBERG {_NO_FANOUT}")
spark.sql(
f"ALTER TABLE {table} SET TBLPROPERTIES ("
"'write.parquet.row-group-size-bytes' = '4096', "
"'read.split.target-size' = '4096', "
"'read.split.open-file-cost' = '0')")
spark.range(0, 1500).coalesce(1).writeTo(table).append()
with_cpu_session(setup_iceberg_table)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(f"SELECT id, _pos FROM {table}"),
conf={'spark.rapids.sql.format.parquet.reader.type': reader_type})


@iceberg
@ignore_order(local=True)
@pytest.mark.parametrize('reader_type', rapids_reader_types)
Expand Down
Loading