找回密码
 立即注册
首页 业界区 业界 分布式锁—5.Redisson的读写锁

分布式锁—5.Redisson的读写锁

兜蛇 2025-6-4 22:11:59
大纲
1.Redisson读写锁RedissonReadWriteLock概述
2.读锁RedissonReadLock的获取读锁逻辑
3.写锁RedissonWriteLock的获取写锁逻辑
4.读锁RedissonReadLock的读读不互斥逻辑
5.RedissonReadLock和RedissonWriteLock的读写互斥逻辑
6.写锁RedissonWriteLock的写写互斥逻辑
7.写锁RedissonWriteLock的可重入逻辑
8.读锁RedissonReadLock的释放读锁逻辑
9.写锁RedissonWriteLock的释放写锁逻辑
 
1.Redisson读写锁RedissonReadWriteLock概述
(1)RedissonReadWriteLock的简介
(2)RedissonReadWriteLock的使用
(3)RedissonReadWriteLock的初始化
 
(1)RedissonReadWriteLock的简介
RedissonReadWriteLock提供了两个方法分别获取读锁和写锁。
 
RedissonReadWriteLock的readLock()方法可以获取读锁RedissonReadLock。
 
RedissonReadWriteLock的writeLock()方法可以获取写锁RedissonWriteLock。
 
由于RedissonReadLock和RedissonWriteLock都是RedissonLock的子类,所以只需关注RedissonReadLock和RedissonWriteLock的如下内容即可。
 
一是获取读锁(写锁)的lua脚本逻辑
二是释放读锁(写锁)的lua脚本逻辑
三是读锁(写锁)的WathDog检查读锁(写锁)和处理锁过期时间的逻辑
 
(2)RedissonReadWriteLock的使用
  1. //读写锁
  2. RedissonClient redisson = Redisson.create(config);
  3. RReadWriteLock rwlock = redisson.getReadWriteLock("myLock");
  4. rwlock.readLock().lock();//获取读锁
  5. rwlock.readLock().unlock();//释放读锁
  6. rwlock.writeLock().lock();//获取写锁
  7. rwlock.writeLock().unlock();//释放写锁
  8. ---------------------------------------------------------------
  9. //如果没有主动释放锁的话,10秒后将会自动释放锁
  10. rwlock.readLock().lock(10, TimeUnit.SECONDS);
  11. rwlock.writeLock().lock(10, TimeUnit.SECONDS);
  12. //加锁等待最多是100秒;加锁成功后如果没有主动释放锁的话,锁会在10秒后自动释放
  13. boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
  14. boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
复制代码
(3)RedissonReadWriteLock的初始化
RedissonReadWriteLock实现了RReadWriteLock接口,RedissonReadLock实现了RLock接口,RedissonWriteLock实现了RLock接口。
  1. public class Redisson implements RedissonClient {
  2.     //Redis的连接管理器,封装了一个Config实例
  3.     protected final ConnectionManager connectionManager;
  4.     //Redis的命令执行器,封装了一个ConnectionManager实例
  5.     protected final CommandAsyncExecutor commandExecutor;
  6.     ...
  7.    
  8.     protected Redisson(Config config) {
  9.         this.config = config;
  10.         Config configCopy = new Config(config);
  11.         //初始化Redis的连接管理器
  12.         connectionManager = ConfigSupport.createConnectionManager(configCopy);
  13.         ...  
  14.         //初始化Redis的命令执行器
  15.         commandExecutor = new CommandSyncService(connectionManager, objectBuilder);
  16.         ...
  17.     }
  18.    
  19.     @Override
  20.     public RReadWriteLock getReadWriteLock(String name) {
  21.         return new RedissonReadWriteLock(commandExecutor, name);
  22.     }
  23.     ...
  24. }
  25. public class RedissonReadWriteLock extends RedissonExpirable implements RReadWriteLock {
  26.     public RedissonReadWriteLock(CommandAsyncExecutor commandExecutor, String name) {
  27.         super(commandExecutor, name);
  28.     }
  29.    
  30.     @Override
  31.     public RLock readLock() {
  32.         return new RedissonReadLock(commandExecutor, getRawName());
  33.     }
  34.    
  35.     @Override
  36.     public RLock writeLock() {
  37.         return new RedissonWriteLock(commandExecutor, getRawName());
  38.     }
  39. }
  40. public class RedissonReadLock extends RedissonLock implements RLock {
  41.     public RedissonReadLock(CommandAsyncExecutor commandExecutor, String name) {
  42.         super(commandExecutor, name);
  43.     }
  44.     ...
  45. }
  46. public class RedissonWriteLock extends RedissonLock implements RLock {
  47.     protected RedissonWriteLock(CommandAsyncExecutor commandExecutor, String name) {
  48.         super(commandExecutor, name);
  49.     }
  50.     ...
  51. }
复制代码
 
2.读锁RedissonReadLock的获取读锁逻辑
(1)加读锁的lua脚本逻辑
(2)WathDog处理读锁过期时间的lua脚本逻辑
 
(1)加读锁的lua脚本逻辑
假设客户端A的线程(UUID1:ThreadID1)作为第一个线程进来加读锁,执行流程如下:
  1. public class RedissonLock extends RedissonBaseLock {
  2.     ...
  3.     //不带参数的加锁
  4.     public void lock() {
  5.         ...
  6.         lock(-1, null, false);
  7.         ...
  8.     }
  9.    
  10.     //带参数的加锁
  11.     public void lock(long leaseTime, TimeUnit unit) {
  12.         ...
  13.        lock(leaseTime, unit, false);
  14.         ...
  15.     }
  16.    
  17.     private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  18.         long threadId = Thread.currentThread().getId();
  19.         Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  20.         //加锁成功
  21.         if (ttl == null) {
  22.             return;
  23.         }
  24.         //加锁失败
  25.         ...
  26.     }
  27.    
  28.     private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  29.         return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
  30.     }
  31.    
  32.     private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  33.         RFuture<Long> ttlRemainingFuture;
  34.         if (leaseTime != -1) {
  35.             ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  36.         } else {
  37.             //非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法
  38.             //公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法
  39.             //读写锁中的读锁,接下来调用RedissonReadLock.tryLockInnerAsync()方法
  40.             ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  41.         }
  42.       
  43.         //对RFuture<Long>类型的ttlRemainingFuture添加回调监听
  44.         CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
  45.             //tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑:
  46.             //加锁成功
  47.             if (ttlRemaining == null) {
  48.                 if (leaseTime != -1) {
  49.                     //如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务
  50.                     internalLockLeaseTime = unit.toMillis(leaseTime);
  51.                 } else {
  52.                     //创建定时调度任务
  53.                     scheduleExpirationRenewal(threadId);
  54.                 }
  55.             }
  56.             return ttlRemaining;
  57.         });
  58.         return new CompletableFutureWrapper<>(f);
  59.     }
  60.     ...
  61. }
  62. public class RedissonReadLock extends RedissonLock implements RLock {
  63.     ...
  64.     @Override
  65.     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  66.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
  67.             //执行命令"hget myLock mode",尝试获取一个Hash值mode
  68.             "local mode = redis.call('hget', KEYS[1], 'mode'); " +
  69.             //mode为false则执行加读锁的逻辑
  70.             "if (mode == false) then " +
  71.                 //hset myLock mode read
  72.                 "redis.call('hset', KEYS[1], 'mode', 'read'); " +
  73.                 //hset myLock UUID1:ThreadID1 1
  74.                 "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  75.                 //set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1
  76.                 "redis.call('set', KEYS[2] .. ':1', 1); " +
  77.                 //pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000
  78.                 "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
  79.                 //pexpire myLock 30000
  80.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  81.                 "return nil; " +
  82.             "end; " +
  83.             //如果已经有线程加了读锁 或者 有线程加了写锁且是自己加的写锁
  84.             //所以一个线程如果加了写锁,它是可以重入自己的写锁和自己的读锁的
  85.             "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
  86.                 //hincrby myLock UUID2:ThreadID2 1
  87.                 //ind表示重入次数,线程可以重入自己的读锁和写锁,线程后加的读锁可以重入线程自己的读锁或写锁
  88.                 "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  89.                 //key = {myLock}:UUID2:ThreadID2:rwlock_timeout:1
  90.                 "local key = KEYS[2] .. ':' .. ind;" +
  91.                 //set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1
  92.                 "redis.call('set', key, 1); " +
  93.                 //pexpire myLock 30000
  94.                 "redis.call('pexpire', key, ARGV[1]); " +
  95.                 "local remainTime = redis.call('pttl', KEYS[1]); " +
  96.                 //pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000
  97.                 "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
  98.                 "return nil; " +
  99.             "end;" +
  100.             //执行命令"pttl myLock",返回myLock的剩余过期时间
  101.             "return redis.call('pttl', KEYS[1]);",
  102.             //KEYS[1] = myLock
  103.             //KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout 或 KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout
  104.             Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),
  105.             unit.toMillis(leaseTime),//ARGV[1] = 30000
  106.             getLockName(threadId),//ARGV[2] = UUID1:ThreadID1 或 ARGV[2] = UUID2:ThreadID2
  107.             getWriteLockName(threadId)//ARGV[3] = UUID1:ThreadID1:write 或 ARGV[3] = UUID2:ThreadID2:write
  108.         );
  109.     }
  110.     ...
  111. }
