ChannelHandlerContext是ChannelHandler的上下文,关联了Channel、ChannelHandler和ChannelPipeline。

ChannelHandlerContext

ChannelHandlerContext继承关系

ChannelHandlerContext持有channel、ChannelHandler和ChannelPipeline,ChannelPipeline持有ChannelHandlerContext。

一个ChannelHandler可以有多个ChannelHandlerContext前提是ChannelHandler是共享的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

Channel channel();

EventExecutor executor();

String name();

ChannelHandler handler();

boolean isRemoved();

@Override
ChannelHandlerContext fireChannelRegistered();

@Override
ChannelHandlerContext fireChannelUnregistered();

@Override
ChannelHandlerContext fireChannelActive();

@Override
ChannelHandlerContext fireChannelInactive();

@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);

@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);

@Override
ChannelHandlerContext fireChannelRead(Object msg);

@Override
ChannelHandlerContext fireChannelReadComplete();

@Override
ChannelHandlerContext fireChannelWritabilityChanged();

@Override
ChannelHandlerContext read();

@Override
ChannelHandlerContext flush();

ChannelPipeline pipeline();

ByteBufAllocator alloc();

@Deprecated
@Override
<T> Attribute<T> attr(AttributeKey<T> key);

@Deprecated
@Override
<T> boolean hasAttr(AttributeKey<T> key);
}

AbstractChannelHandlerContext

AbstractChannelHandlerContext主要讲解之前pipeline中没有讲解到的内容。

AbstractChannelHandlerContext本身存储首尾节点是一个双向队列。

ResourceLeakHint接口只有一个toHintString方法,用于内存泄漏跟踪时的信息,【’\’’ + name + “‘ will handle the message from this point.”】

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannelHandlerContext.class);
// 头尾,双向链表设计
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
// handlerState状态的原子更新属性
private static final AtomicIntegerFieldUpdater<AbstractChannelHandlerContext> HANDLER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractChannelHandlerContext.class, "handlerState");

// 即将被调用
private static final int ADD_PENDING = 1;
// 已经被调用
private static final int ADD_COMPLETE = 2;
// 已经取消
private static final int REMOVE_COMPLETE = 3;
// handlerAdded和handlerRemoved都还没有调用
private static final int INIT = 0;
// 关联的pipeline
private final DefaultChannelPipeline pipeline;
// 独一无二的名字
private final String name;
// 是否是OrderedEventExecutor
private final boolean ordered;
// 掩码,可以理解为类型,用二进制提高性能
private final int executionMask;

// Will be set to null if no child executor should be used, otherwise it will be set to the
// child executor.
// 如果没有子executor就设为null,除非它要被设为子executor
final EventExecutor executor;
private ChannelFuture succeededFuture;

// Lazily instantiated tasks used to trigger events to a handler with different executor.
// There is no need to make this volatile as at worse it will just create a few more instances then needed.
// 延迟实例化任务被用于触发事件去处理不同的executor
private Tasks invokeTasks;
// handler状态
private volatile int handlerState = INIT;

构造器

1
2
3
4
5
6
7
8
9
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}

ChannelHandlerMask类专门处理mask

根据不同类型计算mask,这些类的区别体现在mask的不同位上。Inbound和Onbound是两大类,具体的根据的类型细分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
static int mask(Class<? extends ChannelHandler> clazz) {
// Try to obtain the mask from the cache first. If this fails calculate it and put it in the cache for fast
// lookup in the future.
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;

if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}

if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;

if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}

if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}

return mask;
}

简单的get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public Channel channel() {
return pipeline.channel();
}

@Override
public ChannelPipeline pipeline() {
return pipeline;
}

@Override
public ByteBufAllocator alloc() {
return channel().config().getAllocator();
}

@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
@Override
public String name() {
return name;
}

查询下一个Inbound的处理器findContextInbound

查询下一个Inbound的处理器是从当前处理器向后查询的。

1
2
3
4
5
6
7
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}

查询下一个Onbound的处理器findContextOutbound

