找回密码
 立即注册
首页 业界区 业界 【CompletableFuture 核心操作全解】详细注释版 ...

【CompletableFuture 核心操作全解】详细注释版

公西颖初 2025-9-18 15:09:06
一、任务创建操作

1. runAsync() - 执行无返回值的异步任务
  1. /**
  2. * 创建并执行无返回值的异步任务
  3. *
  4. * @param runnable 要执行的任务逻辑(无返回值)
  5. * @return CompletableFuture<Void> 表示任务执行状态的Future对象
  6. *
  7. * 特点:
  8. * - 任务在ForkJoinPool.commonPool()中执行
  9. * - 完成后future结果为null
  10. * - 适合执行后台操作、日志记录等不需要返回值的场景
  11. */
  12. CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
  13.     System.out.println("执行数据库清理任务...");
  14.     cleanDatabase(); // 执行耗时清理操作
  15.     System.out.println("数据库清理完成");
  16. });
复制代码
2. supplyAsync() - 执行有返回值的异步任务
  1. /**
  2. * 创建并执行有返回值的异步任务
  3. *
  4. * @param supplier 提供返回值的任务逻辑
  5. * @return CompletableFuture<T> 包含任务结果的Future对象
  6. *
  7. * 特点:
  8. * - 默认使用ForkJoinPool.commonPool()
  9. * - 可指定自定义线程池(第二个参数)
  10. * - 适合执行需要返回结果的计算或IO操作
  11. */
  12. CompletableFuture<String> dataFuture = CompletableFuture.supplyAsync(() -> {
  13.     System.out.println("开始从远程API获取数据...");
  14.     String result = fetchDataFromAPI(); // 模拟网络请求
  15.     System.out.println("数据获取完成");
  16.     return result; // 返回获取的数据
  17. });
  18. // 使用自定义线程池
  19. ExecutorService customPool = Executors.newFixedThreadPool(4);
  20. CompletableFuture<Integer> calculationFuture = CompletableFuture.supplyAsync(() -> {
  21.     System.out.println("在自定义线程池中执行复杂计算...");
  22.     return heavyCalculation(); // 返回计算结果
  23. }, customPool);
复制代码
二、结果转换操作

1. thenApply() - 同步转换结果
  1. /**
  2. * 同步转换前阶段的结果
  3. *
  4. * @param function 转换函数(接受前一阶段结果,返回新结果)
  5. * @return CompletableFuture<U> 包含转换结果的新Future
  6. *
  7. * 特点:
  8. * - 在前一任务完成线程中同步执行
  9. * - 会阻塞完成线程直到转换结束
  10. * - 适合快速、非耗时的转换操作
  11. */
  12. CompletableFuture<String> upperCaseFuture = dataFuture.thenApply(rawData -> {
  13.     System.out.println("同步转换原始数据...");
  14.     return rawData.toUpperCase(); // 立即执行转换
  15. });
  16. // 链式调用示例
  17. CompletableFuture<Integer> lengthFuture = dataFuture
  18.     .thenApply(String::trim)          // 第一步:去除空格
  19.     .thenApply(String::length);       // 第二步:计算长度
复制代码
2. thenApplyAsync() - 异步转换结果
  1. /**
  2. * 异步转换前阶段的结果
  3. *
  4. * @param function 转换函数
  5. * @param executor 可选,指定执行转换的线程池
  6. * @return CompletableFuture<U> 包含转换结果的新Future
  7. *
  8. * 特点:
  9. * - 在独立线程中执行转换
  10. * - 不会阻塞前一任务的完成线程
  11. * - 适合耗时较长的转换操作
  12. */
  13. CompletableFuture<Report> reportFuture = dataFuture.thenApplyAsync(rawData -> {
  14.     System.out.println("在异步线程中生成报告...");
  15.     return generateReport(rawData); // 耗时报告生成
  16. });
  17. // 使用自定义线程池
  18. ExecutorService reportPool = Executors.newFixedThreadPool(2);
  19. CompletableFuture analysisFuture = dataFuture.thenApplyAsync(rawData -> {
  20.     System.out.println("在专用线程池中执行数据分析...");
  21.     return analyzeData(rawData); // 复杂数据分析
  22. }, reportPool);
复制代码
三、结果消费操作

1. thenAccept() - 消费结果(有输入)
  1. /**
  2. * 消费前阶段的结果(无返回值)
  3. *
  4. * @param consumer 消费函数(接受前一阶段结果)
  5. * @return CompletableFuture<Void>
  6. *
  7. * 特点:
  8. * - 接收前一阶段结果作为输入
  9. * - 不产生返回值
  10. * - 适合日志记录、结果存储等操作
  11. */
  12. dataFuture.thenAccept(result -> {
  13.     System.out.println("消费获取到的数据: " + result.substring(0, 10) + "...");
  14.     saveToDatabase(result); // 将结果保存到数据库
  15. });
复制代码
2. thenRun() - 执行操作(无输入)
  1. /**
  2. * 在前阶段完成后执行操作(无输入参数)
  3. *
  4. * @param runnable 要执行的操作
  5. * @return CompletableFuture<Void>
  6. *
  7. * 特点:
  8. * - 不接受前阶段结果
  9. * - 无输入参数
  10. * - 适合执行清理、状态更新等操作
  11. */
  12. dataFuture.thenRun(() -> {
  13.     System.out.println("数据操作完成,释放资源...");
  14.     releaseResources(); // 释放使用的资源
  15. });
复制代码
四、任务组合操作

1. thenCompose() - 链式组合
  1. /**
  2. * 链式组合两个异步任务(flatMap)
  3. *
  4. * @param function 接受前阶段结果,返回新CompletableFuture的函数
  5. * @return CompletableFuture<U> 组合后的新Future
  6. *
  7. * 特点:
  8. * - 解决Future<Future<T>>嵌套问题
  9. * - 形成异步任务流水线
  10. * - 前阶段结果作为下一任务输入
  11. */
  12. CompletableFuture<Integer> userIdFuture = CompletableFuture.supplyAsync(() -> {
  13.     System.out.println("获取用户ID...");
  14.     return getUserId(); // 返回用户ID
  15. });
  16. // 使用用户ID获取详情
  17. CompletableFuture<User> userDetailFuture = userIdFuture.thenCompose(userId -> {
  18.     System.out.println("使用用户ID获取详情: " + userId);
  19.     return getUserDetailAsync(userId); // 返回新的Future
  20. });
复制代码
2. thenCombine() - 并行组合
  1. /**
  2. * 并行组合两个独立任务
  3. *
  4. * @param other 另一个CompletableFuture
  5. * @param bifunction 合并两个结果的函数
  6. * @return CompletableFuture<V> 合并结果的新Future
  7. *
  8. * 特点:
  9. * - 等待两个任务都完成
  10. * - 合并两个任务的结果
  11. * - 适合聚合多个独立操作的结果
  12. */
  13. CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> {
  14.     System.out.println("获取商品价格...");
  15.     return getProductPrice();
  16. });
  17. CompletableFuture<Double> rateFuture = CompletableFuture.supplyAsync(() -> {
  18.     System.out.println("获取汇率...");
  19.     return getExchangeRate();
  20. });
  21. // 组合两个独立任务的结果
  22. CompletableFuture<Double> localPriceFuture = priceFuture.thenCombine(rateFuture, (price, rate) -> {
  23.     System.out.println("计算本地价格: " + price + " * " + rate);
  24.     return price * rate; // 合并计算结果
  25. });
复制代码
3. acceptEither() - 消费最先完成的结果
  1. /**
  2. * 消费最先完成的任务结果
  3. *
  4. * @param other 另一个CompletableFuture
  5. * @param consumer 消费函数
  6. * @return CompletableFuture<Void>
  7. *
  8. * 特点:
  9. * - 任一任务完成即触发消费
  10. * - 未完成任务继续执行但不处理结果
  11. * - 适合快速响应场景
  12. */
  13. CompletableFuture<String> primaryService = queryService("主服务");
  14. CompletableFuture<String> backupService = queryService("备用服务");
  15. // 使用最先返回的结果
  16. primaryService.acceptEither(backupService, result -> {
  17.     System.out.println("使用服务响应: " + result);
  18.     displayToUser(result); // 向用户显示结果
  19. });
复制代码
五、多任务协调

1. allOf() - 等待所有任务完成
  1. /**
  2. * 等待所有任务完成
  3. *
  4. * @param futures 多个CompletableFuture
  5. * @return CompletableFuture<Void> 所有任务完成时结束
  6. *
  7. * 特点:
  8. * - 所有任务完成时返回
  9. * - 需要手动获取各任务结果
  10. * - 适合聚合多个异步操作结果
  11. */
  12. CompletableFuture<String> task1 = supplyAsync(() -> "结果1");
  13. CompletableFuture<String> task2 = supplyAsync(() -> "结果2");
  14. CompletableFuture<String> task3 = supplyAsync(() -> "结果3");
  15. CompletableFuture<Void> allFutures = CompletableFuture.allOf(task1, task2, task3);
  16. // 所有任务完成后处理
  17. allFutures.thenRun(() -> {
  18.     // 使用join()获取结果(不会抛受检异常)
  19.     String result1 = task1.join();
  20.     String result2 = task2.join();
  21.     String result3 = task3.join();
  22.    
  23.     System.out.println("所有任务完成,聚合结果: "
  24.         + result1 + ", " + result2 + ", " + result3);
  25. });
