Netty解析十八:Netty服务器端读写事件流程
之前解析了Netty服务器端启动流程和线程模型,现在解析Netty对于读写事件的处理流程。
读事件
例子按照http服务器端的那个例子:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public void serverRun() throws InterruptedException {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(1024*1024));
// 自定义处理逻辑
ch.pipeline().addLast(new HttpServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
;
ChannelFuture future = serverBootstrap.bind(80).sync();
future.channel().closeFuture().sync();
}finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
Http请求事件的处理(读事件)

处理的核心就是通过ChannelHandlerContext的链式结构不断向后传递,最终达到自定义的处理逻辑HttpServerHandler中。
这个传递逻辑可以看之前解析的ChannelHandlerContext篇。findContextInbound()不断的查找下一个Inbound处理器上下文。
第一个头节点的执行:
DefaultChannelPipeline1
2
3
4public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
AbstractChannelHandlerContext1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
// 非静态方法,当前处理器处理完成后执行
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
对于HttpServerCodec.HttpServerRequestDecoder它是继承了ByteToMessageDecoder:1
2
3
4
5static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
写事件
写事件其实就是读事件的逆顺序执行。
在你在HttpServerHandler中处理完自定义逻辑,然后要将信息响应给客户端时,就会如下执行1
ctx.writeAndFlush(response);
依赖的就是上下文的writeAndFlush方法,从当前处理器上下文向前查找Onbound处理器上下文。
或者客户端直接发送信息,这样就是从尾节点(tail)开始:1
2ChannelFuture future = serverBootstrap.bind(80).sync();
future.channel().pipeline().write("xxx");
DefaultChannelPipeline
pipeline中正好与read相反,处理器从尾节点开始。1
2
3public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
最终与读类似,依赖ChannelHandlerContext的write方法,write方法中通过findContextOutbound()不断查找上一个Onbound的处理器上下文。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16private void write(Object msg, boolean flush, ChannelPromise promise) {
// ...
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// ...
}
}