前言

ServerCnxnFactory 负责接收并管理与客户端之间的网络连接。

初始化

org.apache.zookeeper.server.quorum.QuorumPeerMain#runFromConfig

// Create the client connection factory via ServerCnxnFactory.createFactory(), then
// configure it with the client port address and the per-client connection limit.
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());

默认使用 NIOServerCnxnFactory,可以通过系统属性 zookeeper.serverCnxnFactory 指定其他实现(例如 NettyServerCnxnFactory)。

启动

org.apache.zookeeper.server.quorum.QuorumPeer#start

/**
 * Starts this quorum peer. The steps run in this order: load the database, start the
 * client connection factory, start leader election, and finally call super.start()
 * (presumably starting this peer's own thread — the superclass is not shown here).
 */
@Override
public synchronized void start() {
loadDataBase();
cnxnFactory.start();
startLeaderElection();
super.start();
}

NIOServerCnxnFactory

继承关系

/**
 * NIO-based ServerCnxnFactory. It is a Runnable: configure() hands {@code this} to a
 * ZooKeeperThread, and run() is the selector loop that accepts connections and
 * dispatches read/write readiness to each connection's NIOServerCnxn.doIO().
 */
public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {}

构造器

/**
 * Creates an unconfigured factory; configure() sets up the server socket channel.
 * The body is empty in this excerpt — NOTE(review): the declared IOException presumably
 * comes from field initializers (e.g. opening the selector) that are not shown here.
 */
public NIOServerCnxnFactory() throws IOException {
}

configure()

  1. 初始化了一个 ZooKeeperThread(守护线程),该线程执行本工厂的 run() 方法(此处尚未启动);
  2. 打开并绑定 ServerSocketChannel(NIO 中负责接收连接的服务端通道),设置为非阻塞,并向 selector 注册 OP_ACCEPT 事件,用于接收客户端连接。
    /**
    * Prepares the factory to accept client connections on {@code addr}.
    * Configures SASL login, creates (but does not start) the daemon ZooKeeperThread that
    * will execute this Runnable, records the per-client connection limit, and opens a
    * non-blocking ServerSocketChannel bound to {@code addr} with SO_REUSEADDR enabled,
    * registered with the selector for OP_ACCEPT.
    *
    * @param addr  address the client port is bound to
    * @param maxcc maximum number of connections per client address; run() only enforces it when > 0
    * @throws IOException if opening, binding, or registering the channel fails
    */
    @Override
    public void configure(InetSocketAddress addr, int maxcc) throws IOException {
    configureSaslLogin();

    // The thread runs this factory's run() loop once started.
    thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);
    thread.setDaemon(true);
    maxClientCnxns = maxcc;
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    // Non-blocking mode is required before the channel can be registered with a selector.
    ss.configureBlocking(false);
    // Register with the selector for OP_ACCEPT so run() is notified of new connections.
    ss.register(selector, SelectionKey.OP_ACCEPT);
    }

