前言

ZookeeperServer类是非常关键的类,客户端发起的请求和服务间的请求都是处理都是交由ZookeeperServer来处理。

最关键的就是public void submitRequest(Request si),请求都是交由该方法处理。

1
2
3
4
5
public void submitRequest(Request si) {
// ... 省略
firstProcessor.processRequest(si);
// ... 省略
}

对于Leader、Learner和Observer来说处理请求的区别就在于setupRequestProcessors()方法初始的firstProcessor处理链。

FollowerZookeeperServer

继承关系

1
public class FollowerZooKeeperServer extends LearnerZooKeeperServer {}

成员变量

1
2
3
4
5
6
7
CommitProcessor commitProcessor; //两步提交管理

SyncRequestProcessor syncProcessor; //日志记录
/*
* Pending sync requests
*/
ConcurrentLinkedQueue<Request> pendingSyncs; //等待队列

构造器

1
2
3
4
5
6
FollowerZooKeeperServer(FileTxnSnapLog logFactory,QuorumPeer self,
DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
super(logFactory, self.tickTime, self.minSessionTimeout,
self.maxSessionTimeout, treeBuilder, zkDb, self);
this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
}

方法

setupRequestProcessors()

FollowerRequestProcessor写请求转Leader -> CommitProcessor两步提交处理 -> FinalRequestProcessor执行请求

syncProcessor 用于记录日志

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}

logRequest() 记录日志

调用SyncRequestProcessor来记录日志

1
2
3
4
5
6
7
8
9
10
11
public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(null, hdr.getClientId(), hdr.getCxid(),
hdr.getType(), null, null);
request.hdr = hdr;
request.txn = txn;
request.zxid = hdr.getZxid();
if ((request.zxid & 0xffffffffL) != 0) {
pendingTxns.add(request); // 保存请求到等待队列中
}
syncProcessor.processRequest(request);
}

commit() 提交请求

Follower服务器接收到Leader的提交请求后才会执行commit()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void commit(long zxid) {
if (pendingTxns.size() == 0) {
LOG.warn("Committing " + Long.toHexString(zxid)
+ " without seeing txn");
return;
}
long firstElementZxid = pendingTxns.element().zxid;
if (firstElementZxid != zxid) {
LOG.error("Committing zxid 0x" + Long.toHexString(zxid)
+ " but next pending txn 0x"
+ Long.toHexString(firstElementZxid));
System.exit(12);
}
Request request = pendingTxns.remove();
// 提交请求,走到最终的执行处理器FinalRequestProcessor
commitProcessor.commit(request);
}

同步完成sync()

在对数据一致性要求高的场合中需要读取从节点的数据与主节点一致,这个时候可以选择发送sync指令到节点,等待从节点完成主节点之前的请求。

这个sync()方法是在主节点完成所有请求后发给从节点执行的方法。

  1. 从节点收到客户端sync请求,在FollowerRequestProcessor和写请求一样处理;
  2. 主节点收到从节点的sync请求,如果当前其他写请求已完成则回复从节点;
  3. 从节点收到主节点的sync响应,执行sync()方法。
1
2
3
4
5
6
7
8
9
10
synchronized public void sync(){
if(pendingSyncs.size() ==0){
LOG.warn("Not expecting a sync.");
return;
}

Request r = pendingSyncs.remove();
//
commitProcessor.commit(r);
}

FollowerRequestProcessor类

FollowerRequestProcessor是在FollowerZookeeperServer中负责处理请求的处理器。

继承关系

1
2
public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements
RequestProcessor{}

成员变量

1
2
3
4
5
6
7
FollowerZooKeeperServer zks;  //ZookeeperServer

RequestProcessor nextProcessor; //下一个处理器

LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>(); //请求队列

boolean finished = false;

构造器

1
2
3
4
5
6
7
public FollowerRequestProcessor(FollowerZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("FollowerRequestProcessor:" + zks.getServerId(), zks
.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
}

方法

processRequest()

processRequest方法将请求存入队列,队列中的请求由run()在独立线程中处理。

1
2
3
4
5
public void processRequest(Request request) {
if (!finished) {
queuedRequests.add(request);
}
}

run()

先将请求交由下一个处理器(CommitProcessor),CommitProcessor将写请求保存在队列中返回,如果是写请求将请求发送给Leader服务器,Leader服务器在有过半服务器同意后响应commit请求,此时Follower调用Follower.commit()提交CommitProcessor中的请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
// 给下一个处理器
nextProcessor.processRequest(request);

// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this follower has pending, so we
// add it to pendingSyncs.
// 写请求由Follower类处理,发给主节点
switch (request.type) {
case OpCode.sync: // 同步请求
zks.pendingSyncs.add(request);
zks.getFollower().request(request);
break;
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
zks.getFollower().request(request);
break;
}
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("FollowerRequestProcessor exited loop!");
}