找回密码
 立即注册
首页 业界区 业界 23. Java JUC源码分析系列笔记-ReentrantReadWriteLock ...

23. Java JUC源码分析系列笔记-ReentrantReadWriteLock

觞刈 昨天 15:50
目录

  • 1. ReentrantReadWriteLock是什么
  • 2. 非公平ReentrantReadWriteLock

    • 2.1. 是什么
    • 2.2. 怎么使用
    • 2.3. 源码分析

      • 2.3.1. uml
      • 2.3.2. 构造方法
      • 2.3.3. 读锁加锁

        • 2.3.3.1. 使用AQS加共享锁

          • 2.3.3.1.1. 使用Sync尝试加共享锁

            • 2.3.3.1.1.1. 判断是否需要阻塞读【非公平锁】
            • 2.3.3.1.1.2. 快速尝试加锁失败,那么改用死循环加锁

          • 2.3.3.1.2. 尝试加锁失败,那么入AQS队列并阻塞,等待唤醒继续抢占锁

            • 2.3.3.1.2.1. 加入AQS队列并阻塞



      • 2.3.4. 读锁解锁

        • 2.3.4.1. 使用AQS解共享锁

          • 2.3.4.1.1. 使用Sync尝试解锁
          • 2.3.4.1.2. 所有共享锁都被释放,唤醒AQS队列中头节点的下一个节点

            • 2.3.4.1.2.1. 唤醒AQS队列中头节点的下一个节点



      • 2.3.5. 写锁加锁

        • 2.3.5.1. 调用AQS加互斥锁

          • 2.3.5.1.1. 使用Sync尝试加互斥锁

            • 2.3.5.1.1.1. 判断写是否需要阻塞【非公平锁】

          • 2.3.5.1.2. 尝试加锁失败,那么入AQS队列并阻塞,等待唤醒继续抢占锁


      • 2.3.6. 写锁解锁

        • 2.3.6.1. 调用AQS解互斥锁

          • 2.3.6.1.1. 调用Sync尝试解互斥锁
          • 2.3.6.1.2. 尝试解锁成功后,唤醒AQS队列中头节点的下一个节点




  • 3. 公平ReentrantReadWriteLock

    • 3.1. 是什么
    • 3.2. 怎么使用
    • 3.3. 源码分析

      • 3.3.1. uml
      • 3.3.2. 构造方法
      • 3.3.3. 读锁加锁

        • 3.3.3.1. 使用AQS加共享锁

          • 3.3.3.1.1. 使用Sync尝试加共享锁

            • 3.3.3.1.1.1. 判断是否需要阻塞读【公平】
            • 3.3.3.1.1.2. 快速尝试加锁失败,那么改用死循环加锁

          • 3.3.3.1.2. 尝试加锁失败,那么入AQS队列并阻塞,等待唤醒继续抢占锁

            • 3.3.3.1.2.1. 加入AQS队列并阻塞



      • 3.3.4. 读锁解锁

        • 3.3.4.1. 使用AQS解共享锁

          • 3.3.4.1.1. 使用Sync尝试解锁
          • 3.3.4.1.2. 所有共享锁都被释放,唤醒AQS队列中头节点的下一个节点

            • 3.3.4.1.2.1. 唤醒AQS队列中头节点的下一个节点



      • 3.3.5. 写锁加锁

        • 3.3.5.1. 调用AQS加互斥锁

          • 3.3.5.1.1. 使用Sync尝试加互斥锁

            • 3.3.5.1.1.1. 判断写是否需要阻塞【公平】

          • 3.3.5.1.2. 尝试加锁失败,那么入AQS队列并阻塞,等待唤醒继续抢占锁


      • 3.3.6. 写锁解锁

        • 3.3.6.1. 调用AQS解互斥锁

          • 3.3.6.1.1. 调用Sync尝试解互斥锁
          • 3.3.6.1.2. 尝试解锁成功后,唤醒AQS队列中头节点的下一个节点




  • 4. 参考

1. ReentrantReadWriteLock是什么