run()

  1. 有连接事件(SelectionKey.OP_ACCEPT)时,接受连接并将其封装成 NIOServerCnxn,作为附件保存在 SelectionKey 中;
  2. 有读写事件(SelectionKey.OP_READ | SelectionKey.OP_WRITE)时,从 SelectionKey 中取出 NIOServerCnxn,调用 NIOServerCnxn.doIO() 处理。
    /**
    * Selector loop of the factory, presumably executed by the ZooKeeperThread created in
    * configure(). Runs until the server socket is closed, then closes all connections.
    * - OP_ACCEPT: accepts the connection, enforces maxClientCnxns per client address,
    *   registers it for OP_READ and attaches a new NIOServerCnxn to its SelectionKey.
    * - OP_READ / OP_WRITE: takes the attached NIOServerCnxn and calls its doIO().
    * Exceptions thrown while handling keys are logged and the loop continues.
    */
    public void run() {
    while (!ss.socket().isClosed()) {
    try {
    // Wait at most 1000 ms so the loop regularly re-checks whether the socket was closed.
    selector.select(1000);
    Set<SelectionKey> selected;
    synchronized (this) {
    selected = selector.selectedKeys();
    }
    ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
    selected);
    // Handle ready keys in random order — presumably so no connection is always served first.
    Collections.shuffle(selectedList);
    // Iterate over the ready keys.
    for (SelectionKey k : selectedList) {
    // New incoming connection.
    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
    // NOTE(review): a non-blocking accept() may return null on a spurious wakeup;
    // that would surface as a NullPointerException caught by the RuntimeException handler below.
    SocketChannel sc = ((ServerSocketChannel) k
    .channel()).accept();
    InetAddress ia = sc.socket().getInetAddress();
    int cnxncount = getClientCnxnCount(ia);
    // Per-address connection limit; a value <= 0 disables the check.
    if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
    LOG.warn("Too many connections from " + ia
    + " - max is " + maxClientCnxns );
    sc.close();
    } else {
    LOG.info("Accepted socket connection from "
    + sc.socket().getRemoteSocketAddress());
    sc.configureBlocking(false);
    SelectionKey sk = sc.register(selector,
    SelectionKey.OP_READ);
    // Attach the connection object to its key so later read/write events can find it.
    NIOServerCnxn cnxn = createConnection(sc, sk);
    sk.attach(cnxn);
    addCnxn(cnxn);
    }
    // Read/write readiness: delegate to the connection.
    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
    c.doIO(k);
    } else {
    if (LOG.isDebugEnabled()) {
    LOG.debug("Unexpected ops in select "
    + k.readyOps());
    }
    }
    }
    // Clear the selected-key set; the selector does not remove handled keys itself.
    selected.clear();
    } catch (RuntimeException e) {
    LOG.warn("Ignoring unexpected runtime exception", e);
    } catch (Exception e) {
    LOG.warn("Ignoring exception", e);
    }
    }
    // The server socket is closed: shut down every remaining connection.
    closeAll();
    LOG.info("NIOServerCnxn factory exited run method");
    }

createConnection()

返回一个新建的 NIOServerCnxn 对象。

/**
 * Wraps an accepted channel and its SelectionKey in a new NIOServerCnxn bound to the
 * current zkServer and this factory. Declared protected — presumably so subclasses can
 * supply a different connection implementation.
 *
 * @param sock the accepted, non-blocking client channel
 * @param sk   the channel's registration with this factory's selector
 * @return the new connection object
 * @throws IOException if the NIOServerCnxn constructor fails to configure the socket
 */
protected NIOServerCnxn createConnection(SocketChannel sock,
SelectionKey sk) throws IOException {
return new NIOServerCnxn(zkServer, sock, sk, this);
}

NIOServerCnxn 连接管理

一个 NIOServerCnxn 代表一个客户端连接,负责该连接上的读写。NIOServerCnxnFactory 接受连接后为其创建 NIOServerCnxn,之后该连接上的读写事件都交由 NIOServerCnxn.doIO() 处理。

读取过程

响应过程

服务器在 FinalRequestProcessor 处理完请求后响应客户端:

org.apache.zookeeper.server.FinalRequestProcessor#processRequest
cnxn.sendResponse(hdr, rsp, "response")  // hdr 为 new ReplyHeader(...) 构造的响应头,rsp 为响应体

继承关系

/**
 * A single client connection served over NIO. NIOServerCnxnFactory creates one per
 * accepted socket, and the factory's selector loop calls doIO() on read/write readiness.
 */
public class NIOServerCnxn extends ServerCnxn {}

成员变量

// Factory that accepted this connection (supplies e.g. login and directBuffer).
NIOServerCnxnFactory factory;

// The client's socket channel.
final SocketChannel sock;

// This connection's registration with the factory's selector.
protected final SelectionKey sk;

// False until the connect request has been processed (set in readConnectRequest()).
boolean initialized;

// 4-byte buffer that receives the length prefix of each incoming packet.
ByteBuffer lenBuffer = ByteBuffer.allocate(4);

// Buffer currently being filled by reads: lenBuffer while a length prefix is read,
// otherwise a buffer sized for the current packet's payload.
ByteBuffer incomingBuffer = lenBuffer;

// Buffers waiting to be written to the client; drained by doIO() when the key is writable.
LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();

