
在企业级应用开发中,Excel 批量导入是一个高频需求。当面对几百兆甚至更大的 Excel 文件时,传统的同步处理方式往往会导致接口超时、内存溢出,甚至整个应用崩溃。更复杂的是,还需要处理数据校验、重复数据检测、多用户并发上传等问题,最终还要生成一份详细的导入结果供用户核对。
本文将从实际业务场景出发,构建一套完整的大 Excel 文件上传处理方案,涵盖从前端分片上传到后端异步处理、数据校验、并发去重、结果生成的全流程,并提供可直接运行的代码实现。
针对上述需求和挑战,我们设计如下架构:

架构说明:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>
    <groupId>com.example</groupId>
    <artifactId>large-excel-import</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>large-excel-import</name>
    <description>Large Excel Import Solution</description>
    <properties>
        <java.version>17</java.version>
        <easyexcel.version>3.3.0</easyexcel.version>
        <mybatis-plus.version>3.5.5</mybatis-plus.version>
        <springdoc.version>2.2.0</springdoc.version>
        <fastjson2.version>2.0.32</fastjson2.version>
        <guava.version>32.1.3-jre</guava.version>
        <lombok.version>1.18.30</lombok.version>
        <hutool.version>5.8.25</hutool.version>
    </properties>
    <dependencies>
        <!-- Spring Boot Core -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <!-- Database -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <!-- MyBatis-Plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-extension</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        <!-- Excel Processing -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>${easyexcel.version}</version>
        </dependency>
        <!-- API Documentation -->
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        <!-- Utilities -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>${fastjson2.version}</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <!-- Hutool: FileUtil / IdUtil are used by FileUploadServiceImpl -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-core</artifactId>
            <version>${hutool.version}</version>
        </dependency>
        <!-- Testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
# application.yml
spring:
  profiles:
    active: dev
  servlet:
    multipart:
      # Per-request limit: applies to a single CHUNK, not the whole file
      # (large files are split into <=10MB chunks on the client side)
      max-file-size: 10MB
      max-request-size: 10MB

# Logging
logging:
  level:
    root: INFO
    com.example: DEBUG
    com.alibaba.easyexcel: WARN
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"
# application-dev.yml
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/excel_import?useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai
    username: root
    password: root
    driver-class-name: com.mysql.cj.jdbc.Driver
  # Spring Boot 3.x relocated Redis properties from spring.redis to spring.data.redis
  data:
    redis:
      host: localhost
      port: 6379
      password:
      database: 0
      timeout: 3000ms
      lettuce:
        pool:
          max-active: 16
          max-idle: 8
          min-idle: 4
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        concurrency: 2
        max-concurrency: 8
        prefetch: 1
    template:
      retry:
        enabled: true
        initial-interval: 1000ms
        max-attempts: 3
        max-interval: 3000ms

# File storage locations
file:
  storage:
    path: ./uploadFiles
    temp-path: ./tempFiles
    result-path: ./resultFiles
    max-size: 524288000 # 500MB

# MyBatis-Plus
mybatis-plus:
  mapper-locations: classpath*:mapper/**/*.xml
  type-aliases-package: com.example.excelimport.entity
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.slf4j.Slf4jImpl
  global-config:
    db-config:
      id-type: ASSIGN_ID
      logic-delete-field: deleted
      logic-delete-value: 1
      logic-not-delete-value: 0

# Async executor used for import processing
async:
  executor:
    core-pool-size: 4
    max-pool-size: 16
    queue-capacity: 100
    keep-alive-seconds: 60
    thread-name-prefix: ExcelImport-

# Swagger / springdoc
springdoc:
  api-docs:
    path: /api-docs
  swagger-ui:
    path: /swagger-ui.html
    operationsSorter: method
  packages-to-scan: com.example.excelimport.controller
