首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >定时任务调用第三方接口全攻略:从 Token 管理到数据安全落地的实战指南

定时任务调用第三方接口全攻略:从 Token 管理到数据安全落地的实战指南

作者头像
果酱带你啃java
发布2026-04-14 12:36:21
发布2026-04-14 12:36:21
360
举报

在现代企业级应用开发中,定时任务调用第三方接口并将数据同步到本地数据库是一种常见的业务场景。无论是电商平台同步物流信息、金融系统获取汇率数据,还是企业 ERP 系统对接第三方服务,都离不开这种数据交互模式。然而,这个看似简单的流程背后隐藏着诸多技术挑战:如何安全高效地管理第三方接口的 Token?如何保证接口调用的幂等性?如何处理网络波动导致的调用失败?如何确保数据最终一致性?

我将在本文中系统梳理定时任务调用第三方接口时需要注意的核心问题,并提供经过生产环境验证的解决方案。本文不仅涵盖理论分析,还包含完整的代码实现,旨在帮助开发者构建健壮、可靠的集成系统。

一、系统设计概述

1.1 整体架构

在开始讨论具体实现之前,我们先来看一下典型的定时任务调用第三方接口的系统架构:

1.2 核心流程

整个系统的核心流程如下:

1.3 技术选型

基于 Java 生态,我们选择以下技术组件:

  1. 定时任务调度Spring Schedule + Quartz(复杂场景)
  2. HTTP 客户端Spring Cloud OpenFeign
  3. Token 管理Redis + 本地缓存
  4. 数据库访问Spring Data JPA + MyBatis
  5. 重试机制Spring Retry + 自定义重试策略
  6. 日志框架SLF4J + Logback
  7. 监控告警Spring Boot Actuator + Prometheus + Grafana

二、Token 管理与续期策略

2.1 Token 的重要性与常见问题

第三方接口通常采用 Token 进行身份验证,这是一种基于 OAuth 2.0 等协议的安全机制(IETF RFC 6749)。在定时任务场景中,Token 管理面临以下挑战:

  1. Token 过期导致接口调用失败
  2. 频繁刷新 Token 导致第三方服务压力
  3. 分布式环境下 Token 一致性问题
  4. Token 泄露风险

2.2 Token 管理方案设计

一个健壮的 Token 管理方案应包含以下核心功能:

  1. Token 获取与存储
  2. 过期自动检测与刷新
  3. 分布式环境下的同步机制
  4. 异常情况下的降级策略
2.2.1 Token 实体设计

首先定义 Token 实体类:

代码语言:javascript
复制
import lombok.Data;
import java.time.LocalDateTime;

/**
 * 第三方接口访问令牌
 *
 * @author 技术专家
 */
@Data
public class ApiToken {
    /**
     * 访问令牌
     */
    private String accessToken;

    /**
     * 刷新令牌
     */
    private String refreshToken;

    /**
     * 令牌类型
     */
    private String tokenType;

    /**
     * 过期时间(秒)
     */
    private Integer expiresIn;

    /**
     * 令牌获取时间
     */
    private LocalDateTime obtainTime;

    /**
     * 令牌过期时间
     */
    private LocalDateTime expireTime;

    /**
     * 检查令牌是否有效
     * 
     * @return true-有效,false-无效
     */
    public boolean isValid() {
        // 提前60秒过期,避免网络延迟等问题导致的令牌无效
        return LocalDateTime.now().plusSeconds(60).isBefore(expireTime);
    }
}
代码语言:javascript
复制

2.2.2 Token 管理器实现
代码语言:javascript
复制
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 第三方接口Token管理器
 * 负责Token的获取、刷新、存储和有效性检查
 *
 * @author 技术专家
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TokenManager {
    /**
     * Redis中存储Token的键前缀
     */
    private static final String TOKEN_KEY_PREFIX = "api:token:";

    /**
     * 分布式环境下刷新Token的锁键
     */
    private static final String TOKEN_REFRESH_LOCK_KEY = "api:token:refresh:lock";

    /**
     * 默认的Token存储过期时间(小时)
     */
    private static final int DEFAULT_TOKEN_EXPIRE_HOURS = 24;

    private final RedisTemplate<String, Object> redisTemplate;
    private final ThirdPartyApiClient thirdPartyApiClient;

    /**
     * 本地锁,防止单JVM内重复刷新Token
     */
    private final ReentrantLock localLock = new ReentrantLock();

    /**
     * 获取有效的访问令牌
     * 
     * @param apiKey 第三方接口的API Key,用于区分不同的接口服务
     * @return 有效的访问令牌
     */
    public String getValidAccessToken(String apiKey) {
        Objects.requireNonNull(apiKey, "API Key不能为空");

        ApiToken apiToken = getStoredToken(apiKey);

        // 如果Token不存在或已过期,则刷新Token
        if (Objects.isNull(apiToken) || !apiToken.isValid()) {
            log.info("Token不存在或已过期,需要刷新,apiKey:{}", apiKey);
            apiToken = refreshAccessToken(apiKey);
        }

        return apiToken.getAccessToken();
    }

    /**
     * 从存储中获取Token
     * 
     * @param apiKey API Key
     * @return Token对象,如果不存在则返回null
     */
    private ApiToken getStoredToken(String apiKey) {
        String tokenKey = buildTokenKey(apiKey);
        Object tokenObj = redisTemplate.opsForValue().get(tokenKey);

        if (Objects.isNull(tokenObj)) {
            log.debug("Redis中未找到Token,apiKey:{}", apiKey);
            return null;
        }

        try {
            return (ApiToken) tokenObj;
        } catch (ClassCastException e) {
            log.error("Token类型转换失败,apiKey:{}", apiKey, e);
            // 清除错误的Token数据
            redisTemplate.delete(tokenKey);
            return null;
        }
    }

    /**
     * 刷新访问令牌
     * 采用分布式锁保证同一时间只有一个实例在刷新Token
     * 
     * @param apiKey API Key
     * @return 新的Token对象
     */
    private ApiToken refreshAccessToken(String apiKey) {
        // 先尝试获取本地锁,减少分布式锁的竞争
        if (!localLock.tryLock()) {
            log.info("本地锁已被占用,等待其他线程刷新Token,apiKey:{}", apiKey);
            // 等待本地锁释放后再尝试获取Token
            try {
                TimeUnit.MILLISECONDS.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("等待本地锁释放时被中断", e);
            }
            return getStoredToken(apiKey);
        }

        try {
            // 获取分布式锁,防止集群环境下多个实例同时刷新Token
            Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(
                    TOKEN_REFRESH_LOCK_KEY + apiKey,
                    "LOCK",
                    30,
                    TimeUnit.SECONDS
            );

            if (Boolean.FALSE.equals(lockAcquired)) {
                log.info("分布式锁已被占用,等待其他实例刷新Token,apiKey:{}", apiKey);
                // 等待一段时间后再尝试获取Token
                try {
                    TimeUnit.MILLISECONDS.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.error("等待分布式锁释放时被中断", e);
                }
                return getStoredToken(apiKey);
            }

            // 双重检查,防止重复刷新
            ApiToken existingToken = getStoredToken(apiKey);
            if (Objects.nonNull(existingToken) && existingToken.isValid()) {
                log.info("其他线程已刷新Token,直接使用,apiKey:{}", apiKey);
                return existingToken;
            }

            log.info("开始刷新Token,apiKey:{}", apiKey);
            ApiToken newToken = fetchNewToken(apiKey);

            if (Objects.isNull(newToken) || StringUtils.isBlank(newToken.getAccessToken())) {
                log.error("刷新Token失败,获取到的Token为空,apiKey:{}", apiKey);
                throw new TokenRefreshException("刷新Token失败,获取到的Token为空");
            }

            // 计算Token过期时间
            LocalDateTime obtainTime = LocalDateTime.now();
            newToken.setObtainTime(obtainTime);
            newToken.setExpireTime(obtainTime.plusSeconds(newToken.getExpiresIn()));

            // 存储新Token
            storeToken(apiKey, newToken);

            log.info("Token刷新成功,apiKey:{},过期时间:{}", apiKey, newToken.getExpireTime());
            return newToken;
        } finally {
            // 释放本地锁
            localLock.unlock();
            // 释放分布式锁
            redisTemplate.delete(TOKEN_REFRESH_LOCK_KEY + apiKey);
        }
    }

    /**
     * 调用第三方接口获取新的Token
     * 
     * @param apiKey API Key
     * @return 新的Token对象
     */
    private ApiToken fetchNewToken(String apiKey) {
        try {
            // 实际项目中需要根据第三方接口的要求构建请求参数
            TokenRequest request = new TokenRequest();
            request.setApiKey(apiKey);
            // 设置其他必要参数...

            TokenResponse response = thirdPartyApiClient.getToken(request);

            if (Objects.isNull(response) || !response.isSuccess()) {
                log.error("调用第三方接口获取Token失败,apiKey:{}, 响应:{}", apiKey, response);
                return null;
            }

            // 转换响应为ApiToken对象
            ApiToken apiToken = new ApiToken();
            apiToken.setAccessToken(response.getAccessToken());
            apiToken.setRefreshToken(response.getRefreshToken());
            apiToken.setTokenType(response.getTokenType());
            apiToken.setExpiresIn(response.getExpiresIn());

            return apiToken;
        } catch (Exception e) {
            log.error("获取新Token时发生异常,apiKey:{}", apiKey, e);
            return null;
        }
    }

    /**
     * 存储Token到Redis
     * 
     * @param apiKey API Key
     * @param token Token对象
     */
    private void storeToken(String apiKey, ApiToken token) {
        String tokenKey = buildTokenKey(apiKey);
        // 计算Token在Redis中的过期时间,取实际过期时间和默认过期时间的较小值
        long expireSeconds = Duration.between(LocalDateTime.now(), token.getExpireTime()).getSeconds();
        long redisExpireSeconds = Math.min(expireSeconds, DEFAULT_TOKEN_EXPIRE_HOURS * 3600L);

        redisTemplate.opsForValue().set(tokenKey, token, redisExpireSeconds, TimeUnit.SECONDS);
        log.debug("Token已存储到Redis,apiKey:{}, 过期时间:{}秒", apiKey, redisExpireSeconds);
    }

    /**
     * 构建Redis中的Token键
     * 
     * @param apiKey API Key
     * @return 构建后的键
     */
    private String buildTokenKey(String apiKey) {
        return TOKEN_KEY_PREFIX + apiKey;
    }

    /**
     * 手动刷新Token(用于紧急情况)
     * 
     * @param apiKey API Key
     * @return 新的Token对象
     */
    public ApiToken manualRefreshToken(String apiKey) {
        Objects.requireNonNull(apiKey, "API Key不能为空");

        // 先删除旧的Token
        String tokenKey = buildTokenKey(apiKey);
        redisTemplate.delete(tokenKey);
        log.info("已删除旧的Token,准备手动刷新,apiKey:{}", apiKey);

        // 刷新并返回新的Token
        return refreshAccessToken(apiKey);
    }
}
代码语言:javascript
复制

2.2.3 自定义异常类
代码语言:javascript
复制
/**
 * Token刷新异常
 *
 * @author 技术专家
 */
public class TokenRefreshException extends RuntimeException {
    public TokenRefreshException() {
        super();
    }

    public TokenRefreshException(String message) {
        super(message);
    }

    public TokenRefreshException(String message, Throwable cause) {
        super(message, cause);
    }
}
代码语言:javascript
复制

2.3 Token 管理的最佳实践

  1. 提前刷新策略:如示例中所示,设置提前 60 秒过期,避免因网络延迟等问题导致的令牌无效。
  2. 多级缓存机制:在分布式环境下,可以结合本地缓存(如 Caffeine)和分布式缓存(如 Redis),减少缓存访问压力。
  3. 限流保护:对第三方的 Token 接口设置调用频率限制,避免触发对方的限流措施。
  4. 监控告警:对 Token 刷新失败、频繁刷新等情况设置监控指标和告警阈值。
  5. 容灾备份:在极端情况下,考虑使用备用的 API 密钥和 Token。

三、接口幂等性保证

3.1 幂等性的概念与重要性

幂等性是指一次或多次执行相同的操作,其结果是相同的(W3C 规范)。在定时任务调用第三方接口的场景中,保证幂等性至关重要,因为:

  1. 网络波动可能导致重试机制多次调用接口
  2. 分布式环境下可能出现任务重复执行
  3. 第三方接口可能重复返回相同数据

不保证幂等性可能导致数据重复、状态不一致、业务逻辑错误等严重问题。

3.2 幂等性实现方案

常见的幂等性实现方案有以下几种:

  1. 基于唯一标识的幂等为每个请求生成唯一标识,通过检查该标识判断是否已处理
  2. 基于状态机的幂等通过状态流转控制,确保相同操作在特定状态下只能执行一次
  3. 基于乐观锁的幂等通过版本号控制,确保并发操作的正确性
  4. 基于分布式锁的幂等通过分布式锁控制同一时间只有一个操作能执行

在定时任务场景中,我们通常采用基于唯一标识的幂等方案,结合数据库唯一约束实现。

3.2.1 幂等性记录表设计

首先创建幂等性记录的数据表:

代码语言:javascript
复制
CREATE TABLE `idempotent_record` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `business_type` varchar(64) NOT NULL COMMENT '业务类型',
  `request_id` varchar(64) NOT NULL COMMENT '请求唯一标识',
  `status` tinyint NOT NULL COMMENT '处理状态:0-处理中,1-处理成功,2-处理失败',
  `response_data` text COMMENT '响应数据',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `expire_time` datetime DEFAULT NULL COMMENT '过期时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_business_request` (`business_type`,`request_id`) COMMENT '业务类型+请求ID唯一索引,保证幂等性',
  KEY `idx_expire_time` (`expire_time`) COMMENT '过期时间索引,用于清理过期数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='幂等性记录表';
代码语言:javascript
复制

3.2.2 幂等性实体与 Repository
代码语言:javascript
复制
import lombok.Data;
import org.hibernate.annotations.DynamicInsert;
import org.hibernate.annotations.DynamicUpdate;
import javax.persistence.*;
import java.time.LocalDateTime;

/**
 * 幂等性记录实体
 *
 * @author 技术专家
 */
@Data
@Entity
@Table(name = "idempotent_record")
@DynamicInsert
@DynamicUpdate
public class IdempotentRecord {
    /**
     * 处理状态:处理中
     */
    public static final int STATUS_PROCESSING = 0;

    /**
     * 处理状态:处理成功
     */
    public static final int STATUS_SUCCESS = 1;

    /**
     * 处理状态:处理失败
     */
    public static final int STATUS_FAILED = 2;

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "business_type", nullable = false, length = 64)
    private String businessType;

    @Column(name = "request_id", nullable = false, length = 64)
    private String requestId;

    @Column(name = "status", nullable = false)
    private Integer status;

    @Column(name = "response_data", columnDefinition = "text")
    private String responseData;

    @Column(name = "create_time", nullable = false, updatable = false)
    private LocalDateTime createTime;

    @Column(name = "update_time", nullable = false)
    private LocalDateTime updateTime;

    @Column(name = "expire_time")
    private LocalDateTime expireTime;
}
代码语言:javascript
复制

代码语言:javascript
复制
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Optional;

/**
 * 幂等性记录数据访问接口
 *
 * @author 技术专家
 */
public interface IdempotentRecordRepository extends JpaRepository<IdempotentRecord, Long> {
    /**
     * 根据业务类型和请求ID查询幂等性记录
     *
     * @param businessType 业务类型
     * @param requestId 请求ID
     * @return 幂等性记录
     */
    Optional<IdempotentRecord> findByBusinessTypeAndRequestId(String businessType, String requestId);