// NOTE(review): not assigned in the visible excerpt; presumably set while processing the connect request.
int sessionTimeout;

// Server that processes this connection's requests.
protected final ZooKeeperServer zkServer;

/**
* The number of requests that have been submitted but not yet responded to.
*/
int outstandingRequests;

/**
* This is the id that uniquely identifies the session of a client. Once
* this session is no longer active, the ephemeral nodes will go away.
*/
long sessionId;

// NOTE(review): not referenced anywhere in the visible excerpt.
static long nextSessionId = 1;
// Throttling threshold; replaced by zk.getGlobalOutstandingLimit() in the constructor when zk != null.
int outstandingLimit = 1;

构造器

/**
 * Wraps an accepted client channel as a server-side connection.
 * Creates a SASL server if the factory has a login, takes the outstanding-request limit
 * from the server's global setting (when a server is given), enables TCP_NODELAY,
 * disables SO_LINGER, records the peer's IP address as an "ip" auth id, and sets the
 * key's interest to OP_READ.
 *
 * @param zk      server that will process requests; may be null, in which case the default outstandingLimit is kept
 * @param sock    the accepted client channel
 * @param sk      the channel's SelectionKey
 * @param factory the factory that accepted the connection
 * @throws IOException if configuring the socket options fails
 */
public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
this.zkServer = zk;
this.sock = sock;
this.sk = sk;
this.factory = factory;
if (this.factory.login != null) {
this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
}
if (zk != null) {
outstandingLimit = zk.getGlobalOutstandingLimit();
}
// Send small packets immediately instead of coalescing them (disables Nagle).
sock.socket().setTcpNoDelay(true);
/* set socket linger to false, so that socket close does not
* block */
sock.socket().setSoLinger(false, -1);
InetAddress addr = ((InetSocketAddress) sock.socket()
.getRemoteSocketAddress()).getAddress();
// Every connection starts with an "ip" identity for ACL checks.
authInfo.add(new Id("ip", addr.getHostAddress()));
sk.interestOps(SelectionKey.OP_READ);
}

方法

doIO 处理读写

/**
 * Handles one round of readiness on this connection's SelectionKey.
 * Read side: reads the 4-byte length prefix, then the payload (see readPayload()).
 * Write side: copies queued outgoingBuffers into the factory's directBuffer, writes it,
 * drops fully-sent buffers, and clears OP_WRITE interest once the queue is empty.
 * CancelledKeyException, CloseRequestException, EndOfStreamException and IOException
 * all result in close().
 *
 * @param k the selection key for this connection
 * @throws InterruptedException propagated from request processing
 */
