
[SPARK-51537][CONNECT][CORE] construct the session-specific classloader based on the default session classloader on executor #50334


Closed
wants to merge 5 commits into apache:master from wbo4958:connect-executor-classpath

Conversation

wbo4958
Contributor

@wbo4958 wbo4958 commented Mar 20, 2025

What changes were proposed in this pull request?

This PR constructs the session-specific classloader on top of the default session classloader, which has already added the global jars (e.g., those added by --jars) to the classpath, on the executor side in Connect mode.
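
For illustration only, a minimal sketch of the idea; the method name and signature here are assumptions, not the actual Executor code:

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical sketch: the session-specific classloader takes the default session's
// classloader -- which already contains the global --jars -- as its parent, and only
// layers the session-scoped jars on top of it.
def buildSessionClassLoader(
    defaultSessionLoader: ClassLoader, // already includes the global --jars
    sessionJarUrls: Seq[URL]           // jars added through this Spark Connect session
): ClassLoader =
  new URLClassLoader(sessionJarUrls.toArray, defaultSessionLoader)
```

With such a parent chain, classes from the global jars resolve through normal parent delegation instead of being invisible to the session classloader.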

Why are the changes needed?

In Spark Connect mode, when connecting to a non-local (e.g., standalone) cluster, the executor creates an isolated session state that includes a session-specific classloader for each task. However, a notable issue arises: this session-specific classloader does not include the global JARs specified by the --jars option in the classpath. This oversight can lead to deserialization exceptions. For example:

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096)

Does this PR introduce any user-facing change?

No

How was this patch tested?

The newly added test passes, and the manual test below also passes:

  1. Clone the minimal project that reproduces this issue
git clone [email protected]:wbo4958/ConnectMLIssue.git
  2. Compile the project
mvn clean package
  3. Start a standalone cluster
$SPARK_HOME/sbin/start-master.sh -h localhost
$SPARK_HOME/sbin/start-worker.sh spark://localhost:7077
  4. Start a Connect server connected to the Spark standalone cluster
./standalone.sh
  5. Play around with the demo

Run the script below under the PySpark client environment.

python repro-issue.py

Without this PR, you will see the exception below:

Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance of org.apache.spark.rdd.MapPartitionsRDD
	at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096)
	at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060)
	at java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347)
	at java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:88)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:136)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:86)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at org.apache.spark.scheduler.Task.run(Task.scala:147)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:645)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:100)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:648)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(Thread.java:840)

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Mar 20, 2025
@wbo4958 wbo4958 changed the title [SPARK-51537][CONNECT] [constructed classpath using both global jars and session specific jars in executor [SPARK-51537][CONNECT][CORE] [constructed classpath using both global jars and session specific jars in executor Mar 20, 2025
@wbo4958 wbo4958 force-pushed the connect-executor-classpath branch from 7b963dc to bbe2a94 on March 21, 2025 02:17
@wbo4958 wbo4958 changed the title [SPARK-51537][CONNECT][CORE] [constructed classpath using both global jars and session specific jars in executor [SPARK-51537][CONNECT][CORE] construct classpath using both global jars and session specific jars in executor Mar 21, 2025
@wbo4958 wbo4958 marked this pull request as ready for review March 21, 2025 02:43
@wbo4958 wbo4958 changed the title [SPARK-51537][CONNECT][CORE] construct classpath using both global jars and session specific jars in executor [SPARK-51537][CONNECT][CORE] construct classpath using both global jars and session specific jars on executor Mar 21, 2025
@wbo4958
Contributor Author

wbo4958 commented Mar 21, 2025

Hi @hvanhovell @zhenlineo @HyukjinKwon @vicennial, Could you help review this PR? Thx very much.

@wbo4958 wbo4958 marked this pull request as ready for review March 25, 2025 08:01
@wbo4958 wbo4958 changed the title [SPARK-51537][CONNECT][CORE] construct classpath using both global jars and session specific jars on executor [SPARK-51537][CONNECT][CORE] construct the session-specific classloader based on the default session classloader on executor Mar 25, 2025
@vicennial
Contributor

Thanks for identifying this issue, @wbo4958! While your PR resolves the executor-side problem, I believe we have a chance to refine our approach to cover both executor operations (e.g., typical UDFs) and driver operations (e.g., custom data sources) in one unified solution.

The high-level proposal: In the ArtifactManager, add an initialisation step that would copy JARs from the underlying session.sparkContext.addedJars(DEFAULT_SESSION_ID) into session.sparkContext.addedJars(session.sessionUUID).
Advantages:

  • Enhanced session isolation
    • Global JARs are copied during initialization, so any subsequent changes to the default session jars do not affect the session-specific context.
    • This isolation is particularly beneficial in standalone clusters where Spark Connect sessions coexist with traditional sessions (i.e., those interacting directly with SparkContext).
  • Since the copied global JARs behave as session-scoped JARs, no extra modifications to the executor’s code or classloader are required.

The downside is that duplicating the global JARs for each new Spark Connect session will naturally consume more resources. We could mitigate this by adding a Spark configuration option to toggle whether global jars are inherited into a Spark Connect session.
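
For illustration, a rough sketch of that copy step; the map shape and the names DEFAULT_SESSION_ID / sessionUUID follow the comment above and are not a verified SparkContext API:

```scala
import scala.collection.mutable

// Assumed shape: addedJars keyed by session id, mapping jar path -> add timestamp.
type JarMap = mutable.Map[String, mutable.Map[String, Long]]

// Copy the default session's (global) jars into the new Connect session's entry once,
// at session initialisation, so later changes to the default session do not leak in.
def inheritGlobalJars(addedJars: JarMap, defaultSessionId: String, sessionUUID: String): Unit = {
  val globalJars  = addedJars.getOrElse(defaultSessionId, mutable.Map.empty[String, Long])
  val sessionJars = addedJars.getOrElseUpdate(sessionUUID, mutable.Map.empty[String, Long])
  sessionJars ++= globalJars
}
```

The configuration option mentioned above could simply gate the call to this copy step.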

WDYT?

@wbo4958
Contributor Author

wbo4958 commented Mar 25, 2025

Hi @vicennial ,

It looks like we can't copy the global jars into session-specific jars. Consider this scenario: a Spark plugin is added via "--jars" when launching the Connect server; the executor then immediately fetches the global jars and initializes the Spark plugin.

If we copy the global jars into session-specific jars, then when tasks run on the executor side and the code hits the plugin, the session-specific classloader will reload the plugin, which could leave the plugin broken.

The first commit of my PR, bbe2a94, added the global jars to the session-specific classpath, but that causes the plugin-not-working issue; see NVIDIA/spark-rapids#11902.
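
To illustrate the reload problem, a toy example (the jar URL and class name are placeholders):

```scala
import java.net.{URL, URLClassLoader}

// Two loaders that each load the plugin jar themselves (no shared parent holding it):
// the "same" class ends up as two distinct Class objects, so instances created by the
// already-initialized plugin cannot be cast to the session loader's version of the class.
def showDuplicateClass(pluginJar: URL, pluginClassName: String): Unit = {
  val bootupLoader  = new URLClassLoader(Array(pluginJar), null) // loader that initialized the plugin
  val sessionLoader = new URLClassLoader(Array(pluginJar), null) // session loader re-loading the jar
  val a = bootupLoader.loadClass(pluginClassName)
  val b = sessionLoader.loadClass(pluginClassName)
  println(a eq b) // false: same name, different class identity
}
```

Inheriting the default session classloader as the parent avoids this, because the plugin class is then resolved by the loader that already initialized it.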

Contributor

@vicennial vicennial left a comment

Thanks for the explanation and changes @wbo4958! LGTM for the most part.

As a follow-up idea for later, I'd like to explore whether adding a second "user default" session state would make sense. The idea, broadly, is that a "cluster default" session would contain jars added during cluster boot-up (i.e., through --jars), whose child sessions would be the Spark Connect isolated sessions as well as a "user default" session (i.e., any jars added directly by a user manipulating SparkContext without going through Spark Connect). These child sessions would inherit the "cluster default" classloader, ensuring isolation between classic Spark sessions and Spark Connect sessions. cc @HyukjinKwon @hvanhovell

         +-------------------------------------+
         |         Cluster Default             |
         |       ClassLoader (bootup jars)     |
         +----------------+----------------------+
                          |
         +----------------+----------------+
         |                                 |
+-------------------------+    +-------------------------+
|  Spark Connect          |    |   User Default          |
|  Isolated Session       |    |   Session (direct jars) |
|  (inherits cluster      |    |   (inherits cluster     |
|   default classloader)  |    |    default classloader) |
+-------------------------+    +-------------------------+
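
A minimal Scala sketch of that hierarchy, purely illustrative (names and wiring are assumptions, not actual Spark code):

```scala
import java.net.{URL, URLClassLoader}

// The "cluster default" loader holds the boot-up --jars; the Spark Connect session loader
// and the "user default" loader both delegate to it, so they share the cluster-wide jars
// while staying isolated from each other's session-scoped jars.
def buildLoaders(bootupJars: Array[URL],
                 connectSessionJars: Array[URL],
                 userJars: Array[URL]): (ClassLoader, ClassLoader) = {
  val clusterDefault = new URLClassLoader(bootupJars, ClassLoader.getSystemClassLoader)
  val connectSession = new URLClassLoader(connectSessionJars, clusterDefault)
  val userDefault    = new URLClassLoader(userJars, clusterDefault)
  (connectSession, userDefault)
}
```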

Comment on lines 1111 to 1112
// SPARK-51537, The isolation session must "inherit" the classloader from default session which
// has already added the global jars specified by --jars
Contributor

For the sake of completeness/tracking, please also add a short blurb about the plugin issue here and in the other comment below.

Contributor Author

Done. Thx

@@ -109,4 +109,31 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext {
}
}
}

test("SPARK-51537 Executor isolation session classloader inherits from " +
Contributor

Is there a way we can also add a test (here or elsewhere) for the plugin issue?

Contributor Author

Could we file a follow-up for this? For now, I have no idea how to add the plugin to this test.

Contributor

Sure!

Member

@HyukjinKwon HyukjinKwon left a comment

Leaving LGTM per @vicennial's review; he knows this part pretty well.

@wbo4958
Contributor Author

wbo4958 commented Mar 31, 2025

> Thanks for the explanation and changes @wbo4958! LGTM for the most part.
>
> As a follow-up idea for later, I'd like to explore whether adding a second "user default" session state would make sense. [...]

Hmm, this seems to make things complicated. It looks like we can't handle SparkContext in the Spark Connect environment, whether on the client or the server side?

@wbo4958
Contributor Author

wbo4958 commented Mar 31, 2025

Hi @vicennial, I resolved the comments; please help review again.

Contributor

@vicennial vicennial left a comment

LGTM!

@wbo4958
Contributor Author

wbo4958 commented Mar 31, 2025

Hi @HyukjinKwon, could you help review again and merge it if you think the PR is OK? Thanks.

@wbo4958
Contributor Author

wbo4958 commented Mar 31, 2025

Hi @hvanhovell @grundprinzip, could you also help review it and merge it if possible? Thanks very much.

@HyukjinKwon
Member

HyukjinKwon commented Mar 31, 2025

Merged to master and branch-4.0.

@wbo4958
Contributor Author

wbo4958 commented Apr 1, 2025

Hi @HyukjinKwon, thanks for merging. BTW, could you help cherry-pick it to branch-4.0?

@wbo4958 wbo4958 deleted the connect-executor-classpath branch April 1, 2025 02:40
@HyukjinKwon
Member

Sounds like there should be a follow-up; is it fine without it? @vicennial, is this something we can backport, or should we keep it only in the master branch?

@wbo4958
Contributor Author

wbo4958 commented Apr 1, 2025

Hi @HyukjinKwon, the follow-up is to add a unit test covering that Spark plugin jars are not reloaded. This PR has also fixed that kind of issue; we have manually verified it, at least for the spark-rapids plugin. For now it's hard to cover it in a unit test, so we agreed to handle it in a follow-up. Anyway, I will try my best to add this unit test. Either way, it won't affect the cherry-picking.

@gerashegalov
Contributor

Thanks @wbo4958 for this fix. To elaborate, we also verified that this PR works when cherry-picked into Spark branch-4.0 with spark-rapids branch-25.04, including the following previously broken scenario:

  • Multi-shim spark-rapids jar for Spark 3.3.0 and 4.0.1-SNAPSHOT
  • Two-worker Spark Standalone cluster
  • Connect Server with RapidsShuffleManager, two executors, one executor per worker

It would be great if this PR were also included in branch-4.0 for the next 4.0.0 RC4.

@grundprinzip
Contributor

@cloud-fan, missing this PR in the Spark 4.0 RC breaks crucial functionality for the Spark ML implementation. Can we please cherry-pick it to the RC branch?

@HyukjinKwon
Member

Right, let me land it

HyukjinKwon pushed a commit that referenced this pull request Apr 1, 2025
…er based on the default session classloader on executor

Closes #50334 from wbo4958/connect-executor-classpath.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 72bd563)
Signed-off-by: Hyukjin Kwon <[email protected]>
@HyukjinKwon
Member

backported

@wbo4958
Contributor Author

wbo4958 commented Apr 1, 2025

Hi @HyukjinKwon, thanks for your help. BTW, I found that branch-3.5 (3.5.6-SNAPSHOT) has the same issue and the test will fail there. I wonder if it's worth backporting to branch-3.5?

There seem to be a few similar Spark JIRAs about this issue for Spark 3.5; see https://issues.apache.org/jira/browse/SPARK-46762 and https://issues.apache.org/jira/browse/SPARK-45598

@HyukjinKwon
Member

Alright, let's put it in branch-3.5 too. Would you mind creating a PR for branch-3.5? I think we'd better open a new PR to make sure the tests pass.

@wbo4958
Contributor Author

wbo4958 commented Apr 1, 2025

Hi @HyukjinKwon, I made a PR for 3.5 #50475.

HyukjinKwon pushed a commit that referenced this pull request Apr 10, 2025
…k plugins are not reloaded

### What changes were proposed in this pull request?

This PR adds a unit test to verify that Spark plugin JARs specified via `--jars` are not reloaded.

### Why are the changes needed?

This PR is a followup of #50334 (comment)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

The test added can pass

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #50526 from wbo4958/SPARK-51537-followup.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Apr 10, 2025
…k plugins are not reloaded

Closes #50526 from wbo4958/SPARK-51537-followup.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 622fa35)
Signed-off-by: Hyukjin Kwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Apr 10, 2025
…k plugins are not reloaded

Closes #50526 from wbo4958/SPARK-51537-followup.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 622fa35)
Signed-off-by: Hyukjin Kwon <[email protected]>
jetoile pushed a commit to criteo-forks/spark that referenced this pull request Jul 15, 2025
…k plugins are not reloaded

Closes apache#50526 from wbo4958/SPARK-51537-followup.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 622fa35)
Signed-off-by: Hyukjin Kwon <[email protected]>