ReentrantLock保证了同一时间只有一个线程可以在临界区读或者写数据,这意味着如果有两个读线程同时读取数据,ReentrantLock也只允许其中一个通过,但我们想要的是读可以并发执行,一旦有写则其他线程等待。如下表:
是否可以同时进行读写读√×写××因此,ReentrantReadWriteLock就诞生了
2. 非公平ReentrantReadWriteLock

2.1. 是什么

无论队列前面是否有人排队等待锁,我直接去抢
2.2. 怎么使用
  1. public class ReadWriteLockTest
  2. {
  3.     private static ReadWriteLock lock = new ReentrantReadWriteLock();//默认非公平
  4.     private static Lock readLock = lock.readLock();
  5.     private static Lock writeLock = lock.writeLock();
  6.     private static List<Integer> data = new ArrayList<>();
  7.     public static void main(String[] args) throws InterruptedException
  8.     {
  9.         Thread readThread = new Thread(() -> {
  10.             while (true)
  11.             {
  12.                 try
  13.                 {
  14.                     TimeUnit.MILLISECONDS.sleep(500);
  15.                     readLock.lock();
  16.                     System.out.println(Thread.currentThread().getName() + " read: " + data);
  17.                 }
  18.                 catch (InterruptedException e)
  19.                 {
  20.                     e.printStackTrace();
  21.                 }
  22.                 finally
  23.                 {
  24.                     readLock.unlock();
  25.                 }
  26.             }
  27.         });
  28.         Thread readThread2 = new Thread(() -> {
  29.             while (true)
  30.             {
  31.                 try
  32.                 {
  33.                     TimeUnit.MILLISECONDS.sleep(300);
  34.                     readLock.lock();
  35.                     System.out.println(Thread.currentThread().getName() + " read: " + data);
  36.                 }
  37.                 catch (InterruptedException e)
  38.                 {
  39.                     e.printStackTrace();
  40.                 }
  41.                 finally
  42.                 {
  43.                     readLock.unlock();
  44.                 }
  45.             }
  46.         });
  47.         Thread writeThread = new Thread(() -> {
  48.             int i = 0;
  49.             while (true)
  50.             {
  51.                 try
  52.                 {
  53.                     TimeUnit.MILLISECONDS.sleep(200);
  54.                     writeLock.lock();
  55.                     if (i % 2 == 0)
  56.                     {
  57.                         data.add(i);
  58.                     }else
  59.                     {
  60.                         data.remove(0);
  61.                     }
  62.                     i++;
  63.                 }
  64.                 catch (InterruptedException e)
  65.                 {
  66.                     e.printStackTrace();
  67.                 }
  68.                 finally
  69.                 {
  70.                     writeLock.unlock();
  71.                 }
  72.             }
  73.         });
  74.         readThread.start();
  75.         readThread2.start();
  76.         writeThread.start();
  77.         readThread.join();
  78.         readThread2.join();
  79.         writeThread.join();
  80.     }
  81. }
复制代码
2.3. 源码分析

2.3.1. uml
  1. @startuml
  2. skinparam classAttributeIconSize 0
  3. interface AQS{
  4. }
  5. class Sync{
  6. }
  7. interface Lock{
  8. }
  9. interface ReadWriteLock{
  10. }
  11. class ReentrantReadWriteLock{
  12. }
  13. class WriteLock{
  14. }
  15. class ReadLock{
  16. }
  17. class WriteLock{
  18. }
  19. Lock <|-- ReadLock
  20. Lock <|-- WriteLock
  21. ReadWriteLock <|-- ReentrantReadWriteLock
  22. AQS <|-- Sync
  23. ReentrantReadWriteLock --> ReadLock
  24. ReentrantReadWriteLock --> WriteLock
  25. ReadLock --> Sync
  26. WriteLock --> Sync
  27. @enduml
复制代码
2.3.2. 构造方法


  • ReentrantReadWriteLock
  1. public ReentrantReadWriteLock() {
  2.     //默认是false
  3.     this(false);
  4. }
  5. public ReentrantReadWriteLock(boolean fair) {
  6.     //初始化了Sync
  7.     //false的话使用的是NonfairSync
  8.     sync = fair ? new FairSync() : new NonfairSync();
  9.     //初始化读写锁
  10.     readerLock = new ReadLock(this);
  11.     writerLock = new WriteLock(this);
  12. }
