
在企业级应用中,系统间的数据同步是常见需求。当我们需要通过 XXL-Job 定时任务调用第三方带分页的消息接口时,看似简单的需求背后隐藏着诸多挑战:如何保证消息不重复处理?如何控制每次传输的数据量以避免系统过载?如何高效地批量存储数据?这些问题如果处理不当,可能导致数据不一致、系统性能下降甚至数据丢失等严重问题。
本文将深入探讨基于 XXL-Job 的第三方分页接口同步方案设计,聚焦接口设计层面,从幂等性保障、数据量控制、批量存储等核心问题出发,提供一套可落地的完整解决方案。无论你是正在设计类似系统,还是遇到了同步过程中的棘手问题,相信都能从本文获得有价值的参考。
在开始具体的接口设计前,我们先明确整体方案的架构,以便从全局视角理解各组件的作用和交互方式。


什么是幂等性? 简单来说,就是同一操作执行多次,结果都是一致的。在调用第三方接口进行数据同步时,由于网络波动、任务重试等原因,可能导致同一批数据被多次处理,因此必须保证消息处理的幂等性。
常用的幂等性实现方案有以下几种:
在本方案中,我们采用基于唯一标识 + 业务主键的双重幂等性保障机制。
首先,我们需要设计一张幂等性记录表,用于存储已处理消息的标识:
CREATE TABLE `t_idempotent_record` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`message_id` varchar(64) NOT NULL COMMENT '消息唯一标识',
`business_key` varchar(128) NOT NULL COMMENT '业务主键',
`process_status` tinyint NOT NULL COMMENT '处理状态:0-处理中,1-处理成功,2-处理失败',
`process_time` datetime NOT NULL COMMENT '处理时间',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_message_id` (`message_id`),
UNIQUE KEY `uk_business_key` (`business_key`),
KEY `idx_process_status` (`process_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='幂等性记录表';
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
/**
* 幂等性服务实现类
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class IdempotentService extends ServiceImpl<IdempotentRecordMapper, IdempotentRecord> {
/**
* 检查并标记消息,返回需要处理的消息
*
* @param messages 待处理消息列表
* @return 需要处理的消息列表
*/
@Transactional(rollbackFor = Exception.class)
public List<ThirdPartyMessage> checkAndMarkMessages(List<ThirdPartyMessage> messages) {
if (CollectionUtils.isEmpty(messages)) {
return Lists.newArrayList();
}
// 提取所有消息ID和业务主键
List<String> messageIds = messages.stream()
.map(ThirdPartyMessage::getMessageId)
.collect(Collectors.toList());
List<String> businessKeys = messages.stream()
.map(ThirdPartyMessage::getBusinessKey)
.collect(Collectors.toList());
// 查询已存在的记录
LambdaQueryWrapper<IdempotentRecord> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.in(IdempotentRecord::getMessageId, messageIds)
.or()
.in(IdempotentRecord::getBusinessKey, businessKeys);
List<IdempotentRecord> existingRecords = baseMapper.selectList(queryWrapper);
if (CollectionUtils.isNotEmpty(existingRecords)) {
List<String> existingMessageIds = existingRecords.stream()
.map(IdempotentRecord::getMessageId)
.collect(Collectors.toList());
List<String> existingBusinessKeys = existingRecords.stream()
.map(IdempotentRecord::getBusinessKey)
.collect(Collectors.toList());
// 过滤掉已存在的消息
messages = messages.stream()
.filter(msg -> !existingMessageIds.contains(msg.getMessageId())
&& !existingBusinessKeys.contains(msg.getBusinessKey()))
.collect(Collectors.toList());
}
// 标记新消息为处理中
if (CollectionUtils.isNotEmpty(messages)) {
List<IdempotentRecord> records = messages.stream().map(msg -> {
IdempotentRecord record = new IdempotentRecord();
record.setMessageId(msg.getMessageId());
record.setBusinessKey(msg.getBusinessKey());
record.setProcessStatus(0);
record.setProcessTime(LocalDateTime.now());
return record;
}).collect(Collectors.toList());
this.saveBatch(records);
}
return messages;
}
/**
* 更新消息处理状态
*
* @param messageId 消息ID
* @param status 处理状态:0-处理中,1-处理成功,2-处理失败
*/
@Transactional(rollbackFor = Exception.class)
public void updateProcessStatus(String messageId, int status) {
IdempotentRecord record = new IdempotentRecord();
record.setProcessStatus(status);
record.setProcessTime(LocalDateTime.now());
LambdaQueryWrapper<IdempotentRecord> updateWrapper = new LambdaQueryWrapper<>();
updateWrapper.eq(IdempotentRecord::getMessageId, messageId);
baseMapper.update(record, updateWrapper);
}
}
第三方接口采用分页设计,我们需要合理设计分页参数和每次同步的数据量,以平衡同步效率和系统负载。
分页参数通常包括页码(pageNum)和每页条数(pageSize)。在设计时,我们需要考虑:
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
* 分页查询参数
*
* @author ken
*/
@Data
@Schema(description = "分页查询参数")
public class PageQueryParam {
@Schema(description = "页码,从1开始", example = "1")
private Integer pageNum = 1;
@Schema(description = "每页条数", example = "100")
private Integer pageSize = 100;
@Schema(description = "最大每页条数限制", example = "500")
private static final Integer MAX_PAGE_SIZE = 500;
/**
* 校验并修正分页参数
*/
public void validateAndAdjust() {
if (pageNum == null || pageNum < 1) {
pageNum = 1;
}
if (pageSize == null || pageSize < 1) {
pageSize = 100;
}
if (pageSize > MAX_PAGE_SIZE) {
pageSize = MAX_PAGE_SIZE;
}
}
}
为了支持断点续传,我们需要记录每次同步的状态:
CREATE TABLE `t_sync_status` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`sync_task_name` varchar(128) NOT NULL COMMENT '同步任务名称',
`last_success_page` int NOT NULL DEFAULT 0 COMMENT '上次成功同步的页码',
`last_sync_time` datetime DEFAULT NULL COMMENT '上次同步时间',
`sync_status` tinyint NOT NULL COMMENT '同步状态:0-未开始,1-同步中,2-同步成功,3-同步失败',
`error_message` varchar(1024) DEFAULT NULL COMMENT '错误信息',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_sync_task_name` (`sync_task_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='同步状态表';
对应的实体类:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 同步状态实体类
*
* @author ken
*/
@Data
@TableName("t_sync_status")
@Schema(description = "同步状态实体类")
public class SyncStatus {
@TableId(type = IdType.AUTO)
@Schema(description = "主键ID")
private Long id;
@Schema(description = "同步任务名称")
private String syncTaskName;
@Schema(description = "上次成功同步的页码")
private Integer lastSuccessPage;
@Schema(description = "上次同步时间")
private LocalDateTime lastSyncTime;
@Schema(description = "同步状态:0-未开始,1-同步中,2-同步成功,3-同步失败")
private Integer syncStatus;
@Schema(description = "错误信息")
private String errorMessage;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
根据系统负载动态调整每次同步的数据量:
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 分页大小动态调整器
*
* @author ken
*/
@Slf4j
@Component
public class DynamicPageSizeAdjuster {
// 基础分页大小
private static final int BASE_PAGE_SIZE = 100;
// 最小分页大小
private static final int MIN_PAGE_SIZE = 10;
// 最大分页大小
private static final int MAX_PAGE_SIZE = 500;
// CPU使用率阈值,超过此值则减小分页大小
private static final double CPU_USAGE_THRESHOLD = 0.7;
// 内存使用率阈值,超过此值则减小分页大小
private static final double MEMORY_USAGE_THRESHOLD = 0.8;
// 调整步长
private static final int ADJUST_STEP = 20;
private final AtomicInteger currentPageSize = new AtomicInteger(BASE_PAGE_SIZE);
/**
* 根据系统负载动态调整分页大小
*
* @return 调整后的分页大小
*/
public int adjustPageSize() {
OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
double cpuUsage = osBean.getSystemCpuLoad();
double memoryUsage = calculateMemoryUsage();
log.info("系统负载:CPU使用率={}, 内存使用率={}", cpuUsage, memoryUsage);
int newPageSize = currentPageSize.get();
// 如果CPU或内存使用率过高,减小分页大小
if (cpuUsage > CPU_USAGE_THRESHOLD || memoryUsage > MEMORY_USAGE_THRESHOLD) {
newPageSize = Math.max(MIN_PAGE_SIZE, newPageSize - ADJUST_STEP);
log.info("系统负载过高,调整分页大小为:{}", newPageSize);
}
// 如果系统负载较低,适当增大分页大小
else if (cpuUsage < CPU_USAGE_THRESHOLD * 0.5 && memoryUsage < MEMORY_USAGE_THRESHOLD * 0.5) {
newPageSize = Math.min(MAX_PAGE_SIZE, newPageSize + ADJUST_STEP);
log.info("系统负载较低,调整分页大小为:{}", newPageSize);
}
currentPageSize.set(newPageSize);
return newPageSize;
}
/**
* 计算内存使用率
*
* @return 内存使用率
*/
private double calculateMemoryUsage() {
Runtime runtime = Runtime.getRuntime();
long totalMemory = runtime.totalMemory();
long freeMemory = runtime.freeMemory();
long usedMemory = totalMemory - freeMemory;
return (double) usedMemory / totalMemory;
}
/**
* 获取当前分页大小
*
* @return 当前分页大小
*/
public int getCurrentPageSize() {
return currentPageSize.get();
}
}
为了提高数据存储效率,我们需要实现批量存储机制,同时考虑事务控制和异常处理。
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
* 消息存储服务
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageStorageService extends ServiceImpl<ThirdPartyMessageMapper, ThirdPartyMessage> {
// 每批处理的最大数量
private static final int BATCH_SIZE = 500;
private final IdempotentService idempotentService;
/**
* 批量保存消息
*
* @param messages 消息列表
* @return 保存成功的数量
*/
@Transactional(rollbackFor = Exception.class)
public int batchSaveMessages(List<ThirdPartyMessage> messages) {
if (CollectionUtils.isEmpty(messages)) {
log.info("没有需要保存的消息");
return 0;
}
log.info("开始批量保存消息,总数量:{}", messages.size());
// 检查并过滤重复消息
List<ThirdPartyMessage> messagesToSave = idempotentService.checkAndMarkMessages(messages);
if (CollectionUtils.isEmpty(messagesToSave)) {
log.info("所有消息均已处理,无需保存");
return 0;
}
log.info("过滤后需要保存的消息数量:{}", messagesToSave.size());
// 分批保存
List<List<ThirdPartyMessage>> batches = Lists.partition(messagesToSave, BATCH_SIZE);
int totalSaved = 0;
for (List<ThirdPartyMessage> batch : batches) {
try {
boolean success = saveBatch(batch);
if (success) {
int batchSize = batch.size();
totalSaved += batchSize;
log.info("成功保存一批消息,数量:{}", batchSize);
// 更新幂等性记录状态为成功
batch.forEach(msg -> idempotentService.updateProcessStatus(msg.getMessageId(), 1));
} else {
log.error("保存一批消息失败,数量:{}", batch.size());
// 更新幂等性记录状态为失败
batch.forEach(msg -> idempotentService.updateProcessStatus(msg.getMessageId(), 2));
// 抛出异常,触发事务回滚
throw new RuntimeException("批量保存消息失败");
}
} catch (Exception e) {
log.error("保存消息批次时发生异常", e);
// 更新幂等性记录状态为失败
batch.forEach(msg -> idempotentService.updateProcessStatus(msg.getMessageId(), 2));
throw e;
}
}
log.info("批量保存消息完成,成功保存:{} 条", totalSaved);
return totalSaved;
}
/**
* 根据业务主键查询消息
*
* @param businessKey 业务主键
* @return 消息对象,不存在则返回null
*/
public ThirdPartyMessage getMessageByBusinessKey(String businessKey) {
return getOne(Wrappers.<ThirdPartyMessage>lambdaQuery()
.eq(ThirdPartyMessage::getBusinessKey, businessKey));
}
}
首先,我们需要设计一个客户端来调用第三方的分页接口:
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.springframework.util.StringUtils;
import java.util.Map;
/**
* 第三方消息接口客户端
*
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ThirdPartyMessageClient {
private final RestTemplate restTemplate;
// 第三方接口URL
private static final String THIRD_PARTY_API_URL = "https://api.thirdparty.com/messages";
/**
* 调用第三方分页接口获取消息
*
* @param pageNum 页码
* @param pageSize 每页条数
* @param apiKey 接口访问密钥
* @return 分页消息响应
*/
public PageResponse<ThirdPartyMessage> getMessages(int pageNum, int pageSize, String apiKey) {
if (pageNum < 1) {
throw new IllegalArgumentException("页码必须大于等于1");
}
if (pageSize < 1) {
throw new IllegalArgumentException("每页条数必须大于等于1");
}
if (!StringUtils.hasText(apiKey, "API密钥不能为空")) {
throw new IllegalArgumentException("API密钥不能为空");
}
log.info("调用第三方消息接口,页码:{},每页条数:{}", pageNum, pageSize);
// 构建请求头
HttpHeaders headers = new HttpHeaders();
headers.set("Authorization", "Bearer " + apiKey);
headers.set("Content-Type", "application/json");
// 构建请求参数
String url = THIRD_PARTY_API_URL + "?pageNum=" + pageNum + "&pageSize=" + pageSize;
try {
// 发送请求
HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
ResponseEntity<String> responseEntity = restTemplate.exchange(
url, HttpMethod.GET, requestEntity, String.class);
// 处理响应
if (responseEntity.is2xxSuccessful()) {
String responseBody = responseEntity.getBody();
if (StringUtils.hasText(responseBody)) {
PageResponse<ThirdPartyMessage> response = JSON.parseObject(
responseBody, new TypeReference<PageResponse<ThirdPartyMessage>>() {});
log.info("第三方消息接口调用成功,页码:{},返回记录数:{}",
pageNum, response.getTotal());
return response;
} else {
log.warn("第三方消息接口返回空响应,页码:{}", pageNum);
return new PageResponse<>();
}
} else {
log.error("第三方消息接口调用失败,状态码:{},页码:{}",
responseEntity.getStatusCodeValue(), pageNum);
throw new RuntimeException(
"第三方接口调用失败,状态码:" + responseEntity.getStatusCodeValue());
}
} catch (Exception e) {
log.error("调用第三方消息接口时发生异常,页码:{}", pageNum, e);
throw new RuntimeException("调用第三方接口发生异常:" + e.getMessage(), e);
}
}
}
分页响应类:
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.util.List;
/**
* 分页响应对象
*
* @author ken
* @param <T> 数据类型
*/
@Data
@Schema(description = "分页响应对象")
public class PageResponse<T> {
@Schema(description = "总记录数")
private long total;
@Schema(description = "总页数")
private int pages;
@Schema(description = "当前页码")
private int pageNum;
@Schema(description = "每页条数")
private int pageSize;
@Schema(description = "当前页数据")
private List<T> list;
@Schema(description = "是否有下一页")
private boolean hasNextPage;
}
第三方消息实体类:
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 第三方消息实体类
*
* @author ken
*/
@Data
@TableName("t_third_party_message")
@Schema(description = "第三方消息实体类")
public class ThirdPartyMessage {
@TableId(type = IdType.AUTO)
@Schema(description = "主键ID")
private Long id;
@Schema(description = "消息唯一标识")
private String messageId;
@Schema(description = "业务主键")
private String businessKey;
@Schema(description = "消息内容")
private String content;
@Schema(description = "消息类型")
private String messageType;
@Schema(description = "消息状态")
private String status;
@Schema(description = "发送时间")
private LocalDateTime sendTime;
@Schema(description = "接收时间")
private LocalDateTime receiveTime;
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}
同步服务是整个方案的核心,负责协调整个同步流程:
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.service.*;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 消息同步服务接口
*
* @author ken
*/
@RestController
@Tag(name = "消息同步服务", description = "第三方消息同步相关接口")
@Service(name = "MessageSyncService", version = "1.0.0",
description = "第三方消息同步服务,负责从第三方接口同步消息并存储到本地数据库")
public interface MessageSyncService {
/**
* 同步指定页码的消息
*
* @param taskName 任务名称
* @param pageNum 页码
* @param pageSize 每页条数
* @return 同步结果
*/
@Operation(summary = "同步指定页码的消息",
description = "从第三方接口同步指定页码的消息,并存储到本地数据库")
@ApiResponses({
@ApiResponse(responseCode = "200", description = "同步成功",
content = @Content(schema = @Schema(implementation = SyncResult.class))),
@ApiResponse(responseCode = "500", description = "同步失败")
})
@PostMapping("/sync/page")
SyncResult syncPage(
@Parameter(description = "任务名称", required = true)
@RequestParam String taskName,
@Parameter(description = "页码", required = true)
@RequestParam int pageNum,
@Parameter(description = "每页条数")
@RequestParam(required = false) Integer pageSize);
/**
* 同步所有消息
*
* @param taskName 任务名称
* @return 同步结果
*/
@Operation(summary = "同步所有消息",
description = "从第三方接口同步所有消息,从上次成功同步的页码开始")
@ApiResponses({
@ApiResponse(responseCode = "200", description = "同步成功",
content = @Content(schema = @Schema(implementation = SyncResult.class))),
@ApiResponse(responseCode = "500", description = "同步失败")
})
@PostMapping("/sync/all")
SyncResult syncAll(
@Parameter(description = "任务名称", required = true)
@RequestParam String taskName);
/**
* 重试失败的同步任务
*
* @param taskName 任务名称
* @return 同步结果
*/
@Operation(summary = "重试失败的同步任务",
description = "重试上次失败的同步任务")
@ApiResponses({
@ApiResponse(responseCode = "200", description = "重试成功",
content = @Content(schema = @Schema(implementation = SyncResult.class))),
@ApiResponse(responseCode = "500", description = "重试失败")
})
@PostMapping("/sync/retry")
SyncResult retryFailedSync(
@Parameter(description = "任务名称", required = true)
@RequestParam String taskName);
}
同步结果类:
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
* 同步结果
*
* @author ken
*/
@Data
@Schema(description = "同步结果")
public class SyncResult {
@Schema(description = "是否成功")
private boolean success;
@Schema(description = "消息")
private String message;
@Schema(description = "同步的页码")
private int pageNum;
@Schema(description = "同步的记录数")
private int recordCount;
@Schema(description = "总记录数")
private long totalCount;
@Schema(description = "总页数")
private int totalPages;
@Schema(description = "耗时(毫秒)")
private long costTime;
/**
* 创建成功的同步结果
*
* @param pageNum 页码
* @param recordCount 记录数
* @param totalCount 总记录数
* @param totalPages 总页数
* @param costTime 耗时
* @return 同步结果
*/
public static SyncResult success(int pageNum, int recordCount, long totalCount, int totalPages, long costTime) {
SyncResult result = new SyncResult();
result.setSuccess(true);
result.setMessage("同步成功");
result.setPageNum(pageNum);
result.setRecordCount(recordCount);
result.setTotalCount(totalCount);
result.setTotalPages(totalPages);
result.setCostTime(costTime);
return result;
}
/**
* 创建失败的同步结果
*
* @param message 失败消息
* @return 同步结果
*/
public static SyncResult failure(String message) {
SyncResult result = new SyncResult();
result.setSuccess(false);
result.setMessage(message);
return result;
}
}
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
/**
* 消息同步服务实现类
*
* @author ken
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageSyncServiceImpl extends ServiceImpl<SyncStatusMapper, SyncStatus> implements MessageSyncService {
private final ThirdPartyMessageClient thirdPartyMessageClient;
private final MessageStorageService messageStorageService;
private final DynamicPageSizeAdjuster pageSizeAdjuster;
@Value("${thirdparty.api.key}")
private String apiKey;
/**
* 默认分页大小
*/
private static final int DEFAULT_PAGE_SIZE = 100;
@Override
@Transactional(rollbackFor = Exception.class)
public SyncResult syncPage(String taskName, int pageNum, Integer pageSize) {
long startTime = System.currentTimeMillis();
log.info("开始同步任务:{},页码:{}", taskName, pageNum);
try {
// 参数处理
if (!ObjectUtils.isEmpty(pageSize)) {
pageSizeAdjuster.adjustPageSize();
} else {
pageSize = pageSizeAdjuster.getCurrentPageSize();
}
// 更新同步状态为同步中
updateSyncStatus(taskName, 1, pageNum - 1, null);
// 调用第三方接口获取消息
PageResponse<ThirdPartyMessage> response = thirdPartyMessageClient.getMessages(
pageNum, pageSize, apiKey);
// 保存消息
int savedCount = 0;
if (!ObjectUtils.isEmpty(response.getList())) {
savedCount = messageStorageService.batchSaveMessages(response.getList());
}
// 更新同步状态为成功
updateSyncStatus(taskName, 2, pageNum, null);
long costTime = System.currentTimeMillis() - startTime;
log.info("同步任务:{},页码:{} 完成,耗时:{}ms,同步记录数:{}",
taskName, pageNum, costTime, savedCount);
return SyncResult.success(pageNum, savedCount, response.getTotal(),
response.getPages(), costTime);
} catch (Exception e) {
long costTime = System.currentTimeMillis() - startTime;
log.error("同步任务:{},页码:{} 失败", taskName, pageNum, e);
// 更新同步状态为失败
updateSyncStatus(taskName, 3, pageNum - 1, e.getMessage());
return SyncResult.failure("同步失败:" + e.getMessage());
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public SyncResult syncAll(String taskName) {
long startTime = System.currentTimeMillis();
log.info("开始全量同步任务:{}", taskName);
try {
// 获取上次同步状态
SyncStatus syncStatus = getSyncStatus(taskName);
int startPage = syncStatus.getLastSuccessPage() + 1;
int pageSize = pageSizeAdjuster.adjustPageSize();
// 更新同步状态为同步中
updateSyncStatus(taskName, 1, syncStatus.getLastSuccessPage(), null);
int totalSaved = 0;
long totalCount = 0;
int totalPages = 0;
int currentPage = startPage;
boolean hasNextPage = true;
// 循环同步所有页
while (hasNextPage) {
log.info("全量同步任务:{},正在同步页码:{}", taskName, currentPage);
// 调用第三方接口获取消息
PageResponse<ThirdPartyMessage> response = thirdPartyMessageClient.getMessages(
currentPage, pageSize, apiKey);
totalCount = response.getTotal();
totalPages = response.getPages();
hasNextPage = response.isHasNextPage();
// 保存消息
if (!ObjectUtils.isEmpty(response.getList())) {
int savedCount = messageStorageService.batchSaveMessages(response.getList());
totalSaved += savedCount;
log.info("全量同步任务:{},页码:{} 同步完成,本页保存:{} 条",
taskName, currentPage, savedCount);
} else {
log.info("全量同步任务:{},页码:{} 没有需要同步的消息", taskName, currentPage);
}
// 更新当前成功页码
updateSyncStatus(taskName, 1, currentPage, null);
// 准备下一页
currentPage++;
// 动态调整分页大小
pageSize = pageSizeAdjuster.adjustPageSize();
}
// 所有页同步完成,更新状态为成功
updateSyncStatus(taskName, 2, totalPages, null);
long costTime = System.currentTimeMillis() - startTime;
log.info("全量同步任务:{} 完成,耗时:{}ms,总同步记录数:{}",
taskName, costTime, totalSaved);
return SyncResult.success(totalPages, totalSaved, totalCount, totalPages, costTime);
} catch (Exception e) {
long costTime = System.currentTimeMillis() - startTime;
log.error("全量同步任务:{} 失败", taskName, e);
// 保持上次成功的页码,更新状态为失败
SyncStatus syncStatus = getSyncStatus(taskName);
updateSyncStatus(taskName, 3, syncStatus.getLastSuccessPage(), e.getMessage());
return SyncResult.failure("全量同步失败:" + e.getMessage());
}
}
@Override
@Transactional(rollbackFor = Exception.class)
public SyncResult retryFailedSync(String taskName) {
log.info("开始重试失败的同步任务:{}", taskName);
// 获取上次同步状态
SyncStatus syncStatus = getSyncStatus(taskName);
// 如果上次同步不是失败状态,直接返回成功
if (syncStatus.getSyncStatus() != 3) {
log.info("同步任务:{} 上次同步状态不是失败,无需重试", taskName);
return SyncResult.success(
syncStatus.getLastSuccessPage(), 0, 0, 0, 0);
}
// 从上次失败的页码开始重试全量同步
return syncAll(taskName);
}
/**
* 获取同步状态,如果不存在则创建
*
* @param taskName 任务名称
* @return 同步状态
*/
private SyncStatus getSyncStatus(String taskName) {
LambdaQueryWrapper<SyncStatus> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(SyncStatus::getSyncTaskName, taskName);
SyncStatus syncStatus = baseMapper.selectOne(queryWrapper);
if (ObjectUtils.isEmpty(syncStatus)) {
syncStatus = new SyncStatus();
syncStatus.setSyncTaskName(taskName);
syncStatus.setLastSuccessPage(0);
syncStatus.setSyncStatus(0);
baseMapper.insert(syncStatus);
log.info("创建新的同步状态记录,任务名称:{}", taskName);
}
return syncStatus;
}
/**
* 更新同步状态
*
* @param taskName 任务名称
* @param status 状态:0-未开始,1-同步中,2-同步成功,3-同步失败
* @param lastSuccessPage 上次成功页码
* @param errorMessage 错误信息
*/
private void updateSyncStatus(String taskName, int status, int lastSuccessPage, String errorMessage) {
SyncStatus syncStatus = new SyncStatus();
syncStatus.setSyncStatus(status);
syncStatus.setLastSuccessPage(lastSuccessPage);
syncStatus.setLastSyncTime(java.time.LocalDateTime.now());
syncStatus.setErrorMessage(errorMessage);
LambdaQueryWrapper<SyncStatus> updateWrapper = new LambdaQueryWrapper<>();
updateWrapper.eq(SyncStatus::getSyncTaskName, taskName);
baseMapper.update(syncStatus, updateWrapper);
log.info("更新同步状态,任务名称:{},状态:{},上次成功页码:{}",
taskName, status, lastSuccessPage);
}
}
虽然 XXL-Job 已经搭建完成,但我们需要实现具体的任务处理器:
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* 消息同步XXL-Job任务处理器
*
* @author ken
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageSyncXxlJobHandler {
private final MessageSyncService messageSyncService;
/**
* 全量同步消息任务
* 执行参数格式:taskName=xxx
*/
@XxlJob("fullMessageSyncJob")
public void fullMessageSyncJob() {
log.info("开始执行全量同步消息任务");
try {
// 获取任务参数
String param = XxlJobHelper.getJobParam();
if (!org.springframework.util.StringUtils.hasText(param)) {
throw new IllegalArgumentException("任务参数不能为空,格式:taskName=xxx");
}
// 解析参数
String[] paramParts = param.split("=");
if (paramParts.length != 2 || !"taskName".equals(paramParts[0])) {
throw new IllegalArgumentException("参数格式错误,正确格式:taskName=xxx");
}
String taskName = paramParts[1];
// 执行同步
SyncResult result = messageSyncService.syncAll(taskName);
if (result.isSuccess()) {
XxlJobHelper.handleSuccess(
String.format("全量同步任务完成,任务名称:%s,总同步记录数:%d,耗时:%dms",
taskName, result.getRecordCount(), result.getCostTime()));
} else {
XxlJobHelper.handleFail(
String.format("全量同步任务失败,任务名称:%s,原因:%s",
taskName, result.getMessage()));
}
} catch (Exception e) {
log.error("全量同步消息任务执行异常", e);
XxlJobHelper.handleFail("全量同步任务执行异常:" + e.getMessage());
}
}
/**
* 分页同步消息任务
* 执行参数格式:taskName=xxx&pageNum=1&pageSize=100
*/
@XxlJob("pageMessageSyncJob")
public void pageMessageSyncJob() {
log.info("开始执行分页同步消息任务");
try {
// 获取任务参数
String param = XxlJobHelper.getJobParam();
if (!org.springframework.util.StringUtils.hasText(param)) {
throw new IllegalArgumentException("任务参数不能为空,格式:taskName=xxx&pageNum=1&pageSize=100");
}
// 解析参数
String[] paramParts = param.split("&");
String taskName = null;
Integer pageNum = null;
Integer pageSize = null;
for (String part : paramParts) {
String[] keyValue = part.split("=");
if (keyValue.length != 2) {
continue;
}
switch (keyValue[0]) {
case "taskName":
taskName = keyValue[1];
break;
case "pageNum":
try {
pageNum = Integer.parseInt(keyValue[1]);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("pageNum必须是整数");
}
break;
case "pageSize":
try {
pageSize = Integer.parseInt(keyValue[1]);
} catch (NumberFormatException e) {
throw new IllegalArgumentException("pageSize必须是整数");
}
break;
default:
break;
}
}
// 参数校验
if (!org.springframework.util.StringUtils.hasText(taskName)) {
throw new IllegalArgumentException("taskName不能为空");
}
if (pageNum == null || pageNum < 1) {
throw new IllegalArgumentException("pageNum必须大于等于1");
}
// 执行同步
SyncResult result = messageSyncService.syncPage(taskName, pageNum, pageSize);
if (result.isSuccess()) {
XxlJobHelper.handleSuccess(
String.format("分页同步任务完成,任务名称:%s,页码:%d,同步记录数:%d,耗时:%dms",
taskName, pageNum, result.getRecordCount(), result.getCostTime()));
} else {
XxlJobHelper.handleFail(
String.format("分页同步任务失败,任务名称:%s,页码:%d,原因:%s",
taskName, pageNum, result.getMessage()));
}
} catch (Exception e) {
log.error("分页同步消息任务执行异常", e);
XxlJobHelper.handleFail("分页同步任务执行异常:" + e.getMessage());
}
}
/**
* 重试失败的同步任务
* 执行参数格式:taskName=xxx
*/
@XxlJob("retryFailedSyncJob")
public void retryFailedSyncJob() {
log.info("开始执行重试失败的同步任务");
try {
// 获取任务参数
String param = XxlJobHelper.getJobParam();
if (!org.springframework.util.StringUtils.hasText(param)) {
throw new IllegalArgumentException("任务参数不能为空,格式:taskName=xxx");
}
// 解析参数
String[] paramParts = param.split("=");
if (paramParts.length != 2 || !"taskName".equals(paramParts[0])) {
throw new IllegalArgumentException("参数格式错误,正确格式:taskName=xxx");
}
String taskName = paramParts[1];
// 执行重试
SyncResult result = messageSyncService.retryFailedSync(taskName);
if (result.isSuccess()) {
XxlJobHelper.handleSuccess(
String.format("重试同步任务完成,任务名称:%s,同步记录数:%d,耗时:%dms",
taskName, result.getRecordCount(), result.getCostTime()));
} else {
XxlJobHelper.handleFail(
String.format("重试同步任务失败,任务名称:%s,原因:%s",
taskName, result.getMessage()));
}
} catch (Exception e) {
log.error("重试同步任务执行异常", e);
XxlJobHelper.handleFail("重试同步任务执行异常:" + e.getMessage());
}
}
}
为了提高系统的健壮性,我们需要设计完善的异常处理机制:
import lombok.Getter;
/**
* 同步服务异常类
*
* @author ken
*/
@Getter
public class SyncServiceException extends RuntimeException {
/**
* 错误代码
*/
private final String errorCode;
/**
* 构造函数
*
* @param errorCode 错误代码
* @param message 错误消息
*/
public SyncServiceException(String errorCode, String message) {
super(message);
this.errorCode = errorCode;
}
/**
* 构造函数
*
* @param errorCode 错误代码
* @param message 错误消息
* @param cause 异常原因
*/
public SyncServiceException(String errorCode, String message, Throwable cause) {
super(message, cause);
this.errorCode = errorCode;
}
// 常用错误代码
public static final String THIRD_PARTY_API_ERROR = "THIRD_PARTY_API_ERROR";
public static final String DATA_STORAGE_ERROR = "DATA_STORAGE_ERROR";
public static final String PARAMETER_ERROR = "PARAMETER_ERROR";
public static final String SYSTEM_ERROR = "SYSTEM_ERROR";
}
全局异常处理器:
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
/**
* 全局异常处理器
*
* @author ken
*/
@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler {
/**
* 处理同步服务异常
*
* @param e 同步服务异常
* @return 错误响应
*/
@ExceptionHandler(SyncServiceException.class)
public ResponseEntity<ErrorResponse> handleSyncServiceException(SyncServiceException e) {
log.error("同步服务异常:{},错误代码:{}", e.getMessage(), e.getErrorCode(), e);
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.setErrorCode(e.getErrorCode());
errorResponse.setMessage(e.getMessage());
return new ResponseEntity<>(errorResponse, HttpStatus.INTERNAL_SERVER_ERROR);
}
/**
* 处理非法参数异常
*
* @param e 非法参数异常
* @return 错误响应
*/
@ExceptionHandler(IllegalArgumentException.class)
public ResponseEntity<ErrorResponse> handleIllegalArgumentException(IllegalArgumentException e) {
log.error("非法参数异常:{}", e.getMessage(), e);
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.setErrorCode(SyncServiceException.PARAMETER_ERROR);
errorResponse.setMessage(e.getMessage());
return new ResponseEntity<>(errorResponse, HttpStatus.BAD_REQUEST);
}
/**
* 处理通用异常
*
* @param e 异常
* @return 错误响应
*/
@ExceptionHandler(Exception.class)
public ResponseEntity<ErrorResponse> handleGeneralException(Exception e) {
log.error("系统异常:{}", e.getMessage(), e);
ErrorResponse errorResponse = new ErrorResponse();
errorResponse.setErrorCode(SyncServiceException.SYSTEM_ERROR);
errorResponse.setMessage("系统异常,请联系管理员");
return new ResponseEntity<>(errorResponse, HttpStatus.INTERNAL_SERVER_ERROR);
}
}
错误响应类:
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
* 错误响应
*
* @author ken
*/
@Data
@Schema(description = "错误响应")
public class ErrorResponse {
@Schema(description = "错误代码")
private String errorCode;
@Schema(description = "错误消息")
private String message;
}
为了及时发现和解决问题,我们需要设计监控告警机制:
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 同步服务监控指标
*
* @author ken
*/
@Component
@RequiredArgsConstructor
public class SyncMetrics {
private final MeterRegistry meterRegistry;
// 同步成功计数器
private final Counter syncSuccessCounter;
// 同步失败计数器
private final Counter syncFailureCounter;
// 同步记录计数器
private final Counter syncRecordCounter;
// 当前同步页码 gauge
private final AtomicInteger currentPageGauge = new AtomicInteger(0);
// 同步耗时计时器
private final Timer syncTimer;
/**
* 构造函数,初始化监控指标
*
* @param meterRegistry 指标注册器
*/
public SyncMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// 初始化计数器
this.syncSuccessCounter = meterRegistry.counter("sync.success.count");
this.syncFailureCounter = meterRegistry.counter("sync.failure.count");
this.syncRecordCounter = meterRegistry.counter("sync.record.count");
// 初始化当前同步页码 gauge
Gauge.builder("sync.current.page", currentPageGauge, AtomicInteger::get)
.description("当前同步的页码")
.register(meterRegistry);
// 初始化同步耗时计时器
this.syncTimer = Timer.builder("sync.process.time")
.description("同步处理耗时")
.register(meterRegistry);
}
/**
* 记录同步成功
*/
public void recordSuccess() {
syncSuccessCounter.increment();
}
/**
* 记录同步失败
*/
public void recordFailure() {
syncFailureCounter.increment();
}
/**
* 记录同步的记录数
*
* @param count 记录数
*/
public void recordRecords(int count) {
syncRecordCounter.increment(count);
}
/**
* 更新当前同步页码
*
* @param pageNum 页码
*/
public void updateCurrentPage(int pageNum) {
currentPageGauge.set(pageNum);
}
/**
* 开始计时
*
* @return 计时器样本
*/
public Timer.Sample startTimer() {
return Timer.start(meterRegistry);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>message-sync-service</artifactId>
<version>1.0.0</version>
<name>message-sync-service</name>
<description>Message Sync Service with XXL-Job</description>
<properties>
<java.version>17</java.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
<xxl-job.version>2.4.0</xxl-job.version>
<fastjson2.version>2.0.32</fastjson2.version>
<lombok.version>1.18.30</lombok.version>
<guava.version>32.1.3-jre</guava.version>
<springdoc.version>2.1.0</springdoc.version>
<micrometer.version>1.12.0</micrometer.version>
</properties>
<dependencies>
<!-- Spring Boot Starters -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- Database -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- XXL-Job -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>${xxl-job.version}</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- FastJSON2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<!-- Micrometer for metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>
<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
本文详细介绍了基于 XXL-Job 调用第三方分页消息接口的同步方案设计,主要包含以下几个核心部分:
通过本文介绍的方案,你可以构建一个高可靠、高性能的第三方接口同步系统,有效解决数据同步过程中的各种挑战。在实际应用中,还需要根据具体业务场景进行适当调整和优化,以达到最佳效果。