Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a45140e
do one hop forward fetch if recache data change executor
pang-wu Feb 1, 2026
22259a6
more robust executor id parse
pang-wu Feb 1, 2026
7b50558
add test
pang-wu Feb 1, 2026
75bff2e
add test
pang-wu Feb 2, 2026
32935ab
remove test
pang-wu Feb 2, 2026
099007f
revert change in dataset.py
pang-wu Feb 2, 2026
a489788
clean up
pang-wu Feb 2, 2026
2c8df45
clean up
pang-wu Feb 2, 2026
fb31b09
strip off table metadata again
pang-wu Feb 2, 2026
b60bffd
fix spark gc race condition
pang-wu Feb 16, 2026
86564cb
Add spark 3.4.4 and 3.5.4 support
pang-wu Dec 24, 2024
5334d13
Support Spark 4.0.0
pang-wu Aug 21, 2025
b64a284
exclude spark 3.x
pang-wu Aug 25, 2025
64f9794
add distribution
pang-wu Aug 25, 2025
67faeb5
lint
pang-wu Aug 25, 2025
0a71973
Do not use largeVarTypes
pang-wu Aug 26, 2025
067f277
class to classic session to convert internalRowRdd to rdd
pang-wu Aug 26, 2025
0651c9d
arrow to rdd
pang-wu Aug 26, 2025
f3f6e75
pin click<8.3.0
pang-wu Sep 26, 2025
66656ee
make jackson provided
pang-wu Nov 21, 2025
90a4699
Support spark 4.0.1
pang-wu Nov 21, 2025
201e96a
tf/estimator.py: only write checkpoint in rank0
pang-wu Nov 21, 2025
f520469
revert tf/estimator.py
pang-wu Dec 8, 2025
679eca6
support spark 4.1.x
pang-wu Feb 16, 2026
6c7a1b3
deprecate python 3.9, add 3.11 to CI
pang-wu Feb 16, 2026
187ce96
update pylint
pang-wu Feb 16, 2026
8cfc832
fix pylint rules
pang-wu Feb 16, 2026
6473ccc
fix tensorflow version
pang-wu Feb 16, 2026
93d5d42
pin pandas<3 version
pang-wu Feb 16, 2026
f83db26
remove df.sqlContext reference
pang-wu Feb 16, 2026
7d4c4de
extract commandlineutils to custom spark submit
pang-wu Feb 17, 2026
e7148fe
add new shims
pang-wu Feb 17, 2026
1989452
compile against 4.0.0
pang-wu Feb 17, 2026
a86d51d
use legacy keras
pang-wu Feb 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/pypi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,11 @@ jobs:
uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2.3.4
with:
python-version: 3.10.14
- name: Set up JDK 1.8
- name: Set up JDK 17
uses: actions/setup-java@b6e674f4b717d7b0ae3baee0fbe79f498905dfde # v1.4.4
with:
java-version: 1.8
java-version: 17
distribution: 'corretto'
- name: days since the commit date
run: |
:
Expand Down
9 changes: 5 additions & 4 deletions .github/workflows/pypi_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
name: build wheel and upload release
runs-on: ubuntu-latest
env:
PYSPARK_VERSION: "3.5.7"
PYSPARK_VERSION: "4.1.0"
RAY_VERSION: "2.40.0"
steps:
- uses: actions/checkout@61b9e3751b92087fd0b06925ba6dd6314e06f089 # master
Expand All @@ -46,10 +46,11 @@ jobs:
uses: actions/setup-python@e9aba2c848f5ebd159c070c61ea2c4e2b122355e # v2.3.4
with:
python-version: 3.10.14
- name: Set up JDK 1.8
- name: Set up JDK 17
uses: actions/setup-java@b6e674f4b717d7b0ae3baee0fbe79f498905dfde # v1.4.4
with:
java-version: 1.8
java-version: 17
distribution: 'corretto'
- name: Install extra dependencies for Ubuntu
run: |
sudo apt-get install -y mpich
Expand All @@ -65,7 +66,7 @@ jobs:
pip install "numpy<1.24" "click<8.3.0"
pip install "pydantic<2.0"
pip install torch --index-url https://download.pytorch.org/whl/cpu
pip install pyarrow "ray[train,default]==${{ env.RAY_VERSION }}" tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget
pip install pyarrow "ray[train,default]==${{ env.RAY_VERSION }}" tqdm pytest tensorflow==2.16.1 tf_keras tabulate grpcio-tools wget
pip install "xgboost_ray[default]<=0.1.13"
pip install "xgboost<=2.0.3"
pip install torchmetrics
Expand Down
18 changes: 9 additions & 9 deletions .github/workflows/ray_nightly_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ jobs:
strategy:
matrix:
os: [ ubuntu-latest ]
python-version: [3.9, 3.10.14]
spark-version: [3.3.2, 3.4.0, 3.5.0]
python-version: [3.10.14, 3.11]
spark-version: [4.0.0, 4.1.0]

