
Commit 9be8079

Merge pull request #11848 from NVIDIA/merge-branch-24.12-to-main
Merge branch-24.12 into main
2 parents 3e24d01 + 8a8bd3d commit 9be8079

636 files changed: +20375 −5304 lines


.github/workflows/blossom-ci.yml

+2 −1
@@ -77,7 +77,8 @@ jobs:
           github.actor == 'Feng-Jiang28' ||
           github.actor == 'SurajAralihalli' ||
           github.actor == 'jihoonson' ||
-          github.actor == 'ustcfy'
+          github.actor == 'ustcfy' ||
+          github.actor == 'knoguchi22'
         )
       steps:
         - name: Check if comment is issued by authorized person

.github/workflows/mvn-verify-check.yml

+2 −6
@@ -246,12 +246,10 @@ jobs:
           echo "Generated Scala 2.13 build files don't match what's in repository"
           exit 1
         fi
-        # change to Scala 2.13 Directory
-        cd scala2.13
         # test command, will retry for 3 times if failed.
         max_retry=3; delay=30; i=1
         while true; do
-          mvn package \
+          mvn package -f scala2.13/ \
           -pl integration_tests,tests,tools -am -P 'individual,pre-merge' \
           -Dbuildver=${{ matrix.spark-version }} -Dmaven.scalastyle.skip=true \
           -Drat.skip=true ${{ env.COMMON_MVN_FLAGS }} && break || {
@@ -303,12 +301,10 @@ jobs:
           echo "Generated Scala 2.13 build files don't match what's in repository"
           exit 1
         fi
-        # change to Scala 2.13 Directory
-        cd scala2.13
         # test command, will retry for 3 times if failed.
         max_retry=3; delay=30; i=1
         while true; do
-          mvn verify \
+          mvn verify -f scala2.13/ \
           -P "individual,pre-merge,source-javadoc" -Dbuildver=${{ matrix.spark-version }} \
           ${{ env.COMMON_MVN_FLAGS }} && break || {
           if [[ $i -le $max_retry ]]; then

CHANGELOG.md

+180 −210
Large diffs are not rendered by default.

CONTRIBUTING.md

+4 −4
@@ -127,15 +127,15 @@ mvn -pl dist -PnoSnapshots package -DskipTests
 Verify that shim-specific classes are hidden from a conventional classloader.
 
 ```bash
-$ javap -cp dist/target/rapids-4-spark_2.12-24.10.1-cuda11.jar com.nvidia.spark.rapids.shims.SparkShimImpl
+$ javap -cp dist/target/rapids-4-spark_2.12-24.12.0-cuda11.jar com.nvidia.spark.rapids.shims.SparkShimImpl
 Error: class not found: com.nvidia.spark.rapids.shims.SparkShimImpl
 ```
 
 However, its bytecode can be loaded if prefixed with `spark3XY` not contained in the package name
 
 ```bash
-$ javap -cp dist/target/rapids-4-spark_2.12-24.10.1-cuda11.jar spark320.com.nvidia.spark.rapids.shims.SparkShimImpl | head -2
-Warning: File dist/target/rapids-4-spark_2.12-24.10.1-cuda11.jar(/spark320/com/nvidia/spark/rapids/shims/SparkShimImpl.class) does not contain class spark320.com.nvidia.spark.rapids.shims.SparkShimImpl
+$ javap -cp dist/target/rapids-4-spark_2.12-24.12.0-cuda11.jar spark320.com.nvidia.spark.rapids.shims.SparkShimImpl | head -2
+Warning: File dist/target/rapids-4-spark_2.12-24.12.0-cuda11.jar(/spark320/com/nvidia/spark/rapids/shims/SparkShimImpl.class) does not contain class spark320.com.nvidia.spark.rapids.shims.SparkShimImpl
 Compiled from "SparkShims.scala"
 public final class com.nvidia.spark.rapids.shims.SparkShimImpl {
 ```
@@ -178,7 +178,7 @@ mvn package -pl dist -am -Dbuildver=340 -DallowConventionalDistJar=true
 Verify `com.nvidia.spark.rapids.shims.SparkShimImpl` is conventionally loadable:
 
 ```bash
-$ javap -cp dist/target/rapids-4-spark_2.12-24.10.1-cuda11.jar com.nvidia.spark.rapids.shims.SparkShimImpl | head -2
+$ javap -cp dist/target/rapids-4-spark_2.12-24.12.0-cuda11.jar com.nvidia.spark.rapids.shims.SparkShimImpl | head -2
 Compiled from "SparkShims.scala"
 public final class com.nvidia.spark.rapids.shims.SparkShimImpl {
 ```

DF_UDF_README.md

+117
@@ -0,0 +1,117 @@
# Scala / Java UDFs implemented using DataFrame operations

User Defined Functions (UDFs) are used for a number of reasons in Apache Spark. Much of the time it is to implement
logic that is either very difficult or impossible to implement using existing SQL/DataFrame APIs directly. But they
are also used as a way to standardize processing logic across an organization or for code reuse.

But UDFs come with some downsides. The biggest one is visibility into the processing being done. SQL is a language that
can be highly optimized. But a UDF in most cases is a black box that the SQL optimizer cannot do anything about.
This can result in less than ideal query planning. Additionally, accelerated execution environments, like the
RAPIDS Accelerator for Apache Spark, have no easy way to replace UDFs with accelerated versions, which can result in
slow performance.

This feature addresses the code reuse use case by providing a way to implement a UDF in terms of DataFrame
commands.

## Setup

The DataFrame UDF plugin is packaged in the same jar as the RAPIDS Accelerator for Apache Spark. That jar needs to
be added as a compile-time dependency of any code that uses this feature, and it must also be on your Spark
classpath, just as you would do for GPU acceleration.

If you do not plan to use GPU-accelerated processing but still want DataFrame UDF support in CPU applications, add
`com.nvidia.spark.DFUDFPlugin` to the `spark.sql.extensions` config. If you do use GPU-accelerated processing,
the RAPIDS plugin enables this automatically; you don't need to set the `spark.sql.extensions` config, but it
won't hurt anything if you do. Now you can implement a UDF in terms of DataFrame operations.
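For example, a CPU-only application might enable the plugin when it builds its `SparkSession`. This is a minimal sketch, not taken from the plugin docs; the jar path is a placeholder for wherever the RAPIDS Accelerator jar lives in your environment:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical location of the RAPIDS Accelerator jar; adjust for your deployment.
val rapidsJar = "/opt/sparkRapidsPlugin/rapids-4-spark_2.12-24.12.0-cuda11.jar"

val spark = SparkSession.builder()
  .appName("df-udf-cpu-only")
  // Put the plugin jar on the driver/executor classpath.
  .config("spark.jars", rapidsJar)
  // Enable DataFrame UDF support without GPU acceleration.
  .config("spark.sql.extensions", "com.nvidia.spark.DFUDFPlugin")
  .getOrCreate()
```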

## Usage

```scala
import com.nvidia.spark.functions._

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

val sum_array = df_udf((longArray: Column) =>
  aggregate(longArray,
    lit(0L),
    (a, b) => coalesce(a, lit(0L)) + coalesce(b, lit(0L)),
    a => a))
spark.udf.register("sum_array", sum_array)
```

You can then use `sum_array` however you would have used any other UDF. This allows you to provide a drop-in
replacement implementation of an existing UDF.

```scala
Seq(Array(1L, 2L, 3L)).toDF("data").selectExpr("sum_array(data) as result").show()
+------+
|result|
+------+
|     6|
+------+
```

Java APIs are also supported and should work the same as Spark's UDFs.

```java
import com.nvidia.spark.functions.df_udf;

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;

UserDefinedFunction myAdd = df_udf((Column lhs, Column rhs) -> lhs.plus(rhs));
spark.udf().register("myadd", myAdd);

spark.sql("SELECT myadd(1, 1) as r").show();
// +--+
// | r|
// +--+
// | 2|
// +--+
```

## Type Checks

DataFrame APIs do not provide type safety when writing the code, and that is the same here. There are no built-in type
checks for inputs yet. Also, because of how types are resolved in Spark, there is no way to adjust the query based on
the types passed in. Type checks are handled by the SQL planner/optimizer after the UDF has been replaced. This means
that the final SQL will not violate any type safety, but it also means that the errors might be confusing. For example,
if I passed in an `ARRAY<DOUBLE>` to `sum_array` instead of an `ARRAY<LONG>`, I would get an error like

```scala
Seq(Array(1.0, 2.0, 3.0)).toDF("data").selectExpr("sum_array(data) as result").show()
org.apache.spark.sql.AnalysisException: [DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE] Cannot resolve "aggregate(data, 0, lambdafunction((coalesce(namedlambdavariable(), 0) + coalesce(namedlambdavariable(), 0)), namedlambdavariable(), namedlambdavariable()), lambdafunction(namedlambdavariable(), namedlambdavariable()))" due to data type mismatch: Parameter 3 requires the "BIGINT" type, however "lambdafunction((coalesce(namedlambdavariable(), 0) + coalesce(namedlambdavariable(), 0)), namedlambdavariable(), namedlambdavariable())" has the type "DOUBLE".; line 1 pos 0;
Project [aggregate(data#46, 0, lambdafunction((cast(coalesce(lambda x_9#49L, 0) as double) + coalesce(lambda y_10#50, cast(0 as double))), lambda x_9#49L, lambda y_10#50, false), lambdafunction(lambda x_11#51L, lambda x_11#51L, false)) AS result#48L]
+- Project [value#43 AS data#46]
   +- LocalRelation [value#43]

  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.dataTypeMismatch(package.scala:73)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5(CheckAnalysis.scala:269)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$5$adapted(CheckAnalysis.scala:256)
```

This is not as simple to understand as the error from a regular UDF.

```scala
val sum_array = udf((a: Array[Long]) => a.sum)

spark.udf.register("sum_array", sum_array)

Seq(Array(1.0, 2.0, 3.0)).toDF("data").selectExpr("sum_array(data) as result").show()
org.apache.spark.sql.AnalysisException: [CANNOT_UP_CAST_DATATYPE] Cannot up cast array element from "DOUBLE" to "BIGINT".
The type path of the target object is:
- array element class: "long"
- root class: "[J"
You can either add an explicit cast to the input data or choose a higher precision type of the field in the target object
  at org.apache.spark.sql.errors.QueryCompilationErrors$.upCastFailureError(QueryCompilationErrors.scala:285)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveUpCast$$fail(Analyzer.scala:3646)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$57$$anonfun$applyOrElse$234.applyOrElse(Analyzer.scala:3677)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$57$$anonfun$applyOrElse$234.applyOrElse(Analyzer.scala:3654)
```

We hope to add optional type checks in the future.
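Until such checks exist, one workaround is to cast the input column to the element type the DataFrame UDF expects before calling it. This is a sketch built on standard Spark SQL casting, not something the plugin does for you:

```scala
// Casting the array elements to BIGINT up front satisfies the types that
// sum_array's internal aggregate expression expects, avoiding the
// DATATYPE_MISMATCH error shown above.
Seq(Array(1.0, 2.0, 3.0)).toDF("data")
  .selectExpr("sum_array(CAST(data AS ARRAY<BIGINT>)) AS result")
  .show()
```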

README.md

+1 −1
@@ -73,7 +73,7 @@ as a `provided` dependency.
 <dependency>
     <groupId>com.nvidia</groupId>
     <artifactId>rapids-4-spark_2.12</artifactId>
-    <version>24.10.1</version>
+    <version>24.12.0</version>
     <scope>provided</scope>
 </dependency>
 ```
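For projects built with sbt rather than Maven, the equivalent `provided` dependency would look roughly like the following sketch, with the coordinates taken from the Maven snippet above:

```scala
// build.sbt -- depend on the RAPIDS Accelerator at compile time only.
libraryDependencies += "com.nvidia" % "rapids-4-spark_2.12" % "24.12.0" % Provided
```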
