Skip to content

Commit 775dbea

Browse files
[Feature] [Zeta] Optimize CoordinatorService ThreadPool Configuration to Prevent Potential OOM (#8241)
1 parent 353bbd2 commit 775dbea

File tree

11 files changed

+183
-3
lines changed

11 files changed

+183
-3
lines changed

Diff for: docs/en/seatunnel-engine/hybrid-cluster-deployment.md

+21
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,27 @@ seatunnel:
154154

155155
When `dynamic-slot: true` is used, the `job-schedule-strategy: WAIT` configuration will become invalid and will be forcibly changed to `job-schedule-strategy: REJECT`, because this parameter is meaningless in dynamic slots.
156156

157+
### 4.7 Coordinator Service
158+
159+
CoordinatorService responsible for the process of generating each job from a LogicalDag to an ExecutionDag,
160+
and then to a PhysicalDag. It ultimately creates the JobMaster for the job to handle scheduling, execution, and state monitoring.
161+
162+
**core-thread-num**
163+
164+
The corePoolSize of seatunnel coordinator job's executor cached thread pool
165+
166+
**max-thread-num**
167+
168+
The max job count can be executed at same time
169+
170+
Example
171+
172+
```yaml
173+
coordinator-service:
174+
core-thread-num: 30
175+
max-thread-num: 1000
176+
```
177+
157178
## 5. Configure The SeaTunnel Engine Network Service
158179

159180
All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file.

Diff for: docs/en/seatunnel-engine/separated-cluster-deployment.md

+23
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,29 @@ seatunnel:
297297
```
298298
When `dynamic-slot: true` is used, the `job-schedule-strategy: WAIT` configuration will become invalid and will be forcibly changed to `job-schedule-strategy: REJECT`, because this parameter is meaningless in dynamic slots.
299299

300+
301+
### 4.8 Coordinator Service
302+
303+
CoordinatorService responsible for the process of generating each job from a LogicalDag to an ExecutionDag,
304+
and then to a PhysicalDag. It ultimately creates the JobMaster for the job to handle scheduling, execution, and state monitoring.
305+
306+
**core-thread-num**
307+
308+
The corePoolSize of seatunnel coordinator job's executor cached thread pool
309+
310+
**max-thread-num**
311+
312+
The max job count can be executed at same time
313+
314+
Example
315+
316+
```yaml
317+
coordinator-service:
318+
core-thread-num: 30
319+
max-thread-num: 1000
320+
```
321+
322+
300323
## 5. Configuring SeaTunnel Engine Network Services
301324

302325
All network-related configurations of the SeaTunnel Engine are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files.

Diff for: docs/zh/seatunnel-engine/hybrid-cluster-deployment.md

+22
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,28 @@ seatunnel:
153153

154154
当`dynamic-slot: ture`时,`job-schedule-strategy: WAIT` 配置会失效,将被强制修改为`job-schedule-strategy: REJECT`,因为动态Slot时该参数没有意义,可以直接提交。
155155

156+
### 4.7 Coordinator Service
157+
158+
CoordinatorService 提供了每个作业从 LogicalDag 到 ExecutionDag,再到 PhysicalDag 的生成流程, 并最终创建作业的 JobMaster 进行作业的调度执行和状态监控
159+
160+
**core-thread-num**
161+
162+
配置 CoordinatorService 线程池核心线程数量
163+
164+
**max-thread-num**
165+
166+
同时可执行的最大作业数量
167+
168+
Example
169+
170+
```yaml
171+
coordinator-service:
172+
core-thread-num: 30
173+
max-thread-num: 1000
174+
```
175+
176+
177+
156178
## 5. 配置 SeaTunnel Engine 网络服务
157179

158180
所有 SeaTunnel Engine 网络相关的配置都在 `hazelcast.yaml` 文件中.

Diff for: docs/zh/seatunnel-engine/separated-cluster-deployment.md

+20
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,26 @@ seatunnel:
301301

302302
`dynamic-slot: ture`时,`job-schedule-strategy: WAIT` 配置会失效,将被强制修改为`job-schedule-strategy: REJECT`,因为动态Slot时该参数没有意义,可以直接提交。
303303

304+
### 4.8 Coordinator Service
305+
306+
CoordinatorService 提供了每个作业从 LogicalDag 到 ExecutionDag,再到 PhysicalDag 的生成流程, 并最终创建作业的 JobMaster 进行作业的调度执行和状态监控
307+
308+
**core-thread-num**
309+
310+
配置 CoordinatorService 线程池核心线程数量
311+
312+
**max-thread-num**
313+
314+
同时可执行的最大作业数量
315+
316+
Example
317+
318+
```yaml
319+
coordinator-service:
320+
core-thread-num: 30
321+
max-thread-num: 1000
322+
```
323+
304324
## 5. 配置 SeaTunnel Engine 网络服务
305325
306326
所有 SeaTunnel Engine 网络相关的配置都在 `hazelcast-master.yaml`和`hazelcast-worker.yaml` 文件中.

Diff for: seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java

+4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
2121
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
22+
import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
2223
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
2324
import org.apache.seatunnel.engine.common.config.server.QueueType;
2425
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
@@ -57,6 +58,9 @@ public class EngineConfig {
5758

5859
private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue();
5960

61+
private CoordinatorServiceConfig coordinatorServiceConfig =
62+
ServerConfigOptions.COORDINATOR_SERVICE.defaultValue();
63+
6064
private ConnectorJarStorageConfig connectorJarStorageConfig =
6165
ServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG.defaultValue();
6266

Diff for: seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java

+22
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.seatunnel.engine.common.config.server.ConnectorJarHAStorageConfig;
2323
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
2424
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode;
25+
import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
2526
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
2627
import org.apache.seatunnel.engine.common.config.server.QueueType;
2728
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
@@ -106,6 +107,25 @@ private SlotServiceConfig parseSlotServiceConfig(Node slotServiceNode) {
106107
return slotServiceConfig;
107108
}
108109

110+
private CoordinatorServiceConfig parseCoordinatorServiceConfig(Node coordinatorServiceNode) {
111+
CoordinatorServiceConfig coordinatorServiceConfig = new CoordinatorServiceConfig();
112+
for (Node node : childElements(coordinatorServiceNode)) {
113+
String name = cleanNodeName(node);
114+
if (ServerConfigOptions.MAX_THREAD_NUM.key().equals(name)) {
115+
coordinatorServiceConfig.setMaxThreadNum(
116+
getIntegerValue(
117+
ServerConfigOptions.MAX_THREAD_NUM.key(), getTextContent(node)));
118+
} else if (ServerConfigOptions.CORE_THREAD_NUM.key().equals(name)) {
119+
coordinatorServiceConfig.setCoreThreadNum(
120+
getIntegerValue(
121+
ServerConfigOptions.CORE_THREAD_NUM.key(), getTextContent(node)));
122+
} else {
123+
LOGGER.warning("Unrecognized element: " + name);
124+
}
125+
}
126+
return coordinatorServiceConfig;
127+
}
128+
109129
private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
110130
final EngineConfig engineConfig = config.getEngineConfig();
111131
for (Node node : childElements(engineNode)) {
@@ -177,6 +197,8 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
177197
ScheduleStrategy.valueOf(getTextContent(node).toUpperCase(Locale.ROOT)));
178198
} else if (ServerConfigOptions.HTTP.key().equals(name)) {
179199
engineConfig.setHttpConfig(parseHttpConfig(node));
200+
} else if (ServerConfigOptions.COORDINATOR_SERVICE.key().equals(name)) {
201+
engineConfig.setCoordinatorServiceConfig(parseCoordinatorServiceConfig(node));
180202
} else {
181203
LOGGER.warning("Unrecognized element: " + name);
182204
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.engine.common.config.server;
19+
20+
import lombok.Data;
21+
22+
import java.io.Serializable;
23+
24+
import static com.hazelcast.internal.util.Preconditions.checkPositive;
25+
26+
@Data
27+
public class CoordinatorServiceConfig implements Serializable {
28+
29+
private int coreThreadNum = ServerConfigOptions.CORE_THREAD_NUM.defaultValue();
30+
31+
private int maxThreadNum = ServerConfigOptions.MAX_THREAD_NUM.defaultValue();
32+
33+
public void setCoreThreadNum(int coreThreadNum) {
34+
checkPositive(coreThreadNum, ServerConfigOptions.CORE_THREAD_NUM + " must be >= 0");
35+
this.coreThreadNum = coreThreadNum;
36+
}
37+
38+
public void setMaxThreadNum(int maxThreadNum) {
39+
checkPositive(maxThreadNum, ServerConfigOptions.MAX_THREAD_NUM + " must be > 0");
40+
this.maxThreadNum = maxThreadNum;
41+
}
42+
}

Diff for: seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java

+19
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,25 @@ public class ServerConfigOptions {
132132
.type(new TypeReference<Map<String, String>>() {})
133133
.noDefaultValue()
134134
.withDescription("The checkpoint storage instance configuration.");
135+
136+
public static final Option<Integer> CORE_THREAD_NUM =
137+
Options.key("core-thread-num")
138+
.intType()
139+
.defaultValue(10)
140+
.withDescription("The core thread num of coordinator service");
141+
142+
public static final Option<Integer> MAX_THREAD_NUM =
143+
Options.key("max-thread-num")
144+
.intType()
145+
.defaultValue(Integer.MAX_VALUE)
146+
.withDescription("The max thread num of coordinator service");
147+
148+
public static final Option<CoordinatorServiceConfig> COORDINATOR_SERVICE =
149+
Options.key("coordinator-service")
150+
.type(new TypeReference<CoordinatorServiceConfig>() {})
151+
.defaultValue(new CoordinatorServiceConfig())
152+
.withDescription("The coordinator service configuration.");
153+
135154
public static final Option<Integer> HISTORY_JOB_EXPIRE_MINUTES =
136155
Options.key("history-job-expire-minutes")
137156
.intType()

Diff for: seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ public void testSeaTunnelConfig() {
7676
Assertions.assertTrue(config.getEngineConfig().getHttpConfig().isEnableDynamicPort());
7777
Assertions.assertEquals(8080, config.getEngineConfig().getHttpConfig().getPort());
7878
Assertions.assertEquals(200, config.getEngineConfig().getHttpConfig().getPortRange());
79+
Assertions.assertEquals(
80+
30, config.getEngineConfig().getCoordinatorServiceConfig().getCoreThreadNum());
81+
Assertions.assertEquals(
82+
1000, config.getEngineConfig().getCoordinatorServiceConfig().getMaxThreadNum());
7983
}
8084

8185
@Test

Diff for: seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ seatunnel:
2222
slot-service:
2323
dynamic-slot: false
2424
slot-num: 5
25+
coordinator-service:
26+
core-thread-num: 30
27+
max-thread-num: 1000
2528
checkpoint:
2629
interval: 6000
2730
timeout: 7000

Diff for: seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,12 @@ public CoordinatorService(
199199
@NonNull SeaTunnelServer seaTunnelServer,
200200
EngineConfig engineConfig) {
201201
this.nodeEngine = nodeEngine;
202+
this.engineConfig = engineConfig;
202203
this.logger = nodeEngine.getLogger(getClass());
203204
this.executorService =
204205
new ThreadPoolExecutor(
205-
0,
206-
Integer.MAX_VALUE,
206+
engineConfig.getCoordinatorServiceConfig().getCoreThreadNum(),
207+
engineConfig.getCoordinatorServiceConfig().getMaxThreadNum(),
207208
60L,
208209
TimeUnit.SECONDS,
209210
new SynchronousQueue<>(),
@@ -212,7 +213,6 @@ public CoordinatorService(
212213
.build(),
213214
new ThreadPoolStatus.RejectionCountingHandler());
214215
this.seaTunnelServer = seaTunnelServer;
215-
this.engineConfig = engineConfig;
216216
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
217217
masterActiveListener.scheduleAtFixedRate(
218218
this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);

0 commit comments

Comments
 (0)