孟清妍 发表于 2025-6-26 10:40:31

Java源码分析系列笔记-12.BlockingQueue

目录

[*]1. 是什么
[*]2. 使用场景
[*]3. 如何使用
[*]4. 各种BlockingQueue详解以及对比
[*]5. ArrayBlockingQueue

[*]5.1. 是什么
[*]5.2. 如何使用

[*]5.2.1. 方法选择

[*]5.3. 原理分析

[*]5.3.1. uml
[*]5.3.2. 构造方法

[*]5.3.2.1. 底层使用数组+Lock+Condtion实现

[*]5.3.3. put【阻塞】

[*]5.3.3.1. 加锁
[*]5.3.3.2. 如果数组已经满了,那么等待
[*]5.3.3.3. 没满则入队并唤醒读者

[*]5.3.4. take【阻塞】

[*]5.3.4.1. 加锁
[*]5.3.4.2. 如果数组为空,那么等待
[*]5.3.4.3. 不为空则出队并唤醒写者

[*]5.3.5. offer【返回特殊值】
[*]5.3.6. poll【返回特殊值】
[*]5.3.7. add【抛出异常】
[*]5.3.8. remove【抛出异常】
[*]5.3.9. element【抛出异常】
[*]5.3.10. peek【返回特殊值】

[*]5.4. 总结

[*]6. LinkedBlockingQueue

[*]6.1. 是什么
[*]6.2. 如何使用
[*]6.3. 源码分析

[*]6.3.1. 构造方法

[*]6.3.1.1. 底层使用单向链表+Lock+Condition实现
[*]6.3.1.2. Node

[*]6.3.2. put【阻塞】

[*]6.3.2.1. 加写锁
[*]6.3.2.2. 如果队列已满那么等待
[*]6.3.2.3. 未满则入队
[*]6.3.2.4. 入队完发现队列没满,那么继续唤醒写者入队
[*]6.3.2.5. 入队完解锁后发现之前队列是空的,那么唤醒读者

[*]6.3.3. take【阻塞】

[*]6.3.3.1. 加读锁
[*]6.3.3.2. 队列为空那么等待
[*]6.3.3.3. 未空则出队
[*]6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
[*]6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者

[*]6.3.4. offer 返回特殊值
[*]6.3.5. poll 返回特殊值
[*]6.3.6. peek 返回特殊值

[*]6.4. 总结

[*]7. PriorityBlockingQueue

[*]7.1. 是什么

[*]7.1.1. 二叉堆

[*]7.2. 如何使用
[*]7.3. 原理分析

[*]7.3.1. 构造方法

[*]7.3.1.1. 底层使用数组+Lock+Condition实现

[*]7.3.2. put

[*]7.3.2.1. 转调offer,不需要阻塞

[*]7.3.2.1.1. 加锁
[*]7.3.2.1.2. 判断是否需要扩容

[*]7.3.2.1.2.1. 需要的话进行扩容

[*]7.3.2.1.3. 把元素加入堆的末尾

[*]7.3.2.1.3.1. 上浮操作调整堆
[*]7.3.2.1.3.2. 调整的过程图



[*]7.3.3. take

[*]7.3.3.1. 加锁
[*]7.3.3.2. 一直阻塞等待,直到出队成功

[*]7.3.3.2.1. 出队具体操作

[*]7.3.3.2.1.1. 移除堆顶,末尾元素放到堆顶
[*]7.3.3.2.1.2. 下沉操作调整堆
[*]7.3.3.2.1.3. 调整的过程图




[*]7.4. 总结

[*]8. SynchronousQueue

[*]8.1. 是什么
[*]8.2. 使用
[*]8.3. 原理

[*]8.3.1. 构造方法

[*]8.3.1.1. Transfer
[*]8.3.1.2. QNode

[*]8.3.2. put 阻塞

[*]8.3.2.1. 调用TransferQueue

[*]8.3.3. take 阻塞

[*]8.4. 总结

