找回密码
 立即注册
首页 业界区 业界 分布式锁—3.Redisson的公平锁

分布式锁—3.Redisson的公平锁

粒浊 2025-6-4 22:39:26
大纲
1.Redisson公平锁RedissonFairLock概述
2.公平锁源码之加锁和排队
3.公平锁源码之可重入加锁
4.公平锁源码之新旧版本对比
5.公平锁源码之队列重排
6.公平锁源码之释放锁
7.公平锁源码之按顺序依次加锁
 
1.Redisson公平锁RedissonFairLock概述
(1)非公平和公平的可重入锁
(2)Redisson公平锁的简单使用
(3)Redisson公平锁的初始化
 
(1)非公平和公平的可重入锁
一.非公平可重入锁
锁被释放后,排队获取锁的线程会重新无序获取锁,没有任何顺序性可言。
 
二.公平可重入锁
锁被释放后,排队获取锁的线程会按照请求获取锁时候的顺序去获取锁。公平锁可以保证线程获取锁的顺序,与其请求获取锁的顺序是一样的。也就是谁先申请获取到这把锁,谁就可以先获取到这把锁。公平可重入锁会把各个线程的加锁请求进行排队处理,保证先申请获取锁的线程,可以优先获取锁,从而实现所谓的公平性。
 
三.可重入的非公平锁和公平锁不同点
可重入的非公平锁和公平锁,在整体的技术实现框架上都是一样的。唯一的不同点就是加锁和解锁的逻辑不一样。非公平锁的加锁逻辑,比较简单。公平锁的加锁逻辑,要加入排队机制,保证各个线程排队能按顺序获取锁。
 
(2)Redisson公平锁的简单使用
Redisson的可重入锁RedissonLock指的是非公平可重入锁,Redisson的公平锁RedissonFairLock指的是公平可重入锁。
 
Redisson的公平可重入锁实现了java.util.concurrent.locks.Lock接口,保证了当多个线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒之后才会继续分配下一个线程。
 
RedissonFairLock是RedissonLock的子类。RedissonFairLock的锁实现框架,和RedissonLock基本一样。而在获取锁和释放锁的lua脚本中,RedissonFairLock的逻辑才有所区别。
  1. //1.最常见的使用方法
  2. RedissonClient redisson = Redisson.create(config);
  3. RLock fairLock = redisson.getFairLock("myLock");
  4. fairLock.lock();
  5. //2.10秒钟以后自动解锁,无需调用unlock方法手动解锁
  6. fairLock.lock(10, TimeUnit.SECONDS);
  7. //3.尝试加锁,最多等待100秒,上锁以后10秒自动解锁
  8. boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
  9. fairLock.unlock();
  10. //4.Redisson为公平的可重入锁提供了异步执行的相关方法
  11. RLock fairLock = redisson.getFairLock("myLock");
  12. fairLock.lockAsync();
  13. fairLock.lockAsync(10, TimeUnit.SECONDS);
  14. Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
