Skip to content

Commit b7df937

Browse files
authored
feat: disable Comet by default when CometShuffleManager is not registered (#4328)
1 parent 0fd52b7 commit b7df937

3 files changed

Lines changed: 35 additions & 0 deletions

File tree

spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,16 @@ object CometSparkSessionExtensions extends Logging {
127127
return false
128128
}
129129

130+
if (COMET_EXEC_SHUFFLE_ENABLED.get(conf) && !isCometShuffleManagerEnabled(conf)) {
131+
logWarning(
132+
"Comet extension is disabled because spark.shuffle.manager is not set to " +
133+
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager. " +
134+
"Comet provides limited benefit without its shuffle manager. " +
135+
s"Set ${COMET_EXEC_SHUFFLE_ENABLED.key}=false to keep Comet enabled with " +
136+
"Spark's default shuffle manager.")
137+
return false
138+
}
139+
130140
// We don't support INT96 timestamps written by Apache Impala in a different timezone yet
131141
if (conf.getConf(SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION)) {
132142
logWarning(

spark/src/test/scala/org/apache/comet/CometSparkSessionExtensionsSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ class CometSparkSessionExtensionsSuite extends CometTestBase {
2929

3030
test("isCometLoaded") {
3131
val conf = new SQLConf
32+
// Disable Comet shuffle so this test can focus on other checks without needing
33+
// spark.shuffle.manager to be set.
34+
conf.setConfString(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "false")
3235

3336
conf.setConfString(CometConf.COMET_ENABLED.key, "false")
3437
assert(!isCometLoaded(conf))
@@ -50,6 +53,25 @@ class CometSparkSessionExtensionsSuite extends CometTestBase {
5053
NativeBase.setLoaded(true)
5154
}
5255

56+
test("isCometLoaded requires CometShuffleManager when shuffle.enabled=true") {
57+
val conf = new SQLConf
58+
conf.setConfString(CometConf.COMET_ENABLED.key, "true")
59+
60+
// Default: shuffle.enabled=true. Without spark.shuffle.manager set to CometShuffleManager, isCometLoaded must return false.
61+
assert(!isCometLoaded(conf))
62+
63+
// Opt out: shuffle.enabled=false skips the shuffle manager check, so Comet should load (assumes native lib is available).
64+
conf.setConfString(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "false")
65+
assert(isCometLoaded(conf))
66+
67+
// shuffle.enabled=true with the Comet shuffle manager registered: Comet should load.
68+
conf.setConfString(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
69+
conf.setConfString(
70+
"spark.shuffle.manager",
71+
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
72+
assert(isCometLoaded(conf))
73+
}
74+
5375
test("Arrow properties") {
5476
NativeBase.setLoaded(false)
5577
NativeBase.load()

spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ class CometPluginsSuite extends CometTestBase {
3131
conf.set("spark.executor.memory", "1G")
3232
conf.set("spark.executor.memoryOverhead", "2G")
3333
conf.set("spark.plugins", "org.apache.spark.CometPlugin")
34+
conf.set(
35+
"spark.shuffle.manager",
36+
"org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
3437
conf.set("spark.comet.enabled", "true")
3538
conf.set("spark.comet.exec.enabled", "true")
3639
conf.set("spark.comet.exec.onHeap.enabled", "true")

0 commit comments

Comments
 (0)