
在单体的应用开发场景中,在多线程的环境下,涉及并发同步的时候,为了保证一个代码块在同一时间只能由一个线程访问,我们一般可以使用synchronized语法和ReetrantLock去保证,这实际上是本地锁的方式。
也就是说,在同一个JVM内部,大家往往采用synchronized或者Lock的方式来解决多线程间的安全问题。但在分布式集群工作的开发场景中,在JVM之间,那么就需要一种更加高级的锁机制,来处理种跨JVM进程之间的线程安全问题.
解决方案是:使用分布式锁
总之,对于分布式场景,我们可以使用分布式锁,它是控制分布式系统之间互斥访问共享资源的一种方式。
比如说在一个分布式系统中,多台机器上部署了多个服务,当客户端一个用户发起一个数据插入请求时,如果没有分布式锁机制保证,那么那多台机器上的多个服务可能进行并发插入操作,导致数据重复插入,对于某些不允许有多余数据的业务来说,这就会造成问题。而分布式锁机制就是为了解决类似这类问题,保证多个服务之间互斥的访问共享资源,如果一个服务抢占了分布式锁,其他服务没获取到锁,就不进行后续操作。
大致意思如下图所示(不一定准确):

何为分布式锁?
分布式锁的条件:
分布式锁的实现:
分布式锁的实现由很多种,文件锁、数据库、redis等等,比较多;分布式锁常见的多种实现方式:
在实践中,还是redis做分布式锁性能会高一些
所谓悲观锁,悲观锁是对数据被的修改持悲观态度(认为数据在被修改的时候一定会存在并发问题),因此在整个数据处理过程中将数据锁定。
悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在应用层中实现了加锁机制,也无法保证外部系统不会修改数据)。
数据库的行锁、表锁、排他锁等都是悲观锁,这里以行锁为例,进行介绍。以我们常用的MySQL为例,我们通过使用select...for update语句, 执行该语句后,会在表上加持行锁,一直到事务提交,解除行锁。
使用场景举例:
在秒杀案例中,生成订单和扣减库存的操作,可以通过商品记录的行锁,进行保护。们通过使用select...for update语句,在查询商品表库存时将该条记录加锁,待下单减库存完成后,再释放锁。
示例的SQL如下:
//0.开始事务
begin;
//1.查询出商品信息
select stockCount from seckill_good where id=1 for update;
//2.根据商品信息生成订单
insert into seckill_order (id,good_id) values (null,1);
//3.修改商品stockCount减一
update seckill_good set stockCount=stockCount-1 where id=1;
//4.提交事务
commit;以上,在对id = 1的记录修改前,先通过for update的方式进行加锁,然后再进行修改。这就是比较典型的悲观锁策略。
如果以上修改库存的代码发生并发,同一时间只有一个线程可以开启事务并获得id=1的锁,其它的事务必须等本次事务提交之后才能执行。这样我们可以保证当前的数据不会被其它事务修改。
我们使用select_for_update,另外一定要写在事务中. 注意:要使用悲观锁,我们必须关闭mysql数据库中自动提交的属性,命令set autocommit=0;即可关闭,因为MySQL默认使用autocommit模式,也就是说,当你执行一个更新操作后,MySQL会立刻将结果进行提交。
悲观锁的实现,往往依靠数据库提供的锁机制。在数据库中,悲观锁的流程如下:
使用乐观锁就不需要借助数据库的锁机制了。
乐观锁的概念中其实已经阐述了他的具体实现细节:主要就是两个步骤:冲突检测和数据更新。其实现方式有一种比较典型的就是Compare and Swap(CAS)技术。
CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。
CAS的实现中,在表中增加一个version字段,操作前先查询version信息,在数据提交时检查version字段是否被修改,如果没有被修改则进行提交,否则认为是过期数据。
比如前面的扣减库存问题,通过乐观锁可以实现如下:
//1.查询出商品信息
select stockCount, version from seckill_good where id=1;
//2.根据商品信息生成订单
insert into seckill_order (id,good_id) values (null,1);
//3.修改商品库存
update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;以上,我们在更新之前,先查询一下库存表中当前版本(version),然后在做update的时候,以version 作为一个修改条件。
当我们提交更新的时候,判断数据库表对应记录的当前version与第一次取出来的version进行比对,如果数据库表当前version与第一次取出来的version相等,则予以更新,否则认为是过期数据。
CAS 乐观锁有两个问题:
(1) CAS 存在一个比较重要的问题,即ABA问题. 解决的办法是version字段顺序递增。
(2) 乐观锁的方式,在高并发时,只有一个线程能执行成功,会造成大量的失败,这给用户的体验显然是很不好的。
除了在数据库层面加分布式锁,通常还可以使用以下更高性能、更高可用的分布式锁:
有关zookeeper分布式锁的原理和实现,具体请参见下面的博客: Zookeeper 分布式锁 (图解+秒懂+史上最全)
或者阅读笔者的《Java高并发核心编程(卷1加强版)》

