Netty对象缓存处处可见,通过继承Recycler类即可实现对象缓存。

Recycler的使用方式

定义一个拥有Recycler.Handle属性的类,并有方法执行对象回收:handle.recycle(this)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static final class ThreadLocalUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
private final Handle<ThreadLocalUnsafeDirectByteBuf> handle;

private ThreadLocalUnsafeDirectByteBuf(Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
super(UnpooledByteBufAllocator.DEFAULT, 256, Integer.MAX_VALUE);
this.handle = handle;
}

@Override
protected void deallocate() {
if (capacity() > THREAD_LOCAL_BUFFER_SIZE) {
super.deallocate();
} else {
clear();
handle.recycle(this);
}
}
}

实现一个Recycler对象,会要求实现newObject方法,这个是缓存池中没有对象时调用该方法创建对象。

1
2
3
4
5
6
7
private static final Recycler<ThreadLocalUnsafeDirectByteBuf> RECYCLER =
new Recycler<ThreadLocalUnsafeDirectByteBuf>() {
@Override
protected ThreadLocalUnsafeDirectByteBuf newObject(Handle<ThreadLocalUnsafeDirectByteBuf> handle) {
return new ThreadLocalUnsafeDirectByteBuf(handle);
}
};

获取对象:

1
ThreadLocalUnsafeDirectByteBuf buf = RECYCLER.get();

当对象使用完毕时,手动执行对象释放

1
buf.deallocate()

注意deallocate()方法内部必须调用handle.recycle(this)。

Recycler原理解析

继承关系以及成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public abstract class Recycler<T> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(Recycler.class);

@SuppressWarnings("rawtypes")
private static final Handle NOOP_HANDLE = new Handle() {
@Override
public void recycle(Object object) {
// NOOP
}
};
private static final AtomicInteger ID_GENERATOR = new AtomicInteger(Integer.MIN_VALUE);
private static final int OWN_THREAD_ID = ID_GENERATOR.getAndIncrement();
private static final int DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD = 4 * 1024; // Use 4k instances as default.
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
private static final int INITIAL_CAPACITY;
private static final int MAX_SHARED_CAPACITY_FACTOR;
private static final int MAX_DELAYED_QUEUES_PER_THREAD;
private static final int LINK_CAPACITY;
private static final int RATIO;
// 每条线程最大容量
private final int maxCapacityPerThread;
//
private final int maxSharedCapacityFactor;
private final int ratioMask;
// 每个线程最大的延迟队列数量
private final int maxDelayedQueuesPerThread;

// FastThreadLocal存储栈,栈中存储对象
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
ratioMask, maxDelayedQueuesPerThread);
}

@Override
protected void onRemoval(Stack<T> value) {
// Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
if (value.threadRef.get() == Thread.currentThread()) {
if (DELAYED_RECYCLED.isSet()) {
DELAYED_RECYCLED.get().remove(value);
}
}
}
};
//
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
@Override
protected Map<Stack<?>, WeakOrderQueue> initialValue() {
return new WeakHashMap<Stack<?>, WeakOrderQueue>();
}
};
}

静态代码块中初始化默认值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static {
// In the future, we might have different maxCapacity for different object types.
// e.g. io.netty.recycler.maxCapacity.writeTask
// io.netty.recycler.maxCapacity.outboundBuffer
int maxCapacityPerThread = SystemPropertyUtil.getInt("io.netty.recycler.maxCapacityPerThread",
SystemPropertyUtil.getInt("io.netty.recycler.maxCapacity", DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD));
if (maxCapacityPerThread < 0) {
maxCapacityPerThread = DEFAULT_INITIAL_MAX_CAPACITY_PER_THREAD;
}

DEFAULT_MAX_CAPACITY_PER_THREAD = maxCapacityPerThread;

MAX_SHARED_CAPACITY_FACTOR = max(2,
SystemPropertyUtil.getInt("io.netty.recycler.maxSharedCapacityFactor",
2));

MAX_DELAYED_QUEUES_PER_THREAD = max(0,
SystemPropertyUtil.getInt("io.netty.recycler.maxDelayedQueuesPerThread",
// We use the same value as default EventLoop number
NettyRuntime.availableProcessors() * 2));

LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));

