大纲
1.Redisson公平锁RedissonFairLock概述
2.公平锁源码之加锁和排队
3.公平锁源码之可重入加锁
4.公平锁源码之新旧版本对比
5.公平锁源码之队列重排
6.公平锁源码之释放锁
7.公平锁源码之按顺序依次加锁
1.Redisson公平锁RedissonFairLock概述
(1)非公平和公平的可重入锁
(2)Redisson公平锁的简单使用
(3)Redisson公平锁的初始化
(1)非公平和公平的可重入锁
一.非公平可重入锁
锁被释放后,排队获取锁的线程会重新无序获取锁,没有任何顺序性可言。
二.公平可重入锁
锁被释放后,排队获取锁的线程会按照请求获取锁时候的顺序去获取锁。公平锁可以保证线程获取锁的顺序,与其请求获取锁的顺序是一样的。也就是谁先申请获取到这把锁,谁就可以先获取到这把锁。公平可重入锁会把各个线程的加锁请求进行排队处理,保证先申请获取锁的线程,可以优先获取锁,从而实现所谓的公平性。
三.可重入的非公平锁和公平锁不同点
可重入的非公平锁和公平锁,在整体的技术实现框架上都是一样的。唯一的不同点就是加锁和解锁的逻辑不一样。非公平锁的加锁逻辑,比较简单。公平锁的加锁逻辑,要加入排队机制,保证各个线程排队能按顺序获取锁。
(2)Redisson公平锁的简单使用
Redisson的可重入锁RedissonLock指的是非公平可重入锁,Redisson的公平锁RedissonFairLock指的是公平可重入锁。
Redisson的公平可重入锁实现了java.util.concurrent.locks.Lock接口,保证了当多个线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒之后才会继续分配下一个线程。
RedissonFairLock是RedissonLock的子类。RedissonFairLock的锁实现框架,和RedissonLock基本一样。而在获取锁和释放锁的lua脚本中,RedissonFairLock的逻辑才有所区别。- //1.最常见的使用方法
- RedissonClient redisson = Redisson.create(config);
- RLock fairLock = redisson.getFairLock("myLock");
- fairLock.lock();
- //2.10秒钟以后自动解锁,无需调用unlock方法手动解锁
- fairLock.lock(10, TimeUnit.SECONDS);
- //3.尝试加锁,最多等待100秒,上锁以后10秒自动解锁
- boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
- fairLock.unlock();
- //4.Redisson为公平的可重入锁提供了异步执行的相关方法
- RLock fairLock = redisson.getFairLock("myLock");
- fairLock.lockAsync();
- fairLock.lockAsync(10, TimeUnit.SECONDS);
- Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
复制代码 (3)Redisson公平锁的初始化- public class RedissonDemo {
- public static void main(String[] args) throws Exception {
- ...
- //创建RedissonClient实例
- RedissonClient redisson = Redisson.create(config);
-
- //获取公平的可重入锁
- RLock fairLock = redisson.getFairLock("myLock");
- fairLock.lock();//加锁
- fairLock.unlock();//释放锁
- }
- }
- public class Redisson implements RedissonClient {
- //Redis的连接管理器,封装了一个Config实例
- protected final ConnectionManager connectionManager;
- //Redis的命令执行器,封装了一个ConnectionManager实例
- protected final CommandAsyncExecutor commandExecutor;
- ...
- protected Redisson(Config config) {
- this.config = config;
- Config configCopy = new Config(config);
- //初始化Redis的连接管理器
- connectionManager = ConfigSupport.createConnectionManager(configCopy);
- ...
- //初始化Redis的命令执行器
- commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
- ...
- }
-
- public RLock getFairLock(String name) {
- return new RedissonFairLock(commandExecutor, name);
- }
- ...
- }
- public class RedissonFairLock extends RedissonLock implements RLock {
- private final long threadWaitTime;
- private final CommandAsyncExecutor commandExecutor;
- ...
- public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
- this(commandExecutor, name, 60000*5);
- }
-
- public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
- super(commandExecutor, name);
- this.commandExecutor = commandExecutor;
- this.threadWaitTime = threadWaitTime;
- ...
- }
- ...
- }
- public class RedissonLock extends RedissonBaseLock {
- protected long internalLockLeaseTime;
- final CommandAsyncExecutor commandExecutor;
- ...
- public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
- super(commandExecutor, name);
- this.commandExecutor = commandExecutor;
- //与WatchDog有关的internalLockLeaseTime
- //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
- //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
- //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
- this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
- ...
- }
- ...
- }
- public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
- ...
- protected long internalLockLeaseTime;
- final String id;
- final String entryName;
- final CommandAsyncExecutor commandExecutor;
-
- public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
- super(commandExecutor, name);
- this.commandExecutor = commandExecutor;
- this.id = commandExecutor.getConnectionManager().getId();//获取UUID
- this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
- this.entryName = id + ":" + name;
- }
- ...
- }
- abstract class RedissonExpirable extends RedissonObject implements RExpirable {
- RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
- super(connectionManager, name);
- }
- ...
- }
- public abstract class RedissonObject implements RObject {
- protected final CommandAsyncExecutor commandExecutor;
- protected String name;
- protected final Codec codec;
-
- public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
- this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
- }
-
- public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
- this.codec = codec;
- this.commandExecutor = commandExecutor;
- if (name == null) {
- throw new NullPointerException("name can't be null");
- }
- setName(name);
- }
- ...
- }
- public class ConfigSupport {
- ...
- //创建Redis的连接管理器
- public static ConnectionManager createConnectionManager(Config configCopy) {
- //生成UUID
- UUID id = UUID.randomUUID();
- ...
- if (configCopy.getClusterServersConfig() != null) {
- validate(configCopy.getClusterServersConfig());
- //返回ClusterConnectionManager实例
- return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
- }
- ...
- }
- ...
- }
- public class ClusterConnectionManager extends MasterSlaveConnectionManager {
- public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
- super(config, id);
- ...
- }
- ...
- }
- public class MasterSlaveConnectionManager implements ConnectionManager {
- protected final String id;//初始化时为UUID
- private final Config cfg;
- protected Codec codec;
- ...
- protected MasterSlaveConnectionManager(Config cfg, UUID id) {
- this.id = id.toString();//传入的是UUID
- ...
- this.cfg = cfg;
- this.codec = cfg.getCodec();
- ...
- }
-
- public String getId() {
- return id;
- }
-
- public Codec getCodec() {
- return codec;
- }
- ...
- }
复制代码
2.公平锁源码之加锁和排队
(1)加锁时的执行流程
(2)获取公平锁的lua脚本相关参数说明
(3)lua脚本步骤一:进入while循环移除队列和有序集合中等待超时的线程
(4)lua脚本步骤二:判断当前线程能否获取锁
(5)lua脚本步骤三:执行获取锁的操作
(6)lua脚本步骤四:判断锁是否已经被当前线程持有(可重入锁)
(7)lua脚本步骤五:判断当前获取锁失败的线程是否已经在队列中排队
(8)lua脚本步骤六:对获取锁失败的线程进行排队
(9)获取锁失败的第一个线程执行lua脚本的流程
(10)获取锁失败的第二个线程执行lua脚本的流程
(1)加锁时的执行流程
使用Redisson的公平锁RedissonFairLock进行加锁时:首先调用的是RedissonLock的lock()方法,然后会调用RedissonLock的tryAcquire()方法,接着会调用RedissonLock的tryAcquireAsync()方法。
在RedissonLock的tryAcquireAsync()方法中,会调用一个可以被RedissonLock子类重载的tryLockInnerAsync()方法。对于非公平锁,执行到这会调用RedissonLock的tryLockInnerAsync()方法。对于公平锁,执行到这会调用RedissonFairLock的tryLockInnerAsync()方法。
在RedissonFairLock的tryLockInnerAsync()方法中,便执行具体的lua脚本。
[code]public class RedissonDemo { public static void main(String[] args) throws Exception { ... //创建RedissonClient实例 RedissonClient redisson = Redisson.create(config); //获取公平的可重入锁 RLock fairLock = redisson.getFairLock("myLock"); fairLock.lock();//加锁 fairLock.unlock();//释放锁 }}public class RedissonLock extends RedissonBaseLock { ... //不带参数的加锁 public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } //带参数的加锁 public void lock(long leaseTime, TimeUnit unit) { try { lock(leaseTime, unit, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); //加锁成功 if (ttl == null) { return; } //加锁失败 ... } private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } private RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法 //公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } //对RFuture类型的ttlRemainingFuture添加回调监听 CompletionStage f = ttlRemainingFuture.thenApply(ttlRemaining -> { //tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑: //加锁成功 if (ttlRemaining == null) { if (leaseTime != -1) { //如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //创建定时调度任务 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper(f); } ...}public class RedissonFairLock extends RedissonLock implements RLock { private final long threadWaitTime;//线程可以等待锁的时间 private final CommandAsyncExecutor commandExecutor; private final String threadsQueueName; private final String timeoutSetName; public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) { this(commandExecutor, name, 60000*5);//传入60秒*5=5分钟 } public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.threadWaitTime = threadWaitTime; threadsQueueName = prefixName("redisson_lock_queue", name); timeoutSetName = prefixName("redisson_lock_timeout", name); } ... @Override RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { long wait = threadWaitTime; if (waitTime != -1) { //将传入的指定的获取锁等待时间赋值给wait变量 wait = unit.toMillis(waitTime); } ... if (command == RedisCommands.EVAL_LONG) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, //步骤一:remove stale threads,移除等待超时的线程 "while true do " + //获取队列中的第一个元素 //KEYS[2]是一个用来对线程排队的队列的名字 "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" + "if firstThreadId2 == false then " + "break;" + "end;" + //获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间 //KEYS[3]是一个用来对线程排序的有序集合的名字 "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" + //如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除 //ARGV[4]是当前时间 "if timeout |