站竣凰 发表于 2025-6-3 14:52:28

Netty源码—2.Reactor线程模型一

大纲
1.关于NioEventLoop的问题整理
2.理解Reactor线程模型主要分三部分
3.NioEventLoop的创建
4.NioEventLoop的启动
 
1.关于NioEventLoop的问题整理
一.默认下Netty服务端起多少线程及何时启动?
答:默认是2倍CPU核数个线程。在调用EventExcutor的execute(task)方法时,会判断当前线程是否为Netty的Reactor线程,也就是判断当前线程是否为NioEventLoop对应的线程实体。如果是,则说明Netty的Reactor线程已经启动了。如果不是,则说明是外部线程调用EventExcutor的execute()方法。于是会先调用startThread()方法判断当前线程是否已被启动,如果还没有被启动就启动当前线程作为Netty的Reactor线程。
 
二.Netty是如何解决JDK空轮询的?
答:Netty会判断如果当前阻塞的一个Select()操作并没有花那么长时间,那么就说明此时有可能触发了空轮询Bug。默认情况下如果这个现象达到512次,那么就重建一个Selector,并且把之前Selector上所有的key重新移交到新Selector上。通过以上这种处理方式来避免JDK空轮询Bug。
 
三.Netty是如何保证异步串行无锁化的?
答:异步串行无锁化有两个场景。
场景一:拿到客户端一个Channel,不需要对该Channel进行同步,直接就可以多线程并发读写。
场景二:ChannelHandler里的所有操作都是线程安全的,不需要进行同步。
 
Netty在所有外部线程去调用EventLoop或者Channel的方法时,会通过inEventLoop()方法来判断出当前线程是外部线程(非NioEventLoop的线程实体)。在这种情况下,会把所有操作都封装成一个Task放入MPSC队列,然后在NioEventLoop的执行逻辑也就是run()方法里,这些Task会被逐个执行。
 
2.理解Reactor线程模型主要分三部分
一.NioEventLoop的创建
二.NioEventLoop的启动
三.NioEventLoop的执行
 
3.NioEventLoop的创建
(1)创建入口
(2)确定NioEventLoop的个数
(3)NioEventLoopGroup的创建流程
(4)创建线程执行器ThreadPerTaskExecutor
(5)创建NioEventLoop
(6)创建线程选择器EventExecutorChooser
(7)NioEventLoopGroup的创建总结
 
(1)创建入口
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();(2)确定NioEventLoop的个数
由NioEventLoopGroup的构造方法来确定NioEventLoop的个数。如果NioEventLoopGroup没有传递构造参数,那么NioEventLoop线程的个数为CPU核数的2倍。如果NioEventLoopGroup传递了参数n,那么NioEventLoop线程的个数就是n。
 
(3)NioEventLoopGroup的创建流程
NioEventLoopGroup的构造方法会触发创建流程。
一.创建线程执行器ThreadPerTaskExecutor
每次调用ThreadPerTaskExecutor.execute()方法时都会创建一个线程。
二.创建NioEventLoop
NioEventLoop对应NioEventLoopGroup线程池里的线程,NioEventLoopGroup的构造方法会用一个for循环通过调用newChild()方法来创建NioEventLoop线程。
三.创建线程选择器EventExecutorChooser
线程选择器的作用是用于给每个新连接分配一个NioEventLoop线程,也就是从NioEventLoopGroup线程池中选择一个NioEventLoop线程来处理新连接。
//MultithreadEventLoopGroup implementations which is used for NIO Selector based Channels.
public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    //Create a new instance using the default number of threads,
    //the default ThreadFactory and the SelectorProvider which is returned by SelectorProvider#provider().
    public NioEventLoopGroup() {
      this(0);
    }
   
    //Create a new instance using the specified number of threads,
    //ThreadFactory and the SelectorProvider which is returned by SelectorProvider#provider().
    public NioEventLoopGroup(int nThreads) {
      this(nThreads, (Executor) null);
    }
   
    public NioEventLoopGroup(int nThreads, Executor executor) {
      this(nThreads, executor, SelectorProvider.provider());
    }
   
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
      this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }
   
    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
      super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }
    ...
}

//Abstract base class for EventLoopGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);
    private static final int DEFAULT_EVENT_LOOP_THREADS;
    static {
      DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
      if (logger.isDebugEnabled()) logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }

    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
      super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    ...
}

//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    private final EventExecutor[] children;
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    ...
    //Create a new instance.
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
      this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }
   
    //Create a new instance.
    //@param nThreads,the number of threads that will be used by this instance.
    //@param executor,the Executor to use, or null if the default should be used.
    //@param chooserFactory,the EventExecutorChooserFactory to use.
    //@param args,arguments which will passed to each #newChild(Executor, Object...) call
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
      if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
      //1.创建ThreadPerTaskExecutor线程执行器
      if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
      //2.创建NioEventLoop
      children = new EventExecutor;
      for (int i = 0; i < nThreads; i ++) {
            ...
            //创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
            //传入线程执行器executor去创建NioEventLoop
            children = newChild(executor, args);
      }
      //3.创建线程选择器
      chooser = chooserFactory.newChooser(children);
      ...
    }
    ...
}(5)创建NioEventLoop
说明一:
由MultithreadEventExecutorGroup的构造方法可知,Netty会使用for循环 + newChild()方法来创建nThreads个NioEventLoop,而且一个NioEventLoop对应一个线程实体FastThreadLocalThread。
new NioEventLoopGroup() //线程组,线程个数默认为2 * CPU核数
new ThreadPerTaskExecutor() //创建线程执行器,作用是负责创建NioEventLoop对应的线程
for(...) { newChild() } //构造NioEventLoop,创建NioEventLoop线程组
chooserFactory.newChooser() //线程选择器,用于给每个新连接分配一个NioEventLoop线程说明四:
NioEventLoop的构造方法还会调用其父类的父类SingleThreadEventExecutor的构造方法。SingleThreadEventExecutor的构造方法里有两个关键的操作:一是把线程执行器保存起来,因为后面创建NioEventLoop对应的线程时要用到。二是创建一个MPSC任务队列,因为Netty中所有异步执行的本质都是通过该任务队列来协调完成的。
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    private final EventExecutor[] children;
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    ...   
    //Create a new instance.
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {
      if (nThreads <= 0) throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
      //1.创建ThreadPerTaskExecutor线程执行器
      if (executor == null) executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
      //2.创建NioEventLoop
      children = new EventExecutor;
      for (int i = 0; i < nThreads; i ++) {
            ...
            //创建每一个NioEventLoop时,都会调用newChild()方法给每一个NioEventLoop配置一些核心参数
            //传入线程执行器executor去创建NioEventLoop
            children = newChild(executor, args);
      }
      //3.创建线程选择器
      chooser = chooserFactory.newChooser(children);
      ...
    }
   
    protected ThreadFactory newDefaultThreadFactory() {
      //getClass()是获取该方法所属的对象类型,也就是NioEventLoopGroup类型
      //因为是通过NioEventLoopGroup的构造方法层层调用到这里的
      return new DefaultThreadFactory(getClass());
    }
    ...
}

public final class ThreadPerTaskExecutor implements Executor {
    private final ThreadFactory threadFactory;
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
      if (threadFactory == null) throw new NullPointerException("threadFactory");
      this.threadFactory = threadFactory;
    }
   
    @Override
    public void execute(Runnable command) {
      //调用DefaultThreadFactory的newThread()方法执行Runnable任务
      threadFactory.newThread(command).start();
    }
}

//A ThreadFactory implementation with a simple naming rule.
public class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolId = new AtomicInteger();
    private final AtomicInteger nextId = new AtomicInteger();
    private final boolean daemon;
    private final int priority;
    protected final ThreadGroup threadGroup;
    ...
    public DefaultThreadFactory(Class<?> poolType) {
      this(poolType, false, Thread.NORM_PRIORITY);
    }
   
    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
      //toPoolName()方法会把NioEventLoopGroup的首字母变成小写
      this(toPoolName(poolType), daemon, priority);
    }
   
    public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
      this(poolName, daemon, priority,
      System.getSecurityManager() == null? Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
    }
   
    public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
      ...
      //prefix用来标记线程名字的前缀
      prefix = poolName + '-' + poolId.incrementAndGet() + '-';
      this.daemon = daemon;
      this.priority = priority;
      this.threadGroup = threadGroup;
    }
   
    @Override
    public Thread newThread(Runnable r) {
      Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());
      if (t.isDaemon()) {
            if (!daemon) t.setDaemon(false);
      } else {
            if (daemon) t.setDaemon(true);
      }
      if (t.getPriority() != priority) t.setPriority(priority);
      return t;
    }
   
    protected Thread newThread(Runnable r, String name) {
      return new FastThreadLocalThread(threadGroup, r, name);
    }
    ...
}MPSC队列也就是多生产者单消费者队列。单消费者是指某个NioEventLoop对应的线程(执行其run()方法的那个线程)。多生产者就是这个NioEventLoop对应的线程之外的线程,通常情况下就是我们的业务线程。比如,一些线程在调用writeAndFlush()方法时可以不用考虑线程安全而随意调用,那么这些线程就是多生产者。
 
MPSC队列是通过JCTools这个工具包来实现的,Netty的高性能很大程度上要归功于这个工具包。MPSC的全称是Muti Producer Single Consumer。Muti Producer对应的是外部线程,Single Consumer对应的是Netty的NioEventLoop线程。外部线程在执行Netty的一些任务时,如果判断不是由NioEventLoop对应的线程执行的,就会直接放入一个任务队列里,然后由一个NioEventLoop对应的线程去执行。
 
创建NioEventLoop总结:
NioEventLoopGroup的newChild()方法创建NioEventLoop时做了三项事情:一.创建一个Selector用于轮询注册到该NioEventLoop上的连接,二.创建一个MPSC任务队列,三.保存线程执行器到NioEventLoop。
 
(6)创建线程选择器EventExecutorChooser
说明一:
在传统的BIO编程中,一个新连接被创建后,通常需要给这个连接绑定一个Selector,之后这个连接的整个生命周期都由这个Selector管理。
 
说明二:
创建NioEventLoop时会创建一个Selector,所以一个Selector会对应一个NioEventLoop,一个NioEventLoop上会有一个Selector。线程选择器的作用就是为一个连接在NioEventLoopGroup中选择一个NioEventLoop,从而将该连接绑定到这个NioEventLoop的Selector上。
 
说明三:
根据MultithreadEventExecutorGroup的构造方法,会使用DefaultEventExecutorChooserFactory的newChooser()方法来创建线程选择器。创建好线程选择器EventExecutorChooser之后,便可以通过其next()方法获取一个NioEventLoop。
 
Netty通过判断NioEventLoopGroup中的NioEventLoop个数是否是2的幂来创建不同的线程选择器。但不管是哪一种选择器,最终效果都是从第一个NioEventLoop开始遍历到最后一个NioEventLoop,然后再从第一个NioEventLoop开始,如此循环。
//Abstract base class for EventExecutorGroup implementations that handles their tasks with multiple threads at the same time.public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {    private final EventExecutor[] children;    private final EventExecutorChooserFactory.EventExecutorChooser chooser;    ...    //Create a new instance.    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {      this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);    }      //Create a new instance.    //@param nThreads,the number of threads that will be used by this instance.    //@param executor,the Executor to use, or null if the default should be used.    //@param chooserFactory,the EventExecutorChooserFactory to use.    //@param args,arguments which will passed to each #newChild(Executor, Object...) call    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {      if (nThreads
页: [1]
查看完整版本: Netty源码—2.Reactor线程模型一