本文重点介绍Redis分布式锁,分为两个维度进行介绍:
(1)基于Jedis手工造轮子分布式锁
(2)介绍Redission 分布式锁的使用和原理。
我们首先讲解 Jedis 普通分布式锁实现,并且是纯手工的模式,从最为基础的Redis命令开始。
只有充分了解与分布式锁相关的普通Redis命令,才能更好的了解高级的Redis分布式锁的实现,因为高级的分布式锁的实现完全基于普通Redis命令。
Redis发展到现在,几种常见的部署架构有:
从分布式锁的角度来说, 无论是单机模式、主从模式、哨兵模式、集群模式,其原理都是类同的。 只是主从模式、哨兵模式、集群模式的更加的高可用、或者更加高并发。
所以,接下来先基于单机模式,基于Jedis手工造轮子实现自己的分布式锁。
Redis分布式锁机制,主要借助setnx和expire两个命令完成。
setnx命令:
SETNX 是SET if Not eXists的简写。将 key 的值设为 value,当且仅当 key 不存在; 若给定的 key 已经存在,则 SETNX 不做任何动作。
下面为客户端使用示例:
127.0.0.1:6379> set lock "unlock"
OK
127.0.0.1:6379> setnx lock "unlock"
(integer) 0
127.0.0.1:6379> setnx lock "lock"
(integer) 0
127.0.0.1:6379> expire命令:
expire命令为 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除. 其格式为:
EXPIRE key seconds
下面为客户端使用示例:
127.0.0.1:6379> expire lock 10
(integer) 1
127.0.0.1:6379> ttl lock
8通过Redis的setnx、expire命令可以实现简单的锁机制:
线程调用setnx方法成功返回1认为加锁成功,其他线程要等到当前线程业务操作完成释放锁后,才能再次调用setnx加锁成功。

以上简单redis分布式锁的问题:
如果出现了这么一个问题:如果setnx是成功的,但是expire设置失败,一旦出现了释放锁失败,或者没有手工释放,那么这个锁永远被占用,其他线程永远也抢不到锁。
所以,需要保障setnx和expire两个操作的原子性,要么全部执行,要么全部不执行,二者不能分开。
解决的办法有两种:
使用set的命令时,同时设置过期时间的示例如下:
127.0.0.1:6379> set unlock "234" EX 100 NX
(nil)
127.0.0.1:6379>
127.0.0.1:6379> set test "111" EX 100 NX
OK这样就完美的解决了分布式锁的原子性; set 命令的完整格式:
set key value EX seconds NX|XX
EX seconds:设置失效时长,单位秒
PX milliseconds:设置失效时长,单位毫秒
NX:key不存在时设置value,成功返回OK,失败返回(nil)
XX:key存在时设置value,成功返回OK,失败返回(nil)使用set命令实现加锁操作,先展示加锁的简单代码实习,再带大家慢慢解释为什么这样实现。
加锁的简单代码实现
package com.crazymaker.springcloud.standard.lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {
private RedisTemplate redisTemplate;
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
}可以看到,我们加锁用到了Jedis的set Api:
jedis.set(String key, String value, String nxxx, String expx, int time)
这个set()方法一共有五个形参:
UUID.randomUUID().toString()方法生成。总的来说,执行上面的set()方法就只会导致两种结果:
心细的童鞋就会发现了,我们的加锁代码满足前面描述的四个条件中的三个。
那么这段Lua代码的功能是什么呢?
其实很简单,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。
编写个分布式锁服务,用于加载lua脚本,创建 分布式锁,代码如下:
package com.crazymaker.springcloud.standard.lock;
import com.crazymaker.springcloud.common.util.IOUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@Slf4j
@Data
public class RedisLockService
{
private RedisTemplate redisTemplate;
static String lockLua = "script/lock.lua";
static String unLockLua = "script/unlock.lua";
static RedisScript<Long> lockScript = null;
static RedisScript<Long> unLockScript = null;
{
String script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),lockLua);
// String script = FileUtil.readString(lockLua, Charset.forName("UTF-8" ));
if(StringUtils.isEmpty(script))
{
log.error("lua load failed:"+lockLua);
}
lockScript = new DefaultRedisScript<>(script, Long.class);
// script = FileUtil.readString(unLockLua, Charset.forName("UTF-8" ));
script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),unLockLua);
if(StringUtils.isEmpty(script))
{
log.error("lua load failed:"+unLockLua);
}
unLockScript = new DefaultRedisScript<>(script, Long.class);
}
public RedisLockService(RedisTemplate redisTemplate)
{
this.redisTemplate = redisTemplate;
}
public Lock getLock(String lockKey, String lockValue) {
JedisLock lock=new JedisLock(lockKey,lockValue);
lock.setRedisTemplate(redisTemplate);
lock.setLockScript(lockScript);
lock.setUnLockScript(unLockScript);
return lock;
}
}接下来,终于可以上测试用例了
package com.crazymaker.springcloud.lock;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoCloudApplication.class})
// 指定启动类
public class RedisLockTest {
@Resource
RedisLockService redisLockService;
private ExecutorService pool = Executors.newFixedThreadPool(10);
@Test
public void testLock() {
int threads = 10;
final int[] count = {0};
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
String lockValue = UUID.randomUUID().toString();
try {
Lock lock = redisLockService.getLock("test:lock:1", lockValue);
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
if (locked) {
for (int j = 0; j < 1000; j++) {
count[0]++;
}
log.info("count = " + count[0]);
lock.unlock();
} else {
System.out.println("抢锁失败");
}
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10个线程每个累加1000为: = " + count[0]);
//输出统计结果
float time = System.currentTimeMillis() - start;
System.out.println("运行的时长为(ms):" + time);
System.out.println("每一次执行的时长为(ms):" + time / count[0]);
}
}执行用例,结果如下:
2021-05-04 23:02:11.900 INFO 22120 --- [pool-1-thread-7] c.c.springcloud.lock.RedisLockTest LN:50 count = 6000
2021-05-04 23:02:11.901 INFO 22120 --- [pool-1-thread-1] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新开始,turn:3,剩余时间:9585
2021-05-04 23:02:11.902 INFO 22120 --- [pool-1-thread-1] c.c.springcloud.lock.RedisLockTest LN:50 count = 7000
2021-05-04 23:02:12.100 INFO 22120 --- [pool-1-thread-4] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新开始,turn:3,剩余时间:9586
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新开始,turn:3,剩余时间:9585
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-8] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新开始,turn:3,剩余时间:9585
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-4] c.c.springcloud.lock.RedisLockTest LN:50 count = 8000
2021-05-04 23:02:12.102 INFO 22120 --- [pool-1-thread-8] c.c.springcloud.lock.RedisLockTest LN:50 count = 9000
2021-05-04 23:02:12.304 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新开始,turn:4,剩余时间:9383
2021-05-04 23:02:12.307 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.lock.RedisLockTest LN:50 count = 10000
10个线程每个累加1000为: = 10000
运行的时长为(ms):827.0
每一次执行的时长为(ms):0.0827下面有一个简单的使用锁的例子,在10秒内占着锁:
//写数据到文件
function writeData(filename, data) {
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
if (!locked) {
throw 'Failed to acquire lock';
}
try {
//将数据写到文件
var file = storage.readFile(filename);
var updated = updateContents(file, data);
storage.writeFile(filename, updated);
} finally {
lock.unlock();
}
}问题是:如果在写文件过程中,发生了 fullGC,并且其时间跨度较长, 超过了10秒, 那么,分布式就自动释放了。
在此过程中,client2 抢到锁,写了文件。
client1 的fullGC完成后,也继续写文件,注意,此时client1 的并没有占用锁,此时写入会导致文件数据错乱,发生线程安全问题。
这就是STW导致的锁过期问题。
STW导致的锁过期问题,具体如下图所示:

