Support UDAF on GPU [databricks] #13450

Closed
firestarman wants to merge 25 commits into NVIDIA:main from firestarman:support-udaf

@firestarman firestarman commented Sep 19, 2025

Collaborator

close #13451
close #13452
contributes to #13412

This PR adds GPU support for UDAFs by introducing a RAPIDS-accelerated interface (RapidsUDAF).

Users can implement this GPU-preferred interface to move a Spark UserDefinedAggregateFunction (deprecated), Aggregator, or Hive AbstractGenericUDAFResolver onto the GPU.

(I know this is quite a large PR; I wish I could have made it smaller.)

Perf Regression

No obvious perf regression was found in the NDS runs below (times in seconds):

| Runs | With UDAF | No UDAF  |
|------|-----------|----------|
| 1    | 1499.306  | 1544.629 |
| 2    | 1567.290  | 1528.895 |
| avg. | 1533.298  | 1536.762 |

Perf Improvement

The following performance numbers were obtained from a local test (times in seconds):

| Runs | CPU avg UDAF | GPU avg UDAF | GPU avg Function |
|------|--------------|--------------|------------------|
| 1    | 109.504      | 19.452       | 18.267           |
| 2    | 110.575      | 18.100       | 18.219           |
| avg  | 110.040      | 18.78        | 18.243           |
  • The UDAF implementation is from the file "ScalaUDAFSuite.scala"
  • Titan V and 10 CPU cores
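
The averages and ratios implied by the two tables can be re-derived with a few lines of Python. This is only a sketch for checking the arithmetic: the timings are copied from the tables above, while the variable names and the `relative_change` helper are illustrative and not part of the PR.

```python
from statistics import mean

# Wall-clock times in seconds, copied from the two tables above.
nds_with_udaf = [1499.306, 1567.290]
nds_no_udaf = [1544.629, 1528.895]
cpu_udaf = [109.504, 110.575]
gpu_udaf = [19.452, 18.100]
gpu_builtin_avg = [18.267, 18.219]


def relative_change(new: float, base: float) -> float:
    """Fractional change of `new` relative to `base` (negative means faster)."""
    return (new - base) / base


# NDS: average with the UDAF changes vs. without them.
nds_delta = relative_change(mean(nds_with_udaf), mean(nds_no_udaf))
# Local test: CPU UDAF vs. GPU UDAF, and GPU UDAF vs. the built-in avg().
udaf_speedup = mean(cpu_udaf) / mean(gpu_udaf)
udaf_overhead = relative_change(mean(gpu_udaf), mean(gpu_builtin_avg))

print(f"NDS change with UDAF support: {nds_delta:+.2%}")
print(f"GPU UDAF speedup over CPU UDAF: {udaf_speedup:.2f}x")
print(f"GPU UDAF overhead vs built-in avg: {udaf_overhead:+.2%}")
```

With only two runs per configuration, the spread between individual NDS runs is larger than the difference between the two averages, so the comparison supports "no obvious regression" rather than a precise measurement.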

Test dataset schema:

scala> sql("select * from udaf_perf_test").printSchema
root
 |-- vid: long (nullable = true)
 |-- click_cnt: integer (nullable = true)

Test dataset size:

user:/bigdata/test$ du -d1 -h udaf_perf/
  23G	udaf_perf/

More details:

scala> spark.conf.set("spark.rapids.sql.enabled", "false")

scala> spark.time(sql("select vid, intAverage(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/cpu"))
Time taken: 109504 ms                                                           

scala> spark.time(sql("select vid, intAverage(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/cpu"))
Time taken: 110575 ms                                                           

scala> spark.conf.set("spark.rapids.sql.enabled", "true")

scala> spark.time(sql("select vid, intAverage(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/gpu_avg_udaf"))
Time taken: 19452 ms

scala> spark.time(sql("select vid, intAverage(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/gpu_avg_udaf"))
Time taken: 18100 ms                                                            

scala> spark.time(sql("select vid,        avg(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/gpu_avg_func"))
Time taken: 18267 ms                                                            

scala> spark.time(sql("select vid,        avg(click_cnt) from udaf_perf_test group by vid").write.mode("overwrite").parquet("/bigdata/tmp/out/gpu_avg_func"))
Time taken: 18219 ms        

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>

@revans2 revans2 left a comment

Collaborator

I didn't get through everything yet.

InSubqueryExec,S, ,None,project,input,S,S,S,S,S,S,S,S,PS,S,S,S,NS,NS,NS,NA,NS,NS,NA,NA
InSubqueryExec,S, ,None,project,result,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
ScalarSubquery,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS,NS,NS
ScalaAggregator,S, ,None,aggregation,param,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,NS,NS,NS

Collaborator

@amahussein is this going to need some changes to tools? I am concerned that we might think we can support all of these expressions, when in reality we can only support them some of the time.

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman firestarman changed the title from "Support UDAF on GPU" to "Support UDAF on GPU[databricks]" Sep 24, 2025
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman

Collaborator Author

build

@firestarman

Collaborator Author

build

@firestarman firestarman changed the base branch from branch-25.10 to branch-25.12 September 25, 2025 01:50
@firestarman

Collaborator Author

Retargeted to 25.12, since a PR this large may need an extended review.

@sperlingxx sperlingxx left a comment

Collaborator

@firestarman Asking as a reviewer with little context, could you please give a brief summary about why we need to add GpuAdvancedAggregate*** and, more importantly, make AggregateExec much more complex (which is already super complex IMHO) to support UDAFs?

@firestarman

firestarman commented Sep 25, 2025

Collaborator Author

> @firestarman Asking as a reviewer with little context, could you please give a brief summary about why we need to add GpuAdvancedAggregate*** and, more importantly, make AggregateExec much more complex (which is already super complex IMHO) to support UDAFs?

Thanks for the review.
This is just the approach I chose to implement RapidsUDAF, and I don't think we can do it without changing GpuAggregateExec.

  • I tried the existing CudfAggregate, but it accepts only one input and produces only one output. The newly introduced AdvancedCudfAggregate plays the same role as CudfAggregate but accepts multiple inputs and outputs, to match the UDAF requirements.
  • There is a truly advanced version of the aggregate interface, named "RapidsAdvancedGroupByAggregation", which the current aggregate process cannot support without changes to GpuAggregateExec.
  • I want to keep the UDAF code separate from the existing aggregate code as much as possible.
  • GpuAdvancedAggregate is just a thin wrapper around the UDAF, and I do NOT want to put classes named like "xxxUDAFxxx" (which is too specific) into GpuAggregateExec.
  • In the future, other children of GpuAdvancedAggregate could support other features, even though it is designed for UDAF now.
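
To make the "multiple inputs and outputs" point concrete, here is a minimal pure-Python sketch of an integer-average aggregate whose intermediate state is two buffers, (sum, count), rather than one. It is not the plugin's code: the function names `update`, `merge`, and `post_process` are hypothetical stand-ins for the stages a UDAF goes through, and plain dictionaries stand in for GPU columns.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Optional, Tuple

# One aggregate function, two aggregation buffers: (sum, count).
Buffer = Tuple[int, int]


def update(rows: Iterable[Tuple[Hashable, Optional[int]]]) -> Dict[Hashable, Buffer]:
    """Partially aggregate one batch: one input column in, two buffers out."""
    buffers: Dict[Hashable, Buffer] = defaultdict(lambda: (0, 0))
    for key, value in rows:
        total, count = buffers[key]  # registers the group even if value is null
        if value is not None:
            buffers[key] = (total + value, count + 1)
    return dict(buffers)


def merge(left: Dict[Hashable, Buffer], right: Dict[Hashable, Buffer]) -> Dict[Hashable, Buffer]:
    """Combine two partial results: two buffers in, two buffers out."""
    merged = dict(left)
    for key, (total, count) in right.items():
        base_total, base_count = merged.get(key, (0, 0))
        merged[key] = (base_total + total, base_count + count)
    return merged


def post_process(buffers: Dict[Hashable, Buffer]) -> Dict[Hashable, Optional[float]]:
    """Produce the final value: two buffers in, one result out."""
    return {
        key: (total / count if count else None)
        for key, (total, count) in buffers.items()
    }


batch1 = [(1, 10), (1, 20), (2, None)]
batch2 = [(1, 30), (2, 4)]
result = post_process(merge(update(batch1), update(batch2)))
print(result)
```

An interface that maps exactly one input column to one output column cannot express the merge step above, because it has to read and write the sum and the count together.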

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
@firestarman

Collaborator Author

build

// 2) "argsCb" is the batch of the advanced input, have it being processed by
// advanced preSteps to get the final results.
// 3) Append the final results to "nonArgsCb".
val (nonArgsCb, argsCb) = withResource(projectedCb) { _ =>

Collaborator

I don't think we have to go through the sliceColumns and preProcessForAdvancedAggsAndClose in the common case where advanced aggregates are absent.

Collaborator Author

good idea. updated

metrics: GpuHashAggregateMetrics): SpillableColumnarBatch = {
val postProcessed = NvtxRegistry.AGG_POST_PROCESS {
postStepBound.projectAndCloseWithRetrySingleBatch(aggregatedSpillable)
val advCols = postProcessForAdvancedAggs(aggregatedSpillable, advArgLens)

Collaborator

The same suggestion: make postProcessForAdvancedAggs an optional path that is taken only when advArgLens is non-empty.

Collaborator Author

good idea. updated

// 2) perform the aggregation
// OOM retry means we could get a list of batches
val aggregatedSpillable = aggregate(helper, preProcessed, metrics)
val (aggregatedSpillable, advLens) = aggregate(helper, preProcessed, advArgLens, metrics)

@sperlingxx sperlingxx Oct 28, 2025

Collaborator

I am not sure whether this would work, but can we treat advArgLens as mutable state, like advCudfAggregates, instead of passing it around everywhere, given that this is a single-threaded, singleton scenario?

Collaborator Author

Different methods may return different sizes in advArgLens. Personally, I don't think it is a good idea to use one shared variable for all of these cases; that would be error-prone.

Collaborator

okay

Collaborator

I would really like to not pass advArgLens everywhere.

> Different methods may return different sizes in advArgLens

So are there a handful of different options? We need to define the different meanings of this and should call them different things instead of reusing the same name. And can't they be part of the state instead of being function arguments?

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

@greptile-apps greptile-apps Bot left a comment

Contributor

Greptile Overview

Greptile Summary

This PR introduces GPU acceleration support for User-Defined Aggregate Functions (UDAFs) through a new RapidsUDAF interface. Users can implement this interface to move Spark's deprecated UserDefinedAggregateFunction, Aggregator, and Hive's AbstractGenericUDAFResolver to GPU execution. The implementation adds a new aggregation pipeline with four customizable stages (preProcess, updateAggregation, mergeAggregation, postProcess) that integrates with the existing GpuHashAggregateExec framework. The feature maintains backward compatibility with CPU execution paths through buffer conversion utilities and includes configuration options to enable/disable GPU acceleration per UDAF type. Performance testing shows approximately 6x speedup over CPU UDAFs (18.78s vs 110.04s) with no measurable regression on non-UDAF workloads (NDS runs average ~1533s both with and without the feature).

The change spans multiple layers: new public API interfaces (RapidsUDAF, RapidsUDAFGroupByAggregation, RapidsSimpleGroupByAggregation), plugin-side implementation classes (GpuAdvancedAggregateFunction, UDAFCudfAggregate), expression metadata registration (GpuUDAFMeta), buffer converters for CPU-GPU transitions, Hive UDAF support (GpuHiveUDAFFunction), comprehensive test suites, and integration test utilities. The architecture separates "advanced" (UDAF) and built-in aggregates during execution, processes them independently, then merges results back to Spark's expected column ordering.
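
The "separate, process independently, merge back" flow described above can be illustrated with a small Python sketch. It is a simplified model, not the code in GpuAggregateExec: column names are plain strings, the helper names `split_by_kind` and `merge_in_original_order` are hypothetical, and the assumption that one aggregate may produce several output columns is inferred from the discussion in this thread.

```python
from typing import Dict, List, Sequence, Tuple


def split_by_kind(is_advanced: Sequence[bool]) -> Tuple[List[int], List[int]]:
    """Return the original positions of built-in and of advanced aggregates."""
    builtin = [i for i, adv in enumerate(is_advanced) if not adv]
    advanced = [i for i, adv in enumerate(is_advanced) if adv]
    return builtin, advanced


def merge_in_original_order(
    builtin_pos: Sequence[int],
    builtin_out: Sequence[List[str]],
    advanced_pos: Sequence[int],
    advanced_out: Sequence[List[str]],
) -> List[str]:
    """Interleave both result sets so columns follow the original aggregate order."""
    slots: Dict[int, List[str]] = {}
    for pos, cols in zip(builtin_pos, builtin_out):
        slots[pos] = cols
    for pos, cols in zip(advanced_pos, advanced_out):
        slots[pos] = cols
    # Flatten in position order; an aggregate may contribute more than one column.
    return [col for pos in sorted(slots) for col in slots[pos]]


# Aggregates in plan order: sum(a), a UDAF with two buffers, max(c).
builtin_pos, advanced_pos = split_by_kind([False, True, False])
merged = merge_in_original_order(
    builtin_pos, [["sum_a"], ["max_c"]],
    advanced_pos, [["udaf_sum", "udaf_count"]],
)
print(merged)
```

Tracking how many columns each advanced aggregate contributes is what makes the final reordering possible, which is presumably why per-aggregate lengths are carried through the pipeline.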

Important Files Changed

| Filename | Score | Overview |
|----------|-------|----------|
| sql-plugin-api/src/main/java/com/nvidia/spark/RapidsUDAF.java | 4/5 | Introduces the core public API interface for GPU-accelerated UDAFs with four-stage aggregation pipeline |
| sql-plugin-api/src/main/java/com/nvidia/spark/RapidsUDAFGroupByAggregation.java | 3/5 | Defines group-by aggregation contract with resource ownership inconsistencies in javadoc |
| sql-plugin-api/src/main/java/com/nvidia/spark/RapidsSimpleGroupByAggregation.java | 4/5 | Simple interface exposing cuDF built-in aggregations for basic UDAF implementations |
| sql-plugin/src/main/scala/org/apache/spark/sql/rapids/aggregate/udaf.scala | 4/5 | Core implementation of advanced aggregation traits, wrapper classes, and buffer converters |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala | 3/5 | Complex modifications to aggregation execution engine with extensive resource management and column reordering logic |
| sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/hiveUDFs.scala | 3/5 | Adds Hive UDAF support with reflection-based buffer type extraction and bidirectional buffer converters |
| sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/HiveProviderImpl.scala | 3/5 | Registers Hive UDAF expression metadata with unconditional buffer conversion support flag |
| tests/src/test/scala/com/nvidia/spark/rapids/ScalaAggregatorSuite.scala | 1/5 | Contains critical bugs: NPEs in CPU reduce/merge methods and GPU memory leaks in scalar allocations |
| tests/src/test/scala/com/nvidia/spark/rapids/ScalaUDAFSuite.scala | 5/5 | Comprehensive test suite with sound resource management demonstrating UDAF implementation patterns |
| integration_tests/src/main/python/hive_udaf_test.py | 2/5 | Integration tests with incorrect with_spark_session usage that will fail at runtime |
| integration_tests/src/main/python/hive_udf_utils.py | 3/5 | Utility module with SQL injection vulnerabilities in UDF loading functions |
| integration_tests/src/main/java/com/nvidia/spark/rapids/tests/udf/hive/IntLongAverageHiveUDAF.java | 4/5 | Dual CPU/GPU Hive UDAF implementation with minor documentation issues and potential null-sum semantic concerns |
| sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java | 4/5 | Adds type-safe scalar conversion and column slicing utilities for UDAF buffer manipulation |
| sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala | 5/5 | Generalizes RAPIDS function instance detection to support both UDF and UDAF interfaces |
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala | 4/5 | Registers two new UDAF expression meta classes for Scala Aggregator and UserDefinedAggregateFunction |
| docs/additional-functionality/advanced_configs.md | 5/5 | Documents three new configuration options for ScalaAggregator, ScalaUDAF, and HiveUDAFFunction |
| tests/src/test/scala/com/nvidia/spark/rapids/HashAggregateRetrySuite.scala | 5/5 | Mechanical adaptation to new aggregate method signature returning tuples |
| tests/src/test/scala/com/nvidia/spark/SparkQueryCompareTestSuite.scala | 4/5 | Adds test utility methods for DataFrame creation with duplicate definition issue |
| tests/src/test/spark330/scala/org/apache/spark/sql/rapids/utils/RapidsTestSettings.scala | 4/5 | Broadens Java version exclusion threshold from 17 to 11 for specific test |
| integration_tests/src/main/python/row-based_udf_test.py | 5/5 | Refactors Hive UDF utilities into shared module following DRY principles |

Confidence score: 3/5

  • This PR introduces significant complexity with multiple critical issues that need resolution before merge, though the core design and performance benefits are sound
  • Score reflects critical bugs in ScalaAggregatorSuite (NPEs and memory leaks), incorrect test setup in hive_udaf_test.py, SQL injection vulnerabilities in hive_udf_utils.py, documentation inconsistencies in resource ownership across API interfaces, reflection-based brittleness in Hive UDAF support, and edge cases in buffer conversion logic that could cause data corruption during CPU-GPU fallback scenarios
  • Pay close attention to tests/src/test/scala/com/nvidia/spark/rapids/ScalaAggregatorSuite.scala (lines 108-123, 137-188, 217-221), integration_tests/src/main/python/hive_udaf_test.py (lines 42, 55, 69), integration_tests/src/main/python/hive_udf_utils.py (lines 18-30), sql-plugin-api interface javadocs for resource ownership clarification, and sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/hiveUDFs.scala reflection logic

20 files reviewed, 26 comments

Comment on lines +1318 to +1320
def emptyRowsDf(session: SparkSession, schema: StructType): DataFrame = {
  session.createDataFrame(session.sparkContext.parallelize(Seq.empty[Row], 2), schema)
}

Contributor

logic: duplicate of existing method at lines 2318-2320 - this new method definition appears redundant


def drop_udf(spark, udf_name):
    spark.sql("DROP TEMPORARY FUNCTION IF EXISTS {}".format(udf_name))

Contributor

style: use f-string for consistency with modern Python formatting
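
A minimal sketch of what an f-string version of `drop_udf` could look like, with the function name wrapped in backticks so that reserved words or special characters do not break the statement. The `quote_identifier` helper is hypothetical (it is not part of the PR); it relies on the Spark SQL convention that a backtick inside a quoted identifier is escaped by doubling it. The `_RecordingSession` class is only a stand-in for a SparkSession so the example runs without Spark.

```python
def quote_identifier(name: str) -> str:
    """Backtick-quote a Spark SQL identifier, doubling any embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def drop_udf(spark, udf_name: str) -> None:
    """Drop a temporary function if it exists, quoting its name defensively."""
    spark.sql(f"DROP TEMPORARY FUNCTION IF EXISTS {quote_identifier(udf_name)}")


class _RecordingSession:
    """Hypothetical stand-in for SparkSession that records the SQL it receives."""

    def __init__(self) -> None:
        self.queries = []

    def sql(self, query: str) -> None:
        self.queries.append(query)


session = _RecordingSession()
drop_udf(session, "int_average")
drop_udf(session, "odd`name")
print(session.queries)
```

Quoting only protects the identifier position; a class name passed to a CREATE statement is a string literal and would need separate handling.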

@ignore_order(local=True)
@pytest.mark.parametrize("aggs", projected_aggs_list, ids=idfn)
def test_groupby_with_hive_average_udaf(aggs):
    with_spark_session(skip_if_no_hive)

Contributor

syntax: missing invocation of skip_if_no_hive - should be with_spark_session(skip_if_no_hive)(lambda spark: ...)


// Avoid leaks even if there is an exception when merging countCol
closeOnExcept(sumCol.sum()) { mergedSum =>
val mergedCount = countCol.sum()

Contributor

logic: Memory leak: mergedCount scalar is not closed
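
The general rule behind this comment is that every explicitly managed resource should be registered for cleanup as soon as it is created. The Python sketch below illustrates that rule with `contextlib.ExitStack`; it is an analogy rather than the Scala code under review, `FakeScalar` is a hypothetical stand-in for a GPU scalar, and the excerpt above does not show whether later code closes mergedCount.

```python
from contextlib import ExitStack
from typing import Sequence, Tuple


class FakeScalar:
    """Hypothetical stand-in for a GPU scalar that must be closed explicitly."""

    live = 0  # number of scalars created but not yet closed

    def __init__(self, value: int) -> None:
        self.value = value
        self.closed = False
        FakeScalar.live += 1

    def close(self) -> None:
        if not self.closed:
            self.closed = True
            FakeScalar.live -= 1


def merge_buffers(
    sums: Sequence[int], counts: Sequence[int], fail_on_count: bool = False
) -> Tuple[int, int]:
    """Merge sum and count buffers, closing both scalars on success and on failure."""
    with ExitStack() as stack:
        merged_sum = FakeScalar(sum(sums))
        stack.callback(merged_sum.close)  # registered right after creation
        if fail_on_count:
            raise RuntimeError("simulated failure while merging counts")
        merged_count = FakeScalar(sum(counts))
        stack.callback(merged_count.close)  # the second scalar is tracked too
        # Copy the values out before the scalars are closed on exit.
        return merged_sum.value, merged_count.value


print(merge_buffers([30, 30], [2, 1]))
print(FakeScalar.live)
```

If only the first scalar were registered, the success path would leave the second one open, which is the kind of leak the comment describes.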

Comment on lines +164 to +171
override def aggBufferAttribute: AttributeReference = {
  opRapidsFunc.map { rapidsUDAF =>
    AdvAggTypeUtils.attrFromTypes(expr.name, rapidsUDAF.aggBufferTypes())
  }.getOrElse(
    // opRapidsFunc is None, so it will fallback to CPU, use the CPU one.
    expr.aggBufferAttributes.head
  )
}

Contributor

logic: If opRapidsFunc is None, the code falls back to CPU aggBufferAttribute, yet tagAggForGpu already marks it as willNotWorkOnGpu. This path should never execute on GPU. Consider adding an assertion or throwing an exception if GPU execution is attempted. Is there a scenario where opRapidsFunc.isEmpty is true but the expression still reaches aggBufferAttribute on GPU?

a.isUDAFBridgeRequired)
}

override val supportBufferConversion: Boolean = true

Contributor

logic: Setting supportBufferConversion = true unconditionally, but converters are only valid when opRapidsFunc.isDefined. If UDAF doesn't implement RapidsUDAF, buffer conversion should not be supported.

def infer(scalar: Scalar): DataType = scalar.getType match {
case DType.LIST =>
val childType = withResource(scalar.getListAsColumnView)(infer)
ArrayType(childType, !scalar.isValid)

Contributor

logic: nullable inferred as inverse of scalar validity, should also check if child type itself has nullability

def infer(col: ColumnView): DataType = col.getType match {
case DType.LIST =>
val childType = withResource(col.getChildColumnView(0))(infer)
ArrayType(childType, col.getNullCount > 0)

Contributor

style: nullable determined by current null count, but col may have capacity > null count yet still be nullable by schema

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

@greptile-apps greptile-apps Bot left a comment

Contributor

Greptile Overview

Greptile Summary

This review covers only the changes made since the last review, not the entire PR. This change updates the Python utility function drop_udf in hive_udf_utils.py to use backtick-escaped SQL identifiers (switching from .format() to f-string syntax), ensuring that UDF names containing special characters or matching SQL reserved keywords are handled correctly during DROP operations. The modification is a narrow defensive fix for UDF name sanitization in the test infrastructure that supports the broader UDAF-on-GPU feature being introduced in this PR. While this brings Python code formatting in line with modern standards and prevents identifier-related SQL errors, the change does not extend the same protection to load_hive_udf, leaving an inconsistency in SQL identifier handling across the utility module.

Important Files Changed

| Filename | Score | Overview |
|----------|-------|----------|
| integration_tests/src/main/python/hive_udf_utils.py | 3/5 | Added backtick escaping to drop_udf UDF name for SQL identifier safety, but did not apply the same protection to load_hive_udf |

Confidence score: 3/5

  • This change is relatively safe but has a minor inconsistency that could cause issues in edge cases
  • Score reflects incomplete application of SQL identifier escaping: drop_udf now uses backticks but load_hive_udf still uses unescaped string interpolation for both udf_name and udf_class, creating an inconsistency in how UDF identifiers are handled and leaving potential for SQL injection or syntax errors if non-standard UDF names or class paths are used
  • Pay close attention to integration_tests/src/main/python/hive_udf_utils.py line 30 where udf_name and udf_class remain unescaped in the CREATE statement

1 file reviewed, no comments

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

@greptile-apps greptile-apps Bot left a comment

Contributor

Greptile Overview

Greptile Summary

This review covers only the changes made since the last review, not the entire PR. The developer has addressed the SQL injection vulnerability concern raised in previous reviews by wrapping UDF names in backticks in the hive_udf_utils.py file. This change ensures that user-provided UDF names containing special characters or SQL reserved keywords are properly escaped when creating or dropping temporary Hive UDFs. The backtick syntax is the standard SQL identifier quoting mechanism in Spark SQL and prevents parsing errors without introducing injection risks. The modification also modernizes the string formatting to use f-strings consistently across both functions. This small but important fix ensures the new UDAF GPU acceleration feature (the main purpose of this PR) works reliably with arbitrary UDF naming conventions that users might employ.

Important Files Changed

| Filename | Score | Overview |
|----------|-------|----------|
| integration_tests/src/main/python/hive_udf_utils.py | 5/5 | Added backtick escaping around UDF names in SQL statements to handle special characters and reserved keywords safely |

Confidence score: 5/5

  • This change is safe to merge with minimal risk and properly addresses the SQL injection concern from previous reviews
  • Score reflects the simplicity of the change, correct implementation of SQL identifier escaping, and elimination of the security concern without introducing new issues
  • No files require special attention; the change is straightforward and well-contained

1 file reviewed, no comments

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

@greptile-apps greptile-apps Bot left a comment

Contributor

Greptile Overview

Greptile Summary

This review covers only the changes made since the last review, not the entire PR. The latest change is a single-line type conversion in GpuAggregateExec.scala (line 763), where outCols (an ArrayBuffer[Array[GpuColumnVector]]) is explicitly converted to outCols.toSeq before being passed to mergeWithOriginalOrderAndClose. This change ensures type compatibility with the method signature, which expects Seq[Array[GpuColumnVector]], and guarantees immutability of the collection. The conversion is part of the broader UDAF support feature that handles reordering and merging of columns from built-in and advanced aggregates. Since ArrayBuffer is mutable, converting to Seq prevents accidental mutation during the merge operation and aligns with functional programming best practices in the codebase.

Important Files Changed

| Filename | Score | Overview |
|----------|-------|----------|
| sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuAggregateExec.scala | 5/5 | Converted outCols to outCols.toSeq for type safety and immutability when passing to merge method |

Confidence score: 5/5

  • This change is safe to merge with minimal risk.
  • The score reflects a simple, explicit type conversion that improves type safety without altering logic. The change is well-localized, converts a mutable ArrayBuffer to an immutable Seq as expected by the callee, and has been validated with performance testing showing no regression.
  • No files require special attention.

1 file reviewed, no comments

@firestarman

Collaborator Author

build

* arguments passed in.
* <br/>
* Users should close the input columns to avoid GPU memory leak, while the
* returned columns will be closed by the Rapids automatically.

Collaborator

Suggested change
- * returned columns will be closed by the Rapids automatically.
+ * returned columns will be closed automatically.

* result of the aggregation.
* <br/>
* Users should close the input columns to avoid GPU memory leak. But the
* returned column will be closed by the Rapids automatically.

Collaborator

These are nits, but "the Rapids" doesn't make it clear that it's the plugin, or normal processing. I suggest we omit that from the comments.

Suggested change
- * returned column will be closed by the Rapids automatically.
+ * returned column will be closed automatically.

/**
* Co-work with a GpuAdvancedAggregateFunction to customize the aggregate computation.
*/
trait AdvancedCudfAggregate extends Serializable {

Collaborator

I am not a fan of "advanced", as it doesn't really say why it is different than the regular kind.

Should this interface be the super class of the simpler one? I think so. @firestarman can you add some comments here in the review? not in the code yet, as I am trying to wrap my head around why there is an advanced and simple.

keyOffsets: ColumnVector,
groupedData: Array[GpuColumnVector]): Array[GpuColumnVector] = {
// Should not come here, just in case
throw new UnsupportedOperationException("`RapidsAdvancedGroupByAggregation`" +

Collaborator

why is this not supported?

override val name: String = aggregatorName.getOrElse(function.getClass.getSimpleName)
}

case class C2gUDAFBufferTransition(

Collaborator

Can we add a comment about what C2g is?

@abellina abellina Oct 29, 2025

Collaborator

Oh, this is CpuToGpu? Let's spell that out, please.

}
}

case class G2cUDAFBufferTransition(child: Expression) extends GpuToCpuBufferTransition {

Collaborator

Same here, GpuToCpu is not that long.

@abellina abellina added the "performance" label (A performance related task/issue) Oct 29, 2025
postStepAttr ++= groupingAttributes
postStepDataTypes ++=
groupingExpressions.map(_.dataType)
postStepDataTypes ++= groupingExpressions.map(_.dataType)

Collaborator

unnecessary change

@abellina abellina left a comment

Collaborator

I need a lot more time in GpuAggregateExec, and I'd like us to consider ways to make the changes the smallest subset possible.

groupingExpressions.map(_.dataType)
postStepDataTypes ++= groupingExpressions.map(_.dataType)

private def addAdvancedAgg(advAgg: AdvancedCudfAggregate,

Collaborator

ok so is "advanced" the same as "user defined"? Can we just call them UserDefinedCudfAggregate?

SpillableColumnarBatch(
projectedCb,
SpillPriorities.ACTIVE_BATCHING_PRIORITY)
val (retCb, outLens) = if (advPreStepAndArgLens.nonEmpty) {

Collaborator

What is outLens? Is it size in bytes? Number of rows? number of aggregates?


def aggregate(preProcessed: ColumnarBatch, numAggs: GpuMetric): ColumnarBatch = {
def aggregate(preProcessed: ColumnarBatch,
advArgLens: Seq[Int], numAggs: GpuMetric): (ColumnarBatch, Seq[Int]) = {

Collaborator

I wish we didn't change all these functions to pass advArgLens

@abellina

abellina commented Oct 29, 2025

Collaborator

@firestarman this is great work and I'd like for us to be able to give it a full review. I would like to suggest breaking up this PR into multiple smaller PRs, and have people focus on one step at a time. Here's my proposal, let me know what you think.

  1. Let's do one PR that is only the new interfaces so we can comment on the user-defined UDFs and what that API should be. Any documentation around the interfaces goes here. We don't PR anything else until this one merges. We should discuss how these interfaces are different from the current agg APIs, and why we are not rewriting all aggregates to the new interfaces. This will allow us to settle on naming and documentation.
  2. We do a single PR that focuses on core: GpuAggregateExec, GpuOverrides, GpuColumnVector. We need to make sure we don't overcomplicate the aggregate.
  3. One PR is about the hive changes (hive/rapids package) and udaf.scala.
  4. We do a single PR with all the testing.

@firestarman firestarman marked this pull request as draft October 30, 2025 08:41
@firestarman

firestarman commented Nov 5, 2025

Collaborator Author

> @firestarman this is great work and I'd like for us to be able to give it a full review. I would like to suggest breaking up this PR into multiple smaller PRs, and having people focus on one step at a time. Here's my proposal, let me know what you think.
>
>   1. Let's do one PR that contains only the new interfaces so we can comment on the user-defined UDAFs and what that API should be. Any documentation around the interfaces goes here. We don't PR anything else until this one merges. We should discuss how these interfaces differ from the current agg APIs, and why we are not rewriting all aggregates to the new interfaces. This will allow us to settle on naming and documentation.
>   2. We do a single PR that focuses on core: GpuAggregateExec, GpuOverrides, GpuColumnVector. We need to make sure we don't overcomplicate the aggregate.
>   3. One PR is about the Hive changes (hive/rapids package) and udaf.scala.
>   4. We do a single PR with all the testing.

Thanks a lot for the suggestion. I am concerned about whether it would be good to post a pure API definition PR without any of the relevant functionality enabled. We also already have a reviewed doc for the API definition.

I agree to split this PR into smaller ones. Personally, I would prefer each small PR to be runnable and testable. So here is my idea.

  1. A PR that covers the API definitions and the basic implementation for ScalaUDAF only (including changes in GpuAggregateExec, GpuOverrides, GpuColumnVector), along with the tests.
  2. A new PR adds in the support for ScalaAggregator and the tests
  3. A new PR adds in the support for Hive UDAFs and the tests
  4. A new PR enables the compatibility with CPU execs for ScalaAggregator and Hive UDAFs to support the mixed aggregates, along with the tests.

@abellina Let me know if you have any comments.

@nvauto

nvauto commented Nov 17, 2025

Collaborator

NOTE: release/25.12 has been created from main. Please retarget your PR to release/25.12 if it should be included in the release.

firestarman added a commit that referenced this pull request Dec 3, 2025
Contributes to #13412

Rapids UDAF is designed to support executing a UDAF (User Defined
Aggregate Function) in a columnar way so that it can be accelerated by
the GPU.

Complete support for RapidsUDAF covers too many things, and a single PR
(#13450) is too large to
review. So instead it is better to add it piece by piece, and this
PR is the first one, which only introduces the relevant interfaces.

- `RapidsUDAF` - the top-level interface. It defines the 5 methods below,
following the [CPU definitions](https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/expressions/udaf.scala#L35)
(`UserDefinedAggregateFunction`) as closely as possible to minimize users'
learning effort.

   | RapidsUDAF |  UserDefinedAggregateFunction | 
   | :---- | :---- |
   | getDefaultValue  | initialize  |
   | updateAggregation | update |
   | mergeAggregation | merge |
   | getResult | evaluate |
   | bufferTypes | bufferSchema |

`updateAggregation` and `mergeAggregation` return a
`RapidsUDAFGroupByAggregation`, which contains the APIs to perform the
aggregation.
- `RapidsUDAFGroupByAggregation` - the base interface for GPU-accelerated
UDAF aggregation implementations. It provides the contract for different
aggregation strategies. It also supports an optional pair of `preStep`
and `postStep` to run transformations before and after a
"reduce/aggregate" operation, similar to "preMerge" and "postMerge" for
the merge-stage aggregate in `GpuAggregateFunction`.
- `RapidsSimpleGroupByAggregation` - a child class of
`RapidsUDAFGroupByAggregation` that provides a standard cuDF-based
aggregation step using built-in cuDF aggregation operations.
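
As a rough illustration of the lifecycle in the table above, here is a
minimal plain-Java sketch of an integer average. It is not the RapidsUDAF
API: the class and method names are hypothetical, Java arrays stand in for
GPU columns, and the integer-division result is an assumption. It only shows
how the default value, update, merge, and result steps fit together.

```java
import java.util.List;

/**
 * Hypothetical stand-in for the UDAF lifecycle; not the real RapidsUDAF API.
 * The aggregation buffer is a two-element array: [sum, count].
 */
public class IntAverageSketch {

    /** Empty buffer (plays the role of getDefaultValue / initialize). */
    public static long[] defaultValue() {
        return new long[] {0L, 0L};
    }

    /** Folds raw input values into a buffer (updateAggregation / update). */
    public static long[] update(int[] values) {
        long[] buf = defaultValue();
        for (int v : values) {
            buf[0] += v;   // running sum
            buf[1] += 1;   // running count
        }
        return buf;
    }

    /** Combines partial buffers from several batches (mergeAggregation / merge). */
    public static long[] merge(List<long[]> partials) {
        long[] buf = defaultValue();
        for (long[] p : partials) {
            buf[0] += p[0];
            buf[1] += p[1];
        }
        return buf;
    }

    /** Produces the final value, or null for an empty group (getResult / evaluate). */
    public static Integer result(long[] buf) {
        return buf[1] == 0 ? null : (int) (buf[0] / buf[1]);
    }

    public static void main(String[] args) {
        long[] p1 = update(new int[] {1, 2, 3});
        long[] p2 = update(new int[] {10});
        System.out.println(result(merge(List.of(p1, p2)))); // prints 4
    }
}
```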

The group-by `aggregate` API is placed in the child class because more
kinds of `aggregate` may be introduced in the future via other child classes.
For example, an `aggregate` like the one below can access the grouped data
and keys to let users do more customization.
```
  /**
   * Performs custom aggregation on data that has been grouped by keys.
   * The data is grouped, with offsets indicating group boundaries.
   * @param keyOffsets A ColumnVector containing the start offset for each group.
   *                   The end offset for group i is `keyOffsets[i+1]` (or total
   *                   rows for the last group).
   * @param groupedData An array of ColumnVectors containing the actual data
   *                    columns, sorted and organized by the grouping keys.
   * @return An array of ColumnVectors with one row per group, containing the
   * aggregated results.
   */
  ColumnVector[] aggregateGrouped(ColumnVector keyOffsets, ColumnVector[] groupedData);
```
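
To make the offset convention in that Javadoc concrete, here is a small
plain-Java sketch that sums each group. It is an illustration under stated
assumptions rather than the proposed implementation: `int[]` arrays stand in
for `ColumnVector`, and `GroupedSumSketch` and `sumPerGroup` are hypothetical
names.

```java
import java.util.Arrays;

/** Hypothetical illustration of group boundaries given by start offsets. */
public class GroupedSumSketch {

    /**
     * @param keyOffsets  start offset of each group in groupedData
     * @param groupedData one data column, already sorted by grouping key
     * @return one sum per group
     */
    public static long[] sumPerGroup(int[] keyOffsets, int[] groupedData) {
        long[] out = new long[keyOffsets.length];
        for (int g = 0; g < keyOffsets.length; g++) {
            int start = keyOffsets[g];
            // The end of group g is the next group's start offset, or the
            // total row count for the last group.
            int end = (g + 1 < keyOffsets.length) ? keyOffsets[g + 1] : groupedData.length;
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += groupedData[i];
            }
            out[g] = sum;
        }
        return out;
    }

    public static void main(String[] args) {
        int[] offsets = {0, 2, 5};              // three groups: [0,2), [2,5), [5,6)
        int[] data = {1, 2, 10, 20, 30, 7};
        System.out.println(Arrays.toString(sumPerGroup(offsets, data))); // prints [3, 60, 7]
    }
}
```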

---------

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
@firestarman

Collaborator Author

Closing this since we are adding RapidsUDAF piece by piece instead of through this single big PR.

@firestarman firestarman closed this Dec 3, 2025
Labels

feature request New feature or request performance A performance related task/issue

7 participants