找回密码
 立即注册
首页 业界区 安全 [数据结构/Java] 数据结构之循环队列

[数据结构/Java] 数据结构之循环队列

豺独 3 小时前
1 概述:循环队列

循环队列


  • 循环队列: 一种先进先出(FIFO)的数据结构——它通过将【顺序队列】的末尾连接到开头,形成一个【环状结构】,从而解决了【顺序队列】的【虚假满状态问题】。
1.png

2.gif



  • 【队列】:一种先进先出(First In First Out)的线性表,简称FIFO。允许插入的一端称为【队尾(head)】,允许删除的一端称为【队头(tail)】。


  • 循环队列在大数据领域应用场景较为丰富。


  • Hadoop MapReduce 在 Shuffle 过程中的环形缓冲区
别名:循环缓冲队列,Circular Buffer Queue
在 Shuffle 过程中,【环形缓冲区】主要用于存储来自 Mapper 节点传输的数据。


  • 实时计算中,缓存单个设备的最近N秒的状态数据
  • ...


  • 环形缓冲区(Ring Buffer,也称为循环缓冲区或 Circular Buffer Queue)是【无锁队列】实现中一种【高效的数据结构】,特别适合【高性能并发场景】,如线程池任务调度、实时系统、服务器请求处理等。


  • 环形缓冲区是一个逻辑上通过【头指针】(head)和【尾指针】(tail)形成循环队列的数据结构。
换言之,它由一个【固定大小】的【列表】和两个【指针】组成。一个指针 head 用于指向队列的头部,另一个指针 tail 则用于指向队列的尾部。
当指针到达数组末尾时,自动“绕回”到开头(通过模运算)。它常用于【无锁队列】,因为:


  • 固定内存:避免动态分配,减少内存碎片。
  • 缓存友好:连续内存布局提高缓存命中率。
  • 高效操作:头尾指针通过原子操作更新,支持并发访问。
核心组件


  • 缓冲区:固定大小的数组或链表。
  • 头指针(head):指向下一个读取位置(消费者使用)。
  • 尾指针(tail):指向下一个写入位置(生产者使用)。
  • 大小计数器(size):跟踪队列中元素数量(可选,视场景)。
  • 原子操作:使用 std::atomic(C/C++) / AtomicXxx(Java) 等锁机制,确保 head 和 tail 的线程安全更新。
不同策略下的循环队列


  • 有锁的循环队列


  • 覆盖策略(满时入队覆盖队首)
  • 阻塞策略(满/空时阻塞等待)
  • 非阻塞策略(满/空时返回 null / 抛异常)


  • 无锁的循环队列
基于 ConcurrentLinkedDeque 实现线程安全的循环队列的思路


  • ConcurrentLinkedDeque 是 Java 并发包中提供的非阻塞线程安全的【双端队列】,基于【无锁】(CAS)机制实现。
java.util.concurrent.ConcurrentLinkedDeque


  • 要基于 ConcurrentLinkedDeque 实现线程安全的循环队列。
核心思路是:利用 ConcurrentLinkedDeque 的并发安全特性封装队列操作,通过固定容量限制实现“【循环】”(队列满时入队会覆盖/阻塞/抛异常,队空时出队会阻塞/抛异常)。


  • 核心设计要点


  • 固定容量:循环队列的核心是容量固定,满后入队需遵循循环规则(覆盖旧元素/阻塞/拒绝)。
  • 线程安全:复用  ConcurrentLinkedDeque  的并发安全特性,避免手动加锁。
  • 循环逻辑:入队时若队列满,根据策略处理(如覆盖队首、阻塞等待、抛异常);出队时若空,同理。
循环队列的特点


  • 优势:


  • 无动态分配,性能高。
  • 内存布局连续,缓存命中率高。
  • 适合固定容量的高性能场景。


  • 挑战:


  • 固定容量,可能溢出或不足。
  • MPMC 场景下【头尾指针竞争】,可能导致 CAS 重试。
  • 需要仔细处理【内存序】和【数据一致性】。
2 覆盖式循环队列

此版本,项目亲测。
2.1 实现思路


  • 基于 Java JDK 的 并发双端队列(java.util.concurrent.ConcurrentLinkedDeque)实现覆盖式循环队列。
容量固定,满时入队覆盖队首元素 (即: 新元素从队尾进入,满时覆盖队首元素(第n个元素向前覆盖第n-1个元素,n>=2))
2.2 源码实现(Java)

CoverStrategyCircularQueue
  1. import com.alibaba.fastjson.JSON;
  2. import javax.annotation.Nullable;
  3. import java.io.Serializable;
  4. import java.util.ArrayList;
  5. import java.util.Iterator;
  6. import java.util.List;
  7. import java.util.concurrent.ConcurrentLinkedDeque;
  8. import java.util.concurrent.atomic.AtomicInteger;
  9. /**
  10. * 基于 ConcurrentLinkedDeque 的循环队列(覆盖策略)
  11. * 容量固定,满时入队覆盖队首元素 (即: 新元素从队尾进入,满时覆盖队首元素(第n个元素向前覆盖第n-1个元素,n>=2))
  12. * @param <E> 队列元素类型
  13. */
  14. public class CoverStrategyCircularQueue<E> implements Serializable {
  15.     // 并发安全的双端队列(底层存储)
  16.     private final ConcurrentLinkedDeque<E> deque;
  17.     // 队列最大容量(原子类保证并发下计数准确)
  18.     private final int capacity;
  19.     // 当前元素数量(原子操作,避免并发计数错误)
  20.     private final AtomicInteger size = new AtomicInteger(0);
  21.     /**
  22.      * 构造循环队列
  23.      * @param capacity 最大容量(必须>0)
  24.      */
  25.     public CoverStrategyCircularQueue(int capacity) {
  26.         if (capacity <= 0) {
  27.             throw new IllegalArgumentException("The capacity must be greater than 0");//容量必须大于0
  28.         }
  29.         this.capacity = capacity;
  30.         this.deque = new ConcurrentLinkedDeque<>();
  31.     }
  32.     /**
  33.      * 基于 list , 构造循环队列
  34.      * @param capacity
  35.      * @param list 允许为 null
  36.      */
  37.     public CoverStrategyCircularQueue(int capacity, @Nullable List<E> list) {
  38.         this(capacity);
  39.         if(list != null && list.size() != 0){
  40.             for (E element : list) {
  41.                 offer(element);
  42.             }
  43.         }
  44.     }
  45.     /**
  46.      * 入队:新元素从队尾进入,满时覆盖队首元素(第n个元素向前覆盖第n-1个元素,n>=2)
  47.      * @param element 待入队元素
  48.      * @return 成功入队返回true(覆盖时也返回true)
  49.      */
  50.     public boolean offer(E element) {
  51.         if (element == null) {
  52.             throw new NullPointerException("The element not be null");//元素不能为null
  53.         }
  54.         // 循环核心:满了先移除队首,再入队
  55.         while (size.get() >= capacity) {
  56.             // 移除队首(CAS保证原子性,避免并发下重复移除)
  57.             if (deque.pollFirst() != null) {
  58.                 size.decrementAndGet();
  59.             }
  60.         }
  61.         // 入队尾
  62.         deque.offerLast(element);
  63.         size.incrementAndGet();
  64.         return true;
  65.     }
  66.     /**
  67.      * 出队:空时返回null
  68.      * @return 队首元素,空则返回null
  69.      */
  70.     public E poll() {
  71.         E element = deque.pollFirst();
  72.         if (element != null) {
  73.             size.decrementAndGet();
  74.         }
  75.         return element;
  76.     }
  77.     /**
  78.      * 查看队首元素(不移除)
  79.      * @return 队首元素,空则返回null
  80.      */
  81.     public E peekFirst() {
  82.         return deque.peekFirst();
  83.     }
  84.     /**
  85.      * 查看队尾元素(不移除)
  86.      * @return 队尾元素,空则返回 null
  87.      */
  88.     public E peekLast() {
  89.         return deque.peekLast();
  90.     }
  91.     /**
  92.      * 获取倒数第 index 个元素(不删除元素)
  93.      * @note 从队尾向队首遍历
  94.      * @param index 倒数第 index 个元素 , index ∈ [0, size - 1]
  95.      * @return
  96.      */
  97.     public E getLast(Integer index) {
  98.         Iterator<E> iterator = deque.descendingIterator();
  99.         E result = null;
  100.         int cursor = 0;
  101.         while( iterator.hasNext() && cursor < index) {
  102.             result = iterator.next();
  103.         }
  104.         return result;
  105.     }
  106.     /**
  107.      * 获取正数第 index 个元素(不删除元素)
  108.      * @note 从队首向队尾遍历
  109.      * @param index 正数第 index 个元素 , index ∈ [0, size - 1]
  110.      * @return
  111.      */
  112.     public E get(Integer index) {
  113.         Iterator<E> iterator = deque.iterator();
  114.         E result = null;
  115.         int cursor = 0;
  116.         while( iterator.hasNext() && cursor < index) {
  117.             result = iterator.next();
  118.         }
  119.         return result;
  120.     }
  121.     /**
  122.      * 获取当前元素数量
  123.      * @return 元素个数
  124.      */
  125.     public int size() {
  126.         return size.get();
  127.     }
  128.     /**
  129.      * 判断队列是否已满
  130.      * @return 满则true
  131.      */
  132.     public boolean isFull() {
  133.         return size.get() >= capacity;
  134.     }
  135.     /**
  136.      * 判断队列是否为空
  137.      * @return 空则true
  138.      */
  139.     public boolean isEmpty() {
  140.         return size.get() == 0;
  141.     }
  142.     /**
  143.      * 清空队列
  144.      */
  145.     public void clear() {
  146.         deque.clear();
  147.         size.set(0);
  148.     }
  149.     /**
  150.      * 获取底层的队列
  151.      * @note 原则上,不允许获取底层队列,但为了方便特殊场景下的数据操纵、及验证测试,故此处提供该方法
  152.      * @return
  153.      */
  154.     public ConcurrentLinkedDeque getDeque() {
  155.         return deque;
  156.     }
  157.     /**
  158.      * 转 List
  159.      * @return
  160.      */
  161.     public List<E> toList(){
  162.         List<E> list = new ArrayList<>();
  163.         if( this.deque == null || this.deque.isEmpty() ){
  164.             return list;
  165.         }
  166.         for (E element : this.deque) {
  167.             list.add(element);
  168.         }
  169.         return list;
  170.     }
  171.     @Override
  172.     public String toString() {
  173.         return "CoverStrategyCircularQueue{" +
  174.                 "deque=" + JSON.toJSONString(deque) +
  175.                 ", capacity=" + capacity +
  176.                 ", size=" + size +
  177.                 '}';
  178.     }
  179. }
