分布式锁—3.Redisson的公平锁
大纲1.Redisson公平锁RedissonFairLock概述
2.公平锁源码之加锁和排队
3.公平锁源码之可重入加锁
4.公平锁源码之新旧版本对比
5.公平锁源码之队列重排
6.公平锁源码之释放锁
7.公平锁源码之按顺序依次加锁
1.Redisson公平锁RedissonFairLock概述
(1)非公平和公平的可重入锁
(2)Redisson公平锁的简单使用
(3)Redisson公平锁的初始化
(1)非公平和公平的可重入锁
一.非公平可重入锁
锁被释放后,排队获取锁的线程会重新无序获取锁,没有任何顺序性可言。
二.公平可重入锁
锁被释放后,排队获取锁的线程会按照请求获取锁时候的顺序去获取锁。公平锁可以保证线程获取锁的顺序,与其请求获取锁的顺序是一样的。也就是谁先申请获取到这把锁,谁就可以先获取到这把锁。公平可重入锁会把各个线程的加锁请求进行排队处理,保证先申请获取锁的线程,可以优先获取锁,从而实现所谓的公平性。
三.可重入的非公平锁和公平锁不同点
可重入的非公平锁和公平锁,在整体的技术实现框架上都是一样的。唯一的不同点就是加锁和解锁的逻辑不一样。非公平锁的加锁逻辑,比较简单。公平锁的加锁逻辑,要加入排队机制,保证各个线程排队能按顺序获取锁。
(2)Redisson公平锁的简单使用
Redisson的可重入锁RedissonLock指的是非公平可重入锁,Redisson的公平锁RedissonFairLock指的是公平可重入锁。
Redisson的公平可重入锁实现了java.util.concurrent.locks.Lock接口,保证了当多个线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒之后才会继续分配下一个线程。
RedissonFairLock是RedissonLock的子类。RedissonFairLock的锁实现框架,和RedissonLock基本一样。而在获取锁和释放锁的lua脚本中,RedissonFairLock的逻辑才有所区别。
//1.最常见的使用方法
RedissonClient redisson = Redisson.create(config);
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();
//2.10秒钟以后自动解锁,无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);
//3.尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();
//4.Redisson为公平的可重入锁提供了异步执行的相关方法
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);(3)Redisson公平锁的初始化
public class RedissonDemo {
public static void main(String[] args) throws Exception {
...
//创建RedissonClient实例
RedissonClient redisson = Redisson.create(config);
//获取公平的可重入锁
RLock fairLock = redisson.getFairLock("myLock");
fairLock.lock();//加锁
fairLock.unlock();//释放锁
}
}
public class Redisson implements RedissonClient {
//Redis的连接管理器,封装了一个Config实例
protected final ConnectionManager connectionManager;
//Redis的命令执行器,封装了一个ConnectionManager实例
protected final CommandAsyncExecutor commandExecutor;
...
protected Redisson(Config config) {
this.config = config;
Config configCopy = new Config(config);
//初始化Redis的连接管理器
connectionManager = ConfigSupport.createConnectionManager(configCopy);
...
//初始化Redis的命令执行器
commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
...
}
public RLock getFairLock(String name) {
return new RedissonFairLock(commandExecutor, name);
}
...
}
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime;
private final CommandAsyncExecutor commandExecutor;
...
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor, name, 60000*5);
}
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.threadWaitTime = threadWaitTime;
...
}
...
}
public class RedissonLock extends RedissonBaseLock {
protected long internalLockLeaseTime;
final CommandAsyncExecutor commandExecutor;
...
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
//与WatchDog有关的internalLockLeaseTime
//通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
//通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
//通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
...
}
...
}
public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
...
protected long internalLockLeaseTime;
final String id;
final String entryName;
final CommandAsyncExecutor commandExecutor;
public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
this.id = commandExecutor.getConnectionManager().getId();//获取UUID
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
...
}
abstract class RedissonExpirable extends RedissonObject implements RExpirable {
RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
super(connectionManager, name);
}
...
}
public abstract class RedissonObject implements RObject {
protected final CommandAsyncExecutor commandExecutor;
protected String name;
protected final Codec codec;
public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
this.codec = codec;
this.commandExecutor = commandExecutor;
if (name == null) {
throw new NullPointerException("name can't be null");
}
setName(name);
}
...
}
public class ConfigSupport {
...
//创建Redis的连接管理器
public static ConnectionManager createConnectionManager(Config configCopy) {
//生成UUID
UUID id = UUID.randomUUID();
...
if (configCopy.getClusterServersConfig() != null) {
validate(configCopy.getClusterServersConfig());
//返回ClusterConnectionManager实例
return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);
}
...
}
...
}
public class ClusterConnectionManager extends MasterSlaveConnectionManager {
public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {
super(config, id);
...
}
...
}
public class MasterSlaveConnectionManager implements ConnectionManager {
protected final String id;//初始化时为UUID
private final Config cfg;
protected Codec codec;
...
protected MasterSlaveConnectionManager(Config cfg, UUID id) {
this.id = id.toString();//传入的是UUID
...
this.cfg = cfg;
this.codec = cfg.getCodec();
...
}
public String getId() {
return id;
}
public Codec getCodec() {
return codec;
}
...
}
2.公平锁源码之加锁和排队
(1)加锁时的执行流程
(2)获取公平锁的lua脚本相关参数说明
(3)lua脚本步骤一:进入while循环移除队列和有序集合中等待超时的线程
(4)lua脚本步骤二:判断当前线程能否获取锁
(5)lua脚本步骤三:执行获取锁的操作
(6)lua脚本步骤四:判断锁是否已经被当前线程持有(可重入锁)
(7)lua脚本步骤五:判断当前获取锁失败的线程是否已经在队列中排队
(8)lua脚本步骤六:对获取锁失败的线程进行排队
(9)获取锁失败的第一个线程执行lua脚本的流程
(10)获取锁失败的第二个线程执行lua脚本的流程
(1)加锁时的执行流程
使用Redisson的公平锁RedissonFairLock进行加锁时:首先调用的是RedissonLock的lock()方法,然后会调用RedissonLock的tryAcquire()方法,接着会调用RedissonLock的tryAcquireAsync()方法。
在RedissonLock的tryAcquireAsync()方法中,会调用一个可以被RedissonLock子类重载的tryLockInnerAsync()方法。对于非公平锁,执行到这会调用RedissonLock的tryLockInnerAsync()方法。对于公平锁,执行到这会调用RedissonFairLock的tryLockInnerAsync()方法。
在RedissonFairLock的tryLockInnerAsync()方法中,便执行具体的lua脚本。
public class RedissonDemo { public static void main(String[] args) throws Exception { ... //创建RedissonClient实例 RedissonClient redisson = Redisson.create(config); //获取公平的可重入锁 RLock fairLock = redisson.getFairLock("myLock"); fairLock.lock();//加锁 fairLock.unlock();//释放锁 }}public class RedissonLock extends RedissonBaseLock { ... //不带参数的加锁 public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } //带参数的加锁 public void lock(long leaseTime, TimeUnit unit) { try { lock(leaseTime, unit, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(-1, leaseTime, unit, threadId); //加锁成功 if (ttl == null) { return; } //加锁失败 ... } private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId)); } privateRFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) { RFuture ttlRemainingFuture; if (leaseTime != -1) { ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } else { //非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法 //公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法 ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); } //对RFuture类型的ttlRemainingFuture添加回调监听 CompletionStage f = ttlRemainingFuture.thenApply(ttlRemaining -> { //tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑: //加锁成功 if (ttlRemaining == null) { if (leaseTime != -1) { //如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务 internalLockLeaseTime = unit.toMillis(leaseTime); } else { //创建定时调度任务 scheduleExpirationRenewal(threadId); } } return ttlRemaining; }); return new CompletableFutureWrapper(f); } ...}public class RedissonFairLock extends RedissonLock implements RLock { private final long threadWaitTime;//线程可以等待锁的时间 private final CommandAsyncExecutor commandExecutor; private final String threadsQueueName; private final String timeoutSetName; public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name) { this(commandExecutor, name, 60000*5);//传入60秒*5=5分钟 } public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.threadWaitTime = threadWaitTime; threadsQueueName = prefixName("redisson_lock_queue", name); timeoutSetName = prefixName("redisson_lock_timeout", name); } ... @Override RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { long wait = threadWaitTime; if (waitTime != -1) { //将传入的指定的获取锁等待时间赋值给wait变量 wait = unit.toMillis(waitTime); } ... if (command == RedisCommands.EVAL_LONG) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, //步骤一:remove stale threads,移除等待超时的线程 "while true do " + //获取队列中的第一个元素 //KEYS是一个用来对线程排队的队列的名字 "local firstThreadId2 = redis.call('lindex', KEYS, 0);" + "if firstThreadId2 == false then " + "break;" + "end;" + //获取队列中第一个元素对应的分数,也就是排第一的线程的过期时间 //KEYS是一个用来对线程排序的有序集合的名字 "local timeout = tonumber(redis.call('zscore', KEYS, firstThreadId2));" + //如果排第一的线程的过期时间小于当前时间,说明该线程等待超时了都还没获取到锁,所以要移除 //ARGV是当前时间 "if timeout
页:
[1]