Netty解析九:EventLoop的基类SingleThreadEventLoop
在解析NioEventLoopGroup中可以看到最终处理逻辑的是NioEventLoop,而且NioEventLoop是继承SingleThreadEventLoop,考虑先将SingleThreadEventLoop看明白。

可以看到SingleThreadEventLoop有众多子类。
解析

- ExecutorService: jdk线程池的基本接口;
- AbstractExecutorService: jdk线程池的基类封装大部分方法,保留execute方法给子类实现具体的执行逻辑;
- AbstractEventExecutor: Netty线程池的基类,实现了EventExecutor接口本身的方法;
- AbstractScheduledEventExecutor: 加入优先级队列,实现定时/延时执行;
- SingleThreadEventExecutor:加入任务队列,实现任务排队;所有任务的执行都在一个线程上;保留run方法给子类实现线程启动时如何执行;
- SingleThreadEventLoop: 实现EventLoopGroup接口的方法。
AbstractEventExecutor
Netty线程池的基类,实现了EventExecutor接口本身的方法。
1 | public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor { |
AbstractEventExecutor
加入优先级队列,实现定时/延时执行,具体的执行还是依赖子类execute方法的实现。

1 | public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor { |
SingleThreadEventExecutor
加入任务队列,实现任务排队;所有任务的执行都在一个线程上;保留run方法给子类实现线程启动时如何执行。
成员变量
1 | static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16, |
addTaskWakesUp变量会在代码中经常看到,这里专门解释一下:
由于SingleThreadEventExecutor是单线程的,所以所有任务的执行都需要这个线程去做,那么在线程等待的时候需要是否需要主动唤醒线程就是我们要考虑的,addTaskWakesUp就是在添加任务的时候是否会主动唤醒线程,如果会就是true,不然为false。
核心构造器
ThreadExecutorMap内部有FastThreadLocal
1 | protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, |
ThreadExecutorMap
1 | public static Executor apply(final Executor executor, final EventExecutor eventExecutor) { |
执行execute(Runnable task)
execute方法是线程池执行任务的核心方法,在父类AbstractEventExecutor中的延时/定时任务的执行也依赖于execute方法。

1 |
|
添加任务到队列addTask(Runnable task)
1 | protected void addTask(Runnable task) { |
开启线程startThread()
开启线程必须通过cas将state从ST_NOT_STARTED状态更新到ST_STARTED,通过这个状态保证了当前只能有一个线程启动。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
实际开启线程的方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70private void doStartThread() {
assert thread == null;
// 通过构造器中传入的executor来执行
executor.execute(new Runnable() {
public void run() {
// executor执行的当前线程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
// 更新最后执行时间
updateLastExecutionTime();
try {
// 执行SingleThreadEventExecutor类的run方法
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
// 这里会执行runAllTasks()完成所有任务
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.countDown();
if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
执行run()
在execute(Runnable task)方法中会执行startThread()->doStartThread()来开启线程,doStartThread()开启的线程会调用SingleThreadEventExecutor.this.run()来执行实际逻辑。
目前SingleThreadEventExecutor的run方法未实现留给子类实现。1
protected abstract void run();
优雅的关闭shutdownGracefully()
io.netty.util.concurrent.AbstractEventExecutor#shutdownGracefully实现了优雅的关闭,最终是调用的shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) ,而在io.netty.util.concurrent.SingleThreadEventExecutor中实现了shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)方法。
1 | static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2; |
shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)实现
1 |
|
1 |
|
获取任务takeTask()
获取任务会从定时/延迟任务队列scheduledTask和普通任务队列taskQueue中获取任务。
对于定时/延迟任务队列scheduledTask中达到执行时间的任务会通过fetchFromScheduledTaskQueue方法将任务从scheduledTask移动到taskQueue。
1 | protected Runnable takeTask() { |
SingleThreadEventLoop
SingleThreadEventLoop主要实现的是EventLoopGroup接口的方法。
成员变量
1 | public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop { |
构造器
因为SingleThreadEventLoop本身只有一个非static的成员变量,所以构造器都是依赖父类。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks);
}
EventLoopGroup接口方法
这些方法主要目的还是实现链式编程。
register
1 |
|
afterRunningAllTasks
关闭前会执行confirmShutdown()->runAllTasks()->afterRunningAllTasks()
1 |
|