找回密码
 立即注册
首页 业界区 业界 Java源码分析系列笔记-9.CountDownLatch

Java源码分析系列笔记-9.CountDownLatch

挺喽蹂 2025-6-25 12:52:29
目录

  • 1. 是什么
  • 2. 如何使用

    • 2.1. CountDownLatch VS CyclicBarrier

  • 3. uml
  • 4. 构造方法

    • 4.1. Sync【AQS子类】

  • 5. countDown方法

    • 5.1. 使用AQS释放锁

      • 5.1.1. 尝试释放锁
      • 5.1.2. 所有锁释放成功后,移除AQS队列中SIGNAL的节点,并一个个唤醒

        • 5.1.2.1. 把头节点从AQS队列中移除



  • 6. await方法

    • 6.1. 使用AQS加锁

      • 6.1.1. 判断是否需要加锁
      • 6.1.2. 需要加锁,那么加入AQS队列阻塞等待其他线程执行完

        • 6.1.2.1. 构造节点加入AQS队列
        • 6.1.2.2. 判断是否需要阻塞
        • 6.1.2.3. 真正阻塞

      • 6.1.3. 不需要加锁


  • 7. 总结
  • 8. 参考

1. 是什么

不能重复使用的计数器。让一个线程等待其他线程完事再往下执行,类似于Thread.join()
底层使用AQS实现
2. 如何使用
  1. public class CountDownLatchTest
  2. {
  3.     public static void main(String[] args) throws InterruptedException
  4.     {
  5.         CountDownLatch latch = new CountDownLatch(10);
  6.         for (int i = 0; i < 10; i++)
  7.         {
  8.             int finalI = i;
  9.             new Thread(() -> {
  10.                 try
  11.                 {
  12.                     if (finalI == 5)
  13.                     {
  14.                         TimeUnit.SECONDS.sleep(10L);
  15.                     }
  16.                     System.out.println(String.format("线程%s,时间【%s】 countdown", Thread.currentThread().getName(),LocalDateTime.now()));
  17.                     latch.countDown();
  18.                     System.out.println(String.format("线程%s,时间【%s】 执行完毕", Thread.currentThread().getName(),LocalDateTime.now()));
  19.                 }
  20.                 catch (InterruptedException e)
  21.                 {
  22.                     e.printStackTrace();
  23.                 }
  24.                
  25.             }).start();
  26.         }
  27.         
  28.         
  29.         latch.await();
  30.         System.out.println(Thread.currentThread().getName() + "开始执行");
  31.     }
  32. }
复制代码

  • 注意
    这里countdown的线程不会互相等待,谁先执行完谁就先退出
2.1. CountDownLatch VS CyclicBarrier

CountDownLatchCyclicBarrier使用场景一个线程等待其他线程执行完毕,再往下执行所有线程相互等待直到最后一个线程到达,再往下执行能否重复使用不可以可以底层实现AQSLock+Condition3. uml

1.png

4. 构造方法
  1. public class CountDownLatch {
  2.         //继承了AQS
  3.         private final Sync sync;
  4.         public CountDownLatch(int count) {
  5.             if (count < 0) throw new IllegalArgumentException("count < 0");
  6.             //默认就设置了count个信号量(即相当于一开始就加锁了count次)
  7.             this.sync = new Sync(count);
  8.         }       
  9. }
复制代码
4.1. Sync【AQS子类】
  1. private static final class Sync extends AbstractQueuedSynchronizer {
  2.     Sync(int count) {
  3.         setState(count);
  4.     }
  5.     int getCount() {
  6.         return getState();
  7.     }
  8.         //重写的是AQS共享获取锁的方法
  9.     protected int tryAcquireShared(int acquires) {
  10.         return (getState() == 0) ? 1 : -1;
  11.     }
  12.         //重写的是AQS共享释放锁的方法
  13.     protected boolean tryReleaseShared(int releases) {
  14.         // Decrement count; signal when transition to zero
  15.         for (;;) {
  16.             int c = getState();
  17.             if (c == 0)
  18.                 return false;
  19.             int nextc = c-1;
  20.             if (compareAndSetState(c, nextc))
  21.                 return nextc == 0;
  22.         }
  23.     }
  24. }
复制代码
5. countDown方法
  1. public void countDown() {
  2.         //调用了AQS的releaseShared方法
  3.     sync.releaseShared(1);
  4. }
复制代码
5.1. 使用AQS释放锁


  • AQS releaseShared
  1. public final boolean releaseShared(int arg) {
  2.         //调用Sync重写的tryReleaseShared释放信号量
  3.     if (tryReleaseShared(arg)) {
  4.             //释放锁成功后调用Sync的doReleaseShared方法
  5.         doReleaseShared();
  6.         return true;
  7.     }
  8.     return false;
  9. }
复制代码

  • 3行:调用AQS的tryReleaseShared方法释放锁,由于Sync重写了这个方法,所以调用的是Sync重写的tryReleaseShared释放锁。当锁的数量减为0返回ture,表明所有线程都准备就绪
  • 5行:使用tryReleaseShared释放锁成功后调用Sync的doReleaseShared方法。移除AQS队列中SIGNAL的节点并一个个唤醒
下面具体说明:
5.1.1. 尝试释放锁


  • Sync.tryReleaseShared
  1. protected boolean tryReleaseShared(int releases) {
  2.     // Decrement count; signal when transition to zero
  3.     //不断尝试
  4.     for (;;) {
  5.             //信号量为0,表明还没有人加锁,自然没法解锁,返回失败
  6.         int c = getState();
  7.         if (c == 0)
  8.             return false;
  9.         //CAS设置信号量-1。
  10.         int nextc = c-1;
  11.         if (compareAndSetState(c, nextc))
  12.                 //看是否为0,是则返回成功
  13.             return nextc == 0;
  14.     }
  15. }
复制代码
5.1.2. 所有锁释放成功后,移除AQS队列中SIGNAL的节点,并一个个唤醒


  • doReleaseShared
  1. private void doReleaseShared() {
  2.     //不断尝试
  3.     for (;;) {
  4.            
  5.         Node h = head;
  6.         //AQS队列不为空,把队列中SIGNAL的节点移除
  7.         if (h != null && h != tail) {
  8.             int ws = h.waitStatus;
  9.             //头节点状态为SIGNAL
  10.             if (ws == Node.SIGNAL) {
  11.                     //在头节点状态为signal的情况设置为0,失败了继续直到成功
  12.                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  13.                     continue;            // loop to recheck cases
  14.                 //把头节点从AQS队列中移除
  15.                 unparkSuccessor(h);
  16.             }
  17.             //头节点状态为0,那么设置为PROPAGATE,失败了继续直到成功
  18.             else if (ws == 0 &&
  19.                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  20.                 continue;                // loop on failed CAS
  21.         }
  22.         //队列中没有SIGNAL的节点
  23.         if (h == head)                   // loop if head changed
  24.             break;
  25.     }
  26. }
复制代码
5.1.2.1. 把头节点从AQS队列中移除


  • unparkSuccessor
  1. private void unparkSuccessor(Node node) {
  2.         int ws = node.waitStatus;
  3.         //当前节点的状态<0,则把状态改为0
  4.         //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何
  5.         if (ws < 0)
  6.             compareAndSetWaitStatus(node, ws, 0);
  7.   
  8.          //当前节点的下一个节点为空或者状态>0(即是取消状态)
  9.         Node s = node.next;
  10.         if (s == null || s.waitStatus > 0) {
  11.             s = null;
  12.             //那么从队尾开始往前遍历找到离当前节点最近的下一个状态<=0的节点(即非取消状态)
  13.             for (Node t = tail; t != null && t != node; t = t.prev)
  14.                 if (t.waitStatus <= 0)
  15.                     s = t;
  16.         }
  17.             //唤醒下一个节点
  18.         if (s != null)
  19.             LockSupport.unpark(s.thread);
  20.     }
复制代码
6.1.2.3. 真正阻塞


  • parkAndCheckInterrupt
  1. public void await() throws InterruptedException {
  2.         //调用AQS的acquireSharedInterruptibly方法加锁
  3.     sync.acquireSharedInterruptibly(1);
  4. }
复制代码
6.1.3. 不需要加锁

当state=0说明所有信号量已被释放完,那么直接返回,执行业务逻辑
7. 总结


  • 让一个线程等待其他线程完事再往下执行,类似于Thread.join()
  • 主线程创建CountDownLatch的时候初始化了信号量,相当于一开始就有N个人加锁。
  • 主线程调用await的时候检查信号量是否为0,不为0说明其他线程没有执行完,那么加入AQS队列阻塞,等待唤醒
  • 其他线程调用countDown的时候会使信号量-1,最后一个线程减为0的时候会唤醒AQS队列中的所有节点(主线程),让其继续往下执行
  • 主线程被唤醒继续往下执行
8. 参考


  • 【死磕 Java 并发】—– J.U.C 之并发工具类:CountDownLatch | 芋道源码 —— 纯源码解析博客
  • CountDownLatch源码解析 | 并发编程网 – ifeve.com

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册