    /**
     * 尝试插入幂等性记录,使用数据库唯一约束保证原子性
     *
     * @param businessType 业务类型
     * @param requestId 请求ID
     * @param status 状态
     * @param expireTime 过期时间
     * @return 影响的行数,1-成功,0-失败
     */
    @Modifying
    @Query(value = "INSERT IGNORE INTO idempotent_record " +
                   "(business_type, request_id, status, create_time, update_time, expire_time) " +
                   "VALUES (:businessType, :requestId, :status, NOW(), NOW(), :expireTime)",
           nativeQuery = true)
    int insertIgnore(@Param("businessType") String businessType,
                    @Param("requestId") String requestId,
                    @Param("status") Integer status,
                    @Param("expireTime") LocalDateTime expireTime);

    /**
     * 更新幂等性记录的状态和响应数据
     *
     * @param id ID
     * @param status 状态
     * @param responseData 响应数据
     * @return 影响的行数
     */
    @Modifying
    @Query("UPDATE IdempotentRecord SET status = :status, responseData = :responseData, " +
           "updateTime = CURRENT_TIMESTAMP WHERE id = :id")
    int updateStatusAndResponseData(@Param("id") Long id,
                                   @Param("status") Integer status,
                                   @Param("responseData") String responseData);
}
代码语言:javascript
复制

3.2.3 幂等处理器实现
代码语言:javascript
复制
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;

/**
 * 幂等处理器
 * 负责处理接口调用的幂等性保证
 *
 * @author 技术专家
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class IdempotentProcessor {
    /**
     * 幂等记录默认过期时间(小时)
     */
    private static final int DEFAULT_EXPIRE_HOURS = 24;

    private final IdempotentRecordRepository idempotentRecordRepository;
    private final ObjectMapper objectMapper;

    /**
     * 执行幂等操作
     *
     * @param businessType 业务类型
     * @param requestId 请求ID,如果为null则自动生成
     * @param operation 要执行的操作
     * @param <T> 操作返回值类型
     * @return 操作结果
     */
    @Transactional(rollbackFor = Exception.class)
    public <T> T executeIdempotentOperation(String businessType, String requestId, Supplier<T> operation) {
        Objects.requireNonNull(businessType, "业务类型不能为空");

        // 如果请求ID为null,则自动生成一个UUID作为请求ID
        String actualRequestId = StringUtils.hasText(requestId) ? requestId : generateRequestId();
        log.info("开始执行幂等操作,businessType:{}, requestId:{}", businessType, actualRequestId);

        // 检查是否已经处理过
        Optional<IdempotentRecord> existingRecord = idempotentRecordRepository
                .findByBusinessTypeAndRequestId(businessType, actualRequestId);

        if (existingRecord.isPresent()) {
            IdempotentRecord record = existingRecord.get();
            log.info("幂等操作已存在,businessType:{}, requestId:{}, status:{}",
                    businessType, actualRequestId, record.getStatus());

            // 根据不同状态返回不同结果
            switch (record.getStatus()) {
                case IdempotentRecord.STATUS_SUCCESS:
                    // 如果已经成功,返回之前的结果
                    return parseResponseData(record.getResponseData());
                case IdempotentRecord.STATUS_PROCESSING:
                    // 如果正在处理中,抛出异常,由上层决定是否重试
                    throw new IdempotentProcessingException(
                            String.format("幂等操作正在处理中,businessType:%s, requestId:%s",
                                    businessType, actualRequestId));
                case IdempotentRecord.STATUS_FAILED:
                    // 如果之前处理失败,可以重新尝试处理
                    log.info("幂等操作之前处理失败,将重新处理,businessType:{}, requestId:{}",
                            businessType, actualRequestId);
                    break;
                default:
                    log.warn("未知的幂等操作状态,将重新处理,businessType:{}, requestId:{}, status:{}",
                            businessType, actualRequestId, record.getStatus());
                    break;
            }
        }

        // 尝试插入幂等记录,标记为处理中
        LocalDateTime expireTime = LocalDateTime.now().plusHours(DEFAULT_EXPIRE_HOURS);
        int insertResult = idempotentRecordRepository.insertIgnore(
                businessType, actualRequestId, IdempotentRecord.STATUS_PROCESSING, expireTime);

        if (insertResult == 0) {
            // 插入失败,说明其他线程已经插入了记录,需要重新检查
            log.info("幂等记录插入失败,可能有其他线程正在处理,将重新检查,businessType:{}, requestId:{}",
                    businessType, actualRequestId);
            return executeIdempotentOperation(businessType, actualRequestId, operation);
        }

        log.info("幂等记录插入成功,开始执行实际操作,businessType:{}, requestId:{}",
                businessType, actualRequestId);

        try {
            // 执行实际操作
            T result = operation.get();

            // 更新记录状态为成功,并保存响应数据
            IdempotentRecord record = idempotentRecordRepository
                    .findByBusinessTypeAndRequestId(businessType, actualRequestId)
                    .orElseThrow(() -> new RuntimeException(
                            String.format("幂等记录插入后查询不到,businessType:%s, requestId:%s",
                                    businessType, actualRequestId)));

            String responseData = convertToJson(result);
            idempotentRecordRepository.updateStatusAndResponseData(
                    record.getId(), IdempotentRecord.STATUS_SUCCESS, responseData);

            log.info("幂等操作执行成功,businessType:{}, requestId:{}", businessType, actualRequestId);
            return result;
        } catch (Exception e) {
            log.error("幂等操作执行失败,businessType:{}, requestId:{}",
                    businessType, actualRequestId, e);

            // 更新记录状态为失败
            Optional<IdempotentRecord> recordOpt = idempotentRecordRepository
                    .findByBusinessTypeAndRequestId(businessType, actualRequestId);

            recordOpt.ifPresent(record -> {
                idempotentRecordRepository.updateStatusAndResponseData(
                        record.getId(), IdempotentRecord.STATUS_FAILED, e.getMessage());
            });

            // 重新抛出异常,由上层处理
            throw e;
        }
    }

    /**
     * 生成请求ID
     *
     * @return 生成的请求ID
     */
    private String generateRequestId() {
        return UUID.randomUUID().toString().replaceAll("-", "");
    }

    /**
     * 将对象转换为JSON字符串
     *
     * @param obj 要转换的对象
     * @return JSON字符串
     */
    private String convertToJson(Object obj) {
        if (Objects.isNull(obj)) {
            return null;
        }

        try {
            return objectMapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            log.error("对象转换为JSON失败", e);
            return null;
        }
    }

    /**
     * 将JSON字符串解析为对象
     *
     * @param json JSON字符串
     * @param <T> 目标对象类型
     * @return 解析后的对象
     */
    @SuppressWarnings("unchecked")
    private <T> T parseResponseData(String json) {
        if (StringUtils.isBlank(json)) {
            return null;
        }

        try {
            return (T) objectMapper.readValue(json, Object.class);
        } catch (Exception e) {
            log.error("解析JSON响应数据失败,json:{}", json, e);
            return null;
        }
    }
}
代码语言:javascript
复制

3.2.4 幂等相关异常类
代码语言:javascript
复制
/**
 * 幂等操作处理中异常
 * 当检测到幂等操作正在处理中时抛出
 *
 * @author 技术专家
 */
public class IdempotentProcessingException extends RuntimeException {
    public IdempotentProcessingException() {
        super();
    }

    public IdempotentProcessingException(String message) {
        super(message);
    }

    public IdempotentProcessingException(String message, Throwable cause) {
        super(message, cause);
    }
}
代码语言:javascript
复制

3.3 幂等性实现的最佳实践

  1. 请求 ID 的生成策略
    • 优先使用第三方系统提供的唯一标识
    • 自定义生成时,使用 UUID 或雪花算法保证唯一性
    • 对于有业务含义的 ID,确保其全局唯一性
  2. 过期时间设置
    • 根据业务特点设置合理的过期时间
    • 定期清理过期的幂等记录,避免表过大
  3. 分布式环境考虑
    • 利用数据库唯一约束保证原子性
    • 避免使用内存锁作为唯一的并发控制手段
  4. 异常处理
    • 对处理中的状态设置超时机制
    • 提供手动干预的接口,处理异常状态的幂等记录

四、调用失败重试机制

4.1 重试机制的设计原则

在分布式系统中,网络抖动、服务临时不可用等情况时有发生。合理的重试机制可以提高系统的健壮性,但不当的重试策略可能导致:

  1. 重试风暴:大量重试请求压垮第三方服务
  2. 资源耗尽:客户端因大量重试消耗过多资源
  3. 数据不一致:多次重试导致业务状态混乱

根据《容错模式》(Martin Fowler)中的建议,重试机制应遵循以下原则:

  1. 只对幂等操作进行重试
  2. 设置合理的重试次数上限
  3. 使用指数退避策略,避免重试风暴
  4. 对不同类型的异常区别对待

4.2 重试机制实现方案

我们将实现一个灵活的重试框架,结合 Spring Retry 和自定义策略,满足不同场景的需求。

4.2.1 重试策略配置
代码语言:javascript
复制
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * 重试策略配置属性
 *
 * @author 技术专家
 */
@Data
@Component
@ConfigurationProperties(prefix = "api.retry")
public class RetryStrategyProperties {
    /**
     * 是否启用重试
     */
    private boolean enabled = true;

    /**
     * 最大重试次数
     */
    private int maxAttempts = 3;

    /**
     * 初始重试间隔(毫秒)
     */
    private long initialInterval = 1000;

    /**
     * 重试间隔乘数
     */
    private double multiplier = 2.0;

    /**
     * 最大重试间隔(毫秒)
     */
    private long maxInterval = 5000;

    /**
     * 是否启用随机抖动
     */
    private boolean random = true;

    /**
     * 随机抖动因子(0.0-1.0)
     */
    private double randomFactor = 0.5;
}
代码语言:javascript
复制

4.2.2 重试工具类实现
代码语言:javascript
复制
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

/**
 * 重试工具类
 * 提供灵活的重试机制,支持自定义重试策略和异常类型
 *
 * @author 技术专家
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class RetryUtils {
    private final RetryStrategyProperties retryProperties;

    /**
     * 执行带重试的操作
     * 使用默认的重试策略和异常类型
     *
     * @param operation 要执行的操作
     * @param <T> 操作返回值类型
     * @return 操作结果
     */
    public <T> T executeWithRetry(Supplier<T> operation) {
        return executeWithRetry(operation, getDefaultRetryableExceptions());
    }

    /**
     * 执行带重试的操作
     * 使用默认的重试策略和指定的异常类型
     *
     * @param operation 要执行的操作
     * @param retryableExceptions 需要重试的异常类型
     * @param <T> 操作返回值类型
     * @return 操作结果
     */
    public <T> T executeWithRetry(Supplier<T> operation, Set<Class<? extends Throwable>> retryableExceptions) {
        if (!retryProperties.isEnabled()) {
            log.debug("重试机制已禁用,直接执行操作");
            return operation.get();
        }

        // 创建重试模板
        RetryTemplate retryTemplate = new RetryTemplate();

        // 配置重试策略
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(
                retryProperties.getMaxAttempts(),
                Collections.singletonMap(Exception.class, true)
        );
        retryTemplate.setRetryPolicy(retryPolicy);

        // 配置退避策略
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(retryProperties.getInitialInterval());
        backOffPolicy.setMultiplier(retryProperties.getMultiplier());
        backOffPolicy.setMaxInterval(retryProperties.getMaxInterval());
        retryTemplate.setBackOffPolicy(backOffPolicy);

        log.debug("开始执行带重试的操作,最大重试次数:{}, 初始间隔:{}ms",
                retryProperties.getMaxAttempts(), retryProperties.getInitialInterval());

        try {
            return retryTemplate.execute(context -> {
                int attempt = context.getRetryCount() + 1;
                log.debug("执行操作,第{}次尝试", attempt);

                try {
                    return operation.get();
                } catch (Throwable e) {
                    // 检查是否是需要重试的异常类型
                    if (isRetryableException(e, retryableExceptions)) {
                        log.warn("操作执行失败,将进行重试,第{}次尝试,异常信息:{}",
                                attempt, e.getMessage());
                        throw e;
                    } else {
                        log.error("操作执行失败,不进行重试,异常类型:{}", e.getClass().getName());
                        throw e;
                    }
                }
            });
        } catch (Throwable e) {
            log.error("达到最大重试次数,操作执行失败", e);
            throw e;
        }
    }

    /**
     * 获取默认的需要重试的异常类型
     *
     * @return 需要重试的异常类型集合
     */
    private Set<Class<? extends Throwable>> getDefaultRetryableExceptions() {
        Set<Class<? extends Throwable>> exceptions = new HashSet<>();
        exceptions.add(java.net.SocketTimeoutException.class);
        exceptions.add(java.io.IOException.class);
        exceptions.add(org.springframework.web.client.RestClientException.class);
        exceptions.add(IdempotentProcessingException.class);
        return exceptions;
    }

    /**
     * 检查异常是否是需要重试的类型
     *
     * @param e 异常对象
     * @param retryableExceptions 需要重试的异常类型集合
     * @return true-需要重试,false-不需要重试
     */
    private boolean isRetryableException(Throwable e, Set<Class<? extends Throwable>> retryableExceptions) {
        if (CollectionUtils.isEmpty(retryableExceptions)) {
            return false;
        }

        for (Class<? extends Throwable> exceptionClass : retryableExceptions) {
            if (exceptionClass.isInstance(e)) {
                return true;
            }
        }
        return false;
    }
}
代码语言:javascript
复制

4.3 重试机制的最佳实践

  1. 区分可重试与不可重试异常
    • 可重试:网络超时、连接失败、服务暂时不可用等临时性异常
    • 不可重试:业务错误、参数错误、权限不足等确定性异常
  2. 指数退避策略
    • 重试间隔随次数增加而指数增长,如 1s, 2s, 4s, 8s...
    • 设置最大间隔上限,避免间隔过长
    • 加入随机抖动,避免多个客户端同时重试导致的峰值
  3. 结合熔断机制
    • 当第三方服务持续失败时,应触发熔断,暂停重试
    • 可使用 Spring Cloud Circuit Breaker 或 Resilience4j 实现
  4. 重试监控与告警
    • 记录重试次数、成功率等指标
    • 当重试次数超过阈值时触发告警

五、数据一致性保障

5.1 数据一致性的挑战

在定时任务调用第三方接口并同步数据到本地数据库的场景中,数据一致性面临以下挑战:

  1. 第三方接口调用成功,但本地数据库保存失败
  2. 数据库保存成功,但后续业务处理失败
  3. 分布式环境下的并发更新导致数据冲突
  4. 任务中断导致的数据部分更新

根据《数据密集型应用系统设计》(Martin Kleppmann)中的理论,我们需要在一致性、可用性和分区容错性之间做出权衡。

5.2 最终一致性方案

在大多数业务场景中,我们可以采用最终一致性方案,通过以下机制保证数据最终达到一致状态:

  1. 本地消息表:记录接口调用和数据同步状态
  2. 状态机:通过状态流转控制数据处理流程
  3. 补偿机制:对失败的操作进行补偿处理
  4. 定时对账:定期与第三方数据进行比对和修正
