zk解析(5):Follower
前言
当节点未被选为主节点(Leader),且未被配置为Observer时,该节点处于Follower状态。
代码参考zk解析(1)-zk集群启动及选主:
org.apache.zookeeper.server.quorum.QuorumPeer#run
org.apache.zookeeper.server.quorum.Follower#followLeader
当节点处于FOLLOWING状态时,QuorumPeer#run会执行follower.followLeader():
1
2
3
4
5
6
7
8
9
10
11
12
13 case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
setPeerState(ServerState.LOOKING);
}
break;
Follower类
继承关系
Follower继承了Learner类,先查阅zk解析(4)-Learner章节1
public class Follower extends Learner{}
构造器
1 | Follower(QuorumPeer self,FollowerZooKeeperServer zk) { |
followLeader()
- 计算选主耗时;
- 连接主节点;
- 向主节点注册并获取新的zxid;
- epoch校验(从主节点返回的zxid中取出epoch,不得小于本节点已接受的epoch,否则抛出IOException);
- 同步主节点数据;
- 循环读取并处理主节点发来的数据包(readPacket / processPacket),直到Follower停止运行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47void followLeader() throws InterruptedException {
self.end_fle = Time.currentElapsedTime();
long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);
LOG.info("FOLLOWING - LEADER ELECTION TOOK - {}", electionTimeTaken);
self.start_fle = 0;
self.end_fle = 0;
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
try {
QuorumServer leaderServer = findLeader();
try {
// 连接主节点
connectToLeader(leaderServer.addr, leaderServer.hostname);
// 请求主节点获取新的zxid
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
// zxid检查
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
if (newEpoch < self.getAcceptedEpoch()) {
LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
+ " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
throw new IOException("Error: Epoch of leader is lower");
}
// 同步主节点的历史数据
syncWithLeader(newEpochZxid);
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
} catch (Exception e) {
LOG.warn("Exception when following the leader", e);
try {
sock.close();
} catch (IOException e1) {
e1.printStackTrace();
}
// clear pending revalidations
pendingRevalidations.clear();
}
} finally {
zk.unregisterJMX((Learner)this);
}
}
processPacket
1 | protected void processPacket(QuorumPacket qp) throws IOException{ |