复制代码
(3)Redisson公平锁的初始化
  1. public class RedissonDemo {
  2.     public static void main(String[] args) throws Exception {
  3.         ...
  4.         //创建RedissonClient实例
  5.         RedissonClient redisson = Redisson.create(config);
  6.         
  7.         //获取公平的可重入锁
  8.         RLock fairLock = redisson.getFairLock("myLock");
  9.         fairLock.lock();//加锁
  10.         fairLock.unlock();//释放锁
  11.     }
  12. }
  13. public class Redisson implements RedissonClient {
  14.     //Redis的连接管理器,封装了一个Config实例
  15.     protected final ConnectionManager connectionManager;
  16.     //Redis的命令执行器,封装了一个ConnectionManager实例
  17.     protected final CommandAsyncExecutor commandExecutor;
  18.     ...
  19.     protected Redisson(Config config) {
  20.         this.config = config;
  21.         Config configCopy = new Config(config);
  22.         //初始化Redis的连接管理器
  23.         connectionManager = ConfigSupport.createConnectionManager(configCopy);
  24.         ...  
  25.         //初始化Redis的命令执行器
  26.         commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
  27.         ...
  28.     }
  29.    
  30.     public RLock getFairLock(String name) {
  31.         return new RedissonFairLock(commandExecutor, name);
  32.     }
  33.     ...
  34. }
  35. public class RedissonFairLock extends RedissonLock implements RLock {
  36.     private final long threadWaitTime;
  37.     private final CommandAsyncExecutor commandExecutor;
  38.     ...
  39.     public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
  40.         this(commandExecutor, name, 60000*5);
  41.     }
  42.    
  43.     public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
  44.         super(commandExecutor, name);
  45.         this.commandExecutor = commandExecutor;
  46.         this.threadWaitTime = threadWaitTime;
  47.          ...
  48.     }
  49.     ...
  50. }
  51. public class RedissonLock extends RedissonBaseLock {
  52.     protected long internalLockLeaseTime;
  53.     final CommandAsyncExecutor commandExecutor;
  54.     ...
  55.     public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
  56.         super(commandExecutor, name);
  57.         this.commandExecutor = commandExecutor;
  58.         //与WatchDog有关的internalLockLeaseTime
  59.         //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
  60.         //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
  61.         //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
  62.         this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
  63.         ...
  64.     }
  65.     ...
  66. }
  67. public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
  68.     ...
  69.     protected long internalLockLeaseTime;
  70.     final String id;
  71.     final String entryName;
  72.     final CommandAsyncExecutor commandExecutor;
  73.    
  74.     public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
  75.         super(commandExecutor, name);
  76.         this.commandExecutor = commandExecutor;
  77.         this.id = commandExecutor.getConnectionManager().getId();//获取UUID
  78.         this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
  79.         this.entryName = id + ":" + name;
  80.     }
  81.     ...
  82. }
  83. abstract class RedissonExpirable extends RedissonObject implements RExpirable {
  84.     RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
  85.         super(connectionManager, name);
  86.     }
  87.     ...
  88. }
  89. public abstract class RedissonObject implements RObject {
  90.     protected final CommandAsyncExecutor commandExecutor;
  91.     protected String name;
  92.     protected final Codec codec;
  93.    
  94.     public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
  95.         this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
  96.     }
  97.    
  98.     public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
  99.         this.codec = codec;
  100.         this.commandExecutor = commandExecutor;
  101.         if (name == null) {
  102.             throw new NullPointerException("name can't be null");
  103.         }
  104.         setName(name);
  105.     }
  106.     ...
  107. }
  108. public class ConfigSupport {
  109.     ...
  110.     //创建Redis的连接管理器
  111.     public static ConnectionManager createConnectionManager(Config configCopy) {
  112.         //生成UUID
  113.         UUID id = UUID.randomUUID();
  114.         ...
  115.         if (configCopy.getClusterServersConfig() != null) {
  116.             validate(configCopy.getClusterServersConfig());
  117.             //返回ClusterConnectionManager实例
  118.             return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
  119.         }
  120.         ...
  121.     }
  122.     ...
  123. }
  124. public class ClusterConnectionManager extends MasterSlaveConnectionManager {
  125.     public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
  126.         super(config, id);
  127.         ...
  128.     }
  129.     ...
  130. }
  131. public class MasterSlaveConnectionManager implements ConnectionManager {
  132.     protected final String id;//初始化时为UUID
  133.     private final Config cfg;
  134.     protected Codec codec;
  135.     ...
  136.     protected MasterSlaveConnectionManager(Config cfg, UUID id) {
  137.         this.id = id.toString();//传入的是UUID
  138.         ...
  139.         this.cfg = cfg;
  140.         this.codec = cfg.getCodec();
  141.         ...
  142.     }
  143.    
  144.     public String getId() {
  145.         return id;
  146.     }
  147.    
  148.     public Codec getCodec() {
  149.         return codec;
  150.     }
  151.     ...
  152. }
复制代码
 
2.公平锁源码之加锁和排队
(1)加锁时的执行流程
(2)获取公平锁的lua脚本相关参数说明
(3)lua脚本步骤一:进入while循环移除队列和有序集合中等待超时的线程
(4)lua脚本步骤二:判断当前线程能否获取锁
(5)lua脚本步骤三:执行获取锁的操作
(6)lua脚本步骤四:判断锁是否已经被当前线程持有(可重入锁)
(7)lua脚本步骤五:判断当前获取锁失败的线程是否已经在队列中排队
(8)lua脚本步骤六:对获取锁失败的线程进行排队
(9)获取锁失败的第一个线程执行lua脚本的流程
(10)获取锁失败的第二个线程执行lua脚本的流程
 
(1)加锁时的执行流程
使用Redisson的公平锁RedissonFairLock进行加锁时:首先调用的是RedissonLock的lock()方法,然后会调用RedissonLock的tryAcquire()方法,接着会调用RedissonLock的tryAcquireAsync()方法。
 
在RedissonLock的tryAcquireAsync()方法中,会调用一个可以被RedissonLock子类重载的tryLockInnerAsync()方法。对于非公平锁,执行到这会调用RedissonLock的tryLockInnerAsync()方法。对于公平锁,执行到这会调用RedissonFairLock的tryLockInnerAsync()方法。
 
在RedissonFairLock的tryLockInnerAsync()方法中,便执行具体的lua脚本。
[code]public class RedissonDemo {    public static void main(String[] args) throws Exception {        ...        //创建RedissonClient实例        RedissonClient redisson = Redisson.create(config);                //获取公平的可重入锁        RLock fairLock = redisson.getFairLock("myLock");        fairLock.lock();//加锁        fairLock.unlock();//释放锁    }}public class RedissonLock extends RedissonBaseLock {    ...    //不带参数的加锁    public void lock() {        try {            lock(-1, null, false);        } catch (InterruptedException e) {            throw new IllegalStateException();        }    }        //带参数的加锁    public void lock(long leaseTime, TimeUnit unit) {        try {            lock(leaseTime, unit, false);        } catch (InterruptedException e) {            throw new IllegalStateException();        }    }        private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {        long threadId = Thread.currentThread().getId();        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);        //加锁成功        if (ttl == null) {            return;        }        //加锁失败        ...    }        private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));    }        private  RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {        RFuture ttlRemainingFuture;        if (leaseTime != -1) {            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);        } else {            //非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法            //公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);        }        //对RFuture类型的ttlRemainingFuture添加回调监听        CompletionStage f = ttlRemainingFuture.thenApply(ttlRemaining -> {            //tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑:            //加锁成功            if (ttlRemaining == null) {                if (leaseTime != -1) {                    //如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务                    internalLockLeaseTime = unit.toMillis(leaseTime);                } else {                    //创建定时调度任务                    scheduleExpirationRenewal(threadId);                }            }            return ttlRemaining;        });        return new CompletableFutureWrapper(f);    }    ...}public class RedissonFairLock extends RedissonLock implements RLock {    private final long threadWaitTime;//线程可以等待锁的时间    private final CommandAsyncExecutor commandExecutor;    private final String threadsQueueName;    private final String timeoutSetName;        public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {        this(commandExecutor, name, 60000*5);//传入60秒*5=5分钟    }        public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {        super(commandExecutor, name);        this.commandExecutor = commandExecutor;        this.threadWaitTime = threadWaitTime;        threadsQueueName = prefixName("redisson_lock_queue", name);        timeoutSetName = prefixName("redisson_lock_timeout", name);    }    ...    @Override     RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {        long wait = threadWaitTime;        if (waitTime != -1) {            //将传入的指定的获取锁等待时间赋值给wait变量            wait = unit.toMillis(waitTime);        }          ...        if (command == RedisCommands.EVAL_LONG) {            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,                //步骤一:remove stale threads,移除等待超时的线程                "while true do " +                    //获取队列中的第一个元素                    //KEYS[2]是一个用来对线程排队的队列的名字                    "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +                    "if firstThreadId2 == false then " +                        "break;" +                    "end;" +                    //获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间                    //KEYS[3]是一个用来对线程排序的有序集合的名字                    "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +                    //如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除                    //ARGV[4]是当前时间                    "if timeout
您需要登录后才可以回帖 登录 | 立即注册