2021-05-05 16:51
© Original author: 中间件兴趣圈. Original article: https://blog.csdn.net/prestigeding/article/details/53977445

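Non-blocking I/O over a TCP byte stream has a particular problem: partial packets (the "half-packet" problem). A single read may return only part of a message, or several messages at once.

To shield application code from this, Netty provides encoders and decoders. These are woven into the pipeline as individual Handlers.

This article focuses on how the message decoder ByteToMessageDecoder works. Its job is to split the incoming byte stream into individual, complete client request messages.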

1. ByteToMessageDecoder overview

        /**
         * A {@link ChannelHandler} which decodes bytes in a stream-like fashion from one {@link ByteBuf} to an
         * other Message type.
         *
         * For example here is an implementation which reads all readable bytes from
         * the input {@link ByteBuf} and create a new {@link ByteBuf}.
         *
         * <pre>
         *     public class SquareDecoder extends {@link ByteToMessageDecoder} {
         *         {@code @Override}
         *         public void decode({@link ChannelHandlerContext} ctx, {@link ByteBuf} in, List<Object> out)
         *                 throws {@link Exception} {
         *             out.add(in.readBytes(in.readableBytes()));
         *         }
         *     }
         * </pre>
         *
         * <h3>Frame detection</h3>
         * <p>
         * Generally frame detection should be handled earlier in the pipeline by adding a
         * {@link DelimiterBasedFrameDecoder}, {@link FixedLengthFrameDecoder}, {@link LengthFieldBasedFrameDecoder},
         * or {@link LineBasedFrameDecoder}.
         * <p>
         * If a custom frame decoder is required, then one needs to be careful when implementing
         * one with {@link ByteToMessageDecoder}. Ensure there are enough bytes in the buffer for a
         * complete frame by checking {@link ByteBuf#readableBytes()}. If there are not enough bytes
         * for a complete frame, return without modifying the reader index to allow more bytes to arrive.
         * <p>
         * To check for complete frames without modifying the reader index, use methods like {@link ByteBuf#getInt(int)}.
         * One <strong>MUST</strong> use the reader index when using methods like {@link ByteBuf#getInt(int)}.
         * For example calling <tt>in.getInt(0)</tt> is assuming the frame starts at the beginning of the buffer, which
         * is not always the case. Use <tt>in.getInt(in.readerIndex())</tt> instead.
         * <h3>Pitfalls</h3>
         * <p>
         * Be aware that sub-classes of {@link ByteToMessageDecoder} <strong>MUST NOT</strong>
         * annotated with {@link @Sharable}.
         * <p>
         * Some methods such as {@link ByteBuf#readBytes(int)} will cause a memory leak if the returned buffer
         * is not released or added to the <tt>out</tt> {@link List}. Use derived buffers like {@link ByteBuf#readSlice(int)}
         * to avoid leaking memory.
         */
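  • When implementing a custom decoder, first check whether enough bytes (a complete request) have arrived. If they have not, leave the cumulation buffer's readerIndex and writerIndex untouched. Peek with absolute getters such as getInt(index), computing the index from readerIndex rather than 0.
  • Release the relevant ByteBufs promptly to avoid memory leaks.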
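The check described in the bullets above can be sketched in plain Java. To keep the example self-contained it does not use Netty:

- a java.nio.ByteBuffer stands in for the cumulation, with position() as readerIndex and limit() as writerIndex;
- the class name LengthPrefixedCheck and the 4-byte length-prefix format are assumptions for illustration only.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Simplified model, not Netty code: a java.nio.ByteBuffer stands in for the cumulation,
// with position() as readerIndex and limit() as writerIndex.
// Assumed (hypothetical) wire format: a 4-byte big-endian length, then that many bytes.
class LengthPrefixedCheck {

    /** Returns one frame, or null with position() unchanged if the frame is incomplete. */
    static String tryDecode(ByteBuffer in) {
        int readerIndex = in.position();
        if (in.remaining() < 4) {
            return null;                         // length field not complete yet
        }
        // Absolute read relative to the current reader index (not index 0):
        // peeks at the length without moving position().
        int frameLength = in.getInt(readerIndex);
        if (in.remaining() < 4 + frameLength) {
            return null;                         // body incomplete: consume nothing
        }
        in.position(readerIndex + 4);            // now consume the length field...
        byte[] body = new byte[frameLength];
        in.get(body);                            // ...and the body
        return new String(body, StandardCharsets.UTF_8);
    }

    /** Test helper: encodes the frames, then keeps only the first {@code keep} bytes. */
    static ByteBuffer frames(int keep, String... bodies) {
        ByteBuffer all = ByteBuffer.allocate(1024);
        for (String body : bodies) {
            byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
            all.putInt(bytes.length);
            all.put(bytes);
        }
        all.flip();
        all.limit(Math.min(keep, all.limit()));  // simulate bytes that have not arrived yet
        return all;
    }

    public static void main(String[] args) {
        ByteBuffer partial = frames(6, "hello");  // length field plus "he" only
        System.out.println(tryDecode(partial));   // null
        System.out.println(partial.position());   // 0
        ByteBuffer two = frames(Integer.MAX_VALUE, "ab", "cde");
        System.out.println(tryDecode(two));       // ab
        System.out.println(tryDecode(two));       // cde (its length is read at index 6, not 0)
    }
}
```

The second frame only decodes correctly because the length is read with getInt(readerIndex). A getInt(0) would re-read the first frame's length.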

2. ByteToMessageDecoder source analysis

2.1 How the cumulator works

        public static final Cumulator MERGE_CUMULATOR = new Cumulator() { 
                @Override
                public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {  //@1
                    ByteBuf buffer;
                    if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()    //@2
                            || cumulation.refCnt() > 1) {   //@3
                        // Expand cumulation (by replace it) when either there is not more room in the buffer
                        // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                        // duplicate().retain().
                        //
                        // See:
                        // - https://github.com/netty/netty/issues/2327
                        // - https://github.com/netty/netty/issues/1764
                        buffer = expandCumulation(alloc, cumulation, in.readableBytes());    //@4
                    } else {
                        buffer = cumulation;    
                    }
                    buffer.writeBytes(in);   // @5
                    in.release();                  //@6
                    return buffer;
                }
            };

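Code @1: parameter 1 is the memory allocator. Parameter 2 is the cumulation (the bytes accumulated so far). Parameter 3 is the buffer received by the current read.

Code @2: check whether the cumulation can take the new bytes without exceeding its maxCapacity. A ByteBuf grows automatically on write, but only up to maxCapacity. If the bytes would not fit, the cumulation is replaced by a larger buffer.

Code @3: if the cumulation's reference count is greater than 1, it is replaced as well. As the source comment says, this can happen when the user calls slice().retain() or duplicate().retain().

Code @4: expand (replace) the cumulation.

Code @5: write the newly received bytes into the cumulation.

Code @6: release the input buffer, whose bytes have now been copied into the cumulation.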

Code @4, the buffer expansion implementation:

        static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
                ByteBuf oldCumulation = cumulation;
                cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
                cumulation.writeBytes(oldCumulation);
                oldCumulation.release();
                return cumulation;
            }

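A key point here is that every expansion does three things:

- it allocates a brand-new cumulation buffer, sized to exactly the old buffer's readable bytes plus the incoming bytes;
- it copies the old readable bytes into it, so bytes already consumed by the decoder are dropped;
- it releases the old buffer.

In the author's view, this keeps the buffers involved in successive channel reads distinct. That makes release tracking easier and helps avoid memory leaks.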
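The cumulate/expand logic above can be modelled in a few lines of plain Java. This is a sketch, not Netty's code:

- SimpleBuf is a hypothetical fixed-size stand-in for ByteBuf. It cannot grow, so its capacity equals its maxCapacity.
- Reference counting is reduced to an int field.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified model of MERGE_CUMULATOR and expandCumulation (not Netty code). SimpleBuf is a
// hypothetical stand-in for ByteBuf: a fixed byte[] with readerIndex/writerIndex and a
// reference count. Unlike a real ByteBuf it never grows, so capacity == maxCapacity here.
class MergeCumulatorModel {

    static final class SimpleBuf {
        final byte[] array;
        int readerIndex;
        int writerIndex;
        int refCnt = 1;

        SimpleBuf(int capacity) { array = new byte[capacity]; }
        int maxCapacity() { return array.length; }
        int readableBytes() { return writerIndex - readerIndex; }

        // Like ByteBuf.writeBytes(ByteBuf src): copies src's readable bytes, advances src.readerIndex.
        void writeBytes(SimpleBuf src) {
            int n = src.readableBytes();
            System.arraycopy(src.array, src.readerIndex, array, writerIndex, n);
            writerIndex += n;
            src.readerIndex += n;
        }

        void release() { refCnt--; }
    }

    static SimpleBuf cumulate(SimpleBuf cumulation, SimpleBuf in) {
        SimpleBuf buffer;
        if (cumulation.writerIndex > cumulation.maxCapacity() - in.readableBytes()  // @2 no room
                || cumulation.refCnt > 1) {                                        // @3 shared
            buffer = expandCumulation(cumulation, in.readableBytes());             // @4
        } else {
            buffer = cumulation;
        }
        buffer.writeBytes(in);   // @5 append the new bytes
        in.release();            // @6 they now live in the cumulation
        return buffer;
    }

    static SimpleBuf expandCumulation(SimpleBuf oldCumulation, int readable) {
        // Exactly big enough for the old *readable* bytes plus the new ones:
        // bytes already consumed (before readerIndex) are not copied.
        SimpleBuf cumulation = new SimpleBuf(oldCumulation.readableBytes() + readable);
        cumulation.writeBytes(oldCumulation);
        oldCumulation.release();
        return cumulation;
    }

    static SimpleBuf of(int capacity, String s) {
        SimpleBuf b = new SimpleBuf(capacity);
        byte[] bytes = s.getBytes(StandardCharsets.US_ASCII);
        System.arraycopy(bytes, 0, b.array, 0, bytes.length);
        b.writerIndex = bytes.length;
        return b;
    }

    static String str(SimpleBuf b) {
        return new String(Arrays.copyOfRange(b.array, b.readerIndex, b.writerIndex),
                StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        SimpleBuf roomy = of(16, "AB");
        SimpleBuf r1 = cumulate(roomy, of(2, "CD"));
        System.out.println((r1 == roomy) + " " + str(r1));                      // true ABCD
        SimpleBuf full = of(8, "ABCDE");
        full.readerIndex = 2;                                                    // "AB" already decoded
        SimpleBuf r2 = cumulate(full, of(5, "FGHIJ"));
        System.out.println((r2 == full) + " " + str(r2) + " " + full.refCnt);   // false CDEFGHIJ 0
    }
}
```

The second call shows both effects described above. The cumulation is replaced by a new buffer, and only the 3 unread bytes of the old one are carried over.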

2.2 Event handling

In Netty, the decoder ByteToMessageDecoder is an inbound handler. The events it mainly handles are handlerRemoved, channelRead, channelReadComplete and channelInactive.

2.2.1 The handlerRemoved event

Called when the handler is removed from the ChannelPipeline.

        @Override
            public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
                ByteBuf buf = internalBuffer();
                int readable = buf.readableBytes();
                if (readable > 0) {
                    ByteBuf bytes = buf.readBytes(readable);
                    buf.release();
                    ctx.fireChannelRead(bytes);
                    ctx.fireChannelReadComplete();
                } else {
                    buf.release();
                }
                cumulation = null;
                handlerRemoved0(ctx);
            }

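The idea is as follows:

1. If the internal cumulation still has readable bytes, they are read into a new buffer and forwarded downstream (fireChannelRead followed by fireChannelReadComplete), so they are not lost.
2. The cumulation is then released and set to null.
3. Finally, the hook method handlerRemoved0(ctx) is called for subclasses to implement.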

2.2.2 channelRead (the channel read event)

        @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg instanceof ByteBuf) {    //@1
                    RecyclableArrayList out = RecyclableArrayList.newInstance();  //@2
                    try {
                        ByteBuf data = (ByteBuf) msg;                                                   
                        first = cumulation == null;                                                          //@3
                        if (first) {
                            cumulation = data;
                        } else {
                            cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);   //@4
                        }
                        callDecode(ctx, cumulation, out);                                                             //@5
                    } catch (DecoderException e) {
                        throw e;
                    } catch (Throwable t) {
                        throw new DecoderException(t);
                    } finally {
                        if (cumulation != null && !cumulation.isReadable()) {     //@6
                            cumulation.release();
                            cumulation = null;
                        }
                        int size = out.size();
    
                        for (int i = 0; i < size; i ++) {                  //@7
                            ctx.fireChannelRead(out.get(i));
                        }
                        out.recycle();                                        //@8
                    }
                } else {
                    ctx.fireChannelRead(msg);
                }
            }

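Code @1: if the message is a ByteBuf, decode it. Otherwise forward it unchanged to the next handler.

Code @2: create a list in which each element will be one complete, decoded client request.

In the Netty 5 implementation, ChannelHandler execution is thread-safe, because every event of a Channel runs on the EventLoop bound to that Channel. The list comes from a thread-local object pool (Recycler), so the ArrayList can be reused. Using a thread-local pool here is safe because each Channel's events are always handled by the same single thread.

I think the execution details of Netty's threading model deserve a closer look. After this article I will return to the earlier Netty threading model post to detail the execution logic of SingleThreadEventExecutor.

Code @3, @4: if the cumulation is null, this is the first read, so the incoming bytes simply become the cumulation. Otherwise the new bytes are merged into the cumulation by the cumulator described above.

Code @5: the actual decoding logic, covered in detail below.

Code @6: if the cumulation exists but has no readable bytes left, release it and set it to null.

Code @7: forward each decoded client request (message) to the next handler.

Code @8: return the list to the object pool. There is a possible leak here. Unlike channelInactive below, out.recycle() is not protected by its own finally block, so an exception thrown while forwarding the messages would skip it.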
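The flow of @3 to @7 can be reduced to the following sketch. It is a plain-Java model, not Netty's implementation:

- a String stands in for the cumulation ByteBuf;
- a List stands in for ctx.fireChannelRead;
- the hypothetical decoder emits fixed-length 3-byte frames.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of ByteToMessageDecoder.channelRead (not Netty code): a String stands in
// for the cumulation ByteBuf and `forwarded` stands in for ctx.fireChannelRead(...).
// The hypothetical decoder emits fixed-length frames of FRAME characters.
class ChannelReadModel {
    static final int FRAME = 3;
    private String cumulation;                          // null == no cumulation held
    final List<String> forwarded = new ArrayList<>();

    void channelRead(String data) {
        List<String> out = new ArrayList<>();           // @2 (a pooled RecyclableArrayList in Netty)
        if (cumulation == null) {                       // @3 first read: use the data as-is
            cumulation = data;
        } else {                                        // @4 merge with the leftover bytes
            cumulation = cumulation + data;
        }
        int readerIndex = callDecode(cumulation, out);  // @5
        cumulation = cumulation.substring(readerIndex);
        if (cumulation.isEmpty()) {                     // @6 fully consumed: drop ("release") it
            cumulation = null;
        }
        forwarded.addAll(out);                          // @7 forward each decoded message
    }

    /** Decodes as many whole frames as possible; returns how many chars were consumed. */
    private static int callDecode(String in, List<String> out) {
        int readerIndex = 0;
        while (in.length() - readerIndex >= FRAME) {
            out.add(in.substring(readerIndex, readerIndex + FRAME));
            readerIndex += FRAME;
        }
        return readerIndex;
    }

    String pending() { return cumulation; }

    public static void main(String[] args) {
        ChannelReadModel decoder = new ChannelReadModel();
        decoder.channelRead("AB");       // half a frame: nothing forwarded, "AB" kept
        decoder.channelRead("CDEFGH");   // ABC and DEF forwarded, "GH" kept
        decoder.channelRead("I");        // GHI forwarded, cumulation dropped
        System.out.println(decoder.forwarded + " pending=" + decoder.pending());
        // [ABC, DEF, GHI] pending=null
    }
}
```

Read boundaries ("AB", "CDEFGH", "I") never line up with frame boundaries. The cumulation is what makes that harmless.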

Next, let's take a close look at callDecode:

        /**
             * Called once data should be decoded from the given {@link ByteBuf}. This method will call
             * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place.
             *
             * @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
             * @param in            the {@link ByteBuf} from which to read data
             * @param out           the {@link List} to which decoded messages should be added
             */
            protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
                try {
                    while (in.isReadable()) {        //@1
                        int outSize = out.size();    //@2
                        int oldInputLength = in.readableBytes(); //@3
                        decode(ctx, in, out);                                 //@4
    
                        // Check if this handler was removed before continuing the loop.
                        // If it was removed, it is not safe to continue to operate on the buffer.
                        //
                        // See https://github.com/netty/netty/issues/1664
                        if (ctx.isRemoved()) {
                            break;
                        }
    
                        if (outSize == out.size()) {                                       //@5
                            if (oldInputLength == in.readableBytes()) {
                                break;
                            } else {
                                continue;
                            }
                        }
    
                        if (oldInputLength == in.readableBytes()) {           //@6
                            throw new DecoderException(
                                    StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                        }
    
                        if (isSingleDecode()) {
                            break;
                        }
                    }
                } catch (DecoderException e) {
                    throw e;
                } catch (Throwable cause) {
                    throw new DecoderException(cause);
                }
            }

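Parameters:

  • ChannelHandlerContext ctx: the handler context.
  • ByteBuf in: the current cumulation buffer.
  • out: the list that collects the decoded messages.

Code @1: loop while the cumulation is readable.

Code @2: the number of messages decoded so far. It is used to tell whether this pass produced a message.

Code @3: the cumulation's current readable byte count, also used to tell whether decoding made progress.

Code @4: run the actual decoding, i.e. the concrete codec protocol, which subclasses implement. This also shows that ByteToMessageDecoder as a whole follows the Template Method pattern.

Code @5: no message was decoded. If the readable byte count is also unchanged (the cumulation does not yet hold a complete request), end this decoding pass and wait for more data. If some bytes were consumed (for example, skipped), loop and try again.

Code @6: a message was decoded but the readable byte count did not change, so throw a DecoderException. Otherwise the loop could keep producing messages from the same bytes forever.

From @5 and @6 we get two important rules to keep in mind when writing your own decoder:

  1. If no message can be decoded from the current cumulation, do not modify its readerIndex or writerIndex.
  2. If a message is decoded, advance the readerIndex past the bytes that were consumed. The decoder itself should not touch writerIndex.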
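The two rules follow directly from the loop. The sketch below reproduces the @1 to @6 checks in plain Java, omitting isSingleDecode. In this model:

- ByteBuffer.remaining() stands in for readableBytes();
- IllegalStateException stands in for DecoderException;
- the decoders FIXED_TWO and BROKEN are hypothetical. The first follows the rules; the second breaks rule 2.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the callDecode loop (not Netty code): ByteBuffer.remaining() stands in
// for readableBytes(), and IllegalStateException stands in for Netty's DecoderException.
class CallDecodeModel {
    interface Decoder { void decode(ByteBuffer in, List<Object> out); }

    static void callDecode(ByteBuffer in, List<Object> out, Decoder decoder) {
        while (in.hasRemaining()) {                       // @1
            int outSize = out.size();                     // @2
            int oldInputLength = in.remaining();          // @3
            decoder.decode(in, out);                      // @4 the template method
            if (outSize == out.size()) {                  // @5 no message produced
                if (oldInputLength == in.remaining()) {
                    break;                                // nothing consumed: wait for more bytes
                }
                continue;                                 // bytes were skipped: try again
            }
            if (oldInputLength == in.remaining()) {       // @6 message produced, no bytes read
                throw new IllegalStateException(
                        "decode() did not read anything but decoded a message.");
            }
        }
    }

    // Follows the rules: 2-byte frames, touches nothing unless a whole frame is present.
    static final Decoder FIXED_TWO = (in, out) -> {
        if (in.remaining() >= 2) {
            byte[] frame = new byte[2];
            in.get(frame);
            out.add(new String(frame, StandardCharsets.US_ASCII));
        }
    };

    // Breaks rule 2: emits a message without advancing the reader index; without the
    // @6 check the loop would spin forever.
    static final Decoder BROKEN = (in, out) -> out.add("oops");

    static ByteBuffer ascii(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        List<Object> out = new ArrayList<>();
        ByteBuffer in = ascii("ABCDE");
        callDecode(in, out, FIXED_TWO);
        System.out.println(out + " left=" + in.remaining());   // [AB, CD] left=1
        try {
            callDecode(ascii("XY"), new ArrayList<>(), BROKEN);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```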

2.2.3 channelInactive (the channel becomes inactive)

        @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                RecyclableArrayList out = RecyclableArrayList.newInstance();
                try {
                    if (cumulation != null) {
                        callDecode(ctx, cumulation, out);
                        decodeLast(ctx, cumulation, out);
                    } else {
                        decodeLast(ctx, Unpooled.EMPTY_BUFFER, out);
                    }
                } catch (DecoderException e) {
                    throw e;
                } catch (Exception e) {
                    throw new DecoderException(e);
                } finally {
                    try {
                        if (cumulation != null) {
                            cumulation.release();
                            cumulation = null;
                        }
                        int size = out.size();
                        for (int i = 0; i < size; i++) {
                            ctx.fireChannelRead(out.get(i));
                        }
                        if (size > 0) {
                            // Something was read, call fireChannelReadComplete()
                            ctx.fireChannelReadComplete();
                        }
                        ctx.fireChannelInactive();
                    } finally {
                        // recycle in all cases
                        out.recycle();
                    }
                }
            }

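channelInactive fires when the channel becomes inactive. If the cumulation still holds data, that data is decoded and the results are forwarded to downstream handlers as channelRead events. channelReadComplete follows if anything was produced, and then channelInactive is propagated.

The method also calls a new method, decodeLast. It performs the final decode when the channel goes inactive, and subclasses can override it.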
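As a rough illustration of why decodeLast is useful, the sketch below models a hypothetical '\n'-delimited text decoder in plain Java. It overrides decodeLast so that an unterminated trailing line is emitted when the channel goes inactive instead of being dropped. The class, its method bodies, and the choice to flush the tail are assumptions for illustration; a StringBuilder stands in for the cumulation.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Netty code) of channelInactive + decodeLast for a hypothetical
// '\n'-delimited text decoder; a StringBuilder stands in for the cumulation buffer.
class DecodeLastModel {
    private final StringBuilder cumulation = new StringBuilder();
    final List<String> forwarded = new ArrayList<>();   // stands in for ctx.fireChannelRead

    // Normal decode(): only complete lines are emitted (delimiter stripped).
    void decode(StringBuilder in, List<String> out) {
        int eol;
        while ((eol = in.indexOf("\n")) >= 0) {
            out.add(in.substring(0, eol));
            in.delete(0, eol + 1);
        }
    }

    // Hypothetical decodeLast() override: no more bytes will ever arrive, so the
    // unterminated tail is emitted as a final message instead of being dropped.
    void decodeLast(StringBuilder in, List<String> out) {
        if (in.length() > 0) {
            out.add(in.toString());
            in.setLength(0);
        }
    }

    void channelRead(String data) {
        List<String> out = new ArrayList<>();
        cumulation.append(data);
        decode(cumulation, out);
        forwarded.addAll(out);
    }

    void channelInactive() {
        List<String> out = new ArrayList<>();
        decode(cumulation, out);       // corresponds to callDecode(ctx, cumulation, out)
        decodeLast(cumulation, out);   // corresponds to decodeLast(ctx, cumulation, out)
        forwarded.addAll(out);         // Netty then fires channelReadComplete and channelInactive
    }

    public static void main(String[] args) {
        DecodeLastModel d = new DecodeLastModel();
        d.channelRead("first\nsec");
        System.out.println(d.forwarded);  // [first]
        d.channelInactive();
        System.out.println(d.forwarded);  // [first, sec]
    }
}
```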

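That concludes the analysis of ByteToMessageDecoder. It is essentially a decoder template provided by Netty (a classic use of the Template Method pattern), and users can build their own private protocols on top of it. To help with writing custom decoders, the following sections analyze some of the decoders that ship with Netty.

3. LineBasedFrameDecoder implementation analysis

LineBasedFrameDecoder splits frames on line terminators (\n or \r\n).

Source analysis of LineBasedFrameDecoder's decode implementation: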

        /**
             * Create a frame out of the {@link ByteBuf} and return it.
             *
             * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
             * @param   buffer          the {@link ByteBuf} from which to read data
             * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
             *                          be created.
             */
            protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
                final int eol = findEndOfLine(buffer);     //@1
                if (!discarding) {                                      //@2
                    if (eol >= 0) {                                      //@3
                        final ByteBuf frame;
                        final int length = eol - buffer.readerIndex();   //@4
                        final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;      //@5
    
                        if (length > maxLength) {     //@6
                            buffer.readerIndex(eol + delimLength);
                            fail(ctx, length);
                            return null;
                        }
    
                        if (stripDelimiter) { // @7
                            frame = buffer.readSlice(length);
                            buffer.skipBytes(delimLength);
                        } else {
                            frame = buffer.readSlice(length + delimLength);
                        }
    
                        return frame.retain();   //@8
                    } else {  //@9
                        final int length = buffer.readableBytes(); 
                        if (length > maxLength) {       // @10
                            discardedBytes = length;
                            buffer.readerIndex(buffer.writerIndex());
                            discarding = true;
                            if (failFast) {
                                fail(ctx, "over " + discardedBytes);
                            }
                        }
                        return null;
                    }
                } else {
                    if (eol >= 0) {
                        final int length = discardedBytes + eol - buffer.readerIndex();
                        final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                        buffer.readerIndex(eol + delimLength);
                        discardedBytes = 0;
                        discarding = false;
                        if (!failFast) {
                            fail(ctx, length);
                        }
                    } else {
                        discardedBytes = buffer.readableBytes();
                        buffer.readerIndex(buffer.writerIndex());
                    }
                    return null;
                }
            }

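Code @1: search the cumulation for a line terminator (\n or \r\n). findEndOfLine returns the index of the terminator, or a negative value if none is found. For \r\n it returns the index of \r, which is why @5 checks for '\r'.

Code @2: check whether the decoder is in discarding mode. Discarding mode is entered at @10, when no terminator has been found yet and the readable bytes already exceed maxLength. From then on the decoder drops bytes until the next terminator.

Code @3: a terminator was found, so the cumulation holds at least one complete frame (request). Decode it.

Code @4: compute the frame length as eol minus the cumulation's current readerIndex.

Code @5: compute the delimiter length: 2 bytes for \r\n, 1 byte for \n.

Code @6: the frame exceeds the maximum allowed length. readerIndex is set to eol plus the delimiter length, which skips the oversized frame so the next decode starts after it. fail() is then called, which fires an exceptionCaught event.

Code @7: read one frame from the cumulation, with or without the delimiter depending on stripDelimiter. When stripping, the frame excludes the delimiter, which is skipped with skipBytes. readSlice is used, so the frame shares the cumulation's memory instead of copying it.

Code @8: increment the frame's reference count and return it for downstream handlers. The retain() is needed because the slice shares the cumulation's memory, which ByteToMessageDecoder releases once it has been fully read.

Code @9, @10: no terminator was found and the readable bytes already exceed maxLength. All of them are discarded and the decoder enters discarding mode. With failFast, fail() is called immediately. Otherwise it is called once the end of the oversized line is found, in the discarding branch of @2.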
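The decode logic walked through above can be checked with a plain-Java model. It mirrors the listing branch by branch, but it is not Netty's code:

- ByteBuffer position()/limit() stand in for readerIndex/writerIndex;
- frames are copied into Strings instead of being returned as retained slices;
- fail() is replaced by recording a message in a list.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the LineBasedFrameDecoder.decode() listing (not Netty code):
// ByteBuffer.position()/limit() stand in for readerIndex/writerIndex, and fail(...) is
// replaced by recording a message in `failures` instead of firing exceptionCaught.
class LineDecoderModel {
    private final int maxLength;
    private final boolean failFast;
    private final boolean stripDelimiter;
    private boolean discarding;
    private int discardedBytes;
    final List<String> failures = new ArrayList<>();

    LineDecoderModel(int maxLength, boolean failFast, boolean stripDelimiter) {
        this.maxLength = maxLength;
        this.failFast = failFast;
        this.stripDelimiter = stripDelimiter;
    }

    // @1: index of the terminator or -1; for "\r\n" it points at '\r' (what @5 relies on).
    static int findEndOfLine(ByteBuffer buffer) {
        for (int i = buffer.position(); i < buffer.limit(); i++) {
            if (buffer.get(i) == '\n') {
                return (i > buffer.position() && buffer.get(i - 1) == '\r') ? i - 1 : i;
            }
        }
        return -1;
    }

    String decode(ByteBuffer buffer) {
        int eol = findEndOfLine(buffer);
        if (!discarding) {                                             // @2
            if (eol >= 0) {                                            // @3
                int length = eol - buffer.position();                  // @4
                int delimLength = buffer.get(eol) == '\r' ? 2 : 1;     // @5
                if (length > maxLength) {                              // @6 skip the long line
                    buffer.position(eol + delimLength);
                    failures.add("frame length (" + length + ") exceeds " + maxLength);
                    return null;
                }
                byte[] frame = new byte[stripDelimiter ? length : length + delimLength]; // @7
                buffer.get(frame);
                if (stripDelimiter) {
                    buffer.position(buffer.position() + delimLength);  // skipBytes(delimLength)
                }
                return new String(frame, StandardCharsets.US_ASCII);   // a copy, not a slice (@8)
            }
            int length = buffer.remaining();                           // @9 no terminator yet
            if (length > maxLength) {                                  // @10 start discarding
                discardedBytes = length;
                buffer.position(buffer.limit());
                discarding = true;
                if (failFast) {
                    failures.add("over " + discardedBytes);
                }
            }
            return null;
        }
        if (eol >= 0) {                                                // end of the discarded line
            int length = discardedBytes + eol - buffer.position();
            int delimLength = buffer.get(eol) == '\r' ? 2 : 1;
            buffer.position(eol + delimLength);
            discardedBytes = 0;
            discarding = false;
            if (!failFast) {
                failures.add("frame length (" + length + ") exceeds " + maxLength);
            }
        } else {
            discardedBytes += buffer.remaining();  // accumulates; the listing above assigns with '='
            buffer.position(buffer.limit());
        }
        return null;
    }

    static ByteBuffer ascii(String s) {
        return ByteBuffer.wrap(s.getBytes(StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        LineDecoderModel d = new LineDecoderModel(5, false, true);
        ByteBuffer in = ascii("hi\r\ntoolongline\nok\n");
        System.out.println(d.decode(in));   // hi
        System.out.println(d.decode(in));   // null (the 11-byte line is skipped)
        System.out.println(d.decode(in));   // ok
        System.out.println(d.failures);     // [frame length (11) exceeds 5]
    }
}
```

With maxLength = 5, the 11-byte line is skipped with one recorded failure, while the frames around it are still decoded.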

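4. How the encoder works (MessageToByteEncoder)

The sections above covered how Netty decoders work. The problem they solve is TCP packet sticking and splitting: extracting individual client requests from the incoming byte stream. A decoder is input-oriented and parses requests (the input stream). An encoder is output-oriented and lays out outgoing messages according to the agreed format, so that the receiver can parse them.

The previous section discussed LineBasedFrameDecoder, whose requests are terminated by \n or \r\n. Does it call for a matching LineBasedFrameEncoder? Not necessarily. If the peer decodes with LineBasedFrameDecoder, every message sent to it simply has to end with \n or \r\n, otherwise the decoder cannot find frame boundaries. This is the general principle: the encoder organizes outgoing messages according to the protocol that the receiver's decoder expects.

Encoders are mainly needed when implementing a custom protocol, and the key method to implement is encode. The source of MessageToByteEncoder is listed below; since it is straightforward, it is not discussed in detail: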

        package io.netty.handler.codec;
    
        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.Unpooled;
        import io.netty.channel.ChannelHandler;
        import io.netty.channel.ChannelHandlerAdapter;
        import io.netty.channel.ChannelHandlerContext;
        import io.netty.channel.ChannelPipeline;
        import io.netty.channel.ChannelPromise;
        import io.netty.util.ReferenceCountUtil;
        import io.netty.util.internal.TypeParameterMatcher;
    
        /**
         * {@link ChannelHandlerAdapter} which encodes message in a stream-like fashion from one message to an
         * {@link ByteBuf}.
         *
         *
         * Example implementation which encodes {@link Integer}s to a {@link ByteBuf}.
         *
         * <pre>
         *     public class IntegerEncoder extends {@link MessageToByteEncoder}<{@link Integer}> {
         *         {@code @Override}
         *         public void encode({@link ChannelHandlerContext} ctx, {@link Integer} msg, {@link ByteBuf} out)
         *                 throws {@link Exception} {
         *             out.writeInt(msg);
         *         }
         *     }
         * </pre>
         */
        public abstract class MessageToByteEncoder<I> extends ChannelHandlerAdapter {
    
            private final TypeParameterMatcher matcher;
            private final boolean preferDirect;
    
            /**
             * @see {@link #MessageToByteEncoder(boolean)} with {@code true} as boolean parameter.
             */
            protected MessageToByteEncoder() {
                this(true);
            }
    
            /**
             * @see {@link #MessageToByteEncoder(Class, boolean)} with {@code true} as boolean value.
             */
            protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
                this(outboundMessageType, true);
            }
    
            /**
             * Create a new instance which will try to detect the types to match out of the type parameter of the class.
             *
             * @param preferDirect          {@code true} if a direct {@link ByteBuf} should be tried to be used as target for
             *                              the encoded messages. If {@code false} is used it will allocate a heap
             *                              {@link ByteBuf}, which is backed by an byte array.
             */
            protected MessageToByteEncoder(boolean preferDirect) {
                matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
                this.preferDirect = preferDirect;
            }
    
            /**
             * Create a new instance
             *
             * @param outboundMessageType   The tpye of messages to match
             * @param preferDirect          {@code true} if a direct {@link ByteBuf} should be tried to be used as target for
             *                              the encoded messages. If {@code false} is used it will allocate a heap
             *                              {@link ByteBuf}, which is backed by an byte array.
             */
            protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) {
                matcher = TypeParameterMatcher.get(outboundMessageType);
                this.preferDirect = preferDirect;
            }
    
            /**
             * Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
             * {@link ChannelHandler} in the {@link ChannelPipeline}.
             */
            public boolean acceptOutboundMessage(Object msg) throws Exception {
                return matcher.match(msg);
            }
    
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                ByteBuf buf = null;
                try {
                    if (acceptOutboundMessage(msg)) {
                        @SuppressWarnings("unchecked")
                        I cast = (I) msg;
                        buf = allocateBuffer(ctx, cast, preferDirect);
                        try {
                            encode(ctx, cast, buf);
                        } finally {
                            ReferenceCountUtil.release(cast);
                        }
    
                        if (buf.isReadable()) {
                            ctx.write(buf, promise);
                        } else {
                            buf.release();
                            ctx.write(Unpooled.EMPTY_BUFFER, promise);
                        }
                        buf = null;
                    } else {
                        ctx.write(msg, promise);
                    }
                } catch (EncoderException e) {
                    throw e;
                } catch (Throwable e) {
                    throw new EncoderException(e);
                } finally {
                    if (buf != null) {
                        buf.release();
                    }
                }
            }
    
            /**
             * Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.
             * Sub-classes may override this method to returna {@link ByteBuf} with a perfect matching {@code initialCapacity}.
             */
            protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
                                       boolean preferDirect) throws Exception {
                if (preferDirect) {
                    return ctx.alloc().ioBuffer();
                } else {
                    return ctx.alloc().heapBuffer();
                }
            }
    
            /**
             * Encode a message into a {@link ByteBuf}. This method will be called for each written message that can be handled
             * by this encoder.
             *
             * @param ctx           the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to
             * @param msg           the message to encode
             * @param out           the {@link ByteBuf} into which the encoded message will be written
             * @throws Exception    is thrown if an error accour
             */
            protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
        }
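To connect this listing with the LineBasedFrameDecoder discussion, here is a plain-Java model of MessageToByteEncoder.write(), plus a hypothetical encoder that only appends '\n' to each message. It is not Netty code. EncoderModel and LineEncoderModel are illustrative names, and three Netty pieces are replaced by JDK stand-ins:

| Netty piece | Stand-in in this model |
| --- | --- |
| TypeParameterMatcher | a Class object |
| the allocated ByteBuf | a ByteArrayOutputStream |
| ctx.write(...) | a list |

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of MessageToByteEncoder.write() (not Netty code): a Class object stands in
// for TypeParameterMatcher, a ByteArrayOutputStream for the allocated ByteBuf, and the
// `written` list for ctx.write(...).
abstract class EncoderModel<I> {
    private final Class<I> type;
    final List<Object> written = new ArrayList<>();

    EncoderModel(Class<I> type) {
        this.type = type;
    }

    boolean acceptOutboundMessage(Object msg) {
        return type.isInstance(msg);
    }

    void write(Object msg) {
        if (acceptOutboundMessage(msg)) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();  // allocateBuffer(...)
            encode(type.cast(msg), buf);                              // subclass hook
            written.add(buf.toByteArray());  // may be empty; Netty writes EMPTY_BUFFER in that case
        } else {
            written.add(msg);                // not our type: passed on unchanged
        }
    }

    protected abstract void encode(I msg, ByteArrayOutputStream out);
}

// Hypothetical encoder for a LineBasedFrameDecoder peer: it only has to end each message with '\n'.
class LineEncoderModel extends EncoderModel<String> {
    LineEncoderModel() {
        super(String.class);
    }

    @Override
    protected void encode(String msg, ByteArrayOutputStream out) {
        byte[] bytes = (msg + "\n").getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
    }

    public static void main(String[] args) {
        LineEncoderModel encoder = new LineEncoderModel();
        encoder.write("hello");
        encoder.write(42);   // not a String: forwarded as-is
        System.out.println(new String((byte[]) encoder.written.get(0), StandardCharsets.UTF_8).trim()); // hello
        System.out.println(encoder.written.get(1));  // 42
    }
}
```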

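Summary:

With non-blocking I/O over a stream, a single read or write is not guaranteed to carry exactly one complete request.

For example, suppose a client sends three requests, [ABC] [CD] [EFG], over one channel. Across several reads, the server is only guaranteed to receive the bytes in the right order. It is not guaranteed that each read lines up with one request: the first read may return [AB] and the second [CCDEFG].

The incoming data therefore has to be decoded. If less than one full request is available, the bytes are accumulated. After every read, the newly received bytes are merged with the accumulated ones before parsing is tried again.

ByteToMessageDecoder exists for exactly this purpose. It is designed with the Template Method pattern, and its handling of the cumulation buffer is the highlight.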


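Copyright belongs to the original author; contact the author before reposting in any form. (Java 技术驿站: Source Code Analysis of Netty's Decoder and Encoder Implementation)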