Netty源码—8.编解码原理
大纲
1.读数据入口
2.拆包原理
3.ByteToMessageDecoder解码步骤
4.解码器抽象的解码过程总结
5.Netty里常见的开箱即用的解码器
6.writeAndFlush()方法的大体步骤
7.MessageToByteEncoder的编码步骤
8.unsafe.write()写队列
9.unsafe.flush()刷新写队列
10.如何把对象变成字节流写到unsafe底层
1.读数据入口
当客户端Channel的Reactor线程NioEventLoop检测到有读事件时,会执行NioByteUnsafe的read()方法。该方法会调用doReadBytes()方法将TCP缓冲区的数据读到由ByteBufAllocator分配的一个ByteBuf对象中,然后通过pipeline.fireChannelRead()方法带上这个ByteBuf对象向下传播ChannelRead事件。
在传播的过程中,首先会来到pipeline的head结点的channelRead()方法。该方法会继续带着那个ByteBuf对象向下传播ChannelRead事件,比如会来到ByteToMessageDecoder结点的channelRead()方法。
注意:服务端Channel的unsafe变量是一个NioMessageUnsafe对象,客户端Channel的unsafe变量是一个NioByteUnsafe对象。
//SingleThreadEventLoop implementation which registers the Channels to a Selector and so does the multiplexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
Selector selector;
private SelectedSelectionKeySet selectedKeys;
private boolean needsToSelectAgain;
private int cancelledKeys;
...
//Event-loop body: repeats forever, and on each pass (1) polls for events, (2) processes the selected keys, (3) runs queued tasks.
//The "..." lines are elisions made by the article, not code.
@Override
protected void run() {
for (;;) {
...
//1. Call select() to perform one round of event polling; wakenUp is reset to false as its old value is passed in
select(wakenUp.getAndSet(false));
//If wakenUp has been set to true again by now, wake the selector up
if (wakenUp.get()) {
selector.wakeup();
}
...
//2. Process the Channels that produced IO events
needsToSelectAgain = false;
processSelectedKeys();
...
//3. Run the tasks that external threads put into the TaskQueue
//NOTE(review): the argument looks like a time budget derived from ioTime and ioRatio; both are defined in the elided code - confirm
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
//Dispatches the ready keys to one of the two processing routines.
//When the selectedKeys field is null, the selector's own selectedKeys() is handed to processSelectedKeysPlain();
//otherwise selectedKeys.flip(), which returns an array, is handed to processSelectedKeysOptimized().
private void processSelectedKeys() {
    if (selectedKeys == null) {
        processSelectedKeysPlain(selector.selectedKeys());
        return;
    }
    processSelectedKeysOptimized(selectedKeys.flip());
}
//Processes the ready keys held in the given array, one slot at a time.
//The array is walked until the first null entry; neither loop below checks the array length,
//so the array must be terminated by a null element.
//@param selectedKeys the ready keys, terminated by a null entry
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
    for (int i = 0;; i ++) {
        //1. Fetch the next IO event; a null slot marks the end of the ready keys
        final SelectionKey k = selectedKeys[i];
        if (k == null) {
            break;
        }
        //Clear only this slot (not the array reference) so the key can be collected: Help GC
        selectedKeys[i] = null;
        //2. Get the Channel attached to the key and process it
        //By default the attachment is the NioChannel, i.e. the Channel that Netty wrapped at server startup
        final Object a = k.attachment();
        if (a instanceof AbstractNioChannel) {
            //Network event handling
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            //NioTask is mainly used for tasks executed when a SelectableChannel is registered with the Selector
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        //3. Finally decide whether another round of polling is needed
        if (needsToSelectAgain) {
            //Clear the remaining slots up to the terminating null before the array is replaced
            for (;;) {
                i++;
                if (selectedKeys[i] == null) {
                    break;
                }
                selectedKeys[i] = null;
            }
            selectAgain();
            //selectedKeys.flip() returns an array; continue with the freshly selected keys
            selectedKeys = this.selectedKeys.flip();
            //Becomes 0 after the loop increment, so the new array is processed from its first slot
            i = -1;
        }
    }
}
//Handles the IO events that are ready on one SelectionKey whose attachment is an AbstractNioChannel.
//The "..." lines are elisions made by the article, not code.
//@param k the ready SelectionKey
//@param ch the Channel attached to k
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
...
try {
int readyOps = k.readyOps();
...
//A new connection is ready to be accepted, or an existing connection has data to read
//(this branch is also taken when readyOps is 0)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
//For a new connection ready to be accepted, this calls NioMessageUnsafe.read()
//For an existing connection with readable data, this calls NioByteUnsafe.read()
unsafe.read();
//Stop here if the Channel is no longer open after the read
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException ignored) {
//The key was cancelled: close the Channel through its unsafe, passing the void promise
unsafe.close(unsafe.voidPromise());
}
}
...
}
public abstract class AbstractNioByteChannel extends AbstractNioChannel {
...
protected class NioByteUnsafe extends AbstractNioUnsafe {
...
//Reads the pending data of this Channel into ByteBufs and fires a ChannelRead event on the pipeline for each one.
//The "..." lines are elisions made by the article; the declaration of "close" is among the elided code.
@Override
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
//Get the ByteBuf allocator from the Channel config
final ByteBufAllocator allocator = config.getAllocator();
//Handle used below to allocate the receive buffers and to decide whether to keep reading
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
do {
//1. Allocate a ByteBuf
byteBuf = allocHandle.allocate(allocator);
//2. Read data into the allocated ByteBuf and record the result through lastBytesRead()
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
//Nothing was read: release the unused buffer and leave the loop
byteBuf.release();
byteBuf = null;
//A negative result sets the close flag
close = allocHandle.lastBytesRead() < 0;
break;
}
...
//3. Call DefaultChannelPipeline.fireChannelRead() to propagate the event starting from the head node
pipeline.fireChannelRead(byteBuf);
//Drop the local reference now that the buffer has been passed to the pipeline
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
//4. Call DefaultChannelPipeline.fireChannelReadComplete() to propagate the event starting from the head node
pipeline.fireChannelReadComplete();
...
}
}
}
(3)基于分隔符解码器
可以向基于分隔符解码器DelimiterBasedFrameDecoder传递一个分隔符列表,这样该解码器就会按照分隔符列表对数据包进行拆分。基于分隔符解码器的decode()方法和基于行分隔符解码器的decode()方法基本类似。
//A decoder that splits the received ByteBufs by one or more delimiters. public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder { private final ByteBuf[] delimiters; private final int maxFrameLength; private final boolean stripDelimiter; private final boolean failFast; private boolean discardingTooLongFrame; private int tooLongFrameLength; private final LineBasedFrameDecoder lineBasedDecoder; ... //Creates a new instance. //@param maxFrameLength,the maximum length of the decoded frame. //A TooLongFrameException is thrown if the length of the frame exceeds this value. //@param stripDelimiter,whether the decoded frame should strip out the delimiter or not //@param failFast,If true, a TooLongFrameException is thrown as soon as the decoder //notices the length of the frame will exceed maxFrameLength regardless of //whether the entire frame has been read. //If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read. //@param delimiters the delimiters public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) { validateMaxFrameLength(maxFrameLength); if (delimiters == null) { throw new NullPointerException("delimiters"); } if (delimiters.length == 0) { throw new IllegalArgumentException("empty delimiters"); } if (isLineBased(delimiters) && !isSubclass()) { lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast); this.delimiters = null; } else { this.delimiters = new ByteBuf; for (int i = 0; i < delimiters.length; i ++) { ByteBuf d = delimiters; validateDelimiter(d); this.delimiters = d.slice(d.readerIndex(), d.readableBytes()); } lineBasedDecoder = null; } this.maxFrameLength = maxFrameLength; this.stripDelimiter = stripDelimiter; this.failFast = failFast; } //Returns true if the delimiters are "\n" and "\r\n". 
private static boolean isLineBased(final ByteBuf[] delimiters) { if (delimiters.length != 2) { return false; } ByteBuf a = delimiters; ByteBuf b = delimiters; if (a.capacity() < b.capacity()) { a = delimiters; b = delimiters; } return a.capacity() == 2 && b.capacity() == 1 && a.getByte(0) == '\r' && a.getByte(1) == '\n' && b.getByte(0) == '\n'; } //Return true if the current instance is a subclass of DelimiterBasedFrameDecoder private boolean isSubclass() { return getClass() != DelimiterBasedFrameDecoder.class; } @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } //Create a frame out of the {@link ByteBuf} and return it. //@param ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to //@param buffer,the ByteBuf from which to read data //@return frame,the ByteBuf which represent the frame or null if no frame could be created. protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { if (lineBasedDecoder != null) { return lineBasedDecoder.decode(ctx, buffer); } //Try all delimiters and choose the delimiter which yields the shortest frame. int minFrameLength = Integer.MAX_VALUE; ByteBuf minDelim = null; for (ByteBuf delim: delimiters) { int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; } } if (minDelim != null) { int minDelimLength = minDelim.capacity(); ByteBuf frame; if (discardingTooLongFrame) { //We've just finished discarding a very large frame. //Go back to the initial state. discardingTooLongFrame = false; buffer.skipBytes(minFrameLength + minDelimLength); int tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if (!failFast) { fail(tooLongFrameLength); } return null; } if (minFrameLength > maxFrameLength) { //Discard read frame. 
buffer.skipBytes(minFrameLength + minDelimLength); fail(minFrameLength); return null; } if (stripDelimiter) { frame = buffer.readRetainedSlice(minFrameLength); buffer.skipBytes(minDelimLength); } else { frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); } return frame; } else { if (!discardingTooLongFrame) { if (buffer.readableBytes() > maxFrameLength) { //Discard the content of the buffer until a delimiter is found. tooLongFrameLength = buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); discardingTooLongFrame = true; if (failFast) { fail(tooLongFrameLength); } } } else { //Still discarding the buffer since a delimiter is not found. tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); } return null; } } private void fail(long frameLength) { if (frameLength > 0) { throw new TooLongFrameException("frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded"); } else { throw new TooLongFrameException("frame length exceeds " + maxFrameLength + " - discarding"); } } //Returns the number of bytes between the readerIndex of the haystack and the first needle found in the haystack. //-1 is returned if no needle is found in the haystack. private static int indexOf(ByteBuf haystack, ByteBuf needle) { for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) { int haystackIndex = i; int needleIndex; for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) { if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) { break; } else { haystackIndex ++; if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) { return -1; } } } if (needleIndex == needle.capacity()) { //Found the needle from the haystack! 
return i - haystack.readerIndex(); } } return -1; } private static void validateDelimiter(ByteBuf delimiter) { if (delimiter == null) { throw new NullPointerException("delimiter"); } if (!delimiter.isReadable()) { throw new IllegalArgumentException("empty delimiter"); } } private static void validateMaxFrameLength(int maxFrameLength) { if (maxFrameLength
页:
[1]