
Redis 作为一款高性能的键值存储数据库,凭借其丰富的数据类型和出色的性能,已成为现代分布式系统中不可或缺的组件。本文将深入剖析 Redis 的 7 种常用数据类型,结合 21 个真实业务场景,从底层原理到实战代码,全方位展示如何最大化发挥 Redis 的威力。无论是缓存设计、计数器实现,还是分布式锁、消息队列,你都能在这里找到经过验证的最佳实践。
在 Redis 的世界里,"数据类型" 远不止是存储格式的差异,更是解决问题的不同范式。与传统关系型数据库相比,Redis 的每个数据类型都对应着特定的应用场景和优化策略。

选择合适的数据类型不仅能显著提升性能,还能简化业务逻辑。例如,实现一个排行榜功能,使用有序集合只需几行代码,而使用字符串则可能需要大量的额外逻辑和性能开销。
本文将按照 "数据类型→底层结构→核心命令→实战场景→代码实现" 的脉络,系统讲解 Redis 的实战应用。所有示例均基于 Redis 7.0 + 版本,并提供可直接运行的 Java 代码。
字符串是 Redis 最基础也最常用的数据类型,它能存储任何形式的字符串,包括二进制数据。在 Redis 内部,字符串以简单动态字符串 (SDS) 的形式存储,这使得它比 C 语言的字符串更高效且安全。
Redis 的字符串并非 C 语言中的字符数组,而是自定义的 SDS 结构,其优势包括:
预览

命令 | 功能 | 时间复杂度 |
|---|---|---|
SET key value | 设置键值对 | O(1) |
GET key | 获取值 | O(1) |
INCR key | 自增 1 | O(1) |
DECR key | 自减 1 | O(1) |
INCRBY key increment | 增加指定值 | O(1) |
DECRBY key decrement | 减少指定值 | O(1) |
SETEX key seconds value | 设置带过期时间的键值对 | O(1) |
SETNX key value | 仅当键不存在时设置 | O(1) |
MSET key1 value1 key2 value2... | 批量设置 | O(N) |
MGET key1 key2... | 批量获取 | O(N) |
缓存是 Redis 最经典的应用场景。对于频繁访问且不常变化的数据,如商品详情、用户信息等,将其缓存到 Redis 可以显著减轻数据库压力,提升系统响应速度。
实现思路:
Java 代码实现:
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 商品缓存服务
*
* @author ken
*/
@Slf4j
@Service
public class ProductCacheService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private ProductMapper productMapper;
/**
* 缓存过期时间:30分钟
*/
private static final long CACHE_EXPIRE_MINUTES = 30;
/**
* 商品缓存键前缀
*/
private static final String CACHE_KEY_PREFIX = "product:";
/**
* 获取商品详情
*
* @param productId 商品ID
* @return 商品信息
*/
public ProductDTO getProductDetail(Long productId) {
StringUtils.hasText(productId.toString(), "商品ID不能为空");
// 1. 构建缓存键
String cacheKey = CACHE_KEY_PREFIX + productId;
// 2. 从Redis获取数据
String productJson = stringRedisTemplate.opsForValue().get(cacheKey);
// 3. 缓存命中
if (StringUtils.hasText(productJson)) {
log.info("商品缓存命中,productId:{}", productId);
return JSON.parseObject(productJson, ProductDTO.class);
}
// 4. 缓存未命中,查询数据库
log.info("商品缓存未命中,查询数据库,productId:{}", productId);
ProductDO productDO = productMapper.selectById(productId);
if (productDO == null) {
// 缓存空对象,避免缓存穿透
stringRedisTemplate.opsForValue().set(
cacheKey,
JSON.toJSONString(null),
5,
TimeUnit.MINUTES
);
log.info("商品不存在,缓存空对象,productId:{}", productId);
return null;
}
// 5. 转换为DTO
ProductDTO productDTO = convertToDTO(productDO);
// 6. 存入Redis并设置过期时间
stringRedisTemplate.opsForValue().set(
cacheKey,
JSON.toJSONString(productDTO),
CACHE_EXPIRE_MINUTES,
TimeUnit.MINUTES
);
log.info("商品数据存入缓存,productId:{}", productId);
return productDTO;
}
/**
* 更新商品信息,并更新缓存
*
* @param productDTO 商品信息
* @return 是否更新成功
*/
public boolean updateProduct(ProductDTO productDTO) {
StringUtils.hasText(productDTO.getId().toString(), "商品ID不能为空");
// 1. 更新数据库
ProductDO productDO = convertToDO(productDTO);
int rows = productMapper.updateById(productDO);
if (rows <= 0) {
log.info("更新商品失败,productId:{}", productDTO.getId());
return false;
}
// 2. 更新缓存(先删除再更新,避免脏写)
String cacheKey = CACHE_KEY_PREFIX + productDTO.getId();
stringRedisTemplate.delete(cacheKey);
stringRedisTemplate.opsForValue().set(
cacheKey,
JSON.toJSONString(productDTO),
CACHE_EXPIRE_MINUTES,
TimeUnit.MINUTES
);
log.info("更新商品成功,并更新缓存,productId:{}", productDTO.getId());
return true;
}
/**
* DO转DTO
*/
private ProductDTO convertToDTO(ProductDO productDO) {
// 转换逻辑
return new ProductDTO();
}
/**
* DTO转DO
*/
private ProductDO convertToDO(ProductDTO productDTO) {
// 转换逻辑
return new ProductDO();
}
}
缓存设计要点:
在分布式系统中,实现高效、准确的计数功能并非易事。Redis 的 INCR 命令提供了原子性的自增操作,非常适合实现分布式计数器。
常见应用:
Java 代码实现:
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;
/**
* 分布式计数器服务
*
* @author ken
*/
@Slf4j
@Service
public class DistributedCounterService {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 阅读量计数器键前缀
*/
private static final String VIEW_COUNT_PREFIX = "view:count:";
/**
* 接口限流计数器键前缀
*/
private static final String RATE_LIMIT_PREFIX = "rate:limit:";
/**
* 增加文章阅读量
*
* @param articleId 文章ID
* @return 增加后的阅读量
*/
public Long incrementArticleViewCount(Long articleId) {
StringUtils.hasText(articleId.toString(), "文章ID不能为空");
String key = VIEW_COUNT_PREFIX + articleId;
// INCR命令是原子操作,确保并发安全
Long count = stringRedisTemplate.opsForValue().increment(key);
// 第一次设置时,给一个较长的过期时间,如30天
if (count != null && count == 1) {
stringRedisTemplate.expire(key, 30, TimeUnit.DAYS);
}
log.info("文章阅读量增加,articleId:{}, 最新阅读量:{}", articleId, count);
return count;
}
/**
* 获取文章阅读量
*
* @param articleId 文章ID
* @return 阅读量
*/
public Long getArticleViewCount(Long articleId) {
StringUtils.hasText(articleId.toString(), "文章ID不能为空");
String key = VIEW_COUNT_PREFIX + articleId;
String countStr = stringRedisTemplate.opsForValue().get(key);
// 不存在时返回0
return StringUtils.hasText(countStr) ? Long.parseLong(countStr) : 0;
}
/**
* 检查接口是否被限流
*
* @param userId 用户ID
* @param apiName 接口名称
* @param maxRequests 单位时间内最大请求数
* @param periodSeconds 时间周期(秒)
* @return true:被限流 false:未被限流
*/
public boolean isApiLimited(Long userId, String apiName, int maxRequests, int periodSeconds) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
StringUtils.hasText(apiName, "接口名称不能为空");
String key = RATE_LIMIT_PREFIX + apiName + ":" + userId;
// 原子性自增
Long count = stringRedisTemplate.opsForValue().increment(key);
// 第一次设置过期时间
if (count != null && count == 1) {
stringRedisTemplate.expire(key, periodSeconds, TimeUnit.SECONDS);
}
boolean isLimited = count != null && count > maxRequests;
log.info("接口限流检查,userId:{}, apiName:{}, 已请求次数:{}, 是否限流:{}",
userId, apiName, count, isLimited);
return isLimited;
}
}
分布式计数器优势:
在分布式系统中,多个进程需要共享资源时,分布式锁是保证数据一致性的重要手段。Redis 的 SETNX 命令可以实现简单而高效的分布式锁。
实现原理:

Java 代码实现:
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
* 分布式锁工具类
*
* @author ken
*/
@Slf4j
@Component
public class RedisDistributedLock {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 锁的默认过期时间:30秒
*/
private static final long DEFAULT_LOCK_EXPIRE = 30;
/**
* 锁的前缀
*/
private static final String LOCK_PREFIX = "lock:";
/**
* Lua脚本:释放锁
*/
private static final String UNLOCK_LUA_SCRIPT =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setScriptText(UNLOCK_LUA_SCRIPT);
UNLOCK_SCRIPT.setResultType(Long.class);
}
/**
* 获取分布式锁
*
* @param lockKey 锁的键
* @param expireSeconds 锁的过期时间(秒)
* @param timeoutMillis 获取锁的超时时间(毫秒)
* @return 锁的标识,释放锁时需要传入;null表示获取失败
*/
public String tryLock(String lockKey, long expireSeconds, long timeoutMillis) {
StringUtils.hasText(lockKey, "锁的键不能为空");
// 生成唯一标识,用于释放锁时验证
String lockValue = UUID.randomUUID().toString();
String key = LOCK_PREFIX + lockKey;
// 开始时间
long startTime = System.currentTimeMillis();
while (true) {
// 尝试获取锁:SETNX命令
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(
key,
lockValue,
expireSeconds,
TimeUnit.SECONDS
);
// 获取成功
if (Boolean.TRUE.equals(success)) {
log.info("获取分布式锁成功,lockKey:{}", lockKey);
return lockValue;
}
// 检查是否超时
long elapsedTime = System.currentTimeMillis() - startTime;
if (elapsedTime >= timeoutMillis) {
log.info("获取分布式锁超时,lockKey:{}", lockKey);
return null;
}
// 等待一段时间后重试,避免频繁尝试
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
log.error("获取锁时线程被中断", e);
Thread.currentThread().interrupt();
return null;
}
}
}
/**
* 获取分布式锁,使用默认过期时间
*
* @param lockKey 锁的键
* @param timeoutMillis 获取锁的超时时间(毫秒)
* @return 锁的标识
*/
public String tryLock(String lockKey, long timeoutMillis) {
return tryLock(lockKey, DEFAULT_LOCK_EXPIRE, timeoutMillis);
}
/**
* 释放分布式锁
*
* @param lockKey 锁的键
* @param lockValue 锁的标识
* @return 是否释放成功
*/
public boolean unlock(String lockKey, String lockValue) {
StringUtils.hasText(lockKey, "锁的键不能为空");
StringUtils.hasText(lockValue, "锁的标识不能为空");
String key = LOCK_PREFIX + lockKey;
// 使用Lua脚本执行原子操作
Long result = stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(key),
lockValue
);
boolean success = result != null && result > 0;
if (success) {
log.info("释放分布式锁成功,lockKey:{}", lockKey);
} else {
log.warn("释放分布式锁失败,可能锁已过期或被其他线程持有,lockKey:{}", lockKey);
}
return success;
}
}
使用示例:
/**
* 订单服务,使用分布式锁确保库存操作的原子性
*/
@Slf4j
@Service
public class OrderService {
@Resource
private RedisDistributedLock distributedLock;
@Resource
private InventoryMapper inventoryMapper;
/**
* 创建订单,扣减库存
*/
public OrderDTO createOrder(Long productId, Integer quantity, Long userId) {
StringUtils.hasText(productId.toString(), "商品ID不能为空");
StringUtils.hasText(quantity.toString(), "数量不能为空");
StringUtils.hasText(userId.toString(), "用户ID不能为空");
// 构建锁的键
String lockKey = "product:inventory:" + productId;
// 尝试获取锁,最多等待1秒
String lockValue = distributedLock.tryLock(lockKey, 1000);
if (lockValue == null) {
throw new BusinessException("系统繁忙,请稍后再试");
}
try {
// 1. 检查库存
InventoryDO inventory = inventoryMapper.selectByProductId(productId);
if (inventory == null || inventory.getStock() < quantity) {
throw new BusinessException("商品库存不足");
}
// 2. 扣减库存
int rows = inventoryMapper.decreaseStock(productId, quantity);
if (rows <= 0) {
throw new BusinessException("扣减库存失败");
}
// 3. 创建订单
OrderDO order = new OrderDO();
order.setProductId(productId);
order.setQuantity(quantity);
order.setUserId(userId);
order.setStatus(OrderStatus.PENDING);
orderMapper.insert(order);
// 4. 转换为DTO并返回
return convertToDTO(order);
} finally {
// 释放锁
distributedLock.unlock(lockKey, lockValue);
}
}
}
分布式锁注意事项:
哈希类型适用于存储对象类型的数据,它可以将一个对象的多个字段存储在一个键中,既节省空间又便于操作。
Redis 的哈希类型在存储数据量较小时使用压缩列表 (ziplist),当数据量超过阈值时自动转换为哈希表 (hashtable):
转换阈值可通过配置修改:

命令 | 功能 | 时间复杂度 |
|---|---|---|
HSET key field value | 设置哈希字段值 | O(1) |
HGET key field | 获取哈希字段值 | O(1) |
HMSET key field1 value1 field2 value2... | 批量设置哈希字段 | O(N) |
HMGET key field1 field2... | 批量获取哈希字段 | O(N) |
HGETALL key | 获取所有字段和值 | O(N) |
HDEL key field1 field2... | 删除字段 | O(N) |
HEXISTS key field | 判断字段是否存在 | O(1) |
HKEYS key | 获取所有字段 | O(N) |
HVALS key | 获取所有值 | O(N) |
HLEN key | 获取字段数量 | O(1) |
HINCRBY key field increment | 字段值自增 | O(1) |
用户信息包含多个字段(姓名、年龄、邮箱等),使用哈希类型可以将这些字段组织在一个键下,便于整体管理和部分更新。
Java 代码实现:
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 用户信息服务
*
* @author ken
*/
@Slf4j
@Service
public class UserService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private UserMapper userMapper;
/**
* 用户信息缓存键前缀
*/
private static final String USER_INFO_PREFIX = "user:info:";
/**
* 缓存过期时间:24小时
*/
private static final long CACHE_EXPIRE_HOURS = 24;
/**
* 获取用户信息
*
* @param userId 用户ID
* @return 用户信息
*/
public UserDTO getUserById(Long userId) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
String key = USER_INFO_PREFIX + userId;
HashOperations<String, String, String> hashOps = stringRedisTemplate.opsForHash();
// 1. 尝试从Redis获取
Map<String, String> userFields = hashOps.entries(key);
if (!CollectionUtils.isEmpty(userFields)) {
log.info("从缓存获取用户信息,userId:{}", userId);
return mapToUserDTO(userFields);
}
// 2. 缓存未命中,查询数据库
log.info("缓存未命中,查询数据库获取用户信息,userId:{}", userId);
UserDO userDO = userMapper.selectById(userId);
if (userDO == null) {
log.info("用户不存在,userId:{}", userId);
return null;
}
// 3. 转换为DTO
UserDTO userDTO = convertToDTO(userDO);
// 4. 存入Redis
Map<String, String> fieldMap = userDTOToMap(userDTO);
hashOps.putAll(key, fieldMap);
stringRedisTemplate.expire(key, CACHE_EXPIRE_HOURS, TimeUnit.HOURS);
log.info("用户信息存入缓存,userId:{}", userId);
return userDTO;
}
/**
* 更新用户部分信息
*
* @param userId 用户ID
* @param fieldMap 要更新的字段
* @return 是否更新成功
*/
public boolean updateUserFields(Long userId, Map<String, Object> fieldMap) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
if (CollectionUtils.isEmpty(fieldMap)) {
log.warn("更新用户信息时字段为空,userId:{}", userId);
return false;
}
// 1. 更新数据库
UserDO userDO = new UserDO();
userDO.setId(userId);
// 这里使用MyBatis-Plus的更新方法,只更新非空字段
int rows = userMapper.update(userDO,
Wrappers.<UserDO>lambdaUpdate()
.eq(UserDO::getId, userId)
.set(fieldMap.containsKey("nickname"), UserDO::getNickname, fieldMap.get("nickname"))
.set(fieldMap.containsKey("avatar"), UserDO::getAvatar, fieldMap.get("avatar"))
.set(fieldMap.containsKey("phone"), UserDO::getPhone, fieldMap.get("phone")));
if (rows <= 0) {
log.info("更新用户信息失败,userId:{}", userId);
return false;
}
// 2. 更新缓存(只更新变化的字段)
String key = USER_INFO_PREFIX + userId;
HashOperations<String, String, String> hashOps = stringRedisTemplate.opsForHash();
Map<String, String> redisFieldMap = Maps.newHashMap();
fieldMap.forEach((k, v) -> {
if (v != null) {
redisFieldMap.put(k, v.toString());
}
});
hashOps.putAll(key, redisFieldMap);
// 重置过期时间
stringRedisTemplate.expire(key, CACHE_EXPIRE_HOURS, TimeUnit.HOURS);
log.info("更新用户信息成功,并更新缓存,userId:{}", userId);
return true;
}
/**
* 获取用户的某个字段值
*
* @param userId 用户ID
* @param field 字段名
* @return 字段值
*/
public String getUserField(Long userId, String field) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
StringUtils.hasText(field, "字段名不能为空");
String key = USER_INFO_PREFIX + userId;
HashOperations<String, String, String> hashOps = stringRedisTemplate.opsForHash();
String value = hashOps.get(key, field);
// 缓存中没有,查询数据库并更新缓存
if (!StringUtils.hasText(value)) {
log.info("用户字段缓存未命中,查询数据库,userId:{}, field:{}", userId, field);
UserDO userDO = userMapper.selectById(userId);
if (userDO != null) {
UserDTO userDTO = convertToDTO(userDO);
Map<String, String> fieldMap = userDTOToMap(userDTO);
hashOps.putAll(key, fieldMap);
stringRedisTemplate.expire(key, CACHE_EXPIRE_HOURS, TimeUnit.HOURS);
value = fieldMap.get(field);
}
}
return value;
}
/**
* UserDTO转换为Map
*/
private Map<String, String> userDTOToMap(UserDTO userDTO) {
Map<String, String> map = Maps.newHashMap();
map.put("id", userDTO.getId().toString());
map.put("username", userDTO.getUsername());
map.put("nickname", userDTO.getNickname());
map.put("avatar", userDTO.getAvatar());
map.put("phone", userDTO.getPhone());
map.put("email", userDTO.getEmail());
map.put("status", userDTO.getStatus().toString());
map.put("createTime", userDTO.getCreateTime().toString());
return map;
}
/**
* Map转换为UserDTO
*/
private UserDTO mapToUserDTO(Map<String, String> map) {
UserDTO userDTO = new UserDTO();
userDTO.setId(Long.valueOf(map.get("id")));
userDTO.setUsername(map.get("username"));
userDTO.setNickname(map.get("nickname"));
userDTO.setAvatar(map.get("avatar"));
userDTO.setPhone(map.get("phone"));
userDTO.setEmail(map.get("email"));
userDTO.setStatus(Integer.valueOf(map.get("status")));
userDTO.setCreateTime(parseDateTime(map.get("createTime")));
return userDTO;
}
// 其他辅助方法...
}
使用哈希存储用户信息的优势:
购物车需要存储用户购买的商品 ID、数量等信息,并且需要支持添加、删除、修改数量等操作,哈希类型非常适合这种场景。
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/**
* 购物车服务
*
* @author ken
*/
@Slf4j
@Service
public class ShoppingCartService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private ProductMapper productMapper;
/**
* 购物车键前缀
*/
private static final String CART_PREFIX = "cart:";
/**
* 购物车过期时间:7天
*/
private static final long CART_EXPIRE_DAYS = 7;
/**
* 添加商品到购物车
*
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 数量
* @return 添加后的总数量
*/
public Long addToCart(Long userId, Long productId, Integer quantity) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
StringUtils.hasText(productId.toString(), "商品ID不能为空");
if (quantity <= 0) {
throw new BusinessException("商品数量必须大于0");
}
// 检查商品是否存在
ProductDO product = productMapper.selectById(productId);
if (product == null) {
throw new BusinessException("商品不存在");
}
String key = CART_PREFIX + userId;
HashOperations<String, String, String> hashOps = stringRedisTemplate.opsForHash();
// 使用HINCRBY命令原子性增加数量
Long newQuantity = hashOps.increment(key, productId.toString(), quantity);
// 设置过期时间
stringRedisTemplate.expire(key, CART_EXPIRE_DAYS, TimeUnit.DAYS);
log.info("商品添加到购物车,userId:{}, productId:{}, 数量:{}", userId, productId, quantity);
return newQuantity;
}
/**
* 从购物车移除商品
*
* @param userId 用户ID
* @param productIds 商品ID列表
* @return 移除的商品数量
*/
public Long removeFromCart(Long userId, List<Long> productIds) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
if (CollectionUtils.isEmpty(productIds)) {
log.warn("移除购物车商品时,商品ID列表为空,userId:{}", userId);
return 0L;
}
String key = CART_PREFIX + userId;
HashOperations<String, String, String> hashOps = stringRedisTemplate.opsForHash();
// 转换为字符串列表
List<String> productIdStrs = Lists.newArrayList();
productIds.forEach(id -> productIdStrs.add(id.toString()));
// 删除多个字段
Long removedCount = hashOps.delete(key, productIdStrs.toArray());
log.info("从购物车移除商品,userId:{}, 商品数量:{}", userId, removedCount);
return removedCount;
}
/**
* 更新购物车商品数量
*
* @param userId 用户ID
* @param productId 商品ID
* @param quantity 新的数量
*/
public void updateCartItemQuantity(Long userId, Long productId, Integer quantity) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
StringUtils.hasText(productId.toString(), "商品ID不能为空");
if (quantity <= 0) {
// 数量为0时,直接移除
removeFromCart(userId, Lists.newArrayList(productId));
return;
}
String key = CART_PREFIX + userId;
HashOperations<String, String, String> hashOps = stringRedisTemplate.opsForHash();
// 设置新的数量
hashOps.put(key, productId.toString(), quantity.toString());
stringRedisTemplate.expire(key, CART_EXPIRE_DAYS, TimeUnit.DAYS);
log.info("更新购物车商品数量,userId:{}, productId:{}, 新数量:{}", userId, productId, quantity);
}
/**
* 获取用户购物车所有商品
*
* @param userId 用户ID
* @return 购物车商品列表
*/
public List<CartItemDTO> getUserCart(Long userId) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
String key = CART_PREFIX + userId;
HashOperations<String, String, String> hashOps = stringRedisTemplate.opsForHash();
// 获取所有字段和值
Map<String, String> cartItems = hashOps.entries(key);
if (CollectionUtils.isEmpty(cartItems)) {
log.info("用户购物车为空,userId:{}", userId);
return Lists.newArrayList();
}
// 转换为DTO列表
List<CartItemDTO> result = Lists.newArrayList();
cartItems.forEach((productIdStr, quantityStr) -> {
Long productId = Long.valueOf(productIdStr);
Integer quantity = Integer.valueOf(quantityStr);
// 查询商品详情
ProductDO product = productMapper.selectById(productId);
if (product != null) {
CartItemDTO item = new CartItemDTO();
item.setProductId(productId);
item.setProductName(product.getName());
item.setPrice(product.getPrice());
item.setQuantity(quantity);
item.setTotalPrice(product.getPrice().multiply(BigDecimal.valueOf(quantity)));
result.add(item);
} else {
// 商品不存在,从购物车中移除
hashOps.delete(key, productIdStr);
log.warn("购物车中商品不存在,已自动移除,productId:{}", productId);
}
});
log.info("获取用户购物车,userId:{}, 商品数量:{}", userId, result.size());
return result;
}
/**
* 获取购物车商品总数
*
* @param userId 用户ID
* @return 商品总数
*/
public Integer getCartItemCount(Long userId) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
String key = CART_PREFIX + userId;
HashOperations<String, String, String> hashOps = stringRedisTemplate.opsForHash();
// 获取所有值(数量)
List<String> quantities = hashOps.values(key);
if (CollectionUtils.isEmpty(quantities)) {
return 0;
}
// 计算总和
int total = 0;
for (String qtyStr : quantities) {
total += Integer.parseInt(qtyStr);
}
log.info("获取用户购物车商品总数,userId:{}, 总数:{}", userId, total);
return total;
}
/**
* 清空购物车
*
* @param userId 用户ID
* @return 是否清空成功
*/
public boolean clearCart(Long userId) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
String key = CART_PREFIX + userId;
Boolean deleted = stringRedisTemplate.delete(key);
log.info("清空用户购物车,userId:{}, 结果:{}", userId, deleted);
return Boolean.TRUE.equals(deleted);
}
}
哈希类型实现购物车的优势:
Redis 的列表是有序的字符串集合,支持在两端进行插入和删除操作,底层采用双向链表或压缩列表实现。
Redis 列表的底层实现有两种:
转换条件由以下配置决定:

命令 | 功能 | 时间复杂度 |
|---|---|---|
LPUSH key value1 value2... | 从左侧插入元素 | O(N) |
RPUSH key value1 value2... | 从右侧插入元素 | O(N) |
LPOP key | 从左侧弹出元素 | O(1) |
RPOP key | 从右侧弹出元素 | O(1) |
LRANGE key start stop | 获取指定范围元素 | O(S+N) |
LLEN key | 获取列表长度 | O(1) |
LREM key count value | 删除指定值的元素 | O(N) |
LSET key index value | 设置指定索引的元素值 | O(N) |
LTRIM key start stop | 保留指定范围元素,删除其他 | O(N) |
BLPOP key1 key2... timeout | 阻塞式左侧弹出 | O(1) |
BRPOP key1 key2... timeout | 阻塞式右侧弹出 | O(1) |
Redis 列表可以作为简单的消息队列使用,LPUSH 用于生产消息,BRPOP 用于消费消息,实现简单的生产者 - 消费者模型。
实现思路:

Java 代码实现:
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* 基于Redis List的消息队列
*
* @author ken
*/
@Slf4j
@Component
public class RedisMessageQueue {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 消息队列键前缀
*/
private static final String QUEUE_PREFIX = "queue:";
/**
* 死信队列键前缀
*/
private static final String DEAD_LETTER_QUEUE_PREFIX = "queue:dead:";
/**
* 阻塞超时时间(秒)
*/
private static final int BLOCK_TIMEOUT = 30;
/**
* 发送消息
*
* @param queueName 队列名称
* @param message 消息内容
* @return 发送成功后的队列长度
*/
public Long sendMessage(String queueName, Object message) {
StringUtils.hasText(queueName, "队列名称不能为空");
if (message == null) {
throw new IllegalArgumentException("消息内容不能为空");
}
String key = QUEUE_PREFIX + queueName;
String messageJson = JSON.toJSONString(message);
// 从左侧插入消息
Long size = stringRedisTemplate.opsForList().leftPush(key, messageJson);
log.info("发送消息到队列,queueName:{}, 消息内容:{}, 队列长度:{}",
queueName, messageJson, size);
return size;
}
/**
* 接收消息(非阻塞)
*
* @param queueName 队列名称
* @param clazz 消息类型
* @return 消息对象,null表示没有消息
*/
public <T> T receiveMessage(String queueName, Class<T> clazz) {
StringUtils.hasText(queueName, "队列名称不能为空");
String key = QUEUE_PREFIX + queueName;
// 从右侧弹出消息
String messageJson = stringRedisTemplate.opsForList().rightPop(key);
if (!StringUtils.hasText(messageJson)) {
log.debug("队列中没有消息,queueName:{}", queueName);
return null;
}
log.info("从队列接收消息,queueName:{}, 消息内容:{}", queueName, messageJson);
return JSON.parseObject(messageJson, clazz);
}
/**
* 接收消息(阻塞)
*
* @param queueName 队列名称
* @param clazz 消息类型
* @return 消息对象
*/
public <T> T blockingReceiveMessage(String queueName, Class<T> clazz) {
return blockingReceiveMessage(queueName, clazz, BLOCK_TIMEOUT);
}
/**
* 接收消息(阻塞,指定超时时间)
*
* @param queueName 队列名称
* @param clazz 消息类型
* @param timeout 超时时间(秒)
* @return 消息对象,超时返回null
*/
public <T> T blockingReceiveMessage(String queueName, Class<T> clazz, int timeout) {
StringUtils.hasText(queueName, "队列名称不能为空");
if (timeout <= 0) {
throw new IllegalArgumentException("超时时间必须大于0");
}
String key = QUEUE_PREFIX + queueName;
// 阻塞式从右侧弹出消息
List<String> result = stringRedisTemplate.opsForList().rightPop(key, timeout, TimeUnit.SECONDS);
if (result == null || result.isEmpty()) {
log.debug("队列接收消息超时,queueName:{}, timeout:{}s", queueName, timeout);
return null;
}
String messageJson = result.get(0);
log.info("从队列阻塞接收消息,queueName:{}, 消息内容:{}", queueName, messageJson);
return JSON.parseObject(messageJson, clazz);
}
/**
* 将消息放入死信队列
*
* @param queueName 原队列名称
* @param message 消息内容
* @param reason 失败原因
*/
public void sendToDeadLetterQueue(String queueName, Object message, String reason) {
StringUtils.hasText(queueName, "队列名称不能为空");
if (message == null) {
throw new IllegalArgumentException("消息内容不能为空");
}
String key = DEAD_LETTER_QUEUE_PREFIX + queueName;
// 构建死信消息
DeadLetterMessage deadLetter = new DeadLetterMessage();
deadLetter.setMessage(message);
deadLetter.setReason(reason);
deadLetter.setTimestamp(System.currentTimeMillis());
String messageJson = JSON.toJSONString(deadLetter);
// 放入死信队列
stringRedisTemplate.opsForList().leftPush(key, messageJson);
log.warn("消息放入死信队列,queueName:{}, reason:{}, 消息内容:{}",
queueName, reason, messageJson);
}
/**
* 获取队列长度
*
* @param queueName 队列名称
* @return 队列长度
*/
public Long getQueueSize(String queueName) {
StringUtils.hasText(queueName, "队列名称不能为空");
String key = QUEUE_PREFIX + queueName;
return stringRedisTemplate.opsForList().size(key);
}
/**
* 死信消息封装类
*/
public static class DeadLetterMessage {
private Object message;
private String reason;
private long timestamp;
// getter和setter省略
}
}
消费者示例:
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* 订单消息消费者
*
* @author ken
*/
@Slf4j
@Component
public class OrderMessageConsumer implements CommandLineRunner {
@Resource
private RedisMessageQueue messageQueue;
@Resource
private OrderService orderService;
/**
* 订单消息队列名称
*/
private static final String ORDER_QUEUE_NAME = "order";
/**
* 最大重试次数
*/
private static final int MAX_RETRY_COUNT = 3;
@Override
public void run(String... args) throws Exception {
// 启动消费者线程
new Thread(this::startConsuming, "order-message-consumer").start();
log.info("订单消息消费者启动成功");
}
/**
* 开始消费消息
*/
private void startConsuming() {
while (!Thread.currentThread().isInterrupted()) {
try {
// 阻塞式接收消息
OrderMessage message = messageQueue.blockingReceiveMessage(ORDER_QUEUE_NAME, OrderMessage.class);
if (message != null) {
processMessage(message);
}
} catch (Exception e) {
log.error("消费消息发生异常", e);
try {
// 发生异常时,短暂休眠避免CPU空转
Thread.sleep(1000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
log.info("订单消息消费者已停止");
}
/**
* 处理消息
*/
private void processMessage(OrderMessage message) {
int retryCount = 0;
boolean success = false;
while (retryCount < MAX_RETRY_COUNT && !success) {
try {
// 根据消息类型处理
if (message.getType() == OrderMessageType.PAYMENT_SUCCESS) {
orderService.handlePaymentSuccess(message.getOrderId());
} else if (message.getType() == OrderMessageType.ORDER_CANCELLED) {
orderService.handleOrderCancelled(message.getOrderId());
}
success = true;
log.info("处理订单消息成功,orderId:{}, messageType:{}",
message.getOrderId(), message.getType());
} catch (Exception e) {
retryCount++;
log.error("处理订单消息失败,将进行第{}次重试,orderId:{}, messageType:{}",
retryCount, message.getOrderId(), message.getType(), e);
if (retryCount >= MAX_RETRY_COUNT) {
// 达到最大重试次数,放入死信队列
messageQueue.sendToDeadLetterQueue(ORDER_QUEUE_NAME, message,
"达到最大重试次数:" + MAX_RETRY_COUNT + ", 异常信息:" + e.getMessage());
} else {
// 重试前休眠一段时间,指数退避策略
try {
Thread.sleep(1000 * (1 << retryCount)); // 1s, 2s, 4s...
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
}
Redis 列表作为消息队列的优缺点:
在社交应用中,展示用户的最新动态是常见需求。使用 Redis 列表可以高效地实现这一功能,限制列表长度只保留最新的 N 条记录。
Java 代码实现:
import com.alibaba.fastjson2.JSON;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.stream.Collectors;
/**
* 用户动态服务
*
* @author ken
*/
@Slf4j
@Service
public class UserFeedService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private UserFeedMapper userFeedMapper;
/**
* 用户动态列表键前缀
*/
private static final String USER_FEED_PREFIX = "user:feed:";
/**
* 最多保留的动态数量
*/
private static final int MAX_FEED_COUNT = 100;
/**
* 发布用户动态
*
* @param userId 用户ID
* @param feed 动态内容
* @return 发布的动态ID
*/
public Long publishFeed(Long userId, UserFeedDTO feed) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
StringUtils.hasText(feed.getContent(), "动态内容不能为空");
// 1. 保存到数据库
UserFeedDO feedDO = convertToDO(feed);
feedDO.setUserId(userId);
feedDO.setCreateTime(System.currentTimeMillis());
userFeedMapper.insert(feedDO);
Long feedId = feedDO.getId();
feed.setId(feedId);
feed.setCreateTime(feedDO.getCreateTime());
// 2. 同步到Redis列表
String key = USER_FEED_PREFIX + userId;
String feedJson = JSON.toJSONString(feed);
// 从左侧插入,保证最新的在最前面
stringRedisTemplate.opsForList().leftPush(key, feedJson);
// 只保留最新的MAX_FEED_COUNT条
stringRedisTemplate.opsForList().trim(key, 0, MAX_FEED_COUNT - 1);
log.info("用户发布动态,userId:{}, feedId:{}", userId, feedId);
return feedId;
}
/**
* 获取用户最新动态
*
* @param userId 用户ID
* @param count 获取数量
* @return 动态列表
*/
public List<UserFeedDTO> getUserFeeds(Long userId, int count) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
if (count <= 0) {
return Lists.newArrayList();
}
// 限制最大获取数量
count = Math.min(count, MAX_FEED_COUNT);
String key = USER_FEED_PREFIX + userId;
// 1. 尝试从Redis获取
List<String> feedJsons = stringRedisTemplate.opsForList().range(key, 0, count - 1);
if (!CollectionUtils.isEmpty(feedJsons)) {
log.info("从Redis获取用户动态,userId:{}, 数量:{}", userId, feedJsons.size());
return feedJsons.stream()
.map(json -> JSON.parseObject(json, UserFeedDTO.class))
.collect(Collectors.toList());
}
// 2. Redis中没有,从数据库获取并同步到Redis
log.info("Redis中没有用户动态,从数据库获取,userId:{}", userId);
List<UserFeedDO> feedDOs = userFeedMapper.selectByUserId(
userId, 0, Math.min(count, MAX_FEED_COUNT));
if (CollectionUtils.isEmpty(feedDOs)) {
log.info("用户没有动态,userId:{}", userId);
return Lists.newArrayList();
}
// 转换为DTO
List<UserFeedDTO> feedDTOs = feedDOs.stream()
.map(this::convertToDTO)
.collect(Collectors.toList());
// 同步到Redis,注意顺序:最新的在前
List<String> jsonList = feedDTOs.stream()
.map(JSON::toJSONString)
.collect(Collectors.toList());
stringRedisTemplate.opsForList().leftPushAll(key, jsonList);
stringRedisTemplate.opsForList().trim(key, 0, MAX_FEED_COUNT - 1);
log.info("用户动态同步到Redis,userId:{}, 数量:{}", userId, feedDTOs.size());
return feedDTOs;
}
/**
* 删除用户动态
*
* @param userId 用户ID
* @param feedId 动态ID
* @return 是否删除成功
*/
public boolean deleteFeed(Long userId, Long feedId) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
StringUtils.hasText(feedId.toString(), "动态ID不能为空");
// 1. 从数据库删除
int rows = userFeedMapper.deleteByIdAndUserId(feedId, userId);
if (rows <= 0) {
log.info("删除用户动态失败,动态不存在或不属于该用户,userId:{}, feedId:{}", userId, feedId);
return false;
}
// 2. 从Redis删除
String key = USER_FEED_PREFIX + userId;
// 先获取所有动态,找到要删除的那条
List<String> feedJsons = stringRedisTemplate.opsForList().range(key, 0, -1);
if (!CollectionUtils.isEmpty(feedJsons)) {
for (String json : feedJsons) {
UserFeedDTO feed = JSON.parseObject(json, UserFeedDTO.class);
if (feedId.equals(feed.getId())) {
// 删除该元素
stringRedisTemplate.opsForList().remove(key, 1, json);
log.info("从Redis删除用户动态,userId:{}, feedId:{}", userId, feedId);
break;
}
}
}
log.info("删除用户动态成功,userId:{}, feedId:{}", userId, feedId);
return true;
}
/**
* DO转DTO
*/
private UserFeedDTO convertToDTO(UserFeedDO feedDO) {
UserFeedDTO dto = new UserFeedDTO();
dto.setId(feedDO.getId());
dto.setUserId(feedDO.getUserId());
dto.setContent(feedDO.getContent());
dto.setImages(feedDO.getImages());
dto.setCreateTime(feedDO.getCreateTime());
return dto;
}
/**
* DTO转DO
*/
private UserFeedDO convertToDO(UserFeedDTO feedDTO) {
UserFeedDO doObj = new UserFeedDO();
doObj.setId(feedDTO.getId());
doObj.setUserId(feedDTO.getUserId());
doObj.setContent(feedDTO.getContent());
doObj.setImages(feedDTO.getImages());
doObj.setCreateTime(feedDTO.getCreateTime());
return doObj;
}
}
使用列表存储最新动态的优势:
Redis 集合是无序的字符串集合,每个元素都是唯一的,适合存储需要去重的数据。
Redis 集合的底层实现有两种:
转换条件:

命令 | 功能 | 时间复杂度 |
|---|---|---|
SADD key member1 member2... | 添加元素 | O(N) |
SREM key member1 member2... | 删除元素 | O(N) |
SMEMBERS key | 获取所有元素 | O(N) |
SISMEMBER key member | 判断元素是否存在 | O(1) |
SCARD key | 获取元素数量 | O(1) |
SPOP key count | 随机弹出元素 | O(count) |
SRANDMEMBER key count | 随机获取元素 | O(count) |
SINTER key1 key2... | 求交集 | O (N),N 是所有集合中元素最少的集合的大小 |
SUNION key1 key2... | 求并集 | O (N),N 是所有集合的元素总数 |
SDIFF key1 key2... | 求差集 | O (N),N 是第一个集合的元素数量 |
SINTERSTORE destination key1 key2... | 求交集并存储到新集合 | O(N) |
SUNIONSTORE destination key1 key2... | 求并集并存储到新集合 | O(N) |
SDIFFSTORE destination key1 key2... | 求差集并存储到新集合 | O(N) |
为用户添加标签(如兴趣爱好、消费习惯等),使用集合可以方便地进行添加、删除和查询,还可以通过集合运算找到具有共同标签的用户。
Java 代码实现:
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 用户标签服务
*
* @author ken
*/
@Slf4j
@Service
public class UserTagService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private UserTagRelationMapper userTagRelationMapper;
/**
* 用户标签集合键前缀
*/
private static final String USER_TAGS_PREFIX = "user:tags:";
/**
* 标签用户集合键前缀
*/
private static final String TAG_USERS_PREFIX = "tag:users:";
/**
* 为用户添加标签
*
* @param userId 用户ID
* @param tags 标签列表
* @return 添加的标签数量
*/
public Long addUserTags(Long userId, List<String> tags) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
if (CollectionUtils.isEmpty(tags)) {
log.warn("为用户添加标签时,标签列表为空,userId:{}", userId);
return 0L;
}
String userTagsKey = USER_TAGS_PREFIX + userId;
// 1. 向用户标签集合添加标签
Long addedCount = stringRedisTemplate.opsForSet().add(userTagsKey, tags.toArray(new String[0]));
// 2. 同时更新标签-用户反向映射
for (String tag : tags) {
String tagUsersKey = TAG_USERS_PREFIX + tag;
stringRedisTemplate.opsForSet().add(tagUsersKey, userId.toString());
}
// 3. 同步到数据库
userTagRelationMapper.batchInsertOrUpdate(userId, tags);
log.info("为用户添加标签,userId:{}, 标签数量:{}, 新增标签数量:{}",
userId, tags.size(), addedCount);
return addedCount;
}
/**
* 从用户移除标签
*
* @param userId 用户ID
* @param tags 标签列表
* @return 移除的标签数量
*/
public Long removeUserTags(Long userId, List<String> tags) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
if (CollectionUtils.isEmpty(tags)) {
log.warn("从用户移除标签时,标签列表为空,userId:{}", userId);
return 0L;
}
String userTagsKey = USER_TAGS_PREFIX + userId;
// 1. 从用户标签集合移除标签
Long removedCount = stringRedisTemplate.opsForSet().remove(userTagsKey, tags.toArray(new String[0]));
// 2. 同时更新标签-用户反向映射
for (String tag : tags) {
String tagUsersKey = TAG_USERS_PREFIX + tag;
stringRedisTemplate.opsForSet().remove(tagUsersKey, userId.toString());
}
// 3. 从数据库删除
userTagRelationMapper.batchDelete(userId, tags);
log.info("从用户移除标签,userId:{}, 标签数量:{}, 实际移除数量:{}",
userId, tags.size(), removedCount);
return removedCount;
}
/**
* 获取用户的所有标签
*
* @param userId 用户ID
* @return 标签列表
*/
public Set<String> getUserTags(Long userId) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
String key = USER_TAGS_PREFIX + userId;
// 1. 从Redis获取
Set<String> tags = stringRedisTemplate.opsForSet().members(key);
if (!CollectionUtils.isEmpty(tags)) {
log.info("获取用户标签,userId:{}, 标签数量:{}", userId, tags.size());
return tags;
}
// 2. Redis中没有,从数据库获取并同步到Redis
log.info("Redis中没有用户标签,从数据库获取,userId:{}", userId);
List<String> tagList = userTagRelationMapper.selectTagsByUserId(userId);
if (!CollectionUtils.isEmpty(tagList)) {
stringRedisTemplate.opsForSet().add(key, tagList.toArray(new String[0]));
// 同步标签-用户反向映射
for (String tag : tagList) {
String tagUsersKey = TAG_USERS_PREFIX + tag;
stringRedisTemplate.opsForSet().add(tagUsersKey, userId.toString());
}
tags = Sets.newHashSet(tagList);
} else {
tags = Sets.newHashSet();
}
log.info("用户标签同步到Redis,userId:{}, 标签数量:{}", userId, tags.size());
return tags;
}
/**
* 判断用户是否有某个标签
*
* @param userId 用户ID
* @param tag 标签
* @return true:有 false:没有
*/
public boolean hasUserTag(Long userId, String tag) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
StringUtils.hasText(tag, "标签不能为空");
String key = USER_TAGS_PREFIX + userId;
Boolean has = stringRedisTemplate.opsForSet().isMember(key, tag);
return Boolean.TRUE.equals(has);
}
/**
* 获取具有指定标签的用户ID列表
*
* @param tag 标签
* @return 用户ID列表
*/
public List<Long> getUsersByTag(String tag) {
StringUtils.hasText(tag, "标签不能为空");
String key = TAG_USERS_PREFIX + tag;
Set<String> userIdsStr = stringRedisTemplate.opsForSet().members(key);
if (CollectionUtils.isEmpty(userIdsStr)) {
log.info("没有用户具有标签:{}", tag);
return Lists.newArrayList();
}
// 转换为Long类型
List<Long> userIds = userIdsStr.stream()
.map(Long::valueOf)
.collect(Collectors.toList());
log.info("获取具有标签的用户,tag:{}, 用户数量:{}", tag, userIds.size());
return userIds;
}
/**
* 找到同时具有所有指定标签的用户
*
* @param tags 标签列表
* @return 用户ID列表
*/
public List<Long> getUsersWithAllTags(List<String> tags) {
if (CollectionUtils.isEmpty(tags)) {
log.warn("获取用户时,标签列表为空");
return Lists.newArrayList();
}
// 构建所有标签对应的用户集合键
List<String> tagUserKeys = tags.stream()
.map(tag -> TAG_USERS_PREFIX + tag)
.collect(Collectors.toList());
// 计算交集:同时具有所有标签的用户
Set<String> userIdsStr = stringRedisTemplate.opsForSet().intersect(tagUserKeys);
if (CollectionUtils.isEmpty(userIdsStr)) {
log.info("没有用户同时具有所有指定标签,标签数量:{}", tags.size());
return Lists.newArrayList();
}
// 转换为Long类型
List<Long> userIds = userIdsStr.stream()
.map(Long::valueOf)
.collect(Collectors.toList());
log.info("获取同时具有所有指定标签的用户,标签数量:{}, 用户数量:{}", tags.size(), userIds.size());
return userIds;
}
/**
* 找到具有任意一个指定标签的用户
*
* @param tags 标签列表
* @return 用户ID列表
*/
public List<Long> getUsersWithAnyTags(List<String> tags) {
if (CollectionUtils.isEmpty(tags)) {
log.warn("获取用户时,标签列表为空");
return Lists.newArrayList();
}
// 构建所有标签对应的用户集合键
List<String> tagUserKeys = tags.stream()
.map(tag -> TAG_USERS_PREFIX + tag)
.collect(Collectors.toList());
// 计算并集:具有任意一个标签的用户
Set<String> userIdsStr = stringRedisTemplate.opsForSet().union(tagUserKeys);
if (CollectionUtils.isEmpty(userIdsStr)) {
log.info("没有用户具有指定的任何标签,标签数量:{}", tags.size());
return Lists.newArrayList();
}
// 转换为Long类型
List<Long> userIds = userIdsStr.stream()
.map(Long::valueOf)
.collect(Collectors.toList());
log.info("获取具有任意一个指定标签的用户,标签数量:{}, 用户数量:{}", tags.size(), userIds.size());
return userIds;
}
}
集合类型实现标签系统的优势:
集合的随机元素获取功能非常适合实现随机推荐功能,如随机推荐商品、随机推荐好友等。
Java 代码实现:
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
* 商品推荐服务
*
* @author ken
*/
@Slf4j
@Service
public class ProductRecommendationService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private ProductMapper productMapper;
/**
* 商品集合键
*/
private static final String ALL_PRODUCTS_KEY = "products:all";
/**
* 用户已浏览商品集合键前缀
*/
private static final String USER_VIEWED_PRODUCTS_PREFIX = "user:viewed:products:";
/**
* 商品分类集合键前缀
*/
private static final String CATEGORY_PRODUCTS_PREFIX = "category:products:";
/**
* 初始化商品集合
*/
public void initProductSet() {
// 查询所有商品ID
List<Long> productIds = productMapper.selectAllProductIds();
if (CollectionUtils.isEmpty(productIds)) {
log.warn("没有商品数据,无法初始化商品集合");
return;
}
// 转换为字符串
String[] productIdStrs = productIds.stream()
.map(String::valueOf)
.toArray(String[]::new);
// 先清空再添加,避免重复
stringRedisTemplate.delete(ALL_PRODUCTS_KEY);
Long addedCount = stringRedisTemplate.opsForSet().add(ALL_PRODUCTS_KEY, productIdStrs);
log.info("初始化商品集合完成,总商品数量:{}, 添加到集合的数量:{}",
productIds.size(), addedCount);
// 同时初始化分类商品集合
initCategoryProductSets();
}
/**
* 初始化分类商品集合
*/
private void initCategoryProductSets() {
// 获取所有分类
List<Long> categoryIds = productMapper.selectAllCategoryIds();
if (CollectionUtils.isEmpty(categoryIds)) {
log.warn("没有分类数据,无法初始化分类商品集合");
return;
}
for (Long categoryId : categoryIds) {
// 查询该分类下的所有商品ID
List<Long> productIds = productMapper.selectProductIdsByCategoryId(categoryId);
if (CollectionUtils.isEmpty(productIds)) {
log.info("分类下没有商品,categoryId:{}", categoryId);
continue;
}
// 转换为字符串
String[] productIdStrs = productIds.stream()
.map(String::valueOf)
.toArray(String[]::new);
String key = CATEGORY_PRODUCTS_PREFIX + categoryId;
// 先清空再添加
stringRedisTemplate.delete(key);
Long addedCount = stringRedisTemplate.opsForSet().add(key, productIdStrs);
log.info("初始化分类商品集合,categoryId:{}, 商品数量:{}", categoryId, addedCount);
}
}
/**
* 记录用户浏览商品
*
* @param userId 用户ID
* @param productId 商品ID
*/
public void recordUserViewedProduct(Long userId, Long productId) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
StringUtils.hasText(productId.toString(), "商品ID不能为空");
String key = USER_VIEWED_PRODUCTS_PREFIX + userId;
// 添加到用户已浏览商品集合
stringRedisTemplate.opsForSet().add(key, productId.toString());
// 设置过期时间,如30天,避免存储太久的浏览记录
stringRedisTemplate.expire(key, 30, java.util.concurrent.TimeUnit.DAYS);
log.info("记录用户浏览商品,userId:{}, productId:{}", userId, productId);
}
/**
* 随机推荐商品(排除已浏览的)
*
* @param userId 用户ID
* @param count 推荐数量
* @return 推荐的商品ID列表
*/
public List<Long> recommendRandomProducts(Long userId, int count) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
if (count <= 0) {
return Lists.newArrayList();
}
// 1. 获取用户已浏览的商品
String viewedKey = USER_VIEWED_PRODUCTS_PREFIX + userId;
Set<String> viewedProducts = stringRedisTemplate.opsForSet().members(viewedKey);
// 2. 计算所有商品与已浏览商品的差集
Set<String> recommendCandidates;
if (CollectionUtils.isEmpty(viewedProducts)) {
// 如果没有浏览记录,直接从所有商品中随机推荐
recommendCandidates = stringRedisTemplate.opsForSet().members(ALL_PRODUCTS_KEY);
} else {
// 排除已浏览的商品
recommendCandidates = stringRedisTemplate.opsForSet().difference(ALL_PRODUCTS_KEY, viewedKey);
}
if (CollectionUtils.isEmpty(recommendCandidates)) {
log.info("没有可推荐的商品,userId:{}", userId);
return Lists.newArrayList();
}
// 3. 如果候选商品数量小于需要推荐的数量,直接返回所有
List<Long> result;
if (recommendCandidates.size() <= count) {
result = recommendCandidates.stream()
.map(Long::valueOf)
.collect(Collectors.toList());
} else {
// 4. 随机获取指定数量的商品
// 先将集合转为列表
List<String> candidateList = Lists.newArrayList(recommendCandidates);
// 随机获取
Set<String> randomProducts = stringRedisTemplate.opsForSet().randomMembers(ALL_PRODUCTS_KEY, count);
result = randomProducts.stream()
.map(Long::valueOf)
.collect(Collectors.toList());
}
log.info("为用户随机推荐商品,userId:{}, 推荐数量:{}", userId, result.size());
return result;
}
/**
* 基于分类的随机推荐
*
* @param userId 用户ID
* @param categoryId 分类ID
* @param count 推荐数量
* @return 推荐的商品ID列表
*/
public List<Long> recommendRandomProductsByCategory(Long userId, Long categoryId, int count) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
StringUtils.hasText(categoryId.toString(), "分类ID不能为空");
if (count <= 0) {
return Lists.newArrayList();
}
String categoryKey = CATEGORY_PRODUCTS_PREFIX + categoryId;
String viewedKey = USER_VIEWED_PRODUCTS_PREFIX + userId;
// 1. 计算分类商品与已浏览商品的差集
Set<String> recommendCandidates = stringRedisTemplate.opsForSet().difference(categoryKey, viewedKey);
if (CollectionUtils.isEmpty(recommendCandidates)) {
log.info("分类下没有可推荐的商品,userId:{}, categoryId:{}", userId, categoryId);
return Lists.newArrayList();
}
// 2. 随机获取指定数量的商品
List<Long> result;
if (recommendCandidates.size() <= count) {
result = recommendCandidates.stream()
.map(Long::valueOf)
.collect(Collectors.toList());
} else {
Set<String> randomProducts = stringRedisTemplate.opsForSet().randomMembers(categoryKey, count);
result = randomProducts.stream()
.map(Long::valueOf)
.collect(Collectors.toList());
}
log.info("为用户推荐分类商品,userId:{}, categoryId:{}, 推荐数量:{}",
userId, categoryId, result.size());
return result;
}
}
集合类型实现随机推荐的优势:
有序集合是 Redis 中最强大的数据类型之一,它既像集合一样保证元素的唯一性,又像列表一样可以排序,排序的依据是每个元素的分数 (score)。
有序集合的底层实现有两种:
转换条件:
跳跃表是一种高效的有序数据结构,支持平均 O (logN)、最坏 O (N) 的查找、插入、删除操作,Redis 使用跳跃表作为有序集合的主要实现。