// By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
// This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
// bursts.
RATIO = safeFindNextPositivePowerOfTwo(SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));

if (logger.isDebugEnabled()) {
if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: disabled");
logger.debug("-Dio.netty.recycler.linkCapacity: disabled");
logger.debug("-Dio.netty.recycler.ratio: disabled");
} else {
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
logger.debug("-Dio.netty.recycler.maxSharedCapacityFactor: {}", MAX_SHARED_CAPACITY_FACTOR);
logger.debug("-Dio.netty.recycler.linkCapacity: {}", LINK_CAPACITY);
logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
}
}

INITIAL_CAPACITY = min(DEFAULT_MAX_CAPACITY_PER_THREAD, 256);
}

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
protected Recycler() {
this(DEFAULT_MAX_CAPACITY_PER_THREAD);
}

protected Recycler(int maxCapacityPerThread) {
this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
}

protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
}

protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
int ratio, int maxDelayedQueuesPerThread) {
ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
if (maxCapacityPerThread <= 0) {
this.maxCapacityPerThread = 0;
this.maxSharedCapacityFactor = 1;
this.maxDelayedQueuesPerThread = 0;
} else {
this.maxCapacityPerThread = maxCapacityPerThread;
this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
}
}

获取对象get

1
2
3
4
5
6
7
8
9
10
11
12
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
  1. 如果最大线程缓存数为0,即不允许缓存了,则执行newObject()构建信息对象;
  2. 通过threadLocal获取当前线程的栈缓存;
  3. 从栈中弹出栈顶元素DefaultHandle;
  4. 如果DefaultHandle为null:
    1. 通过栈创建新的DefaultHandle;
    2. 为新的DefaultHandle创建对象newObject();
  5. 返回handle.value。

返还对象recycle

Handle#recycle(Object)替代了该方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Deprecated
public final boolean recycle(T o, Handle<T> handle) {
if (handle == NOOP_HANDLE) {
return false;
}

DefaultHandle<T> h = (DefaultHandle<T>) handle;
if (h.stack.parent != this) {
return false;
}

h.recycle(o);
return true;
}

  1. 如果handle是NOOP_HANDLE则返回false;
  2. 如果handler的stack所属的Recycle不是当前对象则返回false;
  3. 执行handler的recycle方法。

Recycler.DefaultHandle

DefaultHandle是Handle的默认实现。

Handle接口的核心是提供recycle方法的回调,在recycle方法中可以做资源回收。

  • 当element出栈pop的时候会将recycleId和lastRecycledId设置为0;
  • 当element存入栈的时候会将recycleId和lastRecycledId设置为OWN_THREAD_ID;
  • 当element存入延迟队列的时候会将lastRecycledId设置为队列的id;
  • 当element从延迟队列迁移到栈中时会将recycleId设置为lastRecycledId;

通过这两个状态可以判断是否被回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static final class DefaultHandle<T> implements Handle<T> {
private int lastRecycledId;
private int recycleId;

boolean hasBeenRecycled;

private Stack<?> stack;
private Object value;

DefaultHandle(Stack<?> stack) {
this.stack = stack;
}

@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}

Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}

stack.push(this);
}
}

DefaultHandle的核心就在于它的recycle方法,在recycle方法中会将this进行回收,将this入栈。

recycle方法:

  1. 校验回收的对象是否一致;
  2. 校验循环id;
  3. 将当前对象push进栈;

Recycler.Stack

这个栈有个特殊的结构:

  • 如果对象获取的线程和回收的线程一致,则会执行push()->pushNow()将对象存入栈;
  • 如果对象获取的线程和回收的线程不一致,则会在当前线程创建一个延迟队列,执行push()->pushLater()将对象存入延迟队列;且栈会持有该队列(链表存队列);
  • 当获取对象时,先从栈中取对象;如果栈长为0,则会执行scavenge()->WeakOrderQueue.transfer()将持有的队列中的对象迁移到栈中,然后返回对象。

