首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >Redis分布式锁 (图解-秒懂-史上最全)

Redis分布式锁 (图解-秒懂-史上最全)

原创
作者头像
曾高飞
发布2026-05-25 20:54:06
发布2026-05-25 20:54:06
1980
举报

跨JVM的线程安全问题

在单体的应用开发场景中,在多线程的环境下,涉及并发同步的时候,为了保证一个代码块在同一时间只能由一个线程访问,我们一般可以使用synchronized语法和ReetrantLock去保证,这实际上是本地锁的方式。

也就是说,在同一个JVM内部,大家往往采用synchronized或者Lock的方式来解决多线程间的安全问题。但在分布式集群工作的开发场景中,在JVM之间,那么就需要一种更加高级的锁机制,来处理种跨JVM进程之间的线程安全问题.

解决方案是:使用分布式锁

总之,对于分布式场景,我们可以使用分布式锁,它是控制分布式系统之间互斥访问共享资源的一种方式。

比如说在一个分布式系统中,多台机器上部署了多个服务,当客户端一个用户发起一个数据插入请求时,如果没有分布式锁机制保证,那么那多台机器上的多个服务可能进行并发插入操作,导致数据重复插入,对于某些不允许有多余数据的业务来说,这就会造成问题。而分布式锁机制就是为了解决类似这类问题,保证多个服务之间互斥的访问共享资源,如果一个服务抢占了分布式锁,其他服务没获取到锁,就不进行后续操作。

大致意思如下图所示(不一定准确):

在这里插入图片描述
在这里插入图片描述

何为分布式锁?

何为分布式锁?

  • 当在分布式模型下,数据只有一份(或有限制),此时需要利用锁的技术控制某一时刻修改数据的进程数。
  • 用一个状态值表示锁,对锁的占用和释放通过状态值来标识。

分布式锁的条件:

  • 互斥性。在任意时刻,只有一个客户端能持有锁。
  • 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
  • 具有容错性。只要大部分的 Redis 节点正常运行,客户端就可以加锁和解锁。
  • 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

分布式锁的实现:

分布式锁的实现由很多种,文件锁、数据库、redis等等,比较多;分布式锁常见的多种实现方式:

  1. 数据库悲观锁、
  2. 数据库乐观锁;
  3. 基于Redis的分布式锁;
  4. 基于ZooKeeper的分布式锁。

在实践中,还是redis做分布式锁性能会高一些


数据库悲观锁

所谓悲观锁,悲观锁是对数据被的修改持悲观态度(认为数据在被修改的时候一定会存在并发问题),因此在整个数据处理过程中将数据锁定。

悲观锁的实现,往往依靠数据库提供的锁机制(也只有数据库层提供的锁机制才能真正保证数据访问的排他性,否则,即使在应用层中实现了加锁机制,也无法保证外部系统不会修改数据)。

数据库的行锁、表锁、排他锁等都是悲观锁,这里以行锁为例,进行介绍。以我们常用的MySQL为例,我们通过使用select...for update语句, 执行该语句后,会在表上加持行锁,一直到事务提交,解除行锁。

使用场景举例:

在秒杀案例中,生成订单和扣减库存的操作,可以通过商品记录的行锁,进行保护。们通过使用select...for update语句,在查询商品表库存时将该条记录加锁,待下单减库存完成后,再释放锁。

示例的SQL如下:

代码语言:javascript
复制
//0.开始事务
begin; 
	
//1.查询出商品信息

select stockCount from seckill_good where id=1 for update;

//2.根据商品信息生成订单

insert into seckill_order (id,good_id) values (null,1);

//3.修改商品stockCount减一

update seckill_good set stockCount=stockCount-1 where id=1;

//4.提交事务

commit;

以上,在对id = 1的记录修改前,先通过for update的方式进行加锁,然后再进行修改。这就是比较典型的悲观锁策略。

如果以上修改库存的代码发生并发,同一时间只有一个线程可以开启事务并获得id=1的锁,其它的事务必须等本次事务提交之后才能执行。这样我们可以保证当前的数据不会被其它事务修改。

我们使用select_for_update,另外一定要写在事务中. 注意:要使用悲观锁,我们必须关闭mysql数据库中自动提交的属性,命令set autocommit=0;即可关闭,因为MySQL默认使用autocommit模式,也就是说,当你执行一个更新操作后,MySQL会立刻将结果进行提交。

悲观锁的实现,往往依靠数据库提供的锁机制。在数据库中,悲观锁的流程如下:

  • 在对记录进行修改前,先尝试为该记录加上排他锁(exclusive locking)。
  • 如果加锁失败,说明该记录正在被修改,那么当前查询可能要等待或者抛出异常。具体响应方式由开发者根据实际需要决定。
  • 如果成功加锁,那么就可以对记录做修改,事务完成后就会解锁了。
  • 其间如果有其他事务对该记录做加锁的操作,都要等待当前事务解锁或直接抛出异常。

数据库乐观锁

使用乐观锁就不需要借助数据库的锁机制了。

乐观锁的概念中其实已经阐述了他的具体实现细节:主要就是两个步骤:冲突检测和数据更新。其实现方式有一种比较典型的就是Compare and Swap(CAS)技术

CAS是项乐观锁技术,当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其它线程都失败,失败的线程并不会被挂起,而是被告知这次竞争中失败,并可以再次尝试。

CAS的实现中,在表中增加一个version字段,操作前先查询version信息,在数据提交时检查version字段是否被修改,如果没有被修改则进行提交,否则认为是过期数据。

比如前面的扣减库存问题,通过乐观锁可以实现如下:

代码语言:javascript
复制
//1.查询出商品信息			
select stockCount, version from seckill_good where id=1;
			
//2.根据商品信息生成订单
insert into seckill_order (id,good_id) values (null,1);

//3.修改商品库存
update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;

以上,我们在更新之前,先查询一下库存表中当前版本(version),然后在做update的时候,以version 作为一个修改条件。

当我们提交更新的时候,判断数据库表对应记录的当前version与第一次取出来的version进行比对,如果数据库表当前version与第一次取出来的version相等,则予以更新,否则认为是过期数据。

CAS 乐观锁有两个问题:

(1) CAS 存在一个比较重要的问题,即ABA问题. 解决的办法是version字段顺序递增。

(2) 乐观锁的方式,在高并发时,只有一个线程能执行成功,会造成大量的失败,这给用户的体验显然是很不好的。


Zookeeper分布式锁

除了在数据库层面加分布式锁,通常还可以使用以下更高性能、更高可用的分布式锁:

  • 分布式缓存(如redis)锁
  • 分布式协调(如zookeeper)锁

有关zookeeper分布式锁的原理和实现,具体请参见下面的博客: Zookeeper 分布式锁 (图解+秒懂+史上最全)

或者阅读笔者的《Java高并发核心编程(卷1加强版)》

在这里插入图片描述
在这里插入图片描述

Redis分布式锁

本文重点介绍Redis分布式锁,分为两个维度进行介绍:

(1)基于Jedis手工造轮子分布式锁

(2)介绍Redission 分布式锁的使用和原理。

分布式锁一般有如下的特点:

  • 互斥性: 同一时刻只能有一个线程持有锁
  • 可重入性: 同一节点上的同一个线程如果获取了锁之后能够再次获取锁
  • 锁超时:和J.U.C中的锁一样支持锁超时,防止死锁
  • 高性能和高可用: 加锁和解锁需要高效,同时也需要保证高可用,防止分布式锁失效
  • 具备阻塞和非阻塞性:能够及时从阻塞状态中被唤醒

手工造轮子:基于Jedis 的API实现分布式锁

我们首先讲解 Jedis 普通分布式锁实现,并且是纯手工的模式,从最为基础的Redis命令开始。

