前言

ZookeeperServer类是非常关键的类,客户端发起的请求和服务间的请求都是处理都是交由ZookeeperServer来处理。

最关键的就是public void submitRequest(Request si),请求都是交由该方法处理。

1
2
3
4
5
public void submitRequest(Request si) {
// ... 省略
firstProcessor.processRequest(si);
// ... 省略
}

对于Leader、Learner和Observer来说处理请求的区别就在于setupRequestProcessors()方法初始的firstProcessor处理链。

ObserverZooKeeperServer

继承关系

1
public class ObserverZooKeeperServer extends LearnerZooKeeperServer {}

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled();
/*
* Request processors
*/
private CommitProcessor commitProcessor;
private SyncRequestProcessor syncProcessor;

/*
* Pending sync requests
*/
ConcurrentLinkedQueue<Request> pendingSyncs =
new ConcurrentLinkedQueue<Request>();

构造器

1
2
3
4
5
6
ObserverZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
super(logFactory, self.tickTime, self.minSessionTimeout,
self.maxSessionTimeout, treeBuilder, zkDb, self);
LOG.info("syncEnabled =" + syncRequestProcessorEnabled);
}

方法

commitRequest()

commit提案,在接收到服务器的Leader.INFORM请求后提交。

1
2
3
4
5
6
7
public void commitRequest(Request request) {     
if (syncRequestProcessorEnabled) {
// Write to txnlog and take periodic snapshot
syncProcessor.processRequest(request);
}
commitProcessor.commit(request);
}

setupRequestProcessors()

ObserverRequestProcessor -> CommitProcessor -> FinalRequestProcessor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
protected void setupRequestProcessors() {
// We might consider changing the processor behaviour of
// Observers to, for example, remove the disk sync requirements.
// Currently, they behave almost exactly the same as followers.
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor,
Long.toString(getServerId()), true,
getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
((ObserverRequestProcessor) firstProcessor).start();

/*
* Observer should write to disk, so that the it won't request
* too old txn from the leader which may lead to getting an entire
* snapshot.
*
* However, this may degrade performance as it has to write to disk
* and do periodic snapshot which may double the memory requirements
*/
if (syncRequestProcessorEnabled) {
syncProcessor = new SyncRequestProcessor(this, null);
syncProcessor.start();
}
}

sync()

1
2
3
4
5
6
7
8
synchronized public void sync(){
if(pendingSyncs.size() ==0){
LOG.warn("Not expecting a sync.");
return;
}
Request r = pendingSyncs.remove();
commitProcessor.commit(r);
}

ObserverRequestProcessor类

继承关系

1
2
public class ObserverRequestProcessor extends ZooKeeperCriticalThread implements
RequestProcessor {}

成员变量

1
2
3
4
5
6
7
8
9
ObserverZooKeeperServer zks;

RequestProcessor nextProcessor;

// We keep a queue of requests. As requests get submitted they are
// stored here. The queue is drained in the run() method.
LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();

boolean finished = false;

构造器

1
2
3
4
5
6
7
public ObserverRequestProcessor(ObserverZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("ObserverRequestProcessor:" + zks.getServerId(), zks
.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
}

方法

processRequest()

1
2
3
4
5
public void processRequest(Request request) {
if (!finished) {
queuedRequests.add(request);
}
}

run()

ObserverRequestProcessor处理请求的逻辑与FollowerRequestProcessor基本一致,都是写请求转发给Leader服务器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take();
if (LOG.isTraceEnabled()) {
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
'F', request, "");
}
if (request == Request.requestOfDeath) {
break;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
// 下一个处理器CommitProcessor,写请求添加进队列等待提交请求
nextProcessor.processRequest(request);

// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this Observer has pending, so we
// add it to pendingSyncs.
// 写请求转发给Leader服务器
switch (request.type) {
case OpCode.sync:
zks.pendingSyncs.add(request);
zks.getObserver().request(request);
break;
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
zks.getObserver().request(request);
break;
}
}
} catch (Exception e) {
handleException(this.getName(), e);
}
LOG.info("ObserverRequestProcessor exited loop!");
}