Netty解析十四:ServerBootstrap
ServerBootstrap是我们在写Netty程序时接触的第二个类,负责简化Netty的启动。
AbstractBootstrap
ServerBootstrap继承父类AbstractBootstrap,AbstractBootstrap中完成了大部分的功能,ServerBootstrap增加了childGroup(EventLoopGroup)来处理请求的读写。
ServerBootstrap与AbstractBootstrap都是支持链式编程的。
接下来解析先AbstractBootstrap。
成员变量
泛型B的用途是为了方法返回当前类(继承该类的子类)实例时能返回正确的类型,避免类型转换。1
2
3
4
5
6
7
8
9
10
11
12
13
14public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
// 定制化的线程池能够处理io事件或者自定义任务
volatile EventLoopGroup group;
// channel工厂类,默认是通过ReflectiveChannelFactory,也就是反射生成Channel对象
("deprecation")
private volatile ChannelFactory<? extends C> channelFactory;
// 本地地址
private volatile SocketAddress localAddress;
// channel的配置和属性
private final Map<ChannelOption<?>, Object> options = new ConcurrentHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
// channel处理器
private volatile ChannelHandler handler;
}
构造器
构造器提供了一个复制功能。1
2
3
4
5
6
7
8
9
10
11
12AbstractBootstrap() {
// Disallow extending from a different package.
}
AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
group = bootstrap.group;
channelFactory = bootstrap.channelFactory;
handler = bootstrap.handler;
localAddress = bootstrap.localAddress;
options.putAll(bootstrap.options);
attrs.putAll(bootstrap.attrs);
}
抽象方法
1 | public abstract AbstractBootstrapConfig<B, C> config(); |
self()
获取自身实例1
2
3private B self() {
return (B) this;
}
channel(Class<? extends C>)
在设置channelClass的时候就会构建一个ChannelFactory对象:ReflectiveChannelFactory
1 | public B channel(Class<? extends C> channelClass) { |
ReflectiveChannelFactory是通过反射构建新实例的Channel工厂类,其核心逻辑如下:
- 构造器中传入Class,通过Class获取其无参构造器;
- newChannel方法中通过无参构造器的newInstance生成新对象。
1 | public ReflectiveChannelFactory(Class<? extends T> clazz) { |
option(ChannelOption option, T value)
向map中传入配置1
2
3
4
5
6
7
8
9public <T> B option(ChannelOption<T> option, T value) {
ObjectUtil.checkNotNull(option, "option");
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
return self();
}
attr(AttributeKey key, T value)
向map中传入属性1
2
3
4
5
6
7
8
9public <T> B attr(AttributeKey<T> key, T value) {
ObjectUtil.checkNotNull(key, "key");
if (value == null) {
attrs.remove(key);
} else {
attrs.put(key, value);
}
return self();
}
validate()
参数校验,只校验是否有group和channelFactory。1
2
3
4
5
6
7
8
9public B validate() {
if (group == null) {
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return self();
}
register()
注册逻辑通过initAndRegister()方法实现。1
2
3
4public ChannelFuture register() {
validate();
return initAndRegister();
}
初始化与注册initAndRegister()
- 使用channelFactory创建新的channel;
- 执行init(channel),init方法由子类实现;
- 通过config()获取EventLoopGroup然后执行regiester方法;
- 结果校验,并返回。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29final ChannelFuture initAndRegister() {
// 实例化channel,然后调用初始化方法,init方法由子类实现
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// config()由子类实现,获取线程池(EventLoopGroup)执行他的register方法
// 最终执行的是channel.unsafe().register()
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
bind方法
bind有多个方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(String inetHost, int inetPort) {
return bind(SocketUtils.socketAddress(inetHost, inetPort));
}
public ChannelFuture bind(InetAddress inetHost, int inetPort) {
return bind(new InetSocketAddress(inetHost, inetPort));
}
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
这些bind方法除了入参不一致其他逻辑一致,最终都是执行doBind方法。
doBind
1 | private ChannelFuture doBind(final SocketAddress localAddress) { |
doBind0
将bind任务添加到eventLoop中,这样能够确保Bind之前的任务全部执行了,例如register任务。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
// 将bind任务添加到eventLoop中,这样能够确保Bind之前的任务全部执行了,例如register任务
channel.eventLoop().execute(new Runnable() {
public void run() {
if (regFuture.isSuccess()) {
// 执行channel的bind方法,添加失败时关闭的监听器
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
内部类PendingRegistrationPromise
PendingRegistrationPromise封装了注册失败后的executor处理,GlobalEventExecutor全局事件处理器。
1 | static final class PendingRegistrationPromise extends DefaultChannelPromise { |
ServerBootstrap
ServerBootstrap继承AbstractBootstrap,增加了child(EventLoopGroup)的概念,child负责处理channel的读写事件,而AbstractBootstrap中的group负责连接、绑定等事件。这是Reactor模型的实现。
当然ServerBootstrap也支持child和AbstractBootstrap中的group共用一个对象。
成员变量
1 | public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { |
成员变量中增加了childGroup以及配套的配置属性:childHandler、childOptions、childAttrs。
构造器
1 | public ServerBootstrap() { } |
group方法
单EventLoopGroup1
2
3
4
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
两个EventLoopGroup
1 | public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { |
validate()
重写了validate方法添加校验childHandler和childGroup。
1 |
|
init(Channel channel)
关于ServerBootstrapAcceptor为什么要在事件循环中添加?
为了解决这个bug:https://github.com/netty/netty/issues/5566#ref-commit-a76553e
handler如果是ChannelInitializer类型,则可能存在handler中添加的处理器接收不到读事件;因为initChannel这个方法是在Eventloop上执行的,而添加ServerBootstrapAcceptor不在Eventloop上就会导致ServerBootstrapAcceptor在handler中添加的处理器的前面,这样ServerBootstrapAcceptor会拦截读事件;而加到事件循环中就能保证ServerBootstrapAcceptor一定在handler中添加的处理器的后面。
1 |
|
内部类ServerBootstrapAcceptor
ServerBootstrapAcceptor继承ChannelInboundHandlerAdapter是处理Inbound事件的handler。
ServerBootstrapAcceptor的核心在channelRead方法,channelRead方法中将产生read事件的channel注册到chlidGroup上,让chlidGroup处理读写事件。
如果将childGroup和bossGroup配置的一样那就是同一个group处理连接和读写事件。
成员变量
1 | private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { |
构造器
1 | ServerBootstrapAcceptor( |
channelRead(ChannelHandlerContext ctx, Object msg)
1 |
|