复制代码
一.参数说明
  1. KEYS[1] = myLock
  2. KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout
  3. ARGV[1] = 30000
  4. ARGV[2] = UUID1:ThreadID1
  5. ARGV[3] = UUID1:ThreadID1:write
复制代码
二.执行lua脚本的获取读锁逻辑
首先执行命令"hget myLock mode",尝试获取一个Hash值mode,也就是从key为myLock的Hash值里获取一个field为mode的value值。但是此时一开始都还没有加锁,所以mode肯定是false。于是就执行如下加读锁的逻辑:设置两个Hash值 + 设置一个字符串。
  1. hset myLock mode read
  2. //用来记录当前客户端线程重入锁的次数
  3. hset myLock UUID1:ThreadID1 1
  4. //用来记录当前客户端线程第1个重入锁过期时间
  5. set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1
  6. pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000
  7. pexpire myLock 30000
复制代码
执行完加读锁逻辑后,Redis存在如下结构的数据。其实加读锁的核心在于构造一个递增序列,记录不同线程的读锁和同一个线程不同的重入锁。
 
field为类似于UUID1:ThreadID1的value值,是用来记录当前客户端线程重入锁次数的。key为类似于{myLock}:UUID1:ThreadID1:rwlock_timeout:1的String,是用来记录当前客户端线程第n个重入锁过期时间的。
 
假设将key为myLock称为父读锁,key为UUID1:ThreadID1称为子读锁。那么记录每一个子读锁的过期时间,是因为需要根据多个子读锁的过期时间更新父读锁的过期时间。
  1. //1.线程1第一次加读锁
  2. //Hash结构
  3. myLock: {
  4.     "mode": "read",
  5.     "UUID1:ThreadID1": 1
  6. }
  7. //String结构
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  9. //2.线程1第二次加读锁
  10. //Hash结构
  11. myLock: {
  12.     "mode": "read",
  13.     "UUID1:ThreadID1": 2
  14. }
  15. //String结构
  16. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  17. {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
  18. //3.线程1第三次加读锁
  19. //Hash结构
  20. myLock: {
  21.     "mode": "read",
  22.     "UUID1:ThreadID1": 3
  23. }
  24. //String结构
  25. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  26. {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
  27. {myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1
  28. //4.线程2第一次加读锁
  29. //Hash结构
  30. myLock: {
  31.     "mode": "read",
  32.     "UUID1:ThreadID1": 3,
  33.     "UUID2:ThreadID2": 1
  34. }
  35. //String结构
  36. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  37. {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
  38. {myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1
  39. {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
  40. //5.线程2第二次加读锁
  41. //Hash结构
  42. myLock: {
  43.     "mode": "read",
  44.     "UUID1:ThreadID1": 3,
  45.     "UUID2:ThreadID2": 2
  46. }
  47. //String结构
  48. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  49. {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
  50. {myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1
  51. {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
  52. {myLock}:UUID2:ThreadID2:rwlock_timeout:2 ==> 1
复制代码
(2)WathDog处理读锁过期时间的lua脚本逻辑
假设客户端A的线程(UUID1:ThreadID1)已经成功获取到一个读锁,此时会创建一个WatchDog定时调度任务,10秒后检查该读锁。执行流程如下:
  1. public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
  2.     ...
  3.     protected void scheduleExpirationRenewal(long threadId) {
  4.         ExpirationEntry entry = new ExpirationEntry();
  5.         ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
  6.         if (oldEntry != null) {
  7.             oldEntry.addThreadId(threadId);
  8.         } else {
  9.             entry.addThreadId(threadId);
  10.             try {
  11.                 //创建一个更新过期时间的定时调度任务
  12.                 renewExpiration();
  13.             } finally {
  14.                 if (Thread.currentThread().isInterrupted()) {
  15.                     cancelExpirationRenewal(threadId);
  16.                 }
  17.             }
  18.         }
  19.     }
  20.    
  21.     //更新过期时间
  22.     private void renewExpiration() {
  23.         ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  24.         if (ee == null) {
  25.             return;
  26.         }
  27.         //使用了Netty的定时任务机制:HashedWheelTimer + TimerTask + Timeout
  28.         //创建一个更新过期时间的定时调度任务,下面会调用MasterSlaveConnectionManager.newTimeout()方法
  29.         //即创建一个定时调度任务TimerTask交给HashedWheelTimer,10秒后执行
  30.         Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
  31.             @Override
  32.             public void run(Timeout timeout) throws Exception {
  33.                 ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  34.                 if (ent == null) {
  35.                     return;
  36.                 }
  37.                 Long threadId = ent.getFirstThreadId();
  38.                 if (threadId == null) {
  39.                     return;
  40.                 }
  41.                 //异步执行lua脚本去更新锁的过期时间
  42.                 //对于读写锁,接下来会执行RedissonReadLock.renewExpirationAsync()方法
  43.                 RFuture<Boolean> future = renewExpirationAsync(threadId);
  44.                 future.whenComplete((res, e) -> {
  45.                     if (e != null) {
  46.                         log.error("Can't update lock " + getRawName() + " expiration", e);
  47.                         EXPIRATION_RENEWAL_MAP.remove(getEntryName());
  48.                         return;
  49.                     }
  50.                     //res就是执行renewExpirationAsync()里的lua脚本的返回值
  51.                     if (res) {
  52.                         //重新调度自己
  53.                         renewExpiration();
  54.                     } else {
  55.                         //执行清理工作
  56.                         cancelExpirationRenewal(null);
  57.                     }
  58.                 });
  59.             }
  60.         }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  61.         ee.setTimeout(task);
  62.     }
  63.    
  64.     protected void cancelExpirationRenewal(Long threadId) {
  65.         ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  66.         if (task == null) {
  67.             return;
  68.         }
  69.         if (threadId != null) {
  70.             task.removeThreadId(threadId);
  71.         }
  72.         if (threadId == null || task.hasNoThreads()) {
  73.             Timeout timeout = task.getTimeout();
  74.             if (timeout != null) {
  75.                 timeout.cancel();
  76.             }
  77.             EXPIRATION_RENEWAL_MAP.remove(getEntryName());
  78.         }
  79.     }
  80.     ...
  81. }
  82. public class RedissonReadLock extends RedissonLock implements RLock {
  83.     ...
  84.     @Override
  85.     protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  86.         String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
  87.         String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
  88.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  89.             //执行命令"hget myLock UUID1:ThreadID1",获取当前这个线程是否还持有这个读锁
  90.             "local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
  91.             "if (counter ~= false) then " +
  92.                 //指定的线程还在持有锁,那么就执行"pexpire myLock 30000"刷新锁的过期时间
  93.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  94.                 "if (redis.call('hlen', KEYS[1]) > 1) then " +
  95.                     //获取key为myLock的Hash值的所有key
  96.                     "local keys = redis.call('hkeys', KEYS[1]); " +
  97.                     //遍历已被线程获取的所有重入和非重入的读锁
  98.                     "for n, key in ipairs(keys) do " +
  99.                         "counter = tonumber(redis.call('hget', KEYS[1], key)); " +
  100.                         //排除掉key为mode的Hash值
  101.                         "if type(counter) == 'number' then " +
  102.                             //递减拼接重入锁的key,刷新同一个线程的所有重入锁的过期时间
  103.                             "for i=counter, 1, -1 do " +
  104.                                 "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " +
  105.                             "end; " +
  106.                         "end; " +
  107.                     "end; " +
  108.                 "end; " +
  109.                 "return 1; " +
  110.             "end; " +
  111.             "return 0;",
  112.             //KEYS[1] = myLock
  113.             //KEYS[2] = {myLock}
  114.             Arrays.<Object>asList(getRawName(), keyPrefix),
  115.             internalLockLeaseTime,//ARGV[1] = 30000毫秒
  116.             getLockName(threadId)//ARGV[2] = UUID1:ThreadID1
  117.         );
  118.     }
  119.     ...
  120. }
复制代码
一.参数说明
  1. KEYS[1] = myLock
  2. KEYS[2] = {myLock}
  3. ARGV[1] = 30000
  4. ARGV[2] = UUID1:ThreadID1
复制代码
二.执行lua脚本的处理逻辑
执行命令"hget myLock UUID1:ThreadID1",尝试获取一个Hash值,也就是获取指定的这个线程是否还持有这个读锁。如果指定的这个线程还在持有这个锁,那么这里返回的是1,于是就会执行"pexpire myLock 30000"刷新锁的过期时间。
 
接着执行命令"hlen myLock",判断key为锁名的Hash元素个数是否大于1。如果指定的这个线程还在持有这个锁,那么key为myLock的Hash值就至少有两个kv对。其中一个key是mode,一个key是UUID1:ThreadID1。所以这里的判断是成立的,于是遍历处理key为锁名的Hash值。
 
在遍历处理key为锁名的Hash值时,需要排除掉key为mode的Hash值。然后根据key为UUID + 线程ID的Hash值,通过递减拼接,进行循环遍历,把每一个不同线程的读锁或同一个线程不同的重入锁,都刷新过期时间。
 
三.总结
WatchDog在处理读锁时,如果指定的线程还持有读锁,那么就会:刷新读锁key的过期时间为30秒,根据重入读锁的次数进行遍历,对重入读锁对应的key的过期时间也刷新为30秒。
  1. //KEYS[1] = myLock
  2. //KEYS[2] = {myLock}
  3. "if (redis.call('hlen', KEYS[1]) > 1) then " +
  4.     "local keys = redis.call('hkeys', KEYS[1]); " +
  5.     //遍历处理key为锁名的Hash值
  6.     "for n, key in ipairs(keys) do " +
  7.         "counter = tonumber(redis.call('hget', KEYS[1], key)); " +
  8.         //排除掉key为mode的Hash值
  9.         "if type(counter) == 'number' then " +
  10.             "for i=counter, 1, -1 do " +
  11.                 //递减拼接,把不同线程的读锁或同一个线程不同的重入锁,都刷新过期时间
  12.                 "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " +
  13.             "end; " +
  14.         "end; " +
  15.     "end; " +
  16. "end; " +
  17. //Hash结构
  18. myLock: {
  19.     "mode": "read",
  20.     "UUID1:ThreadID1": 3,
  21.     "UUID2:ThreadID2": 2
  22. }
  23. //String结构
  24. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  25. {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
  26. {myLock}:UUID1:ThreadID1:rwlock_timeout:3 ==> 1
  27. {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
  28. {myLock}:UUID2:ThreadID2:rwlock_timeout:2 ==> 1
复制代码
 
3.写锁RedissonWriteLock的获取写锁逻辑
(1)获取写锁的执行流程
(2)获取写锁的lua脚本逻辑
 
(1)获取写锁的执行流程
假设客户端A的线程(UUID1:ThreadID1)作为第一个线程进来加写锁,执行流程如下:
  1. public class RedissonLock extends RedissonBaseLock {
  2.     ...
  3.     //不带参数的加锁
  4.     public void lock() {
  5.         ...
  6.         lock(-1, null, false);
  7.         ...
  8.     }
  9.    
  10.     //带参数的加锁
  11.     public void lock(long leaseTime, TimeUnit unit) {
  12.         ...
  13.         lock(leaseTime, unit, false);
  14.         ...
  15.     }
  16.    
  17.     private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
  18.         long threadId = Thread.currentThread().getId();
  19.         Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
  20.         //加锁成功
  21.         if (ttl == null) {
  22.             return;
  23.         }
  24.         //加锁失败
  25.         ...
  26.     }
  27.    
  28.     private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  29.         return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
  30.     }
  31.    
  32.     private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
  33.         RFuture<Long> ttlRemainingFuture;
  34.         if (leaseTime != -1) {
  35.             ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  36.         } else {
  37.             //非公平锁,接下来调用的是RedissonLock.tryLockInnerAsync()方法
  38.             //公平锁,接下来调用的是RedissonFairLock.tryLockInnerAsync()方法
  39.             //读写锁中的读锁,接下来调用RedissonReadLock.tryLockInnerAsync()方法
  40.             //读写锁中的写锁,接下来调用RedissonWriteLock.tryLockInnerAsync()方法
  41.             ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  42.         }
  43.         //对RFuture<Long>类型的ttlRemainingFuture添加回调监听
  44.         CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
  45.             //tryLockInnerAsync()里的加锁lua脚本异步执行完毕,会回调如下方法逻辑:
  46.             //加锁成功
  47.             if (ttlRemaining == null) {
  48.                 if (leaseTime != -1) {
  49.                     //如果传入的leaseTime不是-1,也就是指定锁的过期时间,那么就不创建定时调度任务
  50.                     internalLockLeaseTime = unit.toMillis(leaseTime);
  51.                 } else {
  52.                     //创建定时调度任务
  53.                     scheduleExpirationRenewal(threadId);
  54.                 }
  55.             }
  56.             return ttlRemaining;
  57.         });
  58.         return new CompletableFutureWrapper<>(f);
  59.     }
  60.     ...
  61. }
  62. public class RedissonWriteLock extends RedissonLock implements RLock {
  63.     ...
  64.     @Override
  65.     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  66.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
  67.             //执行命令"hget myLock mode",尝试获取一个Hash值mode
  68.             "local mode = redis.call('hget', KEYS[1], 'mode'); " +
  69.             //获取不到,说明没有加读锁或者写锁
  70.             "if (mode == false) then " +
  71.                 "redis.call('hset', KEYS[1], 'mode', 'write'); " +
  72.                 "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  73.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  74.                 "return nil; " +
  75.             "end; " +
  76.             //如果加过锁,那么就要看是不是写锁 + 写锁是不是自己加过的(即重入写锁)
  77.             "if (mode == 'write') then " +
  78.                 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  79.                     //重入写锁
  80.                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  81.                     "local currentExpire = redis.call('pttl', KEYS[1]); " +
  82.                     "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
  83.                     "return nil; " +
  84.                 "end; " +
  85.             "end;" +
  86.             //执行命令"pttl myLock",返回myLock的剩余过期时间
  87.             "return redis.call('pttl', KEYS[1]);",
  88.             Arrays.<Object>asList(getRawName()),//KEYS[1] = myLock
  89.             unit.toMillis(leaseTime),//ARGV[1] = 30000
  90.             getLockName(threadId)//ARGV[2] = UUID1:ThreadID1:write
  91.         );
  92.     }
  93.     ...
  94. }