5.2.1 同步任务记录表设计
代码语言:javascript
复制
CREATE TABLE `sync_task_record` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `task_name` varchar(64) NOT NULL COMMENT '任务名称',
  `task_id` varchar(64) NOT NULL COMMENT '任务ID',
  `business_id` varchar(64) DEFAULT NULL COMMENT '业务ID',
  `status` tinyint NOT NULL COMMENT '状态:0-初始化,1-接口调用中,2-接口调用成功,3-数据处理中,4-处理成功,5-处理失败',
  `api_request` text COMMENT 'API请求参数',
  `api_response` text COMMENT 'API响应数据',
  `error_msg` text COMMENT '错误信息',
  `retry_count` int NOT NULL DEFAULT '0' COMMENT '重试次数',
  `next_retry_time` datetime DEFAULT NULL COMMENT '下次重试时间',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `finish_time` datetime DEFAULT NULL COMMENT '完成时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_task_id` (`task_id`) COMMENT '任务ID唯一索引',
  KEY `idx_task_name_status` (`task_name`,`status`) COMMENT '任务名称+状态索引',
  KEY `idx_next_retry_time` (`next_retry_time`) COMMENT '下次重试时间索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='同步任务记录表';
代码语言:javascript
复制

5.2.2 同步任务实体与 Repository
代码语言:javascript
复制
import lombok.Data;
import org.hibernate.annotations.DynamicInsert;
import org.hibernate.annotations.DynamicUpdate;
import javax.persistence.*;
import java.time.LocalDateTime;

/**
 * 同步任务记录实体
 *
 * @author 技术专家
 */
@Data
@Entity
@Table(name = "sync_task_record")
@DynamicInsert
@DynamicUpdate
public class SyncTaskRecord {
    /**
     * 状态:初始化
     */
    public static final int STATUS_INIT = 0;

    /**
     * 状态:接口调用中
     */
    public static final int STATUS_API_CALLING = 1;

    /**
     * 状态:接口调用成功
     */
    public static final int STATUS_API_SUCCESS = 2;

    /**
     * 状态:数据处理中
     */
    public static final int STATUS_DATA_PROCESSING = 3;

    /**
     * 状态:处理成功
     */
    public static final int STATUS_SUCCESS = 4;

    /**
     * 状态:处理失败
     */
    public static final int STATUS_FAILED = 5;

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(name = "task_name", nullable = false, length = 64)
    private String taskName;

    @Column(name = "task_id", nullable = false, length = 64)
    private String taskId;

    @Column(name = "business_id", length = 64)
    private String businessId;

    @Column(name = "status", nullable = false)
    private Integer status;

    @Column(name = "api_request", columnDefinition = "text")
    private String apiRequest;

    @Column(name = "api_response", columnDefinition = "text")
    private String apiResponse;

    @Column(name = "error_msg", columnDefinition = "text")
    private String errorMsg;

    @Column(name = "retry_count", nullable = false)
    private Integer retryCount;

    @Column(name = "next_retry_time")
    private LocalDateTime nextRetryTime;

    @Column(name = "create_time", nullable = false, updatable = false)
    private LocalDateTime createTime;

    @Column(name = "update_time", nullable = false)
    private LocalDateTime updateTime;

    @Column(name = "finish_time")
    private LocalDateTime finishTime;
}
代码语言:javascript
复制

代码语言:javascript
复制
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;

/**
 * 同步任务记录数据访问接口
 *
 * @author 技术专家
 */
public interface SyncTaskRecordRepository extends JpaRepository<SyncTaskRecord, Long>, JpaSpecificationExecutor<SyncTaskRecord> {
    /**
     * 根据任务ID查询任务记录
     *
     * @param taskId 任务ID
     * @return 任务记录
     */
    Optional<SyncTaskRecord> findByTaskId(String taskId);

    /**
     * 查询需要重试的任务
     *
     * @param status 任务状态
     * @param currentTime 当前时间
     * @param limit 最大查询数量
     * @return 需要重试的任务列表
     */
    @Query("SELECT t FROM SyncTaskRecord t WHERE t.status = :status AND t.nextRetryTime <= :currentTime ORDER BY t.nextRetryTime ASC")
    List<SyncTaskRecord> findRetryTasks(@Param("status") Integer status,
                                       @Param("currentTime") LocalDateTime currentTime,
                                       @Param("limit") int limit);

    /**
     * 更新任务状态
     *
     * @param id 任务ID
     * @param status 新状态
     * @param errorMsg 错误信息
     * @param finishTime 完成时间
     * @return 影响的行数
     */
    @Modifying
    @Query("UPDATE SyncTaskRecord SET status = :status, errorMsg = :errorMsg, " +
           "finishTime = :finishTime, updateTime = CURRENT_TIMESTAMP WHERE id = :id")
    int updateStatus(@Param("id") Long id,
                    @Param("status") Integer status,
                    @Param("errorMsg") String errorMsg,
                    @Param("finishTime") LocalDateTime finishTime);

    /**
     * 更新任务重试信息
     *
     * @param id 任务ID
     * @param status 新状态
     * @param retryCount 重试次数
     * @param nextRetryTime 下次重试时间
     * @param errorMsg 错误信息
     * @return 影响的行数
     */
    @Modifying
    @Query("UPDATE SyncTaskRecord SET status = :status, retryCount = :retryCount, " +
           "nextRetryTime = :nextRetryTime, errorMsg = :errorMsg, " +
           "updateTime = CURRENT_TIMESTAMP WHERE id = :id")
    int updateRetryInfo(@Param("id") Long id,
                       @Param("status") Integer status,
                       @Param("retryCount") Integer retryCount,
                       @Param("nextRetryTime") LocalDateTime nextRetryTime,
                       @Param("errorMsg") String errorMsg);
}
代码语言:javascript
复制

5.2.3 同步任务管理器实现
代码语言:javascript
复制
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;

