首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >攻克 大 Excel 上传难题:从异步处理到并发去重的全链路解决方案

攻克 大 Excel 上传难题:从异步处理到并发去重的全链路解决方案

作者头像
果酱带你啃java
发布2026-04-14 12:53:09
发布2026-04-14 12:53:09
290
举报

在企业级应用开发中,Excel 批量导入是一个高频需求。当面对几百兆甚至更大的 Excel 文件时,传统的同步处理方式往往会导致接口超时、内存溢出,甚至整个应用崩溃。更复杂的是,还需要处理数据校验、重复数据检测、多用户并发上传等问题,最终还要生成一份详细的导入结果供用户核对。

本文将从实际业务场景出发,构建一套完整的大 Excel 文件上传处理方案,涵盖从前端分片上传到后端异步处理、数据校验、并发去重、结果生成的全流程,并提供可直接运行的代码实现。

一、需求分析与架构设计

1.1 核心业务需求

  • 支持上传几百兆的大型 Excel 文件
  • 异步处理导入过程,不阻塞用户操作
  • 对每条数据进行规则校验
  • 检查数据是否已存在于数据库,避免重复导入
  • 处理多用户并发上传的情况
  • 生成包含每条数据导入结果(成功 / 失败及原因)的 Excel 文件
  • 提供导入进度查询功能

1.2 技术挑战

  • 大文件上传导致的内存溢出问题
  • 长时间处理导致的接口超时问题
  • 大量数据校验的性能瓶颈
  • 并发场景下的数据一致性问题
  • 重复数据的高效检测
  • 导入结果的精准记录与反馈

1.3 整体架构设计

针对上述需求和挑战,我们设计如下架构:

架构说明

  1. 前端分片上传将大文件拆分为小分片上传,避免单次请求过大
  2. 后端 API 服务接收文件分片、合并文件、创建导入任务
  3. 文件存储服务存储原始 Excel 文件和生成的结果文件
  4. 消息队列解耦 API 服务和异步处理服务,实现削峰填谷
  5. 异步处理服务负责 Excel 解析、数据校验、去重、导入和结果生成
  6. 校验规则引擎集中处理数据校验逻辑
  7. 数据库存储业务数据、导入任务状态和结果信息

二、技术选型与环境配置

2.1 核心技术栈

  • 后端框架Spring Boot 3.2.0
  • 文件处理Alibaba EasyExcel 3.3.0
  • 消息队列RabbitMQ 3.13.0
  • 数据库MySQL 8.0.35
  • ORM 框架MyBatis-Plus 3.5.5
  • 缓存Redis 7.2.3(用于分布式锁和临时存储)
  • API 文档SpringDoc OpenAPI 2.2.0(Swagger 3)
  • 工具类Lombok 1.18.30、Fastjson2 2.0.32、Guava 32.1.3-jre
  • 前端Vue 3 + Element Plus(本文不展开前端实现细节)

2.2 Maven 依赖配置

代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.example</groupId>
<artifactId>large-excel-import</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>large-excel-import</name>
<description>Large Excel Import Solution</description>

<properties>
<java.version>17</java.version>
<easyexcel.version>3.3.0</easyexcel.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
<springdoc.version>2.2.0</springdoc.version>
<fastjson2.version>2.0.32</fastjson2.version>
<guava.version>32.1.3-jre</guava.version>
<lombok.version>1.18.30</lombok.version>
</properties>

<dependencies>
<!-- Spring Boot Core -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>

<!-- Database -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>

<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-extension</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>

<!-- Excel Processing -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>easyexcel</artifactId>
<version>${easyexcel.version}</version>
</dependency>

<!-- API Documentation -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>

<!-- Utilities -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<!-- Testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
代码语言:javascript
复制

2.3 核心配置文件

代码语言:javascript
复制
# application.yml
spring:
  profiles:
    active: dev
  servlet:
    multipart:
      max-file-size: 10MB
      max-request-size: 10MB

# 日志配置
logging:
  level:
    root: INFO
    com.example: DEBUG
    com.alibaba.easyexcel: WARN
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"
代码语言:javascript
复制

代码语言:javascript
复制
# application-dev.yml
spring:
datasource:
url: jdbc:mysql://localhost:3306/excel_import?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver

redis:
host: localhost
port: 6379
password:
database: 0
timeout: 3000ms
lettuce:
pool:
max-active: 16
max-idle: 8
min-idle: 4

rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
concurrency: 2
max-concurrency: 8
prefetch: 1
template:
retry:
enabled: true
initial-interval: 1000ms
max-attempts: 3
max-interval: 3000ms

# 文件存储配置
file:
storage:
path: ./uploadFiles
temp-path: ./tempFiles
result-path: ./resultFiles
max-size: 524288000 # 500MB

# MyBatis-Plus配置
mybatis-plus:
mapper-locations: classpath*:mapper/**/*.xml
type-aliases-package: com.example.excelimport.entity
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
global-config:
db-config:
id-type: ASSIGN_ID
logic-delete-field: deleted
logic-delete-value: 1
logic-not-delete-value: 0

# 异步任务配置
async:
executor:
core-pool-size: 4
max-pool-size: 16
queue-capacity: 100
keep-alive-seconds: 60
thread-name-prefix: ExcelImport-

# Swagger配置
springdoc:
api-docs:
path: /api-docs
swagger-ui:
path: /swagger-ui.html
operationsSorter: method
packages-to-scan: com.example.excelimport.controller
代码语言:javascript
复制

三、数据库设计

3.1 核心表结构

