找回密码
 立即注册
首页 业界区 业界 从零开始实现简易版Netty(一) MyNetty Reactor模式 ...

从零开始实现简易版Netty(一) MyNetty Reactor模式

粹脍誊 2025-9-25 11:01:28
从零开始实现简易版Netty(一) MyNetty Reactor模式

自从18年作为一个java程序员入行以来,所接触到的大量组件如dubbo、rocketmq、redisson等都是基于netty这一高性能网络框架实现的。
限于个人水平,在过去很长一段时间中都只能算是netty的初级使用者;在使用基于netty的中间件时,总是因为对netty底层不够了解而导致排查问题时效率不高。
因此,在过去的一段时间中我对netty源码进行了一定的研究,并以博客的形式将心得分享出来,希望能帮助到同样对netty工作原理感兴趣的读者。


非常感谢大佬bin的技术小屋,在我学习netty的过程中给了我很大的帮助。
1. MyNetty介绍

不同于大多数博客直接针对netty官方源码进行解析的方式,本系列博客通过从零到一的实现一个简易版的netty(即MyNetty)来帮助读者更好的理解netty的工作原理。
相比于完整版的netty,MyNetty只实现了netty中最核心的功能点,目的是降低复杂度,避免初学者在学习netty的过程中,对netty源码中复杂的抽象及过深的调用链感到畏惧。
本博客会按照以下顺序,通过一个接一个的小迭代由简单到复杂的实现MyNetty,每一个迭代都会有一篇与之对应的技术博客。



  • Reactor模式
  • Pipeline管道
  • 高效的数据读取
  • 高效的数据写出
  • FastThreadLocal
  • ByteBuf
  • Normal级别的池化内存分配(伙伴算法)
  • Small级别的池化内存分配(slab算法)
  • 池化内存分配支持线程本地缓存(ThreadLocalCache)
  • 常用的编解码器(FixedLengthFrameDecoder/LineBasedFrameDecoder等)


MyNetty的核心逻辑主要参考自netty 4.1.80.Final版本。
2. 操作系统I/O模型与Reactor模式介绍

作为MyNetty系列的第一篇博客,按照规划,第一个迭代中需要实现基于NIO的reactor模式。这也是netty最核心的功能,一个基于事件循环的reactor线程工作模型。


在学习的过程中,我们要尽量做到知其然且知其所以然。
因此,在介绍Reactor模式之前,先简单介绍一下两种常见的操作系统网络I/O模型,只要在了解其各自的优缺点后,才能帮助我们更好的理解为什么Netty最终选择了reactor模式。
2.1 操作系统I/O模型介绍

同步阻塞I/O(BIO)

同步阻塞IO,顾名思义,其读写是阻塞性的,在数据还没有准备好时(比如客户端还未发送新请求,或者未收到服务端响应),当前处理IO的线程是处于阻塞态的,直到数据就绪(比如接受到客户端发送的请求,或收到服务端响应)时才会被唤醒。
由于其阻塞的特性,因此在服务端并发时,每一个新的客户端连接都需要一个独立的线程来承载。


