找回密码
 立即注册
首页 业界区 业界 从零开始实现简易版Netty(七) MyNetty 实现Normal规格的 ...

从零开始实现简易版Netty(七) MyNetty 实现Normal规格的池化内存分配

峰邑 2025-10-1 13:38:57
从零开始实现简易版Netty(七) MyNetty 实现Normal规格的池化内存分配

1. Netty池化内存分配介绍

在上一篇博客中,lab6版本的MyNetty中实现了一个简易的非池化ByteBuf容器,池化内存分配是netty中非常核心也非常复杂的一个功能,没法在一次迭代中完整的实现,MyNetty打算分为4个迭代逐步的将其实现。
按照计划,lab7版本的MyNetty需要实现Normal规格的内存分配。
由于本文属于系列博客,读者需要对之前的博客内容有所了解才能更好地理解本文内容。

  • lab1版本博客:从零开始实现简易版Netty(一) MyNetty Reactor模式
  • lab2版本博客:从零开始实现简易版Netty(二) MyNetty pipeline流水线
  • lab3版本博客:从零开始实现简易版Netty(三) MyNetty 高效的数据读取实现
  • lab4版本博客:从零开始实现简易版Netty(四) MyNetty 高效的数据写出实现
  • lab5版本博客:从零开始实现简易版Netty(五) MyNetty FastThreadLocal实现
  • lab6版本博客:从零开始实现简易版Netty(六) MyNetty ByteBuf实现


在上一篇博客中我们提到,Netty的ByteBuf容器相比jdk的ByteBuffer的最大的一个优势就是实现了池化功能。通过对ByteBuf容器的池化,大幅提高了需要大量创建并回收ByteBuf容器的场景下的性能。
在Netty中对于ByteBuf的池化分为两部分,一个是ByteBuf对象本身的池化,另一个则是对ByteBuf背后底层数组所使用内存的池化
对于ByteBuf对象的池化,Netty中实现了一个简单的对象池(Recycler类)以支持并发的对象创建与回收,相对简单。而对ByteBuf底层数组使用内存的池化设计则很大程度上参考了jemalloc这一适用于多处理器操作系统内核的内存分配器。
关于jemalloc的工作原理,官方有两篇经典论文,分别是写于2006年的A Scalable Concurrent malloc(3) Implementation for FreeBSD与2011年的scalable-memory-allocation-using-jemalloc。
其中2011年的论文中提到,相比起2006年最初版本的jemalloc,新版本的jemalloc做了进一步的优化,大幅提升了jemalloc的性能与易用性。强烈建议读者在开始着手研究Netty池化内存的工作原理细节之前,先阅读这两篇高屋建瓴的关于jemalloc的论文。掌握了核心的设计理念后,再去看细节能起到事半功倍的作用。
在后续介绍Netty的实现细节时,我也会结合这两篇论文中的内容来进行讲解,个人认为背后的设计理念与实现的细节同样重要,应该争取做到知其然而知其所以然。
2. ByteBuf对象池的实现解析

2.1 池化的ByteBuf分配器(PooledByteBufAllocator)

为了能够让使用者灵活的控制所创建的ByteBuf是否需要池化,Netty抽象出了ByteBufAllocator这一接口。使用者可以通过其对应的接口来创建ByteBuf,而不用特别关心其底层的实现。
MyNetty池化的ByteBuf分配器实现


  • 从下面展示的MyPooledByteBufAllocator的源码中可以看到,用于分配池化ByteBuf的功能主要集中在子类的newHeapBuffer方法中。
    该方法中为当前线程分配一个固定的PoolArena,再通过选中的PoolArena去进行实际的分配。
  • 熟悉前面提到的jemalloc论文的读者可以看到,Netty中的PoolArena其实就对应着jemalloc中反复提到的Arena的概念。
    "Application threads are assigned arenas in round-robin fashion upon first allocating a small/large object. Arenas are completely independent of each other. They maintain their own chunks, from which they carve page runs for small/large objects."
    "应用线程在首次分配small或large对象时,使用round-robin轮训为其分配一个arena。不同的Arena彼此之间完全独立。Arena维护独属于它自己的Chunk集合,从中切割出连续的页段用于分配small或large对象。"
  • Netty参考jemalloc也实现了Arena与线程的绑定关系,并且通过FastThreadLocal实现了ByteBuf线程缓存的能力。
    因为Arena与线程是一对多的关系,通过Arena来分配池化内存,必然会因为要变更Arena的内部元数据(trace metadata)而需要加锁防并发。通过线程级别的池化能力,可以以略微增加内存碎片为代价,减少同步竞争而大幅增加池化内存分配的吞吐量。
    "The main goal of thread caches is to reduce the volume of synchronization events."
    "引入线程缓存的主要目的是减少同步事件的量。"
  • MyNetty中分为3个迭代来完成池化ByteBuf的功能,线程缓存的功能被放在了最后一个迭代,也就是lab9中去实现。
    在本篇博客(lab7),MyNetty聚焦于PoolArena的内部实现,所以直接简单的设置所有线程共用同一个PoolArena。并且与lab6一致,简单起见只实现了HeapByteBuf相关的池化,不考虑DirectByteBuf相关的功能。

  1. public abstract class MyAbstractByteBufAllocator implements MyByteBufAllocator{
  2.     // 。。。 已省略无关逻辑
  3.    
  4.     static final int DEFAULT_INIT_CAPACITY = 256;
  5.     static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
  6.     static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
  7.     @Override
  8.     public MyByteBuf heapBuffer() {
  9.         // 以默认参数值创建一个heapBuffer
  10.         return newHeapBuffer(DEFAULT_INIT_CAPACITY,DEFAULT_MAX_CAPACITY);
  11.     }
  12.     @Override
  13.     public MyByteBuf heapBuffer(int initialCapacity) {
  14.         return heapBuffer(initialCapacity,DEFAULT_MAX_CAPACITY);
  15.     }
  16.     @Override
  17.     public MyByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
  18.         // 简单起见,不实现netty里空buf优化
  19.         // capacity参数校验
  20.         validate(initialCapacity, maxCapacity);
  21.         return newHeapBuffer(initialCapacity,maxCapacity);
  22.     }
  23. }