/**
 * 同步任务管理器
 * 负责管理同步任务的生命周期和状态流转
 *
 * @author 技术专家
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class SyncTaskManager {
    /**
     * 最大重试次数
     */
    private static final int MAX_RETRY_COUNT = 5;

    /**
     * 初始重试间隔(分钟)
     */
    private static final int INITIAL_RETRY_INTERVAL_MINUTES = 1;

    /**
     * 重试间隔乘数
     */
    private static final double RETRY_INTERVAL_MULTIPLIER = 2.0;

    private final SyncTaskRecordRepository syncTaskRecordRepository;
    private final ObjectMapper objectMapper;

    /**
     * 创建同步任务
     *
     * @param taskName 任务名称
     * @param businessId 业务ID
     * @param request 接口请求参数
     * @return 创建的任务记录
     */
    @Transactional(rollbackFor = Exception.class)
    public SyncTaskRecord createTask(String taskName, String businessId, Object request) {
        Objects.requireNonNull(taskName, "任务名称不能为空");

        // 生成唯一任务ID
        String taskId = generateTaskId();

        // 转换请求参数为JSON
        String apiRequest = null;
        if (Objects.nonNull(request)) {
            try {
                apiRequest = objectMapper.writeValueAsString(request);
            } catch (JsonProcessingException e) {
                log.error("请求参数转换为JSON失败", e);
                throw new RuntimeException("请求参数转换为JSON失败", e);
            }
        }

        // 创建任务记录
        SyncTaskRecord taskRecord = new SyncTaskRecord();
        taskRecord.setTaskName(taskName);
        taskRecord.setTaskId(taskId);
        taskRecord.setBusinessId(businessId);
        taskRecord.setStatus(SyncTaskRecord.STATUS_INIT);
        taskRecord.setApiRequest(apiRequest);
        taskRecord.setRetryCount(0);
        taskRecord.setCreateTime(LocalDateTime.now());
        taskRecord.setUpdateTime(LocalDateTime.now());

        return syncTaskRecordRepository.save(taskRecord);
    }

    /**
     * 执行同步任务
     *
     * @param taskId 任务ID
     * @param apiCaller 调用第三方接口的函数
     * @param dataProcessor 处理接口返回数据的函数
     * @param <T> 接口返回数据类型
     * @param <R> 处理结果类型
     * @return 处理结果
     */
    @Transactional(rollbackFor = Exception.class)
    public <T, R> R executeTask(String taskId, Function<String, T> apiCaller, Function<T, R> dataProcessor) {
        Objects.requireNonNull(taskId, "任务ID不能为空");
        Objects.requireNonNull(apiCaller, "API调用函数不能为空");
        Objects.requireNonNull(dataProcessor, "数据处理函数不能为空");

        // 查询任务记录
        SyncTaskRecord taskRecord = syncTaskRecordRepository.findByTaskId(taskId)
                .orElseThrow(() -> new RuntimeException("任务不存在,taskId:" + taskId));

        log.info("开始执行同步任务,taskId:{}, 当前状态:{}", taskId, taskRecord.getStatus());

        // 检查任务状态是否允许执行
        if (taskRecord.getStatus() == SyncTaskRecord.STATUS_SUCCESS) {
            log.info("任务已成功完成,无需重复执行,taskId:{}", taskId);
            return null;
        }

        if (taskRecord.getStatus() == SyncTaskRecord.STATUS_FAILED && 
                taskRecord.getRetryCount() >= MAX_RETRY_COUNT) {
            log.warn("任务已达到最大重试次数,不再执行,taskId:{}, 重试次数:{}",
                    taskId, taskRecord.getRetryCount());
            return null;
        }

        try {
            // 更新任务状态为接口调用中
            updateTaskStatus(taskRecord.getId(), SyncTaskRecord.STATUS_API_CALLING, null, null);

            // 调用第三方接口
            T apiResponse = apiCaller.apply(taskRecord.getApiRequest());

            // 转换响应数据为JSON并更新任务记录
            String apiResponseJson = objectMapper.writeValueAsString(apiResponse);
            taskRecord.setApiResponse(apiResponseJson);
            syncTaskRecordRepository.save(taskRecord);

            // 更新任务状态为接口调用成功
            updateTaskStatus(taskRecord.getId(), SyncTaskRecord.STATUS_API_SUCCESS, null, null);

            // 更新任务状态为数据处理中
            updateTaskStatus(taskRecord.getId(), SyncTaskRecord.STATUS_DATA_PROCESSING, null, null);

            // 处理接口返回数据
            R result = dataProcessor.apply(apiResponse);

            // 更新任务状态为处理成功
            updateTaskStatus(taskRecord.getId(), SyncTaskRecord.STATUS_SUCCESS, null, LocalDateTime.now());

            log.info("同步任务执行成功,taskId:{}", taskId);
            return result;
        } catch (Exception e) {
            log.error("同步任务执行失败,taskId:{}", taskId, e);

            // 计算下次重试时间
            int retryCount = taskRecord.getRetryCount() + 1;
            LocalDateTime nextRetryTime = calculateNextRetryTime(retryCount);

            // 检查是否达到最大重试次数
            if (retryCount >= MAX_RETRY_COUNT) {
                // 更新任务状态为处理失败,不再重试
                updateTaskStatus(taskRecord.getId(), SyncTaskRecord.STATUS_FAILED, 
                        e.getMessage(), LocalDateTime.now());
                log.error("同步任务已达到最大重试次数,taskId:{}, 重试次数:{}", taskId, retryCount);
            } else {
                // 更新任务重试信息
                syncTaskRecordRepository.updateRetryInfo(
                        taskRecord.getId(),
                        SyncTaskRecord.STATUS_FAILED,
                        retryCount,
                        nextRetryTime,
                        e.getMessage()
                );
                log.info("同步任务将在下次重试,taskId:{}, 重试次数:{}, 下次重试时间:{}",
                        taskId, retryCount, nextRetryTime);
            }

            throw new RuntimeException("同步任务执行失败,taskId:" + taskId, e);
        }
    }

    /**
     * 处理需要重试的任务
     *
     * @param batchSize 批次大小
     * @return 处理的任务数量
     */
    @Transactional(rollbackFor = Exception.class)
    public int processRetryTasks(int batchSize) {
        log.info("开始处理需要重试的任务,批次大小:{}", batchSize);

        // 查询需要重试的任务
        List<SyncTaskRecord> retryTasks = syncTaskRecordRepository.findRetryTasks(
                SyncTaskRecord.STATUS_FAILED,
                LocalDateTime.now(),
                batchSize
        );

        if (CollectionUtils.isEmpty(retryTasks)) {
            log.info("没有需要重试的任务");
            return 0;
        }

        log.info("发现需要重试的任务数量:{}", retryTasks.size());

        // 重置任务状态为初始化,等待下次执行
        int processedCount = 0;
        for (SyncTaskRecord task : retryTasks) {
            try {
                updateTaskStatus(task.getId(), SyncTaskRecord.STATUS_INIT, null, null);
                processedCount++;
            } catch (Exception e) {
                log.error("重置重试任务状态失败,taskId:{}", task.getTaskId(), e);
            }
        }

        log.info("处理重试任务完成,成功重置状态的任务数量:{}", processedCount);
        return processedCount;
    }

    /**
     * 更新任务状态
     *
     * @param id 任务ID
     * @param status 新状态
     * @param errorMsg 错误信息
     * @param finishTime 完成时间
     */
    private void updateTaskStatus(Long id, Integer status, String errorMsg, LocalDateTime finishTime) {
        int rowsAffected = syncTaskRecordRepository.updateStatus(id, status, errorMsg, finishTime);
        if (rowsAffected == 0) {
            log.warn("更新任务状态失败,可能任务已被其他线程修改,id:{}, status:{}", id, status);
        } else {
            log.debug("更新任务状态成功,id:{}, status:{}", id, status);
        }
    }

    /**
     * 生成任务ID
     *
     * @return 生成的任务ID
     */
    private String generateTaskId() {
        return "TASK_" + UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
    }

    /**
     * 计算下次重试时间
     *
     * @param retryCount 重试次数
     * @return 下次重试时间
     */
    private LocalDateTime calculateNextRetryTime(int retryCount) {
        // 计算重试间隔,使用指数退避策略
        long intervalMinutes = (long) (INITIAL_RETRY_INTERVAL_MINUTES * Math.pow(RETRY_INTERVAL_MULTIPLIER, retryCount - 1));

        // 加入随机抖动,避免所有失败任务同时重试
        double jitter = 1 + (Math.random() - 0.5) * 0.2; // 随机抖动±10%
        intervalMinutes = (long) (intervalMinutes * jitter);

        // 计算下次重试时间
        return LocalDateTime.now().plus(intervalMinutes, ChronoUnit.MINUTES);
    }
}
代码语言:javascript
复制

5.3 数据一致性的最佳实践

  1. 状态机设计
    • 明确定义任务的状态流转规则
    • 每个状态转换都要记录,便于问题排查
    • 对异常状态设置自动修复机制
  2. 定时对账机制
    • 定期与第三方系统进行数据比对
    • 对不一致的数据进行标记和修复
    • 记录对账结果,形成闭环
  3. 补偿事务
    • 对失败的操作提供补偿机制
    • 补偿操作也要保证幂等性
    • 记录补偿操作的执行情况
  4. 数据校验
    • 对同步的数据进行完整性和一致性校验
    • 校验失败的数据单独处理,不影响整体流程

六、完整的定时任务实现

6.1 定时任务配置

首先,我们需要在 Spring Boot 应用中配置定时任务:

代码语言:javascript
复制
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.Executors;

/**
 * 定时任务配置
 *
 * @author 技术专家
 */
@Configuration
@EnableScheduling
public class SchedulingConfig implements SchedulingConfigurer {
    /**
     * 定时任务线程池大小
     */
    private static final int TASK_POOL_SIZE = 5;

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(TASK_POOL_SIZE);
        taskScheduler.setThreadNamePrefix("scheduled-task-");
        taskScheduler.setAwaitTerminationSeconds(60);
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        taskScheduler.initialize();