WeakOrderQueue实现了延迟队列,使得对象可以跨线程共享。

继承关系以及成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
static final class Stack<T> {
// 所属的Recycler
final Recycler<T> parent;
// 当前线程的弱引用
final WeakReference<Thread> threadRef;
// 可用技术
final AtomicInteger availableSharedCapacity;
final int maxDelayedQueues;
// 最大容量
private final int maxCapacity;
private final int ratioMask;
// 存储数组
private DefaultHandle<?>[] elements;
// 当前大小
private int size;
private int handleRecycleCount = -1; // Start with -1 so the first one will be recycled.
private WeakOrderQueue cursor, prev;
private volatile WeakOrderQueue head;
}

构造器

1
2
3
4
5
6
7
8
9
10
11
12
 Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
int ratioMask, int maxDelayedQueues) {
this.parent = parent;
// 弱引用
threadRef = new WeakReference<Thread>(thread);
this.maxCapacity = maxCapacity;
// 计数器
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
this.ratioMask = ratioMask;
this.maxDelayedQueues = maxDelayedQueues;
}

newHandle

1
2
3
DefaultHandle<T> newHandle() {
return new DefaultHandle<T>(this);
}

入栈push

1
2
3
4
5
6
7
8
9
10
11
12
13
void push(DefaultHandle<?> item) {
// 校验线程是否一致
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item);
} else {
// The current Thread is not the one that belongs to the Stack
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
// happens later.
pushLater(item, currentThread);
}
}
  1. 如果当前线程与Stack中弱引用存储的线程一样,则立即入栈执行pushNow;
  2. 不然延迟入栈,执行pushLater;

Stack入栈实现了同步回收(同一个线程获取对象并回收),和异步回收(获取对象和回收对象的线程不一致)。

  • 同步回收:直接将对象存入栈中;
  • 异步回收:将对象存储延迟队列中,只有在Stack中没有对象时才会将Stack存储的延迟队列链表中的对象存入栈中。

立即入栈pushNow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
//
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

int size = this.size;
// 存不下了释放
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
// 数组两倍扩容
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
// 存起来
elements[size] = item;
this.size = size + 1;
}
  1. 校验handle的循环id;
  2. 如果当前容量已经到上限,则释放并返回;
  3. 如果当前长度size和数组长度一样,则对数组进行两倍扩容;
  4. 将handle存入数组的末尾,并将size加一;

延迟入栈pushLater

当Stock中线程的弱引用与持有Stock的线程不一致时,就会执行pushLater将handle存储延迟队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void pushLater(DefaultHandle<?> item, Thread thread) {
// Map是WeakHashMap,避免强引用
// 获取延迟队列
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
// 一个线程最大的延迟队列数
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
// 舍弃这个handle
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
// 分配一个队列
// 并且会将这个队列插入Stack的队列链表的head
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
// drop object
return;
}
// 延迟队列加入map
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}
// 将handle加入延迟队列
queue.add(item);
}

  1. 获取延迟队列;
  2. 如果队列没有:
    1. 判断是否达到单个线程最大延迟队列数,如果是则舍弃这个handle,将单前栈的延迟队列设置为WeakOrderQueue.DUMMY;
    2. WeakOrderQueue.allocate分配一个队列,如果没有分配成功则return;
    3. 将新分配的延迟队列加入到map中;
  3. 如果队列是WeakOrderQueue.DUMMY,则return;
  4. 将handle加入到队列中。

栈顶出栈pop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DefaultHandle<T> pop() {
int size = this.size;
if (size == 0) {
// 如果没有则从延迟队列中采集
if (!scavenge()) {
return null;
}
size = this.size;
if (size <= 0) {
// double check, avoid races
return null;
}
}
// 从栈尾去除对象
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
// 校验循环id
// 并发
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
  1. 如果当前栈大小为0,则调用scavenge()到延迟队列中采集;
    1. 如果还是小于0则返回null;
  2. 栈大小减一;
  3. 将栈尾的数据去除;
  4. 校验循环id;
  5. 将循环id都设置为0,并返回。

收集scavenge

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {
return true;
}

// reset our scavenge cursor
prev = null;
cursor = head;
return false;
}

