Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
Expand Down Expand Up @@ -164,48 +164,63 @@ private ChangeParameterNotifyRequest buildChangeRequest(ExecutorProperties befor
* @param executorProperties
*/
private void checkNotifyConsistencyAndReplace(ExecutorProperties executorProperties) {
boolean checkNotifyConfig = false;
boolean checkNotifyAlarm = false;
Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap =
configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties);
Map<String, List<NotifyConfigDTO>> notifyConfigs = threadPoolBaseSendMessageService.getNotifyConfigs();
if (CollectionUtil.isNotEmpty(notifyConfigs)) {
for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) {
if (checkNotifyConfig) {
break;
}
List<NotifyConfigDTO> notifyConfigDTOS = notifyConfigs.get(each.getKey());
for (NotifyConfigDTO notifyConfig : each.getValue()) {
if (!notifyConfigDTOS.contains(notifyConfig)) {
checkNotifyConfig = true;
break;
}
}
}

boolean checkNotifyConfig = checkAndReplaceNotifyConfig(newDynamicThreadPoolNotifyMap, notifyConfigs);
boolean checkNotifyAlarm = checkAndReplaceNotifyAlarm(executorProperties);

if (checkNotifyConfig || checkNotifyAlarm) {
log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId());
}
if (checkNotifyConfig) {
configModeNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap);
threadPoolBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap);
}

private boolean checkAndReplaceNotifyConfig(Map<String, List<NotifyConfigDTO>> newConfigs,
Map<String, List<NotifyConfigDTO>> currentConfigs) {
if (CollectionUtil.isEmpty(currentConfigs)) {
return false;
}
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId());
if (threadPoolNotifyAlarm != null) {
Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm = executorProperties.getCapacityAlarm();
if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm()))
|| (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm()))
|| (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) {
checkNotifyAlarm = true;
threadPoolNotifyAlarm.setAlarm(Optional.ofNullable(isAlarm).orElse(threadPoolNotifyAlarm.getAlarm()));
threadPoolNotifyAlarm.setActiveAlarm(Optional.ofNullable(activeAlarm).orElse(threadPoolNotifyAlarm.getActiveAlarm()));
threadPoolNotifyAlarm.setCapacityAlarm(Optional.ofNullable(capacityAlarm).orElse(threadPoolNotifyAlarm.getCapacityAlarm()));

for (Map.Entry<String, List<NotifyConfigDTO>> entry : newConfigs.entrySet()) {
String key = entry.getKey();
List<NotifyConfigDTO> newNotifyConfigList = entry.getValue();
List<NotifyConfigDTO> currentNotifyConfigList = currentConfigs.get(key);

if (currentNotifyConfigList == null || !currentNotifyConfigList.containsAll(newNotifyConfigList)) {
configModeNotifyConfigBuilder.initCacheAndLock(newConfigs);
threadPoolBaseSendMessageService.putPlatform(newConfigs);
return true;
}
}
if (checkNotifyConfig || checkNotifyAlarm) {
log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId());

return false;
}

private boolean checkAndReplaceNotifyAlarm(ExecutorProperties executorProperties) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId());
if (threadPoolNotifyAlarm == null) {
return false;
}

boolean checkNotifyAlarm = false;
Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm = executorProperties.getCapacityAlarm();

if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm()))
|| (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm()))
|| (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) {
checkNotifyAlarm = true;
threadPoolNotifyAlarm.setAlarm(isAlarm != null ? isAlarm : threadPoolNotifyAlarm.getAlarm());
threadPoolNotifyAlarm.setActiveAlarm(activeAlarm != null ? activeAlarm : threadPoolNotifyAlarm.getActiveAlarm());
threadPoolNotifyAlarm.setCapacityAlarm(capacityAlarm != null ? capacityAlarm : threadPoolNotifyAlarm.getCapacityAlarm());
}

return checkNotifyAlarm;
}


