Netty Internals 25: Memory Allocation with PoolArena
The previous posts covered PoolChunk, PoolSubpage and PoolChunkList; this one looks at PoolArena. As those analyses showed, PoolChunk, PoolSubpage and PoolChunkList are all managed by PoolArena.
PoolArena
Overview
PoolArena is a core entry point of Netty's memory management: it orchestrates all allocation. No matter how much memory is requested, it is allocated through PoolArena#allocate(io.netty.buffer.PoolThreadCache, int, int), which returns a PooledByteBuf, a wrapper around the memory region that manages heap memory and off-heap memory uniformly.
PoolArena divides allocation requests into four size classes: Tiny, Small, Normal and Huge.
- Tiny: less than 512 bytes
- Small: at least 512 bytes but less than pageSize
- Normal: at least pageSize and at most chunkSize
- Huge: greater than chunkSize
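Assuming Netty's default pageSize of 8 KiB and chunkSize of 16 MiB (both are configurable, so these concrete numbers are assumptions), the classification can be sketched as follows. This is an illustration, not Netty's actual code:

```java
// Sketch of the four size classes. The 8 KiB pageSize and 16 MiB
// chunkSize are Netty's defaults, assumed here for illustration only.
public class SizeClassDemo {
    static final int PAGE_SIZE = 8192;
    static final int CHUNK_SIZE = 16 * 1024 * 1024;

    static String classify(int size) {
        if (size < 512) return "Tiny";           // subpage-backed
        if (size < PAGE_SIZE) return "Small";    // subpage-backed
        if (size <= CHUNK_SIZE) return "Normal"; // served by a chunk
        return "Huge";                           // unpooled
    }
}
```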
PoolArena can be thought of as a manager/scheduler. It does not manage memory regions directly; that is delegated to PoolChunk. PoolArena organizes the chunks in use into PoolChunkList linked lists and moves them between lists according to their usage.
A PoolChunk is one large block of memory of total size chunkSize, composed of smaller Pages arranged as a binary tree; by marking tree nodes it can serve allocations of different sizes.
A Page (of size pageSize) is still fairly large, so serving Tiny/Small requests straight from a Page would be wasteful. Below the Page level there are Subpages: a Subpage is a Page sliced into fixed-size elements, and different Subpages may use different element sizes. After a chunk carves out a Subpage for the first time, PoolArena adds it to the slot for that element size in the tinySubpagePools/smallSubpagePools arrays, so the next allocation of the same size can be located directly through the array instead of going through the chunk again.
PooledByteBuf: memory comes either as heap memory or as memory outside the JVM heap; to operate on both uniformly Netty wraps them in ByteBuf. A PooledByteBuf has a member variable memory that represents the actual storage, e.g. a byte[] or a ByteBuffer.
Class hierarchy and fields
PoolArena is an abstract class, while PoolChunk, PoolSubpage and PoolChunkList are all concrete. As the overall entry point for memory management, PoolArena abstracts over the memory type and has two implementations: DirectArena and HeapArena.
PoolArena also defines the size classes Tiny, Small and Normal.
PlatformDependent is Netty's utility class for dealing with differences between JDK versions.
```java
abstract class PoolArena<T> implements PoolArenaMetric {
```
Constructor
The constructor mainly initializes the fields.

```java
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
        int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
    this.parent = parent;
    this.pageSize = pageSize;
    this.maxOrder = maxOrder;
    this.pageShifts = pageShifts;
    this.chunkSize = chunkSize;
    directMemoryCacheAlignment = cacheAlignment;
    directMemoryCacheAlignmentMask = cacheAlignment - 1;
    subpageOverflowMask = ~(pageSize - 1);
    // create the tiny subpage pool array
    tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
    // build the head node of each list
    for (int i = 0; i < tinySubpagePools.length; i ++) {
        tinySubpagePools[i] = newSubpagePoolHead(pageSize);
    }
    numSmallSubpagePools = pageShifts - 9;
    smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
    for (int i = 0; i < smallSubpagePools.length; i ++) {
        smallSubpagePools[i] = newSubpagePoolHead(pageSize);
    }
    // chunk lists for the different usage ranges
    q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
    q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
    q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
    q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
    q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
    qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
    q100.prevList(q075);
    q075.prevList(q050);
    q050.prevList(q025);
    q025.prevList(q000);
    q000.prevList(null);
    qInit.prevList(qInit);
    // metrics
    List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
    metrics.add(qInit);
    metrics.add(q000);
    metrics.add(q025);
    metrics.add(q050);
    metrics.add(q075);
    metrics.add(q100);
    chunkListMetrics = Collections.unmodifiableList(metrics);
}

private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
    PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
    head.prev = head;
    head.next = head;
    return head;
}

@SuppressWarnings("unchecked")
private PoolSubpage<T>[] newSubpagePoolArray(int size) {
    return new PoolSubpage[size];
}
```
The chunk lists form a doubly linked chain: as a chunk's usage rises it migrates toward the tail of the chain, and as usage falls it migrates back toward the head.
null <- q000 <- q025 <- q050 <- q075 <- q100
qInit -> q000 -> q025 -> q050 -> q075 -> q100
qInit holds chunks that have just been created.
Memory allocation: allocate

```java
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    allocate(cache, buf, reqCapacity);
    return buf;
}
```
The real work happens in allocate(PoolThreadCache cache, PooledByteBuf<T> buf, int reqCapacity):

```java
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    // normalize the requested capacity
    final int normCapacity = normalizeCapacity(reqCapacity);
    // capacity smaller than pageSize
    if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
        int tableIdx;
        PoolSubpage<T>[] table;
        // tiny
        boolean tiny = isTiny(normCapacity);
        if (tiny) { // < 512
            // try the thread-local cache first
            if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // locate the slot in tinySubpagePools
            tableIdx = tinyIdx(normCapacity);
            table = tinySubpagePools;
        } else {
            // try the thread-local cache first
            if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            // locate the slot in smallSubpagePools
            tableIdx = smallIdx(normCapacity);
            table = smallSubpagePools;
        }
        final PoolSubpage<T> head = table[tableIdx];
        // lock on the head node
        synchronized (head) {
            final PoolSubpage<T> s = head.next;
            if (s != head) {
                assert s.doNotDestroy && s.elemSize == normCapacity;
                // delegate to the subpage's allocate method
                long handle = s.allocate();
                assert handle >= 0;
                // initialize the buffer
                s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                // bump the counter
                incTinySmallAllocation(tiny);
                return;
            }
        }
        // neither tiny nor small succeeded: fall back to allocateNormal
        synchronized (this) {
            allocateNormal(buf, reqCapacity, normCapacity);
        }
        // bump the counter
        incTinySmallAllocation(tiny);
        return;
    }
    if (normCapacity <= chunkSize) {
        // try the thread-local cache first
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        synchronized (this) {
            // allocate
            allocateNormal(buf, reqCapacity, normCapacity);
            ++allocationsNormal;
        }
    } else {
        // Huge allocations are never served via the cache so just call allocateHuge
        allocateHuge(buf, reqCapacity);
    }
}
```
- Normalize the requested capacity reqCapacity into normCapacity.
- If it is smaller than pageSize (i.e. tiny/small):
    - Tiny: try the PoolThreadCache first; if that fails, compute the slot tableIdx in the tiny array and set table to tinySubpagePools.
    - Small: try the PoolThreadCache first; if that fails, compute the slot tableIdx in the small array and set table to smallSubpagePools.
    - Take the head element at table[tableIdx].
    - Under the lock on head:
        - Let s = head.next.
        - If s != head:
            - Call allocate() on the subpage s;
            - Initialize the buffer via s.chunk.initBufWithSubpage(...);
            - Increment the tiny/small counter;
            - return.
    - Otherwise run the normal allocation allocateNormal() under the arena lock.
- Else if normCapacity is at most chunkSize:
    - Try the PoolThreadCache first; on success, return.
    - Under the arena lock:
        - Allocate with allocateNormal();
        - Increment the allocationsNormal counter.
- Else allocate with allocateHuge().
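The normalizeCapacity step above rounds the request up to a pool-friendly size: tiny requests go up to the next multiple of 16, larger ones to the next power of two. A minimal sketch of that rounding, ignoring the huge path and direct-memory alignment that the real method also handles:

```java
// Minimal sketch of normalizeCapacity's rounding rules: tiny requests
// (< 512 bytes) round up to a multiple of 16; larger ones round up to
// the next power of two. Huge sizes and cache alignment are omitted.
public class NormalizeSketch {
    static int normalize(int reqCapacity) {
        if (reqCapacity >= 512) {
            // classic next-power-of-two bit trick
            int n = reqCapacity - 1;
            n |= n >>> 1;
            n |= n >>> 2;
            n |= n >>> 4;
            n |= n >>> 8;
            n |= n >>> 16;
            return n + 1;
        }
        // tiny: round up to a multiple of 16
        return (reqCapacity & 15) == 0 ? reqCapacity : (reqCapacity & ~15) + 16;
    }
}
```

For example, a 100-byte request occupies a 112-byte tiny slot, while a 1000-byte request occupies a 1024-byte small slot.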
Tiny-level allocation

```java
static int tinyIdx(int normCapacity) {
    // normCapacity / 16: tiny sizes are multiples of 16 bytes
    return normCapacity >>> 4;
}
```
Small-level allocation

```java
static int smallIdx(int normCapacity) {
    int tableIdx = 0;
    int i = normCapacity >>> 10;
    while (i != 0) {
        i >>>= 1;
        tableIdx ++;
    }
    return tableIdx;
}
```
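The tiny/small table-index math can be checked with a few concrete sizes. This is a standalone reimplementation of the same shifts that appear in findSubpagePoolHead below, for illustration only:

```java
// Standalone copy of the tiny/small index math, so the mapping from a
// normalized size to its table slot can be verified with real numbers.
public class IdxDemo {
    static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4;       // divide by 16
    }

    static int smallIdx(int normCapacity) {
        int tableIdx = 0;
        int i = normCapacity >>> 10;     // divide by 1024
        while (i != 0) {
            i >>>= 1;
            tableIdx++;
        }
        return tableIdx;
    }
}
```

So a 32-byte element lands in tiny slot 2, while 512, 1024 and 4096 bytes land in small slots 0, 1 and 3.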
Normal-level allocation

```java
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    // try the chunk lists in order of usage: q050, q025, q000, qInit, q075
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
        return;
    }
    // no existing chunk could serve the request: create a new chunk
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    boolean success = c.allocate(buf, reqCapacity, normCapacity);
    assert success;
    // newly created chunks start out in qInit
    qInit.add(c);
}
```
- Try to allocate from the PoolChunkLists at their various usage levels; return on success.
- Otherwise create a new chunk and allocate from it.
- Add the newly created chunk to qInit.
Huge-level allocation
A huge allocation builds an unpooled PoolChunk and initializes the PooledByteBuf via initUnpooled.

```java
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
    PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
    activeBytesHuge.add(chunk.chunkSize());
    buf.initUnpooled(chunk, reqCapacity);
    allocationsHuge.increment();
}
```
Memory release: free

```java
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
    if (chunk.unpooled) {
        // huge allocations bypass the pools: destroy the chunk directly
        int size = chunk.chunkSize();
        destroyChunk(chunk);
        activeBytesHuge.add(-size);
        deallocationsHuge.increment();
    } else {
        SizeClass sizeClass = sizeClass(normCapacity);
        // try to hand the memory back to the thread-local cache
        if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
            // cached so not free it.
            return;
        }
        freeChunk(chunk, handle, sizeClass, nioBuffer, false);
    }
}
```
destroyChunk
destroyChunk is implemented by the subclasses.

```java
protected abstract void destroyChunk(PoolChunk<T> chunk);
```
HeapArena
On-heap memory is reclaimed by the GC.

```java
@Override
protected void destroyChunk(PoolChunk<byte[]> chunk) {
    // Rely on GC.
}
```
DirectArena
Off-heap memory is freed by dedicated cleanup code.

```java
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {
    if (PlatformDependent.useDirectBufferNoCleaner()) {
        PlatformDependent.freeDirectNoCleaner(chunk.memory);
    } else {
        PlatformDependent.freeDirectBuffer(chunk.memory);
    }
}
```
freeChunk
Releases a region inside a chunk.

```java
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer, boolean finalizer) {
    final boolean destroyChunk;
    synchronized (this) {
        // We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise this
        // may fail due lazy class-loading in for example tomcat.
        if (!finalizer) {
            switch (sizeClass) {
            case Normal:
                ++deallocationsNormal;
                break;
            case Small:
                ++deallocationsSmall;
                break;
            case Tiny:
                ++deallocationsTiny;
                break;
            default:
                throw new Error();
            }
        }
        // PoolChunkList.free may migrate the chunk to another list based on its usage
        destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
    }
    // if the chunk dropped out of every list, destroy it outright
    if (destroyChunk) {
        // destroyChunk not need to be called while holding the synchronized lock.
        destroyChunk(chunk);
    }
}
```
Resizing: reallocate
Allocates a new memory region and copies the old data over.

```java
void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
    assert newCapacity >= 0 && newCapacity <= buf.maxCapacity();
    int oldCapacity = buf.length;
    if (oldCapacity == newCapacity) {
        return;
    }
    PoolChunk<T> oldChunk = buf.chunk;
    ByteBuffer oldNioBuffer = buf.tmpNioBuf;
    long oldHandle = buf.handle;
    T oldMemory = buf.memory;
    int oldOffset = buf.offset;
    int oldMaxLength = buf.maxLength;
    // This does not touch buf's reader/writer indices
    // allocate a new region
    allocate(parent.threadCache(), buf, newCapacity);
    int bytesToCopy;
    if (newCapacity > oldCapacity) {
        bytesToCopy = oldCapacity;
    } else {
        buf.trimIndicesToCapacity(newCapacity);
        bytesToCopy = newCapacity;
    }
    // copy the contents
    memoryCopy(oldMemory, oldOffset, buf.memory, buf.offset, bytesToCopy);
    if (freeOldMemory) {
        free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
    }
}
```
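The data movement above boils down to copying min(oldCapacity, newCapacity) bytes into the fresh region: growing keeps all old bytes, shrinking keeps only what still fits. A toy version with plain arrays (illustration only, not Netty code):

```java
// Toy model of reallocate's copy rule: allocate a new backing array,
// copy min(old, new) bytes across, and let the old array go.
public class ReallocSketch {
    static byte[] reallocate(byte[] old, int newCapacity) {
        byte[] fresh = new byte[newCapacity];
        System.arraycopy(old, 0, fresh, 0, Math.min(old.length, newCapacity));
        return fresh;
    }
}
```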
Finding a Subpage head node: findSubpagePoolHead
findSubpagePoolHead is used by the chunk. The first time a Subpage-sized (tiny/small) allocation has to go through the chunk, the chunk builds a Subpage and uses this method to find the list head, so that Subpages with the same element size are chained into one list.

```java
PoolSubpage<T> findSubpagePoolHead(int elemSize) {
    int tableIdx;
    PoolSubpage<T>[] table;
    if (isTiny(elemSize)) { // < 512
        // divide by 16
        tableIdx = elemSize >>> 4;
        table = tinySubpagePools;
    } else {
        tableIdx = 0;
        // divide by 1024
        elemSize >>>= 10;
        while (elemSize != 0) {
            elemSize >>>= 1;
            tableIdx ++;
        }
        table = smallSubpagePools;
    }
    return table[tableIdx];
}
```
Off-heap implementation: DirectArena
DirectArena is the implementation that allocates direct (off-heap) memory.
newChunk
Builds a new chunk.

```java
@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder,
        int pageShifts, int chunkSize) {
    if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(this,
                allocateDirect(chunkSize), pageSize, maxOrder,
                pageShifts, chunkSize, 0);
    }
    final ByteBuffer memory = allocateDirect(chunkSize
            + directMemoryCacheAlignment);
    return new PoolChunk<ByteBuffer>(this, memory, pageSize,
            maxOrder, pageShifts, chunkSize,
            offsetCacheLine(memory));
}
```
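The offsetCacheLine call exists because the buffer is over-allocated by directMemoryCacheAlignment bytes and the chunk then starts at the first aligned address inside it. A sketch of that alignment arithmetic follows; Netty's actual offsetCacheLine may differ in detail, so treat this as an illustration of the idea only:

```java
// Alignment arithmetic behind cache-line alignment: over-allocate by
// `alignment` bytes, then skip ahead to the first aligned address.
// Sketch only; not Netty's actual offsetCacheLine implementation.
public class AlignSketch {
    static int alignmentOffset(long address, int alignment) {
        // alignment must be a power of two for this mask trick
        int remainder = (int) (address & (alignment - 1));
        return remainder == 0 ? 0 : alignment - remainder;
    }
}
```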
newUnpooledChunk
Builds an unpooled chunk.

```java
@Override
protected PoolChunk<ByteBuffer> newUnpooledChunk(int capacity) {
    if (directMemoryCacheAlignment == 0) {
        return new PoolChunk<ByteBuffer>(this,
                allocateDirect(capacity), capacity, 0);
    }
    final ByteBuffer memory = allocateDirect(capacity
            + directMemoryCacheAlignment);
    return new PoolChunk<ByteBuffer>(this, memory, capacity,
            offsetCacheLine(memory));
}
```
allocateDirect
Creates the off-heap ByteBuffer.

```java
private static ByteBuffer allocateDirect(int capacity) {
    return PlatformDependent.useDirectBufferNoCleaner() ?
            PlatformDependent.allocateDirectNoCleaner(capacity) : ByteBuffer.allocateDirect(capacity);
}
```
newByteBuf
ByteBuf is Netty's unified abstraction over memory (heap and off-heap). Here the PooledByteBuf is constructed.

```java
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
    if (HAS_UNSAFE) {
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else {
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}
```
memoryCopy
Copies one memory region to another.

```java
@Override
protected void memoryCopy(ByteBuffer src, int srcOffset, ByteBuffer dst, int dstOffset, int length) {
    if (length == 0) {
        return;
    }
    if (HAS_UNSAFE) {
        PlatformDependent.copyMemory(
                PlatformDependent.directBufferAddress(src) + srcOffset,
                PlatformDependent.directBufferAddress(dst) + dstOffset, length);
    } else {
        // We must duplicate the NIO buffers because they may be accessed by other Netty buffers.
        src = src.duplicate();
        dst = dst.duplicate();
        src.position(srcOffset).limit(srcOffset + length);
        dst.position(dstOffset);
        dst.put(src);
    }
}
```
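The duplicate() calls in the fallback branch matter because position and limit are per-buffer state: duplicating gives independent cursors over the same content, so other ByteBufs sharing the underlying buffers are not disturbed. A standalone demonstration of that copy pattern:

```java
// Demonstrates the non-Unsafe copy path: duplicate() shares content
// but gives independent position/limit, so the original buffers'
// cursors are left untouched by the copy.
public class DupCopyDemo {
    static void copy(java.nio.ByteBuffer src, int srcOffset,
                     java.nio.ByteBuffer dst, int dstOffset, int length) {
        java.nio.ByteBuffer s = src.duplicate();
        java.nio.ByteBuffer d = dst.duplicate();
        s.position(srcOffset).limit(srcOffset + length);
        d.position(dstOffset);
        d.put(s);
    }
}
```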
Heap implementation: HeapArena
HeapArena is very simple: since the memory lives on the heap, a plain byte array is enough as storage.
newByteArray
Arrays up to 1024 bytes are created with a plain new byte[size]; for larger arrays, Unsafe (which has array-manipulation methods) can be used instead.

```java
private static byte[] newByteArray(int size) {
    return PlatformDependent.allocateUninitializedArray(size);
}
```
The remaining overrides mirror their DirectArena counterparts, wrapping byte arrays instead of ByteBuffers (the bodies below are reconstructed from those counterparts, as the original listings were lost).
newChunk

```java
@Override
protected PoolChunk<byte[]> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    return new PoolChunk<byte[]>(this, newByteArray(chunkSize), pageSize, maxOrder, pageShifts, chunkSize, 0);
}
```

newUnpooledChunk

```java
@Override
protected PoolChunk<byte[]> newUnpooledChunk(int capacity) {
    return new PoolChunk<byte[]>(this, newByteArray(capacity), capacity, 0);
}
```

newByteBuf

```java
@Override
protected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {
    return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity)
            : PooledHeapByteBuf.newInstance(maxCapacity);
}
```

memoryCopy

```java
@Override
protected void memoryCopy(byte[] src, int srcOffset, byte[] dst, int dstOffset, int length) {
    if (length == 0) {
        return;
    }
    System.arraycopy(src, srcOffset, dst, dstOffset, length);
}
```