找回密码
 立即注册
首页 业界区 业界 Java源码分析系列笔记-12.BlockingQueue

Java源码分析系列笔记-12.BlockingQueue

孟清妍 2025-6-26 10:40:31
目录

  • 1. 是什么
  • 2. 使用场景
  • 3. 如何使用
  • 4. 各种BlockingQueue详解以及对比
  • 5. ArrayBlockingQueue

    • 5.1. 是什么
    • 5.2. 如何使用

      • 5.2.1. 方法选择

    • 5.3. 原理分析

      • 5.3.1. uml
      • 5.3.2. 构造方法

        • 5.3.2.1. 底层使用数组+Lock+Condtion实现

      • 5.3.3. put【阻塞】

        • 5.3.3.1. 加锁
        • 5.3.3.2. 如果数组已经满了,那么等待
        • 5.3.3.3. 没满则入队并唤醒读者

      • 5.3.4. take【阻塞】

        • 5.3.4.1. 加锁
        • 5.3.4.2. 如果数组为空,那么等待
        • 5.3.4.3. 不为空则出队并唤醒写者

      • 5.3.5. offer【返回特殊值】
      • 5.3.6. poll【返回特殊值】
      • 5.3.7. add【抛出异常】
      • 5.3.8. remove【抛出异常】
      • 5.3.9. element【抛出异常】
      • 5.3.10. peek【返回特殊值】

    • 5.4. 总结

  • 6. LinkedBlockingQueue

    • 6.1. 是什么
    • 6.2. 如何使用
    • 6.3. 源码分析

      • 6.3.1. 构造方法

        • 6.3.1.1. 底层使用单向链表+Lock+Condition实现
        • 6.3.1.2. Node

      • 6.3.2. put【阻塞】

        • 6.3.2.1. 加写锁
        • 6.3.2.2. 如果队列已满那么等待
        • 6.3.2.3. 未满则入队
        • 6.3.2.4. 入队完发现队列没满,那么继续唤醒写者入队
        • 6.3.2.5. 入队完解锁后发现之前队列是空的,那么唤醒读者

      • 6.3.3. take【阻塞】

        • 6.3.3.1. 加读锁
        • 6.3.3.2. 队列为空那么等待
        • 6.3.3.3. 未空则出队
        • 6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
        • 6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者

      • 6.3.4. offer 返回特殊值
      • 6.3.5. poll 返回特殊值
      • 6.3.6. peek 返回特殊值

    • 6.4. 总结

  • 7. PriorityBlockingQueue

    • 7.1. 是什么

      • 7.1.1. 二叉堆

    • 7.2. 如何使用
    • 7.3. 原理分析

      • 7.3.1. 构造方法

        • 7.3.1.1. 底层使用数组+Lock+Condition实现

      • 7.3.2. put

        • 7.3.2.1. 转调offer,不需要阻塞

          • 7.3.2.1.1. 加锁
          • 7.3.2.1.2. 判断是否需要扩容

            • 7.3.2.1.2.1. 需要的话进行扩容

          • 7.3.2.1.3. 把元素加入堆的末尾

            • 7.3.2.1.3.1. 上浮操作调整堆
            • 7.3.2.1.3.2. 调整的过程图



      • 7.3.3. take

        • 7.3.3.1. 加锁
        • 7.3.3.2. 一直阻塞等待,直到出队成功

          • 7.3.3.2.1. 出队具体操作

            • 7.3.3.2.1.1. 移除堆顶,末尾元素放到堆顶
            • 7.3.3.2.1.2. 下沉操作调整堆
            • 7.3.3.2.1.3. 调整的过程图




    • 7.4. 总结

  • 8. SynchronousQueue

    • 8.1. 是什么
    • 8.2. 使用
    • 8.3. 原理

      • 8.3.1. 构造方法

        • 8.3.1.1. Transfer
        • 8.3.1.2. QNode

      • 8.3.2. put 阻塞

        • 8.3.2.1. 调用TransferQueue

      • 8.3.3. take 阻塞

    • 8.4. 总结

  • 9. 参考

1. 是什么

线程安全的阻塞队列。
特点:

  • 先进先出:
    既然是队列那肯定是先进先出
  • 阻塞
    支持在插入元素时,如果队列已满,那么阻塞,等待队列非满
    也支持在删除元素时,如果队列为空,那么阻塞,等待队列非空
  • 无界有界
    数组容量的大小。无界其实是Integer.MAX_VALUE
  • 线程安全
2. 使用场景