复制代码
CircularQueueTest
  1. public class CircularQueueTest {
  2.     private final static Logger log = LoggerFactory.getLogger(CircularQueueTest.class);
  3.     @Test
  4.     public void CoverStrategyCircularQueueTest(){
  5.         int length = 5;
  6.         CoverStrategyCircularQueue<String> queue = new CoverStrategyCircularQueue<>(length);
  7.         //入队
  8.         for (int i = 0; i < length + 3; i++) {
  9.             queue.offer( (new Integer(i+1)).toString() );
  10.         }
  11.         log.info("queue:{}", queue);//[4, 5, 6, 7, 8]
  12.         //出队
  13.         queue.poll();
  14.         queue.poll();
  15.         queue.poll();
  16.         log.info("queue:{}", queue);//[7, 8]
  17.     }
  18. }
复制代码
3 阻塞式循环队列(等待策略)


  • 满时入队阻塞,空时出队阻塞,适合生产-消费模型,需结合 Lock  和 Condition  实现。


  • ReentrantLock : 可重入的互斥锁,又被称为“独占锁”
  • Condition : Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition的await()、signal()这种方式实现线程间协作更加安全和高效。因此,通常来说比较推荐使用Condition。
源码实现(Java)
  1. import java.util.concurrent.ConcurrentLinkedDeque;
  2. import java.util.concurrent.locks.Condition;
  3. import java.util.concurrent.locks.ReentrantLock;
  4. /**
  5. * 基于 ConcurrentLinkedDeque 的阻塞循环队列
  6. * 满时入队阻塞,空时出队阻塞
  7. * @param <E> 元素类型
  8. */
  9. public class BlockingCircularQueue<E> {
  10.     private final ConcurrentLinkedDeque<E> deque;
  11.     private final int capacity;
  12.     // 锁 + 条件变量(空/满)
  13.     private final ReentrantLock lock = new ReentrantLock();
  14.     private final Condition notEmpty = lock.newCondition();
  15.     private final Condition notFull = lock.newCondition();
  16.     public BlockingCircularQueue(int capacity) {
  17.         if (capacity <= 0) {
  18.             throw new IllegalArgumentException("容量必须大于0");
  19.         }
  20.         this.capacity = capacity;
  21.         this.deque = new ConcurrentLinkedDeque<>();
  22.     }
  23.     /**
  24.      * 阻塞入队:满则等待
  25.      * @param element 元素
  26.      * @throws InterruptedException 中断异常
  27.      */
  28.     public void put(E element) throws InterruptedException {
  29.         if (element == null) {
  30.             throw new NullPointerException("元素不能为null");
  31.         }
  32.         lock.lockInterruptibly();
  33.         try {
  34.             // 满则等待
  35.             while (deque.size() >= capacity) {
  36.                 notFull.await();
  37.             }
  38.             deque.offerLast(element);
  39.             // 唤醒出队阻塞的线程
  40.             notEmpty.signal();
  41.         } finally {
  42.             lock.unlock();
  43.         }
  44.     }
  45.     /**
  46.      * 阻塞出队:空则等待
  47.      * @return 队首元素
  48.      * @throws InterruptedException 中断异常
  49.      */
  50.     public E take() throws InterruptedException {
  51.         lock.lockInterruptibly();
  52.         try {
  53.             // 空则等待
  54.             while (deque.isEmpty()) {
  55.                 notEmpty.await();
  56.             }
  57.             E element = deque.pollFirst();
  58.             // 唤醒入队阻塞的线程
  59.             notFull.signal();
  60.             return element;
  61.         } finally {
  62.             lock.unlock();
  63.         }
  64.     }
  65.     /**
  66.      * 非阻塞入队:满则返回false
  67.      * @param element 元素
  68.      * @return 成功则true
  69.      */
  70.     public boolean offer(E element) {
  71.         if (element == null) {
  72.             throw new NullPointerException("元素不能为null");
  73.         }
  74.         lock.lock();
  75.         try {
  76.             if (deque.size() >= capacity) {
  77.                 return false;
  78.             }
  79.             deque.offerLast(element);
  80.             notEmpty.signal();
  81.             return true;
  82.         } finally {
  83.             lock.unlock();
  84.         }
  85.     }
  86.     /**
  87.      * 非阻塞出队:空则返回null
  88.      * @return 队首元素
  89.      */
  90.     public E poll() {
  91.         lock.lock();
  92.         try {
  93.             if (deque.isEmpty()) {
  94.                 return null;
  95.             }
  96.             E element = deque.pollFirst();
  97.             notFull.signal();
  98.             return element;
  99.         } finally {
  100.             lock.unlock();
  101.         }
  102.     }
  103.     // 辅助方法(size/isEmpty/isFull/clear)
  104.     public int size() {
  105.         lock.lock();
  106.         try {
  107.             return deque.size();
  108.         } finally {
  109.             lock.unlock();
  110.         }
  111.     }
  112.     public boolean isEmpty() {
  113.         return size() == 0;
  114.     }
  115.     public boolean isFull() {
  116.         return size() >= capacity;
  117.     }
  118.     public void clear() {
  119.         lock.lock();
  120.         try {
  121.             deque.clear();
  122.             notFull.signalAll(); // 唤醒所有入队阻塞线程
  123.         } finally {
  124.             lock.unlock();
  125.         }
  126.     }
  127. }