boolean scavengeSome() {
WeakOrderQueue prev;
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
prev = null;
cursor = head;
if (cursor == null) {
return false;
}
} else {
prev = this.prev;
}

boolean success = false;
do {
if (cursor.transfer(this)) {
success = true;
break;
}
WeakOrderQueue next = cursor.next;
// 线程引用没有了,线程被回收了
if (cursor.owner.get() == null) {
// If the thread associated with the queue is gone, unlink it, after
// performing a volatile read to confirm there is no data left to collect.
// We never unlink the first queue, as we don't want to synchronize on updating the head.
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}

if (prev != null) {
prev.setNext(next);
}
} else {
prev = cursor;
}

cursor = next;

} while (cursor != null && !success);

this.prev = prev;
this.cursor = cursor;
return success;
}
  1. 如果cursor没有,则取head;
  2. 如果head也没有,则返回false;
  3. 循环cursor不为null且!success:
    1. cursor.transfer将队列中的Handle存到Stack上,设置success为true;
    2. 如果cursor.owner.get() == null,线程被回收了:
      1. 如果cursor中还有数据,循环:
        1. cursor.transfer将队列中的Handle存到Stack上;
          1. 清空成功则设置success为true;
          2. 不然则break;
      2. 将cursor从链表中去除,前节点连接到后续节点;
    3. 前置节点设为cursor;
    4. cursor设置为cursor的next节点;
  4. 更新this.prev = prev和this.cursor = cursor;
  5. 返回success。

Recycler.WeakOrderQueue

继承关系及成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private static final class WeakOrderQueue {
// 当一个线程的延迟队列数达到上限,就不允许handle再创建了,此时设置为DUMMY
// DUMMY队列不会添加任何数据
static final WeakOrderQueue DUMMY = new WeakOrderQueue();
// 队列的头部,Head其实是实际的队列,存储了队列的总容量availableSharedCapacity
// 队列存储的数据是Head中的Link节点,Link节点构成链表
// 每个Link节点有长度为16的数组,当Link存储达到上限是会从Head的availableSharedCapacity划去16长度
// 然后创建新的Link节点
private final Head head;
private Link tail;
// pointer to another queue of delayed items for the same stack
// 队列本身是链表的节点
private WeakOrderQueue next;
// 持有线程的弱引用
private final WeakReference<Thread> owner;
// 唯一id,每个队列的id都不一样
private final int id = ID_GENERATOR.getAndIncrement();
}

Link是队列中真正存储数据的地方,每个Link可以存储LINK_CAPACITY个对象,一个队列中的Link数为Head.availableSharedCapacity/LINK_CAPACITY

1
2
3
4
5
6
static final class Link extends AtomicInteger {
private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];

private int readIndex;
Link next;
}

WeakOrderQueue.Head

Head维护availableSharedCapacity,来控制Link的数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
static final class Head {
private final AtomicInteger availableSharedCapacity;

Link link;

Head(AtomicInteger availableSharedCapacity) {
this.availableSharedCapacity = availableSharedCapacity;
}

/// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
@Override
protected void finalize() throws Throwable {
// ...代码省略了
}
// 将Link释放了,空间加回来
void reclaimSpace(int space) {
assert space >= 0;
availableSharedCapacity.addAndGet(space);
}
// 申请空间,用于创建Link,因为一个Link要占用LINK_CAPACITY大小
boolean reserveSpace(int space) {
return reserveSpace(availableSharedCapacity, space);
}

static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
assert space >= 0;
for (;;) {
int available = availableSharedCapacity.get();
if (available < space) {
return false;
}
if (availableSharedCapacity.compareAndSet(available, available - space)) {
return true;
}
}
}
}

分配队列allocate

1
2
3
4
5
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
// We allocated a Link so reserve the space
return Head.reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
? newQueue(stack, thread) : null;
}

创建新队列newQueue

1
2
3
4
5
6
7
8
9
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
// Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
// may be accessed while its still constructed.
// 将队列插入的栈的队列链表中
stack.setHead(queue);

return queue;
}