3.1.1 导入任务表(import_task)
代码语言:javascript
复制
CREATETABLE `import_task` (
  `id` bigintNOTNULL COMMENT '主键ID',
  `task_no` varchar(64) NOTNULL COMMENT '任务编号',
  `file_name` varchar(255) NOTNULL COMMENT '文件名',
  `file_path` varchar(512) NOTNULL COMMENT '文件路径',
  `result_file_path` varchar(512) DEFAULTNULL COMMENT '结果文件路径',
  `file_size` bigintNOTNULL COMMENT '文件大小(字节)',
  `total_rows` intDEFAULT0 COMMENT '总记录数',
  `success_rows` intDEFAULT0 COMMENT '成功记录数',
  `fail_rows` intDEFAULT0 COMMENT '失败记录数',
  `status` tinyint NOTNULL COMMENT '状态:0-初始化,1-处理中,2-完成,3-失败',
  `progress` intNOTNULLDEFAULT0 COMMENT '进度(%)',
  `user_id` bigintNOTNULL COMMENT '操作人ID',
  `user_name` varchar(64) NOTNULL COMMENT '操作人姓名',
  `error_msg` varchar(1024) DEFAULTNULL COMMENT '错误信息',
  `create_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_task_no` (`task_no`),
  KEY `idx_user_id` (`user_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='导入任务表';
3.1.2 业务数据表(business_data)

以一个商品信息表为例,实际业务中根据需求调整:

代码语言:javascript
复制
CREATETABLE `business_data` (
  `id` bigintNOTNULL COMMENT '主键ID',
  `product_code` varchar(64) NOTNULL COMMENT '商品编码',
  `product_name` varchar(255) NOTNULL COMMENT '商品名称',
  `category_id` bigintNOTNULL COMMENT '分类ID',
  `price` decimal(10,2) NOTNULL COMMENT '价格',
  `stock` intNOTNULL COMMENT '库存',
  `status` tinyint NOTNULLDEFAULT1 COMMENT '状态:0-禁用,1-启用',
  `create_user` bigintNOTNULL COMMENT '创建人',
  `create_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMP COMMENT '创建时间',
  `update_user` bigintDEFAULTNULL COMMENT '更新人',
  `update_time` datetime DEFAULTNULLONUPDATECURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted` tinyint NOTNULLDEFAULT0 COMMENT '删除标识:0-未删,1-已删',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_product_code` (`product_code`) COMMENT '商品编码唯一',
  KEY `idx_category_id` (`category_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
代码语言:javascript
复制

3.1.3 导入日志表(可选,用于详细追踪)
代码语言:javascript
复制
CREATETABLE `import_log` (
  `id` bigintNOTNULL COMMENT '主键ID',
  `task_id` bigintNOTNULL COMMENT '任务ID',
  `row_num` intNOTNULL COMMENT '行号',
  `data_content` text COMMENT '数据内容',
  `success` tinyint NOTNULL COMMENT '是否成功:0-失败,1-成功',
  `message` varchar(1024) DEFAULTNULL COMMENT '提示信息',
  `create_time` datetime NOTNULLDEFAULTCURRENT_TIMESTAMP COMMENT '创建时间',
PRIMARY KEY (`id`),
  KEY `idx_task_id` (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='导入日志表';
代码语言:javascript
复制

3.2 MyBatis-Plus 实体类

3.2.1 导入任务实体
代码语言:javascript
复制
package com.example.excelimport.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * 导入任务实体类
 *
 * @author ken
 */
@Data
@TableName("import_task")
publicclassImportTask {
/**
     * 主键ID
     */
@TableId(type = IdType.ASSIGN_ID)
privateLong id;
/**
     * 任务编号
     */
@TableField("task_no")
private String taskNo;
/**
     * 文件名
     */
@TableField("file_name")
private String fileName;
/**
     * 文件路径
     */
@TableField("file_path")
private String filePath;
/**
     * 结果文件路径
     */
@TableField("result_file_path")
private String resultFilePath;
/**
     * 文件大小(字节)
     */
@TableField("file_size")
privateLong fileSize;
/**
     * 总记录数
     */
@TableField("total_rows")
private Integer totalRows;
/**
     * 成功记录数
     */
@TableField("success_rows")
private Integer successRows;
/**
     * 失败记录数
     */
@TableField("fail_rows")
private Integer failRows;
/**
     * 状态:0-初始化,1-处理中,2-完成,3-失败
     */
@TableField("status")
private Integer status;
/**
     * 进度(%)
     */
@TableField("progress")
private Integer progress;
/**
     * 操作人ID
     */
@TableField("user_id")
privateLong userId;
/**
     * 操作人姓名
     */
@TableField("user_name")
private String userName;
/**
     * 错误信息
     */
@TableField("error_msg")
private String errorMsg;
/**
     * 创建时间
     */
@TableField(value = "create_time", fill = FieldFill.INSERT)
private LocalDateTime createTime;
/**
     * 更新时间
     */
@TableField(value = "update_time", fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime;
}
代码语言:javascript
复制

3.2.2 商品信息实体
代码语言:javascript
复制
package com.example.excelimport.entity;

import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;

import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
 * 商品信息实体类
 *
 * @author ken
 */
@Data
@TableName("business_data")
publicclassBusinessData {

/**
     * 主键ID
     */
@TableId(type = IdType.ASSIGN_ID)
privateLong id;

/**
     * 商品编码
     */
@TableField("product_code")
private String productCode;

/**
     * 商品名称
     */
@TableField("product_name")
private String productName;

/**
     * 分类ID
     */
@TableField("category_id")
privateLong categoryId;

/**
     * 价格
     */
@TableField("price")
private BigDecimal price;

/**
     * 库存
     */
@TableField("stock")
private Integer stock;

/**
     * 状态:0-禁用,1-启用
     */
@TableField("status")
private Integer status;

/**
     * 创建人
     */
@TableField(value = "create_user", fill = FieldFill.INSERT)
privateLong createUser;

/**
     * 创建时间
     */
@TableField(value = "create_time", fill = FieldFill.INSERT)
private LocalDateTime createTime;

/**
     * 更新人
     */
@TableField(value = "update_user", fill = FieldFill.UPDATE)
privateLong updateUser;

/**
     * 更新时间
     */
@TableField(value = "update_time", fill = FieldFill.UPDATE)
private LocalDateTime updateTime;

/**
     * 删除标识:0-未删,1-已删
     */
@TableField("deleted")
@TableLogic
private Integer deleted;
}
代码语言:javascript
复制

3.2.3 Mapper 接口
代码语言:javascript
复制
package com.example.excelimport.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.excelimport.entity.ImportTask;
import org.apache.ibatis.annotations.Mapper;

/**
 * 导入任务Mapper
 *
 * @author ken
 */
@Mapper
publicinterfaceImportTaskMapperextendsBaseMapper<ImportTask> {
}
代码语言:javascript
复制

四、文件上传与任务管理

4.1 分片上传核心组件

4.1.1 文件分片 DTO
代码语言:javascript
复制
package com.example.excelimport.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.web.multipart.MultipartFile;

/**
 * 文件分片上传DTO
 *
 * @author ken
 */
@Data
@Schema(description = "文件分片上传参数")
publicclassFileChunkDTO {

/**
     * 文件名
     */
@Schema(description = "文件名", requiredMode = Schema.RequiredMode.REQUIRED)
private String fileName;

/**
     * 文件唯一标识
     */
@Schema(description = "文件唯一标识", requiredMode = Schema.RequiredMode.REQUIRED)
private String fileId;

/**
     * 分片索引
     */
@Schema(description = "分片索引,从0开始", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer chunkIndex;

/**
     * 总分片数
     */
@Schema(description = "总分片数", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer totalChunks;

/**
     * 分片大小(字节)
     */
@Schema(description = "分片大小(字节)", requiredMode = Schema.RequiredMode.REQUIRED)
privateLong chunkSize;

/**
     * 文件总大小(字节)
     */
@Schema(description = "文件总大小(字节)", requiredMode = Schema.RequiredMode.REQUIRED)
privateLong totalSize;

/**
     * 文件分片数据
     */
@Schema(description = "文件分片数据", requiredMode = Schema.RequiredMode.REQUIRED)
private MultipartFile chunkFile;
}
代码语言:javascript
复制

4.1.2 文件上传服务接口
代码语言:javascript
复制
package com.example.excelimport.service;

import com.example.excelimport.dto.FileChunkDTO;
import com.example.excelimport.vo.FileUploadResultVO;

/**
 * 文件上传服务
 *
 * @authorken
 */
publicinterfaceFileUploadService {

/**
     * 上传文件分片
     *
     * @param chunkDTO 分片上传参数
     * @param userId 用户ID
     * @return 上传结果
     */
FileUploadResultVOuploadChunk(FileChunkDTO chunkDTO, Long userId);

/**
     * 合并文件分片
     *
     * @param fileId 文件唯一标识
     * @param fileName 文件名
     * @param totalChunks 总分片数
     * @param userId 用户ID
     * @return 合并结果,包含完整文件路径
     */
StringmergeChunks(String fileId, String fileName, Integer totalChunks, Long userId);

/**
     * 检查分片是否已上传
     *
     * @param fileId 文件唯一标识
     * @param chunkIndex 分片索引
     * @returntrue-已上传,false-未上传
     */
booleancheckChunkExists(String fileId, Integer chunkIndex);
}
代码语言:javascript
复制

4.1.3 文件上传服务实现
代码语言:javascript
复制
package com.example.excelimport.service.impl;

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.IdUtil;
import com.example.excelimport.dto.FileChunkDTO;
import com.example.excelimport.service.FileUploadService;
import com.example.excelimport.vo.FileUploadResultVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * 文件上传服务实现
 *
 * @author ken
 */
@Slf4j
@Service
publicclassFileUploadServiceImplimplementsFileUploadService {
/**
     * 临时文件存储路径
     */
@Value("${file.storage.temp-path}")
private String tempPath;
/**
     * 正式文件存储路径
     */
@Value("${file.storage.path}")
private String storagePath;
/**
     * 最大文件大小
     */
@Value("${file.storage.max-size}")
private Long maxFileSize;
/**
     * 日期格式化器
     */
privatestaticfinalDateTimeFormatterDATE_FORMATTER= DateTimeFormatter.ofPattern("yyyyMMdd");
@Override
public FileUploadResultVO uploadChunk(FileChunkDTO chunkDTO, Long userId) {
// 参数校验
        StringUtils.hasText(chunkDTO.getFileId(), "文件标识不能为空");
        StringUtils.hasText(chunkDTO.getFileName(), "文件名不能为空");
if (chunkDTO.getChunkIndex() == null || chunkDTO.getChunkIndex() < 0) {
thrownewIllegalArgumentException("分片索引必须大于等于0");
        }
if (chunkDTO.getTotalChunks() == null || chunkDTO.getTotalChunks() <= 0) {
thrownewIllegalArgumentException("总分片数必须大于0");
        }
if (chunkDTO.getChunkFile() == null || chunkDTO.getChunkFile().isEmpty()) {
thrownewIllegalArgumentException("分片文件不能为空");
        }
if (chunkDTO.getTotalSize() == null || chunkDTO.getTotalSize() <= 0) {
thrownewIllegalArgumentException("文件总大小必须大于0");
        }
if (chunkDTO.getTotalSize() > maxFileSize) {
thrownewIllegalArgumentException("文件大小超过限制,最大支持" + maxFileSize / (1024 * 1024) + "MB");
        }
// 创建临时目录
StringchunkDir= getChunkDir(chunkDTO.getFileId());
FiledirFile=newFile(chunkDir);
if (!dirFile.exists() && !dirFile.mkdirs()) {
            log.error("创建分片目录失败: {}", chunkDir);
thrownewRuntimeException("上传失败,无法创建临时目录");
        }
// 保存分片文件
StringchunkFileName= chunkDTO.getChunkIndex().toString();
FilechunkFile=newFile(chunkDir, chunkFileName);
try {
            chunkDTO.getChunkFile().transferTo(chunkFile);
            log.info("分片上传成功,fileId: {}, chunkIndex: {}, userId: {}",
                    chunkDTO.getFileId(), chunkDTO.getChunkIndex(), userId);
        } catch (IOException e) {
            log.error("分片上传失败,fileId: {}, chunkIndex: {}", chunkDTO.getFileId(), chunkDTO.getChunkIndex(), e);
// 失败时删除可能的残留文件
if (chunkFile.exists() && !chunkFile.delete()) {
                log.warn("删除失败的分片文件失败: {}", chunkFile.getAbsolutePath());
            }
thrownewRuntimeException("上传失败: " + e.getMessage());
        }
// 检查是否所有分片都已上传
booleanallUploaded= checkAllChunksUploaded(chunkDTO.getFileId(), chunkDTO.getTotalChunks());
FileUploadResultVOresult=newFileUploadResultVO();
        result.setFileId(chunkDTO.getFileId());
        result.setFileName(chunkDTO.getFileName());
        result.setChunkIndex(chunkDTO.getChunkIndex());
        result.setTotalChunks(chunkDTO.getTotalChunks());
        result.setAllUploaded(allUploaded);
        result.setMessage("分片上传成功");
return result;
    }
@Override
public String mergeChunks(String fileId, String fileName, Integer totalChunks, Long userId) {
        StringUtils.hasText(fileId, "文件标识不能为空");
        StringUtils.hasText(fileName, "文件名不能为空");
if (totalChunks == null || totalChunks <= 0) {
thrownewIllegalArgumentException("总分片数必须大于0");
        }
// 验证所有分片是否已上传
if (!checkAllChunksUploaded(fileId, totalChunks)) {
thrownewRuntimeException("存在未上传的分片,无法合并");
        }
try {
// 获取文件存储目录
StringdateDir= DATE_FORMATTER.format(LocalDate.now());
StringsaveDir= storagePath + File.separator + dateDir;
FilesaveDirFile=newFile(saveDir);
if (!saveDirFile.exists() && !saveDirFile.mkdirs()) {
                log.error("创建文件存储目录失败: {}", saveDir);
thrownewRuntimeException("合并失败,无法创建存储目录");
            }
// 生成唯一文件名,保留原扩展名
Stringext= FileUtil.extName(fileName);
StringuniqueFileName= IdUtil.fastSimpleUUID() + (StringUtils.hasText(ext) ? "." + ext : "");
StringfilePath= saveDir + File.separator + uniqueFileName;
// 合并分片
PathtargetPath= Paths.get(filePath);
StringchunkDir= getChunkDir(fileId);
// 按分片索引升序排列
            List<File> chunkFiles = newArrayList<>();
for (inti=0; i < totalChunks; i++) {
FilechunkFile=newFile(chunkDir, String.valueOf(i));
if (!chunkFile.exists()) {
thrownewRuntimeException("分片文件缺失: " + i);
                }
                chunkFiles.add(chunkFile);
            }
// 合并所有分片到目标文件
for (File chunkFile : chunkFiles) {
                Files.write(targetPath, Files.readAllBytes(chunkFile.toPath()), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
            }
            log.info("文件合并成功,fileId: {}, fileName: {}, savePath: {}, userId: {}",
                    fileId, fileName, filePath, userId);
// 清理临时分片文件
            deleteChunkDir(fileId);
return filePath;
        } catch (IOException e) {
            log.error("文件合并失败,fileId: {}", fileId, e);
thrownewRuntimeException("合并失败: " + e.getMessage());
        }
    }
@Override
publicbooleancheckChunkExists(String fileId, Integer chunkIndex) {
        StringUtils.hasText(fileId, "文件标识不能为空");
if (chunkIndex == null || chunkIndex < 0) {
thrownewIllegalArgumentException("分片索引必须大于等于0");
        }
StringchunkDir= getChunkDir(fileId);
FilechunkFile=newFile(chunkDir, chunkIndex.toString());
return chunkFile.exists() && chunkFile.length() > 0;
    }
/**
     * 获取分片存储目录
     */
private String getChunkDir(String fileId) {
return tempPath + File.separator + "chunks" + File.separator + fileId;
    }
/**
     * 检查所有分片是否已上传
     */
privatebooleancheckAllChunksUploaded(String fileId, int totalChunks) {
StringchunkDir= getChunkDir(fileId);
FiledirFile=newFile(chunkDir);
if (!dirFile.exists()) {
returnfalse;
        }
// 列出所有分片文件并检查数量是否匹配
        File[] chunkFiles = dirFile.listFiles();
if (chunkFiles == null || chunkFiles.length != totalChunks) {
returnfalse;
        }
// 检查每个分片是否存在
for (inti=0; i < totalChunks; i++) {
FilechunkFile=newFile(dirFile, String.valueOf(i));
if (!chunkFile.exists() || chunkFile.length() == 0) {
returnfalse;
            }
        }
returntrue;
    }
/**
     * 删除分片目录及文件
     */
privatevoiddeleteChunkDir(String fileId) {
StringchunkDir= getChunkDir(fileId);
FiledirFile=newFile(chunkDir);
if (dirFile.exists()) {
try {
// 递归删除目录
                FileUtil.del(dirFile);
                log.info("分片目录已删除: {}", chunkDir);
            } catch (Exception e) {
                log.warn("删除分片目录失败: {}", chunkDir, e);
            }
        }
    }
}
代码语言:javascript
复制

4.2 导入任务管理

4.2.1 任务相关 DTO 和 VO
代码语言:javascript
复制
package com.example.excelimport.dto;

import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;

/**
 * 创建导入任务DTO
 *
 * @author ken
 */
@Data
@Schema(description = "创建导入任务参数")
publicclassCreateImportTaskDTO {

/**
     * 文件唯一标识
     */
@Schema(description = "文件唯一标识", requiredMode = Schema.RequiredMode.REQUIRED)
private String fileId;

/**
     * 文件名
     */
@Schema(description = "文件名", requiredMode = Schema.RequiredMode.REQUIRED)
private String fileName;

/**
     * 总分片数
     */
@Schema(description = "总分片数", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer totalChunks;
}
代码语言:javascript
复制

代码语言:javascript
复制
package com.example.excelimport.vo;

import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
 * 导入任务VO
 *
 * @author ken
 */
@Data
@Schema(description = "导入任务信息")
publicclassImportTaskVO {
/**
     * 任务ID
     */
@Schema(description = "任务ID")
privateLong id;
/**
     * 任务编号
     */
@Schema(description = "任务编号")
private String taskNo;
/**
     * 文件名
     */
@Schema(description = "文件名")
private String fileName;
/**
     * 文件大小(MB)
     */
@Schema(description = "文件大小(MB)")
privateDouble fileSizeMB;
/**
     * 总记录数
     */
@Schema(description = "总记录数")
private Integer totalRows;
/**
     * 成功记录数
     */
@Schema(description = "成功记录数")
private Integer successRows;
/**
     * 失败记录数
     */
@Schema(description = "失败记录数")
private Integer failRows;
/**
     * 状态:0-初始化,1-处理中,2-完成,3-失败
     */
@Schema(description = "状态:0-初始化,1-处理中,2-完成,3-失败")
private Integer status;
/**
     * 状态描述
     */
@Schema(description = "状态描述")
private String statusDesc;
/**
     * 进度(%)
     */
@Schema(description = "进度(%)")
private Integer progress;
/**
     * 错误信息
     */
@Schema(description = "错误信息")
private String errorMsg;
/**
     * 结果文件下载地址
     */
@Schema(description = "结果文件下载地址")
private String resultFileUrl;
/**
     * 创建时间
     */
@Schema(description = "创建时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createTime;
/**
     * 更新时间
     */
@Schema(description = "更新时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime updateTime;
}
代码语言:javascript
复制

4.2.2 任务服务接口
代码语言:javascript
复制
package com.example.excelimport.service;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.excelimport.dto.CreateImportTaskDTO;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.vo.ImportTaskVO;

/**
 * 导入任务服务
 *
 * @author ken
 */
publicinterfaceImportTaskService {

/**
     * 创建导入任务
     *
     * @param dto 创建任务参数
     * @param userId 用户ID
     * @param userName 用户名
     * @return 任务信息
     */
    ImportTaskVO createTask(CreateImportTaskDTO dto, Long userId, String userName);

/**
     * 查询任务详情
     *
     * @param taskId 任务ID
     * @param userId 用户ID
     * @return 任务详情
     */
    ImportTaskVO getTaskDetail(Long taskId, Long userId);

/**
     * 分页查询用户的导入任务
     *
     * @param page 分页参数
     * @param userId 用户ID
     * @param status 任务状态,null表示查询所有
     * @return 任务分页列表
     */
    IPage<ImportTaskVO> queryUserTasks(Page<ImportTask> page, Long userId, Integer status);

/**
     * 更新任务状态
     *
     * @param taskId 任务ID
     * @param status 新状态
     * @param progress 进度(%)
     * @param successRows 成功记录数
     * @param failRows 失败记录数
     * @param errorMsg 错误信息
     * @param resultFilePath 结果文件路径
     * @return 是否更新成功
     */
    boolean updateTaskStatus(Long taskId, Integer status, Integer progress,
                            Integer successRows, Integer failRows,
                            String errorMsg, String resultFilePath);

/**
     * 获取任务实体
     *
     * @param taskId 任务ID
     * @return 任务实体
     */
    ImportTask getTaskById(Long taskId);
}
代码语言:javascript
复制

4.2.3 任务服务实现
代码语言:javascript
复制
package com.example.excelimport.service.impl;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.excelimport.dto.CreateImportTaskDTO;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.mapper.ImportTaskMapper;
import com.example.excelimport.service.ImportTaskService;
import com.example.excelimport.service.FileUploadService;
import com.example.excelimport.vo.ImportTaskVO;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.io.File;
import java.text.MessageFormat;
import java.util.Map;
import java.util.UUID;
/**
 * 导入任务服务实现
 *
 * @author ken
 */
@Slf4j
@Service
publicclassImportTaskServiceImplextendsServiceImpl<ImportTaskMapper, ImportTask> implementsImportTaskService {
/**
     * 任务状态描述映射
     */
privatestaticfinal Map<Integer, String> STATUS_DESC_MAP = Maps.newHashMap();
static {
        STATUS_DESC_MAP.put(0, "初始化");
        STATUS_DESC_MAP.put(1, "处理中");
        STATUS_DESC_MAP.put(2, "完成");
        STATUS_DESC_MAP.put(3, "失败");
    }
@Autowired
private FileUploadService fileUploadService;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
     * 结果文件访问路径前缀
     */
@Value("${server.servlet.context-path:}")
private String contextPath;
/**
     * 导入任务交换机
     */
@Value("${rabbitmq.exchange.import-task:import.task.exchange}")
private String importTaskExchange;
/**
     * 导入任务路由键
     */
@Value("${rabbitmq.routing-key.import-task:import.task.process}")
private String importTaskRoutingKey;
@Override
@Transactional(rollbackFor = Exception.class)
public ImportTaskVO createTask(CreateImportTaskDTO dto, Long userId, String userName) {
// 参数校验
        StringUtils.hasText(dto.getFileId(), "文件标识不能为空");
        StringUtils.hasText(dto.getFileName(), "文件名不能为空");
if (dto.getTotalChunks() == null || dto.getTotalChunks() <= 0) {
thrownewIllegalArgumentException("总分片数必须大于0");
        }
        ObjectUtils.isEmpty(userId, "用户ID不能为空");
        StringUtils.hasText(userName, "用户名不能为空");
// 合并文件分片
StringfilePath= fileUploadService.mergeChunks(dto.getFileId(), dto.getFileName(), dto.getTotalChunks(), userId);
if (!StringUtils.hasText(filePath)) {
thrownewRuntimeException("文件合并失败");
        }
// 获取文件大小
Filefile=newFile(filePath);
if (!file.exists() || !file.isFile()) {
thrownewRuntimeException("合并后的文件不存在");
        }
longfileSize= file.length();
// 创建任务记录
ImportTasktask=newImportTask();
        task.setTaskNo(generateTaskNo());
        task.setFileName(dto.getFileName());
        task.setFilePath(filePath);
        task.setFileSize(fileSize);
        task.setStatus(0); // 初始化状态
        task.setProgress(0);
        task.setUserId(userId);
        task.setUserName(userName);
intinsert= baseMapper.insert(task);
if (insert <= 0) {
            log.error("创建导入任务失败,userId: {}, fileName: {}", userId, dto.getFileName());
thrownewRuntimeException("创建导入任务失败");
        }
        log.info("创建导入任务成功,taskId: {}, taskNo: {}, fileName: {}, userId: {}",
                task.getId(), task.getTaskNo(), dto.getFileName(), userId);
// 发送消息到队列,异步处理导入
try {
            rabbitTemplate.convertAndSend(importTaskExchange, importTaskRoutingKey, task.getId());
            log.info("导入任务已发送到队列,taskId: {}", task.getId());
        } catch (Exception e) {
            log.error("发送导入任务到队列失败,taskId: {}", task.getId(), e);
// 发送失败时,更新任务状态为失败
            task.setStatus(3);
            task.setErrorMsg("任务提交失败: " + e.getMessage());
            baseMapper.updateById(task);
thrownewRuntimeException("创建任务成功,但提交处理失败,请稍后重试");
        }
return convertToVO(task);
    }
@Override
public ImportTaskVO getTaskDetail(Long taskId, Long userId) {
        ObjectUtils.isEmpty(taskId, "任务ID不能为空");
        ObjectUtils.isEmpty(userId, "用户ID不能为空");
ImportTasktask= baseMapper.selectOne(Wrappers.<ImportTask>lambdaQuery()
                .eq(ImportTask::getId, taskId)
                .eq(ImportTask::getUserId, userId));
if (task == null) {
returnnull;
        }
return convertToVO(task);
    }
@Override
public IPage<ImportTaskVO> queryUserTasks(Page<ImportTask> page, Long userId, Integer status) {
        ObjectUtils.isEmpty(userId, "用户ID不能为空");
        IPage<ImportTask> taskPage = baseMapper.selectPage(page, Wrappers.<ImportTask>lambdaQuery()
                .eq(ImportTask::getUserId, userId)
                .eq(status != null, ImportTask::getStatus, status)
                .orderByDesc(ImportTask::getCreateTime));
        IPage<ImportTaskVO> resultPage = newPage<>();
        BeanUtils.copyProperties(taskPage, resultPage);
        resultPage.setRecords(taskPage.getRecords().stream()
                .map(this::convertToVO)
                .collect(java.util.stream.Collectors.toList()));
return resultPage;
    }
@Override
@Transactional(rollbackFor = Exception.class)
publicbooleanupdateTaskStatus(Long taskId, Integer status, Integer progress,
                                   Integer successRows, Integer failRows,
                                   String errorMsg, String resultFilePath) {
        ObjectUtils.isEmpty(taskId, "任务ID不能为空");
        ObjectUtils.isEmpty(status, "状态不能为空");
ImportTasktask=newImportTask();
        task.setId(taskId);
        task.setStatus(status);
if (progress != null) {
            task.setProgress(progress);
        }
if (successRows != null) {
            task.setSuccessRows(successRows);
        }
if (failRows != null) {
            task.setFailRows(failRows);
        }
if (totalRows != null) {
            task.setTotalRows(totalRows);
        }
        task.setErrorMsg(errorMsg);
        task.setResultFilePath(resultFilePath);
intupdate= baseMapper.updateById(task);
return update > 0;
    }
@Override
public ImportTask getTaskById(Long taskId) {
if (taskId == null) {
returnnull;
        }
return baseMapper.selectById(taskId);
    }
/**
     * 生成任务编号
     */
private String generateTaskNo() {
return"IMPT" + System.currentTimeMillis() + UUID.randomUUID().toString().substring(0, 8).toUpperCase();
    }
/**
     * 转换为VO对象
     */
private ImportTaskVO convertToVO(ImportTask task) {
ImportTaskVOvo=newImportTaskVO();
        BeanUtils.copyProperties(task, vo);
// 计算文件大小(MB)
if (task.getFileSize() != null) {
            vo.setFileSizeMB(task.getFileSize() / (1024.0 * 1024.0));
        }
// 设置状态描述
        vo.setStatusDesc(STATUS_DESC_MAP.getOrDefault(task.getStatus(), "未知状态"));
// 设置结果文件下载地址
if (StringUtils.hasText(task.getResultFilePath())) {
StringrelativePath= task.getResultFilePath().replace(File.separator, "/");
            vo.setResultFileUrl(contextPath + "/api/v1/files/download?path=" + relativePath);
        }
return vo;
    }
}
代码语言:javascript
复制

4.3 控制器实现

代码语言:javascript
复制
package com.example.excelimport.controller;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.excelimport.dto.CreateImportTaskDTO;
import com.example.excelimport.dto.FileChunkDTO;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.service.ImportTaskService;
import com.example.excelimport.service.FileUploadService;
import com.example.excelimport.vo.FileUploadResultVO;
import com.example.excelimport.vo.ImportTaskVO;
import com.example.excelimport.vo.ResponseVO;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
/**
 * 文件导入控制器
 *
 * @author ken
 */
@Slf4j
@RestController
@RequestMapping("/api/v1/import")
@Tag(name = "文件导入接口", description = "大文件分片上传及异步导入处理")
publicclassImportController {
@Autowired
private FileUploadService fileUploadService;
@Autowired
private ImportTaskService importTaskService;
@Operation(summary = "上传文件分片", description = "将大文件拆分为分片上传")
@PostMapping("/file/chunk")
public ResponseVO<FileUploadResultVO> uploadChunk(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "文件分片信息", required = true)@ModelAttribute FileChunkDTO chunkDTO) {
        log.info("接收文件分片上传请求,fileId: {}, chunkIndex: {}, userId: {}",
                chunkDTO.getFileId(), chunkDTO.getChunkIndex(), userId);
        FileUploadResultVO result = fileUploadService.uploadChunk(chunkDTO, userId);
return ResponseVO.success(result);
    }
@Operation(summary = "检查分片是否已上传", description = "用于断点续传时检查分片状态")
@GetMapping("/file/chunk/exists")
public ResponseVO<Boolean> checkChunkExists(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "文件唯一标识", required = true)@RequestParam String fileId,
@Parameter(description = "分片索引", required = true)@RequestParam Integer chunkIndex) {
        log.info("检查分片是否已上传,fileId: {}, chunkIndex: {}, userId: {}", fileId, chunkIndex, userId);
        boolean exists = fileUploadService.checkChunkExists(fileId, chunkIndex);
return ResponseVO.success(exists);
    }
@Operation(summary = "创建导入任务", description = "合并文件分片并创建导入任务")
@PostMapping("/task")
public ResponseVO<ImportTaskVO> createImportTask(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "用户名", required = true)@RequestHeader("X-User-Name") String userName,
@Parameter(description = "创建任务参数", required = true)@RequestBody CreateImportTaskDTO dto) {
        log.info("创建导入任务,fileId: {}, fileName: {}, userId: {}", dto.getFileId(), dto.getFileName(), userId);
        ImportTaskVO taskVO = importTaskService.createTask(dto, userId, userName);
return ResponseVO.success(taskVO);
    }
@Operation(summary = "查询任务详情", description = "获取导入任务的详细信息和进度")
@GetMapping("/task/{taskId}")
public ResponseVO<ImportTaskVO> getTaskDetail(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "任务ID", required = true)@PathVariableLong taskId) {
        log.info("查询任务详情,taskId: {}, userId: {}", taskId, userId);
        ImportTaskVO taskVO = importTaskService.getTaskDetail(taskId, userId);
return ResponseVO.success(taskVO);
    }
@Operation(summary = "查询用户导入任务列表", description = "分页查询当前用户的导入任务")
@GetMapping("/tasks")
public ResponseVO<IPage<ImportTaskVO>> queryUserTasks(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "页码,从1开始")@RequestParam(defaultValue = "1") Integer pageNum,
@Parameter(description = "每页条数")@RequestParam(defaultValue = "10") Integer pageSize,
@Parameter(description = "任务状态:0-初始化,1-处理中,2-完成,3-失败")@RequestParam(required = false) Integer status) {
        log.info("查询用户导入任务列表,userId: {}, pageNum: {}, pageSize: {}, status: {}",
                userId, pageNum, pageSize, status);
        Page<ImportTask> page = new Page<>(pageNum, pageSize);
        IPage<ImportTaskVO> taskPage = importTaskService.queryUserTasks(page, userId, status);
return ResponseVO.success(taskPage);
    }
@Operation(summary = "下载导入结果文件", description = "下载包含导入结果的Excel文件")
@GetMapping("/file/result")
public void downloadResultFile(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "任务ID", required = true)@RequestParamLong taskId,
            HttpServletResponse response) throws IOException {
        log.info("下载导入结果文件,taskId: {}, userId: {}", taskId, userId);
// 获取任务信息
        ImportTask task = importTaskService.getTaskById(taskId);
if (task == null) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND, "任务不存在");
return;
        }
// 验证权限
if (!task.getUserId().equals(userId)) {
            response.sendError(HttpServletResponse.SC_FORBIDDEN, "没有权限下载该文件");
return;
        }
// 检查结果文件是否存在
        String resultFilePath = task.getResultFilePath();
if (!org.springframework.util.StringUtils.hasText(resultFilePath)) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND, "结果文件不存在");
return;
        }
        File file = new File(resultFilePath);
if (!file.exists() || !file.isFile()) {
            response.sendError(HttpServletResponse.SC_NOT_FOUND, "结果文件不存在");
return;
        }
// 设置响应头
        response.setContentType("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet");
        String fileName = "导入结果_" + task.getFileName();
        String encodedFileName = URLEncoder.encode(fileName, StandardCharsets.UTF_8.name());
        response.setHeader("Content-Disposition", "attachment; filename*=UTF-8''" + encodedFileName);
        response.setContentLengthLong(file.length());
// 写入文件内容
try (FileInputStream fis = new FileInputStream(file);
             OutputStream os = response.getOutputStream()) {
            byte[] buffer = new byte[1024 * 1024]; // 1MB缓冲区
            int len;
while ((len = fis.read(buffer)) != -1) {
                os.write(buffer, 0, len);
            }
            os.flush();
        } catch (IOException e) {
            log.error("下载结果文件失败,taskId: {}", taskId, e);
throw e;
        }
    }
}
代码语言:javascript
复制

五、异步处理与数据导入

5.1 消息队列配置

代码语言:javascript
复制
package com.example.excelimport.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * RabbitMQ配置
 *
 * @authorken
 */
@Configuration
publicclassRabbitMQConfig {
/**
     * 导入任务交换机
     */
@Value("${rabbitmq.exchange.import-task:import.task.exchange}")
privateString importTaskExchange;
/**
     * 导入任务队列
     */
@Value("${rabbitmq.queue.import-task:import.task.queue}")
privateString importTaskQueue;
/**
     * 导入任务路由键
     */
@Value("${rabbitmq.routing-key.import-task:import.task.process}")
privateString importTaskRoutingKey;
/**
     * 死信交换机
     */
@Value("${rabbitmq.exchange.dlq:import.dlq.exchange}")
privateString dlqExchange;
/**
     * 死信队列
     */
@Value("${rabbitmq.queue.dlq:import.dlq.queue}")
privateString dlqQueue;
/**
     * 死信路由键
     */
@Value("${rabbitmq.routing-key.dlq:import.dlq.routing}")
privateString dlqRoutingKey;
/**
     * 死信交换机
     */
@Bean
publicDirectExchangedlqExchange() {
returnnewDirectExchange(dlqExchange, true, false);
    }
/**
     * 死信队列
     */
@Bean
publicQueuedlqQueue() {
returnQueueBuilder.durable(dlqQueue)
                .build();
    }
/**
     * 死信队列绑定
     */
@Bean
publicBindingdlqBinding() {
returnBindingBuilder.bind(dlqQueue())
                .to(dlqExchange())
                .with(dlqRoutingKey);
    }
/**
     * 导入任务交换机
     */
@Bean
publicDirectExchangeimportTaskExchange() {
returnnewDirectExchange(importTaskExchange, true, false);
    }
/**
     * 导入任务队列
     * 设置死信队列,用于处理失败的任务
     */
@Bean
publicQueueimportTaskQueue() {
Map<String, Object> arguments = newHashMap<>(3);
// 绑定死信交换机
arguments.put("x-dead-letter-exchange", dlqExchange);
// 绑定死信路由键
arguments.put("x-dead-letter-routing-key", dlqRoutingKey);
// 消息过期时间 30分钟
arguments.put("x-message-ttl", 30 * 60 * 1000);
returnQueueBuilder.durable(importTaskQueue)
                .withArguments(arguments)
                .build();
    }
/**
     * 导入任务队列绑定
     */
@Bean
publicBindingimportTaskBinding() {
returnBindingBuilder.bind(importTaskQueue())
                .to(importTaskExchange())
                .with(importTaskRoutingKey);
    }
}
代码语言:javascript
复制

5.2 Excel 数据模型与解析

5.2.1 导入数据模型
代码语言:javascript
复制
package com.example.excelimport.excel;

import com.alibaba.excel.annotation.ExcelProperty;
import com.alibaba.excel.annotation.write.style.ColumnWidth;
import com.alibaba.excel.annotation.write.style.ContentRowHeight;
import com.alibaba.excel.annotation.write.style.HeadRowHeight;
import lombok.Data;

import java.math.BigDecimal;

/**
 * 商品导入Excel模型
 *
 * @author ken
 */
@Data
@HeadRowHeight(20)
@ContentRowHeight(18)
@ColumnWidth(20)
publicclassProductImportModel {

@ExcelProperty(value = "商品编码", index = 0)
private String productCode;

@ExcelProperty(value = "商品名称", index = 1)
private String productName;

@ExcelProperty(value = "分类ID", index = 2)
privateLong categoryId;

@ExcelProperty(value = "价格", index = 3)
private BigDecimal price;

@ExcelProperty(value = "库存", index = 4)
private Integer stock;

@ExcelProperty(value = "状态(0-禁用 1-启用)", index = 5)
private Integer status;

/**
     * 导入结果状态:成功/失败
     */
@ExcelProperty(value = "导入结果", index = 6)
private String importStatus;

/**
     * 导入失败原因
     */
@ExcelProperty(value = "失败原因", index = 7)
private String errorMsg;
}
代码语言:javascript
复制

5.2.2 Excel 读取监听器
代码语言:javascript
复制
package com.example.excelimport.excel;

import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import com.alibaba.excel.util.ListUtils;
import com.example.excelimport.entity.BusinessData;
import com.example.excelimport.service.BusinessDataService;
import com.example.excelimport.service.ImportTaskService;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * 商品导入Excel监听器
 *
 * @author ken
 */
@Slf4j
publicclassProductImportListenerimplementsReadListener<ProductImportModel> {
/**
     * 批处理阈值
     */
privatestaticfinalintBATCH_COUNT=1000;
/**
     * 缓存的数据列表
     */
private List<ProductImportModel> cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);
/**
     * 所有数据列表(用于生成结果文件)
     */
private List<ProductImportModel> allDataList = Lists.newArrayList();
/**
     * 当前行号(从1开始,包含表头)
     */
privateAtomicIntegerrowNum=newAtomicInteger(0);
/**
     * 任务ID
     */
privatefinal Long taskId;
/**
     * 处理用户ID
     */
privatefinal Long userId;
/**
     * 业务数据服务
     */
privatefinal BusinessDataService businessDataService;
/**
     * 导入任务服务
     */
privatefinal ImportTaskService importTaskService;
/**
     * 构造函数
     */
publicProductImportListener(Long taskId, Long userId,
                                BusinessDataService businessDataService,
                                ImportTaskService importTaskService) {
this.taskId = taskId;
this.userId = userId;
this.businessDataService = businessDataService;
this.importTaskService = importTaskService;
    }
/**
     * 每解析一行数据都会调用此方法
     */
@Override
publicvoidinvoke(ProductImportModel data, AnalysisContext context) {
// 行号递增(表头行也会被计数,所以需要过滤)
intcurrentRowNum= rowNum.incrementAndGet();
// 跳过表头行
if (currentRowNum == 1) {
return;
        }
// 记录原始行号(展示给用户时从1开始)
        data.setRowNum(currentRowNum - 1);
// 添加到缓存列表
        cachedDataList.add(data);
        allDataList.add(data);
// 达到批处理阈值时进行处理
if (cachedDataList.size() >= BATCH_COUNT) {
            processBatchData();
// 清空缓存
            cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);
// 更新进度
            updateTaskProgress();
        }
    }
/**
     * 解析完成后调用此方法
     */
@Override
publicvoiddoAfterAllAnalysed(AnalysisContext context) {
// 处理剩余数据
if (!CollectionUtils.isEmpty(cachedDataList)) {
            processBatchData();
        }
        log.info("Excel解析完成,taskId: {}, 总记录数: {}", taskId, allDataList.size());
// 更新总记录数
        importTaskService.updateTaskStatus(taskId, null, null, null, null, null, null, allDataList.size());
    }
/**
     * 处理批量数据
     */
privatevoidprocessBatchData() {
        log.info("开始处理批量数据,taskId: {}, 数量: {}", taskId, cachedDataList.size());
try {
// 1. 数据校验
            businessDataService.validateData(cachedDataList);
// 2. 检查重复数据
            businessDataService.checkDuplicateData(cachedDataList);
// 3. 过滤有效数据
            List<BusinessData> validDataList = businessDataService.filterValidData(cachedDataList, userId);
// 4. 批量导入数据
if (!CollectionUtils.isEmpty(validDataList)) {
                businessDataService.batchImportData(validDataList);
            }
        } catch (Exception e) {
            log.error("处理批量数据失败,taskId: {}", taskId, e);
// 标记该批次数据为失败
            cachedDataList.forEach(data -> {
if (data.getImportStatus() == null) {
                    data.setImportStatus("失败");
                    data.setErrorMsg("系统处理错误: " + e.getMessage());
                }
            });
        }
    }
/**
     * 更新任务进度
     */
privatevoidupdateTaskProgress() {
try {
// 计算总记录数(预估)
inttotalCount= allDataList.size() + (cachedDataList.size() > 0 ? BATCH_COUNT : 0);
if (totalCount == 0) {
return;
            }
// 计算进度百分比
intprogress= Math.min(90, (allDataList.size() * 100) / totalCount);
            importTaskService.updateTaskStatus(taskId, 1, progress, null, null, null, null, null);
        } catch (Exception e) {
            log.error("更新任务进度失败,taskId: {}", taskId, e);
        }
    }
/**
     * 获取所有数据列表
     */
public List<ProductImportModel> getAllDataList() {
return allDataList;
    }
}
代码语言:javascript
复制

5.3 数据校验与去重服务

5.3.1 业务服务接口
代码语言:javascript
复制
package com.example.excelimport.service;

import com.example.excelimport.entity.BusinessData;
import com.example.excelimport.excel.ProductImportModel;

import java.util.List;

/**
 * 业务数据服务
 *
 * @author ken
 */
publicinterfaceBusinessDataService {

/**
     * 数据校验
     *
     * @param dataList 导入数据列表
     */
voidvalidateData(List<ProductImportModel> dataList);

/**
     * 检查重复数据(包括列表内部重复和与数据库重复)
     *
     * @param dataList 导入数据列表
     */
voidcheckDuplicateData(List<ProductImportModel> dataList);

/**
     * 过滤有效数据
     *
     * @param dataList 导入数据列表
     * @param userId 用户ID
     * @return 有效业务数据列表
     */
    List<BusinessData> filterValidData(List<ProductImportModel> dataList, Long userId);

/**
     * 批量导入数据
     *
     * @param dataList 业务数据列表
     * @return 导入成功数量
     */
intbatchImportData(List<BusinessData> dataList);

/**
     * 根据编码列表查询数据
     *
     * @param codeList 编码列表
     * @return 业务数据列表
     */
    List<BusinessData> getByCodes(List<String> codeList);
}
代码语言:javascript
复制

5.3.2 业务服务实现
代码语言:javascript
复制
package com.example.excelimport.service.impl;

import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.excelimport.entity.BusinessData;
import com.example.excelimport.excel.ProductImportModel;
import com.example.excelimport.mapper.BusinessDataMapper;
import com.example.excelimport.service.BusinessDataService;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * 业务数据服务实现
 *
 * @author ken
 */
@Slf4j
@Service
publicclassBusinessDataServiceImplimplementsBusinessDataService {
/**
     * Redis分布式锁前缀
     */
privatestaticfinalStringLOCK_PREFIX="import:lock:product:";
/**
     * 分布式锁过期时间(秒)
     */
privatestaticfinalintLOCK_EXPIRE_SECONDS=60;
@Autowired
private BusinessDataMapper businessDataMapper;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
publicvoidvalidateData(List<ProductImportModel> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return;
        }
for (ProductImportModel data : dataList) {
// 重置之前的校验结果
            data.setImportStatus(null);
            data.setErrorMsg(null);
            List<String> errors = Lists.newArrayList();
// 商品编码校验
if (!StringUtils.hasText(data.getProductCode())) {
                errors.add("商品编码不能为空");
            } elseif (data.getProductCode().length() > 64) {
                errors.add("商品编码长度不能超过64个字符");
            }
// 商品名称校验
if (!StringUtils.hasText(data.getProductName())) {
                errors.add("商品名称不能为空");
            } elseif (data.getProductName().length() > 255) {
                errors.add("商品名称长度不能超过255个字符");
            }
// 分类ID校验
if (data.getCategoryId() == null || data.getCategoryId() <= 0) {
                errors.add("分类ID必须大于0");
            }
// 价格校验
if (data.getPrice() == null) {
                errors.add("价格不能为空");
            } elseif (data.getPrice().compareTo(BigDecimal.ZERO) <= 0) {
                errors.add("价格必须大于0");
            } elseif (data.getPrice().scale() > 2) {
                errors.add("价格最多保留两位小数");
            }
// 库存校验
if (data.getStock() == null) {
                errors.add("库存不能为空");
            } elseif (data.getStock() < 0) {
                errors.add("库存不能为负数");
            }
// 状态校验
if (data.getStatus() == null) {
                errors.add("状态不能为空");
            } elseif (data.getStatus() != 0 && data.getStatus() != 1) {
                errors.add("状态必须为0(禁用)或1(启用)");
            }
// 设置校验结果
if (!errors.isEmpty()) {
                data.setImportStatus("失败");
                data.setErrorMsg(String.join(";", errors));
            }
        }
    }
@Override
publicvoidcheckDuplicateData(List<ProductImportModel> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return;
        }
// 1. 过滤已校验失败的数据
        List<ProductImportModel> validDataList = dataList.stream()
                .filter(data -> data.getImportStatus() == null)
                .collect(Collectors.toList());
if (CollectionUtils.isEmpty(validDataList)) {
return;
        }
// 2. 检查列表内部重复
        Multimap<String, ProductImportModel> codeMap = HashMultimap.create();
for (ProductImportModel data : validDataList) {
            codeMap.put(data.getProductCode(), data);
        }
// 标记重复数据
for (Map.Entry<String, ProductImportModel> entry : codeMap.entries()) {
Stringcode= entry.getKey();
ProductImportModeldata= entry.getValue();
// 如果编码对应的记录数大于1,则表示有重复
if (codeMap.get(code).size() > 1) {
                data.setImportStatus("失败");
                data.setErrorMsg("商品编码在导入文件中重复");
            }
        }
// 3. 过滤掉内部重复的数据,检查数据库中是否已存在
        List<ProductImportModel> uniqueDataList = validDataList.stream()
                .filter(data -> data.getImportStatus() == null)
                .collect(Collectors.toList());
if (CollectionUtils.isEmpty(uniqueDataList)) {
return;
        }
// 4. 批量查询数据库中已存在的编码
        List<String> codeList = uniqueDataList.stream()
                .map(ProductImportModel::getProductCode)
                .collect(Collectors.toList());
        List<BusinessData> existDataList = businessDataMapper.selectByCodes(codeList);
if (!CollectionUtils.isEmpty(existDataList)) {
            Set<String> existCodeSet = existDataList.stream()
                    .map(BusinessData::getProductCode)
                    .collect(Collectors.toSet());
// 标记数据库中已存在的数据
for (ProductImportModel data : uniqueDataList) {
if (existCodeSet.contains(data.getProductCode())) {
                    data.setImportStatus("失败");
                    data.setErrorMsg("商品编码在系统中已存在");
                }
            }
        }
    }
@Override
public List<BusinessData> filterValidData(List<ProductImportModel> dataList, Long userId) {
if (CollectionUtils.isEmpty(dataList)) {
return Lists.newArrayList();
        }
// 过滤出校验通过的数据
        List<ProductImportModel> validDataList = dataList.stream()
                .filter(data -> data.getImportStatus() == null)
                .collect(Collectors.toList());
if (CollectionUtils.isEmpty(validDataList)) {
return Lists.newArrayList();
        }
// 转换为业务实体
        List<BusinessData> businessDataList = Lists.newArrayListWithExpectedSize(validDataList.size());
for (ProductImportModel data : validDataList) {
BusinessDatabusinessData=newBusinessData();
            BeanUtils.copyProperties(data, businessData);
            businessData.setCreateUser(userId);
            businessData.setCreateTime(LocalDateTime.now());
            businessDataList.add(businessData);
        }
return businessDataList;
    }
@Override
@Transactional(rollbackFor = Exception.class)
publicintbatchImportData(List<BusinessData> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return0;
        }
        log.info("开始批量导入数据,数量: {}", dataList.size());
// 1. 并发控制:使用分布式锁确保数据唯一性
        List<BusinessData> finalDataList = Lists.newArrayList();
for (BusinessData data : dataList) {
StringlockKey= LOCK_PREFIX + data.getProductCode();
Booleanlocked= redisTemplate.opsForValue().setIfAbsent(lockKey, "1", LOCK_EXPIRE_SECONDS, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
try {
// 再次检查数据库,确保没有并发插入
BusinessDataexistData= businessDataMapper.selectOne(Wrappers.<BusinessData>lambdaQuery()
                            .eq(BusinessData::getProductCode, data.getProductCode())
                            .eq(BusinessData::getDeleted, 0));
if (existData == null) {
                        finalDataList.add(data);
                    }
                } finally {
// 释放锁
                    redisTemplate.delete(lockKey);
                }
            } else {
                log.warn("获取分布式锁失败,可能有并发操作,productCode: {}", data.getProductCode());
            }
        }
if (CollectionUtils.isEmpty(finalDataList)) {
            log.info("没有可导入的数据(可能被并发操作拦截)");
return0;
        }
// 2. 批量插入数据
intinsertCount= businessDataMapper.batchInsert(finalDataList);
        log.info("批量导入完成,计划导入: {}, 实际导入: {}", finalDataList.size(), insertCount);
return insertCount;
    }
@Override
public List<BusinessData> getByCodes(List<String> codeList) {
if (CollectionUtils.isEmpty(codeList)) {
return Lists.newArrayList();
        }
return businessDataMapper.selectByCodes(codeList);
    }
}
代码语言:javascript
复制

5.4 消息消费者(异步处理服务)

代码语言:javascript
复制
package com.example.excelimport.consumer;

import com.alibaba.excel.EasyExcel;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.excel.ProductImportModel;
import com.example.excelimport.excel.ProductImportListener;
import com.example.excelimport.service.BusinessDataService;
import com.example.excelimport.service.ImportTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.io.File;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.UUID;
/**
 * 导入任务消费者
 *
 * @author ken
 */
@Slf4j
@Component
publicclassImportTaskConsumer {
@Autowired
private ImportTaskService importTaskService;
@Autowired
private BusinessDataService businessDataService;
/**
     * 结果文件存储路径
     */
@Value("${file.storage.result-path}")
private String resultPath;
/**
     * 日期格式化器
     */
privatestaticfinalDateTimeFormatterDATE_FORMATTER= DateTimeFormatter.ofPattern("yyyyMMdd");
/**
     * 处理导入任务
     */
@RabbitListener(queues = "${rabbitmq.queue.import-task:import.task.queue}")
publicvoidprocessImportTask(Long taskId) {
        log.info("开始处理导入任务,taskId: {}", taskId);
if (taskId == null) {
            log.error("导入任务ID为空,跳过处理");
return;
        }
ImportTasktask=null;
try {
// 1. 获取任务信息
            task = importTaskService.getTaskById(taskId);
if (task == null) {
                log.error("导入任务不存在,taskId: {}", taskId);
return;
            }
            log.info("开始处理文件导入,taskId: {}, fileName: {}, filePath: {}",
                    taskId, task.getFileName(), task.getFilePath());
// 2. 更新任务状态为处理中
            importTaskService.updateTaskStatus(taskId, 1, 5, 0, 0, null, null, 0);
// 3. 验证文件是否存在
StringfilePath= task.getFilePath();
if (!StringUtils.hasText(filePath)) {
thrownewRuntimeException("文件路径为空");
            }
Filefile=newFile(filePath);
if (!file.exists() || !file.isFile()) {
thrownewRuntimeException("文件不存在或不是有效文件: " + filePath);
            }
// 4. 创建监听器并读取Excel
ProductImportListenerlistener=newProductImportListener(
                    taskId, task.getUserId(), businessDataService, importTaskService);
            EasyExcel.read(file, ProductImportModel.class, listener)
                    .sheet()
                    .doRead();
// 5. 获取所有数据并统计结果
            List<ProductImportModel> allDataList = listener.getAllDataList();
inttotalCount= allDataList.size();
intsuccessCount= (int) allDataList.stream()
                    .filter(data -> "成功".equals(data.getImportStatus()))
                    .count();
intfailCount= totalCount - successCount;
// 6. 生成结果文件
StringresultFilePath= generateResultFile(task, allDataList);
// 7. 更新任务状态为完成
            importTaskService.updateTaskStatus(
                    taskId, 2, 100, successCount, failCount, null, resultFilePath, totalCount);
            log.info("导入任务处理完成,taskId: {}, 总记录数: {}, 成功: {}, 失败: {}",
                    taskId, totalCount, successCount, failCount);
        } catch (Exception e) {
            log.error("导入任务处理失败,taskId: {}", taskId, e);
// 更新任务状态为失败
StringerrorMsg= e.getMessage();
if (errorMsg != null && errorMsg.length() > 1000) {
                errorMsg = errorMsg.substring(0, 1000) + "...";
            }
if (task != null) {
                importTaskService.updateTaskStatus(taskId, 3, null, null, null, errorMsg, null, null);
            }
        }
    }
/**
     * 处理死信队列中的失败任务
     */
@RabbitListener(queues = "${rabbitmq.queue.dlq:import.dlq.queue}")
publicvoidprocessFailedTask(Long taskId) {
        log.warn("接收到死信队列中的失败任务,taskId: {}", taskId);
// 这里可以做一些告警或者重试处理
// 例如:记录到失败任务表,通知管理员处理
ImportTasktask= importTaskService.getTaskById(taskId);
if (task != null) {
            log.warn("失败任务详情:taskNo: {}, fileName: {}, status: {}",
                    task.getTaskNo(), task.getFileName(), task.getStatus());
// TODO: 发送告警通知
        }
    }
/**
     * 生成结果文件
     */
private String generateResultFile(ImportTask task, List<ProductImportModel> dataList) {
try {
// 创建结果文件存储目录
StringdateDir= DATE_FORMATTER.format(LocalDate.now());
StringsaveDir= resultPath + File.separator + dateDir;
FilesaveDirFile=newFile(saveDir);
if (!saveDirFile.exists() && !saveDirFile.mkdirs()) {
                log.error("创建结果文件目录失败: {}", saveDir);
thrownewRuntimeException("生成结果文件失败,无法创建存储目录");
            }
// 生成结果文件名
StringoriginalFileName= task.getFileName();
Stringext= originalFileName.contains(".") ? originalFileName.substring(originalFileName.lastIndexOf(".")) : ".xlsx";
StringresultFileName="result_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8) + ext;
StringresultFilePath= saveDir + File.separator + resultFileName;
// 写入结果文件
            EasyExcel.write(resultFilePath, ProductImportModel.class)
                    .sheet("导入结果")
                    .doWrite(dataList);
            log.info("结果文件生成成功,taskId: {}, filePath: {}", task.getId(), resultFilePath);
return resultFilePath;
        } catch (Exception e) {
            log.error("生成结果文件失败,taskId: {}", task.getId(), e);
thrownewRuntimeException("生成结果文件失败: " + e.getMessage());
        }
    }
}
代码语言:javascript
复制

六、并发控制与性能优化

6.1 分布式锁实现

在多用户并发上传的场景下,需要确保数据的唯一性,避免重复导入。我们使用 Redis 实现分布式锁来控制并发访问:

代码语言:javascript
复制
package com.example.excelimport.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
 * Redis分布式锁实现
 *
 * @author ken
 */
@Slf4j
@Component
publicclassRedisDistributedLockimplementsLock {
privatefinal RedisTemplate<String, Object> redisTemplate;
/**
     * 锁的默认过期时间(毫秒)
     */
privatestaticfinallongDEFAULT_EXPIRE_MILLIS=30000;
/**
     * 锁的键
     */
privatefinal String lockKey;
/**
     * 锁的持有者标识
     */
privatefinal String lockValue;
/**
     * 锁的过期时间(毫秒)
     */
privatefinallong expireMillis;
/**
     * 构造函数
     */
publicRedisDistributedLock(RedisTemplate<String, Object> redisTemplate, String lockKey) {
this(redisTemplate, lockKey, UUID.randomUUID().toString(), DEFAULT_EXPIRE_MILLIS);
    }
/**
     * 构造函数
     */
publicRedisDistributedLock(RedisTemplate<String, Object> redisTemplate, String lockKey, String lockValue, long expireMillis) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.lockValue = lockValue;
this.expireMillis = expireMillis;
    }
@Override
publicvoidlock() {
// 循环获取锁,直到成功
while (!tryLock()) {
// 短暂休眠,避免过度消耗CPU
try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
return;
            }
        }
    }
@Override
publicvoidlockInterruptibly()throws InterruptedException {
if (Thread.interrupted()) {
thrownewInterruptedException();
        }
// 循环获取锁,直到成功或被中断
while (!tryLock()) {
            Thread.sleep(50);
if (Thread.interrupted()) {
thrownewInterruptedException();
            }
        }
    }
@Override
publicbooleantryLock() {
return tryLock(expireMillis, TimeUnit.MILLISECONDS);
    }
@Override
publicbooleantryLock(long time, TimeUnit unit) {
longmillisToWait= unit.toMillis(time);
longstart= System.currentTimeMillis();
// 循环获取锁,直到成功或超时
while (true) {
// 尝试获取锁
Booleansuccess= redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireMillis, TimeUnit.MILLISECONDS);
if (Boolean.TRUE.equals(success)) {
                log.debug("获取分布式锁成功,lockKey: {}, lockValue: {}", lockKey, lockValue);
returntrue;
            }
// 检查是否超时
if (System.currentTimeMillis() - start > millisToWait) {
                log.debug("获取分布式锁超时,lockKey: {}, lockValue: {}", lockKey, lockValue);
returnfalse;
            }
// 短暂休眠,避免过度消耗CPU
try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
returnfalse;
            }
        }
    }
@Override
publicvoidunlock() {
// 使用Lua脚本保证删除操作的原子性
Stringscript="if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        script = script.replace("ARGV[1]", "'" + lockValue + "'");
Longresult= (Long) redisTemplate.execute(newDefaultRedisScript<>(script, Long.class), Lists.newArrayList(lockKey));
if (result != null && result > 0) {
            log.debug("释放分布式锁成功,lockKey: {}, lockValue: {}", lockKey, lockValue);
        } else {
            log.warn("释放分布式锁失败,可能锁已过期或被其他线程持有,lockKey: {}, lockValue: {}", lockKey, lockValue);
        }
    }
@Override
public Condition newCondition() {
thrownewUnsupportedOperationException("RedisDistributedLock不支持Condition");
    }
/**
     * 获取锁的键
     */
public String getLockKey() {
return lockKey;
    }
/**
     * 获取锁的持有者标识
     */
public String getLockValue() {
return lockValue;
    }
}
代码语言:javascript
复制

6.2 性能优化策略

  1. 分批次处理将大量数据分成小批次处理,避免内存溢出
  2. 异步处理使用消息队列和异步任务,避免阻塞主线程
  3. 缓存热点数据将频繁访问的数据(如分类信息)缓存到 Redis
  4. 批量操作使用 MyBatis-Plus 的批量插入功能,减少数据库交互
  5. 索引优化为商品编码等查询条件建立唯一索引
  6. 文件读写优化使用缓冲流读写文件,设置合理的缓冲区大小
  7. 并发控制使用分布式锁控制并发写入,避免数据冲突
  8. JVM 优化根据实际情况调整 JVM 参数,尤其是堆内存大小
代码语言:javascript
复制
package com.example.excelimport.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 异步任务配置
 *
 * @author ken
 */
@Configuration
@EnableAsync
publicclassAsyncConfig {

/**
     * 核心线程数
     */
@Value("${async.executor.core-pool-size:4}")
private int corePoolSize;

/**
     * 最大线程数
     */
@Value("${async.executor.max-pool-size:16}")
private int maxPoolSize;

/**
     * 队列容量
     */
@Value("${async.executor.queue-capacity:100}")
private int queueCapacity;

/**
     * 线程存活时间(秒)
     */
@Value("${async.executor.keep-alive-seconds:60}")
private int keepAliveSeconds;

/**
     * 线程名称前缀
     */
@Value("${async.executor.thread-name-prefix:ExcelImport-}")
private String threadNamePrefix;

/**
     * 异步任务执行器
     */
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
        executor.setCorePoolSize(corePoolSize);
// 最大线程数
        executor.setMaxPoolSize(maxPoolSize);
// 队列容量
        executor.setQueueCapacity(queueCapacity);
// 线程存活时间
        executor.setKeepAliveSeconds(keepAliveSeconds);
// 线程名称前缀
        executor.setThreadNamePrefix(threadNamePrefix);

// 拒绝策略:当线程池和队列都满了,由提交任务的线程执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

// 初始化
        executor.initialize();
return executor;
    }
}
代码语言:javascript
复制

七、总结与扩展

本文详细介绍了一个大 Excel 文件异步上传和导入的完整解决方案,从前端分片上传到后端异步处理,再到数据校验、并发控制和结果生成,涵盖了整个流程的关键技术点。

该方案的核心优势在于:

  1. 高性能通过分片上传、异步处理、批量操作等技术,支持处理几百兆的大型 Excel 文件
  2. 高可靠性使用消息队列解耦,结合死信队列处理失败任务,确保数据不丢失
  3. 数据一致性通过分布式锁和数据库唯一索引,有效防止并发导入导致的数据重复
  4. 良好的用户体验异步处理不阻塞用户操作,提供进度查询和详细的结果反馈

可扩展方向

  1. 断点续传优化结合文件哈希校验,实现更高效的断点续传
  2. 分布式部署将文件存储迁移到分布式文件系统(如 MinIO),支持多节点部署
  3. 任务优先级为不同用户或业务场景设置任务优先级
  4. 监控告警增加详细的监控指标和告警机制,及时发现和处理问题
  5. 导入模板管理支持多种导入模板和动态配置校验规则
  6. 大数据量优化对于超大规模数据(千万级以上),可考虑使用 Spark 等大数据处理框架

通过本文提供的方案和代码实现,开发者可以快速构建一个稳定、高效的大 Excel 导入功能,满足企业级应用的需求。同时,也可以根据实际业务场景进行定制和扩展,进一步提升系统的性能和可靠性。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2025-09-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 果酱带你啃java 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、需求分析与架构设计
    • 1.1 核心业务需求
    • 1.2 技术挑战
    • 1.3 整体架构设计
  • 二、技术选型与环境配置
    • 2.1 核心技术栈
    • 2.2 Maven 依赖配置
    • 2.3 核心配置文件
  • 三、数据库设计
    • 3.1 核心表结构
      • 3.1.1 导入任务表(import_task)
      • 3.1.2 业务数据表(business_data)
      • 3.1.3 导入日志表(可选,用于详细追踪)
    • 3.2 MyBatis-Plus 实体类
      • 3.2.1 导入任务实体
      • 3.2.2 商品信息实体
      • 3.2.3 Mapper 接口
  • 四、文件上传与任务管理
    • 4.1 分片上传核心组件
      • 4.1.1 文件分片 DTO
      • 4.1.2 文件上传服务接口
      • 4.1.3 文件上传服务实现
    • 4.2 导入任务管理
      • 4.2.1 任务相关 DTO 和 VO
      • 4.2.2 任务服务接口
      • 4.2.3 任务服务实现
    • 4.3 控制器实现
  • 五、异步处理与数据导入
    • 5.1 消息队列配置
    • 5.2 Excel 数据模型与解析
      • 5.2.1 导入数据模型
      • 5.2.2 Excel 读取监听器
    • 5.3 数据校验与去重服务
      • 5.3.1 业务服务接口
      • 5.3.2 业务服务实现
    • 5.4 消息消费者(异步处理服务)
  • 六、并发控制与性能优化
    • 6.1 分布式锁实现
    • 6.2 性能优化策略
  • 七、总结与扩展
    • 可扩展方向
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档