复制代码
2. anyOf() - 等待任一任务完成
  1. /**
  2. * 任一任务完成即返回
  3. *
  4. * @param futures 多个CompletableFuture
  5. * @return CompletableFuture<Object> 包含首个完成的结果
  6. *
  7. * 特点:
  8. * - 返回Object类型结果(需类型转换)
  9. * - 未完成任务继续执行
  10. * - 适合竞态条件场景
  11. */
  12. CompletableFuture<String> cacheQuery = queryCache();
  13. CompletableFuture<String> dbQuery = queryDatabase();
  14. CompletableFuture<Object> firstResult = CompletableFuture.anyOf(cacheQuery, dbQuery);
  15. firstResult.thenAccept(result -> {
  16.     // 需要显式类型转换
  17.     String data = (String) result;
  18.     System.out.println("最先返回的结果: " + data);
  19. });
复制代码
六、异常处理

1. exceptionally() - 异常恢复
  1. /**
  2. * 捕获异常并返回替代值
  3. *
  4. * @param function 异常处理函数
  5. * @return CompletableFuture<T> 包含正常结果或替代值的新Future
  6. *
  7. * 特点:
  8. * - 仅在前阶段异常时触发
  9. * - 可以恢复为正常结果
  10. * - 相当于catch块
  11. */
  12. CompletableFuture<Integer> riskyFuture = CompletableFuture.supplyAsync(() -> {
  13.     if (Math.random() > 0.5) {
  14.         throw new RuntimeException("随机错误");
  15.     }
  16.     return 42;
  17. });
  18. CompletableFuture<Integer> safeFuture = riskyFuture.exceptionally(ex -> {
  19.     System.err.println("捕获异常: " + ex.getMessage());
  20.     return 0; // 提供默认值
  21. });
复制代码
2. handle() - 双结果处理
  1. /**
  2. * 处理正常结果和异常
  3. *
  4. * @param bifunction 接受结果和异常的处理函数
  5. * @return CompletableFuture<U> 处理后的新Future
  6. *
  7. * 特点:
  8. * - 无论成功失败都会执行
  9. * - 可同时访问结果和异常
  10. * - 必须返回新结果
  11. */
  12. CompletableFuture<String> apiCall = fetchFromAPI();
  13. CompletableFuture<String> processed = apiCall.handle((result, ex) -> {
  14.     if (ex != null) {
  15.         System.err.println("API调用失败: " + ex.getMessage());
  16.         return "默认数据"; // 异常时返回默认值
  17.     }
  18.     return "处理后的: " + result.toUpperCase(); // 正常时转换结果
  19. });
复制代码
3. whenComplete() - 完成回调
  1. /**
  2. * 完成时回调(不改变结果)
  3. *
  4. * @param biconsumer 接受结果和异常的回调函数
  5. * @return CompletableFuture<T> 保留原始结果的新Future
  6. *
  7. * 特点:
  8. * - 类似finally块
  9. * - 可访问结果或异常
  10. * - 不影响原始结果
  11. */
  12. dataFuture.whenComplete((result, ex) -> {
  13.     if (ex != null) {
  14.         System.err.println("数据处理失败: " + ex.getMessage());
  15.         metrics.recordFailure();
  16.     } else {
  17.         System.out.println("数据处理成功,长度: " + result.length());
  18.         metrics.recordSuccess();
  19.     }
  20. });
复制代码
七、完成控制(Java 9+)

1. completeOnTimeout() - 超时默认值
  1. /**
  2. * 超时提供默认值
  3. *
  4. * @param value 默认值
  5. * @param timeout 超时时间
  6. * @param unit 时间单位
  7. * @return CompletableFuture<T>
  8. *
  9. * 特点:
  10. * - 超时后自动完成并返回默认值
  11. * - 原始任务继续执行但结果被忽略
  12. * - 避免无限期等待
  13. */
  14. CompletableFuture<String> slowService = callSlowService();
  15. CompletableFuture<String> withTimeout = slowService
  16.     .completeOnTimeout("超时默认值", 2, TimeUnit.SECONDS);