BIO详情优点简单易理解,同步阻塞式的线性代码执行流符合人的直觉。因此普通的web业务后台服务器大多是基于BIO模型开发的缺点由于客户端连接数与服务器线程数是1:1的,而服务器由于线程上下文切换的CPU开销和内存大小限制,难以应对大规模的并发连接(大几千甚至几万),性能较差BIO服务端demo
  1. public class BIOEchoServer {
  2.     private static final ExecutorService threadPool = Executors.newCachedThreadPool();
  3.     public static void main(String[] args) throws IOException {
  4.         int port = 8080;
  5.         ServerSocket serverSocket = new ServerSocket(port);
  6.         System.out.println("BIOServer started on port " + port);
  7.         while (true) {
  8.             Socket clientSocket = serverSocket.accept();
  9.             System.out.println("New client connected: " + clientSocket.getInetAddress());
  10.             // 每个新的连接都启用一个线程去处理
  11.             threadPool.execute(
  12.                 () -> handleClientConnect(clientSocket)
  13.             );
  14.         }
  15.     }
  16.     private static void handleClientConnect(Socket clientSocket) {
  17.         try (BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
  18.              PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
  19.             String inputLine;
  20.             while ((inputLine = in.readLine()) != null) {
  21.                 System.out.println("Received from client: " + inputLine);
  22.                 // echo message
  23.                 String responseMessage = "server echo: " + inputLine;
  24.                 out.println(responseMessage);
  25.                 System.out.println("Sent response: " + responseMessage);
  26.             }
  27.         } catch (IOException e) {
  28.             System.out.println("Client connection closed: " + e.getMessage());
  29.         } finally {
  30.             try {
  31.                 clientSocket.close();
  32.                 System.out.println("clientSocket closed! " + clientSocket.getInetAddress());
  33.             } catch (IOException e) {
  34.                 System.err.println("Error closing client socket: " + e.getMessage());
  35.             }
  36.         }
  37.     }
  38. }
复制代码
BIO客户端demo
  1. public class BIOClient {
  2.     public static void main(String[] args) throws IOException {
  3.         String hostname = "127.0.0.1";
  4.         int port = 8080;
  5.         try (Socket socket = new Socket(hostname, port);
  6.              PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
  7.              BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  8.              BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) {
  9.             System.out.println("Connected to server. Type messages (type 'exit' to quit)");
  10.             String userInput;
  11.             while ((userInput = stdIn.readLine()) != null) {
  12.                 out.println(userInput);
  13.                 System.out.println("Server response: " + in.readLine());
  14.             }
  15.         }
  16.     }
  17. }
复制代码
I/O多路复用

I/O多路复用,顾名思义,其不同于BIO中一个线程对应一个客户端连接的模式。I/O多路复用模型中,一个服务端线程能够同时处理多个客户端连接。
I/O多路复用解决了传统BIO模型下面对海量并发时系统资源不足的问题,但同时也引入了一些新的问题。


