挺喽蹂 发表于 2025-6-25 12:52:29

Java源码分析系列笔记-9.CountDownLatch

目录

[*]1. 是什么
[*]2. 如何使用

[*]2.1. CountDownLatch VS CyclicBarrier

[*]3. uml
[*]4. 构造方法

[*]4.1. Sync【AQS子类】

[*]5. countDown方法

[*]5.1. 使用AQS释放锁

[*]5.1.1. 尝试释放锁
[*]5.1.2. 所有锁释放成功后,移除AQS队列中SIGNAL的节点,并一个个唤醒

[*]5.1.2.1. 把头节点从AQS队列中移除



[*]6. await方法

[*]6.1. 使用AQS加锁

[*]6.1.1. 判断是否需要加锁
[*]6.1.2. 需要加锁,那么加入AQS队列阻塞等待其他线程执行完

[*]6.1.2.1. 构造节点加入AQS队列
[*]6.1.2.2. 判断是否需要阻塞
[*]6.1.2.3. 真正阻塞

[*]6.1.3. 不需要加锁


[*]7. 总结
[*]8. 参考

1. 是什么

不能重复使用的计数器。让一个线程等待其他线程完事再往下执行,类似于Thread.join()
底层使用AQS实现
2. 如何使用

public class CountDownLatchTest
{
    public static void main(String[] args) throws InterruptedException
    {
      CountDownLatch latch = new CountDownLatch(10);
      for (int i = 0; i < 10; i++)
      {
            int finalI = i;
            new Thread(() -> {
                try
                {
                  if (finalI == 5)
                  {
                        TimeUnit.SECONDS.sleep(10L);
                  }
                  System.out.println(String.format("线程%s,时间【%s】 countdown", Thread.currentThread().getName(),LocalDateTime.now()));

                  latch.countDown();
                  System.out.println(String.format("线程%s,时间【%s】 执行完毕", Thread.currentThread().getName(),LocalDateTime.now()));

                }
                catch (InterruptedException e)
                {
                  e.printStackTrace();
                }
               
            }).start();
      }
      
      
      latch.await();
      System.out.println(Thread.currentThread().getName() + "开始执行");
    }
}

[*]注意
这里countdown的线程不会互相等待,谁先执行完谁就先退出
2.1. CountDownLatch VS CyclicBarrier

CountDownLatchCyclicBarrier使用场景一个线程等待其他线程执行完毕,再往下执行所有线程相互等待直到最后一个线程到达,再往下执行能否重复使用不可以可以底层实现AQSLock+Condition3. uml


4. 构造方法

public class CountDownLatch {

        //继承了AQS
        private final Sync sync;

        public CountDownLatch(int count) {
          if (count < 0) throw new IllegalArgumentException("count < 0");
          //默认就设置了count个信号量(即相当于一开始就加锁了count次)
          this.sync = new Sync(count);
        }       

}4.1. Sync【AQS子类】

private static final class Sync extends AbstractQueuedSynchronizer {

    Sync(int count) {
      setState(count);
    }

    int getCount() {
      return getState();
    }

        //重写的是AQS共享获取锁的方法
    protected int tryAcquireShared(int acquires) {
      return (getState() == 0) ? 1 : -1;
    }

        //重写的是AQS共享释放锁的方法
    protected boolean tryReleaseShared(int releases) {
      // Decrement count; signal when transition to zero
      for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
      }
    }
}5. countDown方法

public void countDown() {
        //调用了AQS的releaseShared方法
    sync.releaseShared(1);
}5.1. 使用AQS释放锁


[*]AQS releaseShared
public final boolean releaseShared(int arg) {
        //调用Sync重写的tryReleaseShared释放信号量
    if (tryReleaseShared(arg)) {
            //释放锁成功后调用Sync的doReleaseShared方法
      doReleaseShared();
      return true;
    }
    return false;
}

[*]3行:调用AQS的tryReleaseShared方法释放锁,由于Sync重写了这个方法,所以调用的是Sync重写的tryReleaseShared释放锁。当锁的数量减为0返回ture,表明所有线程都准备就绪
[*]5行:使用tryReleaseShared释放锁成功后调用Sync的doReleaseShared方法。移除AQS队列中SIGNAL的节点并一个个唤醒
下面具体说明:
5.1.1. 尝试释放锁


[*]Sync.tryReleaseShared
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    //不断尝试
    for (;;) {
            //信号量为0,表明还没有人加锁,自然没法解锁,返回失败
      int c = getState();
      if (c == 0)
            return false;
      //CAS设置信号量-1。
      int nextc = c-1;
      if (compareAndSetState(c, nextc))
              //看是否为0,是则返回成功
            return nextc == 0;
    }
}5.1.2. 所有锁释放成功后,移除AQS队列中SIGNAL的节点,并一个个唤醒


[*]doReleaseShared
private void doReleaseShared() {
    //不断尝试
    for (;;) {
           
      Node h = head;
      //AQS队列不为空,把队列中SIGNAL的节点移除
      if (h != null && h != tail) {
            int ws = h.waitStatus;
            //头节点状态为SIGNAL
            if (ws == Node.SIGNAL) {
                    //在头节点状态为signal的情况设置为0,失败了继续直到成功
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                  continue;            // loop to recheck cases
                //把头节点从AQS队列中移除
                unparkSuccessor(h);
            }
            //头节点状态为0,那么设置为PROPAGATE,失败了继续直到成功
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
      }

      //队列中没有SIGNAL的节点
      if (h == head)                   // loop if head changed
            break;
    }
}5.1.2.1. 把头节点从AQS队列中移除


[*]unparkSuccessor
private void unparkSuccessor(Node node) {
      int ws = node.waitStatus;
      //当前节点的状态<0,则把状态改为0
      //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何
      if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);


         //当前节点的下一个节点为空或者状态>0(即是取消状态)
      Node s = node.next;
      if (s == null || s.waitStatus > 0) {
            s = null;
            //那么从队尾开始往前遍历找到离当前节点最近的下一个状态<=0的节点(即非取消状态)
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                  s = t;
      }
            //唤醒下一个节点
      if (s != null)
            LockSupport.unpark(s.thread);
    }6.1.2.3. 真正阻塞


[*]parkAndCheckInterrupt
public void await() throws InterruptedException {
        //调用AQS的acquireSharedInterruptibly方法加锁
    sync.acquireSharedInterruptibly(1);
}6.1.3. 不需要加锁

当state=0说明所有信号量已被释放完,那么直接返回,执行业务逻辑
7. 总结


[*]让一个线程等待其他线程完事再往下执行,类似于Thread.join()
[*]主线程创建CountDownLatch的时候初始化了信号量,相当于一开始就有N个人加锁。
[*]主线程调用await的时候检查信号量是否为0,不为0说明其他线程没有执行完,那么加入AQS队列阻塞,等待唤醒
[*]其他线程调用countDown的时候会使信号量-1,最后一个线程减为0的时候会唤醒AQS队列中的所有节点(主线程),让其继续往下执行
[*]主线程被唤醒继续往下执行
8. 参考


[*]【死磕 Java 并发】—– J.U.C 之并发工具类:CountDownLatch | 芋道源码 —— 纯源码解析博客
[*]CountDownLatch源码解析 | 并发编程网 – ifeve.com

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Java源码分析系列笔记-9.CountDownLatch