鸳剿 发表于 2025-6-3 14:07:30

Netty源码—7.ByteBuf原理二

大纲
9.Netty的内存规格
10.缓存数据结构
11.命中缓存的分配流程
12.Netty里有关内存分配的重要概念
13.Page级别的内存分配
14.SubPage级别的内存分配
15.ByteBuf的回收
 
9.Netty的内存规格
(1)4种内存规格
(2)内存申请单位
 
(1)4种内存规格
一.tiny:表示从0到512字节之间的内存大小
二.small:表示从512字节到8K范围的内存大小
三.normal:表示从8K到16M范围的内存大小
四.huge:表示大于16M的内存大小
 
(2)内存申请单位
Netty里所有的内存申请都是以Chunk为单位向操作系统申请的,后续所有的内存分配都是在这个Chunk里进行对应的操作。比如要分配1M的内存,那么首先要申请一个16M的Chunk,然后在这个16M的Chunk里取出一段1M的连续内存放入到Netty的ByteBuf里。
 
注意:一个Chunk的大小为16M,一个Page的大小为8K,一个SubPage的大小是0~8K,一个Chunk可以分成2048个Page。
 
10.缓存数据结构
(1)MemoryRegionCache的组成
(2)MemoryRegionCache的类型
(3)MemoryRegionCache的源码
 
(1)MemoryRegionCache的组成
Netty中与缓存相关的数据结构叫MemoryRegionCache,这是内存相关的一个缓存。MemoryRegionCache由三部分组成:queue、sizeClass、size。
 
一.queue
queue是一个队列,里面的每个元素都是MemoryRegionCache内部类Entry的一个实体,每一个Entry实体里都有一个chunk和一个handle。Netty里所有的内存都是以Chunk为单位进行分配的,而每一个handle都指向唯一一段连续的内存。所以一个chunk + 一个指向连续内存的handle,就能确定这块Entry的内存大小和内存位置,然后所有这些Entry组合起来就变成一个缓存的链。
 
二.sizeClass
sizeClass是Netty里的内存规格,其中有三种类型的内存规则。一种是tiny(0~512B),一种是small(512B~8K),一种是normal(8K~16M)。由于huge是直接使用非缓存的内存分配,所以不在该sizeClass范围内。
 
三.size
一个MemoryRegionCache所缓存的一个ByteBuf的大小是固定的。如果MemoryRegionCache里缓存了1K的ByteBuf,那么queue里所有的元素都是1K的ByteBuf。也就是说,同一个MemoryRegionCache它的queue里的所有元素都是固定大小的。这些固定大小分别有:tiny类型规则的是16B的整数倍直到498B,small类型规则的有512B、1K、2K、4K,normal类型规定的有8K、16K、32K。所以对于32K以上是不缓存的。
 
(2)MemoryRegionCache的类型
Netty里所有规格的MemoryRegionCache如下图示,下面的每个节点就相当于一个MemoryRegionCache的数据结构。
其中tiny类型的内存规格有32种,也就是32个节点,分别是16B、32B、48B、......、496B。这里面的每个节点都是一个MemoryRegionCache,每个MemoryRegionCache里都有一个queue。假设要分配一个16B的ByteBuf:首先会定位到small类型的内存规格里的第二个节点,然后从该节点维护的queue队列里取出一个Entry元素。通过该Entry元素可以拿到它属于哪一个chunk以及哪一个handle,从而进行内存划分。
 
small类型的内存规格有4种,也就是4个节点,分别是512B、1K、2K、4K。每个节点都是一个MemoryRegionCache,每个MemoryRegionCache里都有一个queue。假设要分配一个1K的ByteBuf:首先会定位到small类型的内存规格里的第二个节点,然后从该节点维护的queue里取出一个Entry元素。这样就可以基于这个Entry元素分配出1K内存的ByteBuf,不需要再去Chunk上找一段临时内存了。
 
normal类型的内存规格有3种,也就是3个节点,分别是8K、16K、32K,关于Normal大小的ByteBuf的内存分配也是同样道理。
 