只有充分了解与分布式锁相关的普通Redis命令,才能更好的了解高级的Redis分布式锁的实现,因为高级的分布式锁的实现完全基于普通Redis命令。

Redis几种架构

Redis发展到现在,几种常见的部署架构有:

  • 单机模式;
  • 主从模式;
  • 哨兵模式;
  • 集群模式;

从分布式锁的角度来说, 无论是单机模式、主从模式、哨兵模式、集群模式,其原理都是类同的。 只是主从模式、哨兵模式、集群模式的更加的高可用、或者更加高并发。

所以,接下来先基于单机模式,基于Jedis手工造轮子实现自己的分布式锁。

首先看两个命令:

Redis分布式锁机制,主要借助setnx和expire两个命令完成。

setnx命令:

SETNX 是SET if Not eXists的简写。将 key 的值设为 value,当且仅当 key 不存在; 若给定的 key 已经存在,则 SETNX 不做任何动作。

下面为客户端使用示例:

代码语言:javascript
复制
127.0.0.1:6379> set lock "unlock"
OK
127.0.0.1:6379> setnx lock "unlock"
(integer) 0
127.0.0.1:6379> setnx lock "lock"
(integer) 0
127.0.0.1:6379> 

expire命令:

expire命令为 key 设置生存时间,当 key 过期时(生存时间为 0 ),它会被自动删除. 其格式为:

EXPIRE key seconds

下面为客户端使用示例:

代码语言:javascript
复制
127.0.0.1:6379> expire lock 10
(integer) 1
127.0.0.1:6379> ttl lock
8

基于Jedis API的分布式锁的总体流程:

通过Redis的setnx、expire命令可以实现简单的锁机制:

  • key不存在时创建,并设置value和过期时间,返回值为1;成功获取到锁;
  • 如key存在时直接返回0,抢锁失败;
  • 持有锁的线程释放锁时,手动删除key; 或者过期时间到,key自动删除,锁释放。

线程调用setnx方法成功返回1认为加锁成功,其他线程要等到当前线程业务操作完成释放锁后,才能再次调用setnx加锁成功。

在这里插入图片描述
在这里插入图片描述

以上简单redis分布式锁的问题:

如果出现了这么一个问题:如果setnx是成功的,但是expire设置失败,一旦出现了释放锁失败,或者没有手工释放,那么这个锁永远被占用,其他线程永远也抢不到锁。

所以,需要保障setnx和expire两个操作的原子性,要么全部执行,要么全部不执行,二者不能分开。

解决的办法有两种:

  • 使用set的命令时,同时设置过期时间,不再单独使用 expire命令
  • 使用lua脚本,将加锁的命令放在lua脚本中原子性的执行

简单加锁:使用set的命令时,同时设置过期时间

使用set的命令时,同时设置过期时间的示例如下:

代码语言:javascript
复制
127.0.0.1:6379> set unlock "234" EX 100 NX
(nil)
127.0.0.1:6379> 
127.0.0.1:6379> set test "111" EX 100 NX
OK

这样就完美的解决了分布式锁的原子性; set 命令的完整格式:

set key value EX seconds NX|XX

代码语言:javascript
复制
EX seconds:设置失效时长,单位秒
PX milliseconds:设置失效时长,单位毫秒
NX:key不存在时设置value,成功返回OK,失败返回(nil)
XX:key存在时设置value,成功返回OK,失败返回(nil)

使用set命令实现加锁操作,先展示加锁的简单代码实习,再带大家慢慢解释为什么这样实现。

加锁的简单代码实现

代码语言:javascript
复制
package com.crazymaker.springcloud.standard.lock;

@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {

    private  RedisTemplate redisTemplate;

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 尝试获取分布式锁
     * @param jedis Redis客户端
     * @param lockKey 锁
     * @param requestId 请求标识
     * @param expireTime 超期时间
     * @return 是否获取成功
     */
    public static   boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
}

可以看到,我们加锁用到了Jedis的set Api

jedis.set(String key, String value, String nxxx, String expx, int time)

这个set()方法一共有五个形参:

  • 第一个为key,我们使用key来当锁,因为key是唯一的。
  • 第二个为value,我们传的是requestId,很多童鞋可能不明白,有key作为锁不就够了吗,为什么还要用到value?原因就是我们在上面讲到可靠性时,分布式锁要满足第四个条件解铃还须系铃人,通过给value赋值为requestId,我们就知道这把锁是哪个请求加的了,在解锁的时候就可以有依据。 requestId可以使用UUID.randomUUID().toString()方法生成。
  • 第三个为nxxx,这个参数我们填的是NX,意思是SET IF NOT EXIST,即当key不存在时,我们进行set操作;若key已经存在,则不做任何操作;
  • 第四个为expx,这个参数我们传的是PX,意思是我们要给这个key加一个过期的设置,具体时间由第五个参数决定。
  • 第五个为time,与第四个参数相呼应,代表key的过期时间。

总的来说,执行上面的set()方法就只会导致两种结果:

  1. 当前没有锁(key不存在),那么就进行加锁操作,并对锁设置个有效期,同时value表示加锁的客户端。
  2. 已有锁存在,不做任何操作。

心细的童鞋就会发现了,我们的加锁代码满足前面描述的四个条件中的三个。

  • 首先,set()加入了NX参数,可以保证如果已有key存在,则函数不会调用成功,也就是只有一个客户端能持有锁,满足互斥性。
  • 其次,由于我们对锁设置了过期时间,即使锁的持有者后续发生崩溃而没有解锁,锁也会因为到了过期时间而自动解锁(即key被删除),不会被永远占用(而发生死锁)。
  • 最后,因为我们将value赋值为requestId,代表加锁的客户端请求标识,那么在客户端在解锁的时候就可以进行校验是否是同一个客户端。
  • 由于我们只考虑Redis单机部署的场景,所以容错性我们暂不考虑。

那么这段Lua代码的功能是什么呢?

其实很简单,首先获取锁对应的value值,检查是否与requestId相等,如果相等则删除锁(解锁)。

编写RedisLockService用于管理JedisLock

编写个分布式锁服务,用于加载lua脚本,创建 分布式锁,代码如下:

代码语言:javascript
复制
package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.util.IOUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
public class RedisLockService
{

    private RedisTemplate redisTemplate;

    static String lockLua = "script/lock.lua";
    static String unLockLua = "script/unlock.lua";
    static RedisScript<Long> lockScript = null;
    static RedisScript<Long> unLockScript = null;
    {
        String script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),lockLua);
//        String script = FileUtil.readString(lockLua, Charset.forName("UTF-8" ));
        if(StringUtils.isEmpty(script))
        {
            log.error("lua load failed:"+lockLua);
        }

        lockScript = new DefaultRedisScript<>(script, Long.class);

//        script = FileUtil.readString(unLockLua, Charset.forName("UTF-8" ));
        script =  IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),unLockLua);
        if(StringUtils.isEmpty(script))
        {
            log.error("lua load failed:"+unLockLua);
        }
        unLockScript = new DefaultRedisScript<>(script, Long.class);

    }

    public RedisLockService(RedisTemplate redisTemplate)
    {
        this.redisTemplate = redisTemplate;
    }


    public Lock getLock(String lockKey, String lockValue) {
        JedisLock lock=new JedisLock(lockKey,lockValue);
        lock.setRedisTemplate(redisTemplate);
        lock.setLockScript(lockScript);
        lock.setUnLockScript(unLockScript);
        return lock;
    }
}

测试用例

接下来,终于可以上测试用例了

代码语言:javascript
复制
package com.crazymaker.springcloud.lock;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoCloudApplication.class})
// 指定启动类
public class RedisLockTest {

    @Resource
    RedisLockService redisLockService;

