Skip to content

Commit 9aa3be0

Browse files
authored
feat: Implement blocking queue SPI extension and add documentation (#1612)
* Add BlockingQueueManager * Remove hardcoded queues * Add queue extension and switching test code * Add a custom queue documents * Update queue switching description * fix: upgrade Mockito to 4.11.0 * optimize ThreadPoolRebuilder * Add custom SPI blocking queue type support * fix: Enable inline mock maker to support static mocks * Apply default capacity for custom queue types * Remove ThreadPoolRebuilder runtime queue switching * Clarify responsibilities and eliminate redundancy in BlockingQueueManager * rollback mockito version to 3.12.4 * Update frontend static files with custom queue support * Allow passing queue capacity to custom blocking queue SPI
1 parent 7d78be3 commit 9aa3be0

File tree

101 files changed

+1585
-164
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+1585
-164
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
---
2+
sidebar_position: 4
3+
---
4+
5+
# 阻塞队列自定义
6+
7+
Hippo4j 通过 SPI 的方式对拒绝策略进行扩展,可以让用户在 Hippo4j 中完成自定义阻塞队列实现。
8+
9+
## 1. 定义自定义队列类
10+
11+
实现接口 `cn.hippo4j.common.executor.support.CustomBlockingQueue<T>`
12+
13+
```java
14+
public class MyArrayBlockingQueue implements CustomBlockingQueue<Runnable> {
15+
16+
@Override
17+
public Integer getType() {
18+
return 1001;
19+
}
20+
21+
@Override
22+
public String getName() {
23+
return "MyArrayBlockingQueue";
24+
}
25+
26+
@Override
27+
public BlockingQueue<Runnable> generateBlockingQueue(Integer capacity) {
28+
int effectiveCapacity = capacity == null || capacity <= 0 ? 1024 : capacity;
29+
return new ArrayBlockingQueue<>(effectiveCapacity);
30+
}
31+
}
32+
```
33+
34+
> 兼容提示:旧版只需实现 `generateBlockingQueue()` 的实现仍然有效,框架会在未覆写新方法时回退到旧逻辑,但推荐改为覆写带 `capacity` 入参的方法,以便直接复用服务端配置。
35+
36+
## 2. 声明 SPI 文件
37+
38+
`src/main/resources/META-INF/services/` 目录下新增文件:
39+
```
40+
cn.hippo4j.common.executor.support.CustomBlockingQueue
41+
```
42+
43+
文件内容仅一行:
44+
```
45+
com.example.queue.MyArrayBlockingQueue
46+
```
47+
48+
## 3. 服务端生效方式
49+
50+
当服务端下发的 `queueType``capacity` 命中自定义类型时,框架会通过 SPI 自动创建队列,并将服务端配置的容量参数传入 `generateBlockingQueue(Integer capacity)`
51+
52+
### 3.1 队列创建与验证
53+
54+
```java
55+
// 创建队列
56+
BlockingQueue<T> q = BlockingQueueTypeEnum.createBlockingQueue(queueType, capacity);
57+
58+
// 或者通过队列名称创建
59+
BlockingQueue<T> q2 = BlockingQueueTypeEnum.createBlockingQueue("ArrayBlockingQueue", capacity);
60+
61+
// 验证队列配置
62+
boolean valid = BlockingQueueManager.validateQueueConfig(queueType, capacity);
63+
64+
// 动态调整容量(仅 ResizableCapacityLinkedBlockingQueue 支持)
65+
boolean ok = BlockingQueueManager.changeQueueCapacity(executor.getQueue(), newCapacity);
66+
```
67+
68+
### 3.2 队列类型生效
69+
70+
- **配置模板**:在线程池管理页面编辑队列类型,会保存到数据库,但不会推送到运行中的客户端。
71+
- **生效时机**:客户端应用重启时,会从服务端读取最新配置,并使用反射替换线程池的 `workQueue` 字段。
72+
- **运行时调整**:运行时仅支持队列容量的动态调整(仅限 `ResizableCapacityLinkedBlockingQueue`),不支持队列类型切换。
73+
74+
服务端动态刷新处的实现:
75+
76+
```java
77+
// ServerThreadPoolDynamicRefresh#handleQueueChanges
78+
// 仅支持容量调整,不支持队列类型切换
79+
if (parameter.getCapacity() != null) {
80+
if (BlockingQueueManager.canChangeCapacity(executor.getQueue())) {
81+
boolean success = BlockingQueueManager.changeQueueCapacity(
82+
executor.getQueue(), parameter.getCapacity());
83+
if (success) {
84+
log.info("Queue capacity changed to: {} for thread pool: {}",
85+
parameter.getCapacity(), parameter.getTpId());
86+
}
87+
}
88+
}
89+
```
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
---
2+
sidebar_position: 3
3+
---
4+
5+
# 内置阻塞队列
6+
7+
Hippo4j 内置多种常用阻塞队列类型,支持开箱即用,亦可通过 SPI 扩展自定义队列类型。
8+
9+
## 内置类型清单
10+
11+
以下类型可直接在服务端或配置中选择(枚举:`BlockingQueueTypeEnum`):
12+
13+
- ArrayBlockingQueue(数组有界队列)
14+
- LinkedBlockingQueue(链表队列)
15+
- LinkedBlockingDeque(双端队列)
16+
- SynchronousQueue(同步移交队列)
17+
- LinkedTransferQueue(可转移队列)
18+
- PriorityBlockingQueue(优先级队列)
19+
- ResizableCapacityLinkedBlockingQueue(可在线动态调容量的链表队列)
20+
21+
## 枚举定义
22+
23+
**ResizableCapacityLinkedBlockingQueue**:支持在线变更 `capacity`
24+
25+
```java
26+
// cn.hippo4j.common.executor.support.BlockingQueueTypeEnum
27+
RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue") {
28+
@Override
29+
<T> BlockingQueue<T> of(Integer capacity) {
30+
return new ResizableCapacityLinkedBlockingQueue<>(capacity);
31+
}
32+
@Override
33+
<T> BlockingQueue<T> of() {
34+
return new ResizableCapacityLinkedBlockingQueue<>();
35+
}
36+
}
37+
```
38+
39+
## 使用建议
40+
41+
- 需要在线调容量:优先选择 `ResizableCapacityLinkedBlockingQueue`
42+
- 需要严格有界:选择 `ArrayBlockingQueue`
43+
- 需要无界吞吐:选择 `LinkedBlockingQueue`
44+
- 需要优先级:选择 `PriorityBlockingQueue`
45+
- 需要同步移交:选择 `SynchronousQueue`
46+
47+
如需自定义队列类型,请参考《阻塞队列自定义》。
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
---
2+
sidebar_position: 4
3+
---
4+
5+
# 阻塞队列自定义
6+
7+
Hippo4j 通过 SPI 的方式对拒绝策略进行扩展,可以让用户在 Hippo4j 中完成自定义阻塞队列实现。
8+
9+
## 1. 定义自定义队列类
10+
11+
实现接口 `cn.hippo4j.common.executor.support.CustomBlockingQueue<T>`
12+
13+
```java
14+
public class MyArrayBlockingQueue implements CustomBlockingQueue<Runnable> {
15+
16+
@Override
17+
public Integer getType() {
18+
return 1001;
19+
}
20+
21+
@Override
22+
public String getName() {
23+
return "MyArrayBlockingQueue";
24+
}
25+
26+
@Override
27+
public BlockingQueue<Runnable> generateBlockingQueue(Integer capacity) {
28+
int effectiveCapacity = capacity == null || capacity <= 0 ? 1024 : capacity;
29+
return new ArrayBlockingQueue<>(effectiveCapacity);
30+
}
31+
}
32+
```
33+
34+
> 兼容提示:旧版只需实现 `generateBlockingQueue()` 的实现仍然有效,框架会在未覆写新方法时回退到旧逻辑,但推荐改为覆写带 `capacity` 入参的方法,以便直接复用服务端配置。
35+
36+
## 2. 声明 SPI 文件
37+
38+
`src/main/resources/META-INF/services/` 目录下新增文件:
39+
```
40+
cn.hippo4j.common.executor.support.CustomBlockingQueue
41+
```
42+
43+
文件内容仅一行:
44+
```
45+
com.example.queue.MyArrayBlockingQueue
46+
```
47+
48+
## 3. 服务端生效方式
49+
50+
当服务端下发的 `queueType``capacity` 命中自定义类型时,框架会通过 SPI 自动创建队列,并将服务端配置的容量参数传入 `generateBlockingQueue(Integer capacity)`
51+
52+
### 3.1 队列创建与验证
53+
54+
```java
55+
// 创建队列
56+
BlockingQueue<T> q = BlockingQueueTypeEnum.createBlockingQueue(queueType, capacity);
57+
58+
// 或者通过队列名称创建
59+
BlockingQueue<T> q2 = BlockingQueueTypeEnum.createBlockingQueue("ArrayBlockingQueue", capacity);
60+
61+
// 验证队列配置
62+
boolean valid = BlockingQueueManager.validateQueueConfig(queueType, capacity);
63+
64+
// 动态调整容量(仅 ResizableCapacityLinkedBlockingQueue 支持)
65+
boolean ok = BlockingQueueManager.changeQueueCapacity(executor.getQueue(), newCapacity);
66+
```
67+
68+
### 3.2 队列类型生效
69+
70+
- **配置模板**:在线程池管理页面编辑队列类型,会保存到数据库,但不会推送到运行中的客户端。
71+
- **生效时机**:客户端应用重启时,会从服务端读取最新配置,并使用反射替换线程池的 `workQueue` 字段。
72+
- **运行时调整**:运行时仅支持队列容量的动态调整(仅限 `ResizableCapacityLinkedBlockingQueue`),不支持队列类型切换。
73+
74+
服务端动态刷新处的实现:
75+
76+
```java
77+
// ServerThreadPoolDynamicRefresh#handleQueueChanges
78+
// 仅支持容量调整,不支持队列类型切换
79+
if (parameter.getCapacity() != null) {
80+
if (BlockingQueueManager.canChangeCapacity(executor.getQueue())) {
81+
boolean success = BlockingQueueManager.changeQueueCapacity(
82+
executor.getQueue(), parameter.getCapacity());
83+
if (success) {
84+
log.info("Queue capacity changed to: {} for thread pool: {}",
85+
parameter.getCapacity(), parameter.getTpId());
86+
}
87+
}
88+
}
89+
```
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
---
2+
sidebar_position: 3
3+
---
4+
5+
# 内置阻塞队列
6+
7+
Hippo4j 内置多种常用阻塞队列类型,支持开箱即用,亦可通过 SPI 扩展自定义队列类型。
8+
9+
## 内置类型清单
10+
11+
以下类型可直接在服务端或配置中选择(枚举:`BlockingQueueTypeEnum`):
12+
13+
- ArrayBlockingQueue(数组有界队列)
14+
- LinkedBlockingQueue(链表队列)
15+
- LinkedBlockingDeque(双端队列)
16+
- SynchronousQueue(同步移交队列)
17+
- LinkedTransferQueue(可转移队列)
18+
- PriorityBlockingQueue(优先级队列)
19+
- ResizableCapacityLinkedBlockingQueue(可在线动态调容量的链表队列)
20+
21+
## 枚举定义
22+
23+
**ResizableCapacityLinkedBlockingQueue**:支持在线变更 `capacity`
24+
25+
```java
26+
// cn.hippo4j.common.executor.support.BlockingQueueTypeEnum
27+
RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue") {
28+
@Override
29+
<T> BlockingQueue<T> of(Integer capacity) {
30+
return new ResizableCapacityLinkedBlockingQueue<>(capacity);
31+
}
32+
@Override
33+
<T> BlockingQueue<T> of() {
34+
return new ResizableCapacityLinkedBlockingQueue<>();
35+
}
36+
}
37+
```
38+
39+
## 使用建议
40+
41+
- 需要在线调容量:优先选择 `ResizableCapacityLinkedBlockingQueue`
42+
- 需要严格有界:选择 `ArrayBlockingQueue`
43+
- 需要无界吞吐:选择 `LinkedBlockingQueue`
44+
- 需要优先级:选择 `PriorityBlockingQueue`
45+
- 需要同步移交:选择 `SynchronousQueue`
46+
47+
如需自定义队列类型,请参考《阻塞队列自定义》。

0 commit comments

Comments
 (0)