首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >从回调地狱到异步王者:CompletableFuture 如何重构你的 Java 异步代码

从回调地狱到异步王者:CompletableFuture 如何重构你的 Java 异步代码

作者头像
果酱带你啃java
发布2026-04-14 13:23:08
发布2026-04-14 13:23:08
370
举报

引言:被低估的异步编程利器

在 Java 开发领域,CompletableFuture 就像一位深藏不露的高手。自 Java 8 引入以来,它彻底改变了异步编程的游戏规则,却仍有许多开发者停留在 Future 的时代,或在回调地狱中挣扎。

想象这样一个场景:用户下单后,你需要调用库存服务扣减库存、调用支付服务处理付款、调用物流服务创建物流单,最后发送通知给用户。如果用传统的同步方式,总耗时是四个服务耗时的总和;如果用 CompletableFuture 实现并行处理,总耗时将接近耗时最长的那个服务。这就是异步编程的魔力,也是 CompletableFuture 的价值所在。

本文将带你全面掌握 CompletableFuture,从基础 API 到高级实战,从底层原理到性能优化,让你真正理解并能灵活运用这一强大工具,写出高效、优雅的异步代码。

一、CompletableFuture 基础:超越 Future 的异步体验

1.1 为什么 Future 不够用了?

Java 5 引入的 Future 接口为异步编程打开了一扇门,但它的能力非常有限:

  • 无法手动完成任务
  • 不能链式处理多个异步操作
  • 缺乏异常处理机制
  • 获取结果只能通过阻塞或轮询
代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j
public class FutureLimitExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 提交任务
        Future<String> future = executor.submit(() -> {
            log.info("执行耗时任务");
            Thread.sleep(2000);
            return "任务结果";
        });

        // 必须阻塞等待结果,无法优雅处理
        log.info("等待结果...");
        String result = future.get(); // 阻塞在这里
        log.info("获取结果: {}", result);

        executor.shutdown();
    }
}
代码语言:javascript
复制

这种方式在处理复杂异步场景时显得力不从心,于是 CompletableFuture 应运而生。

1.2 CompletableFuture 的诞生与核心优势

CompletableFuture 实现了 Future 和 CompletionStage 两个接口,前者保证了与原有 Future 体系的兼容性,后者则提供了丰富的异步处理能力。其核心优势包括:

  • 支持链式调用,轻松组合多个异步操作
  • 提供丰富的异常处理机制
  • 可以手动完成任务
  • 支持非阻塞地处理异步结果
  • 提供多种并行执行和组合的方式

1.3 创建 CompletableFuture 的四种方式

CompletableFuture 提供了多种创建方式,适用于不同场景:

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class CompletableFutureCreation {
    // 自定义线程池,推荐使用而非默认线程池
    private static final ExecutorService CUSTOM_EXECUTOR = Executors.newFixedThreadPool(3);

    public static void main(String[] args) throws Exception {
        // 1. 无返回值,使用默认线程池
        CompletableFuture.runAsync(() -> {
            log.info("执行无返回值的异步任务");
        });

        // 2. 无返回值,使用自定义线程池
        CompletableFuture.runAsync(() -> {
            log.info("使用自定义线程池执行无返回值任务");
        }, CUSTOM_EXECUTOR);

        // 3. 有返回值,使用默认线程池
        CompletableFuture<String> supplyDefault = CompletableFuture.supplyAsync(() -> {
            log.info("执行有返回值的异步任务");
            return "任务结果";
        });

        // 4. 有返回值,使用自定义线程池
        CompletableFuture<String> supplyCustom = CompletableFuture.supplyAsync(() -> {
            log.info("使用自定义线程池执行有返回值任务");
            return "自定义线程池任务结果";
        }, CUSTOM_EXECUTOR);

        // 等待所有任务完成
        CompletableFuture.allOf(supplyDefault, supplyCustom).join();
        log.info("supplyDefault结果: {}", supplyDefault.get());
        log.info("supplyCustom结果: {}", supplyCustom.get());

        CUSTOM_EXECUTOR.shutdown();
    }
}
代码语言:javascript
复制

