在解析PoolChunk、PoolSubpage和PoolArena中会发现最终分配的内存和PooledByteBuf有密切的关系。

PooledByteBuf有几个实现类:PooledDirectByteBuf、PooledUnsafeDirectByteBuf、PooledHeapByteBuf、PooledUnsafeHeapByteBuf。

这些实现类从名字上就能看出它们的区别,Pooled代表池化的,Direct代表jvm外内存,Heap代表堆内存,Unsafe代表底层操作使用的是Unsafe类实现的(地址/指针操作)。

AbstractByteBuf

ByteBuf接口中的方法非常多就不解析了,主要是AbstractByteBuf抽象类。

AbstractByteBuf类通过读索引和写索引实现了可以同时读写的功能。

1
2
3
4
5
6
7
8
9
10
// 读索引
int readerIndex;
// 写索引
int writerIndex;
// 标记读索引,标记某个点,后续可以恢复到这个点
private int markedReaderIndex;
// 标记写索引,标记某个点,后续可以恢复到这个点
private int markedWriterIndex;
// 最大容量
private int maxCapacity;

AbstractReferenceCountedByteBuf

AbstractReferenceCountedByteBuf实现引用计数的功能。

PooledByteBuf

成员变量及集成关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
// 对象池的对象引用,通过Recycler.Handle实现对象池的功能,线程级的缓存
private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;
// 那个chunk分配的
protected PoolChunk<T> chunk;
// chunk分配内存后的handle(位置)
protected long handle;
// 实际内存区域(byte[]或者ByteBuffer)
protected T memory;
// 实际内存区域的开始偏移量
protected int offset;
// 长度
protected int length;
// 最大长度
int maxLength;
// 线程缓存
PoolThreadCache cache;
// 临时的Nio缓冲区
ByteBuffer tmpNioBuf;
// ByteBuf分配器
private ByteBufAllocator allocator;
}

构造器

1
2
3
4
5
protected PooledByteBuf(Recycler.Handle<? extends PooledByteBuf<T>> recyclerHandle, int maxCapacity) {
// 设置AbstractByteBuf的最大容量
super(maxCapacity);
this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;
}

初始化init

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
}

void initUnpooled(PoolChunk<T> chunk, int length) {
init0(chunk, null, 0, chunk.offset, length, length, null);
}

private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;

this.chunk = chunk;
// chunk的实际内存块
memory = chunk.memory;
tmpNioBuf = nioBuffer;
// 分配器
allocator = chunk.arena.parent;
this.cache = cache;
this.handle = handle;
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
}

init方法是ByteBuf复用的核心方法,在Chunk分配内存后会调用该方法

buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache());
buf.init(this, nioBuffer, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());

释放内存deallocate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
// 调用PoolArena的free方法释放内存
chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
tmpNioBuf = null;
chunk = null;
// ByteBuf对象回收,对象池的核心
recycle();
}
}

回收到对象池recycle()

将当前对象存储到线程ThreadLocal的栈中

1
2
3
private void recycle() {
recyclerHandle.recycle(this);
}

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}

Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
// 存入栈
stack.push(this);
}

PooledDirectByteBuf

继承关系及成员变量

1
2
3
4
5
6
7
8
9
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
// Recycler实现了对象池的功能,当对象池中没有满足的对象时会调用newObject方法构建新的对象。
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
@Override
protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
return new PooledDirectByteBuf(handle, 0);
}
};
}

构造对象newInstance

newInstance通过对象池Recycler来获取对象,当对象池中没有满足的对象时会调用newObject方法构建新的对象。

1
2
3
4
5
6
7
8
9
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
// 构造器私有
private PooledDirectByteBuf(Recycler.Handle<PooledDirectByteBuf> recyclerHandle, int maxCapacity) {
super(recyclerHandle, maxCapacity);
}

获取数据getBytes

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
checkDstIndex(index, length, dstIndex, dst.capacity());
// byte[]
if (dst.hasArray()) {
getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
} else if (dst.nioBufferCount() > 0) {
// ByteBuffer获取
for (ByteBuffer bb: dst.nioBuffers(dstIndex, length)) {
int bbLen = bb.remaining();
getBytes(index, bb);
index += bbLen;
}
} else {
dst.setBytes(dstIndex, this, index, length);
}
return this;
}

PooledHeapByteBuf

继承关系以及成员变量

1
2
3
4
5
6
7
8
9
class PooledHeapByteBuf extends PooledByteBuf<byte[]> {
// Recycler实现了对象池的功能,当对象池中没有满足的对象时会调用newObject方法构建新的对象。
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
@Override
protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
return new PooledHeapByteBuf(handle, 0);
}
};
}

构造对象newInstance

newInstance通过对象池Recycler来获取对象,当对象池中没有满足的对象时会调用newObject方法构建新的对象。

1
2
3
4
5
6
7
8
9
static PooledHeapByteBuf newInstance(int maxCapacity) {
PooledHeapByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}

PooledHeapByteBuf(Recycler.Handle<? extends PooledHeapByteBuf> recyclerHandle, int maxCapacity) {
super(recyclerHandle, maxCapacity);
}

getBytes

1
2
3
4
5
6
 @Override
public final ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
checkDstIndex(index, length, dstIndex, dst.length);
System.arraycopy(memory, idx(index), dst, dstIndex, length);
return this;
}