批量发送请求后立即返回线程的处理结果(不阻塞)
基于Spring Boot的CompletableFuture与Consumer批量处理方案方案概述:
批量请求处理:用户发送多条数据请求,并发处理每条数据线程池执行:使用自定义线程池处理具体业务逻辑
[*]异步结果获取:通过CompletableFuture获取线程处理结果
[*]结果回调:使用Consumer模式处理完成后的结果
核心实现分析:
1. 线程池配置
package org.completablefuture.consumer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
public class AsyncConfig {
@Bean("taskExecutor")
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("BatchProcessing-");
executor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
[*]配置专用线程池处理批量任务
[*]核心线程5个,最大线程10个,队列容量20
2.批量处理服务
List<CompletableFuture<ProcessResult>> futures = requests.stream()
.map(request -> {
CompletableFuture<ProcessResult> future = new CompletableFuture<>();
batchProcessingService.process(request, result -> {}, future);
log.info("batch处理返回的future: {}", future);
return future;
})
.collect(Collectors.toList());/**
* 单个处理请求,请求处理完成后立即回调
*
* @param request
* @param callback
* @param completableFuture
*/
public void process(BatchRequest request,
Consumer<ProcessResult> callback,
CompletableFuture<ProcessResult> completableFuture) {
log.info("进入process request: {}", request);
CompletableFuture.supplyAsync(() -> businessService.dealBusiness(request), taskExecutor)
.whenComplete((result, throwable) -> {
if (throwable != null) {
ProcessResult errorResult = new ProcessResult(
request.getId(),
false,
"处理失败: " + throwable.getMessage(),
null,
0,
null
);
callback.accept(errorResult);
completableFuture.completeExceptionally(throwable);
} else {
callback.accept(result);
completableFuture.complete(result);
}
});
}
[*]控制层循环处理请求
[*]BatchProcessingService类中处理每一个请求,并从线程中获取处理结果,并回调返回
3.业务处理
public ProcessResult dealBusiness(BatchRequest request) {
// 模拟处理逻辑
String id = request.getId();
Integer idInt = Integer.valueOf(id) == null ? 0 : Integer.valueOf(id);
if (StringUtils.isNoneBlank(id) && idInt == 3){
try {
// 模拟随机异常
if (new Random().nextBoolean()) {
throw new RuntimeException("Simulated business exception");
}
} catch (IllegalArgumentException exception) {
// 处理特定异常
return new ProcessResult(
request.getId(),
false,
"处理失败",
null,
-1,
exception
);
}
/*new ProcessResult(
request.getId(),
false,
"处理失败",
null,
-1,
new Exception("处理出现异常")
);*/
}
if (StringUtils.isNoneBlank(id) && idInt % 2 == 0){
return new ProcessResult(
request.getId(),
false,
"处理失败",
null,
-1,
null
);
} else {
return new ProcessResult(
request.getId(),
true,
"处理成功",
request.getData(),
-1,
null
);
}
}
4.对处理结果进行合并
return CompletableFuture.allOf(futures.toArray(new CompletableFuture))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! 这个好,看起来很实用 很好很强大我过来先占个楼 待编辑 谢谢楼主提供! 这个好,看起来很实用 感谢分享 感谢分享,下载保存了,貌似很强大 收藏一下 不知道什么时候能用到 感谢发布原创作品,程序园因你更精彩
页:
[1]