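ZooKeeper Internals (3): Leader Request Handling and the Processor Chain (ZooKeeperServer/LeaderZooKeeperServer)
Preface
Part 2, on leader initialization, introduced the classes the leader node works with: ZooKeeperServer and LeaderZooKeeperServer. On the leader, request handling is delegated to LeaderZooKeeperServer.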
Initialization
leader.lead() -> leader.startZkServer()
private synchronized void startZkServer() {
// Update lastCommitted and Db's zxid to a value representing the new epoch
lastCommitted = zk.getZxid();
LOG.info("Have quorum of supporters, sids: [ "
+ getSidSetString(newLeaderProposal.ackSet)
+ " ]; starting up and setting last processed zxid: 0x{}",
Long.toHexString(zk.getZxid()));
zk.startup();
/*
* Update the election vote here to ensure that all members of the
* ensemble report the same vote to new servers that start up and
* send leader election notifications to the ensemble.
*
* @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
*/
self.updateElectionVote(getEpoch());
zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
}
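The comment about "a value representing the new epoch" is easier to follow with the zxid layout in mind: a zxid is a 64-bit value whose high 32 bits carry the epoch and whose low 32 bits carry a counter within that epoch. The following is a minimal sketch of that layout; the class and method names (ZxidSketch, makeZxid, epochOf, counterOf) are illustrative and not copied from the ZooKeeper source.

```java
// Illustrative helper, not a ZooKeeper class.
final class ZxidSketch {
    private ZxidSketch() {}

    // Combine an epoch (high 32 bits) and a counter (low 32 bits) into one zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    // Extract the epoch from the high 32 bits.
    static long epochOf(long zxid) {
        return zxid >> 32;
    }

    // Extract the per-epoch counter from the low 32 bits.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        // A leader that has just started epoch 5 begins with counter 0.
        long zxid = makeZxid(5, 0);
        System.out.println("0x" + Long.toHexString(zxid));
    }
}
```

Under this layout, every zxid issued in a newer epoch compares greater than any zxid from an older epoch, whatever the counters are.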
Other callers
LearnerHandler.run()
Because ZooKeeper can run either standalone or as an ensemble, the ZooKeeperServer implementation differs between the two. In an ensemble, the leader uses LeaderZooKeeperServer, which extends QuorumZooKeeperServer, which in turn extends ZooKeeperServer. Most of the methods are implemented in ZooKeeperServer.
ZooKeeperServer class
Constructor
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
Main methods
startup()
public synchronized void startup() {
Creating the SessionTracker: createSessionTracker()
protected void createSessionTracker() {
Starting the SessionTracker thread: startSessionTracker()
protected void startSessionTracker() {
Setting up the request processors: setupRequestProcessors()
The request processors also handle the requests that a follower forwards to the leader after receiving them. That call path is
LearnerCnxAcceptor.run() -> LearnerHandler.run() -> leader.zk.submitRequest(si)
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
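The chain above is wired back to front: the last processor is created first, and each earlier processor receives its successor as a constructor argument. The sketch below shows only that wiring pattern. Stage and ChainSketch are hypothetical stand-ins for RequestProcessor and the server class, and the stages here run synchronously, whereas the real Prep and Sync processors each run on their own thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ZooKeeper's RequestProcessor interface.
interface Stage {
    void process(String request);
}

final class ChainSketch {
    private ChainSketch() {}

    // Each stage records its name, then forwards to the next stage, if any.
    static Stage stage(String name, Stage next, List<String> trace) {
        return request -> {
            trace.add(name);
            if (next != null) {
                next.process(request);
            }
        };
    }

    // Built back to front, mirroring setupRequestProcessors().
    static Stage buildChain(List<String> trace) {
        Stage finalStage = stage("final", null, trace);
        Stage syncStage = stage("sync", finalStage, trace);
        return stage("prep", syncStage, trace); // this plays the role of firstProcessor
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        buildChain(trace).process("create /a");
        System.out.println(trace);
    }
}
```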
Submitting requests: submitRequest()
When the leader receives a request forwarded by another server (a Leader.REQUEST packet), it calls this method to process it. This happens in
LearnerHandler.run()
public void run() {
// ... omitted
case Leader.REQUEST:
bb = ByteBuffer.wrap(qp.getData());
sessionId = bb.getLong();
cxid = bb.getInt();
type = bb.getInt();
bb = bb.slice();
Request si;
if(type == OpCode.sync){
si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
} else {
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
// submit the request to the leader's processor chain
leader.zk.submitRequest(si);
break;
// ... omitted
}
Inside submitRequest, the request is handed to the request-processor chain, which works much like a filter chain:
firstProcessor.processRequest(si);
private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List<Id> authInfo) {
// wrap the arguments in a Request
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
submitRequest(si);
}
The submitRequest(Request si) method
public void submitRequest(Request si) {
if (firstProcessor == null) {
synchronized (this) { // wait for initialization to finish
try {
// Since all requests are passed to the request
// processor it should wait for setting up the request
// processor chain. The state will be updated to RUNNING
// after the setup.
while (state == State.INITIAL) {
wait(1000);
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption", e);
}
if (firstProcessor == null || state != State.RUNNING) {
throw new RuntimeException("Not started");
}
}
}
try {
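// refresh the session
touch(si.cnxn);
// validate the request type
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
// hand the request to the processor chain (the processors are covered separately below)
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess(); // increment the in-process counter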
}
} else {
LOG.warn("Received packet at server of unknown type " + si.type);
new UnimplementedRequestProcessor().processRequest(si);
}
} catch (MissingSessionException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Dropping request: " + e.getMessage());
}
} catch (RequestProcessorException e) {
LOG.error("Unable to process request:" + e.getMessage(), e);
}
}
Creating a session: createSession()
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
Setting the session owner: setOwner()
public void setOwner(long id, Object owner) throws SessionExpiredException {
Refreshing a session: touch()
void touch(ServerCnxn cnxn) throws MissingSessionException {
Processing a request packet: processPacket()
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
The processor chain behind submitRequest
public void submitRequest(Request si) passes the request into the chain with
firstProcessor.processRequest(si);
First, look at how the chain is initialized:
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor syncProcessor = new SyncRequestProcessor(this,
finalProcessor);
((SyncRequestProcessor)syncProcessor).start();
firstProcessor = new PrepRequestProcessor(this, syncProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
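From the initialization code, the concrete type of firstProcessor is PrepRequestProcessor.
PrepRequestProcessor class: pre-processing (separating reads from writes)
PrepRequestProcessor validates each request and, for write requests, wraps it into a transaction.
Inheritance
public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
Constructor
nextProcessor holds the next processor to run.
public PrepRequestProcessor(ZooKeeperServer zks,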
RequestProcessor nextProcessor) {
super("ProcessThread(sid:" + zks.getServerId() + " cport:"
+ zks.getClientPort() + "):", zks.getZooKeeperServerListener());
this.nextProcessor = nextProcessor;
this.zks = zks;
}
Processing requests
processRequest only places the request on a queue (a LinkedBlockingQueue); the run() method processes it asynchronously.
public void processRequest(Request request) {
// request.addRQRec(">prep="+zks.outstandingChanges.size());
submittedRequests.add(request);
}
The run() method
run() takes requests off the queue and hands each one to pRequest().
public void run() {
try {
while (true) {
Request request = submittedRequests.take();
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
}
if (Request.requestOfDeath == request) {
break;
}
// process the request
pRequest(request);
}
} catch (RequestProcessorException e) {
if (e.getCause() instanceof XidRolloverException) {
LOG.info(e.getCause().getMessage());
}
handleException(this.getName(), e);
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("PrepRequestProcessor exited loop!");
}
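The processRequest()/run() pair above is a producer-consumer queue that shuts down on a sentinel value (Request.requestOfDeath). Below is a minimal sketch of that pattern using plain strings instead of Request objects. QueueWorkerSketch and its members are hypothetical names, and the sentinel is compared by identity, as in the loop above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical worker; strings stand in for Request objects.
final class QueueWorkerSketch extends Thread {
    // Sentinel compared by identity, in the spirit of Request.requestOfDeath.
    static final String REQUEST_OF_DEATH = new String("death");

    private final LinkedBlockingQueue<String> submitted = new LinkedBlockingQueue<>();
    final List<String> handled = new ArrayList<>();

    // Called by producers: only enqueues, never blocks on the actual work.
    void processRequest(String request) {
        submitted.add(request);
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = submitted.take(); // blocks until a request arrives
                if (request == REQUEST_OF_DEATH) {
                    break;
                }
                handled.add(request); // stands in for pRequest(request)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Submits a few requests, shuts the worker down, and returns what it handled.
    static List<String> demo() {
        QueueWorkerSketch worker = new QueueWorkerSketch();
        worker.start();
        worker.processRequest("a");
        worker.processRequest("b");
        worker.processRequest(REQUEST_OF_DEATH);
        worker.processRequest("c"); // enqueued after the sentinel, so never handled
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return worker.handled;
    }
}
```

Because a single thread drains the queue, requests are handled strictly in submission order.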
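pRequest(): processing a request
Write requests get a request.hdr (a transaction header); later processors use it to tell writes from reads.
The handling depends on request.type:
- create/delete/setData/check/setACL/createSession/closeSession are all handled the same way: pRequest2Txn() validates the request and generates request.txn, and a new zxid is allocated at this point (the zxid is incremented);
- sync/exists/getData/getACL/getChildren/getChildren2/ping/setWatches need no txn; only the session is checked;
- multi combines several operations into one atomic operation; pRequest2Txn generates a txn for each sub-operation.
After that, nextProcessor.processRequest(request) passes the request to the next processor in the chain. From setupRequestProcessors(), that next processor is SyncRequestProcessor.
protected void pRequest(Request request) throws RequestProcessorException {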
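The read/write split described in the list above can be sketched as follows. This is a simplified model, not the real pRequest(): PrepSketch, Op, and Req are hypothetical types, session checks and validation are left out, and the transaction header is reduced to the zxid it would carry.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model of the read/write classification.
final class PrepSketch {
    enum Op {
        CREATE, DELETE, SET_DATA, SET_ACL, CHECK, MULTI, CREATE_SESSION, CLOSE_SESSION,
        SYNC, EXISTS, GET_DATA, GET_ACL, GET_CHILDREN, PING, SET_WATCHES
    }

    // The first eight constants are the write-type operations.
    private static final Set<Op> WRITES = EnumSet.range(Op.CREATE, Op.CLOSE_SESSION);

    static final class Req {
        final Op op;
        Long hdrZxid; // null means "no transaction header", i.e. a read
        Req(Op op) {
            this.op = op;
        }
    }

    private long zxid;

    PrepSketch(long lastZxid) {
        this.zxid = lastZxid;
    }

    // Writes get a header carrying a freshly incremented zxid; reads pass through untouched.
    Req prepare(Req request) {
        if (WRITES.contains(request.op)) {
            request.hdrZxid = ++zxid;
        }
        return request;
    }
}
```

A later stage can then test hdrZxid != null in the same way the real chain tests request.hdr != null.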
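SyncRequestProcessor class: transaction logging and snapshots
SyncRequestProcessor writes each request to the transaction log on disk and only passes it to the next processor once the log entry has been flushed. After a randomized number of logged transactions it also rolls the log and snapshots the in-memory data to disk.
Inheritance
public class SyncRequestProcessor extends ZooKeeperCriticalThread implements RequestProcessor {}
Constructor
public SyncRequestProcessor(ZooKeeperServer zks,
processRequest
As in the previous processor, processRequest only enqueues the request; a thread (the run() method) does the work.
public void processRequest(Request request) {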
// request.addRQRec(">sync");
queuedRequests.add(request);
}
run
In run(), zks.getZKDatabase().append(si) appends the request to the transaction log, and flush(toFlush) commits the log to disk; once the write succeeds, the next processor's processRequest method is called.
From setupRequestProcessors(), the next processor is FinalRequestProcessor.
public void run() {
try {
int logCount = 0;
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
setRandRoll(r.nextInt(snapCount/2));
while (true) {
Request si = null;
if (toFlush.isEmpty()) {
si = queuedRequests.take();
} else {
si = queuedRequests.poll();
if (si == null) {
flush(toFlush);
continue;
}
}
if (si == requestOfDeath) {
break;
}
if (si != null) {
// track the number of records written to the log
if (zks.getZKDatabase().append(si)) {
logCount++;
if (logCount > (snapCount / 2 + randRoll)) {
setRandRoll(r.nextInt(snapCount/2));
// roll the log
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) {
LOG.warn("Too busy to snap, skipping");
} else {
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
snapInProcess.start();
}
logCount = 0;
}
} else if (toFlush.isEmpty()) {
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
nextProcessor.processRequest(si);
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
toFlush.add(si);
if (toFlush.size() > 1000) {
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
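The loop above batches writes: requests accumulate in toFlush and are flushed together when the input queue runs dry or the batch grows past 1000 entries, while a read that arrives with nothing pending skips the batch entirely. The single-threaded sketch below models only that batching decision. GroupCommitSketch is a hypothetical class, the "w:" prefix marking a write is an assumption of the sketch, and a counter stands in for the disk sync; snapshots and log rolling are omitted.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, single-threaded model of the batching logic.
final class GroupCommitSketch {
    final List<String> forwarded = new ArrayList<>(); // what the next processor would see
    int flushes = 0;                                  // number of simulated disk syncs
    private final List<String> toFlush = new ArrayList<>();
    private final int maxBatch;

    GroupCommitSketch(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    // Sketch-only convention: requests starting with "w:" are writes.
    private static boolean isWrite(String request) {
        return request.startsWith("w:");
    }

    // One simulated sync covers the whole batch, then the batch is forwarded in order.
    private void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        flushes++;
        forwarded.addAll(toFlush);
        toFlush.clear();
    }

    // Drains the queue once; the real loop would block for more input instead of returning.
    void drain(Deque<String> queue) {
        while (true) {
            String request = queue.poll();
            if (request == null) {
                flush(); // queue is idle: flush whatever is pending
                return;
            }
            if (!isWrite(request) && toFlush.isEmpty()) {
                forwarded.add(request); // read with nothing pending: pass straight through
                continue;
            }
            toFlush.add(request); // writes, and reads queued behind them, keep their order
            if (toFlush.size() > maxBatch) {
                flush();
            }
        }
    }
}
```

In this model, larger batches mean fewer syncs for the same stream of writes, and a read that follows a pending write is still never forwarded ahead of it.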
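FinalRequestProcessor class: applying requests and responding
FinalRequestProcessor is the last processor in the chain. It applies transactions to the in-memory database, then builds and sends the response.
Inheritance
public class FinalRequestProcessor implements RequestProcessor{}
Constructor
public FinalRequestProcessor(ZooKeeperServer zks) {
processRequest
- remove the entries in outstandingChanges whose zxid is not greater than the current request's zxid;
- request.hdr != null (hdr is generated in PrepRequestProcessor) marks a write request, which is applied to the in-memory database via ZooKeeperServer.processTxn() -> getZKDatabase().processTxn(hdr, txn);
- Request.isQuorum(request.type) identifies quorum (write) operations; for these, zks.getZKDatabase().addCommittedProposal(request) records the request in the database's in-memory list of committed proposals;
- handle closeSession requests;
- handle ping requests and respond;
- handle createSession: finish session creation and respond;
- handle multi: wrap the sub-results in a MultiResponse;
- create/setData/setACL/check: set lastOp and the Record object (lastOp = "CREA"; rsp = new CreateResponse(rc.path);)
- exists/getData/getACL/getChildren/getChildren2: set lastOp, check permissions, query the data, and wrap it in a Response;
- setWatches: set lastOp and register the corresponding watches;
- read the last processed zxid from the DataTree;
- build the ReplyHeader;
- send the response over the request's connection (cnxn.sendResponse(hdr, rsp, "response"); // reply header, response body, tag)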
public void processRequest(Request request) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
// request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
}
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= request.zxid) {
ChangeRecord cr = zks.outstandingChanges.remove(0);
if (cr.zxid < request.zxid) {
LOG.warn("Zxid outstanding "
+ cr.zxid
+ " is less than current " + request.zxid);
}
if (zks.outstandingChangesForPath.get(cr.path) == cr) {
zks.outstandingChangesForPath.remove(cr.path);
}
}
if (request.hdr != null) {
TxnHeader hdr = request.hdr;
Record txn = request.txn;
rc = zks.processTxn(hdr, txn);
}
// do not add non quorum packets to the queue.
if (Request.isQuorum(request.type)) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
ServerCnxnFactory scxn = zks.getServerCnxnFactory();
// this might be possible since
// we might just be playing diffs from the leader
if (scxn != null && request.cnxn == null) {
// calling this if we have the cnxn results in the client's
// close session response being lost - we've already closed
// the session/socket here before we can send the closeSession
// in the switch block below
scxn.closeSession(request.sessionId);
return;
}
}
if (request.cnxn == null) {
return;
}
ServerCnxn cnxn = request.cnxn;
String lastOp = "NA";
zks.decInProcess();
Code err = Code.OK;
Record rsp = null;
boolean closeSession = false;
try {
if (request.hdr != null && request.hdr.getType() == OpCode.error) {
throw KeeperException.create(KeeperException.Code.get((
(ErrorTxn) request.txn).getErr()));
}
KeeperException ke = request.getException();
if (ke != null && request.type != OpCode.multi) {
throw ke;
}
if (LOG.isDebugEnabled()) {
LOG.debug("{}",request);
}
switch (request.type) {
case OpCode.ping: {
zks.serverStats().updateLatency(request.createTime);
lastOp = "PING";
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, Time.currentElapsedTime());
cnxn.sendResponse(new ReplyHeader(-2,
zks.getZKDatabase().getDataTreeLastProcessedZxid(), 0), null, "response");
return;
}
case OpCode.createSession: {
zks.serverStats().updateLatency(request.createTime);
lastOp = "SESS";
cnxn.updateStatsForResponse(request.cxid, request.zxid, lastOp,
request.createTime, Time.currentElapsedTime());
zks.finishSessionInit(request.cnxn, true);
return;
}
case OpCode.multi: {
lastOp = "MULT";
rsp = new MultiResponse() ;
for (ProcessTxnResult subTxnResult : rc.multiResult) {
OpResult subResult ;
switch (subTxnResult.type) {
case OpCode.check:
subResult = new CheckResult();
break;
case OpCode.create:
subResult = new CreateResult(subTxnResult.path);
break;
case OpCode.delete:
subResult = new DeleteResult();
break;
case OpCode.setData:
subResult = new SetDataResult(subTxnResult.stat);
break;
case OpCode.error:
subResult = new ErrorResult(subTxnResult.err) ;
break;
default:
throw new IOException("Invalid type of op");
}
((MultiResponse)rsp).add(subResult);
}
break;
}
case OpCode.create: {
lastOp = "CREA";
rsp = new CreateResponse(rc.path);
err = Code.get(rc.err);
break;
}
case OpCode.delete: {
lastOp = "DELE";
err = Code.get(rc.err);
break;
}
case OpCode.setData: {
lastOp = "SETD";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.setACL: {
lastOp = "SETA";
rsp = new SetACLResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.closeSession: {
lastOp = "CLOS";
closeSession = true;
err = Code.get(rc.err);
break;
}
case OpCode.sync: {
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
break;
}
case OpCode.check: {
lastOp = "CHEC";
rsp = new SetDataResponse(rc.stat);
err = Code.get(rc.err);
break;
}
case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
existsRequest);
String path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest
.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
break;
}
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}
case OpCode.setWatches: {
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
// XXX We really should NOT need this!!!!
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(), cnxn);
break;
}
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getACLRequest);
Stat stat = new Stat();
List<ACL> acl =
zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
rsp = new GetACLResponse(acl, stat);
break;
}
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getChildrenRequest);
DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
List<String> children = zks.getZKDatabase().getChildren(
getChildrenRequest.getPath(), null, getChildrenRequest
.getWatch() ? cnxn : null);
rsp = new GetChildrenResponse(children);
break;
}
case OpCode.getChildren2: {
lastOp = "GETC";
GetChildren2Request getChildren2Request = new GetChildren2Request();
ByteBufferInputStream.byteBuffer2Record(request.request,
getChildren2Request);
Stat stat = new Stat();
DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
List<String> children = zks.getZKDatabase().getChildren(
getChildren2Request.getPath(), stat, getChildren2Request
.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
break;
}
}
} catch (SessionMovedException e) {
// session moved is a connection level error, we need to tear
// down the connection otw ZOOKEEPER-710 might happen
// ie client on slow follower starts to renew session, fails
// before this completes, then tries the fast follower (leader)
// and is successful, however the initial renew is then
// successfully fwd/processed by the leader and as a result
// the client and leader disagree on where the client is most
// recently attached (and therefore invalid SESSION MOVED generated)
cnxn.sendCloseSession();
return;
} catch (KeeperException e) {
err = e.code();
} catch (Exception e) {
// log at error level as we are returning a marshalling
// error to the user
LOG.error("Failed to process " + request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
LOG.error("Dumping request buffer: 0x" + sb.toString());
err = Code.MARSHALLINGERROR;
}
long lastZxid = zks.getZKDatabase().getDataTreeLastProcessedZxid();
ReplyHeader hdr =
new ReplyHeader(request.cxid, lastZxid, err.intValue());
zks.serverStats().updateLatency(request.createTime);
cnxn.updateStatsForResponse(request.cxid, lastZxid, lastOp,
request.createTime, Time.currentElapsedTime());
try {
cnxn.sendResponse(hdr, rsp, "response");
if (closeSession) {
cnxn.sendCloseSession();
}
} catch (IOException e) {
LOG.error("FIXMSG",e);
}
}
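The first loop in processRequest() above prunes zks.outstandingChanges: every pending change whose zxid is at or below the request being applied is dropped, and the per-path index entry is removed only if it still points at that same change. A minimal sketch of that bookkeeping follows; OutstandingSketch and Change are hypothetical types that keep only the zxid and path fields.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of outstandingChanges and outstandingChangesForPath.
final class OutstandingSketch {
    static final class Change {
        final long zxid;
        final String path;
        Change(long zxid, String path) {
            this.zxid = zxid;
            this.path = path;
        }
    }

    final List<Change> outstanding = new ArrayList<>();
    final Map<String, Change> byPath = new HashMap<>();

    // A newer change for the same path replaces the older one in the index.
    void add(Change change) {
        outstanding.add(change);
        byPath.put(change.path, change);
    }

    // Drop every pending change whose zxid is <= the zxid being applied.
    void prune(long appliedZxid) {
        while (!outstanding.isEmpty() && outstanding.get(0).zxid <= appliedZxid) {
            Change change = outstanding.remove(0);
            // Only clear the index entry if it still refers to this exact change.
            if (byPath.get(change.path) == change) {
                byPath.remove(change.path);
            }
        }
    }
}
```

The identity check matters when two pending changes touch the same path: pruning the older one must not hide the newer one from lookups by path.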
LeaderZooKeeperServer class
Inheritance
public class LeaderZooKeeperServer extends QuorumZooKeeperServer {}
Because LeaderZooKeeperServer extends QuorumZooKeeperServer, look at QuorumZooKeeperServer first.
QuorumZooKeeperServer class
Inheritance
public abstract class QuorumZooKeeperServer extends ZooKeeperServer{}
Methods
QuorumZooKeeperServer overrides two methods: dumpConf additionally prints the quorum settings, and setState still just stores the state.
public void dumpConf(PrintWriter pwriter) {
super.dumpConf(pwriter);
pwriter.print("initLimit=");
pwriter.println(self.getInitLimit());
pwriter.print("syncLimit=");
pwriter.println(self.getSyncLimit());
pwriter.print("electionAlg=");
pwriter.println(self.getElectionType());
pwriter.print("electionPort=");
pwriter.println(self.quorumPeers.get(self.getId()).electionAddr
.getPort());
pwriter.print("quorumPort=");
pwriter.println(self.quorumPeers.get(self.getId()).addr.getPort());
pwriter.print("peerType=");
pwriter.println(self.getLearnerType().ordinal());
}
protected void setState(State state) {
this.state = state;
}
Constructor
LeaderZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
Processor chain
LeaderZooKeeperServer overrides the chain setup, so its request flow differs from the standalone server's.
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied);
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Long.toString(getServerId()), false,
getZooKeeperServerListener());
commitProcessor.start();
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
((PrepRequestProcessor)firstProcessor).start();
}
Several of these processors were already covered in the submitRequest section above; the remaining ones are covered next.
ProposalRequestProcessor class: issuing proposals
ProposalRequestProcessor sends a proposal for the request to the other servers and hands the request to CommitProcessor, which resumes processing it through a callback once more than half of the servers have acknowledged the proposal. It also calls SyncRequestProcessor to write the request to the log on disk.
Constructor
public ProposalRequestProcessor(LeaderZooKeeperServer zks,
processRequest
public void processRequest(Request request) throws RequestProcessorException {
CommitProcessor class: two-phase commit handling
Constructor
public CommitProcessor(RequestProcessor nextProcessor, String id,
processRequest method
processRequest() places the request on a queue.
synchronized public void processRequest(Request request) {
// request.addRQRec(">commit");
if (LOG.isDebugEnabled()) {
LOG.debug("Processing request:: " + request);
}
if (!finished) {
queuedRequests.add(request);
notifyAll();
}
}
commit method
Once more than half of the servers have acknowledged a proposal, commit is called back to carry it out:
LearnerHandler.run() -> leader.processAck() -> commitProcessor.commit(request)
synchronized public void commit(Request request) {
if (!finished) {
if (request == null) {
LOG.warn("Committed a null!",
new Exception("committing a null! "));
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Committing request:: " + request);
}
committedRequests.add(request);
notifyAll();
}
}
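The "more than half of the servers" condition that triggers commit() can be sketched as a small ack tracker. This is an assumption-laden model: AckSketch is a hypothetical class, it tracks a single proposal, and it hard-codes a simple majority rule rather than whatever quorum check the leader is actually configured with.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical tracker for the acks of one proposal, assuming simple majority quorums.
final class AckSketch {
    private final int ensembleSize;
    private final Set<Long> acks = new HashSet<>();
    private boolean committed = false;

    AckSketch(int ensembleSize) {
        this.ensembleSize = ensembleSize;
    }

    // Returns true exactly once: on the ack that first forms a strict majority.
    // That is the point where commitProcessor.commit(request) would be called.
    boolean ack(long serverId) {
        acks.add(serverId); // duplicate acks from the same server count once
        if (!committed && acks.size() > ensembleSize / 2) {
            committed = true;
            return true;
        }
        return false;
    }
}
```

With five servers, the third distinct ack is the one that commits; later acks are recorded but do not commit again.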
run()
- Requests of type create/delete/setData/multi/setACL/createSession/closeSession are held in nextPending;
- if committedRequests (the queue of requests ready to commit) is empty, the thread waits;
- LearnerHandler.run() -> leader.processAck() -> commitProcessor.commit(request) adds requests approved by more than half of the servers to committedRequests;
- committed requests are moved to the toProcess queue;
- requests in toProcess are executed by nextProcessor.
public void run() {
try {
Request nextPending = null;
while (!finished) {
int len = toProcess.size();
for (int i = 0; i < len; i++) {
nextProcessor.processRequest(toProcess.get(i));
}
toProcess.clear();
synchronized (this) {
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() == 0) {
wait();
continue;
}
// First check and see if the commit came in for the pending
// request
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() > 0) {
Request r = committedRequests.remove();
/*
* We match with nextPending so that we can move to the
* next request when it is committed. We also want to
* use nextPending because it has the cnxn member set
* properly.
*/
if (nextPending != null
&& nextPending.sessionId == r.sessionId
&& nextPending.cxid == r.cxid) {
// we want to send our version of the request.
// the pointer to the connection in the request
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid;
toProcess.add(nextPending);
nextPending = null;
} else {
// this request came from someone else so just
// send the commit packet
toProcess.add(r);
}
}
}
// We haven't matched the pending requests, so go back to
// waiting
if (nextPending != null) {
continue;
}
synchronized (this) {
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) {
Request request = queuedRequests.remove();
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request;
break;
case OpCode.sync:
if (matchSyncs) {
nextPending = request;
} else {
toProcess.add(request);
}
break;
default:
toProcess.add(request);
}
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted exception while waiting", e);
} catch (Throwable e) {
LOG.error("Unexpected exception causing CommitProcessor to exit", e);
}
LOG.info("CommitProcessor exited loop!");
}
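The interplay of queuedRequests, committedRequests, nextPending, and toProcess is easier to see without threads. The sketch below replays the same decisions synchronously: reads flow straight to toProcess, a write parks in nextPending and blocks everything behind it until its commit arrives. CommitMatchSketch is a hypothetical class, the "w:" prefix for writes is an assumption of the sketch, and requests are matched by string equality where the real code compares sessionId and cxid.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, synchronous model of the CommitProcessor bookkeeping.
final class CommitMatchSketch {
    private final Queue<String> queued = new ArrayDeque<>();
    private final Queue<String> committed = new ArrayDeque<>();
    private String nextPending = null;
    final List<String> toProcess = new ArrayList<>(); // what the next processor would receive

    // Sketch-only convention: requests starting with "w:" are writes.
    private static boolean isWrite(String request) {
        return request.startsWith("w:");
    }

    void processRequest(String request) {
        queued.add(request);
        step();
    }

    void commit(String request) {
        committed.add(request);
        step();
    }

    // Repeats the two phases of the run() loop until nothing more can move.
    private void step() {
        boolean progressed = true;
        while (progressed) {
            progressed = false;
            // Phase 1: a commit arrived while we are idle or waiting on a pending write.
            if ((queued.isEmpty() || nextPending != null) && !committed.isEmpty()) {
                String c = committed.remove();
                if (c.equals(nextPending)) {
                    nextPending = null; // our pending write is now committed
                }
                toProcess.add(c);
                progressed = true;
            }
            // Phase 2: pull queued requests until a write has to wait for its commit.
            while (nextPending == null && !queued.isEmpty()) {
                String request = queued.remove();
                if (isWrite(request)) {
                    nextPending = request;
                } else {
                    toProcess.add(request);
                }
                progressed = true;
            }
        }
    }
}
```

The effect is that a read submitted after a write on the same server is not released until that write has been committed, which preserves submission order in toProcess.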
ToBeAppliedRequestProcessor class: cleaning up proposals held by Leader
Constructor
ToBeAppliedRequestProcessor(RequestProcessor next,
processRequest method
It calls the next processor (FinalRequestProcessor), then removes the proposal at the head of the Leader.toBeApplied queue if its zxid matches the request.
public void processRequest(Request request) throws RequestProcessorException {
// request.addRQRec(">tobe");
next.processRequest(request);
Proposal p = toBeApplied.peek();
if (p != null && p.request != null
&& p.request.zxid == request.zxid) {
toBeApplied.remove();
}
}
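The head-of-queue cleanup above can be sketched with zxids alone. ToBeAppliedSketch is a hypothetical class: the queue holds bare zxids instead of Proposal objects, and a field assignment stands in for the call to the next processor.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of the toBeApplied cleanup; proposals are reduced to their zxid.
final class ToBeAppliedSketch {
    final ConcurrentLinkedQueue<Long> toBeApplied = new ConcurrentLinkedQueue<>();
    long lastApplied = -1;

    void processRequest(long zxid) {
        lastApplied = zxid; // stands in for next.processRequest(request)
        Long head = toBeApplied.peek();
        // Only the proposal at the head is removed, and only if it is the one just applied.
        if (head != null && head == zxid) {
            toBeApplied.remove();
        }
    }
}
```

A request whose zxid does not match the head (for example a read) leaves the queue untouched.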