Java中的CompletableFuture详解
字数 924 2025-11-05 08:31:57
Java中的CompletableFuture详解
一、CompletableFuture概述
CompletableFuture是Java 8引入的一个强大的异步编程工具类,位于java.util.concurrent包中。它实现了Future和CompletionStage接口,不仅支持获取异步计算结果,还提供了丰富的回调函数和组合操作,能够优雅地处理复杂的异步任务链。
二、核心特性与优势
- 异步执行:可以在后台线程中执行耗时任务,避免阻塞主线程
- 链式调用:支持将多个异步任务串联或并联执行
- 异常处理:提供完整的异常处理机制
- 结果转换:能够对异步结果进行各种转换和加工
- 组合操作:支持多个CompletableFuture的组合操作
三、创建CompletableFuture的四种方式
3.1 使用runAsync()创建无返回值的任务
// 创建异步任务,不返回结果
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("异步任务执行中...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("异步任务完成");
});
3.2 使用supplyAsync()创建有返回值的任务
// 创建异步任务,返回计算结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("计算任务开始");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "计算结果";
});
3.3 使用completedFuture()创建已完成的Future
// 直接创建已经完成的任务
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("立即完成的结果");
3.4 自定义线程池
// 使用自定义线程池执行异步任务
ExecutorService customExecutor = Executors.newFixedThreadPool(5);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return "使用自定义线程池执行";
}, customExecutor);
四、结果获取与基本操作
4.1 同步获取结果
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
try {
String result = future.get(); // 阻塞直到获取结果
System.out.println(result); // 输出: Hello
} catch (Exception e) {
e.printStackTrace();
}
4.2 异步回调处理
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "World");
// thenApply: 对结果进行转换
CompletableFuture<String> transformed = future.thenApply(result -> "Hello " + result);
// thenAccept: 消费结果,不返回新值
future.thenAccept(result -> System.out.println("接收到结果: " + result));
// thenRun: 任务完成后执行操作,不关心结果
future.thenRun(() -> System.out.println("任务完成"));
五、组合操作详解
5.1 thenCompose() - 链式组合
// 模拟用户信息查询链
CompletableFuture<String> getUserInfo = CompletableFuture.supplyAsync(() -> "用户123");
// thenCompose用于连接两个有依赖关系的异步任务
CompletableFuture<String> result = getUserInfo.thenCompose(userId ->
CompletableFuture.supplyAsync(() -> userId + "的详细信息")
);
System.out.println(result.get()); // 输出: 用户123的详细信息
5.2 thenCombine() - 并行组合
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
// 等两个任务都完成后,合并它们的结果
CompletableFuture<Integer> combined = future1.thenCombine(future2, (result1, result2) ->
result1 + result2
);
System.out.println(combined.get()); // 输出: 30
5.3 allOf() - 等待所有任务完成
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "任务1");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "任务2");
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> "任务3");
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
allTasks.thenRun(() -> {
System.out.println("所有任务都已完成");
try {
System.out.println("任务1结果: " + task1.get());
System.out.println("任务2结果: " + task2.get());
System.out.println("任务3结果: " + task3.get());
} catch (Exception e) {
e.printStackTrace();
}
});
5.4 anyOf() - 等待任意任务完成
CompletableFuture<String> fastTask = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(100); } catch (InterruptedException e) {}
return "快速任务";
});
CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "慢速任务";
});
CompletableFuture<Object> firstCompleted = CompletableFuture.anyOf(fastTask, slowTask);
System.out.println("最先完成的任务: " + firstCompleted.get()); // 输出: 快速任务
六、异常处理机制
6.1 exceptionally() - 异常恢复
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (true) throw new RuntimeException("模拟异常");
return "正常结果";
});
// 异常发生时提供默认值
CompletableFuture<String> safeFuture = future.exceptionally(throwable -> {
System.out.println("捕获异常: " + throwable.getMessage());
return "默认结果";
});
System.out.println(safeFuture.get()); // 输出: 默认结果
6.2 handle() - 统一处理正常和异常情况
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "成功结果";
});
CompletableFuture<String> handled = future.handle((result, throwable) -> {
if (throwable != null) {
return "异常处理结果";
}
return result;
});
6.3 whenComplete() - 完成时回调
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "测试任务");
future.whenComplete((result, throwable) -> {
if (throwable != null) {
System.out.println("任务失败: " + throwable.getMessage());
} else {
System.out.println("任务成功: " + result);
}
});
七、实际应用场景
7.1 并行调用多个服务
// 模拟并行调用三个微服务
CompletableFuture<String> userService = CompletableFuture.supplyAsync(() -> "用户数据");
CompletableFuture<String> orderService = CompletableFuture.supplyAsync(() -> "订单数据");
CompletableFuture<String> productService = CompletableFuture.supplyAsync(() -> "商品数据");
CompletableFuture<Void> allServices = CompletableFuture.allOf(userService, orderService, productService);
allServices.thenRun(() -> {
try {
String user = userService.get();
String order = orderService.get();
String product = productService.get();
System.out.println("聚合结果: " + user + ", " + order + ", " + product);
} catch (Exception e) {
e.printStackTrace();
}
});
7.2 异步任务流水线
CompletableFuture<String> pipeline = CompletableFuture
.supplyAsync(() -> "原始数据") // 第一步:获取数据
.thenApplyAsync(data -> data + " -> 处理后") // 第二步:处理数据
.thenApplyAsync(data -> data + " -> 验证后") // 第三步:验证数据
.thenApplyAsync(data -> data + " -> 最终结果"); // 第四步:生成最终结果
pipeline.thenAccept(finalResult ->
System.out.println("流水线结果: " + finalResult)
);
八、最佳实践与注意事项
- 合理使用线程池:避免使用默认的ForkJoinPool处理阻塞IO操作
- 异常处理:始终为异步任务添加异常处理逻辑
- 资源清理:及时关闭自定义的线程池
- 避免阻塞:尽量使用回调而非get()方法
- 超时控制:使用orTimeout()方法设置超时时间
CompletableFuture为Java异步编程提供了强大的支持,通过熟练掌握其各种操作符,可以编写出高效、可读性强的异步代码。