Netty源码—4.客户端接入流程
大纲1.关于Netty客户端连接接入问题整理
2.Reactor线程模型和服务端启动流程
3.Netty新连接接入的整体处理逻辑
4.新连接接入之检测新连接
5.新连接接入之创建NioSocketChannel
6.新连接接入之绑定NioEventLoop线程
7.新连接接入之注册Selector和注册读事件
8.注册Reactor线程总结
9.新连接接入总结
1.关于Netty客户端连接接入问题整理
一.Netty是在哪里检测有新连接接入的?
答:boss线程第一个过程轮询出ACCEPT事件,然后boss线程第二个过程通过JDK底层Channel的accept()方法创建一条连接。
二.新连接是怎样注册到NioEventLoop线程的?
答:boss线程调用chooser的next()方法拿到一个NioEventLoop,然后将新连接注册到NioEventLoop的Selector上。
2.Reactor线程模型和服务端启动流程
(1)Netty中的Reactor线程模型
(2)服务端启动流程
(1)Netty中的Reactor线程模型
Netty中最核心的是两种类型的Reactor线程,这两种类型的Reactor线程可以看作Netty中的两组发动机,驱动着Netty整个框架的运转。一种类型是boss线程,专门用来接收新连接,然后将连接封装成Channel对象传递给worker线程。另一种类型是worker线程,专门用来处理连接上的数据读写。
boss线程和worker线程所做的事情均分为3步。第一是轮询注册在Selector上的IO事件,第二是处理IO事件,第三是执行异步任务。对boss线程来说,第一步轮询出来的基本都是ACCEPT事件,表示有新的连接。对worker线程来说,第一步轮询出来的基本都是READ事件或WRITE事件,表示网络的读写。
(2)服务端启动流程
服务端是在用户线程中开启的,通过ServerBootstrap.bind()方法,在第一次添加异步任务的时候启动boss线程。启动之后,当前服务器就可以开启监听。
3.Netty新连接接入的整体处理逻辑
新连接接入的处理总体就是:检测新连接 + 注册Reactor线程,具体就可以分为如下4个过程。
一.检测新连接
服务端Channel对应的NioEventLoop会轮询该Channel绑定的Selector中是否发生了ACCEPT事件,如果是则说明有新连接接入了。
二.创建NioSocketChannel
检测出新连接之后,便会基于JDK NIO的Channel创建出一个NioSocketChannel,也就是客户端Channel。
三.分配worker线程及注册Selector
接着Netty给客户端Channel分配一个NioEventLoop,也就是分配worker线程。然后把这个客户端Channel注册到这个NioEventLoop对应的Selector上,之后这个客户端Channel的读写事件都会由这个NioEventLoop进行处理。
四.向Selector注册读事件
最后向这个客户端Channel对应的Selector注册READ事件,注册的逻辑和服务端Channel启动时注册ACCEPT事件的一样。
4.新连接接入之检测新连接
(1)何时会检测到有新连接
(2)新连接接入的流程梳理
(3)新连接接入的总结
(1)何时会检测到有新连接
当调用辅助启动类ServerBootstrap的bind()方法启动服务端之后,服务端的Channel也就是NioServerSocketChannel就会注册到boss的Reactor线程上。boss的Reactor线程会不断检测是否有新的事件,直到检测出有ACCEPT事件发生即有新连接接入。此时boss的Reactor线程将通过服务端Channel的unsafe变量来进行实际操作。
注意:服务端Channel的unsafe变量是一个NioMessageUnsafe对象,客户端Channel的unsafe变量是一个NioByteUnsafe对象。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
private SelectedSelectionKeySet selectedKeys;
private boolean needsToSelectAgain;
private int cancelledKeys;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
//2.处理产生IO事件的Channel
needsToSelectAgain = false;
processSelectedKeys();
...
//3.执行外部线程放入TaskQueue的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
//selectedKeys.flip()会返回一个数组
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
//1.首先取出IO事件
final SelectionKey k = selectedKeys;
if (k == null) {
break;
}
selectedKeys = null;//Help GC
//2.然后获取对应的Channel和处理该Channel
//默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//网络事件的处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
//NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
//3.最后判断是否应该再进行一次轮询
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys == null) {
break;
}
selectedKeys = null;
}
selectAgain();
//selectedKeys.flip()会返回一个数组
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
//If the channel implementation throws an exception because there is no event loop,
//we ignore this because we are only trying to determine if ch is registered to this event loop and thus has authority to close ch.
return;
}
//Only close ch if ch is still registerd to this EventLoop.
//ch could have deregistered from the event loop and thus the SelectionKey could be cancelled as part of the deregistration process,
//but the channel is still healthy and should not be closed.
if (eventLoop != this || eventLoop == null) {
return;
}
//close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
//We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
//the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop
//boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入
//此时将调用Channel的unsafe变量来进行实际操作
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//进行新连接接入处理
unsafe.read();
if (!ch.isOpen()) {
//Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
...
}(2)新连接接入的流程梳理
一.NioMessageUnsafe的read()方法说明
首先使用一条断言确保该read()方法必须来自Reactor线程调用,然后获得Channel对应的Pipeline和RecvByteBufAllocator.Handle。
接着调用NioServerSocketChannel的doReadMessages()方法不断地读取新连接到readBuf容器。然后使用for循环处理readBuf容器里的新连接,也就是通过pipeline.fireChannelRead()方法让每个新连接都经过一层服务端Channel的Pipeline逻辑处理,最后清理容器并执行pipeline.fireChannelReadComplete()。
//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//临时存放读到的连接NioSocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
//断言确保该read()方法必须来自Reactor线程调用
assert eventLoop().inEventLoop();
//获得Channel对应的Pipeline
final ChannelPipeline pipeline = pipeline();
//获得Channel对应的RecvByteBufAllocator.Handle
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
do {
//1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel
//通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
} while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接
//2.设置并绑定NioSocketChannel
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
//3.清理容器并触发pipeline.fireChannelReadComplete()
readBuf.clear();
pipeline.fireChannelReadComplete();
}
}
//Read messages into the given array and return the amount which was read.
protected abstract int doReadMessages(List<Object> buf) throws Exception;
...
}二.新连接接入的流程梳理
首先会从服务端Channel对应的NioEventLoop的run()方法的第二个步骤处理IO事件开始。然后会调用服务端Channel的unsafe变量的read()方法,也就是NioMessageUnsafe对象的read()方法。
接着循环调用NioServerSocketChannel的doReadMessages()方法来创建新连接对象NioSocketChannel。其中创建新连接对象最核心的方法就是调用JDK Channel的accept()方法来创建JDK Channel。
与服务端启动一样,Netty会把JDK底层Channel包装成Netty自定义的NioSocketChannel。
NioEventLoop.processSelectedKeys(key, channel) //入口
NioMessageUnsafe.read() //新连接接入处理
NioServerSocketChannel.doReadMessages() //创建新连接对象NioSocketChannel
javaChannel.accept() //创建JDK Channel(3)新连接接入的总结
在服务端Channel对应的NioEventLoop的run()方法的processSelectedKeys()方法里,发现产生的IO事件是ACCEPT事件之后,会通过JDK Channel的accept()方法取创建JDK的Channel,并把它包装成Netty自定义的NioSocketChannel。在这个过程中会通过一个RecvByteBufAllocator.Handle对象控制连接接入的速率,默认一次性读取16个连接。
5.新连接接入之创建NioSocketChannel
(1)doReadMessages()方法相关说明
(2)创建NioSocketChannel的流程梳理
(3)创建NioSocketChannel的总结
(4)Netty中的Channel分类
(1)doReadMessages()方法相关说明
首先通过javaChannel().accept()创建一个JDK的Channel,即客户端Channel。然后把服务端Channel和这个客户端Channel作为参数传入NioSocketChannel的构造方法中,从而把JDK的Channel封装成Netty自定义的NioSocketChannel。最后把封装好的NioSocketChannel添加到一个List里,以便外层可以遍历List进行处理。
//A ServerSocketChannel implementation which uses NIO selector based implementation to accept new connections.
public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel {
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private final ServerSocketChannelConfig config;
...
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
//1.创建JDK的Channel
SocketChannel ch = javaChannel().accept();
//2.封装成Netty的Channel,即把服务端Channel和客户端Channel当作参数传递到NioSocketChannel的构造方法里
if (ch != null) {
//先创建一个NioSocketChannel对象,再添加到buf里
buf.add(new NioSocketChannel(this, ch));
return 1;
}
return 0;
}
//Create a new instance
public NioServerSocketChannel() {
//创建服务端Channel
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
//创建服务端Channel
return provider.openServerSocketChannel();
}
//Create a new instance using the given ServerSocketChannel.
public NioServerSocketChannel(ServerSocketChannel channel) {
//创建服务端Channel,关注ACCEPT事件
super(null, channel, SelectionKey.OP_ACCEPT);
//javaChannel().socket()会调用JDK Channel的socket()方法
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
@Override
protected ServerSocketChannel javaChannel() {
//返回一个JDK的Channel -> ServerSocketChannel
return (ServerSocketChannel) super.javaChannel();
}
...
}
//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
//创建服务端Channel
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
...
}
//SocketChannel which uses NIO selector based implementation.
public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {
private final SocketChannelConfig config;
...
//Create a new instance
//@param parent,the Channel which created this instance or null if it was created by the user
//@param socket,the SocketChannel which will be used
public NioSocketChannel(Channel parent, SocketChannel socket) {
//创建客户端Channel
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
@Override
protected SocketChannel javaChannel() {
//返回一个JDK的Channel -> ServerSocketChannel
return (SocketChannel) super.javaChannel();
}
private final class NioSocketChannelConfigextends DefaultSocketChannelConfig {
private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) {
super(channel, javaSocket);
}
...
}
...
}
//The default SocketChannelConfig implementation.
public class DefaultSocketChannelConfig extends DefaultChannelConfig implements SocketChannelConfig {
protected final Socket javaSocket;
//Creates a new instance.
public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
...
this.javaSocket = javaSocket;
setTcpNoDelay(true);//禁止Nagle算法
...
}
...
}
//AbstractNioChannel base class for Channels that operate on bytes.
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
//Create a new instance
//@param parent,the parent Channel by which this instance was created. May be null
//@param ch,the underlying SelectableChannel on which it operates
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
//创建客户端Channel,关注READ事件
super(parent, ch, SelectionKey.OP_READ);
}
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioByteUnsafe();
}
...
}
//Abstract base class for Channel implementations which use a Selector based approach.
public abstract class AbstractNioChannel extends AbstractChannel {
private final SelectableChannel ch;
protected final int readInterestOp;
...
//Create a new instance
//@param parent,the parent Channel by which this instance was created. May be null
//@param ch,the underlying SelectableChannel on which it operates
//@param readInterestOp,the ops to set to receive data from the SelectableChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
...
}
protected SelectableChannel javaChannel() {
return ch;
}
@Override
public NioUnsafe unsafe() {
return (NioUnsafe) super.unsafe();
}
...
}
//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final Channel parent;
private final ChannelId id;
private final Unsafe unsafe;
private final DefaultChannelPipeline pipeline;
...
//Creates a new instance.
//@param parent,the parent of this channel. null if there's no parent.
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
//Returns a new DefaultChannelId instance.
//Subclasses may override this method to assign custom ChannelIds to Channels that use the AbstractChannel#AbstractChannel(Channel) constructor.
protected ChannelId newId() {
return DefaultChannelId.newInstance();
}
//Create a new AbstractUnsafe instance which will be used for the life-time of the Channel
protected abstract AbstractUnsafe newUnsafe();
//Returns a new DefaultChannelPipeline instance.
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
@Override
public Unsafe unsafe() {
return unsafe;
}
@Override
public ChannelPipeline pipeline() {
return pipeline;
}
@Override
public EventLoop eventLoop() {
EventLoop eventLoop = this.eventLoop;
if (eventLoop == null) throw new IllegalStateException("channel not registered to an event loop");
return eventLoop;
}
protected abstract class AbstractUnsafe implements Unsafe {
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
//绑定事件循环器,即绑定一个NioEventLoop到该Channel上
AbstractChannel.this.eventLoop = eventLoop;
//注册Selector,并启动一个NioEventLoop
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
...
//通过启动这个NioEventLoop线程来调用register0()方法将这个服务端Channel注册到Selector上
//其实执行的是SingleThreadEventExecutor的execute()方法
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
...
}
}
...
}
...
}(2)创建NioSocketChannel的流程梳理
NioServerSocketChannel和NioSocketChannel都有同一个父类AbstractNioChannel,所以创建NioSocketChannel的模版和创建NioServerSocketChannel保持一致。
但要注意的是:客户端Channel是通过new关键字创建的,服务端Channel是通过反射的方式创建的。
此外,Nagle算法会让小数据包尽量聚合成大的数据包再发送出去,Netty为了使数据能够及时发送出去会禁止该算法。
new NioSocketChannel(p, ch) //入口,客户端Channel是通过new关键字创建的,服务端Channel是通过反射的方式创建的
new AbstractNioByteChannel(p, ch) //逐层调用父类的构造方法
new AbstractNioChannel(p, ch, op_read) //逐层调用父类的构造方法
ch.configureBlocking(false) + save op //配置此Channel为非阻塞,以及将感兴趣的读事件保存到成员变量以方便后续注册到Selector
new AbstractChannel() //创建Channel的相关组件:
newId() //id作为Channel的唯一标识
newUnsafe() //unsafe用来进行底层数据读写
newChannelPipeline() //pipeline作为业务逻辑载体
new NioSocketChannelConfig() //创建和NioSocketChannel绑定的配置类
setTcpNoDelay(true) //禁止Nagle算法(3)创建NioSocketChannel的总结
创建NioSocketChannel的逻辑可以分成两部分。
第一部分是逐层调用父类的构造方法,其中会设置这个客户端Channel的阻塞模式为false,然后再把感兴趣的读事件OP_READ保存到这个Channel的成员变量中以便后续注册到Selector,接着会创建一系列的组件,包括作为Channel唯一标识的Id组件、用来进行底层数据读写的unsafe组件、用来作为业务逻辑载体的pipeline组件。
第二部分是创建和这个客户端Channel相关的config对象,该config对象会设置关闭Nagle算法,从而让小数据包尽快发送出去、降低延时。
(4)Netty中的Channel分类
说明一:
Channel继承Comparable表示Channel是一个可以比较的对象。
说明二:
Channel继承AttributeMap表示Channel是一个可以绑定属性的对象,我们经常在代码中使用channel.attr(...)来给Channel绑定属性,其实就是把属性设置到AttributeMap中。
说明三:
AbstractChannel用来实现Channel的大部分方法,在AbstractChannel的构造方法中会创建一个Channel对象所包含的基本组件,这里的Channel通常是指SocketChannel和ServerSocketChannel。
说明四:
AbstractNioChannel继承了AbstractChannel,然后通过Selector处理一些NIO相关的操作。比如它会保存JDK底层SelectableChannel的引用,并且在构造方法中设置Channel为非阻塞模式。注意:设置非阻塞模式是NIO编程必须的。
说明五:
Netty的两大Channel是指:服务端的NioServerSocketChannel和客户端NioSocketChannel,分别对应着服务端接收新连接的过程和服务端新连接读写数据的过程。
说明六:
服务端Channel和客户端Channel的区别是:服务端Channel通过反射方式创建,客户端Channel通过new关键字创建。服务端Channel注册的是ACCEPT事件,对应接收新连接。客户端Channel注册的是READ事件,对应新连接读写。服务端Channel和客户端Channel底层都会依赖一个unsafe对象,这个unsafe对象会用来实现这两种Channel底层的数据读写操作。对于读操作,服务端的读是读一条连接doReadMessages(),客户端的读是读取数据doReadBytes()。最后每一个Channel都会绑定一个ChannelConfig,每一个ChannelConfig都会实现Channel的一些配置。
6.新连接接入之绑定NioEventLoop线程
(1)将新连接绑定到Reactor线程的入口
(2)服务端Channel的Pipeline介绍
(3)服务端Channel默认的Pipeline处理器
(4)服务端Channel处理新连接的步骤
(5)总结
(1)将新连接绑定到Reactor线程的入口
创建完NioSocketChannel后,接下来便要对NioSocketChannel进行一些设置,并且需要将它绑定到一个正在执行的Reactor线程中。
NioMessageUnsafe.read()方法里的readBuf容器会承载着所有新建的连接,如果某个时刻Netty轮询到多个连接,那么通过使用for循环就可以批量处理这些NioSocketChannel连接。
处理每个NioSocketChannel连接时,是通过NioServerSocketChannel的pipeline的fireChannelRead()方法来处理的。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
private SelectedSelectionKeySet selectedKeys;
private boolean needsToSelectAgain;
private int cancelledKeys;
...
@Override
protected void run() {
for (;;) {
...
//1.调用select()方法执行一次事件轮询
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
...
//2.处理产生IO事件的Channel
needsToSelectAgain = false;
processSelectedKeys();
...
//3.执行外部线程放入TaskQueue的任务
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
private void processSelectedKeys() {
if (selectedKeys != null) {
//selectedKeys.flip()会返回一个数组
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
//1.首先取出IO事件
final SelectionKey k = selectedKeys;
if (k == null) {
break;
}
selectedKeys = null;//Help GC
//2.然后获取对应的Channel和处理该Channel
//默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
//网络事件的处理
processSelectedKey(k, (AbstractNioChannel) a);
} else {
//NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
//3.最后判断是否应该再进行一次轮询
if (needsToSelectAgain) {
for (;;) {
i++;
if (selectedKeys == null) {
break;
}
selectedKeys = null;
}
selectAgain();
//selectedKeys.flip()会返回一个数组
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();
//We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
//the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
//remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
//Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead to a spin loop
//boss的Reactor线程已经轮询到有ACCEPT事件,即表明有新连接接入
//此时将调用Channel的unsafe变量来进行实际操作
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//进行新连接接入处理
unsafe.read();
if (!ch.isOpen()) {
//Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
...
}
//AbstractNioChannel base class for Channels that operate on messages.
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
...
private final class NioMessageUnsafe extends AbstractNioUnsafe {
//临时存放读到的连接NioSocketChannel
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
//断言确保该read()方法必须来自Reactor线程调用
assert eventLoop().inEventLoop();
//获得Channel对应的Pipeline
final ChannelPipeline pipeline = pipeline();
//获得Channel对应的RecvByteBufAllocator.Handle
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
do {
//1.调用NioServerSocketChannel的doReadMessages()方法创建NioSocketChannel
//通过JDK的accept()方法去创建JDK Channel,然后把它包装成Netty自定义的Channel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
} while (allocHandle.continueReading());//控制连接的接入速率,默认一次性读取16个连接
//2.设置并绑定NioSocketChannel
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
//调用DefaultChannelPipeline的fireChannelRead()方法
//开始处理每个NioSocketChannel连接
pipeline.fireChannelRead(readBuf.get(i));
}
//3.清理容器并触发DefaultChannelPipeline的fireChannelReadComplete()方法
readBuf.clear();
//结束处理每个NioSocketChannel连接
pipeline.fireChannelReadComplete();
}
}
//Read messages into the given array and return the amount which was read.
protected abstract int doReadMessages(List<Object> buf) throws Exception;
...
}(2)服务端Channel的Pipeline介绍
在Netty的各种类型的Channel中,都会包含一个Pipeline。Pipeline可理解为一条流水线,流水线有起点有结束,中间还会有各种各样的流水线关卡。对Channel的处理会在流水线的起点开始,然后经过各个流水线关卡的加工,最后到达流水线的终点结束。
流水线Pipeline的开始是HeadContext,结束是TailContext。HeadContext中会调用Unsafe进行具体的操作,TailContext中会向用户抛出流水线Pipeline中未处理异常和未处理消息的警告。
在服务端的启动过程中,Netty会给服务端Channel自动添加一个Pipeline处理器ServerBootstrapAcceptor,并且会将用户代码中设置的一系列参数传入到这个ServerBootstrapAcceptor的构造方法中。
服务端Channel的Pipeline如下所示:
所以服务端Channel的Pipeline在传播ChannelRead事件时首先会从HeadContext处理器开始,然后传播到ServerBootstrapAcceptor处理器,最后传播到TailContext处理器结束。
(3)服务端Channel默认的Pipeline处理器
首先,服务端启动时会给服务端Channel的Pipeline添加一个ServerBootstrapAcceptor处理器。
//Bootstrap sub-class which allows easy bootstrap of ServerChannel
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {
...
@Override
void init(Channel channel) throws Exception {
//1.设置服务端Channel的Option与Attr
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
channel.config().setOptions(options);
}
final Map, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//2.设置客户端Channel的Option与Attr
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//3.配置服务端启动逻辑
ChannelPipeline p = channel.pipeline();
//p.addLast()用于定义服务端启动过程中需要执行哪些逻辑
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
//一.添加用户自定义的Handler,注意这是handler,而不是childHandler
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) pipeline.addLast(handler);
//二.添加一个特殊的Handler用于接收新连接
//自定义的childHandler会作为参数传入连接器ServerBootstrapAcceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//调用DefaultChannelPipeline的addLast()方法
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup,
currentChildHandler,
currentChildOptions,
currentChildAttrs)
);
}
});
}
});
}
...
}
//A skeletal Channel implementation.
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
private final DefaultChannelPipeline pipeline;
...
//Creates a new instance.
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
@Override
public ChannelPipeline pipeline() {
return pipeline;
}
...
}
//The default ChannelPipeline implementation.
//It is usually created by a Channel implementation when the Channel is created.
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
...
protected DefaultChannelPipeline(Channel channel) {
...
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
...
for (ChannelHandler h: handlers) {
if (h == null) break;
addLast(executor, null, h);
}
return this;
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
...
}
...
}
//往Pipeline中添加ChannelHandler处理器
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
...
}然后,新连接接入调用到服务端Channel的Pipeline的fireChannelRead()方法时,便会触发调用ServerBootstrapAcceptor处理器的channelRead()方法。最终会调用NioEventLoop的register()方法注册这个新连接Channel,即给新连接Channel绑定一个Reactor线程。
//The default ChannelPipeline implementation.//It is usually created by a Channel implementation when the Channel is created.public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ... protected DefaultChannelPipeline(Channel channel) { ... tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } @Override public final ChannelPipeline fireChannelRead(Object msg) { //从Pipeline的第一个HeadContext处理器开始调用 AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { ... @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //调用AbstractChannelHandlerContext的fireChannelRead()方法 ctx.fireChannelRead(msg); } @Override public ChannelHandler handler() { return this; } ... } ...}abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint { ... static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { //首先调用的是Pipeline的第一个处理器HeadContext的channelRead()方法 //注意:HeadContext继承了AbstractChannelHandlerContext ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } @Override public ChannelHandlerContext fireChannelRead(final Object msg) { invokeChannelRead(findContextInbound(), msg); return this; } private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { //注意:HeadContext继承了AbstractChannelHandlerContext //所以如果this是HeadContext,那么这里会获取下一个节点ServerBootstrapAcceptor ctx = ctx.next; } while (!ctx.inbound); return ctx; } ...}public class ServerBootstrap extends AbstractBootstrap { ... private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { private final EventLoopGroup childGroup; private final ChannelHandler childHandler; private final Entry
页:
[1]