复制代码
(2)获取写锁的lua脚本逻辑
一.参数说明
  1. KEYS[1] = myLock
  2. ARGV[1] = 30000
  3. ARGV[2] = UUID1:ThreadID1:write
复制代码
二.执行分析
首先执行命令"hget myLock mode",尝试获取一个Hash值mode,也就是从key为myLock的Hash值里获取一个field为mode的value值。但是此时一开始都还没有加锁,所以mode肯定是false。于是就执行如下加读锁的逻辑:设置两个Hash值。
  1. hset myLock mode write
  2. hset myLock UUID1:ThreadID1:write 1
  3. pexpire myLock 30000
复制代码
完成加锁操作后,Redis中存在如下数据:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "write",
  4.     "UUID1:ThreadID1:write": 1
  5. }
复制代码
 
4.读锁RedissonReadLock的读读不互斥逻辑
(1)不同客户端线程读锁与读锁不互斥说明
(2)客户端A先加读锁的Redis命令执行过程和结果
(3)客户端B后加读锁的Redis命令执行过程和结果
 
(1)不同客户端线程读锁与读锁不互斥说明
假设客户端A(UUID1:ThreadID1)对myLock这个锁先加了一个读锁,客户端B(UUID2:ThreadID2)也要对myLock这个锁加一个读锁,那么此时这两个读锁是不会互斥的,客户端B可以加锁成功。
  1. public class RedissonReadLock extends RedissonLock implements RLock {
  2.     ...
  3.     @Override
  4.     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  5.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
  6.             //执行命令"hget myLock mode",尝试获取一个Hash值mode
  7.             "local mode = redis.call('hget', KEYS[1], 'mode'); " +
  8.             //mode为false则执行加读锁的逻辑
  9.             "if (mode == false) then " +
  10.                 //hset myLock mode read
  11.                 "redis.call('hset', KEYS[1], 'mode', 'read'); " +
  12.                 //hset myLock UUID1:ThreadID1 1
  13.                 "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  14.                 //set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1
  15.                 "redis.call('set', KEYS[2] .. ':1', 1); " +
  16.                 //pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000
  17.                 "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
  18.                 //pexpire myLock 30000
  19.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  20.                 "return nil; " +
  21.             "end; " +
  22.             //如果已经有线程加了读锁 或者 有线程加了写锁且是自己加的写锁
  23.             //所以一个线程如果加了写锁,它是可以重入自己的写锁和自己的读锁的
  24.             "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
  25.                 //hincrby myLock UUID2:ThreadID2 1
  26.                 //ind表示重入次数,线程可以重入自己的读锁和写锁,线程后加的读锁可以重入线程自己的读锁或写锁
  27.                 "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  28.                 //key = {myLock}:UUID2:ThreadID2:rwlock_timeout:1
  29.                 "local key = KEYS[2] .. ':' .. ind;" +
  30.                 //set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1
  31.                 "redis.call('set', key, 1); " +
  32.                 //pexpire myLock 30000
  33.                 "redis.call('pexpire', key, ARGV[1]); " +
  34.                 "local remainTime = redis.call('pttl', KEYS[1]); " +
  35.                 //pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000
  36.                 "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
  37.                 "return nil; " +
  38.             "end;" +
  39.             //执行命令"pttl myLock",返回myLock的剩余过期时间
  40.             "return redis.call('pttl', KEYS[1]);",
  41.             //KEYS[1] = myLock
  42.             //KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout 或 KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout
  43.             Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),
  44.             unit.toMillis(leaseTime),//ARGV[1] = 30000
  45.             getLockName(threadId),//ARGV[2] = UUID1:ThreadID1 或 ARGV[2] = UUID2:ThreadID2
  46.             getWriteLockName(threadId)//ARGV[3] = UUID1:ThreadID1:write 或 ARGV[3] = UUID2:ThreadID2:write
  47.         );
  48.     }
  49.     ...
  50. }
复制代码
(2)客户端A先加读锁的Redis命令执行过程和结果
参数说明:
  1. KEYS[1] = myLock
  2. KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout
  3. ARGV[1] = 30000
  4. ARGV[2] = UUID1:ThreadID1
  5. ARGV[3] = UUID1:ThreadID1:write
复制代码
Redis命令的执行过程:
  1. hset myLock mode read
  2. hset myLock UUID1:ThreadID1 1
  3. set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1
  4. pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:1 30000
  5. pexpire myLock 30000
复制代码
Redis执行结果:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 1
  5. }
  6. //String结构
  7. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
复制代码
(3)客户端B后加读锁的Redis命令执行过程和结果
参数说明:
  1. KEYS[1] = myLock
  2. KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout
  3. ARGV[1] = 30000
  4. ARGV[2] = UUID2:ThreadID2
  5. ARGV[3] = UUID2:ThreadID2:write
复制代码
Redis命令的执行过程:
  1. hget myLock mode ===> 获取到mode=read,表示此时已经有线程加了读锁
  2. hincrby myLock UUID2:ThreadID2 1
  3. set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1
  4. pexpire myLock 30000
  5. pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000
复制代码
Redis执行结果:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 1,
  5.     "UUID2:ThreadID2": 1
  6. }
  7. //String结构
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  9. {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
复制代码
需要注意的是:多个客户端同时加读锁,读锁与读锁不互斥。会不断在key为锁名的Hash里,自增field为客户端UUID + 线程ID的value值。每个客户端成功加的一次读锁或写锁,都会维持一个WatchDog,不断刷新myLock的生存时间 + 刷新该客户端这次加的锁的过期时间。
 
加读锁的lua脚本中,ind表示重入次数。线程可重入自己的读锁和写锁。也就是说,线程后加的读锁可以重入线程自己先加的读锁或写锁。
 
5.RedissonReadLock和RedissonWriteLock的读写互斥逻辑
(1)不同客户端线程先读锁后写锁如何互斥
(2)不同客户端线程先写锁后读锁如何互斥
 
(1)不同客户端线程先读锁后写锁如何互斥
首先,客户端A(UUID1:ThreadID1)和客户端B(UUID2:ThreadID2)先加读锁,此时Redis中存在如下的数据:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 1,
  5.     "UUID2:ThreadID2": 1
  6. }
  7. //String结构
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  9. {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
复制代码
接着,客户端C(UUID3:ThreadID3)来加写锁。
  1. public class RedissonWriteLock extends RedissonLock implements RLock {
  2.     ...
  3.     @Override
  4.     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  5.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
  6.             //执行命令"hget myLock mode",尝试获取一个Hash值mode
  7.             "local mode = redis.call('hget', KEYS[1], 'mode'); " +
  8.             //此时发现mode=read,说明已有线程加了锁了
  9.             "if (mode == false) then " +
  10.                 "redis.call('hset', KEYS[1], 'mode', 'write'); " +
  11.                 "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  12.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  13.                 "return nil; " +
  14.             "end; " +
  15.             //如果加过锁,那么就要看是不是写锁 + 写锁是不是自己加过的(即重入写锁)
  16.             "if (mode == 'write') then " +
  17.                 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  18.                     //重入写锁
  19.                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  20.                     "local currentExpire = redis.call('pttl', KEYS[1]); " +
  21.                     "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
  22.                     "return nil; " +
  23.                 "end; " +
  24.             "end;" +
  25.             //执行命令"pttl myLock",返回myLock的剩余过期时间
  26.             "return redis.call('pttl', KEYS[1]);",
  27.             Arrays.<Object>asList(getRawName()),//KEYS[1] = myLock
  28.             unit.toMillis(leaseTime),//ARGV[1] = 30000
  29.             getLockName(threadId)//ARGV[2] = UUID3:ThreadID3:write
  30.         );
  31.     }
  32.     ...
  33. }