注意:生产环境中应始终使用自定义线程池,避免使用默认的 ForkJoinPool.commonPool (),后者是 JVM 共享的,可能受到其他组件影响。

二、核心方法详解:链式调用与结果处理

CompletableFuture 的强大之处在于其丰富的实例方法,这些方法可以分为几大类:结果处理、任务组合、异常处理等。掌握这些方法是灵活运用 CompletableFuture 的关键。

2.1 结果处理的三种方式

当异步任务完成后,我们通常需要对结果进行处理,CompletableFuture 提供了三种常用方式:

  • thenApply:接收上一个任务的结果,返回新的结果
  • thenAccept:接收上一个任务的结果,不返回新结果(消费结果)
  • thenRun:不关心上一个任务的结果,仅在任务完成后执行
代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class ResultHandlingMethods {
    public static void main(String[] args) {
        // thenApply:转换结果
        CompletableFuture<Integer> thenApplyFuture = CompletableFuture.supplyAsync(() -> {
            log.info("计算原始值");
            return 10;
        }).thenApply(original -> {
            log.info("将原始值 {} 乘以 2", original);
            return original * 2;
        });

        // thenAccept:消费结果
        CompletableFuture<Void> thenAcceptFuture = thenApplyFuture.thenAccept(result -> {
            log.info("thenApply的结果是: {}", result);
        });

        // thenRun:执行后续操作
        CompletableFuture<Void> thenRunFuture = thenAcceptFuture.thenRun(() -> {
            log.info("所有操作完成,执行最终处理");
        });

        // 等待所有操作完成
        thenRunFuture.join();
    }
}
代码语言:javascript
复制

2.2 异步处理的关键:*Async 方法

上面介绍的方法默认在当前线程(可能是任务执行线程)中执行后续操作。如果希望后续操作也在异步线程中执行,需要使用它们的异步版本:

  • thenApplyAsync
  • thenAcceptAsync
  • thenRunAsync

这些异步方法有两个重载版本:一个使用默认线程池,一个可以指定线程池。

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j
public class AsyncProcessingMethods {
    private static final ExecutorService PROCESS_EXECUTOR = Executors.newSingleThreadExecutor(r -> 
        new Thread(r, "process-thread")
    );

    public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            log.info("执行任务的线程: {}", Thread.currentThread().getName());
            return "原始结果";
        }).thenApplyAsync(result -> {
            log.info("处理结果的线程: {}", Thread.currentThread().getName());
            return result + " - 已处理";
        }, PROCESS_EXECUTOR); // 使用指定线程池处理

        log.info("最终结果: {}", future.join());
        PROCESS_EXECUTOR.shutdown();
    }
}
代码语言:javascript
复制

运行此代码会发现,任务执行和结果处理在不同的线程中进行,这对于避免阻塞任务线程非常重要。

2.3 任务组合:thenCompose 与 thenCombine

在实际开发中,我们经常需要组合多个异步任务。CompletableFuture 提供了两种主要方式:

  • thenCompose:用于组合两个依赖的任务,第二个任务依赖第一个任务的结果
  • thenCombine:用于组合两个独立的任务,等两个任务都完成后处理它们的结果

示例代码:

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class TaskCombination {
    public static void main(String[] args) {
        // thenCompose:组合依赖任务
        CompletableFuture<String> composeFuture = getUserById(1001)
            .thenCompose(userId -> getOrderByUserId(userId));

        // thenCombine:组合独立任务
        CompletableFuture<String> combineFuture = getUserName(1001)
            .thenCombine(getUserAge(1001), (name, age) -> 
                String.format("用户信息: 姓名=%s, 年龄=%d", name, age)
            );

        log.info("thenCompose结果: {}", composeFuture.join());
        log.info("thenCombine结果: {}", combineFuture.join());
    }

    /**
     * 根据ID获取用户ID(模拟)
     * @param id 外部ID
     * @return 用户ID
     */
    private static CompletableFuture<Integer> getUserById(int id) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("根据外部ID {} 获取用户ID", id);
            return 10001; // 模拟用户ID
        });
    }

    /**
     * 根据用户ID获取订单
     * @param userId 用户ID
     * @return 订单信息
     */
    private static CompletableFuture<String> getOrderByUserId(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("根据用户ID {} 获取订单", userId);
            return "订单信息: 用户=" + userId + ", 订单号=2023001";
        });
    }

    /**
     * 获取用户姓名
     * @param userId 用户ID
     * @return 用户名
     */
    private static CompletableFuture<String> getUserName(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("获取用户 {} 的姓名", userId);
            return "张三";
        });
    }

    /**
     * 获取用户年龄
     * @param userId 用户ID
     * @return 年龄
     */
    private static CompletableFuture<Integer> getUserAge(int userId) {
        return CompletableFuture.supplyAsync(() -> {
            log.info("获取用户 {} 的年龄", userId);
            return 30;
        });
    }
}
代码语言:javascript
复制

注意:thenCompose 和 thenApply 的区别在于,thenCompose 接收的函数返回值是 CompletableFuture,避免了嵌套的 CompletableFuture(CompletableFuture<CompletableFuture<T>>)。

2.4 多任务处理:allOf 与 anyOf

当需要处理多个异步任务时,CompletableFuture 提供了两个非常实用的静态方法:

  • allOf:等待所有任务完成
  • anyOf:等待任意一个任务完成
代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
public class MultipleTasksHandling {
    public static void main(String[] args) {
        // 创建三个不同耗时的任务
        CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                log.error("任务1被中断", e);
            }
            log.info("任务1完成");
            return "结果1";
        });

        CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(1500);
            } catch (InterruptedException e) {
                log.error("任务2被中断", e);
            }
            log.info("任务2完成");
            return "结果2";
        });

        CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(800);
            } catch (InterruptedException e) {
                log.error("任务3被中断", e);
            }
            log.info("任务3完成");
            return "结果3";
        });

        // 等待所有任务完成
        CompletableFuture<Void> allOfFuture = CompletableFuture.allOf(task1, task2, task3);
        allOfFuture.thenRun(() -> {
            log.info("所有任务都已完成");
            try {
                log.info("任务1结果: {}", task1.get());
                log.info("任务2结果: {}", task2.get());
                log.info("任务3结果: {}", task3.get());
            } catch (Exception e) {
                log.error("获取任务结果异常", e);
            }
        }).join();

        // 等待任意一个任务完成
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(task1, task2, task3);
        anyOfFuture.thenAccept(result -> {
            log.info("有一个任务已完成,结果: {}", result);
        }).join();
    }
}
代码语言:javascript
复制

注意:allOf 返回 CompletableFuture<Void>,无法直接获取各个任务的结果,需要单独获取;anyOf 返回 CompletableFuture<Object>,结果是第一个完成的任务的结果。

2.5 异常处理:三大处理机制

异步任务中的异常处理至关重要,CompletableFuture 提供了三种主要的异常处理方式:

  • exceptionally:仅处理异常情况,返回默认值
  • handle:同时处理正常结果和异常,返回新的结果
  • whenComplete:同时处理正常结果和异常,不改变结果

示例代码:

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class ExceptionHandling {
    public static void main(String[] args) {
        // exceptionally:仅处理异常
        CompletableFuture<String> exceptionallyFuture = CompletableFuture.supplyAsync(() -> {
            log.info("执行可能抛出异常的任务");
            if (true) {
                throw new RuntimeException("任务执行失败");
            }
            return "正常结果";
        }).exceptionally(ex -> {
            log.error("捕获到异常", ex);
            return "异常时的默认结果";
        });

        // handle:处理正常结果和异常
        CompletableFuture<String> handleFuture = CompletableFuture.supplyAsync(() -> {
            log.info("执行任务");
            // 模拟随机成功或失败
            if (Math.random() > 0.5) {
                throw new RuntimeException("随机失败");
            }
            return "正常结果";
        }).handle((result, ex) -> {
            if (ex != null) {
                log.error("handle捕获到异常", ex);
                return "handle处理异常后的结果";
            }
            return result + " - 已通过handle处理";
        });

        // whenComplete:处理完成事件,不改变结果
        CompletableFuture<String> whenCompleteFuture = CompletableFuture.supplyAsync(() -> {
            log.info("执行任务");
            return "正常结果";
        }).whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("whenComplete捕获到异常", ex);
            } else {
                log.info("whenComplete处理正常结果: {}", result);
            }
        });

        log.info("exceptionally结果: {}", exceptionallyFuture.join());
        log.info("handle结果: {}", handleFuture.join());
        log.info("whenComplete结果: {}", whenCompleteFuture.join());
    }
}
代码语言:javascript
复制

