首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >从 0 到 1 设计高可靠 XXL-Job 同步方案:攻克第三方分页接口的幂等性与批量处理难题

从 0 到 1 设计高可靠 XXL-Job 同步方案:攻克第三方分页接口的幂等性与批量处理难题

作者头像
果酱带你啃java
发布2026-04-14 10:39:02
发布2026-04-14 10:39:02
540
举报

引言:为什么需要专门设计同步方案?

在企业级应用中,系统间的数据同步是常见需求。当我们需要通过 XXL-Job 定时任务调用第三方带分页的消息接口时,看似简单的需求背后隐藏着诸多挑战:如何保证消息不重复处理?如何控制每次传输的数据量以避免系统过载?如何高效地批量存储数据?这些问题如果处理不当,可能导致数据不一致、系统性能下降甚至数据丢失等严重问题。

本文将深入探讨基于 XXL-Job 的第三方分页接口同步方案设计,聚焦接口设计层面,从幂等性保障、数据量控制、批量存储等核心问题出发,提供一套可落地的完整解决方案。无论你是正在设计类似系统,还是遇到了同步过程中的棘手问题,相信都能从本文获得有价值的参考。

一、整体方案架构设计

在开始具体的接口设计前,我们先明确整体方案的架构,以便从全局视角理解各组件的作用和交互方式。

1.1 系统架构图

代码语言:javascript
复制

1.2 核心流程

代码语言:javascript
复制

二、核心问题分析与解决方案设计

2.1 消息幂等性设计

什么是幂等性? 简单来说,就是同一操作执行多次,结果都是一致的。在调用第三方接口进行数据同步时,由于网络波动、任务重试等原因,可能导致同一批数据被多次处理,因此必须保证消息处理的幂等性。

2.1.1 幂等性实现方案

常用的幂等性实现方案有以下几种:

  1. 基于唯一标识的幂等性设计:为每条消息分配一个唯一标识,处理前先检查该标识是否已处理过
  2. 基于业务主键的幂等性设计:利用业务数据本身的唯一键(如订单号)来判断是否已处理
  3. 基于状态机的幂等性设计:通过状态流转来保证重复处理不会改变最终结果

在本方案中,我们采用基于唯一标识 + 业务主键的双重幂等性保障机制。

2.1.2 幂等性表设计

首先,我们需要设计一张幂等性记录表,用于存储已处理消息的标识:

代码语言:javascript
复制
CREATE TABLE `t_idempotent_record` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `message_id` varchar(64) NOT NULL COMMENT '消息唯一标识',
  `business_key` varchar(128) NOT NULL COMMENT '业务主键',
  `process_status` tinyint NOT NULL COMMENT '处理状态:0-处理中,1-处理成功,2-处理失败',
  `process_time` datetime NOT NULL COMMENT '处理时间',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_message_id` (`message_id`),
  UNIQUE KEY `uk_business_key` (`business_key`),
  KEY `idx_process_status` (`process_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='幂等性记录表';
代码语言:javascript
复制

2.1.3 幂等性检查实现
代码语言:javascript
复制
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;

/**
 * 幂等性服务实现类
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class IdempotentService extends ServiceImpl<IdempotentRecordMapper, IdempotentRecord> {

    /**
     * 检查并标记消息,返回需要处理的消息
     *
     * @param messages 待处理消息列表
     * @return 需要处理的消息列表
     */
    @Transactional(rollbackFor = Exception.class)
    public List<ThirdPartyMessage> checkAndMarkMessages(List<ThirdPartyMessage> messages) {
        if (CollectionUtils.isEmpty(messages)) {
            return Lists.newArrayList();
        }

        // 提取所有消息ID和业务主键
        List<String> messageIds = messages.stream()
                .map(ThirdPartyMessage::getMessageId)
                .collect(Collectors.toList());
        List<String> businessKeys = messages.stream()
                .map(ThirdPartyMessage::getBusinessKey)
                .collect(Collectors.toList());

        // 查询已存在的记录
        LambdaQueryWrapper<IdempotentRecord> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.in(IdempotentRecord::getMessageId, messageIds)
                .or()
                .in(IdempotentRecord::getBusinessKey, businessKeys);

        List<IdempotentRecord> existingRecords = baseMapper.selectList(queryWrapper);
        if (CollectionUtils.isNotEmpty(existingRecords)) {
            List<String> existingMessageIds = existingRecords.stream()
                    .map(IdempotentRecord::getMessageId)
                    .collect(Collectors.toList());
            List<String> existingBusinessKeys = existingRecords.stream()
                    .map(IdempotentRecord::getBusinessKey)
                    .collect(Collectors.toList());

            // 过滤掉已存在的消息
            messages = messages.stream()
                    .filter(msg -> !existingMessageIds.contains(msg.getMessageId()) 
                            && !existingBusinessKeys.contains(msg.getBusinessKey()))
                    .collect(Collectors.toList());
        }

        // 标记新消息为处理中
        if (CollectionUtils.isNotEmpty(messages)) {
            List<IdempotentRecord> records = messages.stream().map(msg -> {
                IdempotentRecord record = new IdempotentRecord();
                record.setMessageId(msg.getMessageId());
                record.setBusinessKey(msg.getBusinessKey());
                record.setProcessStatus(0);
                record.setProcessTime(LocalDateTime.now());
                return record;
            }).collect(Collectors.toList());

            this.saveBatch(records);
        }

        return messages;
    }

    /**
     * 更新消息处理状态
     *
     * @param messageId 消息ID
     * @param status 处理状态:0-处理中,1-处理成功,2-处理失败
     */
    @Transactional(rollbackFor = Exception.class)
    public void updateProcessStatus(String messageId, int status) {
        IdempotentRecord record = new IdempotentRecord();
        record.setProcessStatus(status);
        record.setProcessTime(LocalDateTime.now());

        LambdaQueryWrapper<IdempotentRecord> updateWrapper = new LambdaQueryWrapper<>();
        updateWrapper.eq(IdempotentRecord::getMessageId, messageId);

        baseMapper.update(record, updateWrapper);
    }
}
代码语言:javascript
复制