复制代码
客户端C(UUID3:ThreadID3)加写锁时的参数:
  1. KEYS[1] = myLock
  2. ARGV[1] = 30000
  3. ARGV[2] = UUID3:ThreadID3:write
复制代码
客户端C(UUID3:ThreadID3)加写锁时:首先执行命令"hget myLock mode"发现mode = read,说明已有线程加了锁了。由于已加的锁不是当前线程加的写锁,而是其他线程加的读锁。所以此时会执行命令"pttl myLock",返回myLock的剩余过期时间。这会导致客户端C加锁失败,会在while循环中阻塞和重试,从而实现先读锁后写锁的互斥。
 
(2)不同客户端线程先写锁后读锁如何互斥
假设客户端A(UUID1:ThreadID1)先加了一个写锁,此时Redis中存在如下的数据:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "write",
  4.     "UUID1:ThreadID1:write": 1
  5. }
复制代码
然后客户端B(UUID2:ThreadID2)再来加读锁。
  1. public class RedissonReadLock extends RedissonLock implements RLock {
  2.     ...
  3.     @Override
  4.     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  5.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
  6.             //执行命令"hget myLock mode",尝试获取一个Hash值mode
  7.             "local mode = redis.call('hget', KEYS[1], 'mode'); " +
  8.             //发现mode=write,说明已有线程加了锁了
  9.             "if (mode == false) then " +
  10.                 "redis.call('hset', KEYS[1], 'mode', 'read'); " +
  11.                 "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  12.                 "redis.call('set', KEYS[2] .. ':1', 1); " +
  13.                 "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
  14.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  15.                 "return nil; " +
  16.             "end; " +
  17.             //如果已经有线程加了读锁 或者 有线程加了写锁且是自己加的写锁
  18.             //所以一个线程如果加了写锁,它是可以重入自己的写锁和自己的读锁的
  19.             "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
  20.                 //hincrby myLock UUID2:ThreadID2 1
  21.                 //ind表示重入次数,线程可以重入自己的读锁和写锁,线程后加的读锁可以重入线程自己的读锁或写锁
  22.                 "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  23.                 //key = {myLock}:UUID2:ThreadID2:rwlock_timeout:1
  24.                 "local key = KEYS[2] .. ':' .. ind;" +
  25.                 //set {myLock}:UUID2:ThreadID2:rwlock_timeout:1 1
  26.                 "redis.call('set', key, 1); " +
  27.                 //pexpire myLock 30000
  28.                 "redis.call('pexpire', key, ARGV[1]); " +
  29.                 "local remainTime = redis.call('pttl', KEYS[1]); " +
  30.                 //pexpire {myLock}:UUID2:ThreadID2:rwlock_timeout:1 30000
  31.                 "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
  32.                 "return nil; " +
  33.             "end;" +
  34.             //执行命令"pttl myLock",返回myLock的剩余过期时间
  35.             "return redis.call('pttl', KEYS[1]);",
  36.             //KEYS[1] = myLock
  37.             //KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout
  38.             Arrays.<Object>asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),
  39.             unit.toMillis(leaseTime),//ARGV[1] = 30000
  40.             getLockName(threadId),//ARGV[2] = UUID2:ThreadID2
  41.             getWriteLockName(threadId)//ARGV[3] = UUID2:ThreadID2:write
  42.         );
  43.     }
  44.     ...
  45. }
复制代码
客户端B(UUID2:ThreadID2)加读锁时的参数:
  1. KEYS[1] = myLock
  2. KEYS[2] = {myLock}:UUID2:ThreadID2:rwlock_timeout
  3. ARGV[1] = 30000
  4. ARGV[2] = UUID2:ThreadID2
  5. ARGV[3] = UUID2:ThreadID2:write
复制代码
客户端B(UUID2:ThreadID2)加读锁时:首先执行命令"hget myLock mode"发现mode = write,说明已有线程加了锁了。接下来执行命令"hexists myLock UUID2:ThreadID2:write",发现不存在。也就是说,如果客户端B之前加过写锁,此时B加读锁才能通过判断。但是,之前加写锁的是客户端A,所以这里的判断条件不会通过。于是返回"pttl myLock",导致加读锁失败,会在while循环中阻塞和重试,从而实现先写锁后读锁的互斥。
 
(3)总结
如果客户端线程A之前先加了写锁,此时该线程再加读锁,可以成功。
 
如果客户端线程A之前先加了写锁,此时该线程再加写锁,可以成功。
 
如果客户端线程A之前先加了读锁,此时该线程再加读锁,可以成功。
 
如果客户端线程A之前先加了读锁,此时该线程再加写锁,不可以成功。
 
所以写锁可以被自己的写锁重入,也可以被自己的读锁重入。但是读锁可以被任意的读锁重入,不可以被任意的写锁重入。
 
6.写锁RedissonWriteLock的写写互斥逻辑
(1)不同客户端线程先加写锁的情况
(2)不同客户端线程再加写锁的情况
 
(1)不同客户端线程先加写锁的情况
假设客户端A(UUID1:ThreadID1)先加写锁:
  1. //传入参数KEYS[1] = myLock
  2. ARGV[1] = 30000
  3. ARGV[2] = UUID1:ThreadID1:write//执行结果myLock: {    "mode": "write",    "UUID1:ThreadID1:write": 1}
复制代码
(2)不同客户端线程再加写锁的情况
假设客户端B(UUID2:ThreadID2)再加写锁:首先执行命令"hget myLock mode"发现mode = write,说明已有线程加了写锁。然后继续执行命令"hexists myLock UUID2:ThreadID2:write",判断已加的写锁是否是当前客户端B(UUID2:ThreadID2)加的。由于已加的写锁是客户端A(UUID1:ThreadID1)加的,所以判断不通过。于是执行"pttl myLock"返回myLock的剩余过期时间。这样会导致客户端B加写锁失败,于是会在while循环阻塞和重试加写锁,从而实现不同客户端线程的写锁和写锁的互斥。
  1. public class RedissonWriteLock extends RedissonLock implements RLock {
  2.     ...
  3.     @Override
  4.     <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  5.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
  6.             //执行命令"hget myLock mode",尝试获取一个Hash值mode
  7.             "local mode = redis.call('hget', KEYS[1], 'mode'); " +
  8.             //获取不到,说明没有加读锁或者写锁
  9.             "if (mode == false) then " +
  10.                 "redis.call('hset', KEYS[1], 'mode', 'write'); " +
  11.                 "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  12.                 "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  13.                 "return nil; " +
  14.             "end; " +
  15.             //如果加过锁,那么就要看是不是写锁+写锁是不是自己加过的(即重入写锁)
  16.             "if (mode == 'write') then " +
  17.                 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  18.                     //重入写锁
  19.                     "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  20.                     "local currentExpire = redis.call('pttl', KEYS[1]); " +
  21.                     "redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
  22.                     "return nil; " +
  23.                 "end; " +
  24.             "end;" +
  25.             //执行命令"pttl myLock",返回myLock的剩余过期时间
  26.             "return redis.call('pttl', KEYS[1]);",
  27.             Arrays.<Object>asList(getRawName()),//KEYS[1] = myLock
  28.             unit.toMillis(leaseTime),//ARGV[1] = 30000
  29.             getLockName(threadId)//ARGV[2] = UUID1:ThreadID1:write 或 ARGV[2] = UUID2:ThreadID2:write
  30.         );
  31.     }
  32.     ...
  33. }
复制代码
 
