找回密码
 立即注册
首页 业界区 业界 Java源码分析系列笔记-7.Lock之Condition

Java源码分析系列笔记-7.Lock之Condition

渭茱瀑 2025-9-24 15:46:20
目录

  • 1. 是什么
  • 2. 如何使用
  • 3. 实现原理

    • 3.1. uml
    • 3.2. 创建Condition对象

      • 3.2.1. 创建AQS.ConditionObject对象

        • 3.2.1.1. ConditionObject内部也有一个双向队列


    • 3.3. await方法【阻塞等待】

      • 3.3.1. 加入condition队列尾部
      • 3.3.2. 调用AQS解锁,释放互斥量

        • 3.3.2.1. 尝试释放互斥量

      • 3.3.3. 检测是否在AQS队列,不在则需要阻塞
      • 3.3.4. 当前节点已经在AQS队列中了,获取锁

        • 3.3.4.1. 判断是否需要阻塞

          • 3.3.4.1.1. 阻塞当前线程



    • 3.4. signalAll方法【唤醒所有阻塞等待的节点】

      • 3.4.1. 把condition队列的所有节点转移到AQS队列中

        • 3.4.1.1. 每转移一个condition队列中的节点到aqs队列中,就唤醒一个

          • 3.4.1.1.1. 如何转移的



    • 3.5. signal方法【只唤醒头部阻塞等待的节点】

      • 3.5.1. 唤醒头节点


  • 4. 为什么await需要先释放锁,而signal不需要
  • 5. 总结
  • 6. 参考链接

1. 是什么

类似object的wait和notify方法配合synchronized使用
condition的await和notify方法配合Lock使用,用来实现条件等待与唤醒
2. 如何使用


  • 生产者消费者模式
  1. public class ConditionTest
  2. {
  3.     private Lock lock;//一个锁说明读写互斥
  4.     private int capacity;
  5.     private List<Object> items;
  6.     private Condition notFull;//用来唤醒写线程
  7.     private Condition notEmpty;//用来唤醒读线程
  8.     public ConditionTest(int capacity)
  9.     {
  10.         this.capacity = capacity;
  11.         this.items = new ArrayList<>();
  12.         this.lock = new ReentrantLock();
  13.         this.notFull = lock.newCondition();
  14.         this.notEmpty = lock.newCondition();
  15.     }
  16.     public void add(Object data) throws InterruptedException
  17.     {
  18.         try
  19.         {
  20.             lock.lock();
  21.             //新增的时候如果已经满了,那么等待 非满信号 唤醒
  22.             while (this.items.size() == capacity)
  23.             {
  24.                 this.notFull.await();
  25.             }
  26.             //增加了一个元素,那么 唤醒非空
  27.             this.items.add(data);
  28.             this.notEmpty.signalAll();
  29.         }
  30.         finally
  31.         {
  32.             lock.unlock();
  33.         }
  34.     }
  35.     public Object remove() throws InterruptedException
  36.     {
  37.         try
  38.         {
  39.             lock.lock();
  40.             //删除的时候已经空了,那么等待 非空信号 唤醒
  41.             while (this.items.size() == 0)
  42.             {
  43.                 this.notEmpty.await();
  44.             }
  45.             //删除了一个元素,那么 唤醒非满
  46.             Object data = this.items.remove(0);
  47.             this.notFull.signalAll();
  48.             return data;
  49.         }
  50.         finally
  51.         {
  52.             lock.unlock();
  53.         }
  54.     }
  55.     public static void main(String[] args)
  56.     {
  57.         ConditionTest conditionTest = new ConditionTest(5);
  58.         new Thread(() -> {
  59.             for (int i = 0; i < 1000; i++)
  60.             {
  61.                 try
  62.                 {
  63.                     conditionTest.add(i);
  64.                     System.out.println(String.format("生产者放入%d", i));
  65.                 }
  66.                 catch (InterruptedException e)
  67.                 {
  68.                     e.printStackTrace();
  69.                 }
  70.             }
  71.         }).start();
  72.         new Thread(() -> {
  73.             try
  74.             {
  75.                 while (true)
  76.                 {
  77.                     Object data = conditionTest.remove();
  78.                     System.out.println(String.format("消费者消费%d", data));
  79.                 }
  80.             }
  81.             catch (InterruptedException e)
  82.             {
  83.                 e.printStackTrace();
  84.             }
  85.         }).start();
  86.     }
  87. }
