在Netty里,Channel是通讯的载体,而ChannelHandler负责Channel中的逻辑处理。Channel是理解和使用Netty的核心,所以在正式解析流程前先将Channel做一个详细的拆解。

Channel

Channel接口定义了各种操作的方法;此外还有一个内部类Channel.Unsafe,Unsafe类封装了与底层buffer交互的逻辑,为Channel提供了统一的抽象,通常普通使用者不应该使用这个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {

ChannelId id();

EventLoop eventLoop();

Channel parent();

ChannelConfig config();

boolean isOpen();

boolean isRegistered();

boolean isActive();

ChannelMetadata metadata();

SocketAddress localAddress();

SocketAddress remoteAddress();

ChannelFuture closeFuture();

boolean isWritable();

long bytesBeforeUnwritable();

long bytesBeforeWritable();

Unsafe unsafe();

ChannelPipeline pipeline();

ByteBufAllocator alloc();

@Override
Channel read();

@Override
Channel flush();

interface Unsafe {

RecvByteBufAllocator.Handle recvBufAllocHandle();

SocketAddress localAddress();

SocketAddress remoteAddress();

void register(EventLoop eventLoop, ChannelPromise promise);

void bind(SocketAddress localAddress, ChannelPromise promise);

void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);

void disconnect(ChannelPromise promise);

void close(ChannelPromise promise);

void closeForcibly();

void deregister(ChannelPromise promise);

void beginRead();

void write(Object msg, ChannelPromise promise);

void flush();

ChannelPromise voidPromise();

ChannelOutboundBuffer outboundBuffer();
}
}

在写服务器与客户端的例子的时候我们用到了NioServerSocketChannel和NioSocketChannel,可以看一下这两个类的继承关系。

NioSocketChannel

NioServerSocketChannel

可以看到核心的实现类是AbstractChannel,然后NIO的核心实现类是AbstractNioChannel。

AbstractChannel

Channel最基础的实现类,所有的Channel实现类都是基于AbstractChannel。

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);

private final Channel parent;
// 通道id,默认实现DefaultChannelId,机器id+4位进程id+递增序列号+8位时间戳+4位随机值
private final ChannelId id;
// unsafe对象
private final Unsafe unsafe;
// 流水线
private final DefaultChannelPipeline pipeline;
// 响应
private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
// 关闭响应
private final CloseFuture closeFuture = new CloseFuture(this);
// 本地与远程地址
private volatile SocketAddress localAddress;
private volatile SocketAddress remoteAddress;
// 事件循环
private volatile EventLoop eventLoop;
// 是否注册
private volatile boolean registered;
//
private boolean closeInitiated;
private Throwable initialCloseCause;

/** Cache for the string representation of this channel */
private boolean strValActive;
private String strVal;
}

核心方法

DefaultChannelPipeline处理实际的连接、注册、读写等交互逻辑,unsafe处理底层buffer交互。

是否可写isWritable()

1
2
3
4
5
@Override
public boolean isWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
return buf != null && buf.isWritable();
}

还未写入的字节数bytesBeforeWritable()

1
2
3
4
5
6
7
@Override
public long bytesBeforeWritable() {
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
// isWritable() is currently assuming if there is no outboundBuffer then the channel is not writable.
// We should be consistent with that here.
return buf != null ? buf.bytesBeforeWritable() : Long.MAX_VALUE;
}

绑定bind

1
2
3
4
5
6
7
8
@Override
public ChannelFuture bind(SocketAddress localAddress) {
return pipeline.bind(localAddress);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}

连接connect

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}

@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, localAddress, promise);
}

读取read

1
2
3
4
5
@Override
public Channel read() {
pipeline.read();
return this;
}

写write

1
2
3
4
5
6
7
8
9
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}

@Override
public ChannelFuture write(Object msg, ChannelPromise promise) {
return pipeline.write(msg, promise);
}

写并刷新writeAndFlush

1
2
3
4
5
6
7
8
9
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}

@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
return pipeline.writeAndFlush(msg, promise);
}

future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public ChannelFuture newSucceededFuture() {
return pipeline.newSucceededFuture();
}

@Override
public ChannelFuture newFailedFuture(Throwable cause) {
return pipeline.newFailedFuture(cause);
}

@Override
public ChannelFuture closeFuture() {
return closeFuture;
}

AbstractUnsafe

AbstractUnsafe实现方法中实现底层实现无关的内容例如pipeLine各种事件回调,将底层实现相关的封装成doXX方法交由子类去实现。

成员变量

1
2
3
4
5
6
7
8
protected abstract class AbstractUnsafe implements Unsafe {

private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
private RecvByteBufAllocator.Handle recvHandle;
private boolean inFlush0;
/** true if the channel has never been registered, false otherwise */
private boolean neverRegistered = true;
}

注册register

  1. 具体的注册逻辑doRegister()为空方法交由子类实现;
  2. 注册会先将pipeline的handler初始化[pipeline.invokeHandlerAddedIfNeeded()]
  3. 然后触发pipeline.fireChannelRegistered();
  4. 如果是第一次注册则还会触发pipeline.fireChannelActive();
  5. 如果开启了自动读取(autoRead)则会执行beginRead()方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}