7.写锁RedissonWriteLock的可重入逻辑
(1)同一个客户端线程先加读锁再加读锁
(2)同一个客户端线程先加读锁再加写锁
(3)同一个客户端线程先加写锁再加读锁
(4)同一个客户端线程先加写锁再加写锁
 
前面分析了不同客户端线程的四种加锁情况:
情况一:先加读锁再加读锁,不互斥
情况二:先加读锁再加写锁,互斥
情况三:先加写锁再加读锁,互斥
情况四:先加写锁再加写锁,互斥
 
接下来分析同一个客户端线程的四种加锁情况:
情况一:先加读锁再加读锁,不互斥
情况二:先加读锁再加写锁,互斥
情况三:先加写锁再加读锁,不互斥
情况四:先加写锁再加写锁,不互斥
 
可以这样理解:写锁优先级高,读锁优先级低。同一个线程如果先加了优先级高的写锁,那就可以继续加优先级低的读锁。同一个线程如果先加了优先级低的读锁,那就不可以再加优先级高的写锁。一般锁可以降级,不可以升级。
 
(1)同一个客户端线程先加读锁再加读锁
客户端A(UUID1:ThreadID1)先加了一次读锁时:
  1. //传入参数KEYS[1] = myLockKEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout ARGV[1] = 30000ARGV[2] = UUID1:ThreadID1ARGV[3] = UUID1:ThreadID1:write//执行结果//Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 1
  5. }
  6. //String结构
  7. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
复制代码
客户端A(UUID1:ThreadID1)再加一次读锁时,判断通过可以加成功。
  1. //执行命令
  2. hget myLock mode,发现mode=read,表示已经加过读锁
  3. hincrby myLock UUID1:ThreadID1 1
  4. set {myLock}:UUID1:ThreadID1:rwlock_timeout:2 1
  5. pexpire myLock 30000
  6. pexpire {myLock}:UUID1:ThreadID1:rwlock_timeout:2 30000
  7. //执行结果
  8. //Hash结构
  9. myLock: {
  10.     "mode": "read",
  11.     "UUID1:ThreadID1": 2
  12. }
  13. //String结构
  14. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  15. {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
复制代码
(2)同一个客户端线程先加读锁再加写锁
客户端A(UUID1:ThreadID1)先加了一次读锁时:
  1. //传入参数KEYS[1] = myLockKEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout ARGV[1] = 30000ARGV[2] = UUID1:ThreadID1ARGV[3] = UUID1:ThreadID1:write//执行结果//Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 1
  5. }
  6. //String结构
  7. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
复制代码
客户端A(UUID1:ThreadID1)再加一次写锁时,判断不通过,不可以加成功。
  1. //传入参数KEYS[1] = myLock
  2. ARGV[1] = 30000
  3. ARGV[2] = UUID1:ThreadID1:write
复制代码
执行命令"hget myLock mode",发现mode = read,不符合加写锁条件。所以同一个客户端线程,先加读锁再加写锁,是会互斥的。
 
(3)同一个客户端线程先加写锁再加读锁
客户端A(UUID1:ThreadID1)先加了一次写锁时:
  1. //传入参数KEYS[1] = myLock
  2. ARGV[1] = 30000
  3. ARGV[2] = UUID1:ThreadID1:write//执行结果myLock: {    "mode": "write",    "UUID1:ThreadID1:write": 1}
复制代码
客户端A(UUID1:ThreadID1)再加一次读锁时,判断通过,可以加成功。
  1. //传入参数
  2. KEYS[1] = myLock
  3. KEYS[2] = {myLock}:UUID1:ThreadID1:rwlock_timeout
  4. ARGV[1] = 30000
  5. ARGV[2] = UUID1:ThreadID1
  6. ARGV[3] = UUID1:ThreadID1:write
  7. //执行命令
  8. hget myLock mode,发现mode=write,表示已经加过写锁
  9. hexists myLock UUID1:ThreadID1:write,判断写锁是自己加的,条件成立
  10. hincrby myLock UUID1:ThreadID1 1,表示此时加了一个读锁
  11. set {myLock}:UUID1:ThreadID1:rwlock_timeout:1 1
  12. pexpire myLock 30000
  13. pexpire {myLock}:UUID1:ThreadID11:rwlock_timeout:1 30000
  14. //执行结果
  15. //Hash结构
  16. myLock: {
  17.     "mode": "write",
  18.     "UUID1:ThreadID1:write": 1,
  19.     "UUID1:ThreadID1": 1
  20. }
  21. //String结构
  22. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
复制代码
可见:如果是同一个客户端线程,先加写锁再加读锁,是可以加成功的。所以默认在线程持有写锁的期间,同样的线程可以多次加读锁。
 
(4)同一个客户端线程先加写锁再加写锁
客户端A(UUID1:ThreadID1)先加了一次写锁时:
  1. //传入参数KEYS[1] = myLock
  2. ARGV[1] = 30000
  3. ARGV[2] = UUID1:ThreadID1:write//执行结果myLock: {    "mode": "write",    "UUID1:ThreadID1:write": 1}
复制代码
客户端A(UUID1:ThreadID1)再加一次写锁时,判断通过,可以加成功。
  1. //执行命令
  2. hexists myLock UUID1:ThreadID1:write,判断是否是自己加的写锁
  3. hincrby myLock UUID1:ThreadID1:write 1
  4. pexpire myLock 50000
  5. //执行结果
  6. myLock: {
  7.     "mode": "write",
  8.     "UUID1:ThreadID1:write": 2
  9. }
复制代码
可见:读写锁也是一种可重入锁。同一个客户端线程多次加写锁,是可以重入加锁的。先加的写锁是可以被读锁重入,先加的读锁则不可以被写锁重入。
 
8.读锁RedissonReadLock的释放读锁逻辑
(1)RedissonReadLock的释放读锁的流程
(2)释放读锁前主要三种情况
(3)RedissonReadLock的释放读锁的lua脚本
(4)对合并的情况一和情况二执行lua脚本
(5)对情况三执行lua脚本
 
(1)RedissonReadLock的释放读锁的流程
释放读锁调用的是RedissonLock的unlock()方法。
 
在RedissonLock的unlock()方法中,会执行get(unlockAsync())代码。也就是首先调用RedissonBaseLock的unlockAsync()方法,然后调用RedissonObject的get()方法。
 
其中unlockAsync()方法是异步化执行的方法,释放锁的操作就是异步执行的。而RedisObject的get()方法会通过RFuture同步等待获取异步执行的结果,可以将get(unlockAsync())理解为异步转同步。
 
在RedissonBaseLock的unlockAsync()方法中:可重入锁会调用RedissonLock.unlockInnerAsync()方法进行异步释放锁,读锁则会调用RedissonReadLock的unlockInnerAsync()方法进行异步释放锁,然后当完成释放锁的处理后,再通过异步去取消定时调度任务。
  1. public class Application {
  2.     public static void main(String[] args) throws Exception {
  3.         Config config = new Config();
  4.         config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
  5.         //读写锁
  6.         RedissonClient redisson = Redisson.create(config);
  7.         RReadWriteLock rwlock = redisson.getReadWriteLock("myLock");
  8.         rwlock.readLock().lock();//获取读锁
  9.         rwlock.readLock().unlock();//释放读锁
  10.         rwlock.writeLock().lock();//获取写锁
  11.         rwlock.writeLock().unlock();//释放写锁
  12.         ...
  13.     }
  14. }
  15. public class RedissonLock extends RedissonBaseLock {
  16.     ...
  17.     @Override
  18.     public void unlock() {
  19.         ...
  20.         //异步转同步
  21.         //首先调用的是RedissonBaseLock的unlockAsync()方法
  22.         //然后调用的是RedissonObject的get()方法
  23.         get(unlockAsync(Thread.currentThread().getId()));
  24.         ...
  25.     }
  26.     ...
  27. }
  28. public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
  29.     ...
  30.     @Override
  31.     public RFuture<Void> unlockAsync(long threadId) {
  32.         //异步执行释放锁的lua脚本
  33.         RFuture<Boolean> future = unlockInnerAsync(threadId);
  34.         CompletionStage<Void> f = future.handle((opStatus, e) -> {
  35.             //取消定时调度任务
  36.             cancelExpirationRenewal(threadId);
  37.             if (e != null) {
  38.                 throw new CompletionException(e);
  39.             }
  40.             if (opStatus == null) {
  41.                 IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);
  42.                 throw new CompletionException(cause);
  43.             }
  44.             return null;
  45.         });
  46.         return new CompletableFutureWrapper<>(f);
  47.     }
  48.     protected abstract RFuture<Boolean> unlockInnerAsync(long threadId);
  49.     ...
  50. }
  51. public class RedissonReadLock extends RedissonLock implements RLock {
  52.     ...
  53.     @Override
  54.     protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  55.         String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
  56.         String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
  57.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  58.             "...",
  59.             Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix),
  60.             LockPubSub.UNLOCK_MESSAGE,
  61.             getLockName(threadId)
  62.         );
  63.     }
  64.     ...
  65. }