STW导致的锁过期问题,大概的解决方案,有: 1: 模拟CAS乐观锁的方式,增加版本号 2:watch dog自动延期机制
1: 模拟CAS乐观锁的方式,增加版本号(如下图中的token)

此方案如果要实现,需要调整业务逻辑,与之配合,所以会入侵代码。
2:watch dog自动延期机制
客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?
简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
redission,采用的就是这种方案, 此方案不会入侵业务代码。
注意:
单机版的watch dog 并不能解决 STW的过期问题, 需要分布式版本的 watch dog, 独立的看门狗服务。
锁删除之后, 取消看门狗服务的 对应的key记录, 当然,这就使得系统变得复杂, 还要保证看门狗服务的高并发、高可用、数据一致性的问题。
作为 Java 开发人员,我们若想在程序中集成 Redis,必须使用 Redis 的第三方库。目前大家使用的最多的第三方库是jedis。
和SpringCloud gateway一样,Redisson也是基于Netty实现的,是更高性能的第三方库。 所以,这里推荐大家使用Redission替代 jedis。
在使用Redission之前,建议大家先掌握Netty的知识。 推荐大家阅读被很多小伙伴评价为史上最为易懂的NIO、Netty书籍:《Java高并发核心编程(卷1)》

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。

Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

1.概况对比
Jedis是Redis的java实现的客户端,其API提供了比较全面的的Redis命令的支持,Redisson实现了分布式和可扩展的的java数据结构,和Jedis相比,功能较为简单,不支持字符串操作,不支持排序,事物,管道,分区等Redis特性。Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中的放在处理业务逻辑上。
2.可伸缩性
Jedis使用阻塞的I/O,且其方法调用都是同步的,程序流程要等到sockets处理完I/O才能执行,不支持异步,Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis。
Redisson使用非阻塞的I/O和基于Netty框架的事件驱动的通信层,其方法调用时异步的。Redisson的API是线程安全的,所以操作单个Redisson连接来完成各种操作。
3.第三方框架整合
Redisson在Redis的基础上实现了java缓存标准规范;Redisson还提供了Spring Session回话管理器的实现。

安装 Redisson 最便捷的方法是使用 Maven 或者 Gradle:
•Maven
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.4</version>
</dependency>•Gradle
compile group: 'org.redisson', name: 'redisson', version: '3.11.4'目前 Redisson 最新版是 3.11.4,当然你也可以通过搜索 Maven 中央仓库 mvnrepository1 来找到 Redisson 的各种版本。
RedissonClient有多种模式,主要的模式有:
首先介绍单节点模式。
单节点模式的程序化配置方法,大致如下:
Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);xxxxxxxxxx Config config = new Config();config.useSingleServer().setAddress("redis://myredisserver:6379");RedissonClient redisson = Redisson.create(config);// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();SingleServerConfig singleConfig = config.useSingleServer();SingleServerConfig类的设置参数如下:
address(节点地址) 可以通过
host:port的格式来指定节点地址。 subscriptionConnectionMinimumIdleSize(发布和订阅连接的最小空闲连接数) 默认值:1用于发布和订阅连接的最小保持连接数(长连接)。Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。 subscriptionConnectionPoolSize(发布和订阅连接池大小) 默认值:50用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 connectionMinimumIdleSize(最小空闲连接数) 默认值:32最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时写入反应速度。 connectionPoolSize(连接池大小) 默认值:64连接池最大容量。连接池的连接数量自动弹性伸缩。 dnsMonitoring(是否启用DNS监测) 默认值:false在启用该功能以后,Redisson将会监测DNS的变化情况。 dnsMonitoringInterval(DNS监测时间间隔,单位:毫秒) 默认值:5000监测DNS的变化情况的时间间隔。 idleConnectionTimeout(连接空闲超时,单位:毫秒) 默认值:10000如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。 connectTimeout(连接超时,单位:毫秒) 默认值:10000同节点建立连接时的等待超时。时间单位是毫秒。 timeout(命令等待超时,单位:毫秒) 默认值:3000等待节点回复命令的时间。该时间从命令发送成功时开始计时。 retryAttempts(命令失败重试次数) 默认值:3如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。 retryInterval(命令重试发送时间间隔,单位:毫秒) 默认值:1500在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。 reconnectionTimeout(重新连接时间间隔,单位:毫秒) 默认值:3000当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。 failedAttempts(执行失败最大次数) 默认值:3在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。 database(数据库编号) 默认值:0尝试连接的数据库编号。 password(密码) 默认值:null用于节点身份验证的密码。 subscriptionsPerConnection(单个连接最大订阅数量) 默认值:5每个连接的最大订阅数量。 clientName(客户端名称) 默认值:null在Redis节点里显示的客户端名称。 sslEnableEndpointIdentification(启用SSL终端识别) 默认值:true开启SSL终端识别能力。 sslProvider(SSL实现方式) 默认值:JDK确定采用哪种方式(JDK或OPENSSL)来实现SSL连接。 sslTruststore(SSL信任证书库路径) 默认值:null指定SSL信任证书库的路径。 sslTruststorePassword(SSL信任证书库密码) 默认值:null指定SSL信任证书库的密码。 sslKeystore(SSL钥匙库路径) 默认值:null指定SSL钥匙库的路径。 sslKeystorePassword(SSL钥匙库密码) 默认值:null指定SSL钥匙库的密码。
Redisson有多种模式,首先介绍单机模式的整合。
<!-- redisson-springboot -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.4</version>
</dependency>spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
timeout: 5000RedissonConfig.java
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Autowired
private RedisProperties redisProperties;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");
config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
config.useSingleServer().setDatabase(3);
return Redisson.create(config);
}
}由于redission可以有多种模式,处于学习的目的,将多种模式封装成一个start,可以学习一下starter的制作。

