找回密码
 立即注册
首页 业界区 业界 Disruptor—1.原理和使用简介

Disruptor—1.原理和使用简介

请蒂 2025-6-3 00:17:09
大纲
1.Disruptor简介
2.Disruptor和BlockingQueue的压测对比
3.Disruptor的编程模型
4.Disruptor的数据结构与生产消费模型
5.RingBuffer + Disruptor + Sequence相关类
6.Disruptor的WaitStrategy消费者等待策略
7.EventProcessor + EventHandler等类
8.Disruptor的运行原理图
9.复杂业务需求下的编码方案和框架
10.Disruptor的串行操作
11.Disruptor的并行操作
12.Disruptor的多边形操作
13.Disruptor的多生产者和多消费者
 
1.Disruptor简介
(1)Disruptor是什么
(2)Disruptor的特点
(3)Disruptor的核心
 
(1)Disruptor是什么
Martin Fowler在自己网站上写了一篇LMAX架构的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,能够以很低的延迟产生大量的交易。LMAX是建立在JVM平台上,其核心是一个业务逻辑处理器,能够在一个线程里每秒处理6百万订单。LMAX业务逻辑处理器完全是运行在内存中,使用事件驱动方式,其核心是Disruptor。
 
(2)Disruptor的特点
大大简化了并发程序开发的难度,性能上比Java提供的一些并发包还好。
 
Disruptor是一个高性能异步处理框架,实现了观察者模式。Disruptor是无锁的、是CPU友好的。Disruptor不会清除缓存中的数据,只会覆盖缓存中的数据,不需要进行垃圾回收。Disruptor业务逻辑是纯内存操作,使用事件驱动方式。
 
(3)Disruptor的核心
Disruptor核心是一个RingBuffer,RingBuffer是一个数组,没有首尾指针。RingBuffer是一个首尾相接的环,用于在不同线程之间传递数据。
 
如果RingBuffer满了,是继续覆盖还是等待消费,由生产者和消费者决定。假设RingBuffer满了,生产者有两个选择:选择一是等待RingBuffer有空位再填充,选择二是直接覆盖。同时消费者也有两种选择:选择一是等待RingBuffer满了再消费,选择二是RingBuffer填充一个就消费一个。
 
RingBuffer有一个序号Sequence,这个序号指向数组中下一个可用元素。随着数据不断地填充这个数组,这个序号会一直增长,直到绕过这个环。序号指向的元素,可以通过mod计算:序号 % 长度 = 索引。建议将长度设为2的n次方,有利于二进制计算:序号 & (长度 - 1) = 索引。
 
Sequence通过顺序递增的序号来进行编号,以此管理正在进行交换的数据(事件)。对数据处理的过程总是沿着需要逐个递增处理,从而实现线程安全。一个Sequence用于跟踪标识某个特定的事件处理者的处理进度。
 
2.Disruptor和BlockingQueue的压测对比
Disruptor的性能是ArrayBlockingQueue的3倍+,这里的测试代码都是基于单线程的单生产者单消费者模式运行的。但是Disruptor本身就支持多生产者多消费者模型,测试中使用单线程明显降低了其性能。而ArrayBlockingQueue在多生产者多消费者场景下,其性能又会比单生产者单消费者场景下更低。因此,在实际应用中,Disruptor的性能会是ArrayBlockingQueue的3倍+。
  1. public interface Constants {
  2.     int EVENT_NUM_OHM = 100000000;
  3.     int EVENT_NUM_FM = 50000000;
  4.     int EVENT_NUM_OM = 10000000;
  5. }
  6. public class ArrayBlockingQueue4Test {
  7.     public static void main(String[] args) {
  8.         //初始化一个大小为100000000的有界队列ArrayBlockingQueue,为了避免在测试时由于扩容影响性能,所以一开始就初始化大小为1亿
  9.         final ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<Data>(100000000);
  10.         //开始时间
  11.         final long startTime = System.currentTimeMillis();
  12.         //向容器中添加元素
  13.         new Thread(new Runnable() {
  14.             public void run() {
  15.                 long i = 0;
  16.                 //首先把数据投递到有界队列ArrayBlockingQueue,单线程的生产者
  17.                 while (i < Constants.EVENT_NUM_OHM) {
  18.                     Data data = new Data(i, "c" + i);
  19.                     try {
  20.                         queue.put(data);
  21.                     } catch (InterruptedException e) {
  22.                         e.printStackTrace();
  23.                     }
  24.                     i++;
  25.                 }
  26.             }
  27.         }).start();
  28.   
  29.         //从容器中取出元素
  30.         new Thread(new Runnable() {
  31.             public void run() {
  32.                 int k = 0;
  33.                 //然后才开始消费有界队列中的数据,单线程的消费者
  34.                 while (k < Constants.EVENT_NUM_OHM) {
  35.                     try {
  36.                         queue.take();
  37.                     } catch (InterruptedException e) {
  38.                         e.printStackTrace();
  39.                     }
  40.                     k++;
  41.                 }
  42.                 //结束时间
  43.                 long endTime = System.currentTimeMillis();
  44.                 //整个main函数就是单线程运行,处理1千万数据,大概耗时3.6秒
  45.                 System.out.println("ArrayBlockingQueue costTime = " + (endTime - startTime) + "ms");
  46.             }
  47.         }).start();
  48.     }
  49. }
  50. public class DisruptorSingle4Test {
  51.     public static void main(String[] args) {
  52.         int ringBufferSize = 65536;
  53.         final Disruptor<Data> disruptor = new Disruptor<Data>(
  54.             new EventFactory<Data>() {
  55.                 public Data newInstance() {
  56.                     return new Data();
  57.                 }
  58.             },
  59.             ringBufferSize,
  60.             //设置为单线程运行
  61.             Executors.newSingleThreadExecutor(),
  62.             //单生产者模式
  63.             ProducerType.SINGLE,
  64.             //new BlockingWaitStrategy()
  65.             new YieldingWaitStrategy()
  66.         );
  67.   
  68.         //创建一个消费者事件处理器
  69.         DataConsumer consumer = new DataConsumer();
  70.         //消费数据
  71.         disruptor.handleEventsWith(consumer);
  72.         disruptor.start();
  73.   
  74.         //单线程的消费者
  75.         new Thread(new Runnable() {
  76.             public void run() {
  77.                 RingBuffer<Data> ringBuffer = disruptor.getRingBuffer();
  78.                 for (long i = 0; i < Constants.EVENT_NUM_OHM; i++) {
  79.                     long seq = ringBuffer.next();
  80.                     Data data = ringBuffer.get(seq);
  81.                     data.setId(i);
  82.                     data.setName("c" + i);
  83.                     //发布一个数据被消费的事件
  84.                     ringBuffer.publish(seq);
  85.                 }
  86.             }
  87.         }).start();
  88.     }
  89. }
  90. public class DataConsumer implements EventHandler<Data> {
  91.     private long startTime;
  92.     private int i;
  93.    
  94.     public DataConsumer() {
  95.         this.startTime = System.currentTimeMillis();
  96.     }
  97.    
  98.     public void onEvent(Data data, long seq, boolean bool) throws Exception {
  99.         i++;
  100.         if (i == Constants.EVENT_NUM_OHM) {
  101.             long endTime = System.currentTimeMillis();
  102.             //处理1千万的数据,大概耗时1.1秒
  103.             System.out.println("Disruptor costTime = " + (endTime - startTime) + "ms");
  104.             //可见Disruptor的性能是ArrayBlockingQueue的3倍+
  105.         }
  106.     }
  107. }
复制代码
 
3.Disruptor的编程模型
(1)Disruptor的使用步骤
(2)Disruptor的使用演示
 
(1)Disruptor的使用步骤
  1. 步骤一:建立一个Event工厂类,用于创建数据(Event类实例对象)
  2. 步骤二:建立一个监听事件类(Event处理器),用于处理数据(Event类实例对象)
  3. 步骤三:创建Disruptor实例,配置一系列参数
  4. 步骤四:编写生产者组件,向Disruptor容器投递数据
复制代码
(2)Disruptor的使用演示
一.引入pom依赖
  1. <dependency>
  2.     <groupId>com.lmax</groupId>
  3.     disruptor</artifactId>
  4.     <version>3.3.2</version>
  5. </dependency>
复制代码
二.建立Event工厂类用于创建数据
Event工厂类创建的数据就是Event类实例对象。
  1. public class OrderEvent {
  2.     //订单的价格
  3.     private long value;
  4.    
  5.     public long getValue() {
  6.         return value;
  7.     }
  8.    
  9.     public void setValue(long value) {
  10.         this.value = value;
  11.     }
  12. }
  13. public class OrderEventFactory implements EventFactory<OrderEvent> {
  14.     //返回一个空的数据对象(OrderEvent对象实例)
  15.     public OrderEvent newInstance() {
  16.         return new OrderEvent();
  17.     }
  18. }
复制代码
三.建立监听事件类用于处理数据
监听事件类就是Event处理器,处理的数据就是Event类实例对象
  1. public class OrderEventHandler implements EventHandler<OrderEvent> {
  2.     public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {
  3.         Thread.sleep(1000);
  4.         System.err.println("消费者: " + event.getValue());
  5.     }
  6. }
复制代码
四.创建Disruptor对象实例
  1. public class Main {
  2.     public static void main(String[] args) {
  3.         //参数准备
  4.         OrderEventFactory orderEventFactory = new OrderEventFactory();
  5.         int ringBufferSize = 4;
  6.         ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  7.   
  8.         //参数一:eventFactory,消息(Event)工厂对象
  9.         //参数二:ringBufferSize,容器的长度
  10.         //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler
  11.         //参数四:ProducerType,单生产者还是多生产者
  12.         //参数五:waitStrategy,等待策略
  13.         //1.实例化Disruptor对象
  14.         Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
  15.             orderEventFactory,
  16.             ringBufferSize,
  17.             executor,
  18.             ProducerType.SINGLE,//单生产者
  19.             new BlockingWaitStrategy()
  20.         );
  21.   
  22.         //2.添加Event处理器,用于处理事件
  23.         //也就是构建Disruptor与消费者的一个关联关系
  24.         disruptor.handleEventsWith(new OrderEventHandler());
  25.   
  26.         //3.启动disruptor
  27.         disruptor.start();
  28.         ...
  29.     }
  30. }
复制代码
五.编写生产者组件向Disruptor容器投递数据
  1. public class Main {
  2.     public static void main(String[] args) {
  3.         //参数准备
  4.         OrderEventFactory orderEventFactory = new OrderEventFactory();
  5.         int ringBufferSize = 4;
  6.         ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
  7.   
  8.         //参数一:eventFactory,消息(Event)工厂对象
  9.         //参数二:ringBufferSize,容器的长度
  10.         //参数三:executor,线程池(建议使用自定义线程池),RejectedExecutionHandler
  11.         //参数四:ProducerType,单生产者还是多生产者
  12.         //参数五:waitStrategy,等待策略
  13.         //1.实例化Disruptor对象
  14.         Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(
  15.             orderEventFactory,
  16.             ringBufferSize,
  17.             executor,
  18.             ProducerType.SINGLE,
  19.             new BlockingWaitStrategy()
  20.         );
  21.   
  22.         //2.添加Event处理器,用于处理事件
  23.         //也就是构建Disruptor与消费者的一个关联关系
  24.         disruptor.handleEventsWith(new OrderEventHandler());
  25.   
  26.         //3.启动disruptor
  27.         disruptor.start();
  28.          
  29.         //4.获取实际存储数据的容器: RingBuffer
  30.         RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();
  31.         OrderEventProducer producer = new OrderEventProducer(ringBuffer);
  32.         ByteBuffer bb = ByteBuffer.allocate(8);
  33.         for (long i = 0; i < 5; i++) {
  34.             bb.putLong(0, i);
  35.             //向容器中投递数据
  36.             producer.sendData(bb);
  37.         }
  38.         
  39.         disruptor.shutdown();
  40.         executor.shutdown();
  41.     }
  42. }
  43. public class OrderEventProducer {
  44.     private RingBuffer<OrderEvent> ringBuffer;
  45.    
  46.     public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {
  47.         this.ringBuffer = ringBuffer;
  48.     }
  49.    
  50.     public void sendData(ByteBuffer data) {
  51.         //1.在生产者发送消息时, 首先需要从ringBuffer里获取一个可用的序号
  52.         long sequence = ringBuffer.next();
  53.         try {
  54.             //2.根据这个序号, 找到具体的"OrderEvent"元素
  55.             //注意:此时获取的OrderEvent对象是一个没有被赋值的"空对象"
  56.             OrderEvent event = ringBuffer.get(sequence);
  57.             //3.进行实际的赋值处理
  58.             event.setValue(data.getLong(0));
  59.         } finally {
  60.             //4.提交发布操作
  61.             ringBuffer.publish(sequence);
  62.         }
  63.     }
  64. }
复制代码
 
4.Disruptor的数据结构与生产消费模型
(1)Disruptor的核心与原理
(2)Disruptor的RingBuffer数据结构
(3)Disruptor的生产消费模型
 
(1)Disruptor的核心与原理
Disruptor的核心是RingBuffer,生产者向RingBuffer中写入元素,消费者从RingBuffer中消费元素。
 
(2)Disruptor的RingBuffer数据结构
RingBuffer是一个首尾相接的环(数组),用于在不同上下文(线程)之间传递数据。
 
RingBuffer拥有一个序号,这个序号指向数组中下一个可用的元素。随着生产者不停地往RingBuffer写入元素,这个序号也会一直增长,直到这个序号绕过这个环。
 
要找到RingBuffer数组中当前序号指向的元素,可以通过mod操作:序号 % 数组长度 = 数组索引。建议将长度设为2的n次方,有利于二进制计算:序号 & (长度 - 1) = 索引。
 
(3)Disruptor的生产消费模型
一.消费快生产慢
如果消费者从RingBuffer消费元素的速度大于生产者写入元素的速度,那么当消费者发现RingBuffer没有元素时,就要停下等待生产者写入元素。
 
二.生产快消费慢
如果生产者向RingBuffer写入元素的速度大于消费者消费元素的速度,那么当生产者发现RingBuffer已经满了,就要停下等待消费者消费元素。
 
因为RingBuffer数组的长度是有限的,生产者写入到RingBuffer的末尾时,会从RingBuffer的开始位置继续写入,这时候生产者就可能会追上消费者。
 
5.RingBuffer + Disruptor + Sequence相关类
(1)RingBuffer类
(2)Disruptor类
(3)Sequence类
(4)Sequencer接口
(5)SequenceBarrier类
 
(1)RingBuffer类
RingBuffer不仅是基于数组的缓存,也是创建Sequencer与定义WaitStrategy的入口。
 
(2)Disruptor类
Disruptor类可认为是一个持有RingBuffer、消费者线程池、消费者集合等引用的辅助类。
 
(3)Sequence类
通过顺序递增的序号来编号,管理正在进行交换的数据(事件)。对数据(事件)的处理总是沿着序号逐个递增,所以能够实现多线程下的并发安全与原子性。
 
一个Sequence用于跟踪标识某个特定的事件处理者的处理进度,也就是事件处理者在RingBuffer中的处理进度。每一个Producer和Consumer都有一个自己的Sequence。
 
Sequence可以看成是一个AtomicLong类型字段,用于标识进度。Sequence还可以防止不同Sequence之间CPU缓存的伪共享问题。
 
Sequence的两个作用:
作用一:用于递增标识进度
作用二:用于消除伪共享
 
(4)Sequencer接口
一.Sequencer包含Sequence
二.Sequencer接口有两个实现类
第一个实现类是SingleProducerSequencer
第二个实现类是MultiProducerSequencer
 
(5)SequenceBarrier类
作用一:用于保持对RingBuffer的生产者和消费者之间的平衡关系,比如让生产者或消费者进行等待、唤醒生产者或消费者
 
作用二:决定消费者是否还有可处理的事件
 
6.Disruptor的WaitStrategy消费者等待策略
(1)WaitStrategy接口的作用
(2)消费者等待策略的种类
(3)BlockingWaitStrategy
(4)SleepingWaitStrategy
(5)YieldingWaitStrategy
 
(1)WaitStrategy接口的作用
决定一个消费者将会如何等待生产者将Event投递到Disruptor。
 
(2)消费者等待策略的种类
  1. BlockingWaitStrategy,通过阻塞的方式进行等待
  2. SleepingWaitStrategy,通过休眠的方式进行等待
  3. YieldingWaitStrategy,通过线程间的切换的方式进行等待
复制代码
(3)BlockingWaitStrategy
BlockingWaitStrategy是最低效的等待策略,但是对CPU的消耗最小,并且在各种不同部署环境中能提供一致的性能表现。该策略需要使用到Java中的锁,也就是会通过ReentrantLock来阻塞消费者线程。而Disruptor本身是一个无锁并发框架,所以如果追求高性能,就不要选择这种策略。
 
