在解析ServerBootstrap类的时候发现无法和之前解析的类整合起来,仔细分析发现少了NioEventLoop类的解析。NioEventLoop代表一个线程,而这个线程到底怎么运作影响了我们对于Netty理解。

NioEventLoop

看本篇前先阅读-【EventLoop的基类SingleThreadEventLoop】。

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final class NioEventLoop extends SingleThreadEventLoop {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioEventLoop.class);
// 硬件限制,取消上限
private static final int CLEANUP_INTERVAL = 256; // XXX Hard-coded value, but won't need customization.

private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
// 最小select返回次数
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
// select重建上限,达到中数就selectRebuildSelector
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
// 封装了selectNow()
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
// 选择器
private Selector selector;
// 没有包装的选择器
private Selector unwrappedSelector;
// 封装的selectedKey
private SelectedSelectionKeySet selectedKeys;
// 选择器提供者
private final SelectorProvider provider;

private final AtomicBoolean wakenUp = new AtomicBoolean();
private volatile long nextWakeupTime = Long.MAX_VALUE;
// 执行选择器:hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
private final SelectStrategy selectStrategy;
// io任务比例
private volatile int ioRatio = 50;
// 取消的key数
private int cancelledKeys;
// 是否需要重新select
private boolean needsToSelectAgain;
}

静态代码块

静态代码块中加载变量中的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static {
final String key = "sun.nio.ch.bugLevel";
final String bugLevel = SystemPropertyUtil.get(key);
if (bugLevel == null) {
try {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
System.setProperty(key, "");
return null;
}
});
} catch (final SecurityException e) {
logger.debug("Unable to get/set System Property: " + key, e);
}
}

int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}

SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;

if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEY_SET_OPTIMIZATION);
logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
}
}

构造器

触发创建NioEventLoop的流程:

  1. new MultithreadEventExecutorGroup->newChild()
  2. NioEventLoopGroup.newChild()->new NioEventLoop
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

newTaskQueue()

创建的队列是多生产者单消费者的队列,具体的后面做单独解析【Mpsc.newMpscQueue()】。

1
2
3
4
5
6
7
8
9
10
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return newTaskQueue0(maxPendingTasks);
}

private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)

注册最终调用ch的register方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public void register(final SelectableChannel ch, final int interestOps, final NioTask<?> task) {
if (ch == null) {
throw new NullPointerException("ch");
}
if (interestOps == 0) {
throw new IllegalArgumentException("interestOps must be non-zero.");
}
if ((interestOps & ~ch.validOps()) != 0) {
throw new IllegalArgumentException(
"invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')');
}
if (task == null) {
throw new NullPointerException("task");
}

if (isShutdown()) {
throw new IllegalStateException("event loop shut down");
}

if (inEventLoop()) {
register0(ch, interestOps, task);
} else {
try {
// Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register
// may block for a long time while trying to obtain an internal lock that may be hold while selecting.
// 注册在EventLoop所在的线程上处理
// sync异步转同步
submit(new Runnable() {
@Override
public void run() {
register0(ch, interestOps, task);
}
}).sync();
} catch (InterruptedException ignore) {
// Even if interrupted we did schedule it so just mark the Thread as interrupted.
Thread.currentThread().interrupt();
}
}
}

register0

注册最终调用channel的register方法

1
2
3
4
5
6
7
private void register0(SelectableChannel ch, int interestOps, NioTask<?> task) {
try {
ch.register(unwrappedSelector, interestOps, task);
} catch (Exception e) {
throw new EventLoopException("failed to register a channel", e);
}
}

openSelector

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// java nio的provider
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 没有key优化就直接返回
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 通过反射获取Selector的属性,然后将selectedKeySet设置进去
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
// 类型检查
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}

final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// 通过反射将selectedKeySet设置到unwrappedSelector中
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);

if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}

Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
// 通过反射设置selectedKeySet
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});

if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
// 封装Selector和selectedKeySet封装
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}

rebuildSelector

rebuildSelector处理事件循环,实际重建逻辑在rebuildSelector0()。

重建Selector是为了解决NIO死循环的bug:若Selector的轮询结果为空,也没有wakeup或新消息处理,则发生空轮询,CPU使用率100%

解决方法:

  • 对Selector的select操作周期进行统计,每完成一次空的select操作进行一次计数,
  • 若在某个周期内连续发生N次空轮询,则触发了epoll死循环bug。
  • 重建Selector,判断是否是其他线程发起的重建请求,若不是则将原SocketChannel从旧的Selector上去除注册,重新注册到新的Selector上,并将原来的Selector关闭。
1
2
3
4
5
6
7
8
9
10
11
12
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}

rebuildSelector0()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;

if (oldSelector == null) {
return;
}

try {
// 构建新的selector
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}

// Register all channels to the new Selector.
int nChannels = 0;
// 将原来的SelectionKey注册到新的Selector上
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}

int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
// 设置到成员变量上
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;

try {
// time to close the old selector as everything else is registered to the new one
// 关闭旧Selector
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}

if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}

线程执行逻辑run()

run()是线程的核心,也是弄懂netty运行逻辑的核心。

