Commit d702c70

pang-wu authored and carsonwang committed
Fix Spark 3.5.x support + support Spark 3.4.3, deprecate Python 3.7 + support Python 3.10 (#411)

* support 3.4.1
* Support Spark 3.5.0
* support 3.4.3
* support 3.5.1
* build and test with python 3.10; deprecate spark support prior to 3.4
* deprecate databricks.koalas
* fix test
* Fix spark 3.5.x support
* fix lint
* Fix spark 3.5.x support
* final touch
* pin numpy < 2.0.0 and pyarrow < 15.0.0
1 parent b08439e commit d702c70

File tree

21 files changed: +112 −56 lines changed

.github/workflows/pypi.yml
Lines changed: 2 additions & 2 deletions

@@ -34,10 +34,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
     - uses: actions/checkout@61b9e3751b92087fd0b06925ba6dd6314e06f089 # master
-    - name: Set up Python 3.7
+    - name: Set up Python 3.9
       uses: actions/setup-python@0f07f7f756721ebd886c2462646a35f78a8bc4de # v1.2.4
       with:
-        python-version: 3.7
+        python-version: 3.9
     - name: Set up JDK 1.8
       uses: actions/setup-java@b6e674f4b717d7b0ae3baee0fbe79f498905dfde # v1.4.4
       with:

.github/workflows/ray_nightly_test.yml
Lines changed: 2 additions & 2 deletions

@@ -31,8 +31,8 @@ jobs:
     strategy:
       matrix:
         os: [ ubuntu-latest ]
-        python-version: [3.8, 3.9]
-        spark-version: [3.1.3, 3.2.4, 3.3.2, 3.4.0]
+        python-version: [3.8, 3.9, 3.10.14]
+        spark-version: [3.2.4, 3.3.2, 3.4.0, 3.5.0]

     runs-on: ${{ matrix.os }}

.github/workflows/raydp.yml
Lines changed: 3 additions & 3 deletions

@@ -33,8 +33,8 @@ jobs:
     strategy:
       matrix:
         os: [ ubuntu-latest ]
-        python-version: [3.8, 3.9]
-        spark-version: [3.1.3, 3.2.4, 3.3.2, 3.4.0]
+        python-version: [3.8, 3.9, 3.10.14]
+        spark-version: [3.2.4, 3.3.2, 3.4.0, 3.5.0]

     runs-on: ${{ matrix.os }}

@@ -82,7 +82,7 @@ jobs:
           else
             pip install torch
           fi
-          pip install pyarrow==6.0.1 ray[train] pytest koalas tensorflow==2.13.1 tabulate grpcio-tools wget
+          pip install pyarrow==6.0.1 ray[train] pytest tensorflow==2.13.1 tabulate grpcio-tools wget
           pip install "xgboost_ray[default]<=0.1.13"
           pip install torchmetrics
       - name: Cache Maven

.github/workflows/raydp_nightly.yml
Lines changed: 2 additions & 2 deletions

@@ -34,10 +34,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
    - uses: actions/checkout@61b9e3751b92087fd0b06925ba6dd6314e06f089 # master
-    - name: Set up Python 3.7
+    - name: Set up Python 3.9
       uses: actions/setup-python@0f07f7f756721ebd886c2462646a35f78a8bc4de # v1.2.4
       with:
-        python-version: 3.7
+        python-version: 3.9
     - name: Set up JDK 1.8
       uses: actions/setup-java@b6e674f4b717d7b0ae3baee0fbe79f498905dfde # v1.4.4
       with:

core/raydp-main/src/main/scala/org/apache/spark/sql/raydp/ObjectStoreWriter.scala
Lines changed: 8 additions & 10 deletions

@@ -17,21 +17,19 @@
 
 package org.apache.spark.sql.raydp
 
-
+import com.intel.raydp.shims.SparkShimLoader
+import io.ray.api.{ActorHandle, ObjectRef, PyActorHandle, Ray}
+import io.ray.runtime.AbstractRayRuntime
 import java.io.ByteArrayOutputStream
 import java.util.{List, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
 import java.util.function.{Function => JFunction}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import io.ray.api.{ActorHandle, ObjectRef, PyActorHandle, Ray}
-import io.ray.runtime.AbstractRayRuntime
 import org.apache.arrow.vector.VectorSchemaRoot
 import org.apache.arrow.vector.ipc.ArrowStreamWriter
 import org.apache.arrow.vector.types.pojo.Schema
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{RayDPException, SparkContext}
 import org.apache.spark.deploy.raydp._

@@ -105,7 +103,7 @@ class ObjectStoreWriter(@transient val df: DataFrame) extends Serializable {
       Iterator(iter)
     }
 
-    val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+    val arrowSchema = SparkShimLoader.getSparkShims.toArrowSchema(schema, timeZoneId)
     val allocator = ArrowUtils.rootAllocator.newChildAllocator(
       s"ray object store writer", 0, Long.MaxValue)
     val root = VectorSchemaRoot.create(arrowSchema, allocator)

@@ -217,7 +215,7 @@ object ObjectStoreWriter {
   def toArrowSchema(df: DataFrame): Schema = {
     val conf = df.queryExecution.sparkSession.sessionState.conf
     val timeZoneId = conf.getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
-    ArrowUtils.toArrowSchema(df.schema, timeZoneId)
+    SparkShimLoader.getSparkShims.toArrowSchema(df.schema, timeZoneId)
   }
 
   def fromSparkRDD(df: DataFrame, storageLevel: StorageLevel): Array[Array[Byte]] = {
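This file is the heart of the 3.5.x fix: `ArrowUtils.toArrowSchema` is an internal Spark API whose signature differs across releases, so calling it directly from code compiled against one Spark version can break on another. Routing the call through `SparkShimLoader.getSparkShims` defers to a shim built against the running Spark version. A minimal caller-side sketch (`arrowSchemaOf` is a hypothetical name; `SparkShimLoader` and the two-argument `toArrowSchema` are from this diff):

import com.intel.raydp.shims.SparkShimLoader
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.internal.SQLConf

// Resolve the session time zone, then let the version-specific shim
// build the Arrow schema for this DataFrame.
def arrowSchemaOf(df: DataFrame): Schema = {
  val timeZoneId = df.queryExecution.sparkSession.sessionState.conf
    .getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
  SparkShimLoader.getSparkShims.toArrowSchema(df.schema, timeZoneId)
}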

core/shims/common/src/main/scala/com/intel/raydp/shims/SparkShims.scala
Lines changed: 4 additions & 0 deletions

@@ -17,9 +17,11 @@
 
 package com.intel.raydp.shims
 
+import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.executor.RayDPExecutorBackendFactory
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 sealed abstract class ShimDescriptor

@@ -36,4 +38,6 @@ trait SparkShims {
   def getExecutorBackendFactory(): RayDPExecutorBackendFactory
 
   def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext
+
+  def toArrowSchema(schema : StructType, timeZoneId : String) : Schema
 }
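Each supported Spark release implements this new trait method in its own shim module; the spark322 and spark330 implementations below simply delegate to a per-version `SparkSqlUtils`. For a newer release where the internal API gained extra flags, an implementation might look like the following sketch. This is an assumption-laden illustration: the object name is invented, and the flag names (`errorOnDuplicatedFieldNames`, `largeVarTypes`) should be checked against the matching Spark sources, since their exact set is version-specific.

import org.apache.arrow.vector.types.pojo.Schema
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.ArrowUtils

// Hypothetical per-version helper: adapts the stable two-argument shim
// interface to a release whose internal ArrowUtils takes extra flags.
object Spark350SqlUtilsSketch {
  def toArrowSchema(schema: StructType, timeZoneId: String): Schema =
    ArrowUtils.toArrowSchema(
      schema,
      timeZoneId,
      errorOnDuplicatedFieldNames = true,
      largeVarTypes = false)
}

These extra parameters are precisely why a direct cross-version call is unsafe and why callers go through the shim.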

core/shims/spark322/src/main/scala/com/intel/raydp/shims/SparkShims.scala
Lines changed: 6 additions & 1 deletion

@@ -24,8 +24,9 @@ import org.apache.spark.executor.spark322._
 import org.apache.spark.spark322.TaskContextUtils
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.spark322.SparkSqlUtils
-
 import com.intel.raydp.shims.{ShimDescriptor, SparkShims}
+import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.sql.types.StructType
 
 class Spark322Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR

@@ -44,4 +45,8 @@ class Spark322Shims extends SparkShims {
   override def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext = {
     TaskContextUtils.getDummyTaskContext(partitionId, env)
   }
+
+  override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
+    SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
+  }
 }

core/shims/spark322/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
Lines changed: 8 additions & 1 deletion

@@ -17,12 +17,19 @@
 
 package org.apache.spark.sql.spark322
 
+import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
 
 object SparkSqlUtils {
   def toDataFrame(rdd: JavaRDD[Array[Byte]], schema: String, session: SparkSession): DataFrame = {
     ArrowConverters.toDataFrame(rdd, schema, new SQLContext(session))
   }
+
+  def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
+    ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
+  }
 }

core/shims/spark330/src/main/scala/com/intel/raydp/shims/SparkShims.scala
Lines changed: 6 additions & 1 deletion

@@ -24,8 +24,9 @@ import org.apache.spark.executor.spark330._
 import org.apache.spark.spark330.TaskContextUtils
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.spark330.SparkSqlUtils
-
 import com.intel.raydp.shims.{ShimDescriptor, SparkShims}
+import org.apache.arrow.vector.types.pojo.Schema
+import org.apache.spark.sql.types.StructType
 
 class Spark330Shims extends SparkShims {
   override def getShimDescriptor: ShimDescriptor = SparkShimProvider.DESCRIPTOR

@@ -44,4 +45,8 @@ class Spark330Shims extends SparkShims {
   override def getDummyTaskContext(partitionId: Int, env: SparkEnv): TaskContext = {
     TaskContextUtils.getDummyTaskContext(partitionId, env)
   }
+
+  override def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
+    SparkSqlUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
+  }
 }

core/shims/spark330/src/main/scala/org/apache/spark/sql/SparkSqlUtils.scala
Lines changed: 8 additions & 1 deletion

@@ -17,12 +17,19 @@
 
 package org.apache.spark.sql.spark330
 
+import org.apache.arrow.vector.types.pojo.Schema
 import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.sql.{DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
 import org.apache.spark.sql.execution.arrow.ArrowConverters
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.ArrowUtils
 
 object SparkSqlUtils {
   def toDataFrame(rdd: JavaRDD[Array[Byte]], schema: String, session: SparkSession): DataFrame = {
     ArrowConverters.toDataFrame(rdd, schema, session)
   }
+
+  def toArrowSchema(schema : StructType, timeZoneId : String) : Schema = {
+    ArrowUtils.toArrowSchema(schema = schema, timeZoneId = timeZoneId)
+  }
 }
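Both per-version `SparkSqlUtils` objects expose the same two-argument helper, so a quick way to sanity-check a shim is to convert a small schema directly, e.g. in a REPL with the spark330 shim module on the classpath. A usage sketch (the schema and time zone are illustrative):

import org.apache.spark.sql.spark330.SparkSqlUtils
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Two-column Spark schema, converted to its Arrow equivalent.
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("name", StringType)))

val arrowSchema = SparkSqlUtils.toArrowSchema(schema, "UTC")
println(arrowSchema) // Arrow schema mirroring the Spark StructType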
