
我将在本文中系统梳理定时任务调用第三方接口时需要注意的核心问题,并提供经过生产环境验证的解决方案。本文不仅涵盖理论分析,还包含完整的代码实现,旨在帮助开发者构建健壮、可靠的集成系统。
在开始讨论具体实现之前,我们先来看一下典型的定时任务调用第三方接口的系统架构:

整个系统的核心流程如下:

基于 Java 生态,我们选择以下技术组件:
第三方接口通常采用 Token 进行身份验证,这是一种基于 OAuth 2.0 等协议的安全机制(IETF RFC 6749)。在定时任务场景中,Token 管理面临以下挑战:
一个健壮的 Token 管理方案应包含以下核心功能:
首先定义 Token 实体类:
import lombok.Data;
import java.time.LocalDateTime;
/**
* 第三方接口访问令牌
*
* @author 技术专家
*/
@Data
public class ApiToken {
/**
* 访问令牌
*/
private String accessToken;
/**
* 刷新令牌
*/
private String refreshToken;
/**
* 令牌类型
*/
private String tokenType;
/**
* 过期时间(秒)
*/
private Integer expiresIn;
/**
* 令牌获取时间
*/
private LocalDateTime obtainTime;
/**
* 令牌过期时间
*/
private LocalDateTime expireTime;
/**
* 检查令牌是否有效
*
* @return true-有效,false-无效
*/
public boolean isValid() {
// 提前60秒过期,避免网络延迟等问题导致的令牌无效
return LocalDateTime.now().plusSeconds(60).isBefore(expireTime);
}
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* 第三方接口Token管理器
* 负责Token的获取、刷新、存储和有效性检查
*
* @author 技术专家
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class TokenManager {
/**
* Redis中存储Token的键前缀
*/
private static final String TOKEN_KEY_PREFIX = "api:token:";
/**
* 分布式环境下刷新Token的锁键
*/
private static final String TOKEN_REFRESH_LOCK_KEY = "api:token:refresh:lock";
/**
* 默认的Token存储过期时间(小时)
*/
private static final int DEFAULT_TOKEN_EXPIRE_HOURS = 24;
private final RedisTemplate<String, Object> redisTemplate;
private final ThirdPartyApiClient thirdPartyApiClient;
/**
* 本地锁,防止单JVM内重复刷新Token
*/
private final ReentrantLock localLock = new ReentrantLock();
/**
* 获取有效的访问令牌
*
* @param apiKey 第三方接口的API Key,用于区分不同的接口服务
* @return 有效的访问令牌
*/
public String getValidAccessToken(String apiKey) {
Objects.requireNonNull(apiKey, "API Key不能为空");
ApiToken apiToken = getStoredToken(apiKey);
// 如果Token不存在或已过期,则刷新Token
if (Objects.isNull(apiToken) || !apiToken.isValid()) {
log.info("Token不存在或已过期,需要刷新,apiKey:{}", apiKey);
apiToken = refreshAccessToken(apiKey);
}
return apiToken.getAccessToken();
}
/**
* 从存储中获取Token
*
* @param apiKey API Key
* @return Token对象,如果不存在则返回null
*/
private ApiToken getStoredToken(String apiKey) {
String tokenKey = buildTokenKey(apiKey);
Object tokenObj = redisTemplate.opsForValue().get(tokenKey);
if (Objects.isNull(tokenObj)) {
log.debug("Redis中未找到Token,apiKey:{}", apiKey);
return null;
}
try {
return (ApiToken) tokenObj;
} catch (ClassCastException e) {
log.error("Token类型转换失败,apiKey:{}", apiKey, e);
// 清除错误的Token数据
redisTemplate.delete(tokenKey);
return null;
}
}
/**
* 刷新访问令牌
* 采用分布式锁保证同一时间只有一个实例在刷新Token
*
* @param apiKey API Key
* @return 新的Token对象
*/
private ApiToken refreshAccessToken(String apiKey) {
// 先尝试获取本地锁,减少分布式锁的竞争
if (!localLock.tryLock()) {
log.info("本地锁已被占用,等待其他线程刷新Token,apiKey:{}", apiKey);
// 等待本地锁释放后再尝试获取Token
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("等待本地锁释放时被中断", e);
}
return getStoredToken(apiKey);
}
try {
// 获取分布式锁,防止集群环境下多个实例同时刷新Token
Boolean lockAcquired = redisTemplate.opsForValue().setIfAbsent(
TOKEN_REFRESH_LOCK_KEY + apiKey,
"LOCK",
30,
TimeUnit.SECONDS
);
if (Boolean.FALSE.equals(lockAcquired)) {
log.info("分布式锁已被占用,等待其他实例刷新Token,apiKey:{}", apiKey);
// 等待一段时间后再尝试获取Token
try {
TimeUnit.MILLISECONDS.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("等待分布式锁释放时被中断", e);
}
return getStoredToken(apiKey);
}
// 双重检查,防止重复刷新
ApiToken existingToken = getStoredToken(apiKey);
if (Objects.nonNull(existingToken) && existingToken.isValid()) {
log.info("其他线程已刷新Token,直接使用,apiKey:{}", apiKey);
return existingToken;
}
log.info("开始刷新Token,apiKey:{}", apiKey);
ApiToken newToken = fetchNewToken(apiKey);
if (Objects.isNull(newToken) || StringUtils.isBlank(newToken.getAccessToken())) {
log.error("刷新Token失败,获取到的Token为空,apiKey:{}", apiKey);
throw new TokenRefreshException("刷新Token失败,获取到的Token为空");
}
// 计算Token过期时间
LocalDateTime obtainTime = LocalDateTime.now();
newToken.setObtainTime(obtainTime);
newToken.setExpireTime(obtainTime.plusSeconds(newToken.getExpiresIn()));
// 存储新Token
storeToken(apiKey, newToken);
log.info("Token刷新成功,apiKey:{},过期时间:{}", apiKey, newToken.getExpireTime());
return newToken;
} finally {
// 释放本地锁
localLock.unlock();
// 释放分布式锁
redisTemplate.delete(TOKEN_REFRESH_LOCK_KEY + apiKey);
}
}
/**
* 调用第三方接口获取新的Token
*
* @param apiKey API Key
* @return 新的Token对象
*/
private ApiToken fetchNewToken(String apiKey) {
try {
// 实际项目中需要根据第三方接口的要求构建请求参数
TokenRequest request = new TokenRequest();
request.setApiKey(apiKey);
// 设置其他必要参数...
TokenResponse response = thirdPartyApiClient.getToken(request);
if (Objects.isNull(response) || !response.isSuccess()) {
log.error("调用第三方接口获取Token失败,apiKey:{}, 响应:{}", apiKey, response);
return null;
}
// 转换响应为ApiToken对象
ApiToken apiToken = new ApiToken();
apiToken.setAccessToken(response.getAccessToken());
apiToken.setRefreshToken(response.getRefreshToken());
apiToken.setTokenType(response.getTokenType());
apiToken.setExpiresIn(response.getExpiresIn());
return apiToken;
} catch (Exception e) {
log.error("获取新Token时发生异常,apiKey:{}", apiKey, e);
return null;
}
}
/**
* 存储Token到Redis
*
* @param apiKey API Key
* @param token Token对象
*/
private void storeToken(String apiKey, ApiToken token) {
String tokenKey = buildTokenKey(apiKey);
// 计算Token在Redis中的过期时间,取实际过期时间和默认过期时间的较小值
long expireSeconds = Duration.between(LocalDateTime.now(), token.getExpireTime()).getSeconds();
long redisExpireSeconds = Math.min(expireSeconds, DEFAULT_TOKEN_EXPIRE_HOURS * 3600L);
redisTemplate.opsForValue().set(tokenKey, token, redisExpireSeconds, TimeUnit.SECONDS);
log.debug("Token已存储到Redis,apiKey:{}, 过期时间:{}秒", apiKey, redisExpireSeconds);
}
/**
* 构建Redis中的Token键
*
* @param apiKey API Key
* @return 构建后的键
*/
private String buildTokenKey(String apiKey) {
return TOKEN_KEY_PREFIX + apiKey;
}
/**
* 手动刷新Token(用于紧急情况)
*
* @param apiKey API Key
* @return 新的Token对象
*/
public ApiToken manualRefreshToken(String apiKey) {
Objects.requireNonNull(apiKey, "API Key不能为空");
// 先删除旧的Token
String tokenKey = buildTokenKey(apiKey);
redisTemplate.delete(tokenKey);
log.info("已删除旧的Token,准备手动刷新,apiKey:{}", apiKey);
// 刷新并返回新的Token
return refreshAccessToken(apiKey);
}
}
/**
* Token刷新异常
*
* @author 技术专家
*/
public class TokenRefreshException extends RuntimeException {
public TokenRefreshException() {
super();
}
public TokenRefreshException(String message) {
super(message);
}
public TokenRefreshException(String message, Throwable cause) {
super(message, cause);
}
}
幂等性是指一次或多次执行相同的操作,其结果是相同的(W3C 规范)。在定时任务调用第三方接口的场景中,保证幂等性至关重要,因为:
不保证幂等性可能导致数据重复、状态不一致、业务逻辑错误等严重问题。
常见的幂等性实现方案有以下几种:
在定时任务场景中,我们通常采用基于唯一标识的幂等方案,结合数据库唯一约束实现。
首先创建幂等性记录的数据表:
CREATE TABLE `idempotent_record` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`business_type` varchar(64) NOT NULL COMMENT '业务类型',
`request_id` varchar(64) NOT NULL COMMENT '请求唯一标识',
`status` tinyint NOT NULL COMMENT '处理状态:0-处理中,1-处理成功,2-处理失败',
`response_data` text COMMENT '响应数据',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`expire_time` datetime DEFAULT NULL COMMENT '过期时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_business_request` (`business_type`,`request_id`) COMMENT '业务类型+请求ID唯一索引,保证幂等性',
KEY `idx_expire_time` (`expire_time`) COMMENT '过期时间索引,用于清理过期数据'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='幂等性记录表';
import lombok.Data;
import org.hibernate.annotations.DynamicInsert;
import org.hibernate.annotations.DynamicUpdate;
import javax.persistence.*;
import java.time.LocalDateTime;
/**
* 幂等性记录实体
*
* @author 技术专家
*/
@Data
@Entity
@Table(name = "idempotent_record")
@DynamicInsert
@DynamicUpdate
public class IdempotentRecord {
/**
* 处理状态:处理中
*/
public static final int STATUS_PROCESSING = 0;
/**
* 处理状态:处理成功
*/
public static final int STATUS_SUCCESS = 1;
/**
* 处理状态:处理失败
*/
public static final int STATUS_FAILED = 2;
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "business_type", nullable = false, length = 64)
private String businessType;
@Column(name = "request_id", nullable = false, length = 64)
private String requestId;
@Column(name = "status", nullable = false)
private Integer status;
@Column(name = "response_data", columnDefinition = "text")
private String responseData;
@Column(name = "create_time", nullable = false, updatable = false)
private LocalDateTime createTime;
@Column(name = "update_time", nullable = false)
private LocalDateTime updateTime;
@Column(name = "expire_time")
private LocalDateTime expireTime;
}
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.util.Optional;
/**
* 幂等性记录数据访问接口
*
* @author 技术专家
*/
public interface IdempotentRecordRepository extends JpaRepository<IdempotentRecord, Long> {
/**
* 根据业务类型和请求ID查询幂等性记录
*
* @param businessType 业务类型
* @param requestId 请求ID
* @return 幂等性记录
*/
Optional<IdempotentRecord> findByBusinessTypeAndRequestId(String businessType, String requestId);
/**
* 尝试插入幂等性记录,使用数据库唯一约束保证原子性
*
* @param businessType 业务类型
* @param requestId 请求ID
* @param status 状态
* @param expireTime 过期时间
* @return 影响的行数,1-成功,0-失败
*/
@Modifying
@Query(value = "INSERT IGNORE INTO idempotent_record " +
"(business_type, request_id, status, create_time, update_time, expire_time) " +
"VALUES (:businessType, :requestId, :status, NOW(), NOW(), :expireTime)",
nativeQuery = true)
int insertIgnore(@Param("businessType") String businessType,
@Param("requestId") String requestId,
@Param("status") Integer status,
@Param("expireTime") LocalDateTime expireTime);
/**
* 更新幂等性记录的状态和响应数据
*
* @param id ID
* @param status 状态
* @param responseData 响应数据
* @return 影响的行数
*/
@Modifying
@Query("UPDATE IdempotentRecord SET status = :status, responseData = :responseData, " +
"updateTime = CURRENT_TIMESTAMP WHERE id = :id")
int updateStatusAndResponseData(@Param("id") Long id,
@Param("status") Integer status,
@Param("responseData") String responseData);
}
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
/**
* 幂等处理器
* 负责处理接口调用的幂等性保证
*
* @author 技术专家
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class IdempotentProcessor {
/**
* 幂等记录默认过期时间(小时)
*/
private static final int DEFAULT_EXPIRE_HOURS = 24;
private final IdempotentRecordRepository idempotentRecordRepository;
private final ObjectMapper objectMapper;
/**
* 执行幂等操作
*
* @param businessType 业务类型
* @param requestId 请求ID,如果为null则自动生成
* @param operation 要执行的操作
* @param <T> 操作返回值类型
* @return 操作结果
*/
@Transactional(rollbackFor = Exception.class)
public <T> T executeIdempotentOperation(String businessType, String requestId, Supplier<T> operation) {
Objects.requireNonNull(businessType, "业务类型不能为空");
// 如果请求ID为null,则自动生成一个UUID作为请求ID
String actualRequestId = StringUtils.hasText(requestId) ? requestId : generateRequestId();
log.info("开始执行幂等操作,businessType:{}, requestId:{}", businessType, actualRequestId);
// 检查是否已经处理过
Optional<IdempotentRecord> existingRecord = idempotentRecordRepository
.findByBusinessTypeAndRequestId(businessType, actualRequestId);
if (existingRecord.isPresent()) {
IdempotentRecord record = existingRecord.get();
log.info("幂等操作已存在,businessType:{}, requestId:{}, status:{}",
businessType, actualRequestId, record.getStatus());
// 根据不同状态返回不同结果
switch (record.getStatus()) {
case IdempotentRecord.STATUS_SUCCESS:
// 如果已经成功,返回之前的结果
return parseResponseData(record.getResponseData());
case IdempotentRecord.STATUS_PROCESSING:
// 如果正在处理中,抛出异常,由上层决定是否重试
throw new IdempotentProcessingException(
String.format("幂等操作正在处理中,businessType:%s, requestId:%s",
businessType, actualRequestId));
case IdempotentRecord.STATUS_FAILED:
// 如果之前处理失败,可以重新尝试处理
log.info("幂等操作之前处理失败,将重新处理,businessType:{}, requestId:{}",
businessType, actualRequestId);
break;
default:
log.warn("未知的幂等操作状态,将重新处理,businessType:{}, requestId:{}, status:{}",
businessType, actualRequestId, record.getStatus());
break;
}
}
// 尝试插入幂等记录,标记为处理中
LocalDateTime expireTime = LocalDateTime.now().plusHours(DEFAULT_EXPIRE_HOURS);
int insertResult = idempotentRecordRepository.insertIgnore(
businessType, actualRequestId, IdempotentRecord.STATUS_PROCESSING, expireTime);
if (insertResult == 0) {
// 插入失败,说明其他线程已经插入了记录,需要重新检查
log.info("幂等记录插入失败,可能有其他线程正在处理,将重新检查,businessType:{}, requestId:{}",
businessType, actualRequestId);
return executeIdempotentOperation(businessType, actualRequestId, operation);
}
log.info("幂等记录插入成功,开始执行实际操作,businessType:{}, requestId:{}",
businessType, actualRequestId);
try {
// 执行实际操作
T result = operation.get();
// 更新记录状态为成功,并保存响应数据
IdempotentRecord record = idempotentRecordRepository
.findByBusinessTypeAndRequestId(businessType, actualRequestId)
.orElseThrow(() -> new RuntimeException(
String.format("幂等记录插入后查询不到,businessType:%s, requestId:%s",
businessType, actualRequestId)));
String responseData = convertToJson(result);
idempotentRecordRepository.updateStatusAndResponseData(
record.getId(), IdempotentRecord.STATUS_SUCCESS, responseData);
log.info("幂等操作执行成功,businessType:{}, requestId:{}", businessType, actualRequestId);
return result;
} catch (Exception e) {
log.error("幂等操作执行失败,businessType:{}, requestId:{}",
businessType, actualRequestId, e);
// 更新记录状态为失败
Optional<IdempotentRecord> recordOpt = idempotentRecordRepository
.findByBusinessTypeAndRequestId(businessType, actualRequestId);
recordOpt.ifPresent(record -> {
idempotentRecordRepository.updateStatusAndResponseData(
record.getId(), IdempotentRecord.STATUS_FAILED, e.getMessage());
});
// 重新抛出异常,由上层处理
throw e;
}
}
/**
* 生成请求ID
*
* @return 生成的请求ID
*/
private String generateRequestId() {
return UUID.randomUUID().toString().replaceAll("-", "");
}
/**
* 将对象转换为JSON字符串
*
* @param obj 要转换的对象
* @return JSON字符串
*/
private String convertToJson(Object obj) {
if (Objects.isNull(obj)) {
return null;
}
try {
return objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("对象转换为JSON失败", e);
return null;
}
}
/**
* 将JSON字符串解析为对象
*
* @param json JSON字符串
* @param <T> 目标对象类型
* @return 解析后的对象
*/
@SuppressWarnings("unchecked")
private <T> T parseResponseData(String json) {
if (StringUtils.isBlank(json)) {
return null;
}
try {
return (T) objectMapper.readValue(json, Object.class);
} catch (Exception e) {
log.error("解析JSON响应数据失败,json:{}", json, e);
return null;
}
}
}
/**
* 幂等操作处理中异常
* 当检测到幂等操作正在处理中时抛出
*
* @author 技术专家
*/
public class IdempotentProcessingException extends RuntimeException {
public IdempotentProcessingException() {
super();
}
public IdempotentProcessingException(String message) {
super(message);
}
public IdempotentProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
在分布式系统中,网络抖动、服务临时不可用等情况时有发生。合理的重试机制可以提高系统的健壮性,但不当的重试策略可能导致:
根据《容错模式》(Martin Fowler)中的建议,重试机制应遵循以下原则:
我们将实现一个灵活的重试框架,结合 Spring Retry 和自定义策略,满足不同场景的需求。
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 重试策略配置属性
*
* @author 技术专家
*/
@Data
@Component
@ConfigurationProperties(prefix = "api.retry")
public class RetryStrategyProperties {
/**
* 是否启用重试
*/
private boolean enabled = true;
/**
* 最大重试次数
*/
private int maxAttempts = 3;
/**
* 初始重试间隔(毫秒)
*/
private long initialInterval = 1000;
/**
* 重试间隔乘数
*/
private double multiplier = 2.0;
/**
* 最大重试间隔(毫秒)
*/
private long maxInterval = 5000;
/**
* 是否启用随机抖动
*/
private boolean random = true;
/**
* 随机抖动因子(0.0-1.0)
*/
private double randomFactor = 0.5;
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;
/**
* 重试工具类
* 提供灵活的重试机制,支持自定义重试策略和异常类型
*
* @author 技术专家
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RetryUtils {
private final RetryStrategyProperties retryProperties;
/**
* 执行带重试的操作
* 使用默认的重试策略和异常类型
*
* @param operation 要执行的操作
* @param <T> 操作返回值类型
* @return 操作结果
*/
public <T> T executeWithRetry(Supplier<T> operation) {
return executeWithRetry(operation, getDefaultRetryableExceptions());
}
/**
* 执行带重试的操作
* 使用默认的重试策略和指定的异常类型
*
* @param operation 要执行的操作
* @param retryableExceptions 需要重试的异常类型
* @param <T> 操作返回值类型
* @return 操作结果
*/
public <T> T executeWithRetry(Supplier<T> operation, Set<Class<? extends Throwable>> retryableExceptions) {
if (!retryProperties.isEnabled()) {
log.debug("重试机制已禁用,直接执行操作");
return operation.get();
}
// 创建重试模板
RetryTemplate retryTemplate = new RetryTemplate();
// 配置重试策略
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(
retryProperties.getMaxAttempts(),
Collections.singletonMap(Exception.class, true)
);
retryTemplate.setRetryPolicy(retryPolicy);
// 配置退避策略
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(retryProperties.getInitialInterval());
backOffPolicy.setMultiplier(retryProperties.getMultiplier());
backOffPolicy.setMaxInterval(retryProperties.getMaxInterval());
retryTemplate.setBackOffPolicy(backOffPolicy);
log.debug("开始执行带重试的操作,最大重试次数:{}, 初始间隔:{}ms",
retryProperties.getMaxAttempts(), retryProperties.getInitialInterval());
try {
return retryTemplate.execute(context -> {
int attempt = context.getRetryCount() + 1;
log.debug("执行操作,第{}次尝试", attempt);
try {
return operation.get();
} catch (Throwable e) {
// 检查是否是需要重试的异常类型
if (isRetryableException(e, retryableExceptions)) {
log.warn("操作执行失败,将进行重试,第{}次尝试,异常信息:{}",
attempt, e.getMessage());
throw e;
} else {
log.error("操作执行失败,不进行重试,异常类型:{}", e.getClass().getName());
throw e;
}
}
});
} catch (Throwable e) {
log.error("达到最大重试次数,操作执行失败", e);
throw e;
}
}
/**
* 获取默认的需要重试的异常类型
*
* @return 需要重试的异常类型集合
*/
private Set<Class<? extends Throwable>> getDefaultRetryableExceptions() {
Set<Class<? extends Throwable>> exceptions = new HashSet<>();
exceptions.add(java.net.SocketTimeoutException.class);
exceptions.add(java.io.IOException.class);
exceptions.add(org.springframework.web.client.RestClientException.class);
exceptions.add(IdempotentProcessingException.class);
return exceptions;
}
/**
* 检查异常是否是需要重试的类型
*
* @param e 异常对象
* @param retryableExceptions 需要重试的异常类型集合
* @return true-需要重试,false-不需要重试
*/
private boolean isRetryableException(Throwable e, Set<Class<? extends Throwable>> retryableExceptions) {
if (CollectionUtils.isEmpty(retryableExceptions)) {
return false;
}
for (Class<? extends Throwable> exceptionClass : retryableExceptions) {
if (exceptionClass.isInstance(e)) {
return true;
}
}
return false;
}
}
在定时任务调用第三方接口并同步数据到本地数据库的场景中,数据一致性面临以下挑战:
根据《数据密集型应用系统设计》(Martin Kleppmann)中的理论,我们需要在一致性、可用性和分区容错性之间做出权衡。
在大多数业务场景中,我们可以采用最终一致性方案,通过以下机制保证数据最终达到一致状态:
CREATE TABLE `sync_task_record` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`task_name` varchar(64) NOT NULL COMMENT '任务名称',
`task_id` varchar(64) NOT NULL COMMENT '任务ID',
`business_id` varchar(64) DEFAULT NULL COMMENT '业务ID',
`status` tinyint NOT NULL COMMENT '状态:0-初始化,1-接口调用中,2-接口调用成功,3-数据处理中,4-处理成功,5-处理失败',
`api_request` text COMMENT 'API请求参数',
`api_response` text COMMENT 'API响应数据',
`error_msg` text COMMENT '错误信息',
`retry_count` int NOT NULL DEFAULT '0' COMMENT '重试次数',
`next_retry_time` datetime DEFAULT NULL COMMENT '下次重试时间',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`finish_time` datetime DEFAULT NULL COMMENT '完成时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_task_id` (`task_id`) COMMENT '任务ID唯一索引',
KEY `idx_task_name_status` (`task_name`,`status`) COMMENT '任务名称+状态索引',
KEY `idx_next_retry_time` (`next_retry_time`) COMMENT '下次重试时间索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='同步任务记录表';
import lombok.Data;
import org.hibernate.annotations.DynamicInsert;
import org.hibernate.annotations.DynamicUpdate;
import javax.persistence.*;
import java.time.LocalDateTime;
/**
* 同步任务记录实体
*
* @author 技术专家
*/
@Data
@Entity
@Table(name = "sync_task_record")
@DynamicInsert
@DynamicUpdate
public class SyncTaskRecord {
/**
* 状态:初始化
*/
public static final int STATUS_INIT = 0;
/**
* 状态:接口调用中
*/
public static final int STATUS_API_CALLING = 1;
/**
* 状态:接口调用成功
*/
public static final int STATUS_API_SUCCESS = 2;
/**
* 状态:数据处理中
*/
public static final int STATUS_DATA_PROCESSING = 3;
/**
* 状态:处理成功
*/
public static final int STATUS_SUCCESS = 4;
/**
* 状态:处理失败
*/
public static final int STATUS_FAILED = 5;
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "task_name", nullable = false, length = 64)
private String taskName;
@Column(name = "task_id", nullable = false, length = 64)
private String taskId;
@Column(name = "business_id", length = 64)
private String businessId;
@Column(name = "status", nullable = false)
private Integer status;
@Column(name = "api_request", columnDefinition = "text")
private String apiRequest;
@Column(name = "api_response", columnDefinition = "text")
private String apiResponse;
@Column(name = "error_msg", columnDefinition = "text")
private String errorMsg;
@Column(name = "retry_count", nullable = false)
private Integer retryCount;
@Column(name = "next_retry_time")
private LocalDateTime nextRetryTime;
@Column(name = "create_time", nullable = false, updatable = false)
private LocalDateTime createTime;
@Column(name = "update_time", nullable = false)
private LocalDateTime updateTime;
@Column(name = "finish_time")
private LocalDateTime finishTime;
}
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Optional;
/**
* 同步任务记录数据访问接口
*
* @author 技术专家
*/
public interface SyncTaskRecordRepository extends JpaRepository<SyncTaskRecord, Long>, JpaSpecificationExecutor<SyncTaskRecord> {
/**
* 根据任务ID查询任务记录
*
* @param taskId 任务ID
* @return 任务记录
*/
Optional<SyncTaskRecord> findByTaskId(String taskId);
/**
* 查询需要重试的任务
*
* @param status 任务状态
* @param currentTime 当前时间
* @param limit 最大查询数量
* @return 需要重试的任务列表
*/
@Query("SELECT t FROM SyncTaskRecord t WHERE t.status = :status AND t.nextRetryTime <= :currentTime ORDER BY t.nextRetryTime ASC")
List<SyncTaskRecord> findRetryTasks(@Param("status") Integer status,
@Param("currentTime") LocalDateTime currentTime,
@Param("limit") int limit);
/**
* 更新任务状态
*
* @param id 任务ID
* @param status 新状态
* @param errorMsg 错误信息
* @param finishTime 完成时间
* @return 影响的行数
*/
@Modifying
@Query("UPDATE SyncTaskRecord SET status = :status, errorMsg = :errorMsg, " +
"finishTime = :finishTime, updateTime = CURRENT_TIMESTAMP WHERE id = :id")
int updateStatus(@Param("id") Long id,
@Param("status") Integer status,
@Param("errorMsg") String errorMsg,
@Param("finishTime") LocalDateTime finishTime);
/**
* 更新任务重试信息
*
* @param id 任务ID
* @param status 新状态
* @param retryCount 重试次数
* @param nextRetryTime 下次重试时间
* @param errorMsg 错误信息
* @return 影响的行数
*/
@Modifying
@Query("UPDATE SyncTaskRecord SET status = :status, retryCount = :retryCount, " +
"nextRetryTime = :nextRetryTime, errorMsg = :errorMsg, " +
"updateTime = CURRENT_TIMESTAMP WHERE id = :id")
int updateRetryInfo(@Param("id") Long id,
@Param("status") Integer status,
@Param("retryCount") Integer retryCount,
@Param("nextRetryTime") LocalDateTime nextRetryTime,
@Param("errorMsg") String errorMsg);
}
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
/**
* 同步任务管理器
* 负责管理同步任务的生命周期和状态流转
*
* @author 技术专家
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class SyncTaskManager {
/**
* 最大重试次数
*/
private static final int MAX_RETRY_COUNT = 5;
/**
* 初始重试间隔(分钟)
*/
private static final int INITIAL_RETRY_INTERVAL_MINUTES = 1;
/**
* 重试间隔乘数
*/
private static final double RETRY_INTERVAL_MULTIPLIER = 2.0;
private final SyncTaskRecordRepository syncTaskRecordRepository;
private final ObjectMapper objectMapper;
/**
* 创建同步任务
*
* @param taskName 任务名称
* @param businessId 业务ID
* @param request 接口请求参数
* @return 创建的任务记录
*/
@Transactional(rollbackFor = Exception.class)
public SyncTaskRecord createTask(String taskName, String businessId, Object request) {
Objects.requireNonNull(taskName, "任务名称不能为空");
// 生成唯一任务ID
String taskId = generateTaskId();
// 转换请求参数为JSON
String apiRequest = null;
if (Objects.nonNull(request)) {
try {
apiRequest = objectMapper.writeValueAsString(request);
} catch (JsonProcessingException e) {
log.error("请求参数转换为JSON失败", e);
throw new RuntimeException("请求参数转换为JSON失败", e);
}
}
// 创建任务记录
SyncTaskRecord taskRecord = new SyncTaskRecord();
taskRecord.setTaskName(taskName);
taskRecord.setTaskId(taskId);
taskRecord.setBusinessId(businessId);
taskRecord.setStatus(SyncTaskRecord.STATUS_INIT);
taskRecord.setApiRequest(apiRequest);
taskRecord.setRetryCount(0);
taskRecord.setCreateTime(LocalDateTime.now());
taskRecord.setUpdateTime(LocalDateTime.now());
return syncTaskRecordRepository.save(taskRecord);
}
/**
* 执行同步任务
*
* @param taskId 任务ID
* @param apiCaller 调用第三方接口的函数
* @param dataProcessor 处理接口返回数据的函数
* @param <T> 接口返回数据类型
* @param <R> 处理结果类型
* @return 处理结果
*/
@Transactional(rollbackFor = Exception.class)
public <T, R> R executeTask(String taskId, Function<String, T> apiCaller, Function<T, R> dataProcessor) {
Objects.requireNonNull(taskId, "任务ID不能为空");
Objects.requireNonNull(apiCaller, "API调用函数不能为空");
Objects.requireNonNull(dataProcessor, "数据处理函数不能为空");
// 查询任务记录
SyncTaskRecord taskRecord = syncTaskRecordRepository.findByTaskId(taskId)
.orElseThrow(() -> new RuntimeException("任务不存在,taskId:" + taskId));
log.info("开始执行同步任务,taskId:{}, 当前状态:{}", taskId, taskRecord.getStatus());
// 检查任务状态是否允许执行
if (taskRecord.getStatus() == SyncTaskRecord.STATUS_SUCCESS) {
log.info("任务已成功完成,无需重复执行,taskId:{}", taskId);
return null;
}
if (taskRecord.getStatus() == SyncTaskRecord.STATUS_FAILED &&
taskRecord.getRetryCount() >= MAX_RETRY_COUNT) {
log.warn("任务已达到最大重试次数,不再执行,taskId:{}, 重试次数:{}",
taskId, taskRecord.getRetryCount());
return null;
}
try {
// 更新任务状态为接口调用中
updateTaskStatus(taskRecord.getId(), SyncTaskRecord.STATUS_API_CALLING, null, null);
// 调用第三方接口
T apiResponse = apiCaller.apply(taskRecord.getApiRequest());
// 转换响应数据为JSON并更新任务记录
String apiResponseJson = objectMapper.writeValueAsString(apiResponse);
taskRecord.setApiResponse(apiResponseJson);
syncTaskRecordRepository.save(taskRecord);
// 更新任务状态为接口调用成功
updateTaskStatus(taskRecord.getId(), SyncTaskRecord.STATUS_API_SUCCESS, null, null);
// 更新任务状态为数据处理中
updateTaskStatus(taskRecord.getId(), SyncTaskRecord.STATUS_DATA_PROCESSING, null, null);
// 处理接口返回数据
R result = dataProcessor.apply(apiResponse);
// 更新任务状态为处理成功
updateTaskStatus(taskRecord.getId(), SyncTaskRecord.STATUS_SUCCESS, null, LocalDateTime.now());
log.info("同步任务执行成功,taskId:{}", taskId);
return result;
} catch (Exception e) {
log.error("同步任务执行失败,taskId:{}", taskId, e);
// 计算下次重试时间
int retryCount = taskRecord.getRetryCount() + 1;
LocalDateTime nextRetryTime = calculateNextRetryTime(retryCount);
// 检查是否达到最大重试次数
if (retryCount >= MAX_RETRY_COUNT) {
// 更新任务状态为处理失败,不再重试
updateTaskStatus(taskRecord.getId(), SyncTaskRecord.STATUS_FAILED,
e.getMessage(), LocalDateTime.now());
log.error("同步任务已达到最大重试次数,taskId:{}, 重试次数:{}", taskId, retryCount);
} else {
// 更新任务重试信息
syncTaskRecordRepository.updateRetryInfo(
taskRecord.getId(),
SyncTaskRecord.STATUS_FAILED,
retryCount,
nextRetryTime,
e.getMessage()
);
log.info("同步任务将在下次重试,taskId:{}, 重试次数:{}, 下次重试时间:{}",
taskId, retryCount, nextRetryTime);
}
throw new RuntimeException("同步任务执行失败,taskId:" + taskId, e);
}
}
/**
* 处理需要重试的任务
*
* @param batchSize 批次大小
* @return 处理的任务数量
*/
@Transactional(rollbackFor = Exception.class)
public int processRetryTasks(int batchSize) {
log.info("开始处理需要重试的任务,批次大小:{}", batchSize);
// 查询需要重试的任务
List<SyncTaskRecord> retryTasks = syncTaskRecordRepository.findRetryTasks(
SyncTaskRecord.STATUS_FAILED,
LocalDateTime.now(),
batchSize
);
if (CollectionUtils.isEmpty(retryTasks)) {
log.info("没有需要重试的任务");
return 0;
}
log.info("发现需要重试的任务数量:{}", retryTasks.size());
// 重置任务状态为初始化,等待下次执行
int processedCount = 0;
for (SyncTaskRecord task : retryTasks) {
try {
updateTaskStatus(task.getId(), SyncTaskRecord.STATUS_INIT, null, null);
processedCount++;
} catch (Exception e) {
log.error("重置重试任务状态失败,taskId:{}", task.getTaskId(), e);
}
}
log.info("处理重试任务完成,成功重置状态的任务数量:{}", processedCount);
return processedCount;
}
/**
* 更新任务状态
*
* @param id 任务ID
* @param status 新状态
* @param errorMsg 错误信息
* @param finishTime 完成时间
*/
private void updateTaskStatus(Long id, Integer status, String errorMsg, LocalDateTime finishTime) {
int rowsAffected = syncTaskRecordRepository.updateStatus(id, status, errorMsg, finishTime);
if (rowsAffected == 0) {
log.warn("更新任务状态失败,可能任务已被其他线程修改,id:{}, status:{}", id, status);
} else {
log.debug("更新任务状态成功,id:{}, status:{}", id, status);
}
}
/**
* 生成任务ID
*
* @return 生成的任务ID
*/
private String generateTaskId() {
return "TASK_" + UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();
}
/**
* 计算下次重试时间
*
* @param retryCount 重试次数
* @return 下次重试时间
*/
private LocalDateTime calculateNextRetryTime(int retryCount) {
// 计算重试间隔,使用指数退避策略
long intervalMinutes = (long) (INITIAL_RETRY_INTERVAL_MINUTES * Math.pow(RETRY_INTERVAL_MULTIPLIER, retryCount - 1));
// 加入随机抖动,避免所有失败任务同时重试
double jitter = 1 + (Math.random() - 0.5) * 0.2; // 随机抖动±10%
intervalMinutes = (long) (intervalMinutes * jitter);
// 计算下次重试时间
return LocalDateTime.now().plus(intervalMinutes, ChronoUnit.MINUTES);
}
}
首先,我们需要在 Spring Boot 应用中配置定时任务:
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.Executors;
/**
* 定时任务配置
*
* @author 技术专家
*/
@Configuration
@EnableScheduling
public class SchedulingConfig implements SchedulingConfigurer {
/**
* 定时任务线程池大小
*/
private static final int TASK_POOL_SIZE = 5;
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.setPoolSize(TASK_POOL_SIZE);
taskScheduler.setThreadNamePrefix("scheduled-task-");
taskScheduler.setAwaitTerminationSeconds(60);
taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
taskScheduler.initialize();
taskRegistrar.setTaskScheduler(taskScheduler);
}
}
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestParam;
/**
* 第三方接口客户端
* 使用Feign调用第三方API
*
* @author 技术专家
*/
@FeignClient(name = "thirdPartyApiClient", url = "${third.party.api.url}")
public interface ThirdPartyApiClient {
/**
* 获取访问令牌
*
* @param request 令牌请求参数
* @return 令牌响应
*/
@PostMapping("/oauth/token")
TokenResponse getToken(TokenRequest request);
/**
* 获取数据列表
*
* @param accessToken 访问令牌
* @param page 页码
* @param pageSize 每页大小
* @return 数据响应
*/
@PostMapping("/api/data/list")
DataListResponse getDataList(
@RequestHeader("Authorization") String accessToken,
@RequestParam("page") int page,
@RequestParam("pageSize") int pageSize);
}
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 数据同步服务
* 负责定时从第三方接口同步数据到本地数据库
*
* @author 技术专家
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DataSyncService {
/**
* API Key
*/
private static final String API_KEY = "${third.party.api.key}";
/**
* 业务类型
*/
private static final String BUSINESS_TYPE = "DATA_SYNC";
/**
* 每页数据量
*/
private static final int PAGE_SIZE = 100;
private final TokenManager tokenManager;
private final IdempotentProcessor idempotentProcessor;
private final RetryUtils retryUtils;
private final SyncTaskManager syncTaskManager;
private final ThirdPartyApiClient thirdPartyApiClient;
private final LocalDataRepository localDataRepository;
/**
* 同步数据的定时任务
*/
//@Scheduled(cron = "0 0 1 * * ?") // 每天凌晨1点执行
public void syncDataTask() {
log.info("开始执行数据同步任务");
try {
// 创建同步任务记录
SyncTaskRecord taskRecord = syncTaskManager.createTask(
BUSINESS_TYPE, null, "全量数据同步");
// 执行同步任务
syncTaskManager.executeTask(
taskRecord.getTaskId(),
this::callThirdPartyApi,
this::processAndSaveData
);
log.info("数据同步任务执行完成");
} catch (Exception e) {
log.error("数据同步任务执行失败", e);
}
}
/**
* 调用第三方接口获取数据
*
* @param requestParam 请求参数(JSON格式)
* @return 接口返回的数据
*/
private DataListResponse callThirdPartyApi(String requestParam) {
log.info("开始调用第三方接口获取数据");
// 获取有效的访问令牌
String accessToken = tokenManager.getValidAccessToken(API_KEY);
if (StringUtils.isBlank(accessToken)) {
throw new RuntimeException("获取访问令牌失败");
}
// 构建Authorization头
String authorizationHeader = "Bearer " + accessToken;
// 分页获取所有数据
int page = 1;
DataListResponse allData = new DataListResponse();
while (true) {
log.info("调用第三方接口,页码:{}, 每页大小:{}", page, PAGE_SIZE);
// 带重试机制调用接口
DataListResponse response = retryUtils.executeWithRetry(() ->
thirdPartyApiClient.getDataList(authorizationHeader, page, PAGE_SIZE)
);
if (Objects.isNull(response) || CollectionUtils.isEmpty(response.getDataList())) {
log.info("第三方接口返回空数据,页码:{}", page);
break;
}
// 累加数据
allData.getDataList().addAll(response.getDataList());
// 检查是否还有下一页
if (response.getCurrentPage() >= response.getTotalPages()) {
log.info("已获取所有数据,总页数:{}, 总条数:{}",
response.getTotalPages(), response.getTotalCount());
break;
}
page++;
}
log.info("第三方接口调用完成,共获取数据:{}条",
CollectionUtils.isEmpty(allData.getDataList()) ? 0 : allData.getDataList().size());
return allData;
}
/**
* 处理并保存数据到本地数据库
*
* @param response 第三方接口返回的数据
* @return 处理结果
*/
@Transactional(rollbackFor = Exception.class)
private Boolean processAndSaveData(DataListResponse response) {
log.info("开始处理并保存数据,数据量:{}",
CollectionUtils.isEmpty(response.getDataList()) ? 0 : response.getDataList().size());
if (CollectionUtils.isEmpty(response.getDataList())) {
log.info("没有需要处理的数据");
return true;
}
// 逐条处理数据,保证每条数据的幂等性
int successCount = 0;
int failCount = 0;
for (ThirdPartyData data : response.getDataList()) {
try {
// 使用数据ID作为请求ID,保证幂等性
String requestId = data.getId();
// 执行幂等操作
idempotentProcessor.executeIdempotentOperation(BUSINESS_TYPE, requestId, () -> {
// 转换第三方数据为本地数据模型
LocalData localData = convertToLocalData(data);
// 保存或更新本地数据
localDataRepository.save(localData);
return localData;
});
successCount++;
} catch (Exception e) {
log.error("处理数据失败,数据ID:{}", data.getId(), e);
failCount++;
}
}
log.info("数据处理完成,成功:{}条,失败:{}条", successCount, failCount);
if (failCount > 0) {
log.warn("存在处理失败的数据,数量:{}", failCount);
// 可以根据实际业务需求决定是否抛出异常
// throw new RuntimeException("部分数据处理失败,失败数量:" + failCount);
}
return true;
}
/**
* 将第三方数据转换为本地数据模型
*
* @param thirdPartyData 第三方数据
* @return 本地数据模型
*/
private LocalData convertToLocalData(ThirdPartyData thirdPartyData) {
// 实际项目中根据业务需求进行字段映射和转换
LocalData localData = new LocalData();
localData.setExternalId(thirdPartyData.getId());
localData.setName(thirdPartyData.getName());
localData.setValue(thirdPartyData.getValue());
localData.setStatus(thirdPartyData.getStatus());
localData.setSyncTime(LocalDateTime.now());
// 设置其他字段...
return localData;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>third-party-api-sync</artifactId>
<version>1.0.0</version>
<name>third-party-api-sync</name>
<description>定时任务调用第三方接口并同步数据到数据库的示例项目</description>
<properties>
<java.version>17</java.version>
<spring-cloud.version>2023.0.0</spring-cloud.version>
<lombok.version>1.18.30</lombok.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<mysql-connector.version>8.0.33</mysql-connector.version>
</properties>
<dependencies>
<!-- Spring Boot Starter Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring Boot Starter Data JPA -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<!-- Spring Boot Starter Scheduling -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-scheduling</artifactId>
</dependency>
<!-- Spring Boot Starter Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Spring Cloud OpenFeign -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<!-- Spring Retry -->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- Apache Commons Lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<!-- MySQL Connector -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql-connector.version}</version>
<scope>runtime</scope>
</dependency>
<!-- Spring Boot Starter Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
为了确保系统的稳定运行,我们需要设计关键监控指标:
使用 Spring Boot Actuator 和 Micrometer 实现监控指标收集:
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;
/**
* 监控指标收集器
*
* @author 技术专家
*/
@Slf4j
@Component
public class MetricsCollector {
private final MeterRegistry meterRegistry;
// Token相关指标
private Counter tokenRefreshSuccessCounter;
private Counter tokenRefreshFailCounter;
private Timer tokenRefreshTimer;
// 接口调用相关指标
private Counter apiCallSuccessCounter;
private Counter apiCallFailCounter;
private Timer apiCallTimer;
// 任务执行相关指标
private Counter taskSuccessCounter;
private Counter taskFailCounter;
private Timer taskExecuteTimer;
// 数据同步相关指标
private Counter dataSyncSuccessCounter;
private Counter dataSyncFailCounter;
private Timer dataSyncTimer;
public MetricsCollector(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@PostConstruct
public void init() {
// 初始化Token相关指标
tokenRefreshSuccessCounter = meterRegistry.counter("token.refresh.success");
tokenRefreshFailCounter = meterRegistry.counter("token.refresh.fail");
tokenRefreshTimer = Timer.builder("token.refresh.duration")
.description("Token刷新耗时")
.register(meterRegistry);
// 初始化接口调用相关指标
apiCallSuccessCounter = meterRegistry.counter("api.call.success");
apiCallFailCounter = meterRegistry.counter("api.call.fail");
apiCallTimer = Timer.builder("api.call.duration")
.description("接口调用耗时")
.register(meterRegistry);
// 初始化任务执行相关指标
taskSuccessCounter = meterRegistry.counter("task.execute.success");
taskFailCounter = meterRegistry.counter("task.execute.fail");
taskExecuteTimer = Timer.builder("task.execute.duration")
.description("任务执行耗时")
.register(meterRegistry);
// 初始化数据同步相关指标
dataSyncSuccessCounter = meterRegistry.counter("data.sync.success");
dataSyncFailCounter = meterRegistry.counter("data.sync.fail");
dataSyncTimer = Timer.builder("data.sync.duration")
.description("数据同步耗时")
.register(meterRegistry);
log.info("监控指标收集器初始化完成");
}
// Token相关指标记录方法
public void recordTokenRefreshSuccess(long durationMs) {
tokenRefreshSuccessCounter.increment();
tokenRefreshTimer.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordTokenRefreshFail() {
tokenRefreshFailCounter.increment();
}
// 接口调用相关指标记录方法
public void recordApiCallSuccess(long durationMs) {
apiCallSuccessCounter.increment();
apiCallTimer.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordApiCallFail() {
apiCallFailCounter.increment();
}
// 任务执行相关指标记录方法
public void recordTaskSuccess(long durationMs) {
taskSuccessCounter.increment();
taskExecuteTimer.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordTaskFail() {
taskFailCounter.increment();
}
// 数据同步相关指标记录方法
public void recordDataSyncSuccess(long durationMs, int count) {
dataSyncSuccessCounter.increment(count);
dataSyncTimer.record(durationMs, TimeUnit.MILLISECONDS);
}
public void recordDataSyncFail(int count) {
dataSyncFailCounter.increment(count);
}
}
结合 Prometheus 和 Grafana 设置告警规则,当以下情况发生时触发告警:
本文详细介绍了定时任务调用第三方接口并同步数据到数据库时需要注意的核心问题及解决方案,包括:
这些方案不仅考虑了功能实现,还兼顾了性能、安全性和可维护性,已经在多个生产环境中得到验证。
未来,可以进一步探索以下方向: