2021-05-05 16:51  阅读(133)
文章分类:Netty 学习之旅 文章标签:NettyNetty 学习
©  原文作者:中间件兴趣圈 原文地址:https://blog.csdn.net/prestigeding/article/details/53977445

首先,我们再看一下 ByteBuf 的类设计图,从中更进一步了解ByteBuf。

202105051651183621.png

ByteBuf 继承自 ReferenceCounted,引用计数,也就是说 ByteBuf 的内存回收使用的是引用计数器来实现。

UnpooledHeapByteBuf 是非池化的堆内存实现,而 UnpooledDirectByteBuf 是非池化的堆外内存(直接内存)。非池化的ByteBuf 就是利用完之后就需要销毁,无法重用。

1、UnpooledHeapByteBuf 详解

其继承链:UnpooledHeapByteBuf --> AbstractReferenceCountedByteBuf --> AbstractByteBuf。

1.1 AbstractByteBuf 源码分析

AbstractByteBuf 定义 ByteBuf 的基本属性,诸如 readerIndex, writerIndex, markedReaderIndex, markedWriterIndex, maxCapacity, 我们知道 ByteBuf 的容量是可以自动扩容的。

AbstractByteBuf 的这两个属性,应该引起我们的注意:

  1. SwappedByteBuf swappedBuf
    这个是大端序列与小端序列的转换。
  2. ResourceLeakDetector leakDetector = new ResourceLeakDetector (ByteBuf.class);
    Netty 用来解决内存泄漏检测机制,下一篇会详细介绍。

这里截取一下 SwappedByteBuf 的源码,采用了典型的装饰模式来设计。

        public class SwappedByteBuf extends ByteBuf {
    
            private final ByteBuf buf;
    
            private final ByteOrder order;
    
            public SwappedByteBuf(ByteBuf buf) {
    
                if (buf == null) {
    
                    throw new NullPointerException("buf");
    
                }
    
                this.buf = buf;
    
                if (buf.order() == ByteOrder.BIG_ENDIAN) {
    
                    order = ByteOrder.LITTLE_ENDIAN;
    
                } else {
    
                    order = ByteOrder.BIG_ENDIAN;
    
                }
    
            }
    
            @Override
    
            public ByteOrder order() {
    
                return order;
    
            }
    
            @Override
    
            public ByteBuf order(ByteOrder endianness) {
    
                if (endianness == null) {
    
                    throw new NullPointerException("endianness");
    
                }
    
                if (endianness == order) {
    
                    return this;
    
                }
    
                return buf;
    
            }
    
        }

关于其他 AbstractByteBuf, 该类设计使用了典型的模板模式,对 ByteBuf 提供的类,实现时提供一种模板,然后再提供一个钩子方法,供子类实现,比如_getLong方法,_setLong等方法,由于该类的实现原理不复杂,就不做进一步的源码解读。

1.2 AbstractReferenceCountedByteBuf

该类主要是实现引用计算的常规方法,充分利用 voliate 内存可见性与 CAS 操作完成 refCnt 变量的维护。

其源码实现如下:

        package io.netty.buffer;
    
        import io.netty.util.IllegalReferenceCountException;
        import io.netty.util.internal.PlatformDependent;
    
        import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    
        /**
         * Abstract base class for {@link ByteBuf} implementations that count references.
         */
        public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
    
            private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
    
            static {
                AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
                        PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
                if (updater == null) {
                    updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
                }
                refCntUpdater = updater;
            }
    
            private volatile int refCnt = 1;
    
            protected AbstractReferenceCountedByteBuf(int maxCapacity) {
                super(maxCapacity);
            }
    
            @Override
            public final int refCnt() {
                return refCnt;
            }
    
            /**
             * An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
             */
            protected final void setRefCnt(int refCnt) {
                this.refCnt = refCnt;
            }
    
            @Override
            public ByteBuf retain() {
                for (;;) {
                    int refCnt = this.refCnt;
                    if (refCnt == 0) {
                        throw new IllegalReferenceCountException(0, 1);
                    }
                    if (refCnt == Integer.MAX_VALUE) {
                        throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
                    }
                    if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
                        break;
                    }
                }
                return this;
            }
    
            @Override
            public ByteBuf retain(int increment) {
                if (increment <= 0) {
                    throw new IllegalArgumentException("increment: " + increment + " (expected: > 0)");
                }
    
                for (;;) {
                    int refCnt = this.refCnt;
                    if (refCnt == 0) {
                        throw new IllegalReferenceCountException(0, increment);
                    }
                    if (refCnt > Integer.MAX_VALUE - increment) {
                        throw new IllegalReferenceCountException(refCnt, increment);
                    }
                    if (refCntUpdater.compareAndSet(this, refCnt, refCnt + increment)) {
                        break;
                    }
                }
                return this;
            }
    
            @Override
            public ByteBuf touch() {
                return this;
            }
    
            @Override
            public ByteBuf touch(Object hint) {
                return this;
            }
    
            @Override
            public final boolean release() {
                for (;;) {
                    int refCnt = this.refCnt;
                    if (refCnt == 0) {
                        throw new IllegalReferenceCountException(0, -1);
                    }
    
                    if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
                        if (refCnt == 1) {
                            deallocate();
                            return true;
                        }
                        return false;
                    }
                }
            }
    
            @Override
            public final boolean release(int decrement) {
                if (decrement <= 0) {
                    throw new IllegalArgumentException("decrement: " + decrement + " (expected: > 0)");
                }
    
                for (;;) {
                    int refCnt = this.refCnt;
                    if (refCnt < decrement) {
                        throw new IllegalReferenceCountException(refCnt, -decrement);
                    }
    
                    if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {
                        if (refCnt == decrement) {
                            deallocate();
                            return true;
                        }
                        return false;
                    }
                }
            }
    
            /**
             * Called once {@link #refCnt()} is equals 0.
             */
            protected abstract void deallocate();
        }

该类,我们只需要了解,当一个ByteBuf 被引用的次数为 0 时,dealocate() 方法将被调用,该方法就是具体回收 ByteBuf 的操作,由具体的子类去实现。

1.3 UnpooledHeapByteBuf 与 UnpooledDirectByteBuf

首先该类的内部结构如下:

202105051651185112.png

202105051651186123.png

对于非池化的 UnpooledByteBuf,内部就是使用 array 来存储数据,相对简单,所以源码分析,我还是侧重于UnpooledDirectByteBuf。重点关注如下两个方面:

  1. 容量的扩容
  2. 内存的分配

1.3.1 capacity(int newCapacity)

        public ByteBuf capacity(int newCapacity) {
                ensureAccessible();     // @1 
                if (newCapacity < 0 || newCapacity > maxCapacity()) {
                    throw new IllegalArgumentException("newCapacity: " + newCapacity);
                }
    
                int readerIndex = readerIndex();
                int writerIndex = writerIndex();
    
                int oldCapacity = capacity;
                if (newCapacity > oldCapacity) {   // @2
                    ByteBuffer oldBuffer = buffer;
                    ByteBuffer newBuffer = allocateDirect(newCapacity);  //@21
                    oldBuffer.position(0).limit(oldBuffer.capacity());          //@22
                    newBuffer.position(0).limit(oldBuffer.capacity());        //@23
                    newBuffer.put(oldBuffer);                                            //@24
                    newBuffer.clear();                                                         //@25
                    setByteBuffer(newBuffer);                                            //@26
                } else if (newCapacity < oldCapacity) { //@3
                    ByteBuffer oldBuffer = buffer;
                    ByteBuffer newBuffer = allocateDirect(newCapacity);
                    if (readerIndex < newCapacity) {
                        if (writerIndex > newCapacity) {
                            writerIndex(writerIndex = newCapacity);
                        }
                        oldBuffer.position(readerIndex).limit(writerIndex);
                        newBuffer.position(readerIndex).limit(writerIndex);
                        newBuffer.put(oldBuffer);
                        newBuffer.clear();
                    } else {
                        setIndex(newCapacity, newCapacity);
                    }
                    setByteBuffer(newBuffer);
                }
                return this;
            }

代码@1,检测一下访问性,可达性,就是引用数必须大于0,否则该 ByteBuf 的内部空间已经被回收了(堆外内存)。

代码@2,扩容操作,思路新建一个缓存区,然后将原先缓存区的数据全部写入到新的缓存区,然后释放旧的缓存区。

代码@21、22,申请一个直接缓存区,然后将原缓冲区的 postion 设置为0,将 limit 设置为 capacity, 处于释放状态(从缓存区读)。

代码@23,将新缓存区的 postion,limit 属性设置为0,老缓存区 limit。

代码@24,将原缓冲区写入到新的缓存区,然后将缓存区置的 position 设置为0,limt 设置为 capacity,其实这里设置position,capacity 的意义不大,因为 ByteBuf 并不会利用内部的 ByteBuffe r的 limit,postion 属性,而是使用readerIndex, wriateIndex。

代码@26,关联新的 ByteBuffer, 并释放原缓存区的空间。

代码@3,压缩缓存区。实现思路是新建一个缓存区,如果 readerIndex 大于新建的 ByteBuffer 的 capacity,则无需将旧的缓存区内容写入到新的缓存区中。如果 readerIndex 小于新 capacity,那需要将 readerIndex 至( Math.min(writerIndex, newCapacity) )直接的内容写入到新的缓存,然后释放旧的缓存区。

我们在重点关注一下 setByteBuffer(newBuffer) 方法,该方法还负责销毁原先的 ByteBuffer。

        private void setByteBuffer(ByteBuffer buffer) {
                ByteBuffer oldBuffer = this.buffer;
                if (oldBuffer != null) {
                    if (doNotFree) {
                        doNotFree = false;
                    } else {
                        freeDirect(oldBuffer);
                    }
                }
    
                this.buffer = buffer;
                tmpNioBuf = null;
                capacity = buffer.remaining();
            }

释放原先的内存。

1.3.2 内存分配

Netty 在为内存的分配,单独封装,相关类图:

202105051651187994.png

目前,先关注 UnpooledByteBufAllocator,对象池的ByteBuf在后续章节中重点关注。

结合原代码,有如下两个方法引起了我的注意:

  1. 容量扩容规则(容量增长规则)calculateNewCapacity 方法。
  2. 直接内存的分配。newDirectBuffer 方法。

1.3.2.1 calculateNewCapacity

        public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
                if (minNewCapacity < 0) {
                    throw new IllegalArgumentException("minNewCapacity: " + minNewCapacity + " (expectd: 0+)");
                }
                if (minNewCapacity > maxCapacity) {
                    throw new IllegalArgumentException(String.format(
                            "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                            minNewCapacity, maxCapacity));
                }
                final int threshold = 1048576 * 4; // 4 MiB page
    
                if (minNewCapacity == threshold) {
                    return threshold;
                }
    
                // If over threshold, do not double but just increase by threshold.
                if (minNewCapacity > threshold) {               //@1
                    int newCapacity = minNewCapacity / threshold * threshold;
                    if (newCapacity > maxCapacity - threshold) {
                        newCapacity = maxCapacity;
                    } else {
                        newCapacity += threshold;
                    }
                    return newCapacity;
                }
    
                // Not over threshold. Double up to 4 MiB, starting from 64.
                int newCapacity = 64;c   // @2
                while (newCapacity < minNewCapacity) {
                    newCapacity <<= 1;
                }
    
                return Math.min(newCapacity, maxCapacity);
            }
  • minNewCapacity:本次需要申请的最小内存。
  • macCapacity:最大总内存申请值。

代码@1,如果最小需要的内存超过设置的 threshold(阔值的话),则循环,每次增加threshold,然后看是否达到本次申请目标。

代码@2,如果需要申请的内存小于阔值,则以64个字节以2的幂增长。这里体现了内存扩容时的一个优化点。

1.3.2.2 newDirectBuffer 方法

        @Override
            protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
                ByteBuf buf;
                if (PlatformDependent.hasUnsafe()) {
                    buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
                } else {
                    buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
                }
    
                return toLeakAwareBuffer(buf);
            }

该方法中,除了见到申请一个直接内存外,还将该 buf 变成一个可感知的对象。toLeakAwareBuffer 方法,用于该对象被引用的情况,因为 UnpooledDirectByteBuf 是一个聚合对象,内部维护了一个 java.nio.ByteBuffer 的直接对外内存空间,在什么是释放UnpooledDirectByteBuf 中的堆外内存呢?在 UnpooledDirectByteBuf 被java垃圾回收的时候,应该于此同时需要释放指向的堆外内存,但堆外内存不受JVM GC的管理,所以我们只有感知到UnpooledDirectByteBuf被JVM虚拟机回收后,手动去释放堆外内存,大家想想都知道,我们可以通过JAVA提供的引用机制,来实现跟踪垃圾回收器的收集工作,虚引用的作用来了,下一篇,我将会以这个为入口点,重点分析 Netty 堆外内存如何管理,也就是内存泄露检测等方面的课题。


来源:https://blog.csdn.net/prestigeding/article/details/53977445

点赞(0)
版权归原创作者所有,任何形式转载请联系作者; Java 技术驿站 >> ByteBuf源码解读之初探UnpooledHeapByteBuf、UnpooledDirectByteBuf
上一篇
ByteBuf 篇之 ByteBuf 内部结构与 API 学习
下一篇
源码分析Netty内存泄漏检测