    private ExecutorService pool = Executors.newFixedThreadPool(10);

    @Test
    public void testLock() {
        int threads = 10;
        final int[] count = {0};
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                String lockValue = UUID.randomUUID().toString();

                try {
                    Lock lock = redisLockService.getLock("test:lock:1", lockValue);
                    boolean locked = lock.tryLock(10, TimeUnit.SECONDS);

                    if (locked) {
                        for (int j = 0; j < 1000; j++) {
                            count[0]++;
                        }

                        log.info("count = " + count[0]);
                        lock.unlock();
                    } else {
                        System.out.println("抢锁失败");
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("10个线程每个累加1000为: = " + count[0]);
        //输出统计结果
        float time = System.currentTimeMillis() - start;

        System.out.println("运行的时长为(ms):" + time);
        System.out.println("每一次执行的时长为(ms):" + time / count[0]);

    }

}

执行用例,结果如下:

代码语言:javascript
复制
2021-05-04 23:02:11.900  INFO 22120 --- [pool-1-thread-7] c.c.springcloud.lock.RedisLockTest       LN:50 count = 6000
2021-05-04 23:02:11.901  INFO 22120 --- [pool-1-thread-1] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新开始,turn:3,剩余时间:9585
2021-05-04 23:02:11.902  INFO 22120 --- [pool-1-thread-1] c.c.springcloud.lock.RedisLockTest       LN:50 count = 7000
2021-05-04 23:02:12.100  INFO 22120 --- [pool-1-thread-4] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新开始,turn:3,剩余时间:9586
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新开始,turn:3,剩余时间:9585
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-8] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新开始,turn:3,剩余时间:9585
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-4] c.c.springcloud.lock.RedisLockTest       LN:50 count = 8000
2021-05-04 23:02:12.102  INFO 22120 --- [pool-1-thread-8] c.c.springcloud.lock.RedisLockTest       LN:50 count = 9000
2021-05-04 23:02:12.304  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新开始,turn:4,剩余时间:9383
2021-05-04 23:02:12.307  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.lock.RedisLockTest       LN:50 count = 10000
10个线程每个累加1000为: = 10000
运行的时长为(ms):827.0
每一次执行的时长为(ms):0.0827

STW导致的锁过期问题

下面有一个简单的使用锁的例子,在10秒内占着锁:

代码语言:javascript
复制
  //写数据到文件
function writeData(filename, data) {
    boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
    if (!locked) {
        throw 'Failed to acquire lock';
    }

    try {
        //将数据写到文件
        var file = storage.readFile(filename);
        var updated = updateContents(file, data);
        storage.writeFile(filename, updated);
    } finally {
        lock.unlock();
    }
}

问题是:如果在写文件过程中,发生了 fullGC,并且其时间跨度较长, 超过了10秒, 那么,分布式就自动释放了。

在此过程中,client2 抢到锁,写了文件。

client1 的fullGC完成后,也继续写文件,注意,此时client1 的并没有占用锁,此时写入会导致文件数据错乱,发生线程安全问题。

这就是STW导致的锁过期问题。

STW导致的锁过期问题,具体如下图所示:

在这里插入图片描述
在这里插入图片描述

STW导致的锁过期问题,大概的解决方案,有: 1: 模拟CAS乐观锁的方式,增加版本号 2:watch dog自动延期机制

1: 模拟CAS乐观锁的方式,增加版本号(如下图中的token)

c
c

此方案如果要实现,需要调整业务逻辑,与之配合,所以会入侵代码。

2:watch dog自动延期机制

客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?

简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。

redission,采用的就是这种方案, 此方案不会入侵业务代码。

注意:

单机版的watch dog 并不能解决 STW的过期问题, 需要分布式版本的 watch dog, 独立的看门狗服务。

锁删除之后, 取消看门狗服务的 对应的key记录, 当然,这就使得系统变得复杂, 还要保证看门狗服务的高并发、高可用、数据一致性的问题。

为啥推荐使用Redission

作为 Java 开发人员,我们若想在程序中集成 Redis,必须使用 Redis 的第三方库。目前大家使用的最多的第三方库是jedis。

和SpringCloud gateway一样,Redisson也是基于Netty实现的,是更高性能的第三方库。 所以,这里推荐大家使用Redission替代 jedis。

在使用Redission之前,建议大家先掌握Netty的知识。 推荐大家阅读被很多小伙伴评价为史上最为易懂的NIO、Netty书籍:《Java高并发核心编程(卷1)》

在这里插入图片描述
在这里插入图片描述

Redisson简介

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。

img
img

Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

img
img

Redisson与Jedis的对比

1.概况对比

Jedis是Redis的java实现的客户端,其API提供了比较全面的的Redis命令的支持,Redisson实现了分布式和可扩展的的java数据结构,和Jedis相比,功能较为简单,不支持字符串操作,不支持排序,事物,管道,分区等Redis特性。Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中的放在处理业务逻辑上。

2.可伸缩性

Jedis使用阻塞的I/O,且其方法调用都是同步的,程序流程要等到sockets处理完I/O才能执行,不支持异步,Jedis客户端实例不是线程安全的,所以需要通过连接池来使用Jedis。

Redisson使用非阻塞的I/O和基于Netty框架的事件驱动的通信层,其方法调用时异步的。Redisson的API是线程安全的,所以操作单个Redisson连接来完成各种操作。

3.第三方框架整合

Redisson在Redis的基础上实现了java缓存标准规范;Redisson还提供了Spring Session回话管理器的实现。

Redission 的源码地址:

特性 & 功能:

  • 支持 Redis 单节点(single)模式、哨兵(sentinel)模式、主从(Master/Slave)模式以及集群(Redis Cluster)模式
  • 程序接口调用方式采用异步执行和异步流执行两种方式
  • 数据序列化,Redisson 的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在 Redis 里的读取和存储
  • 单个集合数据分片,在集群模式下,Redisson 为单个 Redis 集合类型提供了自动分片的功能
  • 提供多种分布式对象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
  • 提供丰富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
  • 分布式锁和同步器的实现,可重入锁(Reentrant Lock),公平锁(Fair Lock),联锁(MultiLock),红锁(Red Lock),信号量(Semaphonre),可过期性信号锁(PermitExpirableSemaphore)等
  • 提供先进的分布式服务,如分布式远程服务(Remote Service),分布式实时对象(Live Object)服务,分布式执行服务(Executor Service),分布式调度任务服务(Schedule Service)和分布式映射归纳服务(MapReduce)

Redisson的使用

如何安装 Redisson

安装 Redisson 最便捷的方法是使用 Maven 或者 Gradle:

•Maven

代码语言:javascript
复制
<dependency>	
    <groupId>org.redisson</groupId>	
    <artifactId>redisson</artifactId>	
    <version>3.11.4</version>	
</dependency>

•Gradle

代码语言:javascript
复制
compile group: 'org.redisson', name: 'redisson', version: '3.11.4'

目前 Redisson 最新版是 3.11.4,当然你也可以通过搜索 Maven 中央仓库 mvnrepository1 来找到 Redisson 的各种版本。

获取RedissonClient对象

RedissonClient有多种模式,主要的模式有:

  • 单节点模式
  • 哨兵模式
  • 主从模式
  • 集群模式

首先介绍单节点模式。

单节点模式的程序化配置方法,大致如下:

代码语言:javascript
复制
Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);xxxxxxxxxx Config config = new Config();config.useSingleServer().setAddress("redis://myredisserver:6379");RedissonClient redisson = Redisson.create(config);// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();
代码语言:javascript
复制
SingleServerConfig singleConfig = config.useSingleServer();

SingleServerConfig类的设置参数如下

