
保险订单处理是保险业务的核心环节,其性能与稳定性直接影响用户体验和业务收益。在营销活动期间,订单量可能激增 10 倍以上,如何在保证数据一致性的前提下,支撑 1000+TPS 的订单峰值,是保险科技领域的典型技术挑战。
本文将深度剖析某大型财产保险公司的订单系统架构演进历程,详细阐述如何通过 "分布式锁 + 消息队列" 的组合方案,结合领域驱动设计与异步化改造,将订单处理能力从 300 TPS 提升至稳定支持 1500 TPS,同时将订单处理成功率从 98.2% 提升至 99.99%,实现了业务高速增长与系统稳定性的双赢。
保险订单不同于普通电商订单,其业务复杂性和合规性要求带来了独特的技术挑战。
保险订单处理具有以下鲜明特点,直接影响系统架构设计:
在采用分布式锁与消息队列架构前,该保险公司的订单系统面临以下核心问题:
保险订单系统设计面临的核心矛盾体现在三个方面:
这些矛盾在保险领域尤为突出,因为订单处理直接关系到资金安全和用户保障,任何失误都可能引发理赔纠纷和监管风险。
基于保险订单的业务特性和性能挑战,我们设计了 "分布式锁保证核心一致性 + 消息队列实现异步削峰" 的架构方案。