/**
* Check consistency.
*
Expand All @@ -216,19 +231,26 @@ private boolean checkConsistency(String threadPoolId, ExecutorProperties propert
ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry.getHolder(threadPoolId);
ExecutorProperties beforeProperties = executorHolder.getExecutorProperties();
ThreadPoolExecutor executor = executorHolder.getExecutor();

if (executor == null) {
return false;
}
boolean result = (properties.getCorePoolSize() != null && !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()))
|| (properties.getMaximumPoolSize() != null && !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()))
|| (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()))
|| (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()))
|| (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()))
|| (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()))
||
((properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())));
return result;

return Stream.of(
hasPropertyChanged(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()),
hasPropertyChanged(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()),
hasPropertyChanged(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()),
hasPropertyChanged(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()),
hasPropertyChanged(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
hasPropertyChanged(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()),
isQueueCapacityChanged(beforeProperties, properties, executor)
).anyMatch(Boolean::booleanValue);
}

private boolean isQueueCapacityChanged(ExecutorProperties beforeProperties, ExecutorProperties properties, ThreadPoolExecutor executor) {
return properties.getQueueCapacity() != null &&
!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) &&
Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName());
}

/**
Expand All @@ -238,41 +260,62 @@ private boolean checkConsistency(String threadPoolId, ExecutorProperties propert
* @param properties
*/
private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = ThreadPoolExecutorRegistry.getHolder(threadPoolId).getExecutorProperties();
ThreadPoolExecutor executor = ThreadPoolExecutorRegistry.getHolder(threadPoolId).getExecutor();
if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) {
ThreadPoolExecutorUtil.safeSetPoolSize(executor, properties.getCorePoolSize(), properties.getMaximumPoolSize());
ThreadPoolExecutorHolder holder = ThreadPoolExecutorRegistry.getHolder(threadPoolId);
ExecutorProperties beforeProperties = holder.getExecutorProperties();
ThreadPoolExecutor executor = holder.getExecutor();

if (executor == null) {
log.warn("Executor is null for threadPoolId: {}", threadPoolId);
return;
}

setPoolSizes(executor, properties);
updateExecutorProperties(executor, beforeProperties, properties);
updateQueueCapacity(executor, beforeProperties, properties);
}

private void setPoolSizes(ThreadPoolExecutor executor, ExecutorProperties properties) {
Integer corePoolSize = properties.getCorePoolSize();
Integer maximumPoolSize = properties.getMaximumPoolSize();

if (corePoolSize != null && maximumPoolSize != null) {
ThreadPoolExecutorUtil.safeSetPoolSize(executor, corePoolSize, maximumPoolSize);
} else {
if (properties.getMaximumPoolSize() != null) {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
if (maximumPoolSize != null) {
executor.setMaximumPoolSize(maximumPoolSize);
}
if (properties.getCorePoolSize() != null) {
executor.setCorePoolSize(properties.getCorePoolSize());
if (corePoolSize != null) {
executor.setCorePoolSize(corePoolSize);
}
}
if (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) {
}

private void updateExecutorProperties(ThreadPoolExecutor executor, ExecutorProperties beforeProperties, ExecutorProperties properties) {
if (hasPropertyChanged(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) {
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
}
if (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
}
if (hasPropertyChanged(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()) && executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
}
if (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
if (hasPropertyChanged(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler());
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {
if (hasPropertyChanged(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {
executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS);
}
if (properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) {
ResizableCapacityLinkedBlockingQueue<?> queue = (ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue();
queue.setCapacity(properties.getQueueCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName());
}
}

private void updateQueueCapacity(ThreadPoolExecutor executor, ExecutorProperties beforeProperties, ExecutorProperties properties) {
if (hasPropertyChanged(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) {
((ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue()).setCapacity(properties.getQueueCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName());
}
}

private <T> boolean hasPropertyChanged(T before, T after) {
return after != null && !Objects.equals(before, after);
}
}