复制代码

  • ReentrantReadWriteLock.ReadLock
  1. protected ReadLock(ReentrantReadWriteLock lock) {
  2.     //其实就是保存了ReentrantReadWriteLock的Sync
  3.     sync = lock.sync;
  4. }
复制代码

  • ReentrantReadWriteLock.WriteLock
  1. protected WriteLock(ReentrantReadWriteLock lock) {
  2.     //其实就是保存了ReentrantReadWriteLock的Sync
  3.     sync = lock.sync;
  4. }
复制代码
2.3.3. 读锁加锁


  • ReentrantReadWriteLock.readLock
  1. //返回就是读锁
  2. public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
复制代码

  • ReadLock.lock
  1. public void lock() {
  2.     //使用AQS加共享锁
  3.     sync.acquireShared(1);
  4. }
复制代码
2.3.3.1. 使用AQS加共享锁


  • AQS acquireShared
  1. public final void acquireShared(int arg) {
  2.     //ReentrantReadWriteLock的Sync重写了tryAcquireShared
  3.     //所以这里调用的是Sync的tryAcquireShared
  4.    
  5.     //如果返回<0说明尝试加锁失败,执行doAcquireShared入AQS队列并阻塞,等待唤醒
  6.     //返回>=0说明加锁成功,执行后续的业务逻辑
  7.     if (tryAcquireShared(arg) < 0)
  8.         doAcquireShared(arg);
  9. }
复制代码
2.3.3.1.1. 使用Sync尝试加共享锁


  • ReentrantReadWriteLock.Sync.tryAcquireShared
  1. protected final int tryAcquireShared(int unused) {
  2.     //当前线程
  3.     Thread current = Thread.currentThread();
  4.     //当前的state数目(或者说加锁【既包括读锁也包括写锁】的数量)
  5.     int c = getState();
  6.     //从state中获取互斥锁的数目(写锁的数量)
  7.     //如果数量不为0(即>0)说明已经加了写锁
  8.     if (exclusiveCount(c) != 0 &&
  9.         //判断加锁的线程是否当前线程
  10.         getExclusiveOwnerThread() != current)
  11.         //不是的话返回-1表示已经有其他线程加了写锁(后面就需要入AQS队列阻塞等待唤醒)
  12.         return -1;
  13.    
  14.     //走到这里说明没有线程加写锁或者加写锁的就是本线程
  15.    
  16.     //从state中获取共享锁的数目(读锁的数目)
  17.     int r = sharedCount(c);
  18.     //非公平锁:这里调用NonfairSync的readerShouldBlock判断是否需要阻塞读
  19.     //返回false的话不需要阻塞,接着执行&&后面的逻辑
  20.     if (!readerShouldBlock() &&
  21.         //判断读锁数量是否小于最大数目
  22.         r < MAX_COUNT &&
  23.         //使用CAS加读锁
  24.         compareAndSetState(c, c + SHARED_UNIT)) {
  25.         //上面的if中的逻辑执行完毕后,锁state已经加好了读锁
  26.         //下面的逻辑负责处理其他需要修改的属性
  27.         //第一次加读锁
  28.         if (r == 0) {
  29.             //设置第一次加锁的线程以及初始化读锁数量
  30.             firstReader = current;
  31.             firstReaderHoldCount = 1;
  32.         //第n次加锁的仍旧是第一次加锁的线程
  33.         } else if (firstReader == current) {
  34.             //修改读锁数量即可
  35.             firstReaderHoldCount++;
  36.         //第n次加锁的不是第一次加锁的线程
  37.         } else {
  38.             //cachedHoldCounter是缓存
  39.             HoldCounter rh = cachedHoldCounter;
  40.             if (rh == null || rh.tid != getThreadId(current))
  41.                 cachedHoldCounter = rh = readHolds.get();
  42.             else if (rh.count == 0)
  43.                 readHolds.set(rh);
  44.             rh.count++;
  45.         }
  46.         //返回1(正数)表示加读锁成功
  47.         return 1;
  48.     }
  49.     //走到这里说明发生了以下几种情况:
  50.     //1.需要阻塞读
  51.     //2.读锁数目已经超过最大值
  52.     //3.CAS加读锁失败
  53.     return fullTryAcquireShared(current);
  54. }
