找回密码
 立即注册
首页 业界区 业界 Java源码分析系列笔记-8.CyclicBarrier

Java源码分析系列笔记-8.CyclicBarrier

章海 2025-6-25 06:37:39
目录

  • 1. 是什么
  • 2. 如何使用

    • 2.1. 不带Runnable
    • 2.2. 带Runnale

  • 3. 原理分析

    • 3.1. uml
    • 3.2. 构造方法

      • 3.2.1. 使用Lock和Condition实现

    • 3.3. await方法

      • 3.3.1. 首先是加锁
      • 3.3.2. 然后减信号量
      • 3.3.3. 最后一个到达的需要执行prepareAction、唤醒线程并换代

        • 3.3.3.1. 怎么唤醒并换代的

      • 3.3.4. 最后一个线程未到达前,其他线程阻塞等待唤醒

        • 3.3.4.1. 线程被中断,置当前代失效的操作

      • 3.3.5. 最后一个线程到达唤醒其他所有线程后,其他所有线程退出循环


  • 4. 总结
  • 5. 参考

1. 是什么

可重复使用的计数器,让一堆线程互相等待,条件满足时一起往下执行
底层使用Lock+Condition实现阻塞等待和唤醒
2. 如何使用

2.1. 不带Runnable

当所有线程都到达await点的时候才一起往下执行
  1. public class CyclicBarrierTest
  2. {
  3.     private static final int count = 20;
  4.     private static final AtomicInteger val = new AtomicInteger();
  5.     private static final CyclicBarrier barrier = new CyclicBarrier(count);
  6.     private static class CalcAction implements Runnable
  7.     {
  8.         @Override
  9.         public void run()
  10.         {
  11.             try
  12.             {
  13.                 System.out.println("1." + Thread.currentThread().getName() + "到达await点");
  14.                 int result = val.incrementAndGet();
  15.                 if (result % 5 == 0)
  16.                 {
  17.                     System.out.println("2." + Thread.currentThread().getName() + "休眠3s");
  18.                     TimeUnit.SECONDS.sleep(3);
  19.                 }
  20.                 barrier.await();
  21.             }
  22.             catch (InterruptedException | BrokenBarrierException e)
  23.             {
  24.                 e.printStackTrace();
  25.             }
  26.             System.out.println("3." + Thread.currentThread().getName() + "继续执行");
  27.         }
  28.     }
  29.     public static void main(String[] args)
  30.     {
  31.         for (int i = 0; i < count; i++)
  32.         {
  33.             new Thread(new CalcAction()).start();
  34.         }
  35.     }
  36. }
复制代码
2.2. 带Runnale

当所有线程都到达await点的时候,最后一个到达的线程执行prepare,再一起往下执行
  1. public class CyclicBarrierTest
  2. {
  3.     private static final int count = 20;
  4.     private static final AtomicInteger val = new AtomicInteger();
  5.     private static final CyclicBarrier barrier = new CyclicBarrier(count, new PrepareAction());
  6.     private static class PrepareAction implements Runnable
  7.     {
  8.         @Override
  9.         public void run()
  10.         {
  11.             try
  12.             {
  13.                 System.out.println("2.所有线程到达await,最后一个到达的线程" + Thread.currentThread().getName() + "先执行PrepareAction,休眠3s");
  14.                 TimeUnit.SECONDS.sleep(3);
  15.             }
  16.             catch (InterruptedException e)
  17.             {
  18.                 e.printStackTrace();
  19.             }
  20.             System.out.println("3.执行PrepareAction完毕");
  21.         }
  22.     }
  23.     private static class CalcAction implements Runnable
  24.     {
  25.         @Override
  26.         public void run()
  27.         {
  28.             try
  29.             {
  30.                 System.out.println("1." + Thread.currentThread().getName() + "到达await点");
  31.                 barrier.await();
  32.             }
  33.             catch (InterruptedException | BrokenBarrierException e)
  34.             {
  35.                 e.printStackTrace();
  36.             }
  37.             int result = val.incrementAndGet();
  38.             System.out.println("4." + Thread.currentThread().getName() + "执行计算:result: " + result);
  39.         }
  40.     }
  41.     public static void main(String[] args)
  42.     {
  43.         for (int i = 0; i < count; i++)
  44.         {
  45.             new Thread(new CalcAction()).start();
  46.         }
  47.     }
  48. }
复制代码
3. 原理分析

3.1. uml

1.png

3.2. 构造方法

3.2.1. 使用Lock和Condition实现
  1. public class CyclicBarrier {
  2.         //CyclicBarrier可以循环使用
  3.         private static class Generation {
  4.                 //当前代是否损坏
  5.             boolean broken = false;
  6.         }
  7.     //使用lock阻塞在await点等待
  8.     private final ReentrantLock lock = new ReentrantLock();
  9.     //使用lock.condition唤醒阻塞的所有线程往下执行
  10.     private final Condition trip = lock.newCondition();
  11.     //总共的信号量
  12.     private final int parties;
  13.     //最后一个到达的线程先执行barrierCommand,所有线程再一起继续往下执行
  14.     private final Runnable barrierCommand;
  15.     //CyclicBarrier可以重复使用,每次使用都是一个generation
  16.     private Generation generation = new Generation();
  17.     //剩下多少个线程没有到达await点
  18.     private int count;
  19.         public CyclicBarrier(int parties, Runnable barrierAction) {
  20.                 if (parties <= 0) throw new IllegalArgumentException();
  21.                 //初始化总的信号量
  22.                 this.parties = parties;
  23.                 //初始化剩余未使用的信号量=总的信号量
  24.                 //相当于一开始就加锁了parties次,那么也就需要解锁parties次
  25.                 this.count = parties;
  26.                 this.barrierCommand = barrierAction;
  27.         }
  28. }
复制代码
3.3.4.1. 线程被中断,置当前代失效的操作


  • breakBarrier
  1. public int await() throws InterruptedException, BrokenBarrierException {
  2.     try {
  3.             //调用dowait并且默认不设置超时
  4.         return dowait(false, 0L);
  5.     } catch (TimeoutException toe) {
  6.         throw new Error(toe); // cannot happen
  7.     }
  8. }
复制代码
3.3.5. 最后一个线程到达唤醒其他所有线程后,其他所有线程退出循环
  1. private int dowait(boolean timed, long nanos)
  2.         throws InterruptedException, BrokenBarrierException,
  3.                TimeoutException {
  4.         final ReentrantLock lock = this.lock;
  5.         //1.先获取锁才能往下执行
  6.         lock.lock();
  7.         try {
  8.                 //当前代,每reset一次代+1
  9.             final Generation g = generation;
  10.                         //当前代已损坏--什么情况会导致损坏?--线程被中断,执行breakBarrier方法
  11.             if (g.broken)
  12.                 throw new BrokenBarrierException();
  13.                         //如果线程被中断,那么置当前代失效
  14.             if (Thread.interrupted()) {
  15.                 breakBarrier();
  16.                 throw new InterruptedException();
  17.             }
  18.             int index = --count;
  19.             //剩余的信号量为0,那么可以继续往下执行了
  20.             //即最后一个线程到达
  21.             if (index == 0) {  // tripped
  22.                 boolean ranAction = false;
  23.                 try {
  24.                     final Runnable command = barrierCommand;
  25.                     //3.最后一个到达的线程先执行Runnable
  26.                     if (command != null)
  27.                         command.run();
  28.                     ranAction = true;
  29.                     //4.唤醒所有线程继续往下执行(46行)并且换代
  30.                     nextGeneration();
  31.                     //5.返回,不往下执行死循环
  32.                     return 0;
  33.                 } finally {
  34.                     if (!ranAction)
  35.                         breakBarrier();
  36.                 }
  37.             }
  38.             // 死循环直到超时或者信号量都用完或者中断
  39.             for (;;) {
  40.                 try {
  41.                     if (!timed)
  42.                             //2.未设置超时,那么调用Condition.await()方法等待唤醒
  43.                         trip.await();
  44.                     else if (nanos > 0L)
  45.                             //设置超时,那么调用Condition.await(超时)方法等待唤醒或者超时
  46.                         nanos = trip.awaitNanos(nanos);
  47.                 } catch (InterruptedException ie) {
  48.                         //发生了中断,break当前代
  49.                     if (g == generation && ! g.broken) {
  50.                         breakBarrier();
  51.                         throw ie;
  52.                     } else {
  53.                         
  54.                         Thread.currentThread().interrupt();
  55.                     }
  56.                 }
  57.                                 //break当前代只是个标记,这里才会抛出break异常
  58.                 if (g.broken)
  59.                     throw new BrokenBarrierException();
  60.                                 //6.正常执行并且已经换代,退出循环
  61.                 if (g != generation)
  62.                     return index;
  63.                                 //超时时间设置不对,直接break当前代
  64.                 if (timed && nanos <= 0L) {
  65.                     breakBarrier();
  66.                     throw new TimeoutException();
  67.                 }
  68.             }
  69.         } finally {
  70.                 //7.解锁
  71.             lock.unlock();
  72.         }
  73.     }
  74.    
复制代码
4. 总结


  • 让一堆线程互相等待,条件满足(信号量降为0)时最后一个到达的线程先执行Runnable,接着所有线程一起往下执行
  • 调用await的时候

    • 除了最后一个线程外,其他所有线程依次获取lock,对信号量-1,阻塞等待唤醒(加入condition队列并释放锁)
    • 最后一个线程到达后执行Runnable,唤醒所有线程(把所有condition队列中的节点转到AQS中)
    • 唤醒的所有线程依次抢占到锁(从AQS队列中移除)后往下执行,检查代后退出循环,解锁

5. 参考


  • 并发工具类(二)同步屏障CyclicBarrier | 并发编程网 – ifeve.com
  • Java并发之CyclicBarrier - 后端 - 掘金
  • 【死磕 Java 并发】—- J.U.C 之并发工具类:CyclicBarrier | 芋道源码 —— 纯源码解析博客

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册