添加add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void add(DefaultHandle<?> handle) {
// 将handle的lastRecycledId设置为队列的Id
handle.lastRecycledId = id;
// 队尾
Link tail = this.tail;
int writeIndex;
// LINK_CAPACITY = 16
if ((writeIndex = tail.get()) == LINK_CAPACITY) {
// 是否到队里存储上限availableSharedCapacity
// 如果能从availableSharedCapacity中减到LINK_CAPACITY,就可以继续存
if (!head.reserveSpace(LINK_CAPACITY)) {
// Drop it.
return;
}
// We allocate a Link so reserve the space
// 划分到了LINK_CAPACITY空间就可以创建Link节点了
this.tail = tail = tail.next = new Link();
// 写指针更新为新的节点
writeIndex = tail.get();
}
// 尾节点存入handle
tail.elements[writeIndex] = handle;
// 清空handle所属的Stack
handle.stack = null;
// we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
// this also means we guarantee visibility of an element in the queue if we see the index updated
// 更新index,lazySet是原子类的UNSAFE的方法
// 用于更新,相对于普通的更新,它少了一部分内存屏障(store-load),提升了性能
// 缺点是其他线程看到数据变更会有延迟
tail.lazySet(writeIndex + 1);
}
  1. 将handle的lastRecycledId设置为队列的Id
  2. 如果writeIndex到了数组尾:
    1. 如果不能从availableSharedCapacity中减到LINK_CAPACITY,则退出;
    2. 不然,new Link(),writeIndex指向新Link的起始位置;
  3. 将handle存入tail.elements[writeIndex]
  4. 清空handle所属的Stack
  5. writeIndex加一

将队列数据存到栈中transfer

从Stock中获取对象时如果没有对象,则调用Stock.scavenge()将延迟队列中的对象迁到栈中。

transfer一次只会将一个Link中的对象迁移到栈中,所以Stock.scavenge()->scavengeSome()中会循环调用transfer方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
boolean transfer(Stack<?> dst) {
Link head = this.head.link;
if (head == null) {
return false;
}

if (head.readIndex == LINK_CAPACITY) {
// 读完了
if (head.next == null) {
return false;
}
// 读下一个link
this.head.link = head = head.next;
// 是否空间
this.head.reclaimSpace(LINK_CAPACITY);
}

final int srcStart = head.readIndex;
int srcEnd = head.get();
final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
}

final int dstSize = dst.size;
// Stack需要的容量
final int expectedCapacity = dstSize + srcSize;

if (expectedCapacity > dst.elements.length) {
// 栈进行扩容
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}

if (srcStart != srcEnd) {
final DefaultHandle[] srcElems = head.elements;
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
// 将link中的数组拷贝到栈的数组上
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle element = srcElems[i];
// 更新recycleId
// 当element出栈pop的时候会将recycleId和lastRecycledId设置为0
// 当element存入栈的时候会将recycleId和lastRecycledId设置为OWN_THREAD_ID
// 当element存入延迟队列的时候会将lastRecycledId设置为队列的id
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
// 清空link的栈
srcElems[i] = null;
// 循环使用超过一定次数就会放弃回收该对象
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
// 存到栈
dstElems[newDstSize ++] = element;
}
// 如果整个link都被清除了,而且有后续节点,就释放空间
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
this.head.reclaimSpace(LINK_CAPACITY);
this.head.link = head.next;
}
head.readIndex = srcEnd;
// 没有从队列中将对象存到栈中
if (dst.size == newDstSize) {
return false;
}
// 更新新的长度
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}
}
  1. 如果全部读完了,则返回false;
  2. 不然就读下一个Link;
  3. 计算这个Link要读多少个元素;
  4. 栈扩容;
  5. 循环拷贝Link数组中的元素到栈中,并清空Link数组;
  6. 拷贝时校验recycleId和lastRecycledId;
  7. 并将element.recycleId = element.lastRecycledId,此时lastRecycledId为队列id;
  8. 释放link空间;
  9. 如果没有从队列中迁移对象到栈中,则返回false;
  10. 不然,更新栈长度,且放回true;