ChannelHandler负责处理I/O事件或者拦截I/O操作,并且通过ChannelPipeline转发到下一个处理程序。
ChannelHandler本身只声明了handlerAdded、handlerRemoved和已废弃的exceptionCaught这几个方法,真正要处理事件时需要实现它的子接口:用ChannelInboundHandler处理inbound事件,用ChannelOutboundHandler处理outbound操作。
ChannelHandler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/**
 * Root handler contract. It declares only two lifecycle callbacks, a
 * deprecated exception callback, and the nested {@link Sharable} marker
 * annotation.
 */
public interface ChannelHandler {
    /**
     * Lifecycle callback for the handler being added.
     * NOTE(review): exact call timing is not visible in this listing.
     *
     * @param ctx the context bound to this handler
     * @throws Exception if the implementation fails
     */
    void handlerAdded (ChannelHandlerContext ctx) throws Exception ;
    /**
     * Lifecycle callback for the handler being removed.
     * NOTE(review): exact call timing is not visible in this listing.
     *
     * @param ctx the context bound to this handler
     * @throws Exception if the implementation fails
     */
    void handlerRemoved (ChannelHandlerContext ctx) throws Exception ;
    /**
     * Exception callback. It is deprecated at this level; the
     * ChannelInboundHandler listing redeclares it with
     * {@code @SuppressWarnings("deprecation")}.
     *
     * @param ctx the context bound to this handler
     * @param cause the throwable being reported
     * @throws Exception if the implementation fails
     */
    @Deprecated
    void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception ;
    /**
     * Marker annotation for handler classes. It applies to types only, is
     * inherited by subclasses, and is retained at runtime so that a
     * reflective {@code isAnnotationPresent(Sharable.class)} check can see it.
     */
    @Inherited
    @Documented
    @Target (ElementType.TYPE)
    @Retention (RetentionPolicy.RUNTIME)
    @interface Sharable { }
}
ChannelInboundHandler 在上一节ChannelPipeline中的解析其实对ChannelInboundHandler已经比较了解了,总共9个事件入口1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/**
 * Inbound side of the handler contract. It declares nine callbacks: eight
 * channel events plus a redeclared {@code exceptionCaught}.
 * NOTE(review): when each callback fires is inferred from its name only;
 * the triggering code is not part of this listing.
 */
public interface ChannelInboundHandler extends ChannelHandler {
    /** Callback for the "channel registered" event. */
    void channelRegistered (ChannelHandlerContext ctx) throws Exception ;
    /** Callback for the "channel unregistered" event. */
    void channelUnregistered (ChannelHandlerContext ctx) throws Exception ;
    /** Callback for the "channel active" event. */
    void channelActive (ChannelHandlerContext ctx) throws Exception ;
    /** Callback for the "channel inactive" event. */
    void channelInactive (ChannelHandlerContext ctx) throws Exception ;
    /**
     * Callback for a read event.
     *
     * @param ctx the context bound to this handler
     * @param msg the object that was read; its concrete type is not fixed here
     */
    void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception ;
    /** Callback for the "read complete" event. */
    void channelReadComplete (ChannelHandlerContext ctx) throws Exception ;
    /**
     * Callback for a user-defined event.
     *
     * @param ctx the context bound to this handler
     * @param evt the event object; its concrete type is not fixed here
     */
    void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception ;
    /** Callback for the "writability changed" event. */
    void channelWritabilityChanged (ChannelHandlerContext ctx) throws Exception ;
    /**
     * Redeclares the exception callback inherited from ChannelHandler.
     * The deprecation warning from the parent declaration is suppressed.
     *
     * @param ctx the context bound to this handler
     * @param cause the throwable being reported
     */
    @Override
    @SuppressWarnings ("deprecation" )
    void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception ;
}
ChannelOutboundHandler 共8个方法,Outbound需要主动操作1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/**
 * Outbound side of the handler contract. It declares eight operations.
 * Every operation except {@code read} and {@code flush} takes a
 * ChannelPromise.
 * NOTE(review): the promise is presumably how completion is reported;
 * confirm against the ChannelPromise documentation.
 */