生产者、消费者
3. 如何使用

方法\处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用4. 各种BlockingQueue详解以及对比

ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueSynchronousQueue数据结构数组单向链表数组(二叉堆)单向链表怎么实现阻塞Lock+ConditionLock+ConditionLock+ConditionCAS+LockSupport有界/无界有界有界无界无界(不存储元素)吞吐量(以LinkedBlockingQueue为基准)比LinkedBlockingQueue低(读读、读写、写写相互阻塞)/ (读读、写写相互阻塞,读写不相互阻塞)无界(读读、读写、写写相互阻塞)比LinkedBlockingQueue高(读写匹配才能进行下去)

  • ArrayBlockingQueue.md
  • LinkedBlockingQueue.md
  • PriorityBlockingQueue.md
  • SynchronousQueue.md
5. ArrayBlockingQueue

5.1. 是什么

使用Object数组实现的有界的阻塞队列
读读、读写、写写相互阻塞
5.2. 如何使用
  1. public class ArrayBlockingQueueTest
  2. {
  3.     public static void main(String[] args) throws InterruptedException
  4.     {
  5.         ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
  6.         CountDownLatch latch = new CountDownLatch(2);
  7.         new Thread(()->{
  8.             for (int i = 0;;i++)
  9.             {
  10.                 try
  11.                 {
  12.                          String data = "data" + i;
  13.                     queue.put(data);
  14.                     System.out.println("Producer放入消息:" + data);
  15.                     TimeUnit.SECONDS.sleep(1);
  16.                 }
  17.                 catch (Exception e)
  18.                 {
  19.                     e.printStackTrace();
  20.                 }
  21.                 finally
  22.                 {
  23.                     latch.countDown();
  24.                 }
  25.             }
  26.         }).start();
  27.         new Thread(()->{
  28.             for (;;)
  29.             {
  30.                 try
  31.                 {
  32.                     System.out.println("Consumer获取消息:" + queue.take());
  33.                 }
  34.                 catch (Exception e)
  35.                 {
  36.                     e.printStackTrace();
  37.                 }
  38.                 finally
  39.                 {
  40.                     latch.countDown();
  41.                 }
  42.             }
  43.         }).start();
  44.         latch.await();
  45.     }
  46. }
复制代码
5.2.1. 方法选择

方法\处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用5.3. 原理分析

5.3.1. uml

1.png

5.3.2. 构造方法

5.3.2.1. 底层使用数组+Lock+Condtion实现
  1. public class ArrayBlockingQueue<E> extends AbstractQueue<E>
  2.     implements BlockingQueue<E>, java.io.Serializable {
  3.         //底层是数组实现的
  4.         final Object[] items;
  5.         //take, poll, peek or remove等读方法,读取下一个元素的位置
  6.         int takeIndex;
  7.         //put, offer, or add等方法,写入下一个元素的位置
  8.         int putIndex;
  9.         //数组中实际元素的数量
  10.         //当count==item.length()的时候说明数组已满
  11.         int count;
  12.         //一个锁说明读写互斥
  13.         final ReentrantLock lock;
  14.     //两个条件量
  15.     private final Condition notEmpty;//用来唤醒读线程
  16.         private final Condition notFull;//用来唤醒写线程
  17.         public ArrayBlockingQueue(int capacity, boolean fair) {
  18.                 if (capacity <= 0)
  19.                         throw new IllegalArgumentException();
  20.        
  21.                 this.items = new Object[capacity];
  22.                 lock = new ReentrantLock(fair);
  23.                 notEmpty = lock.newCondition();
  24.                 notFull =  lock.newCondition();
  25.         }
  26. }
复制代码
6.3. 源码分析

6.3.1. 构造方法

6.3.1.1. 底层使用单向链表+Lock+Condition实现
  1. public void put(E e) throws InterruptedException {
  2.     checkNotNull(e);
  3.     //加锁
  4.     final ReentrantLock lock = this.lock;
  5.     lock.lockInterruptibly();
  6.     try {
  7.             //如果数组已经满了,那么等待。读者取出元素后唤醒
  8.         while (count == items.length)
  9.             notFull.await();
  10.         //没满,加入数组
  11.         enqueue(e);
  12.     } finally {
  13.         lock.unlock();
  14.     }
  15. }
复制代码

  • 7行:加读锁。一旦加了读锁其他读者无法同时进来读取数据,但是写者可以同时进来写数据
  • 10-12行:链表实际容量为0,那么读者阻塞等待写者写入
  • 14行:链表不为空的话,那么删除链表头部的元素
  • 15行:更新队列中元素的数量,-1,返回原值
  • 16-17行:取出了元素后发现队列还是不为空,那么唤醒其他读者继续读取
  • 22-24行:由这句c = count.getAndDecrement();可看出-1后返回的是c的原值,当他为capacity的时候说明之前队列可能是满的,那么加写锁、唤醒写者写入元素、解写锁
下面具体分析:
6.3.3.1. 加读锁
  1. //加锁
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5.     //...
  6. } finally {
  7.     lock.unlock();
  8. }
复制代码
6.3.3.2. 队列为空那么等待
  1. //如果数组已经满了,那么等待。直到读者取出元素后唤醒
  2. while (count == items.length)
  3.     notFull.await();
复制代码
6.3.3.3. 未空则出队


  • dequeue
  1. enqueue(e);
复制代码
6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
  1. private void enqueue(E x) {
  2.     //把元素加入到队尾
  3.     final Object[] items = this.items;
  4.     items[putIndex] = x;
  5.     //已插入到末尾,重置插入索引为0
  6.     //这个数组是可以循环使用的,不需要扩容。
  7.     if (++putIndex == items.length)
  8.         putIndex = 0;
  9.     count++;
  10.     //插入后唤醒读者
  11.     notEmpty.signal();
  12. }
复制代码
6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者
  1. public E take() throws InterruptedException {
  2.         //加锁
  3.     final ReentrantLock lock = this.lock;
  4.     lock.lockInterruptibly();
  5.     try {
  6.             //如果数组为空,那么等待。写者加入元素后唤醒
  7.         while (count == 0)
  8.             notEmpty.await();
  9.             //出队
  10.         return dequeue();
  11.     } finally {
  12.             //释放锁
  13.         lock.unlock();
  14.     }
  15. }
复制代码

  • signalNotFull
  1. //加锁
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5.         //...
  6. } finally {
  7.         //释放锁
  8.     lock.unlock();
  9. }
复制代码
6.3.4. offer 返回特殊值
  1. //如果数组为空,那么等待。写者加入元素后唤醒
  2. while (count == 0)
  3.     notEmpty.await();
复制代码
6.3.5. poll 返回特殊值
  1. private E dequeue() {
  2.     final Object[] items = this.items;
  3.     @SuppressWarnings("unchecked")
  4.     //获取最后一个元素并置为null
  5.     E x = (E) items[takeIndex];
  6.     items[takeIndex] = null;
  7.     //已取到末尾,重置取值索引为0
  8.      //这个数组是可以循环使用的,不需要扩容。
  9.     if (++takeIndex == items.length)
  10.         takeIndex = 0;
  11.     count--;
  12.     if (itrs != null)
  13.         itrs.elementDequeued();
  14.     //出队后唤醒写者
  15.     notFull.signal();
  16.     return x;
  17. }
复制代码
6.3.6. peek 返回特殊值
  1. public boolean offer(E e) {
  2.     checkNotNull(e);
  3.     //加锁
  4.     final ReentrantLock lock = this.lock;
  5.     lock.lock();
  6.     try {
  7.             //已满,直接返回false
  8.         if (count == items.length)
  9.             return false;
  10.         else {
  11.                 //未满,加入队列同时唤醒读者
  12.             enqueue(e);
  13.             return true;
  14.         }
  15.     } finally {
  16.             //解锁
  17.         lock.unlock();
  18.     }
  19. }
复制代码
6.4. 总结

底层使用单向数组实现,可以有界也可以无界队列。
并且用了两个锁和两个condition。两个个锁说明读写可以同时进行,两个conditon说明读写相互唤醒
7. PriorityBlockingQueue