(3)MemoryRegionCache的源码
每个线程都会有一个PoolThreadCache对象,每个PoolThreadCache对象都会有tiny、small、normal三种规格的缓存。每种规格又分heap和direct,所以每个PoolThreadCache对象会有6种缓存。PoolThreadCache类正是使用了6个MemoryRegionCache数组来维护这6种缓存。如:
 
数组tinySubPageHeapCaches拥有32个MemoryRegionCache元素,下标为n的元素用于缓存大小为n * 16B的ByteBuf。
 
数组smallSubPageHeapCaches拥有4个MemoryRegionCache元素,下标为n的元素用于缓存大小为2^n * 512B的ByteBuf。
 
数组normalHeapCaches拥有3个MemoryRegionCache元素,下标为n的元素用于缓存大小为2^n * 8K的ByteBuf。
 
数组tinySubPageHeapCaches里的每个MemoryRegionCache元素,最多可以缓存tinyCacheSize个即512个ByteBuf。
 
数组smallSubPageHeapCaches里的每个MemoryRegionCache元素,最多可以缓存smallCacheSize个即256个ByteBuf。
 
数组normalHeapCaches里的每个MemoryRegionCache元素,最多可以缓存normalCacheSize个即64个ByteBuf。
final class PoolThreadCache {
    //真正要分配的内存其实就是byte[] 或者 ByteBuffer,所以实际的分配就是得到一个数值handle进行定位
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    //Hold the caches for the different size classes, which are tiny, small and normal.
    //有32个MemoryRegionCache元素,分别存放16B、32B、48B、...、480B、496B的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    //有4个MemoryRegionCache元素,分别存放512B、1K、2K、4K的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    //有3个MemoryRegionCache元素,分别存放8K、16K、32K的Page级别的内存
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
   
    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
            int tinyCacheSize, int smallCacheSize, int normalCacheSize,
            int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
      ...
      this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
      this.heapArena = heapArena;
      this.directArena = directArena;
      if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
            directArena.numThreadCaches.getAndIncrement();
      } else {
            //No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
      }
      if (heapArena != null) {
            //Create the caches for the heap allocations
            tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
            numShiftsNormalHeap = log2(heapArena.pageSize);
            normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
            heapArena.numThreadCaches.getAndIncrement();
      } else {
            //No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
      }
      //The thread-local cache will keep a list of pooled buffers which must be returned to the pool when the thread is not alive anymore.
      ThreadDeathWatcher.watch(thread, freeTask);
    }
   
    private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass) {
      if (cacheSize > 0) {
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache;
            for (int i = 0; i < cache.length; i++) {
                cache = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
      } else {
            return null;
      }
    }

    private static <T> MemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
      if (cacheSize > 0) {
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);

            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache;
            for (int i = 0; i < cache.length; i++) {
                cache = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
      } else {
            return null;
      }
    }
   
    private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
      SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
            super(size, sizeClass);
      }
      ...
    }

    private static int log2(int val) {
      int res = 0;
      while (val > 1) {
            val >>= 1;
            res++;
      }
      return res;
    }
   
    ...
   
    private abstract static class MemoryRegionCache<T> {
      private final int size;
      private final Queue<Entry<T>> queue;
      private final SizeClass sizeClass;

      MemoryRegionCache(int size, SizeClass sizeClass) {
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
      }
      ...
      
      static final class Entry<T> {
            final Handle<Entry<?>> recyclerHandle;
            PoolChunk<T> chunk;
            long handle = -1;

            Entry(Handle<Entry<?>> recyclerHandle) {
                this.recyclerHandle = recyclerHandle;
            }

            void recycle() {
                chunk = null;
                handle = -1;
                recyclerHandle.recycle(this);
            }
      }
    }
}