address(节点地址) 可以通过host:port的格式来指定节点地址。 subscriptionConnectionMinimumIdleSize(发布和订阅连接的最小空闲连接数) 默认值:1 用于发布和订阅连接的最小保持连接数(长连接)。Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。 subscriptionConnectionPoolSize(发布和订阅连接池大小) 默认值:50 用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 connectionMinimumIdleSize(最小空闲连接数) 默认值:32 最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时写入反应速度。 connectionPoolSize(连接池大小) 默认值:64 连接池最大容量。连接池的连接数量自动弹性伸缩。 dnsMonitoring(是否启用DNS监测) 默认值:false 在启用该功能以后,Redisson将会监测DNS的变化情况。 dnsMonitoringInterval(DNS监测时间间隔,单位:毫秒) 默认值:5000 监测DNS的变化情况的时间间隔。 idleConnectionTimeout(连接空闲超时,单位:毫秒) 默认值:10000 如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。 connectTimeout(连接超时,单位:毫秒) 默认值:10000 同节点建立连接时的等待超时。时间单位是毫秒。 timeout(命令等待超时,单位:毫秒) 默认值:3000 等待节点回复命令的时间。该时间从命令发送成功时开始计时。 retryAttempts(命令失败重试次数) 默认值:3 如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。 retryInterval(命令重试发送时间间隔,单位:毫秒) 默认值:1500 在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。 reconnectionTimeout(重新连接时间间隔,单位:毫秒) 默认值:3000 当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。 failedAttempts(执行失败最大次数) 默认值:3 在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。 database(数据库编号) 默认值:0 尝试连接的数据库编号。 password(密码) 默认值:null 用于节点身份验证的密码。 subscriptionsPerConnection(单个连接最大订阅数量) 默认值:5 每个连接的最大订阅数量。 clientName(客户端名称) 默认值:null 在Redis节点里显示的客户端名称。 sslEnableEndpointIdentification(启用SSL终端识别) 默认值:true 开启SSL终端识别能力。 sslProvider(SSL实现方式) 默认值:JDK 确定采用哪种方式(JDK或OPENSSL)来实现SSL连接。 sslTruststore(SSL信任证书库路径) 默认值:null 指定SSL信任证书库的路径。 sslTruststorePassword(SSL信任证书库密码) 默认值:null 指定SSL信任证书库的密码。 sslKeystore(SSL钥匙库路径) 默认值:null 指定SSL钥匙库的路径。 sslKeystorePassword(SSL钥匙库密码) 默认值:null 指定SSL钥匙库的密码。

SpringBoot整合Redisson

Redisson有多种模式,首先介绍单机模式的整合。

一、导入Maven依赖
代码语言:javascript
复制
<!-- redisson-springboot -->
   <dependency>
       <groupId>org.redisson</groupId>
       <artifactId>redisson-spring-boot-starter</artifactId>
       <version>3.11.4</version>
   </dependency>
二、核心配置文件
代码语言:javascript
复制
spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    timeout: 5000
三、添加配置类

RedissonConfig.java

代码语言:javascript
复制
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RedissonConfig {

    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");
        config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
        config.useSingleServer().setDatabase(3);
        return Redisson.create(config);
    }

}
自定义starter

由于redission可以有多种模式,处于学习的目的,将多种模式封装成一个start,可以学习一下starter的制作。

在这里插入图片描述
在这里插入图片描述

封装一个RedissonManager,通过策略模式,根据不同的配置类型,创建 RedissionConfig实例,然后创建RedissionClient对象。

在这里插入图片描述
在这里插入图片描述

使用RBucket操作分布式对象

Redission模拟了Java的面向对象编程思想,可以简单理解为一切皆为对象。

每一个 Redisson 对象 实现了**RObject** and RExpirable 两个interfaces.

Usage example:

代码语言:javascript
复制
RObject object = redisson.get...()

object.sizeInMemory();

object.delete();

object.rename("newname");

object.isExists();

// catch expired event
object.addListener(new ExpiredObjectListener() {
   ...
});

// catch delete event
object.addListener(new DeletedObjectListener() {
   ...
});

每一个Redisson 对象的名字,就是 Redis中的 Key.

代码语言:javascript
复制
RMap map = redisson.getMap("mymap");
map.getName(); // = mymap

可以通过 RKeys 接口操作Redis中的keys.

Usage example:

代码语言:javascript
复制
RKeys keys = redisson.getKeys();

Iterable<String> allKeys = keys.getKeys();

Iterable<String> foundedKeys = keys.getKeysByPattern('key*');

long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");

long deletedKeysAmount = keys.deleteByPattern("test?");

String randomKey = keys.randomKey();

long keysAmount = keys.count();

keys.flushall();

keys.flushdb();

Redisson通过RBucket接口代表可以访问任何类型的基础对象,或者普通对象

RBucket有一系列的工具方法,如compareAndSet(),get(),getAndDelete(),getAndSet(),set(),size(),trySet()等等,用于设值/取值/获取尺寸。

RBucket普通对象的最大大小,为512兆字节

代码语言:javascript
复制
RBucket<AnyObject> bucket = redisson.getBucket("anyObject");

bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();

bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));
在这里插入图片描述
在这里插入图片描述

执行 Lua脚本

Lua是一种开源、简单易学、轻量小巧的脚本语言,用标准C语言编写。

其设计的目的就是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

Redis从2.6版本开始支持Lua脚本,Redis使用Lua可以:

  1. 原子操作。Redis会将整个脚本作为一个整体执行,不会被中断。可以用来批量更新、批量插入
  2. 减少网络开销。多个Redis操作合并为一个脚本,减少网络时延
  3. 代码复用。客户端发送的脚本可以存储在Redis中,其他客户端可以根据脚本的id调用。
代码语言:javascript
复制
public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testLuaExamples() {
        // 默认连接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();

        redisson.getBucket("redission:test:foo").set("bar");
        String r = redisson.getScript().eval(RScript.Mode.READ_ONLY,
                "return redis.call('get', 'redission:test:foo')", RScript.ReturnType.VALUE);
        System.out.println("foo: " + r);

        // 通过预存的脚本进行同样的操作
        RScript s = redisson.getScript();
        // 首先将脚本加载到Redis
        String sha1 = s.scriptLoad("return redis.call('get', 'redission:test:foo')");
        // 返回值 res == 282297a0228f48cd3fc6a55de6316f31422f5d17
        System.out.println("sha1: " + sha1);
        // 再通过SHA值调用脚本
        Future<Object> r1 = redisson.getScript().evalShaAsync(RScript.Mode.READ_ONLY,
                sha1,
                RScript.ReturnType.VALUE,
                Collections.emptyList());
        try {
            System.out.println("res: " + r1.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        client.shutdown();
    }
}

运行上面的代码时,将会看到以下输出:

代码语言:javascript
复制
foo: bar
sha1: 282297a0228f48cd3fc6a55de6316f31422f5d17
res: bar
在这里插入图片描述
在这里插入图片描述

使用 RLock 实现 Redis 分布式锁

RLock 是 Java 中可重入锁的分布式实现,下面的代码演示了 RLock 的用法:

代码语言:javascript
复制
public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

 @Test
    public void testLockExamples() {
        // 默认连接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        // RLock 继承了 java.util.concurrent.locks.Lock 接口
        RLock lock = redisson.getLock("redission:test:lock:1");

        final int[] count = {0};
        int threads = 10;
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                for (int j = 0; j < 1000; j++) {
                    lock.lock();

                    count[0]++;
                    lock.unlock();
                }
                countDownLatch.countDown();
            });
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("10个线程每个累加1000为: = " + count[0]);
        //输出统计结果
        float time = System.currentTimeMillis() - start;

        System.out.println("运行的时长为:" + time);
        System.out.println("每一次执行的时长为:" + time/count[0]);
    }

}

