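The analysis of NioEventLoopGroup showed that the class that ultimately does the work is NioEventLoop, and NioEventLoop extends SingleThreadEventLoop, so it makes sense to understand SingleThreadEventLoop first.

Subclasses of SingleThreadEventLoop

SingleThreadEventLoop has many subclasses.

Analysis

Inheritance hierarchy of SingleThreadEventLoop

  • ExecutorService: the basic interface of JDK thread pools;
  • AbstractExecutorService: the JDK base class for thread pools; it implements most methods and leaves execute to subclasses, which supply the actual execution logic;
  • AbstractEventExecutor: the base class of Netty's executors; it implements the methods declared by the EventExecutor interface itself;
  • AbstractScheduledEventExecutor: adds a priority queue to support scheduled/delayed execution;
  • SingleThreadEventExecutor: adds a task queue so that tasks are queued; all tasks run on a single thread; the run method is left to subclasses, which decide what the thread does once it starts;
  • SingleThreadEventLoop: implements the methods of the EventLoopGroup interface.

AbstractEventExecutor

The base class of Netty's executors; it implements the methods declared by the EventExecutor interface itself.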

public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
    // Excerpt: constructors and the remaining methods are omitted
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractEventExecutor.class);

    static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
    static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

    private final EventExecutorGroup parent;
    private final Collection<EventExecutor> selfCollection = Collections.<EventExecutor>singleton(this);

    @Override
    public EventExecutorGroup parent() {
        return parent;
    }

    @Override
    public boolean inEventLoop() {
        // inEventLoop(Thread) is implemented by subclasses
        return inEventLoop(Thread.currentThread());
    }

    @Override
    public <V> Promise<V> newPromise() {
        return new DefaultPromise<V>(this);
    }

    @Override
    public <V> ProgressivePromise<V> newProgressivePromise() {
        return new DefaultProgressivePromise<V>(this);
    }

    @Override
    public <V> Future<V> newSucceededFuture(V result) {
        return new SucceededFuture<V>(this, result);
    }

    @Override
    public <V> Future<V> newFailedFuture(Throwable cause) {
        return new FailedFuture<V>(this, cause);
    }

    @Override
    public Future<?> submit(Runnable task) {
        return (Future<?>) super.submit(task);
    }
}

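AbstractScheduledEventExecutor

Adds a priority queue to support scheduled/delayed execution; the actual execution still relies on the execute method implemented by subclasses.

schedule execution logic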

public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
    // Comparator for scheduled tasks
    private static final Comparator<ScheduledFutureTask<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
            new Comparator<ScheduledFutureTask<?>>() {
                @Override
                public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
                    return o1.compareTo(o2);
                }
            };

    // Priority queue that holds the delayed tasks
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;

    // Lazily initializes the queue
    PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
        if (scheduledTaskQueue == null) {
            scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
                    SCHEDULED_FUTURE_TASK_COMPARATOR,
                    // Use same initial capacity as java.util.PriorityQueue
                    11);
        }
        return scheduledTaskQueue;
    }

    // Schedules a delayed task
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(command, "command");
        ObjectUtil.checkNotNull(unit, "unit");
        if (delay < 0) {
            delay = 0;
        }
        validateScheduled0(delay, unit);

        return schedule(new ScheduledFutureTask<Void>(
                this, command, deadlineNanos(unit.toNanos(delay))));
    }

    // Common entry point for all delayed/scheduled tasks
    private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            // Already on the event loop thread: add the task to the queue directly
            scheduledTaskQueue().add(task.setId(nextTaskId++));
        } else {
            // Otherwise submit a task that adds it to the queue on the event loop thread
            executeScheduledRunnable(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task.setId(nextTaskId++));
                }
            }, true, task.deadlineNanos());
        }

        return task;
    }

    void executeScheduledRunnable(Runnable runnable,
                                  @SuppressWarnings("unused") boolean isAddition,
                                  @SuppressWarnings("unused") long deadlineNanos) {
        execute(runnable);
    }
}
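
To make the role of the priority queue more concrete, here is a minimal JDK-only sketch, not Netty's code. DelayedTask and DelayedQueueSketch are hypothetical names, and java.util.PriorityQueue stands in for Netty's DefaultPriorityQueue. The sketch assumes tasks are ordered by deadline and that the id assigned from nextTaskId breaks ties between equal deadlines.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical stand-in for a scheduled task: a name, a deadline and an insertion id.
final class DelayedTask {
    final String name;
    final long deadlineNanos;
    final long id;

    DelayedTask(String name, long deadlineNanos, long id) {
        this.name = name;
        this.deadlineNanos = deadlineNanos;
        this.id = id;
    }
}

final class DelayedQueueSketch {
    // Earliest deadline first; equal deadlines keep insertion order through the id.
    private final PriorityQueue<DelayedTask> queue = new PriorityQueue<>(
            11, // same initial capacity as in the excerpt above
            Comparator.<DelayedTask>comparingLong(t -> t.deadlineNanos)
                    .thenComparingLong(t -> t.id));
    private long nextTaskId;

    void schedule(String name, long deadlineNanos) {
        queue.add(new DelayedTask(name, deadlineNanos, nextTaskId++));
    }

    // Drains the queue and returns the task names in the order they would run.
    List<String> drainInOrder() {
        List<String> order = new ArrayList<>();
        DelayedTask task;
        while ((task = queue.poll()) != null) {
            order.add(task.name);
        }
        return order;
    }
}
```

Scheduling "late" at 300 and then "first" and "second" both at 100 drains as first, second, late: the head of the queue is always the task that is due next.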

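SingleThreadEventExecutor

Adds a task queue so that tasks are queued; all tasks run on a single thread; the run method is left to subclasses, which decide what the thread does once it starts.

Member variables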

static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
        SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));

private static final InternalLogger logger =
        InternalLoggerFactory.getInstance(SingleThreadEventExecutor.class);
// Lifecycle states
private static final int ST_NOT_STARTED = 1;
private static final int ST_STARTED = 2;
private static final int ST_SHUTTING_DOWN = 3;
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
// No-op tasks
private static final Runnable WAKEUP_TASK = () -> { /* Do nothing. */ };
private static final Runnable NOOP_TASK = () -> { /* Do nothing. */ };

private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
        AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");
private static final AtomicReferenceFieldUpdater<SingleThreadEventExecutor, ThreadProperties> PROPERTIES_UPDATER =
        AtomicReferenceFieldUpdater.newUpdater(
                SingleThreadEventExecutor.class, ThreadProperties.class, "threadProperties");
// Task queue
private final Queue<Runnable> taskQueue;
// The thread that runs the tasks
private volatile Thread thread;
@SuppressWarnings("unused")
private volatile ThreadProperties threadProperties;
private final Executor executor;
private volatile boolean interrupted;

private final CountDownLatch threadLock = new CountDownLatch(1);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>();
private final boolean addTaskWakesUp;
private final int maxPendingTasks;
private final RejectedExecutionHandler rejectedExecutionHandler;
// Time of the last execution
private long lastExecutionTime;

@SuppressWarnings({ "FieldMayBeFinal", "unused" })
private volatile int state = ST_NOT_STARTED;

private volatile long gracefulShutdownQuietPeriod;
private volatile long gracefulShutdownTimeout;
private long gracefulShutdownStartTime;

private final Promise<?> terminationFuture = new DefaultPromise<Void>(GlobalEventExecutor.INSTANCE);

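The addTaskWakesUp variable appears frequently in the code, so it deserves its own explanation:

SingleThreadEventExecutor is single-threaded, so every task has to be run by that one thread. When the thread is waiting, the question is whether it has to be woken up explicitly. addTaskWakesUp states whether adding a task is by itself enough to wake the thread: true if it is, false otherwise. When it is false, execute has to call wakeup explicitly, which is what the !addTaskWakesUp check in execute does.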
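
One way to picture the true case: if the executor thread waits in BlockingQueue.take(), putting a task on the queue is enough to get it running again, and no separate wakeup call is needed. The sketch below is JDK-only and not Netty's implementation; BlockingLoopSketch and offerWakesWorker are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class BlockingLoopSketch {
    // Returns true if the worker ran the task although nobody signalled it explicitly.
    static boolean offerWakesWorker() {
        BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
        CountDownLatch taskRan = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                // take() parks the worker while the queue is empty.
                taskQueue.take().run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();

        // Adding the task is the only "wakeup" the worker gets.
        taskQueue.offer(taskRan::countDown);

        try {
            return taskRan.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A loop that waits on something other than the task queue would not notice the new task this way, which is the situation the false value describes.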

Core constructor

ThreadExecutorMap keeps a FastThreadLocal internally, which stores the EventExecutor that is currently running on the thread.

protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ThreadExecutorMap.apply(executor, this);
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

ThreadExecutorMap

public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(executor, "executor");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    // Wrap the given Executor so that every command is decorated first
    return new Executor() {
        @Override
        public void execute(final Runnable command) {
            executor.execute(apply(command, eventExecutor));
        }
    };
}

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
    ObjectUtil.checkNotNull(command, "command");
    ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
    return new Runnable() {
        @Override
        public void run() {
            // Record the eventExecutor for the current thread before running
            setCurrentEventExecutor(eventExecutor);
            try {
                command.run();
            } finally {
                // Clear it once the command has finished
                setCurrentEventExecutor(null);
            }
        }
    };
}
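
The same decorate-then-clear pattern can be tried out with the JDK alone. In the sketch below a plain ThreadLocal takes the place of FastThreadLocal, and CurrentOwnerSketch, bindTask, bindExecutor and currentOwner are hypothetical names, so treat it as an illustration of the idea rather than Netty's API.

```java
import java.util.concurrent.Executor;

final class CurrentOwnerSketch {
    // Plain ThreadLocal used here in place of Netty's FastThreadLocal.
    private static final ThreadLocal<Object> CURRENT_OWNER = new ThreadLocal<>();

    // Returns the owner bound to the calling thread, or null outside a bound task.
    static Object currentOwner() {
        return CURRENT_OWNER.get();
    }

    // Wraps a task so that the owner is visible only while the task runs.
    static Runnable bindTask(Runnable command, Object owner) {
        return () -> {
            CURRENT_OWNER.set(owner);
            try {
                command.run();
            } finally {
                CURRENT_OWNER.remove();
            }
        };
    }

    // Wraps an executor so that every submitted task is bound to the owner.
    static Executor bindExecutor(Executor executor, Object owner) {
        return command -> executor.execute(bindTask(command, owner));
    }
}
```

Code running inside a task can call currentOwner() to find out which executor it belongs to, and the finally block ensures the binding does not leak into whatever the thread runs next.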

Running execute(Runnable task)

execute is the core method through which the executor runs tasks; the delayed/scheduled tasks in the parent class AbstractScheduledEventExecutor also rely on execute.

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    // Add the task to taskQueue first
    addTask(task);
    if (!inEventLoop) { // Not on the event loop thread
        startThread(); // Start the thread if it has not been started yet
        if (isShutdown()) {
            boolean reject = false;
            try {
                if (removeTask(task)) {
                    reject = true;
                }
            } catch (UnsupportedOperationException e) {
                // The task queue does not support removal so the best thing we can do is to just move on and
                // hope we will be able to pick-up the task before its completely terminated.
                // In worst case we will log on termination.
            }
            if (reject) {
                reject();
            }
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

Adding a task to the queue: addTask(Runnable task)

protected void addTask(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (!offerTask(task)) {
        reject(task);
    }
}

final boolean offerTask(Runnable task) {
    if (isShutdown()) {
        reject();
    }
    // Add the task to the queue; returns false if the queue is full
    return taskQueue.offer(task);
}
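
The offer-or-reject step can be reproduced with a bounded JDK queue. This is a simplified sketch under stated assumptions: BoundedAddSketch and pending are hypothetical names, and throwing RejectedExecutionException directly stands in for whatever the configured RejectedExecutionHandler would do.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

final class BoundedAddSketch {
    private final Queue<Runnable> taskQueue;

    BoundedAddSketch(int maxPendingTasks) {
        // A bounded queue: offer() returns false once maxPendingTasks are waiting.
        this.taskQueue = new LinkedBlockingQueue<>(maxPendingTasks);
    }

    void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!taskQueue.offer(task)) {
            // Assumption: rejecting means throwing; a real handler could do something else.
            throw new RejectedExecutionException("task queue is full");
        }
    }

    int pending() {
        return taskQueue.size();
    }
}
```

With a capacity of 2, the third addTask call is rejected and the two queued tasks stay untouched.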

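Starting the thread: startThread()

To start the thread, state must be updated from ST_NOT_STARTED to ST_STARTED with a CAS; this guarantees that the thread is started only once, no matter how many callers race.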

private void startThread() {
    if (state == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            boolean success = false;
            try {
                doStartThread();
                success = true;
            } finally {
                if (!success) {
                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                }
            }
        }
    }
}
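
The start-once guarantee of the CAS can be checked in isolation. The following JDK-only sketch mirrors the state constants from the excerpt, but StartOnceSketch, starts and race are hypothetical names, and a counter increment stands in for doStartThread().

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

final class StartOnceSketch {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private static final AtomicIntegerFieldUpdater<StartOnceSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(StartOnceSketch.class, "state");

    private volatile int state = ST_NOT_STARTED;
    final AtomicInteger starts = new AtomicInteger();

    // Returns true only for the single caller that wins the CAS.
    boolean startThread() {
        if (state == ST_NOT_STARTED
                && STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            starts.incrementAndGet(); // stands in for doStartThread()
            return true;
        }
        return false;
    }

    // Races several callers against one instance and reports how many starts happened.
    static int race(int callers) {
        StartOnceSketch sketch = new StartOnceSketch();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(sketch::startThread);
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sketch.starts.get();
    }
}
```

However many callers race, only one compareAndSet from ST_NOT_STARTED can succeed, so race reports exactly one start.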

The method that actually starts the thread

private void doStartThread() {
    assert thread == null;
    // Run via the executor that was passed to the constructor
    executor.execute(new Runnable() {
        @Override
        public void run() {
            // The thread on which the executor runs this task
            thread = Thread.currentThread();
            if (interrupted) {
                thread.interrupt();
            }

            boolean success = false;
            // Update the last execution time
            updateLastExecutionTime();
            try {
                // Invoke the run method of SingleThreadEventExecutor
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = state;
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }

                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    if (logger.isErrorEnabled()) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                "be called before run() implementation terminates.");
                    }
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        // confirmShutdown() calls runAllTasks() to finish the remaining tasks
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                        // the future. The user may block on the future and once it unblocks the JVM may terminate
                        // and start unloading classes.
                        // See https://github.com/netty/netty/issues/6596.
                        FastThreadLocal.removeAll();

                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.countDown();
                        if (logger.isWarnEnabled() && !taskQueue.isEmpty()) {
                            logger.warn("An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }
                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
}

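Running run()

execute(Runnable task) calls startThread() -> doStartThread() to start the thread, and the thread started by doStartThread() calls SingleThreadEventExecutor.this.run() to do the actual work.

SingleThreadEventExecutor does not implement run itself; it is left to subclasses.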

protected abstract void run();

Graceful shutdown: shutdownGracefully()

io.netty.util.concurrent.AbstractEventExecutor#shutdownGracefully provides the graceful shutdown; it delegates to shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit), which is implemented in io.netty.util.concurrent.SingleThreadEventExecutor.

static final long DEFAULT_SHUTDOWN_QUIET_PERIOD = 2;
static final long DEFAULT_SHUTDOWN_TIMEOUT = 15;

@Override
public Future<?> shutdownGracefully() {
    return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
}

Implementation of shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)

@Override
public boolean isShuttingDown() {
    return state >= ST_SHUTTING_DOWN;
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
    if (quietPeriod < 0) {
        throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
    }
    if (timeout < quietPeriod) {
        throw new IllegalArgumentException(
                "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    if (isShuttingDown()) { // Already shutting down: just return the termination future
        return terminationFuture();
    }

    boolean inEventLoop = inEventLoop();
    boolean wakeup;
    int oldState;
    for (;;) {
        if (isShuttingDown()) {
            return terminationFuture();
        }
        int newState;
        wakeup = true;
        oldState = state;
        if (inEventLoop) {
            newState = ST_SHUTTING_DOWN;
        } else {
            switch (oldState) {
                case ST_NOT_STARTED:
                case ST_STARTED:
                    newState = ST_SHUTTING_DOWN;
                    break;
                default:
                    newState = oldState;
                    wakeup = false;
            }
        }
        // Retry until the CAS succeeds
        if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
            break;
        }
    }
    // Record the quiet period and the timeout in nanoseconds
    gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
    gracefulShutdownTimeout = unit.toNanos(timeout);
    // If the previous state was ST_NOT_STARTED, start the thread via doStartThread()
    if (ensureThreadStarted(oldState)) {
        return terminationFuture;
    }

    if (wakeup) { // This call moved the state to ST_SHUTTING_DOWN
        // Enqueue a no-op task
        taskQueue.offer(WAKEUP_TASK);
        if (!addTaskWakesUp) {
            wakeup(inEventLoop); // When not on the event loop thread, this also calls taskQueue.offer(WAKEUP_TASK)
        }
    }

    return terminationFuture();
}

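Fetching a task: takeTask()

takeTask obtains tasks from both the scheduled/delayed task queue scheduledTaskQueue and the regular task queue taskQueue.

Tasks in scheduledTaskQueue whose execution time has arrived are moved from scheduledTaskQueue to taskQueue by the fetchFromScheduledTaskQueue method.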

protected Runnable takeTask() {
    assert inEventLoop();
    if (!(taskQueue instanceof BlockingQueue)) {
        throw new UnsupportedOperationException();
    }

    BlockingQueue<Runnable> taskQueue = (BlockingQueue<Runnable>) this.taskQueue;
    for (;;) {
        // Look at the delayed/scheduled task queue first (scheduledTaskQueue.peek())
        ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
        if (scheduledTask == null) {
            Runnable task = null;
            try {
                // No scheduled task: block on the task queue
                task = taskQueue.take();
                // A wakeup task only unblocks the thread; it is not returned
                if (task == WAKEUP_TASK) {
                    task = null;
                }
            } catch (InterruptedException e) {
                // Ignore
            }
            return task;
        } else {
            long delayNanos = scheduledTask.delayNanos();
            Runnable task = null;
            if (delayNanos > 0) {
                try {
                    // Wait for a regular task, but no longer than the scheduled task's delay
                    task = taskQueue.poll(delayNanos, TimeUnit.NANOSECONDS);
                } catch (InterruptedException e) {
                    // Waken up.
                    return null;
                }
            }
            if (task == null) {
                // We need to fetch the scheduled tasks now as otherwise there may be a chance that
                // scheduled tasks are never executed if there is always one task in the taskQueue.
                // This is for example true for the read task of OIO Transport
                // See https://github.com/netty/netty/issues/1614
                // Move scheduled tasks whose execution time has arrived into taskQueue
                fetchFromScheduledTaskQueue();
                task = taskQueue.poll();
            }

            if (task != null) {
                return task;
            }
        }
    }
}
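
The hand-over from the scheduled queue to the regular queue can be sketched without Netty. In the example below FetchDueSketch and Entry are hypothetical names, the method name fetchFromScheduledTaskQueue is borrowed only to show the correspondence, and the current time is passed in as a parameter (an assumption made so the behaviour is deterministic).

```java
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;

final class FetchDueSketch {
    // Hypothetical scheduled entry: a task plus the time at which it becomes due.
    static final class Entry {
        final long deadlineNanos;
        final Runnable task;

        Entry(long deadlineNanos, Runnable task) {
            this.deadlineNanos = deadlineNanos;
            this.task = task;
        }
    }

    final PriorityQueue<Entry> scheduledTaskQueue =
            new PriorityQueue<>(Comparator.comparingLong((Entry e) -> e.deadlineNanos));
    final Queue<Runnable> taskQueue = new ArrayDeque<>();

    void schedule(long deadlineNanos, Runnable task) {
        scheduledTaskQueue.add(new Entry(deadlineNanos, task));
    }

    // Moves every entry whose deadline has passed into taskQueue; returns how many moved.
    int fetchFromScheduledTaskQueue(long nowNanos) {
        int moved = 0;
        Entry head;
        while ((head = scheduledTaskQueue.peek()) != null && head.deadlineNanos <= nowNanos) {
            scheduledTaskQueue.poll();
            taskQueue.offer(head.task);
            moved++;
        }
        return moved;
    }
}
```

Because the priority queue keeps the earliest deadline at its head, the loop can stop at the first entry that is not yet due; everything behind it is due even later.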

SingleThreadEventLoop

SingleThreadEventLoop mainly implements the methods of the EventLoopGroup interface.

Member variables

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
            SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));

    private final Queue<Runnable> tailTasks;
}

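Constructors

SingleThreadEventLoop itself has only one non-static member variable, tailTasks, so its constructors delegate to the parent class and only initialize tailTasks themselves.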

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {
    this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_TASKS, RejectedExecutionHandlers.reject());
}

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, threadFactory, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    tailTasks = newTaskQueue(maxPendingTasks);
}

protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    tailTasks = newTaskQueue(maxPendingTasks);
}

EventLoopGroup interface methods

The main purpose of these methods is to support chained (fluent) calls.

register
@Override
public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}
afterRunningAllTasks

Before shutdown, the call chain confirmShutdown() -> runAllTasks() -> afterRunningAllTasks() is executed.

@Override
protected void afterRunningAllTasks() {
    runAllTasksFrom(tailTasks);
}
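
The effect of this hook is that tail tasks always run after the regular batch, regardless of when they were queued. A minimal JDK-only sketch of that ordering follows; TailTaskSketch and addTailTask are hypothetical names, and the queues are plain ArrayDeque instances rather than Netty's task queues.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class TailTaskSketch {
    private final Queue<Runnable> taskQueue = new ArrayDeque<>();
    private final Queue<Runnable> tailTasks = new ArrayDeque<>();

    // Queues a regular task.
    void execute(Runnable task) {
        taskQueue.offer(task);
    }

    // Queues a task that should run only after the regular batch has finished.
    void addTailTask(Runnable task) {
        tailTasks.offer(task);
    }

    // Runs the regular tasks first, then hands over to the hook.
    void runAllTasks() {
        runAllTasksFrom(taskQueue);
        afterRunningAllTasks();
    }

    private void afterRunningAllTasks() {
        runAllTasksFrom(tailTasks);
    }

    private static void runAllTasksFrom(Queue<Runnable> queue) {
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }
}
```

Even if a tail task is added before a regular task, one pass of runAllTasks runs the regular task first and the tail task second.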

References

Netty SingleThreadEventExecutor class: a detailed explanation of the addTaskWakesUp member variable