命令 | 功能 | 时间复杂度 |
|---|---|---|
ZADD key score1 member1 score2 member2... | 添加元素 | O(logN) per element |
ZREM key member1 member2... | 删除元素 | O(logN) per element |
ZSCORE key member | 获取元素分数 | O(1) |
ZINCRBY key increment member | 增加元素分数 | O(logN) |
ZCARD key | 获取元素数量 | O(1) |
ZRANK key member | 获取元素排名(升序) | O(logN) |
ZREVRANK key member | 获取元素排名(降序) | O(logN) |
ZRANGE key start stop [WITHSCORES] | 获取指定范围元素(升序) | O (logN + M),M 是返回的元素数量 |
ZREVRANGE key start stop [WITHSCORES] | 获取指定范围元素(降序) | O(logN + M) |
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count] | 按分数范围获取元素(升序) | O(logN + M) |
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count] | 按分数范围获取元素(降序) | O(logN + M) |
ZCOUNT key min max | 统计分数范围内的元素数量 | O(logN) |
ZREMRangeByRank key start stop | 按排名范围删除元素 | O(logN + M) |
ZREMRangeByScore key min max | 按分数范围删除元素 | O(logN + M) |
ZINTERSTORE destination numkeys key1 key2... [WEIGHTS weight1 weight2...] [AGGREGATE SUM|MIN|MAX] | 计算交集并存储 | O(NK + MlogM),N 是最小集合的大小,K 是集合数量,M 是结果集合的大小 |
ZUNIONSTORE destination numkeys key1 key2... [WEIGHTS weight1 weight2...] [AGGREGATE SUM|MIN|MAX] | 计算并集并存储 | O (N + M log M),N 是所有集合的元素总数,M 是结果集合的大小 |
排行榜是有序集合最经典的应用场景,如游戏积分排行、商品销量排行、用户贡献排行等。有序集合可以轻松实现实时更新和查询。
Java 代码实现:
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.DefaultTypedTuple;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.*;
import java.util.stream.Collectors;
/**
* 排行榜服务
*
* @author ken
*/
@Slf4j
@Service
public class RankingService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private UserScoreMapper userScoreMapper;
/**
* 总积分排行榜键
*/
private static final String TOTAL_SCORE_RANK_KEY = "rank:total:score";
/**
* 每日积分排行榜键前缀
*/
private static final String DAILY_SCORE_RANK_PREFIX = "rank:daily:score:";
/**
* 商品销量排行榜键
*/
private static final String PRODUCT_SALES_RANK_KEY = "rank:product:sales";
/**
* 更新用户总积分
*
* @param userId 用户ID
* @param score 积分(可为负数,表示减少)
* @return 更新后的总积分
*/
public Double updateUserTotalScore(Long userId, double score) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
String member = userId.toString();
// 1. 更新Redis中的排行榜
Double newScore = stringRedisTemplate.opsForZSet().incrementScore(TOTAL_SCORE_RANK_KEY, member, score);
// 2. 同步到数据库
if (newScore != null) {
userScoreMapper.updateTotalScore(userId, score, newScore);
}
log.info("更新用户总积分,userId:{}, 变动积分:{}, 新积分:{}", userId, score, newScore);
return newScore;
}
/**
* 更新用户每日积分
*
* @param userId 用户ID
* @param score 积分
* @param date 日期(格式:yyyyMMdd)
* @return 更新后的当日积分
*/
public Double updateUserDailyScore(Long userId, double score, String date) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
StringUtils.hasText(date, "日期不能为空");
String key = DAILY_SCORE_RANK_PREFIX + date;
String member = userId.toString();
// 更新Redis中的每日排行榜
Double newScore = stringRedisTemplate.opsForZSet().incrementScore(key, member, score);
log.info("更新用户每日积分,userId:{}, date:{}, 变动积分:{}, 新积分:{}",
userId, date, score, newScore);
return newScore;
}
/**
* 更新商品销量
*
* @param productId 商品ID
* @param quantity 销量(可为负数,表示减少)
* @return 更新后的总销量
*/
public Double updateProductSales(Long productId, double quantity) {
StringUtils.hasText(productId.toString(), "商品ID不能为空");
String member = productId.toString();
// 更新Redis中的销量排行榜
Double newSales = stringRedisTemplate.opsForZSet().incrementScore(PRODUCT_SALES_RANK_KEY, member, quantity);
log.info("更新商品销量,productId:{}, 变动销量:{}, 总销量:{}", productId, quantity, newSales);
return newSales;
}
/**
* 获取总积分排行榜前N名
*
* @param count 数量
* @return 排行榜列表,按积分降序排列
*/
public List<RankingDTO> getTopTotalScores(int count) {
if (count <= 0) {
return Lists.newArrayList();
}
// 获取前count名,按分数降序
Set<ZSetOperations.TypedTuple<String>> tuples = stringRedisTemplate.opsForZSet()
.reverseRangeWithScores(TOTAL_SCORE_RANK_KEY, 0, count - 1);
if (CollectionUtils.isEmpty(tuples)) {
log.info("总积分排行榜为空");
return Lists.newArrayList();
}
// 转换为DTO
List<RankingDTO> result = convertTuplesToRankingDTOs(tuples);
log.info("获取总积分排行榜前{}名,实际返回{}条", count, result.size());
return result;
}
/**
* 获取指定日期的积分排行榜前N名
*
* @param date 日期(格式:yyyyMMdd)
* @param count 数量
* @return 排行榜列表
*/
public List<RankingDTO> getTopDailyScores(String date, int count) {
StringUtils.hasText(date, "日期不能为空");
if (count <= 0) {
return Lists.newArrayList();
}
String key = DAILY_SCORE_RANK_PREFIX + date;
// 获取前count名,按分数降序
Set<ZSetOperations.TypedTuple<String>> tuples = stringRedisTemplate.opsForZSet()
.reverseRangeWithScores(key, 0, count - 1);
if (CollectionUtils.isEmpty(tuples)) {
log.info("{}的每日积分排行榜为空", date);
return Lists.newArrayList();
}
// 转换为DTO
List<RankingDTO> result = convertTuplesToRankingDTOs(tuples);
log.info("获取{}的每日积分排行榜前{}名,实际返回{}条", date, count, result.size());
return result;
}
/**
* 获取商品销量排行榜前N名
*
* @param count 数量
* @return 排行榜列表
*/
public List<ProductRankingDTO> getTopProductSales(int count) {
if (count <= 0) {
return Lists.newArrayList();
}
// 获取前count名,按销量降序
Set<ZSetOperations.TypedTuple<String>> tuples = stringRedisTemplate.opsForZSet()
.reverseRangeWithScores(PRODUCT_SALES_RANK_KEY, 0, count - 1);
if (CollectionUtils.isEmpty(tuples)) {
log.info("商品销量排行榜为空");
return Lists.newArrayList();
}
// 转换为商品排行榜DTO
List<ProductRankingDTO> result = Lists.newArrayList();
int rank = 1;
for (ZSetOperations.TypedTuple<String> tuple : tuples) {
String productIdStr = tuple.getValue();
Double sales = tuple.getScore();
if (productIdStr != null && sales != null) {
Long productId = Long.valueOf(productIdStr);
// 查询商品信息
ProductDO product = productMapper.selectById(productId);
if (product != null) {
ProductRankingDTO dto = new ProductRankingDTO();
dto.setRank(rank);
dto.setProductId(productId);
dto.setProductName(product.getName());
dto.setSales(sales.longValue());
result.add(dto);
rank++;
}
}
}
log.info("获取商品销量排行榜前{}名,实际返回{}条", count, result.size());
return result;
}
/**
* 获取用户在总积分排行榜中的排名和积分
*
* @param userId 用户ID
* @return 排名信息,null表示不在排行榜中
*/
public UserRankingDTO getUserTotalScoreRank(Long userId) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
String member = userId.toString();
// 获取分数
Double score = stringRedisTemplate.opsForZSet().score(TOTAL_SCORE_RANK_KEY, member);
if (score == null) {
log.info("用户不在总积分排行榜中,userId:{}", userId);
return null;
}
// 获取排名(降序)
Long rank = stringRedisTemplate.opsForZSet().reverseRank(TOTAL_SCORE_RANK_KEY, member);
if (rank == null) {
log.info("用户不在总积分排行榜中,userId:{}", userId);
return null;
}
// 查询用户信息
UserDO user = userMapper.selectById(userId);
UserRankingDTO dto = new UserRankingDTO();
dto.setUserId(userId);
dto.setUsername(user != null ? user.getUsername() : null);
dto.setScore(score);
dto.setRank(rank + 1); // 排名从1开始
log.info("获取用户总积分排名,userId:{}, 排名:{}, 积分:{}", userId, dto.getRank(), score);
return dto;
}
/**
* 获取用户附近的排名(前后各n名)
*
* @param userId 用户ID
* @param n 前后各n名
* @return 排名列表
*/
public List<RankingDTO> getNearbyRankings(Long userId, int n) {
StringUtils.hasText(userId.toString(), "用户ID不能为空");
if (n <= 0) {
return Lists.newArrayList();
}
String member = userId.toString();
// 获取用户排名(升序)
Long rank = stringRedisTemplate.opsForZSet().rank(TOTAL_SCORE_RANK_KEY, member);
if (rank == null) {
log.info("用户不在总积分排行榜中,userId:{}", userId);
return Lists.newArrayList();
}
// 计算起始和结束位置
long start = Math.max(0, rank - n);
long end = rank + n;
// 获取范围内的排名
Set<ZSetOperations.TypedTuple<String>> tuples = stringRedisTemplate.opsForZSet()
.reverseRangeWithScores(TOTAL_SCORE_RANK_KEY, start, end);
if (CollectionUtils.isEmpty(tuples)) {
log.info("用户附近没有排名数据,userId:{}", userId);
return Lists.newArrayList();
}
// 转换为DTO
List<RankingDTO> result = convertTuplesToRankingDTOs(tuples);
log.info("获取用户附近的排名,userId:{}, 范围:{}~{}, 数量:{}",
userId, start, end, result.size());
return result;
}
/**
* 将TypedTuple集合转换为RankingDTO列表
*/
private List<RankingDTO> convertTuplesToRankingDTOs(Set<ZSetOperations.TypedTuple<String>> tuples) {
List<RankingDTO> result = Lists.newArrayList();
int rank = 1;
for (ZSetOperations.TypedTuple<String> tuple : tuples) {
String userIdStr = tuple.getValue();
Double score = tuple.getScore();
if (userIdStr != null && score != null) {
Long userId = Long.valueOf(userIdStr);
// 查询用户信息
UserDO user = userMapper.selectById(userId);
RankingDTO dto = new RankingDTO();
dto.setRank(rank);
dto.setUserId(userId);
dto.setUsername(user != null ? user.getUsername() : null);
dto.setScore(score);
result.add(dto);
rank++;
}
}
return result;
}
/**
* 排行榜DTO
*/
public static class RankingDTO {
private int rank;
private Long userId;
private String username;
private Double score;
// getter和setter省略
}
/**
* 用户排名DTO
*/
public static class UserRankingDTO {
private Long userId;
private String username;
private Double score;
private Long rank;
// getter和setter省略
}
/**
* 商品排行榜DTO
*/
public static class ProductRankingDTO {
private int rank;
private Long productId;
private String productName;
private Long sales;
// getter和setter省略
}
}
有序集合实现排行榜的优势:
有序集合可以用来实现延迟任务队列,将任务的执行时间作为分数,定期获取分数小于当前时间的任务进行处理。
Java 代码实现:
import com.alibaba.fastjson2.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.Set;
import java.util.UUID;
/**
* 基于Redis有序集合的延迟任务队列
*
* @author ken
*/
@Slf4j
@Component
public class RedisDelayQueue {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private OrderService orderService;
@Resource
private NotificationService notificationService;
/**
* 延迟任务队列键
*/
private static final String DELAY_QUEUE_KEY = "delay:queue";
/**
* 处理中的任务键前缀
*/
private static final String PROCESSING_TASK_PREFIX = "processing:task:";
/**
* 任务处理器注册表
*/
private final TaskHandlerRegistry taskHandlerRegistry = new TaskHandlerRegistry();
/**
* 初始化任务处理器
*/
public RedisDelayQueue() {
// 注册订单超时取消任务处理器
taskHandlerRegistry.register(TaskType.ORDER_TIMEOUT_CANCEL,
task -> orderService.cancelOrderByTimeout(Long.valueOf(task.getPayload())));
// 注册订单支付提醒任务处理器
taskHandlerRegistry.register(TaskType.NOTIFICATION_PAYMENT_REMIND,
task -> notificationService.sendPaymentReminder(Long.valueOf(task.getPayload())));
// 可以注册更多任务处理器...
}
/**
* 添加延迟任务
*
* @param taskType 任务类型
* @param payload 任务数据
* @param delayMillis 延迟时间(毫秒)
* @return 任务ID
*/
public String addDelayTask(TaskType taskType, String payload, long delayMillis) {
StringUtils.hasText(taskType.name(), "任务类型不能为空");
StringUtils.hasText(payload, "任务数据不能为空");
if (delayMillis <= 0) {
throw new IllegalArgumentException("延迟时间必须大于0");
}
// 生成唯一任务ID
String taskId = UUID.randomUUID().toString();
// 计算执行时间戳(当前时间 + 延迟时间)
long executeTime = System.currentTimeMillis() + delayMillis;
// 构建任务对象
DelayTask task = new DelayTask();
task.setTaskId(taskId);
task.setTaskType(taskType);
task.setPayload(payload);
task.setCreateTime(System.currentTimeMillis());
task.setExecuteTime(executeTime);
// 序列化为JSON
String taskJson = JSON.toJSONString(task);
// 添加到有序集合,分数为执行时间戳
Boolean added = stringRedisTemplate.opsForZSet().add(DELAY_QUEUE_KEY, taskJson, executeTime);
if (Boolean.TRUE.equals(added)) {
log.info("添加延迟任务成功,taskId:{}, taskType:{}, delay:{}ms, executeTime:{}",
taskId, taskType, delayMillis, executeTime);
return taskId;
} else {
log.error("添加延迟任务失败,taskId:{}, taskType:{}", taskId, taskType);
throw new RuntimeException("添加延迟任务失败");
}
}
/**
* 定期从延迟队列中获取并处理到期的任务
* 每10秒执行一次
*/
@Scheduled(fixedRate = 10000)
public void processExpiredTasks() {
log.info("开始处理到期的延迟任务");
// 当前时间戳
long now = System.currentTimeMillis();
// 获取所有执行时间 <= 当前时间的任务
Set<ZSetOperations.TypedTuple<String>> tuples = stringRedisTemplate.opsForZSet()
.rangeByScoreWithScores(DELAY_QUEUE_KEY, 0, now);
if (CollectionUtils.isEmpty(tuples)) {
log.info("没有到期的延迟任务需要处理");
return;
}
log.info("发现{}个到期的延迟任务,开始处理", tuples.size());
int successCount = 0;
int failCount = 0;
for (ZSetOperations.TypedTuple<String> tuple : tuples) {
String taskJson = tuple.getValue();
Double score = tuple.getScore();
if (!StringUtils.hasText(taskJson) || score == null) {
log.warn("无效的延迟任务数据,跳过处理");
continue;
}
try {
// 解析任务
DelayTask task = JSON.parseObject(taskJson, DelayTask.class);
// 检查任务是否有效
if (task == null || !StringUtils.hasText(task.getTaskId())) {
log.warn("无效的延迟任务,跳过处理,taskJson:{}", taskJson);
// 从队列中移除无效任务
stringRedisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, taskJson);
failCount++;
continue;
}
log.info("开始处理延迟任务,taskId:{}, taskType:{}", task.getTaskId(), task.getTaskType());
// 使用分布式锁确保任务只被处理一次
String lockKey = PROCESSING_TASK_PREFIX + task.getTaskId();
Boolean locked = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", 60, java.util.concurrent.TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
try {
// 再次检查任务是否还在队列中(可能已被其他节点处理)
Double currentScore = stringRedisTemplate.opsForZSet().score(DELAY_QUEUE_KEY, taskJson);
if (currentScore == null) {
log.info("延迟任务已被处理,跳过,taskId:{}", task.getTaskId());
continue;
}
// 执行任务
TaskHandler handler = taskHandlerRegistry.getHandler(task.getTaskType());
if (handler != null) {
handler.handle(task);
successCount++;
log.info("延迟任务处理成功,taskId:{}, taskType:{}", task.getTaskId(), task.getTaskType());
} else {
log.error("找不到延迟任务处理器,taskId:{}, taskType:{}", task.getTaskId(), task.getTaskType());
failCount++;
}
// 从队列中移除任务
stringRedisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, taskJson);
} finally {
// 释放锁
stringRedisTemplate.delete(lockKey);
}
} else {
log.info("延迟任务已被其他节点处理,跳过,taskId:{}", task.getTaskId());
}
} catch (Exception e) {
log.error("处理延迟任务发生异常,taskJson:{}", taskJson, e);
failCount++;
}
}
log.info("延迟任务处理完成,成功:{}个,失败:{}个", successCount, failCount);
}
/**
* 取消延迟任务
*
* @param taskId 任务ID
* @return 是否取消成功
*/
public boolean cancelDelayTask(String taskId) {
StringUtils.hasText(taskId, "任务ID不能为空");
log.info("尝试取消延迟任务,taskId:{}", taskId);
// 由于无法直接通过taskId查询,这里采用扫描的方式(实际应用中可以考虑维护taskId到taskJson的映射)
// 注意:大范围扫描会影响性能,实际应用中应优化
long now = System.currentTimeMillis();
Set<String> tasks = stringRedisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_KEY, 0, now + 86400000); // 扫描未来24小时的任务
if (CollectionUtils.isEmpty(tasks)) {
log.info("没有找到延迟任务,取消失败,taskId:{}", taskId);
return false;
}
for (String taskJson : tasks) {
try {
DelayTask task = JSON.parseObject(taskJson, DelayTask.class);
if (task != null && taskId.equals(task.getTaskId())) {
// 找到任务,从队列中移除
Long removed = stringRedisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY, taskJson);
if (removed != null && removed > 0) {
log.info("延迟任务取消成功,taskId:{}", taskId);
return true;
}
}
} catch (Exception e) {
log.error("解析任务JSON发生异常,taskJson:{}", taskJson, e);
}
}
log.info("没有找到延迟任务,取消失败,taskId:{}", taskId);
return false;
}
/**
* 延迟任务类
*/
public static class DelayTask {
private String taskId;
private TaskType taskType;
private String payload;
private long createTime;
private long executeTime;
// getter和setter省略
}
/**
* 任务类型枚举
*/
public enum TaskType {
ORDER_TIMEOUT_CANCEL, // 订单超时取消
NOTIFICATION_PAYMENT_REMIND, // 支付提醒通知
// 可以添加更多任务类型...
}
/**
* 任务处理器接口
*/
@FunctionalInterface
public interface TaskHandler {
void handle(DelayTask task);
}
/**
* 任务处理器注册表
*/
public static class TaskHandlerRegistry {
private final java.util.Map<TaskType, TaskHandler> handlers = new java.util.HashMap<>();
public void register(TaskType taskType, TaskHandler handler) {
handlers.put(taskType, handler);
}
public TaskHandler getHandler(TaskType taskType) {
return handlers.get(taskType);
}
}
}
使用示例:
/**
* 订单服务中使用延迟队列
*/
@Slf4j
@Service
public class OrderServiceImpl implements OrderService {
@Resource
private RedisDelayQueue delayQueue;
@Resource
private OrderMapper orderMapper;
/**
* 创建订单,并添加订单超时取消的延迟任务
*/
@Override
public OrderDTO createOrder(OrderCreateDTO createDTO) {
// 参数校验
StringUtils.hasText(createDTO.getUserId().toString(), "用户ID不能为空");
StringUtils.hasText(createDTO.getProductId().toString(), "商品ID不能为空");
if (createDTO.getQuantity() <= 0) {
throw new BusinessException("订单数量必须大于0");
}
// 1. 构建订单对象
OrderDO order = new OrderDO();
order.setOrderNo(generateOrderNo());
order.setUserId(createDTO.getUserId());
order.setProductId(createDTO.getProductId());
order.setQuantity(createDTO.getQuantity());
order.setAmount(calculateOrderAmount(createDTO));
order.setStatus(OrderStatus.PENDING_PAYMENT);
order.setCreateTime(System.currentTimeMillis());
order.setUpdateTime(System.currentTimeMillis());
// 2. 保存订单到数据库
int insertRows = orderMapper.insert(order);
if (insertRows <= 0) {
log.error("创建订单失败,保存数据库时返回0行影响,userId:{}", createDTO.getUserId());
throw new BusinessException("创建订单失败,请稍后重试");
}
// 3. 添加订单超时取消延迟任务(30分钟后执行)
long delayMillis = 30 * 60 * 1000; // 30分钟
String taskId = delayQueue.addDelayTask(
RedisDelayQueue.TaskType.ORDER_TIMEOUT_CANCEL,
order.getId().toString(), // 任务数据:订单ID
delayMillis
);
// 4. 记录任务ID到订单表(便于后续取消任务)
order.setDelayTaskId(taskId);
order.setUpdateTime(System.currentTimeMillis());
orderMapper.updateById(order);
log.info("创建订单成功,orderId:{}, orderNo:{}, delayTaskId:{}",
order.getId(), order.getOrderNo(), taskId);
// 5. 转换为DTO返回
return convertToDTO(order);
}
/**
* 取消超时未支付订单
*/
@Override
public boolean cancelOrderByTimeout(Long orderId) {
StringUtils.hasText(orderId.toString(), "订单ID不能为空");
// 1. 查询订单当前状态
OrderDO order = orderMapper.selectById(orderId);
if (order == null) {
log.warn("取消超时订单失败,订单不存在,orderId:{}", orderId);
return false;
}
// 2. 只有待支付状态的订单才能被超时取消
if (!OrderStatus.PENDING_PAYMENT.equals(order.getStatus())) {
log.info("订单状态不是待支付,无需超时取消,orderId:{}, currentStatus:{}",
orderId, order.getStatus());
return true; // 状态正确,视为处理成功
}
// 3. 更新订单状态为已取消
OrderDO updateOrder = new OrderDO();
updateOrder.setId(orderId);
updateOrder.setStatus(OrderStatus.CANCELLED);
updateOrder.setCancelReason("超时未支付");
updateOrder.setCancelTime(System.currentTimeMillis());
updateOrder.setUpdateTime(System.currentTimeMillis());
int updateRows = orderMapper.updateById(updateOrder);
if (updateRows <= 0) {
log.error("取消超时订单失败,更新数据库时返回0行影响,orderId:{}", orderId);
return false;
}
// 4. 触发后续业务逻辑(如恢复库存)
boolean restoreStockSuccess = restoreProductStock(order.getProductId(), order.getQuantity());
if (!restoreStockSuccess) {
log.error("取消超时订单成功,但恢复库存失败,orderId:{}, productId:{}, quantity:{}",
orderId, order.getProductId(), order.getQuantity());
// 此处可根据业务需求决定是否抛出异常或进行补偿处理
}
log.info("取消超时订单成功,orderId:{}, orderNo:{}", orderId, order.getOrderNo());
return true;
}
/**
* 支付订单(支付成功后取消延迟任务)
*/
@Override
public boolean payOrder(Long orderId, String paymentNo) {
StringUtils.hasText(orderId.toString(), "订单ID不能为空");
StringUtils.hasText(paymentNo, "支付单号不能为空");
// 1. 查询订单当前状态
OrderDO order = orderMapper.selectById(orderId);
if (order == null) {
log.warn("支付订单失败,订单不存在,orderId:{}", orderId);
throw new BusinessException("订单不存在");
}
// 2. 检查订单状态是否为待支付
if (!OrderStatus.PENDING_PAYMENT.equals(order.getStatus())) {
log.warn("支付订单失败,订单状态不是待支付,orderId:{}, currentStatus:{}",
orderId, order.getStatus());
throw new BusinessException("订单状态异常,无法支付");
}
// 3. 更新订单状态为已支付
OrderDO updateOrder = new OrderDO();
updateOrder.setId(orderId);
updateOrder.setStatus(OrderStatus.PAID);
updateOrder.setPaymentNo(paymentNo);
updateOrder.setPaymentTime(System.currentTimeMillis());
updateOrder.setUpdateTime(System.currentTimeMillis());
int updateRows = orderMapper.updateById(updateOrder);
if (updateRows <= 0) {
log.error("支付订单失败,更新数据库时返回0行影响,orderId:{}", orderId);
throw new BusinessException("支付失败,请稍后重试");
}
// 4. 取消订单超时取消的延迟任务
if (StringUtils.hasText(order.getDelayTaskId())) {
boolean cancelTaskSuccess = delayQueue.cancelDelayTask(order.getDelayTaskId());
if (cancelTaskSuccess) {
log.info("支付订单成功,已取消延迟任务,orderId:{}, delayTaskId:{}",
orderId, order.getDelayTaskId());
} else {
log.warn("支付订单成功,但取消延迟任务失败,orderId:{}, delayTaskId:{}",
orderId, order.getDelayTaskId());
// 此处可记录日志并进行补偿处理(如定时任务清理无效任务)
}
}
// 5. 触发后续业务逻辑(如扣减库存、生成物流单)
processAfterPayment(order);
log.info("支付订单成功,orderId:{}, orderNo:{}, paymentNo:{}",
orderId, order.getOrderNo(), paymentNo);
return true;
}
/**
* 生成订单号(年月日时分秒 + 6位随机数)
*/
private String generateOrderNo() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
String dateStr = sdf.format(new Date());
String randomStr = String.format("%06d", new Random().nextInt(1000000));
return dateStr + randomStr;
}
/**
* 计算订单金额
*/
private BigDecimal calculateOrderAmount(OrderCreateDTO createDTO) {
// 1. 查询商品单价
ProductDO product = productMapper.selectById(createDTO.getProductId());
if (product == null) {
throw new BusinessException("商品不存在");
}
// 2. 计算金额(单价 * 数量)
return product.getPrice().multiply(new BigDecimal(createDTO.getQuantity()));
}
/**
* 恢复商品库存
*/
private boolean restoreProductStock(Long productId, Integer quantity) {
// 使用MyBatis-Plus的乐观锁或分布式锁确保库存操作安全
int updateRows = productMapper.increaseStock(productId, quantity);
return updateRows > 0;
}
/**
* 支付后处理逻辑
*/
private void processAfterPayment(OrderDO order) {
// 1. 扣减库存(此处假设创建订单时已预扣库存,支付后确认扣减)
productMapper.confirmDecreaseStock(order.getProductId(), order.getQuantity());
// 2. 生成物流单(可异步处理)
logisticsService.createLogisticsOrder(order);
// 3. 发送支付成功通知(可通过消息队列异步发送)
notificationService.sendPaymentSuccessNotification(order.getUserId(), order.getId());
}
/**
* DO转DTO
*/
private OrderDTO convertToDTO(OrderDO order) {
OrderDTO dto = new OrderDTO();
BeanUtils.copyProperties(order, dto);
// 补充DTO特有字段(如商品名称、用户昵称等)
ProductDO product = productMapper.selectById(order.getProductId());
if (product != null) {
dto.setProductName(product.getName());
dto.setProductImage(product.getImage());
}
UserDO user = userMapper.selectById(order.getUserId());
if (user != null) {
dto.setUserName(user.getNickname());
}
return dto;
}
}
/**
* 通知服务实现类
*/
@Slf4j
@Service
public class NotificationServiceImpl implements NotificationService {
@Resource
private RedisDelayQueue delayQueue;
@Resource
private SmsService smsService;
@Resource
private UserMapper userMapper;
/**
* 发送支付提醒通知
*/
@Override
public void sendPaymentReminder(Long orderId) {
StringUtils.hasText(orderId.toString(), "订单ID不能为空");
// 1. 查询订单信息
OrderDO order = orderMapper.selectById(orderId);
if (order == null) {
log.warn("发送支付提醒失败,订单不存在,orderId:{}", orderId);
return;
}
// 2. 检查订单状态(仅待支付订单需要提醒)
if (!OrderStatus.PENDING_PAYMENT.equals(order.getStatus())) {
log.info("订单状态不是待支付,无需发送支付提醒,orderId:{}, currentStatus:{}",
orderId, order.getStatus());
return;
}
// 3. 查询用户信息(获取手机号)
UserDO user = userMapper.selectById(order.getUserId());
if (user == null || !StringUtils.hasText(user.getPhone())) {
log.warn("发送支付提醒失败,用户信息不存在或手机号为空,userId:{}, orderId:{}",
order.getUserId(), orderId);
return;
}
// 4. 构建短信内容
String smsContent = String.format(
"【电商平台】您的订单(订单号:%s)尚未支付,剩余支付时间不多啦~ 点击查看详情:https://example.com/order/%d",
order.getOrderNo(), orderId
);
// 5. 发送短信
boolean sendSuccess = smsService.sendSms(user.getPhone(), smsContent);
if (sendSuccess) {
log.info("发送支付提醒成功,orderId:{}, userId:{}, phone:{}",
orderId, order.getUserId(), maskPhone(user.getPhone()));
} else {
log.error("发送支付提醒失败,orderId:{}, userId:{}, phone:{}",
orderId, order.getUserId(), maskPhone(user.getPhone()));
// 发送失败时,可添加重试任务(限制重试次数)
retrySendPaymentReminder(orderId, order.getUserId());
}
}
/**
* 发送支付成功通知
*/
@Override
public void sendPaymentSuccessNotification(Long userId, Long orderId) {
// 实现逻辑类似,此处省略
}
/**
* 重试发送支付提醒(最多重试3次)
*/
private void retrySendPaymentReminder(Long orderId, Long userId) {
// 1. 查询重试次数(可存储在Redis或数据库)
String retryCountKey = "notification:payment:reminder:retry:" + orderId;
String retryCountStr = stringRedisTemplate.opsForValue().get(retryCountKey);
int retryCount = StringUtils.hasText(retryCountStr) ? Integer.parseInt(retryCountStr) : 0;
// 2. 检查是否超过最大重试次数
if (retryCount >= 3) {
log.error("支付提醒重试次数已达上限,停止重试,orderId:{}, userId:{}", orderId, userId);
// 记录失败日志,便于后续人工处理
saveNotificationFailLog(userId, orderId, "支付提醒", "重试次数超限");
return;
}
// 3. 计算下次重试时间(指数退避:1分钟、2分钟、4分钟)
long delayMillis = (long) (Math.pow(2, retryCount) * 60 * 1000);
// 4. 添加重试任务到延迟队列
String taskId = delayQueue.addDelayTask(
RedisDelayQueue.TaskType.NOTIFICATION_PAYMENT_REMIND,
orderId.toString(),
delayMillis
);
// 5. 更新重试次数
stringRedisTemplate.opsForValue().set(
retryCountKey,
String.valueOf(retryCount + 1),
24,
TimeUnit.HOURS
);
log.info("添加支付提醒重试任务,orderId:{}, retryCount:{}, delay:{}ms, taskId:{}",
orderId, retryCount + 1, delayMillis, taskId);
}
/**
* 手机号脱敏(显示前3位和后4位,中间用*代替)
*/
private String maskPhone(String phone) {
if (!StringUtils.hasText(phone) || phone.length() != 11) {
return phone;
}
return phone.substring(0, 3) + "****" + phone.substring(7);
}
/**
* 保存通知失败日志
*/
private void saveNotificationFailLog(Long userId, Long orderId, String notificationType, String reason) {
NotificationFailLogDO logDO = new NotificationFailLogDO();
logDO.setUserId(userId);
logDO.setOrderId(orderId);
logDO.setNotificationType(notificationType);
logDO.setFailReason(reason);
logDO.setCreateTime(System.currentTimeMillis());
notificationFailLogMapper.insert(logDO);
}
}
有序集合实现延迟队列的优势:
ZRANGEBYSCORE可快速筛选出到期任务,时间复杂度 O (logN + M)注意事项:
位图并非独立的数据类型,而是字符串类型的特殊应用,它将字符串的每个字节视为二进制位(bit),通过位操作来存储和处理数据。位图特别适合存储大量布尔类型的状态信息(如是否签到、是否在线)。
Redis 位图的底层是字符串(SDS 结构),每个字符占 8 个二进制位。例如,一个长度为 4 的字符串可以表示 32 个布尔状态(4 * 8 = 32 位)。
位图的核心优势在于空间效率:存储 100 万个布尔值仅需约 125KB(1000000 / 8 / 1024 ≈ 122KB),远低于其他数据类型。