void doIO(SelectionKey k) throws InterruptedException {
try {
if (isSocketOpen() == false) {
LOG.warn("trying to do i/o on a null socket for session:0x"
+ Long.toHexString(sessionId));

return;
}
if (k.isReadable()) {
/*
Read path:
1. Initially incomingBuffer is lenBuffer (capacity 4); the first read fills it with
   the length of the request packet.
2. incomingBuffer is then allocated with exactly that length (presumably inside
   readLength(), which is not shown here).
3. Bytes are read into incomingBuffer until it is full; because it was sized to the
   packet length, a full buffer holds exactly one packet.
4. The packet is processed.
*/
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException(
"Unable to read additional data from client sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely client has closed socket");
}
/*
Processing only continues once incomingBuffer.remaining() == 0; until then we keep
reading on later events. When the buffer is full there are two cases:
1. incomingBuffer is lenBuffer: it holds the length of the next request. After
   incomingBuffer is allocated from that length, readPayload() is called (unless this
   is a four-letter word), which immediately attempts one more read.
   (1) If that fills incomingBuffer, it holds a complete request, which is processed.
   (2) If not, the packet was split across reads; a complete request cannot be built
       yet, so we wait until the channel is readable again and keep filling incomingBuffer.
2. incomingBuffer is not lenBuffer: an earlier read left a partial request in it and this
   read completed it, so readPayload() is called.
*/
if (incomingBuffer.remaining() == 0) { // buffer is full; remaining() is the free space left
boolean isPayload;
if (incomingBuffer == lenBuffer) { // start of next request
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
// continuation
isPayload = true;
}
if (isPayload) { // not the case for 4letterword
readPayload(); // the request is processed in here
}
else {
// four letter words take care
// need not do anything else
return;
}
}
}
if (k.isWritable()) { // flush queued responses to the client
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
// "outgoingBuffers.size() = " +
// outgoingBuffers.size());
if (outgoingBuffers.size() > 0) { // there is queued data to send
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
// "sk " + k + " is valid: " +
// k.isValid());

/*
* This is going to reset the buffer position to 0 and the
* limit to the size of the buffer, so that we can fill it
* with data from the non-direct buffers that we need to
* send.
*/
ByteBuffer directBuffer = factory.directBuffer;
directBuffer.clear();

for (ByteBuffer b : outgoingBuffers) {
if (directBuffer.remaining() < b.remaining()) {
/*
* When we call put later, if the directBuffer is too
* small to hold everything, nothing will be copied,
* so we've got to slice the buffer if it's too big.
*/
b = (ByteBuffer) b.slice().limit(
directBuffer.remaining());
}
/*
* put() is going to modify the positions of both
* buffers, but we don't want to change the position of
* the source buffers (we'll do that after the send, if
* needed), so we save and reset the position after the
* copy
*/
int p = b.position();
directBuffer.put(b);
b.position(p);
if (directBuffer.remaining() == 0) {
break;
}
}
/*
* Do the flip: limit becomes position, position gets set to
* 0. This sets us up for the write.
*/
directBuffer.flip(); // position back to 0 so the buffer's contents can be written out

int sent = sock.write(directBuffer);
ByteBuffer bb;

// Remove the buffers that we have sent
while (outgoingBuffers.size() > 0) {
bb = outgoingBuffers.peek();
// The closeConn marker means everything queued before it has been sent: close now.
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested");
}
int left = bb.remaining() - sent;
if (left > 0) {
/*
* We only partially sent this buffer, so we update
* the position and exit the loop.
*/
bb.position(bb.position() + sent);
break;
}
packetSent();
/* We've sent the whole buffer, so drop the buffer */
sent -= bb.remaining();
outgoingBuffers.remove();
}
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
// outgoingBuffers.size() = " + outgoingBuffers.size());
}

synchronized(this.factory){
if (outgoingBuffers.size() == 0) {
// An uninitialized connection that no longer reads (presumably a
// four-letter-word probe) is closed once its output is flushed.
if (!initialized
&& (sk.interestOps() & SelectionKey.OP_READ) == 0) {
throw new CloseRequestException("responded to info probe");
}
sk.interestOps(sk.interestOps()
& (~SelectionKey.OP_WRITE));
} else {
sk.interestOps(sk.interestOps()
| SelectionKey.OP_WRITE);
}
}
}
} catch (CancelledKeyException e) {
LOG.warn("CancelledKeyException causing close of session 0x"
+ Long.toHexString(sessionId));
if (LOG.isDebugEnabled()) {
LOG.debug("CancelledKeyException stack trace", e);
}
close();
} catch (CloseRequestException e) {
// expecting close to log session closure
close();
} catch (EndOfStreamException e) {
LOG.warn(e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("EndOfStreamException stack trace", e);
}
// expecting close to log session closure
close();
} catch (IOException e) {
LOG.warn("Exception causing close of session 0x"
+ Long.toHexString(sessionId) + ": " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("IOException stack trace", e);
}
close();
}
}

readPayload()

/**
 * Finishes reading the current request payload and dispatches it once it is complete.
 * If incomingBuffer still has room, one non-blocking read is attempted. When the buffer
 * is full, the packet is counted, flipped for parsing, and handed to readConnectRequest()
 * (first packet on the connection) or readRequest() (every later packet). The connection
 * then goes back to reading a 4-byte length prefix into lenBuffer.
 *
 * @throws EndOfStreamException if the read reports end of stream (client closed the socket)
 */