abstract class PoolArena<T> implements PoolArenaMetric {
    enum SizeClass {
      Tiny,
      Small,
      Normal
    }
    ...
}(3)标记连续内存的区段为未使用
标记方式会根据Page级别和SubPage级别进行标记,其中Page级别是根据二叉树来进行标记,SubPage级别是通过位图进行标记。
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
    private final PoolThreadLocalCache threadCache;
    private final PoolArena<byte[]>[] heapArenas;//一个线程会和一个PoolArena绑定
    private final PoolArena<ByteBuffer>[] directArenas;//一个线程会和一个PoolArena绑定
    ...
    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
      PoolThreadCache cache = threadCache.get();
      PoolArena<byte[]> heapArena = cache.heapArena;
      ByteBuf buf;
      if (heapArena != null) {
            //分配堆内存
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
      } else {
            buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
      }
      return toLeakAwareBuffer(buf);
    }

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
      PoolThreadCache cache = threadCache.get();
      PoolArena<ByteBuffer> directArena = cache.directArena;
      ByteBuf buf;
      if (directArena != null) {
            //分配直接内存
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
      } else {
            if (PlatformDependent.hasUnsafe()) {
                buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
            } else {
                buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
            }
      }
      return toLeakAwareBuffer(buf);
    }
    ...
}

abstract class PoolArena<T> implements PoolArenaMetric {
    ...
    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
      PooledByteBuf<T> buf = newByteBuf(maxCapacity);//创建ByteBuf对象
      allocate(cache, buf, reqCapacity);//基于PoolThreadCache对ByteBuf对象进行内存分配
      return buf;
    }
   
    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
      //1.根据reqCapacity进行分段规格化
      final int normCapacity = normalizeCapacity(reqCapacity);
      if (isTinyOrSmall(normCapacity)) {//capacity < pageSize,需要分配的内存小于8K
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) {//< 512
                //2.进行缓存分配
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                  //命中缓存,was able to allocate out of the cache so move on
                  return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                //2.进行缓存分配
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                  //命中缓存,was able to allocate out of the cache so move on
                  return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table;

            //Synchronize on the head.
            //This is needed as PoolChunk#allocateSubpage(int) and PoolChunk#free(long) may modify the doubly linked list as well.
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                  assert s.doNotDestroy && s.elemSize == normCapacity;
                  long handle = s.allocate();
                  assert handle >= 0;
                  s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                  if (tiny) {
                        allocationsTiny.increment();
                  } else {
                        allocationsSmall.increment();
                  }
                  return;
                }
            }
            //没有命中缓存
            allocateNormal(buf, reqCapacity, normCapacity);
            return;
      }
      if (normCapacity <= chunkSize) {//需要分配的内存大于8K,但小于16M
            //2.进行缓存分配
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                //命中缓存,was able to allocate out of the cache so move on
                return;
            }
            //没有命中缓存
            allocateNormal(buf, reqCapacity, normCapacity);
      } else {//需要分配的内存大于16M
            //Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
      }
    }
   
    //根据reqCapacity进行分段规格化
    int normalizeCapacity(int reqCapacity) {
      if (reqCapacity < 0) {
            throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
      }
      if (reqCapacity >= chunkSize) {
            return reqCapacity;
      }
      if (!isTiny(reqCapacity)) { // >= 512
            int normalizedCapacity = reqCapacity;
            normalizedCapacity --;
            normalizedCapacity |= normalizedCapacity >>>1;
            normalizedCapacity |= normalizedCapacity >>>2;
            normalizedCapacity |= normalizedCapacity >>>4;
            normalizedCapacity |= normalizedCapacity >>>8;
            normalizedCapacity |= normalizedCapacity >>> 16;
            normalizedCapacity ++;
            if (normalizedCapacity < 0) {
                normalizedCapacity >>>= 1;
            }
            return normalizedCapacity;
      }
      if ((reqCapacity & 15) == 0) {
            return reqCapacity;
      }
      return (reqCapacity & ~15) + 16;
    }
    ...
}

