Netty解析二十二:Netty内存分配PoolChunk
Netty的内存分配与JEMalloc类似,从大到小可以划分为:Arena, Chunk, Page, SubPage
其中核心的是Chunk,Chunk使用伙伴分配算法分配Chunk中的Page节点,而Page由更小的SubPage组成。
PoolChunk的伙伴分配算法
PoolChunk中有两个用数组实现的二叉树【memoryMap,depthMap】,初始时memoryMap,depthMap中存储的都是当前节点的高度,memoryMap用于存储分配信息,depthMap只是存储高度初始化后就不再改变。

左图表示每个节点的编号,节点从1开始,0被舍弃。这样节点的父子关系更容易计算:子节点加倍,父节点减半,比如512的子节点为1024=512 * 2。右图表示每个节点的高度,从0开始。在代表二叉树的数组中,左图中节点上的数字作为数组索引即id,右图节点上的数字作为值。初始状态时,memoryMap和depthMap相等。
当某个节点被使用时会将memoryMap中的节点高度值改为最大高度加一(maxOrder+1)而depthMap在修改后不变。如下图(节点序号|高度):

- 查找到4号节点;
- 将4号节点的高度设置成已使用(maxOrder+1);
- 依次将父节点的高度设置为其子节点的最小值。
这个改变的高度就是memoryMap数组中存储的值。
PoolChunk解析
继承关系
1 | final class PoolChunk<T> implements PoolChunkMetric {} |
PoolChunkMetric接口指定了3个指标方法:usage(), chunkSize(), freeBytes();
成员变量
1 | // Arena对象 |
构造器
有两个构造器,一个普通(池化)构造器,一个非池化构造器。在构造器中有一些值的初始化是非常需要关注的。
普通(池化)构造器1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
unpooled = false;
this.arena = arena;
this.memory = memory;
this.pageSize = pageSize;
this.pageShifts = pageShifts;
this.maxOrder = maxOrder;
this.chunkSize = chunkSize;
this.offset = offset;
unusable = (byte) (maxOrder + 1); // 标记不可用的值
log2ChunkSize = log2(chunkSize); // chunk大小的幂
subpageOverflowMask = ~(pageSize - 1);
freeBytes = chunkSize;
assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
maxSubpageAllocs = 1 << maxOrder;
memoryMap = new byte[maxSubpageAllocs << 1];
depthMap = new byte[memoryMap.length];
int memoryMapIndex = 1;
// 初始化两个二叉树,值都是节点高度
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d;
for (int p = 0; p < depth; ++ p) {
memoryMap[memoryMapIndex] = (byte) d;
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}
// new PoolSubpage[size]
subpages = newSubpageArray(maxSubpageAllocs);
cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
}
非池化构造器1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18PoolChunk(PoolArena<T> arena, T memory, int size, int offset) {
unpooled = true;
this.arena = arena;
this.memory = memory;
this.offset = offset;
memoryMap = null;
depthMap = null;
subpages = null;
subpageOverflowMask = 0;
pageSize = 0;
pageShifts = 0;
maxOrder = 0;
unusable = (byte) (maxOrder + 1);
chunkSize = size;
log2ChunkSize = log2(chunkSize);
maxSubpageAllocs = 0;
cachedNioBuffers = null;
}
分配内存
分配内存的入口在allocate方法。
reqCapacity是实际需要的空间,normCapacity是PoolArena#normalizeCapacity方法计算的空间。例如reqCapacity要123b空间,不可能给123b空间,必然是大于这个数。因为chunk的空间大小划分是均分的。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
final long handle;
// 如果需要分配的空间大于等于pageSize则执行allocateRun,不然执行allocateSubpage
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
handle = allocateRun(normCapacity);
} else {
handle = allocateSubpage(normCapacity);
}
// handle是分配的id,小于0则分配失败
if (handle < 0) {
return false;
}
// 从缓存中取出buffer
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
// 初始化
initBuf(buf, nioBuffer, handle, reqCapacity);
return true;
}
allocateRun
allocateRun分配的空间大于等于pageSize的方法。1
2
3
4
5
6
7
8
9
10
11
12
13private long allocateRun(int normCapacity) {
// 最大树高 - (normCapacity的幂 - 从1开始左移到页大小的位置,默认13,1<<13 = 8192)
// d是满足normCapacity容量需要的树高
int d = maxOrder - (log2(normCapacity) - pageShifts);
// 根据树高分配节点
int id = allocateNode(d);
if (id < 0) {
return id;
}
// 计算剩余容量
freeBytes -= runLength(id);
return id;
}
- 计算这个空间需要的树高d;
- 分配对应的节点;
- 计算剩余空间freeBytes;
1 | private int runLength(int id) { |
allocateNode
allocateNode分配满足树高的节点。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31private int allocateNode(int d) {
int id = 1;
// 小于等于1<<d的位为0,高于的位为1
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
// 计算id为1的节点的实际分配情况
byte val = value(id);
if (val > d) { // unusable
return -1;
}
// 找到val == d的节点,且树高最大的节点
// (id & initial) == 0 用于父节点和字节点的树高值相同的情况去获取最底层(初始树高最大)的节点
// 父节点和字节点的树高值相同即有一个子节点已经被分配了
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
// <<1 扩大2倍,即为查找当前节点的左节点
id <<= 1;
val = value(id);
if (val > d) {
// +1 当前节点的兄弟节点
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
// 设置节点为已分配
setValue(id, unusable); // mark as unusable
// 更新父节点的树高信息
updateParentsAlloc(id);
return id;
}
- 从id=1开始判断树高是否满足;
- 获取到实际树高满足的节点;
- 将节点设置成已分配(unusable);
- 更新分配节点的父节点的树高;
1 | private void updateParentsAlloc(int id) { |
循环更新父节点->父节点的父节点->…的树高信息,的树高取其子节点中的最小值。
allocateSubpage
allocateSubpage是分配小于pageSize的空间。
1 | private long allocateSubpage(int normCapacity) { |
- 获取arena中的PoolSubpage链表的头;
- 加锁;
- 分配page节点(只能从叶子节点中分配);
- 计算剩余空间;
- 计算在subPage数组上的位置;
- 获取chunk的subpages中的SubPage对象;
- 如果SubPage对象存在则执行init,不然则new一个对象;
- 执行SubPage对象的allocate方法。
1 | // 得到叶子节点的偏移索引,从0开始,即2048-0,2049-1,... |
initBuf
1 | void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) { |
subPage的初始化,两种的区别主要是最终buf初始化的偏移量不一样。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = memoryMapIdx(handle);
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
buf.init(
this, nioBuffer, handle,
偏移量:节点的偏移量(id ^ 1 << depth(id))*分配的大小+ subPage的偏移量*大小 + offset
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());
}
内存释放
free
1 | void free(long handle, ByteBuffer nioBuffer) { |
updateParentsFree
updateParentsFree更新父节点的val,如果两个子节点都被释放了则将父节点更新成初始高度,不然更新为两个子节点中的最小值。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18private void updateParentsFree(int id) {
int logChild = depth(id) + 1;
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up
// 两个子节点都释放了,则设置为初始值
if (val1 == logChild && val2 == logChild) {
setValue(parentId, (byte) (logChild - 1));
} else {
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
}
id = parentId;
}
}