Netty解析二十八:Netty对象池Recycler
Netty对象缓存处处可见,通过继承Recycler类即可实现对象缓存。
Recycler的使用方式
定义一个拥有Recycler.Handle属性的类,并有方法执行对象回收:handle.recycle(this)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
private final Handle<ThreadLocalUnsafeDirectByteBuf> handle;
private ThreadLocalUnsafeDirectByteBuf(Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}
protected void deallocate() {
if (capacity() > THREAD_LOCAL_BUFFER_SIZE) {
super.deallocate();
} else {
clear();
handle.recycle(this);
}
}
}
实现一个Recycler对象,会要求实现newObject方法,这个是缓存池中没有对象时调用该方法创建对象。1
2
3
4
5
6
7private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER =
new Recycler<ThreadLocalUnsafeDirectByteBuf>() {
protected ThreadLocalUnsafeDirectByteBuf newObject(Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
return new ThreadLocalUnsafeDirectByteBuf(handle);
}
};
获取对象:1
ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get();
当对象使用完毕时,手动执行对象释放1
buf.deallocate()
注意deallocate()方法内部必须调用handle.recycle(this)。
Recycler原理解析
继承关系以及成员变量
1 | public abstract class Recycler<T> { |
静态代码块中初始化默认值1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45static {
// In the future, we might have different maxCapacity for different object types.
// e.g. io.netty.recycler.maxCapacity.writeTask
// io.netty.recycler.maxCapacity.outboundBuffer
int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
if (maxCapacityPerThread < 0) {
maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
}
DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;
MAX_SHARED_CAPACITY_FACTOR = max(2,
SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
2));
MAX_DELAYED_QUEUES_PER_THREAD = max(0,
SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
// We use the same value as default EventLoop number
NettyRuntime.availableProcessors() * 2));
LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
// By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
// This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
// bursts.
RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
if (logger.isDebugEnabled()) {
if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
logger.debug("-Dio.netty.recycler.ratio: disabled");
} else {
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
}
}
INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
}
构造器
1 | protected Recycler() { |
获取对象get
1 | public final T get() { |
- 如果最大线程缓存数为0,即不允许缓存了,则执行newObject()构建信息对象;
- 通过threadLocal获取当前线程的栈缓存;
- 从栈中弹出栈顶元素DefaultHandle;
- 如果DefaultHandle为null:
- 通过栈创建新的DefaultHandle;
- 为新的DefaultHandle创建对象newObject();
- 返回handle.value。
返还对象recycle
Handle#recycle(Object)替代了该方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final boolean recycle(T o, Handle<T> handle) {
if (handle == NOOP_HANDLE) {
return false;
}
DefaultHandle<T> h = (DefaultHandle<T>) handle;
if (h.stack.parent != this) {
return false;
}
h.recycle(o);
return true;
}
- 如果handle是NOOP_HANDLE则返回false;
- 如果handler的stack所属的Recycle不是当前对象则返回false;
- 执行handler的recycle方法。
Recycler.DefaultHandle
DefaultHandle是Handle的默认实现。
Handle接口的核心是提供recycle方法的回调,在recycle方法中可以做资源回收。
- 当element出栈pop的时候会将recycleId和lastRecycledId设置为0;
- 当element存入栈的时候会将recycleId和lastRecycledId设置为OWN_THREAD_ID;
- 当element存入延迟队列的时候会将lastRecycledId设置为队列的id;
- 当element从延迟队列迁移到栈中时会将recycleId设置为lastRecycledId;
通过这两个状态可以判断是否被回收。
1 | static final class DefaultHandle<T> implements Handle<T> { |
DefaultHandle的核心就在于它的recycle方法,在recycle方法中会将this进行回收,将this入栈。
recycle方法:
- 校验回收的对象是否一致;
- 校验循环id;
- 将当前对象push进栈;
Recycler.Stack

这个栈有个特殊的结构:
- 如果对象获取的线程和回收的线程一致,则会执行push()->pushNow()将对象存入栈;
- 如果对象获取的线程和回收的线程不一致,则会在当前线程创建一个延迟队列,执行push()->pushLater()将对象存入延迟队列;且栈会持有该队列(链表存队列);
- 当获取对象时,先从栈中取对象;如果栈长为0,则会执行scavenge()->WeakOrderQueue.transfer()将持有的队列中的对象迁移到栈中,然后返回对象。
WeakOrderQueue实现了延迟队列,使得对象可以跨线程共享。
继承关系以及成员变量
1 | static final class Stack<T> { |
构造器
1 | Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor, |
newHandle
1 | DefaultHandle<T> newHandle() { |
入栈push
1 | void push(DefaultHandle<?> item) { |
- 如果当前线程与Stack中弱引用存储的线程一样,则立即入栈执行pushNow;
- 不然延迟入栈,执行pushLater;
Stack入栈实现了同步回收(同一个线程获取对象并回收),和异步回收(获取对象和回收对象的线程不一致)。
- 同步回收:直接将对象存入栈中;
- 异步回收:将对象存储延迟队列中,只有在Stack中没有对象时才会将Stack存储的延迟队列链表中的对象存入栈中。
立即入栈pushNow
1 | private void pushNow(DefaultHandle<?> item) { |
- 校验handle的循环id;
- 如果当前容量已经到上限,则释放并返回;
- 如果当前长度size和数组长度一样,则对数组进行两倍扩容;
- 将handle存入数组的末尾,并将size加一;
延迟入栈pushLater
当Stock中线程的弱引用与持有Stock的线程不一致时,就会执行pushLater将handle存储延迟队列中。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29private void pushLater(DefaultHandle<?> item, Thread thread) {
// Map是WeakHashMap,避免强引用
// 获取延迟队列
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
// 一个线程最大的延迟队列数
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
// 舍弃这个handle
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
// 分配一个队列
// 并且会将这个队列插入Stack的队列链表的head
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
// drop object
return;
}
// 延迟队列加入map
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}
// 将handle加入延迟队列
queue.add(item);
}
- 获取延迟队列;
- 如果队列没有:
- 判断是否达到单个线程最大延迟队列数,如果是则舍弃这个handle,将单前栈的延迟队列设置为WeakOrderQueue.DUMMY;
- WeakOrderQueue.allocate分配一个队列,如果没有分配成功则return;
- 将新分配的延迟队列加入到map中;
- 如果队列是WeakOrderQueue.DUMMY,则return;
- 将handle加入到队列中。
栈顶出栈pop
1 | DefaultHandle<T> pop() { |
- 如果当前栈大小为0,则调用scavenge()到延迟队列中采集;
- 如果还是小于0则返回null;
- 栈大小减一;
- 将栈尾的数据去除;
- 校验循环id;
- 将循环id都设置为0,并返回。
收集scavenge
1 | boolean scavenge() { |
- 如果cursor没有,则取head;
- 如果head也没有,则返回false;
- 循环cursor不为null且!success:
- cursor.transfer将队列中的Handle存到Stack上,设置success为true;
- 如果cursor.owner.get() == null,线程被回收了:
- 如果cursor中还有数据,循环:
- cursor.transfer将队列中的Handle存到Stack上;
- 清空成功则设置success为true;
- 不然则break;
- cursor.transfer将队列中的Handle存到Stack上;
- 将cursor从链表中去除,前节点连接到后续节点;
- 如果cursor中还有数据,循环:
- 前置节点设为cursor;
- cursor设置为cursor的next节点;
- 更新this.prev = prev和this.cursor = cursor;
- 返回success。
Recycler.WeakOrderQueue
继承关系及成员变量
1 | private static final class WeakOrderQueue { |
WeakOrderQueue.Link
Link是队列中真正存储数据的地方,每个Link可以存储LINK_CAPACITY个对象,一个队列中的Link数为Head.availableSharedCapacity/LINK_CAPACITY
1 | static final class Link extends AtomicInteger { |
WeakOrderQueue.Head
Head维护availableSharedCapacity,来控制Link的数量。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37static final class Head {
private final AtomicInteger availableSharedCapacity;
Link link;
Head(AtomicInteger availableSharedCapacity) {
this.availableSharedCapacity = availableSharedCapacity;
}
/// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
protected void finalize() throws Throwable {
// ...代码省略了
}
// 将Link释放了,空间加回来
void reclaimSpace(int space) {
assert space >= 0;
availableSharedCapacity.addAndGet(space);
}
// 申请空间,用于创建Link,因为一个Link要占用LINK_CAPACITY大小
boolean reserveSpace(int space) {
return reserveSpace(availableSharedCapacity, space);
}
static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
assert space >= 0;
for (;;) {
int available = availableSharedCapacity.get();
if (available < space) {
return false;
}
if (availableSharedCapacity.compareAndSet(available, available - space)) {
return true;
}
}
}
}
分配队列allocate
1 | static WeakOrderQueue allocate(Stack<?> stack, Thread thread) { |
创建新队列newQueue
1 | static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) { |
添加add
1 | void add(DefaultHandle<?> handle) { |
- 将handle的lastRecycledId设置为队列的Id
- 如果writeIndex到了数组尾:
- 如果不能从availableSharedCapacity中减到LINK_CAPACITY,则退出;
- 不然,new Link(),writeIndex指向新Link的起始位置;
- 将handle存入tail.elements[writeIndex]
- 清空handle所属的Stack
- writeIndex加一
将队列数据存到栈中transfer
从Stock中获取对象时如果没有对象,则调用Stock.scavenge()将延迟队列中的对象迁到栈中。
transfer一次只会将一个Link中的对象迁移到栈中,所以Stock.scavenge()->scavengeSome()中会循环调用transfer方法。
1 | boolean transfer(Stack<?> dst) { |
- 如果全部读完了,则返回false;
- 不然就读下一个Link;
- 计算这个Link要读多少个元素;
- 栈扩容;
- 循环拷贝Link数组中的元素到栈中,并清空Link数组;
- 拷贝时校验recycleId和lastRecycledId;
- 并将element.recycleId = element.lastRecycledId,此时lastRecycledId为队列id;
- 释放link空间;
- 如果没有从队列中迁移对象到栈中,则返回false;
- 不然,更新栈长度,且放回true;