Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@ package com.nvidia.spark.rapids

import java.time.ZoneId

import scala.util.Try

import ai.rapids.cudf._
import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableProducingArray
import com.nvidia.spark.rapids.jni.DateTimeRebase
import com.nvidia.spark.rapids.jni.fileio.RapidsFileIO
import com.nvidia.spark.rapids.shims._
import com.nvidia.spark.rapids.shims.parquet._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{Job, OutputCommitter, TaskAttemptContext}
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat}
import org.apache.parquet.hadoop.{ParquetOutputCommitter, ParquetOutputFormat, ParquetWriter}
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel
import org.apache.parquet.hadoop.codec.CodecConfig
import org.apache.parquet.hadoop.util.ContextUtil
Expand All @@ -43,6 +46,28 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

object GpuParquetFileFormat {
// ParquetWriter.DEFAULT_BLOCK_SIZE widened to Long so it can be compared with parsed values.
private val DefaultParquetBlockSize = ParquetWriter.DEFAULT_BLOCK_SIZE.toLong

/**
 * Tells whether `value` spells out the default Parquet block size.
 *
 * Surrounding whitespace is ignored. A value that cannot be parsed as a Long is treated
 * as non-default rather than raising an error.
 */
private def isDefaultParquetBlockSize(value: String): Boolean =
  Try(value.trim.toLong).map(_ == DefaultParquetBlockSize).getOrElse(false)

/**
 * Produces the warning to log when a non-default Parquet block size was requested.
 *
 * The setting is read from `ParquetOutputFormat.BLOCK_SIZE`, looking at the write options
 * first and falling back to the Hadoop configuration only when the options lack the key.
 *
 * @param conf Hadoop configuration used as the fallback source
 * @param options write options; an entry here wins over `conf`
 * @return the warning text, or None when the key is unset or holds the default block size
 */
def parquetBlockSizeWarning(
    conf: Configuration,
    options: Map[String, String]): Option[String] = {
  val requestedBlockSize = options.get(ParquetOutputFormat.BLOCK_SIZE) match {
    case fromOptions @ Some(_) => fromOptions
    // Option(...) maps a null lookup result to None.
    case None => Option(conf.get(ParquetOutputFormat.BLOCK_SIZE))
  }
  for {
    value <- requestedBlockSize
    if !isDefaultParquetBlockSize(value)
  } yield {
    s"${ParquetOutputFormat.BLOCK_SIZE} is set to $value, but the RAPIDS GPU Parquet " +
      "writer does not apply Spark CPU writer row group sizing semantics for this setting. " +
      s"Set ${RapidsConf.ENABLE_PARQUET_WRITE.key}=false to use Spark CPU Parquet " +
      "writer behavior. To tune RAPIDS GPU writer row group sizing, use internal " +
      s"configs ${RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_ROWS.key} and " +
      s"${RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_BYTES.key}; these are cuDF-specific " +
      s"and are not equivalent to ${ParquetOutputFormat.BLOCK_SIZE}."
  }
}

def tagGpuSupport(
meta: RapidsMeta[_, _, _],
spark: SparkSession,
Expand Down Expand Up @@ -186,6 +211,9 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
val parquetOptions = new ParquetOptions(options, sqlConf)

val conf = ContextUtil.getConfiguration(job)
GpuParquetFileFormat.parquetBlockSizeWarning(conf, options).foreach { warning =>
logWarning(warning)
}

val outputTimestampType = sqlConf.parquetOutputTimestampType
val dateTimeRebaseMode = DateTimeRebaseMode.fromName(
Expand Down Expand Up @@ -280,6 +308,10 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
// holdGpuBetweenBatches is on by default if asyncOutputWriteEnabled is on
val holdGpuBetweenBatches = RapidsConf.ASYNC_QUERY_OUTPUT_WRITE_HOLD_GPU_IN_TASK.get(sqlConf)
.getOrElse(asyncOutputWriteEnabled)
val parquetWriterRowGroupSizeRows =
RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_ROWS.get(sqlConf)
val parquetWriterRowGroupSizeBytes =
RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_BYTES.get(sqlConf)

new ColumnarOutputWriterFactory {
override def newInstance(
Expand All @@ -291,16 +323,18 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
fileIO: RapidsFileIO): ColumnarOutputWriter = {
new GpuParquetWriter(path, dataSchema, compressionType, outputTimestampType.toString,
dateTimeRebaseMode, timestampRebaseMode, context, parquetFieldIdWriteEnabled,
statsTrackers, debugOutputPath, holdGpuBetweenBatches, asyncOutputWriteEnabled, fileIO)
parquetWriterRowGroupSizeRows, parquetWriterRowGroupSizeBytes, statsTrackers,
debugOutputPath, holdGpuBetweenBatches, asyncOutputWriteEnabled, fileIO)
}

override def getFileExtension(context: TaskAttemptContext): String = {
CodecConfig.from(context).getCodec.getExtension + ".parquet"
}

override def partitionFlushSize(context: TaskAttemptContext): Long =
context.getConfiguration.getLong("write.parquet.row-group-size-bytes",
128L * 1024L * 1024L) // 128M
parquetWriterRowGroupSizeBytes.getOrElse(
context.getConfiguration.getLong("write.parquet.row-group-size-bytes",
128L * 1024L * 1024L)) // 128M
}
}
}
Expand All @@ -314,6 +348,8 @@ class GpuParquetWriter(
timestampRebaseMode: DateTimeRebaseMode,
context: TaskAttemptContext,
parquetFieldIdEnabled: Boolean,
parquetWriterRowGroupSizeRows: Option[Integer],
parquetWriterRowGroupSizeBytes: Option[Long],
statsTrackers: Seq[ColumnarWriteTaskStatsTracker],
debugDumpPath: Option[String],
holdGpuBetweenBatches: Boolean,
Expand Down Expand Up @@ -407,6 +443,12 @@ class GpuParquetWriter(
parquetFieldIdEnabled)
.withMetadata(writeContext.getExtraMetaData)
.withCompressionType(compressionType)
parquetWriterRowGroupSizeRows.foreach { rowGroupSizeRows =>
builder.withRowGroupSizeRows(rowGroupSizeRows)
}
parquetWriterRowGroupSizeBytes.foreach { rowGroupSizeBytes =>
builder.withRowGroupSizeBytes(rowGroupSizeBytes)
}
Table.writeParquetChunked(builder.build(), this)
}
}
27 changes: 27 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,27 @@ val GPU_COREDUMP_PIPE_PATTERN = conf("spark.rapids.gpu.coreDump.pipePattern")
.booleanConf
.createWithDefault(true)