I/O多路复用详情优点性能好,吞吐量高。单个线程即可处理海量连接缺点比起BIO的阻塞模式,基于事件触发的编程模型非常复杂。IO多路复用服务端demo
  1. public class NIOEchoServer {
  2.     public static void main(String[] args) throws IOException {
  3.         SelectorProvider selectorProvider = SelectorProvider.provider();
  4.         Selector selector = selectorProvider.openSelector();
  5.         // 服务端监听accept事件的channel
  6.         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  7.         serverSocketChannel.socket().bind(new InetSocketAddress(8080));
  8.         serverSocketChannel.configureBlocking(false);
  9.         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  10.         for(;;){
  11.             try{
  12.                 int keys = selector.select(60000);
  13.                 if (keys == 0) {
  14.                     System.out.println("server 60s未监听到事件,继续监听!");
  15.                     continue;
  16.                 }
  17.                 // processSelectedKeysPlain
  18.                 Iterator<SelectionKey> selectionKeyItr = selector.selectedKeys().iterator();
  19.                 while (selectionKeyItr.hasNext()) {
  20.                     SelectionKey key = selectionKeyItr.next();
  21.                     System.out.println("process SelectionKey=" + key.readyOps());
  22.                     try {
  23.                         // 拿出来后,要把集合中已经获取到的事件移除掉,避免重复的处理
  24.                         selectionKeyItr.remove();
  25.                         if (key.isAcceptable()) {
  26.                             // 处理accept事件(接受到来自客户端的连接请求)
  27.                             processAcceptEvent(key);
  28.                         }
  29.                         if (key.isReadable()) {
  30.                             // 处理read事件
  31.                             processReadEvent(key);
  32.                         }
  33.                     }catch (Exception e){
  34.                         System.out.println("server event loop process an selectionKey error! " + e.getMessage());
  35.                         e.printStackTrace();
  36.                         key.cancel();
  37.                         if(key.channel() != null){
  38.                             System.out.println("has error, close channel! " + key.channel());
  39.                             key.channel().close();
  40.                         }
  41.                     }
  42.                 }
  43.             }catch (Exception e){
  44.                 System.out.println("server event loop error! ");
  45.                 e.getStackTrace();
  46.             }
  47.         }
  48.     }
  49.     private static void processAcceptEvent(SelectionKey key) throws IOException {
  50.         // 能收到accept事件的channel一定是ServerSocketChannel
  51.         ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
  52.         // 获得与客户端建立的那个连接
  53.         SocketChannel socketChannel = ssChannel.accept();
  54.         socketChannel.configureBlocking(false);
  55.         socketChannel.finishConnect();
  56.         System.out.println("socketChannel=" + socketChannel + " finishConnect!");
  57.         // 将接受到的连接注册到同样的selector中,并监听read事件
  58.         socketChannel.register(key.selector(),SelectionKey.OP_READ);
  59.     }
  60.     private static void processReadEvent(SelectionKey key) throws IOException {
  61.         SocketChannel socketChannel = (SocketChannel)key.channel();
  62.         // 简单起见,buffer不缓存,每次读事件来都新创建一个
  63.         // 暂时也不考虑黏包/拆包场景(Netty中靠ByteToMessageDecoder解决,后续再分析其原理),理想的认为每个消息都小于1024,且每次读事件都只有一个消息
  64.         ByteBuffer readBuffer = ByteBuffer.allocate(1024);
  65.         int byteRead = socketChannel.read(readBuffer);
  66.         if(byteRead == -1){
  67.             // 简单起见不考虑tcp半连接的情况,返回-1直接关掉连接
  68.             socketChannel.close();
  69.         }else{
  70.             // 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
  71.             readBuffer.flip();
  72.             // 根据缓冲区可读字节数创建字节数组
  73.             byte[] bytes = new byte[readBuffer.remaining()];
  74.             // 将缓冲区可读字节数组复制到新建的数组中
  75.             readBuffer.get(bytes);
  76.             String receivedStr = new String(bytes, StandardCharsets.UTF_8);
  77.             System.out.println("received message:" + receivedStr + " ,from " + socketChannel.socket().getRemoteSocketAddress());
  78.             // 读完了,echo服务器准备回写数据到客户端
  79.             String echoMessage = "server echo:" + receivedStr;
  80.             ByteBuffer writeBuffer = ByteBuffer.allocateDirect(1024);
  81.             writeBuffer.put(echoMessage.getBytes(StandardCharsets.UTF_8));
  82.             writeBuffer.flip(); // 写完了,flip供后续去读取
  83.             socketChannel.write(writeBuffer);
  84.         }
  85.     }
  86. }