复制代码
  1. public class MyPooledByteBufAllocator extends MyAbstractByteBufAllocator{
  2.     private final MyPoolArena<byte[]>[] heapArenas;
  3.     public MyPooledByteBufAllocator() {
  4.         // 简单起见,Arena数量写死为1方便测试,后续支持与线程绑定后再拓展为与处理器数量挂钩
  5.         int arenasNum = 1;
  6.         // 初始化好heapArena数组
  7.         heapArenas = new MyPoolArena.HeapArena[arenasNum];
  8.         for (int i = 0; i < heapArenas.length; i ++) {
  9.             MyPoolArena.HeapArena arena = new MyPoolArena.HeapArena(this);
  10.             heapArenas[i] = arena;
  11.         }
  12.     }
  13.     @Override
  14.     protected MyByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
  15.         // 简单起见,Arena数量写死为1方便测试,后续支持与线程绑定后再拓展为与处理器数量挂钩
  16.         MyPoolArena<byte[]> targetArena = heapArenas[0];
  17.         return targetArena.allocate(initialCapacity, maxCapacity);
  18.     }
  19. }
复制代码
2.2 从对象池中获取ByteBuf对象


  • PoolArena中生成可用PooledByteBuf的方法中,只做了两件事情。
    首先是从ByteBuf的对象池中获得一个ByteBuf对象,然后再为这个对象的底层数组分配相匹配的内存。本小节,我们主要探讨前一个操作。
  • 获取ByteBuf对象逻辑是在PooledHeapByteBuf.newInstance方法中,通过一个全局的ObjectPool对象池来获得(RECYCLER.get())。
    获取到一个可用的ByteBuf对象后,通过PooledHeapByteBuf的reuse方法,将自身内部的读写指针等内部属性重新初始化一遍,避免被之前的数据污染。
  1. public class MyPoolArena{
  2.     // 。。。 已省略无关逻辑
  3.     /**
  4.      * 从当前PoolArena中申请分配内存,并将其包装成一个PooledByteBuf返回
  5.      * */
  6.     MyPooledByteBuf<T> allocate(int reqCapacity, int maxCapacity) {
  7.         // 从对象池中获取缓存的PooledByteBuf对象
  8.         MyPooledByteBuf<T> buf = newByteBuf(maxCapacity);
  9.         // 为其分配底层数组对应的内存
  10.         allocate(buf, reqCapacity);
  11.         return buf;
  12.     }
  13.     public static final class HeapArena extends MyPoolArena<byte[]>{
  14.         @Override
  15.         protected MyPooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
  16.             return MyPooledHeapByteBuf.newInstance(maxCapacity);
  17.         }
  18.     }
  19. }
复制代码
[code]public class MyPooledHeapByteBuf extends MyPooledByteBuf{    // 。。。 已省略无关逻辑        private static final MyObjectPool RECYCLER = MyObjectPool.newPool(        new MyObjectPool.ObjectCreator() {            @Override            public MyPooledHeapByteBuf newObject(MyObjectPool.Handle handle) {                return new MyPooledHeapByteBuf(handle, 0);            }        });    MyPooledHeapByteBuf(MyObjectPool.Handle

相关推荐

您需要登录后才可以回帖 登录 | 立即注册