这三种方式的选择取决于具体需求:需要恢复异常并返回默认值用 exceptionally;需要统一处理正常和异常情况并转换结果用 handle;仅需要在完成时记录日志或清理资源用 whenComplete。

三、底层原理:CompletableFuture 的工作机制

要真正掌握 CompletableFuture,理解其底层原理是必要的。本节将深入探讨 CompletableFuture 的实现机制。

3.1 状态管理机制

CompletableFuture 内部通过一个 volatile 变量维护任务状态,主要状态包括:

  • NEW:初始状态
  • COMPLETING:正在完成(中间状态)
  • NORMAL:正常完成
  • EXCEPTIONAL:异常完成
  • CANCELLED:已取消
  • INTERRUPTED:已中断

状态转换关系如下:

这些状态通过 Unsafe 类进行 CAS 操作来保证原子性,确保多线程环境下的状态一致性。

3.2 依赖链与回调触发

CompletableFuture 的链式调用是通过维护依赖链实现的。每个 CompletableFuture 可以有多个依赖的 CompletableFuture(称为 "依赖者"),当一个 CompletableFuture 完成时,会通知所有依赖者执行。

这种依赖关系通过一个栈结构存储,当调用 thenApply 等方法时,会创建新的 CompletableFuture 并将其添加到当前任务的依赖栈中。

当任务完成时,CompletableFuture 会遍历依赖栈,依次触发每个依赖任务的执行,形成链式反应。这种设计确保了异步操作的高效串联。

3.3 线程池的选择策略

CompletableFuture 在执行异步操作时,线程池选择遵循以下规则:

  1. 如果方法指定了 Executor,则使用指定的线程池
  2. 如果没有指定 Executor,则使用 ForkJoinPool.commonPool ()
  3. 非异步方法(如 thenApply)默认在完成当前任务的线程中执行

ForkJoinPool.commonPool () 是 JVM 级别的共享线程池,其默认并行度为 CPU 核心数减 1。可以通过系统属性调整:

代码语言:javascript
复制
-Djava.util.concurrent.ForkJoinPool.common.parallelism=4
代码语言:javascript
复制

但在生产环境中,强烈建议使用自定义线程池,以便更好地控制资源使用和任务隔离。

四、实战应用:从理论到实践

理论知识需要结合实际应用才能真正内化。本节将通过几个典型场景,展示 CompletableFuture 的实战价值。

4.1 并行查询多个服务并汇总结果

在微服务架构中,我们经常需要调用多个独立服务,然后汇总结果。使用 CompletableFuture 可以并行执行这些调用,大幅提升效率。

代码语言:javascript
复制
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@Slf4j
public class ParallelServiceCall {
    // 自定义线程池
    private static final ExecutorService SERVICE_EXECUTOR = Executors.newFixedThreadPool(5);

    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();

        // 并行调用三个独立服务
        CompletableFuture<UserInfo> userInfoFuture = CompletableFuture.supplyAsync(() -> 
            getUserInfo(1001), SERVICE_EXECUTOR
        );

        CompletableFuture<OrderList> orderListFuture = CompletableFuture.supplyAsync(() -> 
            getOrderList(1001), SERVICE_EXECUTOR
        );

        CompletableFuture<AddressList> addressListFuture = CompletableFuture.supplyAsync(() -> 
            getAddressList(1001), SERVICE_EXECUTOR
        );