final class PoolThreadCache {
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    //Hold the caches for the different size classes, which are tiny, small and normal.
    //有32个MemoryRegionCache元素,分别存放16B、32B、48B、...、480B、496B的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    //有4个MemoryRegionCache元素,分别存放512B、1K、2K、4K的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    //有3个MemoryRegionCache元素,分别存放8K、16K、32K的Page级别的内存
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
    ...
   
    //Try to allocate a tiny buffer out of the cache. Returns true if successful false otherwise
    boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
      //首先调用cacheForTiny()方法找到需要分配的size对应的MemoryRegionCache
      //然后调用allocate()方法基于MemoryRegionCache去给ByteBuf对象分配内存
      return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
    }
   
    //找到需要分配的size对应的MemoryRegionCache
    private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
      int idx = PoolArena.tinyIdx(normCapacity);
      if (area.isDirect()) {
            return cache(tinySubPageDirectCaches, idx);
      }
      return cache(tinySubPageHeapCaches, idx);
    }
   
    //根据索引去缓存数组中返回一个MemoryRegionCache元素
    private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
      if (cache == null || idx > cache.length - 1) {
            return null;
      }
      return cache;
    }
   
    //基于MemoryRegionCache去给ByteBuf对象分配内存
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
      if (cache == null) {
            return false;
      }
      //调用MemoryRegionCache的allocate()方法给buf分配大小为reqCapacity的一块内存
      boolean allocated = cache.allocate(buf, reqCapacity);
      if (++ allocations >= freeSweepAllocationThreshold) {
            allocations = 0;
            trim();
      }
      return allocated;
    }
    ...
    private abstract static class MemoryRegionCache<T> {
      private final int size;
      private final Queue<Entry<T>> queue;
      private final SizeClass sizeClass;
      private int allocations;
      ...
      //Allocate something out of the cache if possible and remove the entry from the cache.
      public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
            //步骤一:从queue队列中弹出一个Entry元素
            Entry<T> entry = queue.poll();
            if (entry == null) {
                return false;
            }
            //步骤二:初始化buf
            initBuf(entry.chunk, entry.handle, buf, reqCapacity);
            //步骤三:将弹出的Entry元素放入对象池中进行复用
            entry.recycle();

            //allocations is not thread-safe which is fine as this is only called from the same thread all time.
            ++ allocations;
            return true;
      }
   
      //Init the PooledByteBuf using the provided chunk and handle with the capacity restrictions.
      protected abstract void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity);
      
      static final class Entry<T> {
            final Handle<Entry<?>> recyclerHandle;
            PoolChunk<T> chunk;
            long handle = -1;

            Entry(Handle<Entry<?>> recyclerHandle) {
                this.recyclerHandle = recyclerHandle;
            }

            void recycle() {
                chunk = null;
                handle = -1;
                recyclerHandle.recycle(this);
            }
      }
    }
}(4)将ByteBuf对象添加到对象池
一开始时,对象池是没有PooledByteBuf对象的,当PooledByteBuf对象被释放时不会被立即销毁,而是会加入到对象池里。
 
这样当Netty每次去拿一个PooledByteBuf对象时,就可以先从对象池里获取,取出对象之后就可以进行内存分配以及初始化了。
 
考虑到PooledByteBuf对象会经常被申请和释放,如果QPS非常高,可能会产生很多PooledByteBuf对象,而且频繁创建和释放PooledByteBuf对象也会比较耗费资源和降低性能。
 
所以Netty便使用了对象池来减少GC:当申请PooledByteBuf对象时,就可以尽可能从对象池里去取。当释放PooledByteBuf对象时,则可以将对象添加到对象池,从而实现对象复用。
final class PoolThreadCache {
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    //Hold the caches for the different size classes, which are tiny, small and normal.
    //有32个MemoryRegionCache元素,分别存放16B、32B、48B、...、480B、496B的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    //有4个MemoryRegionCache元素,分别存放512B、1K、2K、4K的SubPage级别的内存
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    //有3个MemoryRegionCache元素,分别存放8K、16K、32K的Page级别的内存
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
    ...


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Netty源码—7.ByteBuf原理二