黎娅茜 发表于 2025-6-3 13:37:03

Netty源码—8.编解码原理

大纲
1.读数据入口
2.拆包原理
3.ByteToMessageDecoder解码步骤
4.解码器抽象的解码过程总结
5.Netty里常见的开箱即用的解码器
6.writeAndFlush()方法的大体步骤
7.MessageToByteEncoder的编码步骤
8.unsafe.write()写队列
9.unsafe.flush()刷新写队列
10.如何把对象变成字节流写到unsafe底层
 
1.读数据入口
当客户端Channel的Reactor线程NioEventLoop检测到有读事件时,会执行NioByteUnsafe的read()方法。该方法会调用doReadBytes()方法将TCP缓冲区的数据读到由ByteBufAllocator分配的一个ByteBuf对象中,然后通过pipeline.fireChannelRead()方法带上这个ByteBuf对象向下传播ChannelRead事件。
 
在传播的过程中,首先会来到pipeline的head结点的channelRead()方法。该方法会继续带着那个ByteBuf对象向下传播ChannelRead事件,比如会来到ByteToMessageDecoder结点的channelRead()方法。
 
注意:服务端Channel的unsafe变量是一个NioMessageUnsafe对象,客户端Channel的unsafe变量是一个NioByteUnsafe对象。
//SingleThreadEventLoop implementation which register the Channel's to a Selector and so does the multi-plexing of these in the event loop.
public final class NioEventLoop extends SingleThreadEventLoop {
    Selector selector;
    private SelectedSelectionKeySet selectedKeys;
    private boolean needsToSelectAgain;
    private int cancelledKeys;
    ...
    @Override
    protected void run() {
        for (;;) {
            ...
            //1.调用select()方法执行一次事件轮询
            select(wakenUp.getAndSet(false));
            if (wakenUp.get()) {
                selector.wakeup();
            }
            ...
            //2.处理产生IO事件的Channel
            needsToSelectAgain = false;
            processSelectedKeys();
            ...
            //3.执行外部线程放入TaskQueue的任务
            runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
        }
    }
    
    private void processSelectedKeys() {
        if (selectedKeys != null) {
            //selectedKeys.flip()会返回一个数组
            processSelectedKeysOptimized(selectedKeys.flip());
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }
    
    private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
        for (int i = 0;; i ++) {
            //1.首先取出IO事件
            final SelectionKey k = selectedKeys;
            if (k == null) {
                break;
            }
            selectedKeys = null;//Help GC
            //2.然后获取对应的Channel和处理该Channel
            //默认情况下,这个a就是NioChannel,也就是服务端启动时经过Netty封装的Channel
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                //网络事件的处理
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                //NioTask主要用于当一个SelectableChannel注册到Selector时,执行的一些任务
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }
            //3.最后判断是否应该再进行一次轮询
            if (needsToSelectAgain) {
                for (;;) {
                    i++;
                    if (selectedKeys == null) {
                        break;
                    }
                    selectedKeys = null;
                }
                selectAgain();
                //selectedKeys.flip()会返回一个数组
                selectedKeys = this.selectedKeys.flip();
                i = -1;
            }
        }
    }
    
    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        ...
        try {
            int readyOps = k.readyOps();
            ...
            //新连接已准备接入或者已经存在的连接有数据可读
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                //如果是新连接已准备接入,则调用NioMessageUnsafe的read()方法
                //如果是已经存在的连接有数据可读,执行的是NioByteUnsafe的read()方法
                unsafe.read();
                if (!ch.isOpen()) {
                    return;
                }
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }
    ...
}