(4)SleepingWaitStrategy
SleepingWaitStrategy是性能一般的等待策略,其性能表现和BlockingWaitStrategy差不多。但由于SleepingWaitStrategy是无锁的,所以对生产者线程的影响最小。该策略对CPU的消耗一般,通过在单个线程循环 + yield切换线程实现,所以这种策略特别适合于异步日志类似的场景。
 
(5)YieldingWaitStrategy
YieldingWaitStrategy的性能是最好的,适合于低延迟的系统。不过该策略对CPU的消耗最高,因为完全基于yield切换线程来实现。推荐用于要求高性能且事件处理线程数小于CPU逻辑核心数的场景中,尤其是当CPU开启了超线程特性的时候。
 
7.EventProcessor + EventHandler等类
(1)Event对象
(2)EventProcessor接口
(3)EventHandler接口
(4)WorkProcessor类
 
(1)Event对象
Disruptor中的Event指的是从生产者到消费者过程中所处理的数据对象。Disruptor中没有代码表示Event,它用泛型表示,完全由用户定义。比如创建一个RingBuffer对象时,其中的泛型就表示着这个Event对象。
 
(2)EventProcessor接口
EventProcessor用于处理Disruptor中的Event,拥有消费者的Sequence,它有一个实现类叫BatchEventProcessor。
 
由于EventProcessor接口继承自Runnable接口,所以BatchEventProcessor类会实现Runnable接口的run()方法。
 
其实BatchEventProcessor类是Disruptor框架中最核心的类,因为它的run()方法会不断轮询并获取数据对象,然后把数据对象(Event)交给消费者去处理,也就是即回调EventHandler接口的实现类对象的onEvent()方法。
 
(3)EventHandler接口
EventHandler是由用户实现的并且代表了Disruptor中的一个消费者接口,也就是消费者逻辑需要在EventHandler接口的onEvent()方法实现。
 
(4)WorkProcessor类
WorkProcessor类可确保每个Sequence只被一个Processor消费。注意:在单消费者模式下,使用的是EventHandler,对应于EventProcessor。在多消费者模式下,使用的是WorkHandler,对应于WorkProcessor。
 
8.Disruptor的运行原理图
1.png
 
9.复杂业务需求下的编码方案和框架
(1)方案选择
(2)框架选择
 
(1)方案选择
方案一:完全解耦的模式,比如一个子业务线也开一个项目,此时重复代码会比较多。
 
方案二:模版方法模式,如果业务快速迭代,可能也会需要经常重构底层的模版方法。
 
(2)框架选择
一.使用有限状态机框架
二.使用Disruptor框架
 
10.Disruptor的串行操作
Disruptor的串行操作,可以通过链式调用handleEventsWith()方法来实现。
 
如果使用RingBuffer对象来发布事件,那么需要先从RingBuffer对象中获取一个可用的序号,然后根据序号获取Event对象并对Event对象赋值,最后调用RingBuffer的publish()方法发布事件。
 
如果使用Disruptor对象来发布事件,那么直接调用Disruptor的publishEvent()方法发布事件即可。
 
此外,实际应用中不建议通过Executors来创建线程池,而应通过ThreadPoolExecutor构造函数具体指定线程池的每一个参数。因为Executors创建的线程池还是可能有安全隐患,比如Executors的newFixedThreadPool()方法使用的是无界队列,其使用的LinkedBlockingQueue是一个可选是否有界的阻塞队列。
  1. //Disruptor中的Event
  2. public class Trade {
  3.     private String id;
  4.     private String name;
  5.     private double price;
  6.     private AtomicInteger count = new AtomicInteger(0);
  7.    
  8.     public Trade() {
  9.    
  10.     }
  11.    
  12.     public String getId() {
  13.         return id;
  14.     }
  15.    
  16.     public void setId(String id) {
  17.         this.id = id;
  18.     }
  19.    
  20.     public String getName() {
  21.         return name;
  22.     }
  23.    
  24.     public void setName(String name) {
  25.         this.name = name;
  26.     }
  27.    
  28.     public double getPrice() {
  29.         return price;
  30.     }
  31.    
  32.     public void setPrice(double price) {
  33.         this.price = price;
  34.     }
  35.    
  36.     public AtomicInteger getCount() {
  37.         return count;
  38.     }
  39.    
  40.     public void setCount(AtomicInteger count) {
  41.         this.count = count;
  42.     }
  43. }
  44. public class Main {
  45.     @SuppressWarnings("unchecked")
  46.     public static void main(String[] args) throws Exception {
  47.         //实际应用中不建议这样创建线程池,而应通过ThreadPoolExecutor构造函数具体指定每个参数
  48.         //因为这种创建的线程池还是有安全隐患,比如newFixedThreadPool()使用的是无界队列
  49.         //LinkedBlockingQueue是一个可选是否有界的阻塞队列
  50.         ExecutorService es1 = Executors.newFixedThreadPool(8);
  51.         //构建一个线程池用于提交任务
  52.         ExecutorService es2 = Executors.newFixedThreadPool(1);
  53.   
  54.         //1.构建Disruptor
  55.         Disruptor<Trade> disruptor = new Disruptor<Trade>(
  56.             new EventFactory<Trade>() {
  57.                 public Trade newInstance() {
  58.                     return new Trade();
  59.                 }
  60.             },
  61.             1024 * 1024,
  62.             es1,
  63.             ProducerType.SINGLE,
  64.             new BusySpinWaitStrategy()
  65.         );
  66.   
  67.         //2.把消费者设置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法
  68.         //串行操作,通过链式编程实现
  69.         disruptor.handleEventsWith(new Handler1())
  70.             .handleEventsWith(new Handler2())
  71.             .handleEventsWith(new Handler3());
  72.   
  73.         //3.启动disruptor并获取RingBuffer
  74.         RingBuffer<Trade> ringBuffer = disruptor.start();
  75.   
  76.         CountDownLatch latch = new CountDownLatch(1);
  77.         long begin = System.currentTimeMillis();
  78.         //通过线程池向Disruptor发布事件(生产数据)
  79.         es2.submit(new TradePublisher(latch, disruptor));
  80.         latch.await();
  81.   
  82.         disruptor.shutdown();
  83.         es1.shutdown();
  84.         es2.shutdown();
  85.         System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
  86.     }
  87. }
  88. public class TradePublisher implements Runnable {
  89.     private static int PUBLISH_COUNT = 10;
  90.     private Disruptor<Trade> disruptor;
  91.     private CountDownLatch latch;
  92.    
  93.     public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
  94.         this.disruptor = disruptor;
  95.         this.latch = latch;
  96.     }
  97.    
  98.     public void run() {
  99.         TradeEventTranslator eventTranslator = new TradeEventTranslator();
  100.         for (int i = 0; i < PUBLISH_COUNT; i++) {
  101.             //新的发布事件的方式,另一种方式就是通过传入的RingBuffer的publish()方法发布事件
  102.             disruptor.publishEvent(eventTranslator);
  103.         }
  104.         latch.countDown();
  105.     }
  106. }
  107. class TradeEventTranslator implements EventTranslator<Trade> {
  108.     private Random random = new Random();
  109.    
  110.     public void translateTo(Trade event, long sequence) {
  111.         this.generateTrade(event);
  112.     }
  113.    
  114.     private void generateTrade(Trade event) {
  115.         event.setPrice(random.nextDouble() * 9999);
  116.     }
  117. }
  118. public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {
  119.     //实现EventHandler的onEvent()方法,可以监听生产者发布的事件
  120.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  121.         this.onEvent(event);
  122.     }
  123.    
  124.     //实现WorkHandler的onEvent()方法,也可以监听生产者发布的事件
  125.     public void onEvent(Trade event) throws Exception {
  126.         System.err.println("handler 1 : SET NAME");
  127.         Thread.sleep(1000);
  128.         event.setName("H1");
  129.     }
  130. }
  131. public class Handler2 implements EventHandler<Trade> {
  132.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  133.         System.err.println("handler 2 : SET ID");
  134.         Thread.sleep(2000);
  135.         event.setId(UUID.randomUUID().toString());
  136.     }
  137. }
  138. public class Handler3 implements EventHandler<Trade> {
  139.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  140.         System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());
  141.     }
  142. }
