
在 Java 开发领域,CompletableFuture 就像一位深藏不露的高手。自 Java 8 引入以来,它彻底改变了异步编程的游戏规则,却仍有许多开发者停留在 Future 的时代,或在回调地狱中挣扎。
想象这样一个场景:用户下单后,你需要调用库存服务扣减库存、调用支付服务处理付款、调用物流服务创建物流单,最后发送通知给用户。如果用传统的同步方式,总耗时是四个服务耗时的总和;如果用 CompletableFuture 实现并行处理,总耗时将接近耗时最长的那个服务。这就是异步编程的魔力,也是 CompletableFuture 的价值所在。
本文将带你全面掌握 CompletableFuture,从基础 API 到高级实战,从底层原理到性能优化,让你真正理解并能灵活运用这一强大工具,写出高效、优雅的异步代码。
Java 5 引入的 Future 接口为异步编程打开了一扇门,但它的能力非常有限:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@Slf4j
public class FutureLimitExample {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任务
Future<String> future = executor.submit(() -> {
log.info("执行耗时任务");
Thread.sleep(2000);
return "任务结果";
});
// 必须阻塞等待结果,无法优雅处理
log.info("等待结果...");
String result = future.get(); // 阻塞在这里
log.info("获取结果: {}", result);
executor.shutdown();
}
}
这种方式在处理复杂异步场景时显得力不从心,于是 CompletableFuture 应运而生。
CompletableFuture 实现了 Future 和 CompletionStage 两个接口,前者保证了与原有 Future 体系的兼容性,后者则提供了丰富的异步处理能力。其核心优势包括:

CompletableFuture 提供了多种创建方式,适用于不同场景:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class CompletableFutureCreation {
// 自定义线程池,推荐使用而非默认线程池
private static final ExecutorService CUSTOM_EXECUTOR = Executors.newFixedThreadPool(3);
public static void main(String[] args) throws Exception {
// 1. 无返回值,使用默认线程池
CompletableFuture.runAsync(() -> {
log.info("执行无返回值的异步任务");
});
// 2. 无返回值,使用自定义线程池
CompletableFuture.runAsync(() -> {
log.info("使用自定义线程池执行无返回值任务");
}, CUSTOM_EXECUTOR);
// 3. 有返回值,使用默认线程池
CompletableFuture<String> supplyDefault = CompletableFuture.supplyAsync(() -> {
log.info("执行有返回值的异步任务");
return "任务结果";
});
// 4. 有返回值,使用自定义线程池
CompletableFuture<String> supplyCustom = CompletableFuture.supplyAsync(() -> {
log.info("使用自定义线程池执行有返回值任务");
return "自定义线程池任务结果";
}, CUSTOM_EXECUTOR);
// 等待所有任务完成
CompletableFuture.allOf(supplyDefault, supplyCustom).join();
log.info("supplyDefault结果: {}", supplyDefault.get());
log.info("supplyCustom结果: {}", supplyCustom.get());
CUSTOM_EXECUTOR.shutdown();
}
}
注意:生产环境中应始终使用自定义线程池,避免使用默认的 ForkJoinPool.commonPool (),后者是 JVM 共享的,可能受到其他组件影响。
CompletableFuture 的强大之处在于其丰富的实例方法,这些方法可以分为几大类:结果处理、任务组合、异常处理等。掌握这些方法是灵活运用 CompletableFuture 的关键。
当异步任务完成后,我们通常需要对结果进行处理,CompletableFuture 提供了三种常用方式:

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class ResultHandlingMethods {
public static void main(String[] args) {
// thenApply:转换结果
CompletableFuture<Integer> thenApplyFuture = CompletableFuture.supplyAsync(() -> {
log.info("计算原始值");
return 10;
}).thenApply(original -> {
log.info("将原始值 {} 乘以 2", original);
return original * 2;
});
// thenAccept:消费结果
CompletableFuture<Void> thenAcceptFuture = thenApplyFuture.thenAccept(result -> {
log.info("thenApply的结果是: {}", result);
});
// thenRun:执行后续操作
CompletableFuture<Void> thenRunFuture = thenAcceptFuture.thenRun(() -> {
log.info("所有操作完成,执行最终处理");
});
// 等待所有操作完成
thenRunFuture.join();
}
}
上面介绍的方法默认在当前线程(可能是任务执行线程)中执行后续操作。如果希望后续操作也在异步线程中执行,需要使用它们的异步版本:
这些异步方法有两个重载版本:一个使用默认线程池,一个可以指定线程池。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j
public class AsyncProcessingMethods {
private static final ExecutorService PROCESS_EXECUTOR = Executors.newSingleThreadExecutor(r ->
new Thread(r, "process-thread")
);
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
log.info("执行任务的线程: {}", Thread.currentThread().getName());
return "原始结果";
}).thenApplyAsync(result -> {
log.info("处理结果的线程: {}", Thread.currentThread().getName());
return result + " - 已处理";
}, PROCESS_EXECUTOR); // 使用指定线程池处理
log.info("最终结果: {}", future.join());
PROCESS_EXECUTOR.shutdown();
}
}
运行此代码会发现,任务执行和结果处理在不同的线程中进行,这对于避免阻塞任务线程非常重要。
在实际开发中,我们经常需要组合多个异步任务。CompletableFuture 提供了两种主要方式:

示例代码:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class TaskCombination {
public static void main(String[] args) {
// thenCompose:组合依赖任务
CompletableFuture<String> composeFuture = getUserById(1001)
.thenCompose(userId -> getOrderByUserId(userId));
// thenCombine:组合独立任务
CompletableFuture<String> combineFuture = getUserName(1001)
.thenCombine(getUserAge(1001), (name, age) ->
String.format("用户信息: 姓名=%s, 年龄=%d", name, age)
);
log.info("thenCompose结果: {}", composeFuture.join());
log.info("thenCombine结果: {}", combineFuture.join());
}
/**
* 根据ID获取用户ID(模拟)
* @param id 外部ID
* @return 用户ID
*/
private static CompletableFuture<Integer> getUserById(int id) {
return CompletableFuture.supplyAsync(() -> {
log.info("根据外部ID {} 获取用户ID", id);
return 10001; // 模拟用户ID
});
}
/**
* 根据用户ID获取订单
* @param userId 用户ID
* @return 订单信息
*/
private static CompletableFuture<String> getOrderByUserId(int userId) {
return CompletableFuture.supplyAsync(() -> {
log.info("根据用户ID {} 获取订单", userId);
return "订单信息: 用户=" + userId + ", 订单号=2023001";
});
}
/**
* 获取用户姓名
* @param userId 用户ID
* @return 用户名
*/
private static CompletableFuture<String> getUserName(int userId) {
return CompletableFuture.supplyAsync(() -> {
log.info("获取用户 {} 的姓名", userId);
return "张三";
});
}
/**
* 获取用户年龄
* @param userId 用户ID
* @return 年龄
*/
private static CompletableFuture<Integer> getUserAge(int userId) {
return CompletableFuture.supplyAsync(() -> {
log.info("获取用户 {} 的年龄", userId);
return 30;
});
}
}
注意:thenCompose 和 thenApply 的区别在于,thenCompose 接收的函数返回值是 CompletableFuture,避免了嵌套的 CompletableFuture(CompletableFuture<CompletableFuture<T>>)。
当需要处理多个异步任务时,CompletableFuture 提供了两个非常实用的静态方法:

import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
public class MultipleTasksHandling {
public static void main(String[] args) {
// 创建三个不同耗时的任务
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
log.error("任务1被中断", e);
}
log.info("任务1完成");
return "结果1";
});
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(1500);
} catch (InterruptedException e) {
log.error("任务2被中断", e);
}
log.info("任务2完成");
return "结果2";
});
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.MILLISECONDS.sleep(800);
} catch (InterruptedException e) {
log.error("任务3被中断", e);
}
log.info("任务3完成");
return "结果3";
});
// 等待所有任务完成
CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(task1, task2, task3);
allOfFuture.thenRun(() -> {
log.info("所有任务都已完成");
try {
log.info("任务1结果: {}", task1.get());
log.info("任务2结果: {}", task2.get());
log.info("任务3结果: {}", task3.get());
} catch (Exception e) {
log.error("获取任务结果异常", e);
}
}).join();
// 等待任意一个任务完成
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(task1, task2, task3);
anyOfFuture.thenAccept(result -> {
log.info("有一个任务已完成,结果: {}", result);
}).join();
}
}
注意:allOf 返回 CompletableFuture<Void>,无法直接获取各个任务的结果,需要单独获取;anyOf 返回 CompletableFuture<Object>,结果是第一个完成的任务的结果。
异步任务中的异常处理至关重要,CompletableFuture 提供了三种主要的异常处理方式:

示例代码:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
@Slf4j
public class ExceptionHandling {
public static void main(String[] args) {
// exceptionally:仅处理异常
CompletableFuture<String> exceptionallyFuture = CompletableFuture.supplyAsync(() -> {
log.info("执行可能抛出异常的任务");
if (true) {
throw new RuntimeException("任务执行失败");
}
return "正常结果";
}).exceptionally(ex -> {
log.error("捕获到异常", ex);
return "异常时的默认结果";
});
// handle:处理正常结果和异常
CompletableFuture<String> handleFuture = CompletableFuture.supplyAsync(() -> {
log.info("执行任务");
// 模拟随机成功或失败
if (Math.random() > 0.5) {
throw new RuntimeException("随机失败");
}
return "正常结果";
}).handle((result, ex) -> {
if (ex != null) {
log.error("handle捕获到异常", ex);
return "handle处理异常后的结果";
}
return result + " - 已通过handle处理";
});
// whenComplete:处理完成事件,不改变结果
CompletableFuture<String> whenCompleteFuture = CompletableFuture.supplyAsync(() -> {
log.info("执行任务");
return "正常结果";
}).whenComplete((result, ex) -> {
if (ex != null) {
log.error("whenComplete捕获到异常", ex);
} else {
log.info("whenComplete处理正常结果: {}", result);
}
});
log.info("exceptionally结果: {}", exceptionallyFuture.join());
log.info("handle结果: {}", handleFuture.join());
log.info("whenComplete结果: {}", whenCompleteFuture.join());
}
}
这三种方式的选择取决于具体需求:需要恢复异常并返回默认值用 exceptionally;需要统一处理正常和异常情况并转换结果用 handle;仅需要在完成时记录日志或清理资源用 whenComplete。
要真正掌握 CompletableFuture,理解其底层原理是必要的。本节将深入探讨 CompletableFuture 的实现机制。
CompletableFuture 内部通过一个 volatile 变量维护任务状态,主要状态包括:
状态转换关系如下:

这些状态通过 Unsafe 类进行 CAS 操作来保证原子性,确保多线程环境下的状态一致性。
CompletableFuture 的链式调用是通过维护依赖链实现的。每个 CompletableFuture 可以有多个依赖的 CompletableFuture(称为 "依赖者"),当一个 CompletableFuture 完成时,会通知所有依赖者执行。
这种依赖关系通过一个栈结构存储,当调用 thenApply 等方法时,会创建新的 CompletableFuture 并将其添加到当前任务的依赖栈中。
当任务完成时,CompletableFuture 会遍历依赖栈,依次触发每个依赖任务的执行,形成链式反应。这种设计确保了异步操作的高效串联。
CompletableFuture 在执行异步操作时,线程池选择遵循以下规则:
ForkJoinPool.commonPool () 是 JVM 级别的共享线程池,其默认并行度为 CPU 核心数减 1。可以通过系统属性调整:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=4
但在生产环境中,强烈建议使用自定义线程池,以便更好地控制资源使用和任务隔离。
理论知识需要结合实际应用才能真正内化。本节将通过几个典型场景,展示 CompletableFuture 的实战价值。
在微服务架构中,我们经常需要调用多个独立服务,然后汇总结果。使用 CompletableFuture 可以并行执行这些调用,大幅提升效率。
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@Slf4j
public class ParallelServiceCall {
// 自定义线程池
private static final ExecutorService SERVICE_EXECUTOR = Executors.newFixedThreadPool(5);
public static void main(String[] args) {
long startTime = System.currentTimeMillis();
// 并行调用三个独立服务
CompletableFuture<UserInfo> userInfoFuture = CompletableFuture.supplyAsync(() ->
getUserInfo(1001), SERVICE_EXECUTOR
);
CompletableFuture<OrderList> orderListFuture = CompletableFuture.supplyAsync(() ->
getOrderList(1001), SERVICE_EXECUTOR
);
CompletableFuture<AddressList> addressListFuture = CompletableFuture.supplyAsync(() ->
getAddressList(1001), SERVICE_EXECUTOR
);
// 汇总结果
CompletableFuture<Map<String, Object>> resultFuture = CompletableFuture.allOf(
userInfoFuture, orderListFuture, addressListFuture
).thenApply(v -> {
Map<String, Object> resultMap = Maps.newHashMap();
try {
resultMap.put("userInfo", userInfoFuture.get());
resultMap.put("orders", orderListFuture.get());
resultMap.put("addresses", addressListFuture.get());
} catch (Exception e) {
log.error("汇总结果异常", e);
}
return resultMap;
});
// 获取最终结果
Map<String, Object> result = resultFuture.join();
log.info("汇总结果: {}", JSON.toJSONString(result));
long endTime = System.currentTimeMillis();
log.info("总耗时: {}ms", endTime - startTime);
SERVICE_EXECUTOR.shutdown();
}
/**
* 获取用户信息
*/
private static UserInfo getUserInfo(int userId) {
log.info("查询用户信息,userId: {}", userId);
try {
// 模拟服务调用耗时
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
log.error("查询用户信息中断", e);
}
return new UserInfo(userId, "张三", 30);
}
/**
* 获取订单列表
*/
private static OrderList getOrderList(int userId) {
log.info("查询订单列表,userId: {}", userId);
try {
// 模拟服务调用耗时
TimeUnit.MILLISECONDS.sleep(1500);
} catch (InterruptedException e) {
log.error("查询订单列表中断", e);
}
OrderList orderList = new OrderList();
orderList.setUserId(userId);
orderList.setOrders(CollectionUtils.newArrayList(
new Order(10001, "商品A"),
new Order(10002, "商品B")
));
return orderList;
}
/**
* 获取地址列表
*/
private static AddressList getAddressList(int userId) {
log.info("查询地址列表,userId: {}", userId);
try {
// 模拟服务调用耗时
TimeUnit.MILLISECONDS.sleep(800);
} catch (InterruptedException e) {
log.error("查询地址列表中断", e);
}
AddressList addressList = new AddressList();
addressList.setUserId(userId);
addressList.setAddresses(CollectionUtils.newArrayList(
new Address(20001, "北京市"),
new Address(20002, "上海市")
));
return addressList;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class UserInfo {
private int userId;
private String username;
private int age;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Order {
private int orderId;
private String productName;
}
@Data
@NoArgsConstructor
static class OrderList {
private int userId;
private java.util.List<Order> orders;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class Address {
private int addressId;
private String address;
}
@Data
@NoArgsConstructor
static class AddressList {
private int userId;
private java.util.List<Address> addresses;
}
}
这个示例中,三个服务调用并行执行,总耗时约 1500ms(等于耗时最长的服务),而不是三个服务耗时之和(约 3300ms),效率提升非常明显。
在分布式系统中,服务调用超时是常见问题。CompletableFuture 提供了优雅的超时控制方案:
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
public class TimeoutHandling {
// 自定义线程池
private static final java.util.concurrent.ExecutorService EXECUTOR =
java.util.concurrent.Executors.newFixedThreadPool(3);
public static void main(String[] args) {
// 方式1: 使用get(timeout, unit)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return callExternalService("服务A", 3000);
}, EXECUTOR);
try {
String result1 = future1.get(2, TimeUnit.SECONDS);
log.info("服务A结果: {}", result1);
} catch (Exception e) {
log.error("服务A调用超时或异常", e);
// 执行降级策略
log.info("服务A降级处理: 返回缓存数据");
}
// 方式2: 使用orTimeout (Java 9+)
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return callExternalService("服务B", 3000);
}, EXECUTOR)
.orTimeout(2, TimeUnit.SECONDS)
.exceptionally(ex -> {
log.error("服务B调用异常", ex);
return "服务B降级数据";
});
log.info("服务B结果: {}", future2.join());
// 方式3: 使用completeOnTimeout (Java 9+)
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return callExternalService("服务C", 3000);
}, EXECUTOR)
.completeOnTimeout("服务C默认数据", 2, TimeUnit.SECONDS);
log.info("服务C结果: {}", future3.join());
EXECUTOR.shutdown();
}
/**
* 调用外部服务
* @param serviceName 服务名称
* @param delay 延迟时间(ms)
* @return 服务返回结果
*/
private static String callExternalService(String serviceName, int delay) {
log.info("调用外部服务: {}", serviceName);
try {
TimeUnit.MILLISECONDS.sleep(delay);
} catch (InterruptedException e) {
log.error("服务 {} 调用被中断", serviceName, e);
Thread.currentThread().interrupt();
}
return serviceName + "返回结果";
}
}
注意:orTimeout 和 completeOnTimeout 是 Java 9 引入的方法,如果使用 Java 8,可通过 thenApplyAsync 结合定时任务实现类似功能。
在 Spring Boot 应用中,CompletableFuture 可以与 @Async 注解结合,实现高效的异步接口:
1. 首先添加必要依赖 (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.1</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>completablefuture-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>completablefuture-demo</name>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.32</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.3-jre</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
2. 配置异步线程池
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class AsyncConfig {
/**
* 自定义异步线程池
*/
@Bean(name = "apiExecutor")
public Executor apiExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(10);
// 最大线程数
executor.setMaxPoolSize(20);
// 队列容量
executor.setQueueCapacity(100);
// 线程名称前缀
executor.setThreadNamePrefix("api-async-");
// 线程空闲时间
executor.setKeepAliveSeconds(60);
// 拒绝策略:当线程和队列都满时,由提交任务的线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}
3. 实现异步服务
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class ProductService {
/**
* 获取产品基本信息
* @param productId 产品ID
* @return 产品基本信息
*/
@Async("apiExecutor")
public CompletableFuture<ProductInfo> getProductInfo(Long productId) {
log.info("查询产品基本信息, productId: {}", productId);
try {
// 模拟数据库查询耗时
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
log.error("查询产品信息中断", e);
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
}
// 模拟返回产品信息
ProductInfo info = new ProductInfo();
info.setProductId(productId);
info.setName("高性能笔记本电脑");
info.setPrice(5999.99);
info.setStock(100);
return CompletableFuture.completedFuture(info);
}
/**
* 获取产品评价
* @param productId 产品ID
* @return 产品评价列表
*/
@Async("apiExecutor")
public CompletableFuture<ReviewList> getProductReviews(Long productId) {
log.info("查询产品评价, productId: {}", productId);
try {
// 模拟数据库查询耗时
TimeUnit.MILLISECONDS.sleep(800);
} catch (InterruptedException e) {
log.error("查询产品评价中断", e);
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
}
// 模拟返回评价列表
ReviewList reviewList = new ReviewList();
reviewList.setProductId(productId);
reviewList.setReviews(org.springframework.util.CollectionUtils.newArrayList(
new Review(1L, "非常好用", 5),
new Review(2L, "性能出色", 5),
new Review(3L, "性价比高", 4)
));
return CompletableFuture.completedFuture(reviewList);
}
/**
* 获取产品推荐
* @param productId 产品ID
* @return 推荐产品列表
*/
@Async("apiExecutor")
public CompletableFuture<RecommendationList> getRecommendations(Long productId) {
log.info("查询推荐产品, productId: {}", productId);
try {
// 模拟数据库查询耗时
TimeUnit.MILLISECONDS.sleep(600);
} catch (InterruptedException e) {
log.error("查询推荐产品中断", e);
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture(e);
}
// 模拟返回推荐列表
RecommendationList recommendationList = new RecommendationList();
recommendationList.setProductId(productId);
recommendationList.setRecommendations(org.springframework.util.CollectionUtils.newArrayList(
new Recommendation(1002L, "轻薄笔记本电脑", 4999.99),
new Recommendation(1003L, "游戏笔记本电脑", 7999.99)
));
return CompletableFuture.completedFuture(recommendationList);
}
@Data
@NoArgsConstructor
@Schema(description = "产品基本信息")
public static class ProductInfo {
@Schema(description = "产品ID")
private Long productId;
@Schema(description = "产品名称")
private String name;
@Schema(description = "产品价格")
private Double price;
@Schema(description = "库存数量")
private Integer stock;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "产品评价")
public static class Review {
@Schema(description = "评价ID")
private Long reviewId;
@Schema(description = "评价内容")
private String content;
@Schema(description = "评分(1-5)")
private Integer rating;
}
@Data
@NoArgsConstructor
@Schema(description = "产品评价列表")
public static class ReviewList {
@Schema(description = "产品ID")
private Long productId;
@Schema(description = "评价列表")
private java.util.List<Review> reviews;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "推荐产品")
public static class Recommendation {
@Schema(description = "产品ID")
private Long productId;
@Schema(description = "产品名称")
private String name;
@Schema(description = "产品价格")
private Double price;
}
@Data
@NoArgsConstructor
@Schema(description = "推荐产品列表")
public static class RecommendationList {
@Schema(description = "当前产品ID")
private Long productId;
@Schema(description = "推荐产品列表")
private java.util.List<Recommendation> recommendations;
}
}
4. 实现异步控制器
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Maps;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/products")
@RequiredArgsConstructor
@Slf4j
@Tag(name = "产品接口", description = "提供产品相关信息查询")
public class ProductController {
private final ProductService productService;
/**
* 获取产品详情(包含基本信息、评价和推荐)
*/
@GetMapping("/{productId}/detail")
@Operation(summary = "获取产品详情", description = "查询产品完整信息,包括基本信息、评价和推荐")
public CompletableFuture<Map<String, Object>> getProductDetail(
@Parameter(description = "产品ID", required = true)
@PathVariable Long productId) {
log.info("接收产品详情查询请求, productId: {}", productId);
// 并行查询产品信息
CompletableFuture<ProductService.ProductInfo> infoFuture = productService.getProductInfo(productId);
CompletableFuture<ProductService.ReviewList> reviewFuture = productService.getProductReviews(productId);
CompletableFuture<ProductService.RecommendationList> recommendationFuture = productService.getRecommendations(productId);
// 汇总结果
return CompletableFuture.allOf(infoFuture, reviewFuture, recommendationFuture)
.thenApply(v -> {
Map<String, Object> result = Maps.newHashMap();
try {
result.put("basicInfo", infoFuture.get());
result.put("reviews", reviewFuture.get());
result.put("recommendations", recommendationFuture.get());
result.put("success", true);
log.info("产品详情查询完成, productId: {}", productId);
} catch (Exception e) {
log.error("产品详情查询异常", e);
result.put("success", false);
result.put("error", e.getMessage());
}
return result;
});
}
}
5. 启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class CompletablefutureDemoApplication {
public static void main(String[] args) {
SpringApplication.run(CompletablefutureDemoApplication.class, args);
}
}
这个 Spring Boot 示例展示了如何利用 CompletableFuture 实现高效的异步接口。通过并行调用三个服务,接口响应时间从约 1900ms(串行)减少到约 800ms(并行),大幅提升了用户体验。
CompletableFuture 虽然强大,但使用不当也会带来问题。以下是一些最佳实践和需要避免的陷阱:
CompletableFuture 为 Java 异步编程带来了革命性的变化,它提供了丰富的 API 来组合、串联和并行执行异步任务,同时拥有完善的异常处理机制。
掌握 CompletableFuture,不仅能写出更高效的代码,更能提升对 Java 并发编程的理解。希望本文能帮助你在异步编程的道路上更进一步,让你的 Java 代码真正 "飞" 起来。