2.2 分页参数与数据量控制

第三方接口采用分页设计,我们需要合理设计分页参数和每次同步的数据量,以平衡同步效率和系统负载。

2.2.1 分页参数设计

分页参数通常包括页码(pageNum)和每页条数(pageSize)。在设计时,我们需要考虑:

  1. 最大 pageSize 限制:防止单次请求数据量过大
  2. 动态调整机制:根据系统负载动态调整 pageSize
  3. 分页状态记录:记录上次同步的分页位置,支持断点续传
代码语言:javascript
复制
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

/**
 * 分页查询参数
 *
 * @author ken
 */
@Data
@Schema(description = "分页查询参数")
public class PageQueryParam {

    @Schema(description = "页码,从1开始", example = "1")
    private Integer pageNum = 1;

    @Schema(description = "每页条数", example = "100")
    private Integer pageSize = 100;

    @Schema(description = "最大每页条数限制", example = "500")
    private static final Integer MAX_PAGE_SIZE = 500;

    /**
     * 校验并修正分页参数
     */
    public void validateAndAdjust() {
        if (pageNum == null || pageNum < 1) {
            pageNum = 1;
        }
        if (pageSize == null || pageSize < 1) {
            pageSize = 100;
        }
        if (pageSize > MAX_PAGE_SIZE) {
            pageSize = MAX_PAGE_SIZE;
        }
    }
}
代码语言:javascript
复制

2.2.2 同步状态记录设计

为了支持断点续传,我们需要记录每次同步的状态:

代码语言:javascript
复制
CREATE TABLE `t_sync_status` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `sync_task_name` varchar(128) NOT NULL COMMENT '同步任务名称',
  `last_success_page` int NOT NULL DEFAULT 0 COMMENT '上次成功同步的页码',
  `last_sync_time` datetime DEFAULT NULL COMMENT '上次同步时间',
  `sync_status` tinyint NOT NULL COMMENT '同步状态:0-未开始,1-同步中,2-同步成功,3-同步失败',
  `error_message` varchar(1024) DEFAULT NULL COMMENT '错误信息',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_sync_task_name` (`sync_task_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='同步状态表';
代码语言:javascript
复制

对应的实体类:

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * 同步状态实体类
 *
 * @author ken
 */
@Data
@TableName("t_sync_status")
@Schema(description = "同步状态实体类")
public class SyncStatus {

    @TableId(type = IdType.AUTO)
    @Schema(description = "主键ID")
    private Long id;

    @Schema(description = "同步任务名称")
    private String syncTaskName;

    @Schema(description = "上次成功同步的页码")
    private Integer lastSuccessPage;

    @Schema(description = "上次同步时间")
    private LocalDateTime lastSyncTime;

    @Schema(description = "同步状态:0-未开始,1-同步中,2-同步成功,3-同步失败")
    private Integer syncStatus;

    @Schema(description = "错误信息")
    private String errorMessage;

    @Schema(description = "创建时间")
    private LocalDateTime createTime;

    @Schema(description = "更新时间")
    private LocalDateTime updateTime;
}
代码语言:javascript
复制

2.2.3 数据量动态调整策略

根据系统负载动态调整每次同步的数据量:

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 分页大小动态调整器
 *
 * @author ken
 */
@Slf4j
@Component
public class DynamicPageSizeAdjuster {

    // 基础分页大小
    private static final int BASE_PAGE_SIZE = 100;
    // 最小分页大小
    private static final int MIN_PAGE_SIZE = 10;
    // 最大分页大小
    private static final int MAX_PAGE_SIZE = 500;
    // CPU使用率阈值,超过此值则减小分页大小
    private static final double CPU_USAGE_THRESHOLD = 0.7;
    // 内存使用率阈值,超过此值则减小分页大小
    private static final double MEMORY_USAGE_THRESHOLD = 0.8;
    // 调整步长
    private static final int ADJUST_STEP = 20;

    private final AtomicInteger currentPageSize = new AtomicInteger(BASE_PAGE_SIZE);

    /**
     * 根据系统负载动态调整分页大小
     *
     * @return 调整后的分页大小
     */
    public int adjustPageSize() {
        OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
        double cpuUsage = osBean.getSystemCpuLoad();
        double memoryUsage = calculateMemoryUsage();

        log.info("系统负载:CPU使用率={}, 内存使用率={}", cpuUsage, memoryUsage);

        int newPageSize = currentPageSize.get();

        // 如果CPU或内存使用率过高,减小分页大小
        if (cpuUsage > CPU_USAGE_THRESHOLD || memoryUsage > MEMORY_USAGE_THRESHOLD) {
            newPageSize = Math.max(MIN_PAGE_SIZE, newPageSize - ADJUST_STEP);
            log.info("系统负载过高,调整分页大小为:{}", newPageSize);
        } 
        // 如果系统负载较低,适当增大分页大小
        else if (cpuUsage < CPU_USAGE_THRESHOLD * 0.5 && memoryUsage < MEMORY_USAGE_THRESHOLD * 0.5) {
            newPageSize = Math.min(MAX_PAGE_SIZE, newPageSize + ADJUST_STEP);
            log.info("系统负载较低,调整分页大小为:{}", newPageSize);
        }

        currentPageSize.set(newPageSize);
        return newPageSize;
    }

    /**
     * 计算内存使用率
     *
     * @return 内存使用率
     */
    private double calculateMemoryUsage() {
        Runtime runtime = Runtime.getRuntime();
        long totalMemory = runtime.totalMemory();
        long freeMemory = runtime.freeMemory();
        long usedMemory = totalMemory - freeMemory;
        return (double) usedMemory / totalMemory;
    }

    /**
     * 获取当前分页大小
     *
     * @return 当前分页大小
     */
    public int getCurrentPageSize() {
        return currentPageSize.get();
    }
}
代码语言:javascript
复制

2.3 批量存储设计

为了提高数据存储效率,我们需要实现批量存储机制,同时考虑事务控制和异常处理。

