找回密码
 立即注册
首页 业界区 业界 Java源码分析系列笔记-6.ReentrantLock

Java源码分析系列笔记-6.ReentrantLock

锑砖 2025-6-24 10:29:54
目录

  • 1. 是什么

    • 1.1. synchronized vs ReentranLock

  • 2. 实现原理

    • 2.1. uml图

  • 3. 公平锁

    • 3.1. 如何使用
    • 3.2. 原理分析

      • 3.2.1. 构造方法

        • 3.2.1.1. 底层使用AQS实现

      • 3.2.2. 加锁

        • 3.2.2.1. 调用公平锁的lock方法
        • 3.2.2.2. 调用AQS的acquire方法获取锁
        • 3.2.2.3. 尝试获取锁【只有队头才允许抢占锁--公平锁】
        • 3.2.2.4. 尝试获取锁失败加入阻塞队列

          • 3.2.2.4.1. 入队的操作

        • 3.2.2.5. 阻塞,等待唤醒继续获取锁

          • 3.2.2.5.1. 判断是否需要阻塞

            • 3.2.2.5.1.1. 阻塞当前线程



      • 3.2.3. 解锁

        • 3.2.3.1. 使用AQS释放锁
        • 3.2.3.2. 尝试释放锁
        • 3.2.3.3. 释放锁成功后唤醒阻塞队列中的节点



  • 4. 非公平锁

    • 4.1. 如何使用
    • 4.2. 实现原理

      • 4.2.1. 构造方法
      • 4.2.2. 加锁

        • 4.2.2.1. 使用非公平锁加锁
        • 4.2.2.2. 通过AQS加锁
        • 4.2.2.3. 通过非公平锁尝试加锁

          • 4.2.2.3.1. 非公平锁尝试加锁的操作【不管是否队头都可以抢占锁--非公平锁】

        • 4.2.2.4. 尝试加锁失败,加入阻塞队列

          • 4.2.2.4.1. 加入队列的操作


      • 4.2.3. 解锁

        • 4.2.3.1. 使用AQS释放锁
        • 4.2.3.2. 尝试释放锁

          • 4.2.3.2.1. 释放锁成功后唤醒阻塞队列中的后续节点




  • 5. 参考

1. 是什么

在jdk5之前,synchronized效率极低,于是写了ReentranLock代替。
后来jdk7优化了synchronized,参考Java源码分析系列笔记-2.锁的优化 - ThinkerQAQ - 博客园。两者性能区别不大
1.1. synchronized vs ReentranLock

比较SynchronizedReentrantLock等待结合object wait/notify结合condition await/signal使用难度简单。jvm会处理加锁,解锁的过程麻烦。需要手动lock、unlock,且unlock得放在finally块中特性可重入 不可中断 非公平可重入 可中断 可公平实现原理monitorAQS2. 实现原理

2.1. uml图

1.png

由uml图可以看出ReentranLock底层是用AQS实现的,有一个Sync属性(继承AQS类),如果是非公平锁则用的NonfairSync实现类,否则用的FairSync类
具体的实现参考
3. 公平锁

所谓公平锁,遵循先到先得的原则。
即使锁已经被释放了,后到的也不能去抢占锁,得等到前面没人时才能去获取
3.1. 如何使用
  1. public class TestReentrantLock
  2. {
  3.     private static int val = 0;
  4.     private final static Lock lock = new ReentrantLock(true);//公平锁
  5.     public static void main(String[] args) throws InterruptedException
  6.     {
  7.         Thread thread1 = new Thread(() -> {
  8.             for (int i = 0; i < 100000; i++)
  9.             {
  10.                 try
  11.                 {
  12.                     lock.lock();
  13.                     val++;
  14.                 }
  15.                 finally
  16.                 {
  17.                     lock.unlock();
  18.                 }
  19.             }
  20.         });
  21.         Thread thread2 = new Thread(() -> {
  22.             for (int i = 0; i < 100000; i++)
  23.             {
  24.                 try
  25.                 {
  26.                     lock.lock();
  27.                     val--;
  28.                 }
  29.                 finally
  30.                 {
  31.                     lock.unlock();
  32.                 }
  33.             }
  34.         });
  35.         thread1.start();
  36.         thread2.start();
  37.         thread1.join();
  38.         thread2.join();
  39.         System.out.println(val);
  40.     }
  41. }
复制代码
3.2. 原理分析

3.2.1. 构造方法

3.2.1.1. 底层使用AQS实现
  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2.     private final Sync sync;
  3.     //默认非公平锁
  4.     public ReentrantLock() {
  5.         sync = new NonfairSync();
  6.     }
  7.     public ReentrantLock(boolean fair) {
  8.         //true的话,公平锁使用FairSync,否则是NonfairSync
  9.         sync = fair ? new FairSync() : new NonfairSync();
  10.     }
  11.     //Sync是AQS的子类
  12.     abstract static class Sync extends AbstractQueuedSynchronizer {}
  13.     //FairSync是Sync的子类
  14.     static final class FairSync extends Sync {}
  15. }
复制代码
3.2.2. 加锁


  • lock
  1. public void lock() {
  2.     //调用FairSync的lock
  3.     sync.lock();
  4. }
复制代码
3.2.2.1. 调用公平锁的lock方法


  • FairSync.lock
  1. final void lock() {
  2.     //调用AQS的acquire
  3.     acquire(1);
  4. }
复制代码
3.2.2.2. 调用AQS的acquire方法获取锁


  • AQS.acquire
  1. public final void acquire(int arg) {
  2.     //调用FairSync的tryAcquire获取锁
  3.     if (!tryAcquire(arg) &&
  4.             //获取锁失败加入AQS队列。并且死循环阻塞当前线程,等待唤醒继续获取锁
  5.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6.             //恢复中断标记
  7.         selfInterrupt();
  8. }
复制代码
由于FairSync重写了AQS的tryAcquire方法,因此这里会调用FairSync的tryAcquire
其他的逻辑同5.AQS.md,下面只是简要说一下主要逻辑
3.2.2.3. 尝试获取锁【只有队头才允许抢占锁--公平锁】


  • FairSync.tryAcquire
  1. protected final boolean tryAcquire(int acquires) {
  2. final Thread current = Thread.currentThread();
  3.     int c = getState();
  4.     //锁尚未被获取
  5.     if (c == 0) {
  6.             //【公平锁】:队列中我的前面没人等待锁(队列为空或者我就是队列的队头)
  7.         if (!hasQueuedPredecessors() &&
  8.                 //CAS设置state获取锁成功
  9.             compareAndSetState(0, acquires)) {
  10.             //设置持有锁的线程为当前线程
  11.             setExclusiveOwnerThread(current);
  12.             return true;
  13.         }
  14.     }
  15.     //锁已经被获取,且是当前线程,那么重入
  16.     else if (current == getExclusiveOwnerThread()) {
  17.             //增加state量
  18.         int nextc = c + acquires;
  19.         if (nextc < 0)
  20.             throw new Error("Maximum lock count exceeded");
  21.         setState(nextc);
  22.         return true;
  23.     }
  24.     //获取锁失败返回false
  25.     return false;
  26. }
复制代码
3.2.2.4. 尝试获取锁失败加入阻塞队列


  • AQS.addWaiter
  1. private Node addWaiter(Node mode) {
  2.         //用当前线程、EXCLUSIVE模式构造节点
  3.     Node node = new Node(Thread.currentThread(), mode);
  4.     // 队列不为空
  5.     Node pred = tail;
  6.     if (pred != null) {
  7.             //插入到队尾
  8.         node.prev = pred;
  9.         if (compareAndSetTail(pred, node)) {
  10.             pred.next = node;
  11.             return node;
  12.         }
  13.     }
  14.     //队列为空或者插入到队尾失败
  15.     enq(node);
  16.     return node;
  17. }
复制代码
3.2.2.4.1. 入队的操作


  • enq
  1. private Node enq(final Node node) {
  2. //死循环直到入队成功
  3. for (;;) {
  4.     Node t = tail;
  5.         //队列为空,那么初始化头节点。注意是new Node而不是当前node(即队头是个占位符)
  6.     if (t == null) {
  7.         if (compareAndSetHead(new Node()))
  8.             tail = head;
  9.         //队列不为空,插入到队尾
  10.     } else {
  11.         node.prev = t;
  12.         if (compareAndSetTail(t, node)) {
  13.             t.next = node;
  14.             return t;
  15.         }
  16.     }
  17. }
  18. }
复制代码
3.2.2.5. 阻塞,等待唤醒继续获取锁


  • acquireQueued
  1. final boolean acquireQueued(final Node node, int arg) {
  2.     boolean failed = true;
  3.     try {
  4.         boolean interrupted = false;
  5.             //死循环直到获取锁成功
  6.         for (;;) {
  7.                 //逻辑1.
  8.                     //当前节点的前一个节点时头节点的时候(公平锁:即我的前面没有人等待获取锁),尝试获取锁
  9.             final Node p = node.predecessor();
  10.             if (p == head && tryAcquire(arg)) {
  11.                     //获取锁成功后设置头节点为当前节点
  12.                 setHead(node);
  13.                 p.next = null; // help GC
  14.                 failed = false;
  15.                 return interrupted;
  16.             }
  17.                 //逻辑2.
  18.             //当前节点的前一个节点状态时SIGNAL(承诺唤醒当前节点)的时候,阻塞当前线程。
  19.             //什么时候唤醒?释放锁的时候
  20.             //唤醒之后干什么?继续死循环执行上面的逻辑1
  21.             if (shouldParkAfterFailedAcquire(p, node) &&
  22.                 parkAndCheckInterrupt())
  23.                 interrupted = true;
  24.         }
  25.     } finally {
  26.             //何时执行这段逻辑?发生异常导致获取锁失败的时候
  27.         if (failed)
  28.             cancelAcquire(node);
  29.     }
  30. }
复制代码
3.2.2.5.1. 判断是否需要阻塞


  • shouldParkAfterFailedAcquire
  1. //根据(前一个节点,当前节点)->是否阻塞当前线程
  2. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  3.     int ws = pred.waitStatus;
  4.     //前一个节点的状态时SIGNAL,即释放锁后承诺唤醒当前节点,那么返回true可以阻塞当前线程
  5.     if (ws == Node.SIGNAL)
  6.         return true;
  7.     //前一个节点状态>0,即CANCEL。
  8.     //那么往前遍历找到没有取消的前置节点。同时从链表中移除CANCEL状态的节点
  9.     if (ws > 0) {
  10.         do {
  11.             node.prev = pred = pred.prev;
  12.         } while (pred.waitStatus > 0);
  13.         pred.next = node;
  14.     // 前置节点状态>=0,即0或者propagate。
  15.     //这里通过CAS把前置节点状态改成signal成功获取锁,失败的话再阻塞。why?
  16.     } else {
  17.         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  18.     }
  19.     return false;
  20. }
复制代码
3.2.2.5.1.1. 阻塞当前线程


  • parkAndCheckInterrupt
  1. private final boolean parkAndCheckInterrupt() {
  2.     //使用Unsafe阻塞当前线程,这里会清除线程中断的标记,因此需要返回中断的标记
  3.     LockSupport.park(this);
  4.     return Thread.interrupted();
  5. }
复制代码
3.2.3. 解锁
  1. public void unlock() {
  2.     //调用AQS的release方法
  3.     sync.release(1);
  4. }
复制代码
3.2.3.1. 使用AQS释放锁


  • release
  1. public final boolean release(int arg) {
  2.     //Sync重写了调用Sync释放锁成功
  3.     if (tryRelease(arg)) {
  4.         Node h = head;
  5.             //队头不为空且状态正常,那么唤醒头节点
  6.         if (h != null && h.waitStatus != 0)
  7.             unparkSuccessor(h);
  8.         return true;
  9.     }
  10.     return false;
  11. }
复制代码
Sync重写了tryRelease方法,因此这里调用的是Sync.tryRelease
其他的逻辑同5.AQS.md,下面只是简要说一下主要逻辑
3.2.3.2. 尝试释放锁


  • Sync.tryRelease
  1. protected final boolean tryRelease(int releases) {
  2.     //解锁
  3.     int c = getState() - releases;
  4.     //加锁解锁必须同一个线程
  5.     if (Thread.currentThread() != getExclusiveOwnerThread())
  6.         throw new IllegalMonitorStateException();
  7.     boolean free = false;
  8.     if (c == 0) {
  9.         //锁全部释放成功后,置占用锁的线程为空
  10.         free = true;
  11.         setExclusiveOwnerThread(null);
  12.     }
  13.     //CAS设置解锁
  14.     setState(c);
  15.     return free;
  16. }