[*]9. 参考

1. 是什么

线程安全的阻塞队列。
特点:

[*]先进先出:
既然是队列那肯定是先进先出
[*]阻塞
支持在插入元素时,如果队列已满,那么阻塞,等待队列非满
也支持在删除元素时,如果队列为空,那么阻塞,等待队列非空
[*]无界有界
数组容量的大小。无界其实是Integer.MAX_VALUE
[*]线程安全
2. 使用场景

生产者、消费者
3. 如何使用

方法\处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用4. 各种BlockingQueue详解以及对比

ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueSynchronousQueue数据结构数组单向链表数组(二叉堆)单向链表怎么实现阻塞Lock+ConditionLock+ConditionLock+ConditionCAS+LockSupport有界/无界有界有界无界无界(不存储元素)吞吐量(以LinkedBlockingQueue为基准)比LinkedBlockingQueue低(读读、读写、写写相互阻塞)/ (读读、写写相互阻塞,读写不相互阻塞)无界(读读、读写、写写相互阻塞)比LinkedBlockingQueue高(读写匹配才能进行下去)

[*]ArrayBlockingQueue.md
[*]LinkedBlockingQueue.md
[*]PriorityBlockingQueue.md
[*]SynchronousQueue.md
5. ArrayBlockingQueue

5.1. 是什么

使用Object数组实现的有界的阻塞队列
读读、读写、写写相互阻塞
5.2. 如何使用

public class ArrayBlockingQueueTest
{
    public static void main(String[] args) throws InterruptedException
    {
      ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
      CountDownLatch latch = new CountDownLatch(2);

      new Thread(()->{
            for (int i = 0;;i++)
            {
                try
                {
                       String data = "data" + i;
                  queue.put(data);
                  System.out.println("Producer放入消息:" + data);
                  TimeUnit.SECONDS.sleep(1);
                }
                catch (Exception e)
                {
                  e.printStackTrace();
                }
                finally
                {
                  latch.countDown();
                }
            }
      }).start();

      new Thread(()->{
            for (;;)
            {
                try
                {
                  System.out.println("Consumer获取消息:" + queue.take());
                }
                catch (Exception e)
                {
                  e.printStackTrace();
                }
                finally
                {
                  latch.countDown();
                }
            }
      }).start();

      latch.await();

    }
}5.2.1. 方法选择

方法\处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用5.3. 原理分析

5.3.1. uml


5.3.2. 构造方法

5.3.2.1. 底层使用数组+Lock+Condtion实现

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
        //底层是数组实现的
        final Object[] items;

        //take, poll, peek or remove等读方法,读取下一个元素的位置
        int takeIndex;
        //put, offer, or add等方法,写入下一个元素的位置
        int putIndex;

        //数组中实际元素的数量
        //当count==item.length()的时候说明数组已满
        int count;

        //一个锁说明读写互斥
        final ReentrantLock lock;
    //两个条件量
    private final Condition notEmpty;//用来唤醒读线程
        private final Condition notFull;//用来唤醒写线程

        public ArrayBlockingQueue(int capacity, boolean fair) {
                if (capacity <= 0)
                        throw new IllegalArgumentException();
       
                this.items = new Object;
                lock = new ReentrantLock(fair);
                notEmpty = lock.newCondition();
                notFull =lock.newCondition();
        }
}6.3. 源码分析

6.3.1. 构造方法

6.3.1.1. 底层使用单向链表+Lock+Condition实现

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    //加锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
            //如果数组已经满了,那么等待。读者取出元素后唤醒
      while (count == items.length)
            notFull.await();
      //没满,加入数组
      enqueue(e);
    } finally {
      lock.unlock();
    }
}

