从DefaultThreadFactory到FastThreadLocalThread再到FastThreadLocalRunnable的解析过程中可以发现这些类的作用都是在为FastThreadLocal作处理。

所以本篇开始解析FastThreadLocal,FastThreadLocal是ThreadLocal的高性能实现。

FastThreadLocal总结

为了便于阅读将总结放到具体解析之前。

ThreadLocal

jdk的ThreadLocal实现底层使用的是数组来实现map,一个线程一个map(数组),不同ThreadLocal对象不同的threadLocalHashCode值;hash算法是序号对table取余【key.threadLocalHashCode & (table.length - 1)】,使用线性探测法来处理hash冲突。这就存在一个问题当hash冲突时,查询数据就需要遍历数组去查找,时间复杂度为O(n)。

FastThreadLocal

Netty的FastThreadLocal底层也是使用数组来实现map,一个线程一个map(数组),不同FastThreadLocal对象不同的索引值;直接根据唯一索引index进行定位数据,保证时间复杂度为O(1)。

下标是通过原子类nextIndex递增的,如果数组的下标很大数组也会很大,整体来说是以空间换时间。

FastThreadLocal没有使用弱引用,依赖于FastThreadLocalThread->FastThreadLocalRunnable,在FastThreadLocalRunnable的run方法中执行FastThreadLocal.removeAll来统一清理数据,所以没有内存泄漏的问题。

底层实现

解析

InternalThreadLocalMap

FastThreadLocal中完全依赖于InternalThreadLocalMap来实现ThreadLocal功能,所以要解析FastThreadLocal需要先解析InternalThreadLocalMap。

继承关系

InternalThreadLocalMap继承了UnpaddedInternalThreadLocalMap,UnpaddedInternalThreadLocalMap抽取了大部分的成员变量没有任何方法的实现。

1
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {}

UnpaddedInternalThreadLocalMap抽取了成员变量,没有任何方法实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class UnpaddedInternalThreadLocalMap {
// 支持ThreadLocal
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
static final AtomicInteger nextIndex = new AtomicInteger();

// 存储FastThreadLocal
Object[] indexedVariables;

// Core thread-locals
int futureListenerStackDepth;
int localChannelReaderStackDepth;
Map<Class<?>, Boolean> handlerSharableCache;
IntegerHolder counterHashCode;
ThreadLocalRandom random;
Map<Class<?>, TypeParameterMatcher> typeParameterMatcherGetCache;
Map<Class<?>, Map<String, TypeParameterMatcher>> typeParameterMatcherFindCache;

// String-related thread-locals
StringBuilder stringBuilder;
Map<Charset, CharsetEncoder> charsetEncoderCache;
Map<Charset, CharsetDecoder> charsetDecoderCache;

// ArrayList-related thread-locals
ArrayList<Object> arrayList;

UnpaddedInternalThreadLocalMap(Object[] indexedVariables) {
this.indexedVariables = indexedVariables;
}
}

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 默认数组长度
private static final int DEFAULT_ARRAY_LIST_INITIAL_CAPACITY = 8;
private static final int STRING_BUILDER_INITIAL_SIZE;
private static final int STRING_BUILDER_MAX_SIZE;

public static final Object UNSET = new Object();

private BitSet cleanerFlags;

static {
STRING_BUILDER_INITIAL_SIZE =
SystemPropertyUtil.getInt("io.netty.threadLocalMap.stringBuilder.initialSize", 1024);
logger.debug("-Dio.netty.threadLocalMap.stringBuilder.initialSize: {}", STRING_BUILDER_INITIAL_SIZE);

STRING_BUILDER_MAX_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalMap.stringBuilder.maxSize", 1024 * 4);
logger.debug("-Dio.netty.threadLocalMap.stringBuilder.maxSize: {}", STRING_BUILDER_MAX_SIZE);
}
// 缓存行
// Cache line padding (must be public)
// With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;

核心方法

get方法

InternalThreadLocalMap的get是返回当前线程的InternalThreadLocalMap

1
2
3
4
5
6
7
8
9
10
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
// 如果线程类是FastThreadLocalThread,则使用FastThreadLocalThread
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
// slow是使用jdk的ThreadLocal
return slowGet();
}
}

fastGet

从FastThreadLocalThread中取出InternalThreadLocalMap

1
2
3
4
5
6
7
8
9
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
// 从thread中取
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
if (threadLocalMap == null) {
// new一个
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}

slowGet

从UnpaddedInternalThreadLocalMap的ThreadLocal中取出InternalThreadLocalMap

1
2
3
4
5
6
7
8
9
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
remove方法

FastThreadLocalThread则设置thread的ThreadLocalMap为null,不然执行ThreadLocal(UnpaddedInternalThreadLocalMap.slowThreadLocalMap)的remove方法。

1
2
3
4
5
6
7
8
public static void remove() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
((FastThreadLocalThread) thread).setThreadLocalMap(null);
} else {
slowThreadLocalMap.remove();
}
}

destroy方法

执行slowThreadLocalMap的remove方法。

1
2
3
public static void destroy() {
slowThreadLocalMap.remove();
}

下一个变量序号nextVariableIndex

通过原子类对象nextIndex递增生成下一个序号。

1
2
3
4
5
6
7
8
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
if (index < 0) { // 溢出
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}

上一个变量序号lastVariableIndex
1
2
3
public static int lastVariableIndex() {
return nextIndex.get() - 1;
}
初始化变量数组newIndexedVariableTable

在InternalThreadLocalMap构造器中会执行newIndexedVariableTable方法初始化,用于初始化UnpaddedInternalThreadLocalMap的indexedVariables属性。

1
2
3
4
5
private static Object[] newIndexedVariableTable() {
Object[] array = new Object[32];
Arrays.fill(array, UNSET);
return array;
}

获取变量indexedVariable

根据变量序号从indexedVariables数组中获取变量。

1
2
3
4
5
public Object indexedVariable(int index) {
// 成员变量转局部变量
Object[] lookup = indexedVariables;
return index < lookup.length? lookup[index] : UNSET;
}
设置变量setIndexedVariable

在index处设置变量,如果该位置是第一次赋值则返回true,不然返回false

1
2
3
4
5
6
7
8
9
10
11
12
public boolean setIndexedVariable(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
// 扩容
expandIndexedVariableTableAndSet(index, value);
return true;
}
}

扩容并赋值expandIndexedVariableTableAndSet

位运算扩容到下一个2幂次的数,例如:2 扩容到 4, 3 扩容到 4,4扩容到8

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void expandIndexedVariableTableAndSet(int index, Object value) {
Object[] oldArray = indexedVariables;
final int oldCapacity = oldArray.length;
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity ++;

Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
newArray[index] = value;
indexedVariables = newArray;
}

移除序号上的元素removeIndexedVariable
1
2
3
4
5
6
7
8
9
10
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object v = lookup[index];
lookup[index] = UNSET;
return v;
} else {
return UNSET;
}
}
序号上是否设置元素isIndexedVariableSet

如果该序号设置了元素则返回true,不然返回false。

1
2
3
4
public boolean isIndexedVariableSet(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length && lookup[index] != UNSET;
}

设置清除标记setCleanerFlag

在cleanerFlags上设置清除标记

1
2
3
4
5
6
public void setCleanerFlag(int index) {
if (cleanerFlags == null) {
cleanerFlags = new BitSet();
}
cleanerFlags.set(index);
}

当前序号是否设置清除标记isCleanerFlagSet
1
2
3
public boolean isCleanerFlagSet(int index) {
return cleanerFlags != null && cleanerFlags.get(index);
}

构造器

InternalThreadLocalMap的构造器是私有的,只能通过静态的get方法或者getIfSet方法获取。

1
2
3
private InternalThreadLocalMap() {
super(newIndexedVariableTable());
}

FastThreadLocal