整个订单处理流程分为同步和异步两个部分:
通过这种拆分,将核心链路的响应时间控制在 300ms 以内,同时通过异步处理应对流量峰值。
选择 Redis+Redisson 的理由:
版本选择:Redis 7.2.4 + Redisson 3.24.0
选择 RocketMQ 而非 Kafka 或 RabbitMQ 的原因:
版本选择:RocketMQ 5.2.0
合理的数据库设计是系统高性能的基础,尤其是在高并发场景下。
-- 订单主表
CREATE TABLE `insurance_order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`order_no` varchar(64) NOT NULL COMMENT '订单编号,唯一',
`user_id` bigint NOT NULL COMMENT '用户ID',
`product_id` bigint NOT NULL COMMENT '产品ID',
`product_name` varchar(255) NOT NULL COMMENT '产品名称',
`premium` decimal(12,2) NOT NULL COMMENT '保费金额',
`pay_amount` decimal(12,2) NOT NULL COMMENT '实付金额',
`status` tinyint NOT NULL COMMENT '订单状态:0-初始化 1-待支付 2-已支付 3-已取消 4-已失效 5-核保中 6-核保通过 7-核保拒保',
`payment_type` tinyint DEFAULT NULL COMMENT '支付方式:1-微信 2-支付宝 3-银行卡',
`pay_time` datetime DEFAULT NULL COMMENT '支付时间',
`expire_time` datetime NOT NULL COMMENT '订单过期时间',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`version` int NOT NULL DEFAULT 0 COMMENT '版本号,用于乐观锁',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`),
KEY `idx_user_id` (`user_id`),
KEY `idx_product_id_status` (`product_id`,`status`),
KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='保险订单主表';
-- 订单详情表
CREATE TABLE `insurance_order_detail` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`order_id` bigint NOT NULL COMMENT '订单ID',
`insured_name` varchar(64) NOT NULL COMMENT '被保险人姓名',
`insured_id_card` varchar(32) NOT NULL COMMENT '被保险人身份证号',
`insured_age` int NOT NULL COMMENT '被保险人年龄',
`insured_gender` tinyint NOT NULL COMMENT '被保险人性别:1-男 2-女',
`start_date` date NOT NULL COMMENT '保障开始日期',
`end_date` date NOT NULL COMMENT '保障结束日期',
`coverage_amount` decimal(16,2) NOT NULL COMMENT '保额',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_id` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='保险订单详情表';
-- 支付记录表
CREATE TABLE `insurance_payment` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`order_id` bigint NOT NULL COMMENT '订单ID',
`order_no` varchar(64) NOT NULL COMMENT '订单编号',
`pay_no` varchar(64) NOT NULL COMMENT '支付单号',
`pay_amount` decimal(12,2) NOT NULL COMMENT '支付金额',
`payment_type` tinyint NOT NULL COMMENT '支付方式:1-微信 2-支付宝 3-银行卡',
`status` tinyint NOT NULL COMMENT '支付状态:0-处理中 1-成功 2-失败',
`pay_time` datetime DEFAULT NULL COMMENT '支付时间',
`callback_data` text COMMENT '支付回调数据',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_pay_no` (`pay_no`),
KEY `idx_order_id` (`order_id`),
KEY `idx_order_no` (`order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='保险支付记录表';
-- 订单操作日志表(用于审计和问题排查)
CREATE TABLE `insurance_order_operate_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`order_id` bigint NOT NULL COMMENT '订单ID',
`order_no` varchar(64) NOT NULL COMMENT '订单编号',
`operate_type` tinyint NOT NULL COMMENT '操作类型:1-创建订单 2-支付 3-取消 4-核保 5-生成保单',
`before_status` tinyint DEFAULT NULL COMMENT '操作前状态',
`after_status` tinyint DEFAULT NULL COMMENT '操作后状态',
`operator` varchar(64) NOT NULL COMMENT '操作人',
`operate_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '操作时间',
`remark` varchar(512) DEFAULT NULL COMMENT '备注',
`ext_data` text COMMENT '扩展数据',
PRIMARY KEY (`id`),
KEY `idx_order_id` (`order_id`),
KEY `idx_order_no` (`order_no`),
KEY `idx_operate_time` (`operate_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单操作日志表';
-- 产品库存表(针对有限售的保险产品)
CREATE TABLE `insurance_product_stock` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`product_id` bigint NOT NULL COMMENT '产品ID',
`total_stock` int NOT NULL COMMENT '总库存',
`available_stock` int NOT NULL COMMENT '可用库存',
`locked_stock` int NOT NULL COMMENT '锁定库存',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`version` int NOT NULL DEFAULT 0 COMMENT '版本号,用于乐观锁',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='保险产品库存表';
基于 DDD 思想设计核心领域模型:
import com.baomidou.mybatisplus.annotation.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 保险订单实体
*
* @author ken
*/
@Data
@TableName("insurance_order")
@Schema(description = "保险订单主信息")
public class InsuranceOrder {
@TableId(type = IdType.AUTO)
@Schema(description = "订单ID")
private Long id;
@Schema(description = "订单编号,唯一")
private String orderNo;
@Schema(description = "用户ID")
private Long userId;
@Schema(description = "产品ID")
private Long productId;
@Schema(description = "产品名称")
private String productName;
@Schema(description = "保费金额")
private BigDecimal premium;
@Schema(description = "实付金额")
private BigDecimal payAmount;
@Schema(description = "订单状态:0-初始化 1-待支付 2-已支付 3-已取消 4-已失效 5-核保中 6-核保通过 7-核保拒保")
private Integer status;
@Schema(description = "支付方式:1-微信 2-支付宝 3-银行卡")
private Integer paymentType;
@Schema(description = "支付时间")
private LocalDateTime payTime;
@Schema(description = "订单过期时间")
private LocalDateTime expireTime;
@Schema(description = "创建时间")
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createTime;
@Schema(description = "更新时间")
@TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime;
@Schema(description = "版本号,用于乐观锁")
@Version
private Integer version;
}
import com.baomidou.mybatisplus.annotation.*;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* 订单详情实体
*
* @author ken
*/
@Data
@TableName("insurance_order_detail")
@Schema(description = "保险订单详情")
public class InsuranceOrderDetail {
@TableId(type = IdType.AUTO)
@Schema(description = "ID")
private Long id;
@Schema(description = "订单ID")
private Long orderId;
@Schema(description = "被保险人姓名")
private String insuredName;
@Schema(description = "被保险人身份证号")
private String insuredIdCard;
@Schema(description = "被保险人年龄")
private Integer insuredAge;
@Schema(description = "被保险人性别:1-男 2-女")
private Integer insuredGender;
@Schema(description = "保障开始日期")
private LocalDate startDate;
@Schema(description = "保障结束日期")
private LocalDate endDate;
@Schema(description = "保额")
private BigDecimal coverageAmount;
@Schema(description = "创建时间")
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createTime;
@Schema(description = "更新时间")
@TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime;
}import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
* 订单创建DTO
*
* @author ken
*/
@Data
@Schema(description = "订单创建请求参数")
public class OrderCreateDTO {
@Schema(description = "产品ID", required = true, example = "1001")
private Long productId;
@Schema(description = "被保险人姓名", required = true)
private String insuredName;
@Schema(description = "被保险人身份证号", required = true)
private String insuredIdCard;
@Schema(description = "被保险人年龄", required = true)
private Integer insuredAge;
@Schema(description = "被保险人性别:1-男 2-女", required = true)
private Integer insuredGender;
@Schema(description = "保障开始日期", required = true)
private LocalDate startDate;
@Schema(description = "保障结束日期", required = true)
private LocalDate endDate;
@Schema(description = "支付方式:1-微信 2-支付宝 3-银行卡", required = true)
private Integer paymentType;
}分布式锁是解决高并发下数据一致性问题的关键技术,在保险订单系统中主要用于防止重复下单、库存超卖等场景。
基于 Redisson 实现分布式锁工具类:
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 分布式锁工具类
* 基于Redisson实现,支持可重入锁、公平锁、自动续期等特性
*
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class DistributedLockManager {
private final RedissonClient redissonClient;
/**
* 获取分布式锁
*
* @param lockKey 锁键
* @param waitTime 等待时间
* @param leaseTime 锁持有时间
* @param timeUnit 时间单位
* @return 锁对象,如果获取失败则为null
*/
public RLock lock(String lockKey, long waitTime, long leaseTime, TimeUnit timeUnit) {
if (!StringUtils.hasText(lockKey)) {
log.warn("Lock key is empty");
return null;
}
RLock lock = redissonClient.getLock(lockKey);
try {
// 尝试获取锁
boolean locked = lock.tryLock(waitTime, leaseTime, timeUnit);
if (locked) {
log.debug("Acquired distributed lock, key: {}", lockKey);
return lock;
} else {
log.warn("Failed to acquire distributed lock, key: {}", lockKey);
return null;
}
} catch (InterruptedException e) {
log.error("Interrupted while acquiring lock, key: {}", lockKey, e);
Thread.currentThread().interrupt();
return null;
} catch (Exception e) {
log.error("Error acquiring lock, key: {}", lockKey, e);
return null;
}
}
/**
* 获取分布式锁,使用默认时间设置
* 等待时间3秒,持有时间30秒
*
* @param lockKey 锁键
* @return 锁对象,如果获取失败则为null
*/
public RLock lock(String lockKey) {
return lock(lockKey, 3, 30, TimeUnit.SECONDS);
}
/**
* 释放分布式锁
*
* @param lock 锁对象
* @param lockKey 锁键(用于日志)
*/
public void unlock(RLock lock, String lockKey) {
if (lock == null) {
return;
}
try {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
log.debug("Released distributed lock, key: {}", lockKey);
} else {
log.warn("Current thread does not hold the lock, key: {}", lockKey);
}
} catch (Exception e) {
log.error("Error releasing lock, key: {}", lockKey, e);
}
}
/**
* 构建订单相关的锁键
*
* @param orderNo 订单编号
* @return 锁键
*/
public String buildOrderLockKey(String orderNo) {
return "order:lock:" + orderNo;
}
/**
* 构建产品库存相关的锁键
*
* @param productId 产品ID
* @return 锁键
*/
public String buildProductStockLockKey(Long productId) {
return "product:stock:lock:" + productId;
}
/**
* 构建用户下单相关的锁键,防止同一用户重复下单
*
* @param userId 用户ID
* @param productId 产品ID
* @return 锁键
*/
public String buildUserProductLockKey(Long userId, Long productId) {
return "user:product:lock:" + userId + ":" + productId;
}
}
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
/**
* Redisson配置类
*
* @author ken
*/
@Configuration
public class RedissonConfig {
/**
* 配置Redisson客户端
*
* @return RedissonClient实例
* @throws IOException 配置文件读取异常
*/
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() throws IOException {
// 配置Redisson
Config config = new Config();
// 单节点配置(生产环境建议使用集群配置)
config.useSingleServer()
.setAddress("redis://192.168.1.101:6379")
.setPassword("redis_password")
.setDatabase(1)
// 连接池大小
.setConnectionPoolSize(32)
// 最小空闲连接数
.setConnectionMinimumIdleSize(8)
// 连接超时时间
.setConnectTimeout(3000)
// 命令等待超时时间
.setTimeout(3000);
// 集群配置示例
/*
config.useClusterServers()
.addNodeAddress(
"redis://192.168.1.101:6379",
"redis://192.168.1.102:6379",
"redis://192.168.1.103:6379",
"redis://192.168.1.104:6379",
"redis://192.168.1.105:6379",
"redis://192.168.1.106:6379"
)
.setPassword("redis_password")
.setScanInterval(2000)
.setMasterConnectionPoolSize(32)
.setSlaveConnectionPoolSize(32)
.setConnectTimeout(3000);
*/
return Redisson.create(config);
}
}
同一用户可能在短时间内多次点击下单按钮,导致重复订单,需要通过分布式锁防止这种情况:
/**
* 创建订单,防止重复提交
*
* @param userId 用户ID
* @param orderCreateDTO 订单创建参数
* @return 订单信息
*/
@Override
public OrderVO createOrder(Long userId, OrderCreateDTO orderCreateDTO) {
// 参数校验
validateOrderCreateParams(orderCreateDTO);
Long productId = orderCreateDTO.getProductId();
// 构建用户-产品锁键,防止同一用户对同一产品重复下单
String lockKey = distributedLockManager.buildUserProductLockKey(userId, productId);
RLock lock = null;
try {
// 获取分布式锁,最多等待1秒,持有锁3秒
lock = distributedLockManager.lock(lockKey, 1, 3, TimeUnit.SECONDS);
if (lock == null) {
log.warn("Failed to acquire lock for creating order, userId: {}, productId: {}", userId, productId);
throw new BusinessException("系统繁忙,请稍后再试");
}
// 检查是否已有未支付的相同产品订单
checkExistingUnpaidOrder(userId, productId);
// 执行订单创建逻辑
return doCreateOrder(userId, orderCreateDTO);
} finally {
// 释放锁
distributedLockManager.unlock(lock, lockKey);
}
}
/**
* 检查是否已有未支付的相同产品订单
*
* @param userId 用户ID
* @param productId 产品ID
*/
private void checkExistingUnpaidOrder(Long userId, Long productId) {
QueryWrapper<InsuranceOrder> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("user_id", userId)
.eq("product_id", productId)
.in("status", Arrays.asList(OrderStatus.INITIAL.getCode(),
OrderStatus.WAIT_PAY.getCode()));
Integer count = orderMapper.selectCount(queryWrapper);
if (count != null && count > 0) {
log.warn("User has existing unpaid order, userId: {}, productId: {}", userId, productId);
throw new BusinessException("您已有未完成的订单,请先处理");
}
}
对于有限量的保险产品(如特定活动的短期健康险),需要通过分布式锁保证库存操作的原子性:
/**
* 扣减产品库存
*
* @param productId 产品ID
* @param quantity 扣减数量
* @return 是否扣减成功
*/
@Override
public boolean deductStock(Long productId, int quantity) {
if (productId == null || quantity <= 0) {
log.warn("Invalid parameters for deducting stock, productId: {}, quantity: {}", productId, quantity);
return false;
}
// 获取产品库存锁
String lockKey = distributedLockManager.buildProductStockLockKey(productId);
RLock lock = null;
try {
// 获取锁,最多等待3秒,持有锁5秒
lock = distributedLockManager.lock(lockKey, 3, 5, TimeUnit.SECONDS);
if (lock == null) {
log.warn("Failed to acquire lock for deducting stock, productId: {}", productId);
return false;
}
// 查询当前库存
InsuranceProductStock stock = stockMapper.selectById(productId);
if (stock == null) {
log.warn("Product stock not found, productId: {}", productId);
return false;
}
// 检查库存是否充足
if (stock.getAvailableStock() < quantity) {
log.warn("Insufficient stock, productId: {}, available: {}, required: {}",
productId, stock.getAvailableStock(), quantity);
return false;
}
// 扣减库存(使用乐观锁防止并发问题)
int rows = stockMapper.deductStock(
productId,
quantity,
stock.getVersion()
);
if (rows > 0) {
log.info("Stock deducted successfully, productId: {}, quantity: {}", productId, quantity);
return true;
} else {
log.warn("Failed to deduct stock, productId: {}, quantity: {}, version: {}",
productId, quantity, stock.getVersion());
return false;
}
} finally {
// 释放锁
distributedLockManager.unlock(lock, lockKey);
}
}
对应的 Mapper 方法:
/**
* 扣减库存,使用乐观锁
*
* @param productId 产品ID
* @param quantity 扣减数量
* @param version 版本号
* @return 影响行数
*/
@Update("UPDATE insurance_product_stock " +
"SET available_stock = available_stock - #{quantity}, " +
"locked_stock = locked_stock + #{quantity}, " +
"version = version + 1, " +
"update_time = NOW() " +
"WHERE product_id = #{productId} " +
"AND available_stock >= #{quantity} " +
"AND version = #{version}")
int deductStock(@Param("productId") Long productId,
@Param("quantity") int quantity,
@Param("version") int version);
消息队列是实现高并发订单系统的另一个核心组件,主要用于异步处理、流量削峰和系统解耦。
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RocketMQ配置类
*
* @author ken
*/
@Configuration
public class RocketMQConfig {
/**
* 订单相关消息主题
*/
public static final String ORDER_CREATE_TOPIC = "insurance_order_create_topic";
public static final String ORDER_PAY_TOPIC = "insurance_order_pay_topic";
public static final String ORDER_STATUS_CHANGE_TOPIC = "insurance_order_status_change_topic";
public static final String ORDER_EXPIRE_TOPIC = "insurance_order_expire_topic";
/**
* 订单相关消费者组
*/
public static final String ORDER_PROCESS_CONSUMER_GROUP = "insurance_order_process_consumer_group";
public static final String ORDER_PAY_CONSUMER_GROUP = "insurance_order_pay_consumer_group";
public static final String ORDER_STATUS_CONSUMER_GROUP = "insurance_order_status_consumer_group";
public static final String ORDER_EXPIRE_CONSUMER_GROUP = "insurance_order_expire_consumer_group";
}
application.yml 配置:
rocketmq:
name-server: 192.168.1.201:9876;192.168.1.202:9876
producer:
group: insurance_order_producer_group
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
compress-message-body-threshold: 4096
max-message-size: 4194304
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.math.BigDecimal;
/**
* 订单创建消息
*
* @author ken
*/
@Data
@Schema(description = "订单创建消息")
public class OrderCreateMessage {
@Schema(description = "订单ID")
private Long orderId;
@Schema(description = "订单编号")
private String orderNo;
@Schema(description = "用户ID")
private Long userId;
@Schema(description = "产品ID")
private Long productId;
@Schema(description = "产品名称")
private String productName;
@Schema(description = "支付金额")
private BigDecimal payAmount;
@Schema(description = "创建时间戳")
private Long createTime;
}
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 订单支付消息
*
* @author ken
*/
@Data
@Schema(description = "订单支付消息")
public class OrderPayMessage {
@Schema(description = "订单ID")
private Long orderId;
@Schema(description = "订单编号")
private String orderNo;
@Schema(description = "支付单号")
private String payNo;
@Schema(description = "支付方式:1-微信 2-支付宝 3-银行卡")
private Integer paymentType;
@Schema(description = "支付金额")
private BigDecimal payAmount;
@Schema(description = "支付时间")
private LocalDateTime payTime;
@Schema(description = "消息发送时间戳")
private Long sendTime;
}import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
* 订单状态变更消息
*
* @author ken
*/
@Data
@Schema(description = "订单状态变更消息")
public class OrderStatusChangeMessage {
@Schema(description = "订单ID")
private Long orderId;
@Schema(description = "订单编号")
private String orderNo;
@Schema(description = "变更前状态")
private Integer beforeStatus;
@Schema(description = "变更后状态")
private Integer afterStatus;
@Schema(description = "状态变更时间戳")
private Long changeTime;
@Schema(description = "备注")
private String remark;
}import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
/**
* 订单消息生产者
*
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderMessageProducer {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送订单创建消息
*
* @param message 订单创建消息
* @return 是否发送成功
*/
public boolean sendOrderCreateMessage(OrderCreateMessage message) {
if (message == null || !StringUtils.hasText(message.getOrderNo())) {
log.warn("Invalid order create message: {}", message);
return false;
}
try {
Message<OrderCreateMessage> rocketMessage = MessageBuilder
.withPayload(message)
.build();
// 发送消息,使用订单编号作为key,确保同一订单的消息被同一消费者处理
SendResult sendResult = rocketMQTemplate.syncSend(
RocketMQConfig.ORDER_CREATE_TOPIC + ":" + message.getOrderNo(),
rocketMessage
);
log.info("Order create message sent successfully, orderNo: {}, sendResult: {}",
message.getOrderNo(), sendResult.getSendStatus());
return true;
} catch (Exception e) {
log.error("Failed to send order create message, orderNo: {}", message.getOrderNo(), e);
return false;
}
}
/**
* 发送订单支付消息(事务消息)
*
* @param message 订单支付消息
* @param transactionId 事务ID
* @return 是否发送成功
*/
public boolean sendOrderPayTransactionMessage(OrderPayMessage message, String transactionId) {
if (message == null || !StringUtils.hasText(message.getOrderNo()) || !StringUtils.hasText(transactionId)) {
log.warn("Invalid order pay message or transactionId: {}, transactionId: {}", message, transactionId);
return false;
}
try {
Message<OrderPayMessage> rocketMessage = MessageBuilder
.withPayload(message)
.setHeader("TRANSACTION_ID", transactionId)
.build();
// 发送事务消息
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(
RocketMQConfig.ORDER_PAY_TOPIC + ":" + message.getOrderNo(),
rocketMessage,
null // 额外参数
);
log.info("Order pay transaction message sent, orderNo: {}, transactionId: {}, sendResult: {}",
message.getOrderNo(), transactionId, sendResult.getSendStatus());
return true;
} catch (Exception e) {
log.error("Failed to send order pay transaction message, orderNo: {}", message.getOrderNo(), e);
return false;
}
}
/**
* 发送订单状态变更消息
*
* @param message 订单状态变更消息
* @return 是否发送成功
*/
public boolean sendOrderStatusChangeMessage(OrderStatusChangeMessage message) {
if (message == null || !StringUtils.hasText(message.getOrderNo())) {
log.warn("Invalid order status change message: {}", message);
return false;
}
try {
Message<OrderStatusChangeMessage> rocketMessage = MessageBuilder
.withPayload(message)
.build();
SendResult sendResult = rocketMQTemplate.syncSend(
RocketMQConfig.ORDER_STATUS_CHANGE_TOPIC + ":" + message.getOrderNo(),
rocketMessage
);
log.info("Order status change message sent, orderNo: {}, status: {}->{}, sendResult: {}",
message.getOrderNo(), message.getBeforeStatus(), message.getAfterStatus(),
sendResult.getSendStatus());
return true;
} catch (Exception e) {
log.error("Failed to send order status change message, orderNo: {}", message.getOrderNo(), e);
return false;
}
}
/**
* 发送订单过期消息(定时消息)
*
* @param orderNo 订单编号
* @param delayTime 延迟时间(毫秒)
* @return 是否发送成功
*/
public boolean sendOrderExpireMessage(String orderNo, long delayTime) {
if (!StringUtils.hasText(orderNo) || delayTime <= 0) {
log.warn("Invalid orderNo or delayTime: {}, {}", orderNo, delayTime);
return false;
}
try {
OrderExpireMessage message = new OrderExpireMessage();
message.setOrderNo(orderNo);
message.setSendTime(System.currentTimeMillis());
Message<OrderExpireMessage> rocketMessage = MessageBuilder
.withPayload(message)
.build();
// 发送定时消息
SendResult sendResult = rocketMQTemplate.syncSend(
RocketMQConfig.ORDER_EXPIRE_TOPIC + ":" + orderNo,
rocketMessage,
3000, // 超时时间
calculateDelayLevel(delayTime) // 延迟级别
);
log.info("Order expire message sent, orderNo: {}, delayTime: {}, sendResult: {}",
orderNo, delayTime, sendResult.getSendStatus());
return true;
} catch (Exception e) {
log.error("Failed to send order expire message, orderNo: {}", orderNo, e);
return false;
}
}
/**
* 计算RocketMQ延迟级别
* RocketMQ默认延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*
* @param delayTime 延迟时间(毫秒)
* @return 延迟级别
*/
private int calculateDelayLevel(long delayTime) {
long[] delayLevels = {1000, 5000, 10000, 30000, 60000, 120000, 180000, 240000, 300000,
360000, 420000, 480000, 540000, 600000, 1200000, 1800000, 3600000, 7200000};
for (int i = 0; i < delayLevels.length; i++) {
if (delayTime <= delayLevels[i]) {
return i + 1; // 延迟级别从1开始
}
}
return delayLevels.length; // 超过最大延迟时间,使用最大级别
}
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 订单创建消息消费者
* 处理订单创建后的异步任务:如初始化核保、发送通知等
*
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = RocketMQConfig.ORDER_CREATE_TOPIC,
consumerGroup = RocketMQConfig.ORDER_PROCESS_CONSUMER_GROUP,
messageModel = MessageModel.CLUSTERING, // 集群模式,同一消费组的消费者分担消费
consumeThreadMax = 32, // 最大消费线程数
consumeTimeout = 30000 // 消费超时时间,单位毫秒
)
public class OrderCreateMessageConsumer implements RocketMQListener<OrderCreateMessage> {
private final OrderService orderService;
private final UnderwritingService underwritingService;
private final NotificationService notificationService;
@Override
public void onMessage(OrderCreateMessage message) {
if (message == null || !StringUtils.hasText(message.getOrderNo())) {
log.warn("Received invalid order create message: {}", message);
return;
}
String orderNo = message.getOrderNo();
log.info("Received order create message, orderNo: {}", orderNo);
try {
// 1. 记录消息消费日志
orderService.recordMessageConsumeLog(orderNo, "ORDER_CREATE", "RECEIVED");
// 2. 初始化核保流程
underwritingService.initiateUnderwriting(orderNo);
// 3. 发送订单创建通知(短信/微信)
notificationService.sendOrderCreateNotification(message.getUserId(), orderNo, message.getProductName());
// 4. 更新消息消费状态为成功
orderService.recordMessageConsumeLog(orderNo, "ORDER_CREATE", "SUCCESS");
log.info("Processed order create message successfully, orderNo: {}", orderNo);
} catch (Exception e) {
log.error("Failed to process order create message, orderNo: {}", orderNo, e);
// 记录消息消费失败日志
orderService.recordMessageConsumeLog(orderNo, "ORDER_CREATE", "FAILED", e.getMessage());
// 抛出异常,触发重试机制
throw new RuntimeException("Failed to process order create message: " + orderNo, e);
}
}
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 订单支付消息消费者
* 处理支付成功后的业务:生成保单、释放库存等
*
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = RocketMQConfig.ORDER_PAY_TOPIC,
consumerGroup = RocketMQConfig.ORDER_PAY_CONSUMER_GROUP,
messageModel = MessageModel.CLUSTERING,
consumeThreadMax = 32
)
public class OrderPayMessageConsumer implements RocketMQListener<OrderPayMessage> {
private final OrderService orderService;
private final PolicyService policyService;
private final ProductStockService stockService;
private final OrderMessageProducer messageProducer;
@Override
public void onMessage(OrderPayMessage message) {
if (message == null || !StringUtils.hasText(message.getOrderNo())) {
log.warn("Received invalid order pay message: {}", message);
return;
}
String orderNo = message.getOrderNo();
log.info("Received order pay message, orderNo: {}, payNo: {}", orderNo, message.getPayNo());
try {
// 1. 记录消息消费日志
orderService.recordMessageConsumeLog(orderNo, "ORDER_PAY", "RECEIVED");
// 2. 更新订单状态为已支付
boolean statusUpdated = orderService.updateOrderStatus(
orderNo,
OrderStatus.WAIT_PAY.getCode(),
OrderStatus.PAID.getCode(),
"支付成功",
message.getPayNo()
);
if (!statusUpdated) {
log.warn("Order status update failed or order already processed, orderNo: {}", orderNo);
return;
}
// 3. 确认扣减库存(将锁定库存转为已使用)
InsuranceOrder order = orderService.getOrderByOrderNo(orderNo);
if (order != null) {
stockService.confirmDeductStock(order.getProductId(), 1);
}
// 4. 生成保单
policyService.generatePolicy(orderNo);
// 5. 发送订单状态变更消息
OrderStatusChangeMessage statusMessage = new OrderStatusChangeMessage();
statusMessage.setOrderNo(orderNo);
statusMessage.setBeforeStatus(OrderStatus.WAIT_PAY.getCode());
statusMessage.setAfterStatus(OrderStatus.PAID.getCode());
statusMessage.setChangeTime(System.currentTimeMillis());
statusMessage.setRemark("支付成功");
messageProducer.sendOrderStatusChangeMessage(statusMessage);
// 6. 记录消息消费成功日志
orderService.recordMessageConsumeLog(orderNo, "ORDER_PAY", "SUCCESS");
log.info("Processed order pay message successfully, orderNo: {}", orderNo);
} catch (Exception e) {
log.error("Failed to process order pay message, orderNo: {}", orderNo, e);
orderService.recordMessageConsumeLog(orderNo, "ORDER_PAY", "FAILED", e.getMessage());
throw new RuntimeException("Failed to process order pay message: " + orderNo, e);
}
}
}import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
/**
* 订单过期消息消费者
* 处理过期未支付的订单:取消订单、释放库存等
*
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQMessageListener(
topic = RocketMQConfig.ORDER_EXPIRE_TOPIC,
consumerGroup = RocketMQConfig.ORDER_EXPIRE_CONSUMER_GROUP,
messageModel = MessageModel.CLUSTERING,
consumeThreadMax = 16
)
public class OrderExpireMessageConsumer implements RocketMQListener<OrderExpireMessage> {
private final OrderService orderService;
private final ProductStockService stockService;
private final OrderMessageProducer messageProducer;
@Override
public void onMessage(OrderExpireMessage message) {
if (message == null || !StringUtils.hasText(message.getOrderNo())) {
log.warn("Received invalid order expire message: {}", message);
return;
}
String orderNo = message.getOrderNo();
log.info("Received order expire message, orderNo: {}", orderNo);
try {
// 1. 记录消息消费日志
orderService.recordMessageConsumeLog(orderNo, "ORDER_EXPIRE", "RECEIVED");
// 2. 查询订单当前状态
InsuranceOrder order = orderService.getOrderByOrderNo(orderNo);
if (order == null) {
log.warn("Order not found, orderNo: {}", orderNo);
return;
}
// 3. 只有待支付状态的订单才需要处理过期
if (order.getStatus() != OrderStatus.WAIT_PAY.getCode()) {
log.info("Order is not in wait pay status, no need to expire, orderNo: {}, status: {}",
orderNo, order.getStatus());
return;
}
// 4. 更新订单状态为已失效
boolean statusUpdated = orderService.updateOrderStatus(
orderNo,
OrderStatus.WAIT_PAY.getCode(),
OrderStatus.EXPIRED.getCode(),
"订单过期未支付",
null
);
if (!statusUpdated) {
log.warn("Failed to update order status to expired, orderNo: {}", orderNo);
return;
}
// 5. 释放锁定的库存
stockService.releaseStock(order.getProductId(), 1);
// 6. 发送订单状态变更消息
OrderStatusChangeMessage statusMessage = new OrderStatusChangeMessage();
statusMessage.setOrderNo(orderNo);
statusMessage.setBeforeStatus(OrderStatus.WAIT_PAY.getCode());
statusMessage.setAfterStatus(OrderStatus.EXPIRED.getCode());
statusMessage.setChangeTime(System.currentTimeMillis());
statusMessage.setRemark("订单过期未支付");
messageProducer.sendOrderStatusChangeMessage(statusMessage);
// 7. 记录消息消费成功日志
orderService.recordMessageConsumeLog(orderNo, "ORDER_EXPIRE", "SUCCESS");
log.info("Processed order expire message successfully, orderNo: {}", orderNo);
} catch (Exception e) {
log.error("Failed to process order expire message, orderNo: {}", orderNo, e);
orderService.recordMessageConsumeLog(orderNo, "ORDER_EXPIRE", "FAILED", e.getMessage());
throw new RuntimeException("Failed to process order expire message: " + orderNo, e);
}
}
}支付与订单状态更新需要保证最终一致性,使用 RocketMQ 的事务消息来实现:
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
/**
* 订单支付事务消息监听器
* 处理支付结果与订单状态的一致性
*
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
@RocketMQTransactionListener(
txProducerGroup = "insurance_order_pay_transaction_group",
corePoolSize = 5,
maximumPoolSize = 10
)
public class OrderPayTransactionListener implements RocketMQLocalTransactionListener {
private final OrderService orderService;
private final PaymentService paymentService;
/**
* 执行本地事务
*
* @param message 消息
* @param o 额外参数
* @return 事务状态
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message<?> message, Object o) {
String transactionId = message.getHeaders().get("TRANSACTION_ID", String.class);
OrderPayMessage payMessage = (OrderPayMessage) message.getPayload();
if (payMessage == null || !StringUtils.hasText(payMessage.getOrderNo()) || !StringUtils.hasText(transactionId)) {
log.warn("Invalid parameters for local transaction, message: {}, transactionId: {}", payMessage, transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
String orderNo = payMessage.getOrderNo();
log.info("Executing local transaction for order pay, orderNo: {}, transactionId: {}", orderNo, transactionId);
try {
// 1. 记录支付信息
boolean paymentSaved = paymentService.savePaymentRecord(
orderNo,
payMessage.getPayNo(),
payMessage.getPaymentType(),
payMessage.getPayAmount(),
payMessage.getPayTime()
);
if (!paymentSaved) {
log.error("Failed to save payment record, orderNo: {}", orderNo);
return RocketMQLocalTransactionState.ROLLBACK;
}
// 2. 预更新订单状态(标记为支付中)
boolean preUpdated = orderService.preUpdateOrderAfterPay(orderNo, payMessage.getPayNo());
if (!preUpdated) {
log.error("Failed to pre update order after pay, orderNo: {}", orderNo);
return RocketMQLocalTransactionState.ROLLBACK;
}
// 3. 记录事务日志
orderService.recordTransactionLog(transactionId, orderNo, "PAY", "COMMIT");
log.info("Local transaction executed successfully, orderNo: {}, transactionId: {}", orderNo, transactionId);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("Local transaction failed, orderNo: {}, transactionId: {}", orderNo, transactionId, e);
orderService.recordTransactionLog(transactionId, orderNo, "PAY", "ROLLBACK", e.getMessage());
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 事务回查
*
* @param message 消息
* @return 事务状态
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message<?> message) {
String transactionId = message.getHeaders().get("TRANSACTION_ID", String.class);
OrderPayMessage payMessage = (OrderPayMessage) message.getPayload();
if (payMessage == null || !StringUtils.hasText(payMessage.getOrderNo()) || !StringUtils.hasText(transactionId)) {
log.warn("Invalid parameters for transaction check, message: {}, transactionId: {}", payMessage, transactionId);
return RocketMQLocalTransactionState.ROLLBACK;
}
String orderNo = payMessage.getOrderNo();
log.info("Checking local transaction for order pay, orderNo: {}, transactionId: {}", orderNo, transactionId);
try {
// 1. 查询事务日志
TransactionLog log = orderService.getTransactionLog(transactionId);
if (log != null) {
switch (log.getStatus()) {
case "COMMIT":
return RocketMQLocalTransactionState.COMMIT;
case "ROLLBACK":
return RocketMQLocalTransactionState.ROLLBACK;
default:
log.warn("Unknown transaction status, transactionId: {}, status: {}", transactionId, log.getStatus());
return RocketMQLocalTransactionState.UNKNOWN;
}
}
// 2. 事务日志不存在,查询支付记录
InsurancePayment payment = paymentService.getPaymentByOrderNo(orderNo);
if (payment != null && payment.getStatus() == PaymentStatus.SUCCESS.getCode()) {
// 支付成功,提交事务
orderService.recordTransactionLog(transactionId, orderNo, "PAY", "COMMIT", "Recovered by check");
return RocketMQLocalTransactionState.COMMIT;
} else {
// 支付未成功或不存在,回滚事务
orderService.recordTransactionLog(transactionId, orderNo, "PAY", "ROLLBACK", "Recovered by check");
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
log.error("Failed to check local transaction, orderNo: {}, transactionId: {}", orderNo, transactionId, e);
return RocketMQLocalTransactionState.UNKNOWN;
}
}
}
结合分布式锁和消息队列,实现保险订单的完整流程。

核心代码实现:
/**
* 执行订单创建逻辑
*
* @param userId 用户ID
* @param orderCreateDTO 订单创建参数
* @return 订单VO
*/
private OrderVO doCreateOrder(Long userId, OrderCreateDTO orderCreateDTO) {
// 1. 查询产品信息
InsuranceProduct product = productService.getProductById(orderCreateDTO.getProductId());
if (product == null || product.getStatus() != ProductStatus.ON_SALE.getCode()) {
log.warn("Product not found or not on sale, productId: {}", orderCreateDTO.getProductId());
throw new BusinessException("产品不存在或已下架");
}
// 2. 计算保费
BigDecimal premium = calculatePremium(product, orderCreateDTO);
// 3. 扣减库存(如果是限量产品)
if (product.getIsLimited() == 1) {
boolean stockDeducted = stockService.deductStock(product.getId(), 1);
if (!stockDeducted) {
log.warn("Failed to deduct stock, productId: {}", product.getId());
throw new BusinessException("产品库存不足");
}
}
// 4. 生成订单号
String orderNo = generateOrderNo(userId);
// 5. 创建订单主记录
LocalDateTime now = LocalDateTime.now();
// 订单有效期设置为30分钟
LocalDateTime expireTime = now.plusMinutes(30);
InsuranceOrder order = new InsuranceOrder();
order.setOrderNo(orderNo);
order.setUserId(userId);
order.setProductId(product.getId());
order.setProductName(product.getName());
order.setPremium(premium);
order.setPayAmount(premium); // 暂时不考虑优惠
order.setStatus(OrderStatus.WAIT_PAY.getCode());
order.setPaymentType(orderCreateDTO.getPaymentType());
order.setExpireTime(expireTime);
int orderRows = orderMapper.insert(order);
if (orderRows <= 0) {
log.error("Failed to insert order, orderNo: {}", orderNo);
// 回滚库存
if (product.getIsLimited() == 1) {
stockService.releaseStock(product.getId(), 1);
}
throw new BusinessException("订单创建失败");
}
// 6. 创建订单详情
InsuranceOrderDetail detail = new InsuranceOrderDetail();
detail.setOrderId(order.getId());
detail.setInsuredName(orderCreateDTO.getInsuredName());
detail.setInsuredIdCard(orderCreateDTO.getInsuredIdCard());
detail.setInsuredAge(orderCreateDTO.getInsuredAge());
detail.setInsuredGender(orderCreateDTO.getInsuredGender());
detail.setStartDate(orderCreateDTO.getStartDate());
detail.setEndDate(orderCreateDTO.getEndDate());
detail.setCoverageAmount(product.getCoverageAmount());
int detailRows = orderDetailMapper.insert(detail);
if (detailRows <= 0) {
log.error("Failed to insert order detail, orderNo: {}", orderNo);
// 回滚订单和库存
orderMapper.deleteById(order.getId());
if (product.getIsLimited() == 1) {
stockService.releaseStock(product.getId(), 1);
}
throw new BusinessException("订单创建失败");
}
// 7. 记录订单操作日志
recordOrderOperateLog(order.getId(), orderNo, OrderOperateType.CREATE,
null, OrderStatus.WAIT_PAY.getCode(),
"system", "订单创建");
// 8. 发送订单创建消息(异步处理后续流程)
OrderCreateMessage createMessage = new OrderCreateMessage();
createMessage.setOrderId(order.getId());
createMessage.setOrderNo(orderNo);
createMessage.setUserId(userId);
createMessage.setProductId(product.getId());
createMessage.setProductName(product.getName());
createMessage.setPayAmount(premium);
createMessage.setCreateTime(System.currentTimeMillis());
messageProducer.sendOrderCreateMessage(createMessage);
// 9. 发送订单过期定时消息(30分钟后)
long delayTime = Duration.between(now, expireTime).toMillis();
messageProducer.sendOrderExpireMessage(orderNo, delayTime);
// 10. 构建并返回订单VO
OrderVO orderVO = convertToOrderVO(order, detail);
// 生成支付链接
orderVO.setPayUrl(generatePayUrl(orderNo, premium, orderCreateDTO.getPaymentType()));
log.info("Order created successfully, orderNo: {}", orderNo);
return orderVO;
}
/**
* 生成订单号
* 规则:INS+日期(8位)+用户ID后4位+随机数(6位)
*
* @param userId 用户ID
* @return 订单号
*/
private String generateOrderNo(Long userId) {
String dateStr = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
String userIdSuffix = String.format("%04d", userId % 10000);
String randomNum = String.format("%06d", ThreadLocalRandom.current().nextInt(1000000));
return "INS" + dateStr + userIdSuffix + randomNum;
}
/**
* 计算保费
*
* @param product 产品信息
* @param orderCreateDTO 订单创建参数
* @return 保费金额
*/
private BigDecimal calculatePremium(InsuranceProduct product, OrderCreateDTO orderCreateDTO) {
// 实际业务中会有复杂的保费计算规则
// 这里简化处理,根据年龄和保障期限计算
int age = orderCreateDTO.getInsuredAge();
long days = ChronoUnit.DAYS.between(orderCreateDTO.getStartDate(), orderCreateDTO.getEndDate());
BigDecimal baseRate = product.getBaseRate();
// 年龄系数
BigDecimal ageFactor = getAgeFactor(age);
// 期限系数
BigDecimal termFactor = getTermFactor(days);
return baseRate.multiply(ageFactor).multiply(termFactor).setScale(2, RoundingMode.HALF_UP);
}