CREATE TABLE `import_task` (
  `id` bigint NOT NULL COMMENT '主键ID',
  `task_no` varchar(64) NOT NULL COMMENT '任务编号',
  `file_name` varchar(255) NOT NULL COMMENT '文件名',
  `file_path` varchar(512) NOT NULL COMMENT '文件路径',
  `result_file_path` varchar(512) DEFAULT NULL COMMENT '结果文件路径',
  `file_size` bigint NOT NULL COMMENT '文件大小(字节)',
  `total_rows` int DEFAULT 0 COMMENT '总记录数',
  `success_rows` int DEFAULT 0 COMMENT '成功记录数',
  `fail_rows` int DEFAULT 0 COMMENT '失败记录数',
  `status` tinyint NOT NULL COMMENT '状态:0-初始化,1-处理中,2-完成,3-失败',
  `progress` int NOT NULL DEFAULT 0 COMMENT '进度(%)',
  `user_id` bigint NOT NULL COMMENT '操作人ID',
  `user_name` varchar(64) NOT NULL COMMENT '操作人姓名',
  `error_msg` varchar(1024) DEFAULT NULL COMMENT '错误信息',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_task_no` (`task_no`),
  KEY `idx_user_id` (`user_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='导入任务表';

以一个商品信息表为例,实际业务中根据需求调整:
CREATE TABLE `business_data` (
  `id` bigint NOT NULL COMMENT '主键ID',
  `product_code` varchar(64) NOT NULL COMMENT '商品编码',
  `product_name` varchar(255) NOT NULL COMMENT '商品名称',
  `category_id` bigint NOT NULL COMMENT '分类ID',
  `price` decimal(10,2) NOT NULL COMMENT '价格',
  `stock` int NOT NULL COMMENT '库存',
  `status` tinyint NOT NULL DEFAULT 1 COMMENT '状态:0-禁用,1-启用',
  `create_user` bigint NOT NULL COMMENT '创建人',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_user` bigint DEFAULT NULL COMMENT '更新人',
  `update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  `deleted` tinyint NOT NULL DEFAULT 0 COMMENT '删除标识:0-未删,1-已删',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_product_code` (`product_code`) COMMENT '商品编码唯一',
  KEY `idx_category_id` (`category_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
CREATE TABLE `import_log` (
  `id` bigint NOT NULL COMMENT '主键ID',
  `task_id` bigint NOT NULL COMMENT '任务ID',
  `row_num` int NOT NULL COMMENT '行号',
  `data_content` text COMMENT '数据内容',
  `success` tinyint NOT NULL COMMENT '是否成功:0-失败,1-成功',
  `message` varchar(1024) DEFAULT NULL COMMENT '提示信息',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`),
  KEY `idx_task_id` (`task_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='导入日志表';
package com.example.excelimport.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 导入任务实体类
*
* @author ken
*/
@Data
@TableName("import_task")
publicclassImportTask {
/**
* 主键ID
*/
@TableId(type = IdType.ASSIGN_ID)
privateLong id;
/**
* 任务编号
*/
@TableField("task_no")
private String taskNo;
/**
* 文件名
*/
@TableField("file_name")
private String fileName;
/**
* 文件路径
*/
@TableField("file_path")
private String filePath;
/**
* 结果文件路径
*/
@TableField("result_file_path")
private String resultFilePath;
/**
* 文件大小(字节)
*/
@TableField("file_size")
privateLong fileSize;
/**
* 总记录数
*/
@TableField("total_rows")
private Integer totalRows;
/**
* 成功记录数
*/
@TableField("success_rows")
private Integer successRows;
/**
* 失败记录数
*/
@TableField("fail_rows")
private Integer failRows;
/**
* 状态:0-初始化,1-处理中,2-完成,3-失败
*/
@TableField("status")
private Integer status;
/**
* 进度(%)
*/
@TableField("progress")
private Integer progress;
/**
* 操作人ID
*/
@TableField("user_id")
privateLong userId;
/**
* 操作人姓名
*/
@TableField("user_name")
private String userName;
/**
* 错误信息
*/
@TableField("error_msg")
private String errorMsg;
/**
* 创建时间
*/
@TableField(value = "create_time", fill = FieldFill.INSERT)
private LocalDateTime createTime;
/**
* 更新时间
*/
@TableField(value = "update_time", fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updateTime;
}
package com.example.excelimport.entity;
import com.baomidou.mybatisplus.annotation.*;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDateTime;
/**
* 商品信息实体类
*
* @author ken
*/
@Data
@TableName("business_data")
publicclassBusinessData {
/**
* 主键ID
*/
@TableId(type = IdType.ASSIGN_ID)
privateLong id;
/**
* 商品编码
*/
@TableField("product_code")
private String productCode;
/**
* 商品名称
*/
@TableField("product_name")
private String productName;
/**
* 分类ID
*/
@TableField("category_id")
privateLong categoryId;
/**
* 价格
*/
@TableField("price")
private BigDecimal price;
/**
* 库存
*/
@TableField("stock")
private Integer stock;
/**
* 状态:0-禁用,1-启用
*/
@TableField("status")
private Integer status;
/**
* 创建人
*/
@TableField(value = "create_user", fill = FieldFill.INSERT)
privateLong createUser;
/**
* 创建时间
*/
@TableField(value = "create_time", fill = FieldFill.INSERT)
private LocalDateTime createTime;
/**
* 更新人
*/
@TableField(value = "update_user", fill = FieldFill.UPDATE)
privateLong updateUser;
/**
* 更新时间
*/
@TableField(value = "update_time", fill = FieldFill.UPDATE)
private LocalDateTime updateTime;
/**
* 删除标识:0-未删,1-已删
*/
@TableField("deleted")
@TableLogic
private Integer deleted;
}
package com.example.excelimport.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.example.excelimport.entity.ImportTask;
import org.apache.ibatis.annotations.Mapper;
/**
* 导入任务Mapper
*
* @author ken
*/
@Mapper
publicinterfaceImportTaskMapperextendsBaseMapper<ImportTask> {
}
package com.example.excelimport.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.web.multipart.MultipartFile;
/**
* 文件分片上传DTO
*
* @author ken
*/
@Data
@Schema(description = "文件分片上传参数")
publicclassFileChunkDTO {
/**
* 文件名
*/
@Schema(description = "文件名", requiredMode = Schema.RequiredMode.REQUIRED)
private String fileName;
/**
* 文件唯一标识
*/
@Schema(description = "文件唯一标识", requiredMode = Schema.RequiredMode.REQUIRED)
private String fileId;
/**
* 分片索引
*/
@Schema(description = "分片索引,从0开始", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer chunkIndex;
/**
* 总分片数
*/
@Schema(description = "总分片数", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer totalChunks;
/**
* 分片大小(字节)
*/
@Schema(description = "分片大小(字节)", requiredMode = Schema.RequiredMode.REQUIRED)
privateLong chunkSize;
/**
* 文件总大小(字节)
*/
@Schema(description = "文件总大小(字节)", requiredMode = Schema.RequiredMode.REQUIRED)
privateLong totalSize;
/**
* 文件分片数据
*/
@Schema(description = "文件分片数据", requiredMode = Schema.RequiredMode.REQUIRED)
private MultipartFile chunkFile;
}
package com.example.excelimport.service;
import com.example.excelimport.dto.FileChunkDTO;
import com.example.excelimport.vo.FileUploadResultVO;
/**
* 文件上传服务
*
* @authorken
*/
publicinterfaceFileUploadService {
/**
* 上传文件分片
*
* @param chunkDTO 分片上传参数
* @param userId 用户ID
* @return 上传结果
*/
FileUploadResultVOuploadChunk(FileChunkDTO chunkDTO, Long userId);
/**
* 合并文件分片
*
* @param fileId 文件唯一标识
* @param fileName 文件名
* @param totalChunks 总分片数
* @param userId 用户ID
* @return 合并结果,包含完整文件路径
*/
StringmergeChunks(String fileId, String fileName, Integer totalChunks, Long userId);
/**
* 检查分片是否已上传
*
* @param fileId 文件唯一标识
* @param chunkIndex 分片索引
* @returntrue-已上传,false-未上传
*/
booleancheckChunkExists(String fileId, Integer chunkIndex);
}
package com.example.excelimport.service.impl;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.IdUtil;
import com.example.excelimport.dto.FileChunkDTO;
import com.example.excelimport.service.FileUploadService;
import com.example.excelimport.vo.FileUploadResultVO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* 文件上传服务实现
*
* @author ken
*/
@Slf4j
@Service
publicclassFileUploadServiceImplimplementsFileUploadService {
/**
* 临时文件存储路径
*/
@Value("${file.storage.temp-path}")
private String tempPath;
/**
* 正式文件存储路径
*/
@Value("${file.storage.path}")
private String storagePath;
/**
* 最大文件大小
*/
@Value("${file.storage.max-size}")
private Long maxFileSize;
/**
* 日期格式化器
*/
privatestaticfinalDateTimeFormatterDATE_FORMATTER= DateTimeFormatter.ofPattern("yyyyMMdd");
@Override
public FileUploadResultVO uploadChunk(FileChunkDTO chunkDTO, Long userId) {
// 参数校验
StringUtils.hasText(chunkDTO.getFileId(), "文件标识不能为空");
StringUtils.hasText(chunkDTO.getFileName(), "文件名不能为空");
if (chunkDTO.getChunkIndex() == null || chunkDTO.getChunkIndex() < 0) {
thrownewIllegalArgumentException("分片索引必须大于等于0");
}
if (chunkDTO.getTotalChunks() == null || chunkDTO.getTotalChunks() <= 0) {
thrownewIllegalArgumentException("总分片数必须大于0");
}
if (chunkDTO.getChunkFile() == null || chunkDTO.getChunkFile().isEmpty()) {
thrownewIllegalArgumentException("分片文件不能为空");
}
if (chunkDTO.getTotalSize() == null || chunkDTO.getTotalSize() <= 0) {
thrownewIllegalArgumentException("文件总大小必须大于0");
}
if (chunkDTO.getTotalSize() > maxFileSize) {
thrownewIllegalArgumentException("文件大小超过限制,最大支持" + maxFileSize / (1024 * 1024) + "MB");
}
// 创建临时目录
StringchunkDir= getChunkDir(chunkDTO.getFileId());
FiledirFile=newFile(chunkDir);
if (!dirFile.exists() && !dirFile.mkdirs()) {
log.error("创建分片目录失败: {}", chunkDir);
thrownewRuntimeException("上传失败,无法创建临时目录");
}
// 保存分片文件
StringchunkFileName= chunkDTO.getChunkIndex().toString();
FilechunkFile=newFile(chunkDir, chunkFileName);
try {
chunkDTO.getChunkFile().transferTo(chunkFile);
log.info("分片上传成功,fileId: {}, chunkIndex: {}, userId: {}",
chunkDTO.getFileId(), chunkDTO.getChunkIndex(), userId);
} catch (IOException e) {
log.error("分片上传失败,fileId: {}, chunkIndex: {}", chunkDTO.getFileId(), chunkDTO.getChunkIndex(), e);
// 失败时删除可能的残留文件
if (chunkFile.exists() && !chunkFile.delete()) {
log.warn("删除失败的分片文件失败: {}", chunkFile.getAbsolutePath());
}
thrownewRuntimeException("上传失败: " + e.getMessage());
}
// 检查是否所有分片都已上传
booleanallUploaded= checkAllChunksUploaded(chunkDTO.getFileId(), chunkDTO.getTotalChunks());
FileUploadResultVOresult=newFileUploadResultVO();
result.setFileId(chunkDTO.getFileId());
result.setFileName(chunkDTO.getFileName());
result.setChunkIndex(chunkDTO.getChunkIndex());
result.setTotalChunks(chunkDTO.getTotalChunks());
result.setAllUploaded(allUploaded);
result.setMessage("分片上传成功");
return result;
}
@Override
public String mergeChunks(String fileId, String fileName, Integer totalChunks, Long userId) {
StringUtils.hasText(fileId, "文件标识不能为空");
StringUtils.hasText(fileName, "文件名不能为空");
if (totalChunks == null || totalChunks <= 0) {
thrownewIllegalArgumentException("总分片数必须大于0");
}
// 验证所有分片是否已上传
if (!checkAllChunksUploaded(fileId, totalChunks)) {
thrownewRuntimeException("存在未上传的分片,无法合并");
}
try {
// 获取文件存储目录
StringdateDir= DATE_FORMATTER.format(LocalDate.now());
StringsaveDir= storagePath + File.separator + dateDir;
FilesaveDirFile=newFile(saveDir);
if (!saveDirFile.exists() && !saveDirFile.mkdirs()) {
log.error("创建文件存储目录失败: {}", saveDir);
thrownewRuntimeException("合并失败,无法创建存储目录");
}
// 生成唯一文件名,保留原扩展名
Stringext= FileUtil.extName(fileName);
StringuniqueFileName= IdUtil.fastSimpleUUID() + (StringUtils.hasText(ext) ? "." + ext : "");
StringfilePath= saveDir + File.separator + uniqueFileName;
// 合并分片
PathtargetPath= Paths.get(filePath);
StringchunkDir= getChunkDir(fileId);
// 按分片索引升序排列
List<File> chunkFiles = newArrayList<>();
for (inti=0; i < totalChunks; i++) {
FilechunkFile=newFile(chunkDir, String.valueOf(i));
if (!chunkFile.exists()) {
thrownewRuntimeException("分片文件缺失: " + i);
}
chunkFiles.add(chunkFile);
}
// 合并所有分片到目标文件
for (File chunkFile : chunkFiles) {
Files.write(targetPath, Files.readAllBytes(chunkFile.toPath()), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
}
log.info("文件合并成功,fileId: {}, fileName: {}, savePath: {}, userId: {}",
fileId, fileName, filePath, userId);
// 清理临时分片文件
deleteChunkDir(fileId);
return filePath;
} catch (IOException e) {
log.error("文件合并失败,fileId: {}", fileId, e);
thrownewRuntimeException("合并失败: " + e.getMessage());
}
}
@Override
publicbooleancheckChunkExists(String fileId, Integer chunkIndex) {
StringUtils.hasText(fileId, "文件标识不能为空");
if (chunkIndex == null || chunkIndex < 0) {
thrownewIllegalArgumentException("分片索引必须大于等于0");
}
StringchunkDir= getChunkDir(fileId);
FilechunkFile=newFile(chunkDir, chunkIndex.toString());
return chunkFile.exists() && chunkFile.length() > 0;
}
/**
* 获取分片存储目录
*/
private String getChunkDir(String fileId) {
return tempPath + File.separator + "chunks" + File.separator + fileId;
}
/**
* 检查所有分片是否已上传
*/
privatebooleancheckAllChunksUploaded(String fileId, int totalChunks) {
StringchunkDir= getChunkDir(fileId);
FiledirFile=newFile(chunkDir);
if (!dirFile.exists()) {
returnfalse;
}
// 列出所有分片文件并检查数量是否匹配
File[] chunkFiles = dirFile.listFiles();
if (chunkFiles == null || chunkFiles.length != totalChunks) {
returnfalse;
}
// 检查每个分片是否存在
for (inti=0; i < totalChunks; i++) {
FilechunkFile=newFile(dirFile, String.valueOf(i));
if (!chunkFile.exists() || chunkFile.length() == 0) {
returnfalse;
}
}
returntrue;
}
/**
* 删除分片目录及文件
*/
privatevoiddeleteChunkDir(String fileId) {
StringchunkDir= getChunkDir(fileId);
FiledirFile=newFile(chunkDir);
if (dirFile.exists()) {
try {
// 递归删除目录
FileUtil.del(dirFile);
log.info("分片目录已删除: {}", chunkDir);
} catch (Exception e) {
log.warn("删除分片目录失败: {}", chunkDir, e);
}
}
}
}
package com.example.excelimport.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
* 创建导入任务DTO
*
* @author ken
*/
@Data
@Schema(description = "创建导入任务参数")
publicclassCreateImportTaskDTO {
/**
* 文件唯一标识
*/
@Schema(description = "文件唯一标识", requiredMode = Schema.RequiredMode.REQUIRED)
private String fileId;
/**
* 文件名
*/
@Schema(description = "文件名", requiredMode = Schema.RequiredMode.REQUIRED)
private String fileName;
/**
* 总分片数
*/
@Schema(description = "总分片数", requiredMode = Schema.RequiredMode.REQUIRED)
private Integer totalChunks;
}
package com.example.excelimport.vo;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 导入任务VO
*
* @author ken
*/
@Data
@Schema(description = "导入任务信息")
publicclassImportTaskVO {
/**
* 任务ID
*/
@Schema(description = "任务ID")
privateLong id;
/**
* 任务编号
*/
@Schema(description = "任务编号")
private String taskNo;
/**
* 文件名
*/
@Schema(description = "文件名")
private String fileName;
/**
* 文件大小(MB)
*/
@Schema(description = "文件大小(MB)")
privateDouble fileSizeMB;
/**
* 总记录数
*/
@Schema(description = "总记录数")
private Integer totalRows;
/**
* 成功记录数
*/
@Schema(description = "成功记录数")
private Integer successRows;
/**
* 失败记录数
*/
@Schema(description = "失败记录数")
private Integer failRows;
/**
* 状态:0-初始化,1-处理中,2-完成,3-失败
*/
@Schema(description = "状态:0-初始化,1-处理中,2-完成,3-失败")
private Integer status;
/**
* 状态描述
*/
@Schema(description = "状态描述")
private String statusDesc;
/**
* 进度(%)
*/
@Schema(description = "进度(%)")
private Integer progress;
/**
* 错误信息
*/
@Schema(description = "错误信息")
private String errorMsg;
/**
* 结果文件下载地址
*/
@Schema(description = "结果文件下载地址")
private String resultFileUrl;
/**
* 创建时间
*/
@Schema(description = "创建时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createTime;
/**
* 更新时间
*/
@Schema(description = "更新时间")
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime updateTime;
}
package com.example.excelimport.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.excelimport.dto.CreateImportTaskDTO;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.vo.ImportTaskVO;
/**
* 导入任务服务
*
* @author ken
*/
publicinterfaceImportTaskService {
/**
* 创建导入任务
*
* @param dto 创建任务参数
* @param userId 用户ID
* @param userName 用户名
* @return 任务信息
*/
ImportTaskVO createTask(CreateImportTaskDTO dto, Long userId, String userName);
/**
* 查询任务详情
*
* @param taskId 任务ID
* @param userId 用户ID
* @return 任务详情
*/
ImportTaskVO getTaskDetail(Long taskId, Long userId);
/**
* 分页查询用户的导入任务
*
* @param page 分页参数
* @param userId 用户ID
* @param status 任务状态,null表示查询所有
* @return 任务分页列表
*/
IPage<ImportTaskVO> queryUserTasks(Page<ImportTask> page, Long userId, Integer status);
/**
* 更新任务状态
*
* @param taskId 任务ID
* @param status 新状态
* @param progress 进度(%)
* @param successRows 成功记录数
* @param failRows 失败记录数
* @param errorMsg 错误信息
* @param resultFilePath 结果文件路径
* @return 是否更新成功
*/
boolean updateTaskStatus(Long taskId, Integer status, Integer progress,
Integer successRows, Integer failRows,
String errorMsg, String resultFilePath);
/**
* 获取任务实体
*
* @param taskId 任务ID
* @return 任务实体
*/
ImportTask getTaskById(Long taskId);
}
package com.example.excelimport.service.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.excelimport.dto.CreateImportTaskDTO;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.mapper.ImportTaskMapper;
import com.example.excelimport.service.ImportTaskService;
import com.example.excelimport.service.FileUploadService;
import com.example.excelimport.vo.ImportTaskVO;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.io.File;
import java.text.MessageFormat;
import java.util.Map;
import java.util.UUID;
/**
* 导入任务服务实现
*
* @author ken
*/
@Slf4j
@Service
publicclassImportTaskServiceImplextendsServiceImpl<ImportTaskMapper, ImportTask> implementsImportTaskService {
/**
* 任务状态描述映射
*/
privatestaticfinal Map<Integer, String> STATUS_DESC_MAP = Maps.newHashMap();
static {
STATUS_DESC_MAP.put(0, "初始化");
STATUS_DESC_MAP.put(1, "处理中");
STATUS_DESC_MAP.put(2, "完成");
STATUS_DESC_MAP.put(3, "失败");
}
@Autowired
private FileUploadService fileUploadService;
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 结果文件访问路径前缀
*/
@Value("${server.servlet.context-path:}")
private String contextPath;
/**
* 导入任务交换机
*/
@Value("${rabbitmq.exchange.import-task:import.task.exchange}")
private String importTaskExchange;
/**
* 导入任务路由键
*/
@Value("${rabbitmq.routing-key.import-task:import.task.process}")
private String importTaskRoutingKey;
@Override
@Transactional(rollbackFor = Exception.class)
public ImportTaskVO createTask(CreateImportTaskDTO dto, Long userId, String userName) {
// 参数校验
StringUtils.hasText(dto.getFileId(), "文件标识不能为空");
StringUtils.hasText(dto.getFileName(), "文件名不能为空");
if (dto.getTotalChunks() == null || dto.getTotalChunks() <= 0) {
thrownewIllegalArgumentException("总分片数必须大于0");
}
ObjectUtils.isEmpty(userId, "用户ID不能为空");
StringUtils.hasText(userName, "用户名不能为空");
// 合并文件分片
StringfilePath= fileUploadService.mergeChunks(dto.getFileId(), dto.getFileName(), dto.getTotalChunks(), userId);
if (!StringUtils.hasText(filePath)) {
thrownewRuntimeException("文件合并失败");
}
// 获取文件大小
Filefile=newFile(filePath);
if (!file.exists() || !file.isFile()) {
thrownewRuntimeException("合并后的文件不存在");
}
longfileSize= file.length();
// 创建任务记录
ImportTasktask=newImportTask();
task.setTaskNo(generateTaskNo());
task.setFileName(dto.getFileName());
task.setFilePath(filePath);
task.setFileSize(fileSize);
task.setStatus(0); // 初始化状态
task.setProgress(0);
task.setUserId(userId);
task.setUserName(userName);
intinsert= baseMapper.insert(task);
if (insert <= 0) {
log.error("创建导入任务失败,userId: {}, fileName: {}", userId, dto.getFileName());
thrownewRuntimeException("创建导入任务失败");
}
log.info("创建导入任务成功,taskId: {}, taskNo: {}, fileName: {}, userId: {}",
task.getId(), task.getTaskNo(), dto.getFileName(), userId);
// 发送消息到队列,异步处理导入
try {
rabbitTemplate.convertAndSend(importTaskExchange, importTaskRoutingKey, task.getId());
log.info("导入任务已发送到队列,taskId: {}", task.getId());
} catch (Exception e) {
log.error("发送导入任务到队列失败,taskId: {}", task.getId(), e);
// 发送失败时,更新任务状态为失败
task.setStatus(3);
task.setErrorMsg("任务提交失败: " + e.getMessage());
baseMapper.updateById(task);
thrownewRuntimeException("创建任务成功,但提交处理失败,请稍后重试");
}
return convertToVO(task);
}
@Override
public ImportTaskVO getTaskDetail(Long taskId, Long userId) {
ObjectUtils.isEmpty(taskId, "任务ID不能为空");
ObjectUtils.isEmpty(userId, "用户ID不能为空");
ImportTasktask= baseMapper.selectOne(Wrappers.<ImportTask>lambdaQuery()
.eq(ImportTask::getId, taskId)
.eq(ImportTask::getUserId, userId));
if (task == null) {
returnnull;
}
return convertToVO(task);
}
@Override
public IPage<ImportTaskVO> queryUserTasks(Page<ImportTask> page, Long userId, Integer status) {
ObjectUtils.isEmpty(userId, "用户ID不能为空");
IPage<ImportTask> taskPage = baseMapper.selectPage(page, Wrappers.<ImportTask>lambdaQuery()
.eq(ImportTask::getUserId, userId)
.eq(status != null, ImportTask::getStatus, status)
.orderByDesc(ImportTask::getCreateTime));
IPage<ImportTaskVO> resultPage = newPage<>();
BeanUtils.copyProperties(taskPage, resultPage);
resultPage.setRecords(taskPage.getRecords().stream()
.map(this::convertToVO)
.collect(java.util.stream.Collectors.toList()));
return resultPage;
}
@Override
@Transactional(rollbackFor = Exception.class)
publicbooleanupdateTaskStatus(Long taskId, Integer status, Integer progress,
Integer successRows, Integer failRows,
String errorMsg, String resultFilePath) {
ObjectUtils.isEmpty(taskId, "任务ID不能为空");
ObjectUtils.isEmpty(status, "状态不能为空");
ImportTasktask=newImportTask();
task.setId(taskId);
task.setStatus(status);
if (progress != null) {
task.setProgress(progress);
}
if (successRows != null) {
task.setSuccessRows(successRows);
}
if (failRows != null) {
task.setFailRows(failRows);
}
if (totalRows != null) {
task.setTotalRows(totalRows);
}
task.setErrorMsg(errorMsg);
task.setResultFilePath(resultFilePath);
intupdate= baseMapper.updateById(task);
return update > 0;
}
@Override
public ImportTask getTaskById(Long taskId) {
if (taskId == null) {
returnnull;
}
return baseMapper.selectById(taskId);
}
/**
* 生成任务编号
*/
private String generateTaskNo() {
return"IMPT" + System.currentTimeMillis() + UUID.randomUUID().toString().substring(0, 8).toUpperCase();
}
/**
* 转换为VO对象
*/
private ImportTaskVO convertToVO(ImportTask task) {
ImportTaskVOvo=newImportTaskVO();
BeanUtils.copyProperties(task, vo);
// 计算文件大小(MB)
if (task.getFileSize() != null) {
vo.setFileSizeMB(task.getFileSize() / (1024.0 * 1024.0));
}
// 设置状态描述
vo.setStatusDesc(STATUS_DESC_MAP.getOrDefault(task.getStatus(), "未知状态"));
// 设置结果文件下载地址
if (StringUtils.hasText(task.getResultFilePath())) {
StringrelativePath= task.getResultFilePath().replace(File.separator, "/");
vo.setResultFileUrl(contextPath + "/api/v1/files/download?path=" + relativePath);
}
return vo;
}
}
package com.example.excelimport.controller;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.example.excelimport.dto.CreateImportTaskDTO;
import com.example.excelimport.dto.FileChunkDTO;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.service.ImportTaskService;
import com.example.excelimport.service.FileUploadService;
import com.example.excelimport.vo.FileUploadResultVO;
import com.example.excelimport.vo.ImportTaskVO;
import com.example.excelimport.vo.ResponseVO;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import jakarta.servlet.http.HttpServletResponse;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
/**
* 文件导入控制器
*
* @author ken
*/
@Slf4j
@RestController
@RequestMapping("/api/v1/import")
@Tag(name = "文件导入接口", description = "大文件分片上传及异步导入处理")
publicclassImportController {
@Autowired
private FileUploadService fileUploadService;
@Autowired
private ImportTaskService importTaskService;
@Operation(summary = "上传文件分片", description = "将大文件拆分为分片上传")
@PostMapping("/file/chunk")
public ResponseVO<FileUploadResultVO> uploadChunk(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "文件分片信息", required = true)@ModelAttribute FileChunkDTO chunkDTO) {
log.info("接收文件分片上传请求,fileId: {}, chunkIndex: {}, userId: {}",
chunkDTO.getFileId(), chunkDTO.getChunkIndex(), userId);
FileUploadResultVO result = fileUploadService.uploadChunk(chunkDTO, userId);
return ResponseVO.success(result);
}
@Operation(summary = "检查分片是否已上传", description = "用于断点续传时检查分片状态")
@GetMapping("/file/chunk/exists")
public ResponseVO<Boolean> checkChunkExists(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "文件唯一标识", required = true)@RequestParam String fileId,
@Parameter(description = "分片索引", required = true)@RequestParam Integer chunkIndex) {
log.info("检查分片是否已上传,fileId: {}, chunkIndex: {}, userId: {}", fileId, chunkIndex, userId);
boolean exists = fileUploadService.checkChunkExists(fileId, chunkIndex);
return ResponseVO.success(exists);
}
@Operation(summary = "创建导入任务", description = "合并文件分片并创建导入任务")
@PostMapping("/task")
public ResponseVO<ImportTaskVO> createImportTask(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "用户名", required = true)@RequestHeader("X-User-Name") String userName,
@Parameter(description = "创建任务参数", required = true)@RequestBody CreateImportTaskDTO dto) {
log.info("创建导入任务,fileId: {}, fileName: {}, userId: {}", dto.getFileId(), dto.getFileName(), userId);
ImportTaskVO taskVO = importTaskService.createTask(dto, userId, userName);
return ResponseVO.success(taskVO);
}
@Operation(summary = "查询任务详情", description = "获取导入任务的详细信息和进度")
@GetMapping("/task/{taskId}")
public ResponseVO<ImportTaskVO> getTaskDetail(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "任务ID", required = true)@PathVariableLong taskId) {
log.info("查询任务详情,taskId: {}, userId: {}", taskId, userId);
ImportTaskVO taskVO = importTaskService.getTaskDetail(taskId, userId);
return ResponseVO.success(taskVO);
}
/**
 * Pages through the current user's import tasks, optionally filtered by status.
 */
@Operation(summary = "查询用户导入任务列表", description = "分页查询当前用户的导入任务")
@GetMapping("/tasks")
public ResponseVO<IPage<ImportTaskVO>> queryUserTasks(
        @Parameter(description = "用户ID", required = true) @RequestHeader("X-User-Id") Long userId,
        @Parameter(description = "页码,从1开始") @RequestParam(defaultValue = "1") Integer pageNum,
        @Parameter(description = "每页条数") @RequestParam(defaultValue = "10") Integer pageSize,
        @Parameter(description = "任务状态:0-初始化,1-处理中,2-完成,3-失败") @RequestParam(required = false) Integer status) {
    log.info("查询用户导入任务列表,userId: {}, pageNum: {}, pageSize: {}, status: {}",
            userId, pageNum, pageSize, status);
    Page<ImportTask> page = new Page<>(pageNum, pageSize);
    return ResponseVO.success(importTaskService.queryUserTasks(page, userId, status));
}
@Operation(summary = "下载导入结果文件", description = "下载包含导入结果的Excel文件")
@GetMapping("/file/result")
public void downloadResultFile(
@Parameter(description = "用户ID", required = true)@RequestHeader("X-User-Id")Long userId,
@Parameter(description = "任务ID", required = true)@RequestParamLong taskId,
HttpServletResponse response) throws IOException {
log.info("下载导入结果文件,taskId: {}, userId: {}", taskId, userId);
// 获取任务信息
ImportTask task = importTaskService.getTaskById(taskId);
if (task == null) {
response.sendError(HttpServletResponse.SC_NOT_FOUND, "任务不存在");
return;
}
// 验证权限
if (!task.getUserId().equals(userId)) {
response.sendError(HttpServletResponse.SC_FORBIDDEN, "没有权限下载该文件");
return;
}
// 检查结果文件是否存在
String resultFilePath = task.getResultFilePath();
if (!org.springframework.util.StringUtils.hasText(resultFilePath)) {
response.sendError(HttpServletResponse.SC_NOT_FOUND, "结果文件不存在");
return;
}
File file = new File(resultFilePath);
if (!file.exists() || !file.isFile()) {
response.sendError(HttpServletResponse.SC_NOT_FOUND, "结果文件不存在");
return;
}
// 设置响应头
response.setContentType("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet");
String fileName = "导入结果_" + task.getFileName();
String encodedFileName = URLEncoder.encode(fileName, StandardCharsets.UTF_8.name());
response.setHeader("Content-Disposition", "attachment; filename*=UTF-8''" + encodedFileName);
response.setContentLengthLong(file.length());
// 写入文件内容
try (FileInputStream fis = new FileInputStream(file);
OutputStream os = response.getOutputStream()) {
byte[] buffer = new byte[1024 * 1024]; // 1MB缓冲区
int len;
while ((len = fis.read(buffer)) != -1) {
os.write(buffer, 0, len);
}
os.flush();
} catch (IOException e) {
log.error("下载结果文件失败,taskId: {}", taskId, e);
throw e;
}
}
}
package com.example.excelimport.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
/**
* RabbitMQ配置
*
* @authorken
*/
@Configuration
publicclassRabbitMQConfig {
/**
* 导入任务交换机
*/
@Value("${rabbitmq.exchange.import-task:import.task.exchange}")
privateString importTaskExchange;
/**
* 导入任务队列
*/
@Value("${rabbitmq.queue.import-task:import.task.queue}")
privateString importTaskQueue;
/**
* 导入任务路由键
*/
@Value("${rabbitmq.routing-key.import-task:import.task.process}")
privateString importTaskRoutingKey;
/**
* 死信交换机
*/
@Value("${rabbitmq.exchange.dlq:import.dlq.exchange}")
privateString dlqExchange;
/**
* 死信队列
*/
@Value("${rabbitmq.queue.dlq:import.dlq.queue}")
privateString dlqQueue;
/**
* 死信路由键
*/
@Value("${rabbitmq.routing-key.dlq:import.dlq.routing}")
privateString dlqRoutingKey;
/**
* 死信交换机
*/
@Bean
publicDirectExchangedlqExchange() {
returnnewDirectExchange(dlqExchange, true, false);
}
/**
* 死信队列
*/
@Bean
publicQueuedlqQueue() {
returnQueueBuilder.durable(dlqQueue)
.build();
}
/**
* 死信队列绑定
*/
@Bean
publicBindingdlqBinding() {
returnBindingBuilder.bind(dlqQueue())
.to(dlqExchange())
.with(dlqRoutingKey);
}
/**
* 导入任务交换机
*/
@Bean
publicDirectExchangeimportTaskExchange() {
returnnewDirectExchange(importTaskExchange, true, false);
}
/**
* 导入任务队列
* 设置死信队列,用于处理失败的任务
*/
@Bean
publicQueueimportTaskQueue() {
Map<String, Object> arguments = newHashMap<>(3);
// 绑定死信交换机
arguments.put("x-dead-letter-exchange", dlqExchange);
// 绑定死信路由键
arguments.put("x-dead-letter-routing-key", dlqRoutingKey);
// 消息过期时间 30分钟
arguments.put("x-message-ttl", 30 * 60 * 1000);
returnQueueBuilder.durable(importTaskQueue)
.withArguments(arguments)
.build();
}
/**
* 导入任务队列绑定
*/
@Bean
publicBindingimportTaskBinding() {
returnBindingBuilder.bind(importTaskQueue())
.to(importTaskExchange())
.with(importTaskRoutingKey);
}
}
package com.example.excelimport.excel;
import com.alibaba.excel.annotation.ExcelIgnore;
import com.alibaba.excel.annotation.ExcelProperty;
import com.alibaba.excel.annotation.write.style.ColumnWidth;
import com.alibaba.excel.annotation.write.style.ContentRowHeight;
import com.alibaba.excel.annotation.write.style.HeadRowHeight;
import lombok.Data;

import java.math.BigDecimal;
/**
* 商品导入Excel模型
*
* @author ken
*/
@Data
@HeadRowHeight(20)
@ContentRowHeight(18)
@ColumnWidth(20)
publicclassProductImportModel {
@ExcelProperty(value = "商品编码", index = 0)
private String productCode;
@ExcelProperty(value = "商品名称", index = 1)
private String productName;
@ExcelProperty(value = "分类ID", index = 2)
privateLong categoryId;
@ExcelProperty(value = "价格", index = 3)
private BigDecimal price;
@ExcelProperty(value = "库存", index = 4)
private Integer stock;
@ExcelProperty(value = "状态(0-禁用 1-启用)", index = 5)
private Integer status;
/**
* 导入结果状态:成功/失败
*/
@ExcelProperty(value = "导入结果", index = 6)
private String importStatus;
/**
* 导入失败原因
*/
@ExcelProperty(value = "失败原因", index = 7)
private String errorMsg;
}
package com.example.excelimport.excel;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.read.listener.ReadListener;
import com.alibaba.excel.util.ListUtils;
import com.example.excelimport.entity.BusinessData;
import com.example.excelimport.service.BusinessDataService;
import com.example.excelimport.service.ImportTaskService;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 商品导入Excel监听器
*
* @author ken
*/
@Slf4j
publicclassProductImportListenerimplementsReadListener<ProductImportModel> {
/**
* 批处理阈值
*/
privatestaticfinalintBATCH_COUNT=1000;
/**
* 缓存的数据列表
*/
private List<ProductImportModel> cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);
/**
* 所有数据列表(用于生成结果文件)
*/
private List<ProductImportModel> allDataList = Lists.newArrayList();
/**
* 当前行号(从1开始,包含表头)
*/
privateAtomicIntegerrowNum=newAtomicInteger(0);
/**
* 任务ID
*/
privatefinal Long taskId;
/**
* 处理用户ID
*/
privatefinal Long userId;
/**
* 业务数据服务
*/
privatefinal BusinessDataService businessDataService;
/**
* 导入任务服务
*/
privatefinal ImportTaskService importTaskService;
/**
* 构造函数
*/
publicProductImportListener(Long taskId, Long userId,
BusinessDataService businessDataService,
ImportTaskService importTaskService) {
this.taskId = taskId;
this.userId = userId;
this.businessDataService = businessDataService;
this.importTaskService = importTaskService;
}
/**
* 每解析一行数据都会调用此方法
*/
@Override
publicvoidinvoke(ProductImportModel data, AnalysisContext context) {
// 行号递增(表头行也会被计数,所以需要过滤)
intcurrentRowNum= rowNum.incrementAndGet();
// 跳过表头行
if (currentRowNum == 1) {
return;
}
// 记录原始行号(展示给用户时从1开始)
data.setRowNum(currentRowNum - 1);
// 添加到缓存列表
cachedDataList.add(data);
allDataList.add(data);
// 达到批处理阈值时进行处理
if (cachedDataList.size() >= BATCH_COUNT) {
processBatchData();
// 清空缓存
cachedDataList = ListUtils.newArrayListWithExpectedSize(BATCH_COUNT);
// 更新进度
updateTaskProgress();
}
}
/**
* 解析完成后调用此方法
*/
@Override
publicvoiddoAfterAllAnalysed(AnalysisContext context) {
// 处理剩余数据
if (!CollectionUtils.isEmpty(cachedDataList)) {
processBatchData();
}
log.info("Excel解析完成,taskId: {}, 总记录数: {}", taskId, allDataList.size());
// 更新总记录数
importTaskService.updateTaskStatus(taskId, null, null, null, null, null, null, allDataList.size());
}
/**
* 处理批量数据
*/
privatevoidprocessBatchData() {
log.info("开始处理批量数据,taskId: {}, 数量: {}", taskId, cachedDataList.size());
try {
// 1. 数据校验
businessDataService.validateData(cachedDataList);
// 2. 检查重复数据
businessDataService.checkDuplicateData(cachedDataList);
// 3. 过滤有效数据
List<BusinessData> validDataList = businessDataService.filterValidData(cachedDataList, userId);
// 4. 批量导入数据
if (!CollectionUtils.isEmpty(validDataList)) {
businessDataService.batchImportData(validDataList);
}
} catch (Exception e) {
log.error("处理批量数据失败,taskId: {}", taskId, e);
// 标记该批次数据为失败
cachedDataList.forEach(data -> {
if (data.getImportStatus() == null) {
data.setImportStatus("失败");
data.setErrorMsg("系统处理错误: " + e.getMessage());
}
});
}
}
/**
* 更新任务进度
*/
privatevoidupdateTaskProgress() {
try {
// 计算总记录数(预估)
inttotalCount= allDataList.size() + (cachedDataList.size() > 0 ? BATCH_COUNT : 0);
if (totalCount == 0) {
return;
}
// 计算进度百分比
intprogress= Math.min(90, (allDataList.size() * 100) / totalCount);
importTaskService.updateTaskStatus(taskId, 1, progress, null, null, null, null, null);
} catch (Exception e) {
log.error("更新任务进度失败,taskId: {}", taskId, e);
}
}
/**
* 获取所有数据列表
*/
public List<ProductImportModel> getAllDataList() {
return allDataList;
}
}
package com.example.excelimport.service;
import com.example.excelimport.entity.BusinessData;
import com.example.excelimport.excel.ProductImportModel;
import java.util.List;
/**
* 业务数据服务
*
* @author ken
*/
publicinterfaceBusinessDataService {
/**
* 数据校验
*
* @param dataList 导入数据列表
*/
voidvalidateData(List<ProductImportModel> dataList);
/**
* 检查重复数据(包括列表内部重复和与数据库重复)
*
* @param dataList 导入数据列表
*/
voidcheckDuplicateData(List<ProductImportModel> dataList);
/**
* 过滤有效数据
*
* @param dataList 导入数据列表
* @param userId 用户ID
* @return 有效业务数据列表
*/
List<BusinessData> filterValidData(List<ProductImportModel> dataList, Long userId);
/**
* 批量导入数据
*
* @param dataList 业务数据列表
* @return 导入成功数量
*/
intbatchImportData(List<BusinessData> dataList);
/**
* 根据编码列表查询数据
*
* @param codeList 编码列表
* @return 业务数据列表
*/
List<BusinessData> getByCodes(List<String> codeList);
}
package com.example.excelimport.service.impl;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.example.excelimport.entity.BusinessData;
import com.example.excelimport.excel.ProductImportModel;
import com.example.excelimport.mapper.BusinessDataMapper;
import com.example.excelimport.service.BusinessDataService;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* 业务数据服务实现
*
* @author ken
*/
@Slf4j
@Service
publicclassBusinessDataServiceImplimplementsBusinessDataService {
/**
* Redis分布式锁前缀
*/
privatestaticfinalStringLOCK_PREFIX="import:lock:product:";
/**
* 分布式锁过期时间(秒)
*/
privatestaticfinalintLOCK_EXPIRE_SECONDS=60;
@Autowired
private BusinessDataMapper businessDataMapper;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
publicvoidvalidateData(List<ProductImportModel> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return;
}
for (ProductImportModel data : dataList) {
// 重置之前的校验结果
data.setImportStatus(null);
data.setErrorMsg(null);
List<String> errors = Lists.newArrayList();
// 商品编码校验
if (!StringUtils.hasText(data.getProductCode())) {
errors.add("商品编码不能为空");
} elseif (data.getProductCode().length() > 64) {
errors.add("商品编码长度不能超过64个字符");
}
// 商品名称校验
if (!StringUtils.hasText(data.getProductName())) {
errors.add("商品名称不能为空");
} elseif (data.getProductName().length() > 255) {
errors.add("商品名称长度不能超过255个字符");
}
// 分类ID校验
if (data.getCategoryId() == null || data.getCategoryId() <= 0) {
errors.add("分类ID必须大于0");
}
// 价格校验
if (data.getPrice() == null) {
errors.add("价格不能为空");
} elseif (data.getPrice().compareTo(BigDecimal.ZERO) <= 0) {
errors.add("价格必须大于0");
} elseif (data.getPrice().scale() > 2) {
errors.add("价格最多保留两位小数");
}
// 库存校验
if (data.getStock() == null) {
errors.add("库存不能为空");
} elseif (data.getStock() < 0) {
errors.add("库存不能为负数");
}
// 状态校验
if (data.getStatus() == null) {
errors.add("状态不能为空");
} elseif (data.getStatus() != 0 && data.getStatus() != 1) {
errors.add("状态必须为0(禁用)或1(启用)");
}
// 设置校验结果
if (!errors.isEmpty()) {
data.setImportStatus("失败");
data.setErrorMsg(String.join(";", errors));
}
}
}
@Override
publicvoidcheckDuplicateData(List<ProductImportModel> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return;
}
// 1. 过滤已校验失败的数据
List<ProductImportModel> validDataList = dataList.stream()
.filter(data -> data.getImportStatus() == null)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(validDataList)) {
return;
}
// 2. 检查列表内部重复
Multimap<String, ProductImportModel> codeMap = HashMultimap.create();
for (ProductImportModel data : validDataList) {
codeMap.put(data.getProductCode(), data);
}
// 标记重复数据
for (Map.Entry<String, ProductImportModel> entry : codeMap.entries()) {
Stringcode= entry.getKey();
ProductImportModeldata= entry.getValue();
// 如果编码对应的记录数大于1,则表示有重复
if (codeMap.get(code).size() > 1) {
data.setImportStatus("失败");
data.setErrorMsg("商品编码在导入文件中重复");
}
}
// 3. 过滤掉内部重复的数据,检查数据库中是否已存在
List<ProductImportModel> uniqueDataList = validDataList.stream()
.filter(data -> data.getImportStatus() == null)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(uniqueDataList)) {
return;
}
// 4. 批量查询数据库中已存在的编码
List<String> codeList = uniqueDataList.stream()
.map(ProductImportModel::getProductCode)
.collect(Collectors.toList());
List<BusinessData> existDataList = businessDataMapper.selectByCodes(codeList);
if (!CollectionUtils.isEmpty(existDataList)) {
Set<String> existCodeSet = existDataList.stream()
.map(BusinessData::getProductCode)
.collect(Collectors.toSet());
// 标记数据库中已存在的数据
for (ProductImportModel data : uniqueDataList) {
if (existCodeSet.contains(data.getProductCode())) {
data.setImportStatus("失败");
data.setErrorMsg("商品编码在系统中已存在");
}
}
}
}
@Override
public List<BusinessData> filterValidData(List<ProductImportModel> dataList, Long userId) {
if (CollectionUtils.isEmpty(dataList)) {
return Lists.newArrayList();
}
// 过滤出校验通过的数据
List<ProductImportModel> validDataList = dataList.stream()
.filter(data -> data.getImportStatus() == null)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(validDataList)) {
return Lists.newArrayList();
}
// 转换为业务实体
List<BusinessData> businessDataList = Lists.newArrayListWithExpectedSize(validDataList.size());
for (ProductImportModel data : validDataList) {
BusinessDatabusinessData=newBusinessData();
BeanUtils.copyProperties(data, businessData);
businessData.setCreateUser(userId);
businessData.setCreateTime(LocalDateTime.now());
businessDataList.add(businessData);
}
return businessDataList;
}
@Override
@Transactional(rollbackFor = Exception.class)
publicintbatchImportData(List<BusinessData> dataList) {
if (CollectionUtils.isEmpty(dataList)) {
return0;
}
log.info("开始批量导入数据,数量: {}", dataList.size());
// 1. 并发控制:使用分布式锁确保数据唯一性
List<BusinessData> finalDataList = Lists.newArrayList();
for (BusinessData data : dataList) {
StringlockKey= LOCK_PREFIX + data.getProductCode();
Booleanlocked= redisTemplate.opsForValue().setIfAbsent(lockKey, "1", LOCK_EXPIRE_SECONDS, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
try {
// 再次检查数据库,确保没有并发插入
BusinessDataexistData= businessDataMapper.selectOne(Wrappers.<BusinessData>lambdaQuery()
.eq(BusinessData::getProductCode, data.getProductCode())
.eq(BusinessData::getDeleted, 0));
if (existData == null) {
finalDataList.add(data);
}
} finally {
// 释放锁
redisTemplate.delete(lockKey);
}
} else {
log.warn("获取分布式锁失败,可能有并发操作,productCode: {}", data.getProductCode());
}
}
if (CollectionUtils.isEmpty(finalDataList)) {
log.info("没有可导入的数据(可能被并发操作拦截)");
return0;
}
// 2. 批量插入数据
intinsertCount= businessDataMapper.batchInsert(finalDataList);
log.info("批量导入完成,计划导入: {}, 实际导入: {}", finalDataList.size(), insertCount);
return insertCount;
}
@Override
public List<BusinessData> getByCodes(List<String> codeList) {
if (CollectionUtils.isEmpty(codeList)) {
return Lists.newArrayList();
}
return businessDataMapper.selectByCodes(codeList);
}
}
package com.example.excelimport.consumer;
import com.alibaba.excel.EasyExcel;
import com.example.excelimport.entity.ImportTask;
import com.example.excelimport.excel.ProductImportModel;
import com.example.excelimport.excel.ProductImportListener;
import com.example.excelimport.service.BusinessDataService;
import com.example.excelimport.service.ImportTaskService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.io.File;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.UUID;
/**
* 导入任务消费者
*
* @author ken
*/
@Slf4j
@Component
publicclassImportTaskConsumer {
@Autowired
private ImportTaskService importTaskService;
@Autowired
private BusinessDataService businessDataService;
/**
* 结果文件存储路径
*/
@Value("${file.storage.result-path}")
private String resultPath;
/**
* 日期格式化器
*/
privatestaticfinalDateTimeFormatterDATE_FORMATTER= DateTimeFormatter.ofPattern("yyyyMMdd");
/**
* 处理导入任务
*/
@RabbitListener(queues = "${rabbitmq.queue.import-task:import.task.queue}")
publicvoidprocessImportTask(Long taskId) {
log.info("开始处理导入任务,taskId: {}", taskId);
if (taskId == null) {
log.error("导入任务ID为空,跳过处理");
return;
}
ImportTasktask=null;
try {
// 1. 获取任务信息
task = importTaskService.getTaskById(taskId);
if (task == null) {
log.error("导入任务不存在,taskId: {}", taskId);
return;
}
log.info("开始处理文件导入,taskId: {}, fileName: {}, filePath: {}",
taskId, task.getFileName(), task.getFilePath());
// 2. 更新任务状态为处理中
importTaskService.updateTaskStatus(taskId, 1, 5, 0, 0, null, null, 0);
// 3. 验证文件是否存在
StringfilePath= task.getFilePath();
if (!StringUtils.hasText(filePath)) {
thrownewRuntimeException("文件路径为空");
}
Filefile=newFile(filePath);
if (!file.exists() || !file.isFile()) {
thrownewRuntimeException("文件不存在或不是有效文件: " + filePath);
}
// 4. 创建监听器并读取Excel
ProductImportListenerlistener=newProductImportListener(
taskId, task.getUserId(), businessDataService, importTaskService);
EasyExcel.read(file, ProductImportModel.class, listener)
.sheet()
.doRead();
// 5. 获取所有数据并统计结果
List<ProductImportModel> allDataList = listener.getAllDataList();
inttotalCount= allDataList.size();
intsuccessCount= (int) allDataList.stream()
.filter(data -> "成功".equals(data.getImportStatus()))
.count();
intfailCount= totalCount - successCount;
// 6. 生成结果文件
StringresultFilePath= generateResultFile(task, allDataList);
// 7. 更新任务状态为完成
importTaskService.updateTaskStatus(
taskId, 2, 100, successCount, failCount, null, resultFilePath, totalCount);
log.info("导入任务处理完成,taskId: {}, 总记录数: {}, 成功: {}, 失败: {}",
taskId, totalCount, successCount, failCount);
} catch (Exception e) {
log.error("导入任务处理失败,taskId: {}", taskId, e);
// 更新任务状态为失败
StringerrorMsg= e.getMessage();
if (errorMsg != null && errorMsg.length() > 1000) {
errorMsg = errorMsg.substring(0, 1000) + "...";
}
if (task != null) {
importTaskService.updateTaskStatus(taskId, 3, null, null, null, errorMsg, null, null);
}
}
}
/**
* 处理死信队列中的失败任务
*/
@RabbitListener(queues = "${rabbitmq.queue.dlq:import.dlq.queue}")
publicvoidprocessFailedTask(Long taskId) {
log.warn("接收到死信队列中的失败任务,taskId: {}", taskId);
// 这里可以做一些告警或者重试处理
// 例如:记录到失败任务表,通知管理员处理
ImportTasktask= importTaskService.getTaskById(taskId);
if (task != null) {
log.warn("失败任务详情:taskNo: {}, fileName: {}, status: {}",
task.getTaskNo(), task.getFileName(), task.getStatus());
// TODO: 发送告警通知
}
}
/**
* 生成结果文件
*/
private String generateResultFile(ImportTask task, List<ProductImportModel> dataList) {
try {
// 创建结果文件存储目录
StringdateDir= DATE_FORMATTER.format(LocalDate.now());
StringsaveDir= resultPath + File.separator + dateDir;
FilesaveDirFile=newFile(saveDir);
if (!saveDirFile.exists() && !saveDirFile.mkdirs()) {
log.error("创建结果文件目录失败: {}", saveDir);
thrownewRuntimeException("生成结果文件失败,无法创建存储目录");
}
// 生成结果文件名
StringoriginalFileName= task.getFileName();
Stringext= originalFileName.contains(".") ? originalFileName.substring(originalFileName.lastIndexOf(".")) : ".xlsx";
StringresultFileName="result_" + System.currentTimeMillis() + "_" + UUID.randomUUID().toString().substring(0, 8) + ext;
StringresultFilePath= saveDir + File.separator + resultFileName;
// 写入结果文件
EasyExcel.write(resultFilePath, ProductImportModel.class)
.sheet("导入结果")
.doWrite(dataList);
log.info("结果文件生成成功,taskId: {}, filePath: {}", task.getId(), resultFilePath);
return resultFilePath;
} catch (Exception e) {
log.error("生成结果文件失败,taskId: {}", task.getId(), e);
thrownewRuntimeException("生成结果文件失败: " + e.getMessage());
}
}
}
在多用户并发上传的场景下,需要确保数据的唯一性,避免重复导入。我们使用 Redis 实现分布式锁来控制并发访问:
package com.example.excelimport.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* Redis分布式锁实现
*
* @author ken
*/
@Slf4j
@Component
publicclassRedisDistributedLockimplementsLock {
privatefinal RedisTemplate<String, Object> redisTemplate;
/**
* 锁的默认过期时间(毫秒)
*/
privatestaticfinallongDEFAULT_EXPIRE_MILLIS=30000;
/**
* 锁的键
*/
privatefinal String lockKey;
/**
* 锁的持有者标识
*/
privatefinal String lockValue;
/**
* 锁的过期时间(毫秒)
*/
privatefinallong expireMillis;
/**
* 构造函数
*/
publicRedisDistributedLock(RedisTemplate<String, Object> redisTemplate, String lockKey) {
this(redisTemplate, lockKey, UUID.randomUUID().toString(), DEFAULT_EXPIRE_MILLIS);
}
/**
* 构造函数
*/
publicRedisDistributedLock(RedisTemplate<String, Object> redisTemplate, String lockKey, String lockValue, long expireMillis) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.lockValue = lockValue;
this.expireMillis = expireMillis;
}
@Override
publicvoidlock() {
// 循环获取锁,直到成功
while (!tryLock()) {
// 短暂休眠,避免过度消耗CPU
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}
@Override
publicvoidlockInterruptibly()throws InterruptedException {
if (Thread.interrupted()) {
thrownewInterruptedException();
}
// 循环获取锁,直到成功或被中断
while (!tryLock()) {
Thread.sleep(50);
if (Thread.interrupted()) {
thrownewInterruptedException();
}
}
}
@Override
publicbooleantryLock() {
return tryLock(expireMillis, TimeUnit.MILLISECONDS);
}
@Override
publicbooleantryLock(long time, TimeUnit unit) {
longmillisToWait= unit.toMillis(time);
longstart= System.currentTimeMillis();
// 循环获取锁,直到成功或超时
while (true) {
// 尝试获取锁
Booleansuccess= redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, expireMillis, TimeUnit.MILLISECONDS);
if (Boolean.TRUE.equals(success)) {
log.debug("获取分布式锁成功,lockKey: {}, lockValue: {}", lockKey, lockValue);
returntrue;
}
// 检查是否超时
if (System.currentTimeMillis() - start > millisToWait) {
log.debug("获取分布式锁超时,lockKey: {}, lockValue: {}", lockKey, lockValue);
returnfalse;
}
// 短暂休眠,避免过度消耗CPU
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
returnfalse;
}
}
}
@Override
publicvoidunlock() {
// 使用Lua脚本保证删除操作的原子性
Stringscript="if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
script = script.replace("ARGV[1]", "'" + lockValue + "'");
Longresult= (Long) redisTemplate.execute(newDefaultRedisScript<>(script, Long.class), Lists.newArrayList(lockKey));
if (result != null && result > 0) {
log.debug("释放分布式锁成功,lockKey: {}, lockValue: {}", lockKey, lockValue);
} else {
log.warn("释放分布式锁失败,可能锁已过期或被其他线程持有,lockKey: {}, lockValue: {}", lockKey, lockValue);
}
}
@Override
public Condition newCondition() {
thrownewUnsupportedOperationException("RedisDistributedLock不支持Condition");
}
/**
* 获取锁的键
*/
public String getLockKey() {
return lockKey;
}
/**
* 获取锁的持有者标识
*/
public String getLockValue() {
return lockValue;
}
}
package com.example.excelimport.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 异步任务配置
*
* @author ken
*/
@Configuration
@EnableAsync
publicclassAsyncConfig {
/**
* 核心线程数
*/
@Value("${async.executor.core-pool-size:4}")
private int corePoolSize;
/**
* 最大线程数
*/
@Value("${async.executor.max-pool-size:16}")
private int maxPoolSize;
/**
* 队列容量
*/
@Value("${async.executor.queue-capacity:100}")
private int queueCapacity;
/**
* 线程存活时间(秒)
*/
@Value("${async.executor.keep-alive-seconds:60}")
private int keepAliveSeconds;
/**
* 线程名称前缀
*/
@Value("${async.executor.thread-name-prefix:ExcelImport-}")
private String threadNamePrefix;
/**
* 异步任务执行器
*/
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心线程数
executor.setCorePoolSize(corePoolSize);
// 最大线程数
executor.setMaxPoolSize(maxPoolSize);
// 队列容量
executor.setQueueCapacity(queueCapacity);
// 线程存活时间
executor.setKeepAliveSeconds(keepAliveSeconds);
// 线程名称前缀
executor.setThreadNamePrefix(threadNamePrefix);
// 拒绝策略:当线程池和队列都满了,由提交任务的线程执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 初始化
executor.initialize();
return executor;
}
}
本文详细介绍了一个大 Excel 文件异步上传和导入的完整解决方案,从前端分片上传到后端异步处理,再到数据校验、并发控制和结果生成,涵盖了整个流程的关键技术点。
该方案的核心优势在于:分片上传突破了单次请求的文件大小限制并支持断点续传;消息队列驱动的异步处理避免了接口超时;EasyExcel 流式解析将内存占用控制在常量级;Redis 分布式锁保障了多用户并发导入时的数据唯一性;自动生成的结果文件让用户能够逐行核对导入情况。
通过本文提供的方案和代码实现,开发者可以快速构建一个稳定、高效的大 Excel 导入功能,满足企业级应用的需求。同时,也可以根据实际业务场景进行定制和扩展,进一步提升系统的性能和可靠性。