2.3.1 批量存储策略
  1. 批次大小控制:根据数据量和数据库性能,设置合理的批次大小
  2. 分批提交:当数据量较大时,分多批次提交,避免事务过大
  3. 事务控制:同一批次内的操作保证事务一致性
  4. 批量插入优化:使用 MyBatis-Plus 的批量插入功能,减少 SQL 执行次数
2.3.2 批量存储实现
代码语言:javascript
复制
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import java.util.List;

/**
 * 消息存储服务
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageStorageService extends ServiceImpl<ThirdPartyMessageMapper, ThirdPartyMessage> {

    // 每批处理的最大数量
    private static final int BATCH_SIZE = 500;

    private final IdempotentService idempotentService;

    /**
     * 批量保存消息
     *
     * @param messages 消息列表
     * @return 保存成功的数量
     */
    @Transactional(rollbackFor = Exception.class)
    public int batchSaveMessages(List<ThirdPartyMessage> messages) {
        if (CollectionUtils.isEmpty(messages)) {
            log.info("没有需要保存的消息");
            return 0;
        }

        log.info("开始批量保存消息,总数量:{}", messages.size());

        // 检查并过滤重复消息
        List<ThirdPartyMessage> messagesToSave = idempotentService.checkAndMarkMessages(messages);
        if (CollectionUtils.isEmpty(messagesToSave)) {
            log.info("所有消息均已处理,无需保存");
            return 0;
        }

        log.info("过滤后需要保存的消息数量:{}", messagesToSave.size());

        // 分批保存
        List<List<ThirdPartyMessage>> batches = Lists.partition(messagesToSave, BATCH_SIZE);
        int totalSaved = 0;

        for (List<ThirdPartyMessage> batch : batches) {
            try {
                boolean success = saveBatch(batch);
                if (success) {
                    int batchSize = batch.size();
                    totalSaved += batchSize;
                    log.info("成功保存一批消息,数量:{}", batchSize);

                    // 更新幂等性记录状态为成功
                    batch.forEach(msg -> idempotentService.updateProcessStatus(msg.getMessageId(), 1));
                } else {
                    log.error("保存一批消息失败,数量:{}", batch.size());
                    // 更新幂等性记录状态为失败
                    batch.forEach(msg -> idempotentService.updateProcessStatus(msg.getMessageId(), 2));
                    // 抛出异常,触发事务回滚
                    throw new RuntimeException("批量保存消息失败");
                }
            } catch (Exception e) {
                log.error("保存消息批次时发生异常", e);
                // 更新幂等性记录状态为失败
                batch.forEach(msg -> idempotentService.updateProcessStatus(msg.getMessageId(), 2));
                throw e;
            }
        }

        log.info("批量保存消息完成,成功保存:{} 条", totalSaved);
        return totalSaved;
    }

    /**
     * 根据业务主键查询消息
     *
     * @param businessKey 业务主键
     * @return 消息对象,不存在则返回null
     */
    public ThirdPartyMessage getMessageByBusinessKey(String businessKey) {
        return getOne(Wrappers.<ThirdPartyMessage>lambdaQuery()
                .eq(ThirdPartyMessage::getBusinessKey, businessKey));
    }
}
代码语言:javascript
复制

三、接口设计与实现

3.1 第三方接口调用客户端

首先,我们需要设计一个客户端来调用第三方的分页接口:

代码语言:javascript
复制
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import org.springframework.util.StringUtils;

import java.util.Map;

/**
 * 第三方消息接口客户端
 *
 * @author ken
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class ThirdPartyMessageClient {

    private final RestTemplate restTemplate;

    // 第三方接口URL
    private static final String THIRD_PARTY_API_URL = "https://api.thirdparty.com/messages";

    /**
     * 调用第三方分页接口获取消息
     *
     * @param pageNum 页码
     * @param pageSize 每页条数
     * @param apiKey 接口访问密钥
     * @return 分页消息响应
     */
    public PageResponse<ThirdPartyMessage> getMessages(int pageNum, int pageSize, String apiKey) {
        if (pageNum < 1) {
            throw new IllegalArgumentException("页码必须大于等于1");
        }
        if (pageSize < 1) {
            throw new IllegalArgumentException("每页条数必须大于等于1");
        }
        if (!StringUtils.hasText(apiKey, "API密钥不能为空")) {
            throw new IllegalArgumentException("API密钥不能为空");
        }

        log.info("调用第三方消息接口,页码:{},每页条数:{}", pageNum, pageSize);

        // 构建请求头
        HttpHeaders headers = new HttpHeaders();
        headers.set("Authorization", "Bearer " + apiKey);
        headers.set("Content-Type", "application/json");

        // 构建请求参数
        String url = THIRD_PARTY_API_URL + "?pageNum=" + pageNum + "&pageSize=" + pageSize;

        try {
            // 发送请求
            HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
            ResponseEntity<String> responseEntity = restTemplate.exchange(
                    url, HttpMethod.GET, requestEntity, String.class);

            // 处理响应
            if (responseEntity.is2xxSuccessful()) {
                String responseBody = responseEntity.getBody();
                if (StringUtils.hasText(responseBody)) {
                    PageResponse<ThirdPartyMessage> response = JSON.parseObject(
                            responseBody, new TypeReference<PageResponse<ThirdPartyMessage>>() {});
                    log.info("第三方消息接口调用成功,页码:{},返回记录数:{}", 
                            pageNum, response.getTotal());
                    return response;
                } else {
                    log.warn("第三方消息接口返回空响应,页码:{}", pageNum);
                    return new PageResponse<>();
                }
            } else {
                log.error("第三方消息接口调用失败,状态码:{},页码:{}", 
                        responseEntity.getStatusCodeValue(), pageNum);
                throw new RuntimeException(
                        "第三方接口调用失败,状态码:" + responseEntity.getStatusCodeValue());
            }
        } catch (Exception e) {
            log.error("调用第三方消息接口时发生异常,页码:{}", pageNum, e);
            throw new RuntimeException("调用第三方接口发生异常:" + e.getMessage(), e);
        }
    }
}
代码语言:javascript
复制

分页响应类:

代码语言:javascript
复制
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.util.List;

/**
 * 分页响应对象
 *
 * @author ken
 * @param <T> 数据类型
 */
@Data
@Schema(description = "分页响应对象")
public class PageResponse<T> {

    @Schema(description = "总记录数")
    private long total;

    @Schema(description = "总页数")
    private int pages;

    @Schema(description = "当前页码")
    private int pageNum;

    @Schema(description = "每页条数")
    private int pageSize;

    @Schema(description = "当前页数据")
    private List<T> list;

    @Schema(description = "是否有下一页")
    private boolean hasNextPage;
}
代码语言:javascript
复制

第三方消息实体类:

代码语言:javascript
复制
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

import java.time.LocalDateTime;

/**
 * 第三方消息实体类
 *
 * @author ken
 */
@Data
@TableName("t_third_party_message")
@Schema(description = "第三方消息实体类")
public class ThirdPartyMessage {

    @TableId(type = IdType.AUTO)
    @Schema(description = "主键ID")
    private Long id;

    @Schema(description = "消息唯一标识")
    private String messageId;

    @Schema(description = "业务主键")
    private String businessKey;

    @Schema(description = "消息内容")
    private String content;

    @Schema(description = "消息类型")
    private String messageType;

    @Schema(description = "消息状态")
    private String status;

    @Schema(description = "发送时间")
    private LocalDateTime sendTime;

    @Schema(description = "接收时间")
    private LocalDateTime receiveTime;

    @Schema(description = "创建时间")
    private LocalDateTime createTime;

    @Schema(description = "更新时间")
    private LocalDateTime updateTime;
}
代码语言:javascript
复制

3.2 同步服务接口设计

同步服务是整个方案的核心,负责协调整个同步流程:

代码语言:javascript
复制
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.service.*;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * 消息同步服务接口
 *
 * @author ken
 */
@RestController
@Tag(name = "消息同步服务", description = "第三方消息同步相关接口")
@Service(name = "MessageSyncService", version = "1.0.0",
        description = "第三方消息同步服务,负责从第三方接口同步消息并存储到本地数据库")
public interface MessageSyncService {

    /**
     * 同步指定页码的消息
     *
     * @param taskName 任务名称
     * @param pageNum 页码
     * @param pageSize 每页条数
     * @return 同步结果
     */
    @Operation(summary = "同步指定页码的消息", 
            description = "从第三方接口同步指定页码的消息,并存储到本地数据库")
    @ApiResponses({
            @ApiResponse(responseCode = "200", description = "同步成功",
                    content = @Content(schema = @Schema(implementation = SyncResult.class))),
            @ApiResponse(responseCode = "500", description = "同步失败")
    })
    @PostMapping("/sync/page")
    SyncResult syncPage(
            @Parameter(description = "任务名称", required = true)
            @RequestParam String taskName,
            @Parameter(description = "页码", required = true)
            @RequestParam int pageNum,
            @Parameter(description = "每页条数")
            @RequestParam(required = false) Integer pageSize);

    /**
     * 同步所有消息
     *
     * @param taskName 任务名称
     * @return 同步结果
     */
    @Operation(summary = "同步所有消息", 
            description = "从第三方接口同步所有消息,从上次成功同步的页码开始")
    @ApiResponses({
            @ApiResponse(responseCode = "200", description = "同步成功",
                    content = @Content(schema = @Schema(implementation = SyncResult.class))),
            @ApiResponse(responseCode = "500", description = "同步失败")
    })
    @PostMapping("/sync/all")
    SyncResult syncAll(
            @Parameter(description = "任务名称", required = true)
            @RequestParam String taskName);

    /**
     * 重试失败的同步任务
     *
     * @param taskName 任务名称
     * @return 同步结果
     */
    @Operation(summary = "重试失败的同步任务", 
            description = "重试上次失败的同步任务")
    @ApiResponses({
            @ApiResponse(responseCode = "200", description = "重试成功",
                    content = @Content(schema = @Schema(implementation = SyncResult.class))),
            @ApiResponse(responseCode = "500", description = "重试失败")
    })
    @PostMapping("/sync/retry")
    SyncResult retryFailedSync(
            @Parameter(description = "任务名称", required = true)
            @RequestParam String taskName);
}
代码语言:javascript
复制

同步结果类:

代码语言:javascript
复制
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

/**
 * 同步结果
 *
 * @author ken
 */
@Data
@Schema(description = "同步结果")
public class SyncResult {

    @Schema(description = "是否成功")
    private boolean success;

    @Schema(description = "消息")
    private String message;

    @Schema(description = "同步的页码")
    private int pageNum;

    @Schema(description = "同步的记录数")
    private int recordCount;

    @Schema(description = "总记录数")
    private long totalCount;

    @Schema(description = "总页数")
    private int totalPages;

    @Schema(description = "耗时(毫秒)")
    private long costTime;

    /**
     * 创建成功的同步结果
     *
     * @param pageNum 页码
     * @param recordCount 记录数
     * @param totalCount 总记录数
     * @param totalPages 总页数
     * @param costTime 耗时
     * @return 同步结果
     */
    public static SyncResult success(int pageNum, int recordCount, long totalCount, int totalPages, long costTime) {
        SyncResult result = new SyncResult();
        result.setSuccess(true);
        result.setMessage("同步成功");
        result.setPageNum(pageNum);
        result.setRecordCount(recordCount);
        result.setTotalCount(totalCount);
        result.setTotalPages(totalPages);
        result.setCostTime(costTime);
        return result;
    }

    /**
     * 创建失败的同步结果
     *
     * @param message 失败消息
     * @return 同步结果
     */
    public static SyncResult failure(String message) {
        SyncResult result = new SyncResult();
        result.setSuccess(false);
        result.setMessage(message);
        return result;
    }
}
代码语言:javascript
复制

3.3 同步服务实现