public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * Bind operation.
     *
     * @param ctx the context bound to this handler
     * @param localAddress the address to bind to
     * @param promise the promise passed along with the operation
     */
    void bind (ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception ;
    /**
     * Connect operation.
     *
     * @param ctx the context bound to this handler
     * @param remoteAddress the address to connect to
     * @param localAddress the local address to use
     * @param promise the promise passed along with the operation
     */
    void connect ( ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception ;
    /** Disconnect operation. */
    void disconnect (ChannelHandlerContext ctx, ChannelPromise promise) throws Exception ;
    /** Close operation. */
    void close (ChannelHandlerContext ctx, ChannelPromise promise) throws Exception ;
    /** Deregister operation. */
    void deregister (ChannelHandlerContext ctx, ChannelPromise promise) throws Exception ;
    /** Read operation; takes no promise. */
    void read (ChannelHandlerContext ctx) throws Exception ;
    /**
     * Write operation.
     *
     * @param ctx the context bound to this handler
     * @param msg the object to write; its concrete type is not fixed here
     * @param promise the promise passed along with the operation
     */
    void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception ;
    /** Flush operation; takes no promise. */
    void flush (ChannelHandlerContext ctx) throws Exception ;
}
ChannelHandlerAdapter ChannelHandler实现的基本骨架,主要用来判断handler是否可共享。
如果类上加了@Sharable注解,则代表该handler可以共享。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
/**
 * Skeleton implementation of ChannelHandler. It provides empty lifecycle
 * callbacks, a forwarding {@code exceptionCaught}, and the logic that
 * decides whether a handler class counts as sharable.
 */
public abstract class ChannelHandlerAdapter implements ChannelHandler {
    // Package-private flag; it is neither read nor written in this listing.
    // NOTE(review): presumably maintained by pipeline code elsewhere — confirm.
    boolean added;
    /**
     * Guard for subclasses that must not be shared.
     *
     * @throws IllegalStateException if {@link #isSharable()} returns true
     */
    protected void ensureNotSharable () {
        if (isSharable()) {
            throw new IllegalStateException("ChannelHandler " + getClass().getName() + " is not allowed to be shared" );
        }
    }
    /**
     * Reports whether this handler's runtime class carries the Sharable
     * annotation. The answer is cached per class in the map returned by
     * {@code InternalThreadLocalMap.get().handlerSharableCache()}, so the
     * reflective lookup only runs on a cache miss.
     * NOTE(review): the cache looks thread-local, judging by its name — confirm.
     *
     * @return true if the class is annotated with Sharable
     */
    public boolean isSharable () {
        Class<?> clazz = getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null ) {
            // Cache miss: check the annotation reflectively and remember the result.
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        // Non-null at this point; auto-unboxed to the primitive return type.
        return sharable;
    }
    /** Does nothing by default. */
    @Override
    public void handlerAdded (ChannelHandlerContext ctx) throws Exception { }
    /** Does nothing by default. */
    @Override
    public void handlerRemoved (ChannelHandlerContext ctx) throws Exception { }
    /**
     * Passes the throwable on through {@code ctx.fireExceptionCaught(cause)}
     * without handling it.
     * NOTE(review): the meaning of {@code @Skip} is not visible in this listing.
     *
     * @param ctx the context bound to this handler
     * @param cause the throwable being reported
     */
    @Skip
    @Override
    @Deprecated
    public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}
ChannelInboundHandlerAdapter ChannelInboundHandlerAdapter中所有方法的默认实现都只是调用传入的ChannelHandlerContext上对应的fireXxx方法,把事件原样转发出去。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
/**
 * Pass-through implementation of ChannelInboundHandler. Each callback only
 * calls the matching {@code fireXxx} method on the supplied context with
 * the same arguments. Subclasses override the callbacks they want to handle.
 * NOTE(review): every method carries {@code @Skip}; its meaning is not
 * visible in this listing.
 */
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    /** Forwards via {@code ctx.fireChannelRegistered()}. */
    @Skip
    @Override
    public void channelRegistered (ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }
    /** Forwards via {@code ctx.fireChannelUnregistered()}. */
    @Skip
    @Override
    public void channelUnregistered (ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }
    /** Forwards via {@code ctx.fireChannelActive()}. */
    @Skip
    @Override
    public void channelActive (ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }
    /** Forwards via {@code ctx.fireChannelInactive()}. */
    @Skip
    @Override
    public void channelInactive (ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }
    /** Forwards the message unchanged via {@code ctx.fireChannelRead(msg)}. */
    @Skip
    @Override
    public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
    /** Forwards via {@code ctx.fireChannelReadComplete()}. */
    @Skip
    @Override
    public void channelReadComplete (ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }
    /** Forwards the event unchanged via {@code ctx.fireUserEventTriggered(evt)}. */
    @Skip
    @Override
    public void userEventTriggered (ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
    /** Forwards via {@code ctx.fireChannelWritabilityChanged()}. */
    @Skip
    @Override
    public void channelWritabilityChanged (ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }
    /**
     * Forwards the throwable via {@code ctx.fireExceptionCaught(cause)}.
     * The deprecation warning from the ChannelHandler declaration is suppressed.
     */
    @Skip
    @Override
    @SuppressWarnings ("deprecation" )
    public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}
ChannelOutboundHandlerAdapter ChannelOutboundHandlerAdapter与ChannelInboundHandlerAdapter逻辑一致,同样将具体的执行逻辑交由ChannelHandlerContext处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
/**
 * Pass-through implementation of ChannelOutboundHandler. Each operation
 * only calls the same-named method on the supplied context with the same
 * arguments. Subclasses override the operations they want to intercept.
 * NOTE(review): every method carries {@code @Skip}; its meaning is not
 * visible in this listing.
 */
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
    /** Forwards via {@code ctx.bind(localAddress, promise)}. */
    @Skip
    @Override
    public void bind (ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }
    /** Forwards via {@code ctx.connect(remoteAddress, localAddress, promise)}. */
    @Skip
    @Override
    public void connect (ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }
    /** Forwards via {@code ctx.disconnect(promise)}. */
    @Skip
    @Override
    public void disconnect (ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.disconnect(promise);
    }
    /** Forwards via {@code ctx.close(promise)}. */
    @Skip
    @Override
    public void close (ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.close(promise);
    }
    /** Forwards via {@code ctx.deregister(promise)}. */
    @Skip
    @Override
    public void deregister (ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }
    /** Forwards via {@code ctx.read()}. */
    @Skip
    @Override
    public void read (ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }
    /** Forwards the message unchanged via {@code ctx.write(msg, promise)}. */
    @Skip
    @Override
    public void write (ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }
    /** Forwards via {@code ctx.flush()}. */
    @Skip
    @Override
    public void flush (ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}