封装一个RedissonManager,通过策略模式,根据不同的配置类型,创建 RedissionConfig实例,然后创建RedissionClient对象。

Redission模拟了Java的面向对象编程思想,可以简单理解为一切皆为对象。
每一个 Redisson 对象 实现了**RObject** and RExpirable 两个interfaces.
Usage example:
RObject object = redisson.get...()
object.sizeInMemory();
object.delete();
object.rename("newname");
object.isExists();
// catch expired event
object.addListener(new ExpiredObjectListener() {
...
});
// catch delete event
object.addListener(new DeletedObjectListener() {
...
});每一个Redisson 对象的名字,就是 Redis中的 Key.
RMap map = redisson.getMap("mymap");
map.getName(); // = mymap可以通过 RKeys 接口操作Redis中的keys.
Usage example:
RKeys keys = redisson.getKeys();
Iterable<String> allKeys = keys.getKeys();
Iterable<String> foundedKeys = keys.getKeysByPattern('key*');
long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");
long deletedKeysAmount = keys.deleteByPattern("test?");
String randomKey = keys.randomKey();
long keysAmount = keys.count();
keys.flushall();
keys.flushdb();Redisson通过RBucket接口代表可以访问任何类型的基础对象,或者普通对象。
RBucket有一系列的工具方法,如compareAndSet(),get(),getAndDelete(),getAndSet(),set(),size(),trySet()等等,用于设值/取值/获取尺寸。
RBucket普通对象的最大大小,为512兆字节。
RBucket<AnyObject> bucket = redisson.getBucket("anyObject");
bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();
bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));
Lua是一种开源、简单易学、轻量小巧的脚本语言,用标准C语言编写。
其设计的目的就是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
Redis从2.6版本开始支持Lua脚本,Redis使用Lua可以:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testLuaExamples() {
// 默认连接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
redisson.getBucket("redission:test:foo").set("bar");
String r = redisson.getScript().eval(RScript.Mode.READ_ONLY,
"return redis.call('get', 'redission:test:foo')", RScript.ReturnType.VALUE);
System.out.println("foo: " + r);
// 通过预存的脚本进行同样的操作
RScript s = redisson.getScript();
// 首先将脚本加载到Redis
String sha1 = s.scriptLoad("return redis.call('get', 'redission:test:foo')");
// 返回值 res == 282297a0228f48cd3fc6a55de6316f31422f5d17
System.out.println("sha1: " + sha1);
// 再通过SHA值调用脚本
Future<Object> r1 = redisson.getScript().evalShaAsync(RScript.Mode.READ_ONLY,
sha1,
RScript.ReturnType.VALUE,
Collections.emptyList());
try {
System.out.println("res: " + r1.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
client.shutdown();
}
}运行上面的代码时,将会看到以下输出:
foo: bar
sha1: 282297a0228f48cd3fc6a55de6316f31422f5d17
res: bar
RLock 是 Java 中可重入锁的分布式实现,下面的代码演示了 RLock 的用法:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testLockExamples() {
// 默认连接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
// RLock 继承了 java.util.concurrent.locks.Lock 接口
RLock lock = redisson.getLock("redission:test:lock:1");
final int[] count = {0};
int threads = 10;
ExecutorService pool = Executors.newFixedThreadPool(10);
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
for (int j = 0; j < 1000; j++) {
lock.lock();
count[0]++;
lock.unlock();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10个线程每个累加1000为: = " + count[0]);
//输出统计结果
float time = System.currentTimeMillis() - start;
System.out.println("运行的时长为:" + time);
System.out.println("每一次执行的时长为:" + time/count[0]);
}
}此代码将产生以下输出:
10个线程每个累加1000为: = 10000
运行的时长为:14172.0
每一次执行的时长为:1.4172RAtomicLong 是 Java 中 AtomicLong 类的分布式“替代品”,用于在并发环境中保存长值。以下示例代码演示了 RAtomicLong 的用法:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testRAtomicLongExamples() {
// 默认连接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
// 线程数
final int threads = 10;
// 每条线程的执行轮数
final int turns = 1000;
ExecutorService pool = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < turns; j++)
{
atomicLong.incrementAndGet();
}
} catch (Exception e)
{
e.printStackTrace();
}
});
}
ThreadUtil.sleepSeconds(5);
System.out.println("atomicLong: " + atomicLong.get());
redisson.shutdown();
}
}此代码的输出将是:
atomicLong: 10000
基于Redis的Redisson分布式整长型累加器(LongAdder)采用了与java.util.concurrent.atomic.LongAdder类似的接口。通过利用客户端内置的LongAdder对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式AtomicLong对象快 12000 倍。
完美适用于分布式统计计量场景。下面是RLongAdder的使用案例:
RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();以下示例代码演示了 RLongAdder 的用法:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testRAtomicLongExamples() {
// 默认连接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
// 线程数
final int threads = 10;
// 每条线程的执行轮数
final int turns = 1000;
ExecutorService pool = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < turns; j++)
{
atomicLong.incrementAndGet();
}
} catch (Exception e)
{
e.printStackTrace();
}
});
}
ThreadUtil.sleepSeconds(5);
System.out.println("atomicLong: " + atomicLong.get());
redisson.shutdown();
}
}此代码将产生以下输出:
longAdder: 10000
运行的时长为:5085.0
每一次执行的时长为:0.5085当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。
RLongAdder atomicLong = ...
atomicLong.destroy();Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。Redisson提供了以下几种的对象编码应用,以供大家选择:
编码类名称 | 说明 |
|---|---|
org.redisson.codec.JsonJacksonCodec | Jackson JSON 编码 默认编码 |
org.redisson.codec.AvroJacksonCodec | Avro 一个二进制的JSON编码 |
org.redisson.codec.SmileJacksonCodec | Smile 另一个二进制的JSON编码 |
org.redisson.codec.CborJacksonCodec | CBOR 又一个二进制的JSON编码 |
org.redisson.codec.MsgPackJacksonCodec | MsgPack 再来一个二进制的JSON编码 |
org.redisson.codec.IonJacksonCodec | Amazon Ion 亚马逊的Ion编码,格式与JSON类似 |
org.redisson.codec.KryoCodec | Kryo 二进制对象序列化编码 |
org.redisson.codec.SerializationCodec | JDK序列化编码 |
org.redisson.codec.FstCodec | FST 10倍于JDK序列化性能而且100%兼容的编码 |
org.redisson.codec.LZ4Codec | LZ4 压缩型序列化对象编码 |
org.redisson.codec.SnappyCodec | Snappy 另一个压缩型序列化对象编码 |
org.redisson.client.codec.JsonJacksonMapCodec | 基于Jackson的映射类使用的编码。可用于避免序列化类的信息,以及用于解决使用byte[]遇到的问题。 |
org.redisson.client.codec.StringCodec | 纯字符串编码(无转换) |
org.redisson.client.codec.LongCodec | 纯整长型数字编码(无转换) |
org.redisson.client.codec.ByteArrayCodec | 字节数组编码 |
org.redisson.codec.CompositeCodec | 用来组合多种不同编码在一起 |
由Redisson默认的编码器为二进制编码器,为了序列化后的内容可见,需要使用Json文本序列化编码工具类。Redisson提供了编码器 JsonJacksonCodec,作为Json文本序列化编码工具类。
问题是:JsonJackson在序列化有双向引用的对象时,会出现无限循环异常。而fastjson在检查出双向引用后会自动用引用符$ref替换,终止循环。
所以,一些特殊场景中:用fastjson能 正常序列化到redis,而JsonJackson则抛出无限循环异常。
为了序列化后的内容可见,所以不用redission其他自带的,自行实现fastjson编码器:
package com.crayon.distributedredissionspringbootstarter.codec;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import java.io.IOException;
public class FastjsonCodec extends BaseCodec {
private final Encoder encoder = in -> {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream os = new ByteBufOutputStream(out);
JSON.writeJSONString(os, in, SerializerFeature.WriteClassName);
return os.buffer();
} catch (IOException e) {
out.release();
throw e;
} catch (Exception e) {
out.release();
throw new IOException(e);
}
};
private final Decoder<Object> decoder = (buf, state) ->
JSON.parseObject(new ByteBufInputStream(buf), Object.class);
@Override
public Decoder<Object> getValueDecoder() {
return decoder;
}
@Override
public Encoder getValueEncoder() {
return encoder;
}
}替换的方法如下:
*/
@Slf4j
public class StandaloneConfigImpl implements RedissonConfigService {
@Override
public Config createRedissonConfig(RedissonConfig redissonConfig) {
Config config = new Config();
try {
String address = redissonConfig.getAddress();
String password = redissonConfig.getPassword();
int database = redissonConfig.getDatabase();
String redisAddr = GlobalConstant.REDIS_CONNECTION_PREFIX.getConstant_value() + address;
config.useSingleServer().setAddress(redisAddr);
config.useSingleServer().setDatabase(database);
//密码可以为空
if (!StringUtils.isEmpty(password)) {
config.useSingleServer().setPassword(password);
}
log.info("初始化[单机部署]方式Config,redisAddress:" + address);
// config.setCodec( new FstCodec());
config.setCodec( new FastjsonCodec());
} catch (Exception e) {
log.error("单机部署 Redisson init error", e);
}
return config;
}
}哨兵模式即sentinel模式,配置Redis哨兵服务的官方文档在这里。
哨兵模式实现代码和单机模式几乎一样,唯一的不同就是Config的构造.
程序化配置哨兵模式的方法如下:
Config config = new Config();
config.useSentinelServers()
.setMasterName("mymaster")
// use "rediss://" for SSL connection
.addSentinelAddress("redis://127.0.0.1:26389", "redis://127.0.0.1:26379")
.addSentinelAddress("redis://127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);Redisson的哨兵模式的使用方法如下:
SentinelServersConfig sentinelConfig = config.useSentinelServers();SentinelServersConfig配置参数如下:
配置Redis哨兵服务的官方文档在这里。Redisson的哨兵模式的使用方法如下:
SentinelServersConfig sentinelConfig = config.useSentinelServers();SentinelServersConfig类的设置参数如下: dnsMonitoringInterval(DNS监控间隔,单位:毫秒) 默认值:5000用来指定检查节点DNS变化的时间间隔。使用的时候应该确保JVM里的DNS数据的缓存时间保持在足够低的范围才有意义。用-1来禁用该功能。 masterName(主服务器的名称) 主服务器的名称是哨兵进程中用来监测主从服务切换情况的。 addSentinelAddress(添加哨兵节点地址) 可以通过host:port的格式来指定哨兵节点的地址。多个节点可以一次性批量添加。 readMode(读取操作的负载均衡模式) 默认值:SLAVE(只在从服务节点里读取) 注:在从服务节点里读取的数据说明已经至少有两个节点保存了该数据,确保了数据的高可用性。 设置读取操作选择节点的模式。可用值为:SLAVE- 只在从服务节点里读取。MASTER- 只在主服务节点里读取。MASTER_SLAVE- 在主从服务节点里都可以读取。 subscriptionMode(订阅操作的负载均衡模式) 默认值:SLAVE(只在从服务节点里订阅) 设置订阅操作选择节点的模式。可用值为:SLAVE- 只在从服务节点里订阅。MASTER- 只在主服务节点里订阅。 loadBalancer(负载均衡算法类的选择) 默认值:org.redisson.connection.balancer.RoundRobinLoadBalancer在使用多个Redis服务节点的环境里,可以选用以下几种负载均衡方式选择一个节点:org.redisson.connection.balancer.WeightedRoundRobinBalancer- 权重轮询调度算法org.redisson.connection.balancer.RoundRobinLoadBalancer- 轮询调度算法org.redisson.connection.balancer.RandomLoadBalancer- 随机调度算法 subscriptionConnectionMinimumIdleSize(从节点发布和订阅连接的最小空闲连接数) 默认值:1多从节点的环境里,每个 从服务节点里用于发布和订阅连接的最小保持连接数(长连接)。Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。 subscriptionConnectionPoolSize(从节点发布和订阅连接池大小) 默认值:50多从节点的环境里,每个 从服务节点里用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 slaveConnectionMinimumIdleSize(从节点最小空闲连接数) 默认值:32多从节点的环境里,每个 从服务节点里用于普通操作(非 发布和订阅)的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时读取反映速度。 slaveConnectionPoolSize(从节点连接池大小) 默认值:64多从节点的环境里,每个 从服务节点里用于普通操作(非 发布和订阅)连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 masterConnectionMinimumIdleSize(主节点最小空闲连接数) 默认值:32多从节点的环境里,每个 主节点的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时写入反应速度。 masterConnectionPoolSize(主节点连接池大小) 默认值:64主节点的连接池最大容量。连接池的连接数量自动弹性伸缩。 idleConnectionTimeout(连接空闲超时,单位:毫秒) 默认值:10000如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。 connectTimeout(连接超时,单位:毫秒) 默认值:10000同任何节点建立连接时的等待超时。时间单位是毫秒。 timeout(命令等待超时,单位:毫秒) 默认值:3000等待节点回复命令的时间。该时间从命令发送成功时开始计时。 retryAttempts(命令失败重试次数) 默认值:3如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。 retryInterval(命令重试发送时间间隔,单位:毫秒) 默认值:1500在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。 reconnectionTimeout(重新连接时间间隔,单位:毫秒) 默认值:3000当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。 failedAttempts(执行失败最大次数) 默认值:3在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。 database(数据库编号) 默认值:0尝试连接的数据库编号。 password(密码) 默认值:null用于节点身份验证的密码。 subscriptionsPerConnection(单个连接最大订阅数量) 默认值:5每个连接的最大订阅数量。 clientName(客户端名称) 默认值:null在Redis节点里显示的客户端名称。 sslEnableEndpointIdentification(启用SSL终端识别) 默认值:true开启SSL终端识别能力。 sslProvider(SSL实现方式) 默认值:JDK确定采用哪种方式(JDK或OPENSSL)来实现SSL连接。 sslTruststore(SSL信任证书库路径) 默认值:null指定SSL信任证书库的路径。 sslTruststorePassword(SSL信任证书库密码) 默认值:null指定SSL信任证书库的密码。 sslKeystore(SSL钥匙库路径) 默认值:null指定SSL钥匙库的路径。 sslKeystorePassword(SSL钥匙库密码) 默认值:null指定SSL钥匙库的密码。
通过属性文件,配置的示例如下:
---
sentinelServersConfig:
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
failedSlaveReconnectionInterval: 3000
failedSlaveCheckInterval: 60000
password: null
subscriptionsPerConnection: 5
clientName: null
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
slaveConnectionMinimumIdleSize: 24
slaveConnectionPoolSize: 64
masterConnectionMinimumIdleSize: 24
masterConnectionPoolSize: 64
readMode: "SLAVE"
subscriptionMode: "SLAVE"
sentinelAddresses:
- "redis://127.0.0.1:26379"
- "redis://127.0.0.1:26389"
masterName: "mymaster"
database: 0
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.MarshallingCodec> {}
transportMode: "NIO"介绍配置Redis主从服务组态的文档在这里.
程序化配置主从模式的方法如下:
Config config = new Config();
config.useMasterSlaveServers()
// use "rediss://" for SSL connection
.setMasterAddress("redis://127.0.0.1:6379")
.addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419")
.addSlaveAddress("redis://127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);主从模式使用到MasterSlaveServersConfig :
MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();
MasterSlaveServersConfig 类的设置参数如下:
dnsMonitoringInterval(DNS监控间隔,单位:毫秒) 默认值:
5000用来指定检查节点DNS变化的时间间隔。使用的时候应该确保JVM里的DNS数据的缓存时间保持在足够低的范围才有意义。用-1来禁用该功能。 masterAddress(主节点地址) 可以通过host:port的格式来指定主节点地址。 addSlaveAddress(添加从主节点地址) 可以通过host:port的格式来指定从节点的地址。多个节点可以一次性批量添加。 readMode(读取操作的负载均衡模式) 默认值:SLAVE(只在从服务节点里读取) 注:在从服务节点里读取的数据说明已经至少有两个节点保存了该数据,确保了数据的高可用性。 设置读取操作选择节点的模式。可用值为:SLAVE- 只在从服务节点里读取。MASTER- 只在主服务节点里读取。MASTER_SLAVE- 在主从服务节点里都可以读取。 subscriptionMode(订阅操作的负载均衡模式) 默认值:SLAVE(只在从服务节点里订阅) 设置订阅操作选择节点的模式。可用值为:SLAVE- 只在从服务节点里订阅。MASTER- 只在主服务节点里订阅。 loadBalancer(负载均衡算法类的选择) 默认值:org.redisson.connection.balancer.RoundRobinLoadBalancer在使用多个Redis服务节点的环境里,可以选用以下几种负载均衡方式选择一个节点:org.redisson.connection.balancer.WeightedRoundRobinBalancer- 权重轮询调度算法org.redisson.connection.balancer.RoundRobinLoadBalancer- 轮询调度算法org.redisson.connection.balancer.RandomLoadBalancer- 随机调度算法 subscriptionConnectionMinimumIdleSize(从节点发布和订阅连接的最小空闲连接数) 默认值:1多从节点的环境里,每个 从服务节点里用于发布和订阅连接的最小保持连接数(长连接)。Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。 subscriptionConnectionPoolSize(从节点发布和订阅连接池大小) 默认值:50多从节点的环境里,每个 从服务节点里用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 slaveConnectionMinimumIdleSize(从节点最小空闲连接数) 默认值:32多从节点的环境里,每个 从服务节点里用于普通操作(非 发布和订阅)的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时读取反映速度。 slaveConnectionPoolSize(从节点连接池大小) 默认值:64多从节点的环境里,每个 从服务节点里用于普通操作(非 发布和订阅)连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 masterConnectionMinimumIdleSize(主节点最小空闲连接数) 默认值:32多从节点的环境里,每个 主节点的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时写入反应速度。 masterConnectionPoolSize(主节点连接池大小) 默认值:64主节点的连接池最大容量。连接池的连接数量自动弹性伸缩。 idleConnectionTimeout(连接空闲超时,单位:毫秒) 默认值:10000如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。 connectTimeout(连接超时,单位:毫秒) 默认值:10000同任何节点建立连接时的等待超时。时间单位是毫秒。 timeout(命令等待超时,单位:毫秒) 默认值:3000等待节点回复命令的时间。该时间从命令发送成功时开始计时。 retryAttempts(命令失败重试次数) 默认值:3如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。 retryInterval(命令重试发送时间间隔,单位:毫秒) 默认值:1500在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。 reconnectionTimeout(重新连接时间间隔,单位:毫秒) 默认值:3000当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。 failedAttempts(执行失败最大次数) 默认值:3在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。 database(数据库编号) 默认值:0尝试连接的数据库编号。 password(密码) 默认值:null用于节点身份验证的密码。 subscriptionsPerConnection(单个连接最大订阅数量) 默认值:5每个连接的最大订阅数量。 clientName(客户端名称) 默认值:null在Redis节点里显示的客户端名称。 sslEnableEndpointIdentification(启用SSL终端识别) 默认值:true开启SSL终端识别能力。 sslProvider(SSL实现方式) 默认值:JDK确定采用哪种方式(JDK或OPENSSL)来实现SSL连接。 sslTruststore(SSL信任证书库路径) 默认值:null指定SSL信任证书库的路径。 sslTruststorePassword(SSL信任证书库密码) 默认值:null指定SSL信任证书库的密码。 sslKeystore(SSL钥匙库路径) 默认值:null指定SSL钥匙库的路径。 sslKeystorePassword(SSL钥匙库密码) 默认值:null指定SSL钥匙库的密码。
集群模式除了适用于Redis集群环境,也适用于任何云计算服务商提供的集群模式,例如AWS ElastiCache集群版、Azure Redis Cache和阿里云(Aliyun)的云数据库Redis版。
介绍配置Redis集群组态的文档在这里。 Redis集群组态的最低要求是必须有三个主节点。
以上代码使用了异步回调模式,RFuture 继承了 java.util.concurrent.Future, CompletionStage两大接口,异步回调模式的基础知识,请参见 《Java高并发核心编程 卷2 》

在RedissonLock对象的lock()方法主要调用tryAcquire()方法


由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键

首先,看一下evalWriteAsync方法的定义
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);这和前面的jedis调用lua脚本类似,最后两个参数分别是keys和params。
单独将调用的那一段摘出来看,实际调用是这样的:
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));结合上面的参数声明,我们可以知道,这里KEYS1就是getName(),ARGV2是getLockName(threadId)
假设:
那么KEYS1=DISLOCK,ARGV2=01a6d806-d282-4715-9bec-f51b9aa98110:1
因此,这段脚本的意思是
1、判断有没有一个叫“DISLOCK”的key
2、如果没有,则在其下设置一个字段为“01a6d806-d282-4715-9bec-f51b9aa98110:1”,值为“1”的键值对 ,并设置它的过期时间
3、如果存在,则进一步判断“01a6d806-d282-4715-9bec-f51b9aa98110:1”是否存在,若存在,则其值加1,并重新设置过期时间
4、返回“DISLOCK”的生存时间(毫秒)
这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。。。
用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程
所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)

