Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

学习Netty之前最好先学习NIO的相关内容便于理解。

使用

包引入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.43.Final</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-access</artifactId>
<version>${logback.version}</version>
</dependency>

服务端

ChannelHandler

一个请求对应一个channel,一个channel拥有多个ChannelHandler。ChannelHandler可以理解为请求处理,既可以是接收请求的处理,也可以是发送请求的处理。ChannelInboundHandler负责接收请求处理,ChannelOutboundHandlerAdapter负责发送请求的处理。

这个处理包括编码、解码、数据处理等。

请求处理 DiscardServerHandler,客户端发送【hello】服务端回【你好】,客户端发【time】服务端回【当前时间】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
try {
byte[] b = new byte[1024];
int size = 0;
// 读取客户端发送的请求
while (in.isReadable()){
if (size == b.length){
b = Arrays.copyOf(b, b.length*2);
}
b[size++] = in.readByte();
}
String str = new String(b, 0, size);
System.out.println(str);
// 响应
final ByteBuf byteBuf = ctx.alloc().buffer(4);
if ("hello".equals(str)){
byteBuf.writeCharSequence("你好", StandardCharsets.UTF_8);
}
if ("time".equals(str)){
byteBuf.writeCharSequence(new Date().toString(), StandardCharsets.UTF_8);
}
final ChannelFuture f = ctx.writeAndFlush(byteBuf);
}finally {
// 释放缓冲
ReferenceCountUtil.release(msg);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}

启动
EventLoopGroup 事件循环组(维护一组线程)

  1. boss接受新连接线程,主要负责创建新连接
  2. worker负责读取数据的线程,主要用于读取数据以及业务逻辑处理

ChannelOption.SO_BACKLOG, 1024

BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class NetService {
private int port;

public static void main(String[] args) throws InterruptedException {
new NetService(8080).serverRun();
}

public NetService(int port){
this.port = port;
}

public void serverRun() throws InterruptedException {
// 处理连接
EventLoopGroup boss = new NioEventLoopGroup();
// 处理读写
EventLoopGroup worker = new NioEventLoopGroup();
try {
// 启动类
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// ChannelHandler链
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
// 等待关闭,堵塞
f.channel().closeFuture().sync();
}finally {
// 资源释放
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

客户端

ChannelHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class ClientHandler extends ChannelInboundHandlerAdapter {
// 和服务端建立了连接
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final ByteBuf byteBuf = ctx.alloc().buffer(4);
// 发送消息
byteBuf.writeCharSequence("hello", StandardCharsets.UTF_8);
final ChannelFuture f = ctx.writeAndFlush(byteBuf);
/*f.addListener((ChannelFutureListener) future -> {
assert f == future;
ctx.close();
});*/
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
try {
byte[] b = new byte[1024];
int size = 0;
while (in.isReadable()){
if (size == b.length){
b = Arrays.copyOf(b, b.length*2);
}
b[size++] = in.readByte();
}
String str = new String(b, 0, size);
System.out.println(str);
System.out.flush();
// 服务器响应后再发消息
if ("你好".equals(str)){
final ByteBuf byteBuf = ctx.alloc().buffer(4);
byteBuf.writeCharSequence("time", StandardCharsets.UTF_8);
ctx.writeAndFlush(byteBuf);
}
}finally {
ReferenceCountUtil.release(msg);
}
}
}

启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class NetClient {

public static void main(String[] args) throws InterruptedException {
String host = "127.0.0.1";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 添加处理链
ch.pipeline().addLast(new ClientHandler());
}
});
// 连接
ChannelFuture f = b.connect(host, port).sync();
// 等待连接断开,堵塞
f.channel().closeFuture().sync();
}finally {
workerGroup.shutdownGracefully();
}
}
}

粘包和拆包

上面的实现代码直接将ByteBuf中的字节数组转化为字符串并没有考虑粘包和拆包的情况,如果发送数据包比较多就会发现数据不正确。

Netty提供了几个编解码器用于处理粘包和拆包:

  • 固定长度的拆包器 FixedLengthFrameDecoder
  • 行拆包器 LineBasedFrameDecoder
  • 分隔符拆包器 DelimiterBasedFrameDecoder
  • 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder

r