在构建NioEventLoopGroup需要传入线程池Executor,默认是使用它的父类MultithreadEventLoopGroup.newDefaultThreadFactory()方法返回的DefaultThreadFactory。

所以本篇讲解DefaultThreadFactory与FastThreadLocalThread。

解析

线程工厂类DefaultThreadFactory

在MultithreadEventLoopGroup中封装了一个DefaultThreadFactory的构建。

1
2
3
4
5
public final static int MAX_PRIORITY = 10;
@Override
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
}

继承关系

DefaultThreadFactory实现了ThreadFactory接口。

1
public class DefaultThreadFactory implements ThreadFactory {}

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
// 线程池下一个id
private static final AtomicInteger poolId = new AtomicInteger();
// 下一个线程id
private final AtomicInteger nextId = new AtomicInteger();
// 线程名前缀(包含线程池id)
private final String prefix;
// 是否守护线程
private final boolean daemon;
// 优先级
private final int priority;
//jdk线程组
protected final ThreadGroup threadGroup;

构造器

1
2
3
4
5
6
7
public DefaultThreadFactory(Class<?> poolType, int priority) {
this(poolType, false, priority);
}
public DefaultThreadFactory(String poolName, boolean daemon, int priority) {
this(poolName, daemon, priority, System.getSecurityManager() == null ?
Thread.currentThread().getThreadGroup() : System.getSecurityManager().getThreadGroup());
}

核心构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public DefaultThreadFactory(String poolName, boolean daemon, int priority, ThreadGroup threadGroup) {
if (poolName == null) {
throw new NullPointerException("poolName");
}
if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
"priority: " + priority + " (expected: Thread.MIN_PRIORITY <= priority <= Thread.MAX_PRIORITY)");
}
// 构造线程名前缀
prefix = poolName + '-' + poolId.incrementAndGet() + '-';
this.daemon = daemon;
this.priority = priority;
this.threadGroup = threadGroup;
}

newThread方法

Thread newThread(Runnable r) 方法是ThreadFactory接口中定义用于生产线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public Thread newThread(Runnable r) {
// 调用下面的newThread方法构建线程对象
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
try {
// 设置守护线程和优先级
if (t.isDaemon() != daemon) {
t.setDaemon(daemon);
}

if (t.getPriority() != priority) {
t.setPriority(priority);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
}
return t;
}

DefaultThreadFactory中返回的是FastThreadLocalThread,FastThreadLocalThread对于ThreadLocal做了优化。

1
2
3
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}

线程类FastThreadLocalThread

FastThreadLocalThread继承了Thread类,是一个特殊的Thread提供了访问更快的FastThreadLocal

FastThreadLocalThread中传入的runnable都会使用FastThreadLocalRunnable.wrap(target)封装成FastThreadLocalRunnable。

成员变量

1
2
3
4
// 线程执行runnable之后是否需要清理FastThreadLocal
private final boolean cleanupFastThreadLocals;
// 存储线程的map
private InternalThreadLocalMap threadLocalMap;

构造器

FastThreadLocalThread有许多构造器,都依赖父类Thread,简单列举几个。

1
2
3
4
5
6
7
8
9
10
11
public FastThreadLocalThread() {
cleanupFastThreadLocals = false;
}
public FastThreadLocalThread(Runnable target) {
super(FastThreadLocalRunnable.wrap(target));
cleanupFastThreadLocals = true;
}
public FastThreadLocalThread(Runnable target, String name) {
super(FastThreadLocalRunnable.wrap(target), name);
cleanupFastThreadLocals = true;
}

其他方法

剩下的方法可以理解为两个成员的get和set方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final InternalThreadLocalMap threadLocalMap() {
return threadLocalMap;
}

public final void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
this.threadLocalMap = threadLocalMap;
}

@UnstableApi
public boolean willCleanupFastThreadLocals() {
return cleanupFastThreadLocals;
}

@UnstableApi
public static boolean willCleanupFastThreadLocals(Thread thread) {
return thread instanceof FastThreadLocalThread &&
((FastThreadLocalThread) thread).willCleanupFastThreadLocals();
}

FastThreadLocalRunnable

FastThreadLocalRunnable实现了Runnable接口,功能是在实际的runnable对象的run()方法执行完成后执行FastThreadLocal.removeAll();

成员变量

1
private final Runnable runnable;

构造器

1
2
3
private FastThreadLocalRunnable(Runnable runnable) {
this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
}

run()方法

在真正的runnable方法执行完成后执行FastThreadLocal.removeAll()

1
2
3
4
5
6
7
8
@Override
public void run() {
try {
runnable.run();
} finally {
FastThreadLocal.removeAll();
}
}

wrap(Runnable runnable)方法

如果runnable对象不是FastThreadLocalRunnable类,则封装成FastThreadLocalRunnable。

1
2
3
static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}

`