代码语言:javascript
复制
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;

/**
 * 消息同步服务实现类
 *
 * @author ken
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageSyncServiceImpl extends ServiceImpl<SyncStatusMapper, SyncStatus> implements MessageSyncService {

    private final ThirdPartyMessageClient thirdPartyMessageClient;
    private final MessageStorageService messageStorageService;
    private final DynamicPageSizeAdjuster pageSizeAdjuster;

    @Value("${thirdparty.api.key}")
    private String apiKey;

    /**
     * 默认分页大小
     */
    private static final int DEFAULT_PAGE_SIZE = 100;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public SyncResult syncPage(String taskName, int pageNum, Integer pageSize) {
        long startTime = System.currentTimeMillis();
        log.info("开始同步任务:{},页码:{}", taskName, pageNum);

        try {
            // 参数处理
            if (!ObjectUtils.isEmpty(pageSize)) {
                pageSizeAdjuster.adjustPageSize();
            } else {
                pageSize = pageSizeAdjuster.getCurrentPageSize();
            }

            // 更新同步状态为同步中
            updateSyncStatus(taskName, 1, pageNum - 1, null);

            // 调用第三方接口获取消息
            PageResponse<ThirdPartyMessage> response = thirdPartyMessageClient.getMessages(
                    pageNum, pageSize, apiKey);

            // 保存消息
            int savedCount = 0;
            if (!ObjectUtils.isEmpty(response.getList())) {
                savedCount = messageStorageService.batchSaveMessages(response.getList());
            }

            // 更新同步状态为成功
            updateSyncStatus(taskName, 2, pageNum, null);

            long costTime = System.currentTimeMillis() - startTime;
            log.info("同步任务:{},页码:{} 完成,耗时:{}ms,同步记录数:{}",
                    taskName, pageNum, costTime, savedCount);

            return SyncResult.success(pageNum, savedCount, response.getTotal(), 
                    response.getPages(), costTime);
        } catch (Exception e) {
            long costTime = System.currentTimeMillis() - startTime;
            log.error("同步任务:{},页码:{} 失败", taskName, pageNum, e);

            // 更新同步状态为失败
            updateSyncStatus(taskName, 3, pageNum - 1, e.getMessage());

            return SyncResult.failure("同步失败:" + e.getMessage());
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public SyncResult syncAll(String taskName) {
        long startTime = System.currentTimeMillis();
        log.info("开始全量同步任务:{}", taskName);

        try {
            // 获取上次同步状态
            SyncStatus syncStatus = getSyncStatus(taskName);
            int startPage = syncStatus.getLastSuccessPage() + 1;
            int pageSize = pageSizeAdjuster.adjustPageSize();

            // 更新同步状态为同步中
            updateSyncStatus(taskName, 1, syncStatus.getLastSuccessPage(), null);

            int totalSaved = 0;
            long totalCount = 0;
            int totalPages = 0;
            int currentPage = startPage;
            boolean hasNextPage = true;

            // 循环同步所有页
            while (hasNextPage) {
                log.info("全量同步任务:{},正在同步页码:{}", taskName, currentPage);

                // 调用第三方接口获取消息
                PageResponse<ThirdPartyMessage> response = thirdPartyMessageClient.getMessages(
                        currentPage, pageSize, apiKey);

                totalCount = response.getTotal();
                totalPages = response.getPages();
                hasNextPage = response.isHasNextPage();

                // 保存消息
                if (!ObjectUtils.isEmpty(response.getList())) {
                    int savedCount = messageStorageService.batchSaveMessages(response.getList());
                    totalSaved += savedCount;
                    log.info("全量同步任务:{},页码:{} 同步完成,本页保存:{} 条",
                            taskName, currentPage, savedCount);
                } else {
                    log.info("全量同步任务:{},页码:{} 没有需要同步的消息", taskName, currentPage);
                }

                // 更新当前成功页码
                updateSyncStatus(taskName, 1, currentPage, null);

                // 准备下一页
                currentPage++;

                // 动态调整分页大小
                pageSize = pageSizeAdjuster.adjustPageSize();
            }

            // 所有页同步完成,更新状态为成功
            updateSyncStatus(taskName, 2, totalPages, null);

            long costTime = System.currentTimeMillis() - startTime;
            log.info("全量同步任务:{} 完成,耗时:{}ms,总同步记录数:{}",
                    taskName, costTime, totalSaved);

            return SyncResult.success(totalPages, totalSaved, totalCount, totalPages, costTime);
        } catch (Exception e) {
            long costTime = System.currentTimeMillis() - startTime;
            log.error("全量同步任务:{} 失败", taskName, e);

            // 保持上次成功的页码,更新状态为失败
            SyncStatus syncStatus = getSyncStatus(taskName);
            updateSyncStatus(taskName, 3, syncStatus.getLastSuccessPage(), e.getMessage());

            return SyncResult.failure("全量同步失败:" + e.getMessage());
        }
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public SyncResult retryFailedSync(String taskName) {
        log.info("开始重试失败的同步任务:{}", taskName);

        // 获取上次同步状态
        SyncStatus syncStatus = getSyncStatus(taskName);

        // 如果上次同步不是失败状态,直接返回成功
        if (syncStatus.getSyncStatus() != 3) {
            log.info("同步任务:{} 上次同步状态不是失败,无需重试", taskName);
            return SyncResult.success(
                    syncStatus.getLastSuccessPage(), 0, 0, 0, 0);
        }

        // 从上次失败的页码开始重试全量同步
        return syncAll(taskName);
    }

    /**
     * 获取同步状态,如果不存在则创建
     *
     * @param taskName 任务名称
     * @return 同步状态
     */
    private SyncStatus getSyncStatus(String taskName) {
        LambdaQueryWrapper<SyncStatus> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(SyncStatus::getSyncTaskName, taskName);

        SyncStatus syncStatus = baseMapper.selectOne(queryWrapper);
        if (ObjectUtils.isEmpty(syncStatus)) {
            syncStatus = new SyncStatus();
            syncStatus.setSyncTaskName(taskName);
            syncStatus.setLastSuccessPage(0);
            syncStatus.setSyncStatus(0);
            baseMapper.insert(syncStatus);
            log.info("创建新的同步状态记录,任务名称:{}", taskName);
        }

        return syncStatus;
    }

    /**
     * 更新同步状态
     *
     * @param taskName 任务名称
     * @param status 状态:0-未开始,1-同步中,2-同步成功,3-同步失败
     * @param lastSuccessPage 上次成功页码
     * @param errorMessage 错误信息
     */
    private void updateSyncStatus(String taskName, int status, int lastSuccessPage, String errorMessage) {
        SyncStatus syncStatus = new SyncStatus();
        syncStatus.setSyncStatus(status);
        syncStatus.setLastSuccessPage(lastSuccessPage);
        syncStatus.setLastSyncTime(java.time.LocalDateTime.now());
        syncStatus.setErrorMessage(errorMessage);

        LambdaQueryWrapper<SyncStatus> updateWrapper = new LambdaQueryWrapper<>();
        updateWrapper.eq(SyncStatus::getSyncTaskName, taskName);

        baseMapper.update(syncStatus, updateWrapper);
        log.info("更新同步状态,任务名称:{},状态:{},上次成功页码:{}",
                taskName, status, lastSuccessPage);
    }
}
代码语言:javascript
复制

3.4 XXL-Job 任务实现

虽然 XXL-Job 已经搭建完成,但我们需要实现具体的任务处理器:

代码语言:javascript
复制
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 消息同步XXL-Job任务处理器
 *
 * @author ken
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageSyncXxlJobHandler {

    private final MessageSyncService messageSyncService;

    /**
     * 全量同步消息任务
     * 执行参数格式:taskName=xxx
     */
    @XxlJob("fullMessageSyncJob")
    public void fullMessageSyncJob() {
        log.info("开始执行全量同步消息任务");

        try {
            // 获取任务参数
            String param = XxlJobHelper.getJobParam();
            if (!org.springframework.util.StringUtils.hasText(param)) {
                throw new IllegalArgumentException("任务参数不能为空,格式:taskName=xxx");
            }

            // 解析参数
            String[] paramParts = param.split("=");
            if (paramParts.length != 2 || !"taskName".equals(paramParts[0])) {
                throw new IllegalArgumentException("参数格式错误,正确格式:taskName=xxx");
            }
            String taskName = paramParts[1];

            // 执行同步
            SyncResult result = messageSyncService.syncAll(taskName);

            if (result.isSuccess()) {
                XxlJobHelper.handleSuccess(
                        String.format("全量同步任务完成,任务名称:%s,总同步记录数:%d,耗时:%dms",
                                taskName, result.getRecordCount(), result.getCostTime()));
            } else {
                XxlJobHelper.handleFail(
                        String.format("全量同步任务失败,任务名称:%s,原因:%s",
                                taskName, result.getMessage()));
            }
        } catch (Exception e) {
            log.error("全量同步消息任务执行异常", e);
            XxlJobHelper.handleFail("全量同步任务执行异常:" + e.getMessage());
        }
    }

    /**
     * 分页同步消息任务
     * 执行参数格式:taskName=xxx&pageNum=1&pageSize=100
     */
    @XxlJob("pageMessageSyncJob")
    public void pageMessageSyncJob() {
        log.info("开始执行分页同步消息任务");

        try {
            // 获取任务参数
            String param = XxlJobHelper.getJobParam();
            if (!org.springframework.util.StringUtils.hasText(param)) {
                throw new IllegalArgumentException("任务参数不能为空,格式:taskName=xxx&pageNum=1&pageSize=100");
            }

            // 解析参数
            String[] paramParts = param.split("&");
            String taskName = null;
            Integer pageNum = null;
            Integer pageSize = null;

            for (String part : paramParts) {
                String[] keyValue = part.split("=");
                if (keyValue.length != 2) {
                    continue;
                }

                switch (keyValue[0]) {
                    case "taskName":
                        taskName = keyValue[1];
                        break;
                    case "pageNum":
                        try {
                            pageNum = Integer.parseInt(keyValue[1]);
                        } catch (NumberFormatException e) {
                            throw new IllegalArgumentException("pageNum必须是整数");
                        }
                        break;
                    case "pageSize":
                        try {
                            pageSize = Integer.parseInt(keyValue[1]);
                        } catch (NumberFormatException e) {
                            throw new IllegalArgumentException("pageSize必须是整数");
                        }
                        break;
                    default:
                        break;
                }
            }

            // 参数校验
            if (!org.springframework.util.StringUtils.hasText(taskName)) {
                throw new IllegalArgumentException("taskName不能为空");
            }
            if (pageNum == null || pageNum < 1) {
                throw new IllegalArgumentException("pageNum必须大于等于1");
            }

            // 执行同步
            SyncResult result = messageSyncService.syncPage(taskName, pageNum, pageSize);

            if (result.isSuccess()) {
                XxlJobHelper.handleSuccess(
                        String.format("分页同步任务完成,任务名称:%s,页码:%d,同步记录数:%d,耗时:%dms",
                                taskName, pageNum, result.getRecordCount(), result.getCostTime()));
            } else {
                XxlJobHelper.handleFail(
                        String.format("分页同步任务失败,任务名称:%s,页码:%d,原因:%s",
                                taskName, pageNum, result.getMessage()));
            }
        } catch (Exception e) {
            log.error("分页同步消息任务执行异常", e);
            XxlJobHelper.handleFail("分页同步任务执行异常:" + e.getMessage());
        }
    }

    /**
     * 重试失败的同步任务
     * 执行参数格式:taskName=xxx
     */
    @XxlJob("retryFailedSyncJob")
    public void retryFailedSyncJob() {
        log.info("开始执行重试失败的同步任务");

        try {
            // 获取任务参数
            String param = XxlJobHelper.getJobParam();
            if (!org.springframework.util.StringUtils.hasText(param)) {
                throw new IllegalArgumentException("任务参数不能为空,格式:taskName=xxx");
            }

            // 解析参数
            String[] paramParts = param.split("=");
            if (paramParts.length != 2 || !"taskName".equals(paramParts[0])) {
                throw new IllegalArgumentException("参数格式错误,正确格式:taskName=xxx");
            }
            String taskName = paramParts[1];

            // 执行重试
            SyncResult result = messageSyncService.retryFailedSync(taskName);

            if (result.isSuccess()) {
                XxlJobHelper.handleSuccess(
                        String.format("重试同步任务完成,任务名称:%s,同步记录数:%d,耗时:%dms",
                                taskName, result.getRecordCount(), result.getCostTime()));
            } else {
                XxlJobHelper.handleFail(
                        String.format("重试同步任务失败,任务名称:%s,原因:%s",
                                taskName, result.getMessage()));
            }
        } catch (Exception e) {
            log.error("重试同步任务执行异常", e);
            XxlJobHelper.handleFail("重试同步任务执行异常:" + e.getMessage());
        }
    }
}
代码语言:javascript
复制

四、异常处理与监控

4.1 异常处理机制

为了提高系统的健壮性,我们需要设计完善的异常处理机制:

代码语言:javascript
复制
import lombok.Getter;

/**
 * 同步服务异常类
 *
 * @author ken
 */
@Getter
public class SyncServiceException extends RuntimeException {

    /**
     * 错误代码
     */
    private final String errorCode;

    /**
     * 构造函数
     *
     * @param errorCode 错误代码
     * @param message 错误消息
     */
    public SyncServiceException(String errorCode, String message) {
        super(message);
        this.errorCode = errorCode;
    }

    /**
     * 构造函数
     *
     * @param errorCode 错误代码
     * @param message 错误消息
     * @param cause 异常原因
     */
    public SyncServiceException(String errorCode, String message, Throwable cause) {
        super(message, cause);
        this.errorCode = errorCode;
    }

    // 常用错误代码
    public static final String THIRD_PARTY_API_ERROR = "THIRD_PARTY_API_ERROR";
    public static final String DATA_STORAGE_ERROR = "DATA_STORAGE_ERROR";
    public static final String PARAMETER_ERROR = "PARAMETER_ERROR";
    public static final String SYSTEM_ERROR = "SYSTEM_ERROR";
}
代码语言:javascript
复制

全局异常处理器:

代码语言:javascript
复制
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

/**
 * 全局异常处理器
 *
 * @author ken
 */
@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler {

    /**
     * 处理同步服务异常
     *
     * @param e 同步服务异常
     * @return 错误响应
     */
    @ExceptionHandler(SyncServiceException.class)
    public ResponseEntity<ErrorResponse> handleSyncServiceException(SyncServiceException e) {
        log.error("同步服务异常:{},错误代码:{}", e.getMessage(), e.getErrorCode(), e);

        ErrorResponse errorResponse = new ErrorResponse();
        errorResponse.setErrorCode(e.getErrorCode());
        errorResponse.setMessage(e.getMessage());

        return new ResponseEntity<>(errorResponse, HttpStatus.INTERNAL_SERVER_ERROR);
    }

    /**
     * 处理非法参数异常
     *
     * @param e 非法参数异常
     * @return 错误响应
     */
    @ExceptionHandler(IllegalArgumentException.class)
    public ResponseEntity<ErrorResponse> handleIllegalArgumentException(IllegalArgumentException e) {
        log.error("非法参数异常:{}", e.getMessage(), e);

        ErrorResponse errorResponse = new ErrorResponse();
        errorResponse.setErrorCode(SyncServiceException.PARAMETER_ERROR);
        errorResponse.setMessage(e.getMessage());

        return new ResponseEntity<>(errorResponse, HttpStatus.BAD_REQUEST);
    }

    /**
     * 处理通用异常
     *
     * @param e 异常
     * @return 错误响应
     */
    @ExceptionHandler(Exception.class)
    public ResponseEntity<ErrorResponse> handleGeneralException(Exception e) {
        log.error("系统异常:{}", e.getMessage(), e);

        ErrorResponse errorResponse = new ErrorResponse();
        errorResponse.setErrorCode(SyncServiceException.SYSTEM_ERROR);
        errorResponse.setMessage("系统异常,请联系管理员");

        return new ResponseEntity<>(errorResponse, HttpStatus.INTERNAL_SERVER_ERROR);
    }
}
代码语言:javascript
复制

错误响应类:

代码语言:javascript
复制
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

/**
 * 错误响应
 *
 * @author ken
 */
@Data
@Schema(description = "错误响应")
public class ErrorResponse {

    @Schema(description = "错误代码")
    private String errorCode;

    @Schema(description = "错误消息")
    private String message;
}
代码语言:javascript
复制

4.2 监控告警设计

为了及时发现和解决问题,我们需要设计监控告警机制:

代码语言:javascript
复制
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 同步服务监控指标
 *
 * @author ken
 */
@Component
@RequiredArgsConstructor
public class SyncMetrics {

    private final MeterRegistry meterRegistry;

    // 同步成功计数器
    private final Counter syncSuccessCounter;
    // 同步失败计数器
    private final Counter syncFailureCounter;
    // 同步记录计数器
    private final Counter syncRecordCounter;
    // 当前同步页码 gauge
    private final AtomicInteger currentPageGauge = new AtomicInteger(0);
    // 同步耗时计时器
    private final Timer syncTimer;

    /**
     * 构造函数,初始化监控指标
     *
     * @param meterRegistry 指标注册器
     */
    public SyncMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;

        // 初始化计数器
        this.syncSuccessCounter = meterRegistry.counter("sync.success.count");
        this.syncFailureCounter = meterRegistry.counter("sync.failure.count");
        this.syncRecordCounter = meterRegistry.counter("sync.record.count");

        // 初始化当前同步页码 gauge
        Gauge.builder("sync.current.page", currentPageGauge, AtomicInteger::get)
                .description("当前同步的页码")
                .register(meterRegistry);

        // 初始化同步耗时计时器
        this.syncTimer = Timer.builder("sync.process.time")
                .description("同步处理耗时")
                .register(meterRegistry);
    }

    /**
     * 记录同步成功
     */
    public void recordSuccess() {
        syncSuccessCounter.increment();
    }

    /**
     * 记录同步失败
     */
    public void recordFailure() {
        syncFailureCounter.increment();
    }

    /**
     * 记录同步的记录数
     *
     * @param count 记录数
     */
    public void recordRecords(int count) {
        syncRecordCounter.increment(count);
    }

    /**
     * 更新当前同步页码
     *
     * @param pageNum 页码
     */
    public void updateCurrentPage(int pageNum) {
        currentPageGauge.set(pageNum);
    }

    /**
     * 开始计时
     *
     * @return 计时器样本
     */
    public Timer.Sample startTimer() {
        return Timer.start(meterRegistry);
    }
}
代码语言:javascript
复制

五、pom.xml 依赖配置

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>message-sync-service</artifactId>
    <version>1.0.0</version>
    <name>message-sync-service</name>
    <description>Message Sync Service with XXL-Job</description>

    <properties>
        <java.version>17</java.version>
        <mybatis-plus.version>3.5.5</mybatis-plus.version>
        <xxl-job.version>2.4.0</xxl-job.version>
        <fastjson2.version>2.0.32</fastjson2.version>
        <lombok.version>1.18.30</lombok.version>
        <guava.version>32.1.3-jre</guava.version>
        <springdoc.version>2.1.0</springdoc.version>
        <micrometer.version>1.12.0</micrometer.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Starters -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <!-- Database -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!-- MyBatis-Plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>

        <!-- XXL-Job -->
        <dependency>
            <groupId>com.xuxueli</groupId>
            <artifactId>xxl-job-core</artifactId>
            <version>${xxl-job.version}</version>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- FastJSON2 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>${fastjson2.version}</version>
        </dependency>

        <!-- Guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>

        <!-- Swagger3 -->
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>

        <!-- Micrometer for metrics -->
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-core</artifactId>
            <version>${micrometer.version}</version>
        </dependency>

        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
代码语言:javascript
复制

六、总结与最佳实践

6.1 方案总结

本文详细介绍了基于 XXL-Job 调用第三方分页消息接口的同步方案设计,主要包含以下几个核心部分:

  1. 幂等性保障:通过消息唯一标识和业务主键双重机制,确保消息不会被重复处理
  2. 分页处理:设计灵活的分页参数,支持动态调整每页条数,平衡效率和系统负载
  3. 批量存储:实现高效的批量数据存储,减少数据库交互次数,提高性能
  4. 状态记录:通过同步状态表记录每次同步的位置,支持断点续传
  5. 异常处理:完善的异常处理机制,确保系统稳定运行
  6. 监控告警:设计监控指标,及时发现和解决问题

6.2 最佳实践

  1. 合理设置分页大小:根据第三方接口性能和本地系统处理能力,设置合适的初始分页大小,并通过动态调整机制优化
  2. 批量存储批次控制:根据数据库性能,设置合理的批量存储批次大小,一般建议 500-1000 条 / 批
  3. 幂等性设计:始终坚持幂等性设计原则,特别是在分布式系统中
  4. 异常重试策略:实现分级重试策略,对不同类型的异常采用不同的重试机制
  5. 监控指标:关键指标必须监控,如同步成功率、同步延迟、系统负载等
  6. 日志记录:详细记录同步过程中的关键节点和异常信息,便于问题排查
  7. 定时任务频率:根据数据更新频率和接口性能,设置合理的任务执行频率

通过本文介绍的方案,你可以构建一个高可靠、高性能的第三方接口同步系统,有效解决数据同步过程中的各种挑战。在实际应用中,还需要根据具体业务场景进行适当调整和优化,以达到最佳效果。

七、常见问题与解决方案

  1. Q: 如何处理第三方接口返回数据不完整的情况?A: 实现数据校验机制,对关键字段进行校验,发现不完整数据时记录日志并跳过,避免影响整体同步流程。
  2. Q: 当第三方接口出现长时间不可用时,系统如何处理?A: 实现熔断机制,当接口多次调用失败后,暂时停止调用并触发告警,避免无效重试消耗系统资源。
  3. Q: 如何处理大数据量同步的性能问题?A: 可以采用分片同步策略,将数据按时间或业务维度分片,并行同步不同分片的数据,提高同步效率。
  4. Q: 同步过程中数据库出现故障如何处理?A: 实现数据库故障检测和自动切换机制,同时记录同步状态,确保数据库恢复后可以从断点继续同步。
  5. Q: 如何避免同步任务对业务系统造成性能影响?A: 可以在非业务高峰期执行同步任务,或限制同步任务的资源使用,如设置线程池大小、数据库连接数等。
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-10-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 果酱带你啃java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言:为什么需要专门设计同步方案?
  • 一、整体方案架构设计
    • 1.1 系统架构图
    • 1.2 核心流程
  • 二、核心问题分析与解决方案设计
    • 2.1 消息幂等性设计
      • 2.1.1 幂等性实现方案
      • 2.1.2 幂等性表设计
      • 2.1.3 幂等性检查实现
    • 2.2 分页参数与数据量控制
      • 2.2.1 分页参数设计
      • 2.2.2 同步状态记录设计
      • 2.2.3 数据量动态调整策略
    • 2.3 批量存储设计
      • 2.3.1 批量存储策略
      • 2.3.2 批量存储实现
  • 三、接口设计与实现
    • 3.1 第三方接口调用客户端
    • 3.2 同步服务接口设计
    • 3.3 同步服务实现
    • 3.4 XXL-Job 任务实现
  • 四、异常处理与监控
    • 4.1 异常处理机制
    • 4.2 监控告警设计
  • 五、pom.xml 依赖配置
  • 六、总结与最佳实践
    • 6.1 方案总结
    • 6.2 最佳实践
  • 七、常见问题与解决方案
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档