复制代码
3.2.3.3. 释放锁成功后唤醒阻塞队列中的节点


  • AQS.unparkSuccessor
  1. private void unparkSuccessor(Node node) {
  2.     int ws = node.waitStatus;
  3.     //当前节点的状态<0,则把状态改为0
  4.     //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何
  5.     if (ws < 0)
  6.         compareAndSetWaitStatus(node, ws, 0);
  7.      //当前节点的下一个节点为空或者状态>0(即是取消状态)
  8.     Node s = node.next;
  9.     if (s == null || s.waitStatus > 0) {
  10.         s = null;
  11.         //那么从队尾开始往前遍历找到离当前节点最近的下一个状态<=0的节点(即非取消状态)
  12.         for (Node t = tail; t != null && t != node; t = t.prev)
  13.             if (t.waitStatus <= 0)
  14.                 s = t;
  15.     }
  16.         //唤醒下一个节点(公平锁)
  17.     if (s != null)
  18.         LockSupport.unpark(s.thread);
  19. }
复制代码
4.2. 实现原理

4.2.1. 构造方法
  1. public class TestReentrantLock
  2. {
  3.     private static int val = 0;
  4.     private final static Lock lock = new ReentrantLock();//非公平锁
  5.     public static void main(String[] args) throws InterruptedException
  6.     {
  7.         Thread thread1 = new Thread(() -> {
  8.             for (int i = 0; i < 100000; i++)
  9.             {
  10.                 try
  11.                 {
  12.                     lock.lock();
  13.                     val++;
  14.                 }
  15.                 finally
  16.                 {
  17.                     lock.unlock();
  18.                 }
  19.             }
  20.         });
  21.         Thread thread2 = new Thread(() -> {
  22.             for (int i = 0; i < 100000; i++)
  23.             {
  24.                 try
  25.                 {
  26.                     lock.lock();
  27.                     val--;
  28.                 }
  29.                 finally
  30.                 {
  31.                     lock.unlock();
  32.                 }
  33.             }
  34.         });
  35.         thread1.start();
  36.         thread2.start();
  37.         thread1.join();
  38.         thread2.join();
  39.         System.out.println(val);
  40.     }
  41. }
复制代码
4.2.2. 加锁
  1. public class ReentrantLock implements Lock, java.io.Serializable {
  2.     private final Sync sync;
  3.     //默认非公平锁
  4.     public ReentrantLock() {
  5.         sync = new NonfairSync();
  6.     }
  7.     public ReentrantLock(boolean fair) {
  8.         //true的话,公平锁使用FairSync,否则是NonfairSync
  9.         sync = fair ? new FairSync() : new NonfairSync();
  10.     }
  11.     //Sync是AQS的子类
  12.     abstract static class Sync extends AbstractQueuedSynchronizer {}
  13.     //FairSync是Sync的子类
  14.     static final class FairSync extends Sync {}
  15. }