复制代码
关键注意事项


  • 并发计数准确性:


  • ConcurrentLinkedDeque.size()  是O(n)操作,高并发下性能差,因此基础版用  AtomicInteger  维护计数,阻塞版用锁保护计数。

  • null元素禁止: ConcurrentLinkedDeque  不允许null元素,因此入队时需校验。
  • 循环策略选择:


  • 覆盖策略:适合日志缓存、临时数据存储(允许旧数据被覆盖)。
  • 阻塞策略:适合生产-消费模型(如任务队列,需严格控制容量)。
  • 非阻塞策略:适合快速响应场景(满/空时直接返回,不阻塞)。
使用示例
  1. public class CircularQueueTest {
  2.     public static void main(String[] args) {
  3.         // 基础版(覆盖策略)
  4.         CircularQueue<String> queue = new CircularQueue<>(3);
  5.         queue.offer("A");
  6.         queue.offer("B");
  7.         queue.offer("C");
  8.         queue.offer("D"); // 满,覆盖队首"A"
  9.         System.out.println(queue.poll()); // 输出B
  10.         System.out.println(queue.size()); // 输出3(B/C/D)
  11.         // 阻塞版(生产-消费)
  12.         BlockingCircularQueue<Integer> blockingQueue = new BlockingCircularQueue<>(2);
  13.         // 生产者线程
  14.         new Thread(() -> {
  15.             try {
  16.                 blockingQueue.put(1);
  17.                 blockingQueue.put(2);
  18.                 blockingQueue.put(3); // 满,阻塞
  19.             } catch (InterruptedException e) {
  20.                 Thread.currentThread().interrupt();
  21.             }
  22.         }).start();
  23.         // 消费者线程
  24.         new Thread(() -> {
  25.             try {
  26.                 Thread.sleep(1000);
  27.                 System.out.println(blockingQueue.take()); // 输出1,唤醒生产者
  28.             } catch (InterruptedException e) {
  29.                 Thread.currentThread().interrupt();
  30.             }
  31.         }).start();
  32.     }
  33. }
