[DO NOT REVIEW] Adds in support for a "CpuBridge" that lets us fall back to the CPU on a per-expression level instead of per-SparkPlan node level. [databricks]#13368
Conversation
a per expression level instead of per-SparkPlan node level. A lot fo this code was written with AI (specifically claude-4-sonnet through cursor) Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
|
build |
| /** | ||
| * Converts a CPU expression to a GPU expression. | ||
| */ | ||
| def convertToGpuBase(): Expression |
There was a problem hiding this comment.
I was a little confused about convertToGpuBase vs convertToGpu (the naming), and kind of wish we didn't use Base in the name. Something like: keeping convertToGpu for, well, converting to the GPU, and using something different for the function defined in line 1453 that indicates it's going to be either or. I do not know if this is possible or adds a lot more work.
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
|
build |
There was a problem hiding this comment.
Pull Request Overview
This PR introduces support for a "CpuBridge" feature that enables GPU-CPU hybrid execution at an expression level rather than falling back entire SparkPlan nodes to the CPU. The primary goal is to minimize data movement costs, utilize GPU resources more efficiently, and allow CPU expressions to run in parallel using a thread pool.
Key changes:
- Adds CPU bridge infrastructure with thread pool for parallel CPU expression evaluation
- Modifies expression metadata system to support bridge decision-making and optimization
- Updates method signatures from
convertToGpu()toconvertToGpuImpl()and introduces new wrapper logic
Reviewed Changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| RapidsMeta.scala | Core infrastructure for CPU bridge support with expression analysis and optimization logic |
| RapidsConf.scala | Configuration options for enabling CPU bridge and controlling which expressions can use it |
| GpuOverrides.scala | Updates expression metadata classes to use new convertToGpuImpl() method signature |
| GpuCpuBridgeExpression.scala | Main bridge expression implementation handling GPU-to-CPU data transfer and parallel evaluation |
| GpuCpuBridgeOptimizer.scala | Optimizer for making bridge vs GPU decisions and merging adjacent bridge expressions |
| GpuCpuBridgeThreadPool.scala | Thread pool implementation with priority queuing and task context propagation |
| various shim files | Method signature updates from convertToGpu() to convertToGpuBase() or convertToGpuImpl() |
| cpu_bridge_test.py | Integration tests for CPU bridge functionality |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
|
build |
|
I did run into one unexpected issue when I tried to run and also NPEs like I am going to spend some time to try and understand this a bit more and see if I can fix it. |
|
I fixed the ScalarSubquery issue so I think this is ready to go. |
|
build |
|
build |
|
build |
|
build |
|
build |
|
build |
zpuller
left a comment
There was a problem hiding this comment.
Still working through it but I had a couple comments for now
| * from SparkPlan nodes. Use the public API that requires metrics instead. | ||
| */ | ||
| def bindGpuReferences[A <: Expression]( | ||
| def bindGpuReferencesInternal[A <: Expression]( |
There was a problem hiding this comment.
nit: can we/should we make this project private?
There was a problem hiding this comment.
We don't want it to be private because there are a few cases, even though they are rare, where we want to call it.
There was a problem hiding this comment.
I meant project private as in it would be callable from (most) RAPIDS code but not from spark itself. But anyway I don't have a strong opinion
| override def prettyName: String = "gpu_cpu_bridge" | ||
|
|
||
| override def toString: String = { | ||
| val gpuInputsStr = if (gpuInputs.nonEmpty) { |
There was a problem hiding this comment.
nit: do we need to check if it's nonEmpty? if we took an empty Seq and did .mkString(", ") wouldn't it just be empty string?
There was a problem hiding this comment.
Good point. Cursor does not always deal with these very well.
| * Takes an iterator of rows and the expected row count, produces a complete | ||
| * GpuColumnVector result. | ||
| */ | ||
| private def createEvaluationFunction( |
There was a problem hiding this comment.
Why does this and the below take a cpuExpression argument, given that cpuExpressions is a field of this class? I could see reasons to do this, but where I get confused is that the threadLocalProjection seems to be based off the field cpuExpression, not the arg
|
build |
|
build |
zpuller
left a comment
There was a problem hiding this comment.
Posted one other minor question, but looking good
| override def call(): T = { | ||
| // Register this thread with RmmSpark for memory tracking if we have a task context | ||
| if (taskContext != null) { | ||
| RmmSpark.currentThreadIsDedicatedToTask(taskContext.taskAttemptId()) |
There was a problem hiding this comment.
Does it make sense to use TaskRegistryTracker here, or do we not want the same behavior eg. retries?
|
what is the state of this? Is it something we should re-review? |
|
NOTE: release/25.12 has been created from main. Please retarget your PR to release/25.12 if it should be included in the release. |
|
Skipped: This PR changes more files than the configured file change limit: ( |
This is part 1 of splitting #13368 into smaller parts ### Description This adds in framework changes to allow an expression to update SQL metrics. The metrics are passed in when the expressions are bound and then can be updated as they run. There are no expressions added, yet that would let us truly test it beyond seeing that it does not break existing functionality. ### Checklists - [ ] This PR has added documentation for new or modified features or behaviors. - [ ] This PR has added new tests or modified existing tests to cover new code paths. (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.) - [ ] Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description. This is an internal only feature so no docs or tests are really needed, beyond verifying that we didn't break anything. Most of the changes are very small, except for the ones in GpuBoundAttribute and GpuMetrics --------- Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
This is part 2 of splitting up #13368 ### Description This adds in a layer of indirection when converting an expression to be on the GPU. It is not used in the PR, but in the final PR it will allow us to wrap CPU expressions in a GPU expression compatibility layer (bridge). ### Checklists - [ ] This PR has added documentation for new or modified features or behaviors. - [ ] This PR has added new tests or modified existing tests to cover new code paths. (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.) - [ ] Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description. No real need for new tests. If they still all pass and everything compiles, then the refactoring is good. --------- Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
This is step 3 in splitting #13368 into smaller pieces ### Description This adds in basic GPU/CPU bridge functionality, but it is off by default because the performance would not be good without the thread pool and optimizer. ### Checklists - [ ] This PR has added documentation for new or modified features or behaviors. - [X] This PR has added new tests or modified existing tests to cover new code paths. (Please explain in the PR description how the new code paths are tested, such as names of the new/existing tests that cover them.) - [ ] Performance testing has been performed and its results are added in the PR description. Or, an issue has been filed with a link in the PR description. The performance is expected to be bad so it is off by default an not tested. I did add some basic tests to verify that the code works. --------- Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
|
NOTE: release/26.02 has been created from main. Please retarget your PR to release/26.02 if it should be included in the release. |
|
NOTE: release/26.04 has been created from main. Please retarget your PR to release/26.04 if it should be included in the release. |
|
NOTE: release/26.06 has been created from main. Please retarget your PR to release/26.06 if it should be included in the release. |
I am going to split this up into smaller pieces and put them up as separate PRs to hopefully get this in over time.
Description
This attempts to make it possible to fall back to the CPU in a more efficient way. In the current code we fall back to the CPU at the level of a
SparkPlannode. So an entireProjectExecorHashAggregateExecwould fall back to the CPU if a single expression in it could not run on the GPU.SparkPlannode to the CPU and then the entire result back to the GPU again.SparkPlannode only a single CPU thread would be used to process the data. This does not scale well, in all cases.The cpu bridge has a thread pool that is used to execute the CPU expressions in parallel moving only the minimal data needed. This allows more of the processing to stay on the GPU, it minimizes data movement, and even though it does not release the semaphore when running on the CPU it offsets this by throwing as many cores at the processing as there are configured tasks.
This does not work for non-deterministic expressions or aggregations.
It is currently off by default for two reasons
From a performance standpoint I have tested it in a few situations.
I also ran some much simpler tests where almost the entire query is a single expression that is not on the GPU.
The bridge version is more memory efficient than falling back for everything, but not quite as good as a pure GPU implementation. There is code to allow us to run the expression as an interpreted expression instead of as code gen, but the performance can vary wildly, and is generally slower than the code gen version. I am happy to rip out the interpreted version as the code gen version can fall back to an interpreted version behind the scenes in Spark. It just requires setting a separate config entirely.
Note: A lot of this code was written with AI (specifically claude-4-sonnet through cursor)
Checklists