publicvoidregister(final SelectableChannel ch, finalint interestOps, final NioTask<?> task){ if (ch == null) { thrownew NullPointerException("ch"); } if (interestOps == 0) { thrownew IllegalArgumentException("interestOps must be non-zero."); } if ((interestOps & ~ch.validOps()) != 0) { thrownew IllegalArgumentException( "invalid interestOps: " + interestOps + "(validOps: " + ch.validOps() + ')'); } if (task == null) { thrownew NullPointerException("task"); }
if (isShutdown()) { thrownew IllegalStateException("event loop shut down"); }
if (inEventLoop()) { register0(ch, interestOps, task); } else { try { // Offload to the EventLoop as otherwise java.nio.channels.spi.AbstractSelectableChannel.register // may block for a long time while trying to obtain an internal lock that may be hold while selecting. // 注册在EventLoop所在的线程上处理 // sync异步转同步 submit(new Runnable() { @Override publicvoidrun(){ register0(ch, interestOps, task); } }).sync(); } catch (InterruptedException ignore) { // Even if interrupted we did schedule it so just mark the Thread as interrupted. Thread.currentThread().interrupt(); } } }
register0
注册最终调用channel的register方法
1 2 3 4 5 6 7
privatevoidregister0(SelectableChannel ch, int interestOps, NioTask<?> task){ try { ch.register(unwrappedSelector, interestOps, task); } catch (Exception e) { thrownew EventLoopException("failed to register a channel", e); } }
private SelectorTuple openSelector(){ final Selector unwrappedSelector; try { // java nio的provider unwrappedSelector = provider.openSelector(); } catch (IOException e) { thrownew ChannelException("failed to open a new selector", e); } // 没有key优化就直接返回 if (DISABLE_KEY_SET_OPTIMIZATION) { returnnew SelectorTuple(unwrappedSelector); } // 通过反射获取Selector的属性,然后将selectedKeySet设置进去 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run(){ try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); // 类型检查 if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } returnnew SelectorTuple(unwrappedSelector); }
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); // 通过反射将selectedKeySet设置到unwrappedSelector中 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run(){ try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); returnnull; } // We could not retrieve the offset, lets try reflection as last-resort. }
privatevoidrebuildSelector0(){ final Selector oldSelector = selector; final SelectorTuple newSelectorTuple;
if (oldSelector == null) { return; }
try { // 构建新的selector newSelectorTuple = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; }
// Register all channels to the new Selector. int nChannels = 0; // 将原来的SelectionKey注册到新的Selector上 for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; }
int interestOps = key.interestOps(); key.cancel(); SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } // 设置到成员变量上 selector = newSelectorTuple.selector; unwrappedSelector = newSelectorTuple.unwrappedSelector;
try { // time to close the old selector as everything else is registered to the new one // 关闭旧Selector oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } }
if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }
@Override protectedvoidrun(){ for (;;) { try { try { // 判断是select还是执行任务 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue;
case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // 将唤醒标志位置为false select(wakenUp.getAndSet(false)); // 如果唤醒则执行selector唤醒 if (wakenUp.get()) { selector.wakeup(); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); handleLoopException(e); continue; }
privatevoidselect(boolean oldWakenUp)throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); // delayNanos(currentTimeNanos)计算下一个任务需要延时的时间 // selectDeadLineNanos就是下一个任务开始运行的时间 long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime(); // 下次唤醒时间 if (nextWakeupTime != normalizedDeadlineNanos) { nextWakeupTime = normalizedDeadlineNanos; }
for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; }
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call // Selector#wakeup. So we need to check task queue again before executing select operation. // If we don't, the task might be pended until select operation was timed out. // It might be pended until idle timeout if IdleStateHandler existed in pipeline. // 有任务且更新wakenUp为true成功,则selectNow并退出 if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } // select,最大等待时间timeoutMillis int selectedKeys = selector.select(timeoutMillis); selectCnt ++; // selectedKeys有事件触发 // wakenUp.get()被唤醒 // hasTasks() || hasScheduledTasks()有任务 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } // 中断处理 if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } // 当前时间是否到执行下一个任务的时间 long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; // select次数达到上限,处理select没有事件发送空循环的bug } elseif (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // 重建Selector selector = selectRebuildSelector(selectCnt); selectCnt = 1; break; }
currentTimeNanos = time; } // 次数达到报警值 if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } // Harmless exception - log anyway } }
privatevoidprocessSelectedKeysOptimized(){ for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 // 去除引用 selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } // 执行cancel(SelectionKey key) 后会将needsToSelectAgain置为true if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 // 字段置null selectedKeys.reset(i + 1); // 执行selector.select(),并将needsToSelectAgain置为false selectAgain(); i = -1; } } }
privatevoidprocessSelectedKeysPlain(Set<SelectionKey> selectedKeys){ // check if the set is empty and if so just return to not create garbage by // creating a new Iterator every time even if there is nothing to process. // See https://github.com/netty/netty/issues/597 if (selectedKeys.isEmpty()) { return; }
Iterator<SelectionKey> i = selectedKeys.iterator(); for (;;) { final SelectionKey k = i.next(); final Object a = k.attachment(); i.remove();
if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); }
if (!i.hasNext()) { break; } // 再次更新selectedKeys if (needsToSelectAgain) { selectAgain(); selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException if (selectedKeys.isEmpty()) { break; } else { i = selectedKeys.iterator(); } } } }
privatevoidprocessSelectedKey(SelectionKey k, AbstractNioChannel ch){ final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { // If the channel implementation throws an exception because there is no event loop, we ignore this // because we are only trying to determine if ch is registered to this event loop and thus has authority // to close ch. return; } // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is // still healthy and should not be closed. // See https://github.com/netty/netty/issues/5125 if (eventLoop != this || eventLoop == null) { return; } // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); return; }
try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. // 连接事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 // 移除连接事件的监听 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
unsafe.finishConnect(); }
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory. // 写事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // 读事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }