峰邑 发表于 2025-10-1 13:38:57

从零开始实现简易版Netty(七) MyNetty 实现Normal规格的池化内存分配

从零开始实现简易版Netty(七) MyNetty 实现Normal规格的池化内存分配

1. Netty池化内存分配介绍

在上一篇博客中,lab6版本的MyNetty中实现了一个简易的非池化ByteBuf容器,池化内存分配是netty中非常核心也非常复杂的一个功能,没法在一次迭代中完整的实现,MyNetty打算分为4个迭代逐步的将其实现。
按照计划,lab7版本的MyNetty需要实现Normal规格的内存分配。
由于本文属于系列博客,读者需要对之前的博客内容有所了解才能更好地理解本文内容。

[*]lab1版本博客:从零开始实现简易版Netty(一) MyNetty Reactor模式
[*]lab2版本博客:从零开始实现简易版Netty(二) MyNetty pipeline流水线
[*]lab3版本博客:从零开始实现简易版Netty(三) MyNetty 高效的数据读取实现
[*]lab4版本博客:从零开始实现简易版Netty(四) MyNetty 高效的数据写出实现
[*]lab5版本博客:从零开始实现简易版Netty(五) MyNetty FastThreadLocal实现
[*]lab6版本博客:从零开始实现简易版Netty(六) MyNetty ByteBuf实现


在上一篇博客中我们提到,Netty的ByteBuf容器相比jdk的ByteBuffer的最大的一个优势就是实现了池化功能。通过对ByteBuf容器的池化,大幅提高了需要大量创建并回收ByteBuf容器的场景下的性能。
在Netty中对于ByteBuf的池化分为两部分,一个是ByteBuf对象本身的池化,另一个则是对ByteBuf背后底层数组所使用内存的池化。
对于ByteBuf对象的池化,Netty中实现了一个简单的对象池(Recycler类)以支持并发的对象创建与回收,相对简单。而对ByteBuf底层数组使用内存的池化设计则很大程度上参考了jemalloc这一适用于多处理器操作系统内核的内存分配器。
关于jemalloc的工作原理,官方有两篇经典论文,分别是写于2006年的A Scalable Concurrent malloc(3) Implementation for FreeBSD与2011年的scalable-memory-allocation-using-jemalloc。
其中2011年的论文中提到,相比起2006年最初版本的jemalloc,新版本的jemalloc做了进一步的优化,大幅提升了jemalloc的性能与易用性。强烈建议读者在开始着手研究Netty池化内存的工作原理细节之前,先阅读这两篇高屋建瓴的关于jemalloc的论文。掌握了核心的设计理念后,再去看细节能起到事半功倍的作用。
在后续介绍Netty的实现细节时,我也会结合这两篇论文中的内容来进行讲解,个人认为背后的设计理念与实现的细节同样重要,应该争取做到知其然而知其所以然。
2. ByteBuf对象池的实现解析

2.1 池化的ByteBuf分配器(PooledByteBufAllocator)

为了能够让使用者灵活的控制所创建的ByteBuf是否需要池化,Netty抽象出了ByteBufAllocator这一接口。使用者可以通过其对应的接口来创建ByteBuf,而不用特别关心其底层的实现。
MyNetty池化的ByteBuf分配器实现


[*]从下面展示的MyPooledByteBufAllocator的源码中可以看到,用于分配池化ByteBuf的功能主要集中在子类的newHeapBuffer方法中。
该方法中为当前线程分配一个固定的PoolArena,再通过选中的PoolArena去进行实际的分配。
[*]熟悉前面提到的jemalloc论文的读者可以看到,Netty中的PoolArena其实就对应着jemalloc中反复提到的Arena的概念。
"Application threads are assigned arenas in round-robin fashion upon first allocating a small/large object. Arenas are completely independent of each other. They maintain their own chunks, from which they carve page runs for small/large objects."
"应用线程在首次分配small或large对象时,使用round-robin轮训为其分配一个arena。不同的Arena彼此之间完全独立。Arena维护独属于它自己的Chunk集合,从中切割出连续的页段用于分配small或large对象。"
[*]Netty参考jemalloc也实现了Arena与线程的绑定关系,并且通过FastThreadLocal实现了ByteBuf线程缓存的能力。
因为Arena与线程是一对多的关系,通过Arena来分配池化内存,必然会因为要变更Arena的内部元数据(trace metadata)而需要加锁防并发。通过线程级别的池化能力,可以以略微增加内存碎片为代价,减少同步竞争而大幅增加池化内存分配的吞吐量。
"The main goal of thread caches is to reduce the volume of synchronization events."
"引入线程缓存的主要目的是减少同步事件的量。"
[*]MyNetty中分为3个迭代来完成池化ByteBuf的功能,线程缓存的功能被放在了最后一个迭代,也就是lab9中去实现。
在本篇博客(lab7),MyNetty聚焦于PoolArena的内部实现,所以直接简单的设置所有线程共用同一个PoolArena。并且与lab6一致,简单起见只实现了HeapByteBuf相关的池化,不考虑DirectByteBuf相关的功能。


public abstract class MyAbstractByteBufAllocator implements MyByteBufAllocator{
    // 。。。 已省略无关逻辑
   
    static final int DEFAULT_INIT_CAPACITY = 256;
    static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;

    static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page

    @Override
    public MyByteBuf heapBuffer() {
      // 以默认参数值创建一个heapBuffer
      return newHeapBuffer(DEFAULT_INIT_CAPACITY,DEFAULT_MAX_CAPACITY);
    }

    @Override
    public MyByteBuf heapBuffer(int initialCapacity) {
      return heapBuffer(initialCapacity,DEFAULT_MAX_CAPACITY);
    }

    @Override
    public MyByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
      // 简单起见,不实现netty里空buf优化

      // capacity参数校验
      validate(initialCapacity, maxCapacity);

      return newHeapBuffer(initialCapacity,maxCapacity);
    }
}public class MyPooledByteBufAllocator extends MyAbstractByteBufAllocator{

    private final MyPoolArena<byte[]>[] heapArenas;

    public MyPooledByteBufAllocator() {
      // 简单起见,Arena数量写死为1方便测试,后续支持与线程绑定后再拓展为与处理器数量挂钩
      int arenasNum = 1;

      // 初始化好heapArena数组
      heapArenas = new MyPoolArena.HeapArena;
      for (int i = 0; i < heapArenas.length; i ++) {
            MyPoolArena.HeapArena arena = new MyPoolArena.HeapArena(this);
            heapArenas = arena;
      }
    }

    @Override
    protected MyByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
      // 简单起见,Arena数量写死为1方便测试,后续支持与线程绑定后再拓展为与处理器数量挂钩
      MyPoolArena<byte[]> targetArena = heapArenas;

      return targetArena.allocate(initialCapacity, maxCapacity);
    }
}2.2 从对象池中获取ByteBuf对象


[*]PoolArena中生成可用PooledByteBuf的方法中,只做了两件事情。
首先是从ByteBuf的对象池中获得一个ByteBuf对象,然后再为这个对象的底层数组分配相匹配的内存。本小节,我们主要探讨前一个操作。
[*]获取ByteBuf对象逻辑是在PooledHeapByteBuf.newInstance方法中,通过一个全局的ObjectPool对象池来获得(RECYCLER.get())。
获取到一个可用的ByteBuf对象后,通过PooledHeapByteBuf的reuse方法,将自身内部的读写指针等内部属性重新初始化一遍,避免被之前的数据污染。
public class MyPoolArena{
    // 。。。 已省略无关逻辑
    /**
   * 从当前PoolArena中申请分配内存,并将其包装成一个PooledByteBuf返回
   * */
    MyPooledByteBuf<T> allocate(int reqCapacity, int maxCapacity) {
      // 从对象池中获取缓存的PooledByteBuf对象
      MyPooledByteBuf<T> buf = newByteBuf(maxCapacity);
      // 为其分配底层数组对应的内存
      allocate(buf, reqCapacity);
      return buf;
    }

    public static final class HeapArena extends MyPoolArena<byte[]>{

      @Override
      protected MyPooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
            return MyPooledHeapByteBuf.newInstance(maxCapacity);
      }
    }
}public class MyPooledHeapByteBuf extends MyPooledByteBuf{    // 。。。 已省略无关逻辑      private static final MyObjectPool RECYCLER = MyObjectPool.newPool(      new MyObjectPool.ObjectCreator() {            @Override            public MyPooledHeapByteBuf newObject(MyObjectPool.Handle handle) {                return new MyPooledHeapByteBuf(handle, 0);            }      });    MyPooledHeapByteBuf(MyObjectPool.Handle
页: [1]
查看完整版本: 从零开始实现简易版Netty(七) MyNetty 实现Normal规格的池化内存分配