目录

  • 1. 是什么
  • 2. 使用场景
  • 3. 如何使用
  • 4. 各种BlockingQueue详解以及对比
  • 5. ArrayBlockingQueue

    • 5.1. 是什么
    • 5.2. 如何使用

      • 5.2.1. 方法选择

    • 5.3. 原理分析

      • 5.3.1. uml
      • 5.3.2. 构造方法

        • 5.3.2.1. 底层使用数组+Lock+Condtion实现

      • 5.3.3. put【阻塞】

        • 5.3.3.1. 加锁
        • 5.3.3.2. 如果数组已经满了,那么等待
        • 5.3.3.3. 没满则入队并唤醒读者

      • 5.3.4. take【阻塞】

        • 5.3.4.1. 加锁
        • 5.3.4.2. 如果数组为空,那么等待
        • 5.3.4.3. 不为空则出队并唤醒写者

      • 5.3.5. offer【返回特殊值】
      • 5.3.6. poll【返回特殊值】
      • 5.3.7. add【抛出异常】
      • 5.3.8. remove【抛出异常】
      • 5.3.9. element【抛出异常】
      • 5.3.10. peek【返回特殊值】

    • 5.4. 总结

  • 6. LinkedBlockingQueue

    • 6.1. 是什么
    • 6.2. 如何使用
    • 6.3. 源码分析

      • 6.3.1. 构造方法

        • 6.3.1.1. 底层使用单向链表+Lock+Condition实现
        • 6.3.1.2. Node

      • 6.3.2. put【阻塞】

        • 6.3.2.1. 加写锁
        • 6.3.2.2. 如果队列已满那么等待
        • 6.3.2.3. 未满则入队
        • 6.3.2.4. 入队完发现队列没满,那么继续唤醒写者入队
        • 6.3.2.5. 入队完解锁后发现之前队列是空的,那么唤醒读者

      • 6.3.3. take【阻塞】

        • 6.3.3.1. 加读锁
        • 6.3.3.2. 队列为空那么等待
        • 6.3.3.3. 未空则出队
        • 6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
        • 6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者

      • 6.3.4. offer 返回特殊值
      • 6.3.5. poll 返回特殊值
      • 6.3.6. peek 返回特殊值

    • 6.4. 总结

  • 7. PriorityBlockingQueue

    • 7.1. 是什么

      • 7.1.1. 二叉堆

    • 7.2. 如何使用
    • 7.3. 原理分析

      • 7.3.1. 构造方法

        • 7.3.1.1. 底层使用数组+Lock+Condition实现

      • 7.3.2. put

        • 7.3.2.1. 转调offer,不需要阻塞

          • 7.3.2.1.1. 加锁
          • 7.3.2.1.2. 判断是否需要扩容

            • 7.3.2.1.2.1. 需要的话进行扩容

          • 7.3.2.1.3. 把元素加入堆的末尾

            • 7.3.2.1.3.1. 上浮操作调整堆
            • 7.3.2.1.3.2. 调整的过程图



      • 7.3.3. take

        • 7.3.3.1. 加锁
        • 7.3.3.2. 一直阻塞等待,直到出队成功

          • 7.3.3.2.1. 出队具体操作

            • 7.3.3.2.1.1. 移除堆顶,末尾元素放到堆顶
            • 7.3.3.2.1.2. 下沉操作调整堆
            • 7.3.3.2.1.3. 调整的过程图




    • 7.4. 总结

  • 8. SynchronousQueue

    • 8.1. 是什么
    • 8.2. 使用
    • 8.3. 原理

      • 8.3.1. 构造方法

        • 8.3.1.1. Transfer
        • 8.3.1.2. QNode

      • 8.3.2. put 阻塞

        • 8.3.2.1. 调用TransferQueue

      • 8.3.3. take 阻塞

    • 8.4. 总结

  • 9. 参考

7.1. 是什么

底层使用数组(二叉堆)实现的无界的阻塞队列
读读、读写、写写相互阻塞
可以排序
由于无界,所以put操作不会阻塞,但是take操作会阻塞(队列为空的时候)
7.1.1. 二叉堆

一颗完全二叉树,堆序性质为,每个节点的值都小于其左右子节点的值,二叉堆中最小的值就是根节点。
底层用数组进行存储。对于数组中的元素 a,其左子节点为 a[2i+1],其右子节点为 a[2i + 2],其父节点为 a[(i-1)/2]。
结构如下图:
2.png

7.2. 如何使用
  1. public E poll() {
  2.     final ReentrantLock lock = this.lock;
  3.         //加锁
  4.     lock.lock();
  5.     try {
  6.             //长度为0直接返回null,否则出队并唤醒写者
  7.         return (count == 0) ? null : dequeue();
  8.     } finally {
  9.         lock.unlock();
  10.     }
  11. }
复制代码
7.3. 原理分析

7.3.1. 构造方法

7.3.1.1. 底层使用数组+Lock+Condition实现

[code]public class PriorityBlockingQueue extends AbstractQueue    implements BlockingQueue, java.io.Serializable {        //底层使用数组实现(堆)        private transient Object[] queue;        //实际使用的长度    private transient int size;    //comparator确定元素的顺序,如果是null那么是自然序    private transient Comparator
您需要登录后才可以回帖 登录 | 立即注册