核心代码实现:
/**
* 处理支付回调通知
*
* @param paymentCallbackDTO 支付回调参数
* @return 处理结果
*/
@Override
public PaymentCallbackVO handlePaymentCallback(PaymentCallbackDTO paymentCallbackDTO) {
// 1. 验证回调参数
if (!validatePaymentCallback(paymentCallbackDTO)) {
log.warn("Invalid payment callback parameters: {}", paymentCallbackDTO);
return buildPaymentCallbackResult(false, "参数验证失败");
}
String orderNo = paymentCallbackDTO.getOutTradeNo();
String payNo = paymentCallbackDTO.getTradeNo();
BigDecimal payAmount = paymentCallbackDTO.getTotalAmount();
Integer paymentType = mapPaymentType(paymentCallbackDTO.getPaymentType());
LocalDateTime payTime = paymentCallbackDTO.getPayTime();
// 2. 获取订单锁
String lockKey = distributedLockManager.buildOrderLockKey(orderNo);
RLock lock = null;
try {
// 获取锁,最多等待5秒,持有锁10秒
lock = distributedLockManager.lock(lockKey, 5, 10, TimeUnit.SECONDS);
if (lock == null) {
log.warn("Failed to acquire lock for payment callback, orderNo: {}", orderNo);
// 返回处理中,支付网关会重试
return buildPaymentCallbackResult(true, "处理中");
}
// 3. 查询订单
InsuranceOrder order = orderMapper.selectByOrderNo(orderNo);
if (order == null) {
log.warn("Order not found for payment callback, orderNo: {}", orderNo);
return buildPaymentCallbackResult(false, "订单不存在");
}
// 4. 检查订单状态
if (order.getStatus() != OrderStatus.WAIT_PAY.getCode()) {
log.warn("Order status is not wait pay, orderNo: {}, status: {}", orderNo, order.getStatus());
// 订单状态不是待支付,可能已经处理过,返回成功避免重复处理
return buildPaymentCallbackResult(true, "订单已处理");
}
// 5. 检查支付金额
if (order.getPayAmount().compareTo(payAmount) != 0) {
log.warn("Payment amount mismatch, orderNo: {}, expected: {}, actual: {}",
orderNo, order.getPayAmount(), payAmount);
return buildPaymentCallbackResult(false, "支付金额不匹配");
}
// 6. 发送支付事务消息
OrderPayMessage payMessage = new OrderPayMessage();
payMessage.setOrderId(order.getId());
payMessage.setOrderNo(orderNo);
payMessage.setPayNo(payNo);
payMessage.setPaymentType(paymentType);
payMessage.setPayAmount(payAmount);
payMessage.setPayTime(payTime);
payMessage.setSendTime(System.currentTimeMillis());
String transactionId = UUID.randomUUID().toString();
boolean messageSent = messageProducer.sendOrderPayTransactionMessage(payMessage, transactionId);
if (messageSent) {
log.info("Payment callback processed successfully, orderNo: {}", orderNo);
return buildPaymentCallbackResult(true, "处理成功");
} else {
log.error("Failed to send payment transaction message, orderNo: {}", orderNo);
return buildPaymentCallbackResult(false, "处理失败,请稍后重试");
}
} finally {
// 释放锁
distributedLockManager.unlock(lock, lockKey);
}
}
/**
* 验证支付回调参数
*
* @param callbackDTO 回调参数
* @return 是否验证通过
*/
private boolean validatePaymentCallback(PaymentCallbackDTO callbackDTO) {
if (callbackDTO == null
|| !StringUtils.hasText(callbackDTO.getOutTradeNo())
|| !StringUtils.hasText(callbackDTO.getTradeNo())
|| callbackDTO.getTotalAmount() == null
|| !StringUtils.hasText(callbackDTO.getSign())) {
return false;
}
// 验证签名(实际业务中实现具体的签名验证逻辑)
return true;
}
/**
* 构建支付回调结果
*
* @param success 是否成功
* @param message 消息
* @return 回调结果VO
*/
private PaymentCallbackVO buildPaymentCallbackResult(boolean success, String message) {
PaymentCallbackVO result = new PaymentCallbackVO();
result.setSuccess(success);
result.setMessage(message);
// 支付网关要求的特定响应格式,如微信需要返回"success"
result.setResponseText(success ? "success" : "fail");
return result;
}
为了支持 1000+TPS 的订单峰值,需要从多个维度进行性能优化。
/**
* 订单分表策略
* 基于用户ID取模分片
*/
public class OrderTableShardingStrategy implements PreciseShardingAlgorithm<Long> {
private static final int TABLE_COUNT = 16;
@Override
public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
Long userId = shardingValue.getValue();
int tableIndex = (int) (userId % TABLE_COUNT);
String tableName = "insurance_order_" + tableIndex;
if (availableTargetNames.contains(tableName)) {
return tableName;
}
throw new UnsupportedOperationException("Invalid table name: " + tableName);
}
}
/**
* 产品缓存服务
*
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ProductCacheService {
private static final String PRODUCT_CACHE_KEY_PREFIX = "product:info:";
private static final String PRODUCT_RATE_CACHE_KEY_PREFIX = "product:rate:";
private static final long CACHE_EXPIRE_HOURS = 24;
private final Cache<String, InsuranceProduct> productLocalCache;
private final RedisTemplate<String, Object> redisTemplate;
private final InsuranceProductMapper productMapper;
/**
* 获取产品信息(带缓存)
*
* @param productId 产品ID
* @return 产品信息
*/
public InsuranceProduct getProductById(Long productId) {
if (productId == null) {
return null;
}
String cacheKey = PRODUCT_CACHE_KEY_PREFIX + productId;
// 1. 从本地缓存获取
InsuranceProduct product = productLocalCache.getIfPresent(cacheKey);
if (product != null) {
log.debug("Get product from local cache, productId: {}", productId);
return product;
}
// 2. 从Redis获取
product = getProductFromRedis(cacheKey);
if (product != null) {
log.debug("Get product from redis cache, productId: {}", productId);
// 回写本地缓存
productLocalCache.put(cacheKey, product);
return product;
}
// 3. 从数据库获取
product = productMapper.selectById(productId);
if (product != null) {
log.debug("Get product from database, productId: {}", productId);
// 更新缓存
productLocalCache.put(cacheKey, product);
setProductToRedis(cacheKey, product);
}
return product;
}
// 其他方法实现...
}
/**
* 线程池配置
*
* @author ken
*/
@Configuration
public class ThreadPoolConfig {
/**
* 订单处理线程池
*/
@Bean(name = "orderThreadPool")
public Executor orderThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(16);
// 最大线程数
executor.setMaxPoolSize(32);
// 队列容量
executor.setQueueCapacity(1000);
// 线程名称前缀
executor.setThreadNamePrefix("order-");
// 线程空闲时间
executor.setKeepAliveSeconds(60);
// 拒绝策略:由提交任务的线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
/**
* 支付处理线程池
*/
@Bean(name = "paymentThreadPool")
public Executor paymentThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(8);
executor.setMaxPoolSize(16);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("payment-");
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
// 其他线程池配置...
}
/**
* 异步任务服务
*
* @author ken
*/
@Slf4j
@Service
public class AsyncTaskService {
private final NotificationService notificationService;
private final LogService logService;
// 构造函数注入...
/**
* 异步发送订单通知
*/
@Async("notificationThreadPool")
public CompletableFuture<Boolean> sendOrderNotificationAsync(Long userId, String orderNo, String productName) {
try {
log.info("Async sending order notification, userId: {}, orderNo: {}", userId, orderNo);
boolean result = notificationService.sendOrderNotification(userId, orderNo, productName);
return CompletableFuture.completedFuture(result);
} catch (Exception e) {
log.error("Failed to send order notification async, userId: {}, orderNo: {}", userId, orderNo, e);
return CompletableFuture.completedFuture(false);
}
}
/**
* 异步记录操作日志
*/
@Async("logThreadPool")
public CompletableFuture<Void> recordOperateLogAsync(OperateLog log) {
try {
log.info("Async recording operate log, logType: {}, targetId: {}", log.getLogType(), log.getTargetId());
logService.saveOperateLog(log);
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
log.error("Failed to record operate log async, logType: {}, targetId: {}",
log.getLogType(), log.getTargetId(), e);
return CompletableFuture.completedFuture(null);
}
}
}
完善的监控和运维体系是保证系统稳定运行的关键。
使用 Spring Boot Actuator + Prometheus + Grafana 实现监控:
/**
* 自定义监控指标
*
* @author ken
*/
@Component
public class OrderMetricsCollector {
private final MeterRegistry meterRegistry;
// 订单创建计数器
private final Counter orderCreateCounter;
// 订单支付计数器
private final Counter orderPayCounter;
// 订单取消计数器
private final Counter orderCancelCounter;
// 订单创建失败计数器
private final Counter orderCreateFailCounter;
// 订单支付失败计数器
private final Counter orderPayFailCounter;
// 订单创建计时器
private final Timer orderCreateTimer;
// 订单支付计时器
private final Timer orderPayTimer;
// 分布式锁获取成功率 gauge
private final AtomicDouble lockSuccessRateGauge = new AtomicDouble(0.0);
@Autowired
public OrderMetricsCollector(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// 初始化计数器
this.orderCreateCounter = Counter.builder("order.create.total")
.description("Total number of order creations")
.register(meterRegistry);
this.orderPayCounter = Counter.builder("order.pay.total")
.description("Total number of order payments")
.register(meterRegistry);
this.orderCancelCounter = Counter.builder("order.cancel.total")
.description("Total number of order cancellations")
.register(meterRegistry);
this.orderCreateFailCounter = Counter.builder("order.create.fail")
.description("Number of failed order creations")
.register(meterRegistry);
this.orderPayFailCounter = Counter.builder("order.pay.fail")
.description("Number of failed order payments")
.register(meterRegistry);
// 初始化计时器
this.orderCreateTimer = Timer.builder("order.create.time")
.description("Time taken to create an order")
.register(meterRegistry);
this.orderPayTimer = Timer.builder("order.pay.time")
.description("Time taken to process payment")
.register(meterRegistry);
// 注册分布式锁成功率指标
Gauge.builder("distributed.lock.success.rate", lockSuccessRateGauge, AtomicDouble::get)
.description("Success rate of acquiring distributed locks")
.register(meterRegistry);
}
/**
* 记录订单创建
*/
public void recordOrderCreate() {
orderCreateCounter.increment();
}
/**
* 记录订单创建失败
*/
public void recordOrderCreateFail() {
orderCreateFailCounter.increment();
}
/**
* 记录订单创建时间
*/
public <T> T recordOrderCreateTime(Supplier<T> supplier) {
return orderCreateTimer.record(supplier);
}
// 其他指标记录方法...
/**
* 更新分布式锁成功率
*/
public void updateLockSuccessRate(double rate) {
lockSuccessRateGauge.set(rate);
}
}
设置关键指标的告警阈值,及时发现和解决问题:
为验证系统在高并发场景下的表现,进行了全面的性能测试。
指标 | 优化前 | 优化后 | 提升比例 |
|---|---|---|---|
订单创建 TPS | 302 | 1523 | 404% |
平均响应时间 | 820ms | 215ms | 73.8% |
99 分位响应时间 | 3500ms | 654ms | 81.3% |
订单创建成功率 | 98.2% | 99.99% | 1.8% |
数据库压力 | 高(CPU 90%+) | 中(CPU 50%-60%) | 降低 44.4% |
系统稳定性 | 经常超时 | 稳定运行 | - |
通过引入分布式锁和消息队列,结合一系列性能优化措施,保险订单系统成功实现了 1000+TPS 的稳定处理能力,同时保证了数据一致性和系统可靠性。