循环中的核心逻辑比较简单:

  1. selectStrategy策略决定当前是否执行;
  2. 根据io任务与普通任务的比例执行io任务和普通任务;
  3. 处理ShuttingDown的情况。

处理上面的还有大量的异常处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
@Override
protected void run() {
for (;;) {
try {
try {
// 判断是select还是执行任务
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
// 将唤醒标志位置为false
select(wakenUp.getAndSet(false));
// 如果唤醒则执行selector唤醒
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
// 处理io任务和普通任务的占比
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// 普通任务的执行就是从队列中取出任务执行
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// io任务执行的时间*(普通任务比例/io任务比例)
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
// 关闭处理
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

select(boolean oldWakenUp)

  1. 计算下个任务的执行时间;
  2. 无限循环:
    1. 计算到下次任务的延迟时间timeoutMillis
    2. 如果当前时间已经到执行下个任务的时间了:
      1. 如果select执行的次数为0,则执行selector.selectNow();然后执行次数置1
      2. 退出循环
    3. 有任务且更新wakenUp为true成功,则执行selectNow(),然后执行次数置1并退出
    4. 执行堵塞的select: selectedKeys = selector.select(timeoutMillis);
    5. select计数值加一;
    6. 如果有下面的情况则break:
      1. selectedKeys!=0有事件触发
      2. wakenUp.get()被唤醒
      3. hasTasks() || hasScheduledTasks()有任务
    7. 如果线程中断(Thread.interrupted())则select计数值置1;
    8. 当前时间是否到执行下一个任务的时间:
      1. 如果是则select计数值置1;
      2. 如果不是则判断select次数是否达到上限SELECTOR_AUTO_REBUILD_THRESHOLD,达到上限则重建selectRebuildSelector并将计数置一;
    9. 更新当前时间currentTimeNanos;
  3. 如果select次数达到MIN_PREMATURE_SELECTOR_RETURNS,打印个日志;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// delayNanos(currentTimeNanos)计算下一个任务需要延时的时间
// selectDeadLineNanos就是下一个任务开始运行的时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
// 下次唤醒时间
if (nextWakeupTime != normalizedDeadlineNanos) {
nextWakeupTime = normalizedDeadlineNanos;
}

for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}

// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
// 有任务且更新wakenUp为true成功,则selectNow并退出
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// select,最大等待时间timeoutMillis
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// selectedKeys有事件触发
// wakenUp.get()被唤醒
// hasTasks() || hasScheduledTasks()有任务
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
// 中断处理
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// 当前时间是否到执行下一个任务的时间
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// select次数达到上限,处理select没有事件发送空循环的bug
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 重建Selector
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}

currentTimeNanos = time;
}
// 次数达到报警值
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}

执行io任务processSelectedKeys

根据selectedKeys是否为空判断是走processSelectedKeysOptimized还是processSelectedKeysPlain。

这里的优化是在openSelector()中开始的,优化的方式是使用自定义的SelectedSelectionKeySet替代原来的。

1
2
3
4
5
6
7
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

processSelectedKeysOptimized

selectedKeys替代了原有的selector.selectedKeys()获取方式,selectedKeys是Netty自定义的类,添加了reset方法使得可以复用selectedKeys对象。

processSelectedKeysOptimized处理的是已经在selectedKeys中持有的事件,优化了处理事件保持引用的问题

SelectionKey拥有的attachment对象有两种类型,一种是AbstractNioChannel,另一种是NioTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// 去除引用
selectedKeys.keys[i] = null;

final Object a = k.attachment();

if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 执行cancel(SelectionKey key) 后会将needsToSelectAgain置为true
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// 字段置null
selectedKeys.reset(i + 1);
// 执行selector.select(),并将needsToSelectAgain置为false
selectAgain();
i = -1;
}
}
}

processSelectedKeysPlain

processSelectedKeysPlain是直接将selector.selectedKeys()的结果作为参数传入的,所以少了对selectedKeys引用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}

Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();

if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}

if (!i.hasNext()) {
break;
}
// 再次更新selectedKeys
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();

// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}

processSelectedKey(SelectionKey k, AbstractNioChannel ch)

processSelectedKey(SelectionKey k, AbstractNioChannel ch)处理io的连接/读/写事件处理。

io事件的处理是依赖ch.unsafe属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}

try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// 连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
// 移除连接事件的监听
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
// 写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}

processSelectedKey(SelectionKey k, NioTask task)

执行task的channelReady任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static void processSelectedKey(SelectionKey k, NioTask<SelectableChannel> task) {
int state = 0;
try {
task.channelReady(k.channel(), k);
state = 1;
} catch (Exception e) {
k.cancel();
invokeChannelUnregistered(task, k, e);
state = 2;
} finally {
switch (state) {
case 0:
k.cancel();
invokeChannelUnregistered(task, k, null);
break;
case 1:
if (!k.isValid()) { // Cancelled by channelReady()
invokeChannelUnregistered(task, k, null);
}
break;
}
}
}

selectAgain

1
2
3
4
5
6
7
8
private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
logger.warn("Failed to update SelectionKeys.", t);
}
}