Skip to content

Commit 02226af

Browse files
committed
chore: remove COMET_NATIVE_SCAN_IMPL and related scan-impl constants
Removes the four deprecated symbols from CometConf along with all references in main code, tests, and benchmarks: - COMET_NATIVE_SCAN_IMPL (spark.comet.scan.impl) - SCAN_NATIVE_DATAFUSION - SCAN_NATIVE_ICEBERG_COMPAT - SCAN_AUTO With a single Parquet scan implementation, the scanImpl field on CometScanExec is dropped, CometScanTypeChecker becomes parameterless, and per-impl conditionals in CometScanRule and CometExecRule collapse.
1 parent 5dae9a8 commit 02226af

29 files changed

Lines changed: 155 additions & 426 deletions

benchmarks/tpc/engines/comet-hashjoin.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,5 @@ driver_class_path = ["$COMET_JAR"]
3030
"spark.executor.extraClassPath" = "$COMET_JAR"
3131
"spark.plugins" = "org.apache.spark.CometPlugin"
3232
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
33-
"spark.comet.scan.impl" = "native_datafusion"
3433
"spark.comet.exec.replaceSortMergeJoin" = "true"
3534
"spark.comet.expression.Cast.allowIncompatible" = "true"

benchmarks/tpc/engines/comet.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,4 @@ driver_class_path = ["$COMET_JAR"]
3030
"spark.executor.extraClassPath" = "$COMET_JAR"
3131
"spark.plugins" = "org.apache.spark.CometPlugin"
3232
"spark.shuffle.manager" = "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager"
33-
"spark.comet.scan.impl" = "native_datafusion"
3433
"spark.comet.expression.Cast.allowIncompatible" = "true"

spark/src/main/scala/org/apache/comet/CometConf.scala

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -111,25 +111,6 @@ object CometConf extends ShimCometConf {
111111
.booleanConf
112112
.createWithEnvVarOrDefault("ENABLE_COMET_WRITE", false)
113113

114-
@deprecated
115-
val SCAN_NATIVE_DATAFUSION = "native_datafusion"
116-
117-
@deprecated
118-
val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"
119-
120-
@deprecated
121-
val SCAN_AUTO = "auto"
122-
123-
@deprecated
124-
val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
125-
.category(CATEGORY_TESTING)
126-
.internal()
127-
.doc("This configuration option is deprecated and has no effect on Comet behavior.")
128-
.stringConf
129-
.transform(_.toLowerCase(Locale.ROOT))
130-
.checkValues(Set(SCAN_NATIVE_DATAFUSION, SCAN_AUTO))
131-
.createWithEnvVarOrDefault("COMET_PARQUET_SCAN_IMPL", SCAN_AUTO)
132-
133114
val COMET_ICEBERG_NATIVE_ENABLED: ConfigEntry[Boolean] =
134115
conf("spark.comet.scan.icebergNative.enabled")
135116
.category(CATEGORY_SCAN)

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ case class CometExecRule(session: SparkSession)
258258
private def transform(plan: SparkPlan): SparkPlan = {
259259
def convertNode(op: SparkPlan): SparkPlan = op match {
260260
// Fully native scan for V1
261-
case scan: CometScanExec if scan.scanImpl == CometConf.SCAN_NATIVE_DATAFUSION =>
261+
case scan: CometScanExec =>
262262
convertToComet(scan, CometNativeScan).getOrElse(scan)
263263

264264
// Fully native Iceberg scan for V2 (iceberg-rust path)

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,7 @@ case class CometScanRule(session: SparkSession)
197197
r: HadoopFsRelation,
198198
hadoopConf: Configuration): Option[SparkPlan] = {
199199
if (!COMET_EXEC_ENABLED.get()) {
200-
withInfo(
201-
scanExec,
202-
s"$SCAN_NATIVE_DATAFUSION scan requires ${COMET_EXEC_ENABLED.key} to be enabled")
200+
withInfo(scanExec, s"Native Parquet scan requires ${COMET_EXEC_ENABLED.key} to be enabled")
203201
return None
204202
}
205203
// Disabling the vectorized reader opts into parquet-mr's permissive behavior
@@ -210,7 +208,7 @@ case class CometScanRule(session: SparkSession)
210208
!COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.get()) {
211209
withInfo(
212210
scanExec,
213-
s"$SCAN_NATIVE_DATAFUSION scan is incompatible with " +
211+
s"Native Parquet scan is incompatible with " +
214212
s"${SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key}=false; set " +
215213
s"${COMET_SCAN_ALLOW_DISABLED_PARQUET_VECTORIZED_READER.key}=true to opt in")
216214
return None
@@ -219,7 +217,7 @@ case class CometScanRule(session: SparkSession)
219217
return None
220218
}
221219
if (encryptionEnabled(hadoopConf) && !isEncryptionConfigSupported(hadoopConf)) {
222-
withInfo(scanExec, s"$SCAN_NATIVE_DATAFUSION does not support encryption")
220+
withInfo(scanExec, "Native Parquet scan does not support encryption")
223221
return None
224222
}
225223
if (scanExec.fileConstantMetadataColumns.nonEmpty) {
@@ -244,10 +242,10 @@ case class CometScanRule(session: SparkSession)
244242
withInfo(scanExec, "Native DataFusion scan does not support row index generation")
245243
return None
246244
}
247-
if (!isSchemaSupported(scanExec, SCAN_NATIVE_DATAFUSION, r)) {
245+
if (!isSchemaSupported(scanExec, r)) {
248246
return None
249247
}
250-
Some(CometScanExec(scanExec, session, SCAN_NATIVE_DATAFUSION))
248+
Some(CometScanExec(scanExec, session))
251249
}
252250

253251
private def transformV2Scan(scanExec: BatchScanExec): SparkPlan = {
@@ -313,7 +311,7 @@ case class CometScanRule(session: SparkSession)
313311
return withInfos(scanExec, fallbackReasons.toSet)
314312
}
315313

316-
val typeChecker = CometScanTypeChecker(SCAN_NATIVE_DATAFUSION)
314+
val typeChecker = CometScanTypeChecker()
317315
val schemaSupported =
318316
typeChecker.isSchemaSupported(scanExec.scan.readSchema(), fallbackReasons)
319317

@@ -670,48 +668,40 @@ case class CometScanRule(session: SparkSession)
670668
case _ => false
671669
}
672670

673-
private def isSchemaSupported(
674-
scanExec: FileSourceScanExec,
675-
scanImpl: String,
676-
r: HadoopFsRelation): Boolean = {
671+
private def isSchemaSupported(scanExec: FileSourceScanExec, r: HadoopFsRelation): Boolean = {
677672
val fallbackReasons = new ListBuffer[String]()
678-
val typeChecker = CometScanTypeChecker(scanImpl)
673+
val typeChecker = CometScanTypeChecker()
679674
val schemaSupported =
680675
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
681676
if (!schemaSupported) {
682677
withInfo(
683678
scanExec,
684-
s"Unsupported schema ${scanExec.requiredSchema} " +
685-
s"for $scanImpl: ${fallbackReasons.mkString(", ")}")
679+
s"Unsupported schema ${scanExec.requiredSchema}: ${fallbackReasons.mkString(", ")}")
686680
return false
687681
}
688682
val partitionSchemaSupported =
689683
typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
690684
if (!partitionSchemaSupported) {
691685
withInfo(
692686
scanExec,
693-
s"Unsupported partitioning schema ${scanExec.requiredSchema} " +
694-
s"for $scanImpl: ${fallbackReasons
695-
.mkString(", ")}")
687+
s"Unsupported partitioning schema ${scanExec.requiredSchema}: " +
688+
fallbackReasons.mkString(", "))
696689
return false
697690
}
698691
true
699692
}
700693
}
701694