[*]7行:加读锁。一旦加了读锁其他读者无法同时进来读取数据,但是写者可以同时进来写数据
[*]10-12行:链表实际容量为0,那么读者阻塞等待写者写入
[*]14行:链表不为空的话,那么删除链表头部的元素
[*]15行:更新队列中元素的数量,-1,返回原值
[*]16-17行:取出了元素后发现队列还是不为空,那么唤醒其他读者继续读取
[*]22-24行:由这句c = count.getAndDecrement();可看出-1后返回的是c的原值,当他为capacity的时候说明之前队列可能是满的,那么加写锁、唤醒写者写入元素、解写锁
下面具体分析:
6.3.3.1. 加读锁

//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
    //...
} finally {
    lock.unlock();
}6.3.3.2. 队列为空那么等待

//如果数组已经满了,那么等待。直到读者取出元素后唤醒
while (count == items.length)
    notFull.await();6.3.3.3. 未空则出队


[*]dequeue
enqueue(e);6.3.3.4. 出了队发现队列没空,那么继续唤醒读者

private void enqueue(E x) {
    //把元素加入到队尾
    final Object[] items = this.items;
    items = x;
    //已插入到末尾,重置插入索引为0
    //这个数组是可以循环使用的,不需要扩容。
    if (++putIndex == items.length)
      putIndex = 0;
    count++;
    //插入后唤醒读者
    notEmpty.signal();
}6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者

public E take() throws InterruptedException {
        //加锁
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
            //如果数组为空,那么等待。写者加入元素后唤醒
      while (count == 0)
            notEmpty.await();
            //出队
      return dequeue();
    } finally {
            //释放锁
      lock.unlock();
    }
}

[*]signalNotFull
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
        //...
} finally {
        //释放锁
    lock.unlock();
}6.3.4. offer 返回特殊值

//如果数组为空,那么等待。写者加入元素后唤醒
while (count == 0)
    notEmpty.await();6.3.5. poll 返回特殊值

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    //获取最后一个元素并置为null
    E x = (E) items;
    items = null;
    //已取到末尾,重置取值索引为0
   //这个数组是可以循环使用的,不需要扩容。
    if (++takeIndex == items.length)
      takeIndex = 0;
    count--;
    if (itrs != null)
      itrs.elementDequeued();
    //出队后唤醒写者
    notFull.signal();
    return x;
}6.3.6. peek 返回特殊值

public boolean offer(E e) {
    checkNotNull(e);
    //加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
            //已满,直接返回false
      if (count == items.length)
            return false;
      else {
              //未满,加入队列同时唤醒读者
            enqueue(e);
            return true;
      }
    } finally {
            //解锁
      lock.unlock();
    }
}6.4. 总结

底层使用单向数组实现,可以有界也可以无界队列。
并且用了两个锁和两个condition。两个个锁说明读写可以同时进行,两个conditon说明读写相互唤醒
7. PriorityBlockingQueue


目录

[*]1. 是什么
[*]2. 使用场景
[*]3. 如何使用
[*]4. 各种BlockingQueue详解以及对比
[*]5. ArrayBlockingQueue

[*]5.1. 是什么
[*]5.2. 如何使用

[*]5.2.1. 方法选择

[*]5.3. 原理分析

[*]5.3.1. uml
[*]5.3.2. 构造方法

[*]5.3.2.1. 底层使用数组+Lock+Condtion实现

[*]5.3.3. put【阻塞】

[*]5.3.3.1. 加锁
[*]5.3.3.2. 如果数组已经满了,那么等待
[*]5.3.3.3. 没满则入队并唤醒读者

[*]5.3.4. take【阻塞】

[*]5.3.4.1. 加锁
[*]5.3.4.2. 如果数组为空,那么等待
[*]5.3.4.3. 不为空则出队并唤醒写者

[*]5.3.5. offer【返回特殊值】
[*]5.3.6. poll【返回特殊值】
[*]5.3.7. add【抛出异常】
[*]5.3.8. remove【抛出异常】
[*]5.3.9. element【抛出异常】
[*]5.3.10. peek【返回特殊值】

[*]5.4. 总结

[*]6. LinkedBlockingQueue

