首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >从 0 到 1 吃透 Java 分布式事务:Seata/TCC/ 本地消息表的终极选型指南

从 0 到 1 吃透 Java 分布式事务:Seata/TCC/ 本地消息表的终极选型指南

作者头像
果酱带你啃java
发布2026-04-14 13:18:06
发布2026-04-14 13:18:06
340
举报

在分布式系统的世界里,数据一致性就像一座必须跨越的独木桥。当业务从单体架构演进到微服务架构,一次业务操作可能涉及多个服务的数据库写入,如何保证这些操作要么全部成功,要么全部失败,成为每个 Java 开发者必须攻克的难关。

本文将深入剖析三种主流的分布式事务解决方案 ——Seata、TCC 模式和本地消息表,从底层原理到实战代码,从适用场景到性能对比,帮你彻底搞懂 "该怎么选" 这个核心问题。无论你是刚接触分布式系统的新手,还是正在项目中面临事务难题的资深开发者,读完本文都能找到适合自己业务场景的答案。

分布式事务的本质:从 CAP 理论说起

在深入技术细节之前,我们必须先理解分布式事务的本质。分布式事务的核心挑战源于著名的 CAP 理论:

CAP 理论告诉我们:在分布式系统中,当网络分区出现时,一致性和可用性无法同时保证。这意味着完美的分布式事务解决方案是不存在的,我们只能根据业务场景做出取舍。

分布式事务的本质就是在不同的取舍策略下,找到数据一致性和系统可用性之间的平衡点。理解这一点,是我们选择合适方案的前提。

分布式事务的 ACID 与 BASE

传统的数据库事务遵循 ACID 原则:

  • 原子性 (Atomicity):事务要么全部完成,要么全部不完成
  • 一致性 (Consistency):事务完成后,所有数据都处于一致状态
  • 隔离性 (Isolation):多个事务并发执行时,彼此不会相互影响
  • 持久性 (Durability):事务完成后,修改是永久的

而在分布式系统中,我们更多地追求 BASE 理论:

  • 基本可用 (Basically Available):系统在出现故障时,仍能保证核心功能可用
  • 软状态 (Soft State):允许系统存在中间状态,这个状态不会影响系统可用性
  • 最终一致性 (Eventual Consistency):系统最终会达到一致的状态,而不是实时保持一致

从 ACID 到 BASE 的转变,体现了分布式系统在一致性和可用性之间的权衡。接下来介绍的三种方案,本质上都是基于 BASE 理论的不同实现。

案例场景:贯穿全文的业务示例

为了让各种方案的讲解更加具体,我们将以一个典型的电商订单流程作为贯穿全文的案例:

  1. 创建订单(订单服务)
  2. 扣减库存(库存服务)
  3. 扣减余额(用户服务)
  4. 记录支付日志(支付服务)

这个流程涉及四个不同的服务和四个独立的数据库,是一个典型的分布式事务场景。我们将基于这个场景,分别实现三种分布式事务方案,通过对比让你理解它们的异同和适用场景。

方案一:本地消息表(最经典的最终一致性方案)

本地消息表是分布式事务中最经典、实现最简单的方案之一,由 eBay 在 2008 年提出。它的核心思想是将分布式事务转化为本地事务和消息的可靠传递。

本地消息表的核心原理

本地消息表的工作原理可以用以下流程图表示:

核心步骤解析:

  1. 在发起方的数据库中创建消息表,用于记录需要发送的消息
  2. 发起方在同一个本地事务中完成业务操作和消息记录
  3. 通过定时任务将消息表中的消息发送到消息队列
  4. 接收方消费消息,完成自己的业务操作,并记录消费日志
  5. 基于消息队列的重试机制保证消息最终被处理

这种方案的本质是通过本地事务保证消息的可靠生成,通过消息队列和重试机制保证消息的可靠传递和消费,从而实现最终一致性。

本地消息表示例实现

下面我们基于订单创建的场景,实现本地消息表方案。

1. 数据库设计

首先需要创建订单表和消息表(订单服务库):