复制代码
3. 实现原理

3.1. uml

1.png

3.2. 创建Condition对象


  • newCondition方法
  1. public Condition newCondition() {
  2.         //调用Sync的newCondition方法
  3.     return sync.newCondition();
  4. }
复制代码
3.2.1. 创建AQS.ConditionObject对象


  • Sync newConditoin方法
  1. final ConditionObject newCondition() {
  2.         //AQS的ConditionObject
  3.     return new ConditionObject();
  4. }
复制代码
3.2.1.1. ConditionObject内部也有一个双向队列
  1. public class ConditionObject implements Condition, java.io.Serializable {
  2.         //condition队列也是一个双向队列
  3.     private transient Node firstWaiter;
  4.     private transient Node lastWaiter;
  5.     public ConditionObject() { }
  6. }
复制代码
结构如下图:
2.png

没错,Condition队列和AQS就是两个不同队列,Condition的操作就是在这两个队列中来回移动
3.3. await方法【阻塞等待】
  1. public final void await() throws InterruptedException {
  2.     if (Thread.interrupted())
  3.         throw new InterruptedException();
  4.     //加入condition队列尾部
  5.     Node node = addConditionWaiter();
  6.     //调用AQS解锁,释放互斥量(执行await肯定是在获取了锁后的)
  7.     int savedState = fullyRelease(node);
  8.     int interruptMode = 0;
  9.     //调用AQS死循环检测是否在AQS队列中,不在的话阻塞当前线程。
  10.     //什么时候加入AQS队列呢?signal的时候
  11.     while (!isOnSyncQueue(node)) {
  12.         LockSupport.park(this);
  13.         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  14.             break;
  15.     }
  16.     //获已经在AQS队列中了,获取锁
  17.     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  18.         interruptMode = REINTERRUPT;
  19.     //如果node不是condition队列的尾节点
  20.     if (node.nextWaiter != null) // clean up if cancelled
  21.         //那么遍历删除conditoin队列中所有cancel节点
  22.         unlinkCancelledWaiters();
  23.     if (interruptMode != 0)
  24.         reportInterruptAfterWait(interruptMode);
  25. }
复制代码

  • 5行:加入condition队列尾部
  • 7行:调用AQS解锁,释放互斥量(由此可知执行await肯定是在获取了锁后的)
  • 11-15行:不停地检查是否在AQS阻塞队列中,不在的话阻塞当前线程。等待唤醒继续检查
  • 17-22行:到达这里的时候说明已经在AQS队列中了,并且已被唤醒,那么我就要去抢占锁了。如果抢占失败继续回到11-15行
下面对这几个步骤作详细说明
3.3.1. 加入condition队列尾部


  • addConditionWaiter
  1. private Node addConditionWaiter() {
  2.         //队尾
  3.     Node t = lastWaiter;
  4.     //队尾的状态不为CONDITION(即为CANCEL)
  5.     if (t != null && t.waitStatus != Node.CONDITION) {
  6.             //删除conditoin队列中所有cancel节点
  7.         unlinkCancelledWaiters();
  8.         //重新从尾节点开始
  9.         t = lastWaiter;
  10.     }
  11.     //构造节点(当前线程,CONDITION状态)
  12.     Node node = new Node(Thread.currentThread(), Node.CONDITION);
  13.     //把节点加入condition队列尾部
  14.     //这是队列为空的情况
  15.     if (t == null)
  16.         firstWaiter = node;
  17.     //队列不为空的情况
  18.     else
  19.         t.nextWaiter = node;
  20.     lastWaiter = node;//新的尾节点
  21.     return node;
  22. }
复制代码
上面的代码所作的就是用当前线程构造成Condition节点,加入Condition队列的尾部;
除此之外,unlinkCancelledWaiters还会从头部开始往后删除conditoin队列中所有cancel节点,如下:

  • unlinkCancelledWaiters方法
  1. private void unlinkCancelledWaiters() {
  2.         //从头节点出发
  3.     Node t = firstWaiter;
  4.     Node trail = null;
  5.     //遍历condition队列
  6.     while (t != null) {
  7.         Node next = t.nextWaiter;
  8.         //如果节点状态为CANCEL
  9.         if (t.waitStatus != Node.CONDITION) {
  10.             //那么从condition队列中删除
  11.             t.nextWaiter = null;
  12.             //头节点是CANCEL的,那就修改头节点
  13.             if (trail == null)
  14.                 firstWaiter = next;
  15.             //头节点不是CANCEL的,那就修改前一个节点的nextWaiter
  16.             else
  17.                 trail.nextWaiter = next;
  18.             if (next == null)
  19.                 lastWaiter = trail;
  20.         }
  21.         //节点状态不为CANCEL,那么跳过
  22.         else
  23.             trail = t;
  24.         //继续下一个节点
  25.         t = next;
  26.     }
  27. }
复制代码
3.3.2. 调用AQS解锁,释放互斥量


  • AQS fullyRelease方法
  1. final int fullyRelease(Node node) {
  2.     boolean failed = true;
  3.     try {
  4.             //获取当前互斥量
  5.         int savedState = getState();
  6.         //调用AQS.release释放这些互斥量
  7.         if (release(savedState)) {
  8.             //释放成功后返回释放的互斥量个数
  9.             failed = false;
  10.             return savedState;
  11.         } else {
  12.             throw new IllegalMonitorStateException();
  13.         }
  14.     } finally {
  15.             //解锁失败需要把当前节点置为CANCEL状态
  16.         if (failed)
  17.             node.waitStatus = Node.CANCELLED;
  18.     }
  19. }
复制代码

  • AQS release
  1. public final boolean release(int arg) {
  2.     //调用AQS.tryRelease释放锁
  3.     if (tryRelease(arg)) {
  4.             //释放锁成功后把AQS队列的头节点的线程唤醒
  5.         Node h = head;
  6.         if (h != null && h.waitStatus != 0)
  7.             unparkSuccessor(h);
  8.         return true;
  9.     }
  10.     return false;
  11. }
复制代码
由于ReentrantLock重写的了AQS的tryRelease,因此调用的是ReentrantLock.tryRelease,如下:
3.3.2.1. 尝试释放互斥量


  • ReentrantLock.tryRelease
  1. protected final boolean tryRelease(int releases) {
  2.         //计算释放完releases个信号量还剩多少要释放
  3.     int c = getState() - releases;
  4.         //解锁的必须和加锁同一线程
  5.     if (Thread.currentThread() != getExclusiveOwnerThread())
  6.         throw new IllegalMonitorStateException();
  7.     boolean free = false;
  8.     if (c == 0) {//剩余0个说明解锁成功
  9.         free = true;
  10.         setExclusiveOwnerThread(null);//置持有锁的线程为空
  11.     }
  12.         //设置剩余的信号量
  13.         //由于解锁的只有一个线程,所以这里不需要使用CAS操作设置state
  14.     setState(c);
  15.     return free;
  16. }
