Commit 143e9ca

wjxiz1992 and claude authored
[BUG] Dedup GpuBroadcastExchange across DPP subqueries in non-AQE mode (#14837)
Closes #14833

## Summary

In non-AQE mode with DPP, `GpuSubqueryBroadcastExec` builds its underlying `GpuBroadcastExchangeExec` directly during `GpuOverrides` (the first pass) via `exMeta.convertToGpu()`, bypassing `GpuTransitionOverrides`. The DPP-side broadcast therefore ends up with a structurally different child (missing `GpuCoalesceBatches` that `insertCoalesce`/`optimizeCoalesce` add on the main plan) than the join-side broadcast for the same logical CPU exchange. The `cpuCanonical` field is also computed after GPU rewriting on the DPP side but before it on the join side, so `ExchangeMappingCache` lookup by `cpuCanonical` also fails to match. Spark's `ReuseExchangeAndSubquery` rule cannot merge the two broadcasts, so the dim side is materialized and broadcast twice, defeating DPP's intended performance benefit.

This adds `fixupNonAdaptiveBroadcastReuse` to `GpuTransitionOverrides`, gated by a new `spark.rapids.sql.nonAqeBroadcastReuseFixup.enable` conf (internal, default true). The pass collects main-plan `GpuBroadcastExchangeExec` instances, indexes them by `(mode.canonicalized, stripTransitions(child).canonicalized)` (where `stripTransitions` removes `GpuCoalesceBatches` so the structural difference does not block matching), then walks subquery expressions and rewrites any matching DPP-side broadcast to `ReusedExchangeExec` referencing the main-plan instance. Mirrors `fixupAdaptiveExchangeReuse`, which already handles the equivalent gap in AQE mode.

The cross-runtime case (CPU `BroadcastHashJoin` + GPU DPP subquery when `array`/`struct` build keys force CPU fallback) is tracked separately by #14836 and not addressed here.

## Traceability

- Issue: #14833 (filed before this PR per the issue-first rule)
- Follow-on (proper root-cause fix tracked separately): #14892, which applies `GpuTransitionOverrides` to `GpuSubqueryBroadcast`'s broadcast child so the post-hoc fixup added here can be retired.
- Migration PR (independent, adds the end-to-end DPP suite that exposes this bug): #14781
- Related: #14836 (cross-runtime fallback case, distinct root cause)
- Existing analogue this fix mirrors: `GpuTransitionOverrides.fixupAdaptiveExchangeReuse`.

## Testing

### Unit coverage (added in this PR)

`tests/src/test/scala/com/nvidia/spark/rapids/NonAqeBroadcastReuseFixupSuite.scala` (added per @res-life's review request) directly exercises `fixupNonAdaptiveBroadcastReuse`. It hand-builds the #14833 structural divergence: a main-plan `GpuBroadcastExchangeExec(GpuCoalesceBatches(range))` and a DPP-side `GpuBroadcastExchangeExec(range)` sharing one dim-side `range` (so their broadcast modes canonicalize identically, but the children differ by exactly the `GpuCoalesceBatches` wrap):

- `fixupNonAdaptiveBroadcastReuse rewrites matching DPP broadcast to ReusedExchangeExec`: once `stripGpuCoalesceBatches` normalizes the structural difference, the DPP-side broadcast is rewritten to a `ReusedExchangeExec` pointing at the main-plan instance.
- `fixupNonAdaptiveBroadcastReuse leaves plans with no main-plan broadcast unchanged`: the `mainPlanBroadcasts.isEmpty` early-exit returns the input plan unmodified.
- `ENABLE_NON_AQE_BROADCAST_REUSE_FIXUP conf accessor flips with the kill switch`: the accessor reflects the kill switch and defaults to `true` via `createWithDefault(true)`.

```
mvn package -pl tests -am -Dbuildver=330 -Dmaven.repo.local=./.mvn-repo \
  -DwildcardSuites=com.nvidia.spark.rapids.NonAqeBroadcastReuseFixupSuite \
  -Drapids.test.gpu.allocFraction=0.3 -Drapids.test.gpu.maxAllocFraction=0.3 \
  -Drapids.test.gpu.minAllocFraction=0 -s jenkins/settings.xml -P mirror-apache-to-urm
```

→ **`Tests: succeeded 3, failed 0`** (verified locally on Apache Spark 3.3, buildver 330)

### End-to-end (DPP suite from #14781, applied locally)

Validated against Apache Spark 3.3 in a worktree, with the migrated `RapidsDynamicPartitionPruningV1Suite` from #14781 applied locally and the four `#14833` KNOWN_ISSUE excludes removed:

```
mvn package -pl tests -am -Dbuildver=330 -Dmaven.repo.local=./.mvn-repo \
  -DwildcardSuites=org.apache.spark.sql.rapids.suites.RapidsDynamicPartitionPruningV1SuiteAEOff,org.apache.spark.sql.rapids.suites.RapidsDynamicPartitionPruningV1SuiteAEOn \
  -Drapids.test.gpu.allocFraction=0.3 -Drapids.test.gpu.maxAllocFraction=0.3 \
  -Drapids.test.gpu.minAllocFraction=0 -s jenkins/settings.xml -P mirror-apache-to-urm
```

- `RapidsDynamicPartitionPruningV1SuiteAEOff`: **`Tests: succeeded 34, failed 1`** (the one failure is `SPARK-32659`, a known partial-fallback case from #14836, unrelated to this fix). The four previously-failing tests now pass:
  - `avoid reordering broadcast join keys to match input hash partitioning`
  - `Plan broadcast pruning only when the broadcast can be reused`
  - `SPARK-32817: DPP throws error when the broadcast side is empty`
  - `SPARK-38148: Do not add dynamic partition pruning if there exists static partition pruning`
- `RapidsDynamicPartitionPruningV1SuiteAEOn`: **`Tests: succeeded 31, failed 0`** (no AQE regression).
- `BroadcastHashJoinSuite` smoke: `Tests: succeeded 2, failed 0`.

### Cross-shim compile (per CLAUDE.md shim coverage rule)

- `mvn package -DskipTests -pl sql-plugin -am -Dbuildver=330`: BUILD SUCCESS
- `mvn package -DskipTests -pl sql-plugin -am -Dbuildver=340`: BUILD SUCCESS
- `mvn package -DskipTests -pl sql-plugin -am -Dbuildver=400` (via `scala2.13/`): BUILD SUCCESS

`scripts/check-shim-coverage.sh`: no shim files changed, no `Origin.context` leaks.

## Performance impact

Cold-path analysis. The pass runs once at the end of `GpuTransitionOverrides` per query plan:

- For queries without `GpuBroadcastExchangeExec` (the vast majority), the pass exits early after a single `SparkPlan.foreach` traversal (no `transformAllExpressions` invocation).
- For queries with broadcasts and DPP, the work is O(n) plan traversal plus O(m) canonical computation on small subquery trees, where m is the size of the DPP subquery (typically a few nodes).

This is negligible compared to query planning overhead and execution time. No benchmark required.

## Test recovery follow-up

The fix has direct unit coverage in this PR (`NonAqeBroadcastReuseFixupSuite`). The end-to-end DPP recovery additionally depends on #14781, which adds the migrated `RapidsDynamicPartitionPruningV1Suite`. After both this fix and #14781 merge, a small follow-up PR removes the four `#14833` KNOWN_ISSUE excludes in `RapidsTestSettings.scala` so the migrated suite exercises the fix in CI. The two PRs are independent but both required for end-to-end recovery.

Documentation

- [ ] Updated for new or modified user-facing features or behaviors
- [x] No user-facing change

Testing

- [x] Added or modified tests to cover new code paths
- [ ] Covered by existing tests (Please provide the names of the existing tests in the PR description.)
- [ ] Not required

Performance

- [ ] Tests ran and results are added in the PR description
- [ ] Issue filed with a link in the PR description
- [x] Not required

---------

Signed-off-by: Allen Xu <allxu@nvidia.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
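
The matching idea in the Summary can be modelled without Spark. The following is a minimal sketch, not the plugin's code: `Scan`, `Coalesce`, `Broadcast` and `Reused` are hypothetical stand-ins for the real plan nodes, a plain `String` stands in for the broadcast mode, and case-class equality stands in for comparing `canonicalized` forms.

```scala
// Toy stand-ins for the physical plan nodes named in the commit message.
// These are NOT the Spark or spark-rapids classes.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Coalesce(child: Plan) extends Plan                // models GpuCoalesceBatches
final case class Broadcast(mode: String, child: Plan) extends Plan // models GpuBroadcastExchangeExec
final case class Reused(target: Broadcast) extends Plan            // models ReusedExchangeExec

object BroadcastReuseSketch {
  // Recursively drop Coalesce wrappers so they cannot block a structural match.
  def stripCoalesce(plan: Plan): Plan = plan match {
    case Coalesce(child)        => stripCoalesce(child)
    case Broadcast(mode, child) => Broadcast(mode, stripCoalesce(child))
    case other                  => other
  }

  // Signature = (broadcast mode, child with Coalesce wrappers removed).
  def signature(b: Broadcast): (String, Plan) = (b.mode, stripCoalesce(b.child))

  // Replace the DPP-side broadcast with a reference to a matching main-plan one, if any.
  def fixup(mainPlan: Seq[Broadcast], dppSide: Broadcast): Plan = {
    val bySig = mainPlan.groupBy(signature).map { case (sig, bs) => sig -> bs.head }
    bySig.get(signature(dppSide)) match {
      case Some(matched) if !(matched eq dppSide) => Reused(matched)
      case _                                      => dppSide
    }
  }
}
```

With `mainB = Broadcast("hash(d_id)", Coalesce(Scan("dim")))` and `dppB = Broadcast("hash(d_id)", Scan("dim"))`, the two nodes are not equal as written, but their signatures are, so `BroadcastReuseSketch.fixup(Seq(mainB), dppB)` yields `Reused(mainB)`.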
1 parent f10cd40 commit 143e9ca

3 files changed

Lines changed: 269 additions & 1 deletion

sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala

Lines changed: 76 additions & 1 deletion
@@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExecBase,
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, Exchange, ReusedExchangeExec, ShuffleExchangeLike}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
 import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuShuffleEnv, GpuTaskMetrics}
-import org.apache.spark.sql.rapids.execution.{ExchangeMappingCache, GpuBroadcastExchangeExec, GpuBroadcastExchangeExecBase, GpuBroadcastToRowExec, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase}
+import org.apache.spark.sql.rapids.execution.{ExchangeMappingCache, GpuBroadcastExchangeExec, GpuBroadcastExchangeExecBase, GpuBroadcastToRowExec, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase, GpuSubqueryBroadcastExec}
 import org.apache.spark.sql.types.StructType

 /**
@@ -759,6 +759,77 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
     }
   }

+  /**
+   * In non-AQE mode with DPP, GpuSubqueryBroadcastExec builds its underlying
+   * GpuBroadcastExchangeExec directly in GpuOverrides without going through this rule.
+   * The DPP-side broadcast therefore has a structurally different child (e.g. missing
+   * GpuCoalesceBatches that this rule inserts on the main plan) than the join-side
+   * broadcast for the same logical CPU exchange, and its cpuCanonical is also computed
+   * after GPU rewriting so it does not match the join-side cpuCanonical. Spark's
+   * ReuseExchangeAndSubquery rule does not merge them, so the dim side is materialized
+   * twice and DPP loses its intended performance benefit.
+   *
+   * This pass walks subquery expressions in the final plan, identifies the DPP-side
+   * GpuBroadcastExchangeExec inside a GpuSubqueryBroadcastExec, and matches it against
+   * the main-plan GpuBroadcastExchangeExec instances by (mode, child canonical form with
+   * GpuCoalesceBatches stripped — see stripGpuCoalesceBatches below). When a match is found,
+   * the DPP-side broadcast is rewritten to ReusedExchangeExec referencing the join-side
+   * instance.
+   */
+  private[rapids] def fixupNonAdaptiveBroadcastReuse(p: SparkPlan): SparkPlan = {
+    // Normalize a plan for signature matching by removing GpuCoalesceBatches wraps. The main-plan
+    // broadcast picks these up from insertCoalesce / optimizeCoalesce but the DPP-side broadcast
+    // (built earlier in GpuOverrides without going through GpuTransitionOverrides) does not, so
+    // we have to strip them on both sides before comparing canonical forms. This is the only
+    // structural difference observed in practice; other transitions (host->device, etc.) live
+    // outside the broadcast subtree and never reach this helper.
+    def stripGpuCoalesceBatches(plan: SparkPlan): SparkPlan = plan match {
+      case g: GpuCoalesceBatches => stripGpuCoalesceBatches(g.child)
+      case other => other.withNewChildren(other.children.map(stripGpuCoalesceBatches))
+    }
+
+    def signature(g: GpuBroadcastExchangeExec): (Any, SparkPlan) =
+      (g.mode.canonicalized, stripGpuCoalesceBatches(g.child).canonicalized)
+
+    // Collect all main-plan GpuBroadcastExchangeExec instances. SparkPlan.foreach only walks
+    // the plan-tree children and does NOT descend into ExecSubqueryExpression plans, so
+    // DPP-side broadcasts (which live inside GpuSubqueryBroadcastExec under a subquery
+    // expression) are naturally excluded from this collection — exactly what we want, because
+    // those are the instances the transformAllExpressions pass below will rewrite.
+    val mainPlanBroadcasts = mutable.ArrayBuffer.empty[GpuBroadcastExchangeExec]
+    p.foreach {
+      case g: GpuBroadcastExchangeExec => mainPlanBroadcasts += g
+      case _ =>
+    }
+    if (mainPlanBroadcasts.isEmpty) return p
+
+    val bySig = mainPlanBroadcasts.groupBy(signature).map {
+      case (sig, instances) => sig -> instances.head
+    }
+
+    p.transformAllExpressions {
+      case sub: ExecSubqueryExpression if sub.plan.isInstanceOf[GpuSubqueryBroadcastExec] =>
+        val gsb = sub.plan.asInstanceOf[GpuSubqueryBroadcastExec]
+        gsb.child match {
+          case dpp: GpuBroadcastExchangeExec =>
+            bySig.get(signature(dpp)) match {
+              case Some(matched) if !(matched eq dpp) =>
+                // Use dpp.output (not matched.output) so the reused exchange exposes the
+                // DPP-side attributes that downstream subquery expressions reference, while
+                // reading from the matched main-plan exchange. The AQE fixup
+                // fixupAdaptiveExchangeReuse uses the same shape (its g.output is the
+                // DPP-side attributes from the in-pass collected map).
+                val reused = ReusedExchangeExec(dpp.output, matched)
+                val newGsb = gsb.withNewChildren(Seq(reused))
+                  .asInstanceOf[GpuSubqueryBroadcastExec]
+                sub.withNewPlan(newGsb)
+              case _ => sub
+            }
+          case _ => sub
+        }
+    }
+  }
+
   private def insertStageLevelMetrics(plan: SparkPlan): Unit = {
     val sc = SparkSession.active.sparkContext
     val gen = new AtomicInteger(0)
@@ -850,6 +921,10 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
             plan.conf.adaptiveExecutionEnabled && plan.conf.exchangeReuseEnabled) {
           updatedPlan = fixupAdaptiveExchangeReuse(updatedPlan)
         }
+        if (rapidsConf.isNonAqeBroadcastReuseFixupEnabled &&
+            !plan.conf.adaptiveExecutionEnabled && plan.conf.exchangeReuseEnabled) {
+          updatedPlan = fixupNonAdaptiveBroadcastReuse(updatedPlan)
+        }

         if (rapidsConf.isTagLoreIdEnabled) {
           updatedPlan = GpuLore.tagForLore(updatedPlan, rapidsConf)
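
The pass relies on a traversal asymmetry described in its comments: the collection step walks plan children only, while the rewrite step reaches plans that hang off expressions. A simplified model of that asymmetry is sketched below. `Node`, `Expr` and `transformSubqueries` are hypothetical names, and the real `transformAllExpressions` rewrites arbitrary expressions rather than only subquery plans.

```scala
// Hypothetical miniature of a plan tree: nodes have plan children plus
// expressions, and an expression may own a separate subquery plan.
final case class Expr(name: String, subquery: Option[Node] = None)

final case class Node(name: String, children: Seq[Node] = Nil, exprs: Seq[Expr] = Nil) {
  // Visits this node and its plan children only; subquery plans held by
  // expressions are never visited (the behaviour the diff comment describes).
  def foreach(f: Node => Unit): Unit = {
    f(this)
    children.foreach(_.foreach(f))
  }

  // Rewrites the subquery plans attached to expressions anywhere in the tree.
  def transformSubqueries(rule: Node => Node): Node =
    copy(
      children = children.map(_.transformSubqueries(rule)),
      exprs = exprs.map(e => e.copy(subquery = e.subquery.map(rule))))
}

object TraversalSketch {
  // Names of the nodes reachable through plan children, in visit order.
  def names(plan: Node): Seq[String] = {
    val buf = scala.collection.mutable.ArrayBuffer.empty[String]
    plan.foreach(n => buf += n.name)
    buf.toSeq
  }
}
```

For a `filter` node whose child is `mainBroadcast` and whose condition owns a `dppBroadcast` subquery, `TraversalSketch.names` returns only the main-plan nodes, so the DPP-side broadcast can never be picked as its own reuse target during collection.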

sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala

Lines changed: 15 additions & 0 deletions
@@ -2748,6 +2748,18 @@ val SHUFFLE_COMPRESSION_LZ4_CHUNK_SIZE = conf("spark.rapids.shuffle.compression.
     .booleanConf
     .createWithDefault(true)

+  val ENABLE_NON_AQE_BROADCAST_REUSE_FIXUP =
+    conf("spark.rapids.sql.nonAqeBroadcastReuseFixup.enable")
+      .doc("Option to turn on the fixup of broadcast exchange reuse for DPP " +
+        "subqueries when AQE is disabled. The DPP-side GpuBroadcastExchange is built " +
+        "during GpuOverrides and bypasses GpuTransitionOverrides, so it does not match " +
+        "the join-side broadcast canonically. This fixup builds a per-query signature map " +
+        "of join-side GpuBroadcastExchangeExec nodes in the main plan and rewrites a " +
+        "matching DPP-side broadcast to ReusedExchangeExec.")
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val CHUNKED_PACK_POOL_SIZE = conf("spark.rapids.sql.chunkedPack.poolSize")
     .doc("Amount of GPU memory (in bytes) to set aside at startup for the chunked pack " +
       "scratch space, needed during spill from GPU to host memory. As a rule of thumb, each " +
@@ -4012,6 +4024,9 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

   lazy val isAqeExchangeReuseFixupEnabled: Boolean = get(ENABLE_AQE_EXCHANGE_REUSE_FIXUP)

+  lazy val isNonAqeBroadcastReuseFixupEnabled: Boolean =
+    get(ENABLE_NON_AQE_BROADCAST_REUSE_FIXUP)
+
   lazy val chunkedPackPoolSize: Long = get(CHUNKED_PACK_POOL_SIZE)

   lazy val chunkedPackBounceBufferSize: Long = get(CHUNKED_PACK_BOUNCE_BUFFER_SIZE)
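
The new conf is one of three settings that decide whether the pass runs. The sketch below restates that gate over a plain `Map[String, String]` rather than `RapidsConf` or `SQLConf`. The three keys come from the diff and the test suite; the `flag` helper is hypothetical, and the defaults for the two Spark keys are an assumption based on recent Spark releases, not something this commit states.

```scala
object FixupGateSketch {
  // Conf keys as they appear in the diff and in the test suite.
  val FixupKey = "spark.rapids.sql.nonAqeBroadcastReuseFixup.enable"
  val AqeKey = "spark.sql.adaptive.enabled"
  val ReuseKey = "spark.sql.exchange.reuse"

  // Hypothetical helper: read a boolean setting from a plain Map, with a default.
  private def flag(confs: Map[String, String], key: String, default: Boolean): Boolean =
    confs.get(key).map(_.trim.toBoolean).getOrElse(default)

  // Same shape as the condition added to GpuTransitionOverrides.apply:
  // fixup enabled && AQE disabled && exchange reuse enabled.
  def shouldRunNonAqeFixup(confs: Map[String, String]): Boolean =
    flag(confs, FixupKey, default = true) &&   // createWithDefault(true) in the diff
      !flag(confs, AqeKey, default = true) &&  // assumed Spark default
      flag(confs, ReuseKey, default = true)    // assumed Spark default
}
```

Under these assumptions the pass runs only when AQE is explicitly turned off, and setting the fixup key to `false` acts as the kill switch regardless of the other two settings.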

tests/src/test/scala/com/nvidia/spark/rapids/NonAqeBroadcastReuseFixupSuite.scala

Lines changed: 178 additions & 0 deletions
@@ -0,0 +1,178 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan}
+import org.apache.spark.sql.execution.{InSubqueryExec => SparkInSubqueryExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
+import org.apache.spark.sql.execution.joins.HashedRelationBroadcastMode
+import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExec, GpuSubqueryBroadcastExec}
+
+/**
+ * Unit coverage for `GpuTransitionOverrides.fixupNonAdaptiveBroadcastReuse`, the non-AQE
+ * counterpart of `fixupAdaptiveExchangeReuse` already exercised by `ReusedExchangeFixupSuite`.
+ *
+ * The non-AQE fixup only fires for a `GpuBroadcastExchangeExec` that lives inside a
+ * `GpuSubqueryBroadcastExec` referenced by an `ExecSubqueryExpression` (DPP). A faithful
+ * test plan therefore needs to wire up both the main-plan broadcast and a hand-built DPP
+ * subquery expression.
+ */
+class NonAqeBroadcastReuseFixupSuite extends SparkQueryCompareTestSuite {
+
+  private val baseConf: SparkConf = new SparkConf()
+    .set("spark.sql.adaptive.enabled", "false")
+    .set("spark.sql.exchange.reuse", "true")
+    .set(RapidsConf.ENABLE_NON_AQE_BROADCAST_REUSE_FIXUP.key, "true")
+
+  private def newRange(): RangeExec = RangeExec(Range(1, 2, 1, Some(1)))
+
+  /** Wrap a plan in `GpuCoalesceBatches` so the signature must `stripGpuCoalesceBatches`. */
+  private def coalesce(child: SparkPlan): GpuCoalesceBatches =
+    GpuCoalesceBatches(child, TargetSize(1L << 20))
+
+  /**
+   * Build a `GpuBroadcastExchangeExec` over `child`. The broadcast mode is keyed on
+   * `leafRange.output`, so callers can wrap `child` in `GpuCoalesceBatches` without affecting
+   * the mode keys — exactly mirroring real DPP, where the dim-side attributes are pinned by
+   * the join's build expression while only the main-plan broadcast's child gets a coalesce
+   * wrap from `insertCoalesce` / `optimizeCoalesce`.
+   */
+  private def newGpuBroadcast(child: SparkPlan, leafRange: RangeExec): GpuBroadcastExchangeExec = {
+    val mode = HashedRelationBroadcastMode(leafRange.output)
+    GpuBroadcastExchangeExec(mode, child)(BroadcastExchangeExec(mode, child))
+  }
+
+  private def newGpuSubqueryBroadcast(
+      child: GpuBroadcastExchangeExec,
+      leafRange: RangeExec): GpuSubqueryBroadcastExec = {
+    val keys = Seq(leafRange.output.head)
+    GpuSubqueryBroadcastExec("dpp", Seq(0), keys, child)(modeKeys = Some(keys))
+  }
+
+  /**
+   * Build a synthetic non-AQE DPP plan that mirrors the actual #14833 structural divergence:
+   *
+   *   FilterExec(
+   *     condition = InSubqueryExec(plan = GpuSubqueryBroadcastExec(child = dppG)),
+   *     child = mainG // GpuBroadcastExchangeExec(child = GpuCoalesceBatches(range))
+   *   )
+   *
+   * The dim-side `range` is shared between `mainG` and `dppG` so their broadcast modes
+   * (keyed on `range.output`) canonicalize identically — matching how real DPP wires both
+   * broadcasts off the same logical filter sub-plan. The children diverge structurally
+   * only in the `GpuCoalesceBatches` wrap that `insertCoalesce` / `optimizeCoalesce`
+   * applies on the main-plan side:
+   *   - `mainG.child` = `GpuCoalesceBatches(range)`
+   *   - `dppG.child` = `range`
+   *
+   * Without the production `stripGpuCoalesceBatches` normalization, the canonical comparison
+   * fails because the two children differ by exactly that wrap. With the normalization, both
+   * reduce to `range.canonicalized`, the signatures match, and the DPP-side broadcast is
+   * rewritten to `ReusedExchangeExec` pointing at `mainG`.
+   */
+  private def buildDppPlan(): (SparkPlan, GpuBroadcastExchangeExec, GpuBroadcastExchangeExec) = {
+    val range = newRange()
+    val mainG = newGpuBroadcast(coalesce(range), range)
+    val dppG = newGpuBroadcast(range, range)
+    val gsb = newGpuSubqueryBroadcast(dppG, range)
+    val inSub = SparkInSubqueryExec(range.output.head, gsb, NamedExpression.newExprId)
+    (FilterExec(inSub, mainG), mainG, dppG)
+  }
+
+  private def reusedExchangeChildren(p: SparkPlan): Seq[SparkPlan] = {
+    val collected = scala.collection.mutable.ArrayBuffer.empty[SparkPlan]
+    p.foreach { node =>
+      node.expressions.foreach(_.foreach {
+        case sub: SparkInSubqueryExec =>
+          sub.plan match {
+            case gsb: GpuSubqueryBroadcastExec =>
+              gsb.child match {
+                case r: ReusedExchangeExec => collected += r.child
+                case _ =>
+              }
+            case _ =>
+          }
+        case _ =>
+      })
+    }
+    collected.toSeq
+  }
+
+  test("fixupNonAdaptiveBroadcastReuse rewrites matching DPP broadcast to ReusedExchangeExec") {
+    withGpuSparkSession(_ => {
+      val (plan, mainG, _) = buildDppPlan()
+      val updated = new GpuTransitionOverrides().fixupNonAdaptiveBroadcastReuse(plan)
+
+      val reusedChildren = reusedExchangeChildren(updated)
+      assert(reusedChildren.size == 1,
+        s"expected exactly one ReusedExchangeExec under the GpuSubqueryBroadcastExec, got " +
+          s"${reusedChildren.size} in:\n${updated.treeString}")
+      assert(reusedChildren.head eq mainG,
+        s"expected the ReusedExchangeExec to point at the main-plan broadcast G1, got " +
+          s"${reusedChildren.head}")
+    }, baseConf)
+  }
+
+  test("fixupNonAdaptiveBroadcastReuse leaves plans with no main-plan broadcast unchanged") {
+    // No GpuBroadcastExchangeExec in the main plan; just a RangeExec wrapped in a Filter whose
+    // condition still references a DPP-side GpuSubqueryBroadcastExec. The early-exit at the
+    // `if (mainPlanBroadcasts.isEmpty) return p` line in fixupNonAdaptiveBroadcastReuse should
+    // return the input plan unmodified.
+    withGpuSparkSession(_ => {
+      val range = newRange()
+      val dppG = newGpuBroadcast(range, range)
+      val gsb = newGpuSubqueryBroadcast(dppG, range)
+      val inSub = SparkInSubqueryExec(range.output.head, gsb, NamedExpression.newExprId)
+      val plan = FilterExec(inSub, range)
+
+      val updated = new GpuTransitionOverrides().fixupNonAdaptiveBroadcastReuse(plan)
+      assert(updated eq plan,
+        s"expected the plan to be returned unchanged when mainPlanBroadcasts.isEmpty, got:\n" +
+          s"${updated.treeString}")
+    }, baseConf)
+  }
+
+  test("ENABLE_NON_AQE_BROADCAST_REUSE_FIXUP conf accessor flips with the kill switch") {
+    // Scope: this test validates ONLY that the `RapidsConf.isNonAqeBroadcastReuseFixupEnabled`
+    // accessor reflects the conf key. The plan-level gate inside `GpuTransitionOverrides.apply`
+    // (the `if (rapidsConf.isNonAqeBroadcastReuseFixupEnabled ...)` block, identified by code
+    // shape rather than line number so this comment doesn't drift) reads this accessor;
+    // exercising the full gate against a real plan needs a GPU-routed end-to-end run and is
+    // covered by `RapidsDynamicPartitionPruningV1SuiteAEOff` rather than this unit suite.
+    val killSwitchConf = baseConf.clone()
+      .set(RapidsConf.ENABLE_NON_AQE_BROADCAST_REUSE_FIXUP.key, "false")
+    withGpuSparkSession(spark => {
+      val rapidsConf = new RapidsConf(spark.sessionState.conf)
+      assert(!rapidsConf.isNonAqeBroadcastReuseFixupEnabled,
+        "isNonAqeBroadcastReuseFixupEnabled should be false when the kill switch is set")
+    }, killSwitchConf)
+
+    // Use a SparkConf that does NOT set the kill-switch key, so the assertion really
+    // exercises the `createWithDefault(true)` default rather than a redundant "true" override.
+    val defaultConf = new SparkConf()
+      .set("spark.sql.adaptive.enabled", "false")
+      .set("spark.sql.exchange.reuse", "true")
+    withGpuSparkSession(spark => {
+      val rapidsConf = new RapidsConf(spark.sessionState.conf)
+      assert(rapidsConf.isNonAqeBroadcastReuseFixupEnabled,
+        "isNonAqeBroadcastReuseFixupEnabled should default to true (createWithDefault(true))")
+    }, defaultConf)
+  }
+}
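
Both plan-level tests assert with `eq` rather than `==`. A small standalone illustration of why that distinction matters is sketched here; `Exchange` is a hypothetical case class, not a Spark node. An equal-looking copy would satisfy `==`, whereas `eq` only holds for the very same object, which is the property needed to show that the reused exchange points at the main-plan instance and that the early exit returned the input plan itself.

```scala
// Hypothetical stand-in for a plan node; case classes compare structurally with ==.
final case class Exchange(id: String)

object IdentitySketch {
  val mainG: Exchange = Exchange("dim")
  val lookalike: Exchange = Exchange("dim") // equal by value, but a different object

  // Structural equality: true, because both instances carry the same field value.
  val structurallyEqual: Boolean = mainG == lookalike

  // Reference identity: false, because they are two separate allocations.
  val sameInstance: Boolean = mainG eq lookalike

  // Reference identity of an object with itself: true.
  val selfIdentity: Boolean = mainG eq mainG
}
```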