解析过InternalThreadLocalMap后再解析FastThreadLocal就会异常轻松。

类定义

1
public class FastThreadLocal<V> {}

成员变量

  • variablesToRemoveIndex:移除set所在的索引,这个与index变量一样指向InternalThreadLocalMap.indexedVariables;移除集合用于记录同一个线程中所有使用的FastThreadLocal对象;variablesToRemoveIndex的值为0;
  • index:InternalThreadLocalMap中变量数组的索引,这个位置用于存储这个线程的变量(ThreadLocal.set)。
1
2
private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
private final int index;

构造器

在构造器中会通过InternalThreadLocalMap的nextVariableIndex生成索引。

1
2
3
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}

核心方法

FastThreadLocal方法的实现都依赖于InternalThreadLocalMap,这里选择一些典型的方法做解析。

addToVariablesToRemove方法

将InternalThreadLocalMap加入到移除集合中;

【移除集合】与FastThreadLocal.set保存的变量一样保存在InternalThreadLocalMap.indexedVariables中,区别就是索引【序号】不一样。

1
2
3
4
5
6
7
8
9
10
11
12
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}

variablesToRemove.add(variable);
}
set方法
  1. 通过InternalThreadLocalMap获取当前线程的InternalThreadLocalMap;
  2. 通过index向InternalThreadLocalMap的变量数组中存入value
  3. 当前InternalThreadLocalMap加入移除set中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public final void set(V value) {
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
setKnownNotUnset(threadLocalMap, value);
} else {
remove();
}
}
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
if (threadLocalMap.setIndexedVariable(index, value)) {
// 将当前加入移除set中
addToVariablesToRemove(threadLocalMap, this);
}
}
get方法

通过构造器中生成的索引从InternalThreadLocalMap.indexedVariables数组中获取对象。

1
2
3
4
5
6
7
8
9
10
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
// 通过索引获取对象
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
// 初始化
return initialize(threadLocalMap);
}
initialize方法
  1. 执行初始化方法initialValue()赋值变量v,当前实现为null;
  2. 将变量v保存到InternalThreadLocalMap.indexedVariables数组index索引处;
  3. 当前InternalThreadLocalMap加入移除set中。
1
2
3
4
5
6
7
8
9
10
11
12
13
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
// v = null
v = initialValue();
} catch (Exception e) {
PlatformDependent.throwException(e);
}

threadLocalMap.setIndexedVariable(index, v);
addToVariablesToRemove(threadLocalMap, this);
return v;
}
移除所有变量removeAll

一个线程只有一个InternalThreadLocalMap对象,一个FastThreadLocal可以有多个InternalThreadLocalMap(不同线程),一个线程可以有多个FastThreadLocal对象。

removeAll()方法会在FastThreadLocalRunnable执行runnable后执行.

1
2
3
4
5
6
7
public void run() {
try {
runnable.run();
} finally {
FastThreadLocal.removeAll();
}
}

removeAll用于清理执行产生的数据。这些数据与当前线程绑定,所以要删除当前线程的InternalThreadLocalMap对象和多个FastThreadLocal中存储的变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public static void removeAll() {
// 获取当前线程的InternalThreadLocalMap
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}

try {
// 获取当前线程的FastThreadLocal集合
Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
if (v != null && v != InternalThreadLocalMap.UNSET) {
@SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[0]);
// 执行FastThreadLocal的remove方法会将FastThreadLocal移出variablesToRemove集合
for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
tlv.remove(threadLocalMap);
}
}
} finally {
// 清理当前线程存储的threadLocalMap
InternalThreadLocalMap.remove();
}
}
移除remove
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
// 将this从移除队列中移除
Object v = threadLocalMap.removeIndexedVariable(index);
removeFromVariablesToRemove(threadLocalMap, this);

if (v != InternalThreadLocalMap.UNSET) {
try {
// 空实现,给子类扩展
onRemoval((V) v);
} catch (Exception e) {
PlatformDependent.throwException(e);
}
}
}

}
`