从零开始实现简易版Netty(七) MyNetty 实现Normal规格的池化内存分配
1. Netty池化内存分配介绍
在上一篇博客中,lab6版本的MyNetty中实现了一个简易的非池化ByteBuf容器,池化内存分配是netty中非常核心也非常复杂的一个功能,没法在一次迭代中完整的实现,MyNetty打算分为4个迭代逐步的将其实现。
按照计划,lab7版本的MyNetty需要实现Normal规格的内存分配。
由于本文属于系列博客,读者需要对之前的博客内容有所了解才能更好地理解本文内容。
- lab1版本博客:从零开始实现简易版Netty(一) MyNetty Reactor模式
- lab2版本博客:从零开始实现简易版Netty(二) MyNetty pipeline流水线
- lab3版本博客:从零开始实现简易版Netty(三) MyNetty 高效的数据读取实现
- lab4版本博客:从零开始实现简易版Netty(四) MyNetty 高效的数据写出实现
- lab5版本博客:从零开始实现简易版Netty(五) MyNetty FastThreadLocal实现
- lab6版本博客:从零开始实现简易版Netty(六) MyNetty ByteBuf实现
在上一篇博客中我们提到,Netty的ByteBuf容器相比jdk的ByteBuffer的最大的一个优势就是实现了池化功能。通过对ByteBuf容器的池化,大幅提高了需要大量创建并回收ByteBuf容器的场景下的性能。
在Netty中对于ByteBuf的池化分为两部分,一个是ByteBuf对象本身的池化,另一个则是对ByteBuf背后底层数组所使用内存的池化。
对于ByteBuf对象的池化,Netty中实现了一个简单的对象池(Recycler类)以支持并发的对象创建与回收,相对简单。而对ByteBuf底层数组使用内存的池化设计则很大程度上参考了jemalloc这一适用于多处理器操作系统内核的内存分配器。
关于jemalloc的工作原理,官方有两篇经典论文,分别是写于2006年的A Scalable Concurrent malloc(3) Implementation for FreeBSD与2011年的scalable-memory-allocation-using-jemalloc。
其中2011年的论文中提到,相比起2006年最初版本的jemalloc,新版本的jemalloc做了进一步的优化,大幅提升了jemalloc的性能与易用性。强烈建议读者在开始着手研究Netty池化内存的工作原理细节之前,先阅读这两篇高屋建瓴的关于jemalloc的论文。掌握了核心的设计理念后,再去看细节能起到事半功倍的作用。
在后续介绍Netty的实现细节时,我也会结合这两篇论文中的内容来进行讲解,个人认为背后的设计理念与实现的细节同样重要,应该争取做到知其然而知其所以然。
2. ByteBuf对象池的实现解析
2.1 池化的ByteBuf分配器(PooledByteBufAllocator)
为了能够让使用者灵活的控制所创建的ByteBuf是否需要池化,Netty抽象出了ByteBufAllocator这一接口。使用者可以通过其对应的接口来创建ByteBuf,而不用特别关心其底层的实现。
MyNetty池化的ByteBuf分配器实现
- 从下面展示的MyPooledByteBufAllocator的源码中可以看到,用于分配池化ByteBuf的功能主要集中在子类的newHeapBuffer方法中。
该方法中为当前线程分配一个固定的PoolArena,再通过选中的PoolArena去进行实际的分配。
- 熟悉前面提到的jemalloc论文的读者可以看到,Netty中的PoolArena其实就对应着jemalloc中反复提到的Arena的概念。
"Application threads are assigned arenas in round-robin fashion upon first allocating a small/large object. Arenas are completely independent of each other. They maintain their own chunks, from which they carve page runs for small/large objects."
"应用线程在首次分配small或large对象时,使用round-robin轮训为其分配一个arena。不同的Arena彼此之间完全独立。Arena维护独属于它自己的Chunk集合,从中切割出连续的页段用于分配small或large对象。"
- Netty参考jemalloc也实现了Arena与线程的绑定关系,并且通过FastThreadLocal实现了ByteBuf线程缓存的能力。
因为Arena与线程是一对多的关系,通过Arena来分配池化内存,必然会因为要变更Arena的内部元数据(trace metadata)而需要加锁防并发。通过线程级别的池化能力,可以以略微增加内存碎片为代价,减少同步竞争而大幅增加池化内存分配的吞吐量。
"The main goal of thread caches is to reduce the volume of synchronization events."
"引入线程缓存的主要目的是减少同步事件的量。"
- MyNetty中分为3个迭代来完成池化ByteBuf的功能,线程缓存的功能被放在了最后一个迭代,也就是lab9中去实现。
在本篇博客(lab7),MyNetty聚焦于PoolArena的内部实现,所以直接简单的设置所有线程共用同一个PoolArena。并且与lab6一致,简单起见只实现了HeapByteBuf相关的池化,不考虑DirectByteBuf相关的功能。
- public abstract class MyAbstractByteBufAllocator implements MyByteBufAllocator{
- // 。。。 已省略无关逻辑
-
- static final int DEFAULT_INIT_CAPACITY = 256;
- static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;
- static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page
- @Override
- public MyByteBuf heapBuffer() {
- // 以默认参数值创建一个heapBuffer
- return newHeapBuffer(DEFAULT_INIT_CAPACITY,DEFAULT_MAX_CAPACITY);
- }
- @Override
- public MyByteBuf heapBuffer(int initialCapacity) {
- return heapBuffer(initialCapacity,DEFAULT_MAX_CAPACITY);
- }
- @Override
- public MyByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
- // 简单起见,不实现netty里空buf优化
- // capacity参数校验
- validate(initialCapacity, maxCapacity);
- return newHeapBuffer(initialCapacity,maxCapacity);
- }
- }
复制代码- public class MyPooledByteBufAllocator extends MyAbstractByteBufAllocator{
- private final MyPoolArena<byte[]>[] heapArenas;
- public MyPooledByteBufAllocator() {
- // 简单起见,Arena数量写死为1方便测试,后续支持与线程绑定后再拓展为与处理器数量挂钩
- int arenasNum = 1;
- // 初始化好heapArena数组
- heapArenas = new MyPoolArena.HeapArena[arenasNum];
- for (int i = 0; i < heapArenas.length; i ++) {
- MyPoolArena.HeapArena arena = new MyPoolArena.HeapArena(this);
- heapArenas[i] = arena;
- }
- }
- @Override
- protected MyByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
- // 简单起见,Arena数量写死为1方便测试,后续支持与线程绑定后再拓展为与处理器数量挂钩
- MyPoolArena<byte[]> targetArena = heapArenas[0];
- return targetArena.allocate(initialCapacity, maxCapacity);
- }
- }
复制代码 2.2 从对象池中获取ByteBuf对象
- PoolArena中生成可用PooledByteBuf的方法中,只做了两件事情。
首先是从ByteBuf的对象池中获得一个ByteBuf对象,然后再为这个对象的底层数组分配相匹配的内存。本小节,我们主要探讨前一个操作。
- 获取ByteBuf对象逻辑是在PooledHeapByteBuf.newInstance方法中,通过一个全局的ObjectPool对象池来获得(RECYCLER.get())。
获取到一个可用的ByteBuf对象后,通过PooledHeapByteBuf的reuse方法,将自身内部的读写指针等内部属性重新初始化一遍,避免被之前的数据污染。
- public class MyPoolArena{
- // 。。。 已省略无关逻辑
- /**
- * 从当前PoolArena中申请分配内存,并将其包装成一个PooledByteBuf返回
- * */
- MyPooledByteBuf<T> allocate(int reqCapacity, int maxCapacity) {
- // 从对象池中获取缓存的PooledByteBuf对象
- MyPooledByteBuf<T> buf = newByteBuf(maxCapacity);
- // 为其分配底层数组对应的内存
- allocate(buf, reqCapacity);
- return buf;
- }
- public static final class HeapArena extends MyPoolArena<byte[]>{
- @Override
- protected MyPooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
- return MyPooledHeapByteBuf.newInstance(maxCapacity);
- }
- }
- }
复制代码 [code]public class MyPooledHeapByteBuf extends MyPooledByteBuf{ // 。。。 已省略无关逻辑 private static final MyObjectPool RECYCLER = MyObjectPool.newPool( new MyObjectPool.ObjectCreator() { @Override public MyPooledHeapByteBuf newObject(MyObjectPool.Handle handle) { return new MyPooledHeapByteBuf(handle, 0); } }); MyPooledHeapByteBuf(MyObjectPool.Handle |