此代码将产生以下输出:

代码语言:javascript
复制
10个线程每个累加1000为: = 10000
运行的时长为:14172.0
每一次执行的时长为:1.4172

使用 RAtomicLong 实现 Redis 原子操作

RAtomicLong 是 Java 中 AtomicLong 类的分布式“替代品”,用于在并发环境中保存长值。以下示例代码演示了 RAtomicLong 的用法:

代码语言:javascript
复制
public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testRAtomicLongExamples() {
        // 默认连接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
          // 线程数
        final int threads = 10;
        // 每条线程的执行轮数
        final int turns = 1000;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++)
        {
            pool.submit(() ->
            {
                try
                {
                    for (int j = 0; j < turns; j++)
                    {
                        atomicLong.incrementAndGet();
                    }

                } catch (Exception e)
                {
                    e.printStackTrace();
                }
            });
        }

        ThreadUtil.sleepSeconds(5);
        System.out.println("atomicLong: " + atomicLong.get());
        redisson.shutdown();
    }

}

此代码的输出将是:

代码语言:javascript
复制
atomicLong: 10000
在这里插入图片描述
在这里插入图片描述

整长型累加器(LongAdder)

基于Redis的Redisson分布式整长型累加器(LongAdder)采用了与java.util.concurrent.atomic.LongAdder类似的接口。通过利用客户端内置的LongAdder对象,为分布式环境下递增和递减操作提供了很高得性能。据统计其性能最高比分布式AtomicLong对象快 12000 倍。

完美适用于分布式统计计量场景。下面是RLongAdder的使用案例:

代码语言:javascript
复制
RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();

以下示例代码演示了 RLongAdder 的用法:

代码语言:javascript
复制
public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testRAtomicLongExamples() {
        // 默认连接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
          // 线程数
        final int threads = 10;
        // 每条线程的执行轮数
        final int turns = 1000;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++)
        {
            pool.submit(() ->
            {
                try
                {
                    for (int j = 0; j < turns; j++)
                    {
                        atomicLong.incrementAndGet();
                    }

                } catch (Exception e)
                {
                    e.printStackTrace();
                }
            });
        }

        ThreadUtil.sleepSeconds(5);
        System.out.println("atomicLong: " + atomicLong.get());
        redisson.shutdown();
    }

}

此代码将产生以下输出:

代码语言:javascript
复制
longAdder: 10000
运行的时长为:5085.0
每一次执行的时长为:0.5085

当不再使用整长型累加器对象的时候应该自行手动销毁,如果Redisson对象被关闭(shutdown)了,则不用手动销毁。

代码语言:javascript
复制
RLongAdder atomicLong = ...
atomicLong.destroy();

序列化

Redisson的对象编码类是用于将对象进行序列化和反序列化,以实现对该对象在Redis里的读取和存储。Redisson提供了以下几种的对象编码应用,以供大家选择:

编码类名称

说明

org.redisson.codec.JsonJacksonCodec

Jackson JSON 编码 默认编码

org.redisson.codec.AvroJacksonCodec

Avro 一个二进制的JSON编码

org.redisson.codec.SmileJacksonCodec

Smile 另一个二进制的JSON编码

org.redisson.codec.CborJacksonCodec

CBOR 又一个二进制的JSON编码

org.redisson.codec.MsgPackJacksonCodec

MsgPack 再来一个二进制的JSON编码

org.redisson.codec.IonJacksonCodec

Amazon Ion 亚马逊的Ion编码,格式与JSON类似

org.redisson.codec.KryoCodec

Kryo 二进制对象序列化编码

org.redisson.codec.SerializationCodec

JDK序列化编码

org.redisson.codec.FstCodec

FST 10倍于JDK序列化性能而且100%兼容的编码

org.redisson.codec.LZ4Codec

LZ4 压缩型序列化对象编码

org.redisson.codec.SnappyCodec

Snappy 另一个压缩型序列化对象编码

org.redisson.client.codec.JsonJacksonMapCodec

基于Jackson的映射类使用的编码。可用于避免序列化类的信息,以及用于解决使用byte[]遇到的问题。

org.redisson.client.codec.StringCodec

纯字符串编码(无转换)

org.redisson.client.codec.LongCodec

纯整长型数字编码(无转换)

org.redisson.client.codec.ByteArrayCodec

字节数组编码

org.redisson.codec.CompositeCodec

用来组合多种不同编码在一起

由Redisson默认的编码器为二进制编码器,为了序列化后的内容可见,需要使用Json文本序列化编码工具类。Redisson提供了编码器 JsonJacksonCodec,作为Json文本序列化编码工具类。

问题是:JsonJackson在序列化有双向引用的对象时,会出现无限循环异常。而fastjson在检查出双向引用后会自动用引用符$ref替换,终止循环。

所以,一些特殊场景中:用fastjson能 正常序列化到redis,而JsonJackson则抛出无限循环异常。

为了序列化后的内容可见,所以不用redission其他自带的,自行实现fastjson编码器:

代码语言:javascript
复制
package com.crayon.distributedredissionspringbootstarter.codec;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

import java.io.IOException;

public class FastjsonCodec extends BaseCodec {

    private final Encoder encoder = in -> {
        ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
        try {
            ByteBufOutputStream os = new ByteBufOutputStream(out);
            JSON.writeJSONString(os, in, SerializerFeature.WriteClassName);
            return os.buffer();
        } catch (IOException e) {
            out.release();
            throw e;
        } catch (Exception e) {
            out.release();
            throw new IOException(e);
        }
    };

    private final Decoder<Object> decoder = (buf, state) ->
            JSON.parseObject(new ByteBufInputStream(buf), Object.class);

    @Override
    public Decoder<Object> getValueDecoder() {
        return decoder;
    }

    @Override
    public Encoder getValueEncoder() {
        return encoder;
    }
}

替换的方法如下:

代码语言:javascript
复制
 */
@Slf4j
public class StandaloneConfigImpl implements RedissonConfigService {

    @Override
    public Config createRedissonConfig(RedissonConfig redissonConfig) {
        Config config = new Config();
        try {
            String address = redissonConfig.getAddress();
            String password = redissonConfig.getPassword();
            int database = redissonConfig.getDatabase();
            String redisAddr = GlobalConstant.REDIS_CONNECTION_PREFIX.getConstant_value() + address;
            config.useSingleServer().setAddress(redisAddr);
            config.useSingleServer().setDatabase(database);
            //密码可以为空
            if (!StringUtils.isEmpty(password)) {
                config.useSingleServer().setPassword(password);
            }
            log.info("初始化[单机部署]方式Config,redisAddress:" + address);

//            config.setCodec( new FstCodec());
            config.setCodec( new FastjsonCodec());
        } catch (Exception e) {
            log.error("单机部署 Redisson init error", e);
        }
        return config;
    }
}

哨兵模式

哨兵模式即sentinel模式,配置Redis哨兵服务的官方文档在这里。

哨兵模式实现代码和单机模式几乎一样,唯一的不同就是Config的构造.

程序化配置哨兵模式的方法如下

代码语言:javascript
复制
Config config = new Config();
config.useSentinelServers()
    .setMasterName("mymaster")
    // use "rediss://" for SSL connection
    .addSentinelAddress("redis://127.0.0.1:26389", "redis://127.0.0.1:26379")
    .addSentinelAddress("redis://127.0.0.1:26319");

RedissonClient redisson = Redisson.create(config);

Redisson的哨兵模式的使用方法如下:

代码语言:javascript
复制
SentinelServersConfig sentinelConfig = config.useSentinelServers();

SentinelServersConfig配置参数如下