private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.hasRemaining()) {
        // The channel is non-blocking, so this only picks up what is available right now.
        int bytesRead = sock.read(incomingBuffer);
        if (bytesRead < 0) {
            throw new EndOfStreamException(
                    "Unable to read additional data from client sessionid 0x"
                    + Long.toHexString(sessionId)
                    + ", likely client has closed socket");
        }
    }

    if (incomingBuffer.hasRemaining()) {
        // Partial packet: the rest arrives on a later readable event.
        return;
    }

    packetReceived();
    incomingBuffer.flip(); // position back to 0 so the packet is parsed from the start
    if (initialized) {
        readRequest();
    } else {
        readConnectRequest();
    }
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

readConnectRequest()

/**
 * Handles the first packet on the connection (the connect request) and marks the
 * connection initialized, so that later packets go to readRequest().
 *
 * @throws IOException if the ZooKeeper server is not running, or if processing fails
 */
private void readConnectRequest() throws IOException, InterruptedException {
    boolean serverRunning = isZKServerRunning();
    if (serverRunning) {
        zkServer.processConnectRequest(this, incomingBuffer);
        initialized = true;
        return;
    }
    throw new IOException("ZooKeeperServer not running");
}

readRequest()

/**
 * Passes a complete request packet (incomingBuffer, already flipped by readPayload())
 * to the server's processPacket().
 *
 * @throws IOException propagated from processPacket()
 */
private void readRequest() throws IOException {
zkServer.processPacket(this, incomingBuffer);
}

internalSendBuffer()

/**
 * Sends {@code bb} to the client immediately if possible, otherwise queues it for doIO().
 * Fast path: if the key is valid and OP_WRITE is not set (nothing is queued), the buffer is
 * written directly; if that sends everything, the packet is counted and the method returns.
 * Otherwise (partial write, data already queued, invalid key, or the closeConn marker) the
 * buffer is appended to outgoingBuffers under the factory lock, OP_WRITE interest is added
 * if the key is still valid, and the selector is woken so doIO() will flush it.
 * The closeConn marker is always queued; doIO() closes the connection when it reaches it.
 *
 * @param bb the serialized packet to send, or ServerCnxnFactory.closeConn
 */
protected void internalSendBuffer(ByteBuffer bb) {
if (bb != ServerCnxnFactory.closeConn) {
// We check if write interest here because if it is NOT set,
// nothing is queued, so we can try to send the buffer right
// away without waking up the selector
if(sk.isValid() &&
((sk.interestOps() & SelectionKey.OP_WRITE) == 0)) {
try {
sock.write(bb);
} catch (IOException e) {
// we are just doing best effort right now
// (on failure the unsent bytes are queued below and doIO() retries the write)
}
}
// if there is nothing left to send, we are done
if (bb.remaining() == 0) {
packetSent();
return;
}
}

synchronized(this.factory){
sk.selector().wakeup();
if (LOG.isTraceEnabled()) {
LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
+ " is valid: " + sk.isValid());
}
outgoingBuffers.add(bb);
if (sk.isValid()) {
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
}
}
}

sendBuffer()

/**
 * Public entry point for sending a buffer on this connection. Delegates to
 * internalSendBuffer() and never lets a failure escape to the caller: any
 * exception is logged at error level instead.
 *
 * @param bb the serialized packet to send, or ServerCnxnFactory.closeConn
 */
public void sendBuffer(ByteBuffer bb) {
    try {
        internalSendBuffer(bb);
    } catch (Exception unexpected) {
        LOG.error("Unexpected Exception: ", unexpected);
    }
}

sendResponse() 发送响应

/**
 * Serializes a reply (4-byte length prefix, header, optional record) and sends or queues
 * it on this connection. For replies to client requests (xid > 0) it also decrements the
 * outstanding-request count and, if the server is below its throttling limit or this
 * connection has no outstanding requests, wakes the selector and re-enables reading.
 * Any exception is logged rather than propagated.
 *
 * @param h   reply header; its xid decides whether this answers an outstanding request
 * @param r   optional response body, may be null
 * @param tag record tag passed to writeRecord for {@code r}
 */