AbstractChannel.this.eventLoop = eventLoop;
// 在事件循环中则注册
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 加入事件循环
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

实际的注册逻辑register0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 空方法由子类实现
doRegister();
neverRegistered = false;
registered = true;

// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 实例化handler
pipeline.invokeHandlerAddedIfNeeded();

safeSetSuccess(promise);
// Inbound 事件的fireChannelRegistered
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
// fireChannelActive
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

设置成功safeSetSuccess

1
2
3
4
5
protected final void safeSetSuccess(ChannelPromise promise) {
if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
}
}

绑定bind

  1. 具体的绑定逻辑doBind(localAddress)为空方法交由子类实现;
  2. 之前不是activie状态,在绑定之后变为了active时异步触发pipeline.fireChannelActive()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();

if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}

boolean wasActive = isActive();
try {
// 空方法由之类实现
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// !wasActive之前不是activie状态,isActive()在绑定之后变为了active时异步触发pipeline.fireChannelActive()
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}

safeSetSuccess(promise);
}

断开连接disconnect

  1. 具体的断开逻辑doDisconnect()为空方法交由子类实现;
  2. 之前是activie状态,在断开之后变为了不是active时异步触发pipeline.fireChannelInactive()
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public final void disconnect(final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable()) {
    return;
    }

    boolean wasActive = isActive();
    try {
    doDisconnect();
    // Reset remoteAddress and localAddress
    remoteAddress = null;
    localAddress = null;
    } catch (Throwable t) {
    safeSetFailure(promise, t);
    closeIfClosed();
    return;
    }

    if (wasActive && !isActive()) {
    invokeLater(new Runnable() {
    @Override
    public void run() {
    pipeline.fireChannelInactive();
    }
    });
    }

    safeSetSuccess(promise);
    closeIfClosed(); // doDisconnect() might have closed the channel
    }

开始读beginRead

  • 保留doBeginRead()空方法给子类实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public final void beginRead() {
    assertEventLoop();

    if (!isActive()) {
    return;
    }

    try {
    doBeginRead();
    } catch (final Exception e) {
    invokeLater(new Runnable() {
    @Override
    public void run() {
    pipeline.fireExceptionCaught(e);
    }
    });
    close(voidPromise());
    }
    }

写write

将msg写到ChannelOutboundBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();

ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, newClosedChannelException(initialCloseCause));
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}

int size;
try {
// msg处理,默认不处理
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}

outboundBuffer.addMessage(msg, size, promise);
}

刷新buffer flush

1
2
3
4
5
6
7
8
9
10
11
public final void flush() {
assertEventLoop();

ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}

outboundBuffer.addFlush();
flush0();
}

flush0中执行doWrite()方法,doWrite方法由子类实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}

final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}

inFlush0 = true;

// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(new NotYetConnectedException(), true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
}
} finally {
inFlush0 = false;
}
return;
}

try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
initialCloseCause = t;
close(voidPromise(), t, newClosedChannelException(t), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newClosedChannelException(t), false);
}
}
} finally {
inFlush0 = false;
}
}

AbstractNioChannel

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public abstract class AbstractNioChannel extends AbstractChannel {
// java nio的channel
private final SelectableChannel ch;
protected final int readInterestOp;
// key
volatile SelectionKey selectionKey;

boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
clearReadPending0();
}
};

/**
* The future of the current connection attempt. If not null, subsequent
* connection attempts will fail.
*/
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
// 远程地址
private SocketAddress requestedRemoteAddress;
}

构造器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}

throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}

NioUnsafe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface NioUnsafe extends Unsafe {
/**
* Return underlying {@link SelectableChannel}
*/
SelectableChannel ch();
/**
* Finish connect
*/
void finishConnect();
/**
* Read from underlying {@link SelectableChannel}
*/
void read();
void forceFlush();
}

AbstractNioUnsafe

连接connnect

AbstractNioUnsafe实现了连接的通用处理比如超时,但是实际的连接用了doConnect空方法交由子类去实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}

try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}

boolean wasActive = isActive();
// doConnect 空方法留给子类实现
if (doConnect(remoteAddress, localAddress)) {
// 连接成功执行pipeline().fireChannelActive()
// 如果取消了连接则
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;

// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
// 连接超时处理
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
// 添加监听器
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// 取消了连接则取消超时
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}

注册doRegister

使用jdk channel的注册

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 注册
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}

开始读doBeginRead

doBeginRead将selectionKey设置readInterestOp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 设置readInterestOp
selectionKey.interestOps(interestOps | readInterestOp);
}
}

获取新缓冲区newDirectBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}

final ByteBufAllocator alloc = alloc();
// DirectBuffer池中的循环使用
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// threadLocal中的
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// 其他不直接创建
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
return buf;
}

doClose

执行connectPromise.tryFailure并且取消超时connectTimeoutFuture.cancel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected void doClose() throws Exception {
ChannelPromise promise = connectPromise;
if (promise != null) {
// Use tryFailure() instead of setFailure() to avoid the race against cancel().
promise.tryFailure(new ClosedChannelException());
connectPromise = null;
}

ScheduledFuture<?> future = connectTimeoutFuture;
if (future != null) {
future.cancel(false);
connectTimeoutFuture = null;
}
}