        // 汇总结果
        CompletableFuture<Map<String, Object>> resultFuture = CompletableFuture.allOf(
                userInfoFuture, orderListFuture, addressListFuture
            ).thenApply(v -> {
                Map<String, Object> resultMap = Maps.newHashMap();
                try {
                    resultMap.put("userInfo", userInfoFuture.get());
                    resultMap.put("orders", orderListFuture.get());
                    resultMap.put("addresses", addressListFuture.get());
                } catch (Exception e) {
                    log.error("汇总结果异常", e);
                }
                return resultMap;
            });

        // 获取最终结果
        Map<String, Object> result = resultFuture.join();
        log.info("汇总结果: {}", JSON.toJSONString(result));

        long endTime = System.currentTimeMillis();
        log.info("总耗时: {}ms", endTime - startTime);

        SERVICE_EXECUTOR.shutdown();
    }

    /**
     * 获取用户信息
     */
    private static UserInfo getUserInfo(int userId) {
        log.info("查询用户信息,userId: {}", userId);
        try {
            // 模拟服务调用耗时
            TimeUnit.MILLISECONDS.sleep(1000);
        } catch (InterruptedException e) {
            log.error("查询用户信息中断", e);
        }
        return new UserInfo(userId, "张三", 30);
    }

    /**
     * 获取订单列表
     */
    private static OrderList getOrderList(int userId) {
        log.info("查询订单列表,userId: {}", userId);
        try {
            // 模拟服务调用耗时
            TimeUnit.MILLISECONDS.sleep(1500);
        } catch (InterruptedException e) {
            log.error("查询订单列表中断", e);
        }
        OrderList orderList = new OrderList();
        orderList.setUserId(userId);
        orderList.setOrders(CollectionUtils.newArrayList(
            new Order(10001, "商品A"),
            new Order(10002, "商品B")
        ));
        return orderList;
    }

    /**
     * 获取地址列表
     */
    private static AddressList getAddressList(int userId) {
        log.info("查询地址列表,userId: {}", userId);
        try {
            // 模拟服务调用耗时
            TimeUnit.MILLISECONDS.sleep(800);
        } catch (InterruptedException e) {
            log.error("查询地址列表中断", e);
        }
        AddressList addressList = new AddressList();
        addressList.setUserId(userId);
        addressList.setAddresses(CollectionUtils.newArrayList(
            new Address(20001, "北京市"),
            new Address(20002, "上海市")
        ));
        return addressList;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class UserInfo {
        private int userId;
        private String username;
        private int age;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Order {
        private int orderId;
        private String productName;
    }

    @Data
    @NoArgsConstructor
    static class OrderList {
        private int userId;
        private java.util.List<Order> orders;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class Address {
        private int addressId;
        private String address;
    }

    @Data
    @NoArgsConstructor
    static class AddressList {
        private int userId;
        private java.util.List<Address> addresses;
    }
}
代码语言:javascript
复制

这个示例中,三个服务调用并行执行,总耗时约 1500ms(等于耗时最长的服务),而不是三个服务耗时之和(约 3300ms),效率提升非常明显。

4.2 服务调用超时控制与降级处理

在分布式系统中,服务调用超时是常见问题。CompletableFuture 提供了优雅的超时控制方案:

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
public class TimeoutHandling {
    // 自定义线程池
    private static final java.util.concurrent.ExecutorService EXECUTOR = 
        java.util.concurrent.Executors.newFixedThreadPool(3);

    public static void main(String[] args) {
        // 方式1: 使用get(timeout, unit)
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            return callExternalService("服务A", 3000);
        }, EXECUTOR);

        try {
            String result1 = future1.get(2, TimeUnit.SECONDS);
            log.info("服务A结果: {}", result1);
        } catch (Exception e) {
            log.error("服务A调用超时或异常", e);
            // 执行降级策略
            log.info("服务A降级处理: 返回缓存数据");
        }

        // 方式2: 使用orTimeout (Java 9+)
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            return callExternalService("服务B", 3000);
        }, EXECUTOR)
        .orTimeout(2, TimeUnit.SECONDS)
        .exceptionally(ex -> {
            log.error("服务B调用异常", ex);
            return "服务B降级数据";
        });

        log.info("服务B结果: {}", future2.join());

        // 方式3: 使用completeOnTimeout (Java 9+)
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            return callExternalService("服务C", 3000);
        }, EXECUTOR)
        .completeOnTimeout("服务C默认数据", 2, TimeUnit.SECONDS);

        log.info("服务C结果: {}", future3.join());

        EXECUTOR.shutdown();
    }

    /**
     * 调用外部服务
     * @param serviceName 服务名称
     * @param delay 延迟时间(ms)
     * @return 服务返回结果
     */
    private static String callExternalService(String serviceName, int delay) {
        log.info("调用外部服务: {}", serviceName);
        try {
            TimeUnit.MILLISECONDS.sleep(delay);
        } catch (InterruptedException e) {
            log.error("服务 {} 调用被中断", serviceName, e);
            Thread.currentThread().interrupt();
        }
        return serviceName + "返回结果";
    }
}
代码语言:javascript
复制

