Skip to content

Commit 274fe46

Browse files
committed
Merge remote-tracking branch 'upstream/branch-25.10' into ray/13471
2 parents 4384bf3 + 6f3627a commit 274fe46

16 files changed

Lines changed: 808 additions & 274 deletions

File tree

delta-lake/delta-33x/src/main/scala/com/nvidia/spark/rapids/delta/delta33x/Delta33xProvider.scala

Lines changed: 13 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -23,11 +23,11 @@ import org.apache.hadoop.fs.Path
2323
import org.apache.spark.sql.SparkSession
2424
import org.apache.spark.sql.connector.catalog.SupportsWrite
2525
import org.apache.spark.sql.delta.{DeltaLog, DeltaParquetFileFormat}
26-
import org.apache.spark.sql.delta.DeltaParquetFileFormat.{IS_ROW_DELETED_COLUMN_NAME, ROW_INDEX_COLUMN_NAME}
26+
import org.apache.spark.sql.delta.DeltaParquetFileFormat.IS_ROW_DELETED_COLUMN_NAME
2727
import org.apache.spark.sql.delta.catalog.{DeltaCatalog, DeltaTableV2}
2828
import org.apache.spark.sql.delta.commands.{DeleteCommand, MergeIntoCommand, OptimizeTableCommand, UpdateCommand}
2929
import org.apache.spark.sql.delta.rapids.DeltaRuntimeShim
30-
import org.apache.spark.sql.delta.sources.DeltaDataSource
30+
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSQLConf}
3131
import org.apache.spark.sql.execution.FileSourceScanExec
3232
import org.apache.spark.sql.execution.command.RunnableCommand
3333
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, SaveIntoDataSourceCommand}
@@ -54,6 +54,10 @@ object Delta33xProvider extends DeltaIOProvider {
5454
write == classOf[DeltaTableV2] || write == classOf[GpuDeltaCatalog#GpuStagedDeltaTableV2]
5555
}
5656

57+
override def isSupportedFormat(format: Class[_ <: FileFormat]): Boolean = {
58+
super.isSupportedFormat(format) || format == classOf[GpuDelta33xParquetFileFormat]
59+
}
60+
5761
override def tagForGpu(
5862
cpuExec: AppendDataExecV1,
5963
meta: AppendDataExecV1Meta): Unit = {
@@ -90,20 +94,14 @@ object Delta33xProvider extends DeltaIOProvider {
9094
override def tagSupportForGpuFileSourceScan(meta: SparkPlanMeta[FileSourceScanExec]): Unit = {
9195
val format = meta.wrapped.relation.fileFormat
9296
if (format.getClass == classOf[DeltaParquetFileFormat]) {
97+
val session = meta.wrapped.session
98+
val useMetadataRowIndex =
99+
session.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_USE_METADATA_ROW_INDEX)
93100
val requiredSchema = meta.wrapped.requiredSchema
94-
if (!meta.conf.isParquetPerFileReadEnabled) {
95-
if (requiredSchema.exists(_.name == IS_ROW_DELETED_COLUMN_NAME)) {
96-
meta.willNotWorkOnGpu(
97-
s"reading metadata column $IS_ROW_DELETED_COLUMN_NAME is supported for PERFILE and " +
98-
" not supported for " +
99-
s"${RapidsReaderType.withName(meta.conf.get(RapidsConf.PARQUET_READER_TYPE))}")
100-
}
101-
if (requiredSchema.exists(_.name == ROW_INDEX_COLUMN_NAME)) {
102-
meta.willNotWorkOnGpu(
103-
s"reading metadata column $ROW_INDEX_COLUMN_NAME is supported for PERFILE and " +
104-
" not supported for " +
105-
s"${RapidsReaderType.withName(meta.conf.get(RapidsConf.PARQUET_READER_TYPE))}")
106-
}
101+
val isRowDeletedCol = requiredSchema.exists(_.name == IS_ROW_DELETED_COLUMN_NAME)
102+
if (useMetadataRowIndex && isRowDeletedCol) {
103+
meta.willNotWorkOnGpu("we don't support generating metadata row index for " +
104+
s"${meta.wrapped.getClass.getSimpleName}")
107105
}
108106
GpuReadParquetFileFormat.tagSupport(meta)
109107
} else {

0 commit comments

Comments
 (0)