复制代码
 
11.Disruptor的并行操作
Disruptor的并行操作可以有两种方式实现:方式一是调用handleEventsWith()方法时传入多个handler对象,方式二是分别多次调用handleEventsWith()方法。
  1. public class Main {
  2.     @SuppressWarnings("unchecked")
  3.     public static void main(String[] args) throws Exception {
  4.         //实际应用中不建议这样创建线程池,而应通过ThreadPoolExecutor构造函数具体指定每个参数
  5.         //因为这种创建的线程池还是有安全隐患,比如newFixedThreadPool()使用的是无界队列
  6.         //LinkedBlockingQueue是一个可选是否有界的阻塞队列
  7.         ExecutorService es1 = Executors.newFixedThreadPool(8);
  8.         //构建一个线程池用于提交任务
  9.         ExecutorService es2 = Executors.newFixedThreadPool(1);
  10.   
  11.         //1.构建Disruptor
  12.         Disruptor<Trade> disruptor = new Disruptor<Trade>(
  13.             new EventFactory<Trade>() {
  14.                 public Trade newInstance() {
  15.                     return new Trade();
  16.                 }
  17.             },
  18.             1024 * 1024,
  19.             es1,
  20.             ProducerType.SINGLE,
  21.             new BusySpinWaitStrategy()
  22.         );
  23.   
  24.         //2.把消费者设置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法
  25.         //Disruptor的并行操作可以有两种方式实现
  26.         //方式一:调用handleEventsWith方法时传入多个handler对象
  27.         disruptor.handleEventsWith(new Handler1(), new Handler2(), new Handler3());
  28.       
  29.         //方式二:分别多次调用handleEventsWith()方法
  30.         //disruptor.handleEventsWith(new Handler1());
  31.         //disruptor.handleEventsWith(new Handler2());
  32.         //disruptor.handleEventsWith(new Handler3());
  33.   
  34.         //3.启动disruptor并获取RingBuffer
  35.         RingBuffer<Trade> ringBuffer = disruptor.start();
  36.   
  37.         CountDownLatch latch = new CountDownLatch(1);
  38.         long begin = System.currentTimeMillis();
  39.         //通过线程池向Disruptor发布事件(生产数据)
  40.         es2.submit(new TradePublisher(latch, disruptor));
  41.         latch.await();
  42.   
  43.         disruptor.shutdown();
  44.         es1.shutdown();
  45.         es2.shutdown();
  46.         System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
  47.     }
  48. }
  49. public class TradePublisher implements Runnable {
  50.     private static int PUBLISH_COUNT = 10;
  51.     private Disruptor<Trade> disruptor;
  52.     private CountDownLatch latch;
  53.    
  54.     public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
  55.         this.disruptor = disruptor;
  56.         this.latch = latch;
  57.     }
  58.    
  59.     public void run() {
  60.         TradeEventTranslator eventTranslator = new TradeEventTranslator();
  61.         for (int i = 0; i < PUBLISH_COUNT; i++) {
  62.             //新的发布事件的方式,另一种方式就是通过传入的RingBuffer的publish()方法发布事件
  63.             disruptor.publishEvent(eventTranslator);
  64.         }
  65.         latch.countDown();
  66.     }
  67. }
  68. class TradeEventTranslator implements EventTranslator<Trade> {
  69.     private Random random = new Random();
  70.    
  71.     public void translateTo(Trade event, long sequence) {
  72.         this.generateTrade(event);
  73.     }
  74.    
  75.     private void generateTrade(Trade event) {
  76.         event.setPrice(random.nextDouble() * 9999);
  77.     }
  78. }
  79. public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {
  80.     //实现EventHandler的onEvent()方法,可以监听生产者发布的事件
  81.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  82.         this.onEvent(event);
  83.     }
  84.    
  85.     //实现WorkHandler的onEvent()方法,也可以监听生产者发布的事件
  86.     public void onEvent(Trade event) throws Exception {
  87.         System.err.println("handler 1 : SET NAME");
  88.         Thread.sleep(1000);
  89.         event.setName("H1");
  90.     }
  91. }
  92. public class Handler2 implements EventHandler<Trade> {
  93.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  94.         System.err.println("handler 2 : SET ID");
  95.         Thread.sleep(2000);
  96.         event.setId(UUID.randomUUID().toString());
  97.     }
  98. }
  99. public class Handler3 implements EventHandler<Trade> {
  100.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  101.         System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());
  102.     }
  103. }