复制代码
IO多路复用客户端demo
  1. public class NIOClient {
  2.     private static volatile SocketChannel clientSocketChannel;
  3.     public static void main(String[] args) throws Exception {
  4.         SelectorProvider selectorProvider = SelectorProvider.provider();
  5.         Selector selector = selectorProvider.openSelector();
  6.         CountDownLatch countDownLatch = new CountDownLatch(1);
  7.         new Thread(()->{
  8.             try {
  9.                 startClient(selector,countDownLatch);
  10.             } catch (IOException e) {
  11.                 e.printStackTrace();
  12.             }
  13.         }).start();
  14.         countDownLatch.await();
  15.         System.out.println("please input message:");
  16.         while(true){
  17.             Scanner sc = new Scanner(System.in);
  18.             String msg = sc.next();
  19.             System.out.println("get input message:" + msg);
  20.             // 发送消息
  21.             ByteBuffer writeBuffer = ByteBuffer.allocate(64);
  22.             writeBuffer.put(msg.getBytes(StandardCharsets.UTF_8));
  23.             writeBuffer.flip(); // 写完了,flip供后续去读取
  24.             clientSocketChannel.write(writeBuffer);
  25.         }
  26.     }
  27.     private static void startClient(Selector selector, CountDownLatch countDownLatch) throws IOException {
  28.         SocketChannel socketChannel = SocketChannel.open();
  29.         socketChannel.configureBlocking(false);
  30.         clientSocketChannel = socketChannel;
  31.         // doConnect
  32.         // Returns: true if a connection was established,
  33.         //          false if this channel is in non-blocking mode and the connection operation is in progress;
  34.         if(!socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080))) {
  35.             // 配置为非阻塞,会返回false,通过注册并监听connect事件的方式进行交互
  36.             socketChannel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
  37.         }
  38.         for(;;){
  39.             try {
  40.                 int keys = selector.select(60000);
  41.                 if (keys == 0) {
  42.                     System.out.println("client 60s未监听到事件,继续监听!");
  43.                     continue;
  44.                 }
  45.                 // processSelectedKeysPlain
  46.                 Iterator<SelectionKey> selectionKeyItr = selector.selectedKeys().iterator();
  47.                 while (selectionKeyItr.hasNext()) {
  48.                     SelectionKey key = selectionKeyItr.next();
  49.                     try {
  50.                         System.out.println("process SelectionKey=" + key.readyOps());
  51.                         // 拿出来后,要把集合中已经获取到的事件移除掉,避免重复的处理
  52.                         selectionKeyItr.remove();
  53.                         if (key.isConnectable()) {
  54.                             // 处理连接相关事件
  55.                             processConnectEvent(key,countDownLatch);
  56.                         }
  57.                         if (key.isReadable()){
  58.                             processReadEvent(key);
  59.                         }
  60.                         if (key.isWritable()){
  61.                             System.out.println("watch an write event!");
  62.                         }
  63.                     } catch (Exception e) {
  64.                         System.out.println("client event loop process an selectionKey error! " + e.getMessage());
  65.                         key.cancel();
  66.                         if(key.channel() != null){
  67.                             key.channel().close();
  68.                             System.out.println("has error, close channel!" );
  69.                         }
  70.                     }
  71.                 }
  72.             } catch (Exception e) {
  73.                 System.out.println("client event loop error! ");
  74.                 e.getStackTrace();
  75.             }
  76.         }
  77.     }
  78.     private static void processConnectEvent(SelectionKey key, CountDownLatch countDownLatch) throws IOException {
  79.         // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
  80.         int ops = key.interestOps();
  81.         ops &= ~SelectionKey.OP_CONNECT;
  82.         key.interestOps(ops);
  83.         SocketChannel socketChannel = (SocketChannel) key.channel();
  84.         if(socketChannel.finishConnect()){
  85.             // 确认完成连接
  86.             System.out.println("client channel connected!");
  87.             countDownLatch.countDown();
  88.         }else{
  89.             // 连接建立失败,程序退出
  90.             System.out.println("client channel connect failed!");
  91.             System.exit(1);
  92.         }
  93.     }
  94.     private static void processReadEvent(SelectionKey key) throws IOException {
  95.         SocketChannel socketChannel = (SocketChannel) key.channel();
  96.         // 创建ByteBuffer,并开辟一个1M的缓冲区
  97.         ByteBuffer buffer = ByteBuffer.allocate(64);
  98.         // 读取请求码流,返回读取到的字节数
  99.         int readBytes = socketChannel.read(buffer);
  100.         // 读取到字节,对字节进行编解码
  101.         if(readBytes > 0){
  102.             // 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
  103.             buffer.flip();
  104.             // 根据缓冲区可读字节数创建字节数组
  105.             byte[] bytes = new byte[buffer.remaining()];
  106.             // 将缓冲区可读字节数组复制到新建的数组中
  107.             buffer.get(bytes);
  108.             String response = new String(bytes, StandardCharsets.UTF_8);
  109.             System.out.println("client received response message: " + response);
  110.         }
  111.         // 读取到了EOF,关闭连接
  112.         if(readBytes < 0){
  113.             socketChannel.close();
  114.         }
  115.     }
  116. }
复制代码


上述对于操作系统I/O模型的介绍限于篇幅,点到为止。想进一步了解的读者可以参考我之前写的博客:谈谈对不同I/O模型的理解
2.2 Reactor模式

从上面的介绍中我们可以看到,I/O多路复用模型的高性能、高吞吐的特点更加适合互联网时代海量连接的场景,所以netty自然也是基于I/O多路复用模型的。
但上述给出的I/O多路复用的demo中存在两个很严重的问题,第一个问题是java中NIO的能力过于底层,在开发业务时所需要考虑的细节太多,一个简单的、不考虑各种异常、边界场景的echo服务器都要写近百行的代码。
第二个问题则是服务端单线程的I/O多路复用模型没法很好的利用现代的多核CPU硬件,会出现处理大量连接时一核有难八核围观的问题。


针对第一个问题,正是netty作为java NIO的更高层次封装而诞生的原因,我们会在后续的迭代中逐步的优化这一问题。
而第二个问题的解决方案便是本章要引出的主题,reactor模式。


I/O多路复用模型与多线程并不冲突,一个线程可以独自处理所有连接,也可以用多个线程来均匀的分摊所有来自客户端的连接。
在reactor模式下,接收连接与处理连接后续读写的任务的线程会被分离开。接受客户端连接的逻辑较为简单,因此一个线程(cpu核心)通常足够处理这一任务。
相对的,处理连接建立后的读写操作则压力会大的多,所以需要多个CPU核心(多个线程)来分摊压力。
在reactor模式下,将专门用于接受连接的线程称为Boss线程,而连接建立后处理读写操作的线程成为Worker线程(Boss工作压力小,Worker工作压力大;Boss接了单子后把活直接派给Worker)。
reactor模式示意图

1.png

3. MyNetty reactor模式实现源码解析

从上文IO多路复用的demo可以看到,程序最核心的逻辑便是处理selector.select获取到的事件key集合。
当前线程会不断地尝试获取到激活的事件集合,然后按顺序处理,并循环往复。这一工作机制被称为事件循环(EventLoop)。
事件被抽象为4种类型,OP_READ(可读事件)、OP_WRITE(可写事件)、OP_CONNECT(连接建立事件)和OP_ACCEPT(连接接受事件),而在demo中我们已经接触到了除了OP_WRITE事件外的三种(OP_WRITE事件会在lab4高效的数据写出中再展开介绍)。
针对事件循环,Netty中抽象出了两个概念,EventLoopGroup和EventLoop,EventLoop对应的就是上述的无限循环处理IO事件的线程,而EventLoopGroup顾名思义便是将一组EventLoop统一管理的集合。


下面我们结合MyNetty的源码,来进一步讲解reactor模式的工作原理。
MyNetty NioServer源码
  1. public class MyNettyNioServer {
  2.     private static final Logger logger = LoggerFactory.getLogger(MyNettyNioServer.class);
  3.     private final InetSocketAddress endpointAddress;
  4.     private final MyNioEventLoopGroup bossGroup;
  5.     public MyNettyNioServer(InetSocketAddress endpointAddress, MyEventHandler myEventHandler,
  6.                             int bossThreads, int childThreads) {
  7.         this.endpointAddress = endpointAddress;
  8.         MyNioEventLoopGroup childGroup = new MyNioEventLoopGroup(myEventHandler,childThreads);
  9.         this.bossGroup = new MyNioEventLoopGroup(myEventHandler,bossThreads,childGroup);
  10.     }
  11.     public void start() throws IOException {
  12.         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  13.         serverSocketChannel.configureBlocking(false);
  14.         MyNioEventLoop myNioEventLoop = this.bossGroup.next();
  15.         myNioEventLoop.execute(()->{
  16.             try {
  17.                 Selector selector = myNioEventLoop.getUnwrappedSelector();
  18.                 serverSocketChannel.socket().bind(endpointAddress);
  19.                 SelectionKey selectionKey = serverSocketChannel.register(selector, 0);
  20.                 // 监听accept事件
  21.                 selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
  22.                 logger.info("MyNioServer do start! endpointAddress={}",endpointAddress);
  23.             } catch (IOException e) {
  24.                 logger.error("MyNioServer do bind error!",e);
  25.             }
  26.         });
  27.     }
  28. }
复制代码
MyNetty NioClient源码
  1. public class MyNettyNioClient {
  2.     private static final Logger logger = LoggerFactory.getLogger(MyNettyNioClient.class);
  3.     private final InetSocketAddress remoteAddress;
  4.     private final MyNioEventLoopGroup eventLoopGroup;
  5.     private SocketChannel socketChannel;
  6.     public MyNettyNioClient(InetSocketAddress remoteAddress, MyEventHandler myEventHandler, int nThreads) {
  7.         this.remoteAddress = remoteAddress;
  8.         this.eventLoopGroup = new MyNioEventLoopGroup(myEventHandler,nThreads);
  9.     }
  10.     public void start() throws IOException {
  11.         SocketChannel socketChannel = SocketChannel.open();
  12.         socketChannel.configureBlocking(false);
  13.         this.socketChannel = socketChannel;
  14.         MyNioEventLoop myNioEventLoop = this.eventLoopGroup.next();
  15.         myNioEventLoop.execute(()->{
  16.             try {
  17.                 Selector selector = myNioEventLoop.getUnwrappedSelector();
  18.                 // doConnect
  19.                 // Returns: true if a connection was established,
  20.                 //          false if this channel is in non-blocking mode and the connection operation is in progress;
  21.                 if(!socketChannel.connect(remoteAddress)){
  22.                     SelectionKey selectionKey = socketChannel.register(selector, 0);
  23.                     int clientInterestOps = SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
  24.                     selectionKey.interestOps(selectionKey.interestOps() | clientInterestOps);
  25.                 }
  26.                 // 监听connect事件
  27.                 logger.info("MyNioClient do start! remoteAddress={}",remoteAddress);
  28.             } catch (IOException e) {
  29.                 logger.error("MyNioClient do connect error!",e);
  30.             }
  31.         });
  32.     }
  33. }
复制代码
MyNetty EventLoop源码
  1. public class MyNioEventLoop implements Executor {
  2.     private static final Logger logger = LoggerFactory.getLogger(MyNioEventLoop.class);
  3.     /**
  4.      * 原始的jdk中的selector
  5.      * */
  6.     private final Selector unwrappedSelector;
  7.     private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
  8.     private volatile Thread thread;
  9.     private final MyNioEventLoopGroup childGroup;
  10.     private final AtomicBoolean threadStartedFlag = new AtomicBoolean(false);
  11.     private MyEventHandler myEventHandler;
  12.     public MyNioEventLoop(){
  13.         this(null);
  14.     }
  15.     public MyNioEventLoop(MyNioEventLoopGroup childGroup) {
  16.         this.childGroup = childGroup;
  17.         SelectorProvider selectorProvider = SelectorProvider.provider();
  18.         try {
  19.             this.unwrappedSelector = selectorProvider.openSelector();
  20.         } catch (IOException e) {
  21.             throw new RuntimeException("open selector error!",e);
  22.         }
  23.     }
  24.     @Override
  25.     public void execute(Runnable task) {
  26.         // 将任务加入eventLoop所属的任务队列,事件循环中会
  27.         taskQueue.add(task);
  28.         if(this.thread != Thread.currentThread()){
  29.             // 如果执行execute方法的线程不是当前线程,可能当前eventLoop对应的thread还没有启动
  30.             // 尝试启动当前eventLoop对应的线程(cas防并发)
  31.             if(threadStartedFlag.compareAndSet(false,true)){
  32.                 // 类似netty的ThreadPerTaskExecutor,启动一个线程来执行事件循环
  33.                 new Thread(()->{
  34.                     // 将eventLoop的thread与新启动的这个thread进行绑定
  35.                     this.thread = Thread.currentThread();
  36.                     // 执行监听selector的事件循环
  37.                     doEventLoop();
  38.                 }).start();
  39.             }
  40.         }
  41.     }
  42.     public Selector getUnwrappedSelector() {
  43.         return unwrappedSelector;
  44.     }
  45.     public void setMyEventHandler(MyEventHandler myEventHandler) {
  46.         this.myEventHandler = myEventHandler;
  47.     }
  48.     private void doEventLoop(){
  49.         // 事件循环
  50.         for(;;){
  51.             try{
  52.                 if(taskQueue.isEmpty()){
  53.                     int keys = unwrappedSelector.select(60000);
  54.                     if (keys == 0) {
  55.                         logger.info("server 60s未监听到事件,继续监听!");
  56.                         continue;
  57.                     }
  58.                 }else{
  59.                     // 确保任务队列里的任务能够被触发
  60.                     unwrappedSelector.selectNow();
  61.                 }
  62.                 // 简单起见,暂不实现基于时间等元素的更为公平的执行策略
  63.                 // 直接先处理io,再处理所有task(ioRatio=100)
  64.                 try {
  65.                     // 处理监听到的io事件
  66.                     processSelectedKeys();
  67.                 }finally {
  68.                     // Ensure we always run tasks.
  69.                     // 处理task队列里的任务
  70.                     runAllTasks();
  71.                 }
  72.             }catch (Throwable e){
  73.                 logger.error("server event loop error!",e);
  74.             }
  75.         }
  76.     }
  77.     private void processSelectedKeys() throws IOException {
  78.         // processSelectedKeysPlain
  79.         Iterator<SelectionKey> selectionKeyItr = unwrappedSelector.selectedKeys().iterator();
  80.         while (selectionKeyItr.hasNext()) {
  81.             SelectionKey key = selectionKeyItr.next();
  82.             logger.info("process SelectionKey={}",key.readyOps());
  83.             try {
  84.                 // 拿出来后,要把集合中已经获取到的事件移除掉,避免重复的处理
  85.                 selectionKeyItr.remove();
  86.                 if (key.isConnectable()) {
  87.                     // 处理客户端连接建立相关事件
  88.                     processConnectEvent(key);
  89.                 }
  90.                 if (key.isAcceptable()) {
  91.                     // 处理服务端accept事件(接受到来自客户端的连接请求)
  92.                     processAcceptEvent(key);
  93.                 }
  94.                 if (key.isReadable()) {
  95.                     // 处理read事件
  96.                     processReadEvent(key);
  97.                 }
  98.             }catch (Throwable e){
  99.                 logger.error("server event loop process an selectionKey error!",e);
  100.                 // 处理io事件有异常,取消掉监听的key,并且尝试把channel也关闭掉
  101.                 key.cancel();
  102.                 if(key.channel() != null){
  103.                     logger.error("has error, close channel={} ",key.channel());
  104.                     key.channel().close();
  105.                 }
  106.             }
  107.         }
  108.     }
  109.     private void runAllTasks(){
  110.         for (;;) {
  111.             // 通过无限循环,直到把队列里的任务全部捞出来执行掉
  112.             Runnable task = taskQueue.poll();
  113.             if (task == null) {
  114.                 return;
  115.             }
  116.             try {
  117.                 task.run();
  118.             } catch (Throwable t) {
  119.                 logger.warn("A task raised an exception. Task: {}", task, t);
  120.             }
  121.         }
  122.     }
  123.     private void processAcceptEvent(SelectionKey key) throws IOException {
  124.         ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
  125.         SocketChannel socketChannel = ssChannel.accept();
  126.         if(this.childGroup != null){
  127.             // boss/worker模式,boss线程只负责接受和建立连接
  128.             // 将建立的连接交给child线程组去处理后续的读写
  129.             MyNioEventLoop childEventLoop = childGroup.next();
  130.             childEventLoop.execute(()->{
  131.                 doRegister(childEventLoop,socketChannel);
  132.             });
  133.         }else{
  134.             doRegister(this,socketChannel);
  135.         }
  136.     }
  137.     private void processConnectEvent(SelectionKey key) throws IOException {
  138.         // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
  139.         // See https://github.com/netty/netty/issues/924
  140.         int ops = key.interestOps();
  141.         ops &= ~SelectionKey.OP_CONNECT;
  142.         key.interestOps(ops);
  143.         SocketChannel socketChannel = (SocketChannel) key.channel();
  144.         if(socketChannel.finishConnect()){
  145.             // 确认完成连接
  146.             logger.info("client channel connected! socketChannel={}",socketChannel);
  147.         }else{
  148.             logger.error("client channel connect failed!");
  149.             // 连接建立失败,连接关闭(上层catch住会关闭连接)
  150.             throw new Error();
  151.         }
  152.     }
  153.     private void processReadEvent(SelectionKey key) throws Exception {
  154.         SocketChannel socketChannel = (SocketChannel)key.channel();
  155.         // 简单起见,buffer不缓存,每次读事件来都新创建一个
  156.         // 暂时也不考虑黏包/拆包场景(Netty中靠ByteToMessageDecoder解决,后续再分析其原理),理想的认为每个消息都小于1024,且每次读事件都只有一个消息
  157.         ByteBuffer readBuffer = ByteBuffer.allocate(64);
  158.         int byteRead = socketChannel.read(readBuffer);
  159.         if(byteRead == -1){
  160.             // 简单起见不考虑tcp半连接的情况,返回-1直接关掉连接
  161.             socketChannel.close();
  162.             // 取消key的监听
  163.             key.cancel();
  164.         }else{
  165.             // 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
  166.             readBuffer.flip();
  167.             // 根据缓冲区可读字节数创建字节数组
  168.             byte[] bytes = new byte[readBuffer.remaining()];
  169.             // 将缓冲区可读字节数组复制到新建的数组中
  170.             readBuffer.get(bytes);
  171.             if(myEventHandler != null) {
  172.                 myEventHandler.fireChannelRead(socketChannel, bytes);
  173.             }
  174.         }
  175.     }
  176.     private void doRegister(SocketChannel socketChannel){
  177.         try {
  178.             // nio的非阻塞channel
  179.             socketChannel.configureBlocking(false);
  180.             socketChannel.finishConnect();
  181.             logger.info("socketChannel={} finishConnect!",socketChannel);
  182.             // 将接受到的连接注册到selector中,并监听read事件
  183.             socketChannel.register(unwrappedSelector, SelectionKey.OP_READ);
  184.             logger.info("socketChannel={} doRegister success!",socketChannel);
  185.         }catch (Exception e){
  186.             logger.error("register socketChannel={} error!",socketChannel,e);
  187.             try {
  188.                 socketChannel.close();
  189.             } catch (IOException ex) {
  190.                 logger.error("register channel close={} error!",socketChannel,ex);
  191.             }
  192.         }
  193.     }
  194. }
复制代码
MyNetty EventLoopGroup源码

[code]public class MyNioEventLoopGroup {    private final MyNioEventLoop[] executors;    private final int nThreads;    private final AtomicInteger atomicInteger = new AtomicInteger();    public MyNioEventLoopGroup(MyEventHandler myEventHandler, int nThreads) {        this(myEventHandler,nThreads,null);    }    public MyNioEventLoopGroup(MyEventHandler myEventHandler, int nThreads, MyNioEventLoopGroup childGroup) {        if(nThreads

相关推荐

3 天前

举报

鼓励转贴优秀软件安全工具和文档!
您需要登录后才可以回帖 登录 | 立即注册