命令 | 功能 | 时间复杂度 |
|---|---|---|
SETBIT key offset value | 设置指定偏移量的位值(0 或 1) | O(1) |
GETBIT key offset | 获取指定偏移量的位值 | O(1) |
BITCOUNT key [start end] | 统计指定范围内值为 1 的位数量 | O (N),N 是字节数 |
BITOP operation destkey key1 key2... | 对多个位图执行位运算(AND/OR/XOR/NOT) | O (N),N 是最长位图的字节数 |
BITPOS key value [start end] | 查找指定值(0 或 1)的第一个位置 | O(N) |
用户签到是典型的二进制状态场景(签到 / 未签到),使用位图可以高效存储和统计用户的签到记录。
实现思路:
sign:user:1001)@Service
public class SignServiceImpl implements SignService {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 用户签到
* @param userId 用户ID
* @param date 签到日期(精确到天)
* @return 是否签到成功(已签过返回false)
*/
@Override
public boolean sign(Long userId, LocalDate date) {
// 1. 校验参数
if (userId == null || date == null) {
throw new IllegalArgumentException("用户ID和日期不能为空");
}
// 2. 构建Redis键(按用户+年份+月份存储,方便统计月度签到)
String key = String.format("sign:user:%d:%d:%d",
userId, date.getYear(), date.getMonthValue());
// 3. 计算偏移量(当月第几天-1,如1号是0,2号是1...)
int offset = date.getDayOfMonth() - 1;
// 4. 设置位值(如果已为1则返回false,否则返回true)
Boolean result = stringRedisTemplate.opsForValue().setBit(key, offset, true);
// 5. 记录签到日志(可选)
if (Boolean.TRUE.equals(result)) {
log.info("用户签到成功,userId:{}, date:{}", userId, date);
return true;
} else {
log.info("用户已签到,userId:{}, date:{}", userId, date);
return false;
}
}
/**
* 查询用户某天是否签到
*/
@Override
public boolean isSigned(Long userId, LocalDate date) {
String key = String.format("sign:user:%d:%d:%d",
userId, date.getYear(), date.getMonthValue());
int offset = date.getDayOfMonth() - 1;
Boolean isSigned = stringRedisTemplate.opsForValue().getBit(key, offset);
return Boolean.TRUE.equals(isSigned);
}
/**
* 统计用户当月签到次数
*/
@Override
public int countMonthSignedDays(Long userId, LocalDate date) {
String key = String.format("sign:user:%d:%d:%d",
userId, date.getYear(), date.getMonthValue());
// BITCOUNT命令统计值为1的位数量
Long count = stringRedisTemplate.execute(
(RedisCallback<Long>) connection -> connection.bitCount(key.getBytes())
);
return count != null ? count.intValue() : 0;
}
/**
* 获取用户当月签到记录(哪几天签了到)
*/
@Override
public List<Integer> getMonthSignedDays(Long userId, LocalDate date) {
String key = String.format("sign:user:%d:%d:%d",
userId, date.getYear(), date.getMonthValue());
// 获取当月天数
int daysInMonth = date.lengthOfMonth();
// 获取位图数据
byte[] bytes = stringRedisTemplate.opsForValue().get(key.getBytes());
if (bytes == null) {
return Collections.emptyList();
}
List<Integer> signedDays = new ArrayList<>();
for (int i = 0; i < daysInMonth; i++) {
// 计算字节索引和位索引
int byteIndex = i / 8;
int bitIndex = i % 8;
// 检查是否越界
if (byteIndex >= bytes.length) {
continue;
}
// 判断该位是否为1
if ((bytes[byteIndex] & (1 << bitIndex)) != 0) {
signedDays.add(i + 1); // 转换为日期(1号开始)
}
}
return signedDays;
}
/**
* 获取用户连续签到天数(截至当前日期)
*/
@Override
public int getContinuousSignedDays(Long userId, LocalDate date) {
String key = String.format("sign:user:%d:%d:%d",
userId, date.getYear(), date.getMonthValue());
int currentDay = date.getDayOfMonth();
int continuousDays = 0;
// 从当天往前检查
for (int i = currentDay - 1; i >= 0; i--) {
Boolean isSigned = stringRedisTemplate.opsForValue().getBit(key, i);
if (Boolean.TRUE.equals(isSigned)) {
continuousDays++;
} else {
break; // 遇到未签到的日期,停止计数
}
}
return continuousDays;
}
}
位图实现签到的优势:
位图可以高效统计网站的活跃用户,尤其是需要按日 / 周 / 月维度进行交叉分析的场景。
实现思路:
active:20231001)@Service
public class ActiveUserServiceImpl implements ActiveUserService {
@Resource
private StringRedisTemplate stringRedisTemplate;
/**
* 记录用户活跃
*/
@Override
public void recordActiveUser(Long userId, LocalDate date) {
if (userId == null || date == null) {
throw new IllegalArgumentException("用户ID和日期不能为空");
}
// 构建每日活跃用户的位图键
String key = String.format("active:%s", date.format(DateTimeFormatter.ofPattern("yyyyMMdd")));
// 设置对应位为1(用户ID作为偏移量)
stringRedisTemplate.opsForValue().setBit(key, userId, true);
}
/**
* 统计单日活跃用户数
*/
@Override
public long countDailyActiveUsers(LocalDate date) {
String key = String.format("active:%s", date.format(DateTimeFormatter.ofPattern("yyyyMMdd")));
Long count = stringRedisTemplate.execute(
(RedisCallback<Long>) connection -> connection.bitCount(key.getBytes())
);
return count != null ? count : 0;
}
/**
* 统计多日活跃用户数(去重)
*/
@Override
public long countMultiDaysActiveUsers(LocalDate startDate, LocalDate endDate) {
if (startDate.isAfter(endDate)) {
throw new IllegalArgumentException("开始日期不能晚于结束日期");
}
// 收集日期范围内的所有位图键
List<String> keys = new ArrayList<>();
LocalDate current = startDate;
while (!current.isAfter(endDate)) {
keys.add(String.format("active:%s", current.format(DateTimeFormatter.ofPattern("yyyyMMdd"))));
current = current.plusDays(1);
}
if (keys.isEmpty()) {
return 0;
}
// 生成临时结果键
String tempKey = "active:temp:" + UUID.randomUUID().toString();
try {
// 对所有位图执行OR运算(合并活跃用户,去重)
String[] keyArray = keys.toArray(new String[0]);
Long result = stringRedisTemplate.execute(
(RedisCallback<Long>) connection -> connection.bitOp(
BitOperation.OR,
tempKey.getBytes(),
Arrays.stream(keyArray).map(String::getBytes).toArray(byte[][]::new)
)
);
// 统计结果位图中1的数量
if (result != null) {
Long count = stringRedisTemplate.execute(
(RedisCallback<Long>) connection -> connection.bitCount(tempKey.getBytes())
);
return count != null ? count : 0;
}
} finally {
// 删除临时键
stringRedisTemplate.delete(tempKey);
}
return 0;
}
/**
* 统计连续多日活跃的用户数
*/
@Override
public long countContinuousActiveUsers(LocalDate startDate, LocalDate endDate) {
// 实现逻辑类似,使用AND运算替代OR运算
// 代码省略...
}
}
位图实现活跃统计的优势:
Redis 3.2 版本引入了地理空间数据类型,用于存储和查询地理位置信息(经纬度),支持距离计算、范围查询等功能,特别适合 LBS(Location-Based Service)场景。
地理空间类型的底层实际上是有序集合(ZSet),它通过一种名为GeoHash的编码方式将二维的经纬度转换为一维的字符串,然后以该字符串作为 ZSet 的成员(member),以其对应的数值作为分数(score)。
GeoHash 编码的原理是将地球表面划分为网格,对每个网格进行编码,越精确的位置编码越长。两个位置越近,它们的 GeoHash 编码前缀越相似。

