Skip to content

Commit d2e0393

Browse files
authored
chore: Run Spark 4.0 SQL tests with native_datafusion scan (#3728)
1 parent a5e238a commit d2e0393

2 files changed

Lines changed: 18 additions & 23 deletions

File tree

.github/workflows/spark_sql_test.yml

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -130,10 +130,13 @@ jobs:
130130
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'auto'}
131131
- {spark-short: '3.5', spark-full: '3.5.8', java: 11, scan-impl: 'native_datafusion'}
132132
- {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'}
133+
- {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'native_datafusion'}
133134
# Skip sql_hive-1 for Spark 4.0 due to https://github.com/apache/datafusion-comet/issues/2946
134135
exclude:
135136
- config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'auto'}
136137
module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
138+
- config: {spark-short: '4.0', spark-full: '4.0.1', java: 17, scan-impl: 'native_datafusion'}
139+
module: {name: "sql_hive-1", args1: "", args2: "hive/testOnly * -- -l org.apache.spark.tags.ExtendedHiveTest -l org.apache.spark.tags.SlowHiveTest"}
137140
fail-fast: false
138141
name: spark-sql-${{ matrix.config.scan-impl }}-${{ matrix.module.name }}/spark-${{ matrix.config.spark-full }}
139142
runs-on: ${{ matrix.os }}

dev/diffs/4.0.1.diff

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -574,7 +574,7 @@ index 81713c777bc..b5f92ed9742 100644
574574
assert(exchanges.size == 2)
575575
}
576576
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
577-
index 2c24cc7d570..3311e6e3773 100644
577+
index 2c24cc7d570..12096ea361e 100644
578578
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
579579
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
580580
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -669,7 +669,7 @@ index 2c24cc7d570..3311e6e3773 100644
669669
test("static scan metrics",
670670
- DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
671671
+ DisableAdaptiveExecution("DPP in AQE must reuse broadcast"),
672-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
672+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3442")) {
673673
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
674674
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
675675
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
@@ -708,7 +708,7 @@ index 9c90e0105a4..fadf2f0f698 100644
708708

709709
test("SPARK-35884: Explain Formatted") {
710710
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
711-
index 9c529d14221..6cfd87ad864 100644
711+
index 9c529d14221..5c4e370dfff 100644
712712
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
713713
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
714714
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterTha
@@ -748,7 +748,7 @@ index 9c529d14221..6cfd87ad864 100644
748748
Seq("parquet", "orc").foreach { format =>
749749
- test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}") {
750750
+ test(s"Spark native readers should respect spark.sql.caseSensitive - ${format}",
751-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
751+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) {
752752
withTempDir { dir =>
753753
val tableName = s"spark_25132_${format}_native"
754754
val tableDir = dir.getCanonicalPath + s"/$tableName"
@@ -2727,7 +2727,7 @@ index cd6f41b4ef4..4b6a17344bc 100644
27272727
ParquetOutputFormat.WRITER_VERSION -> ParquetProperties.WriterVersion.PARQUET_2_0.toString
27282728
)
27292729
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
2730-
index 6080a5e8e4b..dc64436164f 100644
2730+
index 6080a5e8e4b..cef477c8b4d 100644
27312731
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
27322732
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
27332733
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.MessageType
@@ -2812,7 +2812,7 @@ index 6080a5e8e4b..dc64436164f 100644
28122812

28132813
- test("SPARK-25207: exception when duplicate fields in case-insensitive mode") {
28142814
+ test("SPARK-25207: exception when duplicate fields in case-insensitive mode",
2815-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
2815+
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3760")) {
28162816
withTempPath { dir =>
28172817
val count = 10
28182818
val tableName = "spark_25207"
@@ -3316,41 +3316,32 @@ index 0dd90925d3c..7d53ec845ef 100644
33163316
spark.range(10).selectExpr("id", "id % 3 as p")
33173317
.write.partitionBy("p").saveAsTable("testDataForScan")
33183318
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
3319-
index 0ab8691801d..f1c4b3d92b1 100644
3319+
index 0ab8691801d..b18a5bea944 100644
33203320
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
33213321
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFsSuite.scala
3322-
@@ -18,6 +18,8 @@
3322+
@@ -18,6 +18,7 @@
33233323
package org.apache.spark.sql.execution.python
33243324

33253325
import org.apache.spark.sql.catalyst.plans.logical.{ArrowEvalPython, BatchEvalPython, Limit, LocalLimit}
3326-
+import org.apache.spark.sql.IgnoreCometNativeDataFusion
33273326
+import org.apache.spark.sql.comet._
33283327
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SparkPlanTest}
33293328
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
33303329
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
3331-
@@ -93,7 +95,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
3332-
assert(arrowEvalNodes.size == 2)
3333-
}
3334-
3335-
- test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") {
3336-
+ test("Python UDF should not break column pruning/filter pushdown -- Parquet V1",
3337-
+ IgnoreCometNativeDataFusion("https://github.com/apache/datafusion-comet/issues/3311")) {
3338-
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
3339-
withTempPath { f =>
3340-
spark.range(10).select($"id".as("a"), $"id".as("b"))
3341-
@@ -108,6 +111,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
3330+
@@ -108,6 +109,8 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
33423331

33433332
val scanNodes = query.queryExecution.executedPlan.collect {
33443333
case scan: FileSourceScanExec => scan
33453334
+ case scan: CometScanExec => scan
3335+
+ case scan: CometNativeScanExec => scan
33463336
}
33473337
assert(scanNodes.length == 1)
33483338
assert(scanNodes.head.output.map(_.name) == Seq("a"))
3349-
@@ -120,11 +124,16 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
3339+
@@ -120,11 +123,18 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
33503340

33513341
val scanNodes = query.queryExecution.executedPlan.collect {
33523342
case scan: FileSourceScanExec => scan
33533343
+ case scan: CometScanExec => scan
3344+
+ case scan: CometNativeScanExec => scan
33543345
}
33553346
assert(scanNodes.length == 1)
33563347
// $"a" is not null and $"a" > 1
@@ -3359,21 +3350,22 @@ index 0ab8691801d..f1c4b3d92b1 100644
33593350
+ val dataFilters = scanNodes.head match {
33603351
+ case scan: FileSourceScanExec => scan.dataFilters
33613352
+ case scan: CometScanExec => scan.dataFilters
3353+
+ case scan: CometNativeScanExec => scan.dataFilters
33623354
+ }
33633355
+ assert(dataFilters.length == 2)
33643356
+ assert(dataFilters.flatMap(_.references.map(_.name)).distinct == Seq("a"))
33653357
}
33663358
}
33673359
}
3368-
@@ -145,6 +154,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
3360+
@@ -145,6 +155,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
33693361

33703362
val scanNodes = query.queryExecution.executedPlan.collect {
33713363
case scan: BatchScanExec => scan
33723364
+ case scan: CometBatchScanExec => scan
33733365
}
33743366
assert(scanNodes.length == 1)
33753367
assert(scanNodes.head.output.map(_.name) == Seq("a"))
3376-
@@ -157,6 +167,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
3368+
@@ -157,6 +168,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
33773369

33783370
val scanNodes = query.queryExecution.executedPlan.collect {
33793371
case scan: BatchScanExec => scan

0 commit comments

Comments
 (0)