粒浊 发表于 2025-6-4 22:39:26

分布式锁—3.Redisson的公平锁

大纲
1.Redisson公平锁RedissonFairLock概述
2.公平锁源码之加锁和排队
3.公平锁源码之可重入加锁
4.公平锁源码之新旧版本对比
5.公平锁源码之队列重排
6.公平锁源码之释放锁
7.公平锁源码之按顺序依次加锁
 
1.Redisson公平锁RedissonFairLock概述
(1)非公平和公平的可重入锁
(2)Redisson公平锁的简单使用
(3)Redisson公平锁的初始化
 
(1)非公平和公平的可重入锁
一.非公平可重入锁
锁被释放后,排队获取锁的线程会重新无序获取锁,没有任何顺序性可言。
 
二.公平可重入锁
锁被释放后,排队获取锁的线程会按照请求获取锁时候的顺序去获取锁。公平锁可以保证线程获取锁的顺序,与其请求获取锁的顺序是一样的。也就是谁先申请获取到这把锁,谁就可以先获取到这把锁。公平可重入锁会把各个线程的加锁请求进行排队处理,保证先申请获取锁的线程,可以优先获取锁,从而实现所谓的公平性。
 
三.可重入的非公平锁和公平锁不同点
可重入的非公平锁和公平锁,在整体的技术实现框架上都是一样的。唯一的不同点就是加锁和解锁的逻辑不一样。非公平锁的加锁逻辑,比较简单。公平锁的加锁逻辑,要加入排队机制,保证各个线程排队能按顺序获取锁。
 
(2)Redisson公平锁的简单使用
Redisson的可重入锁RedissonLock指的是非公平可重入锁,Redisson的公平锁RedissonFairLock指的是公平可重入锁。
 
Redisson的公平可重入锁实现了java.util.concurrent.locks.Lock接口,保证了当多个线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒之后才会继续分配下一个线程。
 
RedissonFairLock是RedissonLock的子类。RedissonFairLock的锁实现框架,和RedissonLock基本一样。而在获取锁和释放锁的lua脚本中,RedissonFairLock的逻辑才有所区别。
//1.最常见的使用方法
RedissonClient redisson = Redisson.create(config);
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();

//2.10秒钟以后自动解锁,无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);

//3.尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();

//4.Redisson为公平的可重入锁提供了异步执行的相关方法
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);(3)Redisson公平锁的初始化
public class RedissonDemo {
    public static void main(String[] args) throws Exception {
      ...
      //创建RedissonClient实例
      RedissonClient redisson = Redisson.create(config);
      
      //获取公平的可重入锁
      RLock fairLock = redisson.getFairLock("myLock");
      fairLock.lock();//加锁
      fairLock.unlock();//释放锁
    }
}

public class Redisson implements RedissonClient {
    //Redis的连接管理器,封装了一个Config实例
    protected final ConnectionManager connectionManager;
    //Redis的命令执行器,封装了一个ConnectionManager实例
    protected final CommandAsyncExecutor commandExecutor;
    ...
    protected Redisson(Config config) {
      this.config = config;
      Config configCopy = new Config(config);
      //初始化Redis的连接管理器
      connectionManager = ConfigSupport.createConnectionManager(configCopy);
      ...
      //初始化Redis的命令执行器
      commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
      ...
    }
   
    public RLock getFairLock(String name) {
      return new RedissonFairLock(commandExecutor, name);
    }
    ...
}

public class RedissonFairLock extends RedissonLock implements RLock {
    private final long threadWaitTime;
    private final CommandAsyncExecutor commandExecutor;
    ...
    public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
      this(commandExecutor, name, 60000*5);
    }
   
    public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
      super(commandExecutor, name);
      this.commandExecutor = commandExecutor;
      this.threadWaitTime = threadWaitTime;
         ...
    }
    ...
}

public class RedissonLock extends RedissonBaseLock {
    protected long internalLockLeaseTime;
    final CommandAsyncExecutor commandExecutor;
    ...
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
      super(commandExecutor, name);
      this.commandExecutor = commandExecutor;
      //与WatchDog有关的internalLockLeaseTime
      //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
      //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
      //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
      this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
      ...
    }
    ...
}

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
    ...
    protected long internalLockLeaseTime;
    final String id;
    final String entryName;
    final CommandAsyncExecutor commandExecutor;
   
    public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
      super(commandExecutor, name);
      this.commandExecutor = commandExecutor;
      this.id = commandExecutor.getConnectionManager().getId();//获取UUID
      this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
      this.entryName = id + ":" + name;
    }
    ...
}

abstract class RedissonExpirable extends RedissonObject implements RExpirable {
    RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
      super(connectionManager, name);
    }
    ...
}

public abstract class RedissonObject implements RObject {
    protected final CommandAsyncExecutor commandExecutor;
    protected String name;
    protected final Codec codec;
   
    public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
      this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
    }
   
    public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
      this.codec = codec;
      this.commandExecutor = commandExecutor;
      if (name == null) {
            throw new NullPointerException("name can't be null");
      }
      setName(name);
    }
    ...
}

public class ConfigSupport {
    ...
    //创建Redis的连接管理器
    public static ConnectionManager createConnectionManager(Config configCopy) {
      //生成UUID
      UUID id = UUID.randomUUID();
      ...
      if (configCopy.getClusterServersConfig() != null) {
            validate(configCopy.getClusterServersConfig());
            //返回ClusterConnectionManager实例
            return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
      }
      ...
    }
    ...
}

public class ClusterConnectionManager extends MasterSlaveConnectionManager {
    public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
      super(config, id);
      ...
    }
    ...
}

public class MasterSlaveConnectionManager implements ConnectionManager {
    protected final String id;//初始化时为UUID
    private final Config cfg;
    protected Codec codec;
    ...
    protected MasterSlaveConnectionManager(Config cfg, UUID id) {
      this.id = id.toString();//传入的是UUID
      ...
      this.cfg = cfg;
      this.codec = cfg.getCodec();
      ...
    }
   
    public String getId() {
      return id;
    }
   
    public Codec getCodec() {
      return codec;
    }
    ...

2.公平锁源码之加锁和排队
(1)加锁时的执行流程
(2)获取公平锁的lua脚本相关参数说明
(3)lua脚本步骤一:进入while循环移除队列和有序集合中等待超时的线程
(4)lua脚本步骤二:判断当前线程能否获取锁
(5)lua脚本步骤三:执行获取锁的操作
(6)lua脚本步骤四:判断锁是否已经被当前线程持有(可重入锁)
(7)lua脚本步骤五:判断当前获取锁失败的线程是否已经在队列中排队
(8)lua脚本步骤六:对获取锁失败的线程进行排队
(9)获取锁失败的第一个线程执行lua脚本的流程
(10)获取锁失败的第二个线程执行lua脚本的流程
 
(1)加锁时的执行流程
使用Redisson的公平锁RedissonFairLock进行加锁时:首先调用的是RedissonLock的lock()方法,然后会调用RedissonLock的tryAcquire()方法,接着会调用RedissonLock的tryAcquireAsync()方法。
 
在RedissonLock的tryAcquireAsync()方法中,会调用一个可以被RedissonLock子类重载的tryLockInnerAsync()方法。对于非公平锁,执行到这会调用RedissonLock的tryLockInnerAsync()方法。对于公平锁,执行到这会调用RedissonFairLock的tryLockInnerAsync()方法。
 
在RedissonFairLock的tryLockInnerAsync()方法中,便执行具体的lua脚本。
public class RedissonDemo {    public static void main(String[] args) throws Exception {      ...      //创建RedissonClient实例      RedissonClient redisson = Redisson.create(config);                //获取公平的可重入锁      RLock fairLock = redisson.getFairLock("myLock");      fairLock.lock();//加锁      fairLock.unlock();//释放锁    }}public class RedissonLock extends RedissonBaseLock {    ...    //不带参数的加锁    public void lock() {      try {            lock(-1, null, false);      } catch (InterruptedException e) {            throw new IllegalStateException();      }    }      //带参数的加锁    public void lock(long leaseTime, TimeUnit unit) {      try {            lock(leaseTime, unit, false);      } catch (InterruptedException e) {            throw new IllegalStateException();      }    }      private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {      long threadId = Thread.currentThread().getId();      Long ttl = tryAcquire(-1, leaseTime, unit, threadId);      //加锁成功      if (ttl == null) {            return;      }      //加锁失败      ...    }      private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {      return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));    }      privateRFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {      RFuture ttlRemainingFuture;      if (leaseTime != -1) {            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);      } else {            //非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法            //公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);      }      //对RFuture类型的ttlRemainingFuture添加回调监听      CompletionStage f = ttlRemainingFuture.thenApply(ttlRemaining -> {            //tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑:            //加锁成功            if (ttlRemaining == null) {                if (leaseTime != -1) {                  //如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务                  internalLockLeaseTime = unit.toMillis(leaseTime);                } else {                  //创建定时调度任务                  scheduleExpirationRenewal(threadId);                }            }            return ttlRemaining;      });      return new CompletableFutureWrapper(f);    }    ...}public class RedissonFairLock extends RedissonLock implements RLock {    private final long threadWaitTime;//线程可以等待锁的时间    private final CommandAsyncExecutor commandExecutor;    private final String threadsQueueName;    private final String timeoutSetName;      public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {      this(commandExecutor, name, 60000*5);//传入60秒*5=5分钟    }      public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {      super(commandExecutor, name);      this.commandExecutor = commandExecutor;      this.threadWaitTime = threadWaitTime;      threadsQueueName = prefixName("redisson_lock_queue", name);      timeoutSetName = prefixName("redisson_lock_timeout", name);    }    ...    @Override   RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {      long wait = threadWaitTime;      if (waitTime != -1) {            //将传入的指定的获取锁等待时间赋值给wait变量            wait = unit.toMillis(waitTime);      }          ...      if (command == RedisCommands.EVAL_LONG) {            return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,                //步骤一:remove stale threads,移除等待超时的线程                "while true do " +                  //获取队列中的第一个元素                  //KEYS是一个用来对线程排队的队列的名字                  "local firstThreadId2 = redis.call('lindex', KEYS, 0);" +                  "if firstThreadId2 == false then " +                        "break;" +                  "end;" +                  //获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间                  //KEYS是一个用来对线程排序的有序集合的名字                  "local timeout = tonumber(redis.call('zscore', KEYS, firstThreadId2));" +                  //如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除                  //ARGV是当前时间                  "if timeout
页: [1]
查看完整版本: 分布式锁—3.Redisson的公平锁