撷监芝 发表于 2026-1-17 21:50:16

批量发送请求后立即返回线程的处理结果(不阻塞)

基于Spring Boot的CompletableFuture与Consumer批量处理方案
方案概述:
批量请求处理:用户发送多条数据请求,并发处理每条数据线程池执行:使用自定义线程池处理具体业务逻辑
[*]异步结果获取:通过CompletableFuture获取线程处理结果
[*]结果回调:使用Consumer模式处理完成后的结果
核心实现分析:
1. 线程池配置
package org.completablefuture.consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
public class AsyncConfig {

    @Bean("taskExecutor")
    public Executor taskExecutor() {
      ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
      executor.setCorePoolSize(10);
      executor.setMaxPoolSize(20);
      executor.setQueueCapacity(100);
      executor.setThreadNamePrefix("BatchProcessing-");
      executor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
      executor.initialize();
      return executor;
    }
}

[*]配置专用线程池处理批量任务
[*]核心线程5个,最大线程10个,队列容量20
2.批量处理服务
List<CompletableFuture<ProcessResult>> futures = requests.stream()
                .map(request -> {
                  CompletableFuture<ProcessResult> future = new CompletableFuture<>();
                  batchProcessingService.process(request, result -> {}, future);
                  log.info("batch处理返回的future: {}", future);
                  return future;
                })
                .collect(Collectors.toList());/**
   * 单个处理请求,请求处理完成后立即回调
   *
   * @param request
   * @param callback
   * @param completableFuture
   */
    public void process(BatchRequest request,
                        Consumer<ProcessResult> callback,
                        CompletableFuture<ProcessResult> completableFuture) {

      log.info("进入process request: {}", request);
      CompletableFuture.supplyAsync(() -> businessService.dealBusiness(request), taskExecutor)
                .whenComplete((result, throwable) -> {
                  if (throwable != null) {
                        ProcessResult errorResult = new ProcessResult(
                              request.getId(),
                              false,
                              "处理失败: " + throwable.getMessage(),
                              null,
                              0,
                              null
                        );
                        callback.accept(errorResult);
                        completableFuture.completeExceptionally(throwable);
                  } else {
                        callback.accept(result);
                        completableFuture.complete(result);
                  }
                });
    }

[*]控制层循环处理请求
[*]BatchProcessingService类中处理每一个请求,并从线程中获取处理结果,并回调返回
3.业务处理
 
 
public ProcessResult dealBusiness(BatchRequest request) {
      // 模拟处理逻辑
      String id = request.getId();
      Integer idInt = Integer.valueOf(id) == null ? 0 : Integer.valueOf(id);
      if (StringUtils.isNoneBlank(id) && idInt == 3){
            try {
                // 模拟随机异常
                if (new Random().nextBoolean()) {
                  throw new RuntimeException("Simulated business exception");
                }
            } catch (IllegalArgumentException exception) {
                // 处理特定异常
                return new ProcessResult(
                        request.getId(),
                        false,
                        "处理失败",
                        null,
                        -1,
                        exception
                );
            }
            /*new ProcessResult(
                  request.getId(),
                  false,
                  "处理失败",
                  null,
                  -1,
                  new Exception("处理出现异常")
            );*/
      }
      if (StringUtils.isNoneBlank(id) && idInt % 2 == 0){
            return new ProcessResult(
                  request.getId(),
                  false,
                  "处理失败",
                  null,
                  -1,
                     null
            );
      } else {
            return new ProcessResult(
                  request.getId(),
                  true,
                  "处理成功",
                  request.getData(),
                  -1,
                     null
            );
      }
    } 
4.对处理结果进行合并
return CompletableFuture.allOf(futures.toArray(new CompletableFuture))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())); 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

睁扼妤 发表于 2026-1-18 13:55:08

这个好,看起来很实用

曲愍糙 发表于 2026-1-20 16:57:10

很好很强大我过来先占个楼 待编辑

恿深疏 发表于 2026-1-21 16:44:04

谢谢楼主提供!

拙因 发表于 2026-1-21 17:19:53

这个好,看起来很实用

嶝扁 发表于 2026-1-23 10:19:21

感谢分享

荡俊屯 发表于 2026-1-26 02:53:25

感谢分享,下载保存了,貌似很强大

呈步 发表于 5 天前

收藏一下   不知道什么时候能用到

庞悦 发表于 8 小时前

感谢发布原创作品,程序园因你更精彩
页: [1]
查看完整版本: 批量发送请求后立即返回线程的处理结果(不阻塞)