目录
- 1. 是什么
- 2. 使用场景
- 3. 如何使用
- 4. 各种BlockingQueue详解以及对比
- 5. ArrayBlockingQueue
- 5.1. 是什么
- 5.2. 如何使用
- 5.3. 原理分析
- 5.3.1. uml
- 5.3.2. 构造方法
- 5.3.2.1. 底层使用数组+Lock+Condtion实现
- 5.3.3. put【阻塞】
- 5.3.3.1. 加锁
- 5.3.3.2. 如果数组已经满了,那么等待
- 5.3.3.3. 没满则入队并唤醒读者
- 5.3.4. take【阻塞】
- 5.3.4.1. 加锁
- 5.3.4.2. 如果数组为空,那么等待
- 5.3.4.3. 不为空则出队并唤醒写者
- 5.3.5. offer【返回特殊值】
- 5.3.6. poll【返回特殊值】
- 5.3.7. add【抛出异常】
- 5.3.8. remove【抛出异常】
- 5.3.9. element【抛出异常】
- 5.3.10. peek【返回特殊值】
- 5.4. 总结
- 6. LinkedBlockingQueue
- 6.1. 是什么
- 6.2. 如何使用
- 6.3. 源码分析
- 6.3.1. 构造方法
- 6.3.1.1. 底层使用单向链表+Lock+Condition实现
- 6.3.1.2. Node
- 6.3.2. put【阻塞】
- 6.3.2.1. 加写锁
- 6.3.2.2. 如果队列已满那么等待
- 6.3.2.3. 未满则入队
- 6.3.2.4. 入队完发现队列没满,那么继续唤醒写者入队
- 6.3.2.5. 入队完解锁后发现之前队列是空的,那么唤醒读者
- 6.3.3. take【阻塞】
- 6.3.3.1. 加读锁
- 6.3.3.2. 队列为空那么等待
- 6.3.3.3. 未空则出队
- 6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
- 6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者
- 6.3.4. offer 返回特殊值
- 6.3.5. poll 返回特殊值
- 6.3.6. peek 返回特殊值
- 6.4. 总结
- 7. PriorityBlockingQueue
- 7.1. 是什么
- 7.2. 如何使用
- 7.3. 原理分析
- 7.3.1. 构造方法
- 7.3.1.1. 底层使用数组+Lock+Condition实现
- 7.3.2. put
- 7.3.2.1. 转调offer,不需要阻塞
- 7.3.2.1.1. 加锁
- 7.3.2.1.2. 判断是否需要扩容
- 7.3.2.1.3. 把元素加入堆的末尾
- 7.3.2.1.3.1. 上浮操作调整堆
- 7.3.2.1.3.2. 调整的过程图
- 7.3.3. take
- 7.3.3.1. 加锁
- 7.3.3.2. 一直阻塞等待,直到出队成功
- 7.3.3.2.1. 出队具体操作
- 7.3.3.2.1.1. 移除堆顶,末尾元素放到堆顶
- 7.3.3.2.1.2. 下沉操作调整堆
- 7.3.3.2.1.3. 调整的过程图
- 7.4. 总结
- 8. SynchronousQueue
- 8.1. 是什么
- 8.2. 使用
- 8.3. 原理
- 8.3.1. 构造方法
- 8.3.1.1. Transfer
- 8.3.1.2. QNode
- 8.3.2. put 阻塞
- 8.3.3. take 阻塞
- 8.4. 总结
- 9. 参考
1. 是什么
线程安全的阻塞队列。
特点:
- 先进先出:
既然是队列那肯定是先进先出
- 阻塞
支持在插入元素时,如果队列已满,那么阻塞,等待队列非满
也支持在删除元素时,如果队列为空,那么阻塞,等待队列非空
- 无界有界
数组容量的大小。无界其实是Integer.MAX_VALUE
- 线程安全
2. 使用场景
生产者、消费者
3. 如何使用
方法\处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用4. 各种BlockingQueue详解以及对比
ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueSynchronousQueue数据结构数组单向链表数组(二叉堆)单向链表怎么实现阻塞Lock+ConditionLock+ConditionLock+ConditionCAS+LockSupport有界/无界有界有界无界无界(不存储元素)吞吐量(以LinkedBlockingQueue为基准)比LinkedBlockingQueue低(读读、读写、写写相互阻塞)/ (读读、写写相互阻塞,读写不相互阻塞)无界(读读、读写、写写相互阻塞)比LinkedBlockingQueue高(读写匹配才能进行下去)
- ArrayBlockingQueue.md
- LinkedBlockingQueue.md
- PriorityBlockingQueue.md
- SynchronousQueue.md
5. ArrayBlockingQueue
5.1. 是什么
使用Object数组实现的有界的阻塞队列
读读、读写、写写相互阻塞
5.2. 如何使用
- public class ArrayBlockingQueueTest
- {
- public static void main(String[] args) throws InterruptedException
- {
- ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
- CountDownLatch latch = new CountDownLatch(2);
- new Thread(()->{
- for (int i = 0;;i++)
- {
- try
- {
- String data = "data" + i;
- queue.put(data);
- System.out.println("Producer放入消息:" + data);
- TimeUnit.SECONDS.sleep(1);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- finally
- {
- latch.countDown();
- }
- }
- }).start();
- new Thread(()->{
- for (;;)
- {
- try
- {
- System.out.println("Consumer获取消息:" + queue.take());
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- finally
- {
- latch.countDown();
- }
- }
- }).start();
- latch.await();
- }
- }
复制代码 5.2.1. 方法选择
方法\处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用5.3. 原理分析
5.3.1. uml
5.3.2. 构造方法
5.3.2.1. 底层使用数组+Lock+Condtion实现
- public class ArrayBlockingQueue<E> extends AbstractQueue<E>
- implements BlockingQueue<E>, java.io.Serializable {
- //底层是数组实现的
- final Object[] items;
- //take, poll, peek or remove等读方法,读取下一个元素的位置
- int takeIndex;
- //put, offer, or add等方法,写入下一个元素的位置
- int putIndex;
- //数组中实际元素的数量
- //当count==item.length()的时候说明数组已满
- int count;
- //一个锁说明读写互斥
- final ReentrantLock lock;
- //两个条件量
- private final Condition notEmpty;//用来唤醒读线程
- private final Condition notFull;//用来唤醒写线程
- public ArrayBlockingQueue(int capacity, boolean fair) {
- if (capacity <= 0)
- throw new IllegalArgumentException();
-
- this.items = new Object[capacity];
- lock = new ReentrantLock(fair);
- notEmpty = lock.newCondition();
- notFull = lock.newCondition();
- }
- }
复制代码 6.3. 源码分析
6.3.1. 构造方法
6.3.1.1. 底层使用单向链表+Lock+Condition实现
- public void put(E e) throws InterruptedException {
- checkNotNull(e);
- //加锁
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- //如果数组已经满了,那么等待。读者取出元素后唤醒
- while (count == items.length)
- notFull.await();
- //没满,加入数组
- enqueue(e);
- } finally {
- lock.unlock();
- }
- }
复制代码
- 7行:加读锁。一旦加了读锁其他读者无法同时进来读取数据,但是写者可以同时进来写数据
- 10-12行:链表实际容量为0,那么读者阻塞等待写者写入
- 14行:链表不为空的话,那么删除链表头部的元素
- 15行:更新队列中元素的数量,-1,返回原值
- 16-17行:取出了元素后发现队列还是不为空,那么唤醒其他读者继续读取
- 22-24行:由这句c = count.getAndDecrement();可看出-1后返回的是c的原值,当他为capacity的时候说明之前队列可能是满的,那么加写锁、唤醒写者写入元素、解写锁
下面具体分析:
6.3.3.1. 加读锁
- //加锁
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- //...
- } finally {
- lock.unlock();
- }
复制代码 6.3.3.2. 队列为空那么等待
- //如果数组已经满了,那么等待。直到读者取出元素后唤醒
- while (count == items.length)
- notFull.await();
复制代码 6.3.3.3. 未空则出队
6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
- private void enqueue(E x) {
- //把元素加入到队尾
- final Object[] items = this.items;
- items[putIndex] = x;
- //已插入到末尾,重置插入索引为0
- //这个数组是可以循环使用的,不需要扩容。
- if (++putIndex == items.length)
- putIndex = 0;
- count++;
- //插入后唤醒读者
- notEmpty.signal();
- }
复制代码 6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者
- public E take() throws InterruptedException {
- //加锁
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- //如果数组为空,那么等待。写者加入元素后唤醒
- while (count == 0)
- notEmpty.await();
- //出队
- return dequeue();
- } finally {
- //释放锁
- lock.unlock();
- }
- }
复制代码- //加锁
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- //...
- } finally {
- //释放锁
- lock.unlock();
- }
复制代码 6.3.4. offer 返回特殊值
- //如果数组为空,那么等待。写者加入元素后唤醒
- while (count == 0)
- notEmpty.await();
复制代码 6.3.5. poll 返回特殊值
- private E dequeue() {
- final Object[] items = this.items;
- @SuppressWarnings("unchecked")
- //获取最后一个元素并置为null
- E x = (E) items[takeIndex];
- items[takeIndex] = null;
- //已取到末尾,重置取值索引为0
- //这个数组是可以循环使用的,不需要扩容。
- if (++takeIndex == items.length)
- takeIndex = 0;
- count--;
- if (itrs != null)
- itrs.elementDequeued();
- //出队后唤醒写者
- notFull.signal();
- return x;
- }
复制代码 6.3.6. peek 返回特殊值
- public boolean offer(E e) {
- checkNotNull(e);
- //加锁
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- //已满,直接返回false
- if (count == items.length)
- return false;
- else {
- //未满,加入队列同时唤醒读者
- enqueue(e);
- return true;
- }
- } finally {
- //解锁
- lock.unlock();
- }
- }
复制代码 6.4. 总结
底层使用单向数组实现,可以有界也可以无界队列。
并且用了两个锁和两个condition。两个个锁说明读写可以同时进行,两个conditon说明读写相互唤醒
7. PriorityBlockingQueue
目录
- 1. 是什么
- 2. 使用场景
- 3. 如何使用
- 4. 各种BlockingQueue详解以及对比
- 5. ArrayBlockingQueue
- 5.1. 是什么
- 5.2. 如何使用
- 5.3. 原理分析
- 5.3.1. uml
- 5.3.2. 构造方法
- 5.3.2.1. 底层使用数组+Lock+Condtion实现
- 5.3.3. put【阻塞】
- 5.3.3.1. 加锁
- 5.3.3.2. 如果数组已经满了,那么等待
- 5.3.3.3. 没满则入队并唤醒读者
- 5.3.4. take【阻塞】
- 5.3.4.1. 加锁
- 5.3.4.2. 如果数组为空,那么等待
- 5.3.4.3. 不为空则出队并唤醒写者
- 5.3.5. offer【返回特殊值】
- 5.3.6. poll【返回特殊值】
- 5.3.7. add【抛出异常】
- 5.3.8. remove【抛出异常】
- 5.3.9. element【抛出异常】
- 5.3.10. peek【返回特殊值】
- 5.4. 总结
- 6. LinkedBlockingQueue
- 6.1. 是什么
- 6.2. 如何使用
- 6.3. 源码分析
- 6.3.1. 构造方法
- 6.3.1.1. 底层使用单向链表+Lock+Condition实现
- 6.3.1.2. Node
- 6.3.2. put【阻塞】
- 6.3.2.1. 加写锁
- 6.3.2.2. 如果队列已满那么等待
- 6.3.2.3. 未满则入队
- 6.3.2.4. 入队完发现队列没满,那么继续唤醒写者入队
- 6.3.2.5. 入队完解锁后发现之前队列是空的,那么唤醒读者
- 6.3.3. take【阻塞】
- 6.3.3.1. 加读锁
- 6.3.3.2. 队列为空那么等待
- 6.3.3.3. 未空则出队
- 6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
- 6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者
- 6.3.4. offer 返回特殊值
- 6.3.5. poll 返回特殊值
- 6.3.6. peek 返回特殊值
- 6.4. 总结
- 7. PriorityBlockingQueue
- 7.1. 是什么
- 7.2. 如何使用
- 7.3. 原理分析
- 7.3.1. 构造方法
- 7.3.1.1. 底层使用数组+Lock+Condition实现
- 7.3.2. put
- 7.3.2.1. 转调offer,不需要阻塞
- 7.3.2.1.1. 加锁
- 7.3.2.1.2. 判断是否需要扩容
- 7.3.2.1.3. 把元素加入堆的末尾
- 7.3.2.1.3.1. 上浮操作调整堆
- 7.3.2.1.3.2. 调整的过程图
- 7.3.3. take
- 7.3.3.1. 加锁
- 7.3.3.2. 一直阻塞等待,直到出队成功
- 7.3.3.2.1. 出队具体操作
- 7.3.3.2.1.1. 移除堆顶,末尾元素放到堆顶
- 7.3.3.2.1.2. 下沉操作调整堆
- 7.3.3.2.1.3. 调整的过程图
- 7.4. 总结
- 8. SynchronousQueue
- 8.1. 是什么
- 8.2. 使用
- 8.3. 原理
- 8.3.1. 构造方法
- 8.3.1.1. Transfer
- 8.3.1.2. QNode
- 8.3.2. put 阻塞
- 8.3.3. take 阻塞
- 8.4. 总结
- 9. 参考
7.1. 是什么
底层使用数组(二叉堆)实现的无界的阻塞队列
读读、读写、写写相互阻塞
可以排序
由于无界,所以put操作不会阻塞,但是take操作会阻塞(队列为空的时候)
7.1.1. 二叉堆
一颗完全二叉树,堆序性质为,每个节点的值都小于其左右子节点的值,二叉堆中最小的值就是根节点。
底层用数组进行存储。对于数组中的元素 a,其左子节点为 a[2i+1],其右子节点为 a[2i + 2],其父节点为 a[(i-1)/2]。
结构如下图:
7.2. 如何使用
- public E poll() {
- final ReentrantLock lock = this.lock;
- //加锁
- lock.lock();
- try {
- //长度为0直接返回null,否则出队并唤醒写者
- return (count == 0) ? null : dequeue();
- } finally {
- lock.unlock();
- }
- }
复制代码 7.3. 原理分析
7.3.1. 构造方法
7.3.1.1. 底层使用数组+Lock+Condition实现
[code]public class PriorityBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { //底层使用数组实现(堆) private transient Object[] queue; //实际使用的长度 private transient int size; //comparator确定元素的顺序,如果是null那么是自然序 private transient Comparator |