注意:orTimeout 和 completeOnTimeout 是 Java 9 引入的方法,如果使用 Java 8,可通过 thenApplyAsync 结合定时任务实现类似功能。

4.3 Spring Boot 中的异步接口实现

在 Spring Boot 应用中,CompletableFuture 可以与 @Async 注解结合,实现高效的异步接口:

1. 首先添加必要依赖 (pom.xml)

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.1</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>completablefuture-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>completablefuture-demo</name>

    <properties>
        <java.version>17</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.32</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>32.1.3-jre</version>
        </dependency>

        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
代码语言:javascript
复制

2. 配置异步线程池

代码语言:javascript
复制
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class AsyncConfig {

    /**
     * 自定义异步线程池
     */
    @Bean(name = "apiExecutor")
    public Executor apiExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(10);
        // 最大线程数
        executor.setMaxPoolSize(20);
        // 队列容量
        executor.setQueueCapacity(100);
        // 线程名称前缀
        executor.setThreadNamePrefix("api-async-");
        // 线程空闲时间
        executor.setKeepAliveSeconds(60);
        // 拒绝策略:当线程和队列都满时,由提交任务的线程执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }
}
代码语言:javascript
复制

3. 实现异步服务

代码语言:javascript
复制
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

@Service
@Slf4j
public class ProductService {

    /**
     * 获取产品基本信息
     * @param productId 产品ID
     * @return 产品基本信息
     */
    @Async("apiExecutor")
    public CompletableFuture<ProductInfo> getProductInfo(Long productId) {
        log.info("查询产品基本信息, productId: {}", productId);
        try {
            // 模拟数据库查询耗时
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            log.error("查询产品信息中断", e);
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }

        // 模拟返回产品信息
        ProductInfo info = new ProductInfo();
        info.setProductId(productId);
        info.setName("高性能笔记本电脑");
        info.setPrice(5999.99);
        info.setStock(100);

        return CompletableFuture.completedFuture(info);
    }

    /**
     * 获取产品评价
     * @param productId 产品ID
     * @return 产品评价列表
     */
    @Async("apiExecutor")
    public CompletableFuture<ReviewList> getProductReviews(Long productId) {
        log.info("查询产品评价, productId: {}", productId);
        try {
            // 模拟数据库查询耗时
            TimeUnit.MILLISECONDS.sleep(800);
        } catch (InterruptedException e) {
            log.error("查询产品评价中断", e);
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }

        // 模拟返回评价列表
        ReviewList reviewList = new ReviewList();
        reviewList.setProductId(productId);
        reviewList.setReviews(org.springframework.util.CollectionUtils.newArrayList(
            new Review(1L, "非常好用", 5),
            new Review(2L, "性能出色", 5),
            new Review(3L, "性价比高", 4)
        ));

        return CompletableFuture.completedFuture(reviewList);
    }

