
在分布式系统的世界里,数据一致性就像一座必须跨越的独木桥。当业务从单体架构演进到微服务架构,一次业务操作可能涉及多个服务的数据库写入,如何保证这些操作要么全部成功,要么全部失败,成为每个 Java 开发者必须攻克的难关。
本文将深入剖析三种主流的分布式事务解决方案 ——Seata、TCC 模式和本地消息表,从底层原理到实战代码,从适用场景到性能对比,帮你彻底搞懂 "该怎么选" 这个核心问题。无论你是刚接触分布式系统的新手,还是正在项目中面临事务难题的资深开发者,读完本文都能找到适合自己业务场景的答案。
在深入技术细节之前,我们必须先理解分布式事务的本质。分布式事务的核心挑战源于著名的 CAP 理论:

CAP 理论告诉我们:在分布式系统中,当网络分区出现时,一致性和可用性无法同时保证。这意味着完美的分布式事务解决方案是不存在的,我们只能根据业务场景做出取舍。
分布式事务的本质就是在不同的取舍策略下,找到数据一致性和系统可用性之间的平衡点。理解这一点,是我们选择合适方案的前提。
传统的数据库事务遵循 ACID 原则:
而在分布式系统中,我们更多地追求 BASE 理论:
从 ACID 到 BASE 的转变,体现了分布式系统在一致性和可用性之间的权衡。接下来介绍的三种方案,本质上都是基于 BASE 理论的不同实现。
为了让各种方案的讲解更加具体,我们将以一个典型的电商订单流程作为贯穿全文的案例:
这个流程涉及四个不同的服务和四个独立的数据库,是一个典型的分布式事务场景。我们将基于这个场景,分别实现三种分布式事务方案,通过对比让你理解它们的异同和适用场景。
本地消息表是分布式事务中最经典、实现最简单的方案之一,由 eBay 在 2008 年提出。它的核心思想是将分布式事务转化为本地事务和消息的可靠传递。
本地消息表的工作原理可以用以下流程图表示:

核心步骤解析:
这种方案的本质是通过本地事务保证消息的可靠生成,通过消息队列和重试机制保证消息的可靠传递和消费,从而实现最终一致性。
下面我们基于订单创建的场景,实现本地消息表方案。
首先需要创建订单表和消息表(订单服务库):
-- 订单表
CREATE TABLE `t_order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`user_id` bigint NOT NULL COMMENT '用户ID',
`product_id` bigint NOT NULL COMMENT '商品ID',
`quantity` int NOT NULL COMMENT '购买数量',
`amount` decimal(10,2) NOT NULL COMMENT '订单金额',
`status` tinyint NOT NULL COMMENT '订单状态:0-创建中,1-已完成,2-已取消',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
-- 本地消息表
CREATE TABLE `t_local_message` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '消息ID',
`business_type` varchar(32) NOT NULL COMMENT '业务类型',
`business_id` bigint NOT NULL COMMENT '业务ID',
`message_content` text NOT NULL COMMENT '消息内容',
`status` tinyint NOT NULL COMMENT '消息状态:0-待发送,1-已发送,2-已完成,3-发送失败',
`send_count` int NOT NULL DEFAULT 0 COMMENT '发送次数',
`next_retry_time` datetime NOT NULL COMMENT '下次重试时间',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx_business` (`business_type`,`business_id`),
KEY `idx_status_retry` (`status`,`next_retry_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='本地消息表';
库存表(库存服务库):
-- 库存表
CREATE TABLE `t_inventory` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`product_id` bigint NOT NULL COMMENT '商品ID',
`quantity` int NOT NULL COMMENT '库存数量',
`lock_quantity` int NOT NULL DEFAULT 0 COMMENT '锁定数量',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存表';
-- 消息消费日志表
CREATE TABLE `t_message_consume_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`message_id` bigint NOT NULL COMMENT '消息ID',
`business_type` varchar(32) NOT NULL COMMENT '业务类型',
`business_id` bigint NOT NULL COMMENT '业务ID',
`consume_time` datetime NOT NULL COMMENT '消费时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_message_id` (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息消费日志表';<!-- pom.xml 核心依赖 -->
<dependencies>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
<version>3.2.0</version>
</dependency>
<!-- 数据库 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.3.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.5</version>
</dependency>
<!-- 消息队列 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>3.2.0</version>
</dependency>
<!-- 工具类 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.45</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.1.0-jre</version>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
订单实体:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import lombok.Data;
/**
* 订单表实体类
*
* @author ken
*/
@Data
@TableName("t_order")
public class Order {
/**
* 订单ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 用户ID
*/
@TableField("user_id")
private Long userId;
/**
* 商品ID
*/
@TableField("product_id")
private Long productId;
/**
* 购买数量
*/
@TableField("quantity")
private Integer quantity;
/**
* 订单金额
*/
@TableField("amount")
private BigDecimal amount;
/**
* 订单状态:0-创建中,1-已完成,2-已取消
*/
@TableField("status")
private Integer status;
/**
* 创建时间
*/
@TableField("create_time")
private LocalDateTime createTime;
/**
* 更新时间
*/
@TableField("update_time")
private LocalDateTime updateTime;
}
本地消息实体:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.time.LocalDateTime;
import lombok.Data;
/**
* 本地消息表实体类
*
* @author ken
*/
@Data
@TableName("t_local_message")
public class LocalMessage {
/**
* 消息ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 业务类型
*/
@TableField("business_type")
private String businessType;
/**
* 业务ID
*/
@TableField("business_id")
private Long businessId;
/**
* 消息内容
*/
@TableField("message_content")
private String messageContent;
/**
* 消息状态:0-待发送,1-已发送,2-已完成,3-发送失败
*/
@TableField("status")
private Integer status;
/**
* 发送次数
*/
@TableField("send_count")
private Integer sendCount;
/**
* 下次重试时间
*/
@TableField("next_retry_time")
private LocalDateTime nextRetryTime;
/**
* 创建时间
*/
@TableField("create_time")
private LocalDateTime createTime;
/**
* 更新时间
*/
@TableField("update_time")
private LocalDateTime updateTime;
}
库存实体:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.time.LocalDateTime;
import lombok.Data;
/**
* 库存表实体类
*
* @author ken
*/
@Data
@TableName("t_inventory")
public class Inventory {
/**
* ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 商品ID
*/
@TableField("product_id")
private Long productId;
/**
* 库存数量
*/
@TableField("quantity")
private Integer quantity;
/**
* 锁定数量
*/
@TableField("lock_quantity")
private Integer lockQuantity;
/**
* 创建时间
*/
@TableField("create_time")
private LocalDateTime createTime;
/**
* 更新时间
*/
@TableField("update_time")
private LocalDateTime updateTime;
}
消息消费日志实体:
import lombok.Data;
/**
* 订单创建消息DTO
*
* @author ken
*/
@Data
public class OrderCreatedMessage {
/**
* 消息ID
*/
private Long messageId;
/**
* 订单ID
*/
private Long orderId;
/**
* 用户ID
*/
private Long userId;
/**
* 商品ID
*/
private Long productId;
/**
* 购买数量
*/
private Integer quantity;
}
消息 DTO:
import lombok.Data;
/**
* 订单创建消息DTO
*
* @author ken
*/
@Data
public class OrderCreatedMessage {
/**
* 消息ID
*/
private Long messageId;
/**
* 订单ID
*/
private Long orderId;
/**
* 用户ID
*/
private Long userId;
/**
* 商品ID
*/
private Long productId;
/**
* 购买数量
*/
private Integer quantity;
}
订单 Mapper:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.order.entity.Order;
import org.apache.ibatis.annotations.Mapper;
/**
* 订单Mapper接口
*
* @author ken
*/
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
本地消息 Mapper:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.order.entity.LocalMessage;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.time.LocalDateTime;
import java.util.List;
/**
* 本地消息Mapper接口
*
* @author ken
*/
@Mapper
public interface LocalMessageMapper extends BaseMapper<LocalMessage> {
/**
* 查询待发送的消息
*
* @param status 消息状态
* @param currentTime 当前时间
* @param limit 查询数量限制
* @return 待发送的消息列表
*/
List<LocalMessage> queryPendingMessages(
@Param("status") Integer status,
@Param("currentTime") LocalDateTime currentTime,
@Param("limit") Integer limit);
/**
* 更新消息状态
*
* @param id 消息ID
* @param oldStatus 旧状态
* @param newStatus 新状态
* @param sendCount 发送次数
* @param nextRetryTime 下次重试时间
* @return 更新影响的行数
*/
int updateMessageStatus(
@Param("id") Long id,
@Param("oldStatus") Integer oldStatus,
@Param("newStatus") Integer newStatus,
@Param("sendCount") Integer sendCount,
@Param("nextRetryTime") LocalDateTime nextRetryTime);
}
库存 Mapper:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.inventory.entity.Inventory;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
* 库存Mapper接口
*
* @author ken
*/
@Mapper
public interface InventoryMapper extends BaseMapper<Inventory> {
/**
* 扣减库存
*
* @param productId 商品ID
* @param quantity 扣减数量
* @return 影响的行数
*/
int deductInventory(@Param("productId") Long productId, @Param("quantity") Integer quantity);
}
消息消费日志 Mapper:
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.inventory.entity.MessageConsumeLog;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
* 消息消费日志Mapper接口
*
* @author ken
*/
@Mapper
public interface MessageConsumeLogMapper extends BaseMapper<MessageConsumeLog> {
/**
* 查询消息是否已消费
*
* @param messageId 消息ID
* @return 1-已消费,0-未消费
*/
int checkMessageConsumed(@Param("messageId") Long messageId);
}
订单服务(消息发起方):
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.order.dto.OrderCreatedMessage;
import com.example.order.entity.LocalMessage;
import com.example.order.entity.Order;
import com.example.order.mapper.LocalMessageMapper;
import com.example.order.mapper.OrderMapper;
import com.alibaba.fastjson2.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 订单服务
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {
private final OrderMapper orderMapper;
private final LocalMessageMapper localMessageMapper;
private final RabbitTemplate rabbitTemplate;
/**
* 业务类型:订单创建
*/
public static final String BUSINESS_TYPE_ORDER_CREATE = "ORDER_CREATE";
/**
* 消息状态:待发送
*/
public static final int MESSAGE_STATUS_PENDING = 0;
/**
* 消息状态:已发送
*/
public static final int MESSAGE_STATUS_SENT = 1;
/**
* 创建订单
*
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 购买数量
* @return 订单ID
*/
@Transactional(rollbackFor = Exception.class)
public Long createOrder(Long userId, Long productId, Integer quantity) {
log.info("开始创建订单,userId: {}, productId: {}, quantity: {}", userId, productId, quantity);
// 参数校验
if (ObjectUtils.isEmpty(userId)) {
throw new IllegalArgumentException("用户ID不能为空");
}
if (ObjectUtils.isEmpty(productId)) {
throw new IllegalArgumentException("商品ID不能为空");
}
if (quantity == null || quantity <= 0) {
throw new IllegalArgumentException("购买数量必须大于0");
}
// 1. 创建订单(实际业务中需要查询商品价格等信息)
Order order = new Order();
order.setUserId(userId);
order.setProductId(productId);
order.setQuantity(quantity);
// 假设商品单价为100元
order.setAmount(new BigDecimal(100).multiply(new BigDecimal(quantity)));
order.setStatus(0); // 0-创建中
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
orderMapper.insert(order);
log.info("订单创建成功,orderId: {}", order.getId());
// 2. 创建本地消息(与订单创建在同一个事务中)
OrderCreatedMessage message = new OrderCreatedMessage();
message.setOrderId(order.getId());
message.setUserId(userId);
message.setProductId(productId);
message.setQuantity(quantity);
LocalMessage localMessage = new LocalMessage();
localMessage.setBusinessType(BUSINESS_TYPE_ORDER_CREATE);
localMessage.setBusinessId(order.getId());
localMessage.setMessageContent(JSON.toJSONString(message));
localMessage.setStatus(MESSAGE_STATUS_PENDING);
localMessage.setSendCount(0);
// 首次发送时间设置为当前时间
localMessage.setNextRetryTime(LocalDateTime.now());
localMessage.setCreateTime(LocalDateTime.now());
localMessage.setUpdateTime(LocalDateTime.now());
localMessageMapper.insert(localMessage);
log.info("本地消息创建成功,messageId: {}", localMessage.getId());
// 3. 设置消息ID并尝试立即发送(即使发送失败也没关系,定时任务会重试)
message.setMessageId(localMessage.getId());
localMessage.setMessageContent(JSON.toJSONString(message));
localMessageMapper.updateById(localMessage);
// 尝试发送消息
try {
sendMessage(localMessage);
} catch (Exception e) {
log.error("消息发送失败,messageId: {}", localMessage.getId(), e);
// 发送失败不需要回滚订单,定时任务会重试
}
return order.getId();
}
/**
* 发送消息到MQ
*
* @param localMessage 本地消息
*/
public void sendMessage(LocalMessage localMessage) {
log.info("开始发送消息,messageId: {}", localMessage.getId());
// 1. 更新消息状态为发送中
int updateCount = localMessageMapper.updateMessageStatus(
localMessage.getId(),
MESSAGE_STATUS_PENDING,
MESSAGE_STATUS_SENT,
localMessage.getSendCount() + 1,
calculateNextRetryTime(localMessage.getSendCount() + 1)
);
if (updateCount <= 0) {
log.warn("消息状态已被修改,无需重复发送,messageId: {}", localMessage.getId());
return;
}
// 2. 发送消息到MQ
try {
rabbitTemplate.convertAndSend(
"order.event.exchange",
"order.created",
localMessage.getMessageContent()
);
log.info("消息发送成功,messageId: {}", localMessage.getId());
} catch (Exception e) {
log.error("消息发送失败,messageId: {}", localMessage.getId(), e);
// 发送失败,将状态改回待发送
localMessageMapper.updateMessageStatus(
localMessage.getId(),
MESSAGE_STATUS_SENT,
MESSAGE_STATUS_PENDING,
localMessage.getSendCount() + 1,
calculateNextRetryTime(localMessage.getSendCount() + 1)
);
}
}
/**
* 计算下次重试时间(指数退避策略)
*
* @param sendCount 已发送次数
* @return 下次重试时间
*/
private LocalDateTime calculateNextRetryTime(int sendCount) {
// 重试间隔:1分钟、2分钟、4分钟、8分钟、16分钟,最大30分钟
int delayMinutes = Math.min(30, (int) Math.pow(2, sendCount - 1));
return LocalDateTime.now().plusMinutes(delayMinutes);
}
}定时任务(消息重发):
import com.example.order.entity.LocalMessage;
import com.example.order.mapper.LocalMessageMapper;
import com.example.order.service.OrderService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.time.LocalDateTime;
import java.util.List;
/**
* 消息发送定时任务
*
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageResendTask {
private final LocalMessageMapper localMessageMapper;
private final OrderService orderService;
/**
* 每30秒执行一次,重试发送失败的消息
*/
@Scheduled(fixedRate = 30000)
public void resendFailedMessages() {
log.info("开始执行消息重发任务");
// 查询待发送且已到重试时间的消息,每次最多处理100条
List<LocalMessage> messages = localMessageMapper.queryPendingMessages(
OrderService.MESSAGE_STATUS_PENDING,
LocalDateTime.now(),
100
);
if (CollectionUtils.isEmpty(messages)) {
log.info("没有需要重发的消息");
return;
}
log.info("发现{}条需要重发的消息,开始处理", messages.size());
// 逐条发送消息
for (LocalMessage message : messages) {
try {
orderService.sendMessage(message);
} catch (Exception e) {
log.error("消息重发失败,messageId: {}", message.getId(), e);
// 单个消息处理失败不影响其他消息
}
}
log.info("消息重发任务执行完毕");
}
}库存服务(消息接收方):
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.inventory.entity.Inventory;
import com.example.inventory.entity.MessageConsumeLog;
import com.example.inventory.mapper.InventoryMapper;
import com.example.inventory.mapper.MessageConsumeLogMapper;
import com.example.order.dto.OrderCreatedMessage;
import com.alibaba.fastjson2.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.time.LocalDateTime;
/**
* 库存服务
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class InventoryService {
private final InventoryMapper inventoryMapper;
private final MessageConsumeLogMapper messageConsumeLogMapper;
/**
* 处理订单创建消息,扣减库存
*
* @param messageContent 消息内容
*/
@RabbitListener(queues = "order.created.inventory.queue")
public void handleOrderCreatedMessage(String messageContent) {
log.info("收到订单创建消息,内容:{}", messageContent);
// 解析消息
OrderCreatedMessage message;
try {
message = JSON.parseObject(messageContent, OrderCreatedMessage.class);
} catch (Exception e) {
log.error("消息格式解析错误", e);
// 消息格式错误,直接确认,避免重复消费
return;
}
// 参数校验
if (ObjectUtils.isEmpty(message.getMessageId()) || ObjectUtils.isEmpty(message.getProductId()) ||
message.getQuantity() == null || message.getQuantity() <= 0) {
log.error("消息参数不完整,message: {}", message);
return;
}
// 处理消息
try {
processDeductInventory(message);
} catch (Exception e) {
log.error("处理订单创建消息失败", e);
// 抛出异常,让RabbitMQ重新投递消息
throw new RuntimeException("处理消息失败,需要重试", e);
}
}
/**
* 处理库存扣减
*
* @param message 订单创建消息
*/
@Transactional(rollbackFor = Exception.class)
public void processDeductInventory(OrderCreatedMessage message) {
log.info("开始处理库存扣减,messageId: {}, productId: {}, quantity: {}",
message.getMessageId(), message.getProductId(), message.getQuantity());
// 1. 检查消息是否已消费(避免重复消费)
int consumedCount = messageConsumeLogMapper.checkMessageConsumed(message.getMessageId());
if (consumedCount > 0) {
log.info("消息已消费,无需重复处理,messageId: {}", message.getMessageId());
return;
}
// 2. 扣减库存
int updateCount = inventoryMapper.deductInventory(message.getProductId(), message.getQuantity());
if (updateCount <= 0) {
log.error("库存扣减失败,可能库存不足,productId: {}, quantity: {}",
message.getProductId(), message.getQuantity());
throw new RuntimeException("库存不足,扣减失败");
}
log.info("库存扣减成功,productId: {}, quantity: {}", message.getProductId(), message.getQuantity());
// 3. 记录消息消费日志
MessageConsumeLog log = new MessageConsumeLog();
log.setMessageId(message.getMessageId());
log.setBusinessType("ORDER_CREATE");
log.setBusinessId(message.getOrderId());
log.setConsumeTime(LocalDateTime.now());
messageConsumeLogMapper.insert(log);
log.info("消息处理完成,messageId: {}", message.getMessageId());
}
}
订单控制器:
import com.example.order.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 订单控制器
*
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
@Tag(name = "订单管理", description = "订单相关接口")
public class OrderController {
private final OrderService orderService;
/**
* 创建订单
*
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 购买数量
* @return 订单ID
*/
@PostMapping
@Operation(summary = "创建订单", description = "创建新订单并触发库存扣减等后续操作")
public Long createOrder(
@Parameter(description = "用户ID", required = true) @RequestParam Long userId,
@Parameter(description = "商品ID", required = true) @RequestParam Long productId,
@Parameter(description = "购买数量", required = true) @RequestParam Integer quantity) {
return orderService.createOrder(userId, productId, quantity);
}
}
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ配置
*
* @author ken
*/
@Configuration
public class RabbitMQConfig {
/**
* 订单事件交换机
*/
public static final String ORDER_EVENT_EXCHANGE = "order.event.exchange";
/**
* 订单创建路由键
*/
public static final String ORDER_CREATED_ROUTING_KEY = "order.created";
/**
* 库存服务处理订单创建的队列
*/
public static final String ORDER_CREATED_INVENTORY_QUEUE = "order.created.inventory.queue";
/**
* 创建订单事件交换机
*/
@Bean
public DirectExchange orderEventExchange() {
// 持久化、非自动删除
return new DirectExchange(ORDER_EVENT_EXCHANGE, true, false);
}
/**
* 创建库存服务处理订单创建的队列
*/
@Bean
public Queue orderCreatedInventoryQueue() {
// 持久化、非排他、非自动删除
return new Queue(ORDER_CREATED_INVENTORY_QUEUE, true, false, false);
}
/**
* 绑定队列到交换机
*/
@Bean
public Binding bindOrderCreatedInventoryQueue(DirectExchange orderEventExchange, Queue orderCreatedInventoryQueue) {
return BindingBuilder.bind(orderCreatedInventoryQueue)
.to(orderEventExchange)
.with(ORDER_CREATED_ROUTING_KEY);
}
}
优点:
缺点:
TCC(Try-Confirm-Cancel)是一种基于业务层面的分布式事务解决方案,它将分布式事务拆分为三个阶段,通过业务逻辑的补偿机制保证数据一致性。
TCC 模式的工作原理可以用以下流程图表示:

TCC 的三个阶段详解:
TCC 模式的核心思想是 **"先尝试,再确认,失败则取消"**,通过业务逻辑的拆分和补偿,实现分布式事务的最终一致性。
下面我们基于同样的订单创建场景,实现 TCC 模式的分布式事务。
订单表(订单服务库):
CREATE TABLE `t_order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`user_id` bigint NOT NULL COMMENT '用户ID',
`product_id` bigint NOT NULL COMMENT '商品ID',
`quantity` int NOT NULL COMMENT '购买数量',
`amount` decimal(10,2) NOT NULL COMMENT '订单金额',
`status` tinyint NOT NULL COMMENT '订单状态:0-创建中,1-已确认,2-已取消',
`tcc_transaction_id` varchar(64) DEFAULT NULL COMMENT 'TCC事务ID',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx_user_id` (`user_id`),
KEY `idx_tcc_transaction_id` (`tcc_transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
库存表(库存服务库):
CREATE TABLE `t_inventory` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`product_id` bigint NOT NULL COMMENT '商品ID',
`quantity` int NOT NULL COMMENT '库存数量',
`lock_quantity` int NOT NULL DEFAULT 0 COMMENT '锁定数量(TCC Try阶段预留)',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存表';
用户余额表(用户服务库):
CREATE TABLE `t_user_account` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`user_id` bigint NOT NULL COMMENT '用户ID',
`balance` decimal(10,2) NOT NULL COMMENT '账户余额',
`freeze_balance` decimal(10,2) NOT NULL DEFAULT 0.00 COMMENT '冻结金额(TCC Try阶段预留)',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户账户表';
支付日志表(支付服务库):
CREATE TABLE `t_payment_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`order_id` bigint NOT NULL COMMENT '订单ID',
`user_id` bigint NOT NULL COMMENT '用户ID',
`amount` decimal(10,2) NOT NULL COMMENT '支付金额',
`status` tinyint NOT NULL COMMENT '支付状态:0-待支付,1-已支付,2-已取消',
`tcc_transaction_id` varchar(64) DEFAULT NULL COMMENT 'TCC事务ID',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx_order_id` (`order_id`),
KEY `idx_tcc_transaction_id` (`tcc_transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='支付日志表';TCC 事务日志表(协调者使用):
CREATE TABLE `t_tcc_transaction` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`transaction_id` varchar(64) NOT NULL COMMENT '事务ID',
`status` tinyint NOT NULL COMMENT '事务状态:0-尝试中,1-确认中,2-已确认,3-取消中,4-已取消',
`business_type` varchar(32) NOT NULL COMMENT '业务类型',
`business_id` bigint NOT NULL COMMENT '业务ID',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_transaction_id` (`transaction_id`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='TCC事务表';
CREATE TABLE `t_tcc_participant` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`transaction_id` varchar(64) NOT NULL COMMENT '事务ID',
`participant_id` varchar(64) NOT NULL COMMENT '参与者ID',
`confirm_method` varchar(255) NOT NULL COMMENT '确认方法',
`cancel_method` varchar(255) NOT NULL COMMENT '取消方法',
`status` tinyint NOT NULL COMMENT '状态:0-待处理,1-已确认,2-已取消',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `idx_transaction_id` (`transaction_id`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='TCC参与者表';
在本地消息表方案的基础上,增加 TCC 协调器相关依赖:
<!-- TCC协调器 -->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily-core</artifactId>
<version>2.1.2</version>
</dependency>
<dependency>
<groupId>org.dromara</groupId>
<artifactId>hmily-spring-boot-starter</artifactId>
<version>2.1.2</version>
</dependency>
<!-- 序列化方式 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.45</version>
</dependency>
这里只展示与 TCC 相关的新增实体,其他实体与本地消息表方案类似。
TCC 事务实体:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.time.LocalDateTime;
import lombok.Data;
/**
* TCC事务表实体类
*
* @author ken
*/
@Data
@TableName("t_tcc_transaction")
public class TccTransaction {
/**
* ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 事务ID
*/
@TableField("transaction_id")
private String transactionId;
/**
* 事务状态:0-尝试中,1-确认中,2-已确认,3-取消中,4-已取消
*/
@TableField("status")
private Integer status;
/**
* 业务类型
*/
@TableField("business_type")
private String businessType;
/**
* 业务ID
*/
@TableField("business_id")
private Long businessId;
/**
* 创建时间
*/
@TableField("create_time")
private LocalDateTime createTime;
/**
* 更新时间
*/
@TableField("update_time")
private LocalDateTime updateTime;
}TCC 参与者实体:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.time.LocalDateTime;
import lombok.Data;
/**
* TCC参与者表实体类
*
* @author ken
*/
@Data
@TableName("t_tcc_participant")
public class TccParticipant {
/**
* ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 事务ID
*/
@TableField("transaction_id")
private String transactionId;
/**
* 参与者ID
*/
@TableField("participant_id")
private String participantId;
/**
* 确认方法
*/
@TableField("confirm_method")
private String confirmMethod;
/**
* 取消方法
*/
@TableField("cancel_method")
private String cancelMethod;
/**
* 状态:0-待处理,1-已确认,2-已取消
*/
@TableField("status")
private Integer status;
/**
* 创建时间
*/
@TableField("create_time")
private LocalDateTime createTime;
/**
* 更新时间
*/
@TableField("update_time")
private LocalDateTime updateTime;
}用户账户实体:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import lombok.Data;
/**
* 用户账户表实体类
*
* @author ken
*/
@Data
@TableName("t_user_account")
public class UserAccount {
/**
* ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 用户ID
*/
@TableField("user_id")
private Long userId;
/**
* 账户余额
*/
@TableField("balance")
private BigDecimal balance;
/**
* 冻结金额(TCC Try阶段预留)
*/
@TableField("freeze_balance")
private BigDecimal freezeBalance;
/**
* 创建时间
*/
@TableField("create_time")
private LocalDateTime createTime;
/**
* 更新时间
*/
@TableField("update_time")
private LocalDateTime updateTime;
}支付日志实体:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import lombok.Data;
/**
* 支付日志表实体类
*
* @author ken
*/
@Data
@TableName("t_payment_log")
public class PaymentLog {
/**
* ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 订单ID
*/
@TableField("order_id")
private Long orderId;
/**
* 用户ID
*/
@TableField("user_id")
private Long userId;
/**
* 支付金额
*/
@TableField("amount")
private BigDecimal amount;
/**
* 支付状态:0-待支付,1-已支付,2-已取消
*/
@TableField("status")
private Integer status;
/**
* TCC事务ID
*/
@TableField("tcc_transaction_id")
private String tccTransactionId;
/**
* 创建时间
*/
@TableField("create_time")
private LocalDateTime createTime;
/**
* 更新时间
*/
@TableField("update_time")
private LocalDateTime updateTime;
}
TCC 事务协调器接口:
import java.util.List;
import java.util.function.Supplier;
/**
* TCC事务协调器
*
* @author ken
*/
public interface TccCoordinator {
/**
* 执行TCC分布式事务
*
* @param businessType 业务类型
* @param businessId 业务ID
* @param action 业务操作,包含各个参与者的Try调用
* @return 业务操作结果
*/
<T> T execute(String businessType, Long businessId, Supplier<T> action);
/**
* 注册TCC参与者
*
* @param transactionId 事务ID
* @param participantId 参与者ID
* @param confirmMethod 确认方法
* @param cancelMethod 取消方法
*/
void registerParticipant(String transactionId, String participantId,
String confirmMethod, String cancelMethod);
/**
* 确认事务
*
* @param transactionId 事务ID
*/
void confirm(String transactionId);
/**
* 取消事务
*
* @param transactionId 事务ID
*/
void cancel(String transactionId);
/**
* 处理悬挂的事务(定时任务调用)
*/
void handleSuspendedTransactions();
}库存服务 TCC 接口:
import org.dromara.hmily.annotation.Hmily;
/**
* 库存服务TCC接口
*
* @author ken
*/
public interface InventoryTccService {
/**
* Try阶段:锁定库存
*
* @param transactionId 事务ID
* @param productId 商品ID
* @param quantity 数量
*/
@Hmily(confirmMethod = "confirmDeductInventory", cancelMethod = "cancelDeductInventory")
void tryDeductInventory(String transactionId, Long productId, Integer quantity);
/**
* Confirm阶段:确认扣减库存
*
* @param transactionId 事务ID
* @param productId 商品ID
* @param quantity 数量
*/
void confirmDeductInventory(String transactionId, Long productId, Integer quantity);
/**
* Cancel阶段:取消扣减库存
*
* @param transactionId 事务ID
* @param productId 商品ID
* @param quantity 数量
*/
void cancelDeductInventory(String transactionId, Long productId, Integer quantity);
}
用户账户 TCC 接口:
import org.dromara.hmily.annotation.Hmily;
import java.math.BigDecimal;
/**
* 用户账户TCC接口
*
* @author ken
*/
public interface UserAccountTccService {
/**
* Try阶段:冻结用户余额
*
* @param transactionId 事务ID
* @param userId 用户ID
* @param amount 金额
*/
@Hmily(confirmMethod = "confirmDeductBalance", cancelMethod = "cancelDeductBalance")
void tryDeductBalance(String transactionId, Long userId, BigDecimal amount);
/**
* Confirm阶段:确认扣减用户余额
*
* @param transactionId 事务ID
* @param userId 用户ID
* @param amount 金额
*/
void confirmDeductBalance(String transactionId, Long userId, BigDecimal amount);
/**
* Cancel阶段:取消扣减用户余额
*
* @param transactionId 事务ID
* @param userId 用户ID
* @param amount 金额
*/
void cancelDeductBalance(String transactionId, Long userId, BigDecimal amount);
}
支付服务 TCC 接口:
import org.dromara.hmily.annotation.Hmily;
import java.math.BigDecimal;
/**
* 支付服务TCC接口
*
* @author ken
*/
public interface PaymentTccService {
/**
* Try阶段:创建支付记录(待支付状态)
*
* @param transactionId 事务ID
* @param orderId 订单ID
* @param userId 用户ID
* @param amount 金额
*/
@Hmily(confirmMethod = "confirmPayment", cancelMethod = "cancelPayment")
void tryCreatePayment(String transactionId, Long orderId, Long userId, BigDecimal amount);
/**
* Confirm阶段:确认支付
*
* @param transactionId 事务ID
* @param orderId 订单ID
* @param userId 用户ID
* @param amount 金额
*/
void confirmPayment(String transactionId, Long orderId, Long userId, BigDecimal amount);
/**
* Cancel阶段:取消支付
*
* @param transactionId 事务ID
* @param orderId 订单ID
* @param userId 用户ID
* @param amount 金额
*/
void cancelPayment(String transactionId, Long orderId, Long userId, BigDecimal amount);
}
库存服务 TCC 实现:
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.inventory.entity.Inventory;
import com.example.inventory.mapper.InventoryMapper;
import com.example.inventory.service.InventoryTccService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
/**
* 库存服务TCC实现
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class InventoryTccServiceImpl implements InventoryTccService {
private final InventoryMapper inventoryMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public void tryDeductInventory(String transactionId, Long productId, Integer quantity) {
log.info("库存Try阶段,transactionId: {}, productId: {}, quantity: {}",
transactionId, productId, quantity);
// 参数校验
if (ObjectUtils.isEmpty(transactionId)) {
throw new IllegalArgumentException("事务ID不能为空");
}
if (ObjectUtils.isEmpty(productId)) {
throw new IllegalArgumentException("商品ID不能为空");
}
if (quantity == null || quantity <= 0) {
throw new IllegalArgumentException("数量必须大于0");
}
// 查询库存
Inventory inventory = inventoryMapper.selectOne(Wrappers.<Inventory>lambdaQuery()
.eq(Inventory::getProductId, productId));
if (ObjectUtils.isEmpty(inventory)) {
log.error("商品不存在,productId: {}", productId);
throw new RuntimeException("商品不存在");
}
// 检查库存是否充足
if (inventory.getQuantity() < quantity) {
log.error("库存不足,productId: {}, 可用库存: {}, 需要: {}",
productId, inventory.getQuantity(), quantity);
throw new RuntimeException("库存不足");
}
// 锁定库存(Try阶段核心操作)
int updateCount = inventoryMapper.update(Wrappers.<Inventory>lambdaUpdate()
.set(Inventory::setLockQuantity, inventory.getLockQuantity() + quantity)
.eq(Inventory::getId, inventory.getId())
.eq(Inventory::getQuantity, inventory.getQuantity()));
if (updateCount <= 0) {
log.error("库存锁定失败,可能并发修改,productId: {}", productId);
throw new RuntimeException("库存锁定失败");
}
log.info("库存Try阶段完成,transactionId: {}", transactionId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void confirmDeductInventory(String transactionId, Long productId, Integer quantity) {
log.info("库存Confirm阶段,transactionId: {}, productId: {}, quantity: {}",
transactionId, productId, quantity);
// 查询库存
Inventory inventory = inventoryMapper.selectOne(Wrappers.<Inventory>lambdaQuery()
.eq(Inventory::getProductId, productId));
if (ObjectUtils.isEmpty(inventory)) {
log.error("商品不存在,productId: {}", productId);
return; // 幂等处理,已经处理过的情况
}
// 确认扣减库存:实际扣减库存,释放锁定的库存
int updateCount = inventoryMapper.update(Wrappers.<Inventory>lambdaUpdate()
.set(Inventory::setQuantity, inventory.getQuantity() - quantity)
.set(Inventory::setLockQuantity, inventory.getLockQuantity() - quantity)
.eq(Inventory::getId, inventory.getId())
.ge(Inventory::getLockQuantity, quantity));
if (updateCount <= 0) {
log.warn("库存Confirm阶段已处理,无需重复处理,transactionId: {}", transactionId);
return; // 幂等处理
}
log.info("库存Confirm阶段完成,transactionId: {}", transactionId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void cancelDeductInventory(String transactionId, Long productId, Integer quantity) {
log.info("库存Cancel阶段,transactionId: {}, productId: {}, quantity: {}",
transactionId, productId, quantity);
// 查询库存
Inventory inventory = inventoryMapper.selectOne(Wrappers.<Inventory>lambdaQuery()
.eq(Inventory::getProductId, productId));
if (ObjectUtils.isEmpty(inventory)) {
log.error("商品不存在,productId: {}", productId);
return; // 幂等处理
}
// 取消扣减库存:释放锁定的库存
int updateCount = inventoryMapper.update(Wrappers.<Inventory>lambdaUpdate()
.set(Inventory::setLockQuantity, inventory.getLockQuantity() - quantity)
.eq(Inventory::getId, inventory.getId())
.ge(Inventory::getLockQuantity, quantity));
if (updateCount <= 0) {
log.warn("库存Cancel阶段已处理,无需重复处理,transactionId: {}", transactionId);
return; // 幂等处理
}
log.info("库存Cancel阶段完成,transactionId: {}", transactionId);
}
}用户账户服务 TCC 实现:
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.user.entity.UserAccount;
import com.example.user.mapper.UserAccountMapper;
import com.example.user.service.UserAccountTccService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.math.BigDecimal;
/**
* 用户账户服务TCC实现
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class UserAccountTccServiceImpl implements UserAccountTccService {
private final UserAccountMapper userAccountMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public void tryDeductBalance(String transactionId, Long userId, BigDecimal amount) {
log.info("用户账户Try阶段,transactionId: {}, userId: {}, amount: {}",
transactionId, userId, amount);
// 参数校验
if (ObjectUtils.isEmpty(transactionId)) {
throw new IllegalArgumentException("事务ID不能为空");
}
if (ObjectUtils.isEmpty(userId)) {
throw new IllegalArgumentException("用户ID不能为空");
}
if (ObjectUtils.isEmpty(amount) || amount.compareTo(BigDecimal.ZERO) <= 0) {
throw new IllegalArgumentException("金额必须大于0");
}
// 查询用户账户
UserAccount account = userAccountMapper.selectOne(Wrappers.<UserAccount>lambdaQuery()
.eq(UserAccount::getUserId, userId));
if (ObjectUtils.isEmpty(account)) {
log.error("用户账户不存在,userId: {}", userId);
throw new RuntimeException("用户账户不存在");
}
// 检查余额是否充足
if (account.getBalance().compareTo(amount) < 0) {
log.error("用户余额不足,userId: {}, 可用余额: {}, 需要: {}",
userId, account.getBalance(), amount);
throw new RuntimeException("用户余额不足");
}
// 冻结金额(Try阶段核心操作)
int updateCount = userAccountMapper.update(Wrappers.<UserAccount>lambdaUpdate()
.set(UserAccount::setFreezeBalance, account.getFreezeBalance().add(amount))
.eq(UserAccount::getId, account.getId())
.eq(UserAccount::getBalance, account.getBalance()));
if (updateCount <= 0) {
log.error("余额冻结失败,可能并发修改,userId: {}", userId);
throw new RuntimeException("余额冻结失败");
}
log.info("用户账户Try阶段完成,transactionId: {}", transactionId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void confirmDeductBalance(String transactionId, Long userId, BigDecimal amount) {
log.info("用户账户Confirm阶段,transactionId: {}, userId: {}, amount: {}",
transactionId, userId, amount);
// 查询用户账户
UserAccount account = userAccountMapper.selectOne(Wrappers.<UserAccount>lambdaQuery()
.eq(UserAccount::getUserId, userId));
if (ObjectUtils.isEmpty(account)) {
log.error("用户账户不存在,userId: {}", userId);
return; // 幂等处理
}
// 确认扣减余额:实际扣减余额,释放冻结的金额
int updateCount = userAccountMapper.update(Wrappers.<UserAccount>lambdaUpdate()
.set(UserAccount::setBalance, account.getBalance().subtract(amount))
.set(UserAccount::setFreezeBalance, account.getFreezeBalance().subtract(amount))
.eq(UserAccount::getId, account.getId())
.ge(UserAccount::getFreezeBalance, amount));
if (updateCount <= 0) {
log.warn("用户账户Confirm阶段已处理,无需重复处理,transactionId: {}", transactionId);
return; // 幂等处理
}
log.info("用户账户Confirm阶段完成,transactionId: {}", transactionId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void cancelDeductBalance(String transactionId, Long userId, BigDecimal amount) {
log.info("用户账户Cancel阶段,transactionId: {}, userId: {}, amount: {}",
transactionId, userId, amount);
// 查询用户账户
UserAccount account = userAccountMapper.selectOne(Wrappers.<UserAccount>lambdaQuery()
.eq(UserAccount::getUserId, userId));
if (ObjectUtils.isEmpty(account)) {
log.error("用户账户不存在,userId: {}", userId);
return; // 幂等处理
}
// 取消扣减余额:释放冻结的金额
int updateCount = userAccountMapper.update(Wrappers.<UserAccount>lambdaUpdate()
.set(UserAccount::setFreezeBalance, account.getFreezeBalance().subtract(amount))
.eq(UserAccount::getId, account.getId())
.ge(UserAccount::getFreezeBalance, amount));
if (updateCount <= 0) {
log.warn("用户账户Cancel阶段已处理,无需重复处理,transactionId: {}", transactionId);
return; // 幂等处理
}
log.info("用户账户Cancel阶段完成,transactionId: {}", transactionId);
}
}支付服务 TCC 实现:
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.payment.entity.PaymentLog;
import com.example.payment.mapper.PaymentLogMapper;
import com.example.payment.service.PaymentTccService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 支付服务TCC实现
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class PaymentTccServiceImpl implements PaymentTccService {
private final PaymentLogMapper paymentLogMapper;
@Override
@Transactional(rollbackFor = Exception.class)
public void tryCreatePayment(String transactionId, Long orderId, Long userId, BigDecimal amount) {
log.info("支付Try阶段,transactionId: {}, orderId: {}, userId: {}, amount: {}",
transactionId, orderId, userId, amount);
// 参数校验
if (ObjectUtils.isEmpty(transactionId)) {
throw new IllegalArgumentException("事务ID不能为空");
}
if (ObjectUtils.isEmpty(orderId)) {
throw new IllegalArgumentException("订单ID不能为空");
}
if (ObjectUtils.isEmpty(userId)) {
throw new IllegalArgumentException("用户ID不能为空");
}
if (ObjectUtils.isEmpty(amount) || amount.compareTo(BigDecimal.ZERO) <= 0) {
throw new IllegalArgumentException("金额必须大于0");
}
// 检查是否已存在相同事务的支付记录
PaymentLog existingLog = paymentLogMapper.selectOne(Wrappers.<PaymentLog>lambdaQuery()
.eq(PaymentLog::getTccTransactionId, transactionId));
if (!ObjectUtils.isEmpty(existingLog)) {
log.warn("支付Try阶段已处理,无需重复处理,transactionId: {}", transactionId);
return; // 幂等处理
}
// 创建支付记录(待支付状态)
PaymentLog paymentLog = new PaymentLog();
paymentLog.setOrderId(orderId);
paymentLog.setUserId(userId);
paymentLog.setAmount(amount);
paymentLog.setStatus(0); // 0-待支付
paymentLog.setTccTransactionId(transactionId);
paymentLog.setCreateTime(LocalDateTime.now());
paymentLog.setUpdateTime(LocalDateTime.now());
paymentLogMapper.insert(paymentLog);
log.info("支付Try阶段完成,transactionId: {}, paymentLogId: {}", transactionId, paymentLog.getId());
}
@Override
@Transactional(rollbackFor = Exception.class)
public void confirmPayment(String transactionId, Long orderId, Long userId, BigDecimal amount) {
log.info("支付Confirm阶段,transactionId: {}, orderId: {}, userId: {}, amount: {}",
transactionId, orderId, userId, amount);
// 查询支付记录
PaymentLog paymentLog = paymentLogMapper.selectOne(Wrappers.<PaymentLog>lambdaQuery()
.eq(PaymentLog::getTccTransactionId, transactionId));
if (ObjectUtils.isEmpty(paymentLog)) {
log.error("支付记录不存在,transactionId: {}", transactionId);
return; // 幂等处理
}
// 如果已经是已支付状态,直接返回
if (paymentLog.getStatus() == 1) {
log.warn("支付Confirm阶段已处理,无需重复处理,transactionId: {}", transactionId);
return; // 幂等处理
}
// 更新支付状态为已支付
int updateCount = paymentLogMapper.update(Wrappers.<PaymentLog>lambdaUpdate()
.set(PaymentLog::setStatus, 1) // 1-已支付
.set(PaymentLog::setUpdateTime, LocalDateTime.now())
.eq(PaymentLog::getId, paymentLog.getId())
.eq(PaymentLog::getStatus, 0)); // 只更新待支付状态的记录
if (updateCount <= 0) {
log.warn("支付Confirm阶段更新失败,可能已处理,transactionId: {}", transactionId);
return;
}
log.info("支付Confirm阶段完成,transactionId: {}", transactionId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void cancelPayment(String transactionId, Long orderId, Long userId, BigDecimal amount) {
log.info("支付Cancel阶段,transactionId: {}, orderId: {}, userId: {}, amount: {}",
transactionId, orderId, userId, amount);
// 查询支付记录
PaymentLog paymentLog = paymentLogMapper.selectOne(Wrappers.<PaymentLog>lambdaQuery()
.eq(PaymentLog::getTccTransactionId, transactionId));
if (ObjectUtils.isEmpty(paymentLog)) {
log.error("支付记录不存在,transactionId: {}", transactionId);
return; // 幂等处理
}
// 如果已经是已取消状态,直接返回
if (paymentLog.getStatus() == 2) {
log.warn("支付Cancel阶段已处理,无需重复处理,transactionId: {}", transactionId);
return; // 幂等处理
}
// 更新支付状态为已取消
int updateCount = paymentLogMapper.update(Wrappers.<PaymentLog>lambdaUpdate()
.set(PaymentLog::setStatus, 2) // 2-已取消
.set(PaymentLog::setUpdateTime, LocalDateTime.now())
.eq(PaymentLog::getId, paymentLog.getId())
.eq(PaymentLog::getStatus, 0)); // 只更新待支付状态的记录
if (updateCount <= 0) {
log.warn("支付Cancel阶段更新失败,可能已处理,transactionId: {}", transactionId);
return;
}
log.info("支付Cancel阶段完成,transactionId: {}", transactionId);
}
}订单服务(TCC 发起方):
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.order.entity.Order;
import com.example.order.mapper.OrderMapper;
import com.example.order.service.OrderService;
import com.example.order.service.TccCoordinator;
import com.example.payment.service.PaymentTccService;
import com.example.inventory.service.InventoryTccService;
import com.example.user.service.UserAccountTccService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 订单服务
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderServiceImpl implements OrderService {
private final OrderMapper orderMapper;
private final TccCoordinator tccCoordinator;
private final InventoryTccService inventoryTccService;
private final UserAccountTccService userAccountTccService;
private final PaymentTccService paymentTccService;
/**
* 业务类型:订单创建
*/
public static final String BUSINESS_TYPE_ORDER_CREATE = "ORDER_CREATE";
/**
* 创建订单(TCC分布式事务)
*
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 购买数量
* @return 订单ID
*/
@Override
public Long createOrderWithTcc(Long userId, Long productId, Integer quantity) {
log.info("开始创建订单(TCC),userId: {}, productId: {}, quantity: {}", userId, productId, quantity);
// 参数校验
if (ObjectUtils.isEmpty(userId)) {
throw new IllegalArgumentException("用户ID不能为空");
}
if (ObjectUtils.isEmpty(productId)) {
throw new IllegalArgumentException("商品ID不能为空");
}
if (quantity == null || quantity <= 0) {
throw new IllegalArgumentException("购买数量必须大于0");
}
// 创建订单(本地事务)
Order order = createOrder(userId, productId, quantity);
// 执行TCC分布式事务
tccCoordinator.execute(BUSINESS_TYPE_ORDER_CREATE, order.getId(), () -> {
// Try阶段:调用各参与方的Try方法
// 1. 锁定库存
inventoryTccService.tryDeductInventory(tccCoordinator.getCurrentTransactionId(), productId, quantity);
// 2. 冻结用户余额
BigDecimal amount = order.getAmount();
userAccountTccService.tryDeductBalance(tccCoordinator.getCurrentTransactionId(), userId, amount);
// 3. 创建支付记录
paymentTccService.tryCreatePayment(tccCoordinator.getCurrentTransactionId(),
order.getId(), userId, amount);
return null;
});
log.info("订单创建(TCC)完成,orderId: {}", order.getId());
return order.getId();
}
/**
* 创建订单(本地事务)
*
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 购买数量
* @return 订单对象
*/
@Transactional(rollbackFor = Exception.class)
public Order createOrder(Long userId, Long productId, Integer quantity) {
// 假设商品单价为100元
BigDecimal amount = new BigDecimal(100).multiply(new BigDecimal(quantity));
Order order = new Order();
order.setUserId(userId);
order.setProductId(productId);
order.setQuantity(quantity);
order.setAmount(amount);
order.setStatus(0); // 0-创建中
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
orderMapper.insert(order);
log.info("订单创建成功,orderId: {}", order.getId());
return order;
}
}
订单控制器(增加 TCC 接口):
import com.example.order.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 订单控制器
*
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
@Tag(name = "订单管理", description = "订单相关接口")
public class OrderController {
private final OrderService orderService;
// 省略本地消息表方案的接口...
/**
* 创建订单(TCC分布式事务)
*
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 购买数量
* @return 订单ID
*/
@PostMapping("/tcc")
@Operation(summary = "创建订单(TCC)", description = "使用TCC模式创建新订单并处理分布式事务")
public Long createOrderWithTcc(
@Parameter(description = "用户ID", required = true) @RequestParam Long userId,
@Parameter(description = "商品ID", required = true) @RequestParam Long productId,
@Parameter(description = "购买数量", required = true) @RequestParam Integer quantity) {
return orderService.createOrderWithTcc(userId, productId, quantity);
}
}
优点:
缺点:
Seata 是阿里巴巴开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。它支持多种事务模式,包括 AT、TCC、SAGA 和 XA,其中 AT 模式是最常用的。
Seata 的 AT 模式工作原理可以用以下流程图表示:

Seata 的核心组件:
AT 模式的核心流程:
Seata 的 AT 模式通过自动生成 undo_log 和 redo_log,实现了对业务代码的无侵入,大大降低了分布式事务的使用门槛。
下面我们基于同样的订单创建场景,使用 Seata 的 AT 模式实现分布式事务。
在原有表结构的基础上,需要为每个数据库添加 Seata 的 undo_log 表:
-- Seata undo_log表
CREATE TABLE `undo_log` (
`branch_id` bigint NOT NULL COMMENT '分支事务ID',
`xid` varchar(100) NOT NULL COMMENT '全局事务ID',
`context` varchar(128) NOT NULL COMMENT '上下文信息',
`rollback_info` longblob NOT NULL COMMENT '回滚信息',
`log_status` int NOT NULL COMMENT '日志状态:0-正常,1-已清理',
`log_created` datetime NOT NULL COMMENT '创建时间',
`log_modified` datetime NOT NULL COMMENT '修改时间',
PRIMARY KEY (`branch_id`),
KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Seata回滚日志表';其他业务表与前面的方案类似,这里不再重复列出。
<!-- Seata -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>2.0.0</version>
</dependency>
<!-- 其他依赖与前面方案类似 -->
# application.yml
seata:
enabled: true
application-id: ${spring.application.name}
tx-service-group: my_test_tx_group
registry:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace:
config:
type: nacos
nacos:
server-addr: 127.0.0.1:8848
group: SEATA_GROUP
namespace:
service:
vgroup-mapping:
my_test_tx_group: default
grouplist:
default: 127.0.0.1:8091
client:
rm:
report-success-enable: false
undo:
log-table: undo_log
data-validation: true
log-serialization: jackson
订单服务(Seata TM):
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.order.entity.Order;
import com.example.order.feign.InventoryFeignClient;
import com.example.order.feign.PaymentFeignClient;
import com.example.order.feign.UserAccountFeignClient;
import com.example.order.mapper.OrderMapper;
import com.example.order.service.OrderService;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 订单服务(Seata实现)
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderServiceImpl implements OrderService {
private final OrderMapper orderMapper;
private final InventoryFeignClient inventoryFeignClient;
private final UserAccountFeignClient userAccountFeignClient;
private final PaymentFeignClient paymentFeignClient;
/**
* 创建订单(Seata分布式事务)
*
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 购买数量
* @return 订单ID
*/
@Override
@GlobalTransactional(name = "create-order-transaction", rollbackFor = Exception.class)
public Long createOrderWithSeata(Long userId, Long productId, Integer quantity) {
log.info("开始创建订单(Seata),userId: {}, productId: {}, quantity: {}", userId, productId, quantity);
// 1. 创建订单
Order order = createOrder(userId, productId, quantity);
try {
// 2. 扣减库存
boolean inventoryResult = inventoryFeignClient.deductInventory(productId, quantity);
if (!inventoryResult) {
throw new RuntimeException("扣减库存失败");
}
// 3. 扣减用户余额
boolean accountResult = userAccountFeignClient.deductBalance(userId, order.getAmount());
if (!accountResult) {
throw new RuntimeException("扣减用户余额失败");
}
// 4. 创建支付记录
boolean paymentResult = paymentFeignClient.createPayment(order.getId(), userId, order.getAmount());
if (!paymentResult) {
throw new RuntimeException("创建支付记录失败");
}
// 5. 更新订单状态为已完成
updateOrderStatus(order.getId(), 1);
log.info("订单创建(Seata)完成,orderId: {}", order.getId());
return order.getId();
} catch (Exception e) {
log.error("订单创建失败,触发全局回滚", e);
// 抛出异常,Seata会自动回滚全局事务
throw new RuntimeException("订单创建失败", e);
}
}
/**
* 创建订单(本地事务)
*
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 购买数量
* @return 订单对象
*/
@Transactional(rollbackFor = Exception.class)
public Order createOrder(Long userId, Long productId, Integer quantity) {
// 参数校验
if (ObjectUtils.isEmpty(userId)) {
throw new IllegalArgumentException("用户ID不能为空");
}
if (ObjectUtils.isEmpty(productId)) {
throw new IllegalArgumentException("商品ID不能为空");
}
if (quantity == null || quantity <= 0) {
throw new IllegalArgumentException("购买数量必须大于0");
}
// 假设商品单价为100元
BigDecimal amount = new BigDecimal(100).multiply(new BigDecimal(quantity));
Order order = new Order();
order.setUserId(userId);
order.setProductId(productId);
order.setQuantity(quantity);
order.setAmount(amount);
order.setStatus(0); // 0-创建中
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
orderMapper.insert(order);
log.info("订单创建成功,orderId: {}", order.getId());
return order;
}
/**
* 更新订单状态
*
* @param orderId 订单ID
* @param status 状态
*/
@Transactional(rollbackFor = Exception.class)
public void updateOrderStatus(Long orderId, Integer status) {
Order order = new Order();
order.setId(orderId);
order.setStatus(status);
order.setUpdateTime(LocalDateTime.now());
orderMapper.updateById(order);
log.info("订单状态更新成功,orderId: {}, status: {}", orderId, status);
}
}
库存服务(Seata RM):
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.inventory.entity.Inventory;
import com.example.inventory.mapper.InventoryMapper;
import com.example.inventory.service.InventoryService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
/**
* 库存服务(Seata实现)
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class InventoryServiceImpl implements InventoryService {
private final InventoryMapper inventoryMapper;
/**
* 扣减库存
*
* @param productId 商品ID
* @param quantity 扣减数量
* @return 是否成功
*/
@Override
@Transactional(rollbackFor = Exception.class)
public boolean deductInventory(Long productId, Integer quantity) {
log.info("开始扣减库存,productId: {}, quantity: {}", productId, quantity);
// 参数校验
if (ObjectUtils.isEmpty(productId)) {
throw new IllegalArgumentException("商品ID不能为空");
}
if (quantity == null || quantity <= 0) {
throw new IllegalArgumentException("数量必须大于0");
}
// 查询库存
Inventory inventory = inventoryMapper.selectOne(Wrappers.<Inventory>lambdaQuery()
.eq(Inventory::getProductId, productId));
if (ObjectUtils.isEmpty(inventory)) {
log.error("商品不存在,productId: {}", productId);
return false;
}
// 检查库存是否充足
if (inventory.getQuantity() < quantity) {
log.error("库存不足,productId: {}, 可用库存: {}, 需要: {}",
productId, inventory.getQuantity(), quantity);
return false;
}
// 扣减库存
int updateCount = inventoryMapper.update(Wrappers.<Inventory>lambdaUpdate()
.set(Inventory::setQuantity, inventory.getQuantity() - quantity)
.eq(Inventory::getId, inventory.getId())
.eq(Inventory::getQuantity, inventory.getQuantity()));
if (updateCount <= 0) {
log.error("库存扣减失败,可能并发修改,productId: {}", productId);
return false;
}
log.info("库存扣减成功,productId: {}, quantity: {}", productId, quantity);
return true;
}
}用户账户服务和支付服务的实现与库存服务类似,都是在本地事务方法中实现业务逻辑,这里不再重复列出。
订单控制器(增加 Seata 接口):
import com.example.order.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 订单控制器
*
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
@Tag(name = "订单管理", description = "订单相关接口")
public class OrderController {
private final OrderService orderService;
// 省略其他接口...
/**
* 创建订单(Seata分布式事务)
*
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 购买数量
* @return 订单ID
*/
@PostMapping("/seata")
@Operation(summary = "创建订单(Seata)", description = "使用Seata AT模式创建新订单并处理分布式事务")
public Long createOrderWithSeata(
@Parameter(description = "用户ID", required = true) @RequestParam Long userId,
@Parameter(description = "商品ID", required = true) @RequestParam Long productId,
@Parameter(description = "购买数量", required = true) @RequestParam Integer quantity) {
return orderService.createOrderWithSeata(userId, productId, quantity);
}
}
订单服务调用其他服务的 Feign 客户端:
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import java.math.BigDecimal;
/**
* 库存服务Feign客户端
*
* @author ken
*/
@FeignClient(name = "inventory-service")
public interface InventoryFeignClient {
/**
* 扣减库存
*
* @param productId 商品ID
* @param quantity 扣减数量
* @return 是否成功
*/
@PostMapping("/api/inventory/deduct")
boolean deductInventory(@RequestParam("productId") Long productId, @RequestParam("quantity") Integer quantity);
}
/**
* 用户账户服务Feign客户端
*
* @author ken
*/
@FeignClient(name = "user-service")
public interface UserAccountFeignClient {
/**
* 扣减用户余额
*
* @param userId 用户ID
* @param amount 扣减金额
* @return 是否成功
*/
@PostMapping("/api/user/account/deduct")
boolean deductBalance(@RequestParam("userId") Long userId, @RequestParam("amount") BigDecimal amount);
}
/**
* 支付服务Feign客户端
*
* @author ken
*/
@FeignClient(name = "payment-service")
public interface PaymentFeignClient {
/**
* 创建支付记录
*
* @param orderId 订单ID
* @param userId 用户ID
* @param amount 支付金额
* @return 是否成功
*/
@PostMapping("/api/payment/create")
boolean createPayment(@RequestParam("orderId") Long orderId,
@RequestParam("userId") Long userId,
@RequestParam("amount") BigDecimal amount);
}
优点:
缺点:
现在我们已经详细介绍了本地消息表、TCC 模式和 Seata 三种分布式事务解决方案,下面对它们进行全面对比,并给出选型建议。
特性 | 本地消息表 | TCC 模式 | Seata |
|---|---|---|---|
一致性保证 | 最终一致性 | 强一致性(Try 阶段) | 强一致性(AT 模式) |
实现复杂度 | 低 | 高 | 低 |
业务侵入性 | 中 | 高 | 低 |
性能 | 高 | 很高 | 中 |
适用场景 | 异步场景 | 同步场景 | 同步场景 |
代码改动量 | 中 | 大 | 小 |
学习成本 | 低 | 高 | 中 |
部署复杂度 | 低 | 中 | 高 |
重试机制 | 需要自己实现 | 需要自己实现 | 内置 |
幂等处理 | 需要自己实现 | 需要自己实现 | 内置 |
跨语言支持 | 好 | 好 | 一般(主要支持 Java) |
在性能方面,三种方案的表现由高到低依次是:
性能测试数据(仅供参考):
实际性能会受到网络延迟、数据库性能、业务复杂度等多种因素影响。
故障场景 | 本地消息表 | TCC 模式 | Seata |
|---|---|---|---|
服务宕机 | 消息表持久化,恢复后可重试 | 协调器记录状态,恢复后可继续 | TC 记录状态,恢复后可继续 |
网络分区 | 消息队列缓存,分区恢复后同步 | 协调器重试,最终一致性 | TC 重试,最终一致性 |
数据库宕机 | 本地事务保证,恢复后可继续 | Try 阶段预留资源,恢复后可继续 | 基于 undo_log 回滚,恢复后可继续 |
消息丢失 | 定时任务重试,保证最终送达 | 协调器重试机制 | 全局事务回滚机制 |
选择分布式事务方案时,应综合考虑以下因素:
分布式事务是微服务架构中的一个核心难题,没有放之四海而皆准的完美解决方案。本文介绍的三种方案各有优缺点,适用于不同的业务场景:
在实际项目中,我们往往需要根据具体业务场景,灵活选择甚至组合使用这些方案。例如,核心交易流程使用 TCC 保证强一致性,而非核心的通知、日志等流程使用本地消息表保证最终一致性。