复制代码
4 非阻塞循环队列(满/空时返回 null / 抛异常)

实现思路

基于 ConcurrentLinkedDeque 实现非阻塞循环队列


  • ConcurrentLinkedDeque 是 Java 并发包中提供的非阻塞、线程安全的双端队列,基于无锁(CAS)机制实现。
  • 要基于它封装非阻塞式的循环队列,核心思路是:

  • 限制队列容量,模拟循环队列的“固定长度”特性;
  • 利用双端队列的首尾操作(offer/poll)模拟循环入队/出队;
  • 基于 CAS 保证并发安全,避免阻塞(非阻塞核心);
  • 处理队列满/空时的非阻塞策略(如返回 false/空值,而非等待)。
核心实现要点


  • 容量限制:通过原子变量记录当前元素数,入队前检查是否已满,出队后更新计数;
  • 循环逻辑:无需手动维护头尾指针(ConcurrentLinkedDeque 已封装链表节点的 CAS 操作),仅需在容量满时拒绝入队,空时拒绝出队;
  • 非阻塞特性:所有操作均为非阻塞,失败时立即返回结果,不使用锁或条件变量等待。
源码实现(Java)

NonBlockingCircularQueue
  1. import java.util.concurrent.ConcurrentLinkedDeque;
  2. import java.util.concurrent.atomic.AtomicInteger;
  3. /**
  4. * 基于 ConcurrentLinkedDeque 实现的非阻塞循环队列
  5. * 特性:固定容量、线程安全、非阻塞(操作失败立即返回,不等待)
  6. * @param <E> 队列元素类型
  7. */
  8. public class NonBlockingCircularQueue<E> {
  9.     // 底层非阻塞双端队列
  10.     private final ConcurrentLinkedDeque<E> deque;
  11.     // 循环队列的固定容量
  12.     private final int capacity;
  13.     // 原子计数器,记录当前元素数量(保证并发下计数准确)
  14.     private final AtomicInteger size;
  15.     /**
  16.      * 构造函数:初始化循环队列容量
  17.      * @param capacity 队列容量(必须>0)
  18.      */
  19.     public NonBlockingCircularQueue(int capacity) {
  20.         if (capacity <= 0) {
  21.             throw new IllegalArgumentException("容量必须大于0");
  22.         }
  23.         this.capacity = capacity;
  24.         this.deque = new ConcurrentLinkedDeque<>();
  25.         this.size = new AtomicInteger(0);
  26.     }
  27.     /**
  28.      * 非阻塞入队(尾插):队列满则返回false,否则入队并返回true
  29.      * @param e 待入队元素(非null)
  30.      * @return 入队成功返回true,失败(队列满)返回false
  31.      */
  32.     public boolean offer(E e) {
  33.         if (e == null) {
  34.             throw new NullPointerException("元素不能为null");
  35.         }
  36.         // 自旋CAS保证计数准确(避免并发下容量判断错误)
  37.         while (true) {
  38.             int currentSize = size.get();
  39.             // 队列已满,直接返回false(非阻塞核心)
  40.             if (currentSize >= capacity) {
  41.                 return false;
  42.             }
  43.             // CAS更新计数:成功则执行入队,失败则重试
  44.             if (size.compareAndSet(currentSize, currentSize + 1)) {
  45.                 deque.offerLast(e);
  46.                 return true;
  47.             }
  48.             // CAS失败说明有其他线程修改了计数,自旋重试
  49.         }
  50.     }
  51.     /**
  52.      * 非阻塞出队(头出):队列空则返回null,否则出队并返回元素
  53.      * @return 出队元素,队列为空返回null
  54.      */
  55.     public E poll() {
  56.         // 自旋CAS保证计数准确
  57.         while (true) {
  58.             int currentSize = size.get();
  59.             // 队列为空,直接返回null(非阻塞核心)
  60.             if (currentSize <= 0) {
  61.                 return null;
  62.             }
  63.             // CAS更新计数:成功则执行出队,失败则重试
  64.             if (size.compareAndSet(currentSize, currentSize - 1)) {
  65.                 return deque.pollFirst();
  66.             }
  67.             // CAS失败说明有其他线程修改了计数,自旋重试
  68.         }
  69.     }
  70.     /**
  71.      * 查看队头元素(不删除)
  72.      * @return 队头元素,队列为空返回null
  73.      */
  74.     public E peek() {
  75.         return deque.peekFirst();
  76.     }
  77.     /**
  78.      * 获取当前队列元素数量
  79.      * @return 元素数量
  80.      */
  81.     public int size() {
  82.         return size.get();
  83.     }
  84.     /**
  85.      * 判断队列是否已满
  86.      * @return 满返回true,否则false
  87.      */
  88.     public boolean isFull() {
  89.         return size.get() >= capacity;
  90.     }
  91.     /**
  92.      * 判断队列是否为空
  93.      * @return 空返回true,否则false
  94.      */
  95.     public boolean isEmpty() {
  96.         return size.get() <= 0;
  97.     }
  98.     /**
  99.      * 获取队列容量
  100.      * @return 容量
  101.      */
  102.     public int getCapacity() {
  103.         return capacity;
  104.     }
  105. }