复制代码
2.3.3.1.1.1. 判断是否需要阻塞读【非公平锁】


  • ReentrantReadWriteLock.NonfairSync#readerShouldBlock
  1. final boolean readerShouldBlock() {
  2.     //队头是互斥节点(加写锁的节点)情况下才需要阻塞读
  3.     //这就是非公平锁的特点
  4.     //即使队列前面已经有其他加读锁的线程等待我也不管
  5.     return apparentlyFirstQueuedIsExclusive();
  6. }
复制代码

  • apparentlyFirstQueuedIsExclusive
  1. final boolean apparentlyFirstQueuedIsExclusive() {
  2.     Node h, s;
  3.     //队列不为空
  4.     return (h = head) != null &&
  5.         //并且 队列的实际头节点【之所以说实际队列的头节点是个占位符】不为空
  6.         (s = h.next)  != null &&
  7.         //并且 队列的实际头节点不是共享的头节点(即加的不是读锁)
  8.         !s.isShared()         &&
  9.         //并且 队列的实际头节点的线程不为空
  10.         s.thread != null;
  11.     //满足以上所有情况才需要阻塞当前加读锁的线程
  12. }
复制代码
2.3.3.1.1.2. 快速尝试加锁失败,那么改用死循环加锁


  • ReentrantReadWriteLock.Sync#fullTryAcquireShared
  1. final int fullTryAcquireShared(Thread current) {
  2.     HoldCounter rh = null;
  3.     //死循环
  4.     for (;;) {
  5.         int c = getState();
  6.         //写锁的数量不为0
  7.         if (exclusiveCount(c) != 0) {
  8.             //且加写锁的不是本线程
  9.             if (getExclusiveOwnerThread() != current)
  10.                 //返回-1
  11.                 return -1;
  12.             // else we hold the exclusive lock; blocking here
  13.             // would cause deadlock.
  14.         //没有人加写锁,如果读需要阻塞
  15.         } else if (readerShouldBlock()) {
  16.             // Make sure we're not acquiring read lock reentrantly
  17.             if (firstReader == current) {
  18.                 // assert firstReaderHoldCount > 0;
  19.             } else {
  20.                 if (rh == null) {
  21.                     rh = cachedHoldCounter;
  22.                     if (rh == null || rh.tid != getThreadId(current)) {
  23.                         rh = readHolds.get();
  24.                         if (rh.count == 0)
  25.                             readHolds.remove();
  26.                     }
  27.                 }
  28.                 if (rh.count == 0)
  29.                     return -1;
  30.             }
  31.         }
  32.         //走到这里说明没有加写锁,读也不需要阻塞
  33.         if (sharedCount(c) == MAX_COUNT)
  34.             //加锁已超过最大值
  35.             throw new Error("Maximum lock count exceeded");
  36.         //加读锁,同上面的tryAcquireShared
  37.         if (compareAndSetState(c, c + SHARED_UNIT)) {
  38.             if (sharedCount(c) == 0) {
  39.                 firstReader = current;
  40.                 firstReaderHoldCount = 1;
  41.             } else if (firstReader == current) {
  42.                 firstReaderHoldCount++;
  43.             } else {
  44.                 if (rh == null)
  45.                     rh = cachedHoldCounter;
  46.                 if (rh == null || rh.tid != getThreadId(current))
  47.                     rh = readHolds.get();
  48.                 else if (rh.count == 0)
  49.                     readHolds.set(rh);
  50.                 rh.count++;
  51.                 cachedHoldCounter = rh; // cache for release
  52.             }
  53.             return 1;
  54.         }
  55.     }
  56. }
