基于Spring Boot的CompletableFuture与Consumer批量处理方案
方案概述:
批量请求处理:用户发送多条数据请求,并发处理每条数据线程池执行:使用自定义线程池处理具体业务逻辑
- 异步结果获取:通过CompletableFuture获取线程处理结果
- 结果回调:使用Consumer模式处理完成后的结果
核心实现分析:
1. 线程池配置- package org.completablefuture.consumer;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import java.util.concurrent.Executor;
- @Configuration
- public class AsyncConfig {
- @Bean("taskExecutor")
- public Executor taskExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(10);
- executor.setMaxPoolSize(20);
- executor.setQueueCapacity(100);
- executor.setThreadNamePrefix("BatchProcessing-");
- executor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
- executor.initialize();
- return executor;
- }
- }
复制代码
- 配置专用线程池处理批量任务
- 核心线程5个,最大线程10个,队列容量20
2.批量处理服务- List<CompletableFuture<ProcessResult>> futures = requests.stream()
- .map(request -> {
- CompletableFuture<ProcessResult> future = new CompletableFuture<>();
- batchProcessingService.process(request, result -> {}, future);
- log.info("batch处理返回的future: {}", future);
- return future;
- })
- .collect(Collectors.toList());
复制代码- /**
- * 单个处理请求,请求处理完成后立即回调
- *
- * @param request
- * @param callback
- * @param completableFuture
- */
- public void process(BatchRequest request,
- Consumer<ProcessResult> callback,
- CompletableFuture<ProcessResult> completableFuture) {
- log.info("进入process request: {}", request);
- CompletableFuture.supplyAsync(() -> businessService.dealBusiness(request), taskExecutor)
- .whenComplete((result, throwable) -> {
- if (throwable != null) {
- ProcessResult errorResult = new ProcessResult(
- request.getId(),
- false,
- "处理失败: " + throwable.getMessage(),
- null,
- 0,
- null
- );
- callback.accept(errorResult);
- completableFuture.completeExceptionally(throwable);
- } else {
- callback.accept(result);
- completableFuture.complete(result);
- }
- });
- }
复制代码
- 控制层循环处理请求
- BatchProcessingService类中处理每一个请求,并从线程中获取处理结果,并回调返回
3.业务处理
- public ProcessResult dealBusiness(BatchRequest request) {
- // 模拟处理逻辑
- String id = request.getId();
- Integer idInt = Integer.valueOf(id) == null ? 0 : Integer.valueOf(id);
- if (StringUtils.isNoneBlank(id) && idInt == 3){
- try {
- // 模拟随机异常
- if (new Random().nextBoolean()) {
- throw new RuntimeException("Simulated business exception");
- }
- } catch (IllegalArgumentException exception) {
- // 处理特定异常
- return new ProcessResult(
- request.getId(),
- false,
- "处理失败",
- null,
- -1,
- exception
- );
- }
- /*new ProcessResult(
- request.getId(),
- false,
- "处理失败",
- null,
- -1,
- new Exception("处理出现异常")
- );*/
- }
- if (StringUtils.isNoneBlank(id) && idInt % 2 == 0){
- return new ProcessResult(
- request.getId(),
- false,
- "处理失败",
- null,
- -1,
- null
- );
- } else {
- return new ProcessResult(
- request.getId(),
- true,
- "处理成功",
- request.getData(),
- -1,
- null
- );
- }
- }
复制代码
4.对处理结果进行合并- return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
- .thenApply(v -> futures.stream()
- .map(CompletableFuture::join)
- .collect(Collectors.toList()));
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |