Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion async_function_pool/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,13 @@
出参
字段名称描述类型说明taskId任务idString任务唯一标识taskStatus单个任务执行状态Integer1成功,2进行中,3不存在taskResult单个任务执行返回结果StringString类型,须用户根据实际类型自行序列化。执行成功与否的标志需要逻辑在返回的序列化String中标记,获取结果后反序列化解析返回信息。
## 异步获取结果asyncGetLogicResult
与同步获取相同,区别点在于同步是流程卡住的,等全部任务执行完成后才会返回。异步则是仅返回当前执行完成的任务。
与同步获取相同,区别点在于同步是流程卡住的,等全部任务执行完成后才会返回。异步则是仅返回当前执行完成的任务。
## 异步执行任务,无返回结果asyncRunLogicNoResult
```
/**
* 异步执行任务,无返回结果
*/
@NaslLogic
public Boolean asyncRunLogicNoResult(Function<String, String> asyncfunction, String requestStr) {

```
2 changes: 1 addition & 1 deletion async_function_pool/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

<groupId>com.netease</groupId>
<artifactId>async_function_pool</artifactId>
<version>1.0.2</version>
<version>1.0.4</version>

<properties>
<maven.compiler.source>8</maven.compiler.source>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;

/**
Expand All @@ -34,6 +37,10 @@ public class FunctionManagerApi {
*/
private final Map<String, CompletableFuture<String>> runningTaskRegister = new ConcurrentHashMap<>();


@Resource(name = "libraryCommonTaskExecutor")
private Executor contextAwareExecutor;

/**
* 初始化注册逻辑
*
Expand Down Expand Up @@ -63,7 +70,7 @@ public String asyncRunLogic(String logicKey, String requestStr) {
logger.error("asyncRunLogic not exist: {}", logicKey);
return null;
}
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> function.apply(requestStr));
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> function.apply(requestStr), contextAwareExecutor);
logger.info("asyncRunLogic success: {}", logicKey);
String taskId = UUID.randomUUID().toString();
runningTaskRegister.put(taskId, future);
Expand Down Expand Up @@ -130,4 +137,21 @@ public List<ThreadResultDTO> asyncGetLogicResult(List<String> taskIdList) {
return resultDTOList;
}

/**
* 异步执行任务,无返回结果
*/
@NaslLogic
public Boolean asyncRunLogicNoResult(Function<String, String> asyncfunction, String requestStr) {
try {
contextAwareExecutor.execute(() -> asyncfunction.apply(requestStr));
return true;
} catch (RejectedExecutionException e) {
logger.error("Async task rejected for request: {}", requestStr, e);
return false;
} catch (Exception e) {
logger.error("Failed to submit async task for request: {}", requestStr, e);
return false;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.netease.lib.tasks.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import javax.annotation.Resource;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
public class AsyncExecutorConfig {

@Resource(name = "libraryThreadPoolConfig")
private ThreadPoolConfig threadPoolConfig;

@Bean(name = "libraryCommonTaskExecutor")
public Executor commonTaskExecutor() {
// Spring 默认配置是核心线程数大小为1,最大线程容量大小不受限制,队列容量也不受限制。
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(Integer.parseInt(threadPoolConfig.getCorePoolSize()));
// 最大线程数
executor.setMaxPoolSize(Integer.parseInt(threadPoolConfig.getMaxPoolSize()));
// 队列大小
executor.setQueueCapacity(Integer.parseInt(threadPoolConfig.getQueueCapacity()));
//核心线程等待销毁时间
executor.setKeepAliveSeconds(Integer.parseInt(threadPoolConfig.getKeepAliveSeconds()));
// 当最大池已满时,此策略保证不会丢失任务请求,但是可能会影响应用程序整体性能。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
executor.setThreadNamePrefix("common-pool");
// 传递request
executor.setTaskDecorator(runnable -> {
RequestAttributes context = RequestContextHolder.getRequestAttributes();
return () -> {
try {
if (context != null) {
RequestContextHolder.setRequestAttributes(context);
}
runnable.run();
} finally {
if (context != null) {
RequestContextHolder.resetRequestAttributes();
}
}
};
});
executor.initialize();
return executor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package com.netease.lib.tasks.config;

import com.netease.lowcode.core.EnvironmentType;
import com.netease.lowcode.core.annotation.Environment;
import com.netease.lowcode.core.annotation.NaslConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component("libraryThreadPoolConfig")
public class ThreadPoolConfig {

/**
* 核心线程数
*/
@Value("${corePoolSize:8}")
@NaslConfiguration(defaultValue = {
@Environment(type = EnvironmentType.DEV, value = "8"),
@Environment(type = EnvironmentType.ONLINE, value = "8")
})
public String corePoolSize;

/**
* 最大线程数
*/
@Value("${maxPoolSize:24}")
@NaslConfiguration(defaultValue = {
@Environment(type = EnvironmentType.DEV, value = "24"),
@Environment(type = EnvironmentType.ONLINE, value = "24")
})
public String maxPoolSize;

/**
* 队列大小
*/
@Value("${queueCapacity:1024}")
@NaslConfiguration(defaultValue = {
@Environment(type = EnvironmentType.DEV, value = "1024"),
@Environment(type = EnvironmentType.ONLINE, value = "1024")
})
public String queueCapacity;

/**
* 核心线程等待销毁时间
*/
@Value("${keepAliveSeconds:60}")
@NaslConfiguration(defaultValue = {
@Environment(type = EnvironmentType.DEV, value = "60"),
@Environment(type = EnvironmentType.ONLINE, value = "60")
})
public String keepAliveSeconds;

public String getCorePoolSize() {
return corePoolSize;
}

public void setCorePoolSize(String corePoolSize) {
this.corePoolSize = corePoolSize;
}

public String getMaxPoolSize() {
return maxPoolSize;
}

public void setMaxPoolSize(String maxPoolSize) {
this.maxPoolSize = maxPoolSize;
}

public String getQueueCapacity() {
return queueCapacity;
}

public void setQueueCapacity(String queueCapacity) {
this.queueCapacity = queueCapacity;
}

public String getKeepAliveSeconds() {
return keepAliveSeconds;
}

public void setKeepAliveSeconds(String keepAliveSeconds) {
this.keepAliveSeconds = keepAliveSeconds;
}
}
Loading
Loading