汲佩杉 发表于 2025-7-30 19:55:10

JUC干货之六大阻塞队列BlockingQueue

摘要:你如果还不了解Java 21中BlockingQueue的六大阻塞队列,那么看这篇文章就够了。我会介绍阻塞队列的定义、种类、实现原理以及应用。
JUC干货系列目录:

[*]JAVA JUC干货之线程池状态和状态切换
[*]JAVA JUC干货之线程池实现原理和源码详解(上)
[*]JAVA JUC干货之线程池实现原理和源码详解(下)
[*]JAVA JUC干货之六大阻塞队列BlockingQueue
综述

  大家如果看完上述【JAVA JUC干货系列】文章,相信对线程池的理解会更丝滑流畅,例如java中的线程池由下列四个核心组件组成——线程池管理器ThreadPoolExecutor、用作缓冲区的任务队列、创建新线程的线程工厂和拒绝策略等。本文就在上述博客的基础上,分享线程池核心组件任务队列的基本概念、常用种类、实现原理以及应用。肝文不易,看完别忘了点赞关注哦。
  在多线程编程领域,所谓阻塞,是指在某些情况下会挂起线程,一旦条件满足,被挂起的线程又会自动被唤醒。阻塞队列中的“阻塞”也是这个意思。
  熟悉消息队列(MessageQueue)八股文的老铁一定知道消息队列有解耦、异步处理、提高系统可扩展性和削峰填谷神奇效果。同样阻塞队列BlockingQueue的作用也包含这四种,区别是BlockingQueue只作用于本机器,而消息队列相当于分布式BlockingQueue。
  为什么需要阻塞队列?它在消息传输过程中充当临时保存消息的容器,是实现生产者-消费者模型等常见并发模式的重要工具。当系统中出现“生产”和“消费”的速度不一致或稳定性等影响系统健壮性因素的时候,就需要阻塞队列削峰填谷,作为抽象层,能够有效地平衡生产者和消费者之间的速度差异,提供一种平滑和安全的数据交互方式。
  Java 中的线程池使用了两种类型的任务队列:有界队列和无界队列。有界队列可以限制任务队列的最大长度,控制待处理任务的数量;而无界队列则没有长度限制,可以永无止境地向其提交新任务。
BlockingQueue核心操作方法

  本章节介绍阻塞队列常用的方法及其行为。

[*]remove():移除队首元素,若移除成功,则返回true;如果移除失败(队列为空),则会抛出异常。
[*]element():返回队列头部元素但不移除,如果队列为空,抛出异常。
[*]peek():返回队列头部元素但不移除,如果队列为空,返回 null。
下面根据插入和获取对操作方法进行分类介绍。
插入数据

  add(E e) 向队列尾部写入新的数据e,如果插入成功,则返回true;如果队列已满则插入失败,抛出队列已满的异常。
  offer(E e):表示如果可能的话,将新数据写入队列尾部,即如果BlockingQueue有空间,则插入成功并返回true;否则,因队列已满而插入失败,返回false。本方法不阻塞当前执行方法的线程。
  offer(E e, long timeout, TimeUnit unit):添加元素到队列中,如果队列满了返回 false。可以设定等待插入元素的时间,如果在指定的时间内还不能加入,则抛出IllegalStateException(“Queue full”)异常。
  put(E e):添加元素到队列中,如果队列满了则调用此方法的线程被挂起,直到队列有空间再继续。
获取数据

  poll():从队列中移除并获取位于队首的元素,若成功,则返回队首元素;如果队列为空则返回 null。
  poll(long time):取走排在队列首位的对象。若队列为空不能立即取出元素,则可以等待time参数规定的时间, 超时仍然取不到时返回null;否则,返回取得的元素。
  poll(long timeout, TimeUnit unit):从队列头部获取数据并且该数据会从队列头部移除,如果队列为空则当前线程会阻塞指定的时间,直到在此期间有新的数据写入,或者阻塞的当前线程被其它线程中断,当线程由于超时退出阻塞时,返回值为null。
  take():从队列头部获取排在首位的对象并且该数据会从队列头部移除,如果队列没有任何元素则阻塞,直到队列中有元素被加入。
  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
  peek():获取队首元素,若成功,则返回队首元素;否则,返回null。它只查看但不移除队列中的元素。
  我们以表格形式总结一下各个方法:
