Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
43a26ac
feat: Add dedicated thread pool for RestTemplate in alert notificati…
Carpe-Wang Jun 20, 2025
a52364d
Merge branch 'master' into RestTemplateConfig-Thread-pool
Carpe-Wang Jun 20, 2025
58b6674
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jun 20, 2025
956a073
checkstyle
Carpe-Wang Jun 20, 2025
ee52c90
Merge remote-tracking branch 'origin/RestTemplateConfig-Thread-pool' …
Carpe-Wang Jun 20, 2025
cfb7f2e
Merge branch 'master' into RestTemplateConfig-Thread-pool
Carpe-Wang Jun 21, 2025
d54e188
change according review
Carpe-Wang Jun 21, 2025
475d177
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jun 21, 2025
b95c588
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jun 21, 2025
0134dee
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jun 22, 2025
570b61b
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jun 23, 2025
a90301f
Merge branch 'master' into RestTemplateConfig-Thread-pool
Calvin979 Jun 23, 2025
8e58f00
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jun 27, 2025
9f5db16
Merge branch 'master' into RestTemplateConfig-Thread-pool
Calvin979 Jun 27, 2025
afe66b1
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jun 28, 2025
432f234
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jun 28, 2025
3b2ad36
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jun 29, 2025
bc08ee8
Merge branch 'master' into RestTemplateConfig-Thread-pool
Calvin979 Jun 30, 2025
276324c
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jul 1, 2025
4a7f522
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jul 1, 2025
840683e
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jul 2, 2025
ceb194a
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jul 10, 2025
078db96
Merge branch 'master' into RestTemplateConfig-Thread-pool
Aias00 Jul 10, 2025
88f71b3
Merge branch 'master' into RestTemplateConfig-Thread-pool
yuluo-yx Jul 12, 2025
01ac2bb
Merge branch 'master' into RestTemplateConfig-Thread-pool
yuluo-yx Aug 14, 2025
14e51b5
Merge branch 'master' into RestTemplateConfig-Thread-pool
tomsun28 Sep 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.alert.AlerterWorkerPool;
import org.springframework.beans.factory.annotation.Qualifier;
import org.apache.hertzbeat.alert.config.AlertSseManager;
import org.apache.hertzbeat.common.entity.alerter.GroupAlert;
import org.apache.hertzbeat.common.entity.alerter.NoticeReceiver;
Expand All @@ -48,15 +51,20 @@ public class AlertNoticeDispatch {
private final Map<Byte, AlertNotifyHandler> alertNotifyHandlerMap;
private final PluginRunner pluginRunner;
private final AlertSseManager emitterManager;
private final Executor restTemplateThreadPool;

public AlertNoticeDispatch(AlerterWorkerPool workerPool,
NoticeConfigService noticeConfigService,
AlertStoreHandler alertStoreHandler,
List<AlertNotifyHandler> alertNotifyHandlerList, PluginRunner pluginRunner, AlertSseManager emitterManager) {
List<AlertNotifyHandler> alertNotifyHandlerList,
PluginRunner pluginRunner,
AlertSseManager emitterManager,
@Qualifier("restTemplateThreadPool") Executor restTemplateThreadPool) {
this.workerPool = workerPool;
this.noticeConfigService = noticeConfigService;
this.alertStoreHandler = alertStoreHandler;
this.pluginRunner = pluginRunner;
this.restTemplateThreadPool = restTemplateThreadPool;
alertNotifyHandlerMap = Maps.newHashMapWithExpectedSize(alertNotifyHandlerList.size());
this.emitterManager = emitterManager;
alertNotifyHandlerList.forEach(r -> alertNotifyHandlerMap.put(r.type(), r));
Expand Down Expand Up @@ -121,14 +129,31 @@ public void dispatchAlarm(GroupAlert groupAlert) {
}

private void sendNotify(GroupAlert alert) {
matchNoticeRulesByAlert(alert).ifPresent(noticeRules -> noticeRules.forEach(rule -> workerPool.executeNotify(() -> rule.getReceiverId()
.forEach(receiverId -> {
try {
sendNoticeMsg(getOneReceiverById(receiverId),
getOneTemplateById(rule.getTemplateId()), alert);
} catch (AlertNoticeException e) {
log.warn("DispatchTask sendNoticeMsg error, message: {}", e.getMessage());
}
}))));
matchNoticeRulesByAlert(alert).ifPresent(noticeRules -> noticeRules.forEach(rule -> workerPool.executeNotify(() -> {
List<CompletableFuture<Void>> futures = rule.getReceiverId().stream()
.map(receiverId -> sendNoticeAsync(getOneReceiverById(receiverId),
getOneTemplateById(rule.getTemplateId()), alert))
.toList();

CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((result, exception) -> {
if (exception != null) {
log.warn("Some async notifications failed", exception);
} else {
log.debug("All notifications completed for alert: {}", alert.getGroupLabels());
}
});
})));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi can you describe why use the CompleableFuture here, sendNoticeMsg does not need to return a result; it just needs to handle exceptions.

}

private CompletableFuture<Void> sendNoticeAsync(NoticeReceiver receiver, NoticeTemplate template, GroupAlert alert) {
return CompletableFuture.runAsync(() -> {
try {
sendNoticeMsg(receiver, template, alert);
} catch (AlertNoticeException e) {
log.warn("Async notification failed for receiver {}: {}", receiver.getName(), e.getMessage());
throw new RuntimeException(e);
}
}, restTemplateThreadPool);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.ResourceBundle;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.alert.AlerterProperties;
import org.apache.hertzbeat.common.entity.alerter.GroupAlert;
Expand All @@ -34,6 +36,7 @@
import org.apache.hertzbeat.common.util.ResourceBundleUtil;
import org.apache.hertzbeat.alert.notice.AlertNotifyHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.event.EventListener;
import org.springframework.ui.freemarker.FreeMarkerTemplateUtils;
import org.springframework.web.client.RestTemplate;
Expand All @@ -50,6 +53,9 @@ abstract class AbstractAlertNotifyHandlerImpl implements AlertNotifyHandler {
protected RestTemplate restTemplate;
@Autowired
protected AlerterProperties alerterProperties;
@Autowired
@Qualifier("restTemplateThreadPool")
protected Executor restTemplateThreadPool;


protected String renderContent(NoticeTemplate noticeTemplate, GroupAlert alert) throws TemplateException, IOException {
Expand Down Expand Up @@ -113,6 +119,19 @@ protected String escapeJsonStr(String jsonStr){
return sb.toString();
}

protected CompletableFuture<Void> sendAsync(org.apache.hertzbeat.common.entity.alerter.NoticeReceiver receiver,
org.apache.hertzbeat.common.entity.alerter.NoticeTemplate noticeTemplate,
GroupAlert alert) {
return CompletableFuture.runAsync(() -> {
try {
send(receiver, noticeTemplate, alert);
} catch (Exception e) {
log.error("Async alert notification failed", e);
throw new RuntimeException(e);
}
}, restTemplateThreadPool);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seem no others use this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have deleted this piece of code.


@EventListener(SystemConfigChangeEvent.class)
public void onEvent(SystemConfigChangeEvent event) {
log.info("{} receive system config change event: {}.", this.getClass().getName(), event.getSource());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.Executor;
import java.util.Collections;
import java.util.List;
import org.apache.hertzbeat.alert.AlerterWorkerPool;
Expand Down Expand Up @@ -64,6 +65,9 @@ class AlertNoticeDispatchTest {
@Mock
private AlertSseManager emitterManager;

@Mock
private Executor restTemplateThreadPool;

private AlertNoticeDispatch alertNoticeDispatch;

private static final int DISPATCH_THREADS = 3;
Expand All @@ -82,7 +86,8 @@ void setUp() {
alertStoreHandler,
alertNotifyHandlerList,
pluginRunner,
emitterManager
emitterManager,
restTemplateThreadPool
);

receiver = NoticeReceiver.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hertzbeat.manager.config;

import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
Expand All @@ -26,11 +28,11 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.OkHttp3ClientHttpRequestFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.client.RestTemplate;

/**
* restTemplate config
* todo thread pool
*/
@Configuration
public class RestTemplateConfig {
Expand Down Expand Up @@ -58,4 +60,16 @@ public ClientHttpRequestFactory simpleClientHttpRequestFactory() {
);
}

@Bean("restTemplateThreadPool")
public Executor restTemplateThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(200);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi the http request require timeliness, suggest use the SynchronousQueue instead of executor.setQueueCapacity(200); , and executor.setCorePoolSize(2); executor.setMaxPoolSize(Integer.MAX_VALUE);

executor.setThreadNamePrefix("RestTemplate-");
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
Loading