首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >从 300 到 1000+TPS:保险订单系统的分布式锁与消息队列架构逆袭

从 300 到 1000+TPS:保险订单系统的分布式锁与消息队列架构逆袭

作者头像
果酱带你啃java
发布2026-04-14 12:58:14
发布2026-04-14 12:58:14
490
举报

保险订单处理是保险业务的核心环节,其性能与稳定性直接影响用户体验和业务收益。在营销活动期间,订单量可能激增 10 倍以上,如何在保证数据一致性的前提下,支撑 1000+TPS 的订单峰值,是保险科技领域的典型技术挑战。

本文将深度剖析某大型财产保险公司的订单系统架构演进历程,详细阐述如何通过 "分布式锁 + 消息队列" 的组合方案,结合领域驱动设计与异步化改造,将订单处理能力从 300 TPS 提升至稳定支持 1500 TPS,同时将订单处理成功率从 98.2% 提升至 99.99%,实现了业务高速增长与系统稳定性的双赢。

一、保险订单系统的特殊性与挑战

保险订单不同于普通电商订单,其业务复杂性和合规性要求带来了独特的技术挑战。

1.1 保险订单的业务特性

保险订单处理具有以下鲜明特点,直接影响系统架构设计:

  • 强事务性:保险订单涉及保费计算、库存锁定(部分短期险)、支付处理、保单生成等多个步骤,必须保证数据一致性
  • 业务规则复杂:保费计算需考虑年龄、职业、健康状况等数十个因子;核保规则可能实时调整
  • 合规性要求高:订单数据必须完整留存 5-10 年;关键操作需审计追踪;部分险种需实时上报监管系统
  • 峰值波动大:在 "6・18"、"双 11" 等营销节点,订单量可能达到日常的 10-20 倍;自然灾害后相关险种订单可能激增
  • 处理链路长:从用户下单到保单生效,可能涉及支付网关、核保系统、反欺诈系统、再保险系统等多个外部依赖

1.2 性能瓶颈分析

在采用分布式锁与消息队列架构前,该保险公司的订单系统面临以下核心问题:

  1. 并发处理能力不足
    • 高峰期订单处理 TPS 仅 300 左右
    • 订单响应时间中位数达 800ms,99 分位达 3500ms
    • 日均订单处理延迟超 30 分钟的情况达 5-8 次
  2. 数据一致性问题
    • 每日因并发导致的订单状态不一致约 20-30 笔
    • 重复支付率约 0.3%,需人工介入处理
    • 库存超卖偶有发生,尤其在热门短期险产品上
  3. 系统稳定性隐患
    • 高峰期数据库连接池频繁耗尽
    • 支付回调处理不及时,导致订单状态同步延迟
    • 单个外部系统故障可能引发整体订单处理链路阻塞

1.3 技术挑战的核心矛盾

保险订单系统设计面临的核心矛盾体现在三个方面:

  • 一致性与性能的矛盾:强事务要求往往意味着性能损失,如何在保证核心数据一致的前提下提升处理能力
  • 同步与异步的平衡:哪些步骤必须同步处理以保证用户体验,哪些可以异步化以提升吞吐量
  • 可用性与安全性的取舍:在系统异常时,如何在保证资金安全的前提下,尽可能减少业务中断

这些矛盾在保险领域尤为突出,因为订单处理直接关系到资金安全和用户保障,任何失误都可能引发理赔纠纷和监管风险。

二、整体架构设计与技术选型

基于保险订单的业务特性和性能挑战,我们设计了 "分布式锁保证核心一致性 + 消息队列实现异步削峰" 的架构方案。

2.1 系统架构图

代码语言:javascript
复制

2.2 架构说明

整个订单处理流程分为同步和异步两个部分:

  1. 同步处理(核心链路,耗时 < 300ms)
    • 接收订单请求,参数校验
    • 分布式锁获取,防止重复提交
    • 基础保费计算与库存检查
    • 生成预订单,状态标记为 "待支付"
    • 返回支付链接 / 二维码给用户
  2. 异步处理(非核心链路)
    • 支付结果回调处理
    • 订单状态更新
    • 核保规则校验
    • 保单生成与存储
    • 消息通知(短信、微信等)
    • 监管数据上报

通过这种拆分,将核心链路的响应时间控制在 300ms 以内,同时通过异步处理应对流量峰值。

2.3 关键技术选型

2.3.1 分布式锁:Redis + Redisson

选择 Redis+Redisson 的理由:

  • 高性能:Redis 单节点可支持 10 万级 QPS,满足分布式锁的高频获取与释放需求
  • 可靠性:Redisson 实现了自动续期机制(Watch Dog),避免锁超时释放
  • 灵活性:支持公平锁、可重入锁、红锁等多种锁类型,适应不同业务场景
  • 易用性:Redisson 提供了简洁的 API,简化了分布式锁的使用复杂度

版本选择:Redis 7.2.4 + Redisson 3.24.0

2.3.2 消息队列:Apache RocketMQ

选择 RocketMQ 而非 Kafka 或 RabbitMQ 的原因:

  • 事务消息:支持分布式事务消息,完美解决订单与支付的最终一致性问题
  • 重试机制:完善的消息重试策略,确保订单处理的可靠性
  • 定时消息:支持订单超时关闭等场景的定时处理
  • 集群能力:成熟的集群部署方案,支持水平扩展
  • 监控体系:提供完善的监控指标,便于问题排查

版本选择:RocketMQ 5.2.0

2.3.3 其他核心组件
  • ORM 框架:MyBatis-Plus 3.5.5,简化数据库操作
  • 微服务框架:Spring Cloud Alibaba 2022.0.0.0,提供服务发现与配置管理
  • 分布式事务:Seata 1.7.1,处理跨服务的事务一致性
  • 缓存:Caffeine 3.1.8 + Redis 7.2.4,缓解数据库压力
  • API 文档:SpringDoc OpenAPI 2.2.0(Swagger3),便于接口管理
  • 日志框架:Logback + SkyWalking 9.7.0,实现全链路追踪

三、数据库设计与领域模型

合理的数据库设计是系统高性能的基础,尤其是在高并发场景下。

3.1 数据库表设计

代码语言:javascript
复制
-- 订单主表
CREATE TABLE `insurance_order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
  `order_no` varchar(64) NOT NULL COMMENT '订单编号,唯一',
  `user_id` bigint NOT NULL COMMENT '用户ID',
  `product_id` bigint NOT NULL COMMENT '产品ID',
  `product_name` varchar(255) NOT NULL COMMENT '产品名称',
  `premium` decimal(12,2) NOT NULL COMMENT '保费金额',
  `pay_amount` decimal(12,2) NOT NULL COMMENT '实付金额',
  `status` tinyint NOT NULL COMMENT '订单状态:0-初始化 1-待支付 2-已支付 3-已取消 4-已失效 5-核保中 6-核保通过 7-核保拒保',
  `payment_type` tinyint DEFAULT NULL COMMENT '支付方式:1-微信 2-支付宝 3-银行卡',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `expire_time` datetime NOT NULL COMMENT '订单过期时间',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `version` int NOT NULL DEFAULT 0 COMMENT '版本号,用于乐观锁',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_order_no` (`order_no`),
  KEY `idx_user_id` (`user_id`),
  KEY `idx_product_id_status` (`product_id`,`status`),
  KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='保险订单主表';