// Internal, optional setting: row-count cap for row groups produced by the GPU Parquet
// writer. Only strictly positive values are accepted.
val PARQUET_WRITER_ROW_GROUP_SIZE_ROWS =
  conf("spark.rapids.sql.format.parquet.writer.rowGroupSizeRows")
    .doc("Maximum number of rows in a Parquet row group written by the GPU writer. This is " +
      "a best-effort limit because the cuDF writer does not split row groups below its " +
      "page fragment granularity. If not set, the cuDF writer default is used.")
    .internal()
    .integerConf
    .checkValue(_ > 0, "The Parquet writer row group size rows must be positive.")
    .createOptional

// Internal, optional setting: byte-size cap for row groups produced by the GPU Parquet
// writer. The value is read in bytes and must be at least 1024.
val PARQUET_WRITER_ROW_GROUP_SIZE_BYTES =
  conf("spark.rapids.sql.format.parquet.writer.rowGroupSizeBytes")
    .doc("Maximum size in bytes of a Parquet row group written by the GPU writer. This is a " +
      "best-effort limit because the cuDF writer does not split row groups below its page " +
      "fragment granularity. If not set, the cuDF writer default is used.")
    .internal()
    .bytesConf(ByteUnit.BYTE)
    .checkValue(_ >= 1024L, "The Parquet writer row group size bytes must be at least 1024.")
    .createOptional

object ParquetFooterReaderType extends Enumeration {
val JAVA, NATIVE, AUTO = Value
}
Expand Down Expand Up @@ -3564,6 +3585,12 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isParquetInt96WriteEnabled: Boolean = get(ENABLE_PARQUET_INT96_WRITE)

/** Value of PARQUET_WRITER_ROW_GROUP_SIZE_ROWS, or None when it has not been set. */
lazy val parquetWriterRowGroupSizeRows: Option[Integer] = get(PARQUET_WRITER_ROW_GROUP_SIZE_ROWS)

