The previous posts covered PoolChunk, PoolSubpage and PoolChunkList; this one looks at PoolArena. As those posts showed, PoolChunk, PoolSubpage and PoolChunkList are all managed by PoolArena.

PoolArena

Overview

PoolArena is the central entry point of Netty's memory management: it orchestrates every allocation. No matter how much memory is requested, the allocation goes through PoolArena#allocate(PoolThreadCache, int, int). The method returns a PooledByteBuf, a wrapper around the memory region that manages heap memory and off-heap (direct) memory uniformly.

PoolArena divides requested sizes into four classes: Tiny, Small, Normal and Huge.

  • Tiny: less than 512 bytes
  • Small: at least 512 bytes but less than pageSize
  • Normal: at least pageSize and at most chunkSize
  • Huge: greater than chunkSize
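
A minimal sketch of this classification in plain Java, assuming the default pageSize of 8192 and a chunkSize of 16 MiB (pageSize << 11 with the default maxOrder of 11); the class and method names are made up for illustration:

```java
// Illustrative only: SizeClassDemo and classify() are not Netty names.
public class SizeClassDemo {
    static final int PAGE_SIZE = 8192;               // default page size
    static final int CHUNK_SIZE = PAGE_SIZE << 11;   // 16 MiB with maxOrder = 11

    static String classify(int normCapacity) {
        if (normCapacity < 512) {
            return "Tiny";       // less than 512 bytes
        }
        if (normCapacity < PAGE_SIZE) {
            return "Small";      // 512 bytes up to (but excluding) pageSize
        }
        if (normCapacity <= CHUNK_SIZE) {
            return "Normal";     // pageSize up to and including chunkSize
        }
        return "Huge";           // larger than chunkSize, never pooled
    }

    public static void main(String[] args) {
        System.out.println(classify(64));               // Tiny
        System.out.println(classify(4096));             // Small
        System.out.println(classify(8192));             // Normal
        System.out.println(classify(32 * 1024 * 1024)); // Huge
    }
}
```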

PoolArena is best understood as a manager/scheduler: it does not manage memory regions directly; that job is delegated to PoolChunk. PoolArena organizes its chunks into PoolChunkList linked lists and moves chunks between lists according to their usage ratio.

A PoolChunk is one large region of memory, chunkSize bytes in total, composed of many Pages arranged as a binary tree. By marking tree nodes, allocations of different sizes can be tracked.

A Page (pageSize bytes) is still fairly large, so serving a Tiny/Small request from a whole Page would be wasteful. Below the Page level there is therefore the Subpage: a Page subdivided into fixed-size elements, where different Subpages may use different element sizes. After a chunk carves out a Subpage for the first time, PoolArena adds it to the tinySubpagePools/smallSubpagePools array slot for that element size; the next request of the same size can then be served by an array lookup instead of going through the chunk again.

PooledByteBuf: memory can live on the heap or outside the JVM heap. To operate on both kinds uniformly, Netty wraps them in ByteBuf. A PooledByteBuf has a member variable memory that refers to the actual memory region, e.g. a byte[] or a ByteBuffer.

Class hierarchy and member variables

PoolArena is an abstract class, while PoolChunk, PoolSubpage and PoolChunkList are all concrete. This is because PoolArena, as the top-level entry point, abstracts over the memory type; it has two implementations: DirectArena and HeapArena.

PoolArena also defines the pooled size classes: Tiny, Small and Normal.

PlatformDependent is Netty's utility class for dealing with differences between JDK versions.

abstract class PoolArena<T> implements PoolArenaMetric {
    // Whether sun.misc.Unsafe is available on this JDK; Unsafe enables raw memory operations
    static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();

    // Size classes of pooled allocations
    enum SizeClass {
        Tiny,
        Small,
        Normal
    }

    // Length of the tiny subpage pool array: 512 >>> 4 = 32
    static final int numTinySubpagePools = 512 >>> 4;

    // The pooled ByteBuf allocator that owns this arena
    final PooledByteBufAllocator parent;

    // Maximum depth (number of levels) of a chunk's binary tree
    private final int maxOrder;
    // Page size of a chunk
    final int pageSize;
    // log2 of the page size; default 13, since 1 << 13 = 8192
    final int pageShifts;
    // Total size of a chunk
    final int chunkSize;
    // Subpage mask: ~(pageSize - 1) keeps the bits above the page size.
    // (normCapacity & subpageOverflowMask) != 0 means the capacity is at least pageSize;
    // == 0 means the request is Tiny/Small, i.e. served from a subpage.
    final int subpageOverflowMask;
    // Length of the small subpage pool array: pageShifts - 9, default 4
    final int numSmallSubpagePools;
    // Alignment for direct memory (0 = disabled) and its mask
    final int directMemoryCacheAlignment;
    final int directMemoryCacheAlignmentMask;

    // Subpage pools for tiny and small allocations
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    // qXXX denotes the usage range of each PoolChunkList.
    // Lower-usage lists come first; chunks migrate between lists as their usage changes.
    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit; // freshly created chunks
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

    // Metrics
    private final List<PoolChunkListMetric> chunkListMetrics;

    // Metrics for allocations and deallocations
    private long allocationsNormal;
    // Atomic counters wrapping JDK classes: LongAdder on JDK >= 1.8, AtomicLong below
    private final LongCounter allocationsTiny = PlatformDependent.newLongCounter();
    private final LongCounter allocationsSmall = PlatformDependent.newLongCounter();
    private final LongCounter allocationsHuge = PlatformDependent.newLongCounter();
    // Number of active bytes allocated at the Huge level
    private final LongCounter activeBytesHuge = PlatformDependent.newLongCounter();
    // Deallocation counters, guarded by synchronized blocks
    private long deallocationsTiny;
    private long deallocationsSmall;
    private long deallocationsNormal;

    // We need to use the LongCounter here as this is not guarded via synchronized block.
    private final LongCounter deallocationsHuge = PlatformDependent.newLongCounter();

    // Number of thread caches backed by this arena.
    final AtomicInteger numThreadCaches = new AtomicInteger();
}

Constructor

The constructor mainly initializes the member variables.

protected PoolArena(PooledByteBufAllocator parent, int pageSize,
      int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
    this.parent = parent;
    this.pageSize = pageSize;
    this.maxOrder = maxOrder;
    this.pageShifts = pageShifts;
    this.chunkSize = chunkSize;
    directMemoryCacheAlignment = cacheAlignment;
    directMemoryCacheAlignmentMask = cacheAlignment - 1;
    subpageOverflowMask = ~(pageSize - 1);

    // Create the tiny pool array and build a sentinel head node for each slot
    tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
    for (int i = 0; i < tinySubpagePools.length; i ++) {
        tinySubpagePools[i] = newSubpagePoolHead(pageSize);
    }

    numSmallSubpagePools = pageShifts - 9;
    smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
    for (int i = 0; i < smallSubpagePools.length; i ++) {
        smallSubpagePools[i] = newSubpagePoolHead(pageSize);
    }

    // PoolChunkLists for the different usage ranges
    q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
    q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
    q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
    q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
    q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
    qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);

    q100.prevList(q075);
    q075.prevList(q050);
    q050.prevList(q025);
    q025.prevList(q000);
    q000.prevList(null);
    qInit.prevList(qInit);

    // Metrics
    List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
    metrics.add(qInit);
    metrics.add(q000);
    metrics.add(q025);
    metrics.add(q050);
    metrics.add(q075);
    metrics.add(q100);
    chunkListMetrics = Collections.unmodifiableList(metrics);
}

private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
    PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
    head.prev = head;
    head.next = head;
    return head;
}

@SuppressWarnings("unchecked")
private PoolSubpage<T>[] newSubpagePoolArray(int size) {
    return new PoolSubpage[size];
}

The chunk lists form a doubly linked chain: as a chunk's usage rises it moves toward the higher-usage lists, and as its usage falls it moves back toward the lower-usage lists.

null<-q000<-q025<-q050<-q075<-q100

qInit->q000->q025->q050->q075->q100

qInit holds chunks that have just been created.

Memory allocation: allocate

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    // newByteBuf is an abstract method implemented by HeapArena/DirectArena
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    allocate(cache, buf, reqCapacity);
    return buf;
}

The actual allocation happens in allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity).

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // Normalize the requested capacity
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            // Try the thread-local cache first
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // Locate the slot in tinySubpagePools
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            // Try the thread-local cache first
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // Locate the slot in smallSubpagePools
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }

        final PoolSubpage<T> head = table[tableIdx];
        // Lock on the head node
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                // Allocate from the subpage
                long handle = s.allocate();
                assert handle >= 0;
                // Initialize the buffer with the subpage
                s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                // Increment the tiny/small counter
                incTinySmallAllocation(tiny);
                return;
            }
        }
        // No pooled subpage was available, fall back to a Normal allocation
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
        }
        // Increment the tiny/small counter
        incTinySmallAllocation(tiny);
        return;
    }
    if (normCapacity <= chunkSize) {
        // Try the thread-local cache first
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
            ++allocationsNormal;
        }
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}

  1. Normalize the requested capacity reqCapacity into normCapacity;
  2. If it is smaller than pageSize (i.e. Tiny/Small):
    1. Tiny: try the PoolThreadCache first; if that fails, compute the index tableIdx into the tiny array and set table to tinySubpagePools;
    2. Small: try the PoolThreadCache first; if that fails, compute the index tableIdx into the small array and set table to smallSubpagePools;
    3. Take the head element at tableIdx;
    4. Under the head's lock:
      1. Read s = head.next;
      2. If s != head:
        1. Call s.allocate() on the subpage;
        2. Initialize the buffer via s.chunk.initBufWithSubpage;
        3. Increment the tiny/small allocation counter;
        4. Return;
    5. Otherwise allocate at the Normal level via allocateNormal() under the arena lock;
  3. If normCapacity is at most chunkSize:
    1. Try the PoolThreadCache first; return on success;
    2. Under the arena lock:
    3. Allocate via allocateNormal();
    4. Increment the allocationsNormal counter;
  4. Otherwise allocate via allocateHuge().
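
Step 1 above relies on normalizeCapacity, which is not shown in this post. A simplified sketch of its behavior for Tiny/Small/Normal requests, ignoring Huge requests and directMemoryCacheAlignment (the class name is made up):

```java
// Illustrative only: NormalizeDemo is not a Netty class.
public class NormalizeDemo {
    static int normalize(int reqCapacity) {
        if (reqCapacity >= 512) {
            // Small/Normal requests round up to the next power of two,
            // which maps cleanly onto PoolChunk's binary tree.
            int n = reqCapacity - 1;
            n |= n >>> 1;
            n |= n >>> 2;
            n |= n >>> 4;
            n |= n >>> 8;
            n |= n >>> 16;
            return n + 1;
        }
        // Tiny requests round up to the next multiple of 16, matching the
        // 16-byte granularity of the tinySubpagePools slots.
        return (reqCapacity & 15) == 0 ? reqCapacity : (reqCapacity & ~15) + 16;
    }

    public static void main(String[] args) {
        System.out.println(normalize(100));  // 112: next multiple of 16
        System.out.println(normalize(1000)); // 1024: next power of two
    }
}
```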

Tiny-level allocation

static int tinyIdx(int normCapacity) {
    return normCapacity >>> 4;
}
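
Since every normalized tiny size is a multiple of 16, the unsigned shift by 4 is just a division by 16, giving slots 1..31 for element sizes 16..496 (slot 0 stays unused). A small demonstration (the class name is made up):

```java
// Illustrative only: TinyIdxDemo is not a Netty class.
public class TinyIdxDemo {
    static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4; // divide by 16
    }

    public static void main(String[] args) {
        System.out.println(tinyIdx(16));  // 1
        System.out.println(tinyIdx(32));  // 2
        System.out.println(tinyIdx(496)); // 31
    }
}
```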

Small-level allocation

static int smallIdx(int normCapacity) {
    int tableIdx = 0;
    int i = normCapacity >>> 10;
    while (i != 0) {
        i >>>= 1;
        tableIdx ++;
    }
    return tableIdx;
}

private void incTinySmallAllocation(boolean tiny) {
    if (tiny) {
        allocationsTiny.increment();
    } else {
        allocationsSmall.increment();
    }
}
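
Normalized small sizes are powers of two from 512 up to half the page size, and smallIdx counts how many times the size can be halved down to 512: 512 maps to slot 0, 1024 to 1, 2048 to 2, 4096 to 3. A small demonstration (the class name is made up):

```java
// Illustrative only: SmallIdxDemo is not a Netty class.
public class SmallIdxDemo {
    static int smallIdx(int normCapacity) {
        int tableIdx = 0;
        int i = normCapacity >>> 10; // divide by 1024
        while (i != 0) {             // count remaining doublings above 512
            i >>>= 1;
            tableIdx++;
        }
        return tableIdx;
    }

    public static void main(String[] args) {
        System.out.println(smallIdx(512));  // 0
        System.out.println(smallIdx(1024)); // 1
        System.out.println(smallIdx(4096)); // 3
    }
}
```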

Normal-level allocation

private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }

    // Add a new chunk.
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    boolean success = c.allocate(buf, reqCapacity, normCapacity);
    assert success;
    qInit.add(c);
}
  1. Try to allocate from the PoolChunkLists with different usage ranges; return on success;
  2. Otherwise create a new chunk and allocate from it;
  3. Add the new chunk to qInit.

Huge-level allocation

A Huge allocation builds an unpooled PoolChunk and initializes the PooledByteBuf through its initUnpooled method.

private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
    PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
    activeBytesHuge.add(chunk.chunkSize());
    buf.initUnpooled(chunk, reqCapacity);
    allocationsHuge.increment();
}

Memory release: free

void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
    // Unpooled (Huge) chunks are destroyed directly
    if (chunk.unpooled) {
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        // Subtract the released size from the active Huge byte count
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        // Determine the size class: Tiny, Small or Normal
        SizeClass sizeClass = sizeClass(normCapacity);
        // If there is a thread cache, hand the memory back to it
        if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }
        // Otherwise release the memory back to the chunk
        freeChunk(chunk, handle, sizeClass, nioBuffer, false);
    }
}

destroyChunk

destroyChunk is implemented by the subclasses.

protected abstract void destroyChunk(PoolChunk<T> chunk);

HeapArena

Heap memory is reclaimed by the GC.

@Override
protected void destroyChunk(PoolChunk<byte[]> chunk) {
    // Rely on GC.
}

DirectArena

Direct (off-heap) memory is released explicitly through dedicated cleanup code.

@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
    if (PlatformDependent.useDirectBufferNoCleaner()) {
        PlatformDependent.freeDirectNoCleaner(chunk.memory);
    } else {
        PlatformDependent.freeDirectBuffer(chunk.memory);
    }
}

freeChunk

Releases memory back into the chunk.

void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) {
    final boolean destroyChunk;
    synchronized (this) {
        // We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise this
        // may fail due lazy class-loading in for example tomcat.
        if (!finalizer) {
            switch (sizeClass) {
            case Normal:
                ++deallocationsNormal;
                break;
            case Small:
                ++deallocationsSmall;
                break;
            case Tiny:
                ++deallocationsTiny;
                break;
            default:
                throw new Error();
            }
        }
        // PoolChunkList#free may move the chunk between lists based on its new usage;
        // it returns false once the chunk is fully unused and should be destroyed
        destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
    }
    if (destroyChunk) {
        // destroyChunk not need to be called while holding the synchronized lock.
        destroyChunk(chunk);
    }
}

Memory resizing: reallocate

Allocates a new memory region and copies the old data over.

void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
    assert newCapacity >= 0 && newCapacity <= buf.maxCapacity();

    int oldCapacity = buf.length;
    if (oldCapacity == newCapacity) {
        return;
    }

    PoolChunk<T> oldChunk = buf.chunk;
    ByteBuffer oldNioBuffer = buf.tmpNioBuf;
    long oldHandle = buf.handle;
    T oldMemory = buf.memory;
    int oldOffset = buf.offset;
    int oldMaxLength = buf.maxLength;

    // This does not touch buf's reader/writer indices
    // Allocate the new memory region
    allocate(parent.threadCache(), buf, newCapacity);
    int bytesToCopy;
    if (newCapacity > oldCapacity) {
        bytesToCopy = oldCapacity;
    } else {
        buf.trimIndicesToCapacity(newCapacity);
        bytesToCopy = newCapacity;
    }
    // Copy the old contents over
    memoryCopy(oldMemory, oldOffset, buf.memory, buf.offset, bytesToCopy);
    if (freeOldMemory) {
        free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
    }
}

Finding the subpage head node: findSubpagePoolHead

findSubpagePoolHead is used by PoolChunk. When a Tiny/Small request is served through the chunk for the first time, the chunk builds a Subpage and calls this method to find the matching head node, so that subpages of the same element size are linked into one list.

PoolSubpage<T> findSubpagePoolHead(int elemSize) {
    int tableIdx;
    PoolSubpage<T>[] table;
    if (isTiny(elemSize)) { // < 512
        // Divide by 16
        tableIdx = elemSize >>> 4;
        table = tinySubpagePools;
    } else {
        tableIdx = 0;
        // Divide by 1024, then count the remaining doublings
        elemSize >>>= 10;
        while (elemSize != 0) {
            elemSize >>>= 1;
            tableIdx ++;
        }
        table = smallSubpagePools;
    }

    return table[tableIdx];
}

Off-heap implementation: DirectArena

DirectArena is the implementation that allocates direct (off-heap) memory.

newChunk

Builds a new chunk.

@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
        int pageShifts, int chunkSize) {
    if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(this,
                allocateDirect(chunkSize), pageSize, maxOrder,
                pageShifts, chunkSize, 0);
    }
    final ByteBuffer memory = allocateDirect(chunkSize
            + directMemoryCacheAlignment);
    return new PoolChunk<ByteBuffer>(this, memory, pageSize,
            maxOrder, pageShifts, chunkSize,
            offsetCacheLine(memory));
}

newUnpooledChunk

Builds an unpooled chunk.

@Override
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
    if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(this,
                allocateDirect(capacity), capacity, 0);
    }
    final ByteBuffer memory = allocateDirect(capacity
            + directMemoryCacheAlignment);
    return new PoolChunk<ByteBuffer>(this, memory, capacity,
            offsetCacheLine(memory));
}

allocateDirect

Allocates the underlying direct ByteBuffer.

private static ByteBuffer allocateDirect(int capacity) {
    return PlatformDependent.useDirectBufferNoCleaner() ?
            PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
}

newByteBuf

ByteBuf is Netty's unified abstraction over memory (heap and off-heap). The PooledByteBuf built here is just a JVM object; its memory field, which points to the actual memory region, is still empty at this point and is only set once the arena's allocate method assigns real memory.

@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
    if (HAS_UNSAFE) {
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else {
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}

memoryCopy

Copies one memory region into another.

@Override
protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) {
    if (length == 0) {
        return;
    }

    if (HAS_UNSAFE) {
        PlatformDependent.copyMemory(
                PlatformDependent.directBufferAddress(src) + srcOffset,
                PlatformDependent.directBufferAddress(dst) + dstOffset, length);
    } else {
        // We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
        src = src.duplicate();
        dst = dst.duplicate();
        src.position(srcOffset).limit(srcOffset + length);
        dst.position(dstOffset);
        dst.put(src);
    }
}

Heap implementation: HeapArena

HeapArena is very simple: since the memory lives on the heap, a plain byte array is enough to store it.

newByteArray

Delegates to PlatformDependent.allocateUninitializedArray: arrays below a threshold (1024 by default) are created with a plain new byte[size], while larger arrays may be allocated uninitialized through Unsafe on JDKs that support it.

private static byte[] newByteArray(int size) {
    return PlatformDependent.allocateUninitializedArray(size);
}

newChunk

@Override
protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    return new PoolChunk<byte[]>(this, newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0);
}

newUnpooledChunk

@Override
protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
    return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0);
}

newByteBuf

@Override
protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
    return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
            : PooledHeapByteBuf.newInstance(maxCapacity);
}

memoryCopy

@Override
protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) {
    if (length == 0) {
        return;
    }

    System.arraycopy(src, srcOffset, dst, dstOffset, length);
}