[SPARK-51068][SQL] Canonicalized CTEs to avoid cached result not being used and recomputed #50360


Closed
wants to merge 2 commits into branch-3.3

Conversation

nimesh1601

What changes were proposed in this pull request?

To check whether a plan already exists in the cache, CacheManager matches the canonicalized version of the plan. Currently, CTE ids are not handled during canonicalization, which results in unnecessary cache misses for cached queries that use CTEs. This issue started after the commit "Avoid inlining non-deterministic With-CTEs", which introduced CTERelationDef and CTERelationRef without handling their canonicalization. This PR adds canonicalization logic for CTEs to fix that.
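
For context, the lookup that this fix targets compares plans via their canonicalized form; a rough sketch of that matching, using illustrative names rather than the actual CacheManager code, looks like this:

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Illustrative stand-in for CacheManager's cached-entry bookkeeping.
final case class CachedEntry(plan: LogicalPlan, cachedRepresentation: AnyRef)

// sameResult compares the canonicalized forms of the two plans, so a node
// whose auto-generated ids (here, CTE ids) are not normalized during
// canonicalization makes semantically equal plans look different and the
// lookup misses the cache.
def lookupCached(plan: LogicalPlan, cache: Seq[CachedEntry]): Option[CachedEntry] =
  cache.find(_.plan.sameResult(plan))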

Why are the changes needed?

To fix the bug mentioned above.

Does this PR introduce any user-facing change?

Yes, cached CTEs can now be reused.

Before this change:

>>>spark.sql("CACHE TABLE cached_cte AS WITH cte1 AS ( SELECT 1 AS id, 'Alice' AS name UNION ALL SELECT 2 AS id, 'Bob' AS name ), cte2 AS ( SELECT 1 AS id, 10 AS score UNION ALL SELECT 2 AS id, 20 AS score ) SELECT cte1.id, cte1.name, cte2.score FROM cte1 JOIN cte2 ON cte1.id = cte2.id");
DataFrame[]
>>> spark.sql("select count(*) from cached_cte").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=165]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- Project
            +- BroadcastHashJoin [id#120], [id#124], Inner, BuildRight, false
               :- Union
               :  :- Project [1 AS id#120]
               :  :  +- Scan OneRowRelation[]
               :  +- Project [2 AS id#122]
               :     +- Scan OneRowRelation[]
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=160]
                  +- Union
                     :- Project [1 AS id#124]
                     :  +- Scan OneRowRelation[]
                     +- Project [2 AS id#126]
                        +- Scan OneRowRelation[]

After this change:

>>> spark.sql("CACHE TABLE cached_cte AS WITH cte1 AS ( SELECT 1 AS id, 'Alice' AS name UNION ALL SELECT 2 AS id, 'Bob' AS name ), cte2 AS ( SELECT 1 AS id, 10 AS score UNION ALL SELECT 2 AS id, 20 AS score ) SELECT cte1.id, cte1.name, cte2.score FROM cte1 JOIN cte2 ON cte1.id = cte2.id");
DataFrame[]
>>> spark.sql("select count(*) from cached_cte").explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(1)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=116]
      +- HashAggregate(keys=[], functions=[partial_count(1)])
         +- Scan In-memory table cached_cte
               +- InMemoryRelation [id#128, name#129, score#130], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *(5) Project [id#8, name#9, score#13]
                        +- *(5) BroadcastHashJoin [id#8], [id#12], Inner, BuildRight, false
                           :- Union
                           :  :- *(1) Project [1 AS id#8, Alice AS name#9]
                           :  :  +- *(1) Scan OneRowRelation[]
                           :  +- *(2) Project [2 AS id#10, Bob AS name#11]
                           :     +- *(2) Scan OneRowRelation[]
                           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=60]
                              +- Union
                                 :- *(3) Project [1 AS id#12, 10 AS score#13]
                                 :  +- *(3) Scan OneRowRelation[]
                                 +- *(4) Project [2 AS id#14, 20 AS score#15]
                                    +- *(4) Scan OneRowRelation[]

How was this patch tested?

Unit tests are added.

Was this patch authored or co-authored using generative AI tooling?

No

github-actions bot added the SQL label on Mar 24, 2025
@nimesh1601
Author

@cloud-fan can you check?

nimesh1601 closed this on Mar 25, 2025
nimesh1601 reopened this on Mar 28, 2025
@nimesh1601
Author

@dongjoon-hyun can you review this?

@nimesh1601
Author

@HyukjinKwon can you take a look?

@HyukjinKwon
Member

Mind filing a JIRA, and linking it to the PR title? See also https://spark.apache.org/contributing.html

nimesh1601 changed the title from "Canonicalized CTEs to avoid cached result not being used and recomputed" to "[SPARK-51068][SQL] Canonicalized CTEs to avoid cached result not being used and recomputed" on Apr 4, 2025
@nimesh1601
Author

> Mind filing a JIRA, and linking it to the PR title? See also https://spark.apache.org/contributing.html

My bad. Added the JIRA link.

@nimesh1601
Author

@HyukjinKwon can you check now?

@nimesh1601
Author

@HyukjinKwon / @dongjoon-hyun can you check?

@nimesh1601
Author

@HyukjinKwon / @dongjoon-hyun can you review this?

@HyukjinKwon
Member

I am not super used to this code path. cc @peter-toth

@nimesh1601
Author

Thanks @HyukjinKwon! @peter-toth can you help here?

@peter-toth
Contributor

I'm travelling this week, but happy to check it Friday.

@peter-toth
Contributor

peter-toth commented Jun 26, 2025

I think this canonicalization makes sense. But why don't you just call a simple transformWithSubqueriesAndPruning(), and when you encounter a CTERelationDef or a CTERelationRef, assign new ids (incrementing from 0) to them (and store the old id -> new id mapping in your map if the old id is not yet in it)?

@nimesh1601
Author

Thanks @peter-toth for your review! Are you suggesting creating a separate function transformWithSubqueriesAndPruning for traversal?

@peter-toth
Contributor

peter-toth commented Jun 26, 2025

> Are you suggesting creating a separate function transformWithSubqueriesAndPruning for traversal?

No, I believe it already exists, but I might be wrong.

@nimesh1601
Author

@peter-toth Thanks for the clarification. I couldn't find it in branch-3.3.
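
A minimal sketch of the suggested id renumbering, assuming the node and field names from master (CTERelationDef.id, CTERelationRef.cteId) and using a plain transformDown since the pruning traversal helper may not be available on this branch:

import scala.collection.mutable

import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, CTERelationRef, LogicalPlan}

// Illustrative only: renumber CTE ids in traversal order so that two plans
// differing only in their auto-generated CTE ids canonicalize identically.
def normalizeCTEIds(plan: LogicalPlan): LogicalPlan = {
  // old id -> new id, assigned incrementally starting from 0
  val idMap = mutable.Map.empty[Long, Long]
  def remap(oldId: Long): Long = idMap.getOrElseUpdate(oldId, idMap.size.toLong)

  // A complete fix would also traverse subquery expressions, as noted above.
  plan.transformDown {
    case d: CTERelationDef => d.copy(id = remap(d.id))
    case r: CTERelationRef => r.copy(cteId = remap(r.cteId))
  }
}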

@dongjoon-hyun
Member

Hi, @nimesh1601 and all. Sorry for the interruption, but I need to close this PR to prevent accidental merging, because it was mistakenly opened against the End-Of-Life branch-3.3 branch.

Please open the PR against the master branch so it can be reviewed and discussed safely.