代码语言:javascript
复制
-- 订单表
CREATE TABLE `t_order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
  `user_id` bigint NOT NULL COMMENT '用户ID',
  `product_id` bigint NOT NULL COMMENT '商品ID',
  `quantity` int NOT NULL COMMENT '购买数量',
  `amount` decimal(10,2) NOT NULL COMMENT '订单金额',
  `status` tinyint NOT NULL COMMENT '订单状态:0-创建中,1-已完成,2-已取消',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';

-- 本地消息表
CREATE TABLE `t_local_message` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '消息ID',
  `business_type` varchar(32) NOT NULL COMMENT '业务类型',
  `business_id` bigint NOT NULL COMMENT '业务ID',
  `message_content` text NOT NULL COMMENT '消息内容',
  `status` tinyint NOT NULL COMMENT '消息状态:0-待发送,1-已发送,2-已完成,3-发送失败',
  `send_count` int NOT NULL DEFAULT 0 COMMENT '发送次数',
  `next_retry_time` datetime NOT NULL COMMENT '下次重试时间',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_business` (`business_type`,`business_id`),
  KEY `idx_status_retry` (`status`,`next_retry_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='本地消息表';
代码语言:javascript
复制

库存表(库存服务库):

代码语言:javascript
复制
-- 库存表
CREATE TABLE `t_inventory` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `product_id` bigint NOT NULL COMMENT '商品ID',
  `quantity` int NOT NULL COMMENT '库存数量',
  `lock_quantity` int NOT NULL DEFAULT 0 COMMENT '锁定数量',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存表';

-- 消息消费日志表
CREATE TABLE `t_message_consume_log` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `message_id` bigint NOT NULL COMMENT '消息ID',
  `business_type` varchar(32) NOT NULL COMMENT '业务类型',
  `business_id` bigint NOT NULL COMMENT '业务ID',
  `consume_time` datetime NOT NULL COMMENT '消费时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_message_id` (`message_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息消费日志表';
2. 核心依赖
代码语言:javascript
复制
<!-- pom.xml 核心依赖 -->
<dependencies>
    <!-- Spring Boot -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>3.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jdbc</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- 数据库 -->
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <version>8.3.0</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.5.5</version>
    </dependency>

    <!-- 消息队列 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>3.2.0</version>
    </dependency>

    <!-- 工具类 -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.45</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>33.1.0-jre</version>
    </dependency>

    <!-- Swagger3 -->
    <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>
代码语言:javascript
复制

3. 实体类定义

订单实体:

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import lombok.Data;

/**
 * 订单表实体类
 *
 * @author ken
 */
@Data
@TableName("t_order")
public class Order {
    /**
     * 订单ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 用户ID
     */
    @TableField("user_id")
    private Long userId;

    /**
     * 商品ID
     */
    @TableField("product_id")
    private Long productId;

    /**
     * 购买数量
     */
    @TableField("quantity")
    private Integer quantity;

    /**
     * 订单金额
     */
    @TableField("amount")
    private BigDecimal amount;

    /**
     * 订单状态:0-创建中,1-已完成,2-已取消
     */
    @TableField("status")
    private Integer status;

    /**
     * 创建时间
     */
    @TableField("create_time")
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    @TableField("update_time")
    private LocalDateTime updateTime;
}
代码语言:javascript
复制

本地消息实体:

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.time.LocalDateTime;
import lombok.Data;

/**
 * 本地消息表实体类
 *
 * @author ken
 */
@Data
@TableName("t_local_message")
public class LocalMessage {
    /**
     * 消息ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 业务类型
     */
    @TableField("business_type")
    private String businessType;

    /**
     * 业务ID
     */
    @TableField("business_id")
    private Long businessId;

    /**
     * 消息内容
     */
    @TableField("message_content")
    private String messageContent;

    /**
     * 消息状态:0-待发送,1-已发送,2-已完成,3-发送失败
     */
    @TableField("status")
    private Integer status;

    /**
     * 发送次数
     */
    @TableField("send_count")
    private Integer sendCount;

    /**
     * 下次重试时间
     */
    @TableField("next_retry_time")
    private LocalDateTime nextRetryTime;

    /**
     * 创建时间
     */
    @TableField("create_time")
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    @TableField("update_time")
    private LocalDateTime updateTime;
}
代码语言:javascript
复制

库存实体:

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.time.LocalDateTime;
import lombok.Data;

/**
 * 库存表实体类
 *
 * @author ken
 */
@Data
@TableName("t_inventory")
public class Inventory {
    /**
     * ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 商品ID
     */
    @TableField("product_id")
    private Long productId;

    /**
     * 库存数量
     */
    @TableField("quantity")
    private Integer quantity;

    /**
     * 锁定数量
     */
    @TableField("lock_quantity")
    private Integer lockQuantity;

    /**
     * 创建时间
     */
    @TableField("create_time")
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    @TableField("update_time")
    private LocalDateTime updateTime;
}
代码语言:javascript
复制

消息消费日志实体:

代码语言:javascript
复制
import lombok.Data;

/**
 * 订单创建消息DTO
 *
 * @author ken
 */
@Data
public class OrderCreatedMessage {
    /**
     * 消息ID
     */
    private Long messageId;

    /**
     * 订单ID
     */
    private Long orderId;

    /**
     * 用户ID
     */
    private Long userId;

    /**
     * 商品ID
     */
    private Long productId;

    /**
     * 购买数量
     */
    private Integer quantity;
}
代码语言:javascript
复制

消息 DTO:

代码语言:javascript
复制
import lombok.Data;

/**
 * 订单创建消息DTO
 *
 * @author ken
 */
@Data
public class OrderCreatedMessage {
    /**
     * 消息ID
     */
    private Long messageId;

    /**
     * 订单ID
     */
    private Long orderId;

    /**
     * 用户ID
     */
    private Long userId;

    /**
     * 商品ID
     */
    private Long productId;

    /**
     * 购买数量
     */
    private Integer quantity;
}
代码语言:javascript
复制

4. Mapper 层定义

订单 Mapper:

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.order.entity.Order;
import org.apache.ibatis.annotations.Mapper;

/**
 * 订单Mapper接口
 *
 * @author ken
 */
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
代码语言:javascript
复制

本地消息 Mapper:

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.order.entity.LocalMessage;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.time.LocalDateTime;
import java.util.List;

/**
 * 本地消息Mapper接口
 *
 * @author ken
 */
@Mapper
public interface LocalMessageMapper extends BaseMapper<LocalMessage> {

    /**
     * 查询待发送的消息
     *
     * @param status 消息状态
     * @param currentTime 当前时间
     * @param limit 查询数量限制
     * @return 待发送的消息列表
     */
    List<LocalMessage> queryPendingMessages(
            @Param("status") Integer status,
            @Param("currentTime") LocalDateTime currentTime,
            @Param("limit") Integer limit);

    /**
     * 更新消息状态
     *
     * @param id 消息ID
     * @param oldStatus 旧状态
     * @param newStatus 新状态
     * @param sendCount 发送次数
     * @param nextRetryTime 下次重试时间
     * @return 更新影响的行数
     */
    int updateMessageStatus(
            @Param("id") Long id,
            @Param("oldStatus") Integer oldStatus,
            @Param("newStatus") Integer newStatus,
            @Param("sendCount") Integer sendCount,
            @Param("nextRetryTime") LocalDateTime nextRetryTime);
}
代码语言:javascript
复制

库存 Mapper:

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.inventory.entity.Inventory;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

/**
 * 库存Mapper接口
 *
 * @author ken
 */
@Mapper
public interface InventoryMapper extends BaseMapper<Inventory> {

    /**
     * 扣减库存
     *
     * @param productId 商品ID
     * @param quantity 扣减数量
     * @return 影响的行数
     */
    int deductInventory(@Param("productId") Long productId, @Param("quantity") Integer quantity);
}
代码语言:javascript
复制

消息消费日志 Mapper:

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.inventory.entity.MessageConsumeLog;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;

/**
 * 消息消费日志Mapper接口
 *
 * @author ken
 */
@Mapper
public interface MessageConsumeLogMapper extends BaseMapper<MessageConsumeLog> {

    /**
     * 查询消息是否已消费
     *
     * @param messageId 消息ID
     * @return 1-已消费,0-未消费
     */
    int checkMessageConsumed(@Param("messageId") Long messageId);
}
代码语言:javascript
复制

5. 服务层实现

订单服务(消息发起方):

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.order.dto.OrderCreatedMessage;
import com.example.order.entity.LocalMessage;
import com.example.order.entity.Order;
import com.example.order.mapper.LocalMessageMapper;
import com.example.order.mapper.OrderMapper;
import com.alibaba.fastjson2.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.UUID;

/**
 * 订单服务
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderService {
    private final OrderMapper orderMapper;
    private final LocalMessageMapper localMessageMapper;
    private final RabbitTemplate rabbitTemplate;

    /**
     * 业务类型:订单创建
     */
    public static final String BUSINESS_TYPE_ORDER_CREATE = "ORDER_CREATE";

    /**
     * 消息状态:待发送
     */
    public static final int MESSAGE_STATUS_PENDING = 0;

    /**
     * 消息状态:已发送
     */
    public static final int MESSAGE_STATUS_SENT = 1;

    /**
     * 创建订单
     *
     * @param userId 用户ID
     * @param productId 商品ID
     * @param quantity 购买数量
     * @return 订单ID
     */
    @Transactional(rollbackFor = Exception.class)
    public Long createOrder(Long userId, Long productId, Integer quantity) {
        log.info("开始创建订单,userId: {}, productId: {}, quantity: {}", userId, productId, quantity);

        // 参数校验
        if (ObjectUtils.isEmpty(userId)) {
            throw new IllegalArgumentException("用户ID不能为空");
        }
        if (ObjectUtils.isEmpty(productId)) {
            throw new IllegalArgumentException("商品ID不能为空");
        }
        if (quantity == null || quantity <= 0) {
            throw new IllegalArgumentException("购买数量必须大于0");
        }

        // 1. 创建订单(实际业务中需要查询商品价格等信息)
        Order order = new Order();
        order.setUserId(userId);
        order.setProductId(productId);
        order.setQuantity(quantity);
        // 假设商品单价为100元
        order.setAmount(new BigDecimal(100).multiply(new BigDecimal(quantity)));
        order.setStatus(0); // 0-创建中
        order.setCreateTime(LocalDateTime.now());
        order.setUpdateTime(LocalDateTime.now());
        orderMapper.insert(order);
        log.info("订单创建成功,orderId: {}", order.getId());

        // 2. 创建本地消息(与订单创建在同一个事务中)
        OrderCreatedMessage message = new OrderCreatedMessage();
        message.setOrderId(order.getId());
        message.setUserId(userId);
        message.setProductId(productId);
        message.setQuantity(quantity);

        LocalMessage localMessage = new LocalMessage();
        localMessage.setBusinessType(BUSINESS_TYPE_ORDER_CREATE);
        localMessage.setBusinessId(order.getId());
        localMessage.setMessageContent(JSON.toJSONString(message));
        localMessage.setStatus(MESSAGE_STATUS_PENDING);
        localMessage.setSendCount(0);
        // 首次发送时间设置为当前时间
        localMessage.setNextRetryTime(LocalDateTime.now());
        localMessage.setCreateTime(LocalDateTime.now());
        localMessage.setUpdateTime(LocalDateTime.now());
        localMessageMapper.insert(localMessage);
        log.info("本地消息创建成功,messageId: {}", localMessage.getId());

        // 3. 设置消息ID并尝试立即发送(即使发送失败也没关系,定时任务会重试)
        message.setMessageId(localMessage.getId());
        localMessage.setMessageContent(JSON.toJSONString(message));
        localMessageMapper.updateById(localMessage);

        // 尝试发送消息
        try {
            sendMessage(localMessage);
        } catch (Exception e) {
            log.error("消息发送失败,messageId: {}", localMessage.getId(), e);
            // 发送失败不需要回滚订单,定时任务会重试
        }

        return order.getId();
    }

    /**
     * 发送消息到MQ
     *
     * @param localMessage 本地消息
     */
    public void sendMessage(LocalMessage localMessage) {
        log.info("开始发送消息,messageId: {}", localMessage.getId());

        // 1. 更新消息状态为发送中
        int updateCount = localMessageMapper.updateMessageStatus(
                localMessage.getId(),
                MESSAGE_STATUS_PENDING,
                MESSAGE_STATUS_SENT,
                localMessage.getSendCount() + 1,
                calculateNextRetryTime(localMessage.getSendCount() + 1)
        );

        if (updateCount <= 0) {
            log.warn("消息状态已被修改,无需重复发送,messageId: {}", localMessage.getId());
            return;
        }

        // 2. 发送消息到MQ
        try {
            rabbitTemplate.convertAndSend(
                    "order.event.exchange",
                    "order.created",
                    localMessage.getMessageContent()
            );
            log.info("消息发送成功,messageId: {}", localMessage.getId());
        } catch (Exception e) {
            log.error("消息发送失败,messageId: {}", localMessage.getId(), e);
            // 发送失败,将状态改回待发送
            localMessageMapper.updateMessageStatus(
                    localMessage.getId(),
                    MESSAGE_STATUS_SENT,
                    MESSAGE_STATUS_PENDING,
                    localMessage.getSendCount() + 1,
                    calculateNextRetryTime(localMessage.getSendCount() + 1)
            );
        }
    }

    /**
     * 计算下次重试时间(指数退避策略)
     *
     * @param sendCount 已发送次数
     * @return 下次重试时间
     */
    private LocalDateTime calculateNextRetryTime(int sendCount) {
        // 重试间隔:1分钟、2分钟、4分钟、8分钟、16分钟,最大30分钟
        int delayMinutes = Math.min(30, (int) Math.pow(2, sendCount - 1));
        return LocalDateTime.now().plusMinutes(delayMinutes);
    }
}

定时任务(消息重发):

代码语言:javascript
复制
import com.example.order.entity.LocalMessage;
import com.example.order.mapper.LocalMessageMapper;
import com.example.order.service.OrderService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.time.LocalDateTime;
import java.util.List;

/**
 * 消息发送定时任务
 *
 * @author ken
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageResendTask {
    private final LocalMessageMapper localMessageMapper;
    private final OrderService orderService;

    /**
     * 每30秒执行一次,重试发送失败的消息
     */
    @Scheduled(fixedRate = 30000)
    public void resendFailedMessages() {
        log.info("开始执行消息重发任务");

        // 查询待发送且已到重试时间的消息,每次最多处理100条
        List<LocalMessage> messages = localMessageMapper.queryPendingMessages(
                OrderService.MESSAGE_STATUS_PENDING,
                LocalDateTime.now(),
                100
        );

        if (CollectionUtils.isEmpty(messages)) {
            log.info("没有需要重发的消息");
            return;
        }

        log.info("发现{}条需要重发的消息,开始处理", messages.size());

        // 逐条发送消息
        for (LocalMessage message : messages) {
            try {
                orderService.sendMessage(message);
            } catch (Exception e) {
                log.error("消息重发失败,messageId: {}", message.getId(), e);
                // 单个消息处理失败不影响其他消息
            }
        }

        log.info("消息重发任务执行完毕");
    }
}

库存服务(消息接收方):

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.inventory.entity.Inventory;
import com.example.inventory.entity.MessageConsumeLog;
import com.example.inventory.mapper.InventoryMapper;
import com.example.inventory.mapper.MessageConsumeLogMapper;
import com.example.order.dto.OrderCreatedMessage;
import com.alibaba.fastjson2.JSON;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

import java.time.LocalDateTime;

/**
 * 库存服务
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class InventoryService {
    private final InventoryMapper inventoryMapper;
    private final MessageConsumeLogMapper messageConsumeLogMapper;

    /**
     * 处理订单创建消息,扣减库存
     *
     * @param messageContent 消息内容
     */
    @RabbitListener(queues = "order.created.inventory.queue")
    public void handleOrderCreatedMessage(String messageContent) {
        log.info("收到订单创建消息,内容:{}", messageContent);

        // 解析消息
        OrderCreatedMessage message;
        try {
            message = JSON.parseObject(messageContent, OrderCreatedMessage.class);
        } catch (Exception e) {
            log.error("消息格式解析错误", e);
            // 消息格式错误,直接确认,避免重复消费
            return;
        }

        // 参数校验
        if (ObjectUtils.isEmpty(message.getMessageId()) || ObjectUtils.isEmpty(message.getProductId()) || 
                message.getQuantity() == null || message.getQuantity() <= 0) {
            log.error("消息参数不完整,message: {}", message);
            return;
        }

        // 处理消息
        try {
            processDeductInventory(message);
        } catch (Exception e) {
            log.error("处理订单创建消息失败", e);
            // 抛出异常,让RabbitMQ重新投递消息
            throw new RuntimeException("处理消息失败,需要重试", e);
        }
    }

    /**
     * 处理库存扣减
     *
     * @param message 订单创建消息
     */
    @Transactional(rollbackFor = Exception.class)
    public void processDeductInventory(OrderCreatedMessage message) {
        log.info("开始处理库存扣减,messageId: {}, productId: {}, quantity: {}",
                message.getMessageId(), message.getProductId(), message.getQuantity());

        // 1. 检查消息是否已消费(避免重复消费)
        int consumedCount = messageConsumeLogMapper.checkMessageConsumed(message.getMessageId());
        if (consumedCount > 0) {
            log.info("消息已消费,无需重复处理,messageId: {}", message.getMessageId());
            return;
        }

        // 2. 扣减库存
        int updateCount = inventoryMapper.deductInventory(message.getProductId(), message.getQuantity());
        if (updateCount <= 0) {
            log.error("库存扣减失败,可能库存不足,productId: {}, quantity: {}",
                    message.getProductId(), message.getQuantity());
            throw new RuntimeException("库存不足,扣减失败");
        }

        log.info("库存扣减成功,productId: {}, quantity: {}", message.getProductId(), message.getQuantity());

        // 3. 记录消息消费日志
        MessageConsumeLog log = new MessageConsumeLog();
        log.setMessageId(message.getMessageId());
        log.setBusinessType("ORDER_CREATE");
        log.setBusinessId(message.getOrderId());
        log.setConsumeTime(LocalDateTime.now());
        messageConsumeLogMapper.insert(log);

        log.info("消息处理完成,messageId: {}", message.getMessageId());
    }
}
代码语言:javascript
复制

6. 控制器实现

订单控制器:

代码语言:javascript
复制
import com.example.order.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 订单控制器
 *
 * @author ken
 */
@Slf4j
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
@Tag(name = "订单管理", description = "订单相关接口")
public class OrderController {
    private final OrderService orderService;

    /**
     * 创建订单
     *
     * @param userId 用户ID
     * @param productId 商品ID
     * @param quantity 购买数量
     * @return 订单ID
     */
    @PostMapping
    @Operation(summary = "创建订单", description = "创建新订单并触发库存扣减等后续操作")
    public Long createOrder(
            @Parameter(description = "用户ID", required = true) @RequestParam Long userId,
            @Parameter(description = "商品ID", required = true) @RequestParam Long productId,
            @Parameter(description = "购买数量", required = true) @RequestParam Integer quantity) {
        return orderService.createOrder(userId, productId, quantity);
    }
}
代码语言:javascript
复制

7. RabbitMQ 配置
代码语言:javascript
复制
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ配置
 *
 * @author ken
 */
@Configuration
public class RabbitMQConfig {
    /**
     * 订单事件交换机
     */
    public static final String ORDER_EVENT_EXCHANGE = "order.event.exchange";

    /**
     * 订单创建路由键
     */
    public static final String ORDER_CREATED_ROUTING_KEY = "order.created";

    /**
     * 库存服务处理订单创建的队列
     */
    public static final String ORDER_CREATED_INVENTORY_QUEUE = "order.created.inventory.queue";

    /**
     * 创建订单事件交换机
     */
    @Bean
    public DirectExchange orderEventExchange() {
        // 持久化、非自动删除
        return new DirectExchange(ORDER_EVENT_EXCHANGE, true, false);
    }

    /**
     * 创建库存服务处理订单创建的队列
     */
    @Bean
    public Queue orderCreatedInventoryQueue() {
        // 持久化、非排他、非自动删除
        return new Queue(ORDER_CREATED_INVENTORY_QUEUE, true, false, false);
    }

    /**
     * 绑定队列到交换机
     */
    @Bean
    public Binding bindOrderCreatedInventoryQueue(DirectExchange orderEventExchange, Queue orderCreatedInventoryQueue) {
        return BindingBuilder.bind(orderCreatedInventoryQueue)
                .to(orderEventExchange)
                .with(ORDER_CREATED_ROUTING_KEY);
    }
}
代码语言:javascript
复制

本地消息表方案的优缺点分析

优点:

  1. 实现简单,基于本地事务和消息队列,开发成本低
  2. 可靠性高,通过本地事务保证消息不会丢失
  3. 兼容性好,不需要对现有系统做太大改造
  4. 性能较好,没有锁竞争,各服务可以独立扩展

缺点:

  1. 强依赖消息队列的可靠性
  2. 需要创建额外的消息表,增加了数据库负担
  3. 消息表与业务表耦合在同一个数据库,可能影响性能
  4. 只能保证最终一致性,不能保证实时一致性
  5. 需要处理消息重复消费的问题

本地消息表的适用场景

  1. 对一致性要求不高,能接受最终一致性的业务场景
  2. 业务系统架构相对简单,不想引入复杂的分布式事务框架
  3. 对性能要求较高,不希望有太多的网络交互和锁竞争
  4. 适合异步化程度高的业务流程,如电商下单、物流跟踪等

方案二:TCC 模式(最高性能的分布式事务方案)

TCC(Try-Confirm-Cancel)是一种基于业务层面的分布式事务解决方案,它将分布式事务拆分为三个阶段,通过业务逻辑的补偿机制保证数据一致性。

TCC 模式的核心原理

TCC 模式的工作原理可以用以下流程图表示:

TCC 的三个阶段详解:

  1. Try(尝试)阶段
    • 检查业务所需的资源是否充足
    • 预留业务所需的资源(如锁定库存、冻结金额等)
    • 确保后续的 Confirm 或 Cancel 可以成功执行
  2. Confirm(确认)阶段
    • 当所有参与方的 Try 都成功后执行
    • 真正执行业务逻辑
    • 释放 Try 阶段预留的资源
    • 该阶段必须保证幂等性,确保重复调用不会产生副作用
  3. Cancel(取消)阶段
    • 当任何一个参与方的 Try 失败时执行
    • 取消已执行的业务操作
    • 释放 Try 阶段预留的资源
    • 该阶段也必须保证幂等性

TCC 模式的核心思想是 **"先尝试,再确认,失败则取消"**,通过业务逻辑的拆分和补偿,实现分布式事务的最终一致性。

TCC 模式示例实现

下面我们基于同样的订单创建场景,实现 TCC 模式的分布式事务。

1. 数据库设计

订单表(订单服务库):

代码语言:javascript
复制
CREATE TABLE `t_order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
  `user_id` bigint NOT NULL COMMENT '用户ID',
  `product_id` bigint NOT NULL COMMENT '商品ID',
  `quantity` int NOT NULL COMMENT '购买数量',
  `amount` decimal(10,2) NOT NULL COMMENT '订单金额',
  `status` tinyint NOT NULL COMMENT '订单状态:0-创建中,1-已确认,2-已取消',
  `tcc_transaction_id` varchar(64) DEFAULT NULL COMMENT 'TCC事务ID',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_user_id` (`user_id`),
  KEY `idx_tcc_transaction_id` (`tcc_transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
代码语言:javascript
复制

库存表(库存服务库):

代码语言:javascript
复制
CREATE TABLE `t_inventory` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `product_id` bigint NOT NULL COMMENT '商品ID',
  `quantity` int NOT NULL COMMENT '库存数量',
  `lock_quantity` int NOT NULL DEFAULT 0 COMMENT '锁定数量(TCC Try阶段预留)',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_product_id` (`product_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存表';
代码语言:javascript
复制

用户余额表(用户服务库):

代码语言:javascript
复制
CREATE TABLE `t_user_account` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `user_id` bigint NOT NULL COMMENT '用户ID',
  `balance` decimal(10,2) NOT NULL COMMENT '账户余额',
  `freeze_balance` decimal(10,2) NOT NULL DEFAULT 0.00 COMMENT '冻结金额(TCC Try阶段预留)',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户账户表';
代码语言:javascript
复制

支付日志表(支付服务库):

代码语言:javascript
复制
CREATE TABLE `t_payment_log` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `order_id` bigint NOT NULL COMMENT '订单ID',
  `user_id` bigint NOT NULL COMMENT '用户ID',
  `amount` decimal(10,2) NOT NULL COMMENT '支付金额',
  `status` tinyint NOT NULL COMMENT '支付状态:0-待支付,1-已支付,2-已取消',
  `tcc_transaction_id` varchar(64) DEFAULT NULL COMMENT 'TCC事务ID',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_order_id` (`order_id`),
  KEY `idx_tcc_transaction_id` (`tcc_transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='支付日志表';

TCC 事务日志表(协调者使用):

代码语言:javascript
复制
CREATE TABLE `t_tcc_transaction` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `transaction_id` varchar(64) NOT NULL COMMENT '事务ID',
  `status` tinyint NOT NULL COMMENT '事务状态:0-尝试中,1-确认中,2-已确认,3-取消中,4-已取消',
  `business_type` varchar(32) NOT NULL COMMENT '业务类型',
  `business_id` bigint NOT NULL COMMENT '业务ID',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_transaction_id` (`transaction_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='TCC事务表';

CREATE TABLE `t_tcc_participant` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
  `transaction_id` varchar(64) NOT NULL COMMENT '事务ID',
  `participant_id` varchar(64) NOT NULL COMMENT '参与者ID',
  `confirm_method` varchar(255) NOT NULL COMMENT '确认方法',
  `cancel_method` varchar(255) NOT NULL COMMENT '取消方法',
  `status` tinyint NOT NULL COMMENT '状态:0-待处理,1-已确认,2-已取消',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  KEY `idx_transaction_id` (`transaction_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='TCC参与者表';
代码语言:javascript
复制

2. 核心依赖

在本地消息表方案的基础上,增加 TCC 协调器相关依赖:

代码语言:javascript
复制
<!-- TCC协调器 -->
<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>hmily-core</artifactId>
    <version>2.1.2</version>
</dependency>
<dependency>
    <groupId>org.dromara</groupId>
    <artifactId>hmily-spring-boot-starter</artifactId>
    <version>2.1.2</version>
</dependency>
<!-- 序列化方式 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.45</version>
</dependency>
代码语言:javascript
复制

3. 实体类定义

这里只展示与 TCC 相关的新增实体,其他实体与本地消息表方案类似。

TCC 事务实体:

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.time.LocalDateTime;
import lombok.Data;

/**
 * TCC事务表实体类
 *
 * @author ken
 */
@Data
@TableName("t_tcc_transaction")
public class TccTransaction {
    /**
     * ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 事务ID
     */
    @TableField("transaction_id")
    private String transactionId;

    /**
     * 事务状态:0-尝试中,1-确认中,2-已确认,3-取消中,4-已取消
     */
    @TableField("status")
    private Integer status;

    /**
     * 业务类型
     */
    @TableField("business_type")
    private String businessType;

    /**
     * 业务ID
     */
    @TableField("business_id")
    private Long businessId;

    /**
     * 创建时间
     */
    @TableField("create_time")
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    @TableField("update_time")
    private LocalDateTime updateTime;
}

TCC 参与者实体:

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.time.LocalDateTime;
import lombok.Data;

/**
 * TCC参与者表实体类
 *
 * @author ken
 */
@Data
@TableName("t_tcc_participant")
public class TccParticipant {
    /**
     * ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 事务ID
     */
    @TableField("transaction_id")
    private String transactionId;

    /**
     * 参与者ID
     */
    @TableField("participant_id")
    private String participantId;

    /**
     * 确认方法
     */
    @TableField("confirm_method")
    private String confirmMethod;

    /**
     * 取消方法
     */
    @TableField("cancel_method")
    private String cancelMethod;

    /**
     * 状态:0-待处理,1-已确认,2-已取消
     */
    @TableField("status")
    private Integer status;

    /**
     * 创建时间
     */
    @TableField("create_time")
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    @TableField("update_time")
    private LocalDateTime updateTime;
}

用户账户实体:

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import lombok.Data;

/**
 * 用户账户表实体类
 *
 * @author ken
 */
@Data
@TableName("t_user_account")
public class UserAccount {
    /**
     * ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 用户ID
     */
    @TableField("user_id")
    private Long userId;

    /**
     * 账户余额
     */
    @TableField("balance")
    private BigDecimal balance;

    /**
     * 冻结金额(TCC Try阶段预留)
     */
    @TableField("freeze_balance")
    private BigDecimal freezeBalance;

    /**
     * 创建时间
     */
    @TableField("create_time")
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    @TableField("update_time")
    private LocalDateTime updateTime;
}

支付日志实体:

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import lombok.Data;

/**
 * 支付日志表实体类
 *
 * @author ken
 */
@Data
@TableName("t_payment_log")
public class PaymentLog {
    /**
     * ID
     */
    @TableId(type = IdType.AUTO)
    private Long id;

    /**
     * 订单ID
     */
    @TableField("order_id")
    private Long orderId;

    /**
     * 用户ID
     */
    @TableField("user_id")
    private Long userId;

    /**
     * 支付金额
     */
    @TableField("amount")
    private BigDecimal amount;

    /**
     * 支付状态:0-待支付,1-已支付,2-已取消
     */
    @TableField("status")
    private Integer status;

    /**
     * TCC事务ID
     */
    @TableField("tcc_transaction_id")
    private String tccTransactionId;

    /**
     * 创建时间
     */
    @TableField("create_time")
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    @TableField("update_time")
    private LocalDateTime updateTime;
}
代码语言:javascript
复制

4. TCC 核心接口定义

TCC 事务协调器接口:

代码语言:javascript
复制
import java.util.List;
import java.util.function.Supplier;

/**
 * TCC事务协调器
 *
 * @author ken
 */
public interface TccCoordinator {

    /**
     * 执行TCC分布式事务
     *
     * @param businessType 业务类型
     * @param businessId 业务ID
     * @param action 业务操作,包含各个参与者的Try调用
     * @return 业务操作结果
     */
    <T> T execute(String businessType, Long businessId, Supplier<T> action);

    /**
     * 注册TCC参与者
     *
     * @param transactionId 事务ID
     * @param participantId 参与者ID
     * @param confirmMethod 确认方法
     * @param cancelMethod 取消方法
     */
    void registerParticipant(String transactionId, String participantId, 
                             String confirmMethod, String cancelMethod);

    /**
     * 确认事务
     *
     * @param transactionId 事务ID
     */
    void confirm(String transactionId);

    /**
     * 取消事务
     *
     * @param transactionId 事务ID
     */
    void cancel(String transactionId);

    /**
     * 处理悬挂的事务(定时任务调用)
     */
    void handleSuspendedTransactions();
}

库存服务 TCC 接口:

代码语言:javascript
复制
import org.dromara.hmily.annotation.Hmily;

/**
 * 库存服务TCC接口
 *
 * @author ken
 */
public interface InventoryTccService {

    /**
     * Try阶段:锁定库存
     *
     * @param transactionId 事务ID
     * @param productId 商品ID
     * @param quantity 数量
     */
    @Hmily(confirmMethod = "confirmDeductInventory", cancelMethod = "cancelDeductInventory")
    void tryDeductInventory(String transactionId, Long productId, Integer quantity);

    /**
     * Confirm阶段:确认扣减库存
     *
     * @param transactionId 事务ID
     * @param productId 商品ID
     * @param quantity 数量
     */
    void confirmDeductInventory(String transactionId, Long productId, Integer quantity);

    /**
     * Cancel阶段:取消扣减库存
     *
     * @param transactionId 事务ID
     * @param productId 商品ID
     * @param quantity 数量
     */
    void cancelDeductInventory(String transactionId, Long productId, Integer quantity);
}
代码语言:javascript
复制

用户账户 TCC 接口:

代码语言:javascript
复制
import org.dromara.hmily.annotation.Hmily;
import java.math.BigDecimal;

/**
 * 用户账户TCC接口
 *
 * @author ken
 */
public interface UserAccountTccService {

    /**
     * Try阶段:冻结用户余额
     *
     * @param transactionId 事务ID
     * @param userId 用户ID
     * @param amount 金额
     */
    @Hmily(confirmMethod = "confirmDeductBalance", cancelMethod = "cancelDeductBalance")
    void tryDeductBalance(String transactionId, Long userId, BigDecimal amount);

    /**
     * Confirm阶段:确认扣减用户余额
     *
     * @param transactionId 事务ID
     * @param userId 用户ID
     * @param amount 金额
     */
    void confirmDeductBalance(String transactionId, Long userId, BigDecimal amount);

    /**
     * Cancel阶段:取消扣减用户余额
     *
     * @param transactionId 事务ID
     * @param userId 用户ID
     * @param amount 金额
     */
    void cancelDeductBalance(String transactionId, Long userId, BigDecimal amount);
}
代码语言:javascript
复制

支付服务 TCC 接口:

代码语言:javascript
复制
import org.dromara.hmily.annotation.Hmily;
import java.math.BigDecimal;

/**
 * 支付服务TCC接口
 *
 * @author ken
 */
public interface PaymentTccService {

    /**
     * Try阶段:创建支付记录(待支付状态)
     *
     * @param transactionId 事务ID
     * @param orderId 订单ID
     * @param userId 用户ID
     * @param amount 金额
     */
    @Hmily(confirmMethod = "confirmPayment", cancelMethod = "cancelPayment")
    void tryCreatePayment(String transactionId, Long orderId, Long userId, BigDecimal amount);

    /**
     * Confirm阶段:确认支付
     *
     * @param transactionId 事务ID
     * @param orderId 订单ID
     * @param userId 用户ID
     * @param amount 金额
     */
    void confirmPayment(String transactionId, Long orderId, Long userId, BigDecimal amount);

    /**
     * Cancel阶段:取消支付
     *
     * @param transactionId 事务ID
     * @param orderId 订单ID
     * @param userId 用户ID
     * @param amount 金额
     */
    void cancelPayment(String transactionId, Long orderId, Long userId, BigDecimal amount);
}
代码语言:javascript
复制

5. 服务实现

库存服务 TCC 实现:

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.inventory.entity.Inventory;
import com.example.inventory.mapper.InventoryMapper;
import com.example.inventory.service.InventoryTccService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

/**
 * 库存服务TCC实现
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class InventoryTccServiceImpl implements InventoryTccService {
    private final InventoryMapper inventoryMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void tryDeductInventory(String transactionId, Long productId, Integer quantity) {
        log.info("库存Try阶段,transactionId: {}, productId: {}, quantity: {}", 
                transactionId, productId, quantity);

        // 参数校验
        if (ObjectUtils.isEmpty(transactionId)) {
            throw new IllegalArgumentException("事务ID不能为空");
        }
        if (ObjectUtils.isEmpty(productId)) {
            throw new IllegalArgumentException("商品ID不能为空");
        }
        if (quantity == null || quantity <= 0) {
            throw new IllegalArgumentException("数量必须大于0");
        }

        // 查询库存
        Inventory inventory = inventoryMapper.selectOne(Wrappers.<Inventory>lambdaQuery()
                .eq(Inventory::getProductId, productId));

        if (ObjectUtils.isEmpty(inventory)) {
            log.error("商品不存在,productId: {}", productId);
            throw new RuntimeException("商品不存在");
        }

        // 检查库存是否充足
        if (inventory.getQuantity() < quantity) {
            log.error("库存不足,productId: {}, 可用库存: {}, 需要: {}", 
                    productId, inventory.getQuantity(), quantity);
            throw new RuntimeException("库存不足");
        }

        // 锁定库存(Try阶段核心操作)
        int updateCount = inventoryMapper.update(Wrappers.<Inventory>lambdaUpdate()
                .set(Inventory::setLockQuantity, inventory.getLockQuantity() + quantity)
                .eq(Inventory::getId, inventory.getId())
                .eq(Inventory::getQuantity, inventory.getQuantity()));

        if (updateCount <= 0) {
            log.error("库存锁定失败,可能并发修改,productId: {}", productId);
            throw new RuntimeException("库存锁定失败");
        }

        log.info("库存Try阶段完成,transactionId: {}", transactionId);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void confirmDeductInventory(String transactionId, Long productId, Integer quantity) {
        log.info("库存Confirm阶段,transactionId: {}, productId: {}, quantity: {}", 
                transactionId, productId, quantity);

        // 查询库存
        Inventory inventory = inventoryMapper.selectOne(Wrappers.<Inventory>lambdaQuery()
                .eq(Inventory::getProductId, productId));

        if (ObjectUtils.isEmpty(inventory)) {
            log.error("商品不存在,productId: {}", productId);
            return; // 幂等处理,已经处理过的情况
        }

        // 确认扣减库存:实际扣减库存,释放锁定的库存
        int updateCount = inventoryMapper.update(Wrappers.<Inventory>lambdaUpdate()
                .set(Inventory::setQuantity, inventory.getQuantity() - quantity)
                .set(Inventory::setLockQuantity, inventory.getLockQuantity() - quantity)
                .eq(Inventory::getId, inventory.getId())
                .ge(Inventory::getLockQuantity, quantity));

        if (updateCount <= 0) {
            log.warn("库存Confirm阶段已处理,无需重复处理,transactionId: {}", transactionId);
            return; // 幂等处理
        }

        log.info("库存Confirm阶段完成,transactionId: {}", transactionId);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void cancelDeductInventory(String transactionId, Long productId, Integer quantity) {
        log.info("库存Cancel阶段,transactionId: {}, productId: {}, quantity: {}", 
                transactionId, productId, quantity);

        // 查询库存
        Inventory inventory = inventoryMapper.selectOne(Wrappers.<Inventory>lambdaQuery()
                .eq(Inventory::getProductId, productId));

        if (ObjectUtils.isEmpty(inventory)) {
            log.error("商品不存在,productId: {}", productId);
            return; // 幂等处理
        }

        // 取消扣减库存:释放锁定的库存
        int updateCount = inventoryMapper.update(Wrappers.<Inventory>lambdaUpdate()
                .set(Inventory::setLockQuantity, inventory.getLockQuantity() - quantity)
                .eq(Inventory::getId, inventory.getId())
                .ge(Inventory::getLockQuantity, quantity));

        if (updateCount <= 0) {
            log.warn("库存Cancel阶段已处理,无需重复处理,transactionId: {}", transactionId);
            return; // 幂等处理
        }

        log.info("库存Cancel阶段完成,transactionId: {}", transactionId);
    }
}

用户账户服务 TCC 实现:

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.user.entity.UserAccount;
import com.example.user.mapper.UserAccountMapper;
import com.example.user.service.UserAccountTccService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

import java.math.BigDecimal;

/**
 * 用户账户服务TCC实现
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class UserAccountTccServiceImpl implements UserAccountTccService {
    private final UserAccountMapper userAccountMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void tryDeductBalance(String transactionId, Long userId, BigDecimal amount) {
        log.info("用户账户Try阶段,transactionId: {}, userId: {}, amount: {}", 
                transactionId, userId, amount);

        // 参数校验
        if (ObjectUtils.isEmpty(transactionId)) {
            throw new IllegalArgumentException("事务ID不能为空");
        }
        if (ObjectUtils.isEmpty(userId)) {
            throw new IllegalArgumentException("用户ID不能为空");
        }
        if (ObjectUtils.isEmpty(amount) || amount.compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("金额必须大于0");
        }

        // 查询用户账户
        UserAccount account = userAccountMapper.selectOne(Wrappers.<UserAccount>lambdaQuery()
                .eq(UserAccount::getUserId, userId));

        if (ObjectUtils.isEmpty(account)) {
            log.error("用户账户不存在,userId: {}", userId);
            throw new RuntimeException("用户账户不存在");
        }

        // 检查余额是否充足
        if (account.getBalance().compareTo(amount) < 0) {
            log.error("用户余额不足,userId: {}, 可用余额: {}, 需要: {}", 
                    userId, account.getBalance(), amount);
            throw new RuntimeException("用户余额不足");
        }

        // 冻结金额(Try阶段核心操作)
        int updateCount = userAccountMapper.update(Wrappers.<UserAccount>lambdaUpdate()
                .set(UserAccount::setFreezeBalance, account.getFreezeBalance().add(amount))
                .eq(UserAccount::getId, account.getId())
                .eq(UserAccount::getBalance, account.getBalance()));

        if (updateCount <= 0) {
            log.error("余额冻结失败,可能并发修改,userId: {}", userId);
            throw new RuntimeException("余额冻结失败");
        }

        log.info("用户账户Try阶段完成,transactionId: {}", transactionId);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void confirmDeductBalance(String transactionId, Long userId, BigDecimal amount) {
        log.info("用户账户Confirm阶段,transactionId: {}, userId: {}, amount: {}", 
                transactionId, userId, amount);

        // 查询用户账户
        UserAccount account = userAccountMapper.selectOne(Wrappers.<UserAccount>lambdaQuery()
                .eq(UserAccount::getUserId, userId));

        if (ObjectUtils.isEmpty(account)) {
            log.error("用户账户不存在,userId: {}", userId);
            return; // 幂等处理
        }

        // 确认扣减余额:实际扣减余额,释放冻结的金额
        int updateCount = userAccountMapper.update(Wrappers.<UserAccount>lambdaUpdate()
                .set(UserAccount::setBalance, account.getBalance().subtract(amount))
                .set(UserAccount::setFreezeBalance, account.getFreezeBalance().subtract(amount))
                .eq(UserAccount::getId, account.getId())
                .ge(UserAccount::getFreezeBalance, amount));

        if (updateCount <= 0) {
            log.warn("用户账户Confirm阶段已处理,无需重复处理,transactionId: {}", transactionId);
            return; // 幂等处理
        }

        log.info("用户账户Confirm阶段完成,transactionId: {}", transactionId);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void cancelDeductBalance(String transactionId, Long userId, BigDecimal amount) {
        log.info("用户账户Cancel阶段,transactionId: {}, userId: {}, amount: {}", 
                transactionId, userId, amount);

        // 查询用户账户
        UserAccount account = userAccountMapper.selectOne(Wrappers.<UserAccount>lambdaQuery()
                .eq(UserAccount::getUserId, userId));

        if (ObjectUtils.isEmpty(account)) {
            log.error("用户账户不存在,userId: {}", userId);
            return; // 幂等处理
        }

        // 取消扣减余额:释放冻结的金额
        int updateCount = userAccountMapper.update(Wrappers.<UserAccount>lambdaUpdate()
                .set(UserAccount::setFreezeBalance, account.getFreezeBalance().subtract(amount))
                .eq(UserAccount::getId, account.getId())
                .ge(UserAccount::getFreezeBalance, amount));

        if (updateCount <= 0) {
            log.warn("用户账户Cancel阶段已处理,无需重复处理,transactionId: {}", transactionId);
            return; // 幂等处理
        }

        log.info("用户账户Cancel阶段完成,transactionId: {}", transactionId);
    }
}

支付服务 TCC 实现:

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.payment.entity.PaymentLog;
import com.example.payment.mapper.PaymentLogMapper;
import com.example.payment.service.PaymentTccService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * 支付服务TCC实现
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class PaymentTccServiceImpl implements PaymentTccService {
    private final PaymentLogMapper paymentLogMapper;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void tryCreatePayment(String transactionId, Long orderId, Long userId, BigDecimal amount) {
        log.info("支付Try阶段,transactionId: {}, orderId: {}, userId: {}, amount: {}", 
                transactionId, orderId, userId, amount);

        // 参数校验
        if (ObjectUtils.isEmpty(transactionId)) {
            throw new IllegalArgumentException("事务ID不能为空");
        }
        if (ObjectUtils.isEmpty(orderId)) {
            throw new IllegalArgumentException("订单ID不能为空");
        }
        if (ObjectUtils.isEmpty(userId)) {
            throw new IllegalArgumentException("用户ID不能为空");
        }
        if (ObjectUtils.isEmpty(amount) || amount.compareTo(BigDecimal.ZERO) <= 0) {
            throw new IllegalArgumentException("金额必须大于0");
        }

        // 检查是否已存在相同事务的支付记录
        PaymentLog existingLog = paymentLogMapper.selectOne(Wrappers.<PaymentLog>lambdaQuery()
                .eq(PaymentLog::getTccTransactionId, transactionId));

        if (!ObjectUtils.isEmpty(existingLog)) {
            log.warn("支付Try阶段已处理,无需重复处理,transactionId: {}", transactionId);
            return; // 幂等处理
        }

        // 创建支付记录(待支付状态)
        PaymentLog paymentLog = new PaymentLog();
        paymentLog.setOrderId(orderId);
        paymentLog.setUserId(userId);
        paymentLog.setAmount(amount);
        paymentLog.setStatus(0); // 0-待支付
        paymentLog.setTccTransactionId(transactionId);
        paymentLog.setCreateTime(LocalDateTime.now());
        paymentLog.setUpdateTime(LocalDateTime.now());

        paymentLogMapper.insert(paymentLog);

        log.info("支付Try阶段完成,transactionId: {}, paymentLogId: {}", transactionId, paymentLog.getId());
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void confirmPayment(String transactionId, Long orderId, Long userId, BigDecimal amount) {
        log.info("支付Confirm阶段,transactionId: {}, orderId: {}, userId: {}, amount: {}", 
                transactionId, orderId, userId, amount);

        // 查询支付记录
        PaymentLog paymentLog = paymentLogMapper.selectOne(Wrappers.<PaymentLog>lambdaQuery()
                .eq(PaymentLog::getTccTransactionId, transactionId));

        if (ObjectUtils.isEmpty(paymentLog)) {
            log.error("支付记录不存在,transactionId: {}", transactionId);
            return; // 幂等处理
        }

        // 如果已经是已支付状态,直接返回
        if (paymentLog.getStatus() == 1) {
            log.warn("支付Confirm阶段已处理,无需重复处理,transactionId: {}", transactionId);
            return; // 幂等处理
        }

        // 更新支付状态为已支付
        int updateCount = paymentLogMapper.update(Wrappers.<PaymentLog>lambdaUpdate()
                .set(PaymentLog::setStatus, 1) // 1-已支付
                .set(PaymentLog::setUpdateTime, LocalDateTime.now())
                .eq(PaymentLog::getId, paymentLog.getId())
                .eq(PaymentLog::getStatus, 0)); // 只更新待支付状态的记录

        if (updateCount <= 0) {
            log.warn("支付Confirm阶段更新失败,可能已处理,transactionId: {}", transactionId);
            return;
        }

        log.info("支付Confirm阶段完成,transactionId: {}", transactionId);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void cancelPayment(String transactionId, Long orderId, Long userId, BigDecimal amount) {
        log.info("支付Cancel阶段,transactionId: {}, orderId: {}, userId: {}, amount: {}", 
                transactionId, orderId, userId, amount);

        // 查询支付记录
        PaymentLog paymentLog = paymentLogMapper.selectOne(Wrappers.<PaymentLog>lambdaQuery()
                .eq(PaymentLog::getTccTransactionId, transactionId));

        if (ObjectUtils.isEmpty(paymentLog)) {
            log.error("支付记录不存在,transactionId: {}", transactionId);
            return; // 幂等处理
        }

        // 如果已经是已取消状态,直接返回
        if (paymentLog.getStatus() == 2) {
            log.warn("支付Cancel阶段已处理,无需重复处理,transactionId: {}", transactionId);
            return; // 幂等处理
        }

        // 更新支付状态为已取消
        int updateCount = paymentLogMapper.update(Wrappers.<PaymentLog>lambdaUpdate()
                .set(PaymentLog::setStatus, 2) // 2-已取消
                .set(PaymentLog::setUpdateTime, LocalDateTime.now())
                .eq(PaymentLog::getId, paymentLog.getId())
                .eq(PaymentLog::getStatus, 0)); // 只更新待支付状态的记录

        if (updateCount <= 0) {
            log.warn("支付Cancel阶段更新失败,可能已处理,transactionId: {}", transactionId);
            return;
        }

        log.info("支付Cancel阶段完成,transactionId: {}", transactionId);
    }
}

订单服务(TCC 发起方):

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.order.entity.Order;
import com.example.order.mapper.OrderMapper;
import com.example.order.service.OrderService;
import com.example.order.service.TccCoordinator;
import com.example.payment.service.PaymentTccService;
import com.example.inventory.service.InventoryTccService;
import com.example.user.service.UserAccountTccService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * 订单服务
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderServiceImpl implements OrderService {
    private final OrderMapper orderMapper;
    private final TccCoordinator tccCoordinator;
    private final InventoryTccService inventoryTccService;
    private final UserAccountTccService userAccountTccService;
    private final PaymentTccService paymentTccService;

    /**
     * 业务类型:订单创建
     */
    public static final String BUSINESS_TYPE_ORDER_CREATE = "ORDER_CREATE";

    /**
     * 创建订单(TCC分布式事务)
     *
     * @param userId 用户ID
     * @param productId 商品ID
     * @param quantity 购买数量
     * @return 订单ID
     */
    @Override
    public Long createOrderWithTcc(Long userId, Long productId, Integer quantity) {
        log.info("开始创建订单(TCC),userId: {}, productId: {}, quantity: {}", userId, productId, quantity);

        // 参数校验
        if (ObjectUtils.isEmpty(userId)) {
            throw new IllegalArgumentException("用户ID不能为空");
        }
        if (ObjectUtils.isEmpty(productId)) {
            throw new IllegalArgumentException("商品ID不能为空");
        }
        if (quantity == null || quantity <= 0) {
            throw new IllegalArgumentException("购买数量必须大于0");
        }

        // 创建订单(本地事务)
        Order order = createOrder(userId, productId, quantity);

        // 执行TCC分布式事务
        tccCoordinator.execute(BUSINESS_TYPE_ORDER_CREATE, order.getId(), () -> {
            // Try阶段:调用各参与方的Try方法
            // 1. 锁定库存
            inventoryTccService.tryDeductInventory(tccCoordinator.getCurrentTransactionId(), productId, quantity);

            // 2. 冻结用户余额
            BigDecimal amount = order.getAmount();
            userAccountTccService.tryDeductBalance(tccCoordinator.getCurrentTransactionId(), userId, amount);

            // 3. 创建支付记录
            paymentTccService.tryCreatePayment(tccCoordinator.getCurrentTransactionId(), 
                    order.getId(), userId, amount);

            return null;
        });

        log.info("订单创建(TCC)完成,orderId: {}", order.getId());
        return order.getId();
    }

    /**
     * 创建订单(本地事务)
     *
     * @param userId 用户ID
     * @param productId 商品ID
     * @param quantity 购买数量
     * @return 订单对象
     */
    @Transactional(rollbackFor = Exception.class)
    public Order createOrder(Long userId, Long productId, Integer quantity) {
        // 假设商品单价为100元
        BigDecimal amount = new BigDecimal(100).multiply(new BigDecimal(quantity));

        Order order = new Order();
        order.setUserId(userId);
        order.setProductId(productId);
        order.setQuantity(quantity);
        order.setAmount(amount);
        order.setStatus(0); // 0-创建中
        order.setCreateTime(LocalDateTime.now());
        order.setUpdateTime(LocalDateTime.now());

        orderMapper.insert(order);
        log.info("订单创建成功,orderId: {}", order.getId());

        return order;
    }
}
代码语言:javascript
复制

6. 控制器实现

订单控制器(增加 TCC 接口):

代码语言:javascript
复制
import com.example.order.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 订单控制器
 *
 * @author ken
 */
@Slf4j
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
@Tag(name = "订单管理", description = "订单相关接口")
public class OrderController {
    private final OrderService orderService;

    // 省略本地消息表方案的接口...

    /**
     * 创建订单(TCC分布式事务)
     *
     * @param userId 用户ID
     * @param productId 商品ID
     * @param quantity 购买数量
     * @return 订单ID
     */
    @PostMapping("/tcc")
    @Operation(summary = "创建订单(TCC)", description = "使用TCC模式创建新订单并处理分布式事务")
    public Long createOrderWithTcc(
            @Parameter(description = "用户ID", required = true) @RequestParam Long userId,
            @Parameter(description = "商品ID", required = true) @RequestParam Long productId,
            @Parameter(description = "购买数量", required = true) @RequestParam Integer quantity) {
        return orderService.createOrderWithTcc(userId, productId, quantity);
    }
}
代码语言:javascript
复制

TCC 模式的优缺点分析

优点:

  1. 性能高,没有全局锁,各服务可以并行执行
  2. 灵活性高,可以根据业务场景定制 Try/Confirm/Cancel 逻辑
  3. 一致性保证好,在 Try 阶段就能发现并解决问题
  4. 适合复杂的业务场景,可以精细控制每个阶段的行为

缺点:

  1. 开发成本高,需要为每个业务编写三个阶段的代码
  2. 侵入性强,需要修改业务代码
  3. 对开发人员要求高,需要正确设计三个阶段的逻辑
  4. 需要处理幂等性、空回滚、悬挂等问题
  5. 需要实现事务协调器,增加了系统复杂度

TCC 模式的适用场景

  1. 对性能要求高的核心业务,如支付、交易等
  2. 业务逻辑相对复杂,需要精细控制事务过程
  3. 对一致性要求较高,希望在早期就能发现并解决问题
  4. 适合高并发场景,如秒杀、抢购等活动

方案三:Seata(最易用的分布式事务框架)

Seata 是阿里巴巴开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。它支持多种事务模式,包括 AT、TCC、SAGA 和 XA,其中 AT 模式是最常用的。

Seata 的核心原理

Seata 的 AT 模式工作原理可以用以下流程图表示:

Seata 的核心组件:

  • TM(Transaction Manager):事务管理器,负责发起和结束全局事务
  • TC(Transaction Coordinator):事务协调器,负责协调全局事务的提交或回滚
  • RM(Resource Manager):资源管理器,负责管理分支事务

AT 模式的核心流程:

  1. 全局事务开启:TM 向 TC 申请开启全局事务,TC 生成唯一 XID
  2. 分支事务执行:RM 执行本地事务 SQL,同时生成 undo_log 和 redo_log
  3. 分支事务注册:RM 将分支事务注册到 TC
  4. 全局事务提交 / 回滚:TM 根据所有分支事务的执行结果,通知 TC 提交或回滚全局事务
  5. 分支事务提交 / 回滚:TC 通知所有 RM 提交或回滚分支事务

Seata 的 AT 模式通过自动生成 undo_log 和 redo_log,实现了对业务代码的无侵入,大大降低了分布式事务的使用门槛。

Seata 示例实现

下面我们基于同样的订单创建场景,使用 Seata 的 AT 模式实现分布式事务。

1. 数据库设计

在原有表结构的基础上,需要为每个数据库添加 Seata 的 undo_log 表:

代码语言:javascript
复制
-- Seata undo_log表
CREATE TABLE `undo_log` (
  `branch_id` bigint NOT NULL COMMENT '分支事务ID',
  `xid` varchar(100) NOT NULL COMMENT '全局事务ID',
  `context` varchar(128) NOT NULL COMMENT '上下文信息',
  `rollback_info` longblob NOT NULL COMMENT '回滚信息',
  `log_status` int NOT NULL COMMENT '日志状态:0-正常,1-已清理',
  `log_created` datetime NOT NULL COMMENT '创建时间',
  `log_modified` datetime NOT NULL COMMENT '修改时间',
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Seata回滚日志表';

其他业务表与前面的方案类似,这里不再重复列出。

2. 核心依赖
代码语言:javascript
复制
<!-- Seata -->
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-spring-boot-starter</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>io.seata</groupId>
    <artifactId>seata-all</artifactId>
    <version>2.0.0</version>
</dependency>

<!-- 其他依赖与前面方案类似 -->
代码语言:javascript
复制

3. Seata 配置
代码语言:javascript
复制
# application.yml
seata:
  enabled: true
  application-id: ${spring.application.name}
  tx-service-group: my_test_tx_group
  registry:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      namespace:
  config:
    type: nacos
    nacos:
      server-addr: 127.0.0.1:8848
      group: SEATA_GROUP
      namespace:
  service:
    vgroup-mapping:
      my_test_tx_group: default
    grouplist:
      default: 127.0.0.1:8091
  client:
    rm:
      report-success-enable: false
    undo:
      log-table: undo_log
      data-validation: true
      log-serialization: jackson
代码语言:javascript
复制

4. 服务实现

订单服务(Seata TM):

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.order.entity.Order;
import com.example.order.feign.InventoryFeignClient;
import com.example.order.feign.PaymentFeignClient;
import com.example.order.feign.UserAccountFeignClient;
import com.example.order.mapper.OrderMapper;
import com.example.order.service.OrderService;
import io.seata.spring.annotation.GlobalTransactional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * 订单服务(Seata实现)
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class OrderServiceImpl implements OrderService {
    private final OrderMapper orderMapper;
    private final InventoryFeignClient inventoryFeignClient;
    private final UserAccountFeignClient userAccountFeignClient;
    private final PaymentFeignClient paymentFeignClient;

    /**
     * 创建订单(Seata分布式事务)
     *
     * @param userId 用户ID
     * @param productId 商品ID
     * @param quantity 购买数量
     * @return 订单ID
     */
    @Override
    @GlobalTransactional(name = "create-order-transaction", rollbackFor = Exception.class)
    public Long createOrderWithSeata(Long userId, Long productId, Integer quantity) {
        log.info("开始创建订单(Seata),userId: {}, productId: {}, quantity: {}", userId, productId, quantity);

        // 1. 创建订单
        Order order = createOrder(userId, productId, quantity);

        try {
            // 2. 扣减库存
            boolean inventoryResult = inventoryFeignClient.deductInventory(productId, quantity);
            if (!inventoryResult) {
                throw new RuntimeException("扣减库存失败");
            }

            // 3. 扣减用户余额
            boolean accountResult = userAccountFeignClient.deductBalance(userId, order.getAmount());
            if (!accountResult) {
                throw new RuntimeException("扣减用户余额失败");
            }

            // 4. 创建支付记录
            boolean paymentResult = paymentFeignClient.createPayment(order.getId(), userId, order.getAmount());
            if (!paymentResult) {
                throw new RuntimeException("创建支付记录失败");
            }

            // 5. 更新订单状态为已完成
            updateOrderStatus(order.getId(), 1);

            log.info("订单创建(Seata)完成,orderId: {}", order.getId());
            return order.getId();
        } catch (Exception e) {
            log.error("订单创建失败,触发全局回滚", e);
            // 抛出异常,Seata会自动回滚全局事务
            throw new RuntimeException("订单创建失败", e);
        }
    }

    /**
     * 创建订单(本地事务)
     *
     * @param userId 用户ID
     * @param productId 商品ID
     * @param quantity 购买数量
     * @return 订单对象
     */
    @Transactional(rollbackFor = Exception.class)
    public Order createOrder(Long userId, Long productId, Integer quantity) {
        // 参数校验
        if (ObjectUtils.isEmpty(userId)) {
            throw new IllegalArgumentException("用户ID不能为空");
        }
        if (ObjectUtils.isEmpty(productId)) {
            throw new IllegalArgumentException("商品ID不能为空");
        }
        if (quantity == null || quantity <= 0) {
            throw new IllegalArgumentException("购买数量必须大于0");
        }

        // 假设商品单价为100元
        BigDecimal amount = new BigDecimal(100).multiply(new BigDecimal(quantity));

        Order order = new Order();
        order.setUserId(userId);
        order.setProductId(productId);
        order.setQuantity(quantity);
        order.setAmount(amount);
        order.setStatus(0); // 0-创建中
        order.setCreateTime(LocalDateTime.now());
        order.setUpdateTime(LocalDateTime.now());

        orderMapper.insert(order);
        log.info("订单创建成功,orderId: {}", order.getId());

        return order;
    }

    /**
     * 更新订单状态
     *
     * @param orderId 订单ID
     * @param status 状态
     */
    @Transactional(rollbackFor = Exception.class)
    public void updateOrderStatus(Long orderId, Integer status) {
        Order order = new Order();
        order.setId(orderId);
        order.setStatus(status);
        order.setUpdateTime(LocalDateTime.now());

        orderMapper.updateById(order);
        log.info("订单状态更新成功,orderId: {}, status: {}", orderId, status);
    }
}
代码语言:javascript
复制

库存服务(Seata RM):

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.inventory.entity.Inventory;
import com.example.inventory.mapper.InventoryMapper;
import com.example.inventory.service.InventoryService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

/**
 * 库存服务(Seata实现)
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class InventoryServiceImpl implements InventoryService {
    private final InventoryMapper inventoryMapper;

    /**
     * 扣减库存
     *
     * @param productId 商品ID
     * @param quantity 扣减数量
     * @return 是否成功
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean deductInventory(Long productId, Integer quantity) {
        log.info("开始扣减库存,productId: {}, quantity: {}", productId, quantity);

        // 参数校验
        if (ObjectUtils.isEmpty(productId)) {
            throw new IllegalArgumentException("商品ID不能为空");
        }
        if (quantity == null || quantity <= 0) {
            throw new IllegalArgumentException("数量必须大于0");
        }

        // 查询库存
        Inventory inventory = inventoryMapper.selectOne(Wrappers.<Inventory>lambdaQuery()
                .eq(Inventory::getProductId, productId));

        if (ObjectUtils.isEmpty(inventory)) {
            log.error("商品不存在,productId: {}", productId);
            return false;
        }

        // 检查库存是否充足
        if (inventory.getQuantity() < quantity) {
            log.error("库存不足,productId: {}, 可用库存: {}, 需要: {}", 
                    productId, inventory.getQuantity(), quantity);
            return false;
        }

        // 扣减库存
        int updateCount = inventoryMapper.update(Wrappers.<Inventory>lambdaUpdate()
                .set(Inventory::setQuantity, inventory.getQuantity() - quantity)
                .eq(Inventory::getId, inventory.getId())
                .eq(Inventory::getQuantity, inventory.getQuantity()));

        if (updateCount <= 0) {
            log.error("库存扣减失败,可能并发修改,productId: {}", productId);
            return false;
        }

        log.info("库存扣减成功,productId: {}, quantity: {}", productId, quantity);
        return true;
    }
}

用户账户服务和支付服务的实现与库存服务类似,都是在本地事务方法中实现业务逻辑,这里不再重复列出。

5. 控制器实现

订单控制器(增加 Seata 接口):

代码语言:javascript
复制
import com.example.order.service.OrderService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 订单控制器
 *
 * @author ken
 */
@Slf4j
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
@Tag(name = "订单管理", description = "订单相关接口")
public class OrderController {
    private final OrderService orderService;

    // 省略其他接口...

    /**
     * 创建订单(Seata分布式事务)
     *
     * @param userId 用户ID
     * @param productId 商品ID
     * @param quantity 购买数量
     * @return 订单ID
     */
    @PostMapping("/seata")
    @Operation(summary = "创建订单(Seata)", description = "使用Seata AT模式创建新订单并处理分布式事务")
    public Long createOrderWithSeata(
            @Parameter(description = "用户ID", required = true) @RequestParam Long userId,
            @Parameter(description = "商品ID", required = true) @RequestParam Long productId,
            @Parameter(description = "购买数量", required = true) @RequestParam Integer quantity) {
        return orderService.createOrderWithSeata(userId, productId, quantity);
    }
}
代码语言:javascript
复制

6. Feign 客户端定义

订单服务调用其他服务的 Feign 客户端:

代码语言:javascript
复制
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import java.math.BigDecimal;

/**
 * 库存服务Feign客户端
 *
 * @author ken
 */
@FeignClient(name = "inventory-service")
public interface InventoryFeignClient {

    /**
     * 扣减库存
     *
     * @param productId 商品ID
     * @param quantity 扣减数量
     * @return 是否成功
     */
    @PostMapping("/api/inventory/deduct")
    boolean deductInventory(@RequestParam("productId") Long productId, @RequestParam("quantity") Integer quantity);
}

/**
 * 用户账户服务Feign客户端
 *
 * @author ken
 */
@FeignClient(name = "user-service")
public interface UserAccountFeignClient {

    /**
     * 扣减用户余额
     *
     * @param userId 用户ID
     * @param amount 扣减金额
     * @return 是否成功
     */
    @PostMapping("/api/user/account/deduct")
    boolean deductBalance(@RequestParam("userId") Long userId, @RequestParam("amount") BigDecimal amount);
}

/**
 * 支付服务Feign客户端
 *
 * @author ken
 */
@FeignClient(name = "payment-service")
public interface PaymentFeignClient {

    /**
     * 创建支付记录
     *
     * @param orderId 订单ID
     * @param userId 用户ID
     * @param amount 支付金额
     * @return 是否成功
     */
    @PostMapping("/api/payment/create")
    boolean createPayment(@RequestParam("orderId") Long orderId, 
                          @RequestParam("userId") Long userId, 
                          @RequestParam("amount") BigDecimal amount);
}
代码语言:javascript
复制

Seata 方案的优缺点分析

优点:

  1. 易用性好,对业务代码侵入性小,只需添加注解
  2. 功能强大,支持多种事务模式(AT、TCC、SAGA、XA)
  3. 社区活跃,文档丰富,问题解决方便
  4. 性能较好,尤其是 AT 模式采用了异步提交
  5. 与 Spring Cloud 等微服务框架集成良好

缺点:

  1. 需要部署和维护 Seata 服务器(TC),增加了系统复杂度
  2. 对数据库有侵入性,需要创建 undo_log 表
  3. AT 模式只支持部分数据库(MySQL、Oracle 等)
  4. 在高并发场景下,全局锁可能成为性能瓶颈
  5. 学习成本相对较高,需要理解 Seata 的工作原理

Seata 的适用场景

  1. 对开发效率要求高,希望快速实现分布式事务
  2. 业务代码已经相对成熟,不希望做大的改动
  3. 团队技术实力参差不齐,需要简单易用的方案
  4. 业务场景可能会变化,需要灵活支持多种事务模式
  5. 中低并发场景,或者可以接受一定性能损耗的高并发场景

三种方案的对比与选型指南

现在我们已经详细介绍了本地消息表、TCC 模式和 Seata 三种分布式事务解决方案,下面对它们进行全面对比,并给出选型建议。

功能特性对比

特性

本地消息表

TCC 模式

Seata

一致性保证

最终一致性

强一致性(Try 阶段)

强一致性(AT 模式)

实现复杂度

业务侵入性

性能

很高

适用场景

异步场景

同步场景

同步场景

代码改动量

学习成本

部署复杂度

重试机制

需要自己实现

需要自己实现

内置

幂等处理

需要自己实现

需要自己实现

内置

跨语言支持

一般(主要支持 Java)

性能对比

在性能方面,三种方案的表现由高到低依次是:

  1. TCC 模式:性能最高,没有全局锁,各阶段可以并行执行
  2. 本地消息表:性能次之,基于消息队列异步通信,没有锁竞争
  3. Seata AT 模式:性能相对较低,需要全局锁和日志记录

性能测试数据(仅供参考):

  • TCC 模式:约 1000-2000 TPS
  • 本地消息表:约 800-1500 TPS
  • Seata AT 模式:约 500-1000 TPS

实际性能会受到网络延迟、数据库性能、业务复杂度等多种因素影响。

故障处理能力对比

故障场景

本地消息表

TCC 模式

Seata

服务宕机

消息表持久化,恢复后可重试

协调器记录状态,恢复后可继续

TC 记录状态,恢复后可继续

网络分区

消息队列缓存,分区恢复后同步

协调器重试,最终一致性

TC 重试,最终一致性

数据库宕机

本地事务保证,恢复后可继续

Try 阶段预留资源,恢复后可继续

基于 undo_log 回滚,恢复后可继续

消息丢失

定时任务重试,保证最终送达

协调器重试机制

全局事务回滚机制

选型决策指南

选择分布式事务方案时,应综合考虑以下因素:

  1. 业务一致性要求
    • 要求强一致性:选择 TCC 或 Seata AT 模式
    • 可接受最终一致性:选择本地消息表
  2. 性能要求
    • 高并发核心业务:选择 TCC 模式
    • 一般性能要求:选择 Seata 或本地消息表
  3. 开发成本
    • 希望快速实现:选择 Seata AT 模式
    • 团队有能力定制:选择 TCC 或本地消息表
  4. 系统复杂度
    • 不想引入中间件:选择本地消息表或 TCC
    • 可以接受中间件:选择 Seata
  5. 业务场景
    • 同步调用为主:选择 TCC 或 Seata
    • 异步调用为主:选择本地消息表
  6. 团队技术栈
    • 熟悉 Spring Cloud:选择 Seata
    • 熟悉消息队列:选择本地消息表
    • 有能力设计补偿逻辑:选择 TCC

典型场景推荐方案

  1. 电商下单流程
    • 推荐:本地消息表或 Seata AT 模式
    • 理由:下单流程涉及多个服务,但可接受最终一致性,追求开发效率
  2. 支付交易系统
    • 推荐:TCC 模式
    • 理由:对一致性和性能要求极高,需要精细控制事务过程
  3. 物流跟踪系统
    • 推荐:本地消息表
    • 理由:异步场景为主,可接受最终一致性,追求高可用性
  4. 金融核心系统
    • 推荐:TCC 模式或 Seata XA 模式
    • 理由:对一致性要求极高,不允许数据不一致
  5. 内容发布系统
    • 推荐:本地消息表
    • 理由:异步场景,可接受最终一致性,追求高吞吐量

总结与展望

分布式事务是微服务架构中的一个核心难题,没有放之四海而皆准的完美解决方案。本文介绍的三种方案各有优缺点,适用于不同的业务场景:

  • 本地消息表:最简单、最容易理解的方案,适合对一致性要求不高的异步场景
  • TCC 模式:性能最高、最灵活的方案,适合对一致性和性能要求都很高的核心业务
  • Seata:最易用、功能最全面的方案,适合大多数需要快速实现分布式事务的场景

在实际项目中,我们往往需要根据具体业务场景,灵活选择甚至组合使用这些方案。例如,核心交易流程使用 TCC 保证强一致性,而非核心的通知、日志等流程使用本地消息表保证最终一致性。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-09-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 果酱带你啃java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 分布式事务的本质:从 CAP 理论说起
  • 分布式事务的 ACID 与 BASE
  • 案例场景:贯穿全文的业务示例
  • 方案一:本地消息表(最经典的最终一致性方案)
    • 本地消息表的核心原理
    • 本地消息表示例实现
      • 1. 数据库设计
      • 2. 核心依赖
      • 3. 实体类定义
      • 4. Mapper 层定义
      • 5. 服务层实现
      • 6. 控制器实现
      • 7. RabbitMQ 配置
    • 本地消息表方案的优缺点分析
    • 本地消息表的适用场景
  • 方案二:TCC 模式(最高性能的分布式事务方案)
    • TCC 模式的核心原理
    • TCC 模式示例实现
      • 1. 数据库设计
      • 2. 核心依赖
      • 3. 实体类定义
      • 4. TCC 核心接口定义
      • 5. 服务实现
      • 6. 控制器实现
    • TCC 模式的优缺点分析
    • TCC 模式的适用场景
  • 方案三:Seata(最易用的分布式事务框架)
    • Seata 的核心原理
    • Seata 示例实现
      • 1. 数据库设计
      • 2. 核心依赖
      • 3. Seata 配置
      • 4. 服务实现
      • 5. 控制器实现
      • 6. Feign 客户端定义
    • Seata 方案的优缺点分析
    • Seata 的适用场景
  • 三种方案的对比与选型指南
    • 功能特性对比
    • 性能对比
    • 故障处理能力对比
    • 选型决策指南
    • 典型场景推荐方案
  • 总结与展望
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档