702-
case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with CometTypeShim {
703-
704-
// this class is intended to be used with a specific scan impl
705-
assert(scanImpl != CometConf.SCAN_AUTO)
695+
case class CometScanTypeChecker() extends DataTypeSupport with CometTypeShim {
706696

707697
override def isTypeSupported(
708698
dt: DataType,
709699
name: String,
710700
fallbackReasons: ListBuffer[String]): Boolean = {
711701
dt match {
712702
case ShortType if CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.get() =>
713-
fallbackReasons += s"$scanImpl scan may not handle unsigned UINT_8 correctly for $dt. " +
714-
s"Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false to allow " +
703+
fallbackReasons += s"Native Parquet scan may not handle unsigned UINT_8 correctly for " +
704+
s"$dt. Set ${CometConf.COMET_PARQUET_UNSIGNED_SMALL_INT_CHECK.key}=false to allow " +
715705
"native execution if your data does not contain unsigned small integers. " +
716706
CometConf.COMPAT_GUIDE
717707
false
@@ -722,9 +712,9 @@ case class CometScanTypeChecker(scanImpl: String) extends DataTypeSupport with C
722712
case s: StructType if isVariantStruct(s) =>
723713
// Spark 4.0's PushVariantIntoScan rewrites a VariantType column into a struct of typed
724714
// fields plus per-field VariantMetadata, expecting the scan to honor Parquet variant
725-
// shredding semantics. Comet's native scans don't, so fall back to Spark.
715+
// shredding semantics. Comet's native scan does not, so fall back to Spark.
726716
fallbackReasons +=
727-
s"Unsupported $name of type VariantType (shredded; not supported by $scanImpl scan)"
717+
s"Unsupported $name of type VariantType (shredded; not supported by native scan)"
728718
false
729719
case s: StructType if s.fields.isEmpty =>
730720
false

spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -42,22 +42,15 @@ import org.apache.spark.sql.types._
4242
import org.apache.spark.sql.vectorized.ColumnarBatch
4343
import org.apache.spark.util.collection._
4444

45-
import org.apache.comet.{CometConf, MetricsSupport}
45+
import org.apache.comet.MetricsSupport
4646
import org.apache.comet.parquet.CometParquetFileFormat
4747

4848
/**
49-
* Comet physical scan node for DataSource V1. Most of the code here follow Spark's
50-
* [[FileSourceScanExec]].
51-
*
52-
* This is a hybrid scan where the native plan will contain a `ScanExec` that reads batches of
53-
* data from the JVM via JNI. The ultimate source of data may be a JVM implementation such as
54-
* Spark readers, or could be the `native_iceberg_compat` native scan.
55-
*
56-
* Note that scanImpl can only be `native_datafusion` after CometScanRule runs and before
57-
* CometExecRule runs. It will never be set to `native_datafusion` at execution time
49+
* Comet physical scan node for DataSource V1. Most of the code here follows Spark's
50+
* [[FileSourceScanExec]]. After CometScanRule runs, this node is replaced by a fully native scan
51+
* by CometExecRule; it does not survive to execution time.
5852
*/
5953
case class CometScanExec(
60-
scanImpl: String,
6154
@transient relation: HadoopFsRelation,
6255
output: Seq[Attribute],
6356
requiredSchema: StructType,
@@ -72,10 +65,8 @@ case class CometScanExec(
7265
with ShimCometScanExec
7366
with CometPlan {
7467

75-
assert(scanImpl != CometConf.SCAN_AUTO)
76-
7768
override val nodeName: String =
78-
s"CometScan [$scanImpl] $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
69+
s"CometScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}"
7970

8071
// FIXME: ideally we should reuse wrapped.supportsColumnar, however that fails many tests
8172
override lazy val supportsColumnar: Boolean =
@@ -154,18 +145,13 @@ case class CometScanExec(
154145
}
155146

156147
/**
157-
* Returns the data filters that are supported for this scan implementation. For
158-
* native_datafusion scans, this excludes dynamic pruning filters (subqueries) and null checks
159-
* on array columns (see [[isNullCheckOnArrayColumn]]).
148+
* Returns the data filters that are supported for this scan. Excludes dynamic pruning filters
149+
* (subqueries) and null checks on array columns (see [[isNullCheckOnArrayColumn]]).
160150
*/
161151
lazy val supportedDataFilters: Seq[Expression] = {
162-
if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION) {
163-
dataFilters
164-
.filterNot(isDynamicPruningFilter)
165-
.filterNot(isNullCheckOnArrayColumn)
166-
} else {
167-
dataFilters
168-
}
152+
dataFilters
153+
.filterNot(isDynamicPruningFilter)
154+
.filterNot(isNullCheckOnArrayColumn)
169155
}
170156

171157
/**
@@ -516,7 +502,6 @@ case class CometScanExec(
516502

517503
override def doCanonicalize(): CometScanExec = {
518504
CometScanExec(
519-
scanImpl,
520505
relation,
521506
output.map(QueryPlan.normalizeExpressions(_, output)),
522507
requiredSchema,
@@ -534,10 +519,7 @@ case class CometScanExec(
534519

535520
object CometScanExec {
536521

537-
def apply(
538-
scanExec: FileSourceScanExec,
539-
session: SparkSession,
540-
scanImpl: String): CometScanExec = {
522+
def apply(scanExec: FileSourceScanExec, session: SparkSession): CometScanExec = {
541523
// TreeNode.mapProductIterator is protected method.
542524
def mapProductIterator[B: ClassTag](product: Product, f: Any => B): Array[B] = {
543525
val arr = Array.ofDim[B](product.productArity)
@@ -563,7 +545,6 @@ object CometScanExec {
563545
val newArgs = mapProductIterator(scanExec, transform)
564546
val wrapped = scanExec.makeCopy(newArgs).asInstanceOf[FileSourceScanExec]
565547
val batchScanExec = CometScanExec(
566-
scanImpl,
567548
wrapped.relation,
568549
wrapped.output,
569550
wrapped.requiredSchema,

spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -985,18 +985,16 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
985985

986986
test("size - respect to legacySizeOfNull") {
987987
val table = "t1"
988-
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
989-
withTable(table) {
990-
sql(s"create table $table(col array<string>) using parquet")
991-
sql(s"insert into $table values(null)")
992-
withSQLConf(SQLConf.LEGACY_SIZE_OF_NULL.key -> "false") {
993-
checkSparkAnswerAndOperator(sql(s"select size(col) from $table"))
994-
}
995-
withSQLConf(
996-
SQLConf.LEGACY_SIZE_OF_NULL.key -> "true",
997-
SQLConf.ANSI_ENABLED.key -> "false") {
998-
checkSparkAnswerAndOperator(sql(s"select size(col) from $table"))
999-
}
988+
withTable(table) {
989+
sql(s"create table $table(col array<string>) using parquet")
990+
sql(s"insert into $table values(null)")
991+
withSQLConf(SQLConf.LEGACY_SIZE_OF_NULL.key -> "false") {
992+
checkSparkAnswerAndOperator(sql(s"select size(col) from $table"))
993+
}
994+
withSQLConf(
995+
SQLConf.LEGACY_SIZE_OF_NULL.key -> "true",
996+
SQLConf.ANSI_ENABLED.key -> "false") {
997+
checkSparkAnswerAndOperator(sql(s"select size(col) from $table"))
1000998
}
1001999
}
10021000
}

spark/src/test/scala/org/apache/comet/CometCastSuite.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1540,13 +1540,8 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper {
15401540
}
15411541

15421542
test("cast ArrayType to StringType") {
1543-
val hasIncompatibleType = (dt: DataType) =>
1544-
if (CometConf.COMET_NATIVE_SCAN_IMPL.get() == "auto") {
1545-
true
1546-
} else {
1547-
!CometScanTypeChecker(CometConf.COMET_NATIVE_SCAN_IMPL.get())
1548-
.isTypeSupported(dt, "a", ListBuffer.empty)
1549-
}
1543+
val hasIncompatibleType =
1544+
(dt: DataType) => !CometScanTypeChecker().isTypeSupported(dt, "a", ListBuffer.empty)
15501545
Seq(
15511546
BooleanType,
15521547
StringType,

spark/src/test/scala/org/apache/comet/CometCsvExpressionSuite.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,7 @@ class CometCsvExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper
7070

7171
test("to_csv - with configurable formatting options") {
7272
val table = "t1"
73-
withSQLConf(
74-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
75-
CometConf.getExprAllowIncompatConfigKey(classOf[StructsToCsv]) -> "true") {
73+
withSQLConf(CometConf.getExprAllowIncompatConfigKey(classOf[StructsToCsv]) -> "true") {
7674
withTable(table) {
7775
val newLinesStr =
7876
""" abc

spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@ import java.time.{Duration, Period}
2323

2424
import scala.util.Random
2525

26-
import org.scalactic.source.Position
27-
import org.scalatest.Tag
28-
2926
import org.apache.hadoop.fs.Path
3027
import org.apache.spark.sql.{CometTestBase, DataFrame, Row}
3128
import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, FromUnixTime, Literal, StructsToJson, TruncDate, TruncTimestamp}
@@ -44,15 +41,6 @@ import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator}
4441
class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
4542
import testImplicits._
4643

47-
override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
48-
pos: Position): Unit = {
49-
super.test(testName, testTags: _*) {
50-
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_AUTO) {
51-
testFun
52-
}
53-
}
54-
}
55-
5644
val ARITHMETIC_OVERFLOW_EXCEPTION_MSG =
5745
"""[ARITHMETIC_OVERFLOW] integer overflow. If necessary set "spark.sql.ansi.enabled" to "false" to bypass this error"""
5846
val DIVIDE_BY_ZERO_EXCEPTION_MSG =
@@ -2516,7 +2504,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
25162504

25172505
withSQLConf(
25182506
CometConf.COMET_ENABLED.key -> "true",
2519-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
25202507
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
25212508

25222509
val df = spark.read.parquet(dir.toString())
@@ -2546,7 +2533,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
25462533

25472534
withSQLConf(
25482535
CometConf.COMET_ENABLED.key -> "true",
2549-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION,
25502536
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") {
25512537

25522538
val df = spark.read.parquet(dir.toString())
@@ -3014,7 +3000,6 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
30143000
CometConf.COMET_EXEC_ENABLED.key -> "true",
30153001
CometConf.COMET_ENABLED.key -> "true",
30163002
CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "false",
3017-
CometConf.COMET_NATIVE_SCAN_IMPL.key -> "native_datafusion",
30183003
SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true",
30193004
SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString,
30203005
// SPARK-53535 (Spark 4.1+) flipped the default to "false", which preserves the parent

0 commit comments

Comments
 (0)