配置Redis哨兵服务的官方文档在这里。Redisson的哨兵模式的使用方法如下:SentinelServersConfig sentinelConfig = config.useSentinelServers(); SentinelServersConfig 类的设置参数如下: dnsMonitoringInterval(DNS监控间隔,单位:毫秒) 默认值:5000 用来指定检查节点DNS变化的时间间隔。使用的时候应该确保JVM里的DNS数据的缓存时间保持在足够低的范围才有意义。用-1来禁用该功能。 masterName(主服务器的名称) 主服务器的名称是哨兵进程中用来监测主从服务切换情况的。 addSentinelAddress(添加哨兵节点地址) 可以通过host:port的格式来指定哨兵节点的地址。多个节点可以一次性批量添加。 readMode(读取操作的负载均衡模式) 默认值: SLAVE(只在从服务节点里读取) 注:在从服务节点里读取的数据说明已经至少有两个节点保存了该数据,确保了数据的高可用性。 设置读取操作选择节点的模式。可用值为:SLAVE - 只在从服务节点里读取。MASTER - 只在主服务节点里读取。MASTER_SLAVE - 在主从服务节点里都可以读取。 subscriptionMode(订阅操作的负载均衡模式) 默认值:SLAVE(只在从服务节点里订阅) 设置订阅操作选择节点的模式。可用值为:SLAVE - 只在从服务节点里订阅。MASTER - 只在主服务节点里订阅。 loadBalancer(负载均衡算法类的选择) 默认值: org.redisson.connection.balancer.RoundRobinLoadBalancer 在使用多个Redis服务节点的环境里,可以选用以下几种负载均衡方式选择一个节点:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 权重轮询调度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 轮询调度算法org.redisson.connection.balancer.RandomLoadBalancer - 随机调度算法 subscriptionConnectionMinimumIdleSize(从节点发布和订阅连接的最小空闲连接数) 默认值:1 多从节点的环境里,每个 从服务节点里用于发布和订阅连接的最小保持连接数(长连接)。Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。 subscriptionConnectionPoolSize(从节点发布和订阅连接池大小) 默认值:50 多从节点的环境里,每个 从服务节点里用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 slaveConnectionMinimumIdleSize(从节点最小空闲连接数) 默认值:32 多从节点的环境里,每个 从服务节点里用于普通操作( 发布和订阅)的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时读取反映速度。 slaveConnectionPoolSize(从节点连接池大小) 默认值:64 多从节点的环境里,每个 从服务节点里用于普通操作( 发布和订阅)连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 masterConnectionMinimumIdleSize(主节点最小空闲连接数) 默认值:32 多从节点的环境里,每个 主节点的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时写入反应速度。 masterConnectionPoolSize(主节点连接池大小) 默认值:64 主节点的连接池最大容量。连接池的连接数量自动弹性伸缩。 idleConnectionTimeout(连接空闲超时,单位:毫秒) 默认值:10000 如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。 connectTimeout(连接超时,单位:毫秒) 默认值:10000 同任何节点建立连接时的等待超时。时间单位是毫秒。 timeout(命令等待超时,单位:毫秒) 默认值:3000 等待节点回复命令的时间。该时间从命令发送成功时开始计时。 retryAttempts(命令失败重试次数) 默认值:3 如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。 retryInterval(命令重试发送时间间隔,单位:毫秒) 默认值:1500 在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。 reconnectionTimeout(重新连接时间间隔,单位:毫秒) 默认值:3000 当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。 failedAttempts(执行失败最大次数) 默认值:3 在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。 database(数据库编号) 默认值:0 尝试连接的数据库编号。 password(密码) 默认值:null 用于节点身份验证的密码。 subscriptionsPerConnection(单个连接最大订阅数量) 默认值:5 每个连接的最大订阅数量。 clientName(客户端名称) 默认值:null 在Redis节点里显示的客户端名称。 sslEnableEndpointIdentification(启用SSL终端识别) 默认值:true 开启SSL终端识别能力。 sslProvider(SSL实现方式) 默认值:JDK 确定采用哪种方式(JDK或OPENSSL)来实现SSL连接。 sslTruststore(SSL信任证书库路径) 默认值:null 指定SSL信任证书库的路径。 sslTruststorePassword(SSL信任证书库密码) 默认值:null 指定SSL信任证书库的密码。 sslKeystore(SSL钥匙库路径) 默认值:null 指定SSL钥匙库的路径。 sslKeystorePassword(SSL钥匙库密码) 默认值:null 指定SSL钥匙库的密码。

通过属性文件,配置的示例如下:

代码语言:javascript
复制
---
sentinelServersConfig:
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
failedSlaveReconnectionInterval: 3000
failedSlaveCheckInterval: 60000
password: null
subscriptionsPerConnection: 5
clientName: null
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
slaveConnectionMinimumIdleSize: 24
slaveConnectionPoolSize: 64
masterConnectionMinimumIdleSize: 24
masterConnectionPoolSize: 64
readMode: "SLAVE"
subscriptionMode: "SLAVE"
sentinelAddresses:
  - "redis://127.0.0.1:26379"
  - "redis://127.0.0.1:26389"
masterName: "mymaster"
database: 0
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.MarshallingCodec> {}
transportMode: "NIO"

主从模式

介绍配置Redis主从服务组态的文档在这里.

程序化配置主从模式的方法如下

代码语言:javascript
复制
Config config = new Config();
config.useMasterSlaveServers()
    // use "rediss://" for SSL connection
    .setMasterAddress("redis://127.0.0.1:6379")
    .addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419")
    .addSlaveAddress("redis://127.0.0.1:6399");

RedissonClient redisson = Redisson.create(config);

主从模式使用到MasterSlaveServersConfig :

MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();

MasterSlaveServersConfig 类的设置参数如下:

dnsMonitoringInterval(DNS监控间隔,单位:毫秒) 默认值:5000 用来指定检查节点DNS变化的时间间隔。使用的时候应该确保JVM里的DNS数据的缓存时间保持在足够低的范围才有意义。用-1来禁用该功能。 masterAddress(主节点地址) 可以通过host:port的格式来指定主节点地址。 addSlaveAddress(添加从主节点地址) 可以通过host:port的格式来指定从节点的地址。多个节点可以一次性批量添加。 readMode(读取操作的负载均衡模式) 默认值: SLAVE(只在从服务节点里读取) 注:在从服务节点里读取的数据说明已经至少有两个节点保存了该数据,确保了数据的高可用性。 设置读取操作选择节点的模式。可用值为:SLAVE - 只在从服务节点里读取。MASTER - 只在主服务节点里读取。MASTER_SLAVE - 在主从服务节点里都可以读取。 subscriptionMode(订阅操作的负载均衡模式) 默认值:SLAVE(只在从服务节点里订阅) 设置订阅操作选择节点的模式。可用值为:SLAVE - 只在从服务节点里订阅。MASTER - 只在主服务节点里订阅。 loadBalancer(负载均衡算法类的选择) 默认值: org.redisson.connection.balancer.RoundRobinLoadBalancer 在使用多个Redis服务节点的环境里,可以选用以下几种负载均衡方式选择一个节点:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 权重轮询调度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 轮询调度算法org.redisson.connection.balancer.RandomLoadBalancer - 随机调度算法 subscriptionConnectionMinimumIdleSize(从节点发布和订阅连接的最小空闲连接数) 默认值:1 多从节点的环境里,每个 从服务节点里用于发布和订阅连接的最小保持连接数(长连接)。Redisson内部经常通过发布和订阅来实现许多功能。长期保持一定数量的发布订阅连接是必须的。 subscriptionConnectionPoolSize(从节点发布和订阅连接池大小) 默认值:50 多从节点的环境里,每个 从服务节点里用于发布和订阅连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 slaveConnectionMinimumIdleSize(从节点最小空闲连接数) 默认值:32 多从节点的环境里,每个 从服务节点里用于普通操作( 发布和订阅)的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时读取反映速度。 slaveConnectionPoolSize(从节点连接池大小) 默认值:64 多从节点的环境里,每个 从服务节点里用于普通操作( 发布和订阅)连接的连接池最大容量。连接池的连接数量自动弹性伸缩。 masterConnectionMinimumIdleSize(主节点最小空闲连接数) 默认值:32 多从节点的环境里,每个 主节点的最小保持连接数(长连接)。长期保持一定数量的连接有利于提高瞬时写入反应速度。 masterConnectionPoolSize(主节点连接池大小) 默认值:64 主节点的连接池最大容量。连接池的连接数量自动弹性伸缩。 idleConnectionTimeout(连接空闲超时,单位:毫秒) 默认值:10000 如果当前连接池里的连接数量超过了最小空闲连接数,而同时有连接空闲时间超过了该数值,那么这些连接将会自动被关闭,并从连接池里去掉。时间单位是毫秒。 connectTimeout(连接超时,单位:毫秒) 默认值:10000 同任何节点建立连接时的等待超时。时间单位是毫秒。 timeout(命令等待超时,单位:毫秒) 默认值:3000 等待节点回复命令的时间。该时间从命令发送成功时开始计时。 retryAttempts(命令失败重试次数) 默认值:3 如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。 retryInterval(命令重试发送时间间隔,单位:毫秒) 默认值:1500 在一条命令发送失败以后,等待重试发送的时间间隔。时间单位是毫秒。 reconnectionTimeout(重新连接时间间隔,单位:毫秒) 默认值:3000 当与某个节点的连接断开时,等待与其重新建立连接的时间间隔。时间单位是毫秒。 failedAttempts(执行失败最大次数) 默认值:3 在某个节点执行相同或不同命令时,连续 失败 failedAttempts(执行失败最大次数) 时,该节点将被从可用节点列表里清除,直到 reconnectionTimeout(重新连接时间间隔) 超时以后再次尝试。 database(数据库编号) 默认值:0 尝试连接的数据库编号。 password(密码) 默认值:null 用于节点身份验证的密码。 subscriptionsPerConnection(单个连接最大订阅数量) 默认值:5 每个连接的最大订阅数量。 clientName(客户端名称) 默认值:null 在Redis节点里显示的客户端名称。 sslEnableEndpointIdentification(启用SSL终端识别) 默认值:true 开启SSL终端识别能力。 sslProvider(SSL实现方式) 默认值:JDK 确定采用哪种方式(JDK或OPENSSL)来实现SSL连接。 sslTruststore(SSL信任证书库路径) 默认值:null 指定SSL信任证书库的路径。 sslTruststorePassword(SSL信任证书库密码) 默认值:null 指定SSL信任证书库的密码。 sslKeystore(SSL钥匙库路径) 默认值:null 指定SSL钥匙库的路径。 sslKeystorePassword(SSL钥匙库密码) 默认值:null 指定SSL钥匙库的密码。

集群模式

集群模式除了适用于Redis集群环境,也适用于任何云计算服务商提供的集群模式,例如AWS ElastiCache集群版、Azure Redis Cache和阿里云(Aliyun)的云数据库Redis版。

介绍配置Redis集群组态的文档在这里。 Redis集群组态的最低要求是必须有三个主节点。

以上代码使用了异步回调模式,RFuture 继承了 java.util.concurrent.Future, CompletionStage两大接口,异步回调模式的基础知识,请参见 《Java高并发核心编程 卷2 》

在这里插入图片描述
在这里插入图片描述

tryAcquire()方法

在RedissonLock对象的lock()方法主要调用tryAcquire()方法

img
img

tryLockInnerAsync

img
img

由于leaseTime == -1,于是走tryLockInnerAsync()方法,这个方法才是关键

img
img

首先,看一下evalWriteAsync方法的定义

代码语言:javascript
复制
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

这和前面的jedis调用lua脚本类似,最后两个参数分别是keys和params。

单独将调用的那一段摘出来看,实际调用是这样的:

代码语言:javascript
复制
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

结合上面的参数声明,我们可以知道,这里KEYS1就是getName(),ARGV2是getLockName(threadId)

假设:

  • 前面获取锁时传的name是“DISLOCK”,
  • 假设调用的线程ID是1,
  • 假设成员变量UUID类型的id是01a6d806-d282-4715-9bec-f51b9aa98110

那么KEYS1=DISLOCK,ARGV2=01a6d806-d282-4715-9bec-f51b9aa98110:1

因此,这段脚本的意思是

  1、判断有没有一个叫“DISLOCK”的key

  2、如果没有,则在其下设置一个字段为“01a6d806-d282-4715-9bec-f51b9aa98110:1”,值为“1”的键值对 ,并设置它的过期时间

  3、如果存在,则进一步判断“01a6d806-d282-4715-9bec-f51b9aa98110:1”是否存在,若存在,则其值加1,并重新设置过期时间

  4、返回“DISLOCK”的生存时间(毫秒)

原理:加锁机制

这里用的数据结构是hash,hash的结构是: key 字段1 值1 字段2 值2 。。。

用在锁这个场景下,key就表示锁的名称,也可以理解为临界资源,字段就表示当前获得锁的线程

所有竞争这把锁的线程都要判断在这个key下有没有自己线程的字段,如果没有则不能获得锁,如果有,则相当于重入,字段值加1(次数)

在这里插入图片描述
在这里插入图片描述

Lua脚本的详解

为何要使用lua语言?

因为一大堆复杂的业务逻辑,可以通过封装在lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性

在这里插入图片描述
在这里插入图片描述

回顾一下evalWriteAsync方法的定义

代码语言:javascript
复制
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

注意,其最后两个参数分别是keys和params。

关于 lua脚本的参数解释:

KEYS1代表的是你加锁的那个key,比如说:

RLock lock = redisson.getLock("DISLOCK");

这里你自己设置了加锁的那个锁key就是“DISLOCK”。

ARGV1代表的就是锁key的默认生存时间

调用的时候,传递的参数为 internalLockLeaseTime ,该值默认30秒。

ARGV2代表的是加锁的客户端的ID,类似于下面这样:

01a6d806-d282-4715-9bec-f51b9aa98110:1

lua脚本的第一段if判断语句,就是用“exists DISLOCK”命令判断一下,如果你要加锁的那个锁key不存在的话,你就进行加锁。

如何加锁呢?很简单,用下面的redis命令:

hset DISLOCK 01a6d806-d282-4715-9bec-f51b9aa98110:1 1

通过这个命令设置一个hash数据结构,这行命令执行后,会出现一个类似下面的数据结构:

代码语言:javascript
复制
DISLOCK:
    {
        8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
    }

接着会执行“pexpire DISLOCK 30000”命令,设置DISLOCK这个锁key的生存时间是30秒(默认)

锁互斥机制

那么在这个时候,如果客户端2来尝试加锁,执行了同样的一段lua脚本,会咋样呢?

很简单,第一个if判断会执行“exists DISLOCK”,发现DISLOCK 这个锁key已经存在了。

接着第二个if判断,判断一下,DISLOCK锁key的hash数据结构中,是否包含客户端2的ID,但是明显不是的,因为那里包含的是客户端1的ID。

所以,客户端2会获取到pttl DISLOCK返回的一个数字,这个数字代表了DISLOCK 这个锁key的剩余生存时间。比如还剩15000毫秒的生存时间。

此时客户端2会进入一个while循环,不停的尝试加锁。

可重入加锁机制

如果客户端1都已经持有了这把锁了,结果可重入的加锁会怎么样呢?

代码语言:javascript
复制
RLock lock = redisson.getLock("DISLOCK")
lock.lock();
//业务代码
lock.lock();
//业务代码
lock.unlock();
lock.unlock();