public abstract class AbstractNioByteChannel extends AbstractNioChannel {
    ...
    protected class NioByteUnsafe extends AbstractNioUnsafe {
        ...
        @Override
        public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            //创建ByteBuf分配器
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            do {
                //1.分配一个ByteBuf
                byteBuf = allocHandle.allocate(allocator);
                //2.将数据读取到分配的ByteBuf中
                allocHandle.lastBytesRead(doReadBytes(byteBuf));
                if (allocHandle.lastBytesRead() <= 0) {
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    break;
                }
                ...
                //3.调用DefaultChannelPipeline的fireChannelRead()方法从Head结点开始传播事件
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            //4.调用DefaultChannelPipeline的fireChannelReadComplete()方法从Head结点开始传播事件
            pipeline.fireChannelReadComplete();
            ...
        }
    }
}(3)基于分隔符解码器
可以向基于分隔符解码器DelimiterBasedFrameDecoder传递一个分隔符列表,这样该解码器就会按照分隔符列表对数据包进行拆分。基于分隔符解码器的decode()方法和基于行分隔符解码器的decode()方法基本类似。
//A decoder that splits the received ByteBufs by one or more delimiters.  public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {    private final ByteBuf[] delimiters;    private final int maxFrameLength;    private final boolean stripDelimiter;    private final boolean failFast;    private boolean discardingTooLongFrame;    private int tooLongFrameLength;    private final LineBasedFrameDecoder lineBasedDecoder;    ...    //Creates a new instance.    //@param maxFrameLength,the maximum length of the decoded frame.    //A TooLongFrameException is thrown if the length of the frame exceeds this value.    //@param stripDelimiter,whether the decoded frame should strip out the delimiter or not    //@param failFast,If true, a TooLongFrameException is thrown as soon as the decoder     //notices the length of the frame will exceed maxFrameLength regardless of     //whether the entire frame has been read.    //If false, a TooLongFrameException is thrown after the entire frame that exceeds maxFrameLength has been read.    //@param delimiters  the delimiters    public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {        validateMaxFrameLength(maxFrameLength);        if (delimiters == null) {            throw new NullPointerException("delimiters");        }        if (delimiters.length == 0) {            throw new IllegalArgumentException("empty delimiters");        }        if (isLineBased(delimiters) && !isSubclass()) {            lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);            this.delimiters = null;        } else {            this.delimiters = new ByteBuf;            for (int i = 0; i < delimiters.length; i ++) {                ByteBuf d = delimiters;                validateDelimiter(d);                this.delimiters = d.slice(d.readerIndex(), d.readableBytes());            }            lineBasedDecoder = null;        }        this.maxFrameLength = maxFrameLength;        this.stripDelimiter = stripDelimiter;        this.failFast = failFast;    }        //Returns true if the delimiters are "\n" and "\r\n".    private static boolean isLineBased(final ByteBuf[] delimiters) {        if (delimiters.length != 2) {            return false;        }        ByteBuf a = delimiters;        ByteBuf b = delimiters;        if (a.capacity() < b.capacity()) {            a = delimiters;            b = delimiters;        }        return a.capacity() == 2 && b.capacity() == 1            && a.getByte(0) == '\r' && a.getByte(1) == '\n'            && b.getByte(0) == '\n';    }    //Return true if the current instance is a subclass of DelimiterBasedFrameDecoder    private boolean isSubclass() {        return getClass() != DelimiterBasedFrameDecoder.class;    }    @Override    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {        Object decoded = decode(ctx, in);        if (decoded != null) {            out.add(decoded);        }    }    //Create a frame out of the {@link ByteBuf} and return it.    //@param   ctx,the ChannelHandlerContext which this ByteToMessageDecoder belongs to    //@param   buffer,the ByteBuf from which to read data    //@return  frame,the ByteBuf which represent the frame or null if no frame could be created.    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {        if (lineBasedDecoder != null) {            return lineBasedDecoder.decode(ctx, buffer);        }        //Try all delimiters and choose the delimiter which yields the shortest frame.        int minFrameLength = Integer.MAX_VALUE;        ByteBuf minDelim = null;        for (ByteBuf delim: delimiters) {            int frameLength = indexOf(buffer, delim);            if (frameLength >= 0 && frameLength < minFrameLength) {                minFrameLength = frameLength;                minDelim = delim;            }        }        if (minDelim != null) {            int minDelimLength = minDelim.capacity();            ByteBuf frame;            if (discardingTooLongFrame) {                //We've just finished discarding a very large frame.                //Go back to the initial state.                discardingTooLongFrame = false;                buffer.skipBytes(minFrameLength + minDelimLength);                int tooLongFrameLength = this.tooLongFrameLength;                this.tooLongFrameLength = 0;                if (!failFast) {                    fail(tooLongFrameLength);                }                return null;            }            if (minFrameLength > maxFrameLength) {                //Discard read frame.                buffer.skipBytes(minFrameLength + minDelimLength);                fail(minFrameLength);                return null;            }            if (stripDelimiter) {                frame = buffer.readRetainedSlice(minFrameLength);                buffer.skipBytes(minDelimLength);            } else {                frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);            }            return frame;        } else {            if (!discardingTooLongFrame) {                if (buffer.readableBytes() > maxFrameLength) {                    //Discard the content of the buffer until a delimiter is found.                    tooLongFrameLength = buffer.readableBytes();                    buffer.skipBytes(buffer.readableBytes());                    discardingTooLongFrame = true;                    if (failFast) {                        fail(tooLongFrameLength);                    }                }            } else {                //Still discarding the buffer since a delimiter is not found.                tooLongFrameLength += buffer.readableBytes();                buffer.skipBytes(buffer.readableBytes());            }            return null;        }    }        private void fail(long frameLength) {        if (frameLength > 0) {            throw new TooLongFrameException("frame length exceeds " + maxFrameLength + ": " + frameLength + " - discarded");        } else {            throw new TooLongFrameException("frame length exceeds " + maxFrameLength + " - discarding");        }    }    //Returns the number of bytes between the readerIndex of the haystack and the first needle found in the haystack.      //-1 is returned if no needle is found in the haystack.    private static int indexOf(ByteBuf haystack, ByteBuf needle) {        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {            int haystackIndex = i;            int needleIndex;            for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {                if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {                    break;                } else {                    haystackIndex ++;                    if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) {                        return -1;                    }                }            }            if (needleIndex == needle.capacity()) {                //Found the needle from the haystack!                return i - haystack.readerIndex();            }        }        return -1;    }    private static void validateDelimiter(ByteBuf delimiter) {        if (delimiter == null) {            throw new NullPointerException("delimiter");        }        if (!delimiter.isReadable()) {            throw new IllegalArgumentException("empty delimiter");        }    }    private static void validateMaxFrameLength(int maxFrameLength) {        if (maxFrameLength
页: [1]
查看完整版本: Netty源码—8.编解码原理