ServerBootstrap is the second class we meet when writing a Netty program; its job is to simplify starting a Netty server.

AbstractBootstrap

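ServerBootstrap extends AbstractBootstrap. AbstractBootstrap implements most of the functionality; ServerBootstrap adds childGroup (an EventLoopGroup) to handle reads and writes for requests.

Both ServerBootstrap and AbstractBootstrap support method chaining.

We start by walking through AbstractBootstrap.

Member variables

The type parameter B exists so that methods returning the current instance (an instance of the subclass) have the correct return type, which avoids casts at the call site.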

public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
    // The event loop group: a specialized thread pool that handles I/O events and custom tasks
    volatile EventLoopGroup group;
    // Channel factory; by default a ReflectiveChannelFactory, which creates Channel objects via reflection
    @SuppressWarnings("deprecation")
    private volatile ChannelFactory<? extends C> channelFactory;
    // Local address
    private volatile SocketAddress localAddress;
    // Channel options and attributes
    private final Map<ChannelOption<?>, Object> options = new ConcurrentHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    // Channel handler
    private volatile ChannelHandler handler;
}

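Constructors

Besides the no-arg constructor there is a copy constructor, which copies every setting from another bootstrap.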

AbstractBootstrap() {
    // Disallow extending from a different package.
}

AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
    group = bootstrap.group;
    channelFactory = bootstrap.channelFactory;
    handler = bootstrap.handler;
    localAddress = bootstrap.localAddress;
    options.putAll(bootstrap.options);
    attrs.putAll(bootstrap.attrs);
}

Abstract methods

public abstract AbstractBootstrapConfig<B, C> config();
abstract void init(Channel channel) throws Exception;

self()

Returns this instance, typed as B.

private B self() {
    return (B) this;
}
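
To illustrate what the B type parameter and self() buy us, here is a minimal, self-contained sketch of the same self-typed generic pattern. FluentBase, ServerBuilder, and SelfTypeDemo are hypothetical names made up for this example; they are not Netty classes.

```java
final class SelfTypeDemo {
    static String build() {
        // name(...) is declared in the base class but returns ServerBuilder,
        // so port(...) can be chained without a cast.
        return new ServerBuilder().name("echo").port(8080).describe();
    }

    public static void main(String[] args) {
        System.out.println(build()); // prints "echo:8080"
    }
}

// Hypothetical base class playing the role of AbstractBootstrap.
abstract class FluentBase<B extends FluentBase<B>> {
    private String name = "unnamed";

    @SuppressWarnings("unchecked")
    private B self() {
        // Safe as long as every subclass passes itself as B.
        return (B) this;
    }

    B name(String name) {
        this.name = name;
        return self();
    }

    String name() {
        return name;
    }
}

// Hypothetical subclass playing the role of ServerBootstrap.
final class ServerBuilder extends FluentBase<ServerBuilder> {
    private int port;

    ServerBuilder port(int port) {
        this.port = port;
        return this;
    }

    String describe() {
        return name() + ":" + port;
    }
}
```

Without the type parameter, name(...) would have to return the base type, and the chained call to port(...) would need a cast.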

channel(Class<? extends C>)

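Setting the channel class immediately builds a ChannelFactory object, namely a ReflectiveChannelFactory. Note that channelFactory can only be set once; a second call throws IllegalStateException.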

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

public B channelFactory(ChannelFactory<? extends C> channelFactory) {
    ObjectUtil.checkNotNull(channelFactory, "channelFactory");
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return self();
}

ReflectiveChannelFactory is a Channel factory that builds new instances via reflection. Its core logic is:

  1. The constructor receives a Class and looks up that class's public no-arg constructor;
  2. The newChannel method creates a new object by calling newInstance on that no-arg constructor.

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        // Look up the public no-arg constructor
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                " does not have a public non-arg constructor", e);
    }
}

@Override
public T newChannel() {
    try {
        // Create a new object via reflection
        return constructor.newInstance();
    } catch (Throwable t) {
        throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
    }
}
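
The two steps can be tried out without Netty. The following is a minimal sketch using only the JDK; ReflectiveFactory and FakeChannel are hypothetical stand-ins invented for this example, not Netty types.

```java
import java.lang.reflect.Constructor;

final class ReflectiveFactoryDemo {
    // Hypothetical stand-in for a channel class with a public no-arg constructor.
    public static final class FakeChannel {
        public FakeChannel() { }
    }

    // Hypothetical, simplified counterpart of ReflectiveChannelFactory.
    static final class ReflectiveFactory<T> {
        private final Constructor<? extends T> constructor;

        ReflectiveFactory(Class<? extends T> clazz) {
            try {
                // Step 1: resolve the public no-arg constructor once, up front.
                this.constructor = clazz.getConstructor();
            } catch (NoSuchMethodException e) {
                throw new IllegalArgumentException(
                        clazz.getSimpleName() + " has no public no-arg constructor", e);
            }
        }

        T newInstance() {
            try {
                // Step 2: every call creates a fresh object through reflection.
                return constructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Unable to create instance", e);
            }
        }
    }

    static boolean createsDistinctInstances() {
        ReflectiveFactory<FakeChannel> factory = new ReflectiveFactory<>(FakeChannel.class);
        return factory.newInstance() != factory.newInstance();
    }

    static boolean rejectsClassWithoutNoArgConstructor() {
        try {
            new ReflectiveFactory<Integer>(Integer.class); // Integer has no no-arg constructor
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(createsDistinctInstances());
        System.out.println(rejectsClassWithoutNoArgConstructor());
    }
}
```

Resolving the constructor in the factory's constructor means a class without a suitable constructor is rejected at configuration time rather than when the first channel is created.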

option(ChannelOption<T> option, T value)

Stores an option in the options map. Passing a null value removes the option instead.

public <T> B option(ChannelOption<T> option, T value) {
    ObjectUtil.checkNotNull(option, "option");
    if (value == null) {
        options.remove(option);
    } else {
        options.put(option, value);
    }
    return self();
}

attr(AttributeKey<T> key, T value)

Stores an attribute in the attrs map. Passing a null value removes the attribute instead.

public <T> B attr(AttributeKey<T> key, T value) {
    ObjectUtil.checkNotNull(key, "key");
    if (value == null) {
        attrs.remove(key);
    } else {
        attrs.put(key, value);
    }
    return self();
}
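
option and attr share the same "null means unset" rule. Here is a minimal sketch of that rule on top of a ConcurrentHashMap; OptionMapDemo and its String keys are hypothetical simplifications (Netty uses typed ChannelOption and AttributeKey keys).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class OptionMapDemo {
    private final Map<String, Object> options = new ConcurrentHashMap<>();

    OptionMapDemo option(String key, Object value) {
        if (key == null) {
            throw new NullPointerException("key");
        }
        if (value == null) {
            // null acts as "unset"; ConcurrentHashMap.put would throw on a null value anyway
            options.remove(key);
        } else {
            options.put(key, value);
        }
        return this;
    }

    int size() {
        return options.size();
    }

    Object get(String key) {
        return options.get(key);
    }

    public static void main(String[] args) {
        OptionMapDemo demo = new OptionMapDemo()
                .option("backlog", 128)
                .option("keepAlive", true)
                .option("backlog", null); // removes the earlier entry
        System.out.println(demo.size());          // prints 1
        System.out.println(demo.get("backlog"));  // prints null
    }
}
```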

validate()

Validates the configuration. It only checks that group and channelFactory have been set.

public B validate() {
    if (group == null) {
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return self();
}

register()

The registration logic is implemented by the initAndRegister() method.

public ChannelFuture register() {
    validate();
    return initAndRegister();
}

Initialization and registration: initAndRegister()

  1. Create a new channel with channelFactory;
  2. Call init(channel); the init method is implemented by the subclass;
  3. Obtain the EventLoopGroup through config() and call its register method;
  4. Check the result (closing the channel if registration failed) and return the future.

    final ChannelFuture initAndRegister() {
        // Instantiate the channel, then call init(), which is implemented by the subclass
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
                // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
                return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        // config() is implemented by the subclass; get the EventLoopGroup and call its register method,
        // which ultimately calls channel.unsafe().register()
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

        return regFuture;
    }

bind methods

bind has several overloads.

public ChannelFuture bind() {
    validate();
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
        throw new IllegalStateException("localAddress not set");
    }
    return doBind(localAddress);
}

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

public ChannelFuture bind(String inetHost, int inetPort) {
    return bind(SocketUtils.socketAddress(inetHost, inetPort));
}

public ChannelFuture bind(InetAddress inetHost, int inetPort) {
    return bind(new InetSocketAddress(inetHost, inetPort));
}

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

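Apart from their parameters, these bind methods share the same logic: each one validates the configuration and ends up calling doBind. The no-arg bind() uses the previously configured localAddress and fails if none was set.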

doBind

private ChannelFuture doBind(final SocketAddress localAddress) {
    // Make sure the channel is initialized and registered first
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // Registration failed
    if (regFuture.cause() != null) {
        return regFuture;
    }
    // Registration has completed
    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration has not completed yet: add a listener and call doBind0 once it completes
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}
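
The "bind now if registration is done, otherwise bind from a listener" branch can be modelled with the JDK's CompletableFuture. This is only a sketch under simplifying assumptions: DeferredBindDemo and bindWhenRegistered are hypothetical names, CompletableFuture stands in for Netty's ChannelFuture, and a failed registration is reported through the returned promise rather than by returning the registration future itself.

```java
import java.util.concurrent.CompletableFuture;

final class DeferredBindDemo {
    // Completes the returned promise with "bound:<port>" once registration has succeeded,
    // or fails it with the registration's cause.
    static CompletableFuture<String> bindWhenRegistered(CompletableFuture<Void> registration, int port) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        if (registration.isDone() && !registration.isCompletedExceptionally()) {
            // Registration already finished successfully: bind right away.
            promise.complete("bound:" + port);
        } else {
            // Registration is pending or failed: react once its outcome is known.
            registration.whenComplete((ignored, cause) -> {
                if (cause != null) {
                    promise.completeExceptionally(cause);
                } else {
                    promise.complete("bound:" + port);
                }
            });
        }
        return promise;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<String> promise = bindWhenRegistered(pending, 8080);
        System.out.println(promise.isDone()); // false: registration is still pending
        pending.complete(null);
        System.out.println(promise.join());   // bound:8080
    }
}
```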

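doBind0

doBind0 submits the bind task to the channel's eventLoop, which ensures that every task queued before the bind, such as the register task, has already run.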

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    // Submit the bind task to the eventLoop so that all tasks queued before it (e.g. register) run first
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // Call the channel's bind method and add a listener that closes the channel on failure
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
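
The ordering guarantee relies on an event loop running its tasks one at a time, in submission order. A minimal sketch of that property, assuming a JDK single-thread executor as a stand-in for a Netty EventLoop (OrderedBindDemo is a hypothetical name):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class OrderedBindDemo {
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        // A single-thread executor processes tasks sequentially in FIFO order.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        try {
            eventLoop.execute(() -> log.add("register")); // queued first
            eventLoop.execute(() -> log.add("bind"));     // queued second, so it runs second
        } finally {
            eventLoop.shutdown(); // already queued tasks still run
        }
        try {
            if (!eventLoop.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [register, bind]
    }
}
```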

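Inner class PendingRegistrationPromise

PendingRegistrationPromise encapsulates the choice of executor used for notifications. Until registration has succeeded (including the case where it fails), it falls back to GlobalEventExecutor, the global event executor; once registered() has been called, it uses the channel's own executor.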

static final class PendingRegistrationPromise extends DefaultChannelPromise {

    // Is set to the correct EventExecutor once the registration was successful. Otherwise it will
    // stay null and so the GlobalEventExecutor.INSTANCE will be used for notifications.
    private volatile boolean registered;

    PendingRegistrationPromise(Channel channel) {
        super(channel);
    }

    void registered() {
        registered = true;
    }

    @Override
    protected EventExecutor executor() {
        if (registered) {
            // If the registration was a success executor is set.
            //
            // See https://github.com/netty/netty/issues/2586
            return super.executor();
        }
        // The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
        return GlobalEventExecutor.INSTANCE;
    }
}

ServerBootstrap

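ServerBootstrap extends AbstractBootstrap and adds the notion of a child group (an EventLoopGroup). The child group handles read and write events on accepted channels, while the group in AbstractBootstrap handles events such as accepting connections and binding. This is an implementation of the Reactor pattern.

ServerBootstrap also allows the child group and the group in AbstractBootstrap to be the same object.

Member variables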

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);

    private final Map<ChannelOption<?>, Object> childOptions = new ConcurrentHashMap<ChannelOption<?>, Object>();
    private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
    // Wraps this ServerBootstrap as a config object; every value it exposes comes from this ServerBootstrap
    private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
    private volatile EventLoopGroup childGroup;
    private volatile ChannelHandler childHandler;
}

The new member variables are childGroup and its accompanying configuration: childHandler, childOptions, and childAttrs.

Constructors

public ServerBootstrap() { }

private ServerBootstrap(ServerBootstrap bootstrap) {
    super(bootstrap);
    childGroup = bootstrap.childGroup;
    childHandler = bootstrap.childHandler;
    childOptions.putAll(bootstrap.childOptions);
    childAttrs.putAll(bootstrap.childAttrs);
}

group methods

Single EventLoopGroup

@Override
public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}

Two EventLoopGroups

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    ObjectUtil.checkNotNull(childGroup, "childGroup");
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}

validate()

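ServerBootstrap overrides validate() to also check childHandler and childGroup. A missing childHandler is an error, whereas a missing childGroup only logs a warning and falls back to the parent group.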

@Override
public ServerBootstrap validate() {
    super.validate();
    if (childHandler == null) {
        throw new IllegalStateException("childHandler not set");
    }
    if (childGroup == null) {
        logger.warn("childGroup is not set. Using parentGroup instead.");
        childGroup = config.group();
    }
    return this;
}
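
The interplay between the group overloads and validate() can be sketched without Netty. MiniServerBootstrap below is a hypothetical, heavily simplified stand-in in which groups and handlers are plain strings; it only mirrors the checks discussed above.

```java
final class MiniBootstrapDemo {
    // Hypothetical stand-in for ServerBootstrap; not a Netty class.
    static final class MiniServerBootstrap {
        private String parentGroup;
        private String childGroup;
        private String childHandler;

        MiniServerBootstrap group(String group) {
            return group(group, group); // one group plays both roles
        }

        MiniServerBootstrap group(String parentGroup, String childGroup) {
            if (parentGroup == null || childGroup == null) {
                throw new NullPointerException("group");
            }
            if (this.childGroup != null) {
                throw new IllegalStateException("childGroup set already");
            }
            this.parentGroup = parentGroup;
            this.childGroup = childGroup;
            return this;
        }

        MiniServerBootstrap childHandler(String childHandler) {
            this.childHandler = childHandler;
            return this;
        }

        MiniServerBootstrap validate() {
            if (parentGroup == null) {
                throw new IllegalStateException("group not set");
            }
            if (childHandler == null) {
                throw new IllegalStateException("childHandler not set");
            }
            return this;
        }

        boolean sharesGroup() {
            return parentGroup == childGroup;
        }
    }

    // Returns "ok" or the validation error message.
    static String validationResult(MiniServerBootstrap bootstrap) {
        try {
            bootstrap.validate();
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(validationResult(new MiniServerBootstrap().group("loop")));
        MiniServerBootstrap shared = new MiniServerBootstrap().group("loop").childHandler("echo");
        System.out.println(validationResult(shared) + " shared=" + shared.sharesGroup());
    }
}
```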

init(Channel channel)

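Why is ServerBootstrapAcceptor added through the event loop?

To fix this bug: https://github.com/netty/netty/issues/5566#ref-commit-a76553e

If handler is a ChannelInitializer, the handlers it adds might never receive read events. Its initChannel method runs on the EventLoop, so adding ServerBootstrapAcceptor directly, rather than through the EventLoop, could place ServerBootstrapAcceptor in front of the handlers that handler adds, and ServerBootstrapAcceptor would then intercept the read events. Submitting the addition to the event loop ensures that ServerBootstrapAcceptor always ends up behind the handlers added by handler.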

@Override
void init(Channel channel) {
    // Apply options and attributes
    setChannelOptions(channel, options0().entrySet().toArray(newOptionArray(0)), logger);
    setAttributes(channel, attrs0().entrySet().toArray(newAttrArray(0)));

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions =
            childOptions.entrySet().toArray(newOptionArray(0));
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));

    // Add a ChannelHandler to the pipeline
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            // Add the configured handler
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // Add a ServerBootstrapAcceptor, which applies the child configuration.
            // It is added only after initChannel has run.
            // Why go through the event loop? See https://github.com/netty/netty/issues/5566#ref-commit-a76553e
            // If the handler above is a ChannelInitializer, the handlers it adds might not receive read events:
            // its initChannel runs on the EventLoop, so adding ServerBootstrapAcceptor directly could place it
            // in front of those handlers, where it would intercept read events.
            // Adding it through the event loop guarantees it ends up behind the handlers added by handler.
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
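
The ordering problem can be reproduced with a toy model. The sketch below rests on a modelling assumption: the user's ChannelInitializer only expands into its real handler in a later event-loop task, after the enclosing initializer has returned. The pipeline is a plain list of names, the event loop is a plain task queue, and AcceptorOrderDemo is a hypothetical name; none of this is Netty code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class AcceptorOrderDemo {
    static List<String> buildPipeline(boolean deferAcceptor) {
        List<String> pipeline = new ArrayList<>();
        Queue<Runnable> eventLoop = new ArrayDeque<>();

        // Task 1: the enclosing initializer.
        eventLoop.add(() -> {
            pipeline.add("userInitializer");
            // Assumption: the user's initializer runs later and replaces itself with its handler.
            eventLoop.add(() -> {
                pipeline.remove("userInitializer");
                pipeline.add("userHandler");
            });
            if (deferAcceptor) {
                // Queued behind the user's initializer, so the acceptor ends up last.
                eventLoop.add(() -> pipeline.add("acceptor"));
            } else {
                // Added immediately, so the acceptor ends up in front of the user's handler.
                pipeline.add("acceptor");
            }
        });

        // Drain the queue in FIFO order, as a single-threaded event loop would.
        Runnable task;
        while ((task = eventLoop.poll()) != null) {
            task.run();
        }
        return pipeline;
    }

    public static void main(String[] args) {
        System.out.println(buildPipeline(false)); // prints [acceptor, userHandler]
        System.out.println(buildPipeline(true));  // prints [userHandler, acceptor]
    }
}
```

In the first result the acceptor sits in front of userHandler, which is the situation the fix avoids.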

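Inner class ServerBootstrapAcceptor

ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter, so it is a handler for inbound events.

The core of ServerBootstrapAcceptor is its channelRead method. On the server channel, the message delivered to channelRead is the newly accepted child channel; channelRead registers that channel with childGroup so that childGroup handles its read and write events.

If childGroup is configured to be the same as the boss (parent) group, a single group handles both accepting connections and read/write events.

Member variables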

private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {

    private final EventLoopGroup childGroup;
    private final ChannelHandler childHandler;
    private final Entry<ChannelOption<?>, Object>[] childOptions;
    private final Entry<AttributeKey<?>, Object>[] childAttrs;
    private final Runnable enableAutoReadTask;
}

Constructor

ServerBootstrapAcceptor(
        final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
        Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
    this.childGroup = childGroup;
    this.childHandler = childHandler;
    this.childOptions = childOptions;
    this.childAttrs = childAttrs;

    // Task which is scheduled to re-enable auto-read.
    // It's important to create this Runnable before we try to submit it as otherwise the URLClassLoader may
    // not be able to load the class because of the file limit it already reached.
    //
    // See https://github.com/netty/netty/issues/1328
    enableAutoReadTask = new Runnable() {
        @Override
        public void run() {
            channel.config().setAutoRead(true);
        }
    };
}

channelRead(ChannelHandlerContext ctx, Object msg)

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    // Configure the child channel
    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        // Register the child channel with childGroup
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
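
The hand-off from the parent group to the child group can be pictured with two plain JDK executors. This is a rough sketch under stated assumptions: HandOffDemo is a hypothetical name, each group is modelled as a single named thread, and "registering" the child is modelled as submitting its work to the worker executor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class HandOffDemo {
    // Returns "<thread that accepted>-><thread that handles the child>".
    static String acceptAndHandOff() {
        ExecutorService boss = Executors.newSingleThreadExecutor(r -> new Thread(r, "boss"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            // The boss thread "accepts" a connection, then passes the child on to the worker.
            Future<Future<String>> accepted = boss.submit(() -> {
                String acceptedOn = Thread.currentThread().getName();
                return worker.submit(() -> acceptedOn + "->" + Thread.currentThread().getName());
            });
            return accepted.get(5, TimeUnit.SECONDS).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            boss.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(acceptAndHandOff()); // prints boss->worker
    }
}
```

Passing the same executor for both roles would correspond to configuring a single group for accepting and for read/write handling.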