分析上面那段lua脚本。

第一个if判断肯定不成立,“exists DISLOCK”会显示锁key已经存在了。

第二个if判断会成立,因为DISLOCK的hash数据结构中包含的那个ID,就是客户端1的那个ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

此时就会执行可重入加锁的逻辑,他会用:

incrby DISLOCK

8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

通过这个命令,对客户端1的加锁次数,累加1。

此时DISLOCK数据结构变为下面这样:

代码语言:javascript
复制
DISLOCK:
    {
        8743c9c0-0795-4907-87fd-6c719a6b4586:1 2
    }

释放锁机制

如果执行lock.unlock(),就可以释放分布式锁,此时的业务逻辑也是非常简单的。

其实说白了,就是每次都对DISLOCK数据结构中的那个加锁次数减1。

如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:

“del DISLOCK”命令,从redis里删除这个key。

然后呢,另外的客户端2就可以尝试完成加锁了。

unlock 源码

代码语言:javascript
复制
  @Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
        
//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

再深入一下,实际调用的是unlockInnerAsync方法

unlockInnerAsync方法

在这里插入图片描述
在这里插入图片描述

原理:Redision 解锁机制

上图没有截取完整,完整的源码如下:

代码语言:javascript
复制
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

我们还是假设name=DISLOCK,假设线程ID是1

同理,我们可以知道

KEYS1是getName(),即KEYS1=DISLOCK

KEYS2是getChannelName(),即KEYS2=redisson_lock__channel:{DISLOCK}

ARGV1是LockPubSub.unlockMessage,即ARGV1=0

ARGV2是生存时间

ARGV3是getLockName(threadId),即ARGV3=8743c9c0-0795-4907-87fd-6c719a6b4586:1

因此,上面脚本的意思是:

  1、判断是否存在一个叫“DISLOCK”的key

  2、如果不存在,返回nil

  3、如果存在,使用Redis Hincrby 命令用于为哈希表中的字段值加上指定增量值 -1 ,代表减去1

  4、若counter >,返回空,若字段存在,则字段值减1

  5、若减完以后,counter > 0 值仍大于0,则返回0

  6、减完后,若字段值小于或等于0,则用 publish 命令广播一条消息,广播内容是0,并返回1;

可以猜测,广播0表示资源可用,即通知那些等待获取锁的线程现在可以获得锁了

在这里插入图片描述
在这里插入图片描述

通过redis Channel 解锁订阅

以上是正常情况下获取到锁的情况,那么当无法立即获取到锁的时候怎么办呢?

再回到前面获取锁的位置

代码语言:javascript
复制
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    //    订阅
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}


protected static final LockPubSub PUBSUB = new LockPubSub();

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
    PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

这里会订阅Channel,当资源可用时可以及时知道,并抢占,防止无效的轮询而浪费资源购买

这里的channel为:

redisson_lock__channel:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

当资源可用用的时候,循环去尝试获取锁,由于多个线程同时去竞争资源,所以这里用了信号量,对于同一个资源只允许一个线程获得锁,其它的线程阻塞

这点,有点儿类似 Zookeeper分布式锁:

有关zookeeper分布式锁的原理和实现,具体请参见下面的博客: Zookeeper 分布式锁 (图解+秒懂+史上最全)

watch dog自动延期机制

客户端1加锁的锁key默认生存时间才30秒,如果超过了30秒,客户端1还想一直持有这把锁,怎么办呢?

简单!只要客户端1一旦加锁成功,就会启动一个watch dog看门狗,他是一个后台线程,会每隔10秒检查一下,如果客户端1还持有锁key,那么就会不断的延长锁key的生存时间。


使用watchDog机制实现锁的续期

但是聪明的同学肯定会问:

有效时间设置多长,假如我的业务操作比有效时间长,我的业务代码还没执行完,就自动给我解锁了,不就完蛋了吗。

这个问题就有点棘手了,在网上也有很多讨论:

第一种解决方法就是靠程序员自己去把握,预估一下业务代码需要执行的时间,然后设置有效期时间比执行时间长一些,保证不会因为自动解锁影响到客户端业务代码的执行。

但是这并不是万全之策,比如网络抖动这种情况是无法预测的,也有可能导致业务代码执行的时间变长,所以并不安全。

第二种方法,使用监事狗watchDog机制实现锁的续期。

第二种方法比较靠谱一点,而且无业务入侵。

在Redisson框架实现分布式锁的思路,就使用watchDog机制实现锁的续期。

当加锁成功后,同时开启守护线程,默认有效期是30秒,每隔10秒就会给锁续期到30秒,只要持有锁的客户端没有宕机,就能保证一直持有锁,直到业务代码执行完毕由客户端自己解锁,如果宕机了自然就在有效期失效后自动解锁。

这里,和前面解决 JVM STW的锁过期问题有点类似,只不过,watchDog自动续期,也没有完全解决JVM STW的锁过期问题。 如何彻底解决 JVM STW的锁过期问题,可以来疯狂创客圈的社群讨论。

redisson watchdog 使用和原理

实际上,redisson加锁的基本流程图如下:

在这里插入图片描述
在这里插入图片描述

这里专注于介绍watchdog。

首先watchdog的具体思路是 加锁时,默认加锁 30秒,每10秒钟检查一次,如果存在就重新设置 过期时间为30秒。

然后设置默认加锁时间的参数是 lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)

官方文档描述如下

lockWatchdogTimeout(监控锁的看门狗超时,单位:毫秒)

默认值:30000

监控锁的看门狗超时时间单位为毫秒。该参数只适用于分布式锁的加锁请求中未明确使用leaseTimeout参数的情况。如果该看门狗未使用lockWatchdogTimeout去重新调整一个分布式锁的lockWatchdogTimeout超时,那么这个锁将变为失效状态。这个参数可以用来避免由Redisson客户端节点宕机或其他原因造成死锁的情况。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 跨JVM的线程安全问题
    • 何为分布式锁?
    • 数据库悲观锁
    • 数据库乐观锁
    • Zookeeper分布式锁
    • Redis分布式锁
    • 分布式锁一般有如下的特点:
  • 手工造轮子:基于Jedis 的API实现分布式锁
    • Redis几种架构
    • 首先看两个命令:
    • 基于Jedis API的分布式锁的总体流程:
    • 简单加锁:使用set的命令时,同时设置过期时间
    • 编写RedisLockService用于管理JedisLock
    • 测试用例
  • STW导致的锁过期问题
    • 为啥推荐使用Redission
  • Redisson简介
    • Redisson与Jedis的对比
    • Redission 的源码地址:
    • 特性 & 功能:
  • Redisson的使用
    • 如何安装 Redisson
    • 获取RedissonClient对象
    • SpringBoot整合Redisson
      • 一、导入Maven依赖
      • 二、核心配置文件
      • 三、添加配置类
      • 自定义starter
    • 使用RBucket操作分布式对象
    • 执行 Lua脚本
    • 使用 RLock 实现 Redis 分布式锁
    • 使用 RAtomicLong 实现 Redis 原子操作
    • 整长型累加器(LongAdder)
    • 序列化
    • 哨兵模式
    • 主从模式
    • 集群模式
    • tryAcquire()方法
    • tryLockInnerAsync
    • 原理:加锁机制
    • Lua脚本的详解
    • 关于 lua脚本的参数解释:
    • 锁互斥机制
    • 可重入加锁机制
    • 释放锁机制
    • unlock 源码
    • unlockInnerAsync方法
    • 原理:Redision 解锁机制
    • 通过redis Channel 解锁订阅
    • watch dog自动延期机制
  • 使用watchDog机制实现锁的续期
    • redisson watchdog 使用和原理
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档