从零开始实现简易版Netty(一) MyNetty Reactor模式
自从18年作为一个java程序员入行以来,所接触到的大量组件如dubbo、rocketmq、redisson等都是基于netty这一高性能网络框架实现的。
限于个人水平,在过去很长一段时间中都只能算是netty的初级使用者;在使用基于netty的中间件时,总是因为对netty底层不够了解而导致排查问题时效率不高。
因此,在过去的一段时间中我对netty源码进行了一定的研究,并以博客的形式将心得分享出来,希望能帮助到同样对netty工作原理感兴趣的读者。
非常感谢大佬bin的技术小屋,在我学习netty的过程中给了我很大的帮助。
1. MyNetty介绍
不同于大多数博客直接针对netty官方源码进行解析的方式,本系列博客通过从零到一的实现一个简易版的netty(即MyNetty)来帮助读者更好的理解netty的工作原理。
相比于完整版的netty,MyNetty只实现了netty中最核心的功能点,目的是降低复杂度,避免初学者在学习netty的过程中,对netty源码中复杂的抽象及过深的调用链感到畏惧。
本博客会按照以下顺序,通过一个接一个的小迭代由简单到复杂的实现MyNetty,每一个迭代都会有一篇与之对应的技术博客。
- Reactor模式
- Pipeline管道
- 高效的数据读取
- 高效的数据写出
- FastThreadLocal
- ByteBuf
- Normal级别的池化内存分配(伙伴算法)
- Small级别的池化内存分配(slab算法)
- 池化内存分配支持线程本地缓存(ThreadLocalCache)
- 常用的编解码器(FixedLengthFrameDecoder/LineBasedFrameDecoder等)
MyNetty的核心逻辑主要参考自netty 4.1.80.Final版本。
2. 操作系统I/O模型与Reactor模式介绍
作为MyNetty系列的第一篇博客,按照规划,第一个迭代中需要实现基于NIO的reactor模式。这也是netty最核心的功能,一个基于事件循环的reactor线程工作模型。
在学习的过程中,我们要尽量做到知其然且知其所以然。
因此,在介绍Reactor模式之前,先简单介绍一下两种常见的操作系统网络I/O模型,只要在了解其各自的优缺点后,才能帮助我们更好的理解为什么Netty最终选择了reactor模式。
2.1 操作系统I/O模型介绍
同步阻塞I/O(BIO)
同步阻塞IO,顾名思义,其读写是阻塞性的,在数据还没有准备好时(比如客户端还未发送新请求,或者未收到服务端响应),当前处理IO的线程是处于阻塞态的,直到数据就绪(比如接受到客户端发送的请求,或收到服务端响应)时才会被唤醒。
由于其阻塞的特性,因此在服务端并发时,每一个新的客户端连接都需要一个独立的线程来承载。
BIO详情优点简单易理解,同步阻塞式的线性代码执行流符合人的直觉。因此普通的web业务后台服务器大多是基于BIO模型开发的缺点由于客户端连接数与服务器线程数是1:1的,而服务器由于线程上下文切换的CPU开销和内存大小限制,难以应对大规模的并发连接(大几千甚至几万),性能较差BIO服务端demo
- public class BIOEchoServer {
- private static final ExecutorService threadPool = Executors.newCachedThreadPool();
- public static void main(String[] args) throws IOException {
- int port = 8080;
- ServerSocket serverSocket = new ServerSocket(port);
- System.out.println("BIOServer started on port " + port);
- while (true) {
- Socket clientSocket = serverSocket.accept();
- System.out.println("New client connected: " + clientSocket.getInetAddress());
- // 每个新的连接都启用一个线程去处理
- threadPool.execute(
- () -> handleClientConnect(clientSocket)
- );
- }
- }
- private static void handleClientConnect(Socket clientSocket) {
- try (BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
- PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
- String inputLine;
- while ((inputLine = in.readLine()) != null) {
- System.out.println("Received from client: " + inputLine);
- // echo message
- String responseMessage = "server echo: " + inputLine;
- out.println(responseMessage);
- System.out.println("Sent response: " + responseMessage);
- }
- } catch (IOException e) {
- System.out.println("Client connection closed: " + e.getMessage());
- } finally {
- try {
- clientSocket.close();
- System.out.println("clientSocket closed! " + clientSocket.getInetAddress());
- } catch (IOException e) {
- System.err.println("Error closing client socket: " + e.getMessage());
- }
- }
- }
- }
复制代码 BIO客户端demo
- public class BIOClient {
- public static void main(String[] args) throws IOException {
- String hostname = "127.0.0.1";
- int port = 8080;
- try (Socket socket = new Socket(hostname, port);
- PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
- BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in))) {
- System.out.println("Connected to server. Type messages (type 'exit' to quit)");
- String userInput;
- while ((userInput = stdIn.readLine()) != null) {
- out.println(userInput);
- System.out.println("Server response: " + in.readLine());
- }
- }
- }
- }
复制代码 I/O多路复用
I/O多路复用,顾名思义,其不同于BIO中一个线程对应一个客户端连接的模式。I/O多路复用模型中,一个服务端线程能够同时处理多个客户端连接。
I/O多路复用解决了传统BIO模型下面对海量并发时系统资源不足的问题,但同时也引入了一些新的问题。
I/O多路复用详情优点性能好,吞吐量高。单个线程即可处理海量连接缺点比起BIO的阻塞模式,基于事件触发的编程模型非常复杂。IO多路复用服务端demo
IO多路复用客户端demo
上述对于操作系统I/O模型的介绍限于篇幅,点到为止。想进一步了解的读者可以参考我之前写的博客:谈谈对不同I/O模型的理解
2.2 Reactor模式
从上面的介绍中我们可以看到,I/O多路复用模型的高性能、高吞吐的特点更加适合互联网时代海量连接的场景,所以netty自然也是基于I/O多路复用模型的。
但上述给出的I/O多路复用的demo中存在两个很严重的问题,第一个问题是java中NIO的能力过于底层,在开发业务时所需要考虑的细节太多,一个简单的、不考虑各种异常、边界场景的echo服务器都要写近百行的代码。
第二个问题则是服务端单线程的I/O多路复用模型没法很好的利用现代的多核CPU硬件,会出现处理大量连接时一核有难八核围观的问题。
针对第一个问题,正是netty作为java NIO的更高层次封装而诞生的原因,我们会在后续的迭代中逐步的优化这一问题。
而第二个问题的解决方案便是本章要引出的主题,reactor模式。
I/O多路复用模型与多线程并不冲突,一个线程可以独自处理所有连接,也可以用多个线程来均匀的分摊所有来自客户端的连接。
在reactor模式下,接收连接与处理连接后续读写的任务的线程会被分离开。接受客户端连接的逻辑较为简单,因此一个线程(cpu核心)通常足够处理这一任务。
相对的,处理连接建立后的读写操作则压力会大的多,所以需要多个CPU核心(多个线程)来分摊压力。
在reactor模式下,将专门用于接受连接的线程称为Boss线程,而连接建立后处理读写操作的线程成为Worker线程(Boss工作压力小,Worker工作压力大;Boss接了单子后把活直接派给Worker)。
reactor模式示意图
3. MyNetty reactor模式实现源码解析
从上文IO多路复用的demo可以看到,程序最核心的逻辑便是处理selector.select获取到的事件key集合。
当前线程会不断地尝试获取到激活的事件集合,然后按顺序处理,并循环往复。这一工作机制被称为事件循环(EventLoop)。
事件被抽象为4种类型,OP_READ(可读事件)、OP_WRITE(可写事件)、OP_CONNECT(连接建立事件)和OP_ACCEPT(连接接受事件),而在demo中我们已经接触到了除了OP_WRITE事件外的三种(OP_WRITE事件会在lab4高效的数据写出中再展开介绍)。
针对事件循环,Netty中抽象出了两个概念,EventLoopGroup和EventLoop,EventLoop对应的就是上述的无限循环处理IO事件的线程,而EventLoopGroup顾名思义便是将一组EventLoop统一管理的集合。
下面我们结合MyNetty的源码,来进一步讲解reactor模式的工作原理。
MyNetty NioServer源码
- public class MyNettyNioServer {
- private static final Logger logger = LoggerFactory.getLogger(MyNettyNioServer.class);
- private final InetSocketAddress endpointAddress;
- private final MyNioEventLoopGroup bossGroup;
- public MyNettyNioServer(InetSocketAddress endpointAddress, MyEventHandler myEventHandler,
- int bossThreads, int childThreads) {
- this.endpointAddress = endpointAddress;
- MyNioEventLoopGroup childGroup = new MyNioEventLoopGroup(myEventHandler,childThreads);
- this.bossGroup = new MyNioEventLoopGroup(myEventHandler,bossThreads,childGroup);
- }
- public void start() throws IOException {
- ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
- serverSocketChannel.configureBlocking(false);
- MyNioEventLoop myNioEventLoop = this.bossGroup.next();
- myNioEventLoop.execute(()->{
- try {
- Selector selector = myNioEventLoop.getUnwrappedSelector();
- serverSocketChannel.socket().bind(endpointAddress);
- SelectionKey selectionKey = serverSocketChannel.register(selector, 0);
- // 监听accept事件
- selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_ACCEPT);
- logger.info("MyNioServer do start! endpointAddress={}",endpointAddress);
- } catch (IOException e) {
- logger.error("MyNioServer do bind error!",e);
- }
- });
- }
- }
复制代码 MyNetty NioClient源码
- public class MyNettyNioClient {
- private static final Logger logger = LoggerFactory.getLogger(MyNettyNioClient.class);
- private final InetSocketAddress remoteAddress;
- private final MyNioEventLoopGroup eventLoopGroup;
- private SocketChannel socketChannel;
- public MyNettyNioClient(InetSocketAddress remoteAddress, MyEventHandler myEventHandler, int nThreads) {
- this.remoteAddress = remoteAddress;
- this.eventLoopGroup = new MyNioEventLoopGroup(myEventHandler,nThreads);
- }
- public void start() throws IOException {
- SocketChannel socketChannel = SocketChannel.open();
- socketChannel.configureBlocking(false);
- this.socketChannel = socketChannel;
- MyNioEventLoop myNioEventLoop = this.eventLoopGroup.next();
- myNioEventLoop.execute(()->{
- try {
- Selector selector = myNioEventLoop.getUnwrappedSelector();
- // doConnect
- // Returns: true if a connection was established,
- // false if this channel is in non-blocking mode and the connection operation is in progress;
- if(!socketChannel.connect(remoteAddress)){
- SelectionKey selectionKey = socketChannel.register(selector, 0);
- int clientInterestOps = SelectionKey.OP_CONNECT | SelectionKey.OP_READ;
- selectionKey.interestOps(selectionKey.interestOps() | clientInterestOps);
- }
- // 监听connect事件
- logger.info("MyNioClient do start! remoteAddress={}",remoteAddress);
- } catch (IOException e) {
- logger.error("MyNioClient do connect error!",e);
- }
- });
- }
- }
复制代码 MyNetty EventLoop源码
- public class MyNioEventLoop implements Executor {
- private static final Logger logger = LoggerFactory.getLogger(MyNioEventLoop.class);
- /**
- * 原始的jdk中的selector
- * */
- private final Selector unwrappedSelector;
- private final Queue<Runnable> taskQueue = new LinkedBlockingQueue<>(16);
- private volatile Thread thread;
- private final MyNioEventLoopGroup childGroup;
- private final AtomicBoolean threadStartedFlag = new AtomicBoolean(false);
- private MyEventHandler myEventHandler;
- public MyNioEventLoop(){
- this(null);
- }
- public MyNioEventLoop(MyNioEventLoopGroup childGroup) {
- this.childGroup = childGroup;
- SelectorProvider selectorProvider = SelectorProvider.provider();
- try {
- this.unwrappedSelector = selectorProvider.openSelector();
- } catch (IOException e) {
- throw new RuntimeException("open selector error!",e);
- }
- }
- @Override
- public void execute(Runnable task) {
- // 将任务加入eventLoop所属的任务队列,事件循环中会
- taskQueue.add(task);
- if(this.thread != Thread.currentThread()){
- // 如果执行execute方法的线程不是当前线程,可能当前eventLoop对应的thread还没有启动
- // 尝试启动当前eventLoop对应的线程(cas防并发)
- if(threadStartedFlag.compareAndSet(false,true)){
- // 类似netty的ThreadPerTaskExecutor,启动一个线程来执行事件循环
- new Thread(()->{
- // 将eventLoop的thread与新启动的这个thread进行绑定
- this.thread = Thread.currentThread();
- // 执行监听selector的事件循环
- doEventLoop();
- }).start();
- }
- }
- }
- public Selector getUnwrappedSelector() {
- return unwrappedSelector;
- }
- public void setMyEventHandler(MyEventHandler myEventHandler) {
- this.myEventHandler = myEventHandler;
- }
- private void doEventLoop(){
- // 事件循环
- for(;;){
- try{
- if(taskQueue.isEmpty()){
- int keys = unwrappedSelector.select(60000);
- if (keys == 0) {
- logger.info("server 60s未监听到事件,继续监听!");
- continue;
- }
- }else{
- // 确保任务队列里的任务能够被触发
- unwrappedSelector.selectNow();
- }
- // 简单起见,暂不实现基于时间等元素的更为公平的执行策略
- // 直接先处理io,再处理所有task(ioRatio=100)
- try {
- // 处理监听到的io事件
- processSelectedKeys();
- }finally {
- // Ensure we always run tasks.
- // 处理task队列里的任务
- runAllTasks();
- }
- }catch (Throwable e){
- logger.error("server event loop error!",e);
- }
- }
- }
- private void processSelectedKeys() throws IOException {
- // processSelectedKeysPlain
- Iterator<SelectionKey> selectionKeyItr = unwrappedSelector.selectedKeys().iterator();
- while (selectionKeyItr.hasNext()) {
- SelectionKey key = selectionKeyItr.next();
- logger.info("process SelectionKey={}",key.readyOps());
- try {
- // 拿出来后,要把集合中已经获取到的事件移除掉,避免重复的处理
- selectionKeyItr.remove();
- if (key.isConnectable()) {
- // 处理客户端连接建立相关事件
- processConnectEvent(key);
- }
- if (key.isAcceptable()) {
- // 处理服务端accept事件(接受到来自客户端的连接请求)
- processAcceptEvent(key);
- }
- if (key.isReadable()) {
- // 处理read事件
- processReadEvent(key);
- }
- }catch (Throwable e){
- logger.error("server event loop process an selectionKey error!",e);
- // 处理io事件有异常,取消掉监听的key,并且尝试把channel也关闭掉
- key.cancel();
- if(key.channel() != null){
- logger.error("has error, close channel={} ",key.channel());
- key.channel().close();
- }
- }
- }
- }
- private void runAllTasks(){
- for (;;) {
- // 通过无限循环,直到把队列里的任务全部捞出来执行掉
- Runnable task = taskQueue.poll();
- if (task == null) {
- return;
- }
- try {
- task.run();
- } catch (Throwable t) {
- logger.warn("A task raised an exception. Task: {}", task, t);
- }
- }
- }
- private void processAcceptEvent(SelectionKey key) throws IOException {
- ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
- SocketChannel socketChannel = ssChannel.accept();
- if(this.childGroup != null){
- // boss/worker模式,boss线程只负责接受和建立连接
- // 将建立的连接交给child线程组去处理后续的读写
- MyNioEventLoop childEventLoop = childGroup.next();
- childEventLoop.execute(()->{
- doRegister(childEventLoop,socketChannel);
- });
- }else{
- doRegister(this,socketChannel);
- }
- }
- private void processConnectEvent(SelectionKey key) throws IOException {
- // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
- // See https://github.com/netty/netty/issues/924
- int ops = key.interestOps();
- ops &= ~SelectionKey.OP_CONNECT;
- key.interestOps(ops);
- SocketChannel socketChannel = (SocketChannel) key.channel();
- if(socketChannel.finishConnect()){
- // 确认完成连接
- logger.info("client channel connected! socketChannel={}",socketChannel);
- }else{
- logger.error("client channel connect failed!");
- // 连接建立失败,连接关闭(上层catch住会关闭连接)
- throw new Error();
- }
- }
- private void processReadEvent(SelectionKey key) throws Exception {
- SocketChannel socketChannel = (SocketChannel)key.channel();
- // 简单起见,buffer不缓存,每次读事件来都新创建一个
- // 暂时也不考虑黏包/拆包场景(Netty中靠ByteToMessageDecoder解决,后续再分析其原理),理想的认为每个消息都小于1024,且每次读事件都只有一个消息
- ByteBuffer readBuffer = ByteBuffer.allocate(64);
- int byteRead = socketChannel.read(readBuffer);
- if(byteRead == -1){
- // 简单起见不考虑tcp半连接的情况,返回-1直接关掉连接
- socketChannel.close();
- // 取消key的监听
- key.cancel();
- }else{
- // 将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
- readBuffer.flip();
- // 根据缓冲区可读字节数创建字节数组
- byte[] bytes = new byte[readBuffer.remaining()];
- // 将缓冲区可读字节数组复制到新建的数组中
- readBuffer.get(bytes);
- if(myEventHandler != null) {
- myEventHandler.fireChannelRead(socketChannel, bytes);
- }
- }
- }
- private void doRegister(SocketChannel socketChannel){
- try {
- // nio的非阻塞channel
- socketChannel.configureBlocking(false);
- socketChannel.finishConnect();
- logger.info("socketChannel={} finishConnect!",socketChannel);
- // 将接受到的连接注册到selector中,并监听read事件
- socketChannel.register(unwrappedSelector, SelectionKey.OP_READ);
- logger.info("socketChannel={} doRegister success!",socketChannel);
- }catch (Exception e){
- logger.error("register socketChannel={} error!",socketChannel,e);
- try {
- socketChannel.close();
- } catch (IOException ex) {
- logger.error("register channel close={} error!",socketChannel,ex);
- }
- }
- }
- }
复制代码 MyNetty EventLoopGroup源码
[code]public class MyNioEventLoopGroup { private final MyNioEventLoop[] executors; private final int nThreads; private final AtomicInteger atomicInteger = new AtomicInteger(); public MyNioEventLoopGroup(MyEventHandler myEventHandler, int nThreads) { this(myEventHandler,nThreads,null); } public MyNioEventLoopGroup(MyEventHandler myEventHandler, int nThreads, MyNioEventLoopGroup childGroup) { if(nThreads |