    /**
     * 获取产品推荐
     * @param productId 产品ID
     * @return 推荐产品列表
     */
    @Async("apiExecutor")
    public CompletableFuture<RecommendationList> getRecommendations(Long productId) {
        log.info("查询推荐产品, productId: {}", productId);
        try {
            // 模拟数据库查询耗时
            TimeUnit.MILLISECONDS.sleep(600);
        } catch (InterruptedException e) {
            log.error("查询推荐产品中断", e);
            Thread.currentThread().interrupt();
            return CompletableFuture.failedFuture(e);
        }

        // 模拟返回推荐列表
        RecommendationList recommendationList = new RecommendationList();
        recommendationList.setProductId(productId);
        recommendationList.setRecommendations(org.springframework.util.CollectionUtils.newArrayList(
            new Recommendation(1002L, "轻薄笔记本电脑", 4999.99),
            new Recommendation(1003L, "游戏笔记本电脑", 7999.99)
        ));

        return CompletableFuture.completedFuture(recommendationList);
    }

    @Data
    @NoArgsConstructor
    @Schema(description = "产品基本信息")
    public static class ProductInfo {
        @Schema(description = "产品ID")
        private Long productId;
        @Schema(description = "产品名称")
        private String name;
        @Schema(description = "产品价格")
        private Double price;
        @Schema(description = "库存数量")
        private Integer stock;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Schema(description = "产品评价")
    public static class Review {
        @Schema(description = "评价ID")
        private Long reviewId;
        @Schema(description = "评价内容")
        private String content;
        @Schema(description = "评分(1-5)")
        private Integer rating;
    }

    @Data
    @NoArgsConstructor
    @Schema(description = "产品评价列表")
    public static class ReviewList {
        @Schema(description = "产品ID")
        private Long productId;
        @Schema(description = "评价列表")
        private java.util.List<Review> reviews;
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @Schema(description = "推荐产品")
    public static class Recommendation {
        @Schema(description = "产品ID")
        private Long productId;
        @Schema(description = "产品名称")
        private String name;
        @Schema(description = "产品价格")
        private Double price;
    }

    @Data
    @NoArgsConstructor
    @Schema(description = "推荐产品列表")
    public static class RecommendationList {
        @Schema(description = "当前产品ID")
        private Long productId;
        @Schema(description = "推荐产品列表")
        private java.util.List<Recommendation> recommendations;
    }
}
代码语言:javascript
复制

4. 实现异步控制器

代码语言:javascript
复制
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Maps;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

@RestController
@RequestMapping("/products")
@RequiredArgsConstructor
@Slf4j
@Tag(name = "产品接口", description = "提供产品相关信息查询")
public class ProductController {

    private final ProductService productService;

    /**
     * 获取产品详情(包含基本信息、评价和推荐)
     */
    @GetMapping("/{productId}/detail")
    @Operation(summary = "获取产品详情", description = "查询产品完整信息,包括基本信息、评价和推荐")
    public CompletableFuture<Map<String, Object>> getProductDetail(
            @Parameter(description = "产品ID", required = true)
            @PathVariable Long productId) {

        log.info("接收产品详情查询请求, productId: {}", productId);

        // 并行查询产品信息
        CompletableFuture<ProductService.ProductInfo> infoFuture = productService.getProductInfo(productId);
        CompletableFuture<ProductService.ReviewList> reviewFuture = productService.getProductReviews(productId);
        CompletableFuture<ProductService.RecommendationList> recommendationFuture = productService.getRecommendations(productId);

        // 汇总结果
        return CompletableFuture.allOf(infoFuture, reviewFuture, recommendationFuture)
            .thenApply(v -> {
                Map<String, Object> result = Maps.newHashMap();
                try {
                    result.put("basicInfo", infoFuture.get());
                    result.put("reviews", reviewFuture.get());
                    result.put("recommendations", recommendationFuture.get());
                    result.put("success", true);
                    log.info("产品详情查询完成, productId: {}", productId);
                } catch (Exception e) {
                    log.error("产品详情查询异常", e);
                    result.put("success", false);
                    result.put("error", e.getMessage());
                }
                return result;
            });
    }
}
代码语言:javascript
复制

5. 启动类

代码语言:javascript
复制
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class CompletablefutureDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(CompletablefutureDemoApplication.class, args);
    }
}
代码语言:javascript
复制

