上一篇简单列举了几个核心接口,但是对于各个接口的功能并不能合理描述,所以接下来开始从实现类的角度去解析,更利于理解各个接口的设计与功能。

本篇解析NioEventLoopGroup以及它关联的一些接口或类。

解析

AbstractEventExecutorGroup和MultithreadEventExecutorGroup是NioEventLoopGroup类的父类,所以将这两个类的解析放在上面。

AbstractEventExecutorGroup

AbstractEventExecutorGroup实现了EventExecutorGroup接口,未实现shutdown()和next()方法。它实现的方法都是通过next()获取EventExecutor来执行,如下

1
2
3
4
5
6
7
public Future<?> submit(Runnable task) {
return next().submit(task);
}

public <T> Future<T> submit(Runnable task, T result) {
return next().submit(task, result);
}

EventExecutor接口继承了EventExecutorGroup,但是它的next()方法返回的是自身,所以实际干活的是EventExecutor

MultithreadEventExecutorGroup

MultithreadEventExecutorGroup继承了AbstractEventExecutorGroup,并实现了部分方法。

MultithreadEventExecutorGroup提供newChild方法由子类实现EventExecutor的生成

成员变量

1
2
3
4
5
private final EventExecutor[] children; // 具体的执行器
private final Set<EventExecutor> readonlyChildren; // 只读执行器
private final AtomicInteger terminatedChildren = new AtomicInteger(); // 中断计数器
private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); //Promise继承了Future,提供了写的方法
private final EventExecutorChooserFactory.EventExecutorChooser chooser; // EventExecutor选择器

构造器

1
2
3
4
5
6
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}

核心构造器

构造器中预留了newChild(executor, args)方法让子类去实现具体的EventExecutor生成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}

if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// EventExecutor对应线程数
children = new EventExecutor[nThreads];
// 初始化EventExecutor
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 由子类实习newChild方法
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 不成功则释放资源
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}

for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 通过选择构建选择器
chooser = chooserFactory.newChooser(children);

final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
// 设置中断监听器
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}

Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

next()方法

next()是一个核心方法,在AbstractEventExecutorGroup抽象类中利用next()方法返回EventExecutor实现了许多的方法。

使用选择器EventExecutorChooser选出执行器EventExecutor。

1
2
3
public EventExecutor next() {
return chooser.next();
}

shutdown()方法

线程相关的shutdown/isShuttingDown/isShutdown/isTerminated/awaitTermination都是通过遍历children来实现的。

1
2
3
4
5
public void shutdown() {
for (EventExecutor l: children) {
l.shutdown();
}
}

NioEventLoopGroup

继承关系

1
public class NioEventLoopGroup extends MultithreadEventLoopGroup {}

NioEventLoopGroup继承关系

父类的解析见上面AbstractEventExecutorGroupMultithreadEventExecutorGroup

成员变量

构造器

NioEventLoopGroup都是依赖父类MultithreadEventLoopGroup的构造器本身并没有多余的操作。

1
2
3
4
5
6
7
8
public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory,
final RejectedExecutionHandler rejectedExecutionHandler,
final EventLoopTaskQueueFactory taskQueueFactory) {
super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
rejectedExecutionHandler, taskQueueFactory);
}

newChild()方法

newChild方法是NioEventLoopGroup类的核心方法,MultithreadEventLoopGroup在构造器中调用该方法用于生产EventExecutor对象。

实现逻辑是从args中取EventLoopTaskQueueFactory对象,然后new一个NioEventLoop对象返回。实际的运行逻辑在NioEventLoop中。

1
2
3
4
5
6
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

rebuildSelectors()方法

rebuildSelectors方法是使用新的多路复用器,这个是用于处理epoll空轮询,CPU使用率100%的bug。

问题简单描述:若Selector的轮询结果为空,也没有wakeup或新消息处理,则发生空轮询,CPU使用率100%

解决方法:

  • 对Selector的select操作周期进行统计,每完成一次空的select操作进行一次计数,
  • 若在某个周期内连续发生N次空轮询,则触发了epoll死循环bug。
  • 重建Selector,判断是否是其他线程发起的重建请求,若不是则将原SocketChannel从旧的Selector上去除注册,重新注册到新的Selector上,并将原来的Selector关闭。
1
2
3
4
5
public void rebuildSelectors() {
for (EventExecutor e: this) {
((NioEventLoop) e).rebuildSelector();
}
}

设置任务比例setIoRatio()

Netty将任务分为两种:

  1. IO任务;
  2. 非IO任务;

通过ioRatio可以调节两种的比例,默认为50。

1
2
3
4
5
public void setIoRatio(int ioRatio) {
for (EventExecutor e: this) {
((NioEventLoop) e).setIoRatio(ioRatio);
}
}

参考

臭名昭著的epoll bug
762266)