目录
- 1. 是什么
- 2. 如何使用
- 2.1. 不带Runnable
- 2.2. 带Runnale
- 3. 原理分析
- 3.1. uml
- 3.2. 构造方法
- 3.2.1. 使用Lock和Condition实现
- 3.3. await方法
- 3.3.1. 首先是加锁
- 3.3.2. 然后减信号量
- 3.3.3. 最后一个到达的需要执行prepareAction、唤醒线程并换代
- 3.3.4. 最后一个线程未到达前,其他线程阻塞等待唤醒
- 3.3.5. 最后一个线程到达唤醒其他所有线程后,其他所有线程退出循环
- 4. 总结
- 5. 参考
1. 是什么
可重复使用的计数器,让一堆线程互相等待,条件满足时一起往下执行
底层使用Lock+Condition实现阻塞等待和唤醒
2. 如何使用
2.1. 不带Runnable
当所有线程都到达await点的时候才一起往下执行- public class CyclicBarrierTest
- {
- private static final int count = 20;
- private static final AtomicInteger val = new AtomicInteger();
- private static final CyclicBarrier barrier = new CyclicBarrier(count);
- private static class CalcAction implements Runnable
- {
- @Override
- public void run()
- {
- try
- {
- System.out.println("1." + Thread.currentThread().getName() + "到达await点");
- int result = val.incrementAndGet();
- if (result % 5 == 0)
- {
- System.out.println("2." + Thread.currentThread().getName() + "休眠3s");
- TimeUnit.SECONDS.sleep(3);
- }
- barrier.await();
- }
- catch (InterruptedException | BrokenBarrierException e)
- {
- e.printStackTrace();
- }
- System.out.println("3." + Thread.currentThread().getName() + "继续执行");
- }
- }
- public static void main(String[] args)
- {
- for (int i = 0; i < count; i++)
- {
- new Thread(new CalcAction()).start();
- }
- }
- }
复制代码 2.2. 带Runnale
当所有线程都到达await点的时候,最后一个到达的线程执行prepare,再一起往下执行- public class CyclicBarrierTest
- {
- private static final int count = 20;
- private static final AtomicInteger val = new AtomicInteger();
- private static final CyclicBarrier barrier = new CyclicBarrier(count, new PrepareAction());
- private static class PrepareAction implements Runnable
- {
- @Override
- public void run()
- {
- try
- {
- System.out.println("2.所有线程到达await,最后一个到达的线程" + Thread.currentThread().getName() + "先执行PrepareAction,休眠3s");
- TimeUnit.SECONDS.sleep(3);
- }
- catch (InterruptedException e)
- {
- e.printStackTrace();
- }
- System.out.println("3.执行PrepareAction完毕");
- }
- }
- private static class CalcAction implements Runnable
- {
- @Override
- public void run()
- {
- try
- {
- System.out.println("1." + Thread.currentThread().getName() + "到达await点");
- barrier.await();
- }
- catch (InterruptedException | BrokenBarrierException e)
- {
- e.printStackTrace();
- }
- int result = val.incrementAndGet();
- System.out.println("4." + Thread.currentThread().getName() + "执行计算:result: " + result);
- }
- }
- public static void main(String[] args)
- {
- for (int i = 0; i < count; i++)
- {
- new Thread(new CalcAction()).start();
- }
- }
- }
复制代码 3. 原理分析
3.1. uml
3.2. 构造方法
3.2.1. 使用Lock和Condition实现
- public class CyclicBarrier {
- //CyclicBarrier可以循环使用
- private static class Generation {
- //当前代是否损坏
- boolean broken = false;
- }
- //使用lock阻塞在await点等待
- private final ReentrantLock lock = new ReentrantLock();
- //使用lock.condition唤醒阻塞的所有线程往下执行
- private final Condition trip = lock.newCondition();
- //总共的信号量
- private final int parties;
- //最后一个到达的线程先执行barrierCommand,所有线程再一起继续往下执行
- private final Runnable barrierCommand;
- //CyclicBarrier可以重复使用,每次使用都是一个generation
- private Generation generation = new Generation();
- //剩下多少个线程没有到达await点
- private int count;
- public CyclicBarrier(int parties, Runnable barrierAction) {
- if (parties <= 0) throw new IllegalArgumentException();
- //初始化总的信号量
- this.parties = parties;
- //初始化剩余未使用的信号量=总的信号量
- //相当于一开始就加锁了parties次,那么也就需要解锁parties次
- this.count = parties;
- this.barrierCommand = barrierAction;
- }
- }
复制代码 3.3.4.1. 线程被中断,置当前代失效的操作
- public int await() throws InterruptedException, BrokenBarrierException {
- try {
- //调用dowait并且默认不设置超时
- return dowait(false, 0L);
- } catch (TimeoutException toe) {
- throw new Error(toe); // cannot happen
- }
- }
复制代码 3.3.5. 最后一个线程到达唤醒其他所有线程后,其他所有线程退出循环
- private int dowait(boolean timed, long nanos)
- throws InterruptedException, BrokenBarrierException,
- TimeoutException {
- final ReentrantLock lock = this.lock;
- //1.先获取锁才能往下执行
- lock.lock();
- try {
- //当前代,每reset一次代+1
- final Generation g = generation;
- //当前代已损坏--什么情况会导致损坏?--线程被中断,执行breakBarrier方法
- if (g.broken)
- throw new BrokenBarrierException();
- //如果线程被中断,那么置当前代失效
- if (Thread.interrupted()) {
- breakBarrier();
- throw new InterruptedException();
- }
- int index = --count;
- //剩余的信号量为0,那么可以继续往下执行了
- //即最后一个线程到达
- if (index == 0) { // tripped
- boolean ranAction = false;
- try {
- final Runnable command = barrierCommand;
- //3.最后一个到达的线程先执行Runnable
- if (command != null)
- command.run();
- ranAction = true;
- //4.唤醒所有线程继续往下执行(46行)并且换代
- nextGeneration();
- //5.返回,不往下执行死循环
- return 0;
- } finally {
- if (!ranAction)
- breakBarrier();
- }
- }
- // 死循环直到超时或者信号量都用完或者中断
- for (;;) {
- try {
- if (!timed)
- //2.未设置超时,那么调用Condition.await()方法等待唤醒
- trip.await();
- else if (nanos > 0L)
- //设置超时,那么调用Condition.await(超时)方法等待唤醒或者超时
- nanos = trip.awaitNanos(nanos);
- } catch (InterruptedException ie) {
- //发生了中断,break当前代
- if (g == generation && ! g.broken) {
- breakBarrier();
- throw ie;
- } else {
-
- Thread.currentThread().interrupt();
- }
- }
- //break当前代只是个标记,这里才会抛出break异常
- if (g.broken)
- throw new BrokenBarrierException();
- //6.正常执行并且已经换代,退出循环
- if (g != generation)
- return index;
- //超时时间设置不对,直接break当前代
- if (timed && nanos <= 0L) {
- breakBarrier();
- throw new TimeoutException();
- }
- }
- } finally {
- //7.解锁
- lock.unlock();
- }
- }
-
复制代码 4. 总结
- 让一堆线程互相等待,条件满足(信号量降为0)时最后一个到达的线程先执行Runnable,接着所有线程一起往下执行
- 调用await的时候
- 除了最后一个线程外,其他所有线程依次获取lock,对信号量-1,阻塞等待唤醒(加入condition队列并释放锁)
- 最后一个线程到达后执行Runnable,唤醒所有线程(把所有condition队列中的节点转到AQS中)
- 唤醒的所有线程依次抢占到锁(从AQS队列中移除)后往下执行,检查代后退出循环,解锁
5. 参考
- 并发工具类(二)同步屏障CyclicBarrier | 并发编程网 – ifeve.com
- Java并发之CyclicBarrier - 后端 - 掘金
- 【死磕 Java 并发】—- J.U.C 之并发工具类:CyclicBarrier | 芋道源码 —— 纯源码解析博客
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |