但婆 发表于 2025-6-3 14:53:31

Netty源码—3.Reactor线程模型二

大纲
5.NioEventLoop的执行总体框架
6.Reactor线程执行一次事件轮询
7.Reactor线程处理产生IO事件的Channel
8.Reactor线程处理任务队列之添加任务
9.Reactor线程处理任务队列之执行任务
10.NioEventLoop总结
 
5.NioEventLoop的执行总体框架
(1)Reactor线程所做的三件事情
(2)处理多久IO事件就执行多久任务
(3)NioEventLoop.run()方法的执行流程
 
(1)Reactor线程所做的三件事情
NioEventLoop的run()方法里有个无限for循环,for循环里便是Reactor线程所要做的3件事情。
 
一.首先是调用select()方法进行一次事件轮询
由于一个NioEventLoop对应一个Selector,所以该select()方法便是轮询注册到这个Reactor线程对应的Selector上的所有Channel的IO事件。注意,select()方法里也有一个无限for循环,但是这个无限for循环可能会被某些条件中断。
 
二.然后调用processSelectedKeys()方法处理轮询出来的IO事件
 
三.最后调用runAllTasks()方法来处理外部线程放入TaskQueue的任务
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    private volatile int ioRatio = 50;
    ...
    @Override
    protected void run() {
      for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
            //2.处理产生IO事件的Channel
            processSelectedKeys();
            ...
            //3.执行外部线程放入TaskQueue的任务
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
      }
    }

    private void select(boolean oldWakenUp) throws IOException {
      for(;;) {
            //1.定时任务截止时间快到了,中断本次轮询
            //2.轮询过程中发现有任务加入,中断本次轮询
            //3.阻塞式select操作: selector.select(timeoutMills)
            //4.避免JDK空轮询Bug
      }
    }
    ...
}(2)处理多久IO事件就执行多久任务
在NioEventLoop的run()方法中,有个ioRatio默认是50,代表处理IO事件的时间和执行任务的时间是1:1。也就是执行了多久的processSelectedKeys()方法后,紧接着就执行多久的runAllTasks()方法。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    private volatile int ioRatio = 50;
    ...
    @Override
    protected void run() {
      for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                  processSelectedKeys();
                } finally {
                  // Ensure we always run tasks.
                  runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                  processSelectedKeys();
                } finally {
                  // Ensure we always run tasks.
                  final long ioTime = System.nanoTime() - ioStartTime;
                  runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
            ...
      }
    }
    ...
}(3)NioEventLoop.run()方法的执行流程
NioEventLoop.run() -> for(;;)
select() //执行一次事件轮询检查是否有IO事件
processSelectedKeys() //处理产生IO事件的Channel
runAllTasks() //处理异步任务队列
//这3步放在一个线程处理应该是为了节约线程,因为不是总会有IO事件和异步任务的 
6.Reactor线程执行一次事件轮询
(1)执行select操作前设置wakeUp变量
(2)定时任务快开始了则中断本次轮询
(3)轮询中发现有任务加入则中断本次轮询
(4)执行阻塞式select操作
(5)避免JDK的空轮询Bug
(6)执行一次事件轮询的总结
 
(1)执行select操作前设置wakeUp变量
NioEventLoop有个wakenUp成员变量表示是否应该唤醒正在阻塞的select操作。NioEventLoop的run()方法准备执行select()方法进行一次新的循环逻辑之前,都会将wakenUp设置成false,标志新一轮循环的开始。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    //Boolean that controls determines if a blocked Selector.select should break out of its selection process.
    //In our case we use a timeout for the select method and the select method will block for that time unless waken up.
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    ...
    @Override
    protected void run() {
      for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
      }
    }
    ...
}如下是NioEventLoop的select()方法的执行逻辑,也就是Netty关于事件循环的4段逻辑。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    ...
    private void select(boolean oldWakenUp) throws IOException {
      Selector selector = this.selector;
      for(;;) {
            //1.定时任务截止时间快到了,中断本次轮询
            //2.轮询过程中发现有任务加入,中断本次轮询
            //3.阻塞式select操作: selector.select(timeoutMills)
            //4.避免JDK空轮询Bug
      }
    }
    ...
}(2)定时任务快开始了则中断本次轮询
NioEventLoop中的Reactor线程的select操作也是一个for循环。
 
在for循环第一步,如果发现当前定时任务队列中某个任务的开始时间快到了(小于0.5ms),那么就跳出循环。在跳出循环之前,如果发现目前为止还没有进行过select操作,就调用一次selectNow()方法执行非阻塞式select操作。
 
Netty里的定时任务队列是按照延迟时间从小到大进行排序的,所以delayNanos()方法返回的第一个定时任务的延迟时间便是最早截止的时间。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    ...
    private void select(boolean oldWakenUp) throws IOException {
      Selector selector = this.selector;
      int selectCnt = 0;
      long currentTimeNanos = System.nanoTime();//当前时间
      long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);//当前时间 + 定时任务的最早截止时间
      for(;;) {
            //1.定时任务截止时间快到了,中断本次轮询
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                  selector.selectNow();//非阻塞执行select操作
                  selectCnt = 1;
                }
                break;
            }
            ...
      }
    }
    ...
}

//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    ...
    protected long delayNanos(long currentTimeNanos) {
      ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
      if (scheduledTask == null) {
            return SCHEDULE_PURGE_INTERVAL;
      }
      return scheduledTask.delayNanos(currentTimeNanos);
    }
    ...
}

//Abstract base class for EventExecutors that want to support scheduling.
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    Queue<ScheduledFutureTask<?>> scheduledTaskQueue;//定时任务队列
    ...
    final ScheduledFutureTask<?> peekScheduledTask() {
      Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
      if (scheduledTaskQueue == null) {
            return null;
      }
      return scheduledTaskQueue.peek();
    }
    ...
}

final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V> {
    ...
    public long delayNanos(long currentTimeNanos) {
      return Math.max(0, deadlineNanos() - (currentTimeNanos - START_TIME));
    }
    public long deadlineNanos() {
      return deadlineNanos;
    }
    ...

9.Reactor线程处理任务队列之执行任务
(1)runAllTasks()方法需要传入超时时间
(2)Reactor线程执行任务的步骤
(3)Netty性能优化之批量策略
(4)NioEventLoop.run()方法执行任务总结
 
(1)runAllTasks()方法需要传入超时时间
SingleThreadEventExecutor的runAllTasks()方法需要传入参数timeoutNanos,表示尽量在timeoutNanos时间内将所有的任务都取出来执行一遍。因为如果Reactor线程在执行任务时停留的时间过长,那么将会累积许多IO事件无法及时处理,从而导致大量客户端请求阻塞。因此Netty会精细控制内部任务队列的执行时间。
 
(2)Reactor线程执行任务的步骤
一.任务聚合
转移定时任务到MPSC队列,这里只是将快到期的定时任务转移到MPSC队列里。
 
二.时间计算
计算本轮任务执行的截止时间,此时所有截止时间已到达的定时任务均被填充到普通的任务队列(MPSC队列)里了。
 
三.任务执行
首先不抛异常地同步执行任务,然后累加当前已执行的任务数,接着每隔64次计算一下当前时间是否已超截止时间,最后判断本轮任务是否已经执行完毕。
//Abstract base class for OrderedEventExecutor's that execute all its submitted tasks in a single thread.public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {    //每一个NioEventLoop会有一个MPSC队列    private final Queue taskQueue;    ...      //Poll all tasks from the task queue and run them via Runnable#run() method.    //This method stops running the tasks in the task queue and returns if it ran longer than timeoutNanos.    protected boolean runAllTasks(long timeoutNanos) {      //1.转移定时任务到MPSC队列,也就是任务聚合      fetchFromScheduledTaskQueue();      //从普通的任务队列(MPSC队列)里获取任务      Runnable task = pollTask();      if (task == null) {            afterRunningAllTasks();            return false;      }      //2.计算本轮任务执行的截止时间      final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;      long runTasks = 0;      long lastExecutionTime;      //3.执行任务,通过for循环逐个执行pollTask()取出的任务      for (;;) {            //3.1 不抛异常地执行任务(同步阻塞),确保任务可以安全执行            safeExecute(task);            //3.2 累加当前已执行的任务数            runTasks ++;            //3.3 每隔64次计算一下当前时间是否已经超过截止时间,因为ScheduledFutureTask.nanoTime()也挺耗时的            if ((runTasks & 0x3F) == 0) {                lastExecutionTime = ScheduledFutureTask.nanoTime();                if (lastExecutionTime >= deadline) {                  break;                }            }            //3.4 判断本轮任务是否已经执行完毕            task = pollTask();            if (task == null) {                lastExecutionTime = ScheduledFutureTask.nanoTime();                break;            }      }      afterRunningAllTasks();      this.lastExecutionTime = lastExecutionTime;      return true;    }      private boolean fetchFromScheduledTaskQueue() {      long nanoTime = AbstractScheduledEventExecutor.nanoTime();      Runnable scheduledTask= pollScheduledTask(nanoTime);      while (scheduledTask != null) {            if (!taskQueue.offer(scheduledTask)) {                scheduledTaskQueue().add((ScheduledFutureTask) scheduledTask);                return false;            }            scheduledTask= pollScheduledTask(nanoTime);      }      return true;    }      protected Runnable pollTask() {      assert inEventLoop();      return pollTaskFrom(taskQueue);    }    protected final Runnable pollTaskFrom(Queue taskQueue) {      for (;;) {            Runnable task = taskQueue.poll();            if (task == WAKEUP_TASK) {                continue;            }            return task;      }    }    ...}//Abstract base class for EventExecutors that want to support scheduling.public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {    Queue> scheduledTaskQueue = this.scheduledTaskQueue;      ScheduledFutureTask scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();      if (scheduledTask == null) {            return null;      }      if (scheduledTask.deadlineNanos()
页: [1]
查看完整版本: Netty源码—3.Reactor线程模型二