复制代码
2.3.3.1.2. 尝试加锁失败,那么入AQS队列并阻塞,等待唤醒继续抢占锁
  1. private void doAcquireShared(int arg) {
  2.     //构造SHARE节点加入AQS队列阻塞等待唤醒
  3.     final Node node = addWaiter(Node.SHARED);
  4.     boolean failed = true;
  5.     try {
  6.         boolean interrupted = false;
  7.         //死循环抢占锁
  8.         for (;;) {
  9.             //当前节点的前一个节点是头节点
  10.             final Node p = node.predecessor();
  11.             if (p == head) {
  12.                 //继续尝试加共享锁
  13.                 int r = tryAcquireShared(arg);
  14.                 if (r >= 0) {
  15.                     setHeadAndPropagate(node, r);
  16.                     p.next = null; // help GC
  17.                     if (interrupted)
  18.                         selfInterrupt();
  19.                     failed = false;
  20.                     return;
  21.                 }
  22.             }
  23.             //判断是否需要阻塞
  24.             if (shouldParkAfterFailedAcquire(p, node) &&
  25.                 //需要的话进行阻塞
  26.                 parkAndCheckInterrupt())
  27.                 interrupted = true;
  28.         }
  29.     } finally {
  30.         if (failed)
  31.             cancelAcquire(node);
  32.     }
  33. }
复制代码
2.3.3.1.2.1. 加入AQS队列并阻塞


  • addWaiter
  1. private Node addWaiter(Node mode) {
  2.     Node node = new Node(Thread.currentThread(), mode);
  3.     // Try the fast path of enq; backup to full enq on failure
  4.     //快速尝试加入队尾
  5.     Node pred = tail;
  6.     if (pred != null) {
  7.         node.prev = pred;
  8.         if (compareAndSetTail(pred, node)) {
  9.             pred.next = node;
  10.             return node;
  11.         }
  12.     }
  13.     //快速尝试加入队尾失败,那么改用enq加入队尾
  14.     enq(node);
  15.     return node;
  16. }
复制代码
参考:5.AQS.md
2.3.4. 读锁解锁


  • ReentrantReadWriteLock.ReadLock#unlock
  1. public void unlock() {
  2.     //使用AQS解共享锁
  3.     sync.releaseShared(1);
  4. }
复制代码
2.3.4.1. 使用AQS解共享锁


  • AbstractQueuedSynchronizer#releaseShared
  1. public final boolean releaseShared(int arg) {
  2.     //调用Sync尝试解共享锁
  3.     //如果共享锁【读锁】已经全部释放完,那么执行doReleaseShared
  4.     if (tryReleaseShared(arg)) {
  5.         doReleaseShared();
  6.         return true;
  7.     }
  8.     return false;
  9. }
复制代码
2.3.4.1.1. 使用Sync尝试解锁


  • ReentrantReadWriteLock.Sync#tryReleaseShared
  1. protected final boolean tryReleaseShared(int unused) {
  2.     Thread current = Thread.currentThread();
  3.     //当前线程就是第一次加读锁的线程
  4.     if (firstReader == current) {
  5.         //所有读锁都释放完毕
  6.         if (firstReaderHoldCount == 1)
  7.             //那么置加锁线程为空
  8.             firstReader = null;
  9.         //没有释放完那么减读锁数量
  10.         else
  11.             firstReaderHoldCount--;
  12.     //当前线程不是第一次加读锁的线程
  13.     } else {
  14.         HoldCounter rh = cachedHoldCounter;
  15.         if (rh == null || rh.tid != getThreadId(current))
  16.             rh = readHolds.get();
  17.         int count = rh.count;
  18.         if (count <= 1) {
  19.             readHolds.remove();
  20.             if (count <= 0)
  21.                 throw unmatchedUnlockException();
  22.         }
  23.         --rh.count;
  24.     }
  25.     //死循环修改state数量
  26.     for (;;) {
  27.         int c = getState();
  28.         int nextc = c - SHARED_UNIT;
  29.         //CAS修改state数量
  30.         if (compareAndSetState(c, nextc))
  31.             return nextc == 0;//减为0了那么返回true
  32.     }
  33. }
复制代码
3.3. 源码分析