复制代码
关键细节说明


  • 原子计数器(AtomicInteger)

    • 必须用原子变量记录元素数量,避免并发下size()与实际队列元素数不一致(ConcurrentLinkedDeque 的size()是遍历计数,性能差且非原子);
    • 入队/出队时通过compareAndSet(CAS)自旋更新计数,保证计数与实际操作的原子性。

  • 非阻塞特性

    • 入队时若队列满,直接返回false,不阻塞等待;
    • 出队时若队列空,直接返回null,不阻塞等待;
    • 所有操作基于 CAS 自旋,无锁、无阻塞,适合高并发低延迟场景。

  • 循环逻辑

    • 无需手动维护循环队列的头尾指针(如数组实现的循环队列),ConcurrentLinkedDeque 已通过链表节点的 CAS 操作实现高效的首尾操作;
    • “循环”体现在“固定容量+满则拒绝入队”,出队后释放容量可再次入队,模拟循环复用空间。

  • 线程安全

    • 底层 ConcurrentLinkedDeque 保证了队列操作的线程安全;
    • 原子计数器保证了容量判断的准确性;
    • CAS 自旋解决了“检查-更新”的竞态条件(如先判断容量,再入队的间隙被其他线程修改容量)。

适用场景与局限性

适用场景


  • 高并发、低延迟的生产-消费场景;
  • 不需要阻塞等待(如生产者可丢弃数据,消费者可跳过空队列);
  • 容量固定,需循环复用队列空间。
