Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
学习Netty之前最好先学习NIO的相关内容便于理解。
使用
包引入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.43.Final</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.12</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-core</artifactId> <version>${logback.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>${logback.version}</version> </dependency>
<dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-access</artifactId> <version>${logback.version}</version> </dependency>
|
服务端
ChannelHandler
一个请求对应一个channel,一个channel拥有多个ChannelHandler。ChannelHandler可以理解为请求处理,既可以是接收请求的处理,也可以是发送请求的处理。ChannelInboundHandler负责接收请求处理,ChannelOutboundHandlerAdapter负责发送请求的处理。
这个处理包括编码、解码、数据处理等。
请求处理 DiscardServerHandler,客户端发送【hello】服务端回【你好】,客户端发【time】服务端回【当前时间】
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; try { byte[] b = new byte[1024]; int size = 0; while (in.isReadable()){ if (size == b.length){ b = Arrays.copyOf(b, b.length*2); } b[size++] = in.readByte(); } String str = new String(b, 0, size); System.out.println(str); final ByteBuf byteBuf = ctx.alloc().buffer(4); if ("hello".equals(str)){ byteBuf.writeCharSequence("你好", StandardCharsets.UTF_8); } if ("time".equals(str)){ byteBuf.writeCharSequence(new Date().toString(), StandardCharsets.UTF_8); } final ChannelFuture f = ctx.writeAndFlush(byteBuf); }finally { ReferenceCountUtil.release(msg); } }
@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
|
启动
EventLoopGroup 事件循环组(维护一组线程)
- boss接受新连接线程,主要负责创建新连接
- worker负责读取数据的线程,主要用于读取数据以及业务逻辑处理
ChannelOption.SO_BACKLOG, 1024
BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class NetService { private int port;
public static void main(String[] args) throws InterruptedException { new NetService(8080).serverRun(); }
public NetService(int port){ this.port = port; }
public void serverRun() throws InterruptedException { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(boss, worker) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); }finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
|
客户端
ChannelHandler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { final ByteBuf byteBuf = ctx.alloc().buffer(4); byteBuf.writeCharSequence("hello", StandardCharsets.UTF_8); final ChannelFuture f = ctx.writeAndFlush(byteBuf);
}
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf in = (ByteBuf) msg; try { byte[] b = new byte[1024]; int size = 0; while (in.isReadable()){ if (size == b.length){ b = Arrays.copyOf(b, b.length*2); } b[size++] = in.readByte(); } String str = new String(b, 0, size); System.out.println(str); System.out.flush(); if ("你好".equals(str)){ final ByteBuf byteBuf = ctx.alloc().buffer(4); byteBuf.writeCharSequence("time", StandardCharsets.UTF_8); ctx.writeAndFlush(byteBuf); } }finally { ReferenceCountUtil.release(msg); } } }
|
启动
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class NetClient {
public static void main(String[] args) throws InterruptedException { String host = "127.0.0.1"; int port = 8080; EventLoopGroup workerGroup = new NioEventLoopGroup();
try { Bootstrap b = new Bootstrap(); b.group(workerGroup); b.channel(NioSocketChannel.class); b.option(ChannelOption.SO_KEEPALIVE, true); b.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); }finally { workerGroup.shutdownGracefully(); } } }
|
粘包和拆包
上面的实现代码直接将ByteBuf中的字节数组转化为字符串并没有考虑粘包和拆包的情况,如果发送数据包比较多就会发现数据不正确。
Netty提供了几个编解码器用于处理粘包和拆包:
- 固定长度的拆包器 FixedLengthFrameDecoder
- 行拆包器 LineBasedFrameDecoder
- 分隔符拆包器 DelimiterBasedFrameDecoder
- 基于数据包长度的拆包器 LengthFieldBasedFrameDecoder
r