为何要使用lua语言?
因为一大堆复杂的业务逻辑,可以通过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性

回顾一下evalWriteAsync方法的定义
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);注意,其最后两个参数分别是keys和params。
KEYS1代表的是你加锁的那个key,比如说:
RLock lock = redisson.getLock("DISLOCK");
这里你自己设置了加锁的那个锁key就是“DISLOCK”。
ARGV1代表的就是锁key的默认生存时间
调用的时候,传递的参数为 internalLockLeaseTime ,该值默认30秒。
ARGV2代表的是加锁的客户端的ID,类似于下面这样:
01a6d806-d282-4715-9bec-f51b9aa98110:1
lua脚本的第一段if判断语句,就是用“exists DISLOCK”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。
如何加锁呢?很简单,用下面的redis命令:
hset DISLOCK 01a6d806-d282-4715-9bec-f51b9aa98110:1 1
通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:
DISLOCK:
{
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
}接着会执行“pexpire DISLOCK 30000”命令,设置DISLOCK这个锁key的生存时间是30秒(默认)
那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?
很简单,第一个if判断会执行“exists DISLOCK”,发现DISLOCK 这个锁key已经存在了。
接着第二个if判断,判断一下,DISLOCK锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。
所以,客户端2会获取到pttl DISLOCK返回的一个数字,这个数字代表了DISLOCK 这个锁key的剩余生存时间。比如还剩15000毫秒的生存时间。
此时客户端2会进入一个while循环,不停的尝试加锁。
如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?
RLock lock = redisson.getLock("DISLOCK")
lock.lock();
//业务代码
lock.lock();
//业务代码
lock.unlock();
lock.unlock();分析上面那段lua脚本。
第一个if判断肯定不成立,“exists DISLOCK”会显示锁key已经存在了。
第二个if判断会成立,因为DISLOCK的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”
此时就会执行可重入加锁的逻辑,他会用:
incrby DISLOCK
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
通过这个命令,对客户端1的加锁次数,累加1。
此时DISLOCK数据结构变为下面这样:
DISLOCK:
{
8743c9c0-0795-4907-87fd-6c719a6b4586:1 2
}如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。
其实说白了,就是每次都对DISLOCK数据结构中的那个加锁次数减1。
如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:
“del DISLOCK”命令,从redis里删除这个key。
然后呢,另外的客户端2就可以尝试完成加锁了。
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}再深入一下,实际调用的是unlockInnerAsync方法