@Override
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        try {
            // Make space for length: reserve 4 bytes, filled in below once the size is known.
            baos.write(fourBytes);
            bos.writeRecord(h, "header");
            if (r != null) {
                bos.writeRecord(r, tag);
            }
            baos.close();
        } catch (IOException e) {
            // Keep the cause so serialization failures can actually be diagnosed.
            LOG.error("Error serializing response", e);
        }
        byte[] b = baos.toByteArray();
        ByteBuffer bb = ByteBuffer.wrap(b);
        // The length prefix counts the payload only, not the 4 prefix bytes themselves.
        bb.putInt(b.length - 4).rewind();
        sendBuffer(bb);
        if (h.getXid() > 0) {
            // This method already holds this object's monitor, so no nested
            // synchronized(this) is needed around the decrement.
            outstandingRequests--;
            // check throttling
            synchronized (this.factory) {
                if (zkServer.getInProcess() < outstandingLimit
                        || outstandingRequests < 1) {
                    sk.selector().wakeup();
                    enableRecv();
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Unexpected exception. Destruction averted.", e);
    }
}

process() watch 事件通知

process() 将触发的 watch 事件发送给对应的客户端。

/**
 * Delivers a triggered watch event to this connection's client as a "notification" reply.
 * The header uses xid -1, which is not > 0, so sendResponse() does not treat it as the
 * answer to an outstanding request (no decrement and no throttling check).
 *
 * @param event the watch event to deliver
 */
@Override
public synchronized void process(WatchedEvent event) {
    final ReplyHeader notificationHeader = new ReplyHeader(-1, -1L, 0);

    if (LOG.isTraceEnabled()) {
        final String traceMsg = "Deliver event " + event + " to 0x"
                + Long.toHexString(this.sessionId)
                + " through " + this;
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, traceMsg);
    }

    // getWrapper() yields the wire-format representation of the event.
    sendResponse(notificationHeader, event.getWrapper(), "notification");
}

ZooKeeperServer 处理客户端请求

processPacket()

processPacket 中大部分代码用于处理认证请求(auth 与 SASL):认证完成后直接响应客户端,认证信息保存在 ServerCnxn 中,后续请求通过 cnxn.getAuthInfo() 从 ServerCnxn 中获取认证后的信息。

普通请求通过 submitRequest(si) 交给请求处理链处理,处理链因 ZooKeeperServer 的子类不同而不同,详见 Leader 请求处理链解析、FollowerZooKeeperServer 和 ObserverZooKeeperServer。

/**
 * Parses one request packet from a connection and routes it.
 * - OpCode.auth: authenticates with the AuthenticationProvider registered for the packet's
 *   scheme. On failure it replies AUTHFAILED, queues closeConn and disables receiving;
 *   on success it replies OK.
 * - OpCode.sasl: runs processSasl() and replies OK with its result.
 * - anything else: wraps it in a Request carrying the connection's auth info, submits it
 *   to the request processing chain, and increments the connection's outstanding requests.
 * The auth and sasl paths return early, so only regular requests reach incrOutstandingRequests().
 *
 * @param cnxn           the connection the packet arrived on
 * @param incomingBuffer the complete packet, positioned at its start
 * @throws IOException if the header or auth packet cannot be deserialized
 */
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) { // auth packet: authenticate with the provider registered for its scheme
LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
// Fail closed: AUTHFAILED unless a provider exists and reports OK.
Code authReturn = KeeperException.Code.AUTHFAILED;
if(ap != null) {
try {
authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
} catch(RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn!= KeeperException.Code.OK) {
if (ap == null) {
LOG.warn("No authentication provider for scheme: "
+ scheme + " has "
+ ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: " + scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication succeeded for scheme: "
+ scheme);
}
LOG.info("auth success " + cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
}
return;
} else {
if (h.getType() == OpCode.sasl) { // SASL packet: run the SASL exchange and reply with its result
Record rsp = processSasl(incomingBuffer,cnxn);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh,rsp, "response"); // 3rd arg is the record tag; NIOServerCnxn.sendResponse passes it to writeRecord
return;
}
else {
// Regular request: hand it to the request processing chain.
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
submitRequest(si);
}
}
cnxn.incrOutstandingRequests(h);
}

参考

ZooKeeper-客户端连接ServerCnxn之NIOServerCnxn