
Commit c4a7afc

[Feature][Core] Add slot allocation strategy (#8233)
1 parent 36b3dd2 commit c4a7afc

File tree: 35 files changed, +2348 −43 lines
@@ -0,0 +1,111 @@
# Slot Allocation Strategy

The slot allocation strategy is an important part of SeaTunnel Engine: it determines how SeaTunnel Engine assigns tasks to slots. It is a configurable component, and users can choose the strategy that fits their needs.

**Configuration method:**

Set the parameter `slot-allocation-strategy`; the optional values are `RANDOM`, `SYSTEM_LOAD`, and `SLOT_RATIO`.

Example:

```yaml
seatunnel:
  engine:
    slot-service:
      slot-allocation-strategy: RANDOM
```
## RANDOM (default value)

The random allocation strategy is SeaTunnel Engine's default slot allocation strategy: it assigns tasks to slots at random.

## SYSTEM_LOAD

The system load strategy allocates slots based on system load, dynamically adjusting the allocation as the load changes.
### 1. **Design of time weight**

Time weight reflects the impact of time on scheduling priority:

- Recent data is given higher weight, while historical data gradually decays.
- Using the distribution $4, 2, 2, 1, 1$ and normalizing it, the time weight for each statistic is:

$$ \text{Time weight ratio} = \frac{\text{Current weight}}{10} $$

> When the cluster has just started and fewer than 5 data points are available, they are normalized separately and the formula is adjusted dynamically; this is not elaborated here.
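The normalization above can be sketched in a few lines of Python. This is an illustrative sketch, not the engine's actual code; the function name is ours.

```python
# Raw time weights: newest sample first, older samples decay (4, 2, 2, 1, 1).
RAW_WEIGHTS = [4, 2, 2, 1, 1]

def normalized_time_weights(n=5):
    """Normalize the first n raw weights so they sum to 1."""
    raw = RAW_WEIGHTS[:n]
    total = sum(raw)
    return [w / total for w in raw]

print(normalized_time_weights())   # [0.4, 0.2, 0.2, 0.1, 0.1]
print(normalized_time_weights(2))  # fewer than 5 samples: re-normalized separately
```

With all five samples present, each weight is simply `current weight / 10`, matching the formula above.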
### 2. **Resource utilization calculation**

The idle rates of CPU and memory are combined according to their weights:

$$ \text{Resource idle rate} = \frac{(1 - \text{CPU utilization}) \cdot \text{CPU weight} + (1 - \text{Memory utilization}) \cdot \text{Memory weight}}{\text{CPU weight} + \text{Memory weight}} $$

- $(1 - \text{CPU utilization})$ and $(1 - \text{Memory utilization})$ in the formula are the idle rates.
- The CPU and memory weights can be adjusted to specific needs (e.g., $0.6$ and $0.4$), flexibly adapting to different scenarios.
### 3. **Time decay and scheduling priority formula**

After introducing time weight decay, the scheduling priority is computed as:

$$
\text{Comprehensive resource idle rate} = \sum_{i=1}^{5} \left( \frac{(1 - \text{CPU utilization}_i) \cdot \text{CPU weight} + (1 - \text{Memory utilization}_i) \cdot \text{Memory weight}}{\text{CPU weight} + \text{Memory weight}} \cdot \text{Time weight}_i \right)
$$
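Putting sections 1–3 together, a minimal Python sketch (illustrative only; the function name is ours, and the 0.6/0.4 weights are the example values from section 2, not fixed engine constants):

```python
CPU_WEIGHT, MEM_WEIGHT = 0.6, 0.4         # example weights from section 2
TIME_WEIGHTS = [0.4, 0.2, 0.2, 0.1, 0.1]  # normalized 4, 2, 2, 1, 1; newest first

def comprehensive_idle_rate(samples):
    """samples: up to 5 (cpu_utilization, mem_utilization) pairs, newest first."""
    total = 0.0
    for (cpu, mem), tw in zip(samples, TIME_WEIGHTS):
        # Weighted idle rate of one sample, per the section 2 formula.
        idle = ((1 - cpu) * CPU_WEIGHT + (1 - mem) * MEM_WEIGHT) / (CPU_WEIGHT + MEM_WEIGHT)
        total += idle * tw  # time-decayed contribution
    return total

# A node at a steady 50% CPU and memory load has a comprehensive idle rate of ~0.5.
print(comprehensive_idle_rate([(0.5, 0.5)] * 5))
```

A node whose recent samples show lower utilization ends up with a higher comprehensive idle rate and therefore a higher scheduling priority.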
### 4. **Dynamic adjustment of resource idle rate for slot allocation**

When allocating multiple slots, the resource status is updated and simulated in real time (the resource load of a given task does not change quickly between monitoring samples):

- **Resource ratio used by each slot** = (1 - Comprehensive resource idle rate) ÷ Number of allocated slots
- After a slot is allocated, the idle rate of the corresponding node is updated:

$$ \text{Idle rate after slot allocation} = \text{Comprehensive resource idle rate} - \text{Resource ratio used by each slot} $$

- By default, a single slot is assumed to use 10% of the node's resources. When a slot is first started, its actual footprint is unknown, so 10% is used as the default; it is deliberately not set lower, to avoid allocating too many slots and overloading the node. Once the next round of monitoring information is captured, the estimate becomes more accurate.

This simulation keeps scheduling closer to the actual resource usage.
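The update rule above can be sketched as follows (a hypothetical helper, not the engine's code):

```python
DEFAULT_SLOT_USAGE = 0.10  # assumed footprint of a brand-new slot (10% default)

def idle_rate_after_allocation(idle_rate, allocated_slots):
    """Simulate the idle-rate drop on a node after handing out one more slot."""
    if allocated_slots > 0:
        # Per-slot resource ratio: busy share spread over the allocated slots.
        per_slot = (1 - idle_rate) / allocated_slots
    else:
        # Nothing allocated yet: fall back to the 10% default.
        per_slot = DEFAULT_SLOT_USAGE
    return idle_rate - per_slot

print(idle_rate_after_allocation(0.8, 4))  # 0.2 busy over 4 slots: roughly 0.75
```

Each allocation lowers the node's simulated idle rate, so subsequent slots in the same batch naturally spread to other nodes.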
### 5. **Introduction of balance factor**

Dynamically adjusting the resource idle rate through slot allocation alone can still drift from reality. We therefore introduce a balance factor based on the number of slots, which measures the current load of a node and prevents scheduling from concentrating resources too heavily:

> The slot counts can be collected in real time and are used to refine the scheduling priority.

$$
\text{BalanceFactor}_i = 1 - \frac{S_{\text{used},i}}{S_{\text{total},i}}
$$

- $S_{\text{used},i}$: Number of slots allocated on node $i$.
- $S_{\text{total},i}$: Total number of slots on node $i$.

The scheduling priority is then adjusted with the balance factor:

$$
W_i = \alpha \cdot \text{Idle rate after slot allocation}_i + \beta \cdot \text{BalanceFactor}_i
$$

**Parameter meaning**:

- $\alpha$: Weight emphasizing resource utilization, set to 0.7.
- $\beta$: Weight of the balance factor, preventing single-point overload, set to 0.3.
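Combining the adjusted idle rate with the balance factor gives the final priority $W_i$; a sketch with the $\alpha = 0.7$, $\beta = 0.3$ values above (the function name is ours):

```python
ALPHA, BETA = 0.7, 0.3  # resource-utilization weight vs. balance-factor weight

def scheduling_priority(idle_rate_after_alloc, used_slots, total_slots):
    """W_i = alpha * idle rate after allocation + beta * balance factor."""
    balance_factor = 1 - used_slots / total_slots
    return ALPHA * idle_rate_after_alloc + BETA * balance_factor

# A lightly loaded node outranks a nearly full one:
print(scheduling_priority(0.75, 2, 10))  # high idle rate, few slots used
print(scheduling_priority(0.20, 9, 10))  # low idle rate, almost full
```

The balance factor pulls priority down on nodes that have already handed out most of their slots, even when their measured idle rate is still high.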
### 6. **Dynamic adjustment logic**

- CPU and memory utilization are collected periodically, keeping the 5 most recent statistics.
- Weights are updated dynamically for the same task, gradually decaying old data.
- Allocation is balanced dynamically based on slot usage.

> Explanation:
> Suppose we have two nodes and need to allocate 10 slots; node A has 10 idle slots and node B has 20. If, after the calculations in steps 4 and 5, node A's weight is higher than node B's for every one of the 10 slots, all 10 slots are still allocated to node A.
> In that case the slot configuration of node B is probably not optimal for the cluster (the worker node is configured with too few slots).
## SLOT_RATIO

The slot ratio strategy schedules based on slot usage rate: workers with a lower usage rate get higher priority.

**Calculation logic**:

1. Get the worker's total number of slots.
2. Get the number of unallocated slots.
3. Usage rate = (Total slots - Unallocated slots) / Total slots.
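The three steps above can be sketched as follows (the worker names are hypothetical, for illustration only):

```python
def slot_usage_rate(total_slots, unassigned_slots):
    """Usage rate = (total - unassigned) / total; lower rate means higher priority."""
    return (total_slots - unassigned_slots) / total_slots

# Hypothetical workers: name -> (total slots, unassigned slots).
workers = {"worker-1": (10, 8), "worker-2": (10, 3)}

# Pick the worker with the lowest usage rate.
best = min(workers, key=lambda w: slot_usage_rate(*workers[w]))
print(best)  # worker-1 (usage 0.2) beats worker-2 (usage 0.7)
```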

docs/sidebars.js (+2 −1)

```diff
@@ -213,7 +213,8 @@ const sidebars = {
         "seatunnel-engine/user-command",
         "seatunnel-engine/logging",
         "seatunnel-engine/telemetry",
-        "seatunnel-engine/web-ui"
+        "seatunnel-engine/web-ui",
+        "seatunnel-engine/slot-allocation-strategy"
       ]
     },
     {
```
@@ -0,0 +1,98 @@
---
sidebar_position: 15
---

# Slot Allocation Strategy

The slot allocation strategy is an important part of SeaTunnel Engine: it determines how SeaTunnel Engine assigns tasks to slots. It is a configurable component, and users can choose the strategy that fits their needs.

**Configuration method:**

Set the parameter `slot-allocation-strategy`; the optional values are `RANDOM`, `SYSTEM_LOAD`, and `SLOT_RATIO`.

Example:

```yaml
seatunnel:
  engine:
    slot-service:
      slot-allocation-strategy: RANDOM
      ...
```
## RANDOM (default value)

The random allocation strategy is SeaTunnel Engine's default slot allocation strategy: it assigns tasks to slots at random.

## SYSTEM_LOAD

The system load strategy allocates slots based on system load, dynamically adjusting the allocation as the load changes.

### 1. **Design of time weight**

Time weight reflects the impact of time on scheduling priority:

- Recent data is given higher weight, while historical data gradually decays.
- Using the distribution $4, 2, 2, 1, 1$ and normalizing it, the time weight for each statistic is:

$$ \text{Time weight ratio} = \frac{\text{Current weight}}{10} $$

> When the cluster has just started and fewer than 5 data points are available, they are normalized separately and the formula is adjusted dynamically; this is not elaborated here.

### 2. **Resource utilization calculation**

The idle rates of CPU and memory are combined according to their weights:

$$ \text{Resource idle rate} = \frac{(1 - \text{CPU utilization}) \cdot \text{CPU weight} + (1 - \text{Memory utilization}) \cdot \text{Memory weight}}{\text{CPU weight} + \text{Memory weight}} $$

- $(1 - \text{CPU utilization})$ and $(1 - \text{Memory utilization})$ in the formula are the idle rates.
- The CPU and memory weights can be adjusted to specific needs (e.g., $0.6$ and $0.4$), flexibly adapting to different scenarios.

### 3. **Time decay and scheduling priority formula**

After introducing time weight decay, the scheduling priority is computed as:

$$
\text{Comprehensive resource idle rate} = \sum_{i=1}^{5} \left( \frac{(1 - \text{CPU utilization}_i) \cdot \text{CPU weight} + (1 - \text{Memory utilization}_i) \cdot \text{Memory weight}}{\text{CPU weight} + \text{Memory weight}} \cdot \text{Time weight}_i \right)
$$
### 4. **Dynamic adjustment of resource idle rate for slot allocation**

When allocating multiple slots, the resource status is updated and simulated in real time (the resource load of a given task does not change quickly between monitoring samples):

- **Resource ratio used by each slot** = (1 - Comprehensive resource idle rate) ÷ Number of allocated slots
- After a slot is allocated, the idle rate of the corresponding node is updated:

$$ \text{Idle rate after slot allocation} = \text{Comprehensive resource idle rate} - \text{Resource ratio used by each slot} $$

- By default, a single slot is assumed to use 10% of the node's resources. When a slot is first started, its actual footprint is unknown, so 10% is used as the default; it is deliberately not set lower, to avoid allocating too many slots and overloading the node. Once the next round of monitoring information is captured, the estimate becomes more accurate.

This simulation keeps scheduling closer to the actual resource usage.

### 5. **Introduction of balance factor**

Dynamically adjusting the resource idle rate through slot allocation alone can still drift from reality. We therefore introduce a balance factor based on the number of slots, which measures the current load of a node and prevents scheduling from concentrating resources too heavily:

> The slot counts can be collected in real time and are used to refine the scheduling priority.

$$
\text{BalanceFactor}_i = 1 - \frac{S_{\text{used},i}}{S_{\text{total},i}}
$$

- $S_{\text{used},i}$: Number of slots allocated on node $i$.
- $S_{\text{total},i}$: Total number of slots on node $i$.

The scheduling priority is then adjusted with the balance factor:

$$
W_i = \alpha \cdot \text{Idle rate after slot allocation}_i + \beta \cdot \text{BalanceFactor}_i
$$

**Parameter meaning**:

- $\alpha$: Weight emphasizing resource utilization, set to 0.7.
- $\beta$: Weight of the balance factor, preventing single-point overload, set to 0.3.
### 6. **Dynamic adjustment logic**

- CPU and memory utilization are collected periodically, keeping the 5 most recent statistics.
- Weights are updated dynamically for the same task, gradually decaying old data.
- Allocation is balanced dynamically based on slot usage.

> Explanation:
> Suppose we have two nodes and need to allocate 10 slots; node A has 10 idle slots and node B has 20. If, after the calculations in steps 4 and 5, node A's weight is higher than node B's for every one of the 10 slots, all 10 slots are still allocated to node A.
> In that case the slot configuration of node B is probably not optimal for the cluster (the worker node is configured with too few slots).

## SLOT_RATIO

The slot ratio strategy schedules based on slot usage rate: workers with a lower usage rate get higher priority.

**Calculation logic**:

1. Get the worker's total number of slots.
2. Get the number of unallocated slots.
3. Usage rate = (Total slots - Unallocated slots) / Total slots.

seatunnel-dist/release-docs/LICENSE (+3)

```diff
@@ -307,6 +307,8 @@ The text of each license is the standard Apache 2.0 license.
 (Apache-2.0) kerby-pkix (org.apache.kerby:kerby-pkix:1.0.1 - https://github.com/apache/directory-kerby)
 (Apache-2.0) kerby-util (org.apache.kerby:kerby-util:1.0.1 - https://github.com/apache/directory-kerby)
 (Apache-2.0) kerby-xdr (org.apache.kerby:kerby-xdr:1.0.1 - https://github.com/apache/directory-kerby)
+(Apache-2.0) jna (net.java.dev.jna:jna:5.15.0 - https://github.com/java-native-access/jna)
+(Apache-2.0) jna-platform (net.java.dev.jna:jna-platform:5.15.0 - https://github.com/java-native-access/jna)
 (Apache-2.0) token-provider (org.apache.kerby:token-provider:1.0.1 - https://github.com/apache/directory-kerby)
 (Apache-2.0) snappy-java (org.xerial.snappy:snappy-java:1.0.5 - https://github.com/xerial/snappy-java)
 (Apache-2.0) snappy-java (org.xerial.snappy:snappy-java:1.1.8.3 - https://github.com/xerial/snappy-java)
@@ -396,6 +398,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
 (MIT License) jcl-over-slf4j (org.slf4j:jcl-over-slf4j:1.7.25 - http://www.slf4j.org)
 (MIT License) animal-sniffer-annotations (org.codehaus.mojo:animal-sniffer-annotations:1.17 - https://mvnrepository.com/artifact/org.codehaus.mojo/animal-sniffer-annotations/1.17)
 (MIT License) checker-qual (org.checkerframework:checker-qual:3.10.0 - https://mvnrepository.com/)
+(MIT License) oshi-core (com.github.oshi:oshi-core:6.6.5 - https://github.com/oshi/oshi)

 ========================================================================
 EPL-1.0 and Apache-2.0 licenses
```

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml (+7)

```diff
@@ -126,6 +126,13 @@
             <scope>test</scope>
         </dependency>

+        <dependency>
+            <groupId>com.github.oshi</groupId>
+            <artifactId>oshi-core</artifactId>
+            <version>6.6.5</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-buffer</artifactId>
```

seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java (+8 −5)

```diff
@@ -60,7 +60,8 @@ public void testJobRetryTimes() throws IOException, InterruptedException {
         Assertions.assertTrue(
                 server.getLogs()
                         .contains(
-                                "Restore time 3, pipeline Job stream_fake_to_inmemory_with_error.conf"));
+                                "Restore time 3, pipeline Job stream_fake_to_inmemory_with_error.conf"),
+                server.getLogs());
     }

     @Test
@@ -77,14 +78,16 @@ public void testMultiTableSinkFailedWithThrowable() throws IOException, Interrup
         Container.ExecResult execResult =
                 executeJob(server, "/stream_fake_to_inmemory_with_throwable_error.conf");
         Assertions.assertNotEquals(0, execResult.getExitCode());
-        Assertions.assertTrue(execResult.getStderr().contains("table fake sink throw error"));
+        Assertions.assertTrue(
+                execResult.getStderr().contains("table fake sink throw error"),
+                execResult.getStderr());
     }

     @Test
     public void testSaveModeOnMasterOrClient() throws IOException, InterruptedException {
         Container.ExecResult execResult =
                 executeJob(server, "/savemode/fake_to_inmemory_savemode.conf");
-        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
         int serverLogLength = 0;
         String serverLogs = server.getLogs();
         Assertions.assertTrue(
@@ -102,7 +105,7 @@ public void testSaveModeOnMasterOrClient() throws IOException, InterruptedExcept

         // restore will not execute savemode
         execResult = restoreJob(server, "/savemode/fake_to_inmemory_savemode.conf", "1");
-        Assertions.assertEquals(0, execResult.getExitCode());
+        Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
         // clear old logs
         serverLogLength += serverLogs.length();
         serverLogs = server.getLogs().substring(serverLogLength);
@@ -112,7 +115,7 @@ public void testSaveModeOnMasterOrClient() throws IOException, InterruptedExcept
         // test savemode on client side
         Container.ExecResult execResult2 =
                 executeJob(server, "/savemode/fake_to_inmemory_savemode_client.conf");
-        Assertions.assertEquals(0, execResult2.getExitCode());
+        Assertions.assertEquals(0, execResult2.getExitCode(), execResult2.getStderr());
         // clear old logs
         serverLogLength += serverLogs.length();
         serverLogs = server.getLogs().substring(serverLogLength);
```
