ChannelHandler负责处理I/O事件或者拦截I/O操作,并且通过ChannelPipeline转发到下一个处理程序。

ChannelHandler自身几乎没有提供什么方法,实际实现时通常选择ChannelInboundHandler或ChannelOutboundHandler,分别处理inbound和outbound操作。

ChannelHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * Root handler contract. Declares two lifecycle callbacks, a deprecated
 * exception callback, and the nested {@link Sharable} marker annotation.
 */
public interface ChannelHandler {
// Callbacks for adding the handler to, and removing it from, a context.
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

// Deprecated on this interface; ChannelInboundHandler re-declares the same method.
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
// Marks a handler as sharable: a sharable handler can be held by multiple ChannelPipelines.
// The annotation targets types, is retained at runtime, and is inherited by subclasses.
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}

ChannelInboundHandler

通过上一节对ChannelPipeline的解析,其实已经对ChannelInboundHandler比较了解了,它总共有9个事件入口(包含exceptionCaught)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
 * Handler contract for inbound events. Adds eight event callbacks on top of
 * {@link ChannelHandler} and re-declares {@code exceptionCaught}, nine entry points in total.
 */
public interface ChannelInboundHandler extends ChannelHandler {
// The channel has been registered.
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
// The channel has been unregistered.
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
// The channel is active.
void channelActive(ChannelHandlerContext ctx) throws Exception;
// The channel is inactive.
void channelInactive(ChannelHandlerContext ctx) throws Exception;
// A message was read from the channel.
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
// The channel read has completed.
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
// A user event was triggered.
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
// The writability state of the channel changed.
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

// Re-declares the method that ChannelHandler marks @Deprecated; the
// deprecation warning is suppressed here.
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

ChannelOutboundHandler

共8个方法,Outbound操作需要由调用方主动发起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Handler contract for outbound operations. Declares eight operations on top of
 * {@link ChannelHandler}; all except {@code read} and {@code flush} take a
 * {@code ChannelPromise} argument.
 */
public interface ChannelOutboundHandler extends ChannelHandler {
// Bind to a local address.
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
// Connect to a remote address, with a local address also supplied.
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
// Disconnect.
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// Close.
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// Deregister.
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
// Read.
void read(ChannelHandlerContext ctx) throws Exception;
// Write a message.
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
// Flush.
void flush(ChannelHandlerContext ctx) throws Exception;
}

ChannelHandlerAdapter

ChannelHandler实现的基本骨架,主要解决handler是否可共享。

如果类上加上Sharable注解,则代表该handler可以共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * Skeleton implementation of {@link ChannelHandler}. Provides no-op lifecycle
 * callbacks and answers whether the concrete handler class carries the
 * {@link Sharable} annotation.
 */
public abstract class ChannelHandlerAdapter implements ChannelHandler {

    // Package-private flag; this class itself never reads or writes it.
    boolean added;

    /**
     * Rejects handlers whose class is annotated as sharable.
     *
     * @throws IllegalStateException if {@link #isSharable()} returns {@code true}
     */
    protected void ensureNotSharable() {
        if (!isSharable()) {
            return;
        }
        throw new IllegalStateException("ChannelHandler " + getClass().getName() + " is not allowed to be shared");
    }

    /**
     * Reports whether the runtime class of this handler is annotated with {@link Sharable}.
     *
     * <p>The annotation lookup result is cached per class in the map obtained from
     * {@code InternalThreadLocalMap.get().handlerSharableCache()}. The original note
     * on this method explains that a {@link ThreadLocal} plus {@link WeakHashMap} is
     * used to eliminate volatile writes/reads, and that one map per thread is good
     * enough because the number of threads is limited.
     * See <a href="https://github.com/netty/netty/issues/2289">#2289</a>.
     *
     * @return {@code true} if the handler class has the {@link Sharable} annotation
     */
    public boolean isSharable() {
        final Class<?> handlerType = getClass();
        final Map<Class<?>, Boolean> sharableByType = InternalThreadLocalMap.get().handlerSharableCache();

        final Boolean cached = sharableByType.get(handlerType);
        if (cached != null) {
            return cached;
        }

        // Cache miss: inspect the annotation once and remember the answer.
        final Boolean annotated = handlerType.isAnnotationPresent(Sharable.class);
        sharableByType.put(handlerType, annotated);
        return annotated;
    }

    /** Does nothing. */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    /** Does nothing. */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // NOOP
    }

    /**
     * Passes the exception on by calling {@code ctx.fireExceptionCaught(cause)}.
     *
     * @param ctx   the context the call is delegated to
     * @param cause the exception being reported
     */
    @Skip
    @Override
    @Deprecated
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

ChannelInboundHandlerAdapter

ChannelInboundHandlerAdapter中将所有的方法实现交由传入的ChannelHandlerContext去执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
 * {@link ChannelInboundHandler} implementation in which every callback simply
 * delegates to the matching {@code fire*} method of the supplied
 * {@link ChannelHandlerContext}.
 */
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

    /** Delegates to {@code ctx.fireChannelRegistered()}. */
    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    /** Delegates to {@code ctx.fireChannelUnregistered()}. */
    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    /** Delegates to {@code ctx.fireChannelActive()}. */
    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    /** Delegates to {@code ctx.fireChannelInactive()}. */
    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    /**
     * Delegates to {@code ctx.fireChannelRead(msg)}.
     *
     * @param msg the message handed on unchanged
     */
    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    /** Delegates to {@code ctx.fireChannelReadComplete()}. */
    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    /**
     * Delegates to {@code ctx.fireUserEventTriggered(evt)}.
     *
     * @param evt the user event handed on unchanged
     */
    @Skip
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    /** Delegates to {@code ctx.fireChannelWritabilityChanged()}. */
    @Skip
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    /**
     * Delegates to {@code ctx.fireExceptionCaught(cause)}.
     *
     * @param cause the exception handed on unchanged
     */
    @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

ChannelOutboundHandlerAdapter

ChannelOutboundHandlerAdapter与ChannelInboundHandlerAdapter逻辑一致,都是将具体的执行逻辑交由ChannelHandlerContext处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
 * {@link ChannelOutboundHandler} implementation in which every operation simply
 * delegates to the same-named method of the supplied {@link ChannelHandlerContext}.
 */
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {

    /** Delegates to {@code ctx.bind(localAddress, promise)}. */
    @Skip
    @Override
    public void bind(
            ChannelHandlerContext ctx,
            SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }

    /** Delegates to {@code ctx.connect(remoteAddress, localAddress, promise)}. */
    @Skip
    @Override
    public void connect(
            ChannelHandlerContext ctx,
            SocketAddress remoteAddress,
            SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

    /** Delegates to {@code ctx.disconnect(promise)}. */
    @Skip
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.disconnect(promise);
    }

    /** Delegates to {@code ctx.close(promise)}. */
    @Skip
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.close(promise);
    }

    /** Delegates to {@code ctx.deregister(promise)}. */
    @Skip
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }

    /** Delegates to {@code ctx.read()}. */
    @Skip
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    /** Delegates to {@code ctx.write(msg, promise)}. */
    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    /** Delegates to {@code ctx.flush()}. */
    @Skip
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}