复制代码
4.2.2.1. 使用非公平锁加锁


  • NonfairSync lock方法
  1. public void lock() {
  2.         //简单得调用Sync属性的lock方法。即NonfairSync的lock方法
  3.     sync.lock();
  4. }
复制代码
4.2.2.2. 通过AQS加锁


  • AQS.acquire方法
  1. final void lock() {
  2.         //获取锁。使用CAS设置state的值为1,这里state代表互斥量
  3.     if (compareAndSetState(0, 1))
  4.             //设置当前线程为拥有互斥量的线程
  5.         setExclusiveOwnerThread(Thread.currentThread());
  6.     else
  7.             //获取失败则调用AQS的acquire方法
  8.         acquire(1);
  9. }
复制代码
由于NonfairSync重写了AQS的tryAcquire方法,因此这里会调用NonfairSync的tryAcquire
其他的逻辑同5.AQS.md,下面只是简要说一下主要逻辑
4.2.2.3. 通过非公平锁尝试加锁


  • NonfairSync.tryAcquire
  1. public final void acquire(int arg) {
  2.         //调用NonFairSync的tryAcquire获取锁
  3.     if (!tryAcquire(arg) &&
  4.             //获取锁失败加入AQS队列。并且死循环阻塞当前线程,等待唤醒继续获取锁
  5.         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  6.             //恢复中断标记
  7.         selfInterrupt();
  8. }
复制代码
4.2.2.3.1. 非公平锁尝试加锁的操作【不管是否队头都可以抢占锁--非公平锁】


  • NonfairSync.nonfairTryAcquire
  1. protected final boolean tryAcquire(int acquires) {
  2.     //调用NonfairSync.nonfairTryAcquire
  3.     return nonfairTryAcquire(acquires);
  4. }
复制代码
4.2.2.4. 尝试加锁失败,加入阻塞队列


  • AQS.addWaiter
  1. final boolean nonfairTryAcquire(int acquires) {
  2.     final Thread current = Thread.currentThread();
  3.     int c = getState();
  4.     //锁尚未被获取
  5.     if (c == 0) {
  6.             //不管前面是否有人等待,直接尝试获取锁(非公平锁)
  7.         if (compareAndSetState(0, acquires)) {
  8.             setExclusiveOwnerThread(current);
  9.             return true;
  10.         }
  11.     }
  12.     //锁已被获取且时当前线程,重入
  13.     else if (current == getExclusiveOwnerThread()) {
  14.         int nextc = c + acquires;
  15.         if (nextc < 0) // overflow
  16.             throw new Error("Maximum lock count exceeded");
  17.         setState(nextc);
  18.         return true;
  19.     }
  20.     return false;
  21. }
复制代码
4.2.2.4.1. 加入队列的操作


  • AQS.enq
  1. private Node addWaiter(Node mode) {
  2.         //用当前线程、EXCLUSIVE模式构造节点
  3.     Node node = new Node(Thread.currentThread(), mode);
  4.     // 队列不为空
  5.     Node pred = tail;
  6.     if (pred != null) {
  7.             //插入到队尾
  8.         node.prev = pred;
  9.         if (compareAndSetTail(pred, node)) {
  10.             pred.next = node;
  11.             return node;
  12.         }
  13.     }
  14.     //队列为空或者插入到队尾失败
  15.     enq(node);
  16.     return node;
  17. }
复制代码

  • acquireQueued
  1. private Node enq(final Node node) {
  2.         //死循环直到入队成功
  3.     for (;;) {
  4.         Node t = tail;
  5.             //队列为空,那么初始化头节点。注意是new Node而不是当前node(即队头是个占位符)
  6.         if (t == null) {
  7.             if (compareAndSetHead(new Node()))
  8.                 tail = head;
  9.                 //队列不为空,插入到队尾
  10.         } else {
  11.             node.prev = t;
  12.             if (compareAndSetTail(t, node)) {
  13.                 t.next = node;
  14.                 return t;
  15.             }
  16.         }
  17.     }
  18. }
复制代码

  • shouldParkAfterFailedAcquire
  1. final boolean acquireQueued(final Node node, int arg) {
  2.     boolean failed = true;
  3.     try {
  4.         boolean interrupted = false;
  5.             //死循环直到获取锁成功
  6.         for (;;) {
  7.                 //逻辑1.
  8.                     //当前节点的前一个节点时头节点的时候(公平锁:即我的前面没有人等待获取锁),尝试获取锁
  9.             final Node p = node.predecessor();
  10.             if (p == head && tryAcquire(arg)) {
  11.                     //获取锁成功后设置头节点为当前节点
  12.                 setHead(node);
  13.                 p.next = null; // help GC
  14.                 failed = false;
  15.                 return interrupted;
  16.             }
  17.                 //逻辑2.
  18.             //当前节点的前一个节点状态时SIGNAL(承诺唤醒当前节点)的时候,阻塞当前线程。
  19.             //什么时候唤醒?释放锁的时候
  20.             //唤醒之后干什么?继续死循环执行上面的逻辑1
  21.             if (shouldParkAfterFailedAcquire(p, node) &&
  22.                 parkAndCheckInterrupt())
  23.                 interrupted = true;
  24.         }
  25.     //如果发生了异常,那么执行下面的逻辑
  26.     } finally {
  27.             //除了获取锁成功的情况都会执行cancelAcquire方法
  28.         if (failed)
  29.             cancelAcquire(node);
  30.     }
  31. }
复制代码

  • parkAndCheckInterrupt
  1. //根据(前一个节点,当前节点)->是否阻塞当前线程
  2. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  3.     int ws = pred.waitStatus;
  4.         //前一个节点的状态时SIGNAL,即释放锁后承诺唤醒当前节点,那么返回true可以阻塞当前线程
  5.     if (ws == Node.SIGNAL)
  6.         return true;
  7.     //前一个节点状态>0,即CANCEL。
  8.     //那么往前遍历找到没有取消的前置节点。同时从链表中移除CANCEL状态的节点
  9.     if (ws > 0) {
  10.         do {
  11.             node.prev = pred = pred.prev;
  12.         } while (pred.waitStatus > 0);
  13.         pred.next = node;
  14.         // 前置节点状态>=0,即0或者propagate。
  15.         //这里通过CAS把前置节点状态改成signal成功获取锁,失败的话再阻塞。why?
  16.     } else {
  17.         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  18.     }
  19.     return false;
  20. }
