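While reading through the Channel source, I found that its internals depend heavily on ChannelPipeline: at every event point in its lifecycle, the Channel calls a method on its ChannelPipeline.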

In Netty, a Channel is the carrier of communication, while ChannelHandlers implement the processing logic for that Channel.

So what is a ChannelPipeline? I think of it as a container for ChannelHandlers: each Channel owns one ChannelPipeline, and every ChannelHandler is registered in that pipeline and kept in a defined order.

ChannelPipeline

At each lifecycle event, the Channel calls the corresponding method on its pipeline, which then runs the event through the ChannelHandler chain.

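ChannelPipeline does not hold ChannelHandlers directly. It wraps each one in a ChannelHandlerContext, and all operations go through that context. A ChannelHandlerAdapter subclass that is sharable (annotated with @Sharable) can be held by multiple pipelines; otherwise it can be held by only one pipeline.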

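Netty's ChannelPipeline carries two flows: Inbound and Outbound. Inbound is the upstream direction: received messages and passive state changes are Inbound events. Outbound is the downstream direction: messages being sent and actively requested state changes are Outbound events.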

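The diagram below comes from the Javadoc in the source and shows how ChannelHandlers process I/O events: Inbound events travel from the head of the chain to the tail, and Outbound events travel from the tail to the head.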

                                                  I/O Request
                                             via {@link Channel} or
                                         {@link ChannelHandlerContext}
                                                       |
   +---------------------------------------------------+---------------+
   |                           ChannelPipeline         |               |
   |                                                  \|/              |
   |    +---------------------+            +-----------+----------+    |
   |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
   |    +----------+----------+            +-----------+----------+    |
   |              /|\                                  |               |
   |               |                                  \|/              |
   |    +----------+----------+            +-----------+----------+    |
   |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
   |    +----------+----------+            +-----------+----------+    |
   |              /|\                                  .               |
   |               .                                   .               |
   | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
   |        [ method call]                       [method call]         |
   |               .                                   .               |
   |               .                                  \|/              |
   |    +----------+----------+            +-----------+----------+    |
   |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
   |    +----------+----------+            +-----------+----------+    |
   |              /|\                                  |               |
   |               |                                  \|/              |
   |    +----------+----------+            +-----------+----------+    |
   |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
   |    +----------+----------+            +-----------+----------+    |
   |              /|\                                  |               |
   +---------------+-----------------------------------+---------------+
                   |                                  \|/
   +---------------+-----------------------------------+---------------+
   |               |                                   |               |
   |       [ Socket.read() ]                    [ Socket.write() ]     |
   |                                                                   |
   |  Netty Internal I/O Threads (Transport Implementation)            |
   +-------------------------------------------------------------------+
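To make the two directions concrete, here is a minimal sketch in plain Java. It is my own simplified model, not Netty code: the class DirectionSketch, its Kind enum, and the handler names are all hypothetical, and it only records the visiting order.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model for illustration only; this is not Netty code.
class DirectionSketch {
    enum Kind { INBOUND, OUTBOUND }

    private static final class Entry {
        final String name;
        final Kind kind;

        Entry(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }
    }

    // Index 0 sits next to the head, the last index next to the tail.
    private final List<Entry> chain = new ArrayList<>();

    DirectionSketch addLast(String name, Kind kind) {
        chain.add(new Entry(name, kind));
        return this;
    }

    // Inbound events visit inbound handlers from head to tail.
    List<String> inboundOrder() {
        List<String> visited = new ArrayList<>();
        for (Entry e : chain) {
            if (e.kind == Kind.INBOUND) {
                visited.add(e.name);
            }
        }
        return visited;
    }

    // Outbound events visit outbound handlers from tail to head.
    List<String> outboundOrder() {
        List<String> visited = new ArrayList<>();
        for (int i = chain.size() - 1; i >= 0; i--) {
            if (chain.get(i).kind == Kind.OUTBOUND) {
                visited.add(chain.get(i).name);
            }
        }
        return visited;
    }
}
```

With handlers added in the order in1, out1, in2, out2, an inbound event visits in1 then in2, while an outbound event visits out2 then out1.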

The methods on ChannelHandlerContext correspond to these events and are likewise divided into Inbound and Outbound.
Inbound event methods

ChannelHandlerContext#fireChannelRegistered()
ChannelHandlerContext#fireChannelActive()
ChannelHandlerContext#fireChannelRead(Object)
ChannelHandlerContext#fireChannelReadComplete()
ChannelHandlerContext#fireExceptionCaught(Throwable)
ChannelHandlerContext#fireUserEventTriggered(Object)
ChannelHandlerContext#fireChannelWritabilityChanged()
ChannelHandlerContext#fireChannelInactive()
ChannelHandlerContext#fireChannelUnregistered()

Outbound event methods

ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext#write(Object, ChannelPromise)
ChannelHandlerContext#flush()
ChannelHandlerContext#read()
ChannelHandlerContext#disconnect(ChannelPromise)
ChannelHandlerContext#close(ChannelPromise)
ChannelHandlerContext#deregister(ChannelPromise)

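Interface

The ChannelPipeline interface extends ChannelInboundInvoker, ChannelOutboundInvoker, and Iterable<Entry<String, ChannelHandler>>. ChannelInboundInvoker declares the Inbound event methods, ChannelOutboundInvoker declares the Outbound event methods, and Iterable allows iterating over the ChannelHandlers.

The methods that ChannelPipeline declares itself are for adding, removing, replacing, and looking up ChannelHandlers.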

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    ChannelPipeline addFirst(String name, ChannelHandler handler);
    ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
    ChannelPipeline addLast(String name, ChannelHandler handler);
    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
    ChannelPipeline addFirst(ChannelHandler... handlers);
    ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
    ChannelPipeline addLast(ChannelHandler... handlers);
    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

    ChannelPipeline remove(ChannelHandler handler);
    ChannelHandler remove(String name);
    <T extends ChannelHandler> T remove(Class<T> handlerType);
    ChannelHandler removeFirst();
    ChannelHandler removeLast();
    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
    <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
                                         ChannelHandler newHandler);
    ChannelHandler first();
    ChannelHandlerContext firstContext();
    ChannelHandler last();
    ChannelHandlerContext lastContext();
    ChannelHandler get(String name);
    <T extends ChannelHandler> T get(Class<T> handlerType);
    ChannelHandlerContext context(ChannelHandler handler);
    ChannelHandlerContext context(String name);
    ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
    Channel channel();
    List<String> names();
    Map<String, ChannelHandler> toMap();

    @Override
    ChannelPipeline fireChannelRegistered();
    @Override
    ChannelPipeline fireChannelUnregistered();
    @Override
    ChannelPipeline fireChannelActive();
    @Override
    ChannelPipeline fireChannelInactive();
    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);
    @Override
    ChannelPipeline fireUserEventTriggered(Object event);
    @Override
    ChannelPipeline fireChannelRead(Object msg);
    @Override
    ChannelPipeline fireChannelReadComplete();
    @Override
    ChannelPipeline fireChannelWritabilityChanged();
    @Override
    ChannelPipeline flush();
}

DefaultChannelPipeline

DefaultChannelPipeline is the default implementation of ChannelPipeline.

Fields

public class DefaultChannelPipeline implements ChannelPipeline {
    // Generated names for the head and tail contexts (simple class name + "#0")
    private static final String HEAD_NAME = generateName0(HeadContext.class);
    private static final String TAIL_NAME = generateName0(TailContext.class);

    // Per-thread cache of generated default names, keyed by handler class
    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() {
            return new WeakHashMap<Class<?>, String>();
        }
    };
    // Atomic updater for the estimatorHandle field
    private static final AtomicReferenceFieldUpdater<DefaultChannelPipeline, MessageSizeEstimator.Handle> ESTIMATOR =
            AtomicReferenceFieldUpdater.newUpdater(
                    DefaultChannelPipeline.class, MessageSizeEstimator.Handle.class, "estimatorHandle");
    // Head and tail of the ChannelHandlerContext linked list; each ChannelHandler is
    // wrapped in a ChannelHandlerContext before it is inserted into the list
    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    // The Channel this pipeline is bound to
    private final Channel channel;
    // A future that is already in the succeeded state
    private final ChannelFuture succeededFuture;
    private final VoidChannelPromise voidPromise;
    // Whether resource leak detection is enabled
    private final boolean touch = ResourceLeakDetector.isEnabled();
    // Executor selected for each EventExecutorGroup
    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private volatile MessageSizeEstimator.Handle estimatorHandle;
    // True until the channel has been registered for the first time
    private boolean firstRegistration = true;
    // Head of the linked list of pending handlerAdded/handlerRemoved callbacks
    private PendingHandlerCallback pendingHandlerCallbackHead;
    // Whether the channel has been registered to an EventLoop; set to true when
    // the pipeline's invokeHandlerAddedIfNeeded() runs
    private boolean registered;
}

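Constructor

The constructor takes the Channel and binds the pipeline to it, then links the head and tail contexts to each other to form an empty chain.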

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise = new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}
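The head/tail pair set up by the constructor acts as a pair of sentinel nodes, so every insertion is a plain "link between two existing nodes" with no null checks. A minimal sketch of that idea, assuming a hypothetical SentinelChain class with string-named nodes (my own model, not the Netty source):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model for illustration only; this is not Netty code.
class SentinelChain {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    // Sentinels: always present, never returned to callers.
    final Node head = new Node("head");
    final Node tail = new Node("tail");

    SentinelChain() {
        head.next = tail;
        tail.prev = head;
    }

    // Insert right after head.
    void addFirst(String name) {
        insertBetween(head, new Node(name), head.next);
    }

    // Insert right before tail.
    void addLast(String name) {
        insertBetween(tail.prev, new Node(name), tail);
    }

    private static void insertBetween(Node before, Node node, Node after) {
        node.prev = before;
        node.next = after;
        before.next = node;
        after.prev = node;
    }

    // Names of the user-visible nodes, from head to tail.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node n = head.next; n != tail; n = n.next) {
            out.add(n.name);
        }
        return out;
    }
}
```

Because head and tail always exist, an empty chain is just head linked to tail, and names() returns an empty list.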

addFirst

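  1. Check whether the handler is sharable; if a non-sharable handler has already been added to a pipeline, throw an exception;
  2. If no name is passed, use a generated name of the form "simple class name#0";
  3. newCtx = new DefaultChannelHandlerContext: the pipeline does not hold the handler directly but wraps it in a ChannelHandlerContext;
  4. Insert the ChannelHandlerContext at the head of the linked list (right after head);
  5. If the channel is not registered yet, set the ctx state to ADD_PENDING via CAS and append a PendingHandlerAddedTask to the pendingHandlerCallbackHead task list;
  6. Otherwise, check whether the current thread is the ctx's event loop:
    1. If not: call newCtx.setAddPending(), then run callHandlerAdded0() as a task on the executor;
    2. If so: call callHandlerAdded0() directly.

callHandlerAdded0() invokes the handler's handlerAdded() callback, which signals that the handler has been added.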
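The default-naming rule from step 2 can be sketched roughly as follows. This is a simplified model under my own assumptions, not the Netty implementation: the NamingSketch class and its usedNames set are hypothetical, and only the method name filterName is borrowed from the source.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model for illustration only; this is not Netty code.
class NamingSketch {
    private final Set<String> usedNames = new HashSet<>();

    // Keeps an explicit name (rejecting duplicates) or generates a default one.
    String filterName(String name, Object handler) {
        if (name == null) {
            name = generateName(handler);
        } else if (usedNames.contains(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        usedNames.add(name);
        return name;
    }

    // Tries "SimpleName#0", then "SimpleName#1", ... until a free name is found.
    private String generateName(Object handler) {
        String base = handler.getClass().getSimpleName() + "#";
        for (int i = 0; ; i++) {
            String candidate = base + i;
            if (!usedNames.contains(candidate)) {
                return candidate;
            }
        }
    }
}
```

Adding two unnamed handlers of the same class therefore yields two distinct names, while reusing an explicit name fails fast.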

@Override
public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
    return addFirst(null, name, handler);
}

@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // Reject a non-sharable handler that has already been added to a pipeline
        checkMultiplicity(handler);
        // If no name was passed, use "simple class name#0"
        name = filterName(name, handler);
        // Wrap the handler in a new DefaultChannelHandlerContext
        newCtx = newContext(group, name, handler);
        // Link the ChannelHandlerContext in at the head of the list
        addFirst0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            // Set the ctx state to ADD_PENDING via CAS
            newCtx.setAddPending();
            // Append a PendingHandlerAddedTask to the pendingHandlerCallbackHead task list
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            // Calls newCtx.setAddPending(), then runs callHandlerAdded0 as a task on the executor
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    // Already on the event loop: invoke the ctx's callHandlerAdded directly
    callHandlerAdded0(newCtx);
    return this;
}

// Insert newCtx between head and head.next
private void addFirst0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext nextCtx = head.next;
    newCtx.prev = head;
    newCtx.next = nextCtx;
    head.next = newCtx;
    nextCtx.prev = newCtx;
}

Deferring the added/removed callback: callHandlerCallbackLater

When the queued PendingHandlerAddedTask runs, it calls callHandlerAdded0.

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;

    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Append the task to the end of the pending list
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}
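The deferral pattern here (queue callbacks in a singly linked list while unregistered, then drain them in order once registration happens) can be modelled in a few lines. This is a hedged sketch of the idea only: PendingCallbacks, callLater, and onRegistered are hypothetical names, and unlike the Netty method this model also covers the already-registered case by running the callback inline.

```java
// Simplified model for illustration only; this is not Netty code.
class PendingCallbacks {
    private static final class Task {
        final Runnable action;
        Task next;

        Task(Runnable action) {
            this.action = action;
        }
    }

    private Task pendingHead;
    private boolean registered;

    // Runs the callback now if registered, otherwise appends it to the pending list.
    void callLater(Runnable action) {
        if (registered) {
            action.run();
            return;
        }
        Task task = new Task(action);
        if (pendingHead == null) {
            pendingHead = task;
            return;
        }
        // Find the tail of the linked list and append there, preserving order.
        Task pending = pendingHead;
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }

    // Marks the owner as registered and drains the pending callbacks in insertion order.
    void onRegistered() {
        registered = true;
        Task task = pendingHead;
        pendingHead = null;
        while (task != null) {
            task.action.run();
            task = task.next;
        }
    }
}
```

Appending at the tail is what keeps the callbacks in the same order in which the handlers were added or removed.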

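Firing the handlerAdded event: callHandlerAdded0

The ctx's callHandlerAdded() calls the ChannelHandler's handlerAdded(ChannelHandlerContext ctx). If that call throws, the ctx is unlinked from the chain again and the failure is reported through fireExceptionCaught.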

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        // The ctx's callHandlerAdded calls ChannelHandler.handlerAdded(ChannelHandlerContext ctx)
        ctx.callHandlerAdded();
    } catch (Throwable t) {
        boolean removed = false;
        try {
            // Roll back: unlink the ctx and notify the handler that it was removed
            atomicRemoveFromHandlerList(ctx);
            ctx.callHandlerRemoved();
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }

        if (removed) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; removed.", t));
        } else {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() +
                    ".handlerAdded() has thrown an exception; also failed to remove.", t));
        }
    }
}
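The rollback behaviour of callHandlerAdded0 boils down to "add first, call the callback, undo the add if the callback throws". A rough sketch of that shape, using a hypothetical AddRollback class with a one-method Handler interface (my simplification: it catches only RuntimeException and records the error in a list instead of firing an exception event):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model for illustration only; this is not Netty code.
class AddRollback {
    interface Handler {
        void handlerAdded();
    }

    final List<Handler> handlers = new ArrayList<>();
    final List<String> errors = new ArrayList<>();

    void add(Handler handler) {
        // The handler is linked in before its callback runs.
        handlers.add(handler);
        try {
            handler.handlerAdded();
        } catch (RuntimeException e) {
            // Undo the insertion and report the failure instead of rethrowing.
            handlers.remove(handler);
            errors.add("handlerAdded() has thrown an exception; removed: " + e.getMessage());
        }
    }
}
```

The caller of add() never sees the exception; a failing handler simply ends up absent from the chain with one recorded error.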

addLast

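addLast follows essentially the same logic as addFirst. The only difference is where the context is linked into the list: addLast inserts at the tail, addFirst inserts at the head.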

@Override
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);
        // Link the context in at the tail of the list
        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

// Insert newCtx between tail.prev and tail
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

addBefore

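Compared with addLast and addFirst, addBefore adds one step: it looks up the ctx registered under baseName and inserts the new ctx immediately before it.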

ctx = getContextOrDie(baseName);
newCtx = newContext(group, name, handler);

private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) {
    newCtx.prev = ctx.prev;
    newCtx.next = ctx;
    ctx.prev.next = newCtx;
    ctx.prev = newCtx;
}

remove

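  • remove unlinks the ctx while holding the pipeline lock (synchronized);
  • after any of the remove methods unlinks the ctx, the handler's handlerRemoved callback is invoked (through callHandlerRemoved0).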
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    return this;
}

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        atomicRemoveFromHandlerList(ctx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we remove the context from the pipeline and add a task that will call
        // ChannelHandler.handlerRemoved(...) once the channel is registered.
        if (!registered) {
            callHandlerCallbackLater(ctx, false);
            return ctx;
        }

        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx;
        }
    }
    callHandlerRemoved0(ctx);
    return ctx;
}

private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next;
    next.prev = prev;
}

getContextOrDie

private AbstractChannelHandlerContext getContextOrDie(String name) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(name);
    if (ctx == null) {
        throw new NoSuchElementException(name);
    } else {
        return ctx;
    }
}

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

private AbstractChannelHandlerContext getContextOrDie(Class<? extends ChannelHandler> handlerType) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handlerType);
    if (ctx == null) {
        throw new NoSuchElementException(handlerType.getName());
    } else {
        return ctx;
    }
}

context()
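Walks the linked list from head.next up to tail and returns the first context whose name matches, or null if there is none.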

@Override
public final ChannelHandlerContext context(String name) {
    if (name == null) {
        throw new NullPointerException("name");
    }

    return context0(name);
}

private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}

bind

@Override
public final ChannelFuture bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}

AbstractChannelHandlerContext

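ChannelHandlerContext.bind looks up the next ctx that handles the event and passes the call from the tail toward the head.

When AbstractChannelHandlerContext looks for an InboundHandler or OutboundHandler, it tests ctx.executionMask & mask. Inbound and outbound handlers each have their corresponding mask bits; for the details, see io.netty.channel.ChannelHandlerMask#mask0.

findContextOutbound(int mask) searches backward, from the current ctx toward the head, for an OutboundHandler that handles the given event.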

@Override
public ChannelFuture bind(SocketAddress localAddress) {
    return bind(localAddress, newPromise());
}

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null);
    }
    return promise;
}

// Walk backward (toward the head) until a ctx whose mask matches is found
private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while ((ctx.executionMask & mask) == 0);
    return ctx;
}
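The mask test is easier to see in isolation. The following is a minimal sketch with made-up bit values and a hypothetical MaskSketch class; the real constants live in ChannelHandlerMask and differ from these. The sketch assumes, as in the pipeline, that the head context handles every outbound event, which is what guarantees the do/while loop terminates.

```java
// Simplified model for illustration only; this is not Netty code.
class MaskSketch {
    // Hypothetical bit values used only in this sketch.
    static final int MASK_BIND = 1;
    static final int MASK_WRITE = 1 << 1;
    static final int MASK_FLUSH = 1 << 2;
    static final int MASK_ALL_OUTBOUND = MASK_BIND | MASK_WRITE | MASK_FLUSH;

    static final class Ctx {
        final String name;
        final int executionMask;
        Ctx prev;

        Ctx(String name, int executionMask) {
            this.name = name;
            this.executionMask = executionMask;
        }
    }

    // Links the contexts in head-to-tail order and returns the last one (the tail).
    static Ctx link(Ctx... contexts) {
        for (int i = 1; i < contexts.length; i++) {
            contexts[i].prev = contexts[i - 1];
        }
        return contexts[contexts.length - 1];
    }

    // Walks toward the head until a context with at least one requested bit is found.
    static Ctx findContextOutbound(Ctx from, int mask) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }
}
```

Passing a combined mask such as MASK_WRITE | MASK_FLUSH matches the first context that has either bit set, which is how a single lookup can serve a write-and-flush call.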

connect, disconnect, close, deregister, and flush follow essentially the same logic as bind: each uses findContextOutbound to pass the call toward the head.

read

@Override
public final ChannelPipeline read() {
    tail.read();
    return this;
}

AbstractChannelHandlerContext.read()

Finds the next ChannelOutboundHandler (toward the head) and runs invokeRead on its ctx.

@Override
public ChannelHandlerContext read() {
    // Find the next outbound ctx that handles read
    final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeRead();
    } else { // Not on the event loop: submit the read as a task
        Tasks tasks = next.invokeTasks;
        if (tasks == null) {
            next.invokeTasks = tasks = new Tasks(next);
        }
        executor.execute(tasks.invokeReadTask);
    }

    return this;
}

private void invokeRead() {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).read(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else { // Handler is not ready to be invoked yet: skip it and pass the read on
        read();
    }
}
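The "run inline if already on the event loop, otherwise submit a task" pattern recurs in almost every method shown so far. Here is a small self-contained sketch of it built on a JDK single-thread executor. MiniEventLoop, runInLoop, and demo are hypothetical names of mine, and the thread bookkeeping is far simpler than what Netty's EventExecutor does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model for illustration only; this is not Netty code.
class MiniEventLoop {
    private final ExecutorService executor;
    private volatile Thread loopThread;

    MiniEventLoop() {
        executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-event-loop");
            t.setDaemon(true);
            loopThread = t; // remember which thread is "the loop"
            return t;
        });
    }

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    // Run inline when already on the loop thread, otherwise hand the task over.
    void runInLoop(Runnable task) {
        if (inEventLoop()) {
            task.run();
        } else {
            executor.execute(task);
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    // Small demonstration: records where each piece of work ran.
    static List<String> demo() {
        MiniEventLoop loop = new MiniEventLoop();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);
        seen.add("caller in loop: " + loop.inEventLoop());
        loop.runInLoop(() -> {
            seen.add("outer in loop: " + loop.inEventLoop());
            loop.runInLoop(() -> seen.add("inner ran inline"));
            seen.add("outer finished");
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("event loop task did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
        return new ArrayList<>(seen);
    }
}
```

In demo(), the nested runInLoop call executes immediately because it is issued from the loop thread itself, so "inner ran inline" is recorded before "outer finished".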

write

@Override
public final ChannelFuture write(Object msg) {
    return tail.write(msg);
}

@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
    return tail.write(msg, promise);
}

AbstractChannelHandlerContext.write()

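Looks up the next outbound ctx. If that ctx's handler is not ready to be invoked yet (invokeHandler() returns false), write() is called again to move on to the next one. The logic is essentially the same as read.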

@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    write(msg, false, promise);

    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    ObjectUtil.checkNotNull(msg, "msg");
    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    // Find the next outbound ctx that handles write (or write and flush)
    final AbstractChannelHandlerContext next = findContextOutbound(flush ?
            (MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
    // Record the hand-off for resource leak tracking
    final Object m = pipeline.touch(msg, next);
    // Run inline if already on the event loop
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        final AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        } else {
            task = WriteTask.newInstance(next, m, promise);
        }
        if (!safeExecute(executor, task, promise, m)) {
            // We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
            // and put it back in the Recycler for re-use later.
            //
            // See https://github.com/netty/netty/issues/8343.
            task.cancel();
        }
    }
}

private void invokeWrite(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
    } else { // Handler is not ready to be invoked yet: skip it and pass the write on
        write(msg, promise);
    }
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

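writeAndFlush works the same way as write; it is just a separate entry point wrapped around the same logic.

Inbound events

All Inbound events are dispatched through static methods on AbstractChannelHandlerContext, with the head node of the handler chain passed as the argument. Those static methods are covered in the analysis of AbstractChannelHandlerContext.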

@Override
public final ChannelPipeline fireChannelRegistered() {
    AbstractChannelHandlerContext.invokeChannelRegistered(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelUnregistered() {
    AbstractChannelHandlerContext.invokeChannelUnregistered(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelInactive() {
    AbstractChannelHandlerContext.invokeChannelInactive(head);
    return this;
}

@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
    AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
    return this;
}

@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
    AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
    return this;
}

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

@Override
public final ChannelPipeline fireChannelReadComplete() {
    AbstractChannelHandlerContext.invokeChannelReadComplete(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
    AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
    return this;
}
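Since every pipeline-level fire method starts at head, whether an inbound event reaches later handlers depends on each handler passing it on through its context. A minimal sketch of that propagation, assuming a hypothetical InboundChain class with a single channelRead-style event (my own model; real handlers and contexts have many more methods):

```java
// Simplified model for illustration only; this is not Netty code.
class InboundChain {
    interface Handler {
        // Call ctx.fireRead(msg) to pass the message on; return without calling it to stop.
        void channelRead(Ctx ctx, String msg);
    }

    static final class Ctx {
        final Handler handler;
        Ctx next;

        Ctx(Handler handler) {
            this.handler = handler;
        }

        // Hands the message to the next context in the chain, if there is one.
        void fireRead(String msg) {
            if (next != null) {
                next.handler.channelRead(next, msg);
            }
        }
    }

    // The head simply forwards whatever it receives.
    private final Ctx head = new Ctx((ctx, msg) -> ctx.fireRead(msg));
    private Ctx last = head;

    InboundChain addLast(Handler handler) {
        Ctx ctx = new Ctx(handler);
        last.next = ctx;
        last = ctx;
        return this;
    }

    // Pipeline-level entry point: always starts at the head.
    void fireRead(String msg) {
        head.handler.channelRead(head, msg);
    }
}
```

A handler that transforms the message and calls ctx.fireRead passes the transformed value downstream; a handler that returns without calling it ends the propagation there.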

References

Netty源码解读(三)Channel与Pipeline