局限性


  • 不支持阻塞式操作(如需阻塞等待,应使用ArrayBlockingQueue/LinkedBlockingDeque);
  • 基于链表实现,内存开销略高于数组实现的循环队列;
  • CAS 自旋在高并发下可能导致 CPU 占用升高(可通过限制自旋次数优化)。
优化方向


  • 自旋次数限制:避免无限自旋,可设置最大自旋次数,超过后返回失败;
  • 批量操作:提供批量入队/出队方法,减少 CAS 次数;
  • 公平性:可选公平策略(如按线程顺序自旋),避免线程饥饿;
  • 元素过期:支持过期元素自动清理,适配缓存场景。
Z 最佳实践

循环队列的性能优化建议


  • 高并发场景下,优先选择覆盖策略,避免【锁竞争】。
  • 若需阻塞策略,可考虑直接使用  ArrayBlockingQueue(JDK 原生,性能更优)
  • 避免频繁调用 size()  方法,高并发下改用  isEmpty() / isFull()  替代(O(1) 操作)。
基于循环队列实现滑动窗口


  • [Python/循环队列] 数据结构之滑动窗口 - 博客园/千千寰宇
方案2:基于【循环队列】实现【滑动窗口】
Hadoop MapReduce 在 Shuffle 过程中环形缓冲区的应用


  • Hadoop MapReduce 在 Shuffle 过程中的环形缓冲区
别名:循环缓冲队列,Circular Buffer Queue
在 Shuffle 过程中,【环形缓冲区】主要用于存储来自 Mapper 节点传输的数据。


  • 环形缓冲区的工作原理
【环形缓冲区】的工作原理是【基于生产者-消费者模型】的。


  • 在 Shuffle 过程中,Mapper 节点充当生产者的角色,将数据写入【环形缓冲区】;而 Reducer 节点则充当【消费者】的角色,从【环形缓冲区】中读取数据并进行后续的处理。
  • 当 Mapper 节点将数据写入【环形缓冲区】时,tail 指针会递增。
如果 tail 指针追上了 head 指针,表示缓冲区已满,此时 Mapper 节点会等待一段时间,直到 Reducer 节点读取并释放了一些空间,再将数据写入【环形缓冲区】。


  • 当 Reducer 节点从【环形缓冲区】中读取数据时,head 指针会递增。
如果 head 指针追上了 tail 指针,表示缓冲区已空,此时 Reducer 节点会等待一段时间,直到 Mapper 节点写入了更多的数据,再继续读取。


  • 环形缓冲区在 Shuffle 过程中的作用


  • 环形缓冲区在 Shuffle 过程中起到了至关重要的作用。
  • 它将 Mapper 节点产生的数据进行【临时存储】,以便 Reducer 节点能够按照预定的顺序和方式进行读取和处理。
  • 另外,由于 Hadoop 在 Shuffle 过程中使用了磁盘进行大规模的数据传输,而磁盘读写较慢。
因此,【环形缓冲区】通过在【内存】中存储数据,加速了数据的传输和处理过程,提高了整个 Shuffle 过程的效率和性能。


  • 推荐文献


  • Hadoop:Shuffle 过程中的环形缓冲区 - 极简博客
Y 推荐文献

X 参考文献


  • 环形缓冲区(Ring Buffer,也称为循环缓冲区或 Circular Buffer)是无锁队列实现中一种高效的数据结构 - CSDN
    本文作者:        千千寰宇   
    本文链接:         https://www.cnblogs.com/johnnyzen   
    关于博文:评论和私信会在第一时间回复,或直接私信我。   
    版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA     许可协议。转载请注明出处!
    日常交流:大数据与软件开发-QQ交流群: 774386015        【入群二维码】参见左下角。您的支持、鼓励是博主技术写作的重要动力!   

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册