fix: allow safe mixed Spark/Comet partial/final aggregate execution#4015
Conversation
Previously, when one aggregate stage (Partial or Final) couldn't be converted to Comet, the other was also blocked to avoid crashes from incompatible intermediate buffer formats (issues apache#1389, apache#1267). This change introduces per-aggregate `supportsMixedPartialFinal` declarations so that aggregates with simple, compatible buffers (MIN, MAX, COUNT, bitwise) can safely run in mixed mode while unsafe aggregates (SUM, AVG, Variance, CollectSet) continue to be blocked. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Restore `convert` scaladoc in `CometAggregateExpressionSerde` that was displaced when `supportsMixedPartialFinal` was added - Require `aggregateExpressions.nonEmpty` in `findPartialAggInPlan` so intermediate distinct-elimination stages (empty agg, group-by only) are not incorrectly tagged as the Partial to disable - Document that `canFinalAggregateBeConverted` mirrors the predicate checks in `CometBaseAggregate.doConvert` and must be kept in sync
|
@Shekharrajak This PR draws some inspiration from #2994. Thanks for the early work towards this. |
If the corresponding partial aggregate would also fail conversion to Comet (for example, collect_set on float is incompatible), tagging it early hijacks the more specific natural fallback reason. Only tag the partial when it would otherwise have been converted, so the tag guards genuine buffer-format mismatches rather than masking unrelated fallbacks. Generalize the convertibility predicate to accept an expected mode and mirror the mode-specific result-expression handling in doConvert.
… files findPartialAggInPlan was using a deep tree traversal that matched partial aggregates separated from the final by other aggregate stages. For Spark's distinct-aggregate rewrite, the partial for non-distinct aggs feeds into a PartialMerge stage rather than directly into the final, so tagging it as unsafe is incorrect and hijacks the natural 'Unsupported aggregation mode PartialMerge' fallback reason. Walk only through exchanges and AQE stages. Also regenerate TPC-DS plan-stability golden files for Spark 3.4, 3.5, and 4.0 to reflect the branch's new safe-mixed-execution behavior where the final aggregate converts to Comet when all aggregate functions have compatible intermediate buffer formats.
4b5d992 to
753a9a5
Compare
Arrow's row format, used by DataFusion's grouped hash aggregate for
composite group keys, does not support Map at any nesting level. The
existing guard in CometBaseAggregate.doConvert only matched top-level
MapType, so queries grouping by e.g. array<map<int,int>> crashed with
"Row format support not yet implemented for: [SortField { ... List(Map(...)) }]"
once the new mixed-partial-final path produced a Comet Final aggregate
over Spark-partial output.
Add a recursive QueryPlanSerde.containsMapType helper that walks into
ArrayType and StructType, and use it in both doConvert and
canAggregateBeConverted. Add a regression test exercising the failing
group-by.sql query shape from SQLQueryTestSuite.
|
might be related to #4003 |
…ressions Mixed COUNT partial/final regressed AQE's PropagateEmptyRelationAfterAQE (which matches BaseAggregateExec only, not CometHashAggregateExec) and the Spark 4.0 count-bug decorrelation for correlated IN subqueries (row dropped in in-count-bug.sql OR pattern). - Remove supportsMixedPartialFinal override from CometCount. - Update trait docstring to explain why COUNT is intentionally excluded. - Narrow the two "safe mixed" tests in CometExecRuleSuite to use only MIN/MAX, which remain mixed-safe. - Revert TPC-DS golden file regeneration from commit 753a9a5; those plan changes were driven by COUNT becoming mixed-safe. MIN, MAX, and bitwise aggregates retain supportsMixedPartialFinal = true.
Comet now accelerates the Partial/Final MIN aggregate in the subquery, which reduces the WholeStageCodegen subtree count below the hardcoded 3 the test asserts. Tag the test with IgnoreComet in the 3.4.3, 3.5.8, and 4.0.1 diffs, matching the pattern used for other plan-shape tests in ExplainSuite.
…l-final-aggregates # Conflicts: # .gitignore # dev/diffs/3.4.3.diff # spark/src/main/scala/org/apache/spark/sql/comet/operators.scala
|
In #2994 we saw improvements to some Spark plans in the stability suite and I do not see the same with this PR. Moving this to draft until I have time to check this. |
| case a: AQEShuffleReadExec => findPartialAggInPlan(a.child) | ||
| case s: ShuffleQueryStageExec => findPartialAggInPlan(s.plan) | ||
| case e: ShuffleExchangeExec => findPartialAggInPlan(e.child) | ||
| case _ => None |
There was a problem hiding this comment.
does it make sense to log unsupported exec?
| * support `Map`, so grouping on any type that transitively contains a map would crash in native | ||
| * execution. | ||
| */ | ||
| def containsMapType(dt: DataType): Boolean = dt match { |
There was a problem hiding this comment.
we already have this method in operators.scala we prob need to make this method generic. I'll check it in followup PR
|
|
||
| if (multiMode || sparkFinalMode) { | ||
| if (multiMode) { | ||
| return None |
|
|
||
| if (sparkFinalMode && | ||
| !QueryPlanSerde.allAggsSupportMixedExecution(aggregate.aggregateExpressions)) { | ||
| return None |
comphead
left a comment
There was a problem hiding this comment.
Thanks @andygrove left some minors
|
This PR requires conflicts to be resolved, and this will also fix the PR failed CI check |
…l-final-aggregates
- add withInfo for multiMode and Spark-Final-without-Comet-Partial early returns so EXPLAIN shows the reason - dedupe containsMapType: route operators.scala through QueryPlanSerde.containsMapType and drop the local private copy - log unrecognized passthrough in findPartialAggInPlan at debug level
|
Thanks for the review @comphead. I addressed your feedback and will go ahead and merge when CI is green. |
The previous commit removed the local containsMapType helper in operators.scala but left StructType in the imports list, breaking scalafix in the Lint Java jobs. It also added withInfo annotations for the multiMode and Spark-Final- without-Comet-Partial early returns in CometBaseAggregate, which surface in EXPLAIN output and therefore in the plan-stability golden files. Regenerate the affected TPC-DS golden files (q10, q24a, q24b, q35, q45, q70, q77 plus the v2.7 variants) so CometPlanStabilitySuite passes.
|
One workflow keeps failing - sql core-1 4.1.1 ... will keep retrying .. failures are unrelated to PR |
The test asserts a specific WholeStageCodegen subtree count, but Comet changes the plan shape by replacing operators with native versions, so the expected count no longer matches.
Which issue does this PR close?
Closes #1389.
Closes #2894.
Part of #1267.
Part of #2892.
Partial fix for #4242 (re-enables
COUNTfor mixed execution; the two regressions listed in that issue still need verification).Partial fix for #2889 (the
tagUnsafePartialAggregatesrule blocks mixed execution for BloomFilter, eliminating the crash; underlying buffer-format incompatibility is unchanged).Rationale for this change
When a Spark query's Final-mode aggregate cannot be converted to Comet (for example because its result expressions are not supported, as in
concat(flatten(collect_set(col)))), Comet would still convert the upstream Partial-mode aggregate. The Partial produces intermediate buffers in a format the Spark Final cannot interpret (different encodings forCollectSet,Average, decimalSum, variance, etc.), which crashes at runtime with errors such asNot supported on CometListVector.Conversely, most aggregates block even a safe Spark-Partial + Comet-Final combination, where the buffer formats are in fact compatible (
MIN,MAX,COUNT, bitwise).This change prevents the crash for unsafe aggregates and unlocks the mixed execution for the safe ones.
This PR improves Comet native coverage for TPC-DS.
What changes are included in this PR?
supportsMixedPartialFinalflag onCometAggregateExpressionSerde, defaulting tofalse. Set totrueforMIN,MAX,COUNT,BitAndAgg,BitOrAgg,BitXorAgg, whose intermediate buffer formats match between Spark and Comet.QueryPlanSerde.allAggsSupportMixedExecutionchecks the flag across an aggregate's expressions.CometExecRule.tagUnsafePartialAggregatesruns before bottom-up transformation. For each Final-mode aggregate whose expressions are not all mixed-safe, it conservatively checks whether the Final itself is convertible via the newcanFinalAggregateBeConverted(mirrors the predicates inCometBaseAggregate.doConvert). If not, the corresponding Partial (looked up byfindPartialAggInPlan, traversing throughAQEShuffleReadExecandShuffleQueryStageExec) is tagged withCOMET_UNSAFE_PARTIAL.CometBaseAggregate.doConverthonours the new tag, and now permits the Spark-Partial + Comet-Final case when all aggregates are mixed-safe.How are these changes tested?
CometExecRuleSuite:SUM(unsafe) is un-ignored; asserts neither side is converted.SUM; asserts neither side is converted.MIN/MAX/COUNT; asserts partial converts to Comet, final stays Spark.MIN/MAX/COUNT; asserts partial stays Spark, final converts to Comet.