方法/处理方式抛出异常返回特殊值一直阻塞超时退出插入方法add(E e)offer(E e)put(E e)offer(E e, long timeout, TimeUnit unit)移除方法remove()poll()take()poll(long timeout, TimeUnit unit)检查方法element()peek()不可用不可用  虽然看前文后已经大概了解各个列标题的含义,但是,这里再深入总结一把,请熟悉的猿友直接跳过:

[*]抛出异常:当队列满时,如果再往队列里插入元素,会抛出IllegalStateException(“Queuefull”)异常。当队列空时,从队列里获取元素会抛出NoSuchElementException异常。
[*]返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
[*]一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。 当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
[*]超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。
  如果是无界阻塞队列,队列不可能会出现被打满的情况,所以使用put或offer方法永远不会被阻塞,而且使用offer方法时,该方法永远返回true。
ArrayBlockingQueue

  基于数组结构实现的有界阻塞队列,按先进先出(FIFO)原则对任务排序。由于其容量固定,一旦创建,队列的大小不能改变。在高并发场景下,若任务提交速度过快,可能会频繁触发拒绝策略,适用于对资源使用较为严格、任务量相对稳定的场景。创建队列时需指定队列大小,故需要仔细斟酌队列长度,保证生产者和消费者速率相匹配。
实现原理

  内部维护一个数组用于存储元素,通过ReentrantLock来保证线程安全。在进行插入和移除操作时,会获取锁,操作完成后释放锁。
核心轮子

  put(E e)方法用于将元素放入队列,若队列已满则阻塞;take()方法用于从队列中取出元素,若队列为空则阻塞。下面提供一个调用add()和take() 方法的示例。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
* @Author Wiener
* @Date 2025-07-09
* @Description: 队列超过指定容量
*/
public class FullQueueDemo {

