Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f7fa33c
fix: allow safe mixed Spark/Comet partial/final aggregate execution
andygrove Apr 21, 2026
f2a8207
fix: address review feedback on mixed partial/final aggregate guard
andygrove Apr 21, 2026
9826403
fix: skip partial aggregate tag when partial itself cannot be converted
andygrove Apr 21, 2026
753a9a5
fix: narrow partial aggregate tag lookup and regenerate TPC-DS golden…
andygrove Apr 21, 2026
6ae483d
fix: reject grouping on nested map types in hash aggregate conversion
andygrove Apr 21, 2026
53405f6
fix: remove COUNT from mixed-safe aggregates to fix AQE/count-bug reg…
andygrove Apr 22, 2026
9e2c25a
spotless
andygrove Apr 22, 2026
f53e3c1
test: ignore SPARK-33853 explain codegen subquery test under Comet
andygrove Apr 23, 2026
3285485
Merge remote-tracking branch 'apache/main' into fix/safe-mixed-partia…
andygrove Apr 25, 2026
671afa6
Merge remote-tracking branch 'apache/main' into fix/safe-mixed-partia…
andygrove May 6, 2026
4322852
test: regenerate Spark 4.2 TPC-DS golden files after merge from main
andygrove May 6, 2026
12018c3
Merge remote-tracking branch 'apache/main' into fix/safe-mixed-partia…
andygrove May 20, 2026
43e0c0b
fix: address review feedback on safe mixed aggregate guard
andygrove May 20, 2026
f0437e0
feat: re-enable COUNT for mixed Spark partial / Comet final aggregates
andygrove May 20, 2026
6bd58c8
Merge remote-tracking branch 'apache/main' into feat/count-mixed-part…
andygrove May 21, 2026
8b83454
fix: canonicalize broadcast mode in CometBroadcastExchangeExec [skip ci]
andygrove May 21, 2026
f813be0
chore: drop unrelated .gitignore change [skip ci]
andygrove May 21, 2026
bc53538
fix: include mode and output in CometBroadcastExchangeExec equals/has…
andygrove May 21, 2026
cea167a
fix: correct count-bug root cause to CometHashAggregateExec canonical…
andygrove May 22, 2026
e30d4e8
test: ignore Spark plan-shape tests that change under Comet
andygrove May 22, 2026
06f8b59
Merge remote-tracking branch 'apache/main' into feat/count-mixed-part…
andygrove May 22, 2026
b7cc66f
test: regenerate TPCDS plan stability golden files
andygrove May 22, 2026
8f0ca4e
Merge branch 'main' into feat/count-mixed-partial-final
andygrove May 22, 2026
8b5f172
test: link SPARK-35442 IgnoreComet to #4412 and unignore SPARK-34980/…
andygrove May 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 112 additions & 17 deletions dev/diffs/3.4.3.diff
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,30 @@ index 0efe0877e9b..423d3b3d76d 100644
--
-- SELECT_HAVING
-- https://github.com/postgres/postgres/blob/REL_12_BETA2/src/test/regress/sql/select_having.sql
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
index c7c09bf7c79..5eaa5222142 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CTEInlineSuite.scala
@@ -280,7 +280,8 @@ abstract class CTEInlineSuiteBase
}
}