复制代码
3.3.3. 检测是否在AQS队列,不在则需要阻塞


  • isOnSyncQueue方法
  1. final boolean isOnSyncQueue(Node node) {
  2.                 //node的状态是CONDITION,说明还在condition队列中 或者 前一个节点为空
  3.         if (node.waitStatus == Node.CONDITION || node.prev == null)
  4.             return false;//返回false,表示不在AQS队列中
  5.         //next不为空(next是AQS队列专用,nextWaiter是Condition队列专用),一定在AQS队列中
  6.         if (node.next != null) // If has successor, it must be on queue
  7.             return true;//返回true,表示在AQS队列中
  8.       
  9.          //以上两种情况都不符合,那么只能到AQS队列中查找
  10.         return findNodeFromTail(node);
  11.     }
  12.    
  13. private boolean findNodeFromTail(Node node) {
  14.     Node t = tail;
  15.     //从尾开始遍历,找到node
  16.     for (;;) {
  17.         if (t == node)
  18.             return true;
  19.         if (t == null)
  20.             return false;
  21.         t = t.prev;
  22.     }
  23. }
复制代码
3.3.4. 当前节点已经在AQS队列中了,获取锁


  • acquireQueue
  1. final boolean acquireQueued(final Node node, int arg) {
  2.     boolean failed = true;
  3.     try {
  4.         boolean interrupted = false;
  5.             //死循环直到获取锁成功
  6.         for (;;) {
  7.                 //逻辑1.
  8.                     //当前节点的前一个节点时头节点的时候(公平锁:即我的前面没有人等待获取锁),尝试获取锁
  9.             final Node p = node.predecessor();
  10.             if (p == head && tryAcquire(arg)) {
  11.                     //获取锁成功后设置头节点为当前节点
  12.                 setHead(node);
  13.                 p.next = null; // help GC
  14.                 failed = false;
  15.                 return interrupted;
  16.             }
  17.                 //逻辑2.
  18.             //当前节点的前一个节点状态时SIGNAL(承诺唤醒当前节点)的时候,阻塞当前线程。
  19.             //什么时候唤醒?释放锁的时候
  20.             //唤醒之后干什么?继续死循环执行上面的逻辑1
  21.             if (shouldParkAfterFailedAcquire(p, node) &&
  22.                 parkAndCheckInterrupt())
  23.                 interrupted = true;
  24.         }
  25.     } finally {
  26.             //如果发生了异常,那么执行下面的cancelAcquire方法
  27.         if (failed)
  28.             cancelAcquire(node);
  29.     }
  30. }
复制代码
3.3.4.1. 判断是否需要阻塞


  • shouldParkAfterFailedAcquire
  1. //根据(前一个节点,当前节点)->是否阻塞当前线程
  2. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  3.     int ws = pred.waitStatus;
  4.     //前一个节点的状态时SIGNAL,即释放锁后承诺唤醒当前节点,那么返回true可以阻塞当前线程
  5.     if (ws == Node.SIGNAL)
  6.         return true;
  7.     //前一个节点状态>0,即CANCEL。
  8.     //那么往前遍历找到没有取消的前置节点。同时从链表中移除CANCEL状态的节点
  9.     if (ws > 0) {
  10.         do {
  11.             node.prev = pred = pred.prev;
  12.         } while (pred.waitStatus > 0);
  13.         pred.next = node;
  14.     // 前置节点状态>=0,即0或者propagate。
  15.     //这里通过CAS把前置节点状态改成signal成功获取锁,失败的话再阻塞。why?
  16.     } else {
  17.         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  18.     }
  19.     return false;
  20. }
复制代码
3.3.4.1.1. 阻塞当前线程


  • parkAndCheckInterrupt
  1. private final boolean parkAndCheckInterrupt() {
  2.     //使用Unsafe阻塞当前线程,这里会清除线程中断的标记,因此需要返回中断的标记
  3.     LockSupport.park(this);
  4.     return Thread.interrupted();
  5. }
复制代码
3.4. signalAll方法【唤醒所有阻塞等待的节点】


  • ConditionObject signalAll
  1. public final void signalAll() {
  2.         //如果当前线程不是持有互斥量的线程,直接抛出异常
  3.     if (!isHeldExclusively())
  4.         throw new IllegalMonitorStateException();
  5.     //Condition队列不为空
  6.     Node first = firstWaiter;
  7.     if (first != null)
  8.             //把condition队列的所有节点转移到AQS队列中并唤醒所有线程
  9.         doSignalAll(first);
  10. }
复制代码
3.4.1. 把condition队列的所有节点转移到AQS队列中


  • doSignalAll方法
  1. private void doSignalAll(Node first) {
  2.         //清空condition队列的头、尾节点
  3.     lastWaiter = firstWaiter = null;
  4.     //遍历condition队列
  5.     do {
  6.         Node next = first.nextWaiter;
  7.         first.nextWaiter = null;
  8.             //转移到AQS队列中
  9.         transferForSignal(first);
  10.         first = next;
  11.     } while (first != null);
  12. }
复制代码
3.4.1.1. 每转移一个condition队列中的节点到aqs队列中,就唤醒一个


  • tansferForSignal方法
  1. final boolean transferForSignal(Node node) {
  2.      //当前节点是CONDITION状态,CAS设置为0,如果成功继续15行
  3.     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  4.         return false;//CAS设置失败,那么返回false表示唤醒失败
  5.    //调用AQS enq,把当前节点加入AQS队列
  6.     Node p = enq(node);
  7.     int ws = p.waitStatus;
  8.     //如果该结点的状态为cancel 或者 修改waitStatus为SIGNAL失败
  9.     //没搞懂这个条件什么意思
  10.     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  11.         //唤醒当前节点的线程
  12.         LockSupport.unpark(node.thread);
  13.     return true;
  14. }
复制代码
3.4.1.1.1. 如何转移的


  • AQS.enq
  1. private Node enq(final Node node) {
  2.         //死循环直到加入队尾成功
  3.         for (;;) {
  4.                 Node t = tail;
  5.                 //队列为空初始化头节点(占位符)
  6.                 if (t == null) {
  7.                     if (compareAndSetHead(new Node()))
  8.                         tail = head;
  9.                 } else {//加入队尾
  10.                     node.prev = t;
  11.                     if (compareAndSetTail(t, node)) {
  12.                         t.next = node;
  13.                         return t;
  14.                     }
  15.                 }
  16.         }
  17. }
复制代码
3.5. signal方法【只唤醒头部阻塞等待的节点】


  • ConditionObject signal
  1. public final void signal() {
  2.         //调用ReentrantLock的方法判断当前线程是否持有锁的线程
  3.     if (!isHeldExclusively())
  4.         throw new IllegalMonitorStateException();
  5.     //condition队列不为空
  6.     Node first = firstWaiter;
  7.     if (first != null)
  8.         doSignal(first);
  9. }
复制代码
3.5.1. 唤醒头节点


  • doSignal
  1. private void doSignal(Node first) {
  2.     do {
  3.         if ( (firstWaiter = first.nextWaiter) == null)//修改头结点,完成旧头结点的移出工作
  4.             lastWaiter = null;
  5.         first.nextWaiter = null;
  6.     //将老的头结点,加入到AQS的等待队列中
  7.     //一旦成功唤醒一个,那么退出循环返回(signalAll是唤醒所有)
  8.     } while (!transferForSignal(first) &&
  9.              (first = firstWaiter) != null);
  10. }
复制代码
4. 为什么await需要先释放锁,而signal不需要

因为await可能需要阻塞,所以在阻塞前需要先释放锁。
5. 总结

condition的一系列操作其实只涉及了AQS队列和condition队列的来回移动

  • 当执行await方法时,会把当前线程加入到condition队列中,然后释放锁。接着不断检查是否在AQS队列中。
    是的话开始竞争锁,只有AQS队列中的首节点能抢占成功。否则挂起
  • 当执行signalAll方法时,会把condition队列中所有节点转移到AQS队列中,并唤醒所有线程。被唤醒的节点会退出是否在AQS队列中的检查,开始抢占锁
  • 当执行signal方法时,会把condition队列中头节点转移到AQS队列中,并唤醒该头节点的线程。被唤醒的节点会退出是否在AQS队列中的检查,开始抢占锁
6. 参考链接


  • [Java 锁]Condition 详解 - 简书
  • 并发编程之 Condition 源码分析 - 掘金

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册