复制代码
4.2.3. 解锁
  1. private final boolean parkAndCheckInterrupt() {
  2.         //使用Unsafe阻塞当前线程,这里会清除线程中断的标记,因此需要返回中断的标记
  3.     LockSupport.park(this);
  4.     return Thread.interrupted();
  5. }
复制代码
4.2.3.1. 使用AQS释放锁


  • release
  1. public void unlock() {
  2.                 //简单得调用AQS的release方法
  3.         sync.release(1);
  4.     }
复制代码
4.2.3.2. 尝试释放锁


  • Sync.tryRelease
  1. public final boolean release(int arg) {
  2.         //调用Sync释放锁成功
  3.     if (tryRelease(arg)) {
  4.         Node h = head;
  5.                 //队头不为空且状态正常,那么唤醒头节点
  6.         if (h != null && h.waitStatus != 0)
  7.             unparkSuccessor(h);
  8.         return true;
  9.     }
  10.     return false;
  11. }
复制代码
4.2.3.2.1. 释放锁成功后唤醒阻塞队列中的后续节点


  • unparkSuccessor
[code]private void unparkSuccessor(Node node) {        int ws = node.waitStatus;        //当前节点的状态0(即是取消状态)        Node s = node.next;        if (s == null || s.waitStatus > 0) {            s = null;            //那么从队尾开始往前遍历找到离当前节点最近的下一个状态

相关推荐

您需要登录后才可以回帖 登录 | 立即注册