Netty的内存分配与JEMalloc类似,从大到小可以划分为:Arena, Chunk, Page, SubPage

其中核心的是Chunk,Chunk使用伙伴分配算法分配Chunk中的Page节点,而Page由更小的SubPage组成。

PoolChunk的伙伴分配算法

PoolChunk中有两个用数组实现的二叉树【memoryMap,depthMap】,初始时memoryMap,depthMap中存储的都是当前节点的高度,memoryMap用于存储分配信息,depthMap只是存储高度初始化后就不再改变。

左图表示每个节点的编号,节点从1开始,0被舍弃。这样节点的父子关系更容易计算:子节点加倍,父节点减半,比如512的子节点为1024=512 * 2。右图表示每个节点的高度,从0开始。在代表二叉树的数组中,左图中节点上的数字作为数组索引即id,右图节点上的数字作为值。初始状态时,memoryMap和depthMap相等。

当某个节点被使用时会将memoryMap中的节点高度值改为最大高度加一(maxOrder+1)而depthMap在修改后不变。如下图(节点序号|高度):

  1. 查找到4号节点;
  2. 将4号节点的高度设置成已使用(maxOrder+1);
  3. 依次将父节点的高度设置为其子节点的最小值。

这个改变的高度就是memoryMap数组中存储的值。

PoolChunk解析

继承关系

1
final class PoolChunk<T> implements PoolChunkMetric {}

PoolChunkMetric接口指定了3个指标方法:usage(), chunkSize(), freeBytes();

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Arena对象
final PoolArena<T> arena;
// 实际的内存存储对象,例如ByteBuf
final T memory;
// 是否池化
final boolean unpooled;
// 偏移量
final int offset;
// 二叉树存储,memoryMap分配信息,depthMap各节点初始树高
private final byte[] memoryMap;
private final byte[] depthMap;
// 子页
private final PoolSubpage<T>[] subpages;
// 子页mask,~(pageSize -1)大于pageSize的位为1,pageSize的位为0
// 需要分配容量 & subpageOverflowMask !=0 则容量要大于PageSize
// 判断分配请求为Tiny/Small即分配subpage
private final int subpageOverflowMask;
// 一个Page的大小, 默认8KB=8192
private final int pageSize;
// 从1开始左移到页大小的位置,默认13,1<<13 = 8192
private final int pageShifts;
// 最大树高,默认11
private final int maxOrder;
// chunk块大小,默认16MB
private final int chunkSize;
// log2(16MB) = 24
private final int log2ChunkSize;
// 可分配subpage的最大节点数即11层节点数,默认2048
private final int maxSubpageAllocs;
// 节点易分配(不可用)的标记值,maxOrder + 1
private final byte unusable;
// nioBuffer缓存双向队列
private final Deque<ByteBuffer> cachedNioBuffers;
// 剩余空间
private int freeBytes;
// PoolChunkList 用于Arena管理Chunk
PoolChunkList<T> parent;
PoolChunk<T> prev;
PoolChunk<T> next;

构造器

有两个构造器,一个普通(池化)构造器,一个非池化构造器。在构造器中有一些值的初始化是非常需要关注的。

普通(池化)构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
unpooled = false;
this.arena = arena;
this.memory = memory;
this.pageSize = pageSize;
this.pageShifts = pageShifts;
this.maxOrder = maxOrder;
this.chunkSize = chunkSize;
this.offset = offset;
unusable = (byte) (maxOrder + 1); // 标记不可用的值
log2ChunkSize = log2(chunkSize); // chunk大小的幂
subpageOverflowMask = ~(pageSize - 1);
freeBytes = chunkSize;

assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
maxSubpageAllocs = 1 << maxOrder;

memoryMap = new byte[maxSubpageAllocs << 1];
depthMap = new byte[memoryMap.length];
int memoryMapIndex = 1;
// 初始化两个二叉树,值都是节点高度
for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
int depth = 1 << d;
for (int p = 0; p < depth; ++ p) {
memoryMap[memoryMapIndex] = (byte) d;
depthMap[memoryMapIndex] = (byte) d;
memoryMapIndex ++;
}
}
// new PoolSubpage[size]
subpages = newSubpageArray(maxSubpageAllocs);
cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
}

非池化构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
PoolChunk(PoolArena<T> arena, T memory, int size, int offset) {
unpooled = true;
this.arena = arena;
this.memory = memory;
this.offset = offset;
memoryMap = null;
depthMap = null;
subpages = null;
subpageOverflowMask = 0;
pageSize = 0;
pageShifts = 0;
maxOrder = 0;
unusable = (byte) (maxOrder + 1);
chunkSize = size;
log2ChunkSize = log2(chunkSize);
maxSubpageAllocs = 0;
cachedNioBuffers = null;
}

分配内存

分配内存的入口在allocate方法。

reqCapacity是实际需要的空间,normCapacity是PoolArena#normalizeCapacity方法计算的空间。例如reqCapacity要123b空间,不可能给123b空间,必然是大于这个数。因为chunk的空间大小划分是均分的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
final long handle;
// 如果需要分配的空间大于等于pageSize则执行allocateRun,不然执行allocateSubpage
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
handle = allocateRun(normCapacity);
} else {
handle = allocateSubpage(normCapacity);
}
// handle是分配的id,小于0则分配失败
if (handle < 0) {
return false;
}
// 从缓存中取出buffer
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
// 初始化
initBuf(buf, nioBuffer, handle, reqCapacity);
return true;
}

allocateRun

