// Will be set to null if no child executor should be used, otherwise it will be set to the // child executor. // 如果没有子executor就设为null,除非它要被设为子executor final EventExecutor executor; private ChannelFuture succeededFuture;
// Lazily instantiated tasks used to trigger events to a handler with different executor. // There is no need to make this volatile as at worse it will just create a few more instances then needed. // 延迟实例化任务被用于触发事件去处理不同的executor private Tasks invokeTasks; // handler状态 privatevolatileint handlerState = INIT;
构造器
1 2 3 4 5 6 7 8 9
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) { this.name = ObjectUtil.checkNotNull(name, "name"); this.pipeline = pipeline; this.executor = executor; this.executionMask = mask(handlerClass); // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor. ordered = executor == null || executor instanceof OrderedEventExecutor; }
staticintmask(Class<? extends ChannelHandler> clazz){ // Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast // lookup in the future. Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get(); Integer mask = cache.get(clazz); if (mask == null) { mask = mask0(clazz); cache.put(clazz, mask); } return mask; } privatestaticintmask0(Class<? extends ChannelHandler> handlerType){ int mask = MASK_EXCEPTION_CAUGHT; try { if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) { mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) { mask &= ~MASK_CHANNEL_REGISTERED; } if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) { mask &= ~MASK_CHANNEL_UNREGISTERED; } if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) { mask &= ~MASK_CHANNEL_ACTIVE; } if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) { mask &= ~MASK_CHANNEL_INACTIVE; } if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) { mask &= ~MASK_CHANNEL_READ; } if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) { mask &= ~MASK_CHANNEL_READ_COMPLETE; } if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) { mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED; } if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) { mask &= ~MASK_USER_EVENT_TRIGGERED; } }
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) { mask |= MASK_ALL_OUTBOUND;
@Override public ChannelFuture write(Object msg){ return write(msg, newPromise()); } @Override public ChannelFuture write(final Object msg, final ChannelPromise promise){ write(msg, false, promise);
return promise; } privatevoidwrite(Object msg, boolean flush, ChannelPromise promise){ ObjectUtil.checkNotNull(msg, "msg"); try { if (isNotValidPromise(promise, true)) { ReferenceCountUtil.release(msg); // cancelled return; } } catch (RuntimeException e) { ReferenceCountUtil.release(msg); throw e; } // 查找OutboundHandler final AbstractChannelHandlerContext next = findContextOutbound(flush ? (MASK_WRITE | MASK_FLUSH) : MASK_WRITE); // 内存泄漏跟踪 final Object m = pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { if (flush) { next.invokeWriteAndFlush(m, promise); } else { next.invokeWrite(m, promise); } } else { // 不在事件循环中的添加任务并执行 final AbstractWriteTask task; if (flush) { task = WriteAndFlushTask.newInstance(next, m, promise); } else { task = WriteTask.newInstance(next, m, promise); } if (!safeExecute(executor, task, promise, m)) { // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes // and put it back in the Recycler for re-use later. // // See https://github.com/netty/netty/issues/8343. task.cancel(); } } }
finalvoidsetAddPending(){ boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING); assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved(). }
pipeline添加回调callHandlerAdded
1 2 3 4 5 6 7
finalvoidcallHandlerAdded()throws Exception { // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates // any pipeline events ctx.handler() will miss them because the state will not allow it. if (setAddComplete()) { handler().handlerAdded(this); } }
pipeline移除回调callHandlerRemoved
1 2 3 4 5 6 7 8 9 10 11
finalvoidcallHandlerRemoved()throws Exception { try { // Only call handlerRemoved(...) if we called handlerAdded(...) before. if (handlerState == ADD_COMPLETE) { handler().handlerRemoved(this); } } finally { // Mark the handler as removed in any case. setRemoved(); } }
invokeHandler是否实例化
1 2 3 4 5
privatebooleaninvokeHandler(){ // Store in local variable to reduce volatile reads. int handlerState = this.handlerState; return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING); }
newPromise
1 2 3 4
@Override public ChannelPromise newPromise(){ returnnew DefaultChannelPromise(channel(), executor()); }