找回密码
 立即注册
首页 业界区 安全 批量发送请求后立即返回线程的处理结果(不阻塞) ...

批量发送请求后立即返回线程的处理结果(不阻塞)

尝琨 2026-1-17 21:50:15
基于Spring Boot的CompletableFuture与Consumer批量处理方案
方案概述:
    批量请求处理:用户发送多条数据请求,并发处理每条数据线程池执行:使用自定义线程池处理具体业务逻辑
  • 异步结果获取:通过CompletableFuture获取线程处理结果
  • 结果回调:使用Consumer模式处理完成后的结果
核心实现分析:
1. 线程池配置
  1. package org.completablefuture.consumer;
  2. import org.springframework.context.annotation.Bean;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
  5. import java.util.concurrent.Executor;
  6. @Configuration
  7. public class AsyncConfig {
  8.     @Bean("taskExecutor")
  9.     public Executor taskExecutor() {
  10.         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  11.         executor.setCorePoolSize(10);
  12.         executor.setMaxPoolSize(20);
  13.         executor.setQueueCapacity(100);
  14.         executor.setThreadNamePrefix("BatchProcessing-");
  15.         executor.setRejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
  16.         executor.initialize();
  17.         return executor;
  18.     }
  19. }
复制代码

  • 配置专用线程池处理批量任务
  • 核心线程5个,最大线程10个,队列容量20
2.批量处理服务
  1. List<CompletableFuture<ProcessResult>> futures = requests.stream()
  2.                 .map(request -> {
  3.                     CompletableFuture<ProcessResult> future = new CompletableFuture<>();
  4.                     batchProcessingService.process(request, result -> {}, future);
  5.                     log.info("batch处理返回的future: {}", future);
  6.                     return future;
  7.                 })
  8.                 .collect(Collectors.toList());
复制代码
  1. /**
  2.      * 单个处理请求,请求处理完成后立即回调
  3.      *
  4.      * @param request
  5.      * @param callback
  6.      * @param completableFuture
  7.      */
  8.     public void process(BatchRequest request,
  9.                         Consumer<ProcessResult> callback,
  10.                         CompletableFuture<ProcessResult> completableFuture) {
  11.         log.info("进入process request: {}", request);
  12.         CompletableFuture.supplyAsync(() -> businessService.dealBusiness(request), taskExecutor)
  13.                 .whenComplete((result, throwable) -> {
  14.                     if (throwable != null) {
  15.                         ProcessResult errorResult = new ProcessResult(
  16.                                 request.getId(),
  17.                                 false,
  18.                                 "处理失败: " + throwable.getMessage(),
  19.                                 null,
  20.                                 0,
  21.                                 null
  22.                         );
  23.                         callback.accept(errorResult);
  24.                         completableFuture.completeExceptionally(throwable);
  25.                     } else {
  26.                         callback.accept(result);
  27.                         completableFuture.complete(result);
  28.                     }
  29.                 });
  30.     }
复制代码

  • 控制层循环处理请求
  • BatchProcessingService类中处理每一个请求,并从线程中获取处理结果,并回调返回
3.业务处理
 
 
  1. public ProcessResult dealBusiness(BatchRequest request) {
  2.         // 模拟处理逻辑
  3.         String id = request.getId();
  4.         Integer idInt = Integer.valueOf(id) == null ? 0 : Integer.valueOf(id);
  5.         if (StringUtils.isNoneBlank(id) && idInt == 3){
  6.             try {
  7.                 // 模拟随机异常
  8.                 if (new Random().nextBoolean()) {
  9.                     throw new RuntimeException("Simulated business exception");
  10.                 }
  11.             } catch (IllegalArgumentException exception) {
  12.                 // 处理特定异常
  13.                 return new ProcessResult(
  14.                         request.getId(),
  15.                         false,
  16.                         "处理失败",
  17.                         null,
  18.                         -1,
  19.                         exception
  20.                 );
  21.             }
  22.             /*new ProcessResult(
  23.                     request.getId(),
  24.                     false,
  25.                     "处理失败",
  26.                     null,
  27.                     -1,
  28.                     new Exception("处理出现异常")
  29.             );*/
  30.         }
  31.         if (StringUtils.isNoneBlank(id) && idInt % 2 == 0){
  32.             return new ProcessResult(
  33.                     request.getId(),
  34.                     false,
  35.                     "处理失败",
  36.                     null,
  37.                     -1,
  38.                      null
  39.             );
  40.         } else {
  41.             return new ProcessResult(
  42.                     request.getId(),
  43.                     true,
  44.                     "处理成功",
  45.                     request.getData(),
  46.                     -1,
  47.                      null
  48.             );
  49.         }
  50.     }
复制代码
 
4.对处理结果进行合并
  1. return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
  2.                 .thenApply(v -> futures.stream()
  3.                         .map(CompletableFuture::join)
  4.                         .collect(Collectors.toList()));
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

2026-1-23 08:04:54

举报

昨天 03:28

举报

您需要登录后才可以回帖 登录 | 立即注册