    public static void main(String[] args) {
      // 初始化时指定队列最大容量为3
      BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
      try {
            queue.add("测试1");
            queue.add("测试2");
            queue.add("测试3");
            //队列已经饱和,不再接收新元素抛出异常
            queue.add("我触发异常");
      } catch (IllegalStateException ie) {
            System.out.println(ie);
            try {
                System.out.println("取出队头元素:" + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 输出队列中剩余元素个数
            System.out.println("剩余元素个数:" + queue.size());
      }
    }
}执行结果如下:
java.lang.IllegalStateException: Queue full
取出队头元素:测试1
剩余元素个数:2LinkedBlockingQueue

  基于单向链表结构实现的阻塞队列,按先进先出原则对任务排序。如果创建时没有指定队列容量,则默认是无界队列,理论上大小为Integer.MAX_VALUE = 2^31 - 1;
  开发过程中使用无界队列时,存在内存溢出风险,建议初始化的时候指定队列长度。在并发场景下执行入队和出队操作时,ArrayBlockingQueue共用一把锁,并发较低;而LinkedBlockingQueue分别使用了写和读两把锁,故吞吐量高于前者。
  适用场景不同,ArrayBlockingQueue适用于明确限制队列大小(即初始化大小后不能扩容)的场景,防止生产速度大于消费速度的时候,造成内存溢出、资源耗尽。LinkedBlockingQueue适用于业务高峰期可以自动扩容提升消费速度的场景。
  当使用该队列时,线程池中的线程数量通常不会超过核心线程数,因为在核心线程都繁忙时,新来的任务就会被放入这个无界的队列中等待执行,而不会创建新的线程(除非指定任务队列容量且已满时,还在提交任务)。这种队列适用于任务量较大且执行时间较短的场景,可避免过多线程创建导致的资源耗尽问题,但需注意可能会因为任务堆积过多而耗尽内存。
实现原理

  使用单向链表存储元素,同样使用ReentrantLock保证线程安全,不过它有两把锁,分别用于入队和出队操作,减少锁竞争。在队列爆仓或为空时,通过Condition实现线程的阻塞和唤醒。
核心轮子

  put(E e)方法用于将元素放入队列,take()方法用于从队列中取出元素,与ArrayBlockingQueue类似,但由于理论上无界,put方法不会因为队列满而阻塞。
SynchronousQueue

  SynchronousQueue中文名是同步移交队列,队列长度为0,所以没有容量。从名字就知道它的作用是一个线程向队列插入数据的时候,必须一直阻塞等待另一个线程从队列中取走数据。同样,从队列中取走数据的时候,必须等待另一个线程往队列中插入数据。
基本特性

阻塞操作:put() 和 take() 操作会相互阻塞,直到一个插入操作和一个删除操作配对成功为止。
线程安全:使用内部锁ReentrantLock和条件变量Condition来确保线程安全。
高效传输:没有内部容量用于存储元素,适合一对一的直接数据交换,而不涉及数据的存储。
应用场景

  由这些基本特性可知,SynchronousQueue更像是一种任务传递的媒介,适合于需要线程直接交换数据的场景,即把任务从生产者线程传递到消费者线程手上,保证了任务即时处理,不存在任务排队等待的情况。SynchronousQueue 非常适合以下场景:

[*]线程池:在 Executors.newCachedThreadPool() 方法中,SynchronousQueue 被用作线程池的工作队列。它使得线程池能够根据需要动态地扩展和收缩线程数量,弊端是可能导致线程池创建过多线程,这些线程都在竞争CPU时间片,等待CPU调度,最终会拖慢任务处理速度。
[*]直接数据交换:在线程之间需要进行一对一数据交换的场景中使用SynchronousQueue,例如生产者和消费者之间直接交换数据而不需要中间存储。
[*]精细化调控任务执行速度:用于限制任务的执行速度,例如一个线程必须等待另一个线程完成后才能继续处理任务。
实现原理

内部通过TransferQueue接口实现,采用一种复杂的 “配对” 机制。当一个线程执行插入操作(put)时,它会等待另一个线程来执行移除操作(take),两者直接进行数据传递,而不经过队列存储。
  SynchronousQueue 使用 ReentrantLock 和 Condition 来实现线程间的等待和通知机制。每个 SynchronousQueue 实例都包含一个核心内部类 Transferer,该类使用了一种高效的基于锁和条件变量的机制来管理生产者和消费者之间的直接通信,而无需存储任何元素。这种设计使得 SynchronousQueue 在某些特定的高并发场景下非常高效。Transferer 有两种基本模式:
直接传输:一个线程将元素传递给另一个线程,两者直接进行数据交换。
请求等待:一个线程请求一个元素,如果没有可用的元素,它将阻塞等待;另一个线程提供一个元素,然后唤醒等待的线程。
案例分析

  下面看一个SynchronousQueue的简单用例。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

/**
* @Author Wiener
* @Date 2025-07-22
*/
public class SyncQueueTest {
    // 创建一个 SynchronousQueue
    private static BlockingQueue<Integer> queue = new SynchronousQueue<>();

    public static void main(String[] args) {

      // 生产者线程,往队列中放5个元素
      Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                  System.out.println(Thread.currentThread().getName() + " 插入数据: " + i);
                  // 阻塞插入
                  queue.put(i);
                  // 模拟延迟500毫秒
                  Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
      });

      // 消费者线程,从队列中取出5个元素
      Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                  Integer value = queue.take();// 阻塞移除
                  System.out.println(Thread.currentThread().getName() + " 移除元素: " + value);
                  // 模拟延迟1000毫秒
                  Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
      });
      // 启动线程
      producer.start();
      consumer.start();
    }

}执行结果:
Thread-0 插入数据: 1
Thread-1 移除元素: 1
Thread-0 插入数据: 2
Thread-1 移除元素: 2
Thread-0 插入数据: 3
Thread-1 移除元素: 3
Thread-0 插入数据: 4
Thread-1 移除元素: 4
Thread-0 插入数据: 5
Thread-1 移除元素: 5优点
  高效的并发性能:由于使用了锁分离技术,读写操作可以并发执行,提高了性能。
  灵活的双端操作:支持从队列的两端插入和删除元素。
  阻塞特性:简化了多线程编程,避免了忙等待。
缺点
  高并发下的性能问题:如果队列大小设置不当,可能在高并发下导致资源争用。
  内存占用:对于无界队列,如果数据生产速度远大于消费速度,可能导致内存溢出。
总结


  LinkedBlockingDeque 是一个非常强大的并发数据结构,结合了双端队列和阻塞队列的优点,适用于需要高效并发处理任务的场景。它非常适合用于多线程环境下的任务调度、数据缓存、生产者-消费者模型等。如果你有更多关于 LinkedBlockingDeque 的具体问题或需要进一步的示例,请在评论区告诉我!希望这些信息对你有帮助!
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: JUC干货之六大阻塞队列BlockingQueue