3.3.1. uml
  1. @startuml
  2. skinparam classAttributeIconSize 0
  3. interface AQS{
  4. }
  5. class Sync{
  6. }
  7. interface Lock{
  8. }
  9. interface ReadWriteLock{
  10. }
  11. class ReentrantReadWriteLock{
  12. }
  13. class WriteLock{
  14. }
  15. class ReadLock{
  16. }
  17. class WriteLock{
  18. }
  19. Lock <|-- ReadLock
  20. Lock <|-- WriteLock
  21. ReadWriteLock <|-- ReentrantReadWriteLock
  22. AQS <|-- Sync
  23. ReentrantReadWriteLock --> ReadLock
  24. ReentrantReadWriteLock --> WriteLock
  25. ReadLock --> Sync
  26. WriteLock --> Sync
  27. @enduml
复制代码
3.3.2. 构造方法


  • ReentrantReadWriteLock
  1. private void unparkSuccessor(Node node) {
  2.    
  3.     int ws = node.waitStatus;
  4.     //如果当前节点的状态是正常的?
  5.     if (ws < 0)
  6.         //当前节点的状态<0,则把状态改为0
  7.         //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何
  8.         compareAndSetWaitStatus(node, ws, 0);
  9.    
  10.      //获取当前节点的下一个节点
  11.     Node s = node.next;
  12.     //如果下一个节点是空(即当前节点是尾节点)或者下一个节点的状态>0(取消)
  13.     if (s == null || s.waitStatus > 0) {
  14.         s = null;
  15.         //从尾节点往前遍历至当前节点
  16.         for (Node t = tail; t != null && t != node; t = t.prev)
  17.             //找到最靠近当前节点的状态<=0(非取消)的节点
  18.             if (t.waitStatus <= 0)
  19.                 s = t;
  20.     }
  21.     //唤醒当前节点的下一个节点
  22.     if (s != null)
  23.         LockSupport.unpark(s.thread);
  24. }
复制代码

  • ReentrantReadWriteLock.ReadLock
  1. protected ReadLock(ReentrantReadWriteLock lock) {
  2.     //其实就是保存了ReentrantReadWriteLock的Sync
  3.     sync = lock.sync;
  4. }
复制代码

  • ReentrantReadWriteLock.WriteLock
  1. protected WriteLock(ReentrantReadWriteLock lock) {
  2.     //其实就是保存了ReentrantReadWriteLock的Sync
  3.     sync = lock.sync;
  4. }
复制代码
3.3.3. 读锁加锁


  • ReentrantReadWriteLock.readLock
  1. //返回就是读锁
  2. public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }
复制代码

  • ReadLock.lock
  1. public void lock() {
  2.     //使用AQS加共享锁
  3.     sync.acquireShared(1);
  4. }
复制代码
3.3.3.1. 使用AQS加共享锁


  • AQS acquireShared
  1. public final void acquireShared(int arg) {
  2.     //ReentrantReadWriteLock的Sync重写了tryAcquireShared
  3.     //所以这里调用的是Sync的tryAcquireShared
  4.    
  5.     //如果返回<0说明尝试加锁失败,执行doAcquireShared入AQS队列并阻塞,等待唤醒
  6.     //返回>=0说明加锁成功,执行后续的业务逻辑
  7.     if (tryAcquireShared(arg) < 0)
  8.         doAcquireShared(arg);
  9. }
复制代码
3.3.3.1.1. 使用Sync尝试加共享锁


  • ReentrantReadWriteLock.Sync.tryAcquireShared
  1. protected final int tryAcquireShared(int unused) {
  2.     //当前线程
  3.     Thread current = Thread.currentThread();
  4.     //当前的state数目(或者说加锁【既包括读锁也包括写锁】的数量)
  5.     int c = getState();
  6.     //从state中获取互斥锁的数目(写锁的数量)
  7.     //如果数量不为0(即>0)说明已经加了写锁
  8.     if (exclusiveCount(c) != 0 &&
  9.         //判断加锁的线程是否当前线程
  10.         getExclusiveOwnerThread() != current)
  11.         //不是的话返回-1表示已经有其他线程加了写锁(后面就需要入AQS队列阻塞等待唤醒)
  12.         return -1;
  13.    
  14.     //走到这里说明没有线程加写锁或者加写锁的就是本线程
  15.    
  16.     //从state中获取共享锁的数目(读锁的数目)
  17.     int r = sharedCount(c);
  18.     //非公平锁:这里调用NonfairSync的readerShouldBlock判断是否需要阻塞读
  19.     //返回false的话不需要阻塞,接着执行&&后面的逻辑
  20.     if (!readerShouldBlock() &&
  21.         //判断读锁数量是否小于最大数目
  22.         r < MAX_COUNT &&
  23.         //使用CAS加读锁
  24.         compareAndSetState(c, c + SHARED_UNIT)) {
  25.         //上面的if中的逻辑执行完毕后,锁state已经加好了读锁
  26.         //下面的逻辑负责处理其他需要修改的属性
  27.         //第一次加读锁
  28.         if (r == 0) {
  29.             //设置第一次加锁的线程以及初始化读锁数量
  30.             firstReader = current;
  31.             firstReaderHoldCount = 1;
  32.         //第n次加锁的仍旧是第一次加锁的线程
  33.         } else if (firstReader == current) {
  34.             //修改读锁数量即可
  35.             firstReaderHoldCount++;
  36.         //第n次加锁的不是第一次加锁的线程
  37.         } else {
  38.             //cachedHoldCounter是缓存
  39.             HoldCounter rh = cachedHoldCounter;
  40.             if (rh == null || rh.tid != getThreadId(current))
  41.                 cachedHoldCounter = rh = readHolds.get();
  42.             else if (rh.count == 0)
  43.                 readHolds.set(rh);
  44.             rh.count++;
  45.         }
  46.         //返回1(正数)表示加读锁成功
  47.         return 1;
  48.     }
  49.     //走到这里说明发生了以下几种情况:
  50.     //1.需要阻塞读
  51.     //2.读锁数目已经超过最大值
  52.     //3.CAS加读锁失败
  53.     return fullTryAcquireShared(current);
  54. }
复制代码
3.3.3.1.1.1. 判断是否需要阻塞读【公平】


  • ReentrantReadWriteLock.FairSync#readerShouldBlock
  1. //...
复制代码

  • AQS hasQueuedPredecessors
  1. public void unlock() {
  2.     //AQS解锁
  3.     sync.release(1);
  4. }
复制代码
3.3.3.1.1.2. 快速尝试加锁失败,那么改用死循环加锁


  • ReentrantReadWriteLock.Sync#fullTryAcquireShared
  1. final int fullTryAcquireShared(Thread current) {
  2.     HoldCounter rh = null;
  3.     //死循环
  4.     for (;;) {
  5.         int c = getState();
  6.         //写锁的数量不为0
  7.         if (exclusiveCount(c) != 0) {
  8.             //且加写锁的不是本线程
  9.             if (getExclusiveOwnerThread() != current)
  10.                 //返回-1
  11.                 return -1;
  12.             // else we hold the exclusive lock; blocking here
  13.             // would cause deadlock.
  14.         //没有人加写锁,如果读需要阻塞
  15.         } else if (readerShouldBlock()) {
  16.             // Make sure we're not acquiring read lock reentrantly
  17.             if (firstReader == current) {
  18.                 // assert firstReaderHoldCount > 0;
  19.             } else {
  20.                 if (rh == null) {
  21.                     rh = cachedHoldCounter;
  22.                     if (rh == null || rh.tid != getThreadId(current)) {
  23.                         rh = readHolds.get();
  24.                         if (rh.count == 0)
  25.                             readHolds.remove();
  26.                     }
  27.                 }
  28.                 if (rh.count == 0)
  29.                     return -1;
  30.             }
  31.         }
  32.         //走到这里说明没有加写锁,读也不需要阻塞
  33.         if (sharedCount(c) == MAX_COUNT)
  34.             //加锁已超过最大值
  35.             throw new Error("Maximum lock count exceeded");
  36.         //加读锁,同上面的tryAcquireShared
  37.         if (compareAndSetState(c, c + SHARED_UNIT)) {
  38.             if (sharedCount(c) == 0) {
  39.                 firstReader = current;
  40.                 firstReaderHoldCount = 1;
  41.             } else if (firstReader == current) {
  42.                 firstReaderHoldCount++;
  43.             } else {
  44.                 if (rh == null)
  45.                     rh = cachedHoldCounter;
  46.                 if (rh == null || rh.tid != getThreadId(current))
  47.                     rh = readHolds.get();
  48.                 else if (rh.count == 0)
  49.                     readHolds.set(rh);
  50.                 rh.count++;
  51.                 cachedHoldCounter = rh; // cache for release
  52.             }
  53.             return 1;
  54.         }
  55.     }
  56. }
复制代码
3.3.3.1.2. 尝试加锁失败,那么入AQS队列并阻塞,等待唤醒继续抢占锁
  1. private void doAcquireShared(int arg) {
  2.     //构造SHARE节点加入AQS队列阻塞等待唤醒
  3.     final Node node = addWaiter(Node.SHARED);
  4.     boolean failed = true;
  5.     try {
  6.         boolean interrupted = false;
  7.         //死循环抢占锁
  8.         for (;;) {
  9.             //当前节点的前一个节点是头节点
  10.             final Node p = node.predecessor();
  11.             if (p == head) {
  12.                 //继续尝试加共享锁
  13.                 int r = tryAcquireShared(arg);
  14.                 if (r >= 0) {
  15.                     setHeadAndPropagate(node, r);
  16.                     p.next = null; // help GC
  17.                     if (interrupted)
  18.                         selfInterrupt();
  19.                     failed = false;
  20.                     return;
  21.                 }
  22.             }
  23.             //判断是否需要阻塞
  24.             if (shouldParkAfterFailedAcquire(p, node) &&
  25.                 //需要的话进行阻塞
  26.                 parkAndCheckInterrupt())
  27.                 interrupted = true;
  28.         }
  29.     } finally {
  30.         if (failed)
  31.             cancelAcquire(node);
  32.     }
  33. }
复制代码
3.3.3.1.2.1. 加入AQS队列并阻塞


  • addWaiter
  1. private Node addWaiter(Node mode) {
  2.     Node node = new Node(Thread.currentThread(), mode);
  3.     // Try the fast path of enq; backup to full enq on failure
  4.     //快速尝试加入队尾
  5.     Node pred = tail;
  6.     if (pred != null) {
  7.         node.prev = pred;
  8.         if (compareAndSetTail(pred, node)) {
  9.             pred.next = node;
  10.             return node;
  11.         }
  12.     }
  13.     //快速尝试加入队尾失败,那么改用enq加入队尾
  14.     enq(node);
  15.     return node;
  16. }
复制代码
参考:5.AQS.md
3.3.4. 读锁解锁


  • ReentrantReadWriteLock.ReadLock#unlock
  1. public void unlock() {
  2.     //使用AQS解共享锁
  3.     sync.releaseShared(1);
  4. }
复制代码
3.3.4.1. 使用AQS解共享锁


  • AbstractQueuedSynchronizer#releaseShared
  1. public final boolean releaseShared(int arg) {
  2.     //调用Sync尝试解共享锁
  3.     //如果共享锁【读锁】已经全部释放完,那么执行doReleaseShared
  4.     if (tryReleaseShared(arg)) {
  5.         doReleaseShared();
  6.         return true;
  7.     }
  8.     return false;
  9. }
复制代码
3.3.4.1.1. 使用Sync尝试解锁


  • ReentrantReadWriteLock.Sync#tryReleaseShared
[code]protected final boolean tryReleaseShared(int unused) {    Thread current = Thread.currentThread();    //当前线程就是第一次加读锁的线程    if (firstReader == current) {        //所有读锁都释放完毕        if (firstReaderHoldCount == 1)            //那么置加锁线程为空            firstReader = null;        //没有释放完那么减读锁数量        else            firstReaderHoldCount--;    //当前线程不是第一次加读锁的线程    } else {        HoldCounter rh = cachedHoldCounter;        if (rh == null || rh.tid != getThreadId(current))            rh = readHolds.get();        int count = rh.count;        if (count
您需要登录后才可以回帖 登录 | 立即注册