查询下一个Onbound的处理器是从当前处理器向前查询的,查询的方向是Inbound和Onbound的一个不同点。

1
2
3
4
5
6
7
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}

Inbound事件实现

Inbound事件的实现逻辑基本一致,3步曲,只是mask和最后执行的handler的方法不一样。

  • fireChannelRegistered
  • fireChannelUnregistered
  • fireChannelActive
  • fireChannelInactive
  • fireExceptionCaught
  • fireUserEventTriggered
  • fireChannelRead
  • fireChannelReadComplete
  • fireChannelWritabilityChanged

下面以fireChannelRegistered示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
// 统一处理事件循环,如果不在事件循环中则添加任务到executor
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
// 如果没有实例化就执行fireChannelRegistered(),也就是查找下一个InboundHandler
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
// 执行handler的方法
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}

Onbound事件实现

与Inbound事件类似Onbound事件也是比较相似的实现。

  • bind
  • connect
  • disconnect
  • close
  • deregister
  • read
  • flush

bind

都是3步曲,一个处理promise,一个查找handler和处理事件循环,最后一个处理状态和执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return bind(localAddress, newPromise());
}
// 处理查询下一个OutboundHandler,处理事件循环
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}

final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
// 处理是否实例化
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}

write和writeAndFlush

writeAndFlush使用的就是write的代码,write相对于前面的多了task的处理。

pipeline.touch对可能产生内存的情况做了跟踪。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
write(msg, false, promise);

return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
// 查找OutboundHandler
final AbstractChannelHandlerContext next = findContextOutbound(flush ?
(MASK_WRITE | MASK_FLUSH) : MASK_WRITE);
// 内存泄漏跟踪
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// 不在事件循环中的添加任务并执行
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}

安全执行safeExecute

safeExecute处理异常情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
executor.execute(runnable);
return true;
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
return false;
}
}

处理器状态设置setAddPending

AddPending代表pipeline中的channel还没有注册到eventloop

1
2
3
4
final void setAddPending() {
boolean updated = HANDLER_STATE_UPDATER.compareAndSet(this, INIT, ADD_PENDING);
assert updated; // This should always be true as it MUST be called before setAddComplete() or setRemoved().
}

pipeline添加回调callHandlerAdded

1
2
3
4
5
6
7
final void callHandlerAdded() throws Exception {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
if (setAddComplete()) {
handler().handlerAdded(this);
}
}

pipeline移除回调callHandlerRemoved

1
2
3
4
5
6
7
8
9
10
11
final void callHandlerRemoved() throws Exception {
try {
// Only call handlerRemoved(...) if we called handlerAdded(...) before.
if (handlerState == ADD_COMPLETE) {
handler().handlerRemoved(this);
}
} finally {
// Mark the handler as removed in any case.
setRemoved();
}
}

invokeHandler是否实例化

1
2
3
4
5
private boolean invokeHandler() {
// Store in local variable to reduce volatile reads.
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}

newPromise

1
2
3
4
@Override
public ChannelPromise newPromise() {
return new DefaultChannelPromise(channel(), executor());
}

AbstractWriteTask类

写任务

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
abstract static class AbstractWriteTask implements Runnable {

private static final boolean ESTIMATE_TASK_SIZE_ON_SUBMIT =
SystemPropertyUtil.getBoolean("io.netty.transport.estimateSizeOnSubmit", true);

// Assuming a 64-bit JVM, 16 bytes object header, 3 reference fields and one int field, plus alignment
private static final int WRITE_TASK_OVERHEAD =
SystemPropertyUtil.getInt("io.netty.transport.writeTaskSizeOverhead", 48);
// Recycler各个线程的对象池(ThreadLocal)
// 资源(对象)回收的处理器,只有recycle方法
private final Recycler.Handle<AbstractWriteTask> handle;
private AbstractChannelHandlerContext ctx;
private Object msg;
private ChannelPromise promise;
private int size;
}

构造器

Recycler.Handle接口只有recycle(AbstractWriteTask)这一个方法。