/** Value of PARQUET_WRITER_ROW_GROUP_SIZE_BYTES, or None when it has not been set. */
lazy val parquetWriterRowGroupSizeBytes: Option[Long] = get(PARQUET_WRITER_ROW_GROUP_SIZE_BYTES)

lazy val parquetReaderFooterType: ParquetFooterReaderType.Value = {
get(PARQUET_READER_FOOTER_TYPE) match {
case "AUTO" => ParquetFooterReaderType.AUTO
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023-2025, NVIDIA CORPORATION.
* Copyright (c) 2023-2026, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -200,13 +200,20 @@ object GpuHiveFileFormat extends Logging {
}

class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFormat
with Serializable {
with Logging with Serializable {

override def prepareWrite(sparkSession: SparkSession, job: Job,
options: Map[String, String], dataSchema: StructType): ColumnarOutputWriterFactory = {

// Avoid referencing the outer object.
val compressionType = compType
GpuParquetFileFormat.parquetBlockSizeWarning(job.getConfiguration, options).foreach {
warning => logWarning(warning)
}
val parquetWriterRowGroupSizeRows =
RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_ROWS.get(sparkSession.sessionState.conf)
val parquetWriterRowGroupSizeBytes =
RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_BYTES.get(sparkSession.sessionState.conf)
new ColumnarOutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String =
compressionType match {
Expand All @@ -221,8 +228,11 @@ class GpuHiveParquetFileFormat(compType: CompressionType) extends ColumnarFileFo
debugOutputPath: Option[String],
fileIO: RapidsFileIO): ColumnarOutputWriter = {
new GpuHiveParquetWriter(path, dataSchema, context, compressionType, statsTrackers,
debugOutputPath, fileIO)
debugOutputPath, parquetWriterRowGroupSizeRows, parquetWriterRowGroupSizeBytes, fileIO)
}

override def partitionFlushSize(context: TaskAttemptContext): Long =
parquetWriterRowGroupSizeBytes.getOrElse(super.partitionFlushSize(context))
}
}
Comment thread
pmattione-nvidia marked this conversation as resolved.
}
Expand All @@ -231,6 +241,8 @@ class GpuHiveParquetWriter(override val path: String, dataSchema: StructType,
context: TaskAttemptContext, compType: CompressionType,
statsTrackers: Seq[ColumnarWriteTaskStatsTracker],
debugOutputPath: Option[String],
parquetWriterRowGroupSizeRows: Option[Integer],
parquetWriterRowGroupSizeBytes: Option[Long],
fileIO: RapidsFileIO)
extends ColumnarOutputWriter(context, dataSchema, NvtxRegistry.FILE_FORMAT_WRITE, true,
statsTrackers, debugOutputPath, false, false, fileIO) {
Expand All @@ -242,6 +254,12 @@ class GpuHiveParquetWriter(override val path: String, dataSchema: StructType,
writeInt96 = true, // Hive 1.2 write timestamp as INT96
parquetFieldIdEnabled = false)
.withCompressionType(compType)
parquetWriterRowGroupSizeRows.foreach { rowGroupSizeRows =>
optionsBuilder.withRowGroupSizeRows(rowGroupSizeRows)
}
parquetWriterRowGroupSizeBytes.foreach { rowGroupSizeBytes =>
optionsBuilder.withRowGroupSizeBytes(rowGroupSizeBytes)
}
Table.writeParquetChunked(optionsBuilder.build(), this)
}

Expand Down
137 changes: 135 additions & 2 deletions tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,23 @@ package com.nvidia.spark.rapids
import java.io.File
import java.nio.charset.StandardCharsets

import ai.rapids.cudf.CompressionType
import com.nvidia.spark.rapids.shims.SparkShimImpl
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileUtil.fullyDelete
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.hadoop.mapreduce.{Job, JobContext, TaskAttemptContext, TaskAttemptID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetOutputFormat, ParquetWriter}

import org.apache.spark.SparkConf
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.hive.rapids.GpuHiveParquetFileFormat
import org.apache.spark.sql.rapids.BasicColumnarWriteJobStatsTracker
import org.apache.spark.sql.rapids.shims.SparkUpgradeExceptionShims
import org.apache.spark.sql.types.StructType

/**
* Tests for writing Parquet files with the GPU.
Expand Down Expand Up @@ -104,6 +110,109 @@ class ParquetWriterSuite extends SparkQueryCompareTestSuite {
}
}

test("parquet writer row group size rows config") {
  // cuDF places row group boundaries on page fragments, so the limit has to be a value
  // whose effect can actually be seen in the written file.
  val maxRowsPerGroup = 5000
  val totalRows = maxRowsPerGroup * 2 + 2000
  val gpuConf = new SparkConf()
    .set(RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_ROWS.key, maxRowsPerGroup.toString)
    .set(RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_BYTES.key, "1m")
  withGpuSparkSession(spark => {
    withTempPath { outputDir =>
      // A single partition is written so exactly one Parquet file is produced.
      spark.range(0, totalRows, 1, 1)
        .write
        .mode("overwrite")
        .parquet(outputDir.getAbsolutePath)

      val rowsPerGroup = getSingleParquetFileRowGroupCounts(spark, outputDir)
      assert(rowsPerGroup.length > 1, s"Expected multiple row groups, got $rowsPerGroup")
      assert(rowsPerGroup.forall(count => count <= maxRowsPerGroup.toLong),
        s"Expected all row groups <= $maxRowsPerGroup rows, got $rowsPerGroup")
      // Splitting into row groups must not lose or duplicate rows.
      assertResult(totalRows.toLong)(rowsPerGroup.sum)
    }
  }, gpuConf)
}

test("hive parquet writer row group size bytes config partition flush size") {
  // The configured row group byte size is expected to come back as the partition flush size.
  val expectedFlushSize = 1024L
  val gpuConf = new SparkConf()
    .set(RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_BYTES.key, expectedFlushSize.toString)
  withGpuSparkSession(spark => {
    val hadoopJob = Job.getInstance(spark.sparkContext.hadoopConfiguration)
    val writerFactory = new GpuHiveParquetFileFormat(CompressionType.NONE)
      .prepareWrite(spark, hadoopJob, Map.empty, new StructType())
    val taskContext = new TaskAttemptContextImpl(
      hadoopJob.getConfiguration,
      new TaskAttemptID("job", 0, TaskType.MAP, 0, 0))

    assertResult(expectedFlushSize)(writerFactory.partitionFlushSize(taskContext))
  }, gpuConf)
}

test("parquet writer row group size bytes config") {
  // The row limit (1,000,000) is far above the 10,000 rows written, which leaves the byte
  // limit as the one expected to split the data into several row groups.
  val rowLimit = 1000000
  val byteLimit = 1024L
  val totalRows = 10000L
  // Each row carries two Long columns.
  val estimatedBytesPerRow = java.lang.Long.BYTES * 2
  val pageOverheadAllowance = 512L
  val gpuConf = new SparkConf()
    .set(RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_ROWS.key, rowLimit.toString)
    .set(RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_BYTES.key, byteLimit.toString)
  withGpuSparkSession(spark => {
    withTempPath { outputDir =>
      val twoLongColumns = spark.range(0, totalRows, 1, 1).selectExpr("id", "id + 1 as id2")
      twoLongColumns.write.mode("overwrite").parquet(outputDir.getAbsolutePath)

      val writtenGroups = getSingleParquetFileRowGroups(spark, outputDir)
      assert(writtenGroups.length > 1, s"Expected multiple row groups, got $writtenGroups")
      assert(!writtenGroups.exists(group => group.rowCount > rowLimit.toLong),
        s"Expected all row groups <= $rowLimit rows, got $writtenGroups")
      // cuDF sizes row groups from uncompressed data estimates; the sizes recorded in the
      // footer additionally include page overhead, hence the separate allowance below.
      assert(!writtenGroups.exists(group => group.rowCount * estimatedBytesPerRow > byteLimit),
        s"Expected estimated data bytes <= $byteLimit, got $writtenGroups")
      assert(
        !writtenGroups.exists(group => group.totalByteSize > byteLimit + pageOverheadAllowance),
        s"Expected row group byte sizes <= $byteLimit plus " +
          s"$pageOverheadAllowance bytes of page overhead, got $writtenGroups")
      assertResult(totalRows)(writtenGroups.map(_.rowCount).sum)
    }
  }, gpuConf)
}

test("parquet block size warning") {
  val defaultBlockSize = ParquetWriter.DEFAULT_BLOCK_SIZE.toLong
  val doubledBlockSize = defaultBlockSize * 2

  // Fixture builders: a Hadoop conf / options map carrying only the block size setting.
  def confWithBlockSize(blockSize: Long): Configuration = {
    val hadoopConf = new Configuration(false)
    hadoopConf.setLong(ParquetOutputFormat.BLOCK_SIZE, blockSize)
    hadoopConf
  }
  def optionsWithBlockSize(blockSize: Long): Map[String, String] =
    Map(ParquetOutputFormat.BLOCK_SIZE -> blockSize.toString)

  val emptyConf = new Configuration(false)

  // No warning when the setting is absent or equal to the default, whichever source holds it.
  assert(GpuParquetFileFormat.parquetBlockSizeWarning(emptyConf, Map.empty).isEmpty)
  assert(GpuParquetFileFormat
    .parquetBlockSizeWarning(confWithBlockSize(defaultBlockSize), Map.empty).isEmpty)
  assert(GpuParquetFileFormat
    .parquetBlockSizeWarning(emptyConf, optionsWithBlockSize(defaultBlockSize)).isEmpty)

  // A non-default value in the Hadoop conf produces a warning naming every relevant config.
  val warning = GpuParquetFileFormat
    .parquetBlockSizeWarning(confWithBlockSize(doubledBlockSize), Map.empty)
  Seq(
    ParquetOutputFormat.BLOCK_SIZE,
    RapidsConf.ENABLE_PARQUET_WRITE.key,
    RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_ROWS.key,
    RapidsConf.PARQUET_WRITER_ROW_GROUP_SIZE_BYTES.key,
    "not equivalent"
  ).foreach { expectedFragment =>
    assert(warning.exists(_.contains(expectedFragment)))
  }

  // Options override the Hadoop conf: a non-default option warns even when the
  // conf is set to the default.
  assert(GpuParquetFileFormat.parquetBlockSizeWarning(
    confWithBlockSize(defaultBlockSize), optionsWithBlockSize(doubledBlockSize)).nonEmpty)
}

private def listAllFiles(f: File): Array[File] = {
if (f.isFile()) {
Array(f)
Expand All @@ -116,6 +225,30 @@ class ParquetWriterSuite extends SparkQueryCompareTestSuite {
}
}

/**
 * Row counts of the row groups in the single Parquet file found under `parquetPath`,
 * in the order the row groups are reported by getSingleParquetFileRowGroups.
 */
private def getSingleParquetFileRowGroupCounts(
    spark: SparkSession,
    parquetPath: File): Seq[Long] = {
  val rowGroups = getSingleParquetFileRowGroups(spark, parquetPath)
  rowGroups.map(group => group.rowCount)
}

/** Row count and total byte size of one row group, as read from a Parquet footer. */
private case class ParquetRowGroup(rowCount: Long, totalByteSize: Long)

/**
 * Reads the footer of the one `.parquet` file located under `parquetPath` and returns a
 * summary of each of its row groups, in footer order.
 *
 * Fails the running test when the number of `.parquet` files found is not exactly one.
 */
private def getSingleParquetFileRowGroups(
    spark: SparkSession,
    parquetPath: File): Seq[ParquetRowGroup] = {
  val parquetFiles = listAllFiles(parquetPath).filter(_.getName.endsWith(".parquet"))
  assertResult(1)(parquetFiles.length)

  val onlyFile = new Path(parquetFiles.head.getAbsolutePath)
  val rowGroupMetadata = ParquetFileReader
    .readFooters(spark.sparkContext.hadoopConfiguration, onlyFile)
    .get(0)
    .getParquetMetadata
    .getBlocks
  // The metadata is a Java list, so it is walked by index.
  Vector.tabulate(rowGroupMetadata.size()) { index =>
    val metadata = rowGroupMetadata.get(index)
    ParquetRowGroup(metadata.getRowCount, metadata.getTotalByteSize)
  }
}

test("set max records per file no partition") {
val conf = new SparkConf()
.set("spark.sql.files.maxRecordsPerFile", "50")
Expand Down