复制代码
 
12.Disruptor的多边形操作
(1)Disruptor的菱形操作
(2)Disruptor的六边形操作
 
Disruptor可以实现串并行同时编码。
 
(1)Disruptor的菱形操作
可以理解为先并行执行,然后再串行执行,类似于CyclicBarrier。
2.png
菱形操作方式一:调用handleEventsWith()方法时传入多个参数 + 链式调用。
 
菱形操作方式二:调用handleEventsWith()方法时传入多个参数 + 使用then()方法。
  1. public class Main {
  2.     @SuppressWarnings("unchecked")
  3.     public static void main(String[] args) throws Exception {
  4.         //实际应用中不建议这样创建线程池,而应通过ThreadPoolExecutor构造函数具体指定每个参数
  5.         //因为这种创建的线程池还是有安全隐患,比如newFixedThreadPool()使用的是无界队列
  6.         //LinkedBlockingQueue是一个可选是否有界的阻塞队列
  7.         ExecutorService es1 = Executors.newFixedThreadPool(8);
  8.         //构建一个线程池用于提交任务
  9.         ExecutorService es2 = Executors.newFixedThreadPool(1);
  10.   
  11.         //1.构建Disruptor
  12.         Disruptor<Trade> disruptor = new Disruptor<Trade>(
  13.             new EventFactory<Trade>() {
  14.                 public Trade newInstance() {
  15.                     return new Trade();
  16.                 }
  17.             },
  18.             1024 * 1024,
  19.             es1,
  20.             ProducerType.SINGLE,
  21.             new BusySpinWaitStrategy()
  22.         );
  23.   
  24.         //2.把消费者设置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法
  25.         //菱形操作一
  26.         disruptor.handleEventsWith(new Handler1(), new Handler2())
  27.            .handleEventsWith(new Handler3());
  28.         
  29.         //菱形操作二
  30.         //EventHandlerGroup<Trade> ehGroup = disruptor.handleEventsWith(new Handler1(), new Handler2());
  31.         //ehGroup.then(new Handler3());
  32.   
  33.         //3.启动disruptor并获取RingBuffer
  34.         RingBuffer<Trade> ringBuffer = disruptor.start();
  35.   
  36.         CountDownLatch latch = new CountDownLatch(1);
  37.         long begin = System.currentTimeMillis();
  38.         //通过线程池向Disruptor发布事件(生产数据)
  39.         es2.submit(new TradePublisher(latch, disruptor));
  40.         latch.await();
  41.   
  42.         disruptor.shutdown();
  43.         es1.shutdown();
  44.         es2.shutdown();
  45.         System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
  46.     }
  47. }
  48. public class TradePublisher implements Runnable {
  49.     private static int PUBLISH_COUNT = 10;
  50.     private Disruptor<Trade> disruptor;
  51.     private CountDownLatch latch;
  52.    
  53.     public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
  54.         this.disruptor = disruptor;
  55.         this.latch = latch;
  56.     }
  57.    
  58.     public void run() {
  59.         TradeEventTranslator eventTranslator = new TradeEventTranslator();
  60.         for (int i = 0; i < PUBLISH_COUNT; i++) {
  61.             //新的发布事件的方式,另一种方式就是通过传入的RingBuffer的publish()方法发布事件
  62.             disruptor.publishEvent(eventTranslator);
  63.         }
  64.         latch.countDown();
  65.     }
  66. }
  67. class TradeEventTranslator implements EventTranslator<Trade> {
  68.     private Random random = new Random();
  69.    
  70.     public void translateTo(Trade event, long sequence) {
  71.         this.generateTrade(event);
  72.     }
  73.    
  74.     private void generateTrade(Trade event) {
  75.         event.setPrice(random.nextDouble() * 9999);
  76.     }
  77. }
  78. public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {
  79.     //实现EventHandler的onEvent()方法,可以监听生产者发布的事件
  80.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  81.         this.onEvent(event);
  82.     }
  83.    
  84.     //实现WorkHandler的onEvent()方法,也可以监听生产者发布的事件
  85.     public void onEvent(Trade event) throws Exception {
  86.         System.err.println("handler 1 : SET NAME");
  87.         Thread.sleep(1000);
  88.         event.setName("H1");
  89.     }
  90. }
  91. public class Handler2 implements EventHandler<Trade> {
  92.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  93.         System.err.println("handler 2 : SET ID");
  94.         Thread.sleep(2000);
  95.         event.setId(UUID.randomUUID().toString());
  96.     }
  97. }
  98. public class Handler3 implements EventHandler<Trade> {
  99.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  100.         System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());
  101.     }
  102. }
复制代码
 