1
2
3
private AbstractWriteTask(Recycler.Handle<? extends AbstractWriteTask> handle) {
this.handle = (Recycler.Handle<AbstractWriteTask>) handle;
}

初始化init

init方法是对象重复利用的核心,通过Recycler来循环使用AbstractWriteTask对象,每次使用之前调用init来初始化对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected static void init(AbstractWriteTask task, AbstractChannelHandlerContext ctx,
Object msg, ChannelPromise promise) {
task.ctx = ctx;
task.msg = msg;
task.promise = promise;

if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
task.size = ctx.pipeline.estimatorHandle().size(msg) + WRITE_TASK_OVERHEAD;
// ChannelOutboundBuffer.setUnwritable
ctx.pipeline.incrementPendingOutboundBytes(task.size);
} else {
task.size = 0;
}
}

run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public final void run() {
try {
// ChannelOutboundBuffer.setWritable
decrementPendingOutboundBytes();
write(ctx, msg, promise);
} finally {
recycle();
}
}
private void decrementPendingOutboundBytes() {
if (ESTIMATE_TASK_SIZE_ON_SUBMIT) {
ctx.pipeline.decrementPendingOutboundBytes(size);
}
}
protected void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.invokeWrite(msg, promise);
}

cancel

1
2
3
4
5
6
7
void cancel() {
try {
decrementPendingOutboundBytes();
} finally {
recycle();
}
}

recycle

1
2
3
4
5
6
7
8
private void recycle() {
// Set to null so the GC can collect them directly
ctx = null;
msg = null;
promise = null;
// 资源你回收处理
handle.recycle(this);
}

WriteTask类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static final class WriteTask extends AbstractWriteTask implements SingleThreadEventLoop.NonWakeupRunnable {
// 各个线程的对象池(ThreadLocal)
private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
@Override
protected WriteTask newObject(Handle<WriteTask> handle) {
return new WriteTask(handle);
}
};

static WriteTask newInstance(
AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
WriteTask task = RECYCLER.get();
// 对象复用的核心,使用前调用初始化方法
init(task, ctx, msg, promise);
return task;
}

private WriteTask(Recycler.Handle<WriteTask> handle) {
super(handle);
}
}

WriteAndFlushTask类

WriteAndFlushTask任何只是在write方法中调用AbstractWriteTask的write方法后执行ctx.invokeFlush().

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static final class WriteAndFlushTask extends AbstractWriteTask {

private static final Recycler<WriteAndFlushTask> RECYCLER = new Recycler<WriteAndFlushTask>() {
@Override
protected WriteAndFlushTask newObject(Handle<WriteAndFlushTask> handle) {
return new WriteAndFlushTask(handle);
}
};

static WriteAndFlushTask newInstance(
AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
WriteAndFlushTask task = RECYCLER.get();
init(task, ctx, msg, promise);
return task;
}

private WriteAndFlushTask(Recycler.Handle<WriteAndFlushTask> handle) {
super(handle);
}

@Override
public void write(AbstractChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
super.write(ctx, msg, promise);
// 多一个刷新
ctx.invokeFlush();
}
}

Tasks任务工具类

Tasks是一个工具类,简化通用Runnable任务对象的构建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private static final class Tasks {
private final AbstractChannelHandlerContext next;
private final Runnable invokeChannelReadCompleteTask = new Runnable() {
@Override
public void run() {
next.invokeChannelReadComplete();
}
};
private final Runnable invokeReadTask = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
private final Runnable invokeChannelWritableStateChangedTask = new Runnable() {
@Override
public void run() {
next.invokeChannelWritabilityChanged();
}
};
private final Runnable invokeFlushTask = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};

Tasks(AbstractChannelHandlerContext next) {
this.next = next;
}
}

DefaultChannelHandlerContext

AbstractChannelHandlerContext中完成了几乎所有的功能,实现ChannelHandlerContext只要继承AbstractChannelHandlerContext就可以。

DefaultChannelHandlerContext中只要完成handler()方法的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

private final ChannelHandler handler;

DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}

@Override
public ChannelHandler handler() {
return handler;
}
}