Skip to content

Commit af56746

Browse files
authored
[feat]: migrate blocking executors to virtual threads (#4062)
1 parent d99e567 commit af56746

170 files changed

Lines changed: 7594 additions & 1416 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

hertzbeat-ai/src/main/java/org/apache/hertzbeat/ai/tools/impl/MonitorToolsImpl.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import lombok.extern.slf4j.Slf4j;
2222
import org.apache.hertzbeat.ai.config.McpContextHolder;
2323
import org.apache.hertzbeat.manager.pojo.dto.MonitorDto;
24+
import org.apache.hertzbeat.manager.pojo.dto.ParamDefineInfo;
2425
import org.apache.hertzbeat.manager.service.MonitorService;
2526
import org.apache.hertzbeat.manager.service.AppService;
2627
import org.apache.hertzbeat.ai.utils.UtilityClass;
@@ -32,7 +33,6 @@
3233
import org.springframework.beans.factory.annotation.Autowired;
3334
import org.apache.hertzbeat.common.entity.manager.Monitor;
3435
import org.apache.hertzbeat.common.entity.manager.Param;
35-
import org.apache.hertzbeat.common.entity.manager.ParamDefine;
3636

3737
import java.util.ArrayList;
3838
import java.util.List;
@@ -280,7 +280,9 @@ public String addMonitor(
280280

281281
// Validate that all required parameters for this monitor type are provided
282282
try {
283-
MonitorDto monitorDto = MonitorDto.builder().monitor(monitor).params(paramList).build();
283+
MonitorDto monitorDto = new MonitorDto();
284+
monitorDto.setMonitor(monitor);
285+
monitorDto.setParams(paramList);
284286
monitorService.validate(monitorDto, false);
285287
} catch (IllegalArgumentException argumentException) {
286288
if (argumentException.getMessage().contains("required")) {
@@ -456,7 +458,7 @@ public String getMonitorParams(
456458
}
457459

458460
// Get parameter definitions from app service
459-
List<ParamDefine> paramDefines = appService.getAppParamDefines(app.toLowerCase().trim());
461+
List<ParamDefineInfo> paramDefines = appService.getAppParamDefines(app.toLowerCase().trim());
460462

461463
if (paramDefines == null || paramDefines.isEmpty()) {
462464
return String.format("No parameter definitions found for monitor type '%s'. "
@@ -468,7 +470,7 @@ public String getMonitorParams(
468470
response.append(String.format("Parameter Definitions for Monitor Type '%s' (Total: %d):\n\n",
469471
app, paramDefines.size()));
470472

471-
for (ParamDefine paramDefine : paramDefines) {
473+
for (ParamDefineInfo paramDefine : paramDefines) {
472474
response.append("• Field: ").append(paramDefine.getField()).append("\n");
473475

474476
// Add display name if available

hertzbeat-alerter/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@
4040
<artifactId>hertzbeat-common-core</artifactId>
4141
<scope>provided</scope>
4242
</dependency>
43+
<dependency>
44+
<groupId>org.apache.hertzbeat</groupId>
45+
<artifactId>hertzbeat-common-spring</artifactId>
46+
<scope>provided</scope>
47+
</dependency>
4348
<!-- plugin -->
4449
<dependency>
4550
<groupId>org.apache.hertzbeat</groupId>

hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/AlerterWorkerPool.java

Lines changed: 103 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,29 +18,46 @@
1818
package org.apache.hertzbeat.alert;
1919

2020
import com.google.common.util.concurrent.ThreadFactoryBuilder;
21+
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
2123
import java.util.concurrent.LinkedBlockingQueue;
2224
import java.util.concurrent.RejectedExecutionException;
25+
import java.util.concurrent.Semaphore;
2326
import java.util.concurrent.ThreadFactory;
2427
import java.util.concurrent.ThreadPoolExecutor;
2528
import java.util.concurrent.TimeUnit;
2629
import lombok.extern.slf4j.Slf4j;
30+
import org.apache.hertzbeat.common.concurrent.ManagedExecutor;
31+
import org.apache.hertzbeat.common.concurrent.ManagedExecutors;
32+
import org.apache.hertzbeat.common.config.VirtualThreadProperties;
33+
import org.springframework.beans.factory.DisposableBean;
34+
import org.springframework.beans.factory.annotation.Autowired;
2735
import org.springframework.stereotype.Component;
2836

2937
/**
3038
* alarm module thread pool
3139
*/
3240
@Component
3341
@Slf4j
34-
public class AlerterWorkerPool {
42+
public class AlerterWorkerPool implements DisposableBean {
3543

3644
private ThreadPoolExecutor workerExecutor;
37-
private ThreadPoolExecutor notifyExecutor;
38-
private ThreadPoolExecutor logWorkerExecutor;
45+
private ManagedExecutor notifyExecutor;
46+
private ManagedExecutor logWorkerExecutor;
47+
private Map<Byte, Semaphore> notifyChannelPermits;
48+
private int notifyMaxConcurrentPerChannel;
3949

4050
public AlerterWorkerPool() {
51+
this(VirtualThreadProperties.defaults());
52+
}
53+
54+
@Autowired
55+
public AlerterWorkerPool(VirtualThreadProperties virtualThreadProperties) {
56+
VirtualThreadProperties properties =
57+
virtualThreadProperties == null ? VirtualThreadProperties.defaults() : virtualThreadProperties;
4158
initWorkExecutor();
42-
initNotifyExecutor();
43-
initLogWorkerExecutor();
59+
initNotifyExecutor(properties);
60+
initLogWorkerExecutor(properties);
4461
}
4562

4663
private void initWorkExecutor() {
@@ -61,16 +78,32 @@ private void initWorkExecutor() {
6178
new ThreadPoolExecutor.AbortPolicy());
6279
}
6380

64-
private void initNotifyExecutor() {
81+
private void initNotifyExecutor(VirtualThreadProperties properties) {
82+
Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
83+
log.error("Alerter notifyExecutor has uncaughtException.");
84+
log.error(throwable.getMessage(), throwable);
85+
};
86+
if (properties.enabled()) {
87+
VirtualThreadProperties.AlerterProperties alerterProperties = properties.alerter();
88+
VirtualThreadProperties.PoolProperties notifyProperties = alerterProperties.notifyPool();
89+
notifyMaxConcurrentPerChannel = Math.max(1, alerterProperties.notifyMaxConcurrentPerChannel());
90+
notifyChannelPermits = new ConcurrentHashMap<>(8);
91+
notifyExecutor = ManagedExecutors.newVirtualExecutor("notify-worker", "notify-worker-",
92+
notifyProperties.mode(), notifyProperties.maxConcurrentJobs(), handler);
93+
return;
94+
}
95+
notifyMaxConcurrentPerChannel = 0;
96+
notifyChannelPermits = null;
97+
notifyExecutor = ManagedExecutors.wrap("notify-worker", createLegacyNotifyExecutor(handler));
98+
}
99+
100+
private ThreadPoolExecutor createLegacyNotifyExecutor(Thread.UncaughtExceptionHandler handler) {
65101
ThreadFactory threadFactory = new ThreadFactoryBuilder()
66-
.setUncaughtExceptionHandler((thread, throwable) -> {
67-
log.error("Alerter notifyExecutor has uncaughtException.");
68-
log.error(throwable.getMessage(), throwable);
69-
})
102+
.setUncaughtExceptionHandler(handler)
70103
.setDaemon(true)
71104
.setNameFormat("notify-worker-%d")
72105
.build();
73-
notifyExecutor = new ThreadPoolExecutor(6,
106+
return new ThreadPoolExecutor(6,
74107
6,
75108
10,
76109
TimeUnit.SECONDS,
@@ -79,16 +112,27 @@ private void initNotifyExecutor() {
79112
new ThreadPoolExecutor.AbortPolicy());
80113
}
81114

82-
private void initLogWorkerExecutor() {
115+
private void initLogWorkerExecutor(VirtualThreadProperties properties) {
116+
Thread.UncaughtExceptionHandler handler = (thread, throwable) -> {
117+
log.error("Alerter logWorkerExecutor has uncaughtException.");
118+
log.error(throwable.getMessage(), throwable);
119+
};
120+
if (properties.enabled()) {
121+
VirtualThreadProperties.QueueProperties logWorkerProperties = properties.alerter().logWorker();
122+
logWorkerExecutor = ManagedExecutors.newQueuedVirtualExecutor("alerter-log-worker", "log-worker-",
123+
logWorkerProperties.maxConcurrentJobs(), logWorkerProperties.queueCapacity(), handler);
124+
return;
125+
}
126+
logWorkerExecutor = ManagedExecutors.wrap("alerter-log-worker", createLegacyLogWorkerExecutor(handler));
127+
}
128+
129+
private ThreadPoolExecutor createLegacyLogWorkerExecutor(Thread.UncaughtExceptionHandler handler) {
83130
ThreadFactory threadFactory = new ThreadFactoryBuilder()
84-
.setUncaughtExceptionHandler((thread, throwable) -> {
85-
log.error("Alerter logWorkerExecutor has uncaughtException.");
86-
log.error(throwable.getMessage(), throwable);
87-
})
131+
.setUncaughtExceptionHandler(handler)
88132
.setDaemon(true)
89133
.setNameFormat("log-worker-%d")
90134
.build();
91-
logWorkerExecutor = new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS,
135+
return new ThreadPoolExecutor(10, 10, 10, TimeUnit.SECONDS,
92136
new LinkedBlockingQueue<>(1000),
93137
threadFactory,
94138
new ThreadPoolExecutor.AbortPolicy());
@@ -113,6 +157,41 @@ public void executeNotify(Runnable runnable) throws RejectedExecutionException {
113157
notifyExecutor.execute(runnable);
114158
}
115159

160+
/**
161+
* Executes the given runnable task using the notify executor with per-channel concurrency control.
162+
*
163+
* @param channelType notification channel type
164+
* @param runnable the task to be executed
165+
* @throws RejectedExecutionException if the task cannot be accepted for execution
166+
*/
167+
public void executeNotify(byte channelType, Runnable runnable) throws RejectedExecutionException {
168+
if (notifyChannelPermits == null) {
169+
notifyExecutor.execute(runnable);
170+
return;
171+
}
172+
Semaphore semaphore = notifyChannelPermits.computeIfAbsent(channelType,
173+
key -> new Semaphore(notifyMaxConcurrentPerChannel));
174+
if (!semaphore.tryAcquire()) {
175+
throw new RejectedExecutionException(
176+
"notify-worker rejected task because channel concurrency limit was reached for type " + channelType);
177+
}
178+
boolean submitted = false;
179+
try {
180+
notifyExecutor.execute(() -> {
181+
try {
182+
runnable.run();
183+
} finally {
184+
semaphore.release();
185+
}
186+
});
187+
submitted = true;
188+
} finally {
189+
if (!submitted) {
190+
semaphore.release();
191+
}
192+
}
193+
}
194+
116195
/**
117196
* Executes the given runnable task using the logWorkerExecutor.
118197
*
@@ -122,4 +201,11 @@ public void executeNotify(Runnable runnable) throws RejectedExecutionException {
122201
public void executeLogJob(Runnable runnable) throws RejectedExecutionException {
123202
logWorkerExecutor.execute(runnable);
124203
}
204+
205+
@Override
206+
public void destroy() {
207+
workerExecutor.shutdownNow();
208+
notifyExecutor.close();
209+
logWorkerExecutor.close();
210+
}
125211
}

0 commit comments

Comments
 (0)