Support write executors for noop format DataFrame writes [databricks]#13314
Support write executors for noop format DataFrame writes [databricks]#13314revans2 wants to merge 4 commits into
Conversation
This change adds support for writing to the "noop" format on the GPU. This is implemented by adding GPU versions of `OverwriteByExpressionExec` and `AppendDataExec` that simply consume the input data without writing anything. The following save modes are supported: - overwrite - append - ignore - errorifexists Integration tests have been added to verify the new functionality.
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
There was a problem hiding this comment.
Pull Request Overview
This PR adds support for V2 write operations (AppendDataExec and OverwriteByExpressionExec) for Spark 3.3.0+ when using the noop format. The implementation only supports NoopWrite operations, which consume and discard data rather than writing to a real data source.
- Adds GPU support for
AppendDataExecandOverwriteByExpressionExecfor Spark 3.3.0+ - Implements GPU noop writer executors that consume and discard data
- Updates support configuration files across all relevant Spark versions (3.3.0 to 3.5.6)
Reviewed Changes
Copilot reviewed 42 out of 42 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| Multiple supportedExecs.csv files | Adds AppendDataExec and OverwriteByExpressionExec entries with support flags |
| Multiple operatorsScore.csv files | Adds score entries (3.0) for the new executor types |
| Spark330PlusShims.scala | Registers GPU rules for the new V2 write executors |
| NoopWriteMeta.scala | Meta classes for tagging and converting V2 write operations |
| GpuNoopWriterExec.scala | GPU implementations that consume and discard data |
| noop_write_test.py | Integration test for noop write operations |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| val newExecs: Seq[(Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan])] = Seq( | ||
| (overwriteByExpressionRule.getClassFor.asSubclass(classOf[SparkPlan]), | ||
| overwriteByExpressionRule), | ||
| (appendDataRule.getClassFor.asSubclass(classOf[SparkPlan]), appendDataRule) |
There was a problem hiding this comment.
[nitpick] The comment explains why explicit casting is needed, but the implementation uses verbose type casting. Consider simplifying by using a helper method or extracting the pattern into a reusable function.
| (appendDataRule.getClassFor.asSubclass(classOf[SparkPlan]), appendDataRule) | |
| // Helper to cast the class to the required type | |
| private def asSparkPlanClass[T <: SparkPlan](rule: ExecRule[T]): Class[_ <: SparkPlan] = | |
| rule.getClassFor.asSubclass(classOf[SparkPlan]) | |
| val newExecs: Seq[(Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan])] = Seq( | |
| (asSparkPlanClass(overwriteByExpressionRule), overwriteByExpressionRule), | |
| (asSparkPlanClass(appendDataRule), appendDataRule) |
| private val noopClassNames = Seq("org.apache.spark.sql.execution.datasources.noop.NoopWrite$") | ||
|
|
||
| def isNoopWrite(write: Write): Boolean = { | ||
| noopClassNames.contains(write.getClass.getName) |
There was a problem hiding this comment.
Using string-based class name comparison for detecting NoopWrite is fragile and could break if the class is renamed or moved. Consider using isInstanceOf or class comparison if possible.
| noopClassNames.contains(write.getClass.getName) | |
| private val noopWriteClassName = "org.apache.spark.sql.execution.datasources.noop.NoopWrite$" | |
| def isNoopWrite(write: Write): Boolean = { | |
| try { | |
| val noopWriteClass = Class.forName(noopWriteClassName) | |
| noopWriteClass.isInstance(write) | |
| } catch { | |
| case _: ClassNotFoundException => false | |
| } |
|
|
||
| override def run(): Seq[InternalRow] = { | ||
| child match { | ||
| case g: GpuExec => g.executeColumnar().foreach(_.close()) |
There was a problem hiding this comment.
[nitpick] The pattern matching logic duplicates the consuming behavior. Consider extracting the consumption logic into a helper method to reduce duplication between run() and internalDoExecuteColumnar().
| case g: GpuExec => g.executeColumnar().foreach(_.close()) | |
| case g: GpuExec => consumeAndCloseColumnarBatches(g.executeColumnar()) |
|
build |
|
|
||
| trait NoopWriteHelper { | ||
| // NoopTable is a private class, so we have to use reflection | ||
| private val noopClassNames = Seq("org.apache.spark.sql.execution.datasources.noop.NoopWrite$") |
There was a problem hiding this comment.
Understandably not a performance issue since there is only one element, but logically it should be a Set
|
We did run into an error on databricks It looks like we fell back to the CPU for the plan in some way, but xdist + pytest did not capture/print enough information in the test to say WHY it fell back. As this is a lower priority and mostly a test to see if AI could handle this functionality I am not inclined to spend a lot of time on it with databricks. So we could adjust the tests so that it is clear it is not supported on databricks and come back later with a follow on issue, or we could just keep the PR up as a starting point for others. |
|
NOTE: release/25.12 has been created from main. Please retarget your PR to release/25.12 if it should be included in the release. |
|
NOTE: release/26.02 has been created from main. Please retarget your PR to release/26.02 if it should be included in the release. |
|
NOTE: release/26.04 has been created from main. Please retarget your PR to release/26.04 if it should be included in the release. |
|
NOTE: release/26.06 has been created from main. Please retarget your PR to release/26.06 if it should be included in the release. |
This is an experiment like #13234 was. This one used jules.google.com instead of copilot. This one also failed, but it got much closer to a workable solution and I was able to take it across the finish line.
This fixes #13074
Description
This adds in support for V2 writes for Spark 3.3.0+ for noop write support.
Checklists
This PR has:
Please select one of the following options: