Skip to content

Commit d6d3000

Browse files
authored
[Improve][Zeta] Split classloader in job master (#8622)
1 parent 78b23c0 commit d6d3000

File tree

21 files changed

+221
-108
lines changed

21 files changed

+221
-108
lines changed

Diff for: seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobExecutionEnvironment.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ public ClientJobProxy execute() throws ExecutionException, InterruptedException
194194
Long.parseLong(jobConfig.getJobContext().getJobId()),
195195
jobConfig.getName(),
196196
isStartWithSavePoint,
197-
seaTunnelHazelcastClient.getSerializationService().toData(logicalDag),
198-
jobConfig,
197+
seaTunnelHazelcastClient.getSerializationService(),
198+
logicalDag,
199199
new ArrayList<>(jarUrls),
200200
new ArrayList<>(connectorJarIdentifiers));
201201

Diff for: seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/classloader/DefaultClassLoaderService.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class DefaultClassLoaderService implements ClassLoaderService {
4343
private final Map<Long, Map<String, ClassLoader>> classLoaderCache;
4444
private final Map<Long, Map<String, AtomicInteger>> classLoaderReferenceCount;
4545
private final NodeEngine nodeEngine;
46+
public static final String SKIP_CHECK_JAR = "CLASSLOADER_SERVICE_SKIP_CHECK_JAR";
4647

4748
public DefaultClassLoaderService(boolean cacheMode, NodeEngine nodeEngine) {
4849
this.cacheMode = cacheMode;
@@ -70,7 +71,9 @@ public synchronized ClassLoader getClassLoader(long jobId, Collection<URL> jars)
7071
classLoaderReferenceCount.get(jobId).get(key).incrementAndGet();
7172
return classLoaderMap.get(key);
7273
} else {
73-
if (Objects.nonNull(nodeEngine)) {
74+
if (Objects.nonNull(nodeEngine)
75+
&& !Boolean.parseBoolean(
76+
System.getenv().getOrDefault(SKIP_CHECK_JAR, "false"))) {
7477
for (URL jar : jars) {
7578
File file = new File(jar.toURI().getPath());
7679
if (!file.exists()) {

Diff for: seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDag.java

+14-20
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.io.IOException;
3434
import java.util.LinkedHashMap;
3535
import java.util.LinkedHashSet;
36-
import java.util.Map;
3736
import java.util.Set;
3837

3938
/**
@@ -59,7 +58,7 @@ public class LogicalDag implements IdentifiedDataSerializable {
5958

6059
@Getter private JobConfig jobConfig;
6160
private final Set<LogicalEdge> edges = new LinkedHashSet<>();
62-
private final Map<Long, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
61+
private final LinkedHashMap<Long, LogicalVertex> logicalVertexMap = new LinkedHashMap<>();
6362
private IdGenerator idGenerator;
6463
private boolean isStartWithSavePoint = false;
6564

@@ -82,7 +81,7 @@ public Set<LogicalEdge> getEdges() {
8281
return this.edges;
8382
}
8483

85-
public Map<Long, LogicalVertex> getLogicalVertexMap() {
84+
public LinkedHashMap<Long, LogicalVertex> getLogicalVertexMap() {
8685
return logicalVertexMap;
8786
}
8887

@@ -116,8 +115,18 @@ public void setStartWithSavePoint(boolean startWithSavePoint) {
116115
.forEach(
117116
e -> {
118117
JsonObject edge = new JsonObject();
119-
edge.add("inputVertex", e.getInputVertex().getAction().getName());
120-
edge.add("targetVertex", e.getTargetVertex().getAction().getName());
118+
edge.add(
119+
"inputVertex",
120+
logicalVertexMap
121+
.get(e.getInputVertexId())
122+
.getAction()
123+
.getName());
124+
edge.add(
125+
"targetVertex",
126+
logicalVertexMap
127+
.get(e.getTargetVertexId())
128+
.getAction()
129+
.getName());
121130
edges.add(edge);
122131
});
123132

@@ -137,13 +146,6 @@ public int getClassId() {
137146

138147
@Override
139148
public void writeData(ObjectDataOutput out) throws IOException {
140-
out.writeInt(logicalVertexMap.size());
141-
142-
for (Map.Entry<Long, LogicalVertex> entry : logicalVertexMap.entrySet()) {
143-
out.writeLong(entry.getKey());
144-
out.writeObject(entry.getValue());
145-
}
146-
147149
out.writeInt(edges.size());
148150

149151
for (LogicalEdge edge : edges) {
@@ -158,19 +160,11 @@ public void writeData(ObjectDataOutput out) throws IOException {
158160

159161
@Override
160162
public void readData(ObjectDataInput in) throws IOException {
161-
int vertexCount = in.readInt();
162-
163-
for (int i = 0; i < vertexCount; i++) {
164-
Long key = in.readLong();
165-
LogicalVertex value = in.readObject();
166-
logicalVertexMap.put(key, value);
167-
}
168163

169164
int edgeCount = in.readInt();
170165

171166
for (int i = 0; i < edgeCount; i++) {
172167
LogicalEdge edge = in.readObject();
173-
edge.recoveryFromVertexMap(logicalVertexMap);
174168
edges.add(edge);
175169
}
176170

Diff for: seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalDagGenerator.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -105,12 +105,7 @@ private Set<LogicalEdge> createLogicalEdges() {
105105
.map(
106106
entry ->
107107
entry.getValue().stream()
108-
.map(
109-
targetId ->
110-
new LogicalEdge(
111-
logicalVertexMap.get(
112-
entry.getKey()),
113-
logicalVertexMap.get(targetId)))
108+
.map(targetId -> new LogicalEdge(entry.getKey(), targetId))
114109
.collect(Collectors.toList()))
115110
.flatMap(Collection::stream)
116111
.collect(Collectors.toCollection(LinkedHashSet::new));

Diff for: seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/logical/LogicalEdge.java

+5-14
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,8 @@
2323
import com.hazelcast.nio.ObjectDataOutput;
2424
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
2525
import lombok.Data;
26-
import lombok.NonNull;
2726

2827
import java.io.IOException;
29-
import java.util.Map;
30-
31-
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
3228

3329
@Data
3430
public class LogicalEdge implements IdentifiedDataSerializable {
@@ -45,21 +41,16 @@ public class LogicalEdge implements IdentifiedDataSerializable {
4541

4642
public LogicalEdge() {}
4743

44+
public LogicalEdge(Long inputVertexId, Long targetVertexId) {
45+
this.inputVertexId = inputVertexId;
46+
this.targetVertexId = targetVertexId;
47+
}
48+
4849
public LogicalEdge(LogicalVertex inputVertex, LogicalVertex targetVertex) {
49-
this.inputVertex = inputVertex;
50-
this.targetVertex = targetVertex;
5150
this.inputVertexId = inputVertex.getVertexId();
5251
this.targetVertexId = targetVertex.getVertexId();
5352
}
5453

55-
public void recoveryFromVertexMap(@NonNull Map<Long, LogicalVertex> vertexMap) {
56-
inputVertex = vertexMap.get(inputVertexId);
57-
targetVertex = vertexMap.get(targetVertexId);
58-
59-
checkNotNull(inputVertex);
60-
checkNotNull(targetVertex);
61-
}
62-
6354
@Override
6455
public int getFactoryId() {
6556
return JobDataSerializerHook.FACTORY_ID;

Diff for: seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobImmutableInformation.java

+47-7
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@
1818
package org.apache.seatunnel.engine.core.job;
1919

2020
import org.apache.seatunnel.engine.common.config.JobConfig;
21+
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
2122
import org.apache.seatunnel.engine.core.serializable.JobDataSerializerHook;
2223

2324
import com.hazelcast.internal.nio.IOUtil;
2425
import com.hazelcast.internal.serialization.Data;
26+
import com.hazelcast.internal.serialization.SerializationService;
2527
import com.hazelcast.nio.ObjectDataInput;
2628
import com.hazelcast.nio.ObjectDataOutput;
2729
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
2830
import lombok.NonNull;
2931

3032
import java.io.IOException;
3133
import java.net.URL;
34+
import java.util.ArrayList;
3235
import java.util.List;
36+
import java.util.Set;
3337

3438
public class JobImmutableInformation implements IdentifiedDataSerializable {
3539
private long jobId;
@@ -42,6 +46,10 @@ public class JobImmutableInformation implements IdentifiedDataSerializable {
4246

4347
private Data logicalDag;
4448

49+
private final List<Data> logicalVertexDataList = new ArrayList<>();
50+
51+
private final List<Set<URL>> logicalVertexJarsList = new ArrayList<>();
52+
4553
private JobConfig jobConfig;
4654

4755
private List<URL> pluginJarsUrls;
@@ -64,28 +72,42 @@ public JobImmutableInformation(
6472
long jobId,
6573
String jobName,
6674
boolean isStartWithSavePoint,
67-
@NonNull Data logicalDag,
68-
@NonNull JobConfig jobConfig,
75+
SerializationService serializationService,
76+
@NonNull LogicalDag logicalDag,
6977
@NonNull List<URL> pluginJarsUrls,
7078
@NonNull List<ConnectorJarIdentifier> connectorJarIdentifiers) {
7179
this.createTime = System.currentTimeMillis();
7280
this.jobId = jobId;
7381
this.jobName = jobName;
7482
this.isStartWithSavePoint = isStartWithSavePoint;
75-
this.logicalDag = logicalDag;
76-
this.jobConfig = jobConfig;
83+
logicalDag
84+
.getLogicalVertexMap()
85+
.forEach(
86+
(k, v) -> {
87+
logicalVertexDataList.add(serializationService.toData(v));
88+
logicalVertexJarsList.add(v.getAction().getJarUrls());
89+
});
90+
this.logicalDag = serializationService.toData(logicalDag);
91+
this.jobConfig = logicalDag.getJobConfig();
7792
this.pluginJarsUrls = pluginJarsUrls;
7893
this.connectorJarIdentifiers = connectorJarIdentifiers;
7994
}
8095

8196
public JobImmutableInformation(
8297
long jobId,
8398
String jobName,
84-
@NonNull Data logicalDag,
85-
@NonNull JobConfig jobConfig,
99+
SerializationService serializationService,
100+
@NonNull LogicalDag logicalDag,
86101
@NonNull List<URL> pluginJarsUrls,
87102
@NonNull List<ConnectorJarIdentifier> connectorJarIdentifiers) {
88-
this(jobId, jobName, false, logicalDag, jobConfig, pluginJarsUrls, connectorJarIdentifiers);
103+
this(
104+
jobId,
105+
jobName,
106+
false,
107+
serializationService,
108+
logicalDag,
109+
pluginJarsUrls,
110+
connectorJarIdentifiers);
89111
}
90112

91113
public long getJobId() {
@@ -120,6 +142,14 @@ public List<ConnectorJarIdentifier> getPluginJarIdentifiers() {
120142
return connectorJarIdentifiers;
121143
}
122144

145+
public List<Data> getLogicalVertexDataList() {
146+
return logicalVertexDataList;
147+
}
148+
149+
public List<Set<URL>> getLogicalVertexJarsList() {
150+
return logicalVertexJarsList;
151+
}
152+
123153
@Override
124154
public int getFactoryId() {
125155
return JobDataSerializerHook.FACTORY_ID;
@@ -136,6 +166,11 @@ public void writeData(ObjectDataOutput out) throws IOException {
136166
out.writeString(jobName);
137167
out.writeBoolean(isStartWithSavePoint);
138168
out.writeLong(createTime);
169+
out.writeInt(logicalVertexDataList.size());
170+
for (int i = 0; i < logicalVertexDataList.size(); i++) {
171+
IOUtil.writeData(out, logicalVertexDataList.get(i));
172+
out.writeObject(logicalVertexJarsList.get(i));
173+
}
139174
IOUtil.writeData(out, logicalDag);
140175
out.writeObject(jobConfig);
141176
out.writeObject(pluginJarsUrls);
@@ -148,6 +183,11 @@ public void readData(ObjectDataInput in) throws IOException {
148183
jobName = in.readString();
149184
isStartWithSavePoint = in.readBoolean();
150185
createTime = in.readLong();
186+
int size = in.readInt();
187+
for (int i = 0; i < size; i++) {
188+
logicalVertexDataList.add(IOUtil.readData(in));
189+
logicalVertexJarsList.add(in.readObject());
190+
}
151191
logicalDag = IOUtil.readData(in);
152192
jobConfig = in.readObject();
153193
pluginJarsUrls = in.readObject();

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java

+44
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2323
import org.apache.seatunnel.api.table.catalog.TablePath;
2424
import org.apache.seatunnel.engine.common.config.EngineConfig;
25+
import org.apache.seatunnel.engine.core.classloader.ClassLoaderService;
2526
import org.apache.seatunnel.engine.core.dag.actions.Action;
2627
import org.apache.seatunnel.engine.core.dag.actions.ActionUtils;
2728
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -36,8 +37,11 @@
3637
import org.apache.seatunnel.engine.server.dag.execution.ExecutionPlanGenerator;
3738
import org.apache.seatunnel.engine.server.dag.execution.Pipeline;
3839

40+
import com.hazelcast.internal.serialization.Data;
41+
import com.hazelcast.internal.serialization.SerializationService;
3942
import lombok.extern.slf4j.Slf4j;
4043

44+
import java.net.URL;
4145
import java.util.ArrayList;
4246
import java.util.HashMap;
4347
import java.util.List;
@@ -50,6 +54,46 @@
5054
@Slf4j
5155
public class DAGUtils {
5256

57+
public static LogicalDag restoreLogicalDag(
58+
JobImmutableInformation jobImmutableInformation,
59+
SerializationService serializationService,
60+
List<ClassLoader> classLoaders) {
61+
LogicalDag logicalDag =
62+
serializationService.toObject(jobImmutableInformation.getLogicalDag());
63+
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
64+
try {
65+
List<Data> logicalVertexDataList = jobImmutableInformation.getLogicalVertexDataList();
66+
for (int i = 0; i < jobImmutableInformation.getLogicalVertexDataList().size(); i++) {
67+
Thread.currentThread().setContextClassLoader(classLoaders.get(i));
68+
logicalDag.addLogicalVertex(
69+
serializationService.toObject(logicalVertexDataList.get(i)));
70+
}
71+
return logicalDag;
72+
} finally {
73+
Thread.currentThread().setContextClassLoader(classLoader);
74+
}
75+
}
76+
77+
public static LogicalDag restoreLogicalDag(
78+
JobImmutableInformation jobImmutableInformation,
79+
SerializationService serializationService,
80+
ClassLoaderService classLoaderService) {
81+
List<Set<URL>> logicalVertexJarsList = jobImmutableInformation.getLogicalVertexJarsList();
82+
List<ClassLoader> classLoaders = new ArrayList<>();
83+
try {
84+
for (Set<URL> urls : logicalVertexJarsList) {
85+
classLoaders.add(
86+
classLoaderService.getClassLoader(
87+
jobImmutableInformation.getJobId(), urls));
88+
}
89+
return restoreLogicalDag(jobImmutableInformation, serializationService, classLoaders);
90+
} finally {
91+
for (Set<URL> urls : logicalVertexJarsList) {
92+
classLoaderService.releaseClassLoader(jobImmutableInformation.getJobId(), urls);
93+
}
94+
}
95+
}
96+
5397
public static JobDAGInfo getJobDAGInfo(
5498
LogicalDag logicalDag,
5599
JobImmutableInformation jobImmutableInformation,

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,8 @@ private Set<ExecutionEdge> generateExecutionEdges(Set<LogicalEdge> logicalEdges)
159159
return 0;
160160
});
161161
for (LogicalEdge logicalEdge : sortedLogicalEdges) {
162-
LogicalVertex logicalInputVertex = logicalEdge.getInputVertex();
162+
LogicalVertex logicalInputVertex =
163+
logicalPlan.getLogicalVertexMap().get(logicalEdge.getInputVertexId());
163164
ExecutionVertex executionInputVertex =
164165
logicalVertexIdToExecutionVertexMap.computeIfAbsent(
165166
logicalInputVertex.getVertexId(),
@@ -176,7 +177,8 @@ private Set<ExecutionEdge> generateExecutionEdges(Set<LogicalEdge> logicalEdges)
176177
logicalInputVertex.getParallelism());
177178
});
178179

179-
LogicalVertex logicalTargetVertex = logicalEdge.getTargetVertex();
180+
LogicalVertex logicalTargetVertex =
181+
logicalPlan.getLogicalVertexMap().get(logicalEdge.getTargetVertexId());
180182
ExecutionVertex executionTargetVertex =
181183
logicalVertexIdToExecutionVertexMap.computeIfAbsent(
182184
logicalTargetVertex.getVertexId(),

0 commit comments

Comments
 (0)