Netty解析十一:ChannelPipeline与DefaultChannelPipeline
在解析channel时发现channel内部实现非常依赖于ChannelPipeline,在每个事件节点都会调用ChannelPipeline的方法。
在Netty里,Channel是通讯的载体,而ChannelHandler负责Channel中的逻辑处理。
那么ChannelPipeline是什么呢?我觉得可以理解为ChannelHandler的容器:一个Channel包含一个ChannelPipeline,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。
ChannelPipeline
在Channel的各个生命周期事件的时候调用对应的pipeline的方法然后执行ChannelHandler链。
ChannelPipeline不直接持有ChannelHandler,而是将其封装成ChannelHandlerContext,然后各种操作由ChannelHandlerContext来处理。ChannelHandlerAdapter的实现类如果是可共享的则可以被多个pipeline持有,不然只能被一个pipeline持有。
Netty的ChannelPipeline包含两条线路:Inbound和Outbound。Inbound对应上行,接收到的消息、被动的状态改变,都属于Inbound。Outbound则对应下行,发送的消息、主动的状态改变,都属于Outbound。
下面的是源码的注释,I/O事件被ChannelHandler处理的流程,Inbound是从链头到链尾,而Outbound是从链尾到链头。
I/O Request
via {@link Channel} or
{@link ChannelHandlerContext}
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
ChannelHandlerContext中的方法对应各个事件,这些方法也被分为Inbound和Outbound。
Inbound事件方法1
2
3
4
5
6
7
8
9ChannelHandlerContext#fireChannelRegistered()
ChannelHandlerContext#fireChannelActive()
ChannelHandlerContext#fireChannelRead(Object)
ChannelHandlerContext#fireChannelReadComplete()
ChannelHandlerContext#fireExceptionCaught(Throwable)
ChannelHandlerContext#fireUserEventTriggered(Object)
ChannelHandlerContext#fireChannelWritabilityChanged()
ChannelHandlerContext#fireChannelInactive()
ChannelHandlerContext#fireChannelUnregistered()
Outbound事件方法1
2
3
4
5
6
7
8ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext#write(Object, ChannelPromise)
ChannelHandlerContext#flush()
ChannelHandlerContext#read()
ChannelHandlerContext#disconnect(ChannelPromise)
ChannelHandlerContext#close(ChannelPromise)
ChannelHandlerContext#deregister(ChannelPromise)
接口
ChannelPipeline接口继承了ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>>接口,ChannelInboundInvoker中定义了Inbound事件的方法,ChannelOutboundInvoker中定义了Outbound事件的方法,Iterable用于迭代ChannelHandler。
ChannelPipeline自身定义的方法用于操作ChannelHandler的存取。
1 | public interface ChannelPipeline |
DefaultChannelPipeline
DefaultChannelPipeline是ChannelPipeline的默认实现。
成员变量
1 | public class DefaultChannelPipeline implements ChannelPipeline { |
构造器
构造器中传入channel与之绑定。1
2
3
4
5
6
7
8
9
10
11protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
addFirst
- 检测是否是共享的handler,如果不是共享的handler已经加载了则抛异常;
- 不传name则为simple class name#0;
- newCtx = new DefaultChannelHandlerContext,pipeline中不直接持有handler,而是封装成ChannelHandlerContext;
- 将ChannelHandlerContext加到链表的头中;
- 如果channel还没注册,则ctx状态cas改为添加等待(ADD_PENDING),添加PendingHandlerAddedTask任务到pendingHandlerCallbackHead任务链表中;
- 当前是否在事件循环中:
- 如果不在:则newCtx.setAddPending();然后在executor中执行任务callHandlerAdded0()
- 如果在:则执行callHandlerAdded0()
在callHandlerAdded0()中会执行handler的handlerAdded()事件,表示添加完成1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
return addFirst(null, name, handler);
}
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检测是否是共享的handler,如果不是共享的handler已经加载了则抛异常
checkMultiplicity(handler);
// 不传name则为simple class name#0
name = filterName(name, handler);
// new DefaultChannelHandlerContext
newCtx = newContext(group, name, handler);
// 将ChannelHandlerContext加到链表的头中
addFirst0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
// 还没注册
if (!registered) {
// ctx状态cas改为添加等待(ADD_PENDING)
newCtx.setAddPending();
// 添加PendingHandlerAddedTask任务到pendingHandlerCallbackHead任务链表中
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
// newCtx.setAddPending();然后在executor中执行任务callHandlerAdded0
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
// 执行ctx的callHandlerAdded方法
callHandlerAdded0(newCtx);
return this;
}
// head与head.next之间插入newCtx
private void addFirst0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
nextCtx.prev = newCtx;
}
添加或者移除回调callHandlerCallbackLater
PendingHandlerAddedTask任务中执行callHandlerAdded01
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// 将任务加入任务队列尾部
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
handlerAdded事件触发callHandlerAdded0
ctx的callHandlerAdded会调用ChannelHandler的handlerAdded(ChannelHandlerContext ctx)
1 | private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) { |
addLast
addLast与addFast逻辑基本一致,只有在加入链表的时候不一样,一个是插入尾部,一个是插入头部1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
// 插入尾部
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
// 插入到tail.prev和tail之间
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
addBefore
addBefore相较与addLast或者addFast就是多了一个查询某个ctx然后在其头部插入。1
2ctx = getContextOrDie(baseName);
newCtx = newContext(group, name, handler);
1 | private static void addBefore0(AbstractChannelHandlerContext ctx, AbstractChannelHandlerContext newCtx) { |
remove
- remove中使用synchronized加锁处理;
- 在所有的remove方法移除ctx后都会执行该对象的handler方法。
1 |
|
getContextOrDie
1 | private AbstractChannelHandlerContext getContextOrDie(String name) { |
context()
遍历链表匹配1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final ChannelHandlerContext context(String name) {
if (name == null) {
throw new NullPointerException("name");
}
return context0(name);
}
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
bind
1 |
|
AbstractChannelHandlerContext
ChannelHandlerContext的bind会查作下一个ctx,从尾部向前传递。
AbstractChannelHandlerContext中查找InboundHandler或者OutboundHandler类型的对象时是通过ctx.executionMask & mask判断的,InboundHandler或者OutboundHandler都有其对应的掩码,具体的代码可见io.netty.channel.ChannelHandlerMask#mask0。
findContextOutbound(int mask)方法从后向前查找OutboundHandler类型的handler。
1 |
|
connect/disconnect/close/deregister/flush/bind 逻辑基本一致,通过findContextOutbound向前传递。
read
1 |
|
AbstractChannelHandlerContext.read()
直接查找下一个ChannelOutboundHandler然后执行invokeRead方法。
1 |
|
write
1 |
|
AbstractChannelHandlerContext.write()
直接查询下一个ctx,如果实例化失败则直接再次执行write()查找下一个,逻辑与read基本一致。
1 |
|
writeAndFlush与write一致,只是封装了一下入口。
Inbound事件
Inbound事件全部由AbstractChannelHandlerContext的静态方法处理,同时入参为handler链的头结点。关于Inbound事件的这些静态方法的解析统一放到AbstractChannelHandlerContext的解析中。
1 |
|