(2)Disruptor的六边形操作
通过Disruptor的after()方法 + 菱形操作,可实现六边形操作。
 
注意在单消费者模式下:一个EventHandler会对应一个BatchEventProcessor,所以如果有n个EventHandler监听Disruptor,那么初始化Disruptor时的线程池就要有n个线程,否则可能导致多边形操作失效。
 
在单消费者模式下,如果有非常多EventHandler,就需要非常多线程。此时是不合理的,所以如果有很多EventHandler,可采用多消费者模式。
3.png
  1. public class Main {
  2.     @SuppressWarnings("unchecked")
  3.     public static void main(String[] args) throws Exception {
  4.         //实际应用中不建议这样创建线程池,而应通过ThreadPoolExecutor构造函数具体指定每个参数
  5.         //因为这种创建的线程池还是有安全隐患,比如newFixedThreadPool()使用的是无界队列
  6.         //LinkedBlockingQueue是一个可选是否有界的阻塞队列
  7.         ExecutorService es1 = Executors.newFixedThreadPool(8);
  8.         //构建一个线程池用于提交任务
  9.         ExecutorService es2 = Executors.newFixedThreadPool(1);
  10.   
  11.         //1.构建Disruptor
  12.         Disruptor<Trade> disruptor = new Disruptor<Trade>(
  13.             new EventFactory<Trade>() {
  14.                 public Trade newInstance() {
  15.                     return new Trade();
  16.                 }
  17.             },
  18.             1024 * 1024,
  19.             es1,
  20.             ProducerType.SINGLE,
  21.             new BusySpinWaitStrategy()
  22.         );
  23.   
  24.         //2.把消费者设置到Disruptor中,也就是使用Disruptor.handleEventsWith()方法
  25.         //六边形操作
  26.         Handler1 h1 = new Handler1();
  27.         Handler2 h2 = new Handler2();
  28.         Handler3 h3 = new Handler3();
  29.         Handler4 h4 = new Handler4();
  30.         Handler5 h5 = new Handler5();
  31.         disruptor.handleEventsWith(h1, h4);
  32.         disruptor.after(h1).handleEventsWith(h2);
  33.         disruptor.after(h4).handleEventsWith(h5);
  34.         disruptor.after(h2, h5).handleEventsWith(h3);
  35.   
  36.         //3.启动disruptor并获取RingBuffer
  37.         RingBuffer<Trade> ringBuffer = disruptor.start();
  38.   
  39.         CountDownLatch latch = new CountDownLatch(1);
  40.         long begin = System.currentTimeMillis();
  41.         //通过线程池向Disruptor发布事件(生产数据)
  42.         es2.submit(new TradePublisher(latch, disruptor));
  43.         latch.await();
  44.   
  45.         disruptor.shutdown();
  46.         es1.shutdown();
  47.         es2.shutdown();
  48.         System.err.println("总耗时: " + (System.currentTimeMillis() - begin));
  49.     }
  50. }
  51. public class TradePublisher implements Runnable {
  52.     private static int PUBLISH_COUNT = 10;
  53.     private Disruptor<Trade> disruptor;
  54.     private CountDownLatch latch;
  55.    
  56.     public TradePublisher(CountDownLatch latch, Disruptor<Trade> disruptor) {
  57.         this.disruptor = disruptor;
  58.         this.latch = latch;
  59.     }
  60.    
  61.     public void run() {
  62.         TradeEventTranslator eventTranslator = new TradeEventTranslator();
  63.         for (int i = 0; i < PUBLISH_COUNT; i++) {
  64.             //新的发布事件的方式,另一种方式就是通过传入的RingBuffer的publish()方法发布事件
  65.             disruptor.publishEvent(eventTranslator);
  66.         }
  67.         latch.countDown();
  68.     }
  69. }
  70. class TradeEventTranslator implements EventTranslator<Trade> {
  71.     private Random random = new Random();
  72.    
  73.     public void translateTo(Trade event, long sequence) {
  74.         this.generateTrade(event);
  75.     }
  76.    
  77.     private void generateTrade(Trade event) {
  78.         event.setPrice(random.nextDouble() * 9999);
  79.     }
  80. }
  81. public class Handler1 implements EventHandler<Trade>, WorkHandler<Trade> {
  82.     //实现EventHandler的onEvent()方法,可以监听生产者发布的事件
  83.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  84.         this.onEvent(event);
  85.     }
  86.    
  87.     //实现WorkHandler的onEvent()方法,也可以监听生产者发布的事件
  88.     public void onEvent(Trade event) throws Exception {
  89.         System.err.println("handler 1 : SET NAME");
  90.         Thread.sleep(1000);
  91.         event.setName("H1");
  92.     }
  93. }
  94. public class Handler2 implements EventHandler<Trade> {
  95.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  96.         System.err.println("handler 2 : SET ID");
  97.         Thread.sleep(2000);
  98.         event.setId(UUID.randomUUID().toString());
  99.     }
  100. }
  101. public class Handler3 implements EventHandler<Trade> {
  102.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  103.         System.err.println("handler 3 : NAME: " + event.getName() + ", ID: " + event.getId() + ", PRICE: " + event.getPrice() + " INSTANCE : " + event.toString());
  104.     }
  105. }
  106. public class Handler4 implements EventHandler<Trade> {
  107.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  108.         System.err.println("handler 4 : SET PRICE");
  109.         Thread.sleep(1000);
  110.         event.setPrice(17.0);
  111.     }
  112. }
  113. public class Handler5 implements EventHandler<Trade> {
  114.     public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
  115.         System.err.println("handler 5 : GET PRICE: " + event.getPrice());
  116.         Thread.sleep(1000);
  117.         event.setPrice(event.getPrice() + 3.0);
  118.     }
  119. }
复制代码
 
13.Disruptor的多生产者和多消费者
注意一:使用多消费者模式时,每个消费者都需要实现WorkHandler接口,而不是EventHandler接口。单消费者模式,使用的是EventHandler,对应于EventProcessor。多消费者模式,使用的是WorkHandler,对应于WorkProcessor。
 
注意二:使用多消费者模式时,需要构建消费者工作池WorkerPool。
 
注意三:使用多消费者模式时,每个消费者需要一个Sequence来标记当前消费的最小序号。这样生产者投递消息时才能遍历消费者的Sequence找出最小的序号,然后写到最小的序号位置进行阻塞等待。
 
比如下图中,在某一时刻:消费者1消费了序号0和2,但序号1还没有消费完毕。消费者2消费了序号3和4,消费者3消费了序号5。此时,在RingBuffer中,虽然序号0、2、3、4、5都可以覆盖了,但由于序号1还没被消费,所以生产者最多只能覆盖到序号0的位置。然后等待序号1被消费者1消费完毕后,才能继续往RingBuffer投递消息。
4.png
  1. //Disruptor中的 Event
  2. public class Order {
  3.     private String id;
  4.     private String name;
  5.     private double price;
  6.    
  7.     public Order() {
  8.    
  9.     }
  10.    
  11.     public String getId() {
  12.         return id;
  13.     }
  14.    
  15.     public void setId(String id) {
  16.         this.id = id;
  17.     }
  18.    
  19.     public String getName() {
  20.         return name;
  21.     }
  22.    
  23.     public void setName(String name) {
  24.         this.name = name;
  25.     }
  26.    
  27.     public double getPrice() {
  28.         return price;
  29.     }
  30.    
  31.     public void setPrice(double price) {
  32.         this.price = price;
  33.     }
  34. }
  35. public class Main {
  36.     public static void main(String[] args) throws InterruptedException {
  37.         //1.创建RingBuffer
  38.         RingBuffer<Order> ringBuffer = RingBuffer.create(
  39.             ProducerType.MULTI,//多生产者
  40.             new EventFactory<Order>() {
  41.                 public Order newInstance() {
  42.                     return new Order();
  43.                 }
  44.             },
  45.             1024 * 1024,
  46.             new YieldingWaitStrategy()
  47.         );
  48.   
  49.         //2.通过ringBuffer创建一个屏障
  50.         SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
  51.   
  52.         //3.创建消费者数组,每个消费者Consumer都需要实现WorkHandler接口
  53.         Consumer[] consumers = new Consumer[10];
  54.         for (int i = 0; i < consumers.length; i++) {
  55.             consumers[i] = new Consumer("C" + i);
  56.         }
  57.   
  58.         //4.构建多消费者工作池WorkerPool,因为多消费者模式下需要使用WorkerPool
  59.         WorkerPool<Order> workerPool = new WorkerPool<Order>(
  60.             ringBuffer,
  61.             sequenceBarrier,
  62.             new EventExceptionHandler(),
  63.             consumers
  64.         );
  65.   
  66.         //5.设置多个消费者的sequence序号,用于单独统计每个消费者的消费进度, 并且设置到RingBuffer中
  67.         ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
  68.   
  69.         //6.启动workerPool
  70.         workerPool.start(Executors.newFixedThreadPool(5));
  71.   
  72.         final CountDownLatch latch = new CountDownLatch(1);
  73.         for (int i = 0; i < 100; i++) {
  74.             final Producer producer = new Producer(ringBuffer);
  75.             new Thread(new Runnable() {
  76.                 public void run() {
  77.                     try {
  78.                         latch.await();
  79.                     } catch (Exception e) {
  80.                         e.printStackTrace();
  81.                     }
  82.                     for (int j = 0; j < 100; j++) {
  83.                         producer.sendData(UUID.randomUUID().toString());
  84.                     }
  85.                 }
  86.             }).start();
  87.         }
  88.   
  89.         Thread.sleep(2000);
  90.         System.err.println("----------等待线程创建完毕,才开始生产数据----------");
  91.         latch.countDown();
  92.         Thread.sleep(10000);
  93.         System.err.println("任务总数:" + consumers[2].getCount());
  94.     }
  95.     static class EventExceptionHandler implements ExceptionHandler<Order> {
  96.         public void handleEventException(Throwable ex, long sequence, Order event) {
  97.         
  98.         }
  99.         
  100.         public void handleOnStartException(Throwable ex) {
  101.         
  102.         }
  103.         
  104.         public void handleOnShutdownException(Throwable ex) {
  105.         
  106.         }
  107.     }
  108. }
  109. public class Consumer implements WorkHandler<Order> {
  110.     private static AtomicInteger count = new AtomicInteger(0);
  111.     private String consumerId;
  112.     private Random random = new Random();
  113.    
  114.     public Consumer(String consumerId) {
  115.         this.consumerId = consumerId;
  116.     }
  117.    
  118.     public void onEvent(Order event) throws Exception {
  119.         Thread.sleep(1 * random.nextInt(5));
  120.         System.err.println("当前消费者: " + this.consumerId + ", 消费信息ID: " + event.getId());
  121.         count.incrementAndGet();
  122.     }
  123.    
  124.     public int getCount() {
  125.         return count.get();
  126.     }
  127. }
  128. public class Producer {
  129.     private RingBuffer<Order> ringBuffer;
  130.    
  131.     public Producer(RingBuffer<Order> ringBuffer) {
  132.         this.ringBuffer = ringBuffer;
  133.     }
  134.    
  135.     public void sendData(String uuid) {
  136.         long sequence = ringBuffer.next();
  137.         try {
  138.             Order order = ringBuffer.get(sequence);
  139.             order.setId(uuid);
  140.         } finally {
  141.             ringBuffer.publish(sequence);
  142.         }
  143.     }
  144. }
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册