[*]6.1. 是什么
[*]6.2. 如何使用
[*]6.3. 源码分析

[*]6.3.1. 构造方法

[*]6.3.1.1. 底层使用单向链表+Lock+Condition实现
[*]6.3.1.2. Node

[*]6.3.2. put【阻塞】

[*]6.3.2.1. 加写锁
[*]6.3.2.2. 如果队列已满那么等待
[*]6.3.2.3. 未满则入队
[*]6.3.2.4. 入队完发现队列没满,那么继续唤醒写者入队
[*]6.3.2.5. 入队完解锁后发现之前队列是空的,那么唤醒读者

[*]6.3.3. take【阻塞】

[*]6.3.3.1. 加读锁
[*]6.3.3.2. 队列为空那么等待
[*]6.3.3.3. 未空则出队
[*]6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
[*]6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者

[*]6.3.4. offer 返回特殊值
[*]6.3.5. poll 返回特殊值
[*]6.3.6. peek 返回特殊值

[*]6.4. 总结

[*]7. PriorityBlockingQueue

[*]7.1. 是什么

[*]7.1.1. 二叉堆

[*]7.2. 如何使用
[*]7.3. 原理分析

[*]7.3.1. 构造方法

[*]7.3.1.1. 底层使用数组+Lock+Condition实现

[*]7.3.2. put

[*]7.3.2.1. 转调offer,不需要阻塞

[*]7.3.2.1.1. 加锁
[*]7.3.2.1.2. 判断是否需要扩容

[*]7.3.2.1.2.1. 需要的话进行扩容

[*]7.3.2.1.3. 把元素加入堆的末尾

[*]7.3.2.1.3.1. 上浮操作调整堆
[*]7.3.2.1.3.2. 调整的过程图



[*]7.3.3. take

[*]7.3.3.1. 加锁
[*]7.3.3.2. 一直阻塞等待,直到出队成功

[*]7.3.3.2.1. 出队具体操作

[*]7.3.3.2.1.1. 移除堆顶,末尾元素放到堆顶
[*]7.3.3.2.1.2. 下沉操作调整堆
[*]7.3.3.2.1.3. 调整的过程图




[*]7.4. 总结

[*]8. SynchronousQueue

[*]8.1. 是什么
[*]8.2. 使用
[*]8.3. 原理

[*]8.3.1. 构造方法

[*]8.3.1.1. Transfer
[*]8.3.1.2. QNode

[*]8.3.2. put 阻塞

[*]8.3.2.1. 调用TransferQueue

[*]8.3.3. take 阻塞

[*]8.4. 总结

[*]9. 参考

7.1. 是什么

底层使用数组(二叉堆)实现的无界的阻塞队列
读读、读写、写写相互阻塞
可以排序
由于无界,所以put操作不会阻塞,但是take操作会阻塞(队列为空的时候)
7.1.1. 二叉堆

一颗完全二叉树,堆序性质为,每个节点的值都小于其左右子节点的值,二叉堆中最小的值就是根节点。
底层用数组进行存储。对于数组中的元素 a,其左子节点为 ai+1],其右子节点为 ai + 2],其父节点为 a[(i-1)/2]。
结构如下图:

7.2. 如何使用

public E poll() {
    final ReentrantLock lock = this.lock;
        //加锁
    lock.lock();
    try {
            //长度为0直接返回null,否则出队并唤醒写者
      return (count == 0) ? null : dequeue();
    } finally {
      lock.unlock();
    }
}7.3. 原理分析

7.3.1. 构造方法

7.3.1.1. 底层使用数组+Lock+Condition实现

public class PriorityBlockingQueue extends AbstractQueue    implements BlockingQueue, java.io.Serializable {        //底层使用数组实现(堆)        private transient Object[] queue;        //实际使用的长度    private transient int size;    //comparator确定元素的顺序,如果是null那么是自然序    private transient Comparator
页: [1]
查看完整版本: Java源码分析系列笔记-12.BlockingQueue