复制代码
(2)释放读锁前主要三种情况
情况一:不同客户端线程加了读锁
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 1,
  5.     "UUID2:ThreadID2": 1,
  6. }
  7. //String结构
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  9. {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
复制代码
情况二:同一个客户端线程多次重入加读锁
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 2
  5. }
  6. //String结构
  7. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
复制代码
情况一可以和情况二进行合并:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 2,
  5.     "UUID2:ThreadID2": 1,
  6. }
  7. //String结构
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  9. {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
  10. {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
复制代码
情况三:同一个客户端线程先加写锁再加读锁
  1. //Hash结构
  2. myLock: {
  3.     "mode": "write",
  4.     "UUID1:ThreadID1:write": 1,
  5.     "UUID1:ThreadID1": 1
  6. }
  7. //String结构
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
复制代码
(3)RedissonReadLock的释放读锁的lua脚本
  1. public class RedissonReadLock extends RedissonLock implements RLock {
  2.     ...
  3.     @Override
  4.     protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  5.         String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
  6.         String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
  7.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  8.             //执行命令"hget myLock mode"
  9.             "local mode = redis.call('hget', KEYS[1], 'mode'); " +
  10.             //如果mode为false就发布一个消息
  11.             "if (mode == false) then " +
  12.                 "redis.call('publish', KEYS[2], ARGV[1]); " +
  13.                 "return 1; " +
  14.             "end; " +
  15.             //执行命令"hexists myLock UUID1:ThreadIdD1",判断当前线程对应的Hash值是否存在
  16.             "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
  17.             "if (lockExists == 0) then " +
  18.                 "return nil;" +
  19.             "end; " +
  20.             //执行命令"hincrby myLock UUID1:ThreadID1 -1",递减当前线程对应的Hash值               
  21.             "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
  22.             "if (counter == 0) then " +
  23.                 "redis.call('hdel', KEYS[1], ARGV[2]); " +
  24.             "end;" +
  25.             //例如执行"del {myLock}:UUID1:ThreadId1:rwlock_timeout:2"
  26.             //删除当前客户端线程UUID1:ThreadId1的一个重入读锁;
  27.             "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
  28.             //执行命令"hlen myLock > 1",判断Hash里的元素是否超过1个
  29.             "if (redis.call('hlen', KEYS[1]) > 1) then " +
  30.                 "local maxRemainTime = -3; " +
  31.                 //获取key为锁名的Hash值的所有key
  32.                 "local keys = redis.call('hkeys', KEYS[1]); " +
  33.                 //遍历这些key,获取这些重入和非重入的读锁的最大剩余过期时间
  34.                 "for n, key in ipairs(keys) do " +
  35.                     "counter = tonumber(redis.call('hget', KEYS[1], key)); " +
  36.                     //把key为mode的kv对排除
  37.                     "if type(counter) == 'number' then " +
  38.                         //通过递减拼接重入锁的key
  39.                         "for i=counter, 1, -1 do " +
  40.                             "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
  41.                             "maxRemainTime = math.max(remainTime, maxRemainTime);" +
  42.                         "end; " +
  43.                     "end; " +
  44.                 "end; " +
  45.                 //找出所有重入的和非重入的读锁的最大剩余过期时间后,就重置锁的过期时间为该时间
  46.                 "if maxRemainTime > 0 then " +
  47.                     "redis.call('pexpire', KEYS[1], maxRemainTime); " +
  48.                     "return 0; " +
  49.                 "end;" +
  50.                     
  51.                 "if mode == 'write' then " +
  52.                     "return 0;" +
  53.                 "end; " +
  54.             "end; " +
  55.             //删除锁
  56.             "redis.call('del', KEYS[1]); " +
  57.             //发布一个事件
  58.             "redis.call('publish', KEYS[2], ARGV[1]); " +
  59.             "return 1; ",
  60.             //KEYS[1] = myLock,表示锁的名字
  61.             //KEYS[2] = redisson_rwlock:{myLock},用于Redis的发布订阅用
  62.             //KEYS[3] = {myLock}:UUID1:ThreadID1:rwlock_timeout
  63.             //KEYS[4] = {myLock}
  64.             Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix),
  65.             LockPubSub.UNLOCK_MESSAGE,//ARGV[1] = 0,表示发布事件类型
  66.             getLockName(threadId)//ARGV[2] = UUID1:ThreadID1,表示锁里面的该客户端线程代表的key
  67.         );
  68.     }
  69.     ...
  70. }
复制代码
参数说明:
  1. KEYS[1] = myLock,表示锁的名字
  2. KEYS[2] = redisson_rwlock:{myLock},用于Redis的发布订阅用
  3. KEYS[3] = {myLock}:UUID1:ThreadID1:rwlock_timeout
  4. KEYS[4] = {myLock}
  5. ARGV[1] = 0,表示发布事件类型
  6. ARGV[2] = UUID1:ThreadID1,表示锁里面的该客户端线程代表的key
复制代码
(4)对合并的情况一和情况二执行lua脚本
一.客户端A(UUID1:ThreadID1)先释放一次读锁
二.客户端A(UUID1:ThreadID1)再释放一次读锁
三.客户端B(UUID2:ThreadID2)再释放一次读锁
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 2,
  5.     "UUID2:ThreadID2": 1,
  6. }
  7. //String结构
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  9. {myLock}:UUID1:ThreadID1:rwlock_timeout:2 ==> 1
  10. {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
复制代码
一.客户端A(UUID1:ThreadID1)先释放一次读锁
首先执行命令"hget myLock mode",发现mode = read。然后执行命令"hexists myLock UUID1:ThreadIdD1",发现肯定是存在的,因为这个客户端线程UUID1:ThreadIdD1加过读锁。
 
接着执行命令"hincrby myLock UUID1:ThreadID1 -1",将这个客户端线程对应的加读锁次数递减1,counter由2变成1。当counter大于1,说明还有线程持有着这个读锁。于是接着执行"del {myLock}:UUID1:ThreadId1:rwlock_timeout:2",也就是删除用来记录当前客户端线程第2个重入锁过期时间的key。
 
此时myLock锁的数据变成如下:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 1,
  5.     "UUID2:ThreadID2": 1,
  6. }
  7. //String结构
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
  9. {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
复制代码
于是接着执行命令"hlen myLock",判断Hash里的元素是否超过1个。如果超过1,那么就遍历已被线程获取的所有重入和非重入的读锁,即遍历所有类似"{myLock}:UUID2:ThreadID2:rwlock_timeout:1"的key。
 
然后接着执行命令"pttl {myLock}:UUID1:ThreadID1:rwlock_timeout:1"。即获取每一个重入读锁和非重入读锁的剩余过期时间,并找出其中最大的。执行"pexpire myLock"重置读锁的过期时间,为最大的剩余过期时间。
 
二.客户端A(UUID1:ThreadID1)再释放一次读锁
首先执行命令"hincrby myLock UUID1:ThreadID1 -1",将这个客户端线程对应的加读锁次数递减1,counter由1变成0。当counter=0时,就执行命令"hdel myLock UUID1:ThreadID1",即删除用来记录当前客户端线程重入锁次数的key。
 
然后接着执行命令"del {myLock}:UUID1:ThreadID1:rwlock_timeout:1",即删除用来记录当前客户端线程第1个重入锁过期时间的key。最后获取每个重入读锁和非重入读锁的剩余过期时间,并找出其中最大的。执行"pexpire myLock"重置读锁的过期时间,为最大的剩余过期时间。
 
此时myLock锁的数据变成如下:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID2:ThreadID2": 1,
  5. }
  6. //String结构
  7. {myLock}:UUID2:ThreadID2:rwlock_timeout:1 ==> 1
复制代码
三.客户端B(UUID2:ThreadID2)再释放一次读锁
首先执行命令"hincrby myLock UUID2:ThreadID2 -1",将这个客户端线程对应的加读锁次数递减1,counter由1变成0。然后执行命令"hdel myLock UUID2:ThreadID2",即删除用来记录当前客户端线程重入锁次数的key。接着执行命令"del {myLock}:UUID1:ThreadID1:rwlock_timeout:1",即删除用来记录当前客户端线程第1个重入锁过期时间的key。
 
此时myLock锁的数据变成如下:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read"
  4. }
复制代码
此时继续执行命令"hlen myLock",发现为1,判断不通过,于是执行"del myLock"。也就是当没有线程再持有这个读锁时,就会彻底删除这个读锁,然后发布一个事件出去。
 
(5)对情况三执行lua脚本
这种情况是:同一个客户端线程先加写锁再加读锁。此时myLock锁的数据如下:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "write",
  4.     "UUID1:ThreadID1:write": 1,
  5.     "UUID1:ThreadID1": 1
  6. }
  7. //String结构
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
复制代码
首先执行命令"hincrby myLock UUID1:ThreadID1 -1",将这个客户端线程对应的加读锁次数递减1,counter由1变成0。然后执行命令"hdel myLock UUID1:ThreadID1",即删除用来记录当前客户端线程重入锁次数的key。接着执行"del {myLock}:UUID1:ThreadID1:rwlock_timeout:1",即删除用来记录当前客户端线程第1个重入锁过期时间的key。
 
此时myLock锁的数据变成如下:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "write",
  4.     "UUID1:ThreadID1:write": 1
  5. }
复制代码
接着执行命令"hlen myLock > 1",判断Hash里的元素是否超过1个。发现判断通过,但由于没有了读锁,所以最后会判断mode如果是write,就返回0。
 
9.写锁RedissonWriteLock的释放写锁逻辑
(1)释放写锁前主要有两种情况
(2)RedissonWriteLock的释放写锁的lua脚本
(3)执行释放写锁的lua脚本
 
(1)释放写锁前主要有两种情况
情况一:同一个客户端线程多次重入加写锁
情况二:同一个客户端线程先加写锁再加读锁
这两种情况的锁数据可以合并为如下:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "write",
  4.     "UUID1:ThreadID1:write": 2,
  5.     "UUID1:ThreadID1": 1
  6. }
  7. //String结构
  8. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
复制代码
接下来以这种锁数据为前提进行lua脚本分析。
 
(2)RedissonWriteLock的释放写锁的lua脚本
  1. public class RedissonWriteLock extends RedissonLock implements RLock {
  2.     ...
  3.     @Override
  4.     protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  5.         return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  6.             //首先执行命令"hget myLock mode",发现mode=write
  7.             "local mode = redis.call('hget', KEYS[1], 'mode'); " +
  8.             "if (mode == false) then " +
  9.                 "redis.call('publish', KEYS[2], ARGV[1]); " +
  10.                 "return 1; " +
  11.             "end;" +
  12.             "if (mode == 'write') then " +
  13.                 //然后执行命令"hexists myLock UUID1:ThreadIdD1:write",发现存在
  14.                 "local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
  15.                 "if (lockExists == 0) then " +
  16.                     "return nil;" +
  17.                 "else " +
  18.                     //于是接着执行命令"hincrby myLock UUID1:ThreadID1:write -1"
  19.                     "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  20.                     "if (counter > 0) then " +
  21.                         //当counter大于0,说明还有线程持有写锁,那么就重置锁的过期时间
  22.                         "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  23.                         "return 0; " +
  24.                     "else " +
  25.                         //当counter为0,就执行命令"hdel myLock UUID1:ThreadID1:write"
  26.                         "redis.call('hdel', KEYS[1], ARGV[3]); " +
  27.                         //判断key为锁名的Hash里元素是否超过1个
  28.                         "if (redis.call('hlen', KEYS[1]) == 1) then " +
  29.                             //如果只有1个,则说明没有线程持有锁了,此时可以删除掉锁对应的key
  30.                             "redis.call('del', KEYS[1]); " +
  31.                             "redis.call('publish', KEYS[2], ARGV[1]); " +
  32.                         "else " +
  33.                             //如果有超过1个,则说明还有线程持有读锁,此时需要将写锁转读锁
  34.                             "redis.call('hset', KEYS[1], 'mode', 'read'); " +
  35.                         "end; " +
  36.                         "return 1; "+
  37.                     "end; " +
  38.                 "end; " +
  39.             "end; " +
  40.             "return nil;",
  41.             //KEYS[1] = myLock,KEYS[2] = redisson_rwlock:{myLock}
  42.             Arrays.<Object>asList(getRawName(), getChannelName()),
  43.             LockPubSub.READ_UNLOCK_MESSAGE,//ARGV[1] = 0
  44.             internalLockLeaseTime,//ARGV[2] = 30000
  45.             getLockName(threadId)//ARGV[3] = UUID1:ThreadID1:write
  46.         );
  47.     }
  48.     ...
  49. }
复制代码
(3)执行释放写锁的lua脚本
一.参数说明
  1. KEYS[1] = myLock
  2. KEYS[2] = redisson_rwlock:{myLock}
  3. ARGV[1] = 0
  4. ARGV[2] = 30000
  5. ARGV[3] = UUID1:ThreadID1:write
复制代码
二.lua脚本执行分析
首先执行命令"hget myLock mode",发现mode = write。然后执行命令"hexists myLock UUID1:ThreadIdD1:write",发现存在。于是接着执行命令"hincrby myLock UUID1:ThreadID1:write -1",也就是将这个客户端线程对应的加写锁次数递减1,counter由2变成1。当counter大于0,说明还有线程持有写锁,那么就重置锁的过期时间。当counter为0,就执行命令"hdel myLock UUID1:ThreadID1:write",即删除用来记录当前客户端线程重入写锁次数的key。
 
删除后,myLock的锁数据如下:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "write",
  4.     "UUID1:ThreadID1": 1
  5. }
  6. //String结构
  7. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
复制代码
接着执行命令"hlen myLock",判断key为锁名的Hash里元素是否超过1个。如果只有1个,则说明没有线程持有锁了,此时可以删除掉锁对应的key。如果有超过1个,则说明还有线程持有读锁,此时需要将写锁转读锁。
 
因此,最后myLock的锁数据如下:
  1. //Hash结构
  2. myLock: {
  3.     "mode": "read",
  4.     "UUID1:ThreadID1": 1
  5. }
  6. //String结构
  7. {myLock}:UUID1:ThreadID1:rwlock_timeout:1 ==> 1
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册