这个 Spring Boot 示例展示了如何利用 CompletableFuture 实现高效的异步接口。通过并行调用三个服务,接口响应时间从约 1900ms(串行)减少到约 800ms(并行),大幅提升了用户体验。

五、最佳实践与避坑指南

CompletableFuture 虽然强大,但使用不当也会带来问题。以下是一些最佳实践和需要避免的陷阱:

5.1 线程池使用规范

  1. 始终使用自定义线程池:避免使用默认的 ForkJoinPool.commonPool (),它是 JVM 共享的,可能受其他组件影响
  2. 合理配置线程池参数:根据业务场景调整核心线程数、最大线程数和队列容量
  3. 设置有意义的线程名称:便于问题排查和日志分析
  4. 使用合适的拒绝策略:根据业务重要性选择 AbortPolicy、CallerRunsPolicy 等

5.2 异常处理原则

  1. 不要忽略任何异常:异步任务的异常如果不处理,可能导致难以排查的问题
  2. 根据场景选择异常处理方式:需要恢复用 exceptionally,需要转换用 handle,仅记录用 whenComplete
  3. 组合任务的异常处理:allOf/anyOf 的异常需要单独处理每个子任务的异常

5.3 性能优化建议

  1. 避免过度并行:根据 CPU 核心数和 IO 密集型 / CPU 密集型任务特性合理设置并行度
  2. 减少线程切换:可以在非异步方法中处理简单逻辑,避免不必要的线程切换
  3. 合理使用异步方法:IO 密集型任务更适合异步处理,CPU 密集型任务过度并行可能适得其反

5.4 常见陷阱与解决方案

  1. 阻塞获取结果:避免在异步线程中使用 get () 或 join () 阻塞等待,可能导致线程池耗尽
  2. 任务依赖循环:避免创建循环依赖的 CompletableFuture,可能导致死锁
  3. 内存泄漏风险:长时间运行的异步任务可能持有不必要的资源引用,需要注意及时释放
  4. 过度链式调用:过长的调用链会降低可读性,可适当拆分

六、总结与展望

CompletableFuture 为 Java 异步编程带来了革命性的变化,它提供了丰富的 API 来组合、串联和并行执行异步任务,同时拥有完善的异常处理机制。

掌握 CompletableFuture,不仅能写出更高效的代码,更能提升对 Java 并发编程的理解。希望本文能帮助你在异步编程的道路上更进一步,让你的 Java 代码真正 "飞" 起来。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-09-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 果酱带你啃java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言:被低估的异步编程利器
  • 一、CompletableFuture 基础:超越 Future 的异步体验
    • 1.1 为什么 Future 不够用了?
    • 1.2 CompletableFuture 的诞生与核心优势
    • 1.3 创建 CompletableFuture 的四种方式
  • 二、核心方法详解:链式调用与结果处理
    • 2.1 结果处理的三种方式
    • 2.2 异步处理的关键:*Async 方法
    • 2.3 任务组合:thenCompose 与 thenCombine
    • 2.4 多任务处理:allOf 与 anyOf
    • 2.5 异常处理:三大处理机制
  • 三、底层原理:CompletableFuture 的工作机制
    • 3.1 状态管理机制
    • 3.2 依赖链与回调触发
    • 3.3 线程池的选择策略
  • 四、实战应用:从理论到实践
    • 4.1 并行查询多个服务并汇总结果
    • 4.2 服务调用超时控制与降级处理
    • 4.3 Spring Boot 中的异步接口实现
  • 五、最佳实践与避坑指南
    • 5.1 线程池使用规范
    • 5.2 异常处理原则
    • 5.3 性能优化建议
    • 5.4 常见陷阱与解决方案
  • 六、总结与展望
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档