- test("CTE Predicate push-down and column pruning") {
+ test("CTE Predicate push-down and column pruning",
+ IgnoreComet("Comet changes the exchange reuse count asserted by this test")) {
withTempView("t") {
Seq((0, 1), (1, 2)).toDF("c1", "c2").createOrReplaceTempView("t")
val df = sql(
@@ -330,7 +331,8 @@ abstract class CTEInlineSuiteBase
}
}

- test("CTE Predicate push-down and column pruning - combined predicate") {
+ test("CTE Predicate push-down and column pruning - combined predicate",
+ IgnoreComet("Comet changes the exchange reuse count asserted by this test")) {
withTempView("t") {
Seq((0, 1, 2), (1, 2, 3)).toDF("c1", "c2", "c3").createOrReplaceTempView("t")
val df = sql(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index cf40e944c09..bdd5be4f462 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
Expand Down Expand Up @@ -497,7 +521,7 @@ index f33432ddb6f..b375e285dde 100644
}
assert(scanOption.isDefined)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index a6b295578d6..91acca4306f 100644
index a6b295578d6..1167bbe6554 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -260,7 +260,8 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
Expand Down Expand Up @@ -1474,7 +1498,7 @@ index ac710c32296..2854b433dd3 100644

import testImplicits._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 593bd7bb4ba..32af28b0238 100644
index 593bd7bb4ba..7f68e3bd8d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -26,9 +26,11 @@ import org.scalatest.time.SpanSugar._
Expand Down Expand Up @@ -1742,7 +1766,27 @@ index 593bd7bb4ba..32af28b0238 100644
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT key FROM testData GROUP BY key")
@@ -1599,7 +1628,7 @@ class AdaptiveQueryExecSuite
@@ -1541,7 +1570,8 @@ class AdaptiveQueryExecSuite
}
}

- test("SPARK-35442: Support propagate empty relation through aggregate") {
+ test("SPARK-35442: Support propagate empty relation through aggregate",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
"SELECT key, count(*) FROM testData WHERE value = 'no_match' GROUP BY key")
@@ -1560,7 +1590,8 @@ class AdaptiveQueryExecSuite
}
}

- test("SPARK-35442: Support propagate empty relation through union") {
+ test("SPARK-35442: Support propagate empty relation through union",
+ IgnoreComet("https://github.com/apache/datafusion-comet/issues/4412")) {
def checkNumUnion(plan: SparkPlan, numUnion: Int): Unit = {
assert(
collect(plan) {
@@ -1599,7 +1630,7 @@ class AdaptiveQueryExecSuite
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id")
assert(collect(adaptivePlan) {
Expand All @@ -1751,7 +1795,7 @@ index 593bd7bb4ba..32af28b0238 100644
}.length == 1)
}
}
@@ -1679,7 +1708,8 @@ class AdaptiveQueryExecSuite
@@ -1679,7 +1710,8 @@ class AdaptiveQueryExecSuite
}
}

Expand All @@ -1761,7 +1805,7 @@ index 593bd7bb4ba..32af28b0238 100644
def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
find(plan) {
case s: ShuffleExchangeLike =>
@@ -1864,6 +1894,9 @@ class AdaptiveQueryExecSuite
@@ -1864,6 +1896,9 @@ class AdaptiveQueryExecSuite
def checkNoCoalescePartitions(ds: Dataset[Row], origin: ShuffleOrigin): Unit = {
assert(collect(ds.queryExecution.executedPlan) {
case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s
Expand All @@ -1771,7 +1815,7 @@ index 593bd7bb4ba..32af28b0238 100644
}.size == 1)
ds.collect()
val plan = ds.queryExecution.executedPlan
@@ -1872,6 +1905,9 @@ class AdaptiveQueryExecSuite
@@ -1872,6 +1907,9 @@ class AdaptiveQueryExecSuite
}.isEmpty)
assert(collect(plan) {
case s: ShuffleExchangeExec if s.shuffleOrigin == origin && s.numPartitions == 2 => s
Expand All @@ -1781,7 +1825,15 @@ index 593bd7bb4ba..32af28b0238 100644
}.size == 1)
checkAnswer(ds, testData)
}
@@ -2028,7 +2064,8 @@ class AdaptiveQueryExecSuite
@@ -1901,6 +1939,7 @@ class AdaptiveQueryExecSuite
df.collect()
assert(collect(df.queryExecution.executedPlan) {
case u: UnionExec => u
+ case u: CometUnionExec => u.originalPlan.asInstanceOf[UnionExec]
}.size == numUnion)
assert(collect(df.queryExecution.executedPlan) {
case r: AQEShuffleReadExec => r
@@ -2028,7 +2067,8 @@ class AdaptiveQueryExecSuite
}
}

Expand All @@ -1791,7 +1843,7 @@ index 593bd7bb4ba..32af28b0238 100644
withTempView("t1", "t2") {
def checkJoinStrategy(shouldShuffleHashJoin: Boolean): Unit = {
Seq("100", "100000").foreach { size =>
@@ -2114,7 +2151,8 @@ class AdaptiveQueryExecSuite
@@ -2114,7 +2154,8 @@ class AdaptiveQueryExecSuite
}
}

Expand All @@ -1801,7 +1853,7 @@ index 593bd7bb4ba..32af28b0238 100644
withTempView("v") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
@@ -2213,7 +2251,7 @@ class AdaptiveQueryExecSuite
@@ -2213,7 +2254,7 @@ class AdaptiveQueryExecSuite
runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " +
s"JOIN skewData2 ON key1 = key2 GROUP BY key1")
val shuffles1 = collect(adaptive1) {
Expand All @@ -1810,7 +1862,7 @@ index 593bd7bb4ba..32af28b0238 100644
}
assert(shuffles1.size == 3)
// shuffles1.head is the top-level shuffle under the Aggregate operator
@@ -2226,7 +2264,7 @@ class AdaptiveQueryExecSuite
@@ -2226,7 +2267,7 @@ class AdaptiveQueryExecSuite
runAdaptiveAndVerifyResult(s"SELECT $repartition key1 FROM skewData1 " +
s"JOIN skewData2 ON key1 = key2")
val shuffles2 = collect(adaptive2) {
Expand All @@ -1819,7 +1871,7 @@ index 593bd7bb4ba..32af28b0238 100644
}
if (hasRequiredDistribution) {
assert(shuffles2.size == 3)
@@ -2260,7 +2298,8 @@ class AdaptiveQueryExecSuite
@@ -2260,7 +2301,8 @@ class AdaptiveQueryExecSuite
}
}

Expand All @@ -1829,15 +1881,15 @@ index 593bd7bb4ba..32af28b0238 100644
CostEvaluator.instantiate(
classOf[SimpleShuffleSortCostEvaluator].getCanonicalName, spark.sparkContext.getConf)
intercept[IllegalArgumentException] {
@@ -2404,6 +2443,7 @@ class AdaptiveQueryExecSuite
@@ -2404,6 +2446,7 @@ class AdaptiveQueryExecSuite
val (_, adaptive) = runAdaptiveAndVerifyResult(query)
assert(adaptive.collect {
case sort: SortExec => sort
+ case sort: CometSortExec => sort
}.size == 1)
val read = collect(adaptive) {
case read: AQEShuffleReadExec => read
@@ -2421,7 +2461,8 @@ class AdaptiveQueryExecSuite
@@ -2421,7 +2464,8 @@ class AdaptiveQueryExecSuite
}
}

Expand All @@ -1847,7 +1899,7 @@ index 593bd7bb4ba..32af28b0238 100644
withTempView("v") {
withSQLConf(
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED.key -> "true",
@@ -2533,7 +2574,7 @@ class AdaptiveQueryExecSuite
@@ -2533,7 +2577,7 @@ class AdaptiveQueryExecSuite
runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " +
"JOIN skewData3 ON value2 = value3")
val shuffles1 = collect(adaptive1) {
Expand All @@ -1856,7 +1908,7 @@ index 593bd7bb4ba..32af28b0238 100644
}
assert(shuffles1.size == 4)
val smj1 = findTopLevelSortMergeJoin(adaptive1)
@@ -2544,7 +2585,7 @@ class AdaptiveQueryExecSuite
@@ -2544,7 +2588,7 @@ class AdaptiveQueryExecSuite
runAdaptiveAndVerifyResult("SELECT key1 FROM skewData1 JOIN skewData2 ON key1 = key2 " +
"JOIN skewData3 ON value1 = value3")
val shuffles2 = collect(adaptive2) {
Expand Down Expand Up @@ -2327,7 +2379,7 @@ index bf5c51b89bb..f7402b7d883 100644
val e = testSchemaMismatch(dir.getCanonicalPath, vectorizedReaderEnabled = true)
assert(e.getCause.isInstanceOf[SparkException])
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
index 3a0bd35cb70..b28f06a757f 100644
index 3a0bd35cb70..99b70606261 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.debug
Expand All @@ -2338,7 +2390,27 @@ index 3a0bd35cb70..b28f06a757f 100644
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
@@ -124,7 +125,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui
@@ -41,7 +42,8 @@ abstract class DebuggingSuiteBase extends SharedSparkSession {
testData.as[TestData].debug()
}

- test("debugCodegen") {
+ test("debugCodegen",
+ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) {
val df = spark.range(10).groupBy(col("id") * 2).count()
df.collect()
val res = codegenString(df.queryExecution.executedPlan)
@@ -50,7 +52,8 @@ abstract class DebuggingSuiteBase extends SharedSparkSession {
assert(res.contains("Object[]"))
}

- test("debugCodegenStringSeq") {
+ test("debugCodegenStringSeq",
+ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) {
val df = spark.range(10).groupBy(col("id") * 2).count()
df.collect()
val res = codegenStringSeq(df.queryExecution.executedPlan)
@@ -124,7 +127,8 @@ class DebuggingSuite extends DebuggingSuiteBase with DisableAdaptiveExecutionSui
| id LongType: {}""".stripMargin))
}

Expand Down Expand Up @@ -2448,6 +2520,29 @@ index d083cac48ff..3c11bcde807 100644

import testImplicits._

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index fdc633f3556..99474724c26 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -36,7 +36,7 @@ import org.apache.spark.internal.config.Status._
import org.apache.spark.rdd.RDD
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler._
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, IgnoreComet, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -699,7 +699,8 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
}

test("SPARK-29894 test Codegen Stage Id in SparkPlanInfo",
- DisableAdaptiveExecution("WSCG rule is applied later in AQE")) {
+ DisableAdaptiveExecution("WSCG rule is applied later in AQE"),
+ IgnoreComet("Comet changes the WholeStageCodegen subtree count")) {
// with AQE on, the WholeStageCodegen rule is applied when running QueryStageExec.
val df = createTestDataFrame.select(count("*"))
val sparkPlanInfo = SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 266bb343526..f8ad838e2b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
Expand Down
Loading
Loading