        taskRegistrar.setTaskScheduler(taskScheduler);
    }
}
代码语言:javascript
复制

6.2 第三方接口客户端

代码语言:javascript
复制
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestParam;

/**
 * 第三方接口客户端
 * 使用Feign调用第三方API
 *
 * @author 技术专家
 */
@FeignClient(name = "thirdPartyApiClient", url = "${third.party.api.url}")
public interface ThirdPartyApiClient {
    /**
     * 获取访问令牌
     *
     * @param request 令牌请求参数
     * @return 令牌响应
     */
    @PostMapping("/oauth/token")
    TokenResponse getToken(TokenRequest request);

    /**
     * 获取数据列表
     *
     * @param accessToken 访问令牌
     * @param page 页码
     * @param pageSize 每页大小
     * @return 数据响应
     */
    @PostMapping("/api/data/list")
    DataListResponse getDataList(
            @RequestHeader("Authorization") String accessToken,
            @RequestParam("page") int page,
            @RequestParam("pageSize") int pageSize);
}
代码语言:javascript
复制

6.3 业务服务实现

代码语言:javascript
复制
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * 数据同步服务
 * 负责定时从第三方接口同步数据到本地数据库
 *
 * @author 技术专家
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class DataSyncService {
    /**
     * API Key
     */
    private static final String API_KEY = "${third.party.api.key}";

    /**
     * 业务类型
     */
    private static final String BUSINESS_TYPE = "DATA_SYNC";

    /**
     * 每页数据量
     */
    private static final int PAGE_SIZE = 100;

    private final TokenManager tokenManager;
    private final IdempotentProcessor idempotentProcessor;
    private final RetryUtils retryUtils;
    private final SyncTaskManager syncTaskManager;
    private final ThirdPartyApiClient thirdPartyApiClient;
    private final LocalDataRepository localDataRepository;

    /**
     * 同步数据的定时任务
     */
    //@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行
    public void syncDataTask() {
        log.info("开始执行数据同步任务");

        try {
            // 创建同步任务记录
            SyncTaskRecord taskRecord = syncTaskManager.createTask(
                    BUSINESS_TYPE, null, "全量数据同步");

            // 执行同步任务
            syncTaskManager.executeTask(
                    taskRecord.getTaskId(),
                    this::callThirdPartyApi,
                    this::processAndSaveData
            );

            log.info("数据同步任务执行完成");
        } catch (Exception e) {
            log.error("数据同步任务执行失败", e);
        }
    }

    /**
     * 调用第三方接口获取数据
     *
     * @param requestParam 请求参数(JSON格式)
     * @return 接口返回的数据
     */
    private DataListResponse callThirdPartyApi(String requestParam) {
        log.info("开始调用第三方接口获取数据");

        // 获取有效的访问令牌
        String accessToken = tokenManager.getValidAccessToken(API_KEY);
        if (StringUtils.isBlank(accessToken)) {
            throw new RuntimeException("获取访问令牌失败");
        }

        // 构建Authorization头
        String authorizationHeader = "Bearer " + accessToken;

        // 分页获取所有数据
        int page = 1;
        DataListResponse allData = new DataListResponse();

        while (true) {
            log.info("调用第三方接口,页码:{}, 每页大小:{}", page, PAGE_SIZE);

            // 带重试机制调用接口
            DataListResponse response = retryUtils.executeWithRetry(() ->
                    thirdPartyApiClient.getDataList(authorizationHeader, page, PAGE_SIZE)
            );

            if (Objects.isNull(response) || CollectionUtils.isEmpty(response.getDataList())) {
                log.info("第三方接口返回空数据,页码:{}", page);
                break;
            }

            // 累加数据
            allData.getDataList().addAll(response.getDataList());

            // 检查是否还有下一页
            if (response.getCurrentPage() >= response.getTotalPages()) {
                log.info("已获取所有数据,总页数:{}, 总条数:{}",
                        response.getTotalPages(), response.getTotalCount());
                break;
            }

            page++;
        }

        log.info("第三方接口调用完成,共获取数据:{}条", 
                CollectionUtils.isEmpty(allData.getDataList()) ? 0 : allData.getDataList().size());
        return allData;
    }

    /**
     * 处理并保存数据到本地数据库
     *
     * @param response 第三方接口返回的数据
     * @return 处理结果
     */
    @Transactional(rollbackFor = Exception.class)
    private Boolean processAndSaveData(DataListResponse response) {
        log.info("开始处理并保存数据,数据量:{}",
                CollectionUtils.isEmpty(response.getDataList()) ? 0 : response.getDataList().size());

        if (CollectionUtils.isEmpty(response.getDataList())) {
            log.info("没有需要处理的数据");
            return true;
        }

        // 逐条处理数据,保证每条数据的幂等性
        int successCount = 0;
        int failCount = 0;

        for (ThirdPartyData data : response.getDataList()) {
            try {
                // 使用数据ID作为请求ID,保证幂等性
                String requestId = data.getId();

                // 执行幂等操作
                idempotentProcessor.executeIdempotentOperation(BUSINESS_TYPE, requestId, () -> {
                    // 转换第三方数据为本地数据模型
                    LocalData localData = convertToLocalData(data);

                    // 保存或更新本地数据
                    localDataRepository.save(localData);
                    return localData;
                });

                successCount++;
            } catch (Exception e) {
                log.error("处理数据失败,数据ID:{}", data.getId(), e);
                failCount++;
            }
        }

        log.info("数据处理完成,成功:{}条,失败:{}条", successCount, failCount);

        if (failCount > 0) {
            log.warn("存在处理失败的数据,数量:{}", failCount);
            // 可以根据实际业务需求决定是否抛出异常
            // throw new RuntimeException("部分数据处理失败,失败数量:" + failCount);
        }

        return true;
    }

    /**
     * 将第三方数据转换为本地数据模型
     *
     * @param thirdPartyData 第三方数据
     * @return 本地数据模型
     */
    private LocalData convertToLocalData(ThirdPartyData thirdPartyData) {
        // 实际项目中根据业务需求进行字段映射和转换
        LocalData localData = new LocalData();
        localData.setExternalId(thirdPartyData.getId());
        localData.setName(thirdPartyData.getName());
        localData.setValue(thirdPartyData.getValue());
        localData.setStatus(thirdPartyData.getStatus());
        localData.setSyncTime(LocalDateTime.now());
        // 设置其他字段...

        return localData;
    }
}
代码语言:javascript
复制

6.4 依赖配置(pom.xml)

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>third-party-api-sync</artifactId>
    <version>1.0.0</version>
    <name>third-party-api-sync</name>
    <description>定时任务调用第三方接口并同步数据到数据库的示例项目</description>

    <properties>
        <java.version>17</java.version>
        <spring-cloud.version>2023.0.0</spring-cloud.version>
        <lombok.version>1.18.30</lombok.version>
        <commons-lang3.version>3.14.0</commons-lang3.version>
        <mysql-connector.version>8.0.33</mysql-connector.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Boot Starter Data JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <!-- Spring Boot Starter Scheduling -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-scheduling</artifactId>
        </dependency>

        <!-- Spring Boot Starter Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Spring Cloud OpenFeign -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <!-- Spring Retry -->
        <dependency>
            <groupId>org.springframework.retry</groupId>
            <artifactId>spring-retry</artifactId>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Apache Commons Lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <!-- MySQL Connector -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <version>${mysql-connector.version}</version>
            <scope>runtime</scope>
        </dependency>

        <!-- Spring Boot Starter Test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
代码语言:javascript
复制

七、监控与告警

7.1 监控指标设计

