目录
- 1. 是什么
- 2. 如何使用
- 2.1. CountDownLatch VS CyclicBarrier
- 3. uml
- 4. 构造方法
- 5. countDown方法
- 5.1. 使用AQS释放锁
- 5.1.1. 尝试释放锁
- 5.1.2. 所有锁释放成功后,移除AQS队列中SIGNAL的节点,并一个个唤醒
- 6. await方法
- 6.1. 使用AQS加锁
- 6.1.1. 判断是否需要加锁
- 6.1.2. 需要加锁,那么加入AQS队列阻塞等待其他线程执行完
- 6.1.2.1. 构造节点加入AQS队列
- 6.1.2.2. 判断是否需要阻塞
- 6.1.2.3. 真正阻塞
- 6.1.3. 不需要加锁
- 7. 总结
- 8. 参考
1. 是什么
不能重复使用的计数器。让一个线程等待其他线程完事再往下执行,类似于Thread.join()
底层使用AQS实现
2. 如何使用
- public class CountDownLatchTest
- {
- public static void main(String[] args) throws InterruptedException
- {
- CountDownLatch latch = new CountDownLatch(10);
- for (int i = 0; i < 10; i++)
- {
- int finalI = i;
- new Thread(() -> {
- try
- {
- if (finalI == 5)
- {
- TimeUnit.SECONDS.sleep(10L);
- }
- System.out.println(String.format("线程%s,时间【%s】 countdown", Thread.currentThread().getName(),LocalDateTime.now()));
- latch.countDown();
- System.out.println(String.format("线程%s,时间【%s】 执行完毕", Thread.currentThread().getName(),LocalDateTime.now()));
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
-
- }).start();
- }
-
-
- latch.await();
- System.out.println(Thread.currentThread().getName() + "开始执行");
- }
- }
复制代码
- 注意
这里countdown的线程不会互相等待,谁先执行完谁就先退出
2.1. CountDownLatch VS CyclicBarrier
CountDownLatchCyclicBarrier使用场景一个线程等待其他线程执行完毕,再往下执行所有线程相互等待直到最后一个线程到达,再往下执行能否重复使用不可以可以底层实现AQSLock+Condition3. uml
4. 构造方法
- public class CountDownLatch {
- //继承了AQS
- private final Sync sync;
- public CountDownLatch(int count) {
- if (count < 0) throw new IllegalArgumentException("count < 0");
- //默认就设置了count个信号量(即相当于一开始就加锁了count次)
- this.sync = new Sync(count);
- }
- }
复制代码 4.1. Sync【AQS子类】
- private static final class Sync extends AbstractQueuedSynchronizer {
- Sync(int count) {
- setState(count);
- }
- int getCount() {
- return getState();
- }
- //重写的是AQS共享获取锁的方法
- protected int tryAcquireShared(int acquires) {
- return (getState() == 0) ? 1 : -1;
- }
- //重写的是AQS共享释放锁的方法
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- for (;;) {
- int c = getState();
- if (c == 0)
- return false;
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- return nextc == 0;
- }
- }
- }
复制代码 5. countDown方法
- public void countDown() {
- //调用了AQS的releaseShared方法
- sync.releaseShared(1);
- }
复制代码 5.1. 使用AQS释放锁
- public final boolean releaseShared(int arg) {
- //调用Sync重写的tryReleaseShared释放信号量
- if (tryReleaseShared(arg)) {
- //释放锁成功后调用Sync的doReleaseShared方法
- doReleaseShared();
- return true;
- }
- return false;
- }
复制代码
- 3行:调用AQS的tryReleaseShared方法释放锁,由于Sync重写了这个方法,所以调用的是Sync重写的tryReleaseShared释放锁。当锁的数量减为0返回ture,表明所有线程都准备就绪
- 5行:使用tryReleaseShared释放锁成功后调用Sync的doReleaseShared方法。移除AQS队列中SIGNAL的节点并一个个唤醒
下面具体说明:
5.1.1. 尝试释放锁
- protected boolean tryReleaseShared(int releases) {
- // Decrement count; signal when transition to zero
- //不断尝试
- for (;;) {
- //信号量为0,表明还没有人加锁,自然没法解锁,返回失败
- int c = getState();
- if (c == 0)
- return false;
- //CAS设置信号量-1。
- int nextc = c-1;
- if (compareAndSetState(c, nextc))
- //看是否为0,是则返回成功
- return nextc == 0;
- }
- }
复制代码 5.1.2. 所有锁释放成功后,移除AQS队列中SIGNAL的节点,并一个个唤醒
- private void doReleaseShared() {
- //不断尝试
- for (;;) {
-
- Node h = head;
- //AQS队列不为空,把队列中SIGNAL的节点移除
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- //头节点状态为SIGNAL
- if (ws == Node.SIGNAL) {
- //在头节点状态为signal的情况设置为0,失败了继续直到成功
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- //把头节点从AQS队列中移除
- unparkSuccessor(h);
- }
- //头节点状态为0,那么设置为PROPAGATE,失败了继续直到成功
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- //队列中没有SIGNAL的节点
- if (h == head) // loop if head changed
- break;
- }
- }
复制代码 5.1.2.1. 把头节点从AQS队列中移除
- private void unparkSuccessor(Node node) {
- int ws = node.waitStatus;
- //当前节点的状态<0,则把状态改为0
- //0是空的状态,因为node这个节点的线程释放了锁后续不需要做任何
- if (ws < 0)
- compareAndSetWaitStatus(node, ws, 0);
-
- //当前节点的下一个节点为空或者状态>0(即是取消状态)
- Node s = node.next;
- if (s == null || s.waitStatus > 0) {
- s = null;
- //那么从队尾开始往前遍历找到离当前节点最近的下一个状态<=0的节点(即非取消状态)
- for (Node t = tail; t != null && t != node; t = t.prev)
- if (t.waitStatus <= 0)
- s = t;
- }
- //唤醒下一个节点
- if (s != null)
- LockSupport.unpark(s.thread);
- }
复制代码 6.1.2.3. 真正阻塞
- public void await() throws InterruptedException {
- //调用AQS的acquireSharedInterruptibly方法加锁
- sync.acquireSharedInterruptibly(1);
- }
复制代码 6.1.3. 不需要加锁
当state=0说明所有信号量已被释放完,那么直接返回,执行业务逻辑
7. 总结
- 让一个线程等待其他线程完事再往下执行,类似于Thread.join()
- 主线程创建CountDownLatch的时候初始化了信号量,相当于一开始就有N个人加锁。
- 主线程调用await的时候检查信号量是否为0,不为0说明其他线程没有执行完,那么加入AQS队列阻塞,等待唤醒
- 其他线程调用countDown的时候会使信号量-1,最后一个线程减为0的时候会唤醒AQS队列中的所有节点(主线程),让其继续往下执行
- 主线程被唤醒继续往下执行
8. 参考
- 【死磕 Java 并发】—– J.U.C 之并发工具类:CountDownLatch | 芋道源码 —— 纯源码解析博客
- CountDownLatch源码解析 | 并发编程网 – ifeve.com
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |