在解析PoolChunk、PoolSubpage和PoolArena的过程中会发现,最终分配出来的内存都与PooledByteBuf密切相关。

PooledByteBuf有几个实现类:PooledDirectByteBuf、PooledUnsafeDirectByteBuf、PooledHeapByteBuf、PooledUnsafeHeapByteBuf。
这些实现类从名字上就能看出它们的区别:Pooled代表池化的,Direct代表JVM堆外(直接)内存,Heap代表堆内存,Unsafe代表底层操作是通过Unsafe类实现的(地址/指针操作)。
AbstractByteBuf
ByteBuf接口中的方法非常多,这里不逐一解析,重点看AbstractByteBuf抽象类。
AbstractByteBuf类通过读索引和写索引实现了可以同时读写的功能。
1 2 3 4 5 6 7 8 9 10
| // Read and write positions. Per the surrounding text, keeping two separate
// indices is what lets the buffer be read and written at the same time.
int readerIndex;
int writerIndex;
// Marked copies of the two indices (presumably restored by the reset
// methods, which are not part of this excerpt).
private int markedReaderIndex;
private int markedWriterIndex;
// Presumably the upper bound on capacity; how it is assigned is not shown here.
private int maxCapacity;
|
AbstractReferenceCountedByteBuf
AbstractReferenceCountedByteBuf实现引用计数的功能。
PooledByteBuf
成员变量及继承关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| /**
 * Base class of the pooled buffers. {@code T} is the type of the backing
 * memory; the subclasses shown in this article use {@code ByteBuffer} and
 * {@code byte[]}. Abridged excerpt: only the fields are listed.
 */
abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
    // Object-pool handle; recycle() hands this instance back through it.
    private final Recycler.Handle<PooledByteBuf<T>> recyclerHandle;
    // Chunk the memory belongs to; assigned in init0, cleared in deallocate.
    protected PoolChunk<T> chunk;
    // Identifies the allocation; deallocate sets it to -1 after releasing.
    protected long handle;
    // Backing memory, copied from chunk.memory in init0.
    protected T memory;
    // Position and size values supplied by the allocating chunk via init().
    protected int offset;
    protected int length;
    // Passed back to the arena when the memory is freed.
    int maxLength;
    // Thread cache passed to init(); null for unpooled chunks.
    PoolThreadCache cache;
    // NIO buffer passed to init(); handed back to the arena on deallocate.
    ByteBuffer tmpNioBuf;
    // Taken from chunk.arena.parent in init0.
    private ByteBufAllocator allocator;
}
|
构造器
1 2 3 4 5
| /**
 * Stores the recycler handle and forwards {@code maxCapacity} to the
 * superclass constructor.
 *
 * @param recyclerHandle handle of the object pool that is creating this instance
 * @param maxCapacity    value passed on to the superclass
 */
protected PooledByteBuf(Recycler.Handle<? extends PooledByteBuf<T>> recyclerHandle, int maxCapacity) {
    super(maxCapacity);
    // Narrows the wildcard handle type to the field's type; the cast is
    // unchecked because generic type arguments are erased at runtime.
    this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;
}
|
初始化init
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| /**
 * Binds this buffer to memory handed out by a pooled chunk. Pure delegation:
 * every argument is passed unchanged to {@code init0}.
 */
void init(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
    init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
}
/**
 * Variant of {@code init} for an unpooled chunk: no NIO buffer, handle 0,
 * the offset comes from {@code chunk.offset}, {@code length} is used as both
 * length and maxLength, and there is no thread cache.
 */
void initUnpooled(PoolChunk<T> chunk, int length) {
    init0(chunk, null, 0, chunk.offset, length, length, null);
}
/**
 * Assigns all per-allocation state of this buffer. Shared by {@code init}
 * and {@code initUnpooled}.
 */
private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
    // Only checked when assertions are enabled (-ea).
    assert handle >= 0;
    assert chunk != null;

    this.chunk = chunk;
    // The buffer works directly on the chunk's memory.
    memory = chunk.memory;
    tmpNioBuf = nioBuffer;
    allocator = chunk.arena.parent;
    this.cache = cache;
    this.handle = handle;
    this.offset = offset;
    this.length = length;
    this.maxLength = maxLength;
}
|
init方法是ByteBuf复用的核心方法,PoolChunk分配内存后会调用该方法,例如下面两处调用:
// Call site 1: the offset is runOffset(memoryMapIdx) + offset and the
// maxLength argument is runLength(memoryMapIdx).
buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
        reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache());
// Call site 2 (subpage): the offset additionally skips
// (bitmapIdx & 0x3FFFFFFF) elements of subpage.elemSize bytes each, and the
// maxLength argument is subpage.elemSize.
buf.init(this, nioBuffer, handle,
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
        reqCapacity, subpage.elemSize, arena.parent.threadCache());
释放内存deallocate
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| /**
 * Gives the memory back to the arena and then returns this object to the
 * recycler. Does nothing when {@code handle} is negative, which is the state
 * this method itself leaves behind after releasing.
 */