为了确保系统的稳定运行,我们需要设计关键监控指标:

  1. Token 相关指标
    • Token 刷新成功率
    • Token 有效时长
    • Token 刷新频率
  2. 接口调用指标
    • 接口调用成功率
    • 接口响应时间
    • 接口调用频率
  3. 任务执行指标
    • 任务成功率
    • 任务执行时长
    • 重试次数分布
  4. 数据同步指标
    • 数据同步成功率
    • 数据同步总量
    • 数据同步耗时

7.2 监控实现

使用 Spring Boot Actuator 和 Micrometer 实现监控指标收集:

代码语言:javascript
复制
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

/**
 * 监控指标收集器
 *
 * @author 技术专家
 */
@Slf4j
@Component
public class MetricsCollector {
    private final MeterRegistry meterRegistry;

    // Token相关指标
    private Counter tokenRefreshSuccessCounter;
    private Counter tokenRefreshFailCounter;
    private Timer tokenRefreshTimer;

    // 接口调用相关指标
    private Counter apiCallSuccessCounter;
    private Counter apiCallFailCounter;
    private Timer apiCallTimer;

    // 任务执行相关指标
    private Counter taskSuccessCounter;
    private Counter taskFailCounter;
    private Timer taskExecuteTimer;

    // 数据同步相关指标
    private Counter dataSyncSuccessCounter;
    private Counter dataSyncFailCounter;
    private Timer dataSyncTimer;

    public MetricsCollector(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    @PostConstruct
    public void init() {
        // 初始化Token相关指标
        tokenRefreshSuccessCounter = meterRegistry.counter("token.refresh.success");
        tokenRefreshFailCounter = meterRegistry.counter("token.refresh.fail");
        tokenRefreshTimer = Timer.builder("token.refresh.duration")
                .description("Token刷新耗时")
                .register(meterRegistry);

        // 初始化接口调用相关指标
        apiCallSuccessCounter = meterRegistry.counter("api.call.success");
        apiCallFailCounter = meterRegistry.counter("api.call.fail");
        apiCallTimer = Timer.builder("api.call.duration")
                .description("接口调用耗时")
                .register(meterRegistry);

        // 初始化任务执行相关指标
        taskSuccessCounter = meterRegistry.counter("task.execute.success");
        taskFailCounter = meterRegistry.counter("task.execute.fail");
        taskExecuteTimer = Timer.builder("task.execute.duration")
                .description("任务执行耗时")
                .register(meterRegistry);

        // 初始化数据同步相关指标
        dataSyncSuccessCounter = meterRegistry.counter("data.sync.success");
        dataSyncFailCounter = meterRegistry.counter("data.sync.fail");
        dataSyncTimer = Timer.builder("data.sync.duration")
                .description("数据同步耗时")
                .register(meterRegistry);

        log.info("监控指标收集器初始化完成");
    }

    // Token相关指标记录方法
    public void recordTokenRefreshSuccess(long durationMs) {
        tokenRefreshSuccessCounter.increment();
        tokenRefreshTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordTokenRefreshFail() {
        tokenRefreshFailCounter.increment();
    }

    // 接口调用相关指标记录方法
    public void recordApiCallSuccess(long durationMs) {
        apiCallSuccessCounter.increment();
        apiCallTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordApiCallFail() {
        apiCallFailCounter.increment();
    }

    // 任务执行相关指标记录方法
    public void recordTaskSuccess(long durationMs) {
        taskSuccessCounter.increment();
        taskExecuteTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordTaskFail() {
        taskFailCounter.increment();
    }

    // 数据同步相关指标记录方法
    public void recordDataSyncSuccess(long durationMs, int count) {
        dataSyncSuccessCounter.increment(count);
        dataSyncTimer.record(durationMs, TimeUnit.MILLISECONDS);
    }

    public void recordDataSyncFail(int count) {
        dataSyncFailCounter.increment(count);
    }
}
代码语言:javascript
复制

7.3 告警机制

结合 Prometheus 和 Grafana 设置告警规则,当以下情况发生时触发告警:

  1. Token 刷新失败次数超过阈值
  2. 接口调用失败率超过阈值
  3. 任务执行失败次数超过阈值
  4. 数据同步成功率低于阈值
  5. 关键操作耗时超过阈值

八、总结与展望

本文详细介绍了定时任务调用第三方接口并同步数据到数据库时需要注意的核心问题及解决方案,包括:

  1. Token 管理与续期通过多级缓存、分布式锁和提前刷新策略,确保 Token 的安全有效。
  2. 接口幂等性基于唯一标识和数据库约束,保证重复调用不会产生副作用。
  3. 失败重试机制采用指数退避策略,合理设置重试次数和间隔,避免重试风暴。
  4. 数据一致性通过状态机、本地消息表和补偿机制,保证数据最终一致性。
  5. 监控与告警设计关键指标,及时发现和解决系统问题。

这些方案不仅考虑了功能实现,还兼顾了性能、安全性和可维护性,已经在多个生产环境中得到验证。

未来,可以进一步探索以下方向:

  1. 引入服务网格(Service Mesh)技术,将流量控制、重试、熔断等功能下沉到基础设施层。
  2. 使用事件驱动架构,提高系统的解耦性和可扩展性。
  3. 结合 AI 技术,实现异常检测和自愈能力,提高系统的智能化水平。

九、参考资料

  1. OAuth 2.0 协议:IETF RFC 6749
  2. 幂等性概念:W3C 规范
  3. 重试机制与容错模式:Martin Fowler 的《容错模式》
  4. 数据一致性理论:Martin Kleppmann 的《数据密集型应用系统设计》
  5. Java 并发编程:《Java 并发编程实战》
  6. Spring 框架最佳实践:Spring 官方文档
  7. 数据库设计规范:MySQL 官方文档
  8. 代码规范:《阿里巴巴 Java 开发手册(嵩山版)》
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-08-29,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 果酱带你啃java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 在现代企业级应用开发中,定时任务调用第三方接口并将数据同步到本地数据库是一种常见的业务场景。无论是电商平台同步物流信息、金融系统获取汇率数据,还是企业 ERP 系统对接第三方服务,都离不开这种数据交互模式。然而,这个看似简单的流程背后隐藏着诸多技术挑战:如何安全高效地管理第三方接口的 Token?如何保证接口调用的幂等性?如何处理网络波动导致的调用失败?如何确保数据最终一致性?
  • 一、系统设计概述
    • 1.1 整体架构
    • 1.2 核心流程
    • 1.3 技术选型
  • 二、Token 管理与续期策略
    • 2.1 Token 的重要性与常见问题
    • 2.2 Token 管理方案设计
      • 2.2.1 Token 实体设计
      • 2.2.2 Token 管理器实现
      • 2.2.3 自定义异常类
    • 2.3 Token 管理的最佳实践
  • 三、接口幂等性保证
    • 3.1 幂等性的概念与重要性
    • 3.2 幂等性实现方案
      • 3.2.1 幂等性记录表设计
      • 3.2.2 幂等性实体与 Repository
      • 3.2.3 幂等处理器实现
      • 3.2.4 幂等相关异常类
    • 3.3 幂等性实现的最佳实践
  • 四、调用失败重试机制
    • 4.1 重试机制的设计原则
    • 4.2 重试机制实现方案
      • 4.2.1 重试策略配置
      • 4.2.2 重试工具类实现
    • 4.3 重试机制的最佳实践
  • 五、数据一致性保障
    • 5.1 数据一致性的挑战
    • 5.2 最终一致性方案
      • 5.2.1 同步任务记录表设计
      • 5.2.2 同步任务实体与 Repository
      • 5.2.3 同步任务管理器实现
    • 5.3 数据一致性的最佳实践
  • 六、完整的定时任务实现
    • 6.1 定时任务配置
    • 6.2 第三方接口客户端
    • 6.3 业务服务实现
    • 6.4 依赖配置(pom.xml)
  • 七、监控与告警
    • 7.1 监控指标设计
    • 7.2 监控实现
    • 7.3 告警机制
  • 八、总结与展望
  • 九、参考资料
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档