Java源码分析系列笔记-12.BlockingQueue
目录[*]1. 是什么
[*]2. 使用场景
[*]3. 如何使用
[*]4. 各种BlockingQueue详解以及对比
[*]5. ArrayBlockingQueue
[*]5.1. 是什么
[*]5.2. 如何使用
[*]5.2.1. 方法选择
[*]5.3. 原理分析
[*]5.3.1. uml
[*]5.3.2. 构造方法
[*]5.3.2.1. 底层使用数组+Lock+Condtion实现
[*]5.3.3. put【阻塞】
[*]5.3.3.1. 加锁
[*]5.3.3.2. 如果数组已经满了,那么等待
[*]5.3.3.3. 没满则入队并唤醒读者
[*]5.3.4. take【阻塞】
[*]5.3.4.1. 加锁
[*]5.3.4.2. 如果数组为空,那么等待
[*]5.3.4.3. 不为空则出队并唤醒写者
[*]5.3.5. offer【返回特殊值】
[*]5.3.6. poll【返回特殊值】
[*]5.3.7. add【抛出异常】
[*]5.3.8. remove【抛出异常】
[*]5.3.9. element【抛出异常】
[*]5.3.10. peek【返回特殊值】
[*]5.4. 总结
[*]6. LinkedBlockingQueue
[*]6.1. 是什么
[*]6.2. 如何使用
[*]6.3. 源码分析
[*]6.3.1. 构造方法
[*]6.3.1.1. 底层使用单向链表+Lock+Condition实现
[*]6.3.1.2. Node
[*]6.3.2. put【阻塞】
[*]6.3.2.1. 加写锁
[*]6.3.2.2. 如果队列已满那么等待
[*]6.3.2.3. 未满则入队
[*]6.3.2.4. 入队完发现队列没满,那么继续唤醒写者入队
[*]6.3.2.5. 入队完解锁后发现之前队列是空的,那么唤醒读者
[*]6.3.3. take【阻塞】
[*]6.3.3.1. 加读锁
[*]6.3.3.2. 队列为空那么等待
[*]6.3.3.3. 未空则出队
[*]6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
[*]6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者
[*]6.3.4. offer 返回特殊值
[*]6.3.5. poll 返回特殊值
[*]6.3.6. peek 返回特殊值
[*]6.4. 总结
[*]7. PriorityBlockingQueue
[*]7.1. 是什么
[*]7.1.1. 二叉堆
[*]7.2. 如何使用
[*]7.3. 原理分析
[*]7.3.1. 构造方法
[*]7.3.1.1. 底层使用数组+Lock+Condition实现
[*]7.3.2. put
[*]7.3.2.1. 转调offer,不需要阻塞
[*]7.3.2.1.1. 加锁
[*]7.3.2.1.2. 判断是否需要扩容
[*]7.3.2.1.2.1. 需要的话进行扩容
[*]7.3.2.1.3. 把元素加入堆的末尾
[*]7.3.2.1.3.1. 上浮操作调整堆
[*]7.3.2.1.3.2. 调整的过程图
[*]7.3.3. take
[*]7.3.3.1. 加锁
[*]7.3.3.2. 一直阻塞等待,直到出队成功
[*]7.3.3.2.1. 出队具体操作
[*]7.3.3.2.1.1. 移除堆顶,末尾元素放到堆顶
[*]7.3.3.2.1.2. 下沉操作调整堆
[*]7.3.3.2.1.3. 调整的过程图
[*]7.4. 总结
[*]8. SynchronousQueue
[*]8.1. 是什么
[*]8.2. 使用
[*]8.3. 原理
[*]8.3.1. 构造方法
[*]8.3.1.1. Transfer
[*]8.3.1.2. QNode
[*]8.3.2. put 阻塞
[*]8.3.2.1. 调用TransferQueue
[*]8.3.3. take 阻塞
[*]8.4. 总结
[*]9. 参考
1. 是什么
线程安全的阻塞队列。
特点:
[*]先进先出:
既然是队列那肯定是先进先出
[*]阻塞
支持在插入元素时,如果队列已满,那么阻塞,等待队列非满
也支持在删除元素时,如果队列为空,那么阻塞,等待队列非空
[*]无界有界
数组容量的大小。无界其实是Integer.MAX_VALUE
[*]线程安全
2. 使用场景
生产者、消费者
3. 如何使用
方法\处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用4. 各种BlockingQueue详解以及对比
ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueueSynchronousQueue数据结构数组单向链表数组(二叉堆)单向链表怎么实现阻塞Lock+ConditionLock+ConditionLock+ConditionCAS+LockSupport有界/无界有界有界无界无界(不存储元素)吞吐量(以LinkedBlockingQueue为基准)比LinkedBlockingQueue低(读读、读写、写写相互阻塞)/ (读读、写写相互阻塞,读写不相互阻塞)无界(读读、读写、写写相互阻塞)比LinkedBlockingQueue高(读写匹配才能进行下去)
[*]ArrayBlockingQueue.md
[*]LinkedBlockingQueue.md
[*]PriorityBlockingQueue.md
[*]SynchronousQueue.md
5. ArrayBlockingQueue
5.1. 是什么
使用Object数组实现的有界的阻塞队列
读读、读写、写写相互阻塞
5.2. 如何使用
public class ArrayBlockingQueueTest
{
public static void main(String[] args) throws InterruptedException
{
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
CountDownLatch latch = new CountDownLatch(2);
new Thread(()->{
for (int i = 0;;i++)
{
try
{
String data = "data" + i;
queue.put(data);
System.out.println("Producer放入消息:" + data);
TimeUnit.SECONDS.sleep(1);
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
latch.countDown();
}
}
}).start();
new Thread(()->{
for (;;)
{
try
{
System.out.println("Consumer获取消息:" + queue.take());
}
catch (Exception e)
{
e.printStackTrace();
}
finally
{
latch.countDown();
}
}
}).start();
latch.await();
}
}5.2.1. 方法选择
方法\处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(e)offer(e)put(e)offer(e,time,unit)移除方法remove()poll()take()poll(time,unit)检查方法element()peek()不可用不可用5.3. 原理分析
5.3.1. uml
5.3.2. 构造方法
5.3.2.1. 底层使用数组+Lock+Condtion实现
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//底层是数组实现的
final Object[] items;
//take, poll, peek or remove等读方法,读取下一个元素的位置
int takeIndex;
//put, offer, or add等方法,写入下一个元素的位置
int putIndex;
//数组中实际元素的数量
//当count==item.length()的时候说明数组已满
int count;
//一个锁说明读写互斥
final ReentrantLock lock;
//两个条件量
private final Condition notEmpty;//用来唤醒读线程
private final Condition notFull;//用来唤醒写线程
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object;
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =lock.newCondition();
}
}6.3. 源码分析
6.3.1. 构造方法
6.3.1.1. 底层使用单向链表+Lock+Condition实现
public void put(E e) throws InterruptedException {
checkNotNull(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果数组已经满了,那么等待。读者取出元素后唤醒
while (count == items.length)
notFull.await();
//没满,加入数组
enqueue(e);
} finally {
lock.unlock();
}
}
[*]7行:加读锁。一旦加了读锁其他读者无法同时进来读取数据,但是写者可以同时进来写数据
[*]10-12行:链表实际容量为0,那么读者阻塞等待写者写入
[*]14行:链表不为空的话,那么删除链表头部的元素
[*]15行:更新队列中元素的数量,-1,返回原值
[*]16-17行:取出了元素后发现队列还是不为空,那么唤醒其他读者继续读取
[*]22-24行:由这句c = count.getAndDecrement();可看出-1后返回的是c的原值,当他为capacity的时候说明之前队列可能是满的,那么加写锁、唤醒写者写入元素、解写锁
下面具体分析:
6.3.3.1. 加读锁
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//...
} finally {
lock.unlock();
}6.3.3.2. 队列为空那么等待
//如果数组已经满了,那么等待。直到读者取出元素后唤醒
while (count == items.length)
notFull.await();6.3.3.3. 未空则出队
[*]dequeue
enqueue(e);6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
private void enqueue(E x) {
//把元素加入到队尾
final Object[] items = this.items;
items = x;
//已插入到末尾,重置插入索引为0
//这个数组是可以循环使用的,不需要扩容。
if (++putIndex == items.length)
putIndex = 0;
count++;
//插入后唤醒读者
notEmpty.signal();
}6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者
public E take() throws InterruptedException {
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果数组为空,那么等待。写者加入元素后唤醒
while (count == 0)
notEmpty.await();
//出队
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}
[*]signalNotFull
//加锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//...
} finally {
//释放锁
lock.unlock();
}6.3.4. offer 返回特殊值
//如果数组为空,那么等待。写者加入元素后唤醒
while (count == 0)
notEmpty.await();6.3.5. poll 返回特殊值
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//获取最后一个元素并置为null
E x = (E) items;
items = null;
//已取到末尾,重置取值索引为0
//这个数组是可以循环使用的,不需要扩容。
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//出队后唤醒写者
notFull.signal();
return x;
}6.3.6. peek 返回特殊值
public boolean offer(E e) {
checkNotNull(e);
//加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//已满,直接返回false
if (count == items.length)
return false;
else {
//未满,加入队列同时唤醒读者
enqueue(e);
return true;
}
} finally {
//解锁
lock.unlock();
}
}6.4. 总结
底层使用单向数组实现,可以有界也可以无界队列。
并且用了两个锁和两个condition。两个个锁说明读写可以同时进行,两个conditon说明读写相互唤醒
7. PriorityBlockingQueue
目录
[*]1. 是什么
[*]2. 使用场景
[*]3. 如何使用
[*]4. 各种BlockingQueue详解以及对比
[*]5. ArrayBlockingQueue
[*]5.1. 是什么
[*]5.2. 如何使用
[*]5.2.1. 方法选择
[*]5.3. 原理分析
[*]5.3.1. uml
[*]5.3.2. 构造方法
[*]5.3.2.1. 底层使用数组+Lock+Condtion实现
[*]5.3.3. put【阻塞】
[*]5.3.3.1. 加锁
[*]5.3.3.2. 如果数组已经满了,那么等待
[*]5.3.3.3. 没满则入队并唤醒读者
[*]5.3.4. take【阻塞】
[*]5.3.4.1. 加锁
[*]5.3.4.2. 如果数组为空,那么等待
[*]5.3.4.3. 不为空则出队并唤醒写者
[*]5.3.5. offer【返回特殊值】
[*]5.3.6. poll【返回特殊值】
[*]5.3.7. add【抛出异常】
[*]5.3.8. remove【抛出异常】
[*]5.3.9. element【抛出异常】
[*]5.3.10. peek【返回特殊值】
[*]5.4. 总结
[*]6. LinkedBlockingQueue
[*]6.1. 是什么
[*]6.2. 如何使用
[*]6.3. 源码分析
[*]6.3.1. 构造方法
[*]6.3.1.1. 底层使用单向链表+Lock+Condition实现
[*]6.3.1.2. Node
[*]6.3.2. put【阻塞】
[*]6.3.2.1. 加写锁
[*]6.3.2.2. 如果队列已满那么等待
[*]6.3.2.3. 未满则入队
[*]6.3.2.4. 入队完发现队列没满,那么继续唤醒写者入队
[*]6.3.2.5. 入队完解锁后发现之前队列是空的,那么唤醒读者
[*]6.3.3. take【阻塞】
[*]6.3.3.1. 加读锁
[*]6.3.3.2. 队列为空那么等待
[*]6.3.3.3. 未空则出队
[*]6.3.3.4. 出了队发现队列没空,那么继续唤醒读者
[*]6.3.3.5. 出了队解了锁发现之前队列是满的,那么唤醒写者
[*]6.3.4. offer 返回特殊值
[*]6.3.5. poll 返回特殊值
[*]6.3.6. peek 返回特殊值
[*]6.4. 总结
[*]7. PriorityBlockingQueue
[*]7.1. 是什么
[*]7.1.1. 二叉堆
[*]7.2. 如何使用
[*]7.3. 原理分析
[*]7.3.1. 构造方法
[*]7.3.1.1. 底层使用数组+Lock+Condition实现
[*]7.3.2. put
[*]7.3.2.1. 转调offer,不需要阻塞
[*]7.3.2.1.1. 加锁
[*]7.3.2.1.2. 判断是否需要扩容
[*]7.3.2.1.2.1. 需要的话进行扩容
[*]7.3.2.1.3. 把元素加入堆的末尾
[*]7.3.2.1.3.1. 上浮操作调整堆
[*]7.3.2.1.3.2. 调整的过程图
[*]7.3.3. take
[*]7.3.3.1. 加锁
[*]7.3.3.2. 一直阻塞等待,直到出队成功
[*]7.3.3.2.1. 出队具体操作
[*]7.3.3.2.1.1. 移除堆顶,末尾元素放到堆顶
[*]7.3.3.2.1.2. 下沉操作调整堆
[*]7.3.3.2.1.3. 调整的过程图
[*]7.4. 总结
[*]8. SynchronousQueue
[*]8.1. 是什么
[*]8.2. 使用
[*]8.3. 原理
[*]8.3.1. 构造方法
[*]8.3.1.1. Transfer
[*]8.3.1.2. QNode
[*]8.3.2. put 阻塞
[*]8.3.2.1. 调用TransferQueue
[*]8.3.3. take 阻塞
[*]8.4. 总结
[*]9. 参考
7.1. 是什么
底层使用数组(二叉堆)实现的无界的阻塞队列
读读、读写、写写相互阻塞
可以排序
由于无界,所以put操作不会阻塞,但是take操作会阻塞(队列为空的时候)
7.1.1. 二叉堆
一颗完全二叉树,堆序性质为,每个节点的值都小于其左右子节点的值,二叉堆中最小的值就是根节点。
底层用数组进行存储。对于数组中的元素 a,其左子节点为 ai+1],其右子节点为 ai + 2],其父节点为 a[(i-1)/2]。
结构如下图:
7.2. 如何使用
public E poll() {
final ReentrantLock lock = this.lock;
//加锁
lock.lock();
try {
//长度为0直接返回null,否则出队并唤醒写者
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}7.3. 原理分析
7.3.1. 构造方法
7.3.1.1. 底层使用数组+Lock+Condition实现
public class PriorityBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable { //底层使用数组实现(堆) private transient Object[] queue; //实际使用的长度 private transient int size; //comparator确定元素的顺序,如果是null那么是自然序 private transient Comparator
页:
[1]