Netty Source Code 7: ByteBuf Internals (2)
Outline
9. Netty's memory size classes
10. Cache data structures
11. The cache-hit allocation flow
12. Key concepts in Netty memory allocation
13. Page-level memory allocation
14. SubPage-level memory allocation
15. Reclaiming a ByteBuf
9. Netty's Memory Size Classes
(1) The four size classes
(2) The unit of memory requests

(1) The four size classes
1. tiny: sizes from 0 up to 512 bytes
2. small: sizes from 512 bytes up to 8K
3. normal: sizes from 8K up to 16M
4. huge: sizes above 16M
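As a quick sketch (not Netty's actual API; the class and method names here are made up for illustration), the four size classes above boil down to a simple threshold check:

```java
// Sketch: classify a requested size into Netty's four size classes.
// Thresholds follow the text: 512B, 8K (one Page), 16M (one Chunk).
public class SizeClassDemo {
    static final int PAGE_SIZE = 8 * 1024;          // 8K
    static final int CHUNK_SIZE = 16 * 1024 * 1024; // 16M

    static String classify(int size) {
        if (size < 512) return "tiny";
        if (size < PAGE_SIZE) return "small";
        if (size <= CHUNK_SIZE) return "normal";
        return "huge";
    }

    public static void main(String[] args) {
        System.out.println(classify(100));              // tiny
        System.out.println(classify(1024));             // small
        System.out.println(classify(64 * 1024));        // normal
        System.out.println(classify(32 * 1024 * 1024)); // huge
    }
}
```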
(2) The unit of memory requests
Every memory request in Netty goes to the operating system in units of a Chunk, and all subsequent allocations are carved out of that Chunk. For example, to allocate 1M of memory, Netty first requests a 16M Chunk, then takes a contiguous 1M region out of that Chunk and hands it to the ByteBuf.
Note: one Chunk is 16M, one Page is 8K, one SubPage ranges from 0 to 8K, and one Chunk can be split into 2048 Pages.
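The unit arithmetic in the note above can be checked directly (all sizes taken from the text):

```java
// Sketch: the Chunk/Page arithmetic described in the note above.
public class ChunkMathDemo {
    public static void main(String[] args) {
        int chunkSize = 16 * 1024 * 1024; // one Chunk = 16M
        int pageSize = 8 * 1024;          // one Page  = 8K
        System.out.println(chunkSize / pageSize); // 2048 pages per chunk

        int request = 1024 * 1024;                // the 1M allocation example
        System.out.println(request / pageSize);   // 128 pages are needed
    }
}
```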
10. Cache Data Structures
(1) What makes up a MemoryRegionCache
(2) The types of MemoryRegionCache
(3) The MemoryRegionCache source code

(1) What makes up a MemoryRegionCache
Netty's cache-related data structure is called MemoryRegionCache, a cache over memory regions. A MemoryRegionCache has three parts: queue, sizeClass, and size.
1. queue
queue is a queue whose elements are instances of MemoryRegionCache's inner class Entry; each Entry holds a chunk and a handle. All memory in Netty is allocated in units of Chunks, and each handle points to a unique contiguous region of memory. So a chunk plus a handle pointing into it pins down both the size and the position of the memory an Entry stands for, and all these Entries together form a chain of cached regions.
2. sizeClass
sizeClass is Netty's memory size class, of which there are three kinds: tiny (0~512B), small (512B~8K), and normal (8K~16M). huge allocations always bypass the cache, so huge is not part of sizeClass.
3. size
The ByteBuf size that one MemoryRegionCache caches is fixed. If a MemoryRegionCache caches 1K ByteBufs, then every element in its queue is a 1K ByteBuf; that is, all elements in one MemoryRegionCache's queue have the same fixed size. The possible fixed sizes are: for tiny, multiples of 16B up to 496B; for small, 512B, 1K, 2K and 4K; for normal, 8K, 16K and 32K. Anything above 32K is therefore not cached.
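A small sketch (with hypothetical helper names, not Netty's code) that enumerates the fixed cacheable sizes listed above:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: the fixed ByteBuf sizes a MemoryRegionCache queue may hold,
// derived from the rules above.
public class CachedSizesDemo {
    // tiny: multiples of 16B up to 496B
    static List<Integer> tinySizes() {
        List<Integer> sizes = new ArrayList<>();
        for (int s = 16; s < 512; s += 16) sizes.add(s); // 16B, 32B, ..., 496B
        return sizes;
    }
    // small: powers of two from 512B to 4K
    static List<Integer> smallSizes()  { return List.of(512, 1024, 2048, 4096); }
    // normal: 8K, 16K, 32K; anything larger is not cached
    static List<Integer> normalSizes() { return List.of(8 * 1024, 16 * 1024, 32 * 1024); }

    public static void main(String[] args) {
        System.out.println(tinySizes().size());  // 31 cached tiny sizes
        System.out.println(tinySizes().get(30)); // 496: the largest tiny size
        System.out.println(smallSizes());
        System.out.println(normalSizes());
    }
}
```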
(2) The types of MemoryRegionCache
All the MemoryRegionCache size classes in Netty are illustrated below; each node corresponds to one MemoryRegionCache data structure.
The tiny size class has 32 specs, i.e. 32 nodes: 16B, 32B, 48B, ..., 496B (the node at index 0 is unused). Each node is a MemoryRegionCache, and each MemoryRegionCache has a queue. Suppose a 16B ByteBuf is to be allocated: Netty first locates the second node of the tiny size class, then polls an Entry element from that node's queue. The Entry tells which chunk and which handle the memory belongs to, which is enough to carve out the region.
The small size class has 4 specs, i.e. 4 nodes: 512B, 1K, 2K, 4K. Each node is a MemoryRegionCache, and each MemoryRegionCache has a queue. Suppose a 1K ByteBuf is to be allocated: Netty first locates the second node of the small size class, then polls an Entry from that node's queue. Based on that Entry a 1K ByteBuf can be allocated directly, with no need to find a fresh region on a Chunk.
The normal size class has 3 specs, i.e. 3 nodes: 8K, 16K, 32K. Allocating a normal-sized ByteBuf works the same way.
(3) The MemoryRegionCache source code
Every thread has a PoolThreadCache object, and every PoolThreadCache holds caches for the tiny, small and normal size classes. Each size class is further split into heap and direct, so one PoolThreadCache holds 6 kinds of caches. The PoolThreadCache class maintains them with 6 MemoryRegionCache arrays, as follows:
The tinySubPageHeapCaches array has 32 MemoryRegionCache elements; the element at index n caches ByteBufs of size n * 16B.
The smallSubPageHeapCaches array has 4 MemoryRegionCache elements; the element at index n caches ByteBufs of size 2^n * 512B.
The normalHeapCaches array has 3 MemoryRegionCache elements; the element at index n caches ByteBufs of size 2^n * 8K.
Each MemoryRegionCache element in tinySubPageHeapCaches can cache at most tinyCacheSize (512) ByteBufs.
Each MemoryRegionCache element in smallSubPageHeapCaches can cache at most smallCacheSize (256) ByteBufs.
Each MemoryRegionCache element in normalHeapCaches can cache at most normalCacheSize (64) ByteBufs.
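The index-to-size mapping described above can be sketched as follows (the formulas are assumed from the text, not taken from Netty's code):

```java
// Sketch: how an array index maps to the cached buffer size
// for each of the three cache arrays.
public class CacheIndexDemo {
    static int tinySize(int idx)   { return idx * 16; }          // n * 16B
    static int smallSize(int idx)  { return 512 << idx; }        // 2^n * 512B
    static int normalSize(int idx) { return (8 * 1024) << idx; } // 2^n * 8K

    public static void main(String[] args) {
        System.out.println(tinySize(1));   // 16:    second tiny node
        System.out.println(smallSize(2));  // 2048:  third small node
        System.out.println(normalSize(2)); // 32768: third normal node
    }
}
```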
final class PoolThreadCache {
//What actually gets allocated is a byte[] or a ByteBuffer, so an allocation boils down to a numeric handle used for locating memory
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
//Hold the caches for the different size classes, which are tiny, small and normal.
//32 MemoryRegionCache elements, holding SubPage-level memory of 16B, 32B, 48B, ..., 480B, 496B
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
//4 MemoryRegionCache elements, holding SubPage-level memory of 512B, 1K, 2K, 4K
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
//3 MemoryRegionCache elements, holding Page-level memory of 8K, 16K, 32K
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
...
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
tinySubPageDirectCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
//No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
//Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
//No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
//The thread-local cache will keep a list of pooled buffers which must be returned to the pool when the thread is not alive anymore.
ThreadDeathWatcher.watch(thread, freeTask);
}
private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}
private static <T> MemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
if (cacheSize > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
for (int i = 0; i < cache.length; i++) {
cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
}
return cache;
} else {
return null;
}
}
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
...
}
private static int log2(int val) {
int res = 0;
while (val > 1) {
val >>= 1;
res++;
}
return res;
}
...
private abstract static class MemoryRegionCache<T> {
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
...
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() {
chunk = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
}
}
abstract class PoolArena<T> implements PoolArenaMetric {
enum SizeClass {
Tiny,
Small,
Normal
}
...
}

(3) Marking a contiguous memory region as unused
The marking scheme differs between the Page level and the SubPage level: Page-level regions are marked via a binary tree, while SubPage-level regions are marked via a bitmap.
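As a simplified, hypothetical illustration of the SubPage-style bitmap marking (this is not Netty's PoolSubpage implementation), each bit records whether one fixed-size element inside a page is in use:

```java
// Sketch: bitmap bookkeeping over fixed-size elements in a page.
// Bit = 1 means the element is used, 0 means it is free.
public class BitmapDemo {
    private final long[] bitmap;
    private final int elements;

    BitmapDemo(int elements) {
        this.elements = elements;
        this.bitmap = new long[(elements + 63) / 64];
    }

    // Find the first free element, mark it used, return its index (-1 if full).
    int allocate() {
        for (int i = 0; i < elements; i++) {
            int word = i / 64, bit = i % 64;
            if ((bitmap[word] & (1L << bit)) == 0) {
                bitmap[word] |= (1L << bit);
                return i;
            }
        }
        return -1;
    }

    // Mark an element as unused again.
    void free(int idx) {
        bitmap[idx / 64] &= ~(1L << (idx % 64));
    }

    public static void main(String[] args) {
        BitmapDemo bm = new BitmapDemo(512); // e.g. an 8K page split into 16B elements
        int a = bm.allocate();               // 0
        bm.allocate();                       // 1
        bm.free(a);
        System.out.println(bm.allocate());   // 0 again: the freed slot is reused
    }
}
```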
public class PooledByteBufAllocator extends AbstractByteBufAllocator {
private final PoolThreadLocalCache threadCache;
private final PoolArena<byte[]>[] heapArenas;//each thread is bound to one PoolArena
private final PoolArena<ByteBuffer>[] directArenas;//each thread is bound to one PoolArena
...
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<byte[]> heapArena = cache.heapArena;
ByteBuf buf;
if (heapArena != null) {
//allocate heap memory
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
//allocate direct memory
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
...
}
abstract class PoolArena<T> implements PoolArenaMetric {
...
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);//create the ByteBuf object
allocate(cache, buf, reqCapacity);//allocate memory for the ByteBuf object via the PoolThreadCache
return buf;
}
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
//1.Normalize reqCapacity into its size class
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) {//capacity < pageSize: less than 8K is requested
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) {//< 512
//2.Try the cache first
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
//cache hit: was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
//2.Try the cache first
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
//cache hit: was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
//Synchronize on the head.
//This is needed as PoolChunk#allocateSubpage(int) and PoolChunk#free(long) may modify the doubly linked list as well.
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
if (tiny) {
allocationsTiny.increment();
} else {
allocationsSmall.increment();
}
return;
}
}
//cache miss
allocateNormal(buf, reqCapacity, normCapacity);
return;
}
if (normCapacity <= chunkSize) {//more than 8K but no more than 16M is requested
//2.Try the cache first
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
//cache hit: was able to allocate out of the cache so move on
return;
}
//cache miss
allocateNormal(buf, reqCapacity, normCapacity);
} else {//more than 16M is requested
//Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
//Normalize reqCapacity into its size class
int normalizeCapacity(int reqCapacity) {
if (reqCapacity < 0) {
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
}
if (reqCapacity >= chunkSize) {
return reqCapacity;
}
if (!isTiny(reqCapacity)) { // >= 512
int normalizedCapacity = reqCapacity;
normalizedCapacity--;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
return normalizedCapacity;
}
if ((reqCapacity & 15) == 0) {
return reqCapacity;
}
return (reqCapacity & ~15) + 16;
}
...
}
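The rounding performed by normalizeCapacity above can be reproduced standalone (this sketch omits the chunkSize short-circuit for brevity): sizes of 512B and up round to the next power of two, while tiny sizes round to the next multiple of 16B.

```java
// Sketch: the normalizeCapacity rounding logic, extracted for illustration.
public class NormalizeDemo {
    static int normalize(int reqCapacity) {
        if (reqCapacity >= 512) {
            // Smear the highest set bit downward, then add one:
            // this yields the next power of two >= reqCapacity.
            int n = reqCapacity - 1;
            n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
            n |= n >>> 8; n |= n >>> 16;
            return n + 1;
        }
        if ((reqCapacity & 15) == 0) return reqCapacity; // already a multiple of 16
        return (reqCapacity & ~15) + 16;                 // next multiple of 16
    }

    public static void main(String[] args) {
        System.out.println(normalize(1000)); // 1024
        System.out.println(normalize(20));   // 32
        System.out.println(normalize(512));  // 512
    }
}
```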
final class PoolThreadCache {
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
//Hold the caches for the different size classes, which are tiny, small and normal.
//32 MemoryRegionCache elements, holding SubPage-level memory of 16B, 32B, 48B, ..., 480B, 496B
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
//4 MemoryRegionCache elements, holding SubPage-level memory of 512B, 1K, 2K, 4K
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
//3 MemoryRegionCache elements, holding Page-level memory of 8K, 16K, 32K
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
...
//Try to allocate a tiny buffer out of the cache. Returns true if successful false otherwise
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
//First call cacheForTiny() to find the MemoryRegionCache for the size to allocate
//Then call allocate() to have that MemoryRegionCache allocate memory for the ByteBuf object
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
//Find the MemoryRegionCache for the size to allocate
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity);
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
//Return the MemoryRegionCache element at the given index of the cache array
private static <T> MemoryRegionCache<T> cache(MemoryRegionCache<T>[] cache, int idx) {
if (cache == null || idx > cache.length - 1) {
return null;
}
return cache[idx];
}
//Allocate memory for the ByteBuf object out of the MemoryRegionCache
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
return false;
}
//Call MemoryRegionCache.allocate() to allocate a piece of memory of size reqCapacity for buf
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
...
private abstract static class MemoryRegionCache<T> {
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
private int allocations;
...
//Allocate something out of the cache if possible and remove the entry from the cache.
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
//Step 1: poll an Entry element from the queue
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
//Step 2: initialize buf
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
//Step 3: return the polled Entry element to the object pool for reuse
entry.recycle();
//allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}
//Init the PooledByteBuf using the provided chunk and handle with the capacity restrictions.
protected abstract void initBuf(PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity);
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() {
chunk = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
}
}

(4) Adding the ByteBuf object back to the object pool
Initially the object pool holds no PooledByteBuf objects. When a PooledByteBuf is released it is not destroyed immediately but added to the object pool.
So whenever Netty needs a PooledByteBuf, it can first try to take one from the object pool, and then perform memory allocation and initialization on the object it gets.
PooledByteBuf objects are requested and released constantly; under high QPS a large number of them would be created, and frequently creating and releasing them wastes resources and hurts performance.
Netty therefore uses an object pool to reduce GC pressure: when a PooledByteBuf is requested it is taken from the pool where possible, and when it is released it is put back into the pool, so objects get reused.
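A minimal sketch of the object-pool idea described above (Netty's real implementation is its Recycler class; the names here are made up for illustration):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch: acquire returns a pooled object when one exists,
// release puts the object back instead of discarding it.
public class SimplePool<T> {
    interface Factory<T> { T create(); }

    private final Deque<T> pool = new ArrayDeque<>();
    private final Factory<T> factory;

    SimplePool(Factory<T> factory) { this.factory = factory; }

    T acquire() {
        T obj = pool.poll();                // reuse a pooled object if any
        return obj != null ? obj : factory.create();
    }

    void release(T obj) { pool.push(obj); } // return to the pool for reuse

    public static void main(String[] args) {
        SimplePool<StringBuilder> p = new SimplePool<>(StringBuilder::new);
        StringBuilder a = p.acquire();
        p.release(a);
        StringBuilder b = p.acquire();
        System.out.println(a == b);         // true: the instance is reused
    }
}
```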