runs-on: ${{ matrix.os }}

Expand Down Expand Up @@ -74,7 +74,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install wheel
pip install "numpy<1.24" "click<8.3.0"
pip install "click<8.3.0"
SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
if [ "$(uname -s)" == "Linux" ]
then
Expand All @@ -83,14 +83,14 @@ jobs:
pip install torch
fi
case $PYTHON_VERSION in
3.9)
pip install "ray[train,default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp39-cp39-manylinux2014_x86_64.whl"
;;
3.10.14)
pip install "ray[train,default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp310-cp310-manylinux2014_x86_64.whl"
;;
3.11)
pip install "ray[train,default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp311-cp311-manylinux2014_x86_64.whl"
;;
esac
pip install pyarrow tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget
pip install pyarrow tqdm pytest tabulate grpcio-tools wget
pip install "xgboost_ray[default]<=0.1.13"
pip install torchmetrics
HOROVOD_WITH_GLOO=1
Expand All @@ -107,10 +107,10 @@ jobs:
run: |
pip install pyspark==${{ matrix.spark-version }}
./build.sh
pip install dist/raydp-*.whl
pip install "$(ls dist/raydp-*.whl)[tensorflow]"
- name: Lint
run: |
pip install pylint==2.8.3
pip install pylint==3.2.7
pylint --rcfile=python/pylintrc python/raydp
pylint --rcfile=python/pylintrc examples/*.py
- name: Test with pytest
Expand Down
13 changes: 6 additions & 7 deletions .github/workflows/raydp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
python-version: [3.9, 3.10.14]
spark-version: [3.3.2, 3.4.0, 3.5.0]
python-version: [3.10.14, 3.11]
spark-version: [4.0.0, 4.1.0]
ray-version: [2.37.0, 2.40.0, 2.50.0]

runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -74,16 +74,15 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install wheel
pip install "numpy<1.24" "click<8.3.0"
pip install "pydantic<2.0"
pip install "pydantic<2.0" "click<8.3.0"
SUBVERSION=$(python -c 'import sys; print(sys.version_info[1])')
if [ "$(uname -s)" == "Linux" ]
then
pip install torch --index-url https://download.pytorch.org/whl/cpu
else
pip install torch
fi
pip install pyarrow "ray[train,default]==${{ matrix.ray-version }}" tqdm pytest tensorflow==2.13.1 tabulate grpcio-tools wget
pip install pyarrow "ray[train,default]==${{ matrix.ray-version }}" tqdm pytest tabulate grpcio-tools wget
pip install "xgboost_ray[default]<=0.1.13"
pip install "xgboost<=2.0.3"
pip install torchmetrics
Expand All @@ -98,10 +97,10 @@ jobs:
run: |
pip install pyspark==${{ matrix.spark-version }}
./build.sh
pip install dist/raydp-*.whl
pip install "$(ls dist/raydp-*.whl)[tensorflow]"
- name: Lint
run: |
pip install pylint==2.8.3
pip install pylint==3.2.7
pylint --rcfile=python/pylintrc python/raydp
pylint --rcfile=python/pylintrc examples/*.py
- name: Test with pytest
Expand Down
19 changes: 13 additions & 6 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
<url>https://github.com/ray-project/raydp.git</url>

<properties>
<spark.version>3.3.3</spark.version>
<spark.version>4.0.0</spark.version>
<spark322.version>3.2.2</spark322.version>
<spark330.version>3.3.0</spark330.version>
<spark340.version>3.4.0</spark340.version>
<spark350.version>3.5.0</spark350.version>
<spark400.version>4.0.0</spark400.version>
<spark410.version>4.1.0</spark410.version>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: spark410.version is 4.1.0 — worth bumping to 4.1.1 (current release)? The SparkShimProvider already covers it at runtime, but compiling against the latest patch would catch any API changes at build time.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep it 4.1.0 -- the idea is that we should support the minimum API from the initial version; otherwise the lib might introduce breaking changes between Spark's patch versions (Spark is supposed to be backward compatible across patch versions)

<snappy.version>1.1.10.4</snappy.version>
<netty.version>4.1.94.Final</netty.version>
<commons.text.version>1.10.0</commons.text.version>
Expand All @@ -27,11 +29,11 @@
<ivy.version>2.5.2</ivy.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Maven compiler source/target is still 1.8. Since Spark 4.x requires Java 17 at runtime and CI now uses JDK 17, should we bump the compile target to 17 as well? This would catch any bytecode-level incompatibilities at compile time rather than runtime.

<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<scala.version>2.12.15</scala.version>
<jackson.version>2.18.6</jackson.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<scala.version>2.13.12</scala.version>
<jackson.version>2.18.2</jackson.version>
<scala.binary.version>2.13</scala.binary.version>
<junit-jupiter.version>5.10.1</junit-jupiter.version>
</properties>

Expand Down Expand Up @@ -151,23 +153,27 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<!-- Guava is excluded because of SPARK-6149. The Guava version referenced in this module is
15.0, which causes runtime incompatibility issues. -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
Expand All @@ -179,6 +185,7 @@
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down
5 changes: 0 additions & 5 deletions core/raydp-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -134,24 +134,20 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<!-- Guava is excluded because of SPARK-6149. The Guava version referenced in this module is
15.0, which causes runtime incompatibility issues. -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
<version>${jackson.version}</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
Expand All @@ -162,7 +158,6 @@
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import java.security.PrivilegedExceptionAction
import java.text.ParseException
import java.util.{ServiceLoader, UUID}
import java.util.jar.JarInputStream
import javax.ws.rs.core.UriBuilder

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Properties, Try}

import com.intel.raydp.shims.SparkShimLoader
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}
import org.apache.hadoop.fs.{FileSystem, Path}
Expand Down Expand Up @@ -258,7 +258,10 @@ private[spark] class SparkSubmit extends Logging {
}

if (clusterManager == KUBERNETES) {
args.master = Utils.checkAndGetK8sMasterUrl(args.master)
val checkedMaster = Utils.checkAndGetK8sMasterUrl(args.master)
SparkShimLoader.getSparkShims
.getCommandLineUtilsBridge
.setSubmitMaster(args, checkedMaster)
// Make sure KUBERNETES is included in our build if we're trying to use it
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
error(
Expand Down Expand Up @@ -340,7 +343,7 @@ private[spark] class SparkSubmit extends Logging {

// update spark config from args
args.toSparkConf(Option(sparkConf))
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
val hadoopConf = conf.getOrElse(SparkHadoopUtil.get.newConfiguration(sparkConf))
val targetDir = Utils.createTempDir()

// Kerberos is not supported in standalone mode, and keytab support is not yet available
Expand Down Expand Up @@ -393,8 +396,10 @@ private[spark] class SparkSubmit extends Logging {
val archiveLocalFiles = Option(args.archives).map { uris =>
val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
val localArchives = downloadFileList(
resolvedUris.map(
UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
resolvedUris.map { uri =>
new URI(uri.getScheme,
uri.getRawSchemeSpecificPart, null).toString
}.mkString(","),
targetDir, sparkConf, hadoopConf)

// SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
Expand All @@ -413,8 +418,9 @@ private[spark] class SparkSubmit extends Logging {
Utils.unpack(source, dest)

// Keep the URIs of local files with the given fragments.
UriBuilder.fromUri(
localArchive).fragment(resolvedUri.getFragment).build().toString
new URI(localArchive.getScheme,
localArchive.getRawSchemeSpecificPart,
resolvedUri.getFragment).toString
}.mkString(",")
}.orNull
args.files = filesLocalFiles
Expand Down Expand Up @@ -986,7 +992,12 @@ private[spark] object InProcessSparkSubmit {

}

object SparkSubmit extends CommandLineUtils with Logging {
object SparkSubmit extends Logging {

var printStream: PrintStream = System.err
// scalastyle:off println
def printMessage(str: String): Unit = printStream.println(str)
// scalastyle:on println

// Cluster managers
private val YARN = 1
Expand Down Expand Up @@ -1019,7 +1030,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"

override def main(args: Array[String]): Unit = {
def main(args: Array[String]): Unit = {
val submit = new SparkSubmit() {
self =>

Expand All @@ -1044,7 +1055,8 @@ object SparkSubmit extends CommandLineUtils with Logging {
super.doSubmit(args)
} catch {
case e: SparkUserAppException =>
exitFn(e.exitCode)
SparkShimLoader.getSparkShims
.getCommandLineUtilsBridge.callExit(e.exitCode)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ import java.text.SimpleDateFormat
import java.util.{Date, Locale}
import javax.xml.bind.DatatypeConverter

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap
import scala.jdk.CollectionConverters._

import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper
import io.ray.api.{ActorHandle, PlacementGroups, Ray}
import io.ray.api.id.PlacementGroupId
import io.ray.api.placementgroup.PlacementGroup
import io.ray.runtime.config.RayConfig
import org.json4s._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{RayDPException, SecurityManager, SparkConf}
import org.apache.spark.executor.RayDPExecutor
Expand All @@ -39,6 +39,7 @@ import org.apache.spark.raydp.{RayExecutorUtils, SparkOnRayConfigs}
import org.apache.spark.rpc._
import org.apache.spark.util.Utils


class RayAppMaster(host: String,
port: Int,
actorExtraClasspath: String) extends Serializable with Logging {
Expand Down Expand Up @@ -298,7 +299,7 @@ class RayAppMaster(host: String,
.map { case (name, amount) => (name, Double.box(amount)) }.asJava,
placementGroup,
getNextBundleIndex,
seqAsJavaList(appInfo.desc.command.javaOpts))
appInfo.desc.command.javaOpts.asJava)
appInfo.addPendingRegisterExecutor(executorId, handler, sparkCoresPerExecutor, memory)
}

Expand Down Expand Up @@ -356,11 +357,15 @@ object RayAppMaster extends Serializable {
val ACTOR_NAME = "RAY_APP_MASTER"

def setProperties(properties: String): Unit = {
implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
val parsed = parse(properties).extract[Map[String, String]]
parsed.foreach{ case (key, value) =>
System.setProperty(key, value)
// Use Jackson ObjectMapper directly to avoid JSON4S version conflicts
val mapper = new ObjectMapper()
val javaMap = mapper.readValue(properties, classOf[java.util.Map[String, Object]])
val scalaMap = javaMap.asScala.toMap
scalaMap.foreach{ case (key, value) =>
// Convert all values to strings since System.setProperty expects String
System.setProperty(key, value.toString)
}

// Use the same session dir as the python side
RayConfig.create().setSessionDir(System.getProperty("ray.session-dir"))
}
Expand Down
Loading
Loading