Preface

In the second article, on Leader initialization, we met the classes ZooKeeperServer and LeaderZooKeeperServer used by the leader node; request handling is delegated to LeaderZooKeeperServer.

Initialization

leader.lead() -> leader.startZkServer()

private synchronized void startZkServer() {
    // Update lastCommitted and Db's zxid to a value representing the new epoch
    lastCommitted = zk.getZxid();
    LOG.info("Have quorum of supporters, sids: [ "
            + getSidSetString(newLeaderProposal.ackSet)
            + " ]; starting up and setting last processed zxid: 0x{}",
            Long.toHexString(zk.getZxid()));
    zk.startup();
    /*
     * Update the election vote here to ensure that all members of the
     * ensemble report the same vote to new servers that start up and
     * send leader election notifications to the ensemble.
     *
     * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
     */
    self.updateElectionVote(getEpoch());

    zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
}

Other call sites

LearnerHandler.run()

Since ZooKeeper can run in cluster or standalone mode, a different ZooKeeperServer variant is used in each case. In cluster mode the leader uses LeaderZooKeeperServer, which extends QuorumZooKeeperServer, which in turn extends ZooKeeperServer; most of the methods are implemented in the ZooKeeperServer class.

The ZooKeeperServer class

Constructor

public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
        int minSessionTimeout, int maxSessionTimeout,
        DataTreeBuilder treeBuilder, ZKDatabase zkDb) {
    serverStats = new ServerStats(this);
    this.txnLogFactory = txnLogFactory;
    this.txnLogFactory.setServerStats(this.serverStats);
    this.zkDb = zkDb;
    this.tickTime = tickTime;
    this.minSessionTimeout = minSessionTimeout;
    this.maxSessionTimeout = maxSessionTimeout;

    listener = new ZooKeeperServerListenerImpl(this);

    LOG.info("Created server with tickTime " + tickTime
            + " minSessionTimeout " + getMinSessionTimeout()
            + " maxSessionTimeout " + getMaxSessionTimeout()
            + " datadir " + txnLogFactory.getDataDir()
            + " snapdir " + txnLogFactory.getSnapDir());
}

Key methods

startup()

public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();

    registerJMX();
    // initialization is complete; notify all waiting threads
    setState(State.RUNNING);
    notifyAll();
}

Creating the SessionTracker: createSessionTracker()

protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
            tickTime, 1, getZooKeeperServerListener());
}

Starting the SessionTracker thread: startSessionTracker()

protected void startSessionTracker() {
    ((SessionTrackerImpl) sessionTracker).start();
}

Setting up the request processors: setupRequestProcessors()

The RequestProcessor chain handles requests submitted to the server, including requests that followers forward to the leader; for forwarded requests the call path is

LearnerCnxAcceptor.run() -> LearnerHandler.run() -> leader.zk.submitRequest(si)

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}

Submitting a request: submitRequest()

The leader receives requests forwarded by other servers (Leader.REQUEST) and calls this method to handle them. This happens in

LearnerHandler.run()

@Override
public void run() {
    // ... omitted
    case Leader.REQUEST:
        bb = ByteBuffer.wrap(qp.getData());
        sessionId = bb.getLong();
        cxid = bb.getInt();
        type = bb.getInt();
        bb = bb.slice();
        Request si;
        if (type == OpCode.sync) {
            si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
        } else {
            si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
        }
        si.setOwner(this);
        // hand the request to the leader's request chain
        leader.zk.submitRequest(si);
        break;
    // ... omitted
}

Inside submitRequest, the request is handed to the request processor chain, which works much like a chain of servlet filters:

firstProcessor.processRequest(si);

private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
        int xid, ByteBuffer bb, List<Id> authInfo) {
    // wrap the raw packet into a Request
    Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
    submitRequest(si);
}
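The chain-of-responsibility style described above can be sketched as follows. This is a minimal sketch with hypothetical, simplified names (Processor, PrepStage, FinalStage are illustrative, not the actual ZooKeeper types): each stage does its own work, then delegates to the next.

```java
class ChainSketch {
    // Hypothetical, simplified sketch of the filter-style processor chain.
    interface Processor {
        String process(String request);
    }

    static class PrepStage implements Processor {
        private final Processor next;
        PrepStage(Processor next) { this.next = next; }
        public String process(String request) {
            // validate / prepare, then hand off to the next stage
            return next.process("prep(" + request + ")");
        }
    }

    static class FinalStage implements Processor {
        public String process(String request) {
            // last stage: produce the final result
            return "final(" + request + ")";
        }
    }

    static String submit(String request) {
        // like firstProcessor.processRequest(si): the caller only sees the head
        Processor first = new PrepStage(new FinalStage());
        return first.process(request);
    }
}
```

The caller only ever holds a reference to the head of the chain, which is why swapping in a different chain (as LeaderZooKeeperServer does) changes the whole processing flow without touching submitRequest.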

The void submitRequest(Request si) method

public void submitRequest(Request si) {
    if (firstProcessor == null) {
        synchronized (this) { // wait until initialization completes
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        // refresh the session
        touch(si.cnxn);
        // validate the request type
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            // hand over to the processor chain (covered separately below)
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess(); // increment the in-flight request counter
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}

Creating a session: createSession()

long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    long sessionId = sessionTracker.createSession(timeout);
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);
    // this also goes through the request chain
    submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
    return sessionId;
}

Setting the session owner: setOwner()

public void setOwner(long id, Object owner) throws SessionExpiredException {
    sessionTracker.setOwner(id, owner);
}

Refreshing a session: touch()

void touch(ServerCnxn cnxn) throws MissingSessionException {
    if (cnxn == null) {
        return;
    }
    long id = cnxn.getSessionId();
    int to = cnxn.getSessionTimeout();
    // refresh the session's expiration time
    if (!sessionTracker.touchSession(id, to)) {
        throw new MissingSessionException(
                "No session with sessionid 0x" + Long.toHexString(id)
                + " exists, probably expired and removed");
    }
}

Processing a request packet: processPacket()

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
if(ap != null) {
try {
authReturn = ap.handleAuthentication(cnxn, authPacket.getAuth());
} catch(RuntimeException e) {
LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
authReturn = KeeperException.Code.AUTHFAILED;
}
}
if (authReturn!= KeeperException.Code.OK) {
if (ap == null) {
LOG.warn("No authentication provider for scheme: "
+ scheme + " has "
+ ProviderRegistry.listProviders());
} else {
LOG.warn("Authentication failed for scheme: " + scheme);
}
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.AUTHFAILED.intValue());
cnxn.sendResponse(rh, null, null);
// ... and close connection
cnxn.sendBuffer(ServerCnxnFactory.closeConn);
cnxn.disableRecv();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Authentication succeeded for scheme: "
+ scheme);
}
LOG.info("auth success " + cnxn.getRemoteSocketAddress());
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh, null, null);
}
return;
} else {
if (h.getType() == OpCode.sasl) {
Record rsp = processSasl(incomingBuffer,cnxn);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
cnxn.sendResponse(rh,rsp, "response"); // not sure about 3rd arg..what is it?
return;
}
else {
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
submitRequest(si);
}
}
cnxn.incrOutstandingRequests(h);
}

Submitting requests via submitRequest

The request chain inside the public void submitRequest(Request si) method:

firstProcessor.processRequest(si);

First, look again at how the processing chain is initialized:

protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}

From this initialization code we can see that the concrete type of firstProcessor is PrepRequestProcessor.

The PrepRequestProcessor class: pre-processing (distinguishing read and write requests)

PrepRequestProcessor is responsible for validating requests and, for writes, wrapping them into transactions.

Class hierarchy

public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
        RequestProcessor

Constructor

nextProcessor holds the next processor in the chain.

public PrepRequestProcessor(ZooKeeperServer zks,
        RequestProcessor nextProcessor) {
    super("ProcessThread(sid:" + zks.getServerId() + " cport:"
            + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
    this.nextProcessor = nextProcessor;
    this.zks = zks;
}

Processing requests

processRequest simply adds the request to a queue (a LinkedBlockingQueue); the run() method then processes requests asynchronously.

public void processRequest(Request request) {
    // request.addRQRec(">prep="+zks.outstandingChanges.size());
    submittedRequests.add(request);
}
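The submit-then-drain pattern can be sketched as follows. This is a minimal hypothetical model of the processRequest()/run() pair, not the real classes: the caller only enqueues, and a worker thread drains the queue, stopping on a poison-pill request just as the real code stops on Request.requestOfDeath.

```java
class QueueSketch extends Thread {
    // Hypothetical sketch: processRequest() only enqueues; this worker thread
    // drains the queue asynchronously, like PrepRequestProcessor.run().
    static final String REQUEST_OF_DEATH = "requestOfDeath"; // poison pill
    private final java.util.concurrent.LinkedBlockingQueue<String> submittedRequests =
            new java.util.concurrent.LinkedBlockingQueue<>();
    final StringBuilder processed = new StringBuilder();

    void processRequest(String request) {
        submittedRequests.add(request);              // never blocks the caller
    }

    @Override public void run() {
        try {
            while (true) {
                String r = submittedRequests.take(); // blocks until work arrives
                if (REQUEST_OF_DEATH.equals(r)) {
                    break;                           // shutdown marker
                }
                processed.append(r).append(';');
            }
        } catch (InterruptedException ignored) {
        }
    }

    static String demo() {
        QueueSketch p = new QueueSketch();
        p.start();
        p.processRequest("create:/a");
        p.processRequest("setData:/a");
        p.processRequest(REQUEST_OF_DEATH);
        try {
            p.join();                                // join() gives happens-before
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return p.processed.toString();
    }
}
```

This decoupling is why a slow disk or a long validation never blocks the networking layer: the submitting thread returns as soon as the request is queued.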

The run() method

run takes requests off the queue and hands them to pRequest() for processing.

@Override
public void run() {
    try {
        while (true) {
            Request request = submittedRequests.take();
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
            }
            if (Request.requestOfDeath == request) {
                break;
            }
            // process the request
            pRequest(request);
        }
    } catch (RequestProcessorException e) {
        if (e.getCause() instanceof XidRolloverException) {
            LOG.info(e.getCause().getMessage());
        }
        handleException(this.getName(), e);
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}

pRequest(): handling the request

For write requests, request.hdr is generated to mark them as transactions.
Handling depends on request.type:

  1. create/delete/setData/check/setACL/createSession/closeSession are all validated by pRequest2Txn(), which generates request.txn; a new zxid is also allocated at this point (i.e., the zxid is incremented).
  2. sync/exists/getData/getACL/getChildren/getChildren2/ping/setWatches need no txn; only the session is validated.
  3. multi combines several requests into one atomic operation; each sub-request is converted to a txn via pRequest2Txn.

After this processing, nextProcessor.processRequest(request) hands the request to the next processor in the chain; from setupRequestProcessors() we know this is SyncRequestProcessor.
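The read/write split above can be sketched as a hypothetical, simplified model (Op, isWrite, and dispatch are illustrative names, not ZooKeeper API): only write-type operations consume a new zxid, while reads only have their session checked.

```java
class DispatchSketch {
    // Hypothetical sketch of the read/write split in pRequest(): write-type
    // operations get a fresh zxid (and, in the real code, a txn header);
    // read-type operations are only session-checked and get no txn.
    enum Op { CREATE, DELETE, SET_DATA, GET_DATA, EXISTS, PING }

    private long zxid = 0;                     // models the leader's zxid counter

    static boolean isWrite(Op op) {
        return op == Op.CREATE || op == Op.DELETE || op == Op.SET_DATA;
    }

    // Returns the zxid assigned to a write, or -1 for a read.
    long dispatch(Op op) {
        if (isWrite(op)) {
            return ++zxid;                     // like zks.getNextZxid()
        }
        return -1;                             // read path: session check only
    }
}
```

Note how interleaved reads leave the zxid untouched: only transactions advance the ordering that followers must replicate.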

protected void pRequest(Request request) throws RequestProcessorException {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
request.hdr = null;
request.txn = null;

try {
switch (request.type) {
case OpCode.create:
CreateRequest createRequest = new CreateRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
break;
case OpCode.delete:
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
break;
case OpCode.setData:
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
break;
case OpCode.check:
CheckVersionRequest checkRequest = new CheckVersionRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
break;
case OpCode.multi:
MultiTransactionRecord multiRequest = new MultiTransactionRecord();
try {
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
} catch(IOException e) {
request.hdr = new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
Time.currentWallTime(), OpCode.multi);
throw e;
}
List<Txn> txns = new ArrayList<Txn>();
//Each op in a multi-op must have the same zxid!
long zxid = zks.getNextZxid();
KeeperException ke = null;

//Store off current pending change records in case we need to rollback
HashMap<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);

int index = 0;
for(Op op: multiRequest) {
Record subrequest = op.toRequestRecord() ;

/* If we've already failed one of the ops, don't bother
* trying the rest as we know it's going to fail and it
* would be confusing in the logfiles.
*/
if (ke != null) {
request.hdr.setType(OpCode.error);
request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
}

/* Prep the request and convert to a Txn */
else {
try {
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
} catch (KeeperException e) {
ke = e;
request.hdr.setType(OpCode.error);
request.txn = new ErrorTxn(e.code().intValue());
LOG.info("Got user-level KeeperException when processing "
+ request.toString() + " aborting remaining multi ops."
+ " Error Path:" + e.getPath()
+ " Error:" + e.getMessage());

request.setException(e);

/* Rollback change records from failed multi-op */
rollbackPendingChanges(zxid, pendingChanges);
}
}

//FIXME: I don't want to have to serialize it here and then
// immediately deserialize in next processor. But I'm
// not sure how else to get the txn stored into our list.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
request.txn.serialize(boa, "request") ;
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

txns.add(new Txn(request.hdr.getType(), bb.array()));
index++;
}

request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
Time.currentWallTime(), request.type);
request.txn = new MultiTxn(txns);

break;

//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
break;

//All the rest don't need to create a Txn - just verify session
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;
default:
LOG.warn("unknown type " + request.type);
break;
}
} catch (KeeperException e) {
if (request.hdr != null) {
request.hdr.setType(OpCode.error);
request.txn = new ErrorTxn(e.code().intValue());
}
LOG.info("Got user-level KeeperException when processing "
+ request.toString()
+ " Error Path:" + e.getPath()
+ " Error:" + e.getMessage());
request.setException(e);
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process " + request, e);

StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
if(bb != null){
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
} else {
sb.append("request buffer is null");
}

LOG.error("Dumping request buffer: 0x" + sb.toString());
if (request.hdr != null) {
request.hdr.setType(OpCode.error);
request.txn = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
}
}
request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}

The SyncRequestProcessor class: transaction logging and snapshots

SyncRequestProcessor writes the request log to disk and only invokes the next processor once the log record is fully durable; it also periodically snapshots the in-memory database to disk.

Class hierarchy

public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {}

Constructor

public SyncRequestProcessor(ZooKeeperServer zks,
        RequestProcessor nextProcessor) {
    super("SyncThread:" + zks.getServerId(), zks
            .getZooKeeperServerListener());
    this.zks = zks;
    this.nextProcessor = nextProcessor;
    running = true;
}

processRequest

Like the previous processor, processRequest enqueues the request for the worker thread (the run() method) to handle.

public void processRequest(Request request) {
    // request.addRQRec(">sync");
    queuedRequests.add(request);
}

run

In run, zks.getZKDatabase().append(si) appends the request to the transaction log; flush(toFlush) commits the log to disk, and only after the write succeeds is the next processor's processRequest invoked.

From setupRequestProcessors() we know the next processor is FinalRequestProcessor.
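The randomized snapshot trigger inside run() can be sketched as a minimal model, assuming the default snapCount of 100000 (the class and method names here are illustrative): a snapshot fires after snapCount/2 + randRoll appends, and randRoll is re-rolled each time so ensemble members do not all snapshot at the same moment.

```java
class SnapTriggerSketch {
    // Hypothetical sketch of the snapshot trigger in SyncRequestProcessor.run().
    static final int SNAP_COUNT = 100_000;     // zookeeper.snapCount default
    private final java.util.Random r = new java.util.Random(42);
    private int randRoll = r.nextInt(SNAP_COUNT / 2);
    private int logCount = 0;
    int snapshots = 0;

    void append() {                            // models zkDb.append(si) returning true
        logCount++;
        if (logCount > (SNAP_COUNT / 2 + randRoll)) {
            randRoll = r.nextInt(SNAP_COUNT / 2);   // re-roll for the next cycle
            snapshots++;                       // models rollLog() + takeSnapshot()
            logCount = 0;
        }
    }
}
```

The effective threshold therefore varies between snapCount/2 and snapCount appends per snapshot, which is exactly the de-synchronization the comment in the real code ("not all of the servers ... take a snapshot at the same time") is after.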

@Override
public void run() {
try {
int logCount = 0;

// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
setRandRoll(r.nextInt(snapCount/2));
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
setRandRoll(r.nextInt(snapCount/2));
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}

The FinalRequestProcessor class: applying requests and sending responses

FinalRequestProcessor is the last processor in the chain; it applies the transaction to the in-memory database and sends the response.

Class hierarchy

public class FinalRequestProcessor implements RequestProcessor{}

Constructor

public FinalRequestProcessor(ZooKeeperServer zks) {
    this.zks = zks;
}

processRequest

  1. request.hdr != null (hdr is generated in PrepRequestProcessor) marks a write request, which executes
    ZooKeeperServer.processTxn() -> getZKDatabase().processTxn(hdr, txn);

    applying the write to the in-memory database.
  2. Request.isQuorum(request.type) identifies quorum (write) operations, which execute
    zks.getZKDatabase().addCommittedProposal(request);

    recording the request in the in-memory committed-proposal log, used to bring lagging followers up to date.
  3. Remove pending change records whose zxid is not greater than the current request's zxid, then handle the request by type:
    1. closeSession: close the session;
    2. ping: respond immediately;
    3. createSession: finish session creation and respond;
    4. multi: wrap the sub-results into a MultiResponse;
    5. create/setData/setACL/check: set lastOp and the response Record (lastOp = "CREA"; rsp = new CreateResponse(rc.path);)
    6. exists/getData/getACL/getChildren/getChildren2: set lastOp, check permissions, query the data, and build the response;
    7. setWatches: set lastOp and register the corresponding watches.
  4. Read the zxid of the last proposal applied to the DataTree.
  5. Build the ReplyHeader.
  6. Send the response to the client (cnxn.sendResponse(hdr, rsp, "response"); // header, response body, tag)
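Steps 4 to 6 can be sketched as follows (hypothetical, simplified; the real code builds a ReplyHeader record and serializes it over the connection): the reply carries the client's cxid, the server's last processed zxid, and an error code, followed by the operation-specific response body.

```java
class ReplySketch {
    // Hypothetical sketch of the response path in FinalRequestProcessor:
    // mirrors new ReplyHeader(request.cxid, lastZxid, err.intValue())
    // followed by cnxn.sendResponse(hdr, rsp, "response").
    static String sendResponse(int cxid, long lastProcessedZxid, int err, String rsp) {
        return "ReplyHeader{cxid=" + cxid
                + ", zxid=0x" + Long.toHexString(lastProcessedZxid)
                + ", err=" + err + "} " + rsp;
    }
}
```

Because every reply echoes the last processed zxid, clients can track how up to date the server they are talking to is.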
    public void processRequest(Request request) {
    if (LOG.isDebugEnabled()) {
    LOG.debug("Processing request:: " + request);
    }
    // request.addRQRec(">final");
    long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
    if (request.type == OpCode.ping) {
    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
    }
    if (LOG.isTraceEnabled()) {
    ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
    }
    ProcessTxnResult rc = null;
    synchronized (zks.outstandingChanges) {
    while (!zks.outstandingChanges.isEmpty()
    && zks.outstandingChanges.get(0).zxid <= request.zxid) {
    ChangeRecord cr = zks.outstandingChanges.remove(0);
    if (cr.zxid < request.zxid) {
    LOG.warn("Zxid outstanding "
    + cr.zxid
    + " is less than current " + request.zxid);
    }
    if (zks.outstandingChangesForPath.get(cr.path) == cr) {
    zks.outstandingChangesForPath.remove(cr.path);
    }
    }
    if (request.hdr != null) {
    TxnHeader hdr = request.hdr;
    Record txn = request.txn;

    rc = zks.processTxn(hdr, txn);
    }
    // do not add non quorum packets to the queue.
    if (Request.isQuorum(request.type)) {
    zks.getZKDatabase().addCommittedProposal(request);
    }
    }

    if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
    ServerCnxnFactory scxn = zks.getServerCnxnFactory();
    // this might be possible since
    // we might just be playing diffs from the leader
    if (scxn != null && request.cnxn == null) {
    // calling this if we have the cnxn results in the client's
    // close session response being lost - we've already closed
    // the session/socket here before we can send the closeSession
    // in the switch block below
    scxn.closeSession(request.sessionId);
    return;
    }
    }

    if (request.cnxn == null) {
    return;
    }
    ServerCnxn cnxn = request.cnxn;

    String lastOp = "NA";
    zks.decInProcess();
    Code err = Code.OK;
    Record rsp = null;
    boolean closeSession = false;
    try {
    if (request.hdr != null && request.hdr.getType() == OpCode.error) {
    throw KeeperException.create(KeeperException.Code.get((
    (ErrorTxn) request.txn).getErr()));
    }

    KeeperException ke = request.getException();
    if (ke != null && request.type != OpCode.multi) {
    throw ke;
    }

    if (LOG.isDebugEnabled()) {
    LOG.debug("{}",request);
    }
    switch (request.type) {
    case OpCode.ping: {
    zks.serverStats().updateLatency(request.createTime);

    lastOp = "PING";
    cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
    request.createTime, Time.currentElapsedTime());

    cnxn.sendResponse(new ReplyHeader(-2,
    zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
    return;
    }
    case OpCode.createSession: {
    zks.serverStats().updateLatency(request.createTime);

    lastOp = "SESS";
    cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
    request.createTime, Time.currentElapsedTime());

    zks.finishSessionInit(request.cnxn, true);
    return;
    }
    case OpCode.multi: {
    lastOp = "MULT";
    rsp = new MultiResponse() ;

    for (ProcessTxnResult subTxnResult : rc.multiResult) {

    OpResult subResult ;

    switch (subTxnResult.type) {
    case OpCode.check:
    subResult = new CheckResult();
    break;
    case OpCode.create:
    subResult = new CreateResult(subTxnResult.path);
    break;
    case OpCode.delete:
    subResult = new DeleteResult();
    break;
    case OpCode.setData:
    subResult = new SetDataResult(subTxnResult.stat);
    break;
    case OpCode.error:
    subResult = new ErrorResult(subTxnResult.err) ;
    break;
    default:
    throw new IOException("Invalid type of op");
    }

    ((MultiResponse)rsp).add(subResult);
    }

    break;
    }
    case OpCode.create: {
    lastOp = "CREA";
    rsp = new CreateResponse(rc.path);
    err = Code.get(rc.err);
    break;
    }
    case OpCode.delete: {
    lastOp = "DELE";
    err = Code.get(rc.err);
    break;
    }
    case OpCode.setData: {
    lastOp = "SETD";
    rsp = new SetDataResponse(rc.stat);
    err = Code.get(rc.err);
    break;
    }
    case OpCode.setACL: {
    lastOp = "SETA";
    rsp = new SetACLResponse(rc.stat);
    err = Code.get(rc.err);
    break;
    }
    case OpCode.closeSession: {
    lastOp = "CLOS";
    closeSession = true;
    err = Code.get(rc.err);
    break;
    }
    case OpCode.sync: {
    lastOp = "SYNC";
    SyncRequest syncRequest = new SyncRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
    syncRequest);
    rsp = new SyncResponse(syncRequest.getPath());
    break;
    }
    case OpCode.check: {
    lastOp = "CHEC";
    rsp = new SetDataResponse(rc.stat);
    err = Code.get(rc.err);
    break;
    }
    case OpCode.exists: {
    lastOp = "EXIS";
    // TODO we need to figure out the security requirement for this!
    ExistsRequest existsRequest = new ExistsRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
    existsRequest);
    String path = existsRequest.getPath();
    if (path.indexOf('\0') != -1) {
    throw new KeeperException.BadArgumentsException();
    }
    Stat stat = zks.getZKDatabase().statNode(path, existsRequest
    .getWatch() ? cnxn : null);
    rsp = new ExistsResponse(stat);
    break;
    }
    case OpCode.getData: {
    lastOp = "GETD";
    GetDataRequest getDataRequest = new GetDataRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
    getDataRequest);
    DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
    if (n == null) {
    throw new KeeperException.NoNodeException();
    }
    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
    ZooDefs.Perms.READ,
    request.authInfo);
    Stat stat = new Stat();
    byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
    getDataRequest.getWatch() ? cnxn : null);
    rsp = new GetDataResponse(b, stat);
    break;
    }
    case OpCode.setWatches: {
    lastOp = "SETW";
    SetWatches setWatches = new SetWatches();
    // XXX We really should NOT need this!!!!
    request.request.rewind();
    ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
    long relativeZxid = setWatches.getRelativeZxid();
    zks.getZKDatabase().setWatches(relativeZxid,
    setWatches.getDataWatches(),
    setWatches.getExistWatches(),
    setWatches.getChildWatches(), cnxn);
    break;
    }
    case OpCode.getACL: {
    lastOp = "GETA";
    GetACLRequest getACLRequest = new GetACLRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
    getACLRequest);
    Stat stat = new Stat();
    List<ACL> acl =
    zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
    rsp = new GetACLResponse(acl, stat);
    break;
    }
    case OpCode.getChildren: {
    lastOp = "GETC";
    GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
    getChildrenRequest);
    DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
    if (n == null) {
    throw new KeeperException.NoNodeException();
    }
    PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
    ZooDefs.Perms.READ,
    request.authInfo);
    List<String> children = zks.getZKDatabase().getChildren(
    getChildrenRequest.getPath(), null, getChildrenRequest
    .getWatch() ? cnxn : null);
    rsp = new GetChildrenResponse(children);
    break;
    }
    case OpCode.getChildren2: {
    lastOp = "GETC";
    GetChildren2Request getChildren2Request = new GetChildren2Request();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    getChildren2Request);
            Stat stat = new Stat();
            DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                    ZooDefs.Perms.READ,
                    request.authInfo);
            List<String> children = zks.getZKDatabase().getChildren(
                    getChildren2Request.getPath(), stat, getChildren2Request
                            .getWatch() ? cnxn : null);
            rsp = new GetChildren2Response(children, stat);
            break;
        }
        }
    } catch (SessionMovedException e) {
        // session moved is a connection level error, we need to tear
        // down the connection otw ZOOKEEPER-710 might happen
        // ie client on slow follower starts to renew session, fails
        // before this completes, then tries the fast follower (leader)
        // and is successful, however the initial renew is then
        // successfully fwd/processed by the leader and as a result
        // the client and leader disagree on where the client is most
        // recently attached (and therefore invalid SESSION MOVED generated)
        cnxn.sendCloseSession();
        return;
    } catch (KeeperException e) {
        err = e.code();
    } catch (Exception e) {
        // log at error level as we are returning a marshalling
        // error to the user
        LOG.error("Failed to process " + request, e);
        StringBuilder sb = new StringBuilder();
        ByteBuffer bb = request.request;
        bb.rewind();
        while (bb.hasRemaining()) {
            sb.append(Integer.toHexString(bb.get() & 0xff));
        }
        LOG.error("Dumping request buffer: 0x" + sb.toString());
        err = Code.MARSHALLINGERROR;
    }

    long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
    ReplyHeader hdr =
            new ReplyHeader(request.cxid, lastZxid, err.intValue());

    zks.serverStats().updateLatency(request.createTime);
    cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
            request.createTime, Time.currentElapsedTime());

    try {
        cnxn.sendResponse(hdr, rsp, "response");
        if (closeSession) {
            cnxn.sendCloseSession();
        }
    } catch (IOException e) {
        LOG.error("FIXMSG",e);
    }
}
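The error path above dumps the raw request buffer as hex when unmarshalling fails: it rewinds the buffer and appends each byte's hex form. A standalone sketch of that dump loop (the class name `HexDumpDemo` is made up for illustration):

```java
import java.nio.ByteBuffer;

// Standalone sketch of the buffer hex-dump in the error path above:
// rewind the request buffer and render each byte as hex. Note that
// Integer.toHexString(b & 0xff) omits the leading zero for values
// below 0x10, just like the original loop.
public class HexDumpDemo {
    static String dump(ByteBuffer bb) {
        StringBuilder sb = new StringBuilder();
        bb.rewind(); // start from the beginning even if partially read
        while (bb.hasRemaining()) {
            sb.append(Integer.toHexString(bb.get() & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.wrap(new byte[] {0x0a, (byte) 0xff, 0x01});
        System.out.println("0x" + dump(bb)); // 0xaff1
    }
}
```

The `& 0xff` mask matters: a Java `byte` is signed, so without it a byte like 0xff would sign-extend to a 32-bit negative value and print as eight hex digits.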

The LeaderZooKeeperServer class

Inheritance

public class LeaderZooKeeperServer extends QuorumZooKeeperServer {}

Since LeaderZooKeeperServer extends QuorumZooKeeperServer, let's first look at the QuorumZooKeeperServer class.

The QuorumZooKeeperServer class

Inheritance

public abstract class QuorumZooKeeperServer extends ZooKeeperServer {}

Methods

QuorumZooKeeperServer overrides two methods: dumpConf additionally prints the quorum configuration, while setState simply stores the new state.

@Override
public void dumpConf(PrintWriter pwriter) {
    super.dumpConf(pwriter);

    pwriter.print("initLimit=");
    pwriter.println(self.getInitLimit());
    pwriter.print("syncLimit=");
    pwriter.println(self.getSyncLimit());
    pwriter.print("electionAlg=");
    pwriter.println(self.getElectionType());
    pwriter.print("electionPort=");
    pwriter.println(self.quorumPeers.get(self.getId()).electionAddr
            .getPort());
    pwriter.print("quorumPort=");
    pwriter.println(self.quorumPeers.get(self.getId()).addr.getPort());
    pwriter.print("peerType=");
    pwriter.println(self.getLearnerType().ordinal());
}

@Override
protected void setState(State state) {
    this.state = state;
}

Constructor

LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
        DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
    super(logFactory, self.tickTime, self.minSessionTimeout,
            self.maxSessionTimeout, treeBuilder, zkDb, self);
}

The processor chain

LeaderZooKeeperServer overrides the initialization of the processor chain, so the processing flow differs from the standalone version.

@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
            finalProcessor, getLeader().toBeApplied);
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false,
            getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
            commitProcessor);
    proposalProcessor.initialize();
    firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

Several of these processors were already introduced in the previous section on ZookeeperServer.submitRequest; the remaining ones are analyzed below.

The ProposalRequestProcessor class: issuing proposals

ProposalRequestProcessor sends proposals to the other servers and hands each request to CommitProcessor (which resumes the request once a quorum of servers has acked); at the same time it calls SyncRequestProcessor to write the transaction log to disk.

Constructor

public ProposalRequestProcessor(LeaderZooKeeperServer zks,
        RequestProcessor nextProcessor) {
    this.zks = zks;
    this.nextProcessor = nextProcessor;
    AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
    syncProcessor = new SyncRequestProcessor(zks, ackProcessor);
}

processRequest

public void processRequest(Request request) throws RequestProcessorException {
    // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = " + request.sessionId);
    // request.addRQRec(">prop");

    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */
    // Sync request from a learner
    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest)request);
    } else {
        // Hand off to the next processor (CommitProcessor). CommitProcessor
        // does not commit the proposal right away; it waits until the
        // proposal issued below has been acked by a quorum of servers.
        nextProcessor.processRequest(request);
        if (request.hdr != null) {
            // We need to sync and get consensus on any transactions
            try {
                // Propose to all followers: leader.propose() -> acks are
                // counted in LearnerHandler.run() -> leader.processAck()
                // -> commitProcessor.commit(request) -> the CommitProcessor
                // thread resumes this request
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            // Write the transaction log to disk
            syncProcessor.processRequest(request);
        }
    }
}
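So for a write (a request with a txn header, request.hdr != null) the request fans out three ways: it is queued at the commit stage, proposed to the quorum, and logged to disk; a read only takes the first step. A toy sketch of that fan-out, with made-up names standing in for the real calls:

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of ProposalRequestProcessor's fan-out. For a write request
// (one carrying a txn header) three things happen: it is parked at the
// commit stage, proposed to the quorum, and appended to the txn log.
// All names below are illustrative, not real ZooKeeper API.
public class ProposalFanOutDemo {
    static List<String> actions = new ArrayList<>();

    static void processRequest(String request, boolean isWrite) {
        // Always handed to the commit stage first; a write waits there
        // until a quorum of acks arrives, a read passes straight through.
        actions.add("commitProcessor.queue(" + request + ")");
        if (isWrite) {
            actions.add("leader.propose(" + request + ")");    // broadcast to followers
            actions.add("syncProcessor.log(" + request + ")"); // append to txn log
        }
    }

    public static void main(String[] args) {
        processRequest("setData /a", true);   // write: all three steps
        processRequest("getData /a", false);  // read: only queued
        actions.forEach(System.out::println);
    }
}
```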

The CommitProcessor class: two-phase commit handling

Constructor

public CommitProcessor(RequestProcessor nextProcessor, String id,
        boolean matchSyncs, ZooKeeperServerListener listener) {
    super("CommitProcessor:" + id, listener);
    this.nextProcessor = nextProcessor;
    this.matchSyncs = matchSyncs;
}

The processRequest method

processRequest() simply puts the request on a queue.

synchronized public void processRequest(Request request) {
    // request.addRQRec(">commit");
    if (LOG.isDebugEnabled()) {
        LOG.debug("Processing request:: " + request);
    }
    if (!finished) {
        queuedRequests.add(request);
        notifyAll();
    }
}

The commit method

Once a proposal has been acked by a quorum of servers, commit is called back so the proposal can be executed.

LearnerHandler.run() -> leader.processAck() -> commitProcessor.commit(request)

synchronized public void commit(Request request) {
    if (!finished) {
        if (request == null) {
            LOG.warn("Committed a null!",
                    new Exception("committing a null! "));
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Committing request:: " + request);
        }
        committedRequests.add(request);
        notifyAll();
    }
}
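commit() and the processor's own thread form a classic wait/notify producer-consumer pair: commit() enqueues and wakes the thread, which blocks while the queue is empty. A self-contained sketch of that handshake (simplified; class and method names are illustrative):

```java
import java.util.LinkedList;
import java.util.Queue;

// Minimal wait/notify sketch of how commit() hands a quorum-acked
// request to the CommitProcessor thread. Illustrative names only.
public class CommitWakeupDemo {
    private final Queue<String> committed = new LinkedList<>();
    private volatile boolean finished = false;

    // Called from the leader once a quorum of followers has acked.
    public synchronized void commit(String request) {
        if (!finished) {
            committed.add(request);
            notifyAll(); // wake the processing thread blocked in take()
        }
    }

    // The processing thread blocks here while the queue is empty.
    public synchronized String take() throws InterruptedException {
        while (committed.isEmpty() && !finished) {
            wait();
        }
        return committed.poll();
    }

    public static void main(String[] args) throws Exception {
        CommitWakeupDemo demo = new CommitWakeupDemo();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("committed: " + demo.take());
            } catch (InterruptedException ignored) { }
        });
        consumer.start();
        Thread.sleep(100);         // let the consumer block in wait()
        demo.commit("setData /a"); // quorum reached: wake it up
        consumer.join();
    }
}
```

The `while` (rather than `if`) around wait() guards against spurious wakeups, the same reason the real run() re-checks its queues after every wait().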

run()

  1. Requests of type create/delete/setData/multi/setACL/createSession/closeSession are held in nextPending;
  2. If committedRequests, the queue of requests awaiting commit, is empty, the thread waits;
  3. Once a quorum of servers has acked (LearnerHandler.run() -> leader.processAck() -> commitProcessor.commit(request)), the request is added to the committedRequests queue;
  4. The request is moved to the toProcess queue;
  5. Requests in the toProcess queue are executed via nextProcessor.
@Override
public void run() {
    try {
        Request nextPending = null;
        while (!finished) {
            int len = toProcess.size();
            for (int i = 0; i < len; i++) {
                nextProcessor.processRequest(toProcess.get(i));
            }
            toProcess.clear();
            synchronized (this) {
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() == 0) {
                    wait();
                    continue;
                }
                // First check and see if the commit came in for the pending
                // request
                if ((queuedRequests.size() == 0 || nextPending != null)
                        && committedRequests.size() > 0) {
                    Request r = committedRequests.remove();
                    /*
                     * We match with nextPending so that we can move to the
                     * next request when it is committed. We also want to
                     * use nextPending because it has the cnxn member set
                     * properly.
                     */
                    if (nextPending != null
                            && nextPending.sessionId == r.sessionId
                            && nextPending.cxid == r.cxid) {
                        // we want to send our version of the request.
                        // the pointer to the connection in the request
                        nextPending.hdr = r.hdr;
                        nextPending.txn = r.txn;
                        nextPending.zxid = r.zxid;
                        toProcess.add(nextPending);
                        nextPending = null;
                    } else {
                        // this request came from someone else so just
                        // send the commit packet
                        toProcess.add(r);
                    }
                }
            }

            // We haven't matched the pending requests, so go back to
            // waiting
            if (nextPending != null) {
                continue;
            }

            synchronized (this) {
                // Process the next requests in the queuedRequests
                while (nextPending == null && queuedRequests.size() > 0) {
                    Request request = queuedRequests.remove();
                    switch (request.type) {
                    case OpCode.create:
                    case OpCode.delete:
                    case OpCode.setData:
                    case OpCode.multi:
                    case OpCode.setACL:
                    case OpCode.createSession:
                    case OpCode.closeSession:
                        nextPending = request;
                        break;
                    case OpCode.sync:
                        if (matchSyncs) {
                            nextPending = request;
                        } else {
                            toProcess.add(request);
                        }
                        break;
                    default:
                        toProcess.add(request);
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted exception while waiting", e);
    } catch (Throwable e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
    LOG.info("CommitProcessor exited loop!");
}

The ToBeAppliedRequestProcessor class: cleaning up proposals held by Leader

Constructor

ToBeAppliedRequestProcessor(RequestProcessor next,
        ConcurrentLinkedQueue<Proposal> toBeApplied) {
    if (!(next instanceof FinalRequestProcessor)) {
        throw new RuntimeException(ToBeAppliedRequestProcessor.class
                .getName()
                + " must be connected to "
                + FinalRequestProcessor.class.getName()
                + " not "
                + next.getClass().getName());
    }
    this.toBeApplied = toBeApplied;
    this.next = next;
}

The processRequest method

Calls the next processor (FinalRequestProcessor) to handle the request, then removes the matching proposal from the head of the Leader.toBeApplied queue.

public void processRequest(Request request) throws RequestProcessorException {
    // request.addRQRec(">tobe");
    next.processRequest(request);
    Proposal p = toBeApplied.peek();
    if (p != null && p.request != null
            && p.request.zxid == request.zxid) {
        toBeApplied.remove();
    }
}
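The peek/conditional-remove on a ConcurrentLinkedQueue can be sketched in isolation; proposals are applied in zxid order, so only the head can ever match. The zxid values below are made up for illustration:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch of ToBeAppliedRequestProcessor's cleanup: after the final stage
// applies a request, the head of the toBeApplied queue is removed only
// if its zxid matches the request just applied. Since proposals commit
// in zxid order, a match can only ever occur at the head.
public class ToBeAppliedDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Long> toBeApplied = new ConcurrentLinkedQueue<>();
        toBeApplied.add(0x100L); // oldest outstanding proposal
        toBeApplied.add(0x101L);

        long appliedZxid = 0x100L; // zxid of the request just applied
        Long head = toBeApplied.peek();
        if (head != null && head == appliedZxid) {
            toBeApplied.remove(); // drop the proposal that was just applied
        }
        // only 0x101 remains outstanding
        System.out.println(toBeApplied.size());
    }
}
```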