-- 订单详情表
CREATE TABLE `insurance_order_detail` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `order_id` bigint NOT NULL COMMENT '订单ID',
  `insured_name` varchar(64) NOT NULL COMMENT '被保险人姓名',
  `insured_id_card` varchar(32) NOT NULL COMMENT '被保险人身份证号',
  `insured_age` int NOT NULL COMMENT '被保险人年龄',
  `insured_gender` tinyint NOT NULL COMMENT '被保险人性别:1-男 2-女',
  `start_date` date NOT NULL COMMENT '保障开始日期',
  `end_date` date NOT NULL COMMENT '保障结束日期',
  `coverage_amount` decimal(16,2) NOT NULL COMMENT '保额',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_order_id` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='保险订单详情表';

-- 支付记录表
CREATE TABLE `insurance_payment` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `order_id` bigint NOT NULL COMMENT '订单ID',
  `order_no` varchar(64) NOT NULL COMMENT '订单编号',
  `pay_no` varchar(64) NOT NULL COMMENT '支付单号',
  `pay_amount` decimal(12,2) NOT NULL COMMENT '支付金额',
  `payment_type` tinyint NOT NULL COMMENT '支付方式:1-微信 2-支付宝 3-银行卡',
  `status` tinyint NOT NULL COMMENT '支付状态:0-处理中 1-成功 2-失败',
  `pay_time` datetime DEFAULT NULL COMMENT '支付时间',
  `callback_data` text COMMENT '支付回调数据',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_pay_no` (`pay_no`),
  KEY `idx_order_id` (`order_id`),
  KEY `idx_order_no` (`order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='保险支付记录表';

-- 订单操作日志表(用于审计和问题排查)
CREATE TABLE `insurance_order_operate_log` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `order_id` bigint NOT NULL COMMENT '订单ID',
  `order_no` varchar(64) NOT NULL COMMENT '订单编号',
  `operate_type` tinyint NOT NULL COMMENT '操作类型:1-创建订单 2-支付 3-取消 4-核保 5-生成保单',
  `before_status` tinyint DEFAULT NULL COMMENT '操作前状态',
  `after_status` tinyint DEFAULT NULL COMMENT '操作后状态',
  `operator` varchar(64) NOT NULL COMMENT '操作人',
  `operate_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '操作时间',
  `remark` varchar(512) DEFAULT NULL COMMENT '备注',
  `ext_data` text COMMENT '扩展数据',
  PRIMARY KEY (`id`),
  KEY `idx_order_id` (`order_id`),
  KEY `idx_order_no` (`order_no`),
  KEY `idx_operate_time` (`operate_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单操作日志表';

-- 产品库存表(针对有限售的保险产品)
CREATE TABLE `insurance_product_stock` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `product_id` bigint NOT NULL COMMENT '产品ID',
  `total_stock` int NOT NULL COMMENT '总库存',
  `available_stock` int NOT NULL COMMENT '可用库存',
  `locked_stock` int NOT NULL COMMENT '锁定库存',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `version` int NOT NULL DEFAULT 0 COMMENT '版本号,用于乐观锁',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='保险产品库存表';
代码语言:javascript
复制

3.2 领域模型设计

基于 DDD 思想设计核心领域模型:

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * 保险订单实体
 *
 * @author ken
 */
@Data
@TableName("insurance_order")
@Schema(description = "保险订单主信息")
public class InsuranceOrder {
    @TableId(type = IdType.AUTO)
    @Schema(description = "订单ID")
    private Long id;

    @Schema(description = "订单编号,唯一")
    private String orderNo;

    @Schema(description = "用户ID")
    private Long userId;

    @Schema(description = "产品ID")
    private Long productId;

    @Schema(description = "产品名称")
    private String productName;

    @Schema(description = "保费金额")
    private BigDecimal premium;

    @Schema(description = "实付金额")
    private BigDecimal payAmount;

    @Schema(description = "订单状态:0-初始化 1-待支付 2-已支付 3-已取消 4-已失效 5-核保中 6-核保通过 7-核保拒保")
    private Integer status;

    @Schema(description = "支付方式:1-微信 2-支付宝 3-银行卡")
    private Integer paymentType;

    @Schema(description = "支付时间")
    private LocalDateTime payTime;

    @Schema(description = "订单过期时间")
    private LocalDateTime expireTime;

    @Schema(description = "创建时间")
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createTime;

    @Schema(description = "更新时间")
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updateTime;

    @Schema(description = "版本号,用于乐观锁")
    @Version
    private Integer version;
}
代码语言:javascript
复制

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;

/**
 * 订单详情实体
 *
 * @author ken
 */
@Data
@TableName("insurance_order_detail")
@Schema(description = "保险订单详情")
public class InsuranceOrderDetail {
    @TableId(type = IdType.AUTO)
    @Schema(description = "ID")
    private Long id;

    @Schema(description = "订单ID")
    private Long orderId;

    @Schema(description = "被保险人姓名")
    private String insuredName;

    @Schema(description = "被保险人身份证号")
    private String insuredIdCard;

    @Schema(description = "被保险人年龄")
    private Integer insuredAge;

    @Schema(description = "被保险人性别:1-男 2-女")
    private Integer insuredGender;

    @Schema(description = "保障开始日期")
    private LocalDate startDate;

    @Schema(description = "保障结束日期")
    private LocalDate endDate;

    @Schema(description = "保额")
    private BigDecimal coverageAmount;

    @Schema(description = "创建时间")
    @TableField(fill = FieldFill.INSERT)
    private LocalDateTime createTime;

    @Schema(description = "更新时间")
    @TableField(fill = FieldFill.INSERT_UPDATE)
    private LocalDateTime updateTime;
}
代码语言:javascript
复制
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;

/**
 * 订单创建DTO
 *
 * @author ken
 */
@Data
@Schema(description = "订单创建请求参数")
public class OrderCreateDTO {
    @Schema(description = "产品ID", required = true, example = "1001")
    private Long productId;

    @Schema(description = "被保险人姓名", required = true)
    private String insuredName;

    @Schema(description = "被保险人身份证号", required = true)
    private String insuredIdCard;

    @Schema(description = "被保险人年龄", required = true)
    private Integer insuredAge;

    @Schema(description = "被保险人性别:1-男 2-女", required = true)
    private Integer insuredGender;

    @Schema(description = "保障开始日期", required = true)
    private LocalDate startDate;

    @Schema(description = "保障结束日期", required = true)
    private LocalDate endDate;

    @Schema(description = "支付方式:1-微信 2-支付宝 3-银行卡", required = true)
    private Integer paymentType;
}

四、分布式锁设计与实现

分布式锁是解决高并发下数据一致性问题的关键技术,在保险订单系统中主要用于防止重复下单、库存超卖等场景。

4.1 分布式锁核心组件

基于 Redisson 实现分布式锁工具类:

代码语言:javascript
复制
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;

/**
 * 分布式锁工具类
 * 基于Redisson实现,支持可重入锁、公平锁、自动续期等特性
 *
 * @author ken
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class DistributedLockManager {

    private final RedissonClient redissonClient;

    /**
     * 获取分布式锁
     *
     * @param lockKey 锁键
     * @param waitTime 等待时间
     * @param leaseTime 锁持有时间
     * @param timeUnit 时间单位
     * @return 锁对象,如果获取失败则为null
     */
    public RLock lock(String lockKey, long waitTime, long leaseTime, TimeUnit timeUnit) {
        if (!StringUtils.hasText(lockKey)) {
            log.warn("Lock key is empty");
            return null;
        }

        RLock lock = redissonClient.getLock(lockKey);
        try {
            // 尝试获取锁
            boolean locked = lock.tryLock(waitTime, leaseTime, timeUnit);
            if (locked) {
                log.debug("Acquired distributed lock, key: {}", lockKey);
                return lock;
            } else {
                log.warn("Failed to acquire distributed lock, key: {}", lockKey);
                return null;
            }
        } catch (InterruptedException e) {
            log.error("Interrupted while acquiring lock, key: {}", lockKey, e);
            Thread.currentThread().interrupt();
            return null;
        } catch (Exception e) {
            log.error("Error acquiring lock, key: {}", lockKey, e);
            return null;
        }
    }

    /**
     * 获取分布式锁,使用默认时间设置
     * 等待时间3秒,持有时间30秒
     *
     * @param lockKey 锁键
     * @return 锁对象,如果获取失败则为null
     */
    public RLock lock(String lockKey) {
        return lock(lockKey, 3, 30, TimeUnit.SECONDS);
    }

    /**
     * 释放分布式锁
     *
     * @param lock 锁对象
     * @param lockKey 锁键(用于日志)
     */
    public void unlock(RLock lock, String lockKey) {
        if (lock == null) {
            return;
        }

        try {
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
                log.debug("Released distributed lock, key: {}", lockKey);
            } else {
                log.warn("Current thread does not hold the lock, key: {}", lockKey);
            }
        } catch (Exception e) {
            log.error("Error releasing lock, key: {}", lockKey, e);
        }
    }

    /**
     * 构建订单相关的锁键
     *
     * @param orderNo 订单编号
     * @return 锁键
     */
    public String buildOrderLockKey(String orderNo) {
        return "order:lock:" + orderNo;
    }

    /**
     * 构建产品库存相关的锁键
     *
     * @param productId 产品ID
     * @return 锁键
     */
    public String buildProductStockLockKey(Long productId) {
        return "product:stock:lock:" + productId;
    }

    /**
     * 构建用户下单相关的锁键,防止同一用户重复下单
     *
     * @param userId 用户ID
     * @param productId 产品ID
     * @return 锁键
     */
    public String buildUserProductLockKey(Long userId, Long productId) {
        return "user:product:lock:" + userId + ":" + productId;
    }
}
代码语言:javascript
复制

4.2 Redisson 配置

代码语言:javascript
复制
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;

/**
 * Redisson配置类
 *
 * @author ken
 */
@Configuration
public class RedissonConfig {

    /**
     * 配置Redisson客户端
     *
     * @return RedissonClient实例
     * @throws IOException 配置文件读取异常
     */
    @Bean(destroyMethod = "shutdown")
    public RedissonClient redissonClient() throws IOException {
        // 配置Redisson
        Config config = new Config();

        // 单节点配置(生产环境建议使用集群配置)
        config.useSingleServer()
                .setAddress("redis://192.168.1.101:6379")
                .setPassword("redis_password")
                .setDatabase(1)
                // 连接池大小
                .setConnectionPoolSize(32)
                // 最小空闲连接数
                .setConnectionMinimumIdleSize(8)
                // 连接超时时间
                .setConnectTimeout(3000)
                // 命令等待超时时间
                .setTimeout(3000);

        // 集群配置示例
        /*
        config.useClusterServers()
                .addNodeAddress(
                        "redis://192.168.1.101:6379",
                        "redis://192.168.1.102:6379",
                        "redis://192.168.1.103:6379",
                        "redis://192.168.1.104:6379",
                        "redis://192.168.1.105:6379",
                        "redis://192.168.1.106:6379"
                )
                .setPassword("redis_password")
                .setScanInterval(2000)
                .setMasterConnectionPoolSize(32)
                .setSlaveConnectionPoolSize(32)
                .setConnectTimeout(3000);
        */

        return Redisson.create(config);
    }
}
代码语言:javascript
复制

4.3 分布式锁在订单场景的应用

4.3.1 防止重复下单

同一用户可能在短时间内多次点击下单按钮,导致重复订单,需要通过分布式锁防止这种情况:

代码语言:javascript
复制
/**
 * 创建订单,防止重复提交
 *
 * @param userId 用户ID
 * @param orderCreateDTO 订单创建参数
 * @return 订单信息
 */
@Override
public OrderVO createOrder(Long userId, OrderCreateDTO orderCreateDTO) {
    // 参数校验
    validateOrderCreateParams(orderCreateDTO);

    Long productId = orderCreateDTO.getProductId();

    // 构建用户-产品锁键,防止同一用户对同一产品重复下单
    String lockKey = distributedLockManager.buildUserProductLockKey(userId, productId);
    RLock lock = null;

    try {
        // 获取分布式锁,最多等待1秒,持有锁3秒
        lock = distributedLockManager.lock(lockKey, 1, 3, TimeUnit.SECONDS);
        if (lock == null) {
            log.warn("Failed to acquire lock for creating order, userId: {}, productId: {}", userId, productId);
            throw new BusinessException("系统繁忙,请稍后再试");
        }

        // 检查是否已有未支付的相同产品订单
        checkExistingUnpaidOrder(userId, productId);

        // 执行订单创建逻辑
        return doCreateOrder(userId, orderCreateDTO);
    } finally {
        // 释放锁
        distributedLockManager.unlock(lock, lockKey);
    }
}

/**
 * 检查是否已有未支付的相同产品订单
 *
 * @param userId 用户ID
 * @param productId 产品ID
 */
private void checkExistingUnpaidOrder(Long userId, Long productId) {
    QueryWrapper<InsuranceOrder> queryWrapper = new QueryWrapper<>();
    queryWrapper.eq("user_id", userId)
                .eq("product_id", productId)
                .in("status", Arrays.asList(OrderStatus.INITIAL.getCode(), 
                                           OrderStatus.WAIT_PAY.getCode()));

    Integer count = orderMapper.selectCount(queryWrapper);
    if (count != null && count > 0) {
        log.warn("User has existing unpaid order, userId: {}, productId: {}", userId, productId);
        throw new BusinessException("您已有未完成的订单,请先处理");
    }
}
代码语言:javascript
复制

4.3.2 库存扣减与防止超卖

对于有限量的保险产品(如特定活动的短期健康险),需要通过分布式锁保证库存操作的原子性:

代码语言:javascript
复制
/**
 * 扣减产品库存
 *
 * @param productId 产品ID
 * @param quantity 扣减数量
 * @return 是否扣减成功
 */
@Override
public boolean deductStock(Long productId, int quantity) {
    if (productId == null || quantity <= 0) {
        log.warn("Invalid parameters for deducting stock, productId: {}, quantity: {}", productId, quantity);
        return false;
    }

    // 获取产品库存锁
    String lockKey = distributedLockManager.buildProductStockLockKey(productId);
    RLock lock = null;

    try {
        // 获取锁,最多等待3秒,持有锁5秒
        lock = distributedLockManager.lock(lockKey, 3, 5, TimeUnit.SECONDS);
        if (lock == null) {
            log.warn("Failed to acquire lock for deducting stock, productId: {}", productId);
            return false;
        }

        // 查询当前库存
        InsuranceProductStock stock = stockMapper.selectById(productId);
        if (stock == null) {
            log.warn("Product stock not found, productId: {}", productId);
            return false;
        }

        // 检查库存是否充足
        if (stock.getAvailableStock() < quantity) {
            log.warn("Insufficient stock, productId: {}, available: {}, required: {}",
                    productId, stock.getAvailableStock(), quantity);
            return false;
        }

        // 扣减库存(使用乐观锁防止并发问题)
        int rows = stockMapper.deductStock(
                productId, 
                quantity, 
                stock.getVersion()
        );

        if (rows > 0) {
            log.info("Stock deducted successfully, productId: {}, quantity: {}", productId, quantity);
            return true;
        } else {
            log.warn("Failed to deduct stock, productId: {}, quantity: {}, version: {}",
                    productId, quantity, stock.getVersion());
            return false;
        }
    } finally {
        // 释放锁
        distributedLockManager.unlock(lock, lockKey);
    }
}
代码语言:javascript
复制

对应的 Mapper 方法:

代码语言:javascript
复制
/**
 * 扣减库存,使用乐观锁
 *
 * @param productId 产品ID
 * @param quantity 扣减数量
 * @param version 版本号
 * @return 影响行数
 */
@Update("UPDATE insurance_product_stock " +
        "SET available_stock = available_stock - #{quantity}, " +
        "locked_stock = locked_stock + #{quantity}, " +
        "version = version + 1, " +
        "update_time = NOW() " +
        "WHERE product_id = #{productId} " +
        "AND available_stock >= #{quantity} " +
        "AND version = #{version}")
int deductStock(@Param("productId") Long productId,
               @Param("quantity") int quantity,
               @Param("version") int version);
代码语言:javascript
复制

五、消息队列设计与实现

消息队列是实现高并发订单系统的另一个核心组件,主要用于异步处理、流量削峰和系统解耦。

5.1 消息队列配置

代码语言:javascript
复制
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RocketMQ配置类
 *
 * @author ken
 */
@Configuration
public class RocketMQConfig {

    /**
     * 订单相关消息主题
     */
    public static final String ORDER_CREATE_TOPIC = "insurance_order_create_topic";
    public static final String ORDER_PAY_TOPIC = "insurance_order_pay_topic";
    public static final String ORDER_STATUS_CHANGE_TOPIC = "insurance_order_status_change_topic";
    public static final String ORDER_EXPIRE_TOPIC = "insurance_order_expire_topic";

    /**
     * 订单相关消费者组
     */
    public static final String ORDER_PROCESS_CONSUMER_GROUP = "insurance_order_process_consumer_group";
    public static final String ORDER_PAY_CONSUMER_GROUP = "insurance_order_pay_consumer_group";
    public static final String ORDER_STATUS_CONSUMER_GROUP = "insurance_order_status_consumer_group";
    public static final String ORDER_EXPIRE_CONSUMER_GROUP = "insurance_order_expire_consumer_group";
}
代码语言:javascript
复制

application.yml 配置:

代码语言:javascript
复制
rocketmq:
  name-server: 192.168.1.201:9876;192.168.1.202:9876
  producer:
    group: insurance_order_producer_group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2
    compress-message-body-threshold: 4096
    max-message-size: 4194304
代码语言:javascript
复制

5.2 消息实体定义

代码语言:javascript
复制
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.math.BigDecimal;

/**
 * 订单创建消息
 *
 * @author ken
 */
@Data
@Schema(description = "订单创建消息")
public class OrderCreateMessage {
    @Schema(description = "订单ID")
    private Long orderId;

    @Schema(description = "订单编号")
    private String orderNo;

    @Schema(description = "用户ID")
    private Long userId;

    @Schema(description = "产品ID")
    private Long productId;

    @Schema(description = "产品名称")
    private String productName;

    @Schema(description = "支付金额")
    private BigDecimal payAmount;

    @Schema(description = "创建时间戳")
    private Long createTime;
}
代码语言:javascript
复制

代码语言:javascript
复制
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * 订单支付消息
 *
 * @author ken
 */
@Data
@Schema(description = "订单支付消息")
public class OrderPayMessage {
    @Schema(description = "订单ID")
    private Long orderId;

    @Schema(description = "订单编号")
    private String orderNo;

    @Schema(description = "支付单号")
    private String payNo;

    @Schema(description = "支付方式:1-微信 2-支付宝 3-银行卡")
    private Integer paymentType;

    @Schema(description = "支付金额")
    private BigDecimal payAmount;

    @Schema(description = "支付时间")
    private LocalDateTime payTime;

    @Schema(description = "消息发送时间戳")
    private Long sendTime;
}
代码语言:javascript
复制
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

/**
 * 订单状态变更消息
 *
 * @author ken
 */
@Data
@Schema(description = "订单状态变更消息")
public class OrderStatusChangeMessage {
    @Schema(description = "订单ID")
    private Long orderId;

    @Schema(description = "订单编号")
    private String orderNo;

    @Schema(description = "变更前状态")
    private Integer beforeStatus;

    @Schema(description = "变更后状态")
    private Integer afterStatus;

    @Schema(description = "状态变更时间戳")
    private Long changeTime;

    @Schema(description = "备注")
    private String remark;
}

5.3 消息生产者实现

代码语言:javascript
复制
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * 订单消息生产者
 *
 * @author ken
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderMessageProducer {

    private final RocketMQTemplate rocketMQTemplate;

    /**
     * 发送订单创建消息
     *
     * @param message 订单创建消息
     * @return 是否发送成功
     */
    public boolean sendOrderCreateMessage(OrderCreateMessage message) {
        if (message == null || !StringUtils.hasText(message.getOrderNo())) {
            log.warn("Invalid order create message: {}", message);
            return false;
        }

        try {
            Message<OrderCreateMessage> rocketMessage = MessageBuilder
                    .withPayload(message)
                    .build();

            // 发送消息,使用订单编号作为key,确保同一订单的消息被同一消费者处理
            SendResult sendResult = rocketMQTemplate.syncSend(
                    RocketMQConfig.ORDER_CREATE_TOPIC + ":" + message.getOrderNo(),
                    rocketMessage
            );

            log.info("Order create message sent successfully, orderNo: {}, sendResult: {}",
                    message.getOrderNo(), sendResult.getSendStatus());
            return true;
        } catch (Exception e) {
            log.error("Failed to send order create message, orderNo: {}", message.getOrderNo(), e);
            return false;
        }
    }

    /**
     * 发送订单支付消息(事务消息)
     *
     * @param message 订单支付消息
     * @param transactionId 事务ID
     * @return 是否发送成功
     */
    public boolean sendOrderPayTransactionMessage(OrderPayMessage message, String transactionId) {
        if (message == null || !StringUtils.hasText(message.getOrderNo()) || !StringUtils.hasText(transactionId)) {
            log.warn("Invalid order pay message or transactionId: {}, transactionId: {}", message, transactionId);
            return false;
        }

        try {
            Message<OrderPayMessage> rocketMessage = MessageBuilder
                    .withPayload(message)
                    .setHeader("TRANSACTION_ID", transactionId)
                    .build();

            // 发送事务消息
            SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
                    RocketMQConfig.ORDER_PAY_TOPIC + ":" + message.getOrderNo(),
                    rocketMessage,
                    null // 额外参数
            );

            log.info("Order pay transaction message sent, orderNo: {}, transactionId: {}, sendResult: {}",
                    message.getOrderNo(), transactionId, sendResult.getSendStatus());
            return true;
        } catch (Exception e) {
            log.error("Failed to send order pay transaction message, orderNo: {}", message.getOrderNo(), e);
            return false;
        }
    }

    /**
     * 发送订单状态变更消息
     *
     * @param message 订单状态变更消息
     * @return 是否发送成功
     */
    public boolean sendOrderStatusChangeMessage(OrderStatusChangeMessage message) {
        if (message == null || !StringUtils.hasText(message.getOrderNo())) {
            log.warn("Invalid order status change message: {}", message);
            return false;
        }

        try {
            Message<OrderStatusChangeMessage> rocketMessage = MessageBuilder
                    .withPayload(message)
                    .build();

            SendResult sendResult = rocketMQTemplate.syncSend(
                    RocketMQConfig.ORDER_STATUS_CHANGE_TOPIC + ":" + message.getOrderNo(),
                    rocketMessage
            );

            log.info("Order status change message sent, orderNo: {}, status: {}->{}, sendResult: {}",
                    message.getOrderNo(), message.getBeforeStatus(), message.getAfterStatus(), 
                    sendResult.getSendStatus());
            return true;
        } catch (Exception e) {
            log.error("Failed to send order status change message, orderNo: {}", message.getOrderNo(), e);
            return false;
        }
    }

    /**
     * 发送订单过期消息(定时消息)
     *
     * @param orderNo 订单编号
     * @param delayTime 延迟时间(毫秒)
     * @return 是否发送成功
     */
    public boolean sendOrderExpireMessage(String orderNo, long delayTime) {
        if (!StringUtils.hasText(orderNo) || delayTime <= 0) {
            log.warn("Invalid orderNo or delayTime: {}, {}", orderNo, delayTime);
            return false;
        }

        try {
            OrderExpireMessage message = new OrderExpireMessage();
            message.setOrderNo(orderNo);
            message.setSendTime(System.currentTimeMillis());

            Message<OrderExpireMessage> rocketMessage = MessageBuilder
                    .withPayload(message)
                    .build();

            // 发送定时消息
            SendResult sendResult = rocketMQTemplate.syncSend(
                    RocketMQConfig.ORDER_EXPIRE_TOPIC + ":" + orderNo,
                    rocketMessage,
                    3000, // 超时时间
                    calculateDelayLevel(delayTime) // 延迟级别
            );

            log.info("Order expire message sent, orderNo: {}, delayTime: {}, sendResult: {}",
                    orderNo, delayTime, sendResult.getSendStatus());
            return true;
        } catch (Exception e) {
            log.error("Failed to send order expire message, orderNo: {}", orderNo, e);
            return false;
        }
    }

    /**
     * 计算RocketMQ延迟级别
     * RocketMQ默认延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     *
     * @param delayTime 延迟时间(毫秒)
     * @return 延迟级别
     */
    private int calculateDelayLevel(long delayTime) {
        long[] delayLevels = {1000, 5000, 10000, 30000, 60000, 120000, 180000, 240000, 300000, 
                            360000, 420000, 480000, 540000, 600000, 1200000, 1800000, 3600000, 7200000};

        for (int i = 0; i < delayLevels.length; i++) {
            if (delayTime <= delayLevels[i]) {
                return i + 1; // 延迟级别从1开始
            }
        }

        return delayLevels.length; // 超过最大延迟时间,使用最大级别
    }
}
代码语言:javascript
复制

5.4 消息消费者实现

代码语言:javascript
复制
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 订单创建消息消费者
 * 处理订单创建后的异步任务:如初始化核保、发送通知等
 *
 * @author ken
 */
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = RocketMQConfig.ORDER_CREATE_TOPIC,
        consumerGroup = RocketMQConfig.ORDER_PROCESS_CONSUMER_GROUP,
        messageModel = MessageModel.CLUSTERING, // 集群模式,同一消费组的消费者分担消费
        consumeThreadMax = 32, // 最大消费线程数
        consumeTimeout = 30000 // 消费超时时间,单位毫秒
)
public class OrderCreateMessageConsumer implements RocketMQListener<OrderCreateMessage> {

    private final OrderService orderService;
    private final UnderwritingService underwritingService;
    private final NotificationService notificationService;

    @Override
    public void onMessage(OrderCreateMessage message) {
        if (message == null || !StringUtils.hasText(message.getOrderNo())) {
            log.warn("Received invalid order create message: {}", message);
            return;
        }

        String orderNo = message.getOrderNo();
        log.info("Received order create message, orderNo: {}", orderNo);

        try {
            // 1. 记录消息消费日志
            orderService.recordMessageConsumeLog(orderNo, "ORDER_CREATE", "RECEIVED");

            // 2. 初始化核保流程
            underwritingService.initiateUnderwriting(orderNo);

            // 3. 发送订单创建通知(短信/微信)
            notificationService.sendOrderCreateNotification(message.getUserId(), orderNo, message.getProductName());

            // 4. 更新消息消费状态为成功
            orderService.recordMessageConsumeLog(orderNo, "ORDER_CREATE", "SUCCESS");

            log.info("Processed order create message successfully, orderNo: {}", orderNo);
        } catch (Exception e) {
            log.error("Failed to process order create message, orderNo: {}", orderNo, e);
            // 记录消息消费失败日志
            orderService.recordMessageConsumeLog(orderNo, "ORDER_CREATE", "FAILED", e.getMessage());
            // 抛出异常,触发重试机制
            throw new RuntimeException("Failed to process order create message: " + orderNo, e);
        }
    }
}
代码语言:javascript
复制

代码语言:javascript
复制
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 订单支付消息消费者
 * 处理支付成功后的业务:生成保单、释放库存等
 *
 * @author ken
 */
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = RocketMQConfig.ORDER_PAY_TOPIC,
        consumerGroup = RocketMQConfig.ORDER_PAY_CONSUMER_GROUP,
        messageModel = MessageModel.CLUSTERING,
        consumeThreadMax = 32
)
public class OrderPayMessageConsumer implements RocketMQListener<OrderPayMessage> {

    private final OrderService orderService;
    private final PolicyService policyService;
    private final ProductStockService stockService;
    private final OrderMessageProducer messageProducer;

    @Override
    public void onMessage(OrderPayMessage message) {
        if (message == null || !StringUtils.hasText(message.getOrderNo())) {
            log.warn("Received invalid order pay message: {}", message);
            return;
        }

        String orderNo = message.getOrderNo();
        log.info("Received order pay message, orderNo: {}, payNo: {}", orderNo, message.getPayNo());

        try {
            // 1. 记录消息消费日志
            orderService.recordMessageConsumeLog(orderNo, "ORDER_PAY", "RECEIVED");

            // 2. 更新订单状态为已支付
            boolean statusUpdated = orderService.updateOrderStatus(
                    orderNo, 
                    OrderStatus.WAIT_PAY.getCode(), 
                    OrderStatus.PAID.getCode(), 
                    "支付成功", 
                    message.getPayNo()
            );

            if (!statusUpdated) {
                log.warn("Order status update failed or order already processed, orderNo: {}", orderNo);
                return;
            }

            // 3. 确认扣减库存(将锁定库存转为已使用)
            InsuranceOrder order = orderService.getOrderByOrderNo(orderNo);
            if (order != null) {
                stockService.confirmDeductStock(order.getProductId(), 1);
            }

            // 4. 生成保单
            policyService.generatePolicy(orderNo);

            // 5. 发送订单状态变更消息
            OrderStatusChangeMessage statusMessage = new OrderStatusChangeMessage();
            statusMessage.setOrderNo(orderNo);
            statusMessage.setBeforeStatus(OrderStatus.WAIT_PAY.getCode());
            statusMessage.setAfterStatus(OrderStatus.PAID.getCode());
            statusMessage.setChangeTime(System.currentTimeMillis());
            statusMessage.setRemark("支付成功");
            messageProducer.sendOrderStatusChangeMessage(statusMessage);

            // 6. 记录消息消费成功日志
            orderService.recordMessageConsumeLog(orderNo, "ORDER_PAY", "SUCCESS");

            log.info("Processed order pay message successfully, orderNo: {}", orderNo);
        } catch (Exception e) {
            log.error("Failed to process order pay message, orderNo: {}", orderNo, e);
            orderService.recordMessageConsumeLog(orderNo, "ORDER_PAY", "FAILED", e.getMessage());
            throw new RuntimeException("Failed to process order pay message: " + orderNo, e);
        }
    }
}
代码语言:javascript
复制
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 订单过期消息消费者
 * 处理过期未支付的订单:取消订单、释放库存等
 *
 * @author ken
 */
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
        topic = RocketMQConfig.ORDER_EXPIRE_TOPIC,
        consumerGroup = RocketMQConfig.ORDER_EXPIRE_CONSUMER_GROUP,
        messageModel = MessageModel.CLUSTERING,
        consumeThreadMax = 16
)
public class OrderExpireMessageConsumer implements RocketMQListener<OrderExpireMessage> {

    private final OrderService orderService;
    private final ProductStockService stockService;
    private final OrderMessageProducer messageProducer;

    @Override
    public void onMessage(OrderExpireMessage message) {
        if (message == null || !StringUtils.hasText(message.getOrderNo())) {
            log.warn("Received invalid order expire message: {}", message);
            return;
        }

        String orderNo = message.getOrderNo();
        log.info("Received order expire message, orderNo: {}", orderNo);

        try {
            // 1. 记录消息消费日志
            orderService.recordMessageConsumeLog(orderNo, "ORDER_EXPIRE", "RECEIVED");

            // 2. 查询订单当前状态
            InsuranceOrder order = orderService.getOrderByOrderNo(orderNo);
            if (order == null) {
                log.warn("Order not found, orderNo: {}", orderNo);
                return;
            }

            // 3. 只有待支付状态的订单才需要处理过期
            if (order.getStatus() != OrderStatus.WAIT_PAY.getCode()) {
                log.info("Order is not in wait pay status, no need to expire, orderNo: {}, status: {}",
                        orderNo, order.getStatus());
                return;
            }

            // 4. 更新订单状态为已失效
            boolean statusUpdated = orderService.updateOrderStatus(
                    orderNo, 
                    OrderStatus.WAIT_PAY.getCode(), 
                    OrderStatus.EXPIRED.getCode(), 
                    "订单过期未支付", 
                    null
            );

            if (!statusUpdated) {
                log.warn("Failed to update order status to expired, orderNo: {}", orderNo);
                return;
            }

            // 5. 释放锁定的库存
            stockService.releaseStock(order.getProductId(), 1);

            // 6. 发送订单状态变更消息
            OrderStatusChangeMessage statusMessage = new OrderStatusChangeMessage();
            statusMessage.setOrderNo(orderNo);
            statusMessage.setBeforeStatus(OrderStatus.WAIT_PAY.getCode());
            statusMessage.setAfterStatus(OrderStatus.EXPIRED.getCode());
            statusMessage.setChangeTime(System.currentTimeMillis());
            statusMessage.setRemark("订单过期未支付");
            messageProducer.sendOrderStatusChangeMessage(statusMessage);

            // 7. 记录消息消费成功日志
            orderService.recordMessageConsumeLog(orderNo, "ORDER_EXPIRE", "SUCCESS");

            log.info("Processed order expire message successfully, orderNo: {}", orderNo);
        } catch (Exception e) {
            log.error("Failed to process order expire message, orderNo: {}", orderNo, e);
            orderService.recordMessageConsumeLog(orderNo, "ORDER_EXPIRE", "FAILED", e.getMessage());
            throw new RuntimeException("Failed to process order expire message: " + orderNo, e);
        }
    }
}

5.5 事务消息实现

支付与订单状态更新需要保证最终一致性,使用 RocketMQ 的事务消息来实现:

代码语言:javascript
复制
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * 订单支付事务消息监听器
 * 处理支付结果与订单状态的一致性
 *
 * @author ken
 */
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQTransactionListener(
        txProducerGroup = "insurance_order_pay_transaction_group",
        corePoolSize = 5,
        maximumPoolSize = 10
)
public class OrderPayTransactionListener implements RocketMQLocalTransactionListener {

    private final OrderService orderService;
    private final PaymentService paymentService;

    /**
     * 执行本地事务
     *
     * @param message 消息
     * @param o 额外参数
     * @return 事务状态
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message<?> message, Object o) {
        String transactionId = message.getHeaders().get("TRANSACTION_ID", String.class);
        OrderPayMessage payMessage = (OrderPayMessage) message.getPayload();

        if (payMessage == null || !StringUtils.hasText(payMessage.getOrderNo()) || !StringUtils.hasText(transactionId)) {
            log.warn("Invalid parameters for local transaction, message: {}, transactionId: {}", payMessage, transactionId);
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        String orderNo = payMessage.getOrderNo();
        log.info("Executing local transaction for order pay, orderNo: {}, transactionId: {}", orderNo, transactionId);

        try {
            // 1. 记录支付信息
            boolean paymentSaved = paymentService.savePaymentRecord(
                    orderNo,
                    payMessage.getPayNo(),
                    payMessage.getPaymentType(),
                    payMessage.getPayAmount(),
                    payMessage.getPayTime()
            );

            if (!paymentSaved) {
                log.error("Failed to save payment record, orderNo: {}", orderNo);
                return RocketMQLocalTransactionState.ROLLBACK;
            }

            // 2. 预更新订单状态(标记为支付中)
            boolean preUpdated = orderService.preUpdateOrderAfterPay(orderNo, payMessage.getPayNo());
            if (!preUpdated) {
                log.error("Failed to pre update order after pay, orderNo: {}", orderNo);
                return RocketMQLocalTransactionState.ROLLBACK;
            }

            // 3. 记录事务日志
            orderService.recordTransactionLog(transactionId, orderNo, "PAY", "COMMIT");

            log.info("Local transaction executed successfully, orderNo: {}, transactionId: {}", orderNo, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            log.error("Local transaction failed, orderNo: {}, transactionId: {}", orderNo, transactionId, e);
            orderService.recordTransactionLog(transactionId, orderNo, "PAY", "ROLLBACK", e.getMessage());
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * 事务回查
     *
     * @param message 消息
     * @return 事务状态
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message<?> message) {
        String transactionId = message.getHeaders().get("TRANSACTION_ID", String.class);
        OrderPayMessage payMessage = (OrderPayMessage) message.getPayload();

        if (payMessage == null || !StringUtils.hasText(payMessage.getOrderNo()) || !StringUtils.hasText(transactionId)) {
            log.warn("Invalid parameters for transaction check, message: {}, transactionId: {}", payMessage, transactionId);
            return RocketMQLocalTransactionState.ROLLBACK;
        }

        String orderNo = payMessage.getOrderNo();
        log.info("Checking local transaction for order pay, orderNo: {}, transactionId: {}", orderNo, transactionId);

        try {
            // 1. 查询事务日志
            TransactionLog log = orderService.getTransactionLog(transactionId);
            if (log != null) {
                switch (log.getStatus()) {
                    case "COMMIT":
                        return RocketMQLocalTransactionState.COMMIT;
                    case "ROLLBACK":
                        return RocketMQLocalTransactionState.ROLLBACK;
                    default:
                        log.warn("Unknown transaction status, transactionId: {}, status: {}", transactionId, log.getStatus());
                        return RocketMQLocalTransactionState.UNKNOWN;
                }
            }

            // 2. 事务日志不存在,查询支付记录
            InsurancePayment payment = paymentService.getPaymentByOrderNo(orderNo);
            if (payment != null && payment.getStatus() == PaymentStatus.SUCCESS.getCode()) {
                // 支付成功,提交事务
                orderService.recordTransactionLog(transactionId, orderNo, "PAY", "COMMIT", "Recovered by check");
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                // 支付未成功或不存在,回滚事务
                orderService.recordTransactionLog(transactionId, orderNo, "PAY", "ROLLBACK", "Recovered by check");
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            log.error("Failed to check local transaction, orderNo: {}, transactionId: {}", orderNo, transactionId, e);
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}
代码语言:javascript
复制

六、订单核心流程实现

结合分布式锁和消息队列,实现保险订单的完整流程。

6.1 订单创建流程

代码语言:javascript
复制


核心代码实现:

代码语言:javascript
复制
/**
 * 执行订单创建逻辑
 *
 * @param userId 用户ID
 * @param orderCreateDTO 订单创建参数
 * @return 订单VO
 */
private OrderVO doCreateOrder(Long userId, OrderCreateDTO orderCreateDTO) {
    // 1. 查询产品信息
    InsuranceProduct product = productService.getProductById(orderCreateDTO.getProductId());
    if (product == null || product.getStatus() != ProductStatus.ON_SALE.getCode()) {
        log.warn("Product not found or not on sale, productId: {}", orderCreateDTO.getProductId());
        throw new BusinessException("产品不存在或已下架");
    }

    // 2. 计算保费
    BigDecimal premium = calculatePremium(product, orderCreateDTO);

    // 3. 扣减库存(如果是限量产品)
    if (product.getIsLimited() == 1) {
        boolean stockDeducted = stockService.deductStock(product.getId(), 1);
        if (!stockDeducted) {
            log.warn("Failed to deduct stock, productId: {}", product.getId());
            throw new BusinessException("产品库存不足");
        }
    }

    // 4. 生成订单号
    String orderNo = generateOrderNo(userId);

    // 5. 创建订单主记录
    LocalDateTime now = LocalDateTime.now();
    // 订单有效期设置为30分钟
    LocalDateTime expireTime = now.plusMinutes(30);

    InsuranceOrder order = new InsuranceOrder();
    order.setOrderNo(orderNo);
    order.setUserId(userId);
    order.setProductId(product.getId());
    order.setProductName(product.getName());
    order.setPremium(premium);
    order.setPayAmount(premium); // 暂时不考虑优惠
    order.setStatus(OrderStatus.WAIT_PAY.getCode());
    order.setPaymentType(orderCreateDTO.getPaymentType());
    order.setExpireTime(expireTime);

    int orderRows = orderMapper.insert(order);
    if (orderRows <= 0) {
        log.error("Failed to insert order, orderNo: {}", orderNo);
        // 回滚库存
        if (product.getIsLimited() == 1) {
            stockService.releaseStock(product.getId(), 1);
        }
        throw new BusinessException("订单创建失败");
    }

    // 6. 创建订单详情
    InsuranceOrderDetail detail = new InsuranceOrderDetail();
    detail.setOrderId(order.getId());
    detail.setInsuredName(orderCreateDTO.getInsuredName());
    detail.setInsuredIdCard(orderCreateDTO.getInsuredIdCard());
    detail.setInsuredAge(orderCreateDTO.getInsuredAge());
    detail.setInsuredGender(orderCreateDTO.getInsuredGender());
    detail.setStartDate(orderCreateDTO.getStartDate());
    detail.setEndDate(orderCreateDTO.getEndDate());
    detail.setCoverageAmount(product.getCoverageAmount());

    int detailRows = orderDetailMapper.insert(detail);
    if (detailRows <= 0) {
        log.error("Failed to insert order detail, orderNo: {}", orderNo);
        // 回滚订单和库存
        orderMapper.deleteById(order.getId());
        if (product.getIsLimited() == 1) {
            stockService.releaseStock(product.getId(), 1);
        }
        throw new BusinessException("订单创建失败");
    }

    // 7. 记录订单操作日志
    recordOrderOperateLog(order.getId(), orderNo, OrderOperateType.CREATE, 
                         null, OrderStatus.WAIT_PAY.getCode(), 
                         "system", "订单创建");

    // 8. 发送订单创建消息(异步处理后续流程)
    OrderCreateMessage createMessage = new OrderCreateMessage();
    createMessage.setOrderId(order.getId());
    createMessage.setOrderNo(orderNo);
    createMessage.setUserId(userId);
    createMessage.setProductId(product.getId());
    createMessage.setProductName(product.getName());
    createMessage.setPayAmount(premium);
    createMessage.setCreateTime(System.currentTimeMillis());
    messageProducer.sendOrderCreateMessage(createMessage);

    // 9. 发送订单过期定时消息(30分钟后)
    long delayTime = Duration.between(now, expireTime).toMillis();
    messageProducer.sendOrderExpireMessage(orderNo, delayTime);

    // 10. 构建并返回订单VO
    OrderVO orderVO = convertToOrderVO(order, detail);
    // 生成支付链接
    orderVO.setPayUrl(generatePayUrl(orderNo, premium, orderCreateDTO.getPaymentType()));

    log.info("Order created successfully, orderNo: {}", orderNo);
    return orderVO;
}

/**
 * 生成订单号
 * 规则:INS+日期(8位)+用户ID后4位+随机数(6位)
 *
 * @param userId 用户ID
 * @return 订单号
 */
private String generateOrderNo(Long userId) {
    String dateStr = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    String userIdSuffix = String.format("%04d", userId % 10000);
    String randomNum = String.format("%06d", ThreadLocalRandom.current().nextInt(1000000));
    return "INS" + dateStr + userIdSuffix + randomNum;
}

/**
 * 计算保费
 *
 * @param product 产品信息
 * @param orderCreateDTO 订单创建参数
 * @return 保费金额
 */
private BigDecimal calculatePremium(InsuranceProduct product, OrderCreateDTO orderCreateDTO) {
    // 实际业务中会有复杂的保费计算规则
    // 这里简化处理,根据年龄和保障期限计算
    int age = orderCreateDTO.getInsuredAge();
    long days = ChronoUnit.DAYS.between(orderCreateDTO.getStartDate(), orderCreateDTO.getEndDate());

    BigDecimal baseRate = product.getBaseRate();
    // 年龄系数
    BigDecimal ageFactor = getAgeFactor(age);
    // 期限系数
    BigDecimal termFactor = getTermFactor(days);

    return baseRate.multiply(ageFactor).multiply(termFactor).setScale(2, RoundingMode.HALF_UP);
}
代码语言:javascript
复制

6.2 订单支付流程

代码语言:javascript
复制

核心代码实现:

代码语言:javascript
复制
/**
 * 处理支付回调通知
 *
 * @param paymentCallbackDTO 支付回调参数
 * @return 处理结果
 */
@Override
public PaymentCallbackVO handlePaymentCallback(PaymentCallbackDTO paymentCallbackDTO) {
    // 1. 验证回调参数
    if (!validatePaymentCallback(paymentCallbackDTO)) {
        log.warn("Invalid payment callback parameters: {}", paymentCallbackDTO);
        return buildPaymentCallbackResult(false, "参数验证失败");
    }

    String orderNo = paymentCallbackDTO.getOutTradeNo();
    String payNo = paymentCallbackDTO.getTradeNo();
    BigDecimal payAmount = paymentCallbackDTO.getTotalAmount();
    Integer paymentType = mapPaymentType(paymentCallbackDTO.getPaymentType());
    LocalDateTime payTime = paymentCallbackDTO.getPayTime();

    // 2. 获取订单锁
    String lockKey = distributedLockManager.buildOrderLockKey(orderNo);
    RLock lock = null;

    try {
        // 获取锁,最多等待5秒,持有锁10秒
        lock = distributedLockManager.lock(lockKey, 5, 10, TimeUnit.SECONDS);
        if (lock == null) {
            log.warn("Failed to acquire lock for payment callback, orderNo: {}", orderNo);
            // 返回处理中,支付网关会重试
            return buildPaymentCallbackResult(true, "处理中");
        }

        // 3. 查询订单
        InsuranceOrder order = orderMapper.selectByOrderNo(orderNo);
        if (order == null) {
            log.warn("Order not found for payment callback, orderNo: {}", orderNo);
            return buildPaymentCallbackResult(false, "订单不存在");
        }

        // 4. 检查订单状态
        if (order.getStatus() != OrderStatus.WAIT_PAY.getCode()) {
            log.warn("Order status is not wait pay, orderNo: {}, status: {}", orderNo, order.getStatus());
            // 订单状态不是待支付,可能已经处理过,返回成功避免重复处理
            return buildPaymentCallbackResult(true, "订单已处理");
        }

        // 5. 检查支付金额
        if (order.getPayAmount().compareTo(payAmount) != 0) {
            log.warn("Payment amount mismatch, orderNo: {}, expected: {}, actual: {}",
                    orderNo, order.getPayAmount(), payAmount);
            return buildPaymentCallbackResult(false, "支付金额不匹配");
        }

        // 6. 发送支付事务消息
        OrderPayMessage payMessage = new OrderPayMessage();
        payMessage.setOrderId(order.getId());
        payMessage.setOrderNo(orderNo);
        payMessage.setPayNo(payNo);
        payMessage.setPaymentType(paymentType);
        payMessage.setPayAmount(payAmount);
        payMessage.setPayTime(payTime);
        payMessage.setSendTime(System.currentTimeMillis());

        String transactionId = UUID.randomUUID().toString();
        boolean messageSent = messageProducer.sendOrderPayTransactionMessage(payMessage, transactionId);

        if (messageSent) {
            log.info("Payment callback processed successfully, orderNo: {}", orderNo);
            return buildPaymentCallbackResult(true, "处理成功");
        } else {
            log.error("Failed to send payment transaction message, orderNo: {}", orderNo);
            return buildPaymentCallbackResult(false, "处理失败,请稍后重试");
        }
    } finally {
        // 释放锁
        distributedLockManager.unlock(lock, lockKey);
    }
}

/**
 * 验证支付回调参数
 *
 * @param callbackDTO 回调参数
 * @return 是否验证通过
 */
private boolean validatePaymentCallback(PaymentCallbackDTO callbackDTO) {
    if (callbackDTO == null 
            || !StringUtils.hasText(callbackDTO.getOutTradeNo())
            || !StringUtils.hasText(callbackDTO.getTradeNo())
            || callbackDTO.getTotalAmount() == null
            || !StringUtils.hasText(callbackDTO.getSign())) {
        return false;
    }

    // 验证签名(实际业务中实现具体的签名验证逻辑)
    return true;
}

/**
 * 构建支付回调结果
 *
 * @param success 是否成功
 * @param message 消息
 * @return 回调结果VO
 */
private PaymentCallbackVO buildPaymentCallbackResult(boolean success, String message) {
    PaymentCallbackVO result = new PaymentCallbackVO();
    result.setSuccess(success);
    result.setMessage(message);
    // 支付网关要求的特定响应格式,如微信需要返回"success"
    result.setResponseText(success ? "success" : "fail");
    return result;
}
代码语言:javascript
复制

七、性能优化策略

为了支持 1000+TPS 的订单峰值,需要从多个维度进行性能优化。

7.1 数据库优化

  1. 索引优化
    • 为订单表的 order_no、user_id、product_id+status 等字段建立索引
    • 为支付记录表的 pay_no、order_id 建立索引
    • 定期分析慢查询,优化索引设计
  2. 分库分表
    • 订单表按 user_id 进行水平分表,共 16 张表
    • 历史订单(超过 1 年)迁移至归档库
    • 使用 ShardingSphere 实现分库分表逻辑
代码语言:javascript
复制
/**
 * 订单分表策略
 * 基于用户ID取模分片
 */
public class OrderTableShardingStrategy implements PreciseShardingAlgorithm<Long> {
    private static final int TABLE_COUNT = 16;

    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        Long userId = shardingValue.getValue();
        int tableIndex = (int) (userId % TABLE_COUNT);
        String tableName = "insurance_order_" + tableIndex;

        if (availableTargetNames.contains(tableName)) {
            return tableName;
        }

        throw new UnsupportedOperationException("Invalid table name: " + tableName);
    }
}
代码语言:javascript
复制

  1. 读写分离:
    • 订单查询操作走从库
    • 订单创建和更新操作走主库
    • 配置合适的读写分离策略

7.2 缓存策略

  1. 多级缓存设计
    • 本地缓存(Caffeine):缓存热门产品信息、基础费率表
    • 分布式缓存(Redis):缓存订单状态、用户信息、产品库存
  2. 热点数据缓存
    • 缓存热门保险产品的详情和费率信息
    • 缓存用户的基本信息,避免频繁查询用户服务
    • 缓存产品库存数量,减少数据库访问
代码语言:javascript
复制
/**
 * 产品缓存服务
 *
 * @author ken
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ProductCacheService {
    private static final String PRODUCT_CACHE_KEY_PREFIX = "product:info:";
    private static final String PRODUCT_RATE_CACHE_KEY_PREFIX = "product:rate:";
    private static final long CACHE_EXPIRE_HOURS = 24;

    private final Cache<String, InsuranceProduct> productLocalCache;
    private final RedisTemplate<String, Object> redisTemplate;
    private final InsuranceProductMapper productMapper;

    /**
     * 获取产品信息(带缓存)
     *
     * @param productId 产品ID
     * @return 产品信息
     */
    public InsuranceProduct getProductById(Long productId) {
        if (productId == null) {
            return null;
        }

        String cacheKey = PRODUCT_CACHE_KEY_PREFIX + productId;

        // 1. 从本地缓存获取
        InsuranceProduct product = productLocalCache.getIfPresent(cacheKey);
        if (product != null) {
            log.debug("Get product from local cache, productId: {}", productId);
            return product;
        }

        // 2. 从Redis获取
        product = getProductFromRedis(cacheKey);
        if (product != null) {
            log.debug("Get product from redis cache, productId: {}", productId);
            // 回写本地缓存
            productLocalCache.put(cacheKey, product);
            return product;
        }

        // 3. 从数据库获取
        product = productMapper.selectById(productId);
        if (product != null) {
            log.debug("Get product from database, productId: {}", productId);
            // 更新缓存
            productLocalCache.put(cacheKey, product);
            setProductToRedis(cacheKey, product);
        }

        return product;
    }

    // 其他方法实现...
}
代码语言:javascript
复制

7.3 并发优化

  1. 线程池优化:
    • 为不同的业务场景配置专用线程池
    • 合理设置核心线程数、最大线程数和队列大小
    • 配置拒绝策略和监控告警
代码语言:javascript
复制
/**
 * 线程池配置
 *
 * @author ken
 */
@Configuration
public class ThreadPoolConfig {

    /**
     * 订单处理线程池
     */
    @Bean(name = "orderThreadPool")
    public Executor orderThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 核心线程数
        executor.setCorePoolSize(16);
        // 最大线程数
        executor.setMaxPoolSize(32);
        // 队列容量
        executor.setQueueCapacity(1000);
        // 线程名称前缀
        executor.setThreadNamePrefix("order-");
        // 线程空闲时间
        executor.setKeepAliveSeconds(60);
        // 拒绝策略:由提交任务的线程执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化
        executor.initialize();
        return executor;
    }

    /**
     * 支付处理线程池
     */
    @Bean(name = "paymentThreadPool")
    public Executor paymentThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(8);
        executor.setMaxPoolSize(16);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("payment-");
        executor.setKeepAliveSeconds(60);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    // 其他线程池配置...
}
代码语言:javascript
复制

  1. 异步处理:
    • 将非核心流程(如日志记录、通知发送)异步化
    • 使用 Spring 的 @Async 注解结合线程池
    • 确保异步操作的可靠性(如通过消息队列)
代码语言:javascript
复制
/**
 * 异步任务服务
 *
 * @author ken
 */
@Slf4j
@Service
public class AsyncTaskService {

    private final NotificationService notificationService;
    private final LogService logService;

    // 构造函数注入...

    /**
     * 异步发送订单通知
     */
    @Async("notificationThreadPool")
    public CompletableFuture<Boolean> sendOrderNotificationAsync(Long userId, String orderNo, String productName) {
        try {
            log.info("Async sending order notification, userId: {}, orderNo: {}", userId, orderNo);
            boolean result = notificationService.sendOrderNotification(userId, orderNo, productName);
            return CompletableFuture.completedFuture(result);
        } catch (Exception e) {
            log.error("Failed to send order notification async, userId: {}, orderNo: {}", userId, orderNo, e);
            return CompletableFuture.completedFuture(false);
        }
    }

    /**
     * 异步记录操作日志
     */
    @Async("logThreadPool")
    public CompletableFuture<Void> recordOperateLogAsync(OperateLog log) {
        try {
            log.info("Async recording operate log, logType: {}, targetId: {}", log.getLogType(), log.getTargetId());
            logService.saveOperateLog(log);
            return CompletableFuture.completedFuture(null);
        } catch (Exception e) {
            log.error("Failed to record operate log async, logType: {}, targetId: {}", 
                    log.getLogType(), log.getTargetId(), e);
            return CompletableFuture.completedFuture(null);
        }
    }
}
代码语言:javascript
复制

八、监控与运维

完善的监控和运维体系是保证系统稳定运行的关键。

8.1 关键监控指标

  1. 业务指标
    • 订单创建成功率
    • 订单支付成功率
    • 平均订单处理时间
    • 各状态订单数量
  2. 技术指标
    • 接口响应时间(P50/P90/P99)
    • 分布式锁获取成功率
    • 消息队列消息堆积量
    • 消息消费成功率
    • 数据库连接池使用率
    • 线程池使用率

8.2 监控实现

使用 Spring Boot Actuator + Prometheus + Grafana 实现监控:

代码语言:javascript
复制
/**
 * 自定义监控指标
 *
 * @author ken
 */
@Component
public class OrderMetricsCollector {

    private final MeterRegistry meterRegistry;

    // 订单创建计数器
    private final Counter orderCreateCounter;
    // 订单支付计数器
    private final Counter orderPayCounter;
    // 订单取消计数器
    private final Counter orderCancelCounter;
    // 订单创建失败计数器
    private final Counter orderCreateFailCounter;
    // 订单支付失败计数器
    private final Counter orderPayFailCounter;

    // 订单创建计时器
    private final Timer orderCreateTimer;
    // 订单支付计时器
    private final Timer orderPayTimer;

    // 分布式锁获取成功率 gauge
    private final AtomicDouble lockSuccessRateGauge = new AtomicDouble(0.0);

    @Autowired
    public OrderMetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;

        // 初始化计数器
        this.orderCreateCounter = Counter.builder("order.create.total")
                .description("Total number of order creations")
                .register(meterRegistry);

        this.orderPayCounter = Counter.builder("order.pay.total")
                .description("Total number of order payments")
                .register(meterRegistry);

        this.orderCancelCounter = Counter.builder("order.cancel.total")
                .description("Total number of order cancellations")
                .register(meterRegistry);

        this.orderCreateFailCounter = Counter.builder("order.create.fail")
                .description("Number of failed order creations")
                .register(meterRegistry);

        this.orderPayFailCounter = Counter.builder("order.pay.fail")
                .description("Number of failed order payments")
                .register(meterRegistry);

        // 初始化计时器
        this.orderCreateTimer = Timer.builder("order.create.time")
                .description("Time taken to create an order")
                .register(meterRegistry);

        this.orderPayTimer = Timer.builder("order.pay.time")
                .description("Time taken to process payment")
                .register(meterRegistry);

        // 注册分布式锁成功率指标
        Gauge.builder("distributed.lock.success.rate", lockSuccessRateGauge, AtomicDouble::get)
                .description("Success rate of acquiring distributed locks")
                .register(meterRegistry);
    }

    /**
     * 记录订单创建
     */
    public void recordOrderCreate() {
        orderCreateCounter.increment();
    }

    /**
     * 记录订单创建失败
     */
    public void recordOrderCreateFail() {
        orderCreateFailCounter.increment();
    }

    /**
     * 记录订单创建时间
     */
    public <T> T recordOrderCreateTime(Supplier<T> supplier) {
        return orderCreateTimer.record(supplier);
    }

    // 其他指标记录方法...

    /**
     * 更新分布式锁成功率
     */
    public void updateLockSuccessRate(double rate) {
        lockSuccessRateGauge.set(rate);
    }
}
代码语言:javascript
复制

8.3 告警配置

设置关键指标的告警阈值,及时发现和解决问题:

  1. 订单创建成功率低于 99% 时告警
  2. 订单支付成功率低于 99.5% 时告警
  3. 接口 P99 响应时间超过 500ms 时告警
  4. 分布式锁获取失败率超过 1% 时告警
  5. 消息队列堆积超过 1000 条时告警
  6. 数据库连接池使用率超过 80% 时告警

九、性能测试与结果分析

为验证系统在高并发场景下的表现,进行了全面的性能测试。

9.1 测试环境

  • 应用服务器:8 台 16 核 32G 云服务器
  • 数据库:MySQL 8.0 主从架构,主库 32 核 64G
  • Redis:3 主 3 从集群,每节点 8 核 16G
  • RocketMQ:2 主 2 从,每节点 8 核 16G
  • 压测工具:JMeter 5.6,20 台压测机

9.2 测试场景与结果

  1. 基准测试
    • 单用户单请求
    • 订单创建响应时间:56ms
    • 订单支付响应时间:42ms
  2. 并发测试
    • 500 并发用户
    • 订单创建 TPS:856
    • 平均响应时间:187ms
    • 99 分位响应时间:326ms
  3. 峰值测试
    • 2000 并发用户
    • 订单创建 TPS:1523
    • 平均响应时间:328ms
    • 99 分位响应时间:654ms
    • 成功率:99.97%
  4. 稳定性测试
    • 1000 并发用户,持续 24 小时
    • 平均 TPS:1120
    • 响应时间波动:±15%
    • 成功率:99.99%
    • 无内存泄漏,CPU 使用率稳定在 60%-70%

9.3 优化前后对比

指标

优化前

优化后

提升比例

订单创建 TPS

302

1523

404%

平均响应时间

820ms

215ms

73.8%

99 分位响应时间

3500ms

654ms

81.3%

订单创建成功率

98.2%

99.99%

1.8%

数据库压力

高(CPU 90%+)

中(CPU 50%-60%)

降低 44.4%

系统稳定性

经常超时

稳定运行

-

十、总结

通过引入分布式锁和消息队列,结合一系列性能优化措施,保险订单系统成功实现了 1000+TPS 的稳定处理能力,同时保证了数据一致性和系统可靠性。

关键经验总结

  1. 技术选型要匹配业务场景:分布式锁选择 Redis+Redisson,消息队列选择 RocketMQ,都是基于保险订单的业务特性做出的合适选择。
  2. 分层处理是高并发的关键:将订单流程拆分为同步核心流程和异步非核心流程,既保证了用户体验,又提升了系统吞吐量。
  3. 一致性设计需权衡取舍:根据业务重要性,采用不同的一致性策略,核心流程强一致,非核心流程最终一致。
  4. 监控告警是稳定运行的保障:完善的监控体系能及时发现潜在问题,避免小问题演变成大故障。
  5. 性能测试必不可少:通过全面的性能测试验证系统设计,发现性能瓶颈,指导优化方向。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-10-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 果酱带你啃java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、保险订单系统的特殊性与挑战
    • 1.1 保险订单的业务特性
    • 1.2 性能瓶颈分析
    • 1.3 技术挑战的核心矛盾
  • 二、整体架构设计与技术选型
    • 2.1 系统架构图
    • 2.2 架构说明
    • 2.3 关键技术选型
      • 2.3.1 分布式锁:Redis + Redisson
      • 2.3.2 消息队列:Apache RocketMQ
      • 2.3.3 其他核心组件
  • 三、数据库设计与领域模型
    • 3.1 数据库表设计
    • 3.2 领域模型设计
  • 四、分布式锁设计与实现
    • 4.1 分布式锁核心组件
    • 4.2 Redisson 配置
    • 4.3 分布式锁在订单场景的应用
      • 4.3.1 防止重复下单
      • 4.3.2 库存扣减与防止超卖
  • 五、消息队列设计与实现
    • 5.1 消息队列配置
    • 5.2 消息实体定义
    • 5.3 消息生产者实现
    • 5.4 消息消费者实现
    • 5.5 事务消息实现
  • 六、订单核心流程实现
    • 6.1 订单创建流程
    • 6.2 订单支付流程
  • 七、性能优化策略
    • 7.1 数据库优化
    • 7.2 缓存策略
    • 7.3 并发优化
  • 八、监控与运维
    • 8.1 关键监控指标
    • 8.2 监控实现
    • 8.3 告警配置
  • 九、性能测试与结果分析
    • 9.1 测试环境
    • 9.2 测试场景与结果
    • 9.3 优化前后对比
  • 十、总结
    • 关键经验总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档