复制代码
2. orTimeout() - 超时异常
  1. /**
  2. * 超时抛出异常
  3. *
  4. * @param timeout 超时时间
  5. * @param unit 时间单位
  6. * @return CompletableFuture<T>
  7. *
  8. * 特点:
  9. * - 超时后抛出TimeoutException
  10. * - 可配合exceptionally处理
  11. * - 强制设置最大等待时间
  12. */
  13. CompletableFuture<String> serviceCall = externalService()
  14.     .orTimeout(500, TimeUnit.MILLISECONDS)
  15.     .exceptionally(ex -> {
  16.         if (ex.getCause() instanceof TimeoutException) {
  17.             return "超时回退值";
  18.         }
  19.         return "其他错误回退值";
  20.     });
复制代码
八、完成状态操作

1. complete() - 手动设置结果
  1. /**
  2. * 手动设置结果值
  3. *
  4. * @param value 要设置的结果
  5. * @return boolean 是否设置成功(如果已完成返回false)
  6. *
  7. * 特点:
  8. * - 强制完成future
  9. * - 唤醒所有等待线程
  10. * - 如果已完成则无效
  11. */
  12. CompletableFuture<String> manualFuture = new CompletableFuture<>();
  13. // 外部线程完成
  14. new Thread(() -> {
  15.     try {
  16.         String result = computeResult();
  17.         boolean set = manualFuture.complete(result);
  18.         System.out.println("手动设置结果: " + set);
  19.     } catch (Exception e) {
  20.         manualFuture.completeExceptionally(e);
  21.     }
  22. }).start();
复制代码
2. completeExceptionally() - 手动设置异常
  1. /**
  2. * 手动设置异常
  3. *
  4. * @param ex 要设置的异常
  5. * @return boolean 是否设置成功
  6. *
  7. * 特点:
  8. * - 以异常状态完成future
  9. * - 触发异常处理链
  10. * - 模拟服务失败场景
  11. */
  12. CompletableFuture<String> simulatedFailure = new CompletableFuture<>();
  13. // 模拟失败
  14. simulatedFailure.completeExceptionally(new RuntimeException("模拟异常"));
  15. simulatedFuture.exceptionally(ex -> {
  16.     System.out.println("捕获模拟异常: " + ex.getMessage());
  17.     return "回退值";
  18. });
复制代码
操作对比总结表

操作类型方法名特点适用场景线程行为任务创建runAsync无返回值后台任务、清理操作异步执行supplyAsync有返回值数据获取、计算任务异步执行结果转换thenApply同步转换快速转换、数据预处理使用前一任务线程thenApplyAsync异步转换耗时操作、IO处理使用新线程结果消费thenAccept消费结果结果存储、日志记录使用前一任务线程thenRun无参操作资源清理、状态更新使用前一任务线程任务组合thenCompose链式组合任务流水线、依赖操作使用前一任务线程thenCombine并行组合聚合独立任务结果使用前一任务线程acceptEither消费首个结果竞速服务、降级策略使用完成任务的线程多任务allOf等待所有批量处理、聚合数据任意完成线程anyOf等待任一快速响应、超时处理首个完成线程异常处理exceptionally异常恢复提供默认值异常发生线程handle双结果处理统一处理成功/失败使用前一任务线程whenComplete完成回调资源清理、指标记录使用前一任务线程完成控制complete手动设值外部控制、测试模拟调用者线程completeExceptionally手动设异常模拟失败、错误注入调用者线程关键执行机制详解

1. 线程传递规则


  • 非Async方法:在前一阶段的任务线程中同步执行
  • Async方法:默认在ForkJoinPool.commonPool()执行
  • 带Executor参数的Async方法:在指定线程池执行
2. 结果传递流程

graph TD    A[任务完成] --> B{是否有依赖操作}    B -->|有| C[触发依赖动作]    C --> D[创建新阶段]    D --> E[存储结果]    E --> F[触发下一依赖]    B -->|无| G[保存结果待后续访问]3. 异常传播机制


  • 异常会沿调用链向后传播
  • 直到遇到exceptionally()或handle()处理
  • 未处理的异常会导致整个链失败
  • whenComplete()可访问异常但不处理
4. 依赖管理策略


  • 使用栈结构存储依赖关系
  • 完成后按后进先出(LIFO)顺序触发
  • 保证依赖链的有序执行
  • 避免嵌套过深导致的栈溢出问题
掌握这些核心操作的详细用法和执行特性,能够帮助开发者构建高效、健壮的异步处理系统,有效管理复杂的异步任务编排。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册