allocateRun分配的空间大于等于pageSize的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
private long allocateRun(int normCapacity) {
// 最大树高 - (normCapacity的幂 - 从1开始左移到页大小的位置,默认13,1<<13 = 8192)
// d是满足normCapacity容量需要的树高
int d = maxOrder - (log2(normCapacity) - pageShifts);
// 根据树高分配节点
int id = allocateNode(d);
if (id < 0) {
return id;
}
// 计算剩余容量
freeBytes -= runLength(id);
return id;
}

  1. 计算这个空间需要的树高d;
  2. 分配对应的节点;
  3. 计算剩余空间freeBytes;
1
2
3
4
5
private int runLength(int id) {
// 很好理解, 2^24=16M, 而x层每一个节点的大小为2^(24-depth)
// 16M=2^(24-0),那么x层的节点大小为2^(24-depth(id)),比如depth=2,算出来2^22=4M
return 1 << log2ChunkSize - depth(id);
}

allocateNode

allocateNode分配满足树高的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private int allocateNode(int d) {
int id = 1;
// 小于等于1<<d的位为0,高于的位为1
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
// 计算id为1的节点的实际分配情况
byte val = value(id);
if (val > d) { // unusable
return -1;
}
// 找到val == d的节点,且树高最大的节点
// (id & initial) == 0 用于父节点和字节点的树高值相同的情况去获取最底层(初始树高最大)的节点
// 父节点和字节点的树高值相同即有一个子节点已经被分配了
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
// <<1 扩大2倍,即为查找当前节点的左节点
id <<= 1;
val = value(id);
if (val > d) {
// +1 当前节点的兄弟节点
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
// 设置节点为已分配
setValue(id, unusable); // mark as unusable
// 更新父节点的树高信息
updateParentsAlloc(id);
return id;
}

  1. 从id=1开始判断树高是否满足;
  2. 获取到实际树高满足的节点;
  3. 将节点设置成已分配(unusable);
  4. 更新分配节点的父节点的树高;
1
2
3
4
5
6
7
8
9
10
11
private void updateParentsAlloc(int id) {
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
// 父节点的树高取其子节点中的最小值
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
id = parentId;
}
}

循环更新父节点->父节点的父节点->…的树高信息,的树高取其子节点中的最小值。

allocateSubpage

allocateSubpage是分配小于pageSize的空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private long allocateSubpage(int normCapacity) {
// arena根据空间大小判断是tiny还是small,然后从对应的队列中返回PoolSubpage head
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
// 只能从叶子节点上查找
int d = maxOrder;
// 加锁
synchronized (head) {
// 分配节点
int id = allocateNode(d);
if (id < 0) {
return id;
}

final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;

freeBytes -= pageSize;
// 计算在subPage数组上的位置
// 得到叶子节点的偏移索引,从0开始,即2048-0,2049-1,...
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
// 将这个subpage插入到head后面
if (subpage == null) {
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
return subpage.allocate();
}
}
  1. 获取arena中的PoolSubpage链表的头;
  2. 加锁;
  3. 分配page节点(只能从叶子节点中分配);
  4. 计算剩余空间;
  5. 计算在subPage数组上的位置;
  6. 获取chunk的subpages中的SubPage对象;
  7. 如果SubPage对象存在则执行init,不然则new一个对象;
  8. 执行SubPage对象的allocate方法。
1
2
3
4
5
6
7
8
// 得到叶子节点的偏移索引,从0开始,即2048-0,2049-1,...
// maxSubpageAllocs = 1 << maxOrder
// 得到第11层节点的偏移索引,= id - 2048
// id < (1 << (maxOrder+1))
// 所以只会保留低于1 << maxOrder的位
private int subpageIdx(int memoryMapIdx) {
return memoryMapIdx ^ maxSubpageAllocs; // remove highest set bit, to get offset
}

initBuf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
// memoryMapIdx = (int)handle
int memoryMapIdx = memoryMapIdx(handle);
// bitmapIdx就是handle的高32位
int bitmapIdx = bitmapIdx(handle);
if (bitmapIdx == 0) { // 如果高32位0则为page,不然就是分配了subPage
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
// 偏移量:节点的偏移量(id ^ 1 << depth(id))*分配的大小 + offset
buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache());
} else {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity);
}
}

subPage的初始化,两种的区别主要是最终buf初始化的偏移量不一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;

int memoryMapIdx = memoryMapIdx(handle);

PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;

buf.init(
this, nioBuffer, handle,
偏移量:节点的偏移量(id ^ 1 << depth(id))*分配的大小+ subPage的偏移量*大小 + offset
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());
}

内存释放

free

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void free(long handle, ByteBuffer nioBuffer) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);

if (bitmapIdx != 0) { // free a subpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;

// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
synchronized (head) {
// subPage的free方法
if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
return;
}
}
}
freeBytes += runLength(memoryMapIdx);
// 将该节点的树高设置为初始值
setValue(memoryMapIdx, depth(memoryMapIdx));
// 更新父节点
updateParentsFree(memoryMapIdx);
// 将nioBuffer存入队列
if (nioBuffer != null && cachedNioBuffers != null &&
cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
cachedNioBuffers.offer(nioBuffer);
}
}

updateParentsFree

updateParentsFree更新父节点的val,如果两个子节点都被释放了则将父节点更新成初始高度,不然更新为两个子节点中的最小值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void updateParentsFree(int id) {
int logChild = depth(id) + 1;
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up
// 两个子节点都释放了,则设置为初始值
if (val1 == logChild && val2 == logChild) {
setValue(parentId, (byte) (logChild - 1));
} else {
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
}

id = parentId;
}
}

参考

自顶向下深入分析Netty(十)–PoolChunk