Skip to content

Commit d580860

Browse files
authored
[Fix][Zeta] Fix thread classloader be set to null when use cache mode (#6509)
1 parent d159fbe commit d580860

File tree

6 files changed

+43
-44
lines changed

6 files changed

+43
-44
lines changed

Diff for: seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/loader/ClassLoaderUtil.java

-35
This file was deleted.

Diff for: seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,27 @@ public synchronized void releaseClassLoader(long jobId, Collection<URL> jars) {
8888
return;
8989
}
9090
if (referenceCount == 0) {
91-
classLoaderMap.remove(key);
91+
ClassLoader classLoader = classLoaderMap.remove(key);
9292
log.info("Release classloader for job {} with jars {}", jobId, jars);
9393
classLoaderReferenceCount.get(jobId).remove(key);
94+
recycleClassLoaderFromThread(classLoader);
9495
}
9596
if (classLoaderMap.isEmpty()) {
9697
classLoaderCache.remove(jobId);
9798
classLoaderReferenceCount.remove(jobId);
9899
}
99100
}
100101

102+
private static void recycleClassLoaderFromThread(ClassLoader classLoader) {
103+
Thread.getAllStackTraces().keySet().stream()
104+
.filter(thread -> thread.getContextClassLoader() == classLoader)
105+
.forEach(
106+
thread -> {
107+
log.info("recycle classloader for thread " + thread.getName());
108+
thread.setContextClassLoader(null);
109+
});
110+
}
111+
101112
private String covertJarsToKey(Collection<URL> jars) {
102113
return jars.stream().map(URL::toString).sorted().reduce((a, b) -> a + b).orElse("");
103114
}

Diff for: seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java

-2
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
4747
import org.apache.seatunnel.engine.common.config.JobConfig;
4848
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
49-
import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
5049
import org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
5150
import org.apache.seatunnel.engine.common.utils.IdGenerator;
5251
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
@@ -209,7 +208,6 @@ public ImmutablePair<List<Action>, Set<URL>> parse(ClassLoaderService classLoade
209208
classLoaderService.releaseClassLoader(
210209
Long.parseLong(jobConfig.getJobContext().getJobId()), connectorJars);
211210
}
212-
ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);
213211
}
214212
}
215213

Diff for: seatunnel-engine/seatunnel-engine-core/src/test/java/org/apache/seatunnel/engine/core/classloader/ClassLoaderServiceTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,35 @@ void testSameJarInDifferentJob() throws MalformedURLException {
6969
Lists.newArrayList(new URL("file:///console.jar"), new URL("file:///fake.jar")));
7070
Assertions.assertEquals(0, classLoaderService.queryClassLoaderCount());
7171
}
72+
73+
@Test
74+
void testRecycleClassLoaderFromThread() throws MalformedURLException, InterruptedException {
75+
ClassLoader classLoader =
76+
classLoaderService.getClassLoader(
77+
3L,
78+
Lists.newArrayList(
79+
new URL("file:///console.jar"), new URL("file:///fake.jar")));
80+
ClassLoader appClassLoader = Thread.currentThread().getContextClassLoader();
81+
Thread.currentThread().setContextClassLoader(classLoader);
82+
Thread thread =
83+
new Thread(
84+
() -> {
85+
while (Thread.currentThread().getContextClassLoader() != null) {
86+
try {
87+
Thread.sleep(1000);
88+
} catch (InterruptedException e) {
89+
throw new RuntimeException(e);
90+
}
91+
}
92+
});
93+
thread.start();
94+
Thread.currentThread().setContextClassLoader(appClassLoader);
95+
Assertions.assertEquals(classLoader, thread.getContextClassLoader());
96+
classLoaderService.releaseClassLoader(
97+
3L,
98+
Lists.newArrayList(new URL("file:///console.jar"), new URL("file:///fake.jar")));
99+
Assertions.assertNull(thread.getContextClassLoader());
100+
Thread.sleep(2000);
101+
Assertions.assertFalse(thread.isAlive());
102+
}
72103
}

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java

-3
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
2626
import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;
2727
import org.apache.seatunnel.engine.common.exception.JobNotFoundException;
28-
import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
2928
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
3029
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
3130
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
@@ -919,9 +918,7 @@ void taskDone(Task task) {
919918

920919
private void recycleClassLoader(TaskGroupLocation taskGroupLocation) {
921920
TaskGroupContext context = executionContexts.get(taskGroupLocation);
922-
ClassLoader classLoader = context.getClassLoader();
923921
executionContexts.get(taskGroupLocation).setClassLoader(null);
924-
ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);
925922
classLoaderService.releaseClassLoader(taskGroupLocation.getJobId(), context.getJars());
926923
}
927924

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java

-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
3131
import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
3232
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
33-
import org.apache.seatunnel.engine.common.loader.ClassLoaderUtil;
3433
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
3534
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
3635
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
@@ -217,8 +216,6 @@ public void init(long initializationTimestamp, boolean restart) throws Exception
217216
jobImmutableInformation.getJobId(),
218217
jobImmutableInformation.getPluginJarsUrls());
219218

220-
ClassLoaderUtil.recycleClassLoaderFromThread(classLoader);
221-
222219
final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
223220
PlanUtils.fromLogicalDAG(
224221
logicalDag,

0 commit comments

Comments
 (0)