@Override
protected final void deallocate() {
    if (handle >= 0) {
        // Copy to a local first: the field is invalidated before the
        // arena call below, which still needs the original value.
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
        // Cleared only after free(), which receives both as arguments.
        tmpNioBuf = null;
        chunk = null;
        recycle();
    }
}
|
回收到对象池recycle()
将当前对象存入线程ThreadLocal所持有的栈(Stack)中,以便后续复用。
1 2 3
| /** Hands this instance back to the object pool through its recycler handle. */
private void recycle() {
    recyclerHandle.recycle(this);
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| /**
 * Pushes this handle onto its stack so that the object it holds can be
 * handed out again.
 *
 * @param object the object being recycled; must be the very instance held by this handle
 * @throws IllegalArgumentException if {@code object} is not this handle's value
 * @throws IllegalStateException    if {@code lastRecycledId != recycleId} or the stack is {@code null}
 */
@Override
public void recycle(Object object) {
    // Identity comparison: only the exact instance owned by this handle is accepted.
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }

    Stack<?> stack = this.stack;
    // Reported as a double recycle by the exception message.
    if (lastRecycledId != recycleId || stack == null) {
        throw new IllegalStateException("recycled already");
    }
    stack.push(this);
}
|
PooledDirectByteBuf
继承关系及成员变量
1 2 3 4 5 6 7 8 9
| /**
 * Pooled buffer whose backing memory type is {@code ByteBuffer}.
 * Abridged excerpt: only the recycler is listed.
 */
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
    // Object pool for instances of this class.
    private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
        // Creates a fresh instance bound to the given handle, with a
        // maxCapacity of 0 at construction time.
        @Override
        protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
            return new PooledDirectByteBuf(handle, 0);
        }
    };
}
|
构造对象newInstance
newInstance通过对象池Recycler来获取对象,当对象池中没有可用的对象时,会调用newObject方法构建新的对象。
1 2 3 4 5 6 7 8 9
| /**
 * Obtains an instance from {@code RECYCLER} and prepares it with
 * {@code reuse(maxCapacity)} before returning it.
 *
 * @param maxCapacity value passed to {@code reuse}
 * @return the instance obtained from the recycler
 */
static PooledDirectByteBuf newInstance(int maxCapacity) {
    PooledDirectByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}
/**
 * Private constructor; in the code shown here its only caller is the
 * recycler's {@code newObject}. Both arguments go straight to the superclass.
 */
private PooledDirectByteBuf(Recycler.Handle<PooledDirectByteBuf> recyclerHandle, int maxCapacity) {
    super(recyclerHandle, maxCapacity);
}
|
获取数据getBytes
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| /**
 * Transfers {@code length} bytes, starting at {@code index} of this buffer,
 * into {@code dst} starting at {@code dstIndex}. The copy strategy depends on
 * what the destination exposes.
 *
 * @return this buffer
 */
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
    checkDstIndex(index, length, dstIndex, dst.capacity());
    if (dst.hasArray()) {
        // Array-backed destination: use the byte[] overload, adjusting
        // for the destination's array offset.
        getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
    } else if (dst.nioBufferCount() > 0) {
        // Destination exposes NIO buffers: fill each one in turn.
        for (ByteBuffer bb: dst.nioBuffers(dstIndex, length)) {
            // Read remaining() before the transfer and advance by it afterwards.
            int bbLen = bb.remaining();
            getBytes(index, bb);
            index += bbLen;
        }
    } else {
        // Neither applies: let the destination pull from this buffer.
        dst.setBytes(dstIndex, this, index, length);
    }
    return this;
}
|
PooledHeapByteBuf
继承关系以及成员变量
1 2 3 4 5 6 7 8 9
| /**
 * Pooled buffer whose backing memory type is {@code byte[]}.
 * Abridged excerpt: only the recycler is listed.
 */
class PooledHeapByteBuf extends PooledByteBuf<byte[]> {
    // Object pool for instances of this class.
    private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
        // Creates a fresh instance bound to the given handle, with a
        // maxCapacity of 0 at construction time.
        @Override
        protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
            return new PooledHeapByteBuf(handle, 0);
        }
    };
}
|
构造对象newInstance
newInstance通过对象池Recycler来获取对象,当对象池中没有可用的对象时,会调用newObject方法构建新的对象。
1 2 3 4 5 6 7 8 9
| /**
 * Obtains an instance from {@code RECYCLER} and prepares it with
 * {@code reuse(maxCapacity)} before returning it.
 *
 * @param maxCapacity value passed to {@code reuse}
 * @return the instance obtained from the recycler
 */
static PooledHeapByteBuf newInstance(int maxCapacity) {
    PooledHeapByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}
/**
 * Package-private constructor taking a wildcard handle type, presumably so
 * that subclasses can pass their own handle (confirm against the subclasses).
 * Both arguments go straight to the superclass.
 */
PooledHeapByteBuf(Recycler.Handle<? extends PooledHeapByteBuf> recyclerHandle, int maxCapacity) {
    super(recyclerHandle, maxCapacity);
}
|
getBytes
1 2 3 4 5 6
| /**
 * Copies {@code length} bytes from this buffer's backing array into
 * {@code dst} starting at {@code dstIndex}.
 *
 * @return this buffer
 */
@Override
public final ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
    checkDstIndex(index, length, dstIndex, dst.length);
    // idx(index) gives the source position inside memory; presumably it
    // applies this buffer's offset (idx is not part of this excerpt).
    System.arraycopy(memory, idx(index), dst, dstIndex, length);
    return this;
}
|