上图没有截取完整,完整的源码如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}我们还是假设name=DISLOCK,假设线程ID是1
同理,我们可以知道
KEYS1是getName(),即KEYS1=DISLOCK
KEYS2是getChannelName(),即KEYS2=redisson_lock__channel:{DISLOCK}
ARGV1是LockPubSub.unlockMessage,即ARGV1=0
ARGV2是生存时间
ARGV3是getLockName(threadId),即ARGV3=8743c9c0-0795-4907-87fd-6c719a6b4586:1
因此,上面脚本的意思是:
1、判断是否存在一个叫“DISLOCK”的key
2、如果不存在,返回nil
3、如果存在,使用Redis Hincrby 命令用于为哈希表中的字段值加上指定增量值 -1 ,代表减去1
4、若counter >,返回空,若字段存在,则字段值减1
5、若减完以后,counter > 0 值仍大于0,则返回0
6、减完后,若字段值小于或等于0,则用 publish 命令广播一条消息,广播内容是0,并返回1;
可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?
再回到前面获取锁的位置
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 订阅
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
protected static final LockPubSub PUBSUB = new LockPubSub();
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源购买。 |
|---|
这里的channel为:
redisson_lock__channel:


当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞
这点,有点儿类似 Zookeeper分布式锁:
有关zookeeper分布式锁的原理和实现,具体请参见下面的博客: Zookeeper 分布式锁 (图解+秒懂+史上最全)
客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?
简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。
但是聪明的同学肯定会问:
有效时间设置多长,假如我的业务操作比有效时间长,我的业务代码还没执行完,就自动给我解锁了,不就完蛋了吗。
这个问题就有点棘手了,在网上也有很多讨论:
第一种解决方法就是靠程序员自己去把握,预估一下业务代码需要执行的时间,然后设置有效期时间比执行时间长一些,保证不会因为自动解锁影响到客户端业务代码的执行。
但是这并不是万全之策,比如网络抖动这种情况是无法预测的,也有可能导致业务代码执行的时间变长,所以并不安全。
第二种方法,使用监事狗watchDog机制实现锁的续期。
第二种方法比较靠谱一点,而且无业务入侵。
在Redisson框架实现分布式锁的思路,就使用watchDog机制实现锁的续期。
当加锁成功后,同时开启守护线程,默认有效期是30秒,每隔10秒就会给锁续期到30秒,只要持有锁的客户端没有宕机,就能保证一直持有锁,直到业务代码执行完毕由客户端自己解锁,如果宕机了自然就在有效期失效后自动解锁。
这里,和前面解决 JVM STW的锁过期问题有点类似,只不过,watchDog自动续期,也没有完全解决JVM STW的锁过期问题。 如何彻底解决 JVM STW的锁过期问题,可以来疯狂创客圈的社群讨论。
实际上,redisson加锁的基本流程图如下:

这里专注于介绍watchdog。
首先watchdog的具体思路是 加锁时,默认加锁 30秒,每10秒钟检查一次,如果存在就重新设置 过期时间为30秒。
然后设置默认加锁时间的参数是 lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)
官方文档描述如下
lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)
默认值:
30000监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用
leaseTimeout参数的情况。如果该看门狗未使用lockWatchdogTimeout去重新调整一个分布式锁的lockWatchdogTimeout超时,那么这个锁将变为失效状态。这个参数可以用来避免由Redisson客户端节点宕机或其他原因造成死锁的情况。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。