命令 | 功能 | 时间复杂度 |
|---|---|---|
GEOADD key longitude latitude member [longitude latitude member ...] | 添加地理位置 | O (logN),N 是元素数量 |
GEOPOS key member [member ...] | 获取地理位置的经纬度 | O(1) |
GEODIST key member1 member2 [unit] | 计算两个位置的距离 | O(1) |
GEORADIUS key longitude latitude radius unit [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] | 根据经纬度查询指定范围内的位置 | O (N + logM),N 是范围内元素数,M 是总元素数 |
GEORADIUSBYMEMBER key member radius unit [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] | 根据成员查询指定范围内的位置 | O(N + logM) |
GEOHASH key member [member ...] | 获取位置的 GeoHash 编码 | O(1) |
以电商平台的 "附近的店铺" 功能为例,展示地理空间类型的实际应用。
@Service
public class ShopServiceImpl implements ShopService {
@Resource
private StringRedisTemplate stringRedisTemplate;
// 存储店铺地理位置的Redis键
private static final String SHOP_GEO_KEY = "shop:geo";
/**
* 添加店铺地理位置
*/
@Override
public void addShopLocation(Long shopId, double longitude, double latitude) {
// 校验经纬度范围( longitude: -180~180, latitude: -85.05112878~85.05112878)
if (longitude < -180 || longitude > 180 || latitude < -85.05112878 || latitude > 85.05112878) {
throw new IllegalArgumentException("经纬度超出有效范围");
}
// GEOADD命令添加地理位置(member为店铺ID)
stringRedisTemplate.opsForGeo().add(SHOP_GEO_KEY, new Point(longitude, latitude), shopId.toString());
}
/**
* 获取店铺地理位置
*/
@Override
public Point getShopLocation(Long shopId) {
List<Point> points = stringRedisTemplate.opsForGeo().position(SHOP_GEO_KEY, shopId.toString());
return points != null && !points.isEmpty() ? points.get(0) : null;
}
/**
* 计算两个店铺之间的距离
*/
@Override
public Distance calculateDistanceBetweenShops(Long shopId1, Long shopId2, Metric unit) {
return stringRedisTemplate.opsForGeo().distance(SHOP_GEO_KEY, shopId1.toString(), shopId2.toString(), unit);
}
/**
* 搜索用户附近的店铺
* @param longitude 用户经度
* @param latitude 用户纬度
* @param radius 搜索半径
* @param unit 半径单位
* @param count 最多返回数量
* @return 附近的店铺列表(按距离排序)
*/
@Override
public List<NearbyShopDTO> searchNearbyShops(double longitude, double latitude,
double radius, Metric unit, int count) {
// 构建查询条件
GeoRadiusCommandArgs args = GeoRadiusCommandArgs.newGeoRadiusArgs()
.includeDistance() // 包含距离信息
.includeCoordinates() // 包含经纬度
.sortAscending() // 按距离升序排列
.limit(count); // 限制返回数量
// 执行范围查询
GeoResults<GeoLocation<String>> results = stringRedisTemplate.opsForGeo()
.radius(SHOP_GEO_KEY, new Point(longitude, latitude), new Distance(radius, unit), args);
if (results == null) {
return Collections.emptyList();
}
// 转换结果
List<NearbyShopDTO> nearbyShops = new ArrayList<>();
for (GeoResult<GeoLocation<String>> result : results) {
GeoLocation<String> location = result.getContent();
NearbyShopDTO dto = new NearbyShopDTO();
dto.setShopId(Long.parseLong(location.getName()));
dto.setLongitude(location.getPoint().getX());
dto.setLatitude(location.getPoint().getY());
dto.setDistance(result.getDistance().getValue());
dto.setDistanceUnit(result.getDistance().getUnit().toString());
// 补充店铺其他信息(从数据库查询)
ShopDO shop = shopMapper.selectById(dto.getShopId());
if (shop != null) {
dto.setShopName(shop.getName());
dto.setShopLogo(shop.getLogo());
dto.setScore(shop.getScore());
}
nearbyShops.add(dto);
}
return nearbyShops;
}
/**
* 搜索指定店铺附近的其他店铺
*/
@Override
public List<NearbyShopDTO> searchNearbyShopsByShopId(Long shopId, double radius,
Metric unit, int count) {
// 实现逻辑类似,使用GEORADIUSBYMEMBER命令
// 代码省略...
}
/**
* 删除店铺地理位置
*/
@Override
public void removeShopLocation(Long shopId) {
// 由于GEO基于ZSet实现,删除使用ZREM命令
stringRedisTemplate.opsForZSet().remove(SHOP_GEO_KEY, shopId.toString());
}
}
地理空间类型的优势:
Redis 提供了丰富多样的数据类型,每种类型都有其独特的适用场景。掌握这些数据类型的特性和用法,能够帮助我们构建高效、可靠的分布式系统。
业务场景 | 推荐数据类型 | 核心优势 |
|---|---|---|
缓存、计数器、分布式锁 | String | 简单直观,操作高效 |
列表、队列、栈、最新消息 | List | 有序,支持两端操作 |
去重、集合运算、标签系统 | Set | 自动去重,支持交集 / 并集 |
排行榜、带权重的队列 | ZSet | 可排序,支持范围查询 |
存储对象、哈希表 | Hash | 适合存储对象属性,节省空间 |
延迟任务、定时任务 | ZSet(有序集合) | 可按时间排序,实现延迟执行 |
签到、活跃统计、二进制状态 | Bitmap | 空间效率极高,适合大量布尔状态 |
附近的人、位置搜索 | Geospatial | 支持地理位置计算和范围查询 |
合理设置过期时间:对缓存数据设置合理的过期时间(TTL),避免内存溢出
// 设置键过期时间(30分钟)
stringRedisTemplate.expire("key",30,TimeUnit.MINUTES);批量操作提升性能:使用批量命令(如 MGET、HMSET、Pipeline)减少网络往返
// 使用Pipeline批量操作
List<Object> results = stringRedisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public Object execute(RedisOperations operations) throws DataAccessException {
for (long i = 0; i < 1000; i++) {
operations.opsForValue().set("key:" + i, "value:" + i);
}
return null;
}
});避免大键(Big Key):单个键存储的数据不宜过大,否则会影响 Redis 性能和稳定性
防止缓存穿透:对不存在的键设置短期空值缓存,避免穿透到数据库
// 查询缓存,如果不存在则设置空值并短期过期
String value = stringRedisTemplate.opsForValue().get(key);
if (value == null) {
// 从数据库查询
value = dbService.queryData(key);
if (value == null) {
// 设置空值缓存,过期时间较短(如5分钟)
stringRedisTemplate.opsForValue().set(key, "", 5, TimeUnit.MINUTES);
} else {
stringRedisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);
}
}
处理缓存击穿:热点数据永不过期或加锁保护,避免并发请求穿透到数据库
// 双重检查锁防止缓存击穿
String value = stringRedisTemplate.opsForValue().get(key);
if (value == null) {
synchronized (key.intern()) {
value = stringRedisTemplate.opsForValue().get(key);
if (value == null) {
// 从数据库查询并更新缓存
value = dbService.queryData(key);
stringRedisTemplate.opsForValue().set(key, value, 30, TimeUnit.MINUTES);
}
}
}
应对缓存雪崩:缓存过期时间加随机值,避免大量缓存同时失效
// 过期时间添加随机值,避免集中过期
int baseExpire = 30; // 基础过期时间(分钟)
int random = new Random().nextInt(10); // 0-9分钟随机值
stringRedisTemplate.opsForValue().set(key, value, baseExpire + random, TimeUnit.MINUTES);
使用命名空间:键名添加业务前缀(如user:info:1001),便于管理和排查问题
监控与告警:监控 Redis 的内存使用、命中率、响应时间等指标,设置合理的告警阈值
通过合理选择和使用 Redis 的数据类型,并遵循最佳实践,我们可以充分发挥 Redis 的性能优势,构建出高效、可靠的分布式系